"""Gradio app for semantic search over a ChromaDB collection of philosophical texts.

A query is embedded with a Gemini embedding model and matched against the
collection. Results are shown one at a time in a preview area; the selected
passage can be moved into a reading area, which can then be extended with
the passages stored under the neighbouring integer IDs.
"""

import gradio as gr
import chromadb
import google.generativeai as genai
import os
from dotenv import load_dotenv
import logging
import functools
from collections import defaultdict

# --- Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Load environment variables (for API Key)
load_dotenv()
API_KEY = os.getenv("GEMINI_API_KEY")

if not API_KEY:
    logging.error("GEMINI_API_KEY not found in environment variables.")
else:
    try:
        genai.configure(api_key=API_KEY)
        logging.info("Gemini API configured successfully.")
    except Exception as e:
        logging.error(f"Error configuring Gemini API: {e}")

# Chroma DB Configuration
CHROMA_DB_PATH = "./chroma"
COLLECTION_NAME = "phil_de"

# Gemini Embedding Model Configuration
# Must match the model that was used to build the DB, otherwise the query
# embedding dimension will not match the stored vectors.
EMBEDDING_MODEL = "models/gemini-embedding-exp-03-07"
logging.info(f"Using embedding model: {EMBEDDING_MODEL}")

# --- Constants ---
MAX_RESULTS = 20

# --- ChromaDB Connection and Author Fetching ---
collection = None
unique_authors = []
try:
    client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
    collection = client.get_collection(name=COLLECTION_NAME)
    logging.info(f"Successfully connected to ChromaDB collection '{COLLECTION_NAME}'. Collection count: {collection.count()}")

    logging.info("Fetching all metadata to extract unique authors...")
    all_metadata = collection.get(include=['metadatas'])
    if all_metadata and 'metadatas' in all_metadata and all_metadata['metadatas']:
        # Entries without metadata or without a non-empty 'author' are ignored.
        authors_set = {
            meta['author']
            for meta in all_metadata['metadatas']
            if meta and 'author' in meta and meta['author']
        }
        unique_authors = sorted(authors_set)
        logging.info(f"Found {len(unique_authors)} unique authors.")
    else:
        logging.warning("Could not retrieve metadata or no metadata found to extract authors.")
except Exception as e:
    # Top-level boundary: the UI still starts, but search reports the failure.
    logging.critical(f"FATAL: Could not connect to Chroma DB or fetch authors: {e}", exc_info=True)
    unique_authors = []


# --- Embedding Function ---
def get_embedding(text, task="RETRIEVAL_QUERY"):
    """Return the Gemini embedding for ``text``, or ``None`` on any failure.

    Args:
        text: The text to embed. Empty text yields ``None``.
        task: Value passed as ``task_type`` to ``genai.embed_content``.

    Returns:
        The ``'embedding'`` entry of the API response, or ``None`` when the
        API key is missing, the text is empty, or the API call raises.
    """
    if not API_KEY:
        logging.error("Cannot generate embedding: API key not configured.")
        return None
    if not text:
        logging.warning("Embedding requested for empty text.")
        return None
    try:
        logging.info(f"Generating embedding for task: {task}")
        result = genai.embed_content(
            model=EMBEDDING_MODEL,
            content=text,
            task_type=task,
        )
        logging.info("Embedding generated successfully.")
        return result['embedding']
    except Exception as e:
        logging.error(f"Error generating Gemini embedding: {e}", exc_info=True)
        # Add a hint for the two failure modes recognisable from the message.
        error_text = str(e).lower()
        if "model" in error_text and ("not found" in error_text or "permission" in error_text):
            logging.error(f"The configured embedding model '{EMBEDDING_MODEL}' might be incorrect, unavailable, or lack permissions.")
        elif "dimension" in error_text:
            logging.error(f"Potential dimension mismatch issue with model '{EMBEDDING_MODEL}'.")
        return None


# --- Helper: Format Single Result (for top display area) ---
def format_single_result(result_data, index, total_results):
    """Format one search result as Markdown for the top preview area.

    Args:
        result_data: Dict with optional keys ``'metadata'``, ``'document'``
            and ``'distance'``.
        index: Position of the result (currently unused; kept for callers).
        total_results: Number of results (currently unused; kept for callers).

    Returns:
        A Markdown string, or a fixed message when ``result_data`` is empty.
    """
    if not result_data:
        return "No result data available."

    # 'metadata' may be present but None, so a .get() default is not enough.
    metadata = result_data.get('metadata') or {}
    doc = result_data.get('document', "N/A")
    distance = result_data.get('distance', float('inf'))

    author = metadata.get('author', 'N/A')
    book = metadata.get('book', 'N/A')
    section = metadata.get('section', 'N/A')

    lines = [
        f"* **Author:** {author}\n",
        f"* **Book:** {book}\n",
    ]
    if section not in ['Unknown', 'N/A', None]:
        lines.append(f"* **Section:** {section}\n")
    lines.append(f"* **Distance:** {distance:.4f}\n\n")
    lines.append(f"> {doc}\n\n")
    return "".join(lines)


# --- Context Formatting Helper ---
def format_context_markdown(passages):
    """Join passage texts into one Markdown string for the reading area.

    Passages without an ``'id'`` are dropped and the rest are ordered by
    integer ID. Passages whose role is ``'missing'`` (boundary placeholders)
    contribute no text. No header is added.

    Args:
        passages: List of passage dicts with ``'id'``, ``'doc'`` and ``'role'``.

    Returns:
        The combined text, or ``""`` when there is nothing to show.
    """
    if not passages:
        return ""

    valid_passages = [p for p in passages if p and p.get('id') is not None]
    valid_passages.sort(key=lambda p: int(p.get('id', -1)))
    if not valid_passages:
        return ""

    parts = []
    last_index = len(valid_passages) - 1
    for i, passage in enumerate(valid_passages):
        if passage.get('role', 'context') == 'missing':
            continue  # Skip placeholders like "Beginning/End of document"
        parts.append(passage.get('doc', '_Passage text missing_'))
        # Separate from the following passage unless that one is a placeholder.
        if i < last_index and valid_passages[i + 1].get('role') != 'missing':
            parts.append("\n\n")
    return "".join(parts)


def _default_search_updates():
    """Return the component/state updates that reset the whole UI."""
    return {
        full_search_results_state: [],
        current_result_index_state: 0,
        single_result_group: gr.Group(visible=False),
        result_index_indicator_md: gr.Markdown(""),
        single_result_display_md: gr.Markdown(""),
        previous_result_button: gr.Button(visible=False),
        next_result_button: gr.Button(visible=False),
        weiterlesen_button: gr.Button(visible=False),
        context_display: gr.Markdown(""),
        displayed_context_passages: [],
        load_previous_button: gr.Button(visible=False),
        load_next_button: gr.Button(visible=False),
    }


# --- Search Function ---
def search_philosophical_texts(query, selected_authors):
    """Run a search, store all results in state and display the first one.

    Args:
        query: The user's query text.
        selected_authors: Authors to restrict the search to; empty or
            ``None`` means no filter.

    Returns:
        A dict mapping Gradio components/state objects to their new values.
        Errors are reported as a message in the preview area, not raised.
    """
    updates = _default_search_updates()

    # --- Pre-computation Checks ---
    if collection is None:
        logging.error("Search attempted but ChromaDB collection is not available.")
        updates[single_result_display_md] = gr.Markdown("Error: Database connection failed.")
        updates[single_result_group] = gr.Group(visible=True)  # Show group to display error
        return updates
    # Whitespace-only input counts as empty so it never reaches the embedding API.
    if not query or not query.strip():
        logging.warning("Empty query received.")
        updates[single_result_display_md] = gr.Markdown("Please enter a query.")
        updates[single_result_group] = gr.Group(visible=True)  # Show group to display message
        return updates

    logging.info(f"Received query: '{query[:50]}...'")
    logging.info(f"Selected Authors for filtering: {selected_authors}")

    # --- Embedding ---
    query_embedding = get_embedding(query, task="RETRIEVAL_QUERY")
    if query_embedding is None:
        logging.error("Failed to generate query embedding.")
        updates[single_result_display_md] = gr.Markdown("Error: Failed to generate query embedding.")
        updates[single_result_group] = gr.Group(visible=True)
        return updates

    # --- Filtering ---
    where_filter = None
    if selected_authors:
        where_filter = {"author": {"$in": selected_authors}}
        logging.info(f"Applying WHERE filter: {where_filter}")

    # --- Query Execution and Result Processing ---
    try:
        logging.info(f"Querying collection '{COLLECTION_NAME}' for top {MAX_RESULTS} results.")
        results = collection.query(
            query_embeddings=[query_embedding],
            n_results=MAX_RESULTS,
            where=where_filter,
            include=['documents', 'metadatas', 'distances'],  # IDs are included by default
        )

        if results and results.get('ids') and results['ids'][0]:
            ids_list = results['ids'][0]
            logging.info(f"Query successful. Found {len(ids_list)} results.")

            all_results_data = []
            for result_id, document, metadata, distance in zip(
                ids_list,
                results['documents'][0],
                results['metadatas'][0],
                results['distances'][0],
            ):
                # Context loading does integer arithmetic on IDs, so results
                # whose ID is not an integer are unusable and are skipped.
                try:
                    int(result_id)
                except (ValueError, TypeError):
                    logging.warning(f"Skipping result with non-integer ID: {result_id}")
                    continue
                all_results_data.append({
                    "id": result_id,
                    "document": document,
                    "metadata": metadata,
                    "distance": distance,
                })

            if all_results_data:
                total = len(all_results_data)
                first_result_md = format_single_result(all_results_data[0], 0, total)
                updates[full_search_results_state] = all_results_data
                updates[current_result_index_state] = 0
                updates[single_result_display_md] = gr.Markdown(first_result_md)
                updates[single_result_group] = gr.Group(visible=True)
                updates[result_index_indicator_md] = gr.Markdown(f"Result **1** of **{total}**")
                updates[previous_result_button] = gr.Button(visible=True, interactive=False)
                updates[next_result_button] = gr.Button(visible=True, interactive=(total > 1))
                updates[weiterlesen_button] = gr.Button(visible=True)
            else:
                # Query returned results, but none survived ID validation.
                logging.info("No valid results found after filtering/validation.")
                updates[single_result_display_md] = gr.Markdown("No results found matching your query and filters.")
                updates[single_result_group] = gr.Group(visible=True)
                updates[weiterlesen_button] = gr.Button(visible=False)
        else:
            logging.info("No results found for the query (or matching the filter).")
            updates[single_result_display_md] = gr.Markdown("No results found matching your query and filters.")
            updates[single_result_group] = gr.Group(visible=True)
            updates[weiterlesen_button] = gr.Button(visible=False)

        return updates

    except Exception as e:
        logging.error(f"Error querying ChromaDB or processing results: {e}", exc_info=True)
        if "dimension" in str(e).lower():
            error_msg = "**Error:** Database search failed due to embedding mismatch. Please check configuration."
        else:
            error_msg = f"**Error:** An unexpected error occurred during search. See logs for details. ({type(e).__name__})"

        # Discard any partially-filled updates, then show only the error.
        updates = _default_search_updates()
        updates[single_result_display_md] = gr.Markdown(error_msg)
        updates[single_result_group] = gr.Group(visible=True)
        return updates


# --- Result Navigation Function ---
def navigate_results(direction, current_index, full_results):
    """Move to the previous or next search result in the preview area.

    Args:
        direction: ``'previous'`` or ``'next'``; anything else keeps the index.
        current_index: Index of the currently shown result.
        full_results: List of result dicts stored by the search.

    Returns:
        A dict of component/state updates. When the index changes, the
        reading area is cleared. The index is clamped to the valid range.
    """
    if not full_results:
        logging.warning("Navigate called with no results in state.")
        return {current_result_index_state: 0}

    total_results = len(full_results)
    if direction == 'previous':
        new_index = max(0, current_index - 1)
    elif direction == 'next':
        new_index = min(total_results - 1, current_index + 1)
    else:
        new_index = current_index

    updates = {}
    if new_index != current_index:
        logging.info(f"Navigating from result index {current_index} to {new_index}")
        result_md = format_single_result(full_results[new_index], new_index, total_results)
        updates[single_result_display_md] = gr.Markdown(result_md)
        updates[current_result_index_state] = new_index
        updates[result_index_indicator_md] = gr.Markdown(f"Result **{new_index + 1}** of **{total_results}**")
        # A different result is selected, so the reading area no longer applies.
        updates[context_display] = gr.Markdown("")
        updates[displayed_context_passages] = []
        updates[load_previous_button] = gr.Button(visible=False)
        updates[load_next_button] = gr.Button(visible=False)

    # Results exist here, so the button is made visible on every navigation
    # click, whether or not the index changed.
    updates[weiterlesen_button] = gr.Button(visible=True)

    # Navigation button interactivity follows the *new* index.
    updates[previous_result_button] = gr.Button(interactive=(new_index > 0))
    updates[next_result_button] = gr.Button(interactive=(new_index < total_results - 1))
    return updates


# --- Fetch Single Passage Helper ---
def fetch_passage_data(passage_id_int):
    """Fetch one passage from ChromaDB by its integer ID.

    Args:
        passage_id_int: Integer ID; it is converted to a string for lookup.

    Returns:
        A dict with ``'id'``, ``'doc'`` and ``'meta'``, or ``None`` when the
        collection is unavailable, the ID is negative, the passage does not
        exist, or the lookup raises.
    """
    if collection is None or passage_id_int < 0:
        return None
    try:
        passage_id_str = str(passage_id_int)
        result = collection.get(ids=[passage_id_str], include=['documents', 'metadatas'])
        if result and result.get('ids'):
            return {
                'id': result['ids'][0],
                'doc': result['documents'][0] if result.get('documents') else "N/A",
                'meta': result['metadatas'][0] if result.get('metadatas') else {},
            }
        logging.info(f"Passage ID {passage_id_str} not found in collection.")
        return None
    except Exception as e:
        logging.error(f"Error fetching passage ID {passage_id_int} from ChromaDB: {e}", exc_info=True)
        return None


# --- Move Passage to Reading Area ---
def move_to_reading_area(current_index, full_results):
    """Copy the selected result's text into the reading area.

    Hides the 'weiterlesen' button and shows the context-loading buttons.
    The preview area is left untouched.

    Args:
        current_index: Index of the selected result.
        full_results: List of result dicts stored by the search.

    Returns:
        A dict of component/state updates; on failure the reading area
        shows an error message instead.
    """
    updates = {
        context_display: gr.Markdown("_Loading reading passage..._"),
        displayed_context_passages: [],
        load_previous_button: gr.Button(visible=False),
        load_next_button: gr.Button(visible=False),
        weiterlesen_button: gr.Button(visible=False),
    }

    if not full_results or current_index < 0 or current_index >= len(full_results):
        # full_results may be None on this branch, so the count is taken safely.
        results_count = len(full_results) if full_results else 0
        logging.warning(f"Attempted to move passage with invalid state or index. Index: {current_index}, Results Count: {results_count}")
        updates[context_display] = gr.Markdown("Error: Could not load passage reference.")
        updates[weiterlesen_button] = gr.Button(visible=False)
        return updates

    try:
        target_result_data = full_results[current_index]
        reading_passage_state_data = {
            'id': target_result_data.get('id'),
            'doc': target_result_data.get('document'),
            'meta': target_result_data.get('metadata'),
            'role': 'current_reading',
        }

        if not reading_passage_state_data['id'] or not reading_passage_state_data['doc']:
            logging.error(f"Cannot move passage: Missing ID or document in result at index {current_index}.")
            updates[context_display] = gr.Markdown("Error: Selected passage data is incomplete.")
            updates[weiterlesen_button] = gr.Button(visible=False)
            return updates

        formatted_passage_md = format_context_markdown([reading_passage_state_data])
        updates[context_display] = gr.Markdown(formatted_passage_md)
        updates[displayed_context_passages] = [reading_passage_state_data]
        updates[load_previous_button] = gr.Button(visible=True)
        updates[load_next_button] = gr.Button(visible=True)

        logging.info(f"Moved passage ID {reading_passage_state_data['id']} to reading area.")
        return updates

    except Exception as e:
        logging.error(f"Error moving passage for result index {current_index}: {e}", exc_info=True)
        updates[context_display] = gr.Markdown(f"Error moving passage to reading area: {e}")
        updates[weiterlesen_button] = gr.Button(visible=False)
        return updates


def _is_placeholder(passage, passage_id):
    """Return True if ``passage`` is a 'missing' placeholder with this string ID."""
    return passage.get('role') == 'missing' and passage.get('id') == passage_id


# --- Load More Context Function ---
def load_more_context(direction, current_passages_state):
    """Load one more passage before or after those in the reading area.

    When no passage can be loaded in the requested direction, a placeholder
    with role ``'missing'`` is stored at that end so the state records the
    boundary. Placeholders are not rendered.

    Args:
        direction: ``'previous'`` or ``'next'``.
        current_passages_state: List of passage dicts currently displayed.
            It is not modified.

    Returns:
        A ``(markdown, passages)`` tuple for the reading area and its state.
    """
    if collection is None:
        return "Error: Database connection failed.", current_passages_state
    if not current_passages_state:
        logging.warning("Load more context called with empty state.")
        return "_No reading passage loaded yet._", []

    # Work on a sorted copy so the caller's state list is never mutated.
    updated_passages = sorted(current_passages_state, key=lambda p: int(p.get('id', -1)))

    try:
        if direction == 'previous':
            first = updated_passages[0]
            earliest_id_str = first.get('id')
            if earliest_id_str is None:
                return format_context_markdown(updated_passages), updated_passages
            id_to_fetch = int(earliest_id_str) - 1

            if id_to_fetch < 0:
                # Nothing can precede ID 0: record the boundary once.
                if not _is_placeholder(first, '-1'):
                    if first.get('role') == 'missing':
                        updated_passages.pop(0)
                    updated_passages.insert(0, {'id': '-1', 'role': 'missing', 'doc': '_(Beginning of document reached)_'})
            else:
                new_passage_data = fetch_passage_data(id_to_fetch)
                if new_passage_data:
                    new_passage_data['role'] = 'prev'
                    # A real passage replaces the placeholder that stood next to it.
                    if _is_placeholder(first, str(id_to_fetch + 1)):
                        updated_passages.pop(0)
                    updated_passages.insert(0, new_passage_data)
                elif not _is_placeholder(first, str(id_to_fetch)):
                    if first.get('role') == 'missing':
                        updated_passages.pop(0)
                    updated_passages.insert(0, {'id': str(id_to_fetch), 'role': 'missing', 'doc': '_(Beginning of document reached)_'})

        elif direction == 'next':
            last = updated_passages[-1]
            latest_id_str = last.get('id')
            if latest_id_str is None:
                return format_context_markdown(updated_passages), updated_passages
            id_to_fetch = int(latest_id_str) + 1

            new_passage_data = fetch_passage_data(id_to_fetch)
            if new_passage_data:
                new_passage_data['role'] = 'next'
                if _is_placeholder(last, str(id_to_fetch - 1)):
                    updated_passages.pop(-1)
                updated_passages.append(new_passage_data)
            elif not _is_placeholder(last, str(id_to_fetch)):
                if last.get('role') == 'missing':
                    updated_passages.pop(-1)
                updated_passages.append({'id': str(id_to_fetch), 'role': 'missing', 'doc': '_(End of document reached)_'})

        context_md = format_context_markdown(updated_passages)
        return context_md, updated_passages

    except ValueError:
        logging.error(f"Error converting passage ID to integer in load_more_context. State: {current_passages_state}", exc_info=True)
        error_message = format_context_markdown(current_passages_state) + "\n\n**Error processing context expansion.**"
        return error_message, current_passages_state
    except Exception as e:
        logging.error(f"Error loading more context (direction: {direction}): {e}", exc_info=True)
        error_message = format_context_markdown(current_passages_state) + f"\n\n**Error loading passage: {e}**"
        return error_message, current_passages_state


# --- Gradio UI Definition ---
with gr.Blocks(theme=gr.themes.Default()) as demo:
    gr.Markdown("# Philosophical Text Search & Context Explorer")

    # --- State Variables ---
    full_search_results_state = gr.State([])
    current_result_index_state = gr.State(0)
    displayed_context_passages = gr.State([])

    # --- Search Input Row ---
    with gr.Row():
        query_input = gr.Textbox(label="Enter query", placeholder="z. B. 'Was ist der Unterschied zwischen Herstellen und Handeln?'", lines=2, scale=3)
        author_dropdown = gr.Dropdown(
            label="Filter by Author(s) (Optional)",
            choices=unique_authors,
            multiselect=True,
            scale=2,
        )
        search_button = gr.Button("Search", variant="primary", scale=1)

    # --- Result Navigation Row ---
    with gr.Row():
        previous_result_button = gr.Button("⬅️", visible=False)
        next_result_button = gr.Button("➡️", visible=False)

    gr.Markdown("---")  # Separator after search and navigation

    # --- Single Result Display Area ---
    # Contains the preview text and the "weiterlesen" button
    with gr.Column(visible=True) as results_area:
        with gr.Group(visible=False) as single_result_group:
            result_index_indicator_md = gr.Markdown("Result 0 of 0")
            single_result_display_md = gr.Markdown("...")  # Shows the preview
            # "weiterlesen" button sits at the end of the preview group
            weiterlesen_button = gr.Button("weiterlesen", variant="secondary", visible=True)

    gr.Markdown("---")  # Separator before reading area

    # --- Context / Reading Area ---
    with gr.Column(visible=True) as context_area:
        load_previous_button = gr.Button("⬆️", variant="secondary", visible=False)
        context_display = gr.Markdown(label="Reading Area")
        load_next_button = gr.Button("⬇️", variant="secondary", visible=False)

    # --- Event Handlers ---
    # Search Button Action
    search_outputs = [
        full_search_results_state,
        current_result_index_state,
        single_result_group,
        result_index_indicator_md,
        single_result_display_md,
        previous_result_button,
        next_result_button,
        weiterlesen_button,
        context_display,
        displayed_context_passages,
        load_previous_button,
        load_next_button,
    ]
    search_button.click(
        fn=search_philosophical_texts,
        inputs=[query_input, author_dropdown],
        outputs=search_outputs,
    )

    # Previous/Next Result Button Actions (shared output list)
    nav_outputs = [
        single_result_display_md,
        current_result_index_state,
        result_index_indicator_md,
        previous_result_button,
        next_result_button,
        weiterlesen_button,
        context_display,
        displayed_context_passages,
        load_previous_button,
        load_next_button,
    ]
    previous_result_button.click(
        fn=navigate_results,
        inputs=[gr.State('previous'), current_result_index_state, full_search_results_state],
        outputs=nav_outputs,
    )
    next_result_button.click(
        fn=navigate_results,
        inputs=[gr.State('next'), current_result_index_state, full_search_results_state],
        outputs=nav_outputs,
    )

    # "weiterlesen" Button Action
    weiterlesen_outputs = [
        context_display,
        displayed_context_passages,
        load_previous_button,
        load_next_button,
        weiterlesen_button,  # Target button itself to control visibility
    ]
    weiterlesen_button.click(
        fn=move_to_reading_area,
        inputs=[current_result_index_state, full_search_results_state],
        outputs=weiterlesen_outputs,
    )

    # Load More Context Buttons
    load_previous_button.click(
        fn=load_more_context,
        inputs=[gr.State('previous'), displayed_context_passages],
        outputs=[context_display, displayed_context_passages],
    )
    load_next_button.click(
        fn=load_more_context,
        inputs=[gr.State('next'), displayed_context_passages],
        outputs=[context_display, displayed_context_passages],
    )

# --- Launch the Application ---
if __name__ == "__main__":
    if collection is None:
        print("\n--- ERROR: ChromaDB collection failed to load. UI might not function correctly. Check logs. ---\n")
    elif not unique_authors:
        print("\n--- WARNING: No unique authors found in DB metadata. Author filter will be empty. ---\n")

    print("Launching Gradio Interface...")
    # debug=True keeps the process in the foreground and prints errors while testing
    demo.launch(server_name="0.0.0.0", share=False, debug=True)