diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,3 +1,8 @@ +# -*- coding: utf-8 -*- + +# --------------------------------------------------------------------------- +# 0) Imports +# --------------------------------------------------------------------------- import gradio as gr import chromadb import google.generativeai as genai @@ -6,13 +11,27 @@ from dotenv import load_dotenv import logging import functools from collections import defaultdict +import traceback # For detailed error logging +import datetime # For timestamped filenames +import re # For parsing tangents and LLM JSON output +import numpy as np # For cosine similarity calculation +import json # For parsing LLM JSON output +import threading # tiny file‑lock for the JSON ledger +import html # escape text for clickable spans +import time # Useful for simple sleeps if needed for debugging timing +# --------------------------------------------------------------------------- + # --- Configuration --- -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +# Configure logging level +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') + # Load environment variables (for API Key) load_dotenv() API_KEY = os.getenv("GEMINI_API_KEY") + if not API_KEY: logging.error("GEMINI_API_KEY not found in environment variables.") else: @@ -21,578 +40,3169 @@ else: logging.info("Gemini API configured successfully.") except Exception as e: logging.error(f"Error configuring Gemini API: {e}") + API_KEY = None # Chroma DB Configuration CHROMA_DB_PATH = "./chroma" COLLECTION_NAME = "phil_de" -# Gemini Embedding Model Configuration -# Make sure this matches the model used to create the DB (expecting 3072 dims based on past errors) -EMBEDDING_MODEL = "models/gemini-embedding-exp-03-07" +# Gemini Model Configuration +EMBEDDING_MODEL = "models/gemini-embedding-exp-03-07" # Using standard embedding model +LLM_RERANK_MODEL_NAME = "models/gemini-2.0-flash" # Use a capable model for reasoning/ranking + logging.info(f"Using embedding model: {EMBEDDING_MODEL}") +logging.info(f"Using LLM Re-Rank/Truncate generation model: {LLM_RERANK_MODEL_NAME}") # --- Constants --- -MAX_RESULTS = 20 +MAX_RESULTS_STANDARD = 20 # Max results shown in standard search after re-ranking +INITIAL_RESULTS_FOR_RERANK = 300 # How many results to fetch initially for re-ranking passes +RERANK_WINDOW_SIZE = 2 # +/- N sentences to consider for contextual re-ranking (both passes) +MIN_CHARS_FOR_RELEVANT_NEIGHBOR = 6 # Minimum characters for a neighbor to contribute to the re-rank score +RERANK_WEIGHT = 0.5 # Weight factor for neighbor similarity in 1st pass re-rank score +RERANK_DECAY = 0.1 # Score decay per sentence distance in 1st pass re-rank +LLM_RERANK_CANDIDATE_COUNT = 25 # How many candidates (after 1st pass re-rank) to send to LLM +LLM_RERANK_TARGET_COUNT = 10 # How many final edited results to request from LLM +PROMPT_LOG_DIR = "./prompts" # Directory to save LLM prompts for debugging +MAX_RESULTS_PER_AUTHOR = 3 # NEW: Max results from a single author in the final list +MAX_FAVOURITES = 50 # Max favourites to load for display + +# --- Constants for Highlighting --- +HIGHLIGHT_HUE = 60 # Yellowish hue +HIGHLIGHT_SATURATION = 100 +HIGHLIGHT_LIGHTNESS = 90 +HIGHLIGHT_MAX_ALPHA = 0.5 # Max transparency (0 = transparent, 1 = opaque) +HIGHLIGHT_MIN_ALPHA = 0.05 # Minimum alpha for sentences at the threshold (when max > threshold) +HIGHLIGHT_SIMILARITY_THRESHOLD = 0.6 # Minimum cosine similarity 
+
+# ─── FAVOURITES CONFIG ──────────────────────────────────────────────────────
+BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # always absolute
+FAV_FILE = os.path.join(BASE_DIR, "favourites.json") # ./favourites.json
+_fav_lock = threading.Lock() # file-write lock
+
+# --- Define Prompt for LLM Re-ranking V3 ---
+LLM_RERANKING_PROMPT_TEMPLATE_V3 = """
+**Task:** Evaluate, truncate, and re-rank the provided text passages based on their relevance to the user's query. Return exactly the top {target_count} most relevant results, including their original IDs, the edited text, and a brief rationale for each selection.
+
+**User Query:**
+"{user_query}"
+
+**Text Passages to Evaluate:**
+{passage_blocks_str}
+--- END OF PASSAGES ---
+
+**Instructions:**
+1. **Analyze Query:** Understand the core question or theme of the User Query.
+2. **Evaluate Each Passage:** For each text passage provided above (identified by "Passage ID:" and separated by '--- PASSAGE SEPARATOR ---'):
+    * Read the entire passage carefully.
+    * Identify the most relevant contiguous sentences within the passage that directly address or best illuminate the User Query.
+    * **Truncate/Edit:** Extract ONLY the most relevant segment. Discard the rest of the passage. The goal is a concise, highly relevant excerpt. If an entire passage seems irrelevant, discard it entirely.
+    * **Rationale Generation:** Briefly explain *why* the segment you extracted is relevant to the User Query.
+3. **Rank Edited Passages:** Based on the relevance of the *edited/truncated* segments you created, determine a final ranking. The most relevant edited segment should be ranked first.
+4. **Select Top Results:** Choose exactly the top {target_count} most relevant edited passages from your ranking. If fewer than {target_count} passages were deemed relevant at all, return only those that were.
+5. **Output:** Provide *only* a JSON object whose "ranked_edited_passages" array contains the top {target_count} (or fewer, if not enough were relevant) results, sorted from most relevant to least relevant. Each result object in the array MUST contain:
+    * `"original_id"`: The ID of the passage the text came from.
+    * `"edited_text"`: The concise, truncated text segment you extracted.
+    * `"rationale"`: Your brief explanation of why this segment is relevant.
+
+    **Required JSON Output Format:**
+    ```json
+    {{
+      "ranked_edited_passages": [
+        {{
+          "original_id": "...",
+          "edited_text": "...",
+          "rationale": "..."
+        }},
+        {{
+          "original_id": "...",
+          "edited_text": "...",
+          "rationale": "..."
+        }}
+      ]
+    }}
+    ```
+
+**Final Output (JSON object, as specified above):**
+```json
+"""
 
 # --- ChromaDB Connection and Author Fetching ---
 collection = None
 unique_authors = []
 try:
+    os.makedirs(PROMPT_LOG_DIR, exist_ok=True)
+    logging.info(f"Prompt log directory ensured at: {PROMPT_LOG_DIR}")
     client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
-    collection = client.get_collection(name=COLLECTION_NAME)
+    collection = client.get_or_create_collection(name=COLLECTION_NAME)
     logging.info(f"Successfully connected to ChromaDB collection '{COLLECTION_NAME}'. 
Collection count: {collection.count()}") - logging.info("Fetching all metadata to extract unique authors...") - all_metadata = collection.get(include=['metadatas']) - if all_metadata and 'metadatas' in all_metadata and all_metadata['metadatas']: - authors_set = set() - for meta in all_metadata['metadatas']: - if meta and 'author' in meta and meta['author']: - authors_set.add(meta['author']) - unique_authors = sorted(list(authors_set)) - logging.info(f"Found {len(unique_authors)} unique authors.") + if collection.count() > 0: + all_metadata = collection.get(include=['metadatas']) + if all_metadata and 'metadatas' in all_metadata and all_metadata['metadatas']: + authors_set = set() + for meta in all_metadata['metadatas']: + if isinstance(meta, dict) and meta.get('author'): + authors_set.add(meta['author']) + unique_authors = sorted(list(authors_set)) + logging.info(f"Found {len(unique_authors)} unique authors.") + else: + logging.warning("Could not retrieve metadata or no metadata found to extract authors.") else: - logging.warning("Could not retrieve metadata or no metadata found to extract authors.") - + logging.warning(f"Collection '{COLLECTION_NAME}' is empty. No authors to fetch.") except Exception as e: - logging.critical(f"FATAL: Could not connect to Chroma DB or fetch authors: {e}", exc_info=True) - unique_authors = [] + logging.critical(f"FATAL: Could not connect to Chroma DB, fetch authors, or setup prompt dir: {e}", exc_info=True) + unique_authors = [] # Ensure it's an empty list on error + + +# --- Gemini Generation Model Initialization --- +llm_rerank_model = None +if API_KEY: + try: + llm_rerank_model = genai.GenerativeModel(LLM_RERANK_MODEL_NAME) + logging.info(f"Gemini LLM Re-Rank Model '{LLM_RERANK_MODEL_NAME}' initialized.") + except Exception as e: + logging.error(f"Error initializing Gemini LLM Re-Rank Model '{LLM_RERANK_MODEL_NAME}': {e}") + # --- Embedding Function --- +@functools.lru_cache(maxsize=1024) def get_embedding(text, task="RETRIEVAL_QUERY"): + """Generates an embedding for the given text using the configured Gemini model.""" if not API_KEY: logging.error("Cannot generate embedding: API key not configured.") return None - if not text: - logging.warning("Embedding requested for empty text.") + if not text or not isinstance(text, str) or not text.strip(): return None + + valid_task_types = {"RETRIEVAL_QUERY", "RETRIEVAL_DOCUMENT", "SEMANTIC_SIMILARITY", "CLASSIFICATION", "CLUSTERING"} + if task not in valid_task_types: + logging.warning(f"Invalid task type '{task}' for embedding model. Defaulting to 'RETRIEVAL_QUERY'.") + task = "RETRIEVAL_QUERY" + try: - logging.info(f"Generating embedding for task: {task}") - result = genai.embed_content( - model=EMBEDDING_MODEL, - content=text, - task_type=task - ) - logging.info("Embedding generated successfully.") - return result['embedding'] + logging.debug(f"Requesting embedding for text: '{text[:50]}...' with task: {task}") + result = genai.embed_content(model=EMBEDDING_MODEL, content=text, task_type=task) + embedding = result.get('embedding') + if embedding: + logging.debug(f"Embedding received. 
Type: {type(embedding)}, Length (if list): {len(embedding) if isinstance(embedding, list) else 'N/A'}") + else: + logging.warning("Gemini API returned result without 'embedding' key.") + return embedding except Exception as e: - logging.error(f"Error generating Gemini embedding: {e}", exc_info=True) - if "model" in str(e).lower() and ("not found" in str(e).lower() or "permission" in str(e).lower()): - logging.error(f"The configured embedding model '{EMBEDDING_MODEL}' might be incorrect, unavailable, or lack permissions.") - elif "dimension" in str(e).lower(): - logging.error(f"Potential dimension mismatch issue with model '{EMBEDDING_MODEL}'.") + logging.error(f"Error generating Gemini embedding for '{text[:50]}...': {e}", exc_info=True) + if "resource has been exhausted" in str(e).lower(): + logging.error("Embedding failed likely due to quota exhaustion.") + elif "api key not valid" in str(e).lower(): + logging.error("Embedding failed due to invalid API key.") return None +# --- Helper: Fetch Embeddings for Neighbor IDs --- +@functools.lru_cache(maxsize=2048) +def fetch_embeddings_for_ids(ids_to_fetch_tuple): + """Fetches embeddings for a tuple of passage IDs from ChromaDB.""" + if collection is None or not ids_to_fetch_tuple: + return {} + valid_ids = [str(id_val) for id_val in ids_to_fetch_tuple if id_val is not None] + if not valid_ids: + return {} + + embeddings_map = {} + try: + logging.debug(f"Fetching embeddings for {len(valid_ids)} neighbor IDs.") + results = collection.get(ids=valid_ids, include=['embeddings']) + ids_list = results.get('ids') + embeddings_list = results.get('embeddings') + + if ids_list is not None and embeddings_list is not None and len(ids_list) == len(embeddings_list): + for i, fetched_id in enumerate(ids_list): + if embeddings_list[i] is not None: + embeddings_map[fetched_id] = embeddings_list[i] + else: + logging.warning(f"Embedding for neighbor ID {fetched_id} was None in DB result.") + elif ids_list is not None or embeddings_list is not None: + logging.error(f"Mismatch/Incomplete fetch for neighbor embeddings. Fetched IDs: {len(ids_list) if ids_list is not None else 'None'}, Embeddings: {len(embeddings_list) if embeddings_list is not None else 'None'} for {len(valid_ids)} requested IDs.") -# --- Helper: Format Single Result (for top display area) --- -def format_single_result(result_data, index, total_results): - """Formats the data for a single result into Markdown for the top preview area.""" - if not result_data: - return "No result data available." 
+ except Exception as e: + logging.error(f"Error fetching neighbor embeddings for IDs {valid_ids}: {e}", exc_info=True) + return embeddings_map + +# --- Helper: Fetch all sentences for a specific paragraph --- +def fetch_paragraph_data(author, book, paragraph_index): + """Fetches all sentence data (doc, meta, embedding) for a specific paragraph.""" + logging.debug(f"Attempting fetch_paragraph_data: Author='{author}', Book='{book}', ParaIdx={paragraph_index}") + if collection is None or author is None or book is None or paragraph_index is None or paragraph_index < 0: + logging.warning(f"fetch_paragraph_data: Invalid arguments provided.") + return [] + try: + paragraph_index_int = int(paragraph_index) # Ensure integer for query + results = collection.get( + where={"$and": [{"author": author}, {"book": book}, {"paragraph_index": paragraph_index_int}]}, + include=['documents', 'metadatas', 'embeddings'] # Crucial: include embeddings for highlighting + ) + + if not results or not results.get('ids'): + logging.debug(f"No sentences found for Author='{author}', Book='{book}', ParagraphIndex={paragraph_index_int}") + return [] + + paragraph_sentences = [] + num_results = len(results['ids']) + documents_list = results.get('documents', []) + metadatas_list = results.get('metadatas', []) + embeddings_list = results.get('embeddings', []) + + if not (num_results == len(documents_list) == len(metadatas_list) == len(embeddings_list)): + logging.warning(f"fetch_paragraph_data: Length mismatch in results for {author}/{book}/P{paragraph_index_int}. IDs:{num_results}, Docs:{len(documents_list)}, Metas:{len(metadatas_list)}, Embs:{len(embeddings_list)}. Clamping to minimum.") + num_results = min(num_results, len(documents_list), len(metadatas_list), len(embeddings_list)) + + for i in range(num_results): + sent_id = results['ids'][i] + meta = metadatas_list[i] + doc = documents_list[i] + emb = embeddings_list[i] # Get embedding + + if doc is None or emb is None: # Embedding needed for highlighting + logging.warning(f"Skipping sentence {sent_id} in paragraph {paragraph_index_int} due to missing document or embedding.") + continue + + entry = {'id': sent_id, 'doc': doc, 'meta': meta or {}, 'embedding': emb, 'paragraph_index': meta.get('paragraph_index', paragraph_index_int)} + try: + entry['sentence_sort_key'] = int(sent_id) + except (ValueError, TypeError): + entry['sentence_sort_key'] = float('inf') # Put unparsable IDs at the end + logging.warning(f"Could not parse sentence ID as integer for sorting: {sent_id}") + + paragraph_sentences.append(entry) + + paragraph_sentences.sort(key=lambda x: x.get('sentence_sort_key', float('inf'))) + logging.debug(f"Fetched and sorted {len(paragraph_sentences)} sentences for paragraph {paragraph_index_int}.") + return paragraph_sentences + except Exception as e: + logging.error(f"Error fetching paragraph data for Author='{author}', Book='{book}', ParagraphIndex={paragraph_index}: {e}", exc_info=True) + return [] + +# --- Helper: Fetch Documents and Metadata for Multiple IDs --- +def fetch_multiple_passage_data(passage_ids): + """Fetches documents and metadata for multiple passage IDs from ChromaDB.""" + if not passage_ids or collection is None: + logging.warning(f"fetch_multiple_passage_data called with no IDs or no collection.") + return {} + + passage_data_map = {} + try: + str_ids = [str(pid) for pid in passage_ids if pid is not None] + if not str_ids: return {} + + logging.debug(f"Fetching passage data for {len(str_ids)} IDs: {str_ids[:10]}...") + results = 
collection.get(ids=str_ids, include=['documents', 'metadatas']) + + if results and results.get('ids'): + fetched_ids = results['ids'] + docs = results.get('documents', []) + metas = results.get('metadatas', []) + + if not (len(fetched_ids) == len(docs) == len(metas)): + logging.error(f"Mismatch in lengths returned by collection.get for multiple IDs: {len(fetched_ids)} IDs, {len(docs)} docs, {len(metas)} metas. IDs requested: {str_ids}") + # Attempt to process based on shortest list? For now, proceed cautiously. + + id_to_index = {fid: i for i, fid in enumerate(fetched_ids)} + # num_fetched = len(fetched_ids) # Unused after refactor + + for req_id in str_ids: + if req_id in id_to_index: + idx = id_to_index[req_id] + # Check index bounds against potentially mismatched lists + doc = docs[idx] if idx < len(docs) and docs[idx] is not None else "_Text fehlt_" + meta = metas[idx] if idx < len(metas) and metas[idx] is not None else {} + passage_data_map[req_id] = {'doc': doc, 'meta': meta} + if doc == "_Text fehlt_": logging.warning(f"Missing document for fetched ID: {req_id}") + if not meta: logging.warning(f"Missing metadata for fetched ID: {req_id}") + else: + logging.warning(f"Requested ID not found in collection.get results: {req_id}") + + missing_ids = set(str_ids) - set(passage_data_map.keys()) + if missing_ids: + logging.warning(f"Could not find any data (doc/meta) for requested IDs: {missing_ids}") + else: + logging.warning(f"ChromaDB get returned no results or no IDs for requested list: {str_ids[:10]}...") + + except Exception as e: + logging.error(f"Error fetching multiple passage data for IDs {passage_ids}: {e}", exc_info=True) + return passage_data_map + +# --- Helper: Calculate Cosine Similarity --- +def cosine_similarity_np(vec1, vec2): + """Calculates cosine similarity between two vectors using NumPy.""" + if vec1 is None or vec2 is None: + return 0.0 + try: + vec1 = np.array(vec1, dtype=np.float32) + vec2 = np.array(vec2, dtype=np.float32) + except Exception as e: + logging.error(f"Error converting vectors to numpy arrays for cosine similarity: {e}. 
vec1 type: {type(vec1)}, vec2 type: {type(vec2)}")
+        return 0.0
+
+    if vec1.shape != vec2.shape:
+        if vec1.size > 0 and vec2.size > 0:
+            logging.warning(f"Cosine similarity shape mismatch: {vec1.shape} vs {vec2.shape}")
+        return 0.0
+    if vec1.ndim == 0 or vec1.size == 0:
+        return 0.0
+
+    norm1 = np.linalg.norm(vec1)
+    norm2 = np.linalg.norm(vec2)
+    if norm1 == 0 or norm2 == 0:
+        return 0.0
+
+    epsilon = 1e-10 # Small value to prevent division by zero
+    similarity = np.dot(vec1, vec2) / (norm1 * norm2 + epsilon)
+
+    return float(np.clip(similarity, -1.0, 1.0))
+
+# --- Helper: Compare Passage Metadata ---
+def compare_passage_metadata(meta1, meta2):
+    """Checks if two passages share the same author, book, section, and title metadata."""
+    if not meta1 or not meta2: return False
+    return (meta1.get('author') == meta2.get('author') and
+            meta1.get('book') == meta2.get('book') and
+            (meta1.get('section') is None and meta2.get('section') is None or meta1.get('section') == meta2.get('section')) and
+            (meta1.get('title') is None and meta2.get('title') is None or meta1.get('title') == meta2.get('title')))
+
+# --- Favourite-helpers ---
+def _load_favs() -> dict[str, int]:
+    logging.debug(f"Attempting to load favourites from {FAV_FILE}")
+    try:
+        with open(FAV_FILE, "r", encoding="utf-8") as fh:
+            raw = json.load(fh)
+        # Ensure IDs are strings and scores are integers
+        favs = {str(k): int(v) for k, v in raw.items()}
+        logging.debug(f"Successfully loaded {len(favs)} favourites.")
+        return favs
+    except FileNotFoundError:
+        logging.debug(f"Favourites file not found at {FAV_FILE}. Starting with empty favourites.")
+        return {}
+    except Exception as e:
+        logging.error(f"Could not read {FAV_FILE}: {e}", exc_info=True)
+        return {}
+
+def _save_favs() -> None:
+    """Persist the ledger atomically. NOTE: the caller (e.g., inc_favourite) must hold _fav_lock."""
+    logging.debug(f"Attempting to save favourites to {FAV_FILE}")
+    tmp = FAV_FILE + ".tmp"
+    try:
+        with open(tmp, "w", encoding="utf-8") as fh:
+            logging.debug(f"Opened temp file {tmp} for writing.")
+            json.dump(favourite_scores, fh, ensure_ascii=False, indent=2)
+            logging.debug("Dumped favourites to temp file.")
+            fh.flush()
+            logging.debug("Flushed temp file.")
+            os.fsync(fh.fileno()) # Force write to disk
+            logging.debug("Synced temp file.")
+        os.replace(tmp, FAV_FILE) # Atomic replace, after the temp file is closed
+        logging.debug(f"Successfully replaced {FAV_FILE} with temp file.")
+        logging.debug(f"Successfully saved {len(favourite_scores)} favourites.")
+    except Exception as e:
+        logging.error(f"Could not save {FAV_FILE}: {e}", exc_info=True)
+
+favourite_scores: dict[str, int] = _load_favs() # Load favourites on startup
+
+def inc_favourite(passage_id: str) -> int:
+    """Add one ⭐ to a sentence, persist, return new total."""
+    logging.info(f"Attempting to increment favourite for ID: {passage_id}")
+    if not passage_id or not isinstance(passage_id, str):
+        logging.warning(f"Invalid passage_id for inc_favourite: {passage_id}")
+        return 0
+    with _fav_lock:
+        # Ensure ID is treated as string key
+        str_passage_id = str(passage_id)
+        favourite_scores[str_passage_id] = favourite_scores.get(str_passage_id, 0) + 1
+        _save_favs()
+        new_score = favourite_scores[str_passage_id]
+        logging.info(f"Incremented favourite for ID {str_passage_id}. New score: {new_score}")
+        return new_score
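Note (illustrative, not part of the patch): `_save_favs` relies on the classic temp-file-plus-rename pattern so readers never observe a half-written ledger. The pattern in isolation:

```python
# Write to a temp file in the same directory, fsync, then os.replace():
# the rename is atomic on POSIX and Windows, so concurrent readers see
# either the old file or the new one, never a partial write.
import json, os

def atomic_json_dump(obj, path):
    tmp = path + ".tmp"
    with open(tmp, "w", encoding="utf-8") as fh:
        json.dump(obj, fh, ensure_ascii=False, indent=2)
        fh.flush()
        os.fsync(fh.fileno())  # force bytes to disk before the rename
    os.replace(tmp, path)      # atomic swap

atomic_json_dump({"42": 3}, "favourites.json")
```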
New score: {new_score}") + return new_score + +def top_favourites(n: int = MAX_FAVOURITES) -> list[dict]: + """Return N top‑scored sentences incl. doc/meta.""" + logging.debug(f"Fetching top {n} favourites.") + if not favourite_scores: + logging.debug("No favourites available.") + return [] + try: + # Sort items, convert keys to str explicitly just in case + top = sorted([(str(k), v) for k, v in favourite_scores.items()], key=lambda kv: kv[1], reverse=True)[:n] + ids = [sid for sid, _ in top] + logging.debug(f"Top {len(top)} favourite IDs: {ids}") + data = fetch_multiple_passage_data(ids) # Fetch document and metadata + logging.debug(f"Fetched data for {len(data)} favourite IDs.") + results = [] + for sid, score in top: + if sid not in data: + logging.warning(f"Could not retrieve data for favourite ID {sid}. Skipping.") + continue + entry = { + "id": sid, # The ID + "document": data[sid]["doc"], # The text + "metadata": data[sid]["meta"], # The metadata + "distance": 0.0, # Favourites don't have a semantic distance in this view + "favourite_score": score, # The favourite score + } + results.append(entry) + logging.debug(f"Prepared {len(results)} top favourite results.") + return results + except Exception as e: + logging.error(f"Error fetching top favourites: {e}", exc_info=True) + return [] + +# --- Combined Formatting Function for all result types (Standard, LLM, Favourites) --- +def format_result_display(result_data, index, total_results, result_type): + """Formats a single search, LLM, or favourite result for Accordion/Textbox display.""" + if not result_data or not isinstance(result_data, dict): + # Return empty strings for both parts on error + return "Keine Ergebnisdaten verfügbar.", "" metadata = result_data.get('metadata', {}) - doc = result_data.get('document', "N/A") - distance = result_data.get('distance', float('inf')) + + # Determine what text to display and its label + # Favourites might have 'document', Standard/LLM might have 'context_block' or 'edited_text' + display_text = result_data.get('edited_text', result_data.get('context_block', result_data.get('document', "_Text fehlt_"))) + + # Determine what ID label to use + # Prioritize original_id (LLM), then id (standard search/context/fav), then fallback + result_id = result_data.get('original_id', result_data.get('id', 'N/A')) + + # --- Construct the Accordion Heading --- + accordion_title = "" + if result_type == "llm": + accordion_title = f"Gedanke {index + 1} von {total_results}" + elif result_type == "standard": + accordion_title = f"Gedanke {index + 1} von {total_results}" + elif result_type == "favourites": + score = result_data.get('favourite_score', 0) + accordion_title = f"⭐{score}" # Title is just the star score + + # --- Construct the Accordion Content (Metadata & Scores) --- + accordion_content_md = "" + + score_info_lines = [] + # Favourite score is already in title for favs, only show for standard/LLM if present + if 'favourite_score' in result_data and result_data['favourite_score'] is not None: + if result_type != "favourites": + score_info_lines.append(f"* ⭐ Score: {result_data['favourite_score']}") + if 'final_similarity' in result_data and result_data['final_similarity'] is not None: + score_info_lines.append(f"* Score (Kontext-Gewichtet): {result_data['final_similarity']:.4f}") + + + score_info = "\n".join(score_info_lines) + "\n\n" if score_info_lines else "\n" author = metadata.get('author', 'N/A') book = metadata.get('book', 'N/A') - section = metadata.get('section', 'N/A') - - md_content = "" - 
md_content += f"* **Author:** {author}\n" - md_content += f"* **Book:** {book}\n" - if section not in ['Unknown', 'N/A', None]: - md_content += f"* **Section:** {section}\n" - md_content += f"* **Distance:** {distance:.4f}\n\n" - md_content += f"> {doc}\n\n" - return md_content - -# --- Helper: Format Reading Passage (Deprecated - formatting now done in format_context_markdown) --- -# def format_reading_passage(passage_data): # No longer needed as separate function -# ... - -# --- Context Formatting Helper --- -def format_context_markdown(passages): - """ - Formats a list of passage dictionaries into a seamless Markdown string - for the reading area, *without* a header. - """ - if not passages: - return "" + section = metadata.get('section', None) + titel = metadata.get('title', None) - valid_passages = [p for p in passages if p and p.get('id') is not None] - valid_passages.sort(key=lambda p: int(p.get('id', -1))) + accordion_content_md += f"* Autor: {author}\n* Buch: {book}\n" + if section and str(section).strip().lower() not in ["unknown", "n/a", ""]: + accordion_content_md += f"* Abschnitt: {section}\n" + if titel is not None and str(titel).strip().lower() not in ["unknown", "n/a", ""]: + try: accordion_content_md += f"* Titel/Nr: {int(titel)}\n" + except (ValueError, TypeError): accordion_content_md += f"* Titel/Nr: {titel}\n" - if not valid_passages: - return "" + accordion_content_md += score_info - # Combine Passage Texts - full_text = "" - for i, passage in enumerate(valid_passages): - doc = passage.get('doc', '_Passage text missing_') - role = passage.get('role', 'context') # Includes 'current_reading', 'prev', 'next' + # --- ADDED: Include LLM Rationale if available and this is an LLM result --- + # Check for both result_type and the presence of the 'rationale' key + if result_type == "llm" and 'rationale' in result_data and result_data['rationale']: + accordion_content_md += f"**LLM Begründung:**\n> {result_data['rationale']}\n\n" + # --- END ADDED --- - if role == 'missing': - continue # Skip placeholders like "Beginning/End of document" - full_text += doc + # The text content for the Textbox is just the display_text + text_content = display_text - # Add separator if not the last passage and next isn't missing - if i < len(valid_passages) - 1: - if valid_passages[i+1].get('role') != 'missing': - full_text += "\n\n" + # Return the two separate parts + return accordion_title, accordion_content_md, text_content - return full_text - -# --- Search Function (Complete) --- -def search_philosophical_texts(query, selected_authors): +# --- Contextual Re-ranking Function (V4) --- +def rerank_with_context(candidates, original_query_embedding, target_n_results, weight, decay_factor, window_size, min_chars_neighbor): """ - Performs search, stores all results in state, displays the first result. - Returns updates for multiple components and state variables. + Re-ranks candidate passages based on context similarity to the query, + normalizing initial and context scores, combining them additively, + selecting the best-scoring representative for each unique central ID, + and finally applying an author quota for diversity. 
""" - # Initialize updates dictionary with default states - updates = { - full_search_results_state: [], - current_result_index_state: 0, - single_result_group: gr.Group(visible=False), - result_index_indicator_md: gr.Markdown(""), - single_result_display_md: gr.Markdown(""), - previous_result_button: gr.Button(visible=False), - next_result_button: gr.Button(visible=False), - weiterlesen_button: gr.Button(visible=False), # Default to hidden - context_display: gr.Markdown(""), - displayed_context_passages: [], - load_previous_button: gr.Button(visible=False), - load_next_button: gr.Button(visible=False), - } - - # --- Pre-computation Checks --- + logging.info(f"Starting contextual re-ranking (V4: Norm+DeDup+Quota) for {len(candidates)} candidates... " + f"(Win={window_size}, Weight={weight:.2f}, Decay={decay_factor:.2f}, MinChars={min_chars_neighbor}, AuthQuota={MAX_RESULTS_PER_AUTHOR})") + if not candidates or original_query_embedding is None: + logging.warning("rerank_with_context called with no candidates or no query embedding.") + return candidates[:target_n_results] if candidates else [] + + # --- Phase 1: Calculate Initial Similarities and Find Range --- + initial_similarities = [] + processed_candidates_phase1 = [] + logging.debug("Phase 1: Calculating initial similarities...") + for i, candidate in enumerate(candidates): + initial_distance = candidate.get('distance') + if initial_distance is None or not isinstance(initial_distance, (float, int)) or initial_distance < 0: initial_similarity = 0.0 + else: initial_similarity = max(0.0, 1.0 - float(initial_distance)) # Convert distance to similarity (lower distance = higher similarity) + candidate['initial_similarity'] = initial_similarity + initial_similarities.append(initial_similarity) + processed_candidates_phase1.append(candidate) + min_initial_sim = min(initial_similarities) if initial_similarities else 0.0 + max_initial_sim = max(initial_similarities) if initial_similarities else 0.0 + logging.debug(f"Initial Similarity Range: Min={min_initial_sim:.4f}, Max={max_initial_sim:.4f}") + + # --- Phase 2: Calculate Combined Neighbor Similarities --- + passage_data_map = {str(cand['id']): {'doc': cand.get('document'), 'meta': cand.get('metadata', {})} for cand in processed_candidates_phase1} + neighbor_embeddings_cache = {} + all_neighbor_ids_to_fetch = set() + candidate_neighbor_map = defaultdict(lambda: {'prev': [], 'next': []}) + potential_neighbor_distances = {} + + # Pass 2.1: Identify neighbors + for candidate in processed_candidates_phase1: + try: + center_id_str = str(candidate['id']) + center_id_int = int(center_id_str) + potential_neighbor_distances[center_id_str] = {} + for dist in range(1, window_size + 1): + prev_id_int, next_id_int = center_id_int - dist, center_id_int + dist + if prev_id_int >= 0: + prev_id_str = str(prev_id_int); all_neighbor_ids_to_fetch.add(prev_id_str); candidate_neighbor_map[center_id_str]['prev'].append(prev_id_str); potential_neighbor_distances[center_id_str][prev_id_str] = dist + next_id_str = str(next_id_int); all_neighbor_ids_to_fetch.add(next_id_str); candidate_neighbor_map[center_id_str]['next'].append(next_id_str); potential_neighbor_distances[center_id_str][next_id_str] = dist + candidate_neighbor_map[center_id_str]['prev'].sort(key=int, reverse=True) + candidate_neighbor_map[center_id_str]['next'].sort(key=int) + except (ValueError, TypeError): + logging.warning(f"Could not parse candidate ID {candidate.get('id')} as integer for neighbor finding.") + continue + + # Pass 2.2: Fetch neighbor 
data (embeddings, docs, metas) + ids_needed_for_fetch = list(all_neighbor_ids_to_fetch) + if ids_needed_for_fetch: + fetched_embeddings = fetch_embeddings_for_ids(tuple(ids_needed_for_fetch)); neighbor_embeddings_cache.update(fetched_embeddings) + ids_to_fetch_docs_meta = [nid for nid in ids_needed_for_fetch if nid not in passage_data_map] + if ids_to_fetch_docs_meta: + fetched_neighbor_docs_meta = fetch_multiple_passage_data(ids_to_fetch_docs_meta); passage_data_map.update(fetched_neighbor_docs_meta) + + + # Pass 2.3: Calculate combined similarity per candidate and construct context block + combined_neighbor_similarities = [] + scored_candidates = [] + logging.debug("Phase 2: Calculating combined neighbor similarities and constructing context blocks...") + for candidate in processed_candidates_phase1: + try: + center_id_str = str(candidate['id']) + center_meta = candidate.get('metadata', {}) + total_weighted_similarity = 0.0 + total_weight = 0.0 + candidate_neighbors_dist = potential_neighbor_distances.get(center_id_str, {}) + + # Calculate weighted neighbor similarity + for neighbor_id_str, dist_level in candidate_neighbors_dist.items(): + neighbor_emb = neighbor_embeddings_cache.get(neighbor_id_str) + neighbor_data = passage_data_map.get(neighbor_id_str) + if neighbor_emb is not None and neighbor_data: + neighbor_meta = neighbor_data.get('meta') + neighbor_doc = neighbor_data.get('doc') + if (neighbor_meta is not None and compare_passage_metadata(center_meta, neighbor_meta) + and neighbor_doc and isinstance(neighbor_doc, str) and len(neighbor_doc) >= min_chars_neighbor): + neighbor_sim_to_query = cosine_similarity_np(original_query_embedding, neighbor_emb) + current_decay = max(0.0, 1.0 - ((dist_level - 1) * decay_factor)) + current_weight = current_decay # Weight by decayed distance + total_weighted_similarity += neighbor_sim_to_query * current_weight + total_weight += current_weight + + combined_sim = total_weighted_similarity / total_weight if total_weight > 0 else 0.0 + candidate['combined_neighbor_similarity'] = combined_sim + combined_neighbor_similarities.append(combined_sim) + + # Construct context block for this candidate using ALL neighbors (even short ones) + context_block_text = _construct_passage_block(center_id_str, passage_data_map, candidate_neighbor_map) + candidate['context_block'] = context_block_text + + scored_candidates.append(candidate) + except Exception as e: + logging.error(f"Error processing candidate ID {candidate.get('id')} during neighbor scoring/context block: {e}", exc_info=True) + candidate['combined_neighbor_similarity'] = 0.0 + combined_neighbor_similarities.append(0.0) + candidate['context_block'] = "_Fehler bei Kontext-Erstellung_" + scored_candidates.append(candidate) + + + # --- Phase 3: Find Context Score Range --- + min_combined_sim = min(combined_neighbor_similarities) if combined_neighbor_similarities else 0.0 + max_combined_sim = max(combined_neighbor_similarities) if combined_neighbor_similarities else 0.0 + logging.debug(f"Combined Neighbor Similarity Range: Min={min_combined_sim:.4f}, Max={max_combined_sim:.4f}") + + + # --- Phase 4: Normalize and Combine Scores --- + logging.debug("Phase 4: Normalizing and combining scores...") + initial_range = max_initial_sim - min_initial_sim + combined_range = max_combined_sim - min_combined_sim + for candidate in scored_candidates: + try: + initial_sim = candidate.get('initial_similarity', 0.0) + combined_sim = candidate.get('combined_neighbor_similarity', 0.0) + + initial_norm = 0.5 # Default to 0.5 
if range is zero
+                if initial_range > 1e-9:
+                    initial_norm = max(0.0, min(1.0, (initial_sim - min_initial_sim) / initial_range))
+
+                combined_norm = 0.5 # Default to 0.5 if range is zero
+                if combined_range > 1e-9:
+                    combined_norm = max(0.0, min(1.0, (combined_sim - min_combined_sim) / combined_range))
+
+                # Additive combination based on weight
+                final_similarity = (1.0 - weight) * initial_norm + weight * combined_norm
+                candidate['final_similarity'] = final_similarity
+
+            except Exception as e:
+                logging.error(f"Error calculating final similarity for candidate ID {candidate.get('id')}: {e}", exc_info=True)
+                candidate['final_similarity'] = -1.0 # Penalize on error
+
+
+    # --- Phase 5: Group by ID and Select Best Representative ---
+    logging.debug("Phase 5: Grouping by ID and selecting best representative...")
+    best_candidate_by_id = {}
+    for candidate in scored_candidates:
+        center_id = candidate.get('id')
+        current_score = candidate.get('final_similarity', -1.0)
+        if not center_id:
+            logging.warning(f"Skipping candidate with missing ID: {candidate}")
+            continue
+        existing_candidate = best_candidate_by_id.get(center_id)
+        # Keep the candidate with the highest final_similarity for each unique ID
+        if not existing_candidate or current_score > existing_candidate.get('final_similarity', -1.0):
+            best_candidate_by_id[center_id] = candidate
+
+    unique_best_candidates = list(best_candidate_by_id.values())
+    logging.info(f"Reduced {len(scored_candidates)} candidates to {len(unique_best_candidates)} unique ID representatives.")
+
+
+    # --- Phase 6: Sort Unique Representatives ---
+    unique_best_candidates.sort(key=lambda x: x.get('final_similarity', -1.0), reverse=True)
+    logging.debug(f"Sorted {len(unique_best_candidates)} unique representatives by score.")
+
+
+    # --- Phase 7: Apply Author Quota ---
+    logging.debug(f"Phase 7: Applying author quota (max {MAX_RESULTS_PER_AUTHOR} per author)...")
+    author_counts = defaultdict(int)
+    final_diverse_results = []
+    authors_seen_in_final = set()
+
+    for candidate in unique_best_candidates:
+        # Stop if we already have enough results
+        if len(final_diverse_results) >= target_n_results:
+            logging.debug(f"Reached target result count {target_n_results}. Stopping quota application.")
+            break
+
+        meta = candidate.get('metadata', {})
+        # Use author 'Unknown' if metadata or author key is missing
+        author = meta.get('author', 'Unknown')
+
+        if author_counts[author] < MAX_RESULTS_PER_AUTHOR:
+            final_diverse_results.append(candidate)
+            author_counts[author] += 1
+            authors_seen_in_final.add(author)
+
+    logging.info(f"Quota applied. Selected {len(final_diverse_results)} results from {len(authors_seen_in_final)} unique authors.")
+
+    # Return the quota-filtered list
+    return final_diverse_results # No need to slice again, loop breaks at target_n_results
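Note (illustrative, not part of the patch): a worked example of the Phase-4 combination with hypothetical numbers, to make the weight's effect concrete:

```python
# Two candidates, initial sims [0.80, 0.70], neighbor sims [0.20, 0.60], weight = 0.5.
# Min-max normalization maps each score list onto [0, 1]:
#   initial_norm  = [1.0, 0.0]
#   combined_norm = [0.0, 1.0]
# final = (1 - w) * initial_norm + w * combined_norm
#   candidate A: 0.5 * 1.0 + 0.5 * 0.0 = 0.50
#   candidate B: 0.5 * 0.0 + 0.5 * 1.0 = 0.50
# With w = 0.5 a strong hit in a weak context ties with a weaker hit in a
# strongly on-topic context; lowering w favours the raw hit again.
```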
+
+# --- Modified Format Context for Reading Area (Revision 6 - HTML Output) ---
+def format_context_markdown(passages_state_list, query_embedding):
+    """Formats a list of paragraph sentences for HTML display with dynamic highlighting.
+    Uses class/data-id for JS event listeners."""
+    logging.info(f"Formatting context HTML for {len(passages_state_list)} passages.")
+
+    # --- Validate Query Embedding ---
+    is_query_embedding_valid = False
+    query_embedding_np = None
+    if isinstance(query_embedding, (list, np.ndarray)):
+        try:
+            query_embedding_np = np.array(query_embedding, dtype=np.float32)
+            if query_embedding_np.ndim == 1 and query_embedding_np.size > 0:
+                is_query_embedding_valid = True
+                logging.debug(f"Query embedding is valid (Shape: {query_embedding_np.shape}). Highlighting enabled.")
+            else: logging.warning("Query embedding received but is empty or has wrong dimensions. Highlighting disabled.")
+        except Exception as e:
+            logging.error(f"Error converting or checking query embedding: {e}. Highlighting disabled.")
+    else: logging.warning(f"Query embedding is type {type(query_embedding)}. Highlighting disabled.")
+
+    if not passages_state_list:
+        return "<p><i>_Kein Kontext zum Anzeigen._</i></p>" # Return valid HTML
+
+    # --- Step 1: Calculate all similarities and find relevant range ---
+    sentence_similarities = {}
+    scores_above_threshold = []
+    if is_query_embedding_valid:
+        logging.debug("Calculating similarities for dynamic highlighting...")
+        for i, sentence_data in enumerate(passages_state_list):
+            sentence_embedding = sentence_data.get('embedding')
+            sentence_id = sentence_data.get('id', f'index_{i}') # Use index if ID missing
+            sentence_role = sentence_data.get('role', 'context')
+
+            # Skip markers or sentences without embeddings
+            if sentence_role == 'missing' or sentence_embedding is None:
+                continue
+
+            try:
+                similarity_score = cosine_similarity_np(query_embedding_np, sentence_embedding)
+                sentence_similarities[i] = similarity_score # Store score by index
+                if similarity_score >= HIGHLIGHT_SIMILARITY_THRESHOLD:
+                    scores_above_threshold.append(similarity_score)
+            except Exception as e:
+                logging.warning(f"Error calculating similarity for sentence ID {sentence_id} (Index {i}): {e}")
+
+    max_relevant_score = -1.0
+    min_relevant_score = HIGHLIGHT_SIMILARITY_THRESHOLD
+    if scores_above_threshold:
+        max_relevant_score = max(scores_above_threshold)
+        logging.debug(f"Dynamic Highlighting: Min Relevant Score (Threshold) = {min_relevant_score:.4f}, Max Relevant Score = {max_relevant_score:.4f}")
+    else:
+        logging.debug("Dynamic Highlighting: No sentences met the similarity threshold.")
+
+    # --- Step 2: Format output as HTML ---
+    # Ensure passages are sorted correctly
+    passages_state_list.sort(key=lambda x: (x.get('paragraph_index', -1), x.get('sentence_sort_key', float('inf'))))
+
+    output_parts = []
+    current_paragraph_index = None
+    previous_section = "__INITIAL_NONE__"
+    previous_title = "__INITIAL_NONE__"
+    is_first_paragraph_overall = True
+    PLACEHOLDERS_TO_IGNORE = {"unknown", "n/a", "", None}
+    is_paragraph_open = False # Track if we need to close a <p> tag
+
+    for i, sentence_data in enumerate(passages_state_list):
+        sentence_doc = sentence_data.get('doc', '_Text fehlt_')
+        sentence_meta = sentence_data.get('meta', {})
+        sentence_para_idx = sentence_data.get('paragraph_index')
+        sentence_role = sentence_data.get('role', 'context')
+        sentence_id = sentence_data.get('id', f'index_{i}')
+
+        # --- Handle boundary markers (as HTML) ---
+        if sentence_role == 'missing':
+            if is_paragraph_open:
+                output_parts.append("</p>\n") # Close previous paragraph
+                is_paragraph_open = False
+            output_parts.append(f"<p><i>{html.escape(sentence_doc)}</i></p>\n") # Use <i> for italics
+            current_paragraph_index = None
+            is_first_paragraph_overall = True
+            # No need for extra newlines between markers in HTML, <p> handles blocks
+            # if i < len(passages_state_list) - 1: output_parts.append("<br>") # Optional: explicit vertical space
+            continue
+
+        # --- Check for Paragraph Start and Handle Headings/Separators (as HTML) ---
+        is_new_paragraph = (sentence_para_idx is not None and sentence_para_idx != current_paragraph_index)
+        if is_new_paragraph:
+            if is_paragraph_open:
+                output_parts.append("</p>\n") # Close previous paragraph
+                is_paragraph_open = False
+
+            current_section = sentence_meta.get('section')
+            current_title = sentence_meta.get('title')
+
+            norm_prev_section = None if str(previous_section).strip().lower() in PLACEHOLDERS_TO_IGNORE else previous_section
+            norm_prev_title = None if str(previous_title).strip().lower() in PLACEHOLDERS_TO_IGNORE else previous_title
+            norm_curr_section = None if str(current_section).strip().lower() in PLACEHOLDERS_TO_IGNORE else current_section
+            norm_curr_title = None if str(current_title).strip().lower() in PLACEHOLDERS_TO_IGNORE else current_title
+
+            section_changed = (norm_curr_section != norm_prev_section)
+            title_changed = (norm_curr_title != norm_prev_title)
+
+            # Note: no <hr> separator is inserted between sections any more – headings alone mark the break.
+
+            heading_parts_to_add = []
+            if section_changed and norm_curr_section is not None:
+                heading_parts_to_add.append(f"<h3>{html.escape(str(norm_curr_section))}</h3>\n") # Use <h3>
+            if title_changed and norm_curr_title is not None:
+                title_str = str(norm_curr_title).strip()
+                title_display = html.escape(title_str)
+                try: title_display = html.escape(str(int(title_str))) # Attempt int cast if relevant
+                except (ValueError, TypeError): pass # Keep string if not int
+                heading_parts_to_add.append(f"<h4>{title_display}</h4>\n") # Use <h4>
+
+            if heading_parts_to_add:
+                output_parts.extend(heading_parts_to_add)
+
+            output_parts.append("<p>") # Open new paragraph tag
+            is_paragraph_open = True
+
+            previous_section = current_section
+            previous_title = current_title
+            current_paragraph_index = sentence_para_idx
+            is_first_paragraph_overall = False
+        elif not is_paragraph_open:
+            # Handle case where first item is not a paragraph start marker
+            output_parts.append("<p>")
+            is_paragraph_open = True
+
+        # --- Sentence Formatting and DYNAMIC Highlighting (as HTML Spans) ---
+        # Build attributes for the single span element.
+        # cursor:pointer comes from the .clickable-sentence CSS rule, not inline styles.
+        span_classes = ["clickable-sentence"]
+        style_parts = []
+
+        safe_doc = html.escape(sentence_doc)
+
+        current_score = sentence_similarities.get(i)
+
+        # Determine if highlighting should be applied
+        apply_highlight = is_query_embedding_valid and current_score is not None and current_score >= min_relevant_score
+        alpha = 0.0
+        if apply_highlight:
+            try:
+                if max_relevant_score > min_relevant_score:
+                    normalized_score = (current_score - min_relevant_score) / (max_relevant_score - min_relevant_score)
+                    alpha = HIGHLIGHT_MIN_ALPHA + normalized_score * (HIGHLIGHT_MAX_ALPHA - HIGHLIGHT_MIN_ALPHA)
+                    alpha = max(HIGHLIGHT_MIN_ALPHA, min(alpha, HIGHLIGHT_MAX_ALPHA))
+                elif max_relevant_score == min_relevant_score:
+                    alpha = HIGHLIGHT_MIN_ALPHA
+            except Exception as e:
+                logging.warning(f"Error calculating dynamic highlighting alpha for sentence ID {sentence_id}: {e}")
+                alpha = 0.0 # Disable highlighting on error
+
+        # Apply highlighting by adding the class and style properties (including the CSS variable)
+        if alpha > 0:
+            span_classes.append("highlighted")
+            # Add dynamic styles (padding, border-radius, box-decoration-break)
+            style_parts.append("padding: 1px 3px;")
+            style_parts.append("border-radius: 3px;")
+            style_parts.append("box-decoration-break: clone;")
+            style_parts.append("-webkit-box-decoration-break: clone;")
+            # Set the CSS variable for the alpha
+            style_parts.append(f"--highlight-alpha: {alpha:.2f};")
+            # Do NOT set background-color here – it is set in CSS using the variable
+
+        # Join the classes and styles
+        class_str = " ".join(span_classes)
+        style_str = " ".join(style_parts)
+
+        # Construct the single span element
+        formatted_sentence = (
+            f'<span class="{class_str}" data-id="{sentence_id}" style="{style_str}">'
+            f"{safe_doc}</span>"
+        )
+
+        # --- Append Formatted Sentence with Spacing (handle HTML spaces) ---
+        # Add a space before if not the first sentence in the paragraph
+        if not is_new_paragraph and is_paragraph_open and i > 0 and passages_state_list[i-1].get('role') != 'missing' and sentence_role != 'missing':
+            # Find the previous non-missing sentence to check if it was the end of a paragraph block
+            prev_valid_sentence_index = i - 1
+            while prev_valid_sentence_index >= 0 and passages_state_list[prev_valid_sentence_index].get('role') == 'missing':
+                prev_valid_sentence_index -= 1
+
+            # Add a space unless the previous element was a heading or paragraph open tag.
+            # That case is already covered by the is_new_paragraph / is_paragraph_open logic:
+            # if it's not a new paragraph and the paragraph is open, we generally want a space.
+            if prev_valid_sentence_index >= 0 and passages_state_list[prev_valid_sentence_index].get('paragraph_index') == sentence_para_idx:
+                output_parts.append(" ")
+            # No space needed if it's the very first item in a paragraph after a break/heading
+
+        output_parts.append(formatted_sentence)
+
+    # Close the last paragraph tag if it was opened
+    if is_paragraph_open:
+        output_parts.append("</p>\n")
+
+    # Wrap everything in a main div for robustness
+    return "<div>\n" + "".join(output_parts) + "</div>"
" + +# --- Internal Search Helper --- +def _perform_single_query_search(query, where_filter, n_results): + """Performs a single vector query against ChromaDB, returning processed results.""" + logging.info(f"Performing single query search for: '{query[:50]}...' (n_results={n_results}, filter={where_filter})") if collection is None: - logging.error("Search attempted but ChromaDB collection is not available.") - updates[single_result_display_md] = gr.Markdown("Error: Database connection failed.") - updates[single_result_group] = gr.Group(visible=True) # Show group to display error - return updates - + logging.error("ChromaDB collection is not available for query.") + raise ConnectionError("DB not available.") if not query: - logging.warning("Empty query received.") - updates[single_result_display_md] = gr.Markdown("Please enter a query.") - updates[single_result_group] = gr.Group(visible=True) # Show group to display message - return updates + logging.error("Cannot perform search with an empty query.") + return [] # Return empty list for empty query - logging.info(f"Received query: '{query[:50]}...'") - logging.info(f"Selected Authors for filtering: {selected_authors}") - - # --- Embedding --- + # Get query embedding (handles errors internally) query_embedding = get_embedding(query, task="RETRIEVAL_QUERY") - if query_embedding is None: - logging.error("Failed to generate query embedding.") - updates[single_result_display_md] = gr.Markdown("Error: Failed to generate query embedding.") - updates[single_result_group] = gr.Group(visible=True) - return updates + logging.debug(f"Inside _perform_single_query_search: Generated query embedding. Type: {type(query_embedding)}, Is None: {query_embedding is None}") + if isinstance(query_embedding, list): logging.debug(f" Embedding length: {len(query_embedding)}") - # --- Filtering --- - where_filter = None - if selected_authors: - where_filter = {"author": {"$in": selected_authors}} - logging.info(f"Applying WHERE filter: {where_filter}") + if query_embedding is None: + # Embedding failed, cannot proceed with query + raise ValueError(f"Embedding generation failed for query: '{query[:50]}...'") - # --- Query Execution and Result Processing --- try: - logging.info(f"Querying collection '{COLLECTION_NAME}' for top {MAX_RESULTS} results.") - - # --->>> ACTUAL QUERY CALL <<<--- results = collection.query( query_embeddings=[query_embedding], - n_results=MAX_RESULTS, - where=where_filter, - include=['documents', 'metadatas', 'distances'] # IDs are included by default + n_results=n_results, + where=where_filter, # Apply filter if provided + include=['documents', 'metadatas', 'distances'] # Fetch necessary fields ) - # --->>> END QUERY CALL <<<--- - - # Process results if found - all_results_data = [] - if results and results.get('ids') and results['ids'][0]: - num_found = len(results['ids'][0]) - logging.info(f"Query successful. 
Found {num_found} results.") + processed_results = [] + # Results structure: {'ids': [[]], 'documents': [[]], ...} + # Check if results and the first list within 'ids' exist and are not empty + if results and results.get('ids') and results['ids'] and results['ids'][0]: + # Extract the lists for the single query ids_list = results['ids'][0] - docs_list = results['documents'][0] - metadatas_list = results['metadatas'][0] - distances_list = results['distances'][0] - - # --->>> ACTUAL RESULT PROCESSING LOOP <<<--- - for i in range(num_found): - # Validate ID conversion (just in case) - try: - _ = int(ids_list[i]) # Check if convertible - except ValueError: - logging.warning(f"Skipping result with non-integer ID: {ids_list[i]}") - continue - - all_results_data.append({ - "id": ids_list[i], - "document": docs_list[i], - "metadata": metadatas_list[i], - "distance": distances_list[i] + docs_list = results.get('documents', [[]])[0] or [] # Use default empty list + metadatas_list = results.get('metadatas', [[]])[0] or [] + distances_list = results.get('distances', [[]])[0] or [] + + num_found = len(ids_list) + # Robustness check on list lengths + if not (num_found == len(docs_list) == len(metadatas_list) == len(distances_list)): + logging.warning(f"ChromaDB result length mismatch: {num_found} IDs, {len(docs_list)} docs, {len(metadatas_list)} metas, {len(distances_list)} dists. Processing cautiously.") + num_found = min(num_found, len(docs_list), len(metadatas_list), len(distances_list)) + ids_list = ids_list[:num_found] # Truncate lists to match + + + logging.info(f"ChromaDB query returned {len(ids_list)} results.") + + for i, res_id in enumerate(ids_list): + # Check bounds just in case, though clamping should prevent IndexError + if i >= num_found: break + doc = docs_list[i] if docs_list[i] is not None else "_Text fehlt_" + meta = metadatas_list[i] if metadatas_list[i] is not None else {} + dist = distances_list[i] if distances_list[i] is not None else float('inf') + + # Basic validation + if res_id is None: logging.warning(f"Skipping result with None ID at index {i}"); continue + res_id_str = str(res_id) # Ensure ID is string + if doc == "_Text fehlt_": logging.warning(f"Missing document for ID {res_id_str} at index {i}") + if dist == float('inf'): logging.warning(f"Missing distance for ID {res_id_str} at index {i}") + + processed_results.append({ + "id": res_id_str, # Store ID as string + "document": doc, + "metadata": meta, + "distance": dist }) - # --->>> END RESULT PROCESSING LOOP <<<--- - - if all_results_data: - # Results found and processed successfully - updates[full_search_results_state] = all_results_data - updates[current_result_index_state] = 0 - first_result_md = format_single_result(all_results_data[0], 0, len(all_results_data)) - updates[single_result_display_md] = gr.Markdown(first_result_md) - updates[single_result_group] = gr.Group(visible=True) # Show group - updates[result_index_indicator_md] = gr.Markdown(f"Result **1** of **{len(all_results_data)}**") - updates[previous_result_button] = gr.Button(visible=True, interactive=False) - updates[next_result_button] = gr.Button(visible=True, interactive=(len(all_results_data) > 1)) - updates[weiterlesen_button] = gr.Button(visible=True) # Show this button + else: + logging.info(f"Query '{query[:50]}...' 
returned no results from ChromaDB.") + return processed_results + + except Exception as e: + logging.error(f"Error during ChromaDB query for '{query[:50]}...': {e}", exc_info=True) + if "dimension" in str(e).lower(): + logging.error("Query failed possibly due to embedding dimension mismatch.") + raise ValueError(f"Dimension mismatch error for query '{query[:50]}...'") + raise RuntimeError(f"DB search error for query '{query[:50]}...': {type(e).__name__}") + +# --- Helper Function: Construct Passage Block --- +def _construct_passage_block(center_id_str, passage_data_map, candidate_neighbor_map): + """Constructs a continuous text block including neighbors for a given center passage.""" + center_data = passage_data_map.get(center_id_str) + if not center_data: + logging.warning(f"_construct_passage_block: Missing data for center ID {center_id_str}.") + return "_Zentrumstext fehlt_" + + center_meta = center_data.get('meta', {}) + center_text = center_data.get('doc') + if not center_text or center_text == "_Text fehlt_": + logging.warning(f"_construct_passage_block: Missing document text for center ID {center_id_str}.") + return "_Zentrumstext fehlt_" + + block_text_parts = [] + neighbors = candidate_neighbor_map.get(center_id_str, {'prev': [], 'next': []}) + + # Add previous neighbors (if metadata matches) - Iterate in original order (closest first) and insert at beginning + # Note: Sorting neighbors.get('prev', []) by int() ensures chronological order + for prev_id in sorted(neighbors.get('prev', []), key=int): + prev_data = passage_data_map.get(prev_id) + if prev_data and compare_passage_metadata(center_meta, prev_data.get('meta', {})): + prev_text = prev_data.get('doc') + if prev_text and prev_text != "_Text fehlt_": + block_text_parts.append(prev_text) # Add to the end temporarily + + # Add the center text + block_text_parts.append(center_text) + + # Add next neighbors (if metadata matches) - Iterate in original order (closest first) and append + for next_id in sorted(neighbors.get('next', []), key=int): + next_data = passage_data_map.get(next_id) + if next_data and compare_passage_metadata(center_meta, next_data.get('meta', {})): + next_text = next_data.get('doc') + if next_text and next_text != "_Text fehlt_": + block_text_parts.append(next_text) # Add to the end + + # Join the parts into a single string for the block + continuous_block_text = " ".join(block_text_parts) + + if not continuous_block_text.strip(): + logging.warning(f"_construct_passage_block: Constructed empty passage block for center ID {center_id_str}.") + return "_Leerer Kontextblock_" + + return continuous_block_text + +# --- Modified Core Search Logic (Standard Mode) --- +def perform_search_standard(query, selected_authors, window_size, weight, decay, n_results=MAX_RESULTS_STANDARD): + """Performs standard search: Embed -> Query -> Re-rank -> Return results & embedding.""" + logging.info(f"--- Starting Standard Search --- Query: '{query[:50]}...' 
| Authors: {selected_authors} | Target Results: {n_results} | Window={window_size}, Weight={weight:.2f}, Decay={decay:.2f}") + original_query_embedding = None + + try: + # Phase 1: Get Query Embedding + original_query_embedding = get_embedding(query, task="RETRIEVAL_QUERY") + if original_query_embedding is None: + raise ValueError("Failed to generate query embedding for standard search.") + + # Phase 2: Build Filter + where_filter = None + if selected_authors: + authors_filter_list = selected_authors if isinstance(selected_authors, list) else [selected_authors] + authors_filter_list = [a for a in authors_filter_list if a and isinstance(a, str)] + if authors_filter_list: + where_filter = {"author": {"$in": authors_filter_list}} + logging.info(f"Applying author filter: {where_filter}") else: - # Query returned results, but none were valid after processing - logging.info("No valid results found after filtering/validation.") - updates[single_result_display_md] = gr.Markdown("No results found matching your query and filters.") - updates[single_result_group] = gr.Group(visible=True) # Show message - updates[weiterlesen_button] = gr.Button(visible=False) # Hide button + logging.warning("Empty or invalid author filter list provided, searching all authors.") + + # Phase 3: Initial Search + logging.info(f"Fetching initial {INITIAL_RESULTS_FOR_RERANK} candidates from DB.") + initial_candidates = _perform_single_query_search(query, where_filter, INITIAL_RESULTS_FOR_RERANK) + + if not initial_candidates: + logging.info("Standard Search: No initial results found from DB.") + return [], original_query_embedding + + logging.info(f"Found {len(initial_candidates)} initial candidates. Proceeding to 1st pass re-ranking.") + + # Phase 4: Contextual Re-ranking (1st Pass) + reranked_results = rerank_with_context( + initial_candidates, + original_query_embedding, + n_results, # Target number of final results + weight, # Use argument + decay, # Use argument + window_size, # Use argument + MIN_CHARS_FOR_RELEVANT_NEIGHBOR # Pass constant + ) + logging.info(f"Standard Search: Re-ranked {len(initial_candidates)} -> Found {len(reranked_results)} final results.") - else: - # Query returned no results - logging.info("No results found for the query (or matching the filter).") - updates[single_result_display_md] = gr.Markdown("No results found matching your query and filters.") - updates[single_result_group] = gr.Group(visible=True) # Show message - updates[weiterlesen_button] = gr.Button(visible=False) # Hide button + return reranked_results, original_query_embedding - return updates + except (ConnectionError, ValueError, RuntimeError) as e: + logging.error(f"Standard Search failed: {e}", exc_info=False) + return [], original_query_embedding + except Exception as e: + logging.error(f"Standard Search encountered an unexpected error: {e}", exc_info=True) + return [], original_query_embedding + +# --- Search Function (Standard Mode UI Wrapper) --- +def search_standard_mode_ui(search_results, query_embedding): + """Prepares Gradio UI updates for the Standard Search results.""" + logging.info("Preparing UI updates for Standard Search results.") + updates = create_reset_updates() # Start with a clean reset state dictionary + + # Store the received embedding (if valid) in the state used for context highlighting + if query_embedding is not None: + updates[direct_embedding_output_holder] = query_embedding + logging.debug("Stored valid query embedding in direct_embedding_output_holder for standard mode.") + else: + 
updates[direct_embedding_output_holder] = None + logging.warning("Query embedding was None, stored None in direct_embedding_output_holder for standard mode.") + + + if not search_results: + logging.info("No standard search results found to display.") + # MODIFIED: Update new components + updates[result_accordion] = gr.update(visible=True, label="Keine Resultate gefunden.", open=False) + updates[result_metadata_display] = gr.update(value="") + updates[result_text] = gr.update(value="", visible=True) + updates[single_result_group] = gr.update(visible=True) + # Ensure states are also reset/empty + updates[full_search_results_state] = [] + updates[current_result_index_state] = 0 + updates[active_view_state] = "standard" # Still set view state even if empty + return updates # Return the dictionary of updates + + # Populate state and update UI elements if results were found + logging.info(f"Displaying first of {len(search_results)} standard results.") + updates[full_search_results_state] = search_results + updates[current_result_index_state] = 0 # Start at the first result + updates[active_view_state] = "standard" # Set active view state + + # Format the first result for immediate display using the combined formatter + # MODIFIED: Call format_result_display and get two parts + accordion_title, accordion_content_md, text_content = format_result_display(search_results[0], 0, len(search_results), "standard") + + # MODIFIED: Update new components + updates[result_accordion] = gr.update(visible=True, label=accordion_title, open=False) + updates[result_metadata_display] = gr.update(value=accordion_content_md) + updates[result_text] = gr.update(value=text_content, visible=True) + + # Make shared result group and navigation visible + updates[single_result_group] = gr.update(visible=True) + updates[standard_nav_row] = gr.update(visible=True) + + # Configure navigation buttons for Standard results + updates[previous_result_button] = gr.update(visible=True, interactive=False) # Can't go back from first result + updates[next_result_button] = gr.update(visible=True, interactive=(len(search_results) > 1)) # Enable if more than one result + updates[weiterlesen_button] = gr.update(visible=True, interactive=True, value="weiterlesen") # Enable context button, ensure value + + return updates # Return the dictionary of updates + + +# --- Modified Core Search Logic (LLM Mode) --- +def perform_search_llm(query, selected_authors, window_size, weight, decay): + """Performs LLM Re-Rank Search: Embed -> Query -> Re-rank -> Prep -> LLM -> Parse -> Return results & embedding.""" + logging.info(f"--- Starting LLM Re-Rank Search --- Query: '{query[:50]}...' 
| Authors: {selected_authors} | Window={window_size}, Weight={weight:.2f}, Decay={decay:.2f}") + original_query_embedding = None + + # --- Phase 0: Get Query Embedding --- + try: + original_query_embedding = get_embedding(query, task="RETRIEVAL_QUERY") + if original_query_embedding is None: + raise ValueError("Embedding failed for LLM search.") + logging.info("Query embedding generated successfully for LLM search.") + except Exception as embed_e: + logging.error(f"LLM Re-Rank: Embedding error: {embed_e}", exc_info=True) + return None, original_query_embedding # Return None for results to indicate failure + + + # --- Phase 1: Initial Search, Filter & First-Pass Re-ranking --- + try: + logging.info(f"LLM ReRank Mode: Initial search for query: '{query[:50]}...'") + # Build Filter + where_filter = None + if selected_authors: + authors_filter_list = selected_authors if isinstance(selected_authors, list) else [selected_authors] + authors_filter_list = [a for a in authors_filter_list if a and isinstance(a, str)] + if authors_filter_list: + where_filter = {"author": {"$in": authors_filter_list}} + logging.info(f"LLM ReRank: Applying WHERE filter: {where_filter}") + else: logging.warning("Empty or invalid author filter list for LLM rerank.") + + # Initial DB Search + initial_candidates = _perform_single_query_search(query, where_filter, INITIAL_RESULTS_FOR_RERANK) + if not initial_candidates: + logging.info("LLM ReRank Mode: No initial results found from DB.") + return [], original_query_embedding + + logging.info(f"Found {len(initial_candidates)} initial candidates. Performing 1st pass re-ranking...") + # First-Pass Re-ranking (Pass new arguments) + first_pass_reranked = rerank_with_context( + initial_candidates, + original_query_embedding, + LLM_RERANK_CANDIDATE_COUNT, # Target N for LLM input pool + weight, # Use argument + decay, # Use argument + window_size, # Use argument + MIN_CHARS_FOR_RELEVANT_NEIGHBOR # Pass constant + ) + # Select the top candidates to send to the LLM + candidates_for_llm = first_pass_reranked[:LLM_RERANK_CANDIDATE_COUNT] + + if not candidates_for_llm: + logging.info("LLM ReRank Mode: No candidates left after first-pass re-ranking.") + return [], original_query_embedding - # --->>> ACTUAL EXCEPTION HANDLING <<<--- + logging.info(f"Selected top {len(candidates_for_llm)} candidates after 1st pass for LLM.") + + except (ConnectionError, ValueError, RuntimeError) as search_filter_e: + logging.error(f"LLM Re-Rank: Initial Search/Filter/Re-rank error: {search_filter_e}", exc_info=True) + return None, original_query_embedding # Return None for results to indicate failure except Exception as e: - logging.error(f"Error querying ChromaDB or processing results: {e}", exc_info=True) + logging.error(f"LLM Re-Rank: Unexpected error in Phase 1 (Search/Filter/Re-rank): {e}", exc_info=True) + return None, original_query_embedding # Return None for results to indicate failure - # Define error_msg based on the exception - if "dimension" in str(e).lower(): - error_msg = "**Error:** Database search failed due to embedding mismatch. Please check configuration." 
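+    # NOTE: Sketch of the prompt layout assembled in Phase 2 below. Each block
+    # built there has the shape
+    #
+    #   Passage ID: <id>
+    #   Passage Text:
+    #   <center passage stitched together with its metadata-matching neighbors>
+    #
+    # and the blocks are joined with "--- PASSAGE SEPARATOR ---" lines before
+    # being inserted into LLM_RERANKING_PROMPT_TEMPLATE_V3.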
+ + # --- Phase 2: Prepare Passage Blocks for LLM Prompt --- + try: + logging.info("Preparing passage blocks for LLM prompt using pre-constructed blocks...") + passage_separator = "\n\n--- PASSAGE SEPARATOR ---\n\n" + prompt_passage_blocks_list = [] + + for cand_data in candidates_for_llm: + center_id_str = cand_data.get('id') + context_block = cand_data.get('context_block') + + if not center_id_str or not context_block or context_block in ["_Kontextblock fehlt_", "_Fehler bei Kontext-Erstellung_"]: + logging.warning(f"Skipping candidate {center_id_str} for LLM prompt due to missing ID or invalid context block.") + continue + + prompt_block = f"Passage ID: {center_id_str}\nPassage Text:\n{context_block}" + prompt_passage_blocks_list.append(prompt_block) + + if not prompt_passage_blocks_list: + logging.warning("No valid context blocks could be prepared for the LLM prompt.") + return [], original_query_embedding + + passage_blocks_str_for_prompt = passage_separator.join(prompt_passage_blocks_list) + logging.info(f"Prepared {len(prompt_passage_blocks_list)} passage blocks for the LLM.") + + except Exception as e: + logging.error(f"LLM Re-Rank: Error during passage block preparation (Phase 2): {e}", exc_info=True) + return None, original_query_embedding # Return None for results to indicate failure + + + # --- Phase 3: Call LLM for Re-ranking and Truncation --- + if not llm_rerank_model: + logging.error("LLM Re-rank model is not available/initialized.") + return None, original_query_embedding # Return None for results to indicate failure + + try: + # Format the final prompt using the template + rerank_prompt = LLM_RERANKING_PROMPT_TEMPLATE_V3.format( + user_query=query, + passage_blocks_str=passage_blocks_str_for_prompt, # Use constructed string + target_count=LLM_RERANK_TARGET_COUNT # Use constant + ) + + logging.debug(f"LLM Rank/Truncate Prompt (first 500 chars):\n{rerank_prompt[:500]}...") + # Save the full prompt to a file for debugging + try: + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f") + filename = os.path.join(PROMPT_LOG_DIR, f"{timestamp}_llm_rank_truncate_prompt.txt") + with open(filename, 'w', encoding='utf-8') as f: + f.write(f"--- User Query ---\n{query}\n\n--- Prompt Sent to LLM ({LLM_RERANK_MODEL_NAME}) ---\n{rerank_prompt}") + logging.info(f"LLM Rank/Truncate prompt saved to: {filename}") + except IOError as log_e: + logging.error(f"Error saving LLM Rank/Truncate prompt: {log_e}", exc_info=False) + + # Make the API call to Gemini + logging.info(f"Sending Rank/Truncate request to LLM ({LLM_RERANK_MODEL_NAME})...") + generation_config = genai.types.GenerationConfig(temperature=0.2) + response = llm_rerank_model.generate_content( + rerank_prompt, + generation_config=generation_config + ) + logging.info("LLM Rank/Truncate response received.") + + # --- Phase 4: Parse LLM Response and Fetch Metadata --- + logging.info("Processing LLM response...") + + # ---> START OF ROBUST RESPONSE HANDLING <--- + response_text = None + finish_reason_name = 'UNKNOWN' + + try: + if hasattr(response, 'prompt_feedback') and getattr(response.prompt_feedback, 'block_reason', None): + block_reason = response.prompt_feedback.block_reason + finish_reason_name = f"PROMPT_BLOCKED_{block_reason}" + logging.error(f"LLM Rank/Truncate prompt was blocked! 
Reason: {block_reason}")
+                return [], original_query_embedding # Return empty
+
+            elif response.candidates:
+                first_candidate = response.candidates[0]
+                reason_enum = getattr(first_candidate, 'finish_reason', None)
+                finish_reason_name = getattr(reason_enum, 'name', str(reason_enum))
+
+                VALID_FINISH_REASONS = {"STOP", "MAX_TOKENS"}
+                if finish_reason_name in VALID_FINISH_REASONS:
+                    if first_candidate.content and first_candidate.content.parts:
+                        response_text = first_candidate.content.parts[0].text
+                        logging.debug("Successfully extracted text from the first candidate.")
+                    else:
+                        logging.warning("LLM candidate finished validly but had no text content part.")
+                        response_text = None
+                else:
+                    logging.warning(f"LLM Rank/Truncate candidate finished with reason: {finish_reason_name}. No text content expected or extracted.")
+
+            else:
+                logging.error("LLM response had no candidates.")
+
+            if response_text is None:
+                logging.error(f"LLM Rank/Truncate returned no usable text content. Final Finish Reason Check: {finish_reason_name}")
+                # Log response details if available for debugging
+                logging.debug(f"Full LLM response object structure: {response}")
+                return [], original_query_embedding # Return empty
+
+        except Exception as resp_check_e:
+            logging.error(f"Error checking LLM response structure/finish_reason: {resp_check_e}", exc_info=True)
+            logging.debug(f"Full LLM response object structure during check error: {response}")
+            return [], original_query_embedding # Return empty on error checking response
+        # ---> END OF ROBUST RESPONSE HANDLING <---
+
+        llm_response_text = response_text
+        logging.debug(f"LLM Raw Response Text (used for parsing):\n{llm_response_text}")
+
+        # --- Start JSON Parsing ---
+        json_string = None
+        parsed_llm_results = []
+
+        try:
+            # Attempt to find JSON inside a ```json ``` block first (preferred format)
+            json_match = re.search(r"```json\s*({.*?})\s*```", llm_response_text, re.DOTALL | re.IGNORECASE)
+            if json_match:
+                json_string = json_match.group(1)
+                logging.debug("Found JSON block using ```json ``` regex.")
+            else:
+                # If no block is found, assume the entire response is potentially JSON
+                json_string = llm_response_text.strip()
+                if not (json_string.startswith('{') and json_string.endswith('}')):
+                    logging.warning("LLM response did not contain ```json ``` block and doesn't look like raw JSON object. 
Attempting parse anyway.") + else: + logging.debug("Assuming raw LLM response is JSON object.") + + parsed_response = json.loads(json_string) + + # Validate the top-level structure + if "ranked_edited_passages" not in parsed_response or not isinstance(parsed_response["ranked_edited_passages"], list): + logging.error("LLM JSON response missing 'ranked_edited_passages' list or it's not a list.") + raise ValueError("JSON response structure invalid: missing 'ranked_edited_passages' list.") + + raw_results = parsed_response["ranked_edited_passages"] + logging.info(f"LLM returned {len(raw_results)} items in 'ranked_edited_passages'.") + + # Validate and collect individual results from the list + parsed_llm_results = [] # Reset before processing + for i, item in enumerate(raw_results): + if isinstance(item, dict) and 'original_id' in item and 'edited_text' in item: + item_id = str(item['original_id']) # Ensure ID is string + item_text = str(item['edited_text']) + item_rationale = item.get('rationale', '') # Rationale is optional + + # Logging for individual items + # logging.debug(f"Parsed item {i}: ID={item_id}, Text='{item_text[:50]}...', Rationale='{item_rationale[:50]}...'") + + if item_id and item_text.strip(): # Only add if ID and text are non-empty + parsed_llm_results.append({'id': item_id, 'edited_text': item_text, 'rationale': item_rationale}) # Keep rationale here + else: + logging.warning(f"Skipping invalid or empty LLM result item at index {i}: {item}") + else: + logging.warning(f"Skipping item with invalid format in 'ranked_edited_passages' at index {i}: {item}") + + # Truncate to the target count if needed (should be handled by LLM, but safe) + parsed_llm_results = parsed_llm_results[:LLM_RERANK_TARGET_COUNT] + logging.info(f"Successfully parsed {len(parsed_llm_results)} valid ranked/edited passages from LLM response.") + + if not parsed_llm_results: + logging.info("LLM parsing yielded no valid passages.") + return [], original_query_embedding # Return empty list + + except (json.JSONDecodeError, ValueError) as parse_e: + logging.error(f"LLM Rank/Truncate response JSON parsing error: {parse_e}", exc_info=True) + logging.error(f"--- LLM Response Text causing JSON error ---\n{llm_response_text}\n--- End Response ---") + return [], original_query_embedding # Return empty list on parsing error + except Exception as parse_e: + logging.error(f"Unexpected error during LLM JSON parsing: {parse_e}", exc_info=True) + return [], original_query_embedding # Return empty list on any parsing error + # --- End JSON Parsing --- + + + # --- Fetch Metadata for LLM Results --- + # We need the original metadata (author, book, etc.) from the DB for displaying results correctly. + result_ids_to_fetch = [res['id'] for res in parsed_llm_results] + logging.info(f"Fetching metadata directly for {len(result_ids_to_fetch)} final LLM result IDs.") + + if result_ids_to_fetch: + fetched_metadata_map = fetch_multiple_passage_data(result_ids_to_fetch) + logging.debug(f"Fetched metadata map contains {len(fetched_metadata_map)} entries for final LLM results.") else: - # Display the actual error message type from the exception - error_msg = f"**Error:** An unexpected error occurred during search. See logs for details. 
({type(e).__name__})" + # If no IDs to fetch (e.g., no results parsed), return empty + logging.warning("No result IDs to fetch metadata for after LLM parsing.") + return [], original_query_embedding + + + # --- Combine parsed text with fetched metadata for the final UI structure --- + final_llm_results_for_ui = [] + for result in parsed_llm_results: + passage_id = result['id'] + passage_data = fetched_metadata_map.get(passage_id) + if passage_data: + final_llm_results_for_ui.append({ + 'id': passage_id, # Original ID + 'original_id': passage_id, # Store original_id explicitly for formatter + 'edited_text': result.get('edited_text', '_Editierter Text fehlt_'), # LLM's edited text + 'rationale': result.get('rationale', ''), # LLM's rationale + 'metadata': passage_data.get('meta', {}) # Original metadata from DB fetch + # Note: Distance and Initial/Final similarity from previous steps are NOT included + # as the LLM result is a new entity, not directly representing a DB passage's score. + }) + else: + logging.warning(f"Could not fetch metadata from DB for final LLM result ID: {passage_id}. Skipping this result.") + + if not final_llm_results_for_ui: + logging.error("Failed to fetch metadata for any of the LLM's ranked passages.") + # Still return the original query embedding if available + return [], original_query_embedding # Return empty list + + # --- Success --- + logging.info(f"LLM Re-Rank Search successful. Returning {len(final_llm_results_for_ui)} processed results.") + # Return the list of results and the original query embedding + return final_llm_results_for_ui, original_query_embedding + + except Exception as e: + logging.error(f"LLM Rank/Truncate general processing error after API call: {e}", exc_info=True) + # Return None for results to indicate a failure, but return embedding if available + return None, original_query_embedding - # Update the UI to show the error message - updates[single_result_display_md] = gr.Markdown(error_msg) - updates[single_result_group] = gr.Group(visible=True) # Show the group to display the error - # Reset state on error - updates[full_search_results_state] = [] - updates[current_result_index_state] = 0 - updates[weiterlesen_button] = gr.Button(visible=False) - updates[previous_result_button] = gr.Button(visible=False) - updates[next_result_button] = gr.Button(visible=False) - updates[result_index_indicator_md] = gr.Markdown("") - updates[context_display] = gr.Markdown("") - updates[displayed_context_passages] = [] - updates[load_previous_button] = gr.Button(visible=False) - updates[load_next_button] = gr.Button(visible=False) +# --- Search Function (LLM Re-Rank Mode UI Wrapper) --- +def search_llm_rerank_mode_ui(llm_results, query_embedding): + """Prepares Gradio UI updates for the LLM Re-Rank Search results.""" + logging.info("Preparing UI updates for LLM Re-Rank Search results.") + updates = create_reset_updates() # Start with reset state + + # Store the received embedding for context highlighting + if query_embedding is not None: + updates[direct_embedding_output_holder] = query_embedding + logging.debug("Stored valid query embedding in direct_embedding_output_holder for LLM mode.") + else: + updates[direct_embedding_output_holder] = None + logging.warning("Query embedding was None for LLM mode.") + + # Set active view state early + updates[active_view_state] = "llm" + + # Check if results indicate an error occurred in the core logic (returned None) + if llm_results is None: + logging.error("LLM core search logic returned None, indicating an 
error.") + # Use the shared display group + updates[single_result_group] = gr.update(visible=True) + # MODIFIED: Update new components + updates[result_accordion] = gr.update(visible=True, label="**Fehler:** LLM Re-Ranking fehlgeschlagen.", open=False) + updates[result_metadata_display] = gr.update(value="Details siehe Server-Logs.") + updates[result_text] = gr.update(value="", visible=True) + # Ensure states are empty + updates[llm_results_state] = [] + updates[llm_result_index_state] = 0 return updates - # --->>> END EXCEPTION HANDLING <<<--- + # Check if results list is empty (no relevant passages found/parsed, but no error) + if not llm_results: + logging.info("LLM search returned no relevant passages.") + # Use the shared display group + updates[single_result_group] = gr.update(visible=True) + # MODIFIED: Update new components + updates[result_accordion] = gr.update(visible=True, label="LLM Resultate", open=False) + updates[result_metadata_display] = gr.update(value="_(Keine relevanten Passagen nach LLM Re-Ranking gefunden.)_") + updates[result_text] = gr.update(value="", visible=True) + # Ensure states are empty + updates[llm_results_state] = [] + updates[llm_result_index_state] = 0 + return updates + + # Got results, update UI + logging.info(f"Displaying first of {len(llm_results)} LLM re-ranked results.") + updates[llm_results_state] = llm_results + updates[llm_result_index_state] = 0 # Start at first result + + # Format and display the first result using the combined formatter + # MODIFIED: Call format_result_display and get two parts + accordion_title, accordion_content_md, text_content = format_result_display(llm_results[0], 0, len(llm_results), "llm") -# --- Result Navigation Function --- + # MODIFIED: Update new components + updates[result_accordion] = gr.update(visible=True, label=accordion_title, open=False) + updates[result_metadata_display] = gr.update(value=accordion_content_md) + updates[result_text] = gr.update(value=text_content, visible=True) + + + # Make shared result group and navigation visible + updates[single_result_group] = gr.update(visible=True) + updates[standard_nav_row] = gr.update(visible=True) + + # Configure navigation buttons for LLM results + updates[previous_result_button] = gr.update(visible=True, interactive=False) + updates[next_result_button] = gr.update(visible=True, interactive=(len(llm_results) > 1)) + updates[weiterlesen_button] = gr.update(visible=True, interactive=True, value="im Original weiterlesen") # Enable context button, change value + + return updates + +# --- Result Navigation Function (Standard Mode) --- def navigate_results(direction, current_index, full_results): - """Handles moving between search results in the top display area.""" - updates = {} - if not full_results: - logging.warning("Navigate called with no results in state.") - return { current_result_index_state: 0 } + """Handles UI updates for navigating standard search results.""" + logging.info(f"Navigating standard results: Direction={direction}, Index={current_index}") + # Define default updates (hide context, show standard results, etc.) 
+ updates = { + standard_nav_row: gr.update(visible=True), # Show the shared nav row + single_result_group: gr.update(visible=True), # Show the shared result group + # MODIFIED: Clear new components instead of single_result_display_md + result_accordion: gr.update(label="...", open=False, visible=True), + result_metadata_display: gr.update(value=""), + result_text: gr.update(value="", visible=True), + # Buttons in standard_nav_row will be managed based on index below + previous_result_button: gr.update(visible=True), + next_result_button: gr.update(visible=True), + weiterlesen_button: gr.update(visible=True, value="weiterlesen"), # Standard search weiterlesen + + context_area: gr.update(visible=False), # Hide context + back_to_results_button: gr.update(visible=False), # Hide back button + current_result_index_state: current_index, # Store potentially new index + full_search_results_state: full_results, # Pass state through + active_view_state: "standard" # Ensure view state is correct + } + + if not full_results or not isinstance(full_results, list): + logging.warning("Cannot navigate: No standard results available in state.") + updates[current_result_index_state] = 0 + # MODIFIED: Update new components + updates[result_accordion] = gr.update(visible=True, label="Keine Resultate zum Navigieren.", open=False) + updates[result_metadata_display] = gr.update(value="") + updates[result_text] = gr.update(value="", visible=True) + # Hide all navigation elements in the shared row + updates[previous_result_button] = gr.update(interactive=False, visible=False) + updates[next_result_button] = gr.update(interactive=False, visible=False) + updates[weiterlesen_button] = gr.update(visible=False) + updates[standard_nav_row] = gr.update(visible=False) # Hide the nav row itself + updates[single_result_group] = gr.update(visible=False) # Hide the result group itself + return updates # Return the dictionary of updates + total_results = len(full_results) new_index = current_index + # Calculate new index based on direction if direction == 'previous': new_index = max(0, current_index - 1) elif direction == 'next': new_index = min(total_results - 1, current_index + 1) - # Only update display if the index actually changed - if new_index != current_index: - logging.info(f"Navigating from result index {current_index} to {new_index}") + # Update display if index is valid + if 0 <= new_index < total_results: result_data = full_results[new_index] - result_md = format_single_result(result_data, new_index, total_results) - updates[single_result_display_md] = gr.Markdown(result_md) - updates[current_result_index_state] = new_index - updates[result_index_indicator_md] = gr.Markdown(f"Result **{new_index + 1}** of **{total_results}**") - updates[context_display] = gr.Markdown("") # Clear reading area - updates[displayed_context_passages] = [] - updates[load_previous_button] = gr.Button(visible=False) - updates[load_next_button] = gr.Button(visible=False) - updates[weiterlesen_button] = gr.Button(visible=True) # Make visible again - - # Update navigation button interactivity based on the *new* index - updates[previous_result_button] = gr.Button(interactive=(new_index > 0)) - updates[next_result_button] = gr.Button(interactive=(new_index < total_results - 1)) - - # If index didn't change, ensure button states are still returned correctly - if new_index == current_index: - # Ensure weiterlesen visibility is returned if index didn't change - # (it should already be visible unless user clicked at boundary where it was hidden) - 
# Let's explicitly set it visible for safety upon any nav click if results exist - if total_results > 0: - updates[weiterlesen_button] = gr.Button(visible=True) + # MODIFIED: Use the combined formatter and get two parts + accordion_title, accordion_content_md, text_content = format_result_display(result_data, new_index, total_results, "standard") + + # MODIFIED: Update new components + updates[result_accordion] = gr.update(visible=True, label=accordion_title, open=False) + updates[result_metadata_display] = gr.update(value=accordion_content_md) + updates[result_text] = gr.update(value=text_content, visible=True) + + updates[current_result_index_state] = new_index # Update state with new index + # Update button interactivity based on new index + updates[previous_result_button] = gr.update(interactive=(new_index > 0)) + updates[next_result_button] = gr.update(interactive=(new_index < total_results - 1)) + updates[weiterlesen_button] = gr.update(interactive=True) # Always possible from a result + logging.info(f"Navigated standard results to index {new_index}") + else: + # Should not happen with bounds checking, but handle defensively + logging.error(f"Navigation error: New index {new_index} out of bounds [0, {total_results-1}]") + # MODIFIED: Update new components on error + updates[result_accordion] = gr.update(visible=True, label="Fehler beim Navigieren der Resultate.", open=False) + updates[result_metadata_display] = gr.update(value="") + updates[result_text] = gr.update(value="", visible=True) + updates[previous_result_button] = gr.update(interactive=False) + updates[next_result_button] = gr.update(interactive=False) + updates[weiterlesen_button] = gr.update(interactive=False) + return updates +# --- Navigation Function for LLM Results --- +def navigate_llm_results(direction, current_index, llm_results): + """Handles UI updates for navigating LLM re-ranked results.""" + logging.info(f"Navigating LLM results: Direction={direction}, Index={current_index}") + # Define default updates (show LLM results, hide others) + updates = { + standard_nav_row: gr.update(visible=True), # Show the shared nav row + single_result_group: gr.update(visible=True), # Show the shared result group + # MODIFIED: Clear new components instead of single_result_display_md + result_accordion: gr.update(label="...", open=False, visible=True), + result_metadata_display: gr.update(value=""), + result_text: gr.update(value="", visible=True), + # Buttons in standard_nav_row will be managed based on index below + previous_result_button: gr.update(visible=True), + next_result_button: gr.update(visible=True), + weiterlesen_button: gr.update(visible=True, value="im Original weiterlesen"), # LLM search weiterlesen + + context_area: gr.update(visible=False), # Hide context + back_to_results_button: gr.update(visible=False), # Hide back button + llm_results_state: llm_results, # Pass state through + llm_result_index_state: current_index, # Store potentially new index + active_view_state: "llm" # Ensure view state is correct + } -# --- Fetch Single Passage Helper --- -def fetch_passage_data(passage_id_int): - """Fetches a single passage dictionary from ChromaDB by its integer ID.""" - if collection is None or passage_id_int < 0: - return None - try: - passage_id_str = str(passage_id_int) - result = collection.get(ids=[passage_id_str], include=['documents', 'metadatas']) - if result and result.get('ids') and result['ids']: - return { - 'id': result['ids'][0], - 'doc': result['documents'][0] if result.get('documents') else "N/A", - 
'meta': result['metadatas'][0] if result.get('metadatas') else {}, - } - else: - logging.info(f"Passage ID {passage_id_str} not found in collection.") - return None - except Exception as e: - logging.error(f"Error fetching passage ID {passage_id_int} from ChromaDB: {e}", exc_info=True) - return None + if not llm_results or not isinstance(llm_results, list): + logging.warning("Cannot navigate: No LLM results available in state.") + # MODIFIED: Update new components + updates[result_accordion] = gr.update(visible=True, label="Keine LLM-Resultate vorhanden.", open=False) + updates[result_metadata_display] = gr.update(value="") + updates[result_text] = gr.update(value="", visible=True) + # Hide navigation elements in the shared row + updates[previous_result_button] = gr.update(interactive=False, visible=False) + updates[next_result_button] = gr.update(interactive=False, visible=False) + updates[weiterlesen_button] = gr.update(visible=False) + updates[standard_nav_row] = gr.update(visible=False) # Hide the nav row itself + updates[single_result_group] = gr.update(visible=False) # Hide the result group itself + # Reset state + updates[llm_results_state] = [] + updates[llm_result_index_state] = 0 + return updates + total_results = len(llm_results) + new_index = current_index -# --- Move Passage to Reading Area --- -def move_to_reading_area(current_index, full_results): - """ - Moves the selected result passage's text to the reading area below, - hides the 'weiterlesen' button, and enables context loading buttons. - Keeps the metadata preview in the top area. - """ + # Calculate new index + if direction == 'previous': + new_index = max(0, current_index - 1) + elif direction == 'next': + new_index = min(total_results - 1, current_index + 1) + + # Update display if index is valid + if 0 <= new_index < total_results: + result_data = llm_results[new_index] + # MODIFIED: Use the combined formatter and get two parts + accordion_title, accordion_content_md, text_content = format_result_display(result_data, new_index, total_results, "llm") + + # MODIFIED: Update new components + updates[result_accordion] = gr.update(visible=True, label=accordion_title, open=False) + updates[result_metadata_display] = gr.update(value=accordion_content_md) + updates[result_text] = gr.update(value=text_content, visible=True) + + updates[llm_result_index_state] = new_index # Update state + # Update button interactivity + updates[previous_result_button] = gr.update(interactive=(new_index > 0)) + updates[next_result_button] = gr.update(interactive=(new_index < total_results - 1)) + updates[weiterlesen_button] = gr.update(interactive=True) + logging.info(f"Navigated LLM results to index {new_index}") + else: + logging.error(f"LLM Navigation error: New index {new_index} out of bounds [0, {total_results-1}]") + # MODIFIED: Update new components on error + updates[result_accordion] = gr.update(visible=True, label="Fehler beim Navigieren der LLM-Resultate.", open=False) + updates[result_metadata_display] = gr.update(value="") + updates[result_text] = gr.update(value="", visible=True) + updates[previous_result_button] = gr.update(interactive=False) + updates[next_result_button] = gr.update(interactive=False) + updates[weiterlesen_button] = gr.update(interactive=False) + + + return updates + +# --- Navigation Function for Favourites --- +def navigate_best_results(direction, current_index, best_results): + """Handles UI updates for navigating favourite results.""" + logging.info(f"Navigating favourite results: Direction={direction}, 
Index={current_index}") + # Define default updates (show favourites, hide others) updates = { - # Keep top preview area unchanged - # Prepare context/reading area - context_display: gr.Markdown("_Loading reading passage..._"), - displayed_context_passages: [], - load_previous_button: gr.Button(visible=False), - load_next_button: gr.Button(visible=False), - weiterlesen_button: gr.Button(visible=False) # Hide this button + standard_nav_row: gr.update(visible=True), # Show the shared nav row + single_result_group: gr.update(visible=True), # Show the shared result group + # MODIFIED: Clear new components instead of single_result_display_md + result_accordion: gr.update(label="...", open=False, visible=True), + result_metadata_display: gr.update(value=""), + result_text: gr.update(value="", visible=True), + # Buttons in standard_nav_row will be managed based on index below + previous_result_button: gr.update(visible=True), + next_result_button: gr.update(visible=True), + weiterlesen_button: gr.update(visible=True, value="weiterlesen"), # Favourites weiterlesen + + context_area: gr.update(visible=False), # Hide context + back_to_results_button: gr.update(visible=False), # Hide back button + best_results_state: best_results, # Pass state through + best_index_state: current_index, # Store potentially new index + active_view_state: "favourites" # Ensure view state is correct } - if not full_results or current_index < 0 or current_index >= len(full_results): - logging.warning(f"Attempted to move passage with invalid state or index. Index: {current_index}, Results Count: {len(full_results)}") - updates[context_display] = gr.Markdown("Error: Could not load passage reference.") - updates[weiterlesen_button] = gr.Button(visible=False) + if not best_results or not isinstance(best_results, list): + logging.warning("Cannot navigate: No favourite results available in state.") + updates[best_index_state] = 0 + # MODIFIED: Update new components + updates[result_accordion] = gr.update(visible=True, label="_Keine Favoriten zum Navigieren._", open=False) + updates[result_metadata_display] = gr.update(value="") + updates[result_text] = gr.update(value="", visible=True) + # Hide navigation elements in the shared row + updates[previous_result_button] = gr.update(interactive=False, visible=False) + updates[next_result_button] = gr.update(interactive=False, visible=False) + updates[weiterlesen_button] = gr.update(visible=False) + updates[standard_nav_row] = gr.update(visible=False) # Hide the nav row itself + updates[single_result_group] = gr.update(visible=False) # Hide the result group itself + # Reset state + updates[best_results_state] = [] + updates[best_index_state] = 0 return updates - try: - target_result_data = full_results[current_index] - reading_passage_state_data = { - 'id': target_result_data.get('id'), - 'doc': target_result_data.get('document'), - 'meta': target_result_data.get('metadata'), - 'role': 'current_reading' - } - if not reading_passage_state_data['id'] or not reading_passage_state_data['doc']: - logging.error(f"Cannot move passage: Missing ID or document in result at index {current_index}.") - updates[context_display] = gr.Markdown("Error: Selected passage data is incomplete.") - updates[weiterlesen_button] = gr.Button(visible=False) - return updates + total_results = len(best_results) + new_index = current_index - formatted_passage_md = format_context_markdown([reading_passage_state_data]) + # Calculate new index + if direction == 'previous': + new_index = max(0, current_index - 1) + elif 
direction == 'next': + new_index = min(total_results - 1, current_index + 1) - updates[context_display] = gr.Markdown(formatted_passage_md) - updates[displayed_context_passages] = [reading_passage_state_data] - updates[load_previous_button] = gr.Button(visible=True) - updates[load_next_button] = gr.Button(visible=True) + # Update display if index is valid + if 0 <= new_index < total_results: + result_data = best_results[new_index] + # MODIFIED: Use the combined formatter and get two parts + accordion_title, accordion_content_md, text_content = format_result_display(result_data, new_index, total_results, "favourites") + + # MODIFIED: Update new components + updates[result_accordion] = gr.update(visible=True, label=accordion_title, open=False) + updates[result_metadata_display] = gr.update(value=accordion_content_md) + updates[result_text] = gr.update(value=text_content, visible=True) + + updates[best_index_state] = new_index # Update state + # Update button interactivity + updates[previous_result_button] = gr.update(interactive=(new_index > 0)) + updates[next_result_button] = gr.update(interactive=(new_index < total_results - 1)) + updates[weiterlesen_button] = gr.update(interactive=True) # Always possible from a favourite + logging.info(f"Navigated favourite results to index {new_index}") + else: + logging.error(f"Favourite Navigation error: New index {new_index} out of bounds [0, {total_results-1}]") + # MODIFIED: Update new components on error + updates[result_accordion] = gr.update(visible=True, label="Fehler beim Navigieren der Favoriten.", open=False) + updates[result_metadata_display] = gr.update(value="") + updates[result_text] = gr.update(value="", visible=True) + updates[previous_result_button] = gr.update(interactive=False) + updates[next_result_button] = gr.update(interactive=False) + updates[weiterlesen_button] = gr.update(interactive=False) - logging.info(f"Moved passage ID {reading_passage_state_data['id']} to reading area.") - return updates + + return updates + + +# --- Move Standard Result to Reading Area (UI Logic) --- +def move_to_reading_area_ui(current_index, full_results, query_embedding_value, result_type): + """Handles UI updates and data fetching for moving a result (Standard, LLM, or Favourite) + to the context reading area.""" + logging.info(f"--- Moving {result_type} Result (Index: {current_index}) to Reading Area ---") + # Define UI changes: Hide results, show context area, set loading message + updates = { + standard_nav_row: gr.update(visible=False), # Hide the shared results nav + single_result_group: gr.update(visible=False), # Hide the shared results group + context_area: gr.update(visible=True), # Show context area immediately + context_display: gr.update(value="Lade Paragraphen..."), # Loading message + load_previous_button: gr.update(visible=True, interactive=True), + load_next_button: gr.update(visible=True, interactive=True), + back_to_results_button: gr.update(visible=True, interactive=True) + } + # Define state changes separately + state_updates = { + # Preserve the relevant state indices and lists based on result_type + full_search_results_state: [], # Will be replaced by full_results if result_type is standard + current_result_index_state: 0, + llm_results_state: [], # Will be replaced by full_results if result_type is llm + llm_result_index_state: 0, + best_results_state: [], # Will be replaced by full_results if result_type is favourites + best_index_state: 0, + displayed_context_passages: [], # Reset context state before loading + 
direct_embedding_output_holder: query_embedding_value # Pass embedding + } + + if result_type == "standard": + state_updates[full_search_results_state] = full_results + state_updates[current_result_index_state] = current_index + state_updates[active_view_state] = "context_from_standard" + elif result_type == "llm": + state_updates[llm_results_state] = full_results # Note: full_results holds LLM results here + state_updates[llm_result_index_state] = current_index + state_updates[active_view_state] = "context_from_llm" + elif result_type == "favourites": + state_updates[best_results_state] = full_results # Note: full_results holds favourite results here + state_updates[best_index_state] = current_index + state_updates[active_view_state] = "context_from_favourites" # New state for favourites context + # For favourites, the query embedding is not directly relevant for highlighting the original text, + # as the favourite was selected based on its score. However, we keep the state updated in case needed later. + # Maybe set to None or a specific marker if we don't want query highlighting? Let's keep it for now. + # state_updates[direct_embedding_output_holder] = None + + + # Log the received embedding for debugging highlighting + logging.debug(f"move_to_reading_area_ui: Received query_embedding_value type: {type(query_embedding_value)}, len/shape: {len(query_embedding_value) if isinstance(query_embedding_value, (list, np.ndarray)) else 'N/A'}, result_type: {result_type}") + + + # Validate input + if not full_results or not isinstance(full_results, list) or not (0 <= current_index < len(full_results)): + logging.error(f"Invalid {result_type} result reference for moving to reading area.") + updates[context_display] = gr.update(value="Fehler: Ungültige Resultat-Referenz zum Lesen.") + updates[load_previous_button] = gr.update(interactive=False) + updates[load_next_button] = gr.update(interactive=False) + return {**updates, **state_updates} + + try: + # Get data for the selected result + target_result_data = full_results[current_index] + passage_meta = target_result_data.get('metadata', {}) + selected_passage_id = target_result_data.get('id') # Use 'id' for favourites too + + # Extract metadata needed to fetch the paragraph + author = passage_meta.get('author') + book = passage_meta.get('book') + paragraph_idx = passage_meta.get('paragraph_index') # Should be integer or None + + # Check if necessary metadata is present + if author is None or book is None or paragraph_idx is None or not isinstance(paragraph_idx, int) or paragraph_idx < 0: + logging.error(f"Missing necessary metadata (author/book/paragraph_index) for {result_type} result ID {selected_passage_id}: Meta={passage_meta}") + updates[context_display] = gr.update(value="Fehler: Metadaten unvollständig. Paragraph kann nicht geladen werden.") + updates[load_previous_button] = gr.update(interactive=False) + updates[load_next_button] = gr.update(interactive=False) + return {**updates, **state_updates} + + logging.info(f"Fetching initial paragraph for context: Author='{author}', Book='{book}', ParagraphIndex={paragraph_idx}") + # Fetch the full paragraph data (including embeddings) + initial_paragraph_sentences = fetch_paragraph_data(author, book, paragraph_idx) + + if not initial_paragraph_sentences: + logging.error(f"Could not fetch paragraph sentences for {author}/{book}/P{paragraph_idx}") + updates[context_display] = gr.update(value="Fehler: Der zugehörige Paragraph konnte nicht geladen werden (möglicherweise leer?). 
Die Navigation zum nächsten/vorherigen Paragraphen ist weiterhin aktiv.")
+            # Buttons remain interactive=True
+
+            # Still need to update the state, even if empty sentences were returned,
+            # to correctly reflect that the context area is active.
+            state_updates[displayed_context_passages] = []
+            return {**updates, **state_updates}
+
+
+        # Format the fetched paragraph using the VALID query embedding received as input
+        logging.info(f"Formatting paragraph {paragraph_idx} with {len(initial_paragraph_sentences)} sentences for display.")
+        formatted_passage_md = format_context_markdown(initial_paragraph_sentences, query_embedding_value) # Use the passed embedding
+        updates[context_display] = gr.update(value=formatted_passage_md) # Update display
+        # Update state with the fetched sentences
+        state_updates[displayed_context_passages] = initial_paragraph_sentences
+        # Buttons are already interactive=True from the initial update dict
+        logging.info(f"Paragraph {paragraph_idx} (for passage ID {selected_passage_id}) displayed in context area.")
 
     except Exception as e:
-        logging.error(f"Error moving passage for result index {current_index}: {e}", exc_info=True)
-        updates[context_display] = gr.Markdown(f"Error moving passage to reading area: {e}")
-        updates[weiterlesen_button] = gr.Button(visible=False)
-        return updates
+        logging.error(f"Error moving {result_type} passage to reading area: {e}", exc_info=True)
+        updates[context_display] = gr.update(value="**Fehler:** Der Paragraph konnte nicht angezeigt werden. Details siehe Server-Logs.")
+        updates[load_previous_button] = gr.update(interactive=False)
+        updates[load_next_button] = gr.update(interactive=False)
+
+    return {**updates, **state_updates}
+
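+# --- Illustrative sketch: similarity-based highlight alpha ---
+# A minimal, self-contained sketch of how the context view could map a
+# sentence's cosine similarity to the query onto a highlight alpha, assuming a
+# linear ramp from HIGHLIGHT_SIMILARITY_THRESHOLD up to the paragraph's maximum
+# similarity. The authoritative logic lives in format_context_markdown
+# (defined elsewhere in this file); these helpers are documentation of the
+# assumed behaviour, not app plumbing.
+def _sketch_cosine_similarity(vec_a, vec_b):
+    """Cosine similarity of two embedding vectors; 0.0 for degenerate input."""
+    a = np.asarray(vec_a, dtype=float)
+    b = np.asarray(vec_b, dtype=float)
+    denom = float(np.linalg.norm(a) * np.linalg.norm(b))
+    return float(np.dot(a, b) / denom) if denom > 0.0 else 0.0
+
+def _sketch_highlight_alpha(similarity, max_similarity):
+    """Map similarity in [threshold, max_similarity] linearly onto
+    [HIGHLIGHT_MIN_ALPHA, HIGHLIGHT_MAX_ALPHA]; below threshold -> no highlight."""
+    if similarity < HIGHLIGHT_SIMILARITY_THRESHOLD or max_similarity <= HIGHLIGHT_SIMILARITY_THRESHOLD:
+        return 0.0
+    span = max_similarity - HIGHLIGHT_SIMILARITY_THRESHOLD
+    fraction = (similarity - HIGHLIGHT_SIMILARITY_THRESHOLD) / span if span > 0 else 1.0
+    return HIGHLIGHT_MIN_ALPHA + fraction * (HIGHLIGHT_MAX_ALPHA - HIGHLIGHT_MIN_ALPHA)
+# Example: with the constants above, _sketch_highlight_alpha(0.72, 0.8) ≈ 0.32,
+# which could be rendered e.g. as hsla(HIGHLIGHT_HUE, HIGHLIGHT_SATURATION%,
+# HIGHLIGHT_LIGHTNESS%, 0.32).
+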
+# --- Go Back To Results Function ---
+def go_back_to_results_wrapper(last_active_view, std_results, std_index, llm_results, llm_index, best_results, best_index, current_fav_signal_value):
+    """Handles UI updates for returning from the context view to the appropriate results view."""
+    logging.info(f"Triggered: go_back_to_results_wrapper from view: {last_active_view}")
+
+    updates_dict = {
+        # Reset context area visibility
+        context_area: gr.update(visible=False),
+        context_display: gr.update(value=""), # Clear context display
+        displayed_context_passages: [], # Reset context state (raw value, consistent with the other state updates)
+
+        # Pass through existing results and indices states
+        full_search_results_state: std_results, current_result_index_state: std_index,
+        llm_results_state: llm_results, llm_result_index_state: llm_index,
+        best_results_state: best_results, best_index_state: best_index,
+        direct_embedding_output_holder: None, # Clear embedding when leaving context
+        fav_signal: gr.update(value=current_fav_signal_value), # <--- Pass through fav_signal state
+        active_view_state: "none", # Reset active view temporarily before setting correct one
+
+        # MODIFIED: Ensure the new result components are cleared before potentially showing results
+        result_accordion: gr.update(label="...", open=False, visible=False),
+        result_metadata_display: gr.update(value=""),
+        result_text: gr.update(value="", visible=False),
+    }
+    # Hide status message
+    updates_dict[status_message] = gr.update(value="", visible=False)
+
+
+    # Determine which result view to show based on where we came from
+    target_view = "none"
+    target_results_list = []
+    target_index = 0
+    result_type = "unknown" # Used for formatting
+
+    if last_active_view == "context_from_standard":
+        updates_dict[standard_nav_row] = gr.update(visible=True)
+        updates_dict[single_result_group] = gr.update(visible=True)
+        target_view = "standard"
+        target_results_list = std_results
+        target_index = std_index
+        result_type = "standard"
+        logging.info("Going back to Standard results.")
+    elif last_active_view == "context_from_llm":
+        updates_dict[standard_nav_row] = gr.update(visible=True) # Assuming LLM uses standard nav row layout
+        updates_dict[single_result_group] = gr.update(visible=True) # Assuming LLM uses standard group layout
+        target_view = "llm"
+        target_results_list = llm_results
+        target_index = llm_index
+        result_type = "llm"
+        logging.info("Going back to LLM results.")
+    elif last_active_view == "context_from_favourites":
+        # Assuming favourites use the same display/nav components but potentially managed differently
+        updates_dict[standard_nav_row] = gr.update(visible=True)
+        updates_dict[single_result_group] = gr.update(visible=True)
+        target_view = "favourites"
+        target_results_list = best_results
+        target_index = best_index
+        result_type = "favourites"
+        logging.info("Going back to Favourites.")
+    else:
+        logging.warning(f"Back button triggered from unexpected state: {last_active_view}")
+        # Default to showing standard search if view is unknown or error state
+        updates_dict[standard_nav_row] = gr.update(visible=True)
+        updates_dict[single_result_group] = gr.update(visible=True)
+        # MODIFIED: Set initial state for new components
+        updates_dict[result_accordion] = gr.update(label="Kontextansicht verlassen.", open=False, visible=True)
+        updates_dict[result_metadata_display] = gr.update(value="")
+        updates_dict[result_text] = gr.update(value="", visible=True)
+        target_view = "standard" # Fallback view
+        # Ensure buttons are hidden if no data is available
+        
+        updates_dict[previous_result_button] = 
gr.update(visible=False, interactive=False) + updates_dict[next_result_button] = gr.update(visible=False, interactive=False) + updates_dict[weiterlesen_button] = gr.update(visible=False, interactive=False) + + # Return here if we hit an unknown state + updates_dict[active_view_state] = target_view # Set fallback view state + return updates_dict + + + # Update the active_view state to the results view we returned to + updates_dict[active_view_state] = target_view + + # Now manually update the result display and navigation buttons for the target view + if target_results_list and isinstance(target_results_list, list) and 0 <= target_index < len(target_results_list): + result_data = target_results_list[target_index] + # MODIFIED: Use the combined formatter and update new components + accordion_title, accordion_content_md, text_content = format_result_display(result_data, target_index, len(target_results_list), result_type) + updates_dict[result_accordion] = gr.update(visible=True, label=accordion_title, open=False) + updates_dict[result_metadata_display] = gr.update(value=accordion_content_md) + updates_dict[result_text] = gr.update(value=text_content, visible=True) + + # Update button interactivity based on the selected index and total results + updates_dict[previous_result_button] = gr.update(visible=True, interactive=(target_index > 0)) + updates_dict[next_result_button] = gr.update(visible=True, interactive=(target_index < len(target_results_list) - 1)) + updates_dict[weiterlesen_button] = gr.update(visible=True, interactive=True, value="weiterlesen" if result_type != "llm" else "im Original weiterlesen") + + else: + # If the result list is empty or invalid, show appropriate message + error_msg_label = f"_{target_view.capitalize()}-Resultate nicht verfügbar._" + error_msg_content = "" # No content for metadata + updates_dict[result_accordion] = gr.update(visible=True, label=error_msg_label, open=False) + updates_dict[result_metadata_display] = gr.update(value=error_msg_content) + updates_dict[result_text] = gr.update(value="", visible=True) # Clear text area + + # Hide navigation buttons as there are no results to navigate + updates_dict[previous_result_button] = gr.update(visible=False, interactive=False) + updates_dict[next_result_button] = gr.update(visible=False, interactive=False) + updates_dict[weiterlesen_button] = gr.update(visible=False, interactive=False) + + return updates_dict # --- Load More Context Function --- -def load_more_context(direction, current_passages_state): - """ - Loads one more passage either before or after the passages in the reading/context area. - Updates the Markdown display and the context state list. 
- """ +def load_more_context(direction, current_passages_state, query_embedding_value): + """Loads the previous or next paragraph in the reading view.""" + logging.info(f"--- Loading More Context: Direction={direction} ---") + # Log embedding details for debugging highlighting + logging.debug(f"load_more_context: Received query_embedding_value type: {type(query_embedding_value)}, len/shape: {len(query_embedding_value) if isinstance(query_embedding_value, (list, np.ndarray)) else 'N/A'}") + + + # --- Initial Checks --- if collection is None: - return "Error: Database connection failed.", current_passages_state - if not current_passages_state: - logging.warning("Load more context called with empty state.") - return "_No reading passage loaded yet._", [] + logging.error("Cannot load more context: DB collection not available.") + err_msg = format_context_markdown(current_passages_state or [], query_embedding_value) + "\n\n**Fehler: Datenbank nicht verfügbar.**" + return err_msg, current_passages_state # Return existing state + + + if not current_passages_state or not isinstance(current_passages_state, list): + logging.warning("load_more_context called with empty or invalid current passage state.") + return "_Keine Passage geladen, kann nicht mehr Kontext laden._", [] + + # Define marker IDs used to indicate boundaries + START_MARKER_ID = '-1' # Represents reaching the beginning + END_MARKER_ID = 'END_MARKER_ID' # Represents reaching the end - current_passages_state.sort(key=lambda p: int(p.get('id', -1))) - updated_passages = list(current_passages_state) try: + # --- Determine Boundary and Target Paragraph --- + # Ensure current state is sorted (should be, but safe) + current_passages_state.sort(key=lambda x: (x.get('paragraph_index', -1), x.get('sentence_sort_key', float('inf')))) + + boundary_passage = None + target_paragraph_index = -1 # Target index to fetch + add_at_beginning = False # Flag to prepend or append new paragraph + + if direction == 'previous': - earliest_id_str = updated_passages[0].get('id') - if earliest_id_str is None: return format_context_markdown(updated_passages), updated_passages - earliest_id_int = int(earliest_id_str) - id_to_fetch = earliest_id_int - 1 - - if id_to_fetch < 0: - if not (updated_passages[0].get('role') == 'missing' and updated_passages[0].get('id') == '-1'): - if updated_passages[0].get('role') == 'missing': updated_passages.pop(0) - updated_passages.insert(0, {'id': '-1', 'role': 'missing', 'doc': '_(Beginning of document reached)_'}) + add_at_beginning = True + # Find the first non-missing passage to use as the boundary reference + first_content_passage = next((p for p in current_passages_state if p.get('role') != 'missing'), None) + + if not first_content_passage: + logging.warning("Context state contains only markers or is empty. Cannot load previous paragraph.") + # Format existing (only markers) and return current state + return format_context_markdown(current_passages_state, query_embedding_value), current_passages_state + + boundary_passage = first_content_passage + + # Check if we are already at the start boundary (by looking at the ID of the very first item) + if current_passages_state[0].get('id') == START_MARKER_ID: + logging.info("Already at the start boundary marker. 
No previous paragraph to load.") + # Reformat existing content (no change expected) and return current state + return format_context_markdown(current_passages_state, query_embedding_value), current_passages_state + + + current_para_idx = boundary_passage.get('paragraph_index') + # Calculate target index, handle None or 0 index + if current_para_idx is None or not isinstance(current_para_idx, int) or current_para_idx <= 0: + target_paragraph_index = -2 # Indicates we've hit the conceptual start (index < 0) else: - new_passage_data = fetch_passage_data(id_to_fetch) - if new_passage_data: - new_passage_data['role'] = 'prev' - if updated_passages[0].get('role') == 'missing' and updated_passages[0].get('id') == str(id_to_fetch + 1): - updated_passages.pop(0) - updated_passages.insert(0, new_passage_data) - else: - if not (updated_passages[0].get('role') == 'missing' and updated_passages[0].get('id') == str(id_to_fetch)): - if updated_passages[0].get('role') == 'missing': updated_passages.pop(0) - updated_passages.insert(0, {'id': str(id_to_fetch), 'role': 'missing', 'doc': '_(Beginning of document reached)_'}) + target_paragraph_index = current_para_idx - 1 + elif direction == 'next': - latest_id_str = updated_passages[-1].get('id') - if latest_id_str is None: return format_context_markdown(updated_passages), updated_passages - latest_id_int = int(latest_id_str) - id_to_fetch = latest_id_int + 1 - - new_passage_data = fetch_passage_data(id_to_fetch) - if new_passage_data: - new_passage_data['role'] = 'next' - if updated_passages[-1].get('role') == 'missing' and updated_passages[-1].get('id') == str(id_to_fetch -1): - updated_passages.pop(-1) - updated_passages.append(new_passage_data) + add_at_beginning = False + # Find the last non-missing passage to use as the boundary reference + last_content_passage = next((p for p in reversed(current_passages_state) if p.get('role') != 'missing'), None) + + if not last_content_passage: + logging.warning("Context state contains only markers or is empty. Cannot load next paragraph.") + return format_context_markdown(current_passages_state, query_embedding_value), current_passages_state + + boundary_passage = last_content_passage + + # Check if we are already at the end boundary (by looking at the ID of the very last item) + if current_passages_state[-1].get('id') == END_MARKER_ID: + logging.info("Already at the end boundary marker. 
No next paragraph to load.") + return format_context_markdown(current_passages_state, query_embedding_value), current_passages_state + + + current_para_idx = boundary_passage.get('paragraph_index') + # Check for missing index on the boundary passage + if current_para_idx is None or not isinstance(current_para_idx, int): + logging.error("Cannot load next paragraph: current boundary passage is missing a valid paragraph index.") + err_msg = format_context_markdown(current_passages_state, query_embedding_value) + "\n\n**Fehler: Interner Zustand inkonsistent (fehlender Paragraph-Index).**" + return err_msg, current_passages_state + + target_paragraph_index = current_para_idx + 1 + else: + logging.error(f"Invalid direction '{direction}' provided to load_more_context.") + return format_context_markdown(current_passages_state, query_embedding_value), current_passages_state # Return unchanged + + + # --- Fetch New Paragraph Data --- + new_paragraph_sentences = [] + boundary_hit = False # Flag if we reached start/end of book/section + new_passage_added = False # Flag if actual content was added/changed + + + # Extract author/book from the boundary passage's metadata + boundary_meta = boundary_passage.get('meta', {}) if boundary_passage else {} + author = boundary_meta.get('author') + book = boundary_meta.get('book') + + + # Fetch if target index is valid and we have author/book context + if target_paragraph_index >= 0 and author and book: + logging.info(f"Attempting to load paragraph {target_paragraph_index} for {author}/{book}") + new_paragraph_sentences = fetch_paragraph_data(author, book, target_paragraph_index) + if not new_paragraph_sentences: + # Successfully queried but found no sentences -> boundary hit + boundary_hit = True + logging.info(f"Boundary hit: No sentences found for paragraph {target_paragraph_index}.") else: - if not (updated_passages[-1].get('role') == 'missing' and updated_passages[-1].get('id') == str(id_to_fetch)): - if updated_passages[-1].get('role') == 'missing': updated_passages.pop(-1) - updated_passages.append({'id': str(id_to_fetch), 'role': 'missing', 'doc': '_(End of document reached)_'}) + # Successfully fetched new sentences + new_passage_added = True + logging.info(f"Successfully fetched {len(new_paragraph_sentences)} sentences for paragraph {target_paragraph_index}.") + elif target_paragraph_index == -2: + # Explicitly hit the start boundary based on index calculation + boundary_hit = True + logging.info("Boundary hit: Reached beginning (index <= 0).") + else: + # Invalid state (e.g., missing author/book on boundary passage) + logging.error(f"Cannot load more context: Invalid target index ({target_paragraph_index}) or missing author/book from boundary passage {boundary_passage.get('id') if boundary_passage else 'N/A'}.") + boundary_hit = True # Treat as boundary hit to potentially add marker + + + # --- Update Passages State --- + updated_passages = list(current_passages_state) # Create a mutable copy + + # Remove existing boundary markers before adding new content/markers + updated_passages = [p for p in updated_passages if p.get('role') != 'missing'] + + + if new_passage_added: + # Add the newly fetched sentences + if add_at_beginning: + updated_passages = new_paragraph_sentences + updated_passages # Prepend + else: + updated_passages.extend(new_paragraph_sentences) # Append + # Only add boundary marker if new content wasn't added AND we hit a boundary + # (or if it was a boundary hit but fetch_paragraph_data returned empty). 
+ # This prevents adding a boundary marker if the next paragraph exists but is empty, + # unless we are at the absolute start/end (target_paragraph_index == -2 or the fetch returns empty). + # Also ensure we don't add duplicate markers. + if boundary_hit: + if add_at_beginning: # Hit previous boundary + if not updated_passages or updated_passages[0].get('id') != START_MARKER_ID: + updated_passages.insert(0, {'id': START_MARKER_ID, 'paragraph_index': -1, 'role': 'missing', 'doc': '_(Anfang des Buches/Abschnitts)_', 'meta': {}, 'sentence_sort_key': float('-inf'), 'embedding': None}) + # new_passage_added = True # Marker addition counts as change + + else: # Hit next boundary + if not updated_passages or updated_passages[-1].get('id') != END_MARKER_ID: + updated_passages.append({'id': END_MARKER_ID, 'paragraph_index': float('inf'), 'role': 'missing', 'doc': '_(Ende des Buches/Abschnitts)_', 'meta': {}, 'sentence_sort_key': float('inf'), 'embedding': None}) + # new_passage_added = True # Marker addition counts as change + + + # --- Reformat and Return --- + # Reformat only if the content of `updated_passages` actually changed (new passage or marker added) + # or if the original state had markers removed. + # Compare length or check if new_passage_added or boundary_hit. + content_changed = new_passage_added or (boundary_hit and len(updated_passages) != len(current_passages_state)) # Simple check for now + + if content_changed or not updated_passages: # Also reformat if state became empty + # Ensure final list is sorted correctly including any added markers/paragraphs + updated_passages.sort(key=lambda x: (x.get('paragraph_index', -1), x.get('sentence_sort_key', float('inf')))) + logging.info(f"Reformatting context with {len(updated_passages)} total passages after loading more.") + + # Use the VALID query embedding passed into the function for consistent highlighting + context_md = format_context_markdown(updated_passages, query_embedding_value) + # Return the new Markdown and the updated state list + return context_md, updated_passages + else: + # No new passage or boundary marker state change. + # Reformat existing content just in case metadata/sorting needed fixing, return original state list + logging.debug(f"Load Context: No change in passages or boundary marker state for direction '{direction}'. Reformatting existing state.") + # Re-sort the original state list just in case, then format it. + current_passages_state.sort(key=lambda x: (x.get('paragraph_index', -1), x.get('sentence_sort_key', float('inf')))) + original_context_md = format_context_markdown(current_passages_state, query_embedding_value) + # Return the reformatted original markdown and the original state list + return original_context_md, current_passages_state - context_md = format_context_markdown(updated_passages) - return context_md, updated_passages - except ValueError: - logging.error(f"Error converting passage ID to integer in load_more_context. 
State: {current_passages_state}", exc_info=True) - error_message = format_context_markdown(current_passages_state) + "\n\n**Error processing context expansion.**" - return error_message, current_passages_state except Exception as e: - logging.error(f"Error loading more context (direction: {direction}): {e}", exc_info=True) - error_message = format_context_markdown(current_passages_state) + f"\n\n**Error loading passage: {e}**" + logging.error(f"Error loading more context (paragraph mode): {e}", exc_info=True) + # Format existing content + error message, return original state + error_message = format_context_markdown(current_passages_state or [], query_embedding_value) + f"\n\n**Fehler beim Laden des nächsten/vorherigen Paragraphen.**" return error_message, current_passages_state +# --- Load More Context Function --- +def load_more_context_wrapper(direction, current_passages_state, query_embedding_value): + """Loads the previous or next paragraph in the reading view.""" + logging.info(f"Triggered: load_more_context_wrapper direction={direction}") + # This function's outputs are only context_display and displayed_context_passages state. + # It does NOT affect the overall UI layout or result list navigation buttons. + output_components = [context_display, displayed_context_passages] + try: + context_md, updated_passages_state = load_more_context(direction, current_passages_state, query_embedding_value) + # load_more_context returns a tuple (markdown_str, updated_state_list) + # Map these directly to the output components + updates_list = [ + gr.update(value=context_md), # update context_display + updated_passages_state # update displayed_context_passages state + ] + logging.debug(f"load_more_context_wrapper: Returning {len(updates_list)} updates.") + return updates_list + except Exception as e: + logging.error(f"Error in load_more_context wrapper: {e}", exc_info=True) + # On error, return error message and original state + error_md = format_context_markdown(current_passages_state or [], query_embedding_value) + f"\n\n**Fehler beim Laden des nächsten/vorherigen Paragraphen.**" + updates_list = [ + gr.update(value=error_md), + current_passages_state # Return original state on error + ] + return updates_list + + +# --- Modified _on_fav function --- +# This function is triggered by the hidden button click via api_name +# It expects the passage_id as its argument, provided by the JS Client API predict call. +def _on_fav(passage_id): # Removed type hint str for debugging + """Handles favourite signal from JS, only increments and updates status.""" + # Log the type and value of the received argument + logging.info(f"Triggered: _on_fav with received argument: {passage_id!r} (Type: {type(passage_id)})") + + updates_dict = { + fav_signal: gr.update(value=""), # Always clear the signal textbox after processing + status_message: gr.update(visible=False, value="") # Clear status initially + } + + # Check if passage_id is a non-empty string + if not isinstance(passage_id, str) or not passage_id.strip(): + logging.warning(f"_on_fav called with invalid passage_id: {passage_id!r}.") + updates_dict[status_message] = gr.update(visible=True, value="**Fehler:** Ungültige Favoriten-ID erhalten.") + return updates_dict # Return the updates dictionary + + try: + # Call the core logic to increment the favourite score + new_score = inc_favourite(passage_id) # Use the valid passage_id string + logging.info(f"Successfully incremented favourite for ID {passage_id}. 
New score: {new_score}")
+        # Update the status message to inform the user
+        updates_dict[status_message] = gr.update(visible=True, value=f"⭐ Favorit gespeichert! (Score: {new_score})")
+    except Exception as e:
+        logging.error(f"Error in _on_fav processing ID {passage_id}: {e}", exc_info=True)
+        # Update status message with error info
+        updates_dict[status_message] = gr.update(visible=True, value=f"**Fehler beim Speichern des Favoriten:** {e}")
+
+    # This function returns a dictionary of updates for its bound outputs.
+    # These are just the fav_signal state (to reset it) and the status_message UI element.
+    return updates_dict
+
+
+js_code = """
+// ------------------------------------------------------------
+// FAVOURITE HANDLER (uses Gradio JS Client predict endpoint)
+// ------------------------------------------------------------
+let gradioApp = null;          // will hold the connected client
+const ENDPOINT = "/fav";       // same name you set in api_name
+const STATUS_SEL = '#status-message'; // Selector for the markdown status element
+// const FAV_SIGNAL_ID = 'fav-signal'; // No longer directly interacting with fav-signal textbox from JS click handler
+
+// 1 ‒ connect once, then re‑use
+async function initializeFavClient() {
+    console.log("JS: Initializing fav client…");
+    try {
+        // Assuming Client is made global by the module import in the page head
+        gradioApp = await Client.connect(window.location.origin);
+        console.log("JS: fav client connected.");
+    } catch (err) {
+        console.error("JS: could not connect fav client:", err);
+        gradioApp = null;
+    }
+}
+
+// 2 ‒ send a clicked passage id to the backend endpoint
+async function sendFavourite(passageId) {
+    if (!gradioApp) { await initializeFavClient(); }
+    if (!gradioApp) { return; }
+    try {
+        await gradioApp.predict(ENDPOINT, [passageId]);
+    } catch (err) {
+        console.error("JS: favourite call failed:", err);
+    }
+}
+
+// 3 ‒ delegate clicks from the reading area (assumes the clickable spans carry data-fav-id)
+document.addEventListener("click", (ev) => {
+    const span = ev.target.closest("[data-fav-id]");
+    if (span) { sendFavourite(span.dataset.favId); }
+});
+"""
+
+with gr.Blocks(
+    head=f"""
+    <script type="module">
+    import {{ Client }} from "https://cdn.jsdelivr.net/npm/@gradio/client/dist/index.min.js";
+    window.Client = Client;
+    {js_code}
+    </script>
+    """,
+    css=custom_css # Add the custom CSS here
+    ) as demo:
+
+    gr.Markdown("# Thought Loop")
+    gr.Markdown("Semantische Suche")
 
     # --- State Variables ---
-    full_search_results_state = gr.State([])
-    current_result_index_state = gr.State(0)
-    displayed_context_passages = gr.State([])
-
-    # --- Search Input Row ---
+    full_search_results_state = gr.State([]) # Stores results from Standard search
+    current_result_index_state = gr.State(0) # Index for Standard search results
+    llm_results_state = gr.State([]) # Stores results from LLM search
+    llm_result_index_state = gr.State(0) # Index for LLM search results
+    best_results_state = gr.State([]) # Stores results from Favourites view
+    best_index_state = gr.State(0) # Index for Favourites results
+
+    displayed_context_passages = gr.State([]) # Stores passages currently in context view
+    # active_view_state tracks which view is active:
+    # "standard", "llm", "favourites", "context_from_standard", "context_from_llm", "context_from_favourites", "none"
+    active_view_state = gr.State("none")
+    # Holds the query embedding for highlighting in the context view.
+    # Needs to be passed through UI events that transition *to* the context view.
+    direct_embedding_output_holder = gr.State(None)
+
+    # --- UI Layout ---
     with gr.Row():
-        query_input = gr.Textbox(label="Enter query", placeholder="z. B.
'Was ist der Unterschied zwischen Herstellen und Handeln?'", lines=2, scale=3) - author_dropdown = gr.Dropdown( - label="Filter by Author(s) (Optional)", - choices=unique_authors, - multiselect=True, - scale=2 - ) - search_button = gr.Button("Search", variant="primary", scale=1) + query_input = gr.Textbox(label="Gedanken eingeben", placeholder="Sollte Technologie nicht zu immer krasserer Arbeitsteilung führen, sodass wir in Zukunft...", lines=2, scale=4) + author_dropdown = gr.Dropdown(label="Autoren auswählen (optional)", choices=unique_authors, multiselect=True, scale=2) + + with gr.Accordion("Feinabstimmung Rankierung", open=False) as result_tuning_accordion: + with gr.Row(): + window_size_slider = gr.Slider( + minimum=0, maximum=5, step=1, value=RERANK_WINDOW_SIZE, + label="Kontext-Fenstergröße (+/- Sätze)", + info="Wie viele Sätze vor/nach dem Treffer-Satz für Kontext-Score & Anzeige berücksichtigt werden (0-5)." + ) + weight_slider = gr.Slider( + minimum=0.0, maximum=1.0, step=0.05, value=RERANK_WEIGHT, + label="Kontext-Gewichtung", + info="Wie stark der Kontext-Score das ursprüngliche Ranking beeinflusst (0.0 = kein Einfluss, 1.0 = stark)." + ) + decay_slider = gr.Slider( + minimum=0.0, maximum=1.0, step=0.05, value=RERANK_DECAY, + label="Kontext-Abfallfaktor", + info="Wie schnell der Einfluss von Nachbarn mit der Distanz abnimmt (0.0 = kein Abfall, 1.0 = stark)." + ) - # --- Result Navigation Row (MOVED HERE) --- with gr.Row(): - previous_result_button = gr.Button("⬅️", visible=False) - next_result_button = gr.Button("➡️", visible=False) + search_button = gr.Button("Embeddingsuche", variant="secondary", scale=1) + llm_rerank_button = gr.Button("Embeddingsuche + LLM Auswahl", variant="secondary", scale=1, interactive=(API_KEY is not None and llm_rerank_model is not None)) + best_of_button = gr.Button("⭐⭐⭐", variant="secondary", scale=1) + + # --- Shared Results/Favourites Area --- + # We reuse standard_nav_row and single_result_group for all result types + with gr.Row(visible=False) as standard_nav_row: + # These buttons will be shown/hidden based on active_view_state + previous_result_button = gr.Button("⬅️", min_width=80, visible=False) # General Previous + next_result_button = gr.Button("➡️", min_width=80, visible=False) # General Next + weiterlesen_button = gr.Button("weiterlesen", variant="secondary", visible=False) # General Weiterlesen + + with gr.Group(visible=False) as single_result_group: + # MODIFIED: Replaced single_result_display_md with an Accordion and a Textbox + result_accordion = gr.Accordion(label="Feinabstimmung", open=False) # Accordion for heading and metadata + with result_accordion: + # The content of the accordion will be a Markdown component + result_metadata_display = gr.Markdown("...") # Placeholder for metadata and scores + + # This Textbox will contain the actual passage text + result_text = gr.Textbox(label="", lines=5, interactive=False, visible=True) + + + # --- Status Message Area --- + # Added elem_id for JS to target + status_message = gr.Markdown("", visible=False, elem_id="status-message") # Changed to visible=False initially + + # --- Hidden Signaling Components --- + # Hidden textbox to hold the ID (will be used as input in client API call) + # JS click handler now uses the client API directly, no longer sets this textbox value + # This component's *value* is still used as an output by _on_fav to reset it. 
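+    # Round trip: a click in the reading area calls client.predict("/fav", [id]) from JS,
+    # Gradio routes it to _on_fav via the api_name="fav" binding below, _on_fav bumps the
+    # score with inc_favourite(id), and status_message shows the result.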
+ fav_signal = gr.Textbox( + visible=False, + elem_id="fav-signal", # Still useful for potential future JS interactions or debugging + value="" # Initialize with empty value + ) + # Hidden button triggered by JS (used to expose the backend function via its api_name binding) + # The Client API calls the function bound to the api_name, not the button's click *event*. + # This button component is mainly here to provide a place for the api_name binding. + fav_trigger_button = gr.Button( + visible=False, + elem_id="fav-trigger-button" # Still useful for JS to get a reference if needed, though not clicked directly anymore + ) - gr.Markdown("---") # Separator after search and navigation + # --- Reading Area --- + with gr.Column(visible=False) as context_area: + back_to_results_button = gr.Button("⬅️ Zurück ", variant="secondary", visible=False) + load_previous_button = gr.Button("⬆️", variant="secondary", visible=False) # Added text + # --- MODIFIED: Added elem_id to context_display --- + context_display = gr.HTML(label="Lesebereich", value="
<div>_Kontext wird hier angezeigt._</div>
", elem_id="context-display-markdown") # gr.HTML needs valid HTML, so wrap placeholder in div + load_next_button = gr.Button("⬇️", variant="secondary", visible=False) # Added text + + # --- Utility function to create a reset update dictionary --- + # This function needs to be defined AFTER all the components it references + def create_reset_updates(): + """Creates a dictionary of Gradio updates to reset the UI and state.""" + updates = {} + # List all components that need resetting/hiding, *excluding* the sliders and the Accordion content display + components_to_reset = [ + # States + full_search_results_state, current_result_index_state, displayed_context_passages, + llm_results_state, llm_result_index_state, active_view_state, + direct_embedding_output_holder, + best_results_state, best_index_state, + fav_signal, # <-- Included here as a state to reset its value + # Shared Result UI - Containers + standard_nav_row, single_result_group, + # Shared Result UI - New Components + result_accordion, result_metadata_display, result_text, + # Tuning Accordion + result_tuning_accordion, + # Buttons in shared row + previous_result_button, next_result_button, weiterlesen_button, + # Context Area UI + context_area, context_display, load_previous_button, load_next_button, + back_to_results_button, + # Status message + status_message, + # fav_trigger_button is intentionally excluded here as its visibility/interactivity isn't controlled by this reset. + ] + + for comp in components_to_reset: + if isinstance(comp, gr.State): + if comp in [current_result_index_state, llm_result_index_state, best_index_state]: updates[comp] = 0 + elif comp == active_view_state: updates[comp] = "none" + elif comp == direct_embedding_output_holder: updates[comp] = None + # Note: fav_signal state value is reset below explicitly + elif comp in [full_search_results_state, displayed_context_passages, llm_results_state, best_results_state]: updates[comp] = [] + else: # UI Components + if isinstance(comp, gr.Markdown): + updates[comp] = gr.update(value="") # Clear Markdown content + elif isinstance(comp, gr.HTML): + updates[comp] = gr.update(value="
<div>_Kontext wird hier angezeigt._</div>
") # Reset HTML content + elif isinstance(comp, gr.Textbox): # Handle Textboxes + # result_text needs value reset, visibility handled by single_result_group + if comp == result_text: + updates[comp] = gr.update(value="", interactive=False) # Keep interactive=False for results view + # fav_signal needs value reset AND explicit visibility set to False + elif comp == fav_signal: + updates[comp] = gr.update(value="", visible=False) + # Add any other Textboxes here if needed + + + elif isinstance(comp, gr.Accordion): # New Accordion + updates[comp] = gr.update(label="Feinabstimmung", open=False, visible=True) # Reset label, close, keep visible. Visibility controlled by single_result_group. + + if isinstance(comp, (gr.Row, gr.Group, gr.Column)): + # Keep tuning accordion open/visible (Accordion itself isn't in this list, but its contents are) + if comp not in []: # Add any other components that should NOT be hidden here + updates[comp] = gr.update(visible=False) + + + if isinstance(comp, gr.Button): + updates[comp] = gr.update(visible=False, interactive=False) + + if comp == status_message: + updates[comp] = gr.update(value="", visible=False) + + + # Explicitly set tuning sliders to be visible and interactive on reset, + # but *don't* reset their values here. Their current values will be retained. + # These sliders are NOT included in the components_to_reset list above, + # so they won't be affected by the generic hide logic. + updates[window_size_slider] = gr.update(visible=True, interactive=True) + updates[weight_slider] = gr.update(visible=True, interactive=True) + updates[decay_slider] = gr.update(visible=True, interactive=True) + + # The result_metadata_display (inside the accordion) also needs resetting + updates[result_metadata_display] = gr.update(value="...") + + + logging.debug(f"Created reset updates dict with {len(updates)} items.") + return updates - # --- Single Result Display Area --- - # Contains the preview text and the "weiterlesen" button - with gr.Column(visible=True) as results_area: - with gr.Group(visible=False) as single_result_group: - result_index_indicator_md = gr.Markdown("Result 0 of 0") - single_result_display_md = gr.Markdown("...") # Shows the preview - # "weiterlesen" button remains at the end of the preview group - weiterlesen_button = gr.Button("weiterlesen", variant="secondary", visible=True) + + # --- Wrapper Functions for Gradio Bindings --- + # These wrappers prepare the inputs and outputs for the Gradio event handlers. + # They return a dictionary of updates which is then converted to a list by Gradio. 
+ + def search_standard_wrapper(query, selected_authors, window_size, weight, decay): + logging.info(f"Triggered: search_standard_wrapper with window={window_size}, weight={weight:.2f}, decay={decay:.2f}") + # Start with a reset state (Includes hiding context area and its buttons) + updates_dict = create_reset_updates() + try: + search_results, query_embedding = perform_search_standard( + query, selected_authors, + window_size, weight, decay + ) + # Merge updates from the mode-specific UI function (Shows results area) + # search_standard_mode_ui now handles updating the new components + updates_dict.update(search_standard_mode_ui(search_results, query_embedding)) + except Exception as e: + logging.error(f"Error in search_standard_wrapper: {e}", exc_info=True) + # MODIFIED: Update the new components on error + updates_dict[result_accordion] = gr.update(label=f"**Fehler bei der Suche:**", open=False, visible=True) + updates_dict[result_metadata_display] = gr.update(value=str(e)) # Display error message in metadata area + updates_dict[result_text] = gr.update(value="", visible=True) + updates_dict[single_result_group] = gr.update(visible=True) # Ensure the result group is visible + updates_dict[direct_embedding_output_holder] = None + + # --- FIX: Ensure context area and its buttons are hidden when showing search results --- + # Although create_reset_updates is called, add explicit updates for robustness + updates_dict[context_area] = gr.update(visible=False) + updates_dict[load_previous_button] = gr.update(visible=False) + updates_dict[load_next_button] = gr.update(visible=False) + updates_dict[back_to_results_button] = gr.update(visible=False) + # --- END FIX --- + + + # Return the dictionary of updates + return updates_dict + + def search_llm_rerank_wrapper(query, selected_authors, window_size, weight, decay): + logging.info(f"Triggered: search_llm_rerank_wrapper with window={window_size}, weight={weight:.2f}, decay={decay:.2f}") + # Start with a reset state (Includes hiding context area and its buttons) + updates_dict = create_reset_updates() + try: + llm_results, query_embedding = perform_search_llm( + query, selected_authors, + window_size, weight, decay + ) + # Merge updates from the mode-specific UI function (Shows LLM results area) + # search_llm_rerank_mode_ui now handles updating the new components + updates_dict.update(search_llm_rerank_mode_ui(llm_results, query_embedding)) + except Exception as e: + logging.error(f"Error in search_llm_rerank_wrapper: {e}", exc_info=True) + # MODIFIED: Update the new components on error + updates_dict[result_accordion] = gr.update(label=f"**Fehler bei der LLM-Suche:**", open=False, visible=True) + updates_dict[result_metadata_display] = gr.update(value=str(e)) # Display error message + updates_dict[result_text] = gr.update(value="", visible=True) + updates_dict[single_result_group] = gr.update(visible=True) # Ensure group is visible + updates_dict[direct_embedding_output_holder] = None + + + # --- FIX: Ensure context area and its buttons are hidden when showing LLM results --- + # Although create_reset_updates is called, add explicit updates for robustness + updates_dict[context_area] = gr.update(visible=False) + updates_dict[load_previous_button] = gr.update(visible=False) + updates_dict[load_next_button] = gr.update(visible=False) + updates_dict[back_to_results_button] = gr.update(visible=False) + # --- END FIX --- + + # Return the dictionary of updates + return updates_dict + + def refresh_best_wrapper(): + """Wrapper for _refresh_best to 
prepare UI updates."""
+        logging.info("Triggered: refresh_best_wrapper")
+        # Start with a reset state (Includes hiding context area and its buttons)
+        updates_dict = create_reset_updates()
+        # Ensure status message is hidden on view change
+        updates_dict[status_message] = gr.update(value="", visible=False)
+        try:
+            favs = top_favourites(MAX_FAVOURITES)
+            if not favs:
+                logging.info("No favourites to display.")
+                # MODIFIED: Update the new components for no results
+                updates_dict[result_accordion] = gr.update(label="_Noch keine Favoriten gesammelt._", open=False, visible=True)
+                updates_dict[result_metadata_display] = gr.update(value="")
+                updates_dict[result_text] = gr.update(value="", visible=True)
+                updates_dict[single_result_group] = gr.update(visible=True) # Ensure group is visible
+                updates_dict[best_results_state] = []
+                updates_dict[best_index_state] = 0
+                updates_dict[active_view_state] = "favourites" # Set view even if empty
-    gr.Markdown("---") # Separator before reading area
+            else:
+                logging.info(f"Displaying first of {len(favs)} favourite results.")
+                # format_result_display returns (accordion_title, accordion_content_md, text_content)
+                accordion_title, accordion_content_md, text_content = format_result_display(favs[0], 0, len(favs), "favourites")
+
+                # MODIFIED: Update the new components with formatted data
+                updates_dict[result_accordion] = gr.update(label=accordion_title, open=False, visible=True)
+                updates_dict[result_metadata_display] = gr.update(value=accordion_content_md)
+                updates_dict[result_text] = gr.update(value=text_content, visible=True)
+
+                updates_dict[single_result_group] = gr.update(visible=True) # Ensure group is visible
+                updates_dict[standard_nav_row] = gr.update(visible=True)
+                updates_dict[previous_result_button] = gr.update(visible=True, interactive=False) # Can't navigate back from the first result
+                updates_dict[next_result_button] = gr.update(visible=True, interactive=(len(favs) > 1)) # Enable if more than one fav
+                updates_dict[weiterlesen_button] = gr.update(visible=True, interactive=True, value="weiterlesen") # Enable context button
+                updates_dict[best_results_state] = favs
+                updates_dict[best_index_state] = 0
+                updates_dict[active_view_state] = "favourites" # Set active view state
+
+
+
+        except Exception as e:
+            logging.error(f"Error in refresh_best_wrapper: {e}", exc_info=True)
+            # MODIFIED: Update the new components on error
+            updates_dict[result_accordion] = gr.update(label=f"**Fehler beim Laden der Favoriten:**", open=False, visible=True)
+            updates_dict[result_metadata_display] = gr.update(value=str(e)) # Display error message
+            updates_dict[result_text] = gr.update(value="", visible=True)
+            updates_dict[single_result_group] = gr.update(visible=True) # Ensure group is visible
+            updates_dict[best_results_state] = []
+            updates_dict[best_index_state] = 0
+            updates_dict[active_view_state] = "none" # Indicate error state
+
+        # --- FIX: Ensure context area and its buttons are hidden when showing Favourites ---
+        # Although create_reset_updates is called, add explicit updates for robustness
+        updates_dict[context_area] = gr.update(visible=False)
+        updates_dict[load_previous_button] = gr.update(visible=False)
+        updates_dict[load_next_button] = gr.update(visible=False)
+        updates_dict[back_to_results_button] = gr.update(visible=False)
+        # --- END FIX ---
+
+
+        # Return the dictionary of updates
+        return updates_dict
+
+    def navigate_results_wrapper(direction, current_index, full_results, llm_results, llm_index, best_results, best_index, active_view):
+        """Dispatches a prev/next click to whichever result list is currently active."""
+
logging.info(f"Triggered: navigate_results_wrapper direction={direction}, active_view={active_view}") + + updates_dict = { + # Default updates to preserve relevant state based on active view + full_search_results_state: full_results, + current_result_index_state: current_index, + llm_results_state: llm_results, + llm_result_index_state: llm_index, + best_results_state: best_results, + best_index_state: best_index, + active_view_state: active_view, # Preserve active view + + # MODIFIED: Clear new components when navigating (before displaying the next one) + result_accordion: gr.update(label="...", open=False, visible=True), + result_metadata_display: gr.update(value=""), + result_text: gr.update(value="", visible=True), + } - # --- Context / Reading Area --- - with gr.Column(visible=True) as context_area: - load_previous_button = gr.Button("⬆️", variant="secondary", visible=False) - context_display = gr.Markdown(label="Reading Area") - load_next_button = gr.Button("⬇️", variant="secondary", visible=False) + try: + if active_view == "standard": + # navigate_results now updates the new components directly + nav_updates = navigate_results(direction, current_index, full_results) + updates_dict.update(nav_updates) + elif active_view == "llm": + # navigate_llm_results now updates the new components directly + nav_updates = navigate_llm_results(direction, llm_index, llm_results) + updates_dict.update(nav_updates) + elif active_view == "favourites": + # navigate_best_results now updates the new components directly + nav_updates = navigate_best_results(direction, best_index, best_results) + updates_dict.update(nav_updates) + else: + logging.warning(f"Navigation triggered in unexpected view state: {active_view}") + # MODIFIED: Update new components on error + updates_dict[result_accordion] = gr.update(label="Navigation nicht möglich.", open=False, visible=True) + updates_dict[result_metadata_display] = gr.update(value="Ungültiger Status.") + updates_dict[result_text] = gr.update(value="", visible=True) + # Hide nav buttons as navigation is not possible + updates_dict[previous_result_button] = gr.update(interactive=False) + updates_dict[next_result_button] = gr.update(interactive=False) + updates_dict[weiterlesen_button] = gr.update(interactive=False) + + + except Exception as e: + logging.error(f"Error in navigation wrapper: {e}", exc_info=True) + # MODIFIED: Update new components on error + updates_dict[result_accordion] = gr.update(label=f"**Navigationsfehler:**", open=False, visible=True) + updates_dict[result_metadata_display] = gr.update(value=str(e)) + updates_dict[result_text] = gr.update(value="", visible=True) + # On error, disable navigation buttons + updates_dict[previous_result_button] = gr.update(interactive=False) + updates_dict[next_result_button] = gr.update(interactive=False) + updates_dict[weiterlesen_button] = gr.update(interactive=False) + + + # Return the dictionary of updates + # Note: The individual navigate_* functions within the try/except + # already populate the updates_dict with the specifics. + # We just handle the top-level error/unexpected state here. 
+        return updates_dict
+
+
+    def go_back_to_results_wrapper(last_active_view, std_results, std_index, llm_results, llm_index, best_results, best_index, current_fav_signal_value):
+        """Handles UI updates for returning from the context view to the appropriate results view."""
+        logging.info(f"Triggered: go_back_to_results_wrapper from view: {last_active_view}")
+
+        updates_dict = {
+            # Reset context area visibility
+            context_area: gr.update(visible=False),
+            context_display: gr.update(value=""), # Clear context display
+            displayed_context_passages: [], # Reset context state
+
+            # Pass through existing results and indices states
+            full_search_results_state: std_results, current_result_index_state: std_index,
+            llm_results_state: llm_results, llm_result_index_state: llm_index,
+            best_results_state: best_results, best_index_state: best_index,
+            direct_embedding_output_holder: None, # Clear embedding when leaving context
+            fav_signal: gr.update(value=current_fav_signal_value), # <--- Pass through fav_signal state
+            active_view_state: "none", # Reset active view temporarily before setting correct one
+
+            # Ensure the new result components are initially hidden when returning
+            result_accordion: gr.update(label="Feinabstimmung", open=False, visible=False),
+            result_metadata_display: gr.update(value=""),
+            result_text: gr.update(value="", visible=False),
+
+            # Ensure shared result row and group are initially hidden
+            standard_nav_row: gr.update(visible=False),
+            single_result_group: gr.update(visible=False),
+
+            # Also ensure all result nav buttons are hidden initially
+            previous_result_button: gr.update(visible=False, interactive=False),
+            next_result_button: gr.update(visible=False, interactive=False),
+            weiterlesen_button: gr.update(visible=False, interactive=False),
+        }
+        # Hide status message
+        updates_dict[status_message] = gr.update(value="", visible=False)
+
+
+        # Determine which result view to show based on where we came from
+        target_view = "none"
+        target_results_list = []
+        target_index = 0
+        result_type = "unknown" # Used for formatting
+
+        if last_active_view == "context_from_standard":
+            target_view = "standard"
+            target_results_list = std_results
+            target_index = std_index
+            result_type = "standard"
+            logging.info("Going back to Standard results.")
+        elif last_active_view == "context_from_llm":
+            target_view = "llm"
+            target_results_list = llm_results
+            target_index = llm_index
+            result_type = "llm"
+            logging.info("Going back to LLM results.")
+        elif last_active_view == "context_from_favourites":
+            target_view = "favourites"
+            target_results_list = best_results
+            target_index = best_index
+            result_type = "favourites"
+            logging.info("Going back to Favourites.")
+        else:
+            logging.warning(f"Back button triggered from unexpected state: {last_active_view}")
+            # Default to showing an error message if view is unknown
+            updates_dict[result_accordion] = gr.update(label="Zurück aus unbekanntem Zustand.", open=False, visible=True)
+            updates_dict[result_metadata_display] = gr.update(value="Resultate konnten nicht geladen werden.")
+            updates_dict[result_text] = gr.update(value="", visible=True)
+            updates_dict[single_result_group] = gr.update(visible=True) # Ensure group is visible
+            updates_dict[standard_nav_row] = gr.update(visible=True) # Ensure nav row is visible (even if buttons are hidden)
+            target_view = "none" # Stay in error state
+            return updates_dict # Return early on error
+
+
+        # Update the active_view state to the results view we returned to
+        updates_dict[active_view_state]
= target_view + + # Show the shared result group and nav row + updates_dict[single_result_group] = gr.update(visible=True) + updates_dict[standard_nav_row] = gr.update(visible=True) + + + # Update the result display and navigation buttons for the target view + if target_results_list and isinstance(target_results_list, list) and 0 <= target_index < len(target_results_list): + result_data = target_results_list[target_index] + # MODIFIED: Use the combined formatter and update new components + accordion_title, accordion_content_md, text_content = format_result_display(result_data, target_index, len(target_results_list), result_type) + updates_dict[result_accordion] = gr.update(visible=True, label=accordion_title, open=False) + updates_dict[result_metadata_display] = gr.update(value=accordion_content_md) + updates_dict[result_text] = gr.update(value=text_content, visible=True) + + # Update button interactivity based on the selected index and total results + updates_dict[previous_result_button] = gr.update(visible=True, interactive=(target_index > 0)) + updates_dict[next_result_button] = gr.update(visible=True, interactive=(target_index < len(target_results_list) - 1)) + updates_dict[weiterlesen_button] = gr.update(visible=True, interactive=True, value="weiterlesen" if result_type != "llm" else "im Original weiterlesen") + else: + # If the result list is empty or invalid after returning, show appropriate message + error_msg_label = f"_{target_view.capitalize()}-Resultate nicht verfügbar._" + error_msg_content = "" # No content for metadata + updates_dict[result_accordion] = gr.update(visible=True, label=error_msg_label, open=False) + updates_dict[result_metadata_display] = gr.update(value=error_msg_content) + updates_dict[result_text] = gr.update(value="", visible=True) # Clear text area + + # Hide navigation buttons as there are no results to navigate + updates_dict[previous_result_button] = gr.update(visible=False, interactive=False) + updates_dict[next_result_button] = gr.update(visible=False, interactive=False) + updates_dict[weiterlesen_button] = gr.update(visible=False, interactive=False) + + return updates_dict + + + def move_to_reading_wrapper(std_results, std_index, llm_results, llm_index, best_results, best_index, active_view, query_embedding_value, current_fav_signal_value): + logging.info(f"Triggered: move_to_reading_wrapper active_view={active_view}") + + updates_dict = { + # Preserve all state variables by default + full_search_results_state: std_results, current_result_index_state: std_index, + llm_results_state: llm_results, llm_result_index_state: llm_index, + best_results_state: best_results, best_index_state: best_index, + active_view_state: active_view, # Preserve active view temporarily + direct_embedding_output_holder: query_embedding_value, + fav_signal: gr.update(value=current_fav_signal_value) # <--- Pass through fav_signal state + } + # Hide status message when changing view + updates_dict[status_message] = gr.update(value="", visible=False) + + + try: + target_results_list = [] + target_index = 0 + result_type = "unknown" + + # Identify which result list and index to use based on active_view + if active_view == "standard": + target_results_list = std_results + target_index = std_index + result_type = "standard" + elif active_view == "llm": + target_results_list = llm_results + target_index = llm_index + result_type = "llm" + elif active_view == "favourites": + target_results_list = best_results + target_index = best_index + result_type = "favourites" + else: + 
logging.warning(f"Weiterlesen triggered in unexpected view state: {active_view}") + updates_dict[context_display] = gr.update(value="Kann Kontext in diesem Zustand nicht laden.") + updates_dict[context_area] = gr.update(visible=True) + updates_dict[load_previous_button] = gr.update(interactive=False) + updates_dict[load_next_button] = gr.update(interactive=False) + updates_dict[back_to_results_button] = gr.update(visible=True, interactive=True) + updates_dict[active_view_state] = "none" # Indicate an error/transition state + return updates_dict # Return early on error + + # Call the UI function that fetches and formats the initial context + # Pass only the data it needs (index within the target list, the list itself, embedding, and type) + # The move_to_reading_area_ui function should return a dictionary of updates for UI components like context_display and displayed_context_passages state + read_updates = move_to_reading_area_ui(target_index, target_results_list, query_embedding_value, result_type) + + # Update the active_view state to reflect entering context mode + # This state will be used by load_more and back buttons + updates_dict[active_view_state] = f"context_from_{result_type}" + + # Merge the UI updates returned by move_to_reading_area_ui + updates_dict.update(read_updates) + + + except Exception as e: + logging.error(f"Error in move_to_reading wrapper: {e}", exc_info=True) + updates_dict[context_display] = gr.update(value=f"**Fehler:** Konnte Paragraph nicht in Lesebereich laden: {e}") + updates_dict[context_area] = gr.update(visible=True) + updates_dict[load_previous_button] = gr.update(interactive=False) + updates_dict[load_next_button] = gr.update(interactive=False) + updates_dict[back_to_results_button] = gr.update(visible=True, interactive=True) + updates_dict[active_view_state] = "error_context" # Indicate an error state + + + return updates_dict + + + # This wrapper function remains the same, it's bound to load_previous_button and load_next_button + def load_more_context_wrapper(direction, current_passages_state, query_embedding_value): + logging.info(f"Triggered: load_more_context_wrapper direction={direction}") + # This function's outputs are only context_display and displayed_context_passages state. + # It does NOT affect the overall UI layout or result list navigation buttons. + output_components = [context_display, displayed_context_passages] + try: + context_md, updated_passages_state = load_more_context(direction, current_passages_state, query_embedding_value) + # load_more_context returns a tuple (markdown_str, updated_state_list) + # Map these directly to the output components + updates_list = [ + gr.update(value=context_md), # update context_display + updated_passages_state # update displayed_context_passages state + ] + logging.debug(f"load_more_context_wrapper: Returning {len(updates_list)} updates.") + return updates_list + except Exception as e: + logging.error(f"Error in load_more_context wrapper: {e}", exc_info=True) + # On error, return error message and original state + error_md = format_context_markdown(current_passages_state or [], query_embedding_value) + f"\n\n**Fehler beim Laden des nächsten/vorherigen Paragraphen.**" + updates_list = [ + gr.update(value=error_md), + current_passages_state # Return original state on error + ] + return updates_list + + + # --- Define the combined list of all potential UI outputs --- + # This list is needed for functions that can trigger updates across multiple parts of the UI. 
+ # We add the direct_embedding_output_holder state as well. + # fav_trigger_button is NOT in this list because it's strictly + # a hidden signaling component updated only by the fav logic binding's outputs. + # This list needs to be defined AFTER all components are defined in the Blocks context + all_ui_outputs = [ + # States + full_search_results_state, current_result_index_state, displayed_context_passages, + llm_results_state, llm_result_index_state, active_view_state, + direct_embedding_output_holder, + best_results_state, best_index_state, + fav_signal, + # Shared Result UI Containers + standard_nav_row, single_result_group, + # MODIFIED: New Result UI Components + result_accordion, result_metadata_display, result_text, + # Tuning Accordion + result_tuning_accordion, + # Buttons in shared row + previous_result_button, next_result_button, weiterlesen_button, + # Context Area UI + context_area, context_display, load_previous_button, load_next_button, + back_to_results_button, + # Tuning Sliders (Keep them in the list because wrappers might update their visibility/interactivity, + # but the reset function explicitly avoids changing their values) + window_size_slider, weight_slider, decay_slider, + # Status message + status_message, + ] + logging.info(f"Length of all_ui_outputs list (used for comprehensive updates): {len(all_ui_outputs)}") - # --- Event Handlers (Wiring remains the same) --- - # Search Button Action - search_outputs = [ - full_search_results_state, current_result_index_state, single_result_group, - result_index_indicator_md, single_result_display_md, previous_result_button, - next_result_button, weiterlesen_button, context_display, - displayed_context_passages, load_previous_button, load_next_button, - ] + # --- Bindings: Connect UI elements to functions --- + + # Bind search buttons to their wrapper functions. + # These wrappers will return a dictionary of updates for the *entire* UI state. search_button.click( - fn=search_philosophical_texts, - inputs=[query_input, author_dropdown], - outputs=search_outputs + search_standard_wrapper, + inputs=[query_input, author_dropdown, window_size_slider, weight_slider, decay_slider], + # We must list ALL potential outputs here, including states and UI elements that might change visibility or content. + # Gradio will use the dictionary returned by the wrapper to update the matching outputs in this list. 
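+        # The same list is reused for every search/navigation binding below.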
+ outputs=all_ui_outputs + ) + llm_rerank_button.click( + search_llm_rerank_wrapper, + inputs=[query_input, author_dropdown, window_size_slider, weight_slider, decay_slider], + outputs=all_ui_outputs ) - # Previous/Next Result Button Actions - nav_outputs = [ # Combined list for prev/next - single_result_display_md, current_result_index_state, result_index_indicator_md, - previous_result_button, next_result_button, weiterlesen_button, - context_display, displayed_context_passages, - load_previous_button, load_next_button, + # Bind the favourites button to its wrapper + best_of_button.click( + refresh_best_wrapper, + inputs=[], # No direct inputs, it fetches from the fav_scores state + outputs=all_ui_outputs # It updates results display, navigation, and state + ) + + + # Bind navigation buttons to a single wrapper that handles different view states + # Inputs include all state variables needed to know the current view and data + nav_inputs = [ + current_result_index_state, full_search_results_state, # Standard state + llm_results_state, llm_result_index_state, # LLM state + best_results_state, best_index_state, # Favourites state + active_view_state # Current view indicator ] + # Outputs include all UI elements and states that might change during navigation + nav_outputs = all_ui_outputs # Navigation can affect the result display and state previous_result_button.click( - fn=navigate_results, - inputs=[gr.State('previous'), current_result_index_state, full_search_results_state], + lambda *args: navigate_results_wrapper("previous", *args), # Pass 'previous' as first arg + inputs=nav_inputs, outputs=nav_outputs ) next_result_button.click( - fn=navigate_results, - inputs=[gr.State('next'), current_result_index_state, full_search_results_state], + lambda *args: navigate_results_wrapper("next", *args), # Pass 'next' as first arg + inputs=nav_inputs, outputs=nav_outputs ) - # "weiterlesen" Button Action - weiterlesen_outputs = [ - context_display, displayed_context_passages, - load_previous_button, load_next_button, - weiterlesen_button # Target button itself to control visibility + + # Bind the "weiterlesen" button to a wrapper that handles different view states + # Inputs need state necessary to determine which result (standard, llm, fav) to load context for + # We also need fav_signal's current value to pass it through in the outputs. + read_inputs = [ + full_search_results_state, current_result_index_state, # Standard state + llm_results_state, llm_result_index_state, # LLM state + best_results_state, best_index_state, # Favourites state + active_view_state, # Current view indicator (e.g., 'standard', 'llm', 'favourites') + direct_embedding_output_holder, # Embedding for highlighting in context + fav_signal # <--- ADDED fav_signal here as an input ] + + # Outputs include all UI elements and states that change when entering context view + # This is why all_ui_outputs is used here. + read_outputs = all_ui_outputs + weiterlesen_button.click( - fn=move_to_reading_area, - inputs=[current_result_index_state, full_search_results_state], - outputs=weiterlesen_outputs - ) + move_to_reading_wrapper, + inputs=read_inputs, + outputs=read_outputs + ) - # Load More Context Buttons + + # Bind context navigation buttons + # load_more_context_wrapper already returns updates as a list [context_display_update, state_update] + # These only update the context display and state, not the main results area. 
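+    # gr.State('previous') / gr.State('next') pass the direction to the wrapper as a
+    # constant first argument, so one handler serves both buttons.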
load_previous_button.click( - fn=load_more_context, - inputs=[gr.State('previous'), displayed_context_passages], - outputs=[context_display, displayed_context_passages] + load_more_context_wrapper, + inputs=[gr.State('previous'), displayed_context_passages, direct_embedding_output_holder], + outputs=[context_display, displayed_context_passages], # Only update context display and state + scroll_to_output=False ) load_next_button.click( - fn=load_more_context, - inputs=[gr.State('next'), displayed_context_passages], - outputs=[context_display, displayed_context_passages] + load_more_context_wrapper, + inputs=[gr.State('next'), displayed_context_passages, direct_embedding_output_holder], + outputs=[context_display, displayed_context_passages], # Only update context display and state + scroll_to_output=False + ) + + # Bind the "Zurück" button to a wrapper that handles returning to results list + # Inputs need states relevant to restoring the correct results view. + # We also need fav_signal's current value to pass it through in the outputs. + back_inputs = [ + active_view_state, # Need to know which view we came from to go back correctly + full_search_results_state, current_result_index_state, # Standard state + llm_results_state, llm_result_index_state, # LLM state + best_results_state, best_index_state, # Favourites state + fav_signal # <--- ADDED fav_signal here as an input + ] + + # Outputs include all UI elements and states that change when returning to results view + back_outputs = all_ui_outputs + + back_to_results_button.click( + go_back_to_results_wrapper, + inputs=back_inputs, + outputs=back_outputs + ) + + # --- Binding for favourite signaling --- + # This binding exposes the _on_fav function to the Gradio Client API via api_name="fav". + # The JS client will call the backend function associated with this api_name, + # providing a value for the component(s) in the 'inputs' list. + # _on_fav expects the value of fav_signal as its single argument. + # It returns updates for fav_signal (to clear it) and status_message. + fav_trigger_button.click( + _on_fav, + inputs=[fav_signal], # This tells Gradio that the API call for /fav expects ONE input, which should correspond to fav_signal's value. + outputs=[fav_signal, status_message], # These are the components _on_fav will update + api_name="fav" # <-- Exposes route /fav ) # --- Launch the Application --- if __name__ == "__main__": - if collection is None: - print("\n--- ERROR: ChromaDB collection failed to load. UI might not function correctly. Check logs. ---\n") - elif not unique_authors: - print("\n--- WARNING: No unique authors found in DB metadata. Author filter will be empty. ---\n") - - print("Launching Gradio Interface...") - # Make sure debug=True is helpful during testing - demo.launch(server_name="0.0.0.0", share=False, debug=True) \ No newline at end of file + print("\n" + "="*50) + print("--- Performing Startup Checks ---") + startup_warnings = [] + if collection is None: startup_warnings.append("--- ERROR: ChromaDB Collection could not be loaded/initialized.") + elif collection.count() == 0: startup_warnings.append("--- WARNUNG: ChromaDB Collection is empty. Search will yield no results.") + elif not unique_authors: startup_warnings.append("--- WARNUNG: No unique authors found in DB metadata (check 'author' key). Filter will be empty.") + if not API_KEY: startup_warnings.append("--- WARNUNG: GEMINI_API_KEY not found. 
Embedding/LLM features WILL FAIL.") + if API_KEY and llm_rerank_model is None: startup_warnings.append(f"--- WARNUNG: Gemini LLM Re-Rank Model ({LLM_RERANK_MODEL_NAME}) failed to initialize despite API key being present.") + if not os.path.exists(PROMPT_LOG_DIR) or not os.path.isdir(PROMPT_LOG_DIR): startup_warnings.append(f"--- WARNUNG: Prompt log directory '{PROMPT_LOG_DIR}' not found or is not a directory.") + + if startup_warnings: + print("!!! Startup Issues Found !!!") + for w in startup_warnings: print(w) + else: + print("--- Configuration checks passed successfully. ---") + + print("\n" + "--- Configuration Summary ---") + print(f"- Embedding Model: {EMBEDDING_MODEL}") + print(f"- LLM Re-Rank Model: {LLM_RERANK_MODEL_NAME}") + print(f"- Initial DB Fetch Size: {INITIAL_RESULTS_FOR_RERANK}") + print(f"- 1st Pass Re-rank Window: +/- {RERANK_WINDOW_SIZE} sentences") + print(f"- 1st Pass Re-rank Weight: {RERANK_WEIGHT:.2f}, Decay: {RERANK_DECAY:.2f}") + print(f"- LLM Candidate Count: {LLM_RERANK_CANDIDATE_COUNT}") + print(f"- LLM Target Result Count: {LLM_RERANK_TARGET_COUNT}") + print(f"- Max Results per Author (Final): {MAX_RESULTS_PER_AUTHOR}") + print(f"- Max Favourites Displayed: {MAX_FAVOURITES}") + print(f"- LLM Prompts logged to: '{PROMPT_LOG_DIR}'") + print(f"- Favourites saved to: '{FAV_FILE}'") # Log fav file location + print("--- End Summary ---") + + print("\nStarting Gradio Interface...") + print("="*50 + "\n") + + demo.launch( + server_name="0.0.0.0", + share=False, + debug=True # Keep debug=True for now to see all logs + ) \ No newline at end of file