sorhwphuo / app.py
HonestAnnie's picture
Create app.py
71b065c verified
raw
history blame
26.8 kB
import gradio as gr
import chromadb
import google.generativeai as genai
import os
from dotenv import load_dotenv
import logging
import functools
from collections import defaultdict
# --- Configuration ---
# Log everything from INFO upwards with a timestamped format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Pull variables from a local .env file (if any) into the environment,
# then read the Gemini API key from there.
load_dotenv()
API_KEY = os.getenv("GEMINI_API_KEY")
if API_KEY:
    try:
        genai.configure(api_key=API_KEY)
        logging.info("Gemini API configured successfully.")
    except Exception as e:
        # Configuration problems are logged only; the module keeps loading.
        logging.error(f"Error configuring Gemini API: {e}")
else:
    logging.error("GEMINI_API_KEY not found in environment variables.")

# Chroma DB Configuration
CHROMA_DB_PATH = "./chroma"
COLLECTION_NAME = "phil_de"

# Gemini Embedding Model Configuration
# Must match the model used to create the DB (expecting 3072 dims based on past errors)
EMBEDDING_MODEL = "models/gemini-embedding-exp-03-07"
logging.info(f"Using embedding model: {EMBEDDING_MODEL}")

# --- Constants ---
# Upper bound on the number of hits requested per search query.
MAX_RESULTS = 20
# --- ChromaDB Connection and Author Fetching ---
# Both names stay at these defaults when the database cannot be opened, so the
# rest of the module can test `collection is None` / an empty author list.
collection = None
unique_authors = []
try:
    client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
    collection = client.get_collection(name=COLLECTION_NAME)
    logging.info(f"Successfully connected to ChromaDB collection '{COLLECTION_NAME}'. Collection count: {collection.count()}")

    # Read the metadata of every stored entry once to build the author filter.
    logging.info("Fetching all metadata to extract unique authors...")
    all_metadata = collection.get(include=['metadatas'])
    if all_metadata and all_metadata.get('metadatas'):
        # Entries without metadata or without a non-empty author are ignored.
        authors_set = {
            meta['author']
            for meta in all_metadata['metadatas']
            if meta and meta.get('author')
        }
        unique_authors = sorted(authors_set)
        logging.info(f"Found {len(unique_authors)} unique authors.")
    else:
        logging.warning("Could not retrieve metadata or no metadata found to extract authors.")
except Exception as e:
    logging.critical(f"FATAL: Could not connect to Chroma DB or fetch authors: {e}", exc_info=True)
    unique_authors = []
# --- Embedding Function ---
def get_embedding(text, task="RETRIEVAL_QUERY"):
    """
    Request an embedding for *text* from the configured Gemini model.

    Args:
        text: The text to embed. Empty/falsy text is rejected.
        task: Value forwarded as ``task_type`` to ``genai.embed_content``.

    Returns:
        The ``'embedding'`` entry of the API response, or ``None`` when the API
        key is missing, the text is empty, or the request raised an exception.
    """
    if not API_KEY:
        logging.error("Cannot generate embedding: API key not configured.")
        return None
    if not text:
        logging.warning("Embedding requested for empty text.")
        return None

    try:
        logging.info(f"Generating embedding for task: {task}")
        result = genai.embed_content(
            model=EMBEDDING_MODEL,
            content=text,
            task_type=task,
        )
        logging.info("Embedding generated successfully.")
        return result['embedding']
    except Exception as e:
        logging.error(f"Error generating Gemini embedding: {e}", exc_info=True)
        # Add a more specific hint when the error text points at a known cause.
        error_text = str(e).lower()
        model_problem = "model" in error_text and (
            "not found" in error_text or "permission" in error_text
        )
        if model_problem:
            logging.error(f"The configured embedding model '{EMBEDDING_MODEL}' might be incorrect, unavailable, or lack permissions.")
        elif "dimension" in error_text:
            logging.error(f"Potential dimension mismatch issue with model '{EMBEDDING_MODEL}'.")
        return None
# --- Helper: Format Single Result (for top display area) ---
def format_single_result(result_data, index, total_results):
    """
    Format one search result as Markdown for the top preview area.

    Args:
        result_data: Dict with optional keys ``'metadata'`` (dict or ``None``),
            ``'document'`` and ``'distance'``.
        index: Position of the result; accepted for interface compatibility
            and not used in the output.
        total_results: Total number of results; accepted for interface
            compatibility and not used in the output.

    Returns:
        A Markdown string with author, book, optional section, distance and the
        passage as a block quote, or a fixed message when *result_data* is
        empty.
    """
    if not result_data:
        return "No result data available."

    # The 'metadata' key can be present with a None value, in which case the
    # default passed to .get() is never used; `or {}` covers both cases.
    metadata = result_data.get('metadata') or {}
    doc = result_data.get('document', "N/A")

    # A None distance cannot be formatted with ':.4f'; treat it like a missing one.
    distance = result_data.get('distance')
    if distance is None:
        distance = float('inf')

    author = metadata.get('author', 'N/A')
    book = metadata.get('book', 'N/A')
    section = metadata.get('section', 'N/A')

    parts = [
        f"* **Author:** {author}\n",
        f"* **Book:** {book}\n",
    ]
    # Placeholder section values carry no information, so the line is omitted.
    if section not in ('Unknown', 'N/A', None):
        parts.append(f"* **Section:** {section}\n")
    parts.append(f"* **Distance:** {distance:.4f}\n\n")
    parts.append(f"> {doc}\n\n")
    return "".join(parts)
# --- Helper: Format Reading Passage (Deprecated - formatting now done in format_context_markdown) ---
# def format_reading_passage(passage_data): # No longer needed as separate function
# ...
# --- Context Formatting Helper ---
def format_context_markdown(passages):
    """
    Join passage texts into one Markdown string for the reading area.

    Passages without an ``'id'`` are ignored, the rest are ordered by their
    integer id, and entries whose ``'role'`` is ``'missing'`` (boundary
    placeholders) are left out of the text. No header is added.

    Args:
        passages: List of passage dicts with ``'id'``, ``'doc'`` and optionally
            ``'role'``. May be empty or ``None``.

    Returns:
        The passage texts separated by blank lines, or ``""`` when there is
        nothing to show.

    Raises:
        ValueError: If an id cannot be converted to ``int``.
    """
    if not passages:
        return ""

    # Sort a filtered copy so the caller's list is left untouched.
    ordered = sorted(
        (p for p in passages if p and p.get('id') is not None),
        key=lambda p: int(p.get('id', -1)),
    )

    texts = []
    for passage in ordered:
        if passage.get('role', 'context') == 'missing':
            continue  # Skip placeholders like "Beginning/End of document"
        doc = passage.get('doc')
        # Use the fallback text for an absent key and for an explicit None.
        texts.append('_Passage text missing_' if doc is None else doc)

    # join() puts exactly one blank line between consecutive real passages,
    # wherever the skipped placeholders sat in the list.
    return "\n\n".join(texts)
# --- Search Function (Complete) ---
def search_philosophical_texts(query, selected_authors):
    """
    Run a search and build the component/state updates for its outcome.

    The query is embedded, the collection is queried for up to MAX_RESULTS
    hits (optionally restricted to the selected authors), all usable hits are
    stored in state and the first one is shown in the preview area.

    Args:
        query: The user's query text.
        selected_authors: Authors chosen in the filter dropdown; falsy means
            no author filter is applied.

    Returns:
        A dict mapping UI components and state variables to their new values.
    """
    # Start from a fully reset UI; the branches below override what they need.
    updates = {
        full_search_results_state: [],
        current_result_index_state: 0,
        single_result_group: gr.Group(visible=False),
        result_index_indicator_md: gr.Markdown(""),
        single_result_display_md: gr.Markdown(""),
        previous_result_button: gr.Button(visible=False),
        next_result_button: gr.Button(visible=False),
        weiterlesen_button: gr.Button(visible=False),  # Default to hidden
        context_display: gr.Markdown(""),
        displayed_context_passages: [],
        load_previous_button: gr.Button(visible=False),
        load_next_button: gr.Button(visible=False),
    }

    def show_message(text):
        """Put *text* into the preview area and make the preview group visible."""
        updates[single_result_display_md] = gr.Markdown(text)
        updates[single_result_group] = gr.Group(visible=True)

    # --- Pre-computation Checks ---
    if collection is None:
        logging.error("Search attempted but ChromaDB collection is not available.")
        show_message("Error: Database connection failed.")
        return updates

    if not query:
        logging.warning("Empty query received.")
        show_message("Please enter a query.")
        return updates

    logging.info(f"Received query: '{query[:50]}...'")
    logging.info(f"Selected Authors for filtering: {selected_authors}")

    # --- Embedding ---
    query_embedding = get_embedding(query, task="RETRIEVAL_QUERY")
    if query_embedding is None:
        logging.error("Failed to generate query embedding.")
        show_message("Error: Failed to generate query embedding.")
        return updates

    # --- Filtering ---
    where_filter = None
    if selected_authors:
        where_filter = {"author": {"$in": selected_authors}}
        logging.info(f"Applying WHERE filter: {where_filter}")

    # --- Query Execution and Result Processing ---
    try:
        logging.info(f"Querying collection '{COLLECTION_NAME}' for top {MAX_RESULTS} results.")
        results = collection.query(
            query_embeddings=[query_embedding],
            n_results=MAX_RESULTS,
            where=where_filter,
            include=['documents', 'metadatas', 'distances'],  # IDs are included by default
        )

        # Nothing came back at all.
        if not (results and results.get('ids') and results['ids'][0]):
            logging.info("No results found for the query (or matching the filter).")
            show_message("No results found matching your query and filters.")
            updates[weiterlesen_button] = gr.Button(visible=False)
            return updates

        num_found = len(results['ids'][0])
        logging.info(f"Query successful. Found {num_found} results.")
        ids_list = results['ids'][0]
        docs_list = results['documents'][0]
        metadatas_list = results['metadatas'][0]
        distances_list = results['distances'][0]

        all_results_data = []
        for i, passage_id in enumerate(ids_list):
            # Later context loading does integer arithmetic on the id, so hits
            # whose id is not an integer string are dropped here.
            try:
                int(passage_id)
            except ValueError:
                logging.warning(f"Skipping result with non-integer ID: {ids_list[i]}")
                continue
            all_results_data.append({
                "id": passage_id,
                "document": docs_list[i],
                "metadata": metadatas_list[i],
                "distance": distances_list[i],
            })

        # Hits were returned, but none survived validation.
        if not all_results_data:
            logging.info("No valid results found after filtering/validation.")
            show_message("No results found matching your query and filters.")
            updates[weiterlesen_button] = gr.Button(visible=False)
            return updates

        # Store every hit and show the first one.
        updates[full_search_results_state] = all_results_data
        updates[current_result_index_state] = 0
        first_result_md = format_single_result(all_results_data[0], 0, len(all_results_data))
        show_message(first_result_md)
        updates[result_index_indicator_md] = gr.Markdown(f"Result **1** of **{len(all_results_data)}**")
        # On the first hit there is nothing to go back to.
        updates[previous_result_button] = gr.Button(visible=True, interactive=False)
        updates[next_result_button] = gr.Button(visible=True, interactive=(len(all_results_data) > 1))
        updates[weiterlesen_button] = gr.Button(visible=True)
        return updates

    except Exception as e:
        logging.error(f"Error querying ChromaDB or processing results: {e}", exc_info=True)
        if "dimension" in str(e).lower():
            error_msg = "**Error:** Database search failed due to embedding mismatch. Please check configuration."
        else:
            # Only the exception type is shown to the user; details go to the log.
            error_msg = f"**Error:** An unexpected error occurred during search. See logs for details. ({type(e).__name__})"
        show_message(error_msg)

        # Reset everything else, since the try block may have changed entries
        # before the failure.
        updates[full_search_results_state] = []
        updates[current_result_index_state] = 0
        updates[weiterlesen_button] = gr.Button(visible=False)
        updates[previous_result_button] = gr.Button(visible=False)
        updates[next_result_button] = gr.Button(visible=False)
        updates[result_index_indicator_md] = gr.Markdown("")
        updates[context_display] = gr.Markdown("")
        updates[displayed_context_passages] = []
        updates[load_previous_button] = gr.Button(visible=False)
        updates[load_next_button] = gr.Button(visible=False)
        return updates
# --- Result Navigation Function ---
def navigate_results(direction, current_index, full_results):
    """
    Step to the previous or next search result in the top preview area.

    Args:
        direction: ``'previous'`` or ``'next'``; any other value keeps the index.
        current_index: Index of the result currently shown.
        full_results: List of all result dicts held in state.

    Returns:
        A dict mapping UI components and state variables to their new values.
    """
    if not full_results:
        logging.warning("Navigate called with no results in state.")
        return {current_result_index_state: 0}

    total_results = len(full_results)

    # Clamp the target index to the valid range.
    if direction == 'previous':
        new_index = max(0, current_index - 1)
    elif direction == 'next':
        new_index = min(total_results - 1, current_index + 1)
    else:
        new_index = current_index
    index_changed = new_index != current_index

    updates = {}
    if index_changed:
        logging.info(f"Navigating from result index {current_index} to {new_index}")
        result_data = full_results[new_index]
        result_md = format_single_result(result_data, new_index, total_results)
        updates[single_result_display_md] = gr.Markdown(result_md)
        updates[current_result_index_state] = new_index
        updates[result_index_indicator_md] = gr.Markdown(f"Result **{new_index + 1}** of **{total_results}**")
        # A different result is shown now, so the reading area starts over.
        updates[context_display] = gr.Markdown("")
        updates[displayed_context_passages] = []
        updates[load_previous_button] = gr.Button(visible=False)
        updates[load_next_button] = gr.Button(visible=False)
        updates[weiterlesen_button] = gr.Button(visible=True)

    # Navigation button interactivity always reflects the *new* index.
    updates[previous_result_button] = gr.Button(interactive=(new_index > 0))
    updates[next_result_button] = gr.Button(interactive=(new_index < total_results - 1))

    if not index_changed:
        # Results exist at this point (checked above), so keep the
        # "weiterlesen" button visible on any navigation click.
        updates[weiterlesen_button] = gr.Button(visible=True)

    return updates
# --- Fetch Single Passage Helper ---
def fetch_passage_data(passage_id_int):
    """
    Look up one passage in the ChromaDB collection by its integer ID.

    Args:
        passage_id_int: Integer passage ID; it is converted to a string for
            the lookup. Negative values are rejected without a query.

    Returns:
        A dict with keys ``'id'``, ``'doc'`` and ``'meta'``, or ``None`` when
        the collection is unavailable, the ID is negative, the passage is not
        found, or the lookup raised an exception.
    """
    if collection is None or passage_id_int < 0:
        return None

    try:
        passage_id_str = str(passage_id_int)
        result = collection.get(ids=[passage_id_str], include=['documents', 'metadatas'])

        if not (result and result.get('ids')):
            logging.info(f"Passage ID {passage_id_str} not found in collection.")
            return None

        # Fall back to neutral values when documents/metadatas are absent.
        documents = result.get('documents')
        metadatas = result.get('metadatas')
        return {
            'id': result['ids'][0],
            'doc': documents[0] if documents else "N/A",
            'meta': metadatas[0] if metadatas else {},
        }
    except Exception as e:
        logging.error(f"Error fetching passage ID {passage_id_int} from ChromaDB: {e}", exc_info=True)
        return None
# --- Move Passage to Reading Area ---
def move_to_reading_area(current_index, full_results):
    """
    Copy the currently selected result's text into the reading area.

    On success the 'weiterlesen' button is hidden and the two context loading
    buttons are shown. The preview area at the top is not touched.

    Args:
        current_index: Index of the result currently shown in the preview.
        full_results: List of all result dicts held in state.

    Returns:
        A dict mapping UI components and state variables to their new values.
    """
    # Baseline: loading message, empty context state, all three buttons hidden.
    updates = {
        context_display: gr.Markdown("_Loading reading passage..._"),
        displayed_context_passages: [],
        load_previous_button: gr.Button(visible=False),
        load_next_button: gr.Button(visible=False),
        weiterlesen_button: gr.Button(visible=False),
    }

    def fail(message):
        """Show *message* in the reading area, keep 'weiterlesen' hidden, return updates."""
        updates[context_display] = gr.Markdown(message)
        updates[weiterlesen_button] = gr.Button(visible=False)
        return updates

    index_is_valid = bool(full_results) and 0 <= current_index < len(full_results)
    if not index_is_valid:
        logging.warning(f"Attempted to move passage with invalid state or index. Index: {current_index}, Results Count: {len(full_results)}")
        return fail("Error: Could not load passage reference.")

    try:
        target_result_data = full_results[current_index]
        # Re-key the search result into the shape used by the context state.
        reading_passage_state_data = {
            'id': target_result_data.get('id'),
            'doc': target_result_data.get('document'),
            'meta': target_result_data.get('metadata'),
            'role': 'current_reading',
        }

        has_id_and_doc = reading_passage_state_data['id'] and reading_passage_state_data['doc']
        if not has_id_and_doc:
            logging.error(f"Cannot move passage: Missing ID or document in result at index {current_index}.")
            return fail("Error: Selected passage data is incomplete.")

        formatted_passage_md = format_context_markdown([reading_passage_state_data])
        updates[context_display] = gr.Markdown(formatted_passage_md)
        updates[displayed_context_passages] = [reading_passage_state_data]
        updates[load_previous_button] = gr.Button(visible=True)
        updates[load_next_button] = gr.Button(visible=True)
        logging.info(f"Moved passage ID {reading_passage_state_data['id']} to reading area.")
        return updates
    except Exception as e:
        logging.error(f"Error moving passage for result index {current_index}: {e}", exc_info=True)
        return fail(f"Error moving passage to reading area: {e}")
# --- Load More Context Function ---
def _is_boundary_placeholder(passage, expected_id=None):
    """Return True if *passage* is a 'missing' placeholder, optionally with the given id."""
    if passage.get('role') != 'missing':
        return False
    return expected_id is None or passage.get('id') == expected_id


def _render_context_with_error(passages, error_note):
    """Render *passages* followed by *error_note*; use the note alone if rendering fails."""
    try:
        rendered = format_context_markdown(passages)
    except Exception:
        # Rendering sorts by integer id and can fail on the same bad data that
        # caused the original error; do not let that escape the error path.
        logging.error("Could not render context passages while reporting an error.", exc_info=True)
        rendered = ""
    return rendered + error_note


def load_more_context(direction, current_passages_state):
    """
    Load one more passage before or after those shown in the reading area.

    The neighbouring passage is looked up by integer ID (lowest ID - 1 for
    ``'previous'``, highest ID + 1 for ``'next'``). When no passage is found, a
    placeholder entry with role ``'missing'`` is kept at that end of the list
    instead; such placeholders are tracked in state but not rendered.

    Args:
        direction: ``'previous'`` or ``'next'``; any other value just
            re-renders the current passages.
        current_passages_state: List of passage dicts currently displayed.
            It is not modified; a new list is returned.

    Returns:
        A tuple ``(markdown, passages)``: the text for the reading area and the
        updated passage list. On errors the original list is returned together
        with an error note appended to the text.
    """
    if collection is None:
        return "Error: Database connection failed.", current_passages_state
    if not current_passages_state:
        logging.warning("Load more context called with empty state.")
        return "_No reading passage loaded yet._", []

    try:
        # Sort a copy inside the try block: the int() conversion can raise,
        # and the caller's state list must not be reordered in place.
        updated_passages = sorted(current_passages_state, key=lambda p: int(p.get('id', -1)))

        if direction == 'previous':
            edge = updated_passages[0]
            earliest_id_str = edge.get('id')
            if earliest_id_str is None:
                return format_context_markdown(updated_passages), updated_passages
            id_to_fetch = int(earliest_id_str) - 1

            if id_to_fetch < 0:
                # No passage can exist below ID 0: ensure a single '-1' start marker.
                if not _is_boundary_placeholder(edge, '-1'):
                    if _is_boundary_placeholder(edge):
                        updated_passages.pop(0)
                    updated_passages.insert(0, {'id': '-1', 'role': 'missing', 'doc': '_(Beginning of document reached)_'})
            else:
                new_passage_data = fetch_passage_data(id_to_fetch)
                if new_passage_data:
                    new_passage_data['role'] = 'prev'
                    # A real passage replaces the placeholder directly above it.
                    if _is_boundary_placeholder(edge, str(id_to_fetch + 1)):
                        updated_passages.pop(0)
                    updated_passages.insert(0, new_passage_data)
                elif not _is_boundary_placeholder(edge, str(id_to_fetch)):
                    # Nothing found: move the placeholder down to the ID just tried.
                    if _is_boundary_placeholder(edge):
                        updated_passages.pop(0)
                    updated_passages.insert(0, {'id': str(id_to_fetch), 'role': 'missing', 'doc': '_(Beginning of document reached)_'})

        elif direction == 'next':
            edge = updated_passages[-1]
            latest_id_str = edge.get('id')
            if latest_id_str is None:
                return format_context_markdown(updated_passages), updated_passages
            id_to_fetch = int(latest_id_str) + 1

            new_passage_data = fetch_passage_data(id_to_fetch)
            if new_passage_data:
                new_passage_data['role'] = 'next'
                # A real passage replaces the placeholder directly below it.
                if _is_boundary_placeholder(edge, str(id_to_fetch - 1)):
                    updated_passages.pop(-1)
                updated_passages.append(new_passage_data)
            elif not _is_boundary_placeholder(edge, str(id_to_fetch)):
                # Nothing found: move the placeholder up to the ID just tried.
                if _is_boundary_placeholder(edge):
                    updated_passages.pop(-1)
                updated_passages.append({'id': str(id_to_fetch), 'role': 'missing', 'doc': '_(End of document reached)_'})

        context_md = format_context_markdown(updated_passages)
        return context_md, updated_passages

    except ValueError:
        logging.error(f"Error converting passage ID to integer in load_more_context. State: {current_passages_state}", exc_info=True)
        error_message = _render_context_with_error(current_passages_state, "\n\n**Error processing context expansion.**")
        return error_message, current_passages_state
    except Exception as e:
        logging.error(f"Error loading more context (direction: {direction}): {e}", exc_info=True)
        error_message = _render_context_with_error(current_passages_state, f"\n\n**Error loading passage: {e}**")
        return error_message, current_passages_state
# --- Gradio UI Definition ---
# Layout and event wiring of the app. Components are created in display order;
# the handler functions defined above refer to them by these module-level names.
with gr.Blocks(theme=gr.themes.Default()) as demo:
    gr.Markdown("# Philosophical Text Search & Context Explorer")

    # --- State Variables ---
    full_search_results_state = gr.State([])    # every hit of the last search
    current_result_index_state = gr.State(0)    # index of the hit being previewed
    displayed_context_passages = gr.State([])   # passages shown in the reading area

    # --- Search Input Row ---
    with gr.Row():
        query_input = gr.Textbox(
            label="Enter query",
            placeholder="z. B. 'Was ist der Unterschied zwischen Herstellen und Handeln?'",
            lines=2,
            scale=3,
        )
        author_dropdown = gr.Dropdown(
            label="Filter by Author(s) (Optional)",
            choices=unique_authors,
            multiselect=True,
            scale=2,
        )
        search_button = gr.Button("Search", variant="primary", scale=1)

    # --- Result Navigation Row ---
    with gr.Row():
        previous_result_button = gr.Button("⬅️", visible=False)
        next_result_button = gr.Button("➡️", visible=False)

    gr.Markdown("---")  # Separator after search and navigation

    # --- Single Result Display Area ---
    # Holds the preview text and, at its end, the "weiterlesen" button.
    with gr.Column(visible=True) as results_area:
        with gr.Group(visible=False) as single_result_group:
            result_index_indicator_md = gr.Markdown("Result 0 of 0")
            single_result_display_md = gr.Markdown("...")  # Shows the preview
            weiterlesen_button = gr.Button("weiterlesen", variant="secondary", visible=True)

    gr.Markdown("---")  # Separator before reading area

    # --- Context / Reading Area ---
    with gr.Column(visible=True) as context_area:
        load_previous_button = gr.Button("⬆️", variant="secondary", visible=False)
        context_display = gr.Markdown(label="Reading Area")
        load_next_button = gr.Button("⬇️", variant="secondary", visible=False)

    # --- Event Handlers ---
    # Search button: resets and repopulates every component and state variable.
    search_outputs = [
        full_search_results_state,
        current_result_index_state,
        single_result_group,
        result_index_indicator_md,
        single_result_display_md,
        previous_result_button,
        next_result_button,
        weiterlesen_button,
        context_display,
        displayed_context_passages,
        load_previous_button,
        load_next_button,
    ]
    search_button.click(
        fn=search_philosophical_texts,
        inputs=[query_input, author_dropdown],
        outputs=search_outputs,
    )

    # Previous/next result buttons share one handler and one output list; the
    # direction is passed in through a constant State input.
    nav_outputs = [
        single_result_display_md,
        current_result_index_state,
        result_index_indicator_md,
        previous_result_button,
        next_result_button,
        weiterlesen_button,
        context_display,
        displayed_context_passages,
        load_previous_button,
        load_next_button,
    ]
    for _nav_button, _nav_direction in (
        (previous_result_button, 'previous'),
        (next_result_button, 'next'),
    ):
        _nav_button.click(
            fn=navigate_results,
            inputs=[gr.State(_nav_direction), current_result_index_state, full_search_results_state],
            outputs=nav_outputs,
        )

    # "weiterlesen" button: moves the previewed passage into the reading area.
    weiterlesen_outputs = [
        context_display,
        displayed_context_passages,
        load_previous_button,
        load_next_button,
        weiterlesen_button,  # Target button itself to control visibility
    ]
    weiterlesen_button.click(
        fn=move_to_reading_area,
        inputs=[current_result_index_state, full_search_results_state],
        outputs=weiterlesen_outputs,
    )

    # Load-more buttons: extend the reading area upwards or downwards.
    for _load_button, _load_direction in (
        (load_previous_button, 'previous'),
        (load_next_button, 'next'),
    ):
        _load_button.click(
            fn=load_more_context,
            inputs=[gr.State(_load_direction), displayed_context_passages],
            outputs=[context_display, displayed_context_passages],
        )
# --- Launch the Application ---
if __name__ == "__main__":
    # Report start-up problems on the console before the UI comes up; a missing
    # collection takes precedence over an empty author list.
    startup_notice = None
    if collection is None:
        startup_notice = "\n--- ERROR: ChromaDB collection failed to load. UI might not function correctly. Check logs. ---\n"
    elif not unique_authors:
        startup_notice = "\n--- WARNING: No unique authors found in DB metadata. Author filter will be empty. ---\n"
    if startup_notice is not None:
        print(startup_notice)

    print("Launching Gradio Interface...")
    # debug=True is meant for testing; server_name="0.0.0.0" binds all interfaces.
    demo.launch(server_name="0.0.0.0", share=False, debug=True)