Spaces:
Running
Running
import gradio as gr | |
import chromadb | |
import google.generativeai as genai | |
import os | |
from dotenv import load_dotenv | |
import logging | |
import functools | |
from collections import defaultdict | |
# --- Configuration --- | |
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Load environment variables (the Gemini API key is read from .env / the environment).
load_dotenv()
API_KEY = os.getenv("GEMINI_API_KEY")
if not API_KEY:
    # Only logged, not raised: the module keeps loading without a key.
    logging.error("GEMINI_API_KEY not found in environment variables.")
else:
    try:
        genai.configure(api_key=API_KEY)
        logging.info("Gemini API configured successfully.")
    except Exception as e:
        # Top-level boundary: a configuration failure is logged and start-up continues.
        logging.error(f"Error configuring Gemini API: {e}")

# Chroma DB configuration
CHROMA_DB_PATH = "./chroma"
COLLECTION_NAME = "phil_de"

# Gemini embedding model configuration.
# Make sure this matches the model used to create the DB
# (expecting 3072 dims based on past errors).
EMBEDDING_MODEL = "models/gemini-embedding-exp-03-07"
logging.info(f"Using embedding model: {EMBEDDING_MODEL}")

# --- Constants ---
# Upper bound on the number of hits requested from ChromaDB per search.
MAX_RESULTS = 20
# --- ChromaDB Connection and Author Fetching --- | |
# Module-level handles used by the search/context functions below.
# `collection` stays None when the database cannot be opened; callers check for that.
collection = None
unique_authors = []
try:
    client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
    collection = client.get_collection(name=COLLECTION_NAME)
    logging.info(
        f"Successfully connected to ChromaDB collection '{COLLECTION_NAME}'. "
        f"Collection count: {collection.count()}"
    )

    # The author filter dropdown is populated from the metadata of every stored passage.
    logging.info("Fetching all metadata to extract unique authors...")
    all_metadata = collection.get(include=['metadatas'])
    metadatas = all_metadata.get('metadatas') if all_metadata else None
    if metadatas:
        # Skip entries without metadata or with an empty/missing author.
        unique_authors = sorted(
            {meta['author'] for meta in metadatas if meta and meta.get('author')}
        )
        logging.info(f"Found {len(unique_authors)} unique authors.")
    else:
        logging.warning("Could not retrieve metadata or no metadata found to extract authors.")
except Exception as e:
    # Start-up boundary: log with traceback and continue with an empty author list.
    # NOTE: `collection` keeps whatever value it had when the failure occurred.
    logging.critical(f"FATAL: Could not connect to Chroma DB or fetch authors: {e}", exc_info=True)
    unique_authors = []
# --- Embedding Function --- | |
def get_embedding(text, task="RETRIEVAL_QUERY"):
    """Return the Gemini embedding for *text*, or ``None`` if it cannot be produced.

    Args:
        text: The text to embed. Empty/falsy input is rejected with a warning.
        task: Value passed through as ``task_type`` to ``genai.embed_content``.

    Returns:
        The value stored under ``'embedding'`` in the API response, or ``None``
        when no API key is configured, *text* is empty, or the call fails.
        Failures are logged, never raised.
    """
    if not API_KEY:
        logging.error("Cannot generate embedding: API key not configured.")
        return None
    if not text:
        logging.warning("Embedding requested for empty text.")
        return None

    try:
        logging.info(f"Generating embedding for task: {task}")
        result = genai.embed_content(
            model=EMBEDDING_MODEL,
            content=text,
            task_type=task,
        )
        # Read the vector before reporting success so a malformed response
        # is logged as a failure only.
        embedding = result['embedding']
    except Exception as e:
        logging.error(f"Error generating Gemini embedding: {e}", exc_info=True)
        # Add a hint for the two failure modes that point at misconfiguration.
        message = str(e).lower()
        if "model" in message and ("not found" in message or "permission" in message):
            logging.error(
                f"The configured embedding model '{EMBEDDING_MODEL}' might be incorrect, "
                f"unavailable, or lack permissions."
            )
        elif "dimension" in message:
            logging.error(f"Potential dimension mismatch issue with model '{EMBEDDING_MODEL}'.")
        return None

    logging.info("Embedding generated successfully.")
    return embedding
# --- Helper: Format Single Result (for top display area) --- | |
def format_single_result(result_data, index, total_results):
    """Format one search result as Markdown for the top preview area.

    Args:
        result_data: Dict with optional keys ``'metadata'`` (dict or ``None``),
            ``'document'`` (passage text) and ``'distance'`` (number or ``None``).
        index: Position of the result; accepted for interface compatibility, unused.
        total_results: Number of results; accepted for interface compatibility, unused.

    Returns:
        A Markdown string with author, book, optional section, distance and the
        passage as a block quote, or a fixed message when *result_data* is empty.
    """
    if not result_data:
        return "No result data available."

    # A stored passage may carry no metadata at all; treat None like an empty dict.
    metadata = result_data.get('metadata') or {}
    doc = result_data.get('document', "N/A")
    distance = result_data.get('distance')
    if distance is None:
        # Same fallback as for a missing key; renders as "inf".
        distance = float('inf')

    parts = [
        f"* **Author:** {metadata.get('author', 'N/A')}\n",
        f"* **Book:** {metadata.get('book', 'N/A')}\n",
    ]
    section = metadata.get('section', 'N/A')
    # Placeholder section values carry no information, so the line is omitted.
    if section not in ('Unknown', 'N/A', None):
        parts.append(f"* **Section:** {section}\n")
    parts.append(f"* **Distance:** {distance:.4f}\n\n")
    parts.append(f"> {doc}\n\n")
    return "".join(parts)
# --- Helper: Format Reading Passage (Deprecated - formatting now done in format_context_markdown) --- | |
# def format_reading_passage(passage_data): # No longer needed as separate function | |
# ... | |
# --- Context Formatting Helper --- | |
def format_context_markdown(passages):
    """Join passage texts into one Markdown string for the reading area (no header).

    Passages are ordered by the integer value of their ``'id'``. Entries that are
    falsy or have no ``'id'`` are ignored, and entries whose ``'role'`` is
    ``'missing'`` (beginning/end-of-document placeholders) are not rendered.
    The remaining texts are separated by a blank line. The input list is not
    modified.

    Args:
        passages: Iterable of passage dicts with keys ``'id'``, ``'doc'`` and
            optionally ``'role'``; may be ``None`` or empty.

    Returns:
        The combined text, or ``""`` when there is nothing to show.

    Raises:
        ValueError: If an ``'id'`` cannot be converted with ``int()``.
    """
    if not passages:
        return ""

    ordered = sorted(
        (p for p in passages if p and p.get('id') is not None),
        key=lambda p: int(p['id']),
    )

    texts = []
    for passage in ordered:
        if passage.get('role', 'context') == 'missing':
            continue  # Skip placeholders like "Beginning/End of document"
        doc = passage.get('doc')
        texts.append('_Passage text missing_' if doc is None else doc)

    # Joining only the rendered texts keeps exactly one blank line between
    # neighbours even when a placeholder sits between two real passages.
    return "\n\n".join(texts)
# --- Search Function (Complete) --- | |
def _reset_search_updates():
    """Build the baseline update mapping that clears/hides every search output."""
    return {
        full_search_results_state: [],
        current_result_index_state: 0,
        single_result_group: gr.Group(visible=False),
        result_index_indicator_md: gr.Markdown(""),
        single_result_display_md: gr.Markdown(""),
        previous_result_button: gr.Button(visible=False),
        next_result_button: gr.Button(visible=False),
        weiterlesen_button: gr.Button(visible=False),  # Default to hidden
        context_display: gr.Markdown(""),
        displayed_context_passages: [],
        load_previous_button: gr.Button(visible=False),
        load_next_button: gr.Button(visible=False),
    }


def _search_message(message):
    """Return reset updates whose preview group is shown and displays *message*."""
    updates = _reset_search_updates()
    updates[single_result_display_md] = gr.Markdown(message)
    updates[single_result_group] = gr.Group(visible=True)  # Show group to display message
    return updates


def search_philosophical_texts(query, selected_authors):
    """Run a semantic search and show the first hit in the preview area.

    The query is embedded with ``get_embedding`` and sent to the ChromaDB
    collection, optionally restricted to *selected_authors*. All valid hits are
    stored in state; the reading area and its buttons are always reset.

    Args:
        query: The user's query text.
        selected_authors: Authors to restrict the search to; falsy means no filter.

    Returns:
        A dict mapping UI components / state objects to their new values. On
        any failure or when nothing is found, the dict shows a message in the
        preview group instead of a result.
    """
    # --- Pre-computation checks ---
    if collection is None:
        logging.error("Search attempted but ChromaDB collection is not available.")
        return _search_message("Error: Database connection failed.")

    # Whitespace-only input is treated as empty rather than being embedded.
    if not query or not query.strip():
        logging.warning("Empty query received.")
        return _search_message("Please enter a query.")

    logging.info(f"Received query: '{query[:50]}...'")
    logging.info(f"Selected Authors for filtering: {selected_authors}")

    # --- Embedding ---
    query_embedding = get_embedding(query, task="RETRIEVAL_QUERY")
    if query_embedding is None:
        logging.error("Failed to generate query embedding.")
        return _search_message("Error: Failed to generate query embedding.")

    # --- Filtering ---
    where_filter = None
    if selected_authors:
        where_filter = {"author": {"$in": selected_authors}}
        logging.info(f"Applying WHERE filter: {where_filter}")

    # --- Query execution and result processing ---
    try:
        logging.info(f"Querying collection '{COLLECTION_NAME}' for top {MAX_RESULTS} results.")
        results = collection.query(
            query_embeddings=[query_embedding],
            n_results=MAX_RESULTS,
            where=where_filter,
            include=['documents', 'metadatas', 'distances'],  # IDs are included by default
        )

        # Results are nested per query embedding; only one embedding was sent.
        ids_list = (results.get('ids') or [[]])[0] if results else []
        if not ids_list:
            logging.info("No results found for the query (or matching the filter).")
            return _search_message("No results found matching your query and filters.")

        logging.info(f"Query successful. Found {len(ids_list)} results.")
        rows = zip(
            ids_list,
            results['documents'][0],
            results['metadatas'][0],
            results['distances'][0],
        )
        all_results_data = []
        for passage_id, document, metadata, distance in rows:
            # Context loading does integer arithmetic on IDs, so drop any
            # hit whose ID is not convertible.
            try:
                int(passage_id)
            except (ValueError, TypeError):
                logging.warning(f"Skipping result with non-integer ID: {passage_id}")
                continue
            all_results_data.append({
                "id": passage_id,
                "document": document,
                "metadata": metadata,
                "distance": distance,
            })

        if not all_results_data:
            # Query returned results, but none were valid after processing
            logging.info("No valid results found after filtering/validation.")
            return _search_message("No results found matching your query and filters.")

        total = len(all_results_data)
        updates = _reset_search_updates()
        updates[full_search_results_state] = all_results_data
        updates[current_result_index_state] = 0
        updates[single_result_display_md] = gr.Markdown(
            format_single_result(all_results_data[0], 0, total)
        )
        updates[single_result_group] = gr.Group(visible=True)
        updates[result_index_indicator_md] = gr.Markdown(f"Result **1** of **{total}**")
        # At the first result there is nothing to go back to.
        updates[previous_result_button] = gr.Button(visible=True, interactive=False)
        updates[next_result_button] = gr.Button(visible=True, interactive=(total > 1))
        updates[weiterlesen_button] = gr.Button(visible=True)
        return updates

    except Exception as e:
        # UI boundary: log the traceback and show a short message instead of raising.
        logging.error(f"Error querying ChromaDB or processing results: {e}", exc_info=True)
        if "dimension" in str(e).lower():
            error_msg = "**Error:** Database search failed due to embedding mismatch. Please check configuration."
        else:
            # Only the exception type is shown to the user; details stay in the log.
            error_msg = f"**Error:** An unexpected error occurred during search. See logs for details. ({type(e).__name__})"
        # Start from a fresh reset so no partial state from the try block leaks out.
        return _search_message(error_msg)
# --- Result Navigation Function --- | |
def navigate_results(direction, current_index, full_results):
    """Move the preview area to the previous or next search result.

    Args:
        direction: ``'previous'`` or ``'next'``; any other value leaves the
            index unchanged.
        current_index: Index of the result currently shown.
        full_results: List of result dicts stored by the search.

    Returns:
        A dict mapping UI components / state objects to their new values. When
        the index changes, the preview is re-rendered and the reading area is
        cleared; when it does not, only the "weiterlesen" button is made visible.
    """
    if not full_results:
        logging.warning("Navigate called with no results in state.")
        return {current_result_index_state: 0}

    total_results = len(full_results)
    step = {'previous': -1, 'next': 1}.get(direction, 0)
    new_index = current_index
    if step:
        # Clamp on both sides so a stale index can never leave the valid range.
        new_index = max(0, min(total_results - 1, current_index + step))

    if new_index == current_index:
        # Nothing to re-render; explicitly keep the "weiterlesen" button
        # visible for safety on any nav click while results exist.
        return {weiterlesen_button: gr.Button(visible=True)}

    logging.info(f"Navigating from result index {current_index} to {new_index}")
    result_md = format_single_result(full_results[new_index], new_index, total_results)
    return {
        single_result_display_md: gr.Markdown(result_md),
        current_result_index_state: new_index,
        result_index_indicator_md: gr.Markdown(f"Result **{new_index + 1}** of **{total_results}**"),
        # The reading area belongs to the previously shown result, so clear it.
        context_display: gr.Markdown(""),
        displayed_context_passages: [],
        load_previous_button: gr.Button(visible=False),
        load_next_button: gr.Button(visible=False),
        weiterlesen_button: gr.Button(visible=True),  # Make visible again
        # Navigation button interactivity is based on the *new* index.
        previous_result_button: gr.Button(interactive=(new_index > 0)),
        next_result_button: gr.Button(interactive=(new_index < total_results - 1)),
    }
# --- Fetch Single Passage Helper --- | |
def fetch_passage_data(passage_id_int):
    """Fetch a single passage from ChromaDB by its integer ID.

    Args:
        passage_id_int: Non-negative integer ID; it is converted to a string
            for the lookup.

    Returns:
        A dict with keys ``'id'``, ``'doc'`` and ``'meta'``, or ``None`` when the
        collection is unavailable, the ID is negative, the passage does not
        exist, or the lookup fails (failures are logged, not raised).
    """
    if collection is None or passage_id_int < 0:
        return None

    try:
        passage_id_str = str(passage_id_int)
        result = collection.get(ids=[passage_id_str], include=['documents', 'metadatas'])
        if not result or not result.get('ids'):
            logging.info(f"Passage ID {passage_id_str} not found in collection.")
            return None

        documents = result.get('documents')
        metadatas = result.get('metadatas')
        return {
            'id': result['ids'][0],
            'doc': documents[0] if documents else "N/A",
            # A passage stored without metadata yields None; normalise to a dict.
            'meta': (metadatas[0] or {}) if metadatas else {},
        }
    except Exception as e:
        logging.error(f"Error fetching passage ID {passage_id_int} from ChromaDB: {e}", exc_info=True)
        return None
# --- Move Passage to Reading Area --- | |
def move_to_reading_area(current_index, full_results):
    """Copy the selected result's text into the reading area.

    Hides the "weiterlesen" button and, on success, shows the buttons for
    loading surrounding context. The preview area at the top is not touched.

    Args:
        current_index: Index of the result currently shown in the preview.
        full_results: List of result dicts stored by the search (may be empty
            or ``None``).

    Returns:
        A dict mapping UI components / state objects to their new values. On
        invalid input or failure the reading area shows an error message.
    """
    updates = {
        context_display: gr.Markdown("_Loading reading passage..._"),
        displayed_context_passages: [],
        load_previous_button: gr.Button(visible=False),
        load_next_button: gr.Button(visible=False),
        weiterlesen_button: gr.Button(visible=False),  # Hide this button
    }

    # Computed defensively: the state may be None, and len(None) would raise
    # inside the very guard meant to report the problem.
    result_count = len(full_results) if full_results else 0
    if not full_results or not 0 <= current_index < result_count:
        logging.warning(
            f"Attempted to move passage with invalid state or index. "
            f"Index: {current_index}, Results Count: {result_count}"
        )
        updates[context_display] = gr.Markdown("Error: Could not load passage reference.")
        return updates

    try:
        target_result_data = full_results[current_index]
        # Reading-area passages use the keys 'doc'/'meta' (not 'document'/'metadata').
        reading_passage = {
            'id': target_result_data.get('id'),
            'doc': target_result_data.get('document'),
            'meta': target_result_data.get('metadata'),
            'role': 'current_reading',
        }

        # Compare the ID explicitly so a legitimate ID of 0 is not rejected.
        if reading_passage['id'] in (None, "") or not reading_passage['doc']:
            logging.error(f"Cannot move passage: Missing ID or document in result at index {current_index}.")
            updates[context_display] = gr.Markdown("Error: Selected passage data is incomplete.")
            return updates

        updates[context_display] = gr.Markdown(format_context_markdown([reading_passage]))
        updates[displayed_context_passages] = [reading_passage]
        updates[load_previous_button] = gr.Button(visible=True)
        updates[load_next_button] = gr.Button(visible=True)
        logging.info(f"Moved passage ID {reading_passage['id']} to reading area.")
        return updates

    except Exception as e:
        logging.error(f"Error moving passage for result index {current_index}: {e}", exc_info=True)
        updates[context_display] = gr.Markdown(f"Error moving passage to reading area: {e}")
        return updates
# --- Load More Context Function --- | |
def _render_context_with_error(passages, error_md):
    """Render *passages* followed by *error_md*; fall back to the error alone."""
    try:
        rendered = format_context_markdown(passages)
    except (ValueError, TypeError):
        # The formatter can fail on the same bad ID that triggered the error.
        rendered = ""
    return f"{rendered}\n\n{error_md}"


def load_more_context(direction, current_passages_state):
    """Extend the reading area by one passage before or after the current ones.

    The neighbour is looked up by integer ID (edge ID minus/plus one). When it
    does not exist, a placeholder entry with role ``'missing'`` is stored at
    that edge instead; an existing placeholder at the edge is replaced.

    NOTE(review): this assumes consecutive integer IDs belong to the same text;
    nothing here compares the neighbour's book/author metadata. TODO confirm.

    Args:
        direction: ``'previous'`` or ``'next'``; any other value only re-renders.
        current_passages_state: List of passage dicts currently shown. It is
            not modified; a new list is returned.

    Returns:
        A ``(markdown, passages)`` tuple for the reading area and its state. On
        error, the markdown carries an error note and the original state is
        returned unchanged.
    """
    if collection is None:
        return "Error: Database connection failed.", current_passages_state
    if not current_passages_state:
        logging.warning("Load more context called with empty state.")
        return "_No reading passage loaded yet._", []

    try:
        # Sort a copy: the input is Gradio state and must not be mutated in place.
        updated_passages = sorted(current_passages_state, key=lambda p: int(p.get('id', -1)))

        if direction == 'previous':
            edge_index, step, role = 0, -1, 'prev'
            edge_note = '_(Beginning of document reached)_'
        elif direction == 'next':
            edge_index, step, role = -1, 1, 'next'
            edge_note = '_(End of document reached)_'
        else:
            return format_context_markdown(updated_passages), updated_passages

        edge_id = updated_passages[edge_index].get('id')
        if edge_id is None:
            return format_context_markdown(updated_passages), updated_passages

        id_to_fetch = int(edge_id) + step
        # Negative IDs never exist, so skip the lookup for them.
        new_passage = fetch_passage_data(id_to_fetch) if id_to_fetch >= 0 else None
        if new_passage:
            new_passage['role'] = role
        else:
            # Going backwards past ID 0 always uses the fixed placeholder ID '-1',
            # so repeated clicks do not walk further into negative IDs.
            placeholder_id = '-1' if (step < 0 and id_to_fetch < 0) else str(id_to_fetch)
            new_passage = {'id': placeholder_id, 'role': 'missing', 'doc': edge_note}

        # A placeholder already sitting at this edge is superseded by the new entry.
        if updated_passages[edge_index].get('role') == 'missing':
            updated_passages.pop(edge_index)
        if step < 0:
            updated_passages.insert(0, new_passage)
        else:
            updated_passages.append(new_passage)

        return format_context_markdown(updated_passages), updated_passages

    except ValueError:
        logging.error(
            f"Error converting passage ID to integer in load_more_context. State: {current_passages_state}",
            exc_info=True,
        )
        error_message = _render_context_with_error(
            current_passages_state, "**Error processing context expansion.**"
        )
        return error_message, current_passages_state
    except Exception as e:
        logging.error(f"Error loading more context (direction: {direction}): {e}", exc_info=True)
        error_message = _render_context_with_error(
            current_passages_state, f"**Error loading passage: {e}**"
        )
        return error_message, current_passages_state
# --- Gradio UI Definition --- | |
with gr.Blocks(theme=gr.themes.Default()) as demo:
    gr.Markdown("# Philosophical Text Search & Context Explorer")

    # --- State Variables ---
    # All hits of the last search, the index of the hit shown in the preview,
    # and the passages currently rendered in the reading area.
    full_search_results_state = gr.State([])
    current_result_index_state = gr.State(0)
    displayed_context_passages = gr.State([])

    # --- Search Input Row ---
    with gr.Row():
        query_input = gr.Textbox(
            label="Enter query",
            placeholder="z. B. 'Was ist der Unterschied zwischen Herstellen und Handeln?'",
            lines=2,
            scale=3,
        )
        author_dropdown = gr.Dropdown(
            label="Filter by Author(s) (Optional)",
            choices=unique_authors,
            multiselect=True,
            scale=2,
        )
        search_button = gr.Button("Search", variant="primary", scale=1)

    # --- Result Navigation Row ---
    with gr.Row():
        previous_result_button = gr.Button("⬅️", visible=False)
        next_result_button = gr.Button("➡️", visible=False)

    gr.Markdown("---")  # Separator after search and navigation

    # --- Single Result Display Area ---
    # Contains the preview text and the "weiterlesen" button
    with gr.Column(visible=True) as results_area:
        with gr.Group(visible=False) as single_result_group:
            result_index_indicator_md = gr.Markdown("Result 0 of 0")
            single_result_display_md = gr.Markdown("...")  # Shows the preview
            # "weiterlesen" button remains at the end of the preview group
            weiterlesen_button = gr.Button("weiterlesen", variant="secondary", visible=True)

    gr.Markdown("---")  # Separator before reading area

    # --- Context / Reading Area ---
    with gr.Column(visible=True) as context_area:
        load_previous_button = gr.Button("⬆️", variant="secondary", visible=False)
        context_display = gr.Markdown(label="Reading Area")
        load_next_button = gr.Button("⬇️", variant="secondary", visible=False)

    # --- Event Handlers ---
    # The handlers return dicts keyed by component, so each `outputs` list only
    # has to name every component a handler may update.

    # Search Button Action
    search_outputs = [
        full_search_results_state, current_result_index_state, single_result_group,
        result_index_indicator_md, single_result_display_md, previous_result_button,
        next_result_button, weiterlesen_button, context_display,
        displayed_context_passages, load_previous_button, load_next_button,
    ]
    search_button.click(
        fn=search_philosophical_texts,
        inputs=[query_input, author_dropdown],
        outputs=search_outputs,
    )

    # Previous/Next Result Button Actions
    nav_outputs = [  # Combined list for prev/next
        single_result_display_md, current_result_index_state, result_index_indicator_md,
        previous_result_button, next_result_button, weiterlesen_button,
        context_display, displayed_context_passages,
        load_previous_button, load_next_button,
    ]
    # The direction is passed as a constant via an inline gr.State.
    previous_result_button.click(
        fn=navigate_results,
        inputs=[gr.State('previous'), current_result_index_state, full_search_results_state],
        outputs=nav_outputs,
    )
    next_result_button.click(
        fn=navigate_results,
        inputs=[gr.State('next'), current_result_index_state, full_search_results_state],
        outputs=nav_outputs,
    )

    # "weiterlesen" Button Action
    weiterlesen_outputs = [
        context_display, displayed_context_passages,
        load_previous_button, load_next_button,
        weiterlesen_button,  # Target button itself to control visibility
    ]
    weiterlesen_button.click(
        fn=move_to_reading_area,
        inputs=[current_result_index_state, full_search_results_state],
        outputs=weiterlesen_outputs,
    )

    # Load More Context Buttons
    load_previous_button.click(
        fn=load_more_context,
        inputs=[gr.State('previous'), displayed_context_passages],
        outputs=[context_display, displayed_context_passages],
    )
    load_next_button.click(
        fn=load_more_context,
        inputs=[gr.State('next'), displayed_context_passages],
        outputs=[context_display, displayed_context_passages],
    )
# --- Launch the Application --- | |
if __name__ == "__main__":
    # Surface start-up problems on the console before the UI comes up;
    # the app is launched either way.
    if collection is None:
        print("\n--- ERROR: ChromaDB collection failed to load. UI might not function correctly. Check logs. ---\n")
    elif not unique_authors:
        print("\n--- WARNING: No unique authors found in DB metadata. Author filter will be empty. ---\n")
    print("Launching Gradio Interface...")
    # Binds to all interfaces without a public share link; debug=True is kept for testing.
    demo.launch(server_name="0.0.0.0", share=False, debug=True)