Spaces:
Running
Running
# -*- coding: utf-8 -*- | |
# --------------------------------------------------------------------------- | |
# 0) Imports | |
# --------------------------------------------------------------------------- | |
import gradio as gr | |
import chromadb | |
import google.generativeai as genai | |
import os | |
from dotenv import load_dotenv | |
import logging | |
import functools | |
from collections import defaultdict | |
import traceback # For detailed error logging | |
import datetime # For timestamped filenames | |
import re # For parsing tangents and LLM JSON output | |
import numpy as np # For cosine similarity calculation | |
import json # For parsing LLM JSON output | |
import threading # tiny fileβlock for the JSON ledger | |
import html # escape text for clickable spans | |
import time # Useful for simple sleeps if needed for debugging timing | |
# --------------------------------------------------------------------------- | |
# --- Configuration --- | |
# Configure logging level | |
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') | |
# Load environment variables (for API Key) | |
load_dotenv() | |
API_KEY = os.getenv("GEMINI_API_KEY") | |
if not API_KEY: | |
logging.error("GEMINI_API_KEY not found in environment variables.") | |
else: | |
try: | |
genai.configure(api_key=API_KEY) | |
logging.info("Gemini API configured successfully.") | |
except Exception as e: | |
logging.error(f"Error configuring Gemini API: {e}") | |
API_KEY = None | |
# Chroma DB Configuration | |
CHROMA_DB_PATH = "./chroma" | |
COLLECTION_NAME = "phil_de" | |
# Gemini Model Configuration | |
EMBEDDING_MODEL = "models/gemini-embedding-exp-03-07" # Using standard embedding model | |
LLM_RERANK_MODEL_NAME = "models/gemini-2.0-flash" # Use a capable model for reasoning/ranking | |
logging.info(f"Using embedding model: {EMBEDDING_MODEL}") | |
logging.info(f"Using LLM Re-Rank/Truncate generation model: {LLM_RERANK_MODEL_NAME}") | |
# --- Constants --- | |
MAX_RESULTS_STANDARD = 20 # Max results shown in standard search after re-ranking | |
INITIAL_RESULTS_FOR_RERANK = 300 # How many results to fetch initially for re-ranking passes | |
RERANK_WINDOW_SIZE = 2 # +/- N sentences to consider for contextual re-ranking (both passes) | |
MIN_CHARS_FOR_RELEVANT_NEIGHBOR = 6 # Minimum characters for a neighbor to contribute to the re-rank score | |
RERANK_WEIGHT = 0.5 # Weight factor for neighbor similarity in 1st pass re-rank score | |
RERANK_DECAY = 0.1 # Score decay per sentence distance in 1st pass re-rank | |
LLM_RERANK_CANDIDATE_COUNT = 25 # How many candidates (after 1st pass re-rank) to send to LLM | |
LLM_RERANK_TARGET_COUNT = 10 # How many final edited results to request from LLM | |
PROMPT_LOG_DIR = "./prompts" # Directory to save LLM prompts for debugging | |
MAX_RESULTS_PER_AUTHOR = 3 # NEW: Max results from a single author in the final list | |
MAX_FAVOURITES = 50 # Max favourites to load for display | |
# --- Constants for Highlighting --- | |
HIGHLIGHT_HUE = 60 # Yellowish hue | |
HIGHLIGHT_SATURATION = 100 | |
HIGHLIGHT_LIGHTNESS = 90 | |
HIGHLIGHT_MAX_ALPHA = 0.5 # Max transparency (0 = transparent, 1 = opaque) | |
HIGHLIGHT_MIN_ALPHA = 0.05 # Minimum alpha for sentences at the threshold (when max > threshold) | |
HIGHLIGHT_SIMILARITY_THRESHOLD = 0.6 # Minimum cosine similarity score to apply highlighting | |
# βββ FAVOURITES CONFIG ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # always absolute | |
FAV_FILE = os.path.join(BASE_DIR, "favourites.json") # ./favourites.json | |
_fav_lock = threading.Lock() # fileβwrite lock | |
# --- Define Prompt for LLM Re-ranking V3 --- | |
LLM_RERANKING_PROMPT_TEMPLATE_V3 = """ | |
**Task:** Evaluate, truncate, and re-rank the provided text passages based on their relevance to the user's query. Return exactly the top {target_count} most relevant results, including their original IDs, the edited text, and a brief rationale for each selection. | |
**User Query:** | |
"{user_query}" | |
**Text Passages to Evaluate:** | |
{passage_blocks_str} | |
--- END OF PASSAGES --- | |
**Instructions:** | |
1. **Analyze Query:** Understand the core question or theme of the User Query. | |
2. **Evaluate Each Passage:** For each text passage provided above (identified by "Passage ID:" and separated by '--- PASSAGE SEPARATOR ---'): | |
* Read the entire passage carefully. | |
* Identify the most relevant contiguous sentences within the passage that directly address or best illuminate the User Query. | |
* **Truncate/Edit:** Extract ONLY the most relevant segment. Discard the rest of the passage. The goal is a concise, highly relevant excerpt. If an entire passage seems irrelevant, discard it entirely. | |
* **Rationale Generation:** Briefly explain *why* the segment you extracted is relevant to the User Query. | |
3. **Rank Edited Passages:** Based on the relevance of the *edited/truncated* segments you created, determine a final ranking. The most relevant edited segment should be ranked first. | |
4. **Select Top Results:** Choose exactly the top {target_count} most relevant edited passages from your ranking. If fewer than {target_count} passages were deemed relevant at all, return only those that were. | |
5. **Output:** Provide *only* a JSON formatted list containing exactly the top {target_count} (or fewer, if not enough were relevant) results. Each result object in the list MUST contain: | |
* `"original_id"`: The ID of the passage the text came from. | |
* `"edited_text"`: The concise, truncated text segment you extracted. | |
* `"rationale"`: Your brief explanation of why this segment is relevant. | |
The list should be sorted from most relevant to least relevant. | |
**Required JSON Output Format:** | |
```json | |
{{ | |
"ranked_edited_passages": [ | |
{{ | |
"original_id": "...", | |
"edited_text": "...", | |
"rationale": "..." | |
}}, | |
{{ | |
"original_id": "...", | |
"edited_text": "...", | |
"rationale": "..." | |
}} | |
] | |
}} | |
``` | |
**Final Output (JSON list of objects):** | |
```json | |
""" | |
# --- ChromaDB Connection and Author Fetching --- | |
collection = None | |
unique_authors = [] | |
try: | |
os.makedirs(PROMPT_LOG_DIR, exist_ok=True) | |
logging.info(f"Prompt log directory ensured at: {PROMPT_LOG_DIR}") | |
client = chromadb.PersistentClient(path=CHROMA_DB_PATH) | |
collection = client.get_or_create_collection(name=COLLECTION_NAME) | |
logging.info(f"Successfully connected to ChromaDB collection '{COLLECTION_NAME}'. Collection count: {collection.count()}") | |
logging.info("Fetching all metadata to extract unique authors...") | |
if collection.count() > 0: | |
all_metadata = collection.get(include=['metadatas']) | |
if all_metadata and 'metadatas' in all_metadata and all_metadata['metadatas']: | |
authors_set = set() | |
for meta in all_metadata['metadatas']: | |
if isinstance(meta, dict) and meta.get('author'): | |
authors_set.add(meta['author']) | |
unique_authors = sorted(list(authors_set)) | |
logging.info(f"Found {len(unique_authors)} unique authors.") | |
else: | |
logging.warning("Could not retrieve metadata or no metadata found to extract authors.") | |
else: | |
logging.warning(f"Collection '{COLLECTION_NAME}' is empty. No authors to fetch.") | |
except Exception as e: | |
logging.critical(f"FATAL: Could not connect to Chroma DB, fetch authors, or setup prompt dir: {e}", exc_info=True) | |
unique_authors = [] # Ensure it's an empty list on error | |
# --- Gemini Generation Model Initialization --- | |
llm_rerank_model = None | |
if API_KEY: | |
try: | |
llm_rerank_model = genai.GenerativeModel(LLM_RERANK_MODEL_NAME) | |
logging.info(f"Gemini LLM Re-Rank Model '{LLM_RERANK_MODEL_NAME}' initialized.") | |
except Exception as e: | |
logging.error(f"Error initializing Gemini LLM Re-Rank Model '{LLM_RERANK_MODEL_NAME}': {e}") | |
# --- Embedding Function --- | |
def get_embedding(text, task="RETRIEVAL_QUERY"): | |
"""Generates an embedding for the given text using the configured Gemini model.""" | |
if not API_KEY: | |
logging.error("Cannot generate embedding: API key not configured.") | |
return None | |
if not text or not isinstance(text, str) or not text.strip(): | |
return None | |
valid_task_types = {"RETRIEVAL_QUERY", "RETRIEVAL_DOCUMENT", "SEMANTIC_SIMILARITY", "CLASSIFICATION", "CLUSTERING"} | |
if task not in valid_task_types: | |
logging.warning(f"Invalid task type '{task}' for embedding model. Defaulting to 'RETRIEVAL_QUERY'.") | |
task = "RETRIEVAL_QUERY" | |
try: | |
logging.debug(f"Requesting embedding for text: '{text[:50]}...' with task: {task}") | |
result = genai.embed_content(model=EMBEDDING_MODEL, content=text, task_type=task) | |
embedding = result.get('embedding') | |
if embedding: | |
logging.debug(f"Embedding received. Type: {type(embedding)}, Length (if list): {len(embedding) if isinstance(embedding, list) else 'N/A'}") | |
else: | |
logging.warning("Gemini API returned result without 'embedding' key.") | |
return embedding | |
except Exception as e: | |
logging.error(f"Error generating Gemini embedding for '{text[:50]}...': {e}", exc_info=True) | |
if "resource has been exhausted" in str(e).lower(): | |
logging.error("Embedding failed likely due to quota exhaustion.") | |
elif "api key not valid" in str(e).lower(): | |
logging.error("Embedding failed due to invalid API key.") | |
return None | |
# --- Helper: Fetch Embeddings for Neighbor IDs --- | |
def fetch_embeddings_for_ids(ids_to_fetch_tuple): | |
"""Fetches embeddings for a tuple of passage IDs from ChromaDB.""" | |
if collection is None or not ids_to_fetch_tuple: | |
return {} | |
valid_ids = [str(id_val) for id_val in ids_to_fetch_tuple if id_val is not None] | |
if not valid_ids: | |
return {} | |
embeddings_map = {} | |
try: | |
logging.debug(f"Fetching embeddings for {len(valid_ids)} neighbor IDs.") | |
results = collection.get(ids=valid_ids, include=['embeddings']) | |
ids_list = results.get('ids') | |
embeddings_list = results.get('embeddings') | |
if ids_list is not None and embeddings_list is not None and len(ids_list) == len(embeddings_list): | |
for i, fetched_id in enumerate(ids_list): | |
if embeddings_list[i] is not None: | |
embeddings_map[fetched_id] = embeddings_list[i] | |
else: | |
logging.warning(f"Embedding for neighbor ID {fetched_id} was None in DB result.") | |
elif ids_list is not None or embeddings_list is not None: | |
logging.error(f"Mismatch/Incomplete fetch for neighbor embeddings. Fetched IDs: {len(ids_list) if ids_list is not None else 'None'}, Embeddings: {len(embeddings_list) if embeddings_list is not None else 'None'} for {len(valid_ids)} requested IDs.") | |
except Exception as e: | |
logging.error(f"Error fetching neighbor embeddings for IDs {valid_ids}: {e}", exc_info=True) | |
return embeddings_map | |
# --- Helper: Fetch all sentences for a specific paragraph --- | |
def fetch_paragraph_data(author, book, paragraph_index): | |
"""Fetches all sentence data (doc, meta, embedding) for a specific paragraph.""" | |
logging.debug(f"Attempting fetch_paragraph_data: Author='{author}', Book='{book}', ParaIdx={paragraph_index}") | |
if collection is None or author is None or book is None or paragraph_index is None or paragraph_index < 0: | |
logging.warning(f"fetch_paragraph_data: Invalid arguments provided.") | |
return [] | |
try: | |
paragraph_index_int = int(paragraph_index) # Ensure integer for query | |
results = collection.get( | |
where={"$and": [{"author": author}, {"book": book}, {"paragraph_index": paragraph_index_int}]}, | |
include=['documents', 'metadatas', 'embeddings'] # Crucial: include embeddings for highlighting | |
) | |
if not results or not results.get('ids'): | |
logging.debug(f"No sentences found for Author='{author}', Book='{book}', ParagraphIndex={paragraph_index_int}") | |
return [] | |
paragraph_sentences = [] | |
num_results = len(results['ids']) | |
documents_list = results.get('documents', []) | |
metadatas_list = results.get('metadatas', []) | |
embeddings_list = results.get('embeddings', []) | |
if not (num_results == len(documents_list) == len(metadatas_list) == len(embeddings_list)): | |
logging.warning(f"fetch_paragraph_data: Length mismatch in results for {author}/{book}/P{paragraph_index_int}. IDs:{num_results}, Docs:{len(documents_list)}, Metas:{len(metadatas_list)}, Embs:{len(embeddings_list)}. Clamping to minimum.") | |
num_results = min(num_results, len(documents_list), len(metadatas_list), len(embeddings_list)) | |
for i in range(num_results): | |
sent_id = results['ids'][i] | |
meta = metadatas_list[i] | |
doc = documents_list[i] | |
emb = embeddings_list[i] # Get embedding | |
if doc is None or emb is None: # Embedding needed for highlighting | |
logging.warning(f"Skipping sentence {sent_id} in paragraph {paragraph_index_int} due to missing document or embedding.") | |
continue | |
entry = {'id': sent_id, 'doc': doc, 'meta': meta or {}, 'embedding': emb, 'paragraph_index': meta.get('paragraph_index', paragraph_index_int)} | |
try: | |
entry['sentence_sort_key'] = int(sent_id) | |
except (ValueError, TypeError): | |
entry['sentence_sort_key'] = float('inf') # Put unparsable IDs at the end | |
logging.warning(f"Could not parse sentence ID as integer for sorting: {sent_id}") | |
paragraph_sentences.append(entry) | |
paragraph_sentences.sort(key=lambda x: x.get('sentence_sort_key', float('inf'))) | |
logging.debug(f"Fetched and sorted {len(paragraph_sentences)} sentences for paragraph {paragraph_index_int}.") | |
return paragraph_sentences | |
except Exception as e: | |
logging.error(f"Error fetching paragraph data for Author='{author}', Book='{book}', ParagraphIndex={paragraph_index}: {e}", exc_info=True) | |
return [] | |
# --- Helper: Fetch Documents and Metadata for Multiple IDs --- | |
def fetch_multiple_passage_data(passage_ids): | |
"""Fetches documents and metadata for multiple passage IDs from ChromaDB.""" | |
if not passage_ids or collection is None: | |
logging.warning(f"fetch_multiple_passage_data called with no IDs or no collection.") | |
return {} | |
passage_data_map = {} | |
try: | |
str_ids = [str(pid) for pid in passage_ids if pid is not None] | |
if not str_ids: return {} | |
logging.debug(f"Fetching passage data for {len(str_ids)} IDs: {str_ids[:10]}...") | |
results = collection.get(ids=str_ids, include=['documents', 'metadatas']) | |
if results and results.get('ids'): | |
fetched_ids = results['ids'] | |
docs = results.get('documents', []) | |
metas = results.get('metadatas', []) | |
if not (len(fetched_ids) == len(docs) == len(metas)): | |
logging.error(f"Mismatch in lengths returned by collection.get for multiple IDs: {len(fetched_ids)} IDs, {len(docs)} docs, {len(metas)} metas. IDs requested: {str_ids}") | |
# Attempt to process based on shortest list? For now, proceed cautiously. | |
id_to_index = {fid: i for i, fid in enumerate(fetched_ids)} | |
# num_fetched = len(fetched_ids) # Unused after refactor | |
for req_id in str_ids: | |
if req_id in id_to_index: | |
idx = id_to_index[req_id] | |
# Check index bounds against potentially mismatched lists | |
doc = docs[idx] if idx < len(docs) and docs[idx] is not None else "_Text fehlt_" | |
meta = metas[idx] if idx < len(metas) and metas[idx] is not None else {} | |
passage_data_map[req_id] = {'doc': doc, 'meta': meta} | |
if doc == "_Text fehlt_": logging.warning(f"Missing document for fetched ID: {req_id}") | |
if not meta: logging.warning(f"Missing metadata for fetched ID: {req_id}") | |
else: | |
logging.warning(f"Requested ID not found in collection.get results: {req_id}") | |
missing_ids = set(str_ids) - set(passage_data_map.keys()) | |
if missing_ids: | |
logging.warning(f"Could not find any data (doc/meta) for requested IDs: {missing_ids}") | |
else: | |
logging.warning(f"ChromaDB get returned no results or no IDs for requested list: {str_ids[:10]}...") | |
except Exception as e: | |
logging.error(f"Error fetching multiple passage data for IDs {passage_ids}: {e}", exc_info=True) | |
return passage_data_map | |
# --- Helper: Calculate Cosine Similarity --- | |
def cosine_similarity_np(vec1, vec2): | |
"""Calculates cosine similarity between two vectors using NumPy.""" | |
if vec1 is None or vec2 is None: | |
return 0.0 | |
try: | |
vec1 = np.array(vec1, dtype=np.float32) | |
vec2 = np.array(vec2, dtype=np.float32) | |
except Exception as e: | |
logging.error(f"Error converting vectors to numpy arrays for cosine similarity: {e}. vec1 type: {type(vec1)}, vec2 type: {type(vec2)}") | |
return 0.0 | |
if vec1.shape != vec2.shape: | |
if vec1.size > 0 and vec2.size > 0: | |
logging.warning(f"Cosine similarity shape mismatch: {vec1.shape} vs {vec2.shape}") | |
return 0.0 | |
if vec1.ndim == 0 or vec1.size == 0: | |
return 0.0 | |
norm1 = np.linalg.norm(vec1) | |
norm2 = np.linalg.norm(vec2) | |
if norm1 == 0 or norm2 == 0: | |
return 0.0 | |
epsilon = 1e-10 # Small value to prevent division by zero | |
similarity = np.dot(vec1, vec2) / (norm1 * norm2 + epsilon) | |
return float(np.clip(similarity, -1.0, 1.0)) | |
# --- Helper: Compare Passage Metadata --- | |
def compare_passage_metadata(meta1, meta2): | |
"""Checks if two passages share the same author, book, section, and title metadata.""" | |
if not meta1 or not meta2: return False | |
return (meta1.get('author') == meta2.get('author') and | |
meta1.get('book') == meta2.get('book') and | |
(meta1.get('section') is None and meta2.get('section') is None or meta1.get('section') == meta2.get('section')) and | |
(meta1.get('title') is None and meta2.get('title') is None or meta1.get('title') == meta2.get('title'))) | |
# --- Favourite-helpers --- | |
def _load_favs() -> dict[str, int]: | |
logging.debug(f"Attempting to load favourites from {FAV_FILE}") | |
try: | |
with open(FAV_FILE, "r", encoding="utf-8") as fh: | |
raw = json.load(fh) | |
# Ensure IDs are strings and scores are integers | |
favs = {str(k): int(v) for k, v in raw.items()} | |
logging.debug(f"Successfully loaded {len(favs)} favourites.") | |
return favs | |
except FileNotFoundError: | |
logging.debug(f"Favourites file not found at {FAV_FILE.strip()}. Starting with empty favourites.") | |
return {} | |
except Exception as e: | |
logging.error(f"Could not read {FAV_FILE}: {e}", exc_info=True) | |
return {} | |
def _save_favs() -> None: | |
logging.debug(f"Attempting to save favourites to {FAV_FILE}") | |
tmp = FAV_FILE + ".tmp" | |
try: | |
# This code is now directly executed when _save_favs() is called. | |
# It relies on the CALLER (e.g., inc_favourite) holding the lock. | |
with open(tmp, "w", encoding="utf-8") as fh: | |
logging.debug(f"Opened temp file {tmp} for writing.") | |
json.dump(favourite_scores, fh, ensure_ascii=False, indent=2) | |
logging.debug("Dumped favourites to temp file.") | |
fh.flush() | |
logging.debug("Flushed temp file.") | |
os.fsync(fh.fileno()) # Force write to disk | |
logging.debug("Synced temp file.") | |
# logging.debug(f"Closed temp file {tmp}.") # This line is now after the 'with open' block | |
os.replace(tmp, FAV_FILE) # Atomic replace | |
logging.debug(f"Successfully replaced {FAV_FILE} with temp file.") | |
logging.debug(f"Successfully saved {len(favourite_scores)} favourites.") | |
except Exception as e: | |
logging.error(f"Could not save {FAV_FILE}: {e}", exc_info=True) | |
favourite_scores: dict[str, int] = _load_favs() # Load favourites on startup | |
def inc_favourite(passage_id: str) -> int: | |
"""Add one β to a sentence, persist, return new total.""" | |
logging.info(f"Attempting to increment favourite for ID: {passage_id}") | |
if not passage_id or not isinstance(passage_id, str): | |
logging.warning(f"Invalid passage_id for inc_favourite: {passage_id}") | |
return 0 | |
with _fav_lock: | |
# Ensure ID is treated as string key | |
str_passage_id = str(passage_id) | |
favourite_scores[str_passage_id] = favourite_scores.get(str_passage_id, 0) + 1 | |
_save_favs() | |
new_score = favourite_scores[str_passage_id] | |
logging.info(f"Incremented favourite for ID {str_passage_id}. New score: {new_score}") | |
return new_score | |
def top_favourites(n: int = MAX_FAVOURITES) -> list[dict]: | |
"""Return N topβscored sentences incl. doc/meta.""" | |
logging.debug(f"Fetching top {n} favourites.") | |
if not favourite_scores: | |
logging.debug("No favourites available.") | |
return [] | |
try: | |
# Sort items, convert keys to str explicitly just in case | |
top = sorted([(str(k), v) for k, v in favourite_scores.items()], key=lambda kv: kv[1], reverse=True)[:n] | |
ids = [sid for sid, _ in top] | |
logging.debug(f"Top {len(top)} favourite IDs: {ids}") | |
data = fetch_multiple_passage_data(ids) # Fetch document and metadata | |
logging.debug(f"Fetched data for {len(data)} favourite IDs.") | |
results = [] | |
for sid, score in top: | |
if sid not in data: | |
logging.warning(f"Could not retrieve data for favourite ID {sid}. Skipping.") | |
continue | |
entry = { | |
"id": sid, # The ID | |
"document": data[sid]["doc"], # The text | |
"metadata": data[sid]["meta"], # The metadata | |
"distance": 0.0, # Favourites don't have a semantic distance in this view | |
"favourite_score": score, # The favourite score | |
} | |
results.append(entry) | |
logging.debug(f"Prepared {len(results)} top favourite results.") | |
return results | |
except Exception as e: | |
logging.error(f"Error fetching top favourites: {e}", exc_info=True) | |
return [] | |
# --- Combined Formatting Function for all result types (Standard, LLM, Favourites) --- | |
def format_result_display(result_data, index, total_results, result_type): | |
"""Formats a single search, LLM, or favourite result for Accordion/Textbox display.""" | |
if not result_data or not isinstance(result_data, dict): | |
# Return empty strings for both parts on error | |
return "Keine Ergebnisdaten verfΓΌgbar.", "" | |
metadata = result_data.get('metadata', {}) | |
# Determine what text to display and its label | |
# Favourites might have 'document', Standard/LLM might have 'context_block' or 'edited_text' | |
display_text = result_data.get('edited_text', result_data.get('context_block', result_data.get('document', "_Text fehlt_"))) | |
# Determine what ID label to use | |
# Prioritize original_id (LLM), then id (standard search/context/fav), then fallback | |
result_id = result_data.get('original_id', result_data.get('id', 'N/A')) | |
# --- Construct the Accordion Heading --- | |
accordion_title = "" | |
if result_type == "llm": | |
accordion_title = f"Gedanke {index + 1} von {total_results}" | |
elif result_type == "standard": | |
accordion_title = f"Gedanke {index + 1} von {total_results}" | |
elif result_type == "favourites": | |
score = result_data.get('favourite_score', 0) | |
accordion_title = f"β{score}" # Title is just the star score | |
# --- Construct the Accordion Content (Metadata & Scores) --- | |
accordion_content_md = "" | |
score_info_lines = [] | |
# Favourite score is already in title for favs, only show for standard/LLM if present | |
if 'favourite_score' in result_data and result_data['favourite_score'] is not None: | |
if result_type != "favourites": | |
score_info_lines.append(f"* β Score: {result_data['favourite_score']}") | |
if 'final_similarity' in result_data and result_data['final_similarity'] is not None: | |
score_info_lines.append(f"* Score (Kontext-Gewichtet): {result_data['final_similarity']:.4f}") | |
score_info = "\n".join(score_info_lines) + "\n\n" if score_info_lines else "\n" | |
author = metadata.get('author', 'N/A') | |
book = metadata.get('book', 'N/A') | |
section = metadata.get('section', None) | |
titel = metadata.get('title', None) | |
accordion_content_md += f"* Autor: {author}\n* Buch: {book}\n" | |
if section and str(section).strip().lower() not in ["unknown", "n/a", ""]: | |
accordion_content_md += f"* Abschnitt: {section}\n" | |
if titel is not None and str(titel).strip().lower() not in ["unknown", "n/a", ""]: | |
try: accordion_content_md += f"* Titel/Nr: {int(titel)}\n" | |
except (ValueError, TypeError): accordion_content_md += f"* Titel/Nr: {titel}\n" | |
accordion_content_md += score_info | |
# --- ADDED: Include LLM Rationale if available and this is an LLM result --- | |
# Check for both result_type and the presence of the 'rationale' key | |
if result_type == "llm" and 'rationale' in result_data and result_data['rationale']: | |
accordion_content_md += f"**LLM BegrΓΌndung:**\n> {result_data['rationale']}\n\n" | |
# --- END ADDED --- | |
# The text content for the Textbox is just the display_text | |
text_content = display_text | |
# Return the two separate parts | |
return accordion_title, accordion_content_md, text_content | |
# --- Contextual Re-ranking Function (V4) --- | |
def rerank_with_context(candidates, original_query_embedding, target_n_results, weight, decay_factor, window_size, min_chars_neighbor): | |
""" | |
Re-ranks candidate passages based on context similarity to the query, | |
normalizing initial and context scores, combining them additively, | |
selecting the best-scoring representative for each unique central ID, | |
and finally applying an author quota for diversity. | |
""" | |
logging.info(f"Starting contextual re-ranking (V4: Norm+DeDup+Quota) for {len(candidates)} candidates... " | |
f"(Win={window_size}, Weight={weight:.2f}, Decay={decay_factor:.2f}, MinChars={min_chars_neighbor}, AuthQuota={MAX_RESULTS_PER_AUTHOR})") | |
if not candidates or original_query_embedding is None: | |
logging.warning("rerank_with_context called with no candidates or no query embedding.") | |
return candidates[:target_n_results] if candidates else [] | |
# --- Phase 1: Calculate Initial Similarities and Find Range --- | |
initial_similarities = [] | |
processed_candidates_phase1 = [] | |
logging.debug("Phase 1: Calculating initial similarities...") | |
for i, candidate in enumerate(candidates): | |
initial_distance = candidate.get('distance') | |
if initial_distance is None or not isinstance(initial_distance, (float, int)) or initial_distance < 0: initial_similarity = 0.0 | |
else: initial_similarity = max(0.0, 1.0 - float(initial_distance)) # Convert distance to similarity (lower distance = higher similarity) | |
candidate['initial_similarity'] = initial_similarity | |
initial_similarities.append(initial_similarity) | |
processed_candidates_phase1.append(candidate) | |
min_initial_sim = min(initial_similarities) if initial_similarities else 0.0 | |
max_initial_sim = max(initial_similarities) if initial_similarities else 0.0 | |
logging.debug(f"Initial Similarity Range: Min={min_initial_sim:.4f}, Max={max_initial_sim:.4f}") | |
# --- Phase 2: Calculate Combined Neighbor Similarities --- | |
passage_data_map = {str(cand['id']): {'doc': cand.get('document'), 'meta': cand.get('metadata', {})} for cand in processed_candidates_phase1} | |
neighbor_embeddings_cache = {} | |
all_neighbor_ids_to_fetch = set() | |
candidate_neighbor_map = defaultdict(lambda: {'prev': [], 'next': []}) | |
potential_neighbor_distances = {} | |
# Pass 2.1: Identify neighbors | |
for candidate in processed_candidates_phase1: | |
try: | |
center_id_str = str(candidate['id']) | |
center_id_int = int(center_id_str) | |
potential_neighbor_distances[center_id_str] = {} | |
for dist in range(1, window_size + 1): | |
prev_id_int, next_id_int = center_id_int - dist, center_id_int + dist | |
if prev_id_int >= 0: | |
prev_id_str = str(prev_id_int); all_neighbor_ids_to_fetch.add(prev_id_str); candidate_neighbor_map[center_id_str]['prev'].append(prev_id_str); potential_neighbor_distances[center_id_str][prev_id_str] = dist | |
next_id_str = str(next_id_int); all_neighbor_ids_to_fetch.add(next_id_str); candidate_neighbor_map[center_id_str]['next'].append(next_id_str); potential_neighbor_distances[center_id_str][next_id_str] = dist | |
candidate_neighbor_map[center_id_str]['prev'].sort(key=int, reverse=True) | |
candidate_neighbor_map[center_id_str]['next'].sort(key=int) | |
except (ValueError, TypeError): | |
logging.warning(f"Could not parse candidate ID {candidate.get('id')} as integer for neighbor finding.") | |
continue | |
# Pass 2.2: Fetch neighbor data (embeddings, docs, metas) | |
ids_needed_for_fetch = list(all_neighbor_ids_to_fetch) | |
if ids_needed_for_fetch: | |
fetched_embeddings = fetch_embeddings_for_ids(tuple(ids_needed_for_fetch)); neighbor_embeddings_cache.update(fetched_embeddings) | |
ids_to_fetch_docs_meta = [nid for nid in ids_needed_for_fetch if nid not in passage_data_map] | |
if ids_to_fetch_docs_meta: | |
fetched_neighbor_docs_meta = fetch_multiple_passage_data(ids_to_fetch_docs_meta); passage_data_map.update(fetched_neighbor_docs_meta) | |
# Pass 2.3: Calculate combined similarity per candidate and construct context block | |
combined_neighbor_similarities = [] | |
scored_candidates = [] | |
logging.debug("Phase 2: Calculating combined neighbor similarities and constructing context blocks...") | |
for candidate in processed_candidates_phase1: | |
try: | |
center_id_str = str(candidate['id']) | |
center_meta = candidate.get('metadata', {}) | |
total_weighted_similarity = 0.0 | |
total_weight = 0.0 | |
candidate_neighbors_dist = potential_neighbor_distances.get(center_id_str, {}) | |
# Calculate weighted neighbor similarity | |
for neighbor_id_str, dist_level in candidate_neighbors_dist.items(): | |
neighbor_emb = neighbor_embeddings_cache.get(neighbor_id_str) | |
neighbor_data = passage_data_map.get(neighbor_id_str) | |
if neighbor_emb is not None and neighbor_data: | |
neighbor_meta = neighbor_data.get('meta') | |
neighbor_doc = neighbor_data.get('doc') | |
if (neighbor_meta is not None and compare_passage_metadata(center_meta, neighbor_meta) | |
and neighbor_doc and isinstance(neighbor_doc, str) and len(neighbor_doc) >= min_chars_neighbor): | |
neighbor_sim_to_query = cosine_similarity_np(original_query_embedding, neighbor_emb) | |
current_decay = max(0.0, 1.0 - ((dist_level - 1) * decay_factor)) | |
current_weight = current_decay # Weight by decayed distance | |
total_weighted_similarity += neighbor_sim_to_query * current_weight | |
total_weight += current_weight | |
combined_sim = total_weighted_similarity / total_weight if total_weight > 0 else 0.0 | |
candidate['combined_neighbor_similarity'] = combined_sim | |
combined_neighbor_similarities.append(combined_sim) | |
# Construct context block for this candidate using ALL neighbors (even short ones) | |
context_block_text = _construct_passage_block(center_id_str, passage_data_map, candidate_neighbor_map) | |
candidate['context_block'] = context_block_text | |
scored_candidates.append(candidate) | |
except Exception as e: | |
logging.error(f"Error processing candidate ID {candidate.get('id')} during neighbor scoring/context block: {e}", exc_info=True) | |
candidate['combined_neighbor_similarity'] = 0.0 | |
combined_neighbor_similarities.append(0.0) | |
candidate['context_block'] = "_Fehler bei Kontext-Erstellung_" | |
scored_candidates.append(candidate) | |
# --- Phase 3: Find Context Score Range --- | |
min_combined_sim = min(combined_neighbor_similarities) if combined_neighbor_similarities else 0.0 | |
max_combined_sim = max(combined_neighbor_similarities) if combined_neighbor_similarities else 0.0 | |
logging.debug(f"Combined Neighbor Similarity Range: Min={min_combined_sim:.4f}, Max={max_combined_sim:.4f}") | |
# --- Phase 4: Normalize and Combine Scores --- | |
logging.debug("Phase 4: Normalizing and combining scores...") | |
initial_range = max_initial_sim - min_initial_sim | |
combined_range = max_combined_sim - min_combined_sim | |
for candidate in scored_candidates: | |
try: | |
initial_sim = candidate.get('initial_similarity', 0.0) | |
combined_sim = candidate.get('combined_neighbor_similarity', 0.0) | |
initial_norm = 0.5 # Default to 0.5 if range is zero | |
if initial_range > 1e-9: | |
initial_norm = max(0.0, min(1.0, (initial_sim - min_initial_sim) / initial_range)) | |
combined_norm = 0.5 # Default to 0.5 if range is zero | |
if combined_range > 1e-9: | |
combined_norm = max(0.0, min(1.0, (combined_sim - min_combined_sim) / combined_range)) | |
# Additive combination based on weight | |
final_similarity = (1.0 - weight) * initial_norm + weight * combined_norm | |
candidate['final_similarity'] = final_similarity | |
# logging.debug(f"Candidate ID {candidate.get('id')}: Initial Norm={initial_norm:.4f}, Combined Norm={combined_norm:.4f}, Final Score={final_similarity:.4f}") | |
except Exception as e: | |
logging.error(f"Error calculating final similarity for candidate ID {candidate.get('id')}: {e}", exc_info=True) | |
candidate['final_similarity'] = -1.0 # Penalize on error | |
# --- Phase 5: Group by ID and Select Best Representative --- | |
logging.debug("Phase 5: Grouping by ID and selecting best representative...") | |
best_candidate_by_id = {} | |
for candidate in scored_candidates: | |
center_id = candidate.get('id') | |
current_score = candidate.get('final_similarity', -1.0) | |
if not center_id: | |
logging.warning(f"Skipping candidate with missing ID: {candidate}") | |
continue | |
existing_candidate = best_candidate_by_id.get(center_id) | |
# Keep the candidate with the highest final_similarity for each unique ID | |
if not existing_candidate or current_score > existing_candidate.get('final_similarity', -1.0): | |
best_candidate_by_id[center_id] = candidate | |
unique_best_candidates = list(best_candidate_by_id.values()) | |
logging.info(f"Reduced {len(scored_candidates)} candidates to {len(unique_best_candidates)} unique ID representatives.") | |
# --- Phase 6: Sort Unique Representatives --- | |
unique_best_candidates.sort(key=lambda x: x.get('final_similarity', -1.0), reverse=True) | |
logging.debug(f"Sorted {len(unique_best_candidates)} unique representatives by score.") | |
# --- Phase 7: Apply Author Quota --- | |
logging.debug(f"Phase 7: Applying author quota (max {MAX_RESULTS_PER_AUTHOR} per author)...") | |
author_counts = defaultdict(int) | |
final_diverse_results = [] | |
authors_seen_in_final = set() | |
for candidate in unique_best_candidates: | |
# Stop if we already have enough results | |
if len(final_diverse_results) >= target_n_results: | |
logging.debug(f"Reached target result count {target_n_results}. Stopping quota application.") | |
break | |
meta = candidate.get('metadata', {}) | |
# Use author 'Unknown' if metadata or author key is missing | |
author = meta.get('author', 'Unknown') | |
if author_counts[author] < MAX_RESULTS_PER_AUTHOR: | |
final_diverse_results.append(candidate) | |
author_counts[author] += 1 | |
authors_seen_in_final.add(author) | |
# logging.debug(f"Added candidate ID {candidate.get('id')} from author '{author}'. Count: {author_counts[author]}") | |
# else: | |
# logging.debug(f"Skipping candidate ID {candidate.get('id')} from author '{author}' due to quota ({author_counts[author]}).") | |
logging.info(f"Quota applied. Selected {len(final_diverse_results)} results from {len(authors_seen_in_final)} unique authors.") | |
# Return the quota-filtered list | |
return final_diverse_results # No need to slice again, loop breaks at target_n_results | |
# --- Modified Format Context for Reading Area (Revision 6 - HTML Output) --- | |
def format_context_markdown(passages_state_list, query_embedding): | |
"""Formats a list of paragraph sentences for HTML display with dynamic highlighting. | |
Uses class/data-id for JS event listeners.""" | |
logging.info(f"Formatting context HTML for {len(passages_state_list)} passages.") | |
# --- Validate Query Embedding (same) --- | |
is_query_embedding_valid = False | |
query_embedding_np = None | |
if isinstance(query_embedding, (list, np.ndarray)): | |
try: | |
query_embedding_np = np.array(query_embedding, dtype=np.float32) | |
if query_embedding_np.ndim == 1 and query_embedding_np.size > 0: | |
is_query_embedding_valid = True | |
logging.debug(f"Query embedding is valid (Shape: {query_embedding_np.shape}). Highlighting enabled.") | |
else: logging.warning("Query embedding received but is empty or has wrong dimensions. Highlighting disabled.") | |
except Exception as e: | |
logging.error(f"Error converting or checking query embedding: {e}. Highlighting disabled.") | |
else: logging.warning(f"Query embedding is type {type(query_embedding)}. Highlighting disabled.") | |
if not passages_state_list: | |
return "<div>_Kein Kontext zum Anzeigen._</div>" # Return valid HTML | |
# --- Step 1: Calculate all similarities and find relevant range (same) --- | |
sentence_similarities = {} | |
scores_above_threshold = [] | |
if is_query_embedding_valid: | |
logging.debug("Calculating similarities for dynamic highlighting...") | |
for i, sentence_data in enumerate(passages_state_list): | |
sentence_embedding = sentence_data.get('embedding') | |
sentence_id = sentence_data.get('id', f'index_{i}') # Use index if ID missing | |
sentence_role = sentence_data.get('role', 'context') | |
# Skip markers or sentences without embeddings | |
if sentence_role == 'missing' or sentence_embedding is None: | |
continue | |
try: | |
similarity_score = cosine_similarity_np(query_embedding_np, sentence_embedding) | |
sentence_similarities[i] = similarity_score # Store score by index | |
if similarity_score >= HIGHLIGHT_SIMILARITY_THRESHOLD: | |
scores_above_threshold.append(similarity_score) | |
except Exception as e: | |
logging.warning(f"Error calculating similarity for sentence ID {sentence_id} (Index {i}): {e}") | |
max_relevant_score = -1.0 | |
min_relevant_score = HIGHLIGHT_SIMILARITY_THRESHOLD | |
if scores_above_threshold: | |
max_relevant_score = max(scores_above_threshold) | |
logging.debug(f"Dynamic Highlighting: Min Relevant Score (Threshold) = {min_relevant_score:.4f}, Max Relevant Score = {max_relevant_score:.4f}") | |
else: | |
logging.debug("Dynamic Highlighting: No sentences met the similarity threshold.") | |
# --- Step 2: Format output as HTML --- | |
# Ensure passages are sorted correctly | |
passages_state_list.sort(key=lambda x: (x.get('paragraph_index', -1), x.get('sentence_sort_key', float('inf')))) | |
output_parts = [] | |
current_paragraph_index = None | |
previous_section = "__INITIAL_NONE__" | |
previous_title = "__INITIAL_NONE__" | |
is_first_paragraph_overall = True | |
PLACEHOLDERS_TO_IGNORE = {"unknown", "n/a", "", None} | |
is_paragraph_open = False # Track if we need to close a <p> tag | |
for i, sentence_data in enumerate(passages_state_list): | |
sentence_doc = sentence_data.get('doc', '_Text fehlt_') | |
sentence_meta = sentence_data.get('meta', {}) | |
sentence_para_idx = sentence_data.get('paragraph_index') | |
sentence_role = sentence_data.get('role', 'context') | |
sentence_id = sentence_data.get('id', f'index_{i}') | |
# --- Handle boundary markers (as HTML) --- | |
if sentence_role == 'missing': | |
if is_paragraph_open: | |
output_parts.append("</p>\n") # Close previous paragraph | |
is_paragraph_open = False | |
output_parts.append(f"<p><em>{html.escape(sentence_doc)}</em></p>\n") # Use <em> for italics | |
current_paragraph_index = None | |
is_first_paragraph_overall = True | |
# No need for extra newlines between markers in HTML, <p> handles blocks | |
# if i < len(passages_state_list) - 1: output_parts.append("<br><br>") # Optional: explicit vertical space | |
continue | |
# --- Check for Paragraph Start and Handle Headings/Separators (as HTML) --- | |
is_new_paragraph = (sentence_para_idx is not None and sentence_para_idx != current_paragraph_index) | |
if is_new_paragraph: | |
if is_paragraph_open: | |
output_parts.append("</p>\n") # Close previous paragraph | |
is_paragraph_open = False | |
current_section = sentence_meta.get('section') | |
current_title = sentence_meta.get('title') | |
norm_prev_section = None if str(previous_section).strip().lower() in PLACEHOLDERS_TO_IGNORE else previous_section | |
norm_prev_title = None if str(previous_title).strip().lower() in PLACEHOLDERS_TO_IGNORE else previous_title | |
norm_curr_section = None if str(current_section).strip().lower() in PLACEHOLDERS_TO_IGNORE else current_section | |
norm_curr_title = None if str(current_title).strip().lower() in PLACEHOLDERS_TO_IGNORE else current_title | |
section_changed = (norm_curr_section != norm_prev_section) | |
title_changed = (norm_curr_title != norm_prev_title) | |
# --- REMOVED/COMMENTED OUT: This is where the <hr> was added --- | |
# if not is_first_paragraph_overall: | |
# if section_changed or title_changed: | |
# output_parts.append("<hr>\n") # Use <hr> for separator | |
# --- END REMOVED/COMMENTED OUT --- | |
heading_parts_to_add = [] | |
if section_changed and norm_curr_section is not None: | |
heading_parts_to_add.append(f"<h3>{html.escape(str(norm_curr_section))}</h3>\n") # Use <h3> | |
if title_changed and norm_curr_title is not None: | |
title_str = str(norm_curr_title).strip() | |
title_display = html.escape(title_str) | |
try: title_display = html.escape(str(int(title_str))) # Attempt int cast if relevant | |
except (ValueError, TypeError): pass # Keep string if not int | |
heading_parts_to_add.append(f"<h4>{title_display}</h4>\n") # Use <h4> | |
if heading_parts_to_add: | |
output_parts.extend(heading_parts_to_add) | |
output_parts.append("<p>") # Open new paragraph tag | |
is_paragraph_open = True | |
previous_section = current_section | |
previous_title = current_title | |
current_paragraph_index = sentence_para_idx | |
is_first_paragraph_overall = False | |
elif not is_paragraph_open: | |
# Handle case where first item is not a paragraph start marker | |
output_parts.append("<p>") | |
is_paragraph_open = True | |
# --- Sentence Formatting and DYNAMIC Highlighting (as HTML Spans) --- | |
# Build attributes for the SINGLE span element | |
span_classes = ["clickable-sentence"] | |
# Use inline style for cursor:pointer for simplicity, although CSS is also fine | |
# style_parts = ["cursor:pointer;"] # <-- Moved cursor to CSS | |
style_parts = [] | |
safe_doc = html.escape(sentence_doc) | |
current_score = sentence_similarities.get(i) | |
# Determine if highlighting should be applied | |
apply_highlight = is_query_embedding_valid and current_score is not None and current_score >= min_relevant_score | |
alpha = 0.0 | |
if apply_highlight: | |
try: | |
if max_relevant_score > min_relevant_score: | |
normalized_score = (current_score - min_relevant_score) / (max_relevant_score - min_relevant_score) | |
alpha = HIGHLIGHT_MIN_ALPHA + normalized_score * (HIGHLIGHT_MAX_ALPHA - HIGHLIGHT_MIN_ALPHA) | |
alpha = max(HIGHLIGHT_MIN_ALPHA, min(alpha, HIGHLIGHT_MAX_ALPHA)) | |
elif max_relevant_score == min_relevant_score: | |
alpha = HIGHLIGHT_MIN_ALPHA | |
except Exception as e: | |
logging.warning(f"Error calculating dynamic highlighting alpha for sentence ID {sentence_id}: {e}") | |
alpha = 0.0 # Disable highlighting on error | |
# Apply highlighting by adding the class and style properties (including the CSS variable) | |
if alpha > 0: | |
span_classes.append("highlighted") | |
# Add dynamic styles (padding, border-radius, box-decoration-break) to style_parts | |
style_parts.append("padding: 1px 3px;") | |
style_parts.append("border-radius: 3px;") | |
style_parts.append("box-decoration-break: clone;") | |
style_parts.append("-webkit-box-decoration-break: clone;") | |
# Set the CSS variable for the alpha | |
style_parts.append(f"--highlight-alpha: {alpha:.2f};") | |
# DO NOT set background-color here - it's set in CSS using the variable | |
# Join the classes and styles | |
class_str = " ".join(span_classes) | |
style_str = " ".join(style_parts) | |
# Construct the single span element | |
# ADDED cursor: pointer to CSS, removed from inline style below | |
formatted_sentence = ( | |
f'<span class="{class_str}" data-id="{sentence_id}" style="{style_str}">' | |
f"{safe_doc}</span>" | |
) | |
# --- Append Formatted Sentence with Spacing (handle HTML spaces) --- | |
# Add a space before if not the first sentence in the paragraph | |
if not is_new_paragraph and is_paragraph_open and i > 0 and passages_state_list[i-1].get('role') != 'missing' and sentence_role != 'missing': | |
# Find the previous non-missing sentence to check if it was the end of a paragraph block | |
prev_valid_sentence_index = i - 1 | |
while prev_valid_sentence_index >= 0 and passages_state_list[prev_valid_sentence_index].get('role') == 'missing': | |
prev_valid_sentence_index -= 1 | |
# Add a space unless the previous element was a heading, hr, or paragraph open tag | |
# This check is implicitly handled by the is_new_paragraph logic and checking if is_paragraph_open. | |
# If it's not a new paragraph and the paragraph is open, we generally want a space. | |
if prev_valid_sentence_index >= 0 and passages_state_list[prev_valid_sentence_index].get('paragraph_index') == sentence_para_idx: | |
output_parts.append(" ") | |
# No space needed if it's the very first item in a paragraph after a break/heading | |
output_parts.append(formatted_sentence) | |
# Close the last paragraph tag if it was opened | |
if is_paragraph_open: | |
output_parts.append("</p>\n") | |
# Wrap everything in a main div for robustness | |
return "<div>\n" + "".join(output_parts) + "</div>" | |
# --- Internal Search Helper --- | |
def _perform_single_query_search(query, where_filter, n_results): | |
"""Performs a single vector query against ChromaDB, returning processed results.""" | |
logging.info(f"Performing single query search for: '{query[:50]}...' (n_results={n_results}, filter={where_filter})") | |
if collection is None: | |
logging.error("ChromaDB collection is not available for query.") | |
raise ConnectionError("DB not available.") | |
if not query: | |
logging.error("Cannot perform search with an empty query.") | |
return [] # Return empty list for empty query | |
# Get query embedding (handles errors internally) | |
query_embedding = get_embedding(query, task="RETRIEVAL_QUERY") | |
logging.debug(f"Inside _perform_single_query_search: Generated query embedding. Type: {type(query_embedding)}, Is None: {query_embedding is None}") | |
if isinstance(query_embedding, list): logging.debug(f" Embedding length: {len(query_embedding)}") | |
if query_embedding is None: | |
# Embedding failed, cannot proceed with query | |
raise ValueError(f"Embedding generation failed for query: '{query[:50]}...'") | |
try: | |
results = collection.query( | |
query_embeddings=[query_embedding], | |
n_results=n_results, | |
where=where_filter, # Apply filter if provided | |
include=['documents', 'metadatas', 'distances'] # Fetch necessary fields | |
) | |
processed_results = [] | |
# Results structure: {'ids': [[]], 'documents': [[]], ...} | |
# Check if results and the first list within 'ids' exist and are not empty | |
if results and results.get('ids') and results['ids'] and results['ids'][0]: | |
# Extract the lists for the single query | |
ids_list = results['ids'][0] | |
docs_list = results.get('documents', [[]])[0] or [] # Use default empty list | |
metadatas_list = results.get('metadatas', [[]])[0] or [] | |
distances_list = results.get('distances', [[]])[0] or [] | |
num_found = len(ids_list) | |
# Robustness check on list lengths | |
if not (num_found == len(docs_list) == len(metadatas_list) == len(distances_list)): | |
logging.warning(f"ChromaDB result length mismatch: {num_found} IDs, {len(docs_list)} docs, {len(metadatas_list)} metas, {len(distances_list)} dists. Processing cautiously.") | |
num_found = min(num_found, len(docs_list), len(metadatas_list), len(distances_list)) | |
ids_list = ids_list[:num_found] # Truncate lists to match | |
logging.info(f"ChromaDB query returned {len(ids_list)} results.") | |
for i, res_id in enumerate(ids_list): | |
# Check bounds just in case, though clamping should prevent IndexError | |
if i >= num_found: break | |
doc = docs_list[i] if docs_list[i] is not None else "_Text fehlt_" | |
meta = metadatas_list[i] if metadatas_list[i] is not None else {} | |
dist = distances_list[i] if distances_list[i] is not None else float('inf') | |
# Basic validation | |
if res_id is None: logging.warning(f"Skipping result with None ID at index {i}"); continue | |
res_id_str = str(res_id) # Ensure ID is string | |
if doc == "_Text fehlt_": logging.warning(f"Missing document for ID {res_id_str} at index {i}") | |
if dist == float('inf'): logging.warning(f"Missing distance for ID {res_id_str} at index {i}") | |
processed_results.append({ | |
"id": res_id_str, # Store ID as string | |
"document": doc, | |
"metadata": meta, | |
"distance": dist | |
}) | |
else: | |
logging.info(f"Query '{query[:50]}...' returned no results from ChromaDB.") | |
return processed_results | |
except Exception as e: | |
logging.error(f"Error during ChromaDB query for '{query[:50]}...': {e}", exc_info=True) | |
if "dimension" in str(e).lower(): | |
logging.error("Query failed possibly due to embedding dimension mismatch.") | |
raise ValueError(f"Dimension mismatch error for query '{query[:50]}...'") | |
raise RuntimeError(f"DB search error for query '{query[:50]}...': {type(e).__name__}") | |
# --- Helper Function: Construct Passage Block --- | |
def _construct_passage_block(center_id_str, passage_data_map, candidate_neighbor_map): | |
"""Constructs a continuous text block including neighbors for a given center passage.""" | |
center_data = passage_data_map.get(center_id_str) | |
if not center_data: | |
logging.warning(f"_construct_passage_block: Missing data for center ID {center_id_str}.") | |
return "_Zentrumstext fehlt_" | |
center_meta = center_data.get('meta', {}) | |
center_text = center_data.get('doc') | |
if not center_text or center_text == "_Text fehlt_": | |
logging.warning(f"_construct_passage_block: Missing document text for center ID {center_id_str}.") | |
return "_Zentrumstext fehlt_" | |
block_text_parts = [] | |
neighbors = candidate_neighbor_map.get(center_id_str, {'prev': [], 'next': []}) | |
# Add previous neighbors (if metadata matches) - Iterate in original order (closest first) and insert at beginning | |
# Note: Sorting neighbors.get('prev', []) by int() ensures chronological order | |
for prev_id in sorted(neighbors.get('prev', []), key=int): | |
prev_data = passage_data_map.get(prev_id) | |
if prev_data and compare_passage_metadata(center_meta, prev_data.get('meta', {})): | |
prev_text = prev_data.get('doc') | |
if prev_text and prev_text != "_Text fehlt_": | |
block_text_parts.append(prev_text) # Add to the end temporarily | |
# Add the center text | |
block_text_parts.append(center_text) | |
# Add next neighbors (if metadata matches) - Iterate in original order (closest first) and append | |
for next_id in sorted(neighbors.get('next', []), key=int): | |
next_data = passage_data_map.get(next_id) | |
if next_data and compare_passage_metadata(center_meta, next_data.get('meta', {})): | |
next_text = next_data.get('doc') | |
if next_text and next_text != "_Text fehlt_": | |
block_text_parts.append(next_text) # Add to the end | |
# Join the parts into a single string for the block | |
continuous_block_text = " ".join(block_text_parts) | |
if not continuous_block_text.strip(): | |
logging.warning(f"_construct_passage_block: Constructed empty passage block for center ID {center_id_str}.") | |
return "_Leerer Kontextblock_" | |
return continuous_block_text | |
# --- Modified Core Search Logic (Standard Mode) --- | |
def perform_search_standard(query, selected_authors, window_size, weight, decay, n_results=MAX_RESULTS_STANDARD): | |
"""Performs standard search: Embed -> Query -> Re-rank -> Return results & embedding.""" | |
logging.info(f"--- Starting Standard Search --- Query: '{query[:50]}...' | Authors: {selected_authors} | Target Results: {n_results} | Window={window_size}, Weight={weight:.2f}, Decay={decay:.2f}") | |
original_query_embedding = None | |
try: | |
# Phase 1: Get Query Embedding | |
original_query_embedding = get_embedding(query, task="RETRIEVAL_QUERY") | |
if original_query_embedding is None: | |
raise ValueError("Failed to generate query embedding for standard search.") | |
# Phase 2: Build Filter | |
where_filter = None | |
if selected_authors: | |
authors_filter_list = selected_authors if isinstance(selected_authors, list) else [selected_authors] | |
authors_filter_list = [a for a in authors_filter_list if a and isinstance(a, str)] | |
if authors_filter_list: | |
where_filter = {"author": {"$in": authors_filter_list}} | |
logging.info(f"Applying author filter: {where_filter}") | |
else: | |
logging.warning("Empty or invalid author filter list provided, searching all authors.") | |
# Phase 3: Initial Search | |
logging.info(f"Fetching initial {INITIAL_RESULTS_FOR_RERANK} candidates from DB.") | |
initial_candidates = _perform_single_query_search(query, where_filter, INITIAL_RESULTS_FOR_RERANK) | |
if not initial_candidates: | |
logging.info("Standard Search: No initial results found from DB.") | |
return [], original_query_embedding | |
logging.info(f"Found {len(initial_candidates)} initial candidates. Proceeding to 1st pass re-ranking.") | |
# Phase 4: Contextual Re-ranking (1st Pass) | |
reranked_results = rerank_with_context( | |
initial_candidates, | |
original_query_embedding, | |
n_results, # Target number of final results | |
weight, # Use argument | |
decay, # Use argument | |
window_size, # Use argument | |
MIN_CHARS_FOR_RELEVANT_NEIGHBOR # Pass constant | |
) | |
logging.info(f"Standard Search: Re-ranked {len(initial_candidates)} -> Found {len(reranked_results)} final results.") | |
return reranked_results, original_query_embedding | |
except (ConnectionError, ValueError, RuntimeError) as e: | |
logging.error(f"Standard Search failed: {e}", exc_info=False) | |
return [], original_query_embedding | |
except Exception as e: | |
logging.error(f"Standard Search encountered an unexpected error: {e}", exc_info=True) | |
return [], original_query_embedding | |
# --- Search Function (Standard Mode UI Wrapper) --- | |
def search_standard_mode_ui(search_results, query_embedding): | |
"""Prepares Gradio UI updates for the Standard Search results.""" | |
logging.info("Preparing UI updates for Standard Search results.") | |
updates = create_reset_updates() # Start with a clean reset state dictionary | |
# Store the received embedding (if valid) in the state used for context highlighting | |
if query_embedding is not None: | |
updates[direct_embedding_output_holder] = query_embedding | |
logging.debug("Stored valid query embedding in direct_embedding_output_holder for standard mode.") | |
else: | |
updates[direct_embedding_output_holder] = None | |
logging.warning("Query embedding was None, stored None in direct_embedding_output_holder for standard mode.") | |
if not search_results: | |
logging.info("No standard search results found to display.") | |
# MODIFIED: Update new components | |
updates[result_accordion] = gr.update(visible=True, label="Keine Resultate gefunden.", open=False) | |
updates[result_metadata_display] = gr.update(value="") | |
updates[result_text] = gr.update(value="", visible=True) | |
updates[single_result_group] = gr.update(visible=True) | |
# Ensure states are also reset/empty | |
updates[full_search_results_state] = [] | |
updates[current_result_index_state] = 0 | |
updates[active_view_state] = "standard" # Still set view state even if empty | |
return updates # Return the dictionary of updates | |
# Populate state and update UI elements if results were found | |
logging.info(f"Displaying first of {len(search_results)} standard results.") | |
updates[full_search_results_state] = search_results | |
updates[current_result_index_state] = 0 # Start at the first result | |
updates[active_view_state] = "standard" # Set active view state | |
# Format the first result for immediate display using the combined formatter | |
# MODIFIED: Call format_result_display and get two parts | |
accordion_title, accordion_content_md, text_content = format_result_display(search_results[0], 0, len(search_results), "standard") | |
# MODIFIED: Update new components | |
updates[result_accordion] = gr.update(visible=True, label=accordion_title, open=False) | |
updates[result_metadata_display] = gr.update(value=accordion_content_md) | |
updates[result_text] = gr.update(value=text_content, visible=True) | |
# Make shared result group and navigation visible | |
updates[single_result_group] = gr.update(visible=True) | |
updates[standard_nav_row] = gr.update(visible=True) | |
# Configure navigation buttons for Standard results | |
updates[previous_result_button] = gr.update(visible=True, interactive=False) # Can't go back from first result | |
updates[next_result_button] = gr.update(visible=True, interactive=(len(search_results) > 1)) # Enable if more than one result | |
updates[weiterlesen_button] = gr.update(visible=True, interactive=True, value="weiterlesen") # Enable context button, ensure value | |
return updates # Return the dictionary of updates | |
# --- Modified Core Search Logic (LLM Mode) --- | |
def perform_search_llm(query, selected_authors, window_size, weight, decay): | |
"""Performs LLM Re-Rank Search: Embed -> Query -> Re-rank -> Prep -> LLM -> Parse -> Return results & embedding.""" | |
logging.info(f"--- Starting LLM Re-Rank Search --- Query: '{query[:50]}...' | Authors: {selected_authors} | Window={window_size}, Weight={weight:.2f}, Decay={decay:.2f}") | |
original_query_embedding = None | |
# --- Phase 0: Get Query Embedding --- | |
try: | |
original_query_embedding = get_embedding(query, task="RETRIEVAL_QUERY") | |
if original_query_embedding is None: | |
raise ValueError("Embedding failed for LLM search.") | |
logging.info("Query embedding generated successfully for LLM search.") | |
except Exception as embed_e: | |
logging.error(f"LLM Re-Rank: Embedding error: {embed_e}", exc_info=True) | |
return None, original_query_embedding # Return None for results to indicate failure | |
# --- Phase 1: Initial Search, Filter & First-Pass Re-ranking --- | |
try: | |
logging.info(f"LLM ReRank Mode: Initial search for query: '{query[:50]}...'") | |
# Build Filter | |
where_filter = None | |
if selected_authors: | |
authors_filter_list = selected_authors if isinstance(selected_authors, list) else [selected_authors] | |
authors_filter_list = [a for a in authors_filter_list if a and isinstance(a, str)] | |
if authors_filter_list: | |
where_filter = {"author": {"$in": authors_filter_list}} | |
logging.info(f"LLM ReRank: Applying WHERE filter: {where_filter}") | |
else: logging.warning("Empty or invalid author filter list for LLM rerank.") | |
# Initial DB Search | |
initial_candidates = _perform_single_query_search(query, where_filter, INITIAL_RESULTS_FOR_RERANK) | |
if not initial_candidates: | |
logging.info("LLM ReRank Mode: No initial results found from DB.") | |
return [], original_query_embedding | |
logging.info(f"Found {len(initial_candidates)} initial candidates. Performing 1st pass re-ranking...") | |
# First-Pass Re-ranking (Pass new arguments) | |
first_pass_reranked = rerank_with_context( | |
initial_candidates, | |
original_query_embedding, | |
LLM_RERANK_CANDIDATE_COUNT, # Target N for LLM input pool | |
weight, # Use argument | |
decay, # Use argument | |
window_size, # Use argument | |
MIN_CHARS_FOR_RELEVANT_NEIGHBOR # Pass constant | |
) | |
# Select the top candidates to send to the LLM | |
candidates_for_llm = first_pass_reranked[:LLM_RERANK_CANDIDATE_COUNT] | |
if not candidates_for_llm: | |
logging.info("LLM ReRank Mode: No candidates left after first-pass re-ranking.") | |
return [], original_query_embedding | |
logging.info(f"Selected top {len(candidates_for_llm)} candidates after 1st pass for LLM.") | |
except (ConnectionError, ValueError, RuntimeError) as search_filter_e: | |
logging.error(f"LLM Re-Rank: Initial Search/Filter/Re-rank error: {search_filter_e}", exc_info=True) | |
return None, original_query_embedding # Return None for results to indicate failure | |
except Exception as e: | |
logging.error(f"LLM Re-Rank: Unexpected error in Phase 1 (Search/Filter/Re-rank): {e}", exc_info=True) | |
return None, original_query_embedding # Return None for results to indicate failure | |
# --- Phase 2: Prepare Passage Blocks for LLM Prompt --- | |
try: | |
logging.info("Preparing passage blocks for LLM prompt using pre-constructed blocks...") | |
passage_separator = "\n\n--- PASSAGE SEPARATOR ---\n\n" | |
prompt_passage_blocks_list = [] | |
for cand_data in candidates_for_llm: | |
center_id_str = cand_data.get('id') | |
context_block = cand_data.get('context_block') | |
if not center_id_str or not context_block or context_block in ["_Kontextblock fehlt_", "_Fehler bei Kontext-Erstellung_"]: | |
logging.warning(f"Skipping candidate {center_id_str} for LLM prompt due to missing ID or invalid context block.") | |
continue | |
prompt_block = f"Passage ID: {center_id_str}\nPassage Text:\n{context_block}" | |
prompt_passage_blocks_list.append(prompt_block) | |
if not prompt_passage_blocks_list: | |
logging.warning("No valid context blocks could be prepared for the LLM prompt.") | |
return [], original_query_embedding | |
passage_blocks_str_for_prompt = passage_separator.join(prompt_passage_blocks_list) | |
logging.info(f"Prepared {len(prompt_passage_blocks_list)} passage blocks for the LLM.") | |
except Exception as e: | |
logging.error(f"LLM Re-Rank: Error during passage block preparation (Phase 2): {e}", exc_info=True) | |
return None, original_query_embedding # Return None for results to indicate failure | |
# --- Phase 3: Call LLM for Re-ranking and Truncation --- | |
if not llm_rerank_model: | |
logging.error("LLM Re-rank model is not available/initialized.") | |
return None, original_query_embedding # Return None for results to indicate failure | |
try: | |
# Format the final prompt using the template | |
rerank_prompt = LLM_RERANKING_PROMPT_TEMPLATE_V3.format( | |
user_query=query, | |
passage_blocks_str=passage_blocks_str_for_prompt, # Use constructed string | |
target_count=LLM_RERANK_TARGET_COUNT # Use constant | |
) | |
logging.debug(f"LLM Rank/Truncate Prompt (first 500 chars):\n{rerank_prompt[:500]}...") | |
# Save the full prompt to a file for debugging | |
try: | |
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f") | |
filename = os.path.join(PROMPT_LOG_DIR, f"{timestamp}_llm_rank_truncate_prompt.txt") | |
with open(filename, 'w', encoding='utf-8') as f: | |
f.write(f"--- User Query ---\n{query}\n\n--- Prompt Sent to LLM ({LLM_RERANK_MODEL_NAME}) ---\n{rerank_prompt}") | |
logging.info(f"LLM Rank/Truncate prompt saved to: {filename}") | |
except IOError as log_e: | |
logging.error(f"Error saving LLM Rank/Truncate prompt: {log_e}", exc_info=False) | |
# Make the API call to Gemini | |
logging.info(f"Sending Rank/Truncate request to LLM ({LLM_RERANK_MODEL_NAME})...") | |
generation_config = genai.types.GenerationConfig(temperature=0.2) | |
response = llm_rerank_model.generate_content( | |
rerank_prompt, | |
generation_config=generation_config | |
) | |
logging.info("LLM Rank/Truncate response received.") | |
# --- Phase 4: Parse LLM Response and Fetch Metadata --- | |
logging.info("Processing LLM response...") | |
# ---> START OF ROBUST RESPONSE HANDLING <--- | |
response_text = None | |
finish_reason_name = 'UNKNOWN' | |
try: | |
if hasattr(response, 'prompt_feedback') and getattr(response.prompt_feedback, 'block_reason', None): | |
block_reason = response.prompt_feedback.block_reason | |
finish_reason_name = f"PROMPT_BLOCKED_{block_reason}" | |
logging.error(f"LLM Rank/Truncate prompt was blocked! Reason: {block_reason}") | |
return [], original_query_embedding # Return empty | |
elif response.candidates: | |
first_candidate = response.candidates[0] | |
reason_enum = getattr(first_candidate, 'finish_reason', None) | |
finish_reason_name = getattr(reason_enum, 'name', str(reason_enum)) | |
VALID_FINISH_REASONS = {"STOP", "MAX_TOKENS"} | |
if finish_reason_name in VALID_FINISH_REASONS: | |
if first_candidate.content and first_candidate.content.parts: | |
response_text = first_candidate.content.parts[0].text | |
logging.debug("Successfully extracted text from the first candidate.") | |
else: | |
logging.warning("LLM candidate finished validly buthad no text content part.") | |
response_text = None | |
else: | |
logging.warning(f"LLM Rank/Truncate candidate finished with reason: {finish_reason_name}. No text content expected or extracted.") | |
else: | |
logging.error("LLM response had no candidates.") | |
if response_text is None: | |
logging.error(f"LLM Rank/Truncate returned no usable text content. Final Finish Reason Check: {finish_reason_name}") | |
# Log response details if available for debugging | |
logging.debug(f"Full LLM response object structure: {response}") | |
return [], original_query_embedding # Return empty | |
except Exception as resp_check_e: | |
logging.error(f"Error checking LLM response structure/finish_reason: {resp_check_e}", exc_info=True) | |
logging.debug(f"Full LLM response object structure during check error: {response}") | |
return [], original_query_embedding # Return empty on error checking response | |
# ---> END OF ROBUST RESPONSE HANDLING <--- | |
llm_response_text = response_text | |
logging.debug(f"LLM Raw Response Text (used for parsing):\n{llm_response_text}") | |
# --- Start JSON Parsing --- | |
json_string = None | |
parsed_llm_results = [] | |
try: | |
# Attempt to find JSON inside a ```json ``` block first (preferred format) | |
json_match = re.search(r"```json\s*({.*?})\s*```", llm_response_text, re.DOTALL | re.IGNORECASE) | |
if json_match: | |
json_string = json_match.group(1) | |
logging.debug("Found JSON block using ```json ``` regex.") | |
else: | |
# If no block is found, assume the entire response is potentially JSON | |
json_string = llm_response_text.strip() | |
if not (json_string.startswith('{') and json_string.endswith('}')): | |
logging.warning("LLM response did not contain ```json ``` block and doesn't look like raw JSON object. Attempting parse anyway.") | |
else: | |
logging.debug("Assuming raw LLM response is JSON object.") | |
parsed_response = json.loads(json_string) | |
# Validate the top-level structure | |
if "ranked_edited_passages" not in parsed_response or not isinstance(parsed_response["ranked_edited_passages"], list): | |
logging.error("LLM JSON response missing 'ranked_edited_passages' list or it's not a list.") | |
raise ValueError("JSON response structure invalid: missing 'ranked_edited_passages' list.") | |
raw_results = parsed_response["ranked_edited_passages"] | |
logging.info(f"LLM returned {len(raw_results)} items in 'ranked_edited_passages'.") | |
# Validate and collect individual results from the list | |
parsed_llm_results = [] # Reset before processing | |
for i, item in enumerate(raw_results): | |
if isinstance(item, dict) and 'original_id' in item and 'edited_text' in item: | |
item_id = str(item['original_id']) # Ensure ID is string | |
item_text = str(item['edited_text']) | |
item_rationale = item.get('rationale', '') # Rationale is optional | |
# Logging for individual items | |
# logging.debug(f"Parsed item {i}: ID={item_id}, Text='{item_text[:50]}...', Rationale='{item_rationale[:50]}...'") | |
if item_id and item_text.strip(): # Only add if ID and text are non-empty | |
parsed_llm_results.append({'id': item_id, 'edited_text': item_text, 'rationale': item_rationale}) # Keep rationale here | |
else: | |
logging.warning(f"Skipping invalid or empty LLM result item at index {i}: {item}") | |
else: | |
logging.warning(f"Skipping item with invalid format in 'ranked_edited_passages' at index {i}: {item}") | |
# Truncate to the target count if needed (should be handled by LLM, but safe) | |
parsed_llm_results = parsed_llm_results[:LLM_RERANK_TARGET_COUNT] | |
logging.info(f"Successfully parsed {len(parsed_llm_results)} valid ranked/edited passages from LLM response.") | |
if not parsed_llm_results: | |
logging.info("LLM parsing yielded no valid passages.") | |
return [], original_query_embedding # Return empty list | |
except (json.JSONDecodeError, ValueError) as parse_e: | |
logging.error(f"LLM Rank/Truncate response JSON parsing error: {parse_e}", exc_info=True) | |
logging.error(f"--- LLM Response Text causing JSON error ---\n{llm_response_text}\n--- End Response ---") | |
return [], original_query_embedding # Return empty list on parsing error | |
except Exception as parse_e: | |
logging.error(f"Unexpected error during LLM JSON parsing: {parse_e}", exc_info=True) | |
return [], original_query_embedding # Return empty list on any parsing error | |
# --- End JSON Parsing --- | |
# --- Fetch Metadata for LLM Results --- | |
# We need the original metadata (author, book, etc.) from the DB for displaying results correctly. | |
result_ids_to_fetch = [res['id'] for res in parsed_llm_results] | |
logging.info(f"Fetching metadata directly for {len(result_ids_to_fetch)} final LLM result IDs.") | |
if result_ids_to_fetch: | |
fetched_metadata_map = fetch_multiple_passage_data(result_ids_to_fetch) | |
logging.debug(f"Fetched metadata map contains {len(fetched_metadata_map)} entries for final LLM results.") | |
else: | |
# If no IDs to fetch (e.g., no results parsed), return empty | |
logging.warning("No result IDs to fetch metadata for after LLM parsing.") | |
return [], original_query_embedding | |
# --- Combine parsed text with fetched metadata for the final UI structure --- | |
final_llm_results_for_ui = [] | |
for result in parsed_llm_results: | |
passage_id = result['id'] | |
passage_data = fetched_metadata_map.get(passage_id) | |
if passage_data: | |
final_llm_results_for_ui.append({ | |
'id': passage_id, # Original ID | |
'original_id': passage_id, # Store original_id explicitly for formatter | |
'edited_text': result.get('edited_text', '_Editierter Text fehlt_'), # LLM's edited text | |
'rationale': result.get('rationale', ''), # LLM's rationale | |
'metadata': passage_data.get('meta', {}) # Original metadata from DB fetch | |
# Note: Distance and Initial/Final similarity from previous steps are NOT included | |
# as the LLM result is a new entity, not directly representing a DB passage's score. | |
}) | |
else: | |
logging.warning(f"Could not fetch metadata from DB for final LLM result ID: {passage_id}. Skipping this result.") | |
if not final_llm_results_for_ui: | |
logging.error("Failed to fetch metadata for any of the LLM's ranked passages.") | |
# Still return the original query embedding if available | |
return [], original_query_embedding # Return empty list | |
# --- Success --- | |
logging.info(f"LLM Re-Rank Search successful. Returning {len(final_llm_results_for_ui)} processed results.") | |
# Return the list of results and the original query embedding | |
return final_llm_results_for_ui, original_query_embedding | |
except Exception as e: | |
logging.error(f"LLM Rank/Truncate general processing error after API call: {e}", exc_info=True) | |
# Return None for results to indicate a failure, but return embedding if available | |
return None, original_query_embedding | |
# --- Search Function (LLM Re-Rank Mode UI Wrapper) --- | |
def search_llm_rerank_mode_ui(llm_results, query_embedding): | |
"""Prepares Gradio UI updates for the LLM Re-Rank Search results.""" | |
logging.info("Preparing UI updates for LLM Re-Rank Search results.") | |
updates = create_reset_updates() # Start with reset state | |
# Store the received embedding for context highlighting | |
if query_embedding is not None: | |
updates[direct_embedding_output_holder] = query_embedding | |
logging.debug("Stored valid query embedding in direct_embedding_output_holder for LLM mode.") | |
else: | |
updates[direct_embedding_output_holder] = None | |
logging.warning("Query embedding was None for LLM mode.") | |
# Set active view state early | |
updates[active_view_state] = "llm" | |
# Check if results indicate an error occurred in the core logic (returned None) | |
if llm_results is None: | |
logging.error("LLM core search logic returned None, indicating an error.") | |
# Use the shared display group | |
updates[single_result_group] = gr.update(visible=True) | |
# MODIFIED: Update new components | |
updates[result_accordion] = gr.update(visible=True, label="**Fehler:** LLM Re-Ranking fehlgeschlagen.", open=False) | |
updates[result_metadata_display] = gr.update(value="Details siehe Server-Logs.") | |
updates[result_text] = gr.update(value="", visible=True) | |
# Ensure states are empty | |
updates[llm_results_state] = [] | |
updates[llm_result_index_state] = 0 | |
return updates | |
# Check if results list is empty (no relevant passages found/parsed, but no error) | |
if not llm_results: | |
logging.info("LLM search returned no relevant passages.") | |
# Use the shared display group | |
updates[single_result_group] = gr.update(visible=True) | |
# MODIFIED: Update new components | |
updates[result_accordion] = gr.update(visible=True, label="LLM Resultate", open=False) | |
updates[result_metadata_display] = gr.update(value="_(Keine relevanten Passagen nach LLM Re-Ranking gefunden.)_") | |
updates[result_text] = gr.update(value="", visible=True) | |
# Ensure states are empty | |
updates[llm_results_state] = [] | |
updates[llm_result_index_state] = 0 | |
return updates | |
# Got results, update UI | |
logging.info(f"Displaying first of {len(llm_results)} LLM re-ranked results.") | |
updates[llm_results_state] = llm_results | |
updates[llm_result_index_state] = 0 # Start at first result | |
# Format and display the first result using the combined formatter | |
# MODIFIED: Call format_result_display and get two parts | |
accordion_title, accordion_content_md, text_content = format_result_display(llm_results[0], 0, len(llm_results), "llm") | |
# MODIFIED: Update new components | |
updates[result_accordion] = gr.update(visible=True, label=accordion_title, open=False) | |
updates[result_metadata_display] = gr.update(value=accordion_content_md) | |
updates[result_text] = gr.update(value=text_content, visible=True) | |
# Make shared result group and navigation visible | |
updates[single_result_group] = gr.update(visible=True) | |
updates[standard_nav_row] = gr.update(visible=True) | |
# Configure navigation buttons for LLM results | |
updates[previous_result_button] = gr.update(visible=True, interactive=False) | |
updates[next_result_button] = gr.update(visible=True, interactive=(len(llm_results) > 1)) | |
updates[weiterlesen_button] = gr.update(visible=True, interactive=True, value="im Original weiterlesen") # Enable context button, change value | |
return updates | |
# --- Result Navigation Function (Standard Mode) --- | |
def navigate_results(direction, current_index, full_results): | |
"""Handles UI updates for navigating standard search results.""" | |
logging.info(f"Navigating standard results: Direction={direction}, Index={current_index}") | |
# Define default updates (hide context, show standard results, etc.) | |
updates = { | |
standard_nav_row: gr.update(visible=True), # Show the shared nav row | |
single_result_group: gr.update(visible=True), # Show the shared result group | |
# MODIFIED: Clear new components instead of single_result_display_md | |
result_accordion: gr.update(label="...", open=False, visible=True), | |
result_metadata_display: gr.update(value=""), | |
result_text: gr.update(value="", visible=True), | |
# Buttons in standard_nav_row will be managed based on index below | |
previous_result_button: gr.update(visible=True), | |
next_result_button: gr.update(visible=True), | |
weiterlesen_button: gr.update(visible=True, value="weiterlesen"), # Standard search weiterlesen | |
context_area: gr.update(visible=False), # Hide context | |
back_to_results_button: gr.update(visible=False), # Hide back button | |
current_result_index_state: current_index, # Store potentially new index | |
full_search_results_state: full_results, # Pass state through | |
active_view_state: "standard" # Ensure view state is correct | |
} | |
if not full_results or not isinstance(full_results, list): | |
logging.warning("Cannot navigate: No standard results available in state.") | |
updates[current_result_index_state] = 0 | |
# MODIFIED: Update new components | |
updates[result_accordion] = gr.update(visible=True, label="Keine Resultate zum Navigieren.", open=False) | |
updates[result_metadata_display] = gr.update(value="") | |
updates[result_text] = gr.update(value="", visible=True) | |
# Hide all navigation elements in the shared row | |
updates[previous_result_button] = gr.update(interactive=False, visible=False) | |
updates[next_result_button] = gr.update(interactive=False, visible=False) | |
updates[weiterlesen_button] = gr.update(visible=False) | |
updates[standard_nav_row] = gr.update(visible=False) # Hide the nav row itself | |
updates[single_result_group] = gr.update(visible=False) # Hide the result group itself | |
return updates # Return the dictionary of updates | |
total_results = len(full_results) | |
new_index = current_index | |
# Calculate new index based on direction | |
if direction == 'previous': | |
new_index = max(0, current_index - 1) | |
elif direction == 'next': | |
new_index = min(total_results - 1, current_index + 1) | |
# Update display if index is valid | |
if 0 <= new_index < total_results: | |
result_data = full_results[new_index] | |
# MODIFIED: Use the combined formatter and get two parts | |
accordion_title, accordion_content_md, text_content = format_result_display(result_data, new_index, total_results, "standard") | |
# MODIFIED: Update new components | |
updates[result_accordion] = gr.update(visible=True, label=accordion_title, open=False) | |
updates[result_metadata_display] = gr.update(value=accordion_content_md) | |
updates[result_text] = gr.update(value=text_content, visible=True) | |
updates[current_result_index_state] = new_index # Update state with new index | |
# Update button interactivity based on new index | |
updates[previous_result_button] = gr.update(interactive=(new_index > 0)) | |
updates[next_result_button] = gr.update(interactive=(new_index < total_results - 1)) | |
updates[weiterlesen_button] = gr.update(interactive=True) # Always possible from a result | |
logging.info(f"Navigated standard results to index {new_index}") | |
else: | |
# Should not happen with bounds checking, but handle defensively | |
logging.error(f"Navigation error: New index {new_index} out of bounds [0, {total_results-1}]") | |
# MODIFIED: Update new components on error | |
updates[result_accordion] = gr.update(visible=True, label="Fehler beim Navigieren der Resultate.", open=False) | |
updates[result_metadata_display] = gr.update(value="") | |
updates[result_text] = gr.update(value="", visible=True) | |
updates[previous_result_button] = gr.update(interactive=False) | |
updates[next_result_button] = gr.update(interactive=False) | |
updates[weiterlesen_button] = gr.update(interactive=False) | |
return updates | |
# --- Navigation Function for LLM Results --- | |
def navigate_llm_results(direction, current_index, llm_results): | |
"""Handles UI updates for navigating LLM re-ranked results.""" | |
logging.info(f"Navigating LLM results: Direction={direction}, Index={current_index}") | |
# Define default updates (show LLM results, hide others) | |
updates = { | |
standard_nav_row: gr.update(visible=True), # Show the shared nav row | |
single_result_group: gr.update(visible=True), # Show the shared result group | |
# MODIFIED: Clear new components instead of single_result_display_md | |
result_accordion: gr.update(label="...", open=False, visible=True), | |
result_metadata_display: gr.update(value=""), | |
result_text: gr.update(value="", visible=True), | |
# Buttons in standard_nav_row will be managed based on index below | |
previous_result_button: gr.update(visible=True), | |
next_result_button: gr.update(visible=True), | |
weiterlesen_button: gr.update(visible=True, value="im Original weiterlesen"), # LLM search weiterlesen | |
context_area: gr.update(visible=False), # Hide context | |
back_to_results_button: gr.update(visible=False), # Hide back button | |
llm_results_state: llm_results, # Pass state through | |
llm_result_index_state: current_index, # Store potentially new index | |
active_view_state: "llm" # Ensure view state is correct | |
} | |
if not llm_results or not isinstance(llm_results, list): | |
logging.warning("Cannot navigate: No LLM results available in state.") | |
# MODIFIED: Update new components | |
updates[result_accordion] = gr.update(visible=True, label="Keine LLM-Resultate vorhanden.", open=False) | |
updates[result_metadata_display] = gr.update(value="") | |
updates[result_text] = gr.update(value="", visible=True) | |
# Hide navigation elements in the shared row | |
updates[previous_result_button] = gr.update(interactive=False, visible=False) | |
updates[next_result_button] = gr.update(interactive=False, visible=False) | |
updates[weiterlesen_button] = gr.update(visible=False) | |
updates[standard_nav_row] = gr.update(visible=False) # Hide the nav row itself | |
updates[single_result_group] = gr.update(visible=False) # Hide the result group itself | |
# Reset state | |
updates[llm_results_state] = [] | |
updates[llm_result_index_state] = 0 | |
return updates | |
total_results = len(llm_results) | |
new_index = current_index | |
# Calculate new index | |
if direction == 'previous': | |
new_index = max(0, current_index - 1) | |
elif direction == 'next': | |
new_index = min(total_results - 1, current_index + 1) | |
# Update display if index is valid | |
if 0 <= new_index < total_results: | |
result_data = llm_results[new_index] | |
# MODIFIED: Use the combined formatter and get two parts | |
accordion_title, accordion_content_md, text_content = format_result_display(result_data, new_index, total_results, "llm") | |
# MODIFIED: Update new components | |
updates[result_accordion] = gr.update(visible=True, label=accordion_title, open=False) | |
updates[result_metadata_display] = gr.update(value=accordion_content_md) | |
updates[result_text] = gr.update(value=text_content, visible=True) | |
updates[llm_result_index_state] = new_index # Update state | |
# Update button interactivity | |
updates[previous_result_button] = gr.update(interactive=(new_index > 0)) | |
updates[next_result_button] = gr.update(interactive=(new_index < total_results - 1)) | |
updates[weiterlesen_button] = gr.update(interactive=True) | |
logging.info(f"Navigated LLM results to index {new_index}") | |
else: | |
logging.error(f"LLM Navigation error: New index {new_index} out of bounds [0, {total_results-1}]") | |
# MODIFIED: Update new components on error | |
updates[result_accordion] = gr.update(visible=True, label="Fehler beim Navigieren der LLM-Resultate.", open=False) | |
updates[result_metadata_display] = gr.update(value="") | |
updates[result_text] = gr.update(value="", visible=True) | |
updates[previous_result_button] = gr.update(interactive=False) | |
updates[next_result_button] = gr.update(interactive=False) | |
updates[weiterlesen_button] = gr.update(interactive=False) | |
return updates | |
# --- Navigation Function for Favourites --- | |
def navigate_best_results(direction, current_index, best_results): | |
"""Handles UI updates for navigating favourite results.""" | |
logging.info(f"Navigating favourite results: Direction={direction}, Index={current_index}") | |
# Define default updates (show favourites, hide others) | |
updates = { | |
standard_nav_row: gr.update(visible=True), # Show the shared nav row | |
single_result_group: gr.update(visible=True), # Show the shared result group | |
# MODIFIED: Clear new components instead of single_result_display_md | |
result_accordion: gr.update(label="...", open=False, visible=True), | |
result_metadata_display: gr.update(value=""), | |
result_text: gr.update(value="", visible=True), | |
# Buttons in standard_nav_row will be managed based on index below | |
previous_result_button: gr.update(visible=True), | |
next_result_button: gr.update(visible=True), | |
weiterlesen_button: gr.update(visible=True, value="weiterlesen"), # Favourites weiterlesen | |
context_area: gr.update(visible=False), # Hide context | |
back_to_results_button: gr.update(visible=False), # Hide back button | |
best_results_state: best_results, # Pass state through | |
best_index_state: current_index, # Store potentially new index | |
active_view_state: "favourites" # Ensure view state is correct | |
} | |
if not best_results or not isinstance(best_results, list): | |
logging.warning("Cannot navigate: No favourite results available in state.") | |
updates[best_index_state] = 0 | |
# MODIFIED: Update new components | |
updates[result_accordion] = gr.update(visible=True, label="_Keine Favoriten zum Navigieren._", open=False) | |
updates[result_metadata_display] = gr.update(value="") | |
updates[result_text] = gr.update(value="", visible=True) | |
# Hide navigation elements in the shared row | |
updates[previous_result_button] = gr.update(interactive=False, visible=False) | |
updates[next_result_button] = gr.update(interactive=False, visible=False) | |
updates[weiterlesen_button] = gr.update(visible=False) | |
updates[standard_nav_row] = gr.update(visible=False) # Hide the nav row itself | |
updates[single_result_group] = gr.update(visible=False) # Hide the result group itself | |
# Reset state | |
updates[best_results_state] = [] | |
updates[best_index_state] = 0 | |
return updates | |
total_results = len(best_results) | |
new_index = current_index | |
# Calculate new index | |
if direction == 'previous': | |
new_index = max(0, current_index - 1) | |
elif direction == 'next': | |
new_index = min(total_results - 1, current_index + 1) | |
# Update display if index is valid | |
if 0 <= new_index < total_results: | |
result_data = best_results[new_index] | |
# MODIFIED: Use the combined formatter and get two parts | |
accordion_title, accordion_content_md, text_content = format_result_display(result_data, new_index, total_results, "favourites") | |
# MODIFIED: Update new components | |
updates[result_accordion] = gr.update(visible=True, label=accordion_title, open=False) | |
updates[result_metadata_display] = gr.update(value=accordion_content_md) | |
updates[result_text] = gr.update(value=text_content, visible=True) | |
updates[best_index_state] = new_index # Update state | |
# Update button interactivity | |
updates[previous_result_button] = gr.update(interactive=(new_index > 0)) | |
updates[next_result_button] = gr.update(interactive=(new_index < total_results - 1)) | |
updates[weiterlesen_button] = gr.update(interactive=True) # Always possible from a favourite | |
logging.info(f"Navigated favourite results to index {new_index}") | |
else: | |
logging.error(f"Favourite Navigation error: New index {new_index} out of bounds [0, {total_results-1}]") | |
# MODIFIED: Update new components on error | |
updates[result_accordion] = gr.update(visible=True, label="Fehler beim Navigieren der Favoriten.", open=False) | |
updates[result_metadata_display] = gr.update(value="") | |
updates[result_text] = gr.update(value="", visible=True) | |
updates[previous_result_button] = gr.update(interactive=False) | |
updates[next_result_button] = gr.update(interactive=False) | |
updates[weiterlesen_button] = gr.update(interactive=False) | |
return updates | |
# --- Move Standard Result to Reading Area (UI Logic) --- | |
def move_to_reading_area_ui(current_index, full_results, query_embedding_value, result_type): | |
"""Handles UI updates and data fetching for moving a result (Standard, LLM, or Favourite) | |
to the context reading area.""" | |
logging.info(f"--- Moving {result_type} Result (Index: {current_index}) to Reading Area ---") | |
# Define UI changes: Hide results, show context area, set loading message | |
updates = { | |
standard_nav_row: gr.update(visible=False), # Hide the shared results nav | |
single_result_group: gr.update(visible=False), # Hide the shared results group | |
context_area: gr.update(visible=True), # Show context area immediately | |
context_display: gr.update(value="Lade Paragraphen..."), # Loading message | |
load_previous_button: gr.update(visible=True, interactive=True), | |
load_next_button: gr.update(visible=True, interactive=True), | |
back_to_results_button: gr.update(visible=True, interactive=True) | |
} | |
# Define state changes separately | |
state_updates = { | |
# Preserve the relevant state indices and lists based on result_type | |
full_search_results_state: [], # Will be replaced by full_results if result_type is standard | |
current_result_index_state: 0, | |
llm_results_state: [], # Will be replaced by full_results if result_type is llm | |
llm_result_index_state: 0, | |
best_results_state: [], # Will be replaced by full_results if result_type is favourites | |
best_index_state: 0, | |
displayed_context_passages: [], # Reset context state before loading | |
direct_embedding_output_holder: query_embedding_value # Pass embedding | |
} | |
if result_type == "standard": | |
state_updates[full_search_results_state] = full_results | |
state_updates[current_result_index_state] = current_index | |
state_updates[active_view_state] = "context_from_standard" | |
elif result_type == "llm": | |
state_updates[llm_results_state] = full_results # Note: full_results holds LLM results here | |
state_updates[llm_result_index_state] = current_index | |
state_updates[active_view_state] = "context_from_llm" | |
elif result_type == "favourites": | |
state_updates[best_results_state] = full_results # Note: full_results holds favourite results here | |
state_updates[best_index_state] = current_index | |
state_updates[active_view_state] = "context_from_favourites" # New state for favourites context | |
# For favourites, the query embedding is not directly relevant for highlighting the original text, | |
# as the favourite was selected based on its score. However, we keep the state updated in case needed later. | |
# Maybe set to None or a specific marker if we don't want query highlighting? Let's keep it for now. | |
# state_updates[direct_embedding_output_holder] = None | |
# Log the received embedding for debugging highlighting | |
logging.debug(f"move_to_reading_area_ui: Received query_embedding_value type: {type(query_embedding_value)}, len/shape: {len(query_embedding_value) if isinstance(query_embedding_value, (list, np.ndarray)) else 'N/A'}, result_type: {result_type}") | |
# Validate input | |
if not full_results or not isinstance(full_results, list) or not (0 <= current_index < len(full_results)): | |
logging.error(f"Invalid {result_type} result reference for moving to reading area.") | |
updates[context_display] = gr.update(value="Fehler: UngΓΌltige Resultat-Referenz zum Lesen.") | |
updates[load_previous_button] = gr.update(interactive=False) | |
updates[load_next_button] = gr.update(interactive=False) | |
return {**updates, **state_updates} | |
try: | |
# Get data for the selected result | |
target_result_data = full_results[current_index] | |
passage_meta = target_result_data.get('metadata', {}) | |
selected_passage_id = target_result_data.get('id') # Use 'id' for favourites too | |
# Extract metadata needed to fetch the paragraph | |
author = passage_meta.get('author') | |
book = passage_meta.get('book') | |
paragraph_idx = passage_meta.get('paragraph_index') # Should be integer or None | |
# Check if necessary metadata is present | |
if author is None or book is None or paragraph_idx is None or not isinstance(paragraph_idx, int) or paragraph_idx < 0: | |
logging.error(f"Missing necessary metadata (author/book/paragraph_index) for {result_type} result ID {selected_passage_id}: Meta={passage_meta}") | |
updates[context_display] = gr.update(value="Fehler: Metadaten unvollstΓ€ndig. Paragraph kann nicht geladen werden.") | |
updates[load_previous_button] = gr.update(interactive=False) | |
updates[load_next_button] = gr.update(interactive=False) | |
return {**updates, **state_updates} | |
logging.info(f"Fetching initial paragraph for context: Author='{author}', Book='{book}', ParagraphIndex={paragraph_idx}") | |
# Fetch the full paragraph data (including embeddings) | |
initial_paragraph_sentences = fetch_paragraph_data(author, book, paragraph_idx) | |
if not initial_paragraph_sentences: | |
logging.error(f"Could not fetch paragraph sentences for {author}/{book}/P{paragraph_idx}") | |
updates[context_display] = gr.update(value="Fehler: Der zugehΓΆrige Paragraph konnte nicht geladen werden (mΓΆglicherweise leer?). Die Navigation zum nΓ€chsten/vorherigen Paragraphen ist weiterhin aktiv.") | |
# Buttons remain interactive=True | |
# Still need to update the state, even if empty sentences were returned, | |
# to correctly reflect that the context area is active. | |
state_updates[displayed_context_passages] = [] | |
return {**updates, **state_updates} | |
# Format the fetched paragraph using the VALID query embedding received as input | |
logging.info(f"Formatting paragraph {paragraph_idx} with {len(initial_paragraph_sentences)} sentences for display.") | |
formatted_passage_md = format_context_markdown(initial_paragraph_sentences, query_embedding_value) # Use the passed embedding | |
updates[context_display] = gr.update(value=formatted_passage_md) # Update display | |
# Update state with the fetched sentences | |
state_updates[displayed_context_passages] = initial_paragraph_sentences | |
# Buttons are already interactive=True from the initial update dict | |
logging.info(f"Paragraph {paragraph_idx} (for passage ID {selected_passage_id}) displayed in context area.") | |
except Exception as e: | |
logging.error(f"Error moving {result_type} passage to reading area: {e}", exc_info=True) | |
updates[context_display] = gr.update(value=f"**Fehler:** Der Paragraph konnte nicht angezeigt werden. Details siehe Server-Logs.") | |
updates[load_previous_button] = gr.update(interactive=False) | |
updates[load_next_button] = gr.update(interactive=False) | |
return {**updates, **state_updates} | |
# --- Go Back To Results Function --- | |
# ... (go_back_to_results_wrapper remains the same in logic, but updates new UI components) ... | |
def go_back_to_results_wrapper(last_active_view, std_results, std_index, llm_results, llm_index, best_results, best_index, current_fav_signal_value): | |
"""Handles UI updates for returning from the context view to the appropriate results view.""" | |
logging.info(f"Triggered: go_back_to_results_wrapper from view: {last_active_view}") | |
updates_dict = { | |
# Reset context area visibility | |
context_area: gr.update(visible=False), | |
context_display: gr.update(value=""), # Clear context display | |
displayed_context_passages: gr.State([]), # Reset context state | |
# Pass through existing results and indices states | |
full_search_results_state: std_results, current_result_index_state: std_index, | |
llm_results_state: llm_results, llm_result_index_state: llm_index, | |
best_results_state: best_results, best_index_state: best_index, | |
direct_embedding_output_holder: None, # Clear embedding when leaving context | |
fav_signal: gr.update(value=current_fav_signal_value), # <--- Pass through fav_signal state | |
active_view_state: "none", # Reset active view temporarily before setting correct one | |
# MODIFIED: Ensure the new result components are cleared before potentially showing results | |
result_accordion: gr.update(label="...", open=False, visible=False), | |
result_metadata_display: gr.update(value=""), | |
result_text: gr.update(value="", visible=False), | |
} | |
# Hide status message | |
updates_dict[status_message] = gr.update(value="", visible=False) | |
# Determine which result view to show based on where we came from | |
target_view = "none" | |
target_results_list = [] | |
target_index = 0 | |
result_type = "unknown" # Used for formatting | |
if last_active_view == "context_from_standard": | |
updates_dict[standard_nav_row] = gr.update(visible=True) | |
updates_dict[single_result_group] = gr.update(visible=True) | |
target_view = "standard" | |
target_results_list = std_results | |
target_index = std_index | |
result_type = "standard" | |
logging.info("Going back to Standard results.") | |
elif last_active_view == "context_from_llm": | |
updates_dict[standard_nav_row] = gr.update(visible=True) # Assuming LLM uses standard nav row layout | |
updates_dict[single_result_group] = gr.update(visible=True) # Assuming LLM uses standard group layout | |
target_view = "llm" | |
target_results_list = llm_results | |
target_index = llm_index | |
result_type = "llm" | |
logging.info("Going back to LLM results.") | |
elif last_active_view == "context_from_favourites": | |
# Assuming favourites use the same display/nav components but potentially managed differently | |
updates_dict[standard_nav_row] = gr.update(visible=True) | |
updates_dict[single_result_group] = gr.update(visible=True) | |
target_view = "favourites" | |
target_results_list = best_results | |
target_index = best_index | |
result_type = "favourites" | |
logging.info("Going back to Favourites.") | |
else: | |
logging.warning(f"Back button triggered from unexpected state: {last_active_view}") | |
# Default to showing standard search if view is unknown or error state | |
updates_dict[standard_nav_row] = gr.update(visible=True) | |
updates_dict[single_result_group] = gr.update(visible=True) | |
# MODIFIED: Set initial state for new components | |
updates_dict[result_accordion] = gr.update(label="Kontextansicht verlassen.", open=False, visible=True) | |
updates_dict[result_metadata_display] = gr.update(value="") | |
updates_dict[result_text] = gr.update(value="", visible=True) | |
target_view = "standard" # Fallback view | |
# Ensure buttons are hidden if no data is available | |
updates_dict[previous_result_button] = gr.update(visible=False, interactive=False) | |
updates_dict[next_result_button] = gr.update(visible=False, interactive=False) | |
updates_dict[weiterlesen_button] = gr.update(visible=False, interactive=False) | |
# Return here if we hit an unknown state | |
updates_dict[active_view_state] = target_view # Set fallback view state | |
return updates_dict | |
# Update the active_view state to the results view we returned to | |
updates_dict[active_view_state] = target_view | |
# Now manually update the result display and navigation buttons for the target view | |
if target_results_list and isinstance(target_results_list, list) and 0 <= target_index < len(target_results_list): | |
result_data = target_results_list[target_index] | |
# MODIFIED: Use the combined formatter and update new components | |
accordion_title, accordion_content_md, text_content = format_result_display(result_data, target_index, len(target_results_list), result_type) | |
updates_dict[result_accordion] = gr.update(visible=True, label=accordion_title, open=False) | |
updates_dict[result_metadata_display] = gr.update(value=accordion_content_md) | |
updates_dict[result_text] = gr.update(value=text_content, visible=True) | |
# Update button interactivity based on the selected index and total results | |
updates_dict[previous_result_button] = gr.update(visible=True, interactive=(target_index > 0)) | |
updates_dict[next_result_button] = gr.update(visible=True, interactive=(target_index < len(target_results_list) - 1)) | |
updates_dict[weiterlesen_button] = gr.update(visible=True, interactive=True, value="weiterlesen" if result_type != "llm" else "im Original weiterlesen") | |
else: | |
# If the result list is empty or invalid, show appropriate message | |
error_msg_label = f"_{target_view.capitalize()}-Resultate nicht verfΓΌgbar._" | |
error_msg_content = "" # No content for metadata | |
updates_dict[result_accordion] = gr.update(visible=True, label=error_msg_label, open=False) | |
updates_dict[result_metadata_display] = gr.update(value=error_msg_content) | |
updates_dict[result_text] = gr.update(value="", visible=True) # Clear text area | |
# Hide navigation buttons as there are no results to navigate | |
updates_dict[previous_result_button] = gr.update(visible=False, interactive=False) | |
updates_dict[next_result_button] = gr.update(visible=False, interactive=False) | |
updates_dict[weiterlesen_button] = gr.update(visible=False, interactive=False) | |
return updates_dict | |
# --- Load More Context Function --- | |
def load_more_context(direction, current_passages_state, query_embedding_value): | |
"""Loads the previous or next paragraph in the reading view.""" | |
logging.info(f"--- Loading More Context: Direction={direction} ---") | |
# Log embedding details for debugging highlighting | |
logging.debug(f"load_more_context: Received query_embedding_value type: {type(query_embedding_value)}, len/shape: {len(query_embedding_value) if isinstance(query_embedding_value, (list, np.ndarray)) else 'N/A'}") | |
# --- Initial Checks --- | |
if collection is None: | |
logging.error("Cannot load more context: DB collection not available.") | |
err_msg = format_context_markdown(current_passages_state or [], query_embedding_value) + "\n\n**Fehler: Datenbank nicht verfΓΌgbar.**" | |
return err_msg, current_passages_state # Return existing state | |
if not current_passages_state or not isinstance(current_passages_state, list): | |
logging.warning("load_more_context called with empty or invalid current passage state.") | |
return "_Keine Passage geladen, kann nicht mehr Kontext laden._", [] | |
# Define marker IDs used to indicate boundaries | |
START_MARKER_ID = '-1' # Represents reaching the beginning | |
END_MARKER_ID = 'END_MARKER_ID' # Represents reaching the end | |
try: | |
# --- Determine Boundary and Target Paragraph --- | |
# Ensure current state is sorted (should be, but safe) | |
current_passages_state.sort(key=lambda x: (x.get('paragraph_index', -1), x.get('sentence_sort_key', float('inf')))) | |
boundary_passage = None | |
target_paragraph_index = -1 # Target index to fetch | |
add_at_beginning = False # Flag to prepend or append new paragraph | |
if direction == 'previous': | |
add_at_beginning = True | |
# Find the first non-missing passage to use as the boundary reference | |
first_content_passage = next((p for p in current_passages_state if p.get('role') != 'missing'), None) | |
if not first_content_passage: | |
logging.warning("Context state contains only markers or is empty. Cannot load previous paragraph.") | |
# Format existing (only markers) and return current state | |
return format_context_markdown(current_passages_state, query_embedding_value), current_passages_state | |
boundary_passage = first_content_passage | |
# Check if we are already at the start boundary (by looking at the ID of the very first item) | |
if current_passages_state[0].get('id') == START_MARKER_ID: | |
logging.info("Already at the start boundary marker. No previous paragraph to load.") | |
# Reformat existing content (no change expected) and return current state | |
return format_context_markdown(current_passages_state, query_embedding_value), current_passages_state | |
current_para_idx = boundary_passage.get('paragraph_index') | |
# Calculate target index, handle None or 0 index | |
if current_para_idx is None or not isinstance(current_para_idx, int) or current_para_idx <= 0: | |
target_paragraph_index = -2 # Indicates we've hit the conceptual start (index < 0) | |
else: | |
target_paragraph_index = current_para_idx - 1 | |
elif direction == 'next': | |
add_at_beginning = False | |
# Find the last non-missing passage to use as the boundary reference | |
last_content_passage = next((p for p in reversed(current_passages_state) if p.get('role') != 'missing'), None) | |
if not last_content_passage: | |
logging.warning("Context state contains only markers or is empty. Cannot load next paragraph.") | |
return format_context_markdown(current_passages_state, query_embedding_value), current_passages_state | |
boundary_passage = last_content_passage | |
# Check if we are already at the end boundary (by looking at the ID of the very last item) | |
if current_passages_state[-1].get('id') == END_MARKER_ID: | |
logging.info("Already at the end boundary marker. No next paragraph to load.") | |
return format_context_markdown(current_passages_state, query_embedding_value), current_passages_state | |
current_para_idx = boundary_passage.get('paragraph_index') | |
# Check for missing index on the boundary passage | |
if current_para_idx is None or not isinstance(current_para_idx, int): | |
logging.error("Cannot load next paragraph: current boundary passage is missing a valid paragraph index.") | |
err_msg = format_context_markdown(current_passages_state, query_embedding_value) + "\n\n**Fehler: Interner Zustand inkonsistent (fehlender Paragraph-Index).**" | |
return err_msg, current_passages_state | |
target_paragraph_index = current_para_idx + 1 | |
else: | |
logging.error(f"Invalid direction '{direction}' provided to load_more_context.") | |
return format_context_markdown(current_passages_state, query_embedding_value), current_passages_state # Return unchanged | |
# --- Fetch New Paragraph Data --- | |
new_paragraph_sentences = [] | |
boundary_hit = False # Flag if we reached start/end of book/section | |
new_passage_added = False # Flag if actual content was added/changed | |
# Extract author/book from the boundary passage's metadata | |
boundary_meta = boundary_passage.get('meta', {}) if boundary_passage else {} | |
author = boundary_meta.get('author') | |
book = boundary_meta.get('book') | |
# Fetch if target index is valid and we have author/book context | |
if target_paragraph_index >= 0 and author and book: | |
logging.info(f"Attempting to load paragraph {target_paragraph_index} for {author}/{book}") | |
new_paragraph_sentences = fetch_paragraph_data(author, book, target_paragraph_index) | |
if not new_paragraph_sentences: | |
# Successfully queried but found no sentences -> boundary hit | |
boundary_hit = True | |
logging.info(f"Boundary hit: No sentences found for paragraph {target_paragraph_index}.") | |
else: | |
# Successfully fetched new sentences | |
new_passage_added = True | |
logging.info(f"Successfully fetched {len(new_paragraph_sentences)} sentences for paragraph {target_paragraph_index}.") | |
elif target_paragraph_index == -2: | |
# Explicitly hit the start boundary based on index calculation | |
boundary_hit = True | |
logging.info("Boundary hit: Reached beginning (index <= 0).") | |
else: | |
# Invalid state (e.g., missing author/book on boundary passage) | |
logging.error(f"Cannot load more context: Invalid target index ({target_paragraph_index}) or missing author/book from boundary passage {boundary_passage.get('id') if boundary_passage else 'N/A'}.") | |
boundary_hit = True # Treat as boundary hit to potentially add marker | |
# --- Update Passages State --- | |
updated_passages = list(current_passages_state) # Create a mutable copy | |
# Remove existing boundary markers before adding new content/markers | |
updated_passages = [p for p in updated_passages if p.get('role') != 'missing'] | |
if new_passage_added: | |
# Add the newly fetched sentences | |
if add_at_beginning: | |
updated_passages = new_paragraph_sentences + updated_passages # Prepend | |
else: | |
updated_passages.extend(new_paragraph_sentences) # Append | |
# Only add boundary marker if new content wasn't added AND we hit a boundary | |
# (or if it was a boundary hit but fetch_paragraph_data returned empty). | |
# This prevents adding a boundary marker if the next paragraph exists but is empty, | |
# unless we are at the absolute start/end (target_paragraph_index == -2 or the fetch returns empty). | |
# Also ensure we don't add duplicate markers. | |
if boundary_hit: | |
if add_at_beginning: # Hit previous boundary | |
if not updated_passages or updated_passages[0].get('id') != START_MARKER_ID: | |
updated_passages.insert(0, {'id': START_MARKER_ID, 'paragraph_index': -1, 'role': 'missing', 'doc': '_(Anfang des Buches/Abschnitts)_', 'meta': {}, 'sentence_sort_key': float('-inf'), 'embedding': None}) | |
# new_passage_added = True # Marker addition counts as change | |
else: # Hit next boundary | |
if not updated_passages or updated_passages[-1].get('id') != END_MARKER_ID: | |
updated_passages.append({'id': END_MARKER_ID, 'paragraph_index': float('inf'), 'role': 'missing', 'doc': '_(Ende des Buches/Abschnitts)_', 'meta': {}, 'sentence_sort_key': float('inf'), 'embedding': None}) | |
# new_passage_added = True # Marker addition counts as change | |
# --- Reformat and Return --- | |
# Reformat only if the content of `updated_passages` actually changed (new passage or marker added) | |
# or if the original state had markers removed. | |
# Compare length or check if new_passage_added or boundary_hit. | |
content_changed = new_passage_added or (boundary_hit and len(updated_passages) != len(current_passages_state)) # Simple check for now | |
if content_changed or not updated_passages: # Also reformat if state became empty | |
# Ensure final list is sorted correctly including any added markers/paragraphs | |
updated_passages.sort(key=lambda x: (x.get('paragraph_index', -1), x.get('sentence_sort_key', float('inf')))) | |
logging.info(f"Reformatting context with {len(updated_passages)} total passages after loading more.") | |
# Use the VALID query embedding passed into the function for consistent highlighting | |
context_md = format_context_markdown(updated_passages, query_embedding_value) | |
# Return the new Markdown and the updated state list | |
return context_md, updated_passages | |
else: | |
# No new passage or boundary marker state change. | |
# Reformat existing content just in case metadata/sorting needed fixing, return original state list | |
logging.debug(f"Load Context: No change in passages or boundary marker state for direction '{direction}'. Reformatting existing state.") | |
# Re-sort the original state list just in case, then format it. | |
current_passages_state.sort(key=lambda x: (x.get('paragraph_index', -1), x.get('sentence_sort_key', float('inf')))) | |
original_context_md = format_context_markdown(current_passages_state, query_embedding_value) | |
# Return the reformatted original markdown and the original state list | |
return original_context_md, current_passages_state | |
except Exception as e: | |
logging.error(f"Error loading more context (paragraph mode): {e}", exc_info=True) | |
# Format existing content + error message, return original state | |
error_message = format_context_markdown(current_passages_state or [], query_embedding_value) + f"\n\n**Fehler beim Laden des nΓ€chsten/vorherigen Paragraphen.**" | |
return error_message, current_passages_state | |
# --- Load More Context Function --- | |
def load_more_context_wrapper(direction, current_passages_state, query_embedding_value): | |
"""Loads the previous or next paragraph in the reading view.""" | |
logging.info(f"Triggered: load_more_context_wrapper direction={direction}") | |
# This function's outputs are only context_display and displayed_context_passages state. | |
# It does NOT affect the overall UI layout or result list navigation buttons. | |
output_components = [context_display, displayed_context_passages] | |
try: | |
context_md, updated_passages_state = load_more_context(direction, current_passages_state, query_embedding_value) | |
# load_more_context returns a tuple (markdown_str, updated_state_list) | |
# Map these directly to the output components | |
updates_list = [ | |
gr.update(value=context_md), # update context_display | |
updated_passages_state # update displayed_context_passages state | |
] | |
logging.debug(f"load_more_context_wrapper: Returning {len(updates_list)} updates.") | |
return updates_list | |
except Exception as e: | |
logging.error(f"Error in load_more_context wrapper: {e}", exc_info=True) | |
# On error, return error message and original state | |
error_md = format_context_markdown(current_passages_state or [], query_embedding_value) + f"\n\n**Fehler beim Laden des nΓ€chsten/vorherigen Paragraphen.**" | |
updates_list = [ | |
gr.update(value=error_md), | |
current_passages_state # Return original state on error | |
] | |
return updates_list | |
# --- Modified _on_fav function --- | |
# This function is triggered by the hidden button click via api_name | |
# It expects the passage_id as its argument, provided by the JS Client API predict call. | |
def _on_fav(passage_id): # Removed type hint str for debugging | |
"""Handles favourite signal from JS, only increments and updates status.""" | |
# Log the type and value of the received argument | |
logging.info(f"Triggered: _on_fav with received argument: {passage_id!r} (Type: {type(passage_id)})") | |
updates_dict = { | |
fav_signal: gr.update(value=""), # Always clear the signal textbox after processing | |
status_message: gr.update(visible=False, value="") # Clear status initially | |
} | |
# Check if passage_id is a non-empty string | |
if not isinstance(passage_id, str) or not passage_id.strip(): | |
logging.warning(f"_on_fav called with invalid passage_id: {passage_id!r}.") | |
updates_dict[status_message] = gr.update(visible=True, value="**Fehler:** UngΓΌltige Favoriten-ID erhalten.") | |
return updates_dict # Return the updates dictionary | |
try: | |
# Call the core logic to increment the favourite score | |
new_score = inc_favourite(passage_id) # Use the valid passage_id string | |
logging.info(f"Successfully incremented favourite for ID {passage_id}. New score: {new_score}") | |
# Update the status message to inform the user | |
updates_dict[status_message] = gr.update(visible=True, value=f"β Favorit gespeichert! (Score: {new_score})") | |
except Exception as e: | |
logging.error(f"Error in _on_fav processing ID {passage_id}: {e}", exc_info=True) | |
# Update status message with error info | |
updates_dict[status_message] = gr.update(visible=True, value=f"**Fehler beim Speichern des Favoriten:** {e}") | |
# This function returns a dictionary of updates for its bound outputs. | |
# These are just the fav_signal state (to reset it) and the status_message UI element. | |
return updates_dict | |
js_code = """ | |
// ------------------------------------------------------------ | |
// FAVOURITE HANDLER (uses Gradio JS Client predict endpoint) | |
// ------------------------------------------------------------ | |
let gradioApp = null; // will hold the connected client | |
const ENDPOINT = "/fav"; // same name you set in api_name | |
const STATUS_SEL = '#status-message'; // Selector for the markdown status element | |
// const FAV_SIGNAL_ID = 'fav-signal'; // No longer directly interacting with fav-signal textbox from JS click handler | |
// const DEBUG_ID_SEL = '#clicked-id-debug input'; // Original selector | |
// const DEBUG_ELEM_ID = 'clicked-id-debug'; // The elem_id for the debug textbox container <--- REMOVE THIS | |
// 1 β connect once, then reβuse | |
async function initializeFavClient() { | |
console.log("JS: Initializing fav clientβ¦"); | |
try { | |
// Assuming Client is made global by the <script type="module"> tag in head | |
if (typeof Client === "undefined") { | |
console.error("JS: window.Client not defined (script order?)"); | |
return; | |
} | |
gradioApp = await Client.connect(window.location.origin); | |
console.log("JS: Gradio Client connected."); | |
} catch (e) { | |
console.error("JS: Could not connect:", e); | |
} | |
} | |
setTimeout(initializeFavClient, 100); // Initialize client after a short delay | |
// Basic HTML escape function for safety in JS error display | |
function htmlEscape(str) { | |
return str.replace(/&/g, '&') | |
.replace(/</g, '<') | |
.replace(/>/g, '>') | |
.replace(/"/g, '"') | |
.replace(/'/g, '''); | |
} | |
// 2 β call backend when user clicks a sentence | |
async function gradio_fav(id) { | |
const statusEl = document.querySelector(STATUS_SEL); | |
if (statusEl) { | |
statusEl.style.display = ''; // Make visible | |
statusEl.innerHTML = "Speichere Favoritβ¦"; // Set loading message using innerHTML | |
} | |
console.log(`JS: Calling backend ${ENDPOINT} with ID: ${id}`); // Log before calling predict | |
if (!gradioApp) { | |
console.warn("JS: client not ready yet for /fav call."); | |
if (statusEl) statusEl.innerHTML = "**Fehler:** Gradio Client nicht verbunden."; | |
return; | |
} | |
try { | |
// Pass the ID as a string in an array - matches backend inputs=[fav_signal] | |
const res = await gradioApp.predict(ENDPOINT, [String(id)]); | |
console.log(`JS: ${ENDPOINT} predict call response:`, res); // Log the full response | |
// Expected response structure from _on_fav is [updated_fav_signal_value, updated_status_message_value] | |
// We only care about the status message update | |
const msg = res?.data?.[1]?.value ?? "β Favorit gespeichert!"; // Get the updated value for status_message (index 1) | |
const is_status_visible = res?.data?.[1]?.visible ?? true; // Check if status should be visible | |
if (statusEl) { | |
statusEl.innerHTML = msg; | |
statusEl.style.display = is_status_visible ? '' : 'none'; // Set visibility | |
} | |
} catch (e) { | |
console.error(`JS: backend ${ENDPOINT} error:`, e); | |
// Attempt to get a more specific error message if available | |
let errorMsg = "Unbekannter Fehler"; | |
if (e && typeof e === 'object' && e.message) { | |
errorMsg = e.message; | |
} else if (typeof e === 'string') { | |
errorMsg = e; | |
} else if (e && typeof e === 'object' && e.name) { | |
errorMsg = `${e.name}: ${errorMsg}`; | |
} else if (e && typeof e === 'object' && e.stack) { | |
errorMsg = `${errorMsg} (See console for stack trace)`; | |
} | |
if (statusEl) { | |
statusEl.style.display = ''; // Make visible | |
statusEl.innerHTML = `**Fehler:** ${htmlEscape(errorMsg)}`; // Escape error message for safety | |
} | |
} | |
} | |
// 3 β delegate clicks on any span.clickableβsentence | |
document.addEventListener("click", ev => { | |
const span = ev.target.closest("span.clickable-sentence[data-id]"); | |
if (!span) return; // Not a clickable sentence | |
const passageId = span.dataset.id; // Get the ID from the data attribute | |
console.log(`JS: Clicked sentence. Found data-id: ${passageId}`); // Log the found ID | |
// --- DEBUG: Update debug textbox --- | |
// REMOVE ALL THE DEBUG TEXTBOX JS CODE FROM HERE | |
// const debugElement = document.getElementById(DEBUG_ELEM_ID); // Get element by elem_id | |
// let clickedIdDebugInput = null; | |
// ... rest of debug js code ... | |
// --- END DEBUG --- | |
if (passageId && String(passageId).trim() !== '' && String(passageId) !== 'undefined' && String(passageId) !== 'null') { | |
gradio_fav(passageId); // Call the function with the ID (as string) | |
} else { | |
console.warn("JS: Clickable span found, but missing or empty data-id attribute."); | |
} | |
}); | |
""" | |
# --- Gradio UI Definition --- | |
# Pass the JavaScript code and Custom CSS to the 'head' and 'css' parameters of gr.Blocks | |
custom_css = """ | |
/* Style for clickable sentences */ | |
span.clickable-sentence { | |
cursor: pointer; /* Ensure pointer cursor */ | |
} | |
/* Style for highlighted sentences (base style) */ | |
/* Uses a CSS variable for dynamic alpha */ | |
span.clickable-sentence.highlighted { /* Target the single span when it has both classes */ | |
/* Base highlight color using the dynamic alpha variable */ | |
background-color: hsla(var(--highlight-hue, 60), var(--highlight-saturation, 100%), var(--highlight-lightness, 90%), var(--highlight-alpha)); | |
/* Fixed styles for highlighting shape */ | |
padding: 1px 3px; | |
border-radius: 3px; | |
/* Prevents highlight from breaking awkwardly across lines */ | |
box-decoration-break: clone; | |
-webkit-box-decoration-break: clone; /* For older WebKit browsers */ | |
} | |
/* Style for sentences on hover (applies to ALL clickable spans, highlighted or not) */ | |
/* This rule has higher specificity than span.clickable-sentence.highlighted */ | |
span.clickable-sentence:hover { | |
/* Hover background color - this will override the base highlight color */ | |
background-color: hsla(60, 100%, 70%, 0.8); /* Yellow-ish, more opaque */ | |
transition: background-color 0.2s ease; /* Smooth transition */ | |
} | |
/* Style for the status message to make it stand out */ | |
#status-message { | |
margin-top: 10px; /* Space above the message */ | |
padding: 8px; /* Padding inside the message box */ | |
border-radius: 5px; /* Rounded corners */ | |
background-color: #fff3cd; /* Light yellow background (for info/success) */ | |
color: #664d03; /* Dark yellow text */ | |
border: 1px solid #ffecb5; /* Yellow border */ | |
visibility: visible; /* Make it visible initially */ | |
opacity: 1; /* Fully opaque */ | |
transition: opacity 0.5s ease-in-out; /* Fade effect */ | |
} | |
/* Style for error status message */ | |
#status-message strong { | |
color: #842029; /* Dark red for errors */ | |
} | |
/* You could add data-error="true" using JS/Gradio update for specific error styling */ | |
/* #status-message[data-error="true"] { | |
background-color: #f8d7da; | |
color: #721c24; | |
border-color: #f5c6cb; | |
} */ | |
""" | |
with gr.Blocks(theme = gr.themes.Default( | |
primary_hue="yellow", | |
secondary_hue="blue", | |
text_size="lg", | |
spacing_size="md", | |
radius_size="md", | |
), | |
head=f""" | |
<script type="module"> | |
import {{ Client as GradioClient }} from | |
"https://cdn.jsdelivr.net/npm/@gradio/client/dist/index.min.js"; | |
window.Client = GradioClient; // expose for the page | |
</script> | |
<script> | |
{js_code} | |
</script> | |
""", | |
css=custom_css # Add the custom CSS here | |
) as demo: | |
gr.Markdown("# Thought Loop") | |
gr.Markdown("Semantische Suche") | |
# --- State Variables --- | |
full_search_results_state = gr.State([]) # Stores results from Standard search | |
current_result_index_state = gr.State(0) # Index for Standard search results | |
llm_results_state = gr.State([]) # Stores results from LLM search | |
llm_result_index_state = gr.State(0) # Index for LLM search results | |
best_results_state = gr.State([]) # Stores results from Favourites view | |
best_index_state = gr.State(0) # Index for Favourites results | |
displayed_context_passages = gr.State([]) # Stores passages currently in context view | |
# active_view_state tracks which view is active: | |
# "standard", "llm", "favourites", "context_from_standard", "context_from_llm", "context_from_favourites", "none" | |
active_view_state = gr.State("none") | |
# Holds the query embedding for highlighting in the context view. | |
# Needs to be passed through UI events that transition *to* the context view. | |
direct_embedding_output_holder = gr.State(None) | |
# --- UI Layout --- | |
with gr.Row(): | |
query_input = gr.Textbox(label="Gedanken eingeben", placeholder="Sollte Technologie nicht zu immer krasserer Arbeitsteilung fΓΌhren, sodass wir in Zukunft...", lines=2, scale=4) | |
author_dropdown = gr.Dropdown(label="Autoren auswΓ€hlen (optional)", choices=unique_authors, multiselect=True, scale=2) | |
with gr.Accordion("Feinabstimmung Rankierung", open=False) as result_tuning_accordion: | |
with gr.Row(): | |
window_size_slider = gr.Slider( | |
minimum=0, maximum=5, step=1, value=RERANK_WINDOW_SIZE, | |
label="Kontext-FenstergrΓΆΓe (+/- SΓ€tze)", | |
info="Wie viele SΓ€tze vor/nach dem Treffer-Satz fΓΌr Kontext-Score & Anzeige berΓΌcksichtigt werden (0-5)." | |
) | |
weight_slider = gr.Slider( | |
minimum=0.0, maximum=1.0, step=0.05, value=RERANK_WEIGHT, | |
label="Kontext-Gewichtung", | |
info="Wie stark der Kontext-Score das ursprΓΌngliche Ranking beeinflusst (0.0 = kein Einfluss, 1.0 = stark)." | |
) | |
decay_slider = gr.Slider( | |
minimum=0.0, maximum=1.0, step=0.05, value=RERANK_DECAY, | |
label="Kontext-Abfallfaktor", | |
info="Wie schnell der Einfluss von Nachbarn mit der Distanz abnimmt (0.0 = kein Abfall, 1.0 = stark)." | |
) | |
with gr.Row(): | |
search_button = gr.Button("Embeddingsuche", variant="secondary", scale=1) | |
llm_rerank_button = gr.Button("Embeddingsuche + LLM Auswahl", variant="secondary", scale=1, interactive=(API_KEY is not None and llm_rerank_model is not None)) | |
best_of_button = gr.Button("βββ", variant="secondary", scale=1) | |
# --- Shared Results/Favourites Area --- | |
# We reuse standard_nav_row and single_result_group for all result types | |
with gr.Row(visible=False) as standard_nav_row: | |
# These buttons will be shown/hidden based on active_view_state | |
previous_result_button = gr.Button("β¬ οΈ", min_width=80, visible=False) # General Previous | |
next_result_button = gr.Button("β‘οΈ", min_width=80, visible=False) # General Next | |
weiterlesen_button = gr.Button("weiterlesen", variant="secondary", visible=False) # General Weiterlesen | |
with gr.Group(visible=False) as single_result_group: | |
# MODIFIED: Replaced single_result_display_md with an Accordion and a Textbox | |
result_accordion = gr.Accordion(label="Feinabstimmung", open=False) # Accordion for heading and metadata | |
with result_accordion: | |
# The content of the accordion will be a Markdown component | |
result_metadata_display = gr.Markdown("...") # Placeholder for metadata and scores | |
# This Textbox will contain the actual passage text | |
result_text = gr.Textbox(label="", lines=5, interactive=False, visible=True) | |
# --- Status Message Area --- | |
# Added elem_id for JS to target | |
status_message = gr.Markdown("", visible=False, elem_id="status-message") # Changed to visible=False initially | |
# --- Hidden Signaling Components --- | |
# Hidden textbox to hold the ID (will be used as input in client API call) | |
# JS click handler now uses the client API directly, no longer sets this textbox value | |
# This component's *value* is still used as an output by _on_fav to reset it. | |
fav_signal = gr.Textbox( | |
visible=False, | |
elem_id="fav-signal", # Still useful for potential future JS interactions or debugging | |
value="" # Initialize with empty value | |
) | |
# Hidden button triggered by JS (used to expose the backend function via its api_name binding) | |
# The Client API calls the function bound to the api_name, not the button's click *event*. | |
# This button component is mainly here to provide a place for the api_name binding. | |
fav_trigger_button = gr.Button( | |
visible=False, | |
elem_id="fav-trigger-button" # Still useful for JS to get a reference if needed, though not clicked directly anymore | |
) | |
# --- Reading Area --- | |
with gr.Column(visible=False) as context_area: | |
back_to_results_button = gr.Button("β¬ οΈ ZurΓΌck ", variant="secondary", visible=False) | |
load_previous_button = gr.Button("β¬οΈ", variant="secondary", visible=False) # Added text | |
# --- MODIFIED: Added elem_id to context_display --- | |
context_display = gr.HTML(label="Lesebereich", value="<div>_Kontext wird hier angezeigt._</div>", elem_id="context-display-markdown") # gr.HTML needs valid HTML, so wrap placeholder in div | |
load_next_button = gr.Button("β¬οΈ", variant="secondary", visible=False) # Added text | |
# --- Utility function to create a reset update dictionary --- | |
# This function needs to be defined AFTER all the components it references | |
def create_reset_updates(): | |
"""Creates a dictionary of Gradio updates to reset the UI and state.""" | |
updates = {} | |
# List all components that need resetting/hiding, *excluding* the sliders and the Accordion content display | |
components_to_reset = [ | |
# States | |
full_search_results_state, current_result_index_state, displayed_context_passages, | |
llm_results_state, llm_result_index_state, active_view_state, | |
direct_embedding_output_holder, | |
best_results_state, best_index_state, | |
fav_signal, # <-- Included here as a state to reset its value | |
# Shared Result UI - Containers | |
standard_nav_row, single_result_group, | |
# Shared Result UI - New Components | |
result_accordion, result_metadata_display, result_text, | |
# Tuning Accordion | |
result_tuning_accordion, | |
# Buttons in shared row | |
previous_result_button, next_result_button, weiterlesen_button, | |
# Context Area UI | |
context_area, context_display, load_previous_button, load_next_button, | |
back_to_results_button, | |
# Status message | |
status_message, | |
# fav_trigger_button is intentionally excluded here as its visibility/interactivity isn't controlled by this reset. | |
] | |
for comp in components_to_reset: | |
if isinstance(comp, gr.State): | |
if comp in [current_result_index_state, llm_result_index_state, best_index_state]: updates[comp] = 0 | |
elif comp == active_view_state: updates[comp] = "none" | |
elif comp == direct_embedding_output_holder: updates[comp] = None | |
# Note: fav_signal state value is reset below explicitly | |
elif comp in [full_search_results_state, displayed_context_passages, llm_results_state, best_results_state]: updates[comp] = [] | |
else: # UI Components | |
if isinstance(comp, gr.Markdown): | |
updates[comp] = gr.update(value="") # Clear Markdown content | |
elif isinstance(comp, gr.HTML): | |
updates[comp] = gr.update(value="<div>_Kontext wird hier angezeigt._</div>") # Reset HTML content | |
elif isinstance(comp, gr.Textbox): # Handle Textboxes | |
# result_text needs value reset, visibility handled by single_result_group | |
if comp == result_text: | |
updates[comp] = gr.update(value="", interactive=False) # Keep interactive=False for results view | |
# fav_signal needs value reset AND explicit visibility set to False | |
elif comp == fav_signal: | |
updates[comp] = gr.update(value="", visible=False) | |
# Add any other Textboxes here if needed | |
elif isinstance(comp, gr.Accordion): # New Accordion | |
updates[comp] = gr.update(label="Feinabstimmung", open=False, visible=True) # Reset label, close, keep visible. Visibility controlled by single_result_group. | |
if isinstance(comp, (gr.Row, gr.Group, gr.Column)): | |
# Keep tuning accordion open/visible (Accordion itself isn't in this list, but its contents are) | |
if comp not in []: # Add any other components that should NOT be hidden here | |
updates[comp] = gr.update(visible=False) | |
if isinstance(comp, gr.Button): | |
updates[comp] = gr.update(visible=False, interactive=False) | |
if comp == status_message: | |
updates[comp] = gr.update(value="", visible=False) | |
# Explicitly set tuning sliders to be visible and interactive on reset, | |
# but *don't* reset their values here. Their current values will be retained. | |
# These sliders are NOT included in the components_to_reset list above, | |
# so they won't be affected by the generic hide logic. | |
updates[window_size_slider] = gr.update(visible=True, interactive=True) | |
updates[weight_slider] = gr.update(visible=True, interactive=True) | |
updates[decay_slider] = gr.update(visible=True, interactive=True) | |
# The result_metadata_display (inside the accordion) also needs resetting | |
updates[result_metadata_display] = gr.update(value="...") | |
logging.debug(f"Created reset updates dict with {len(updates)} items.") | |
return updates | |
# --- Wrapper Functions for Gradio Bindings --- | |
# These wrappers prepare the inputs and outputs for the Gradio event handlers. | |
# They return a dictionary of updates which is then converted to a list by Gradio. | |
def search_standard_wrapper(query, selected_authors, window_size, weight, decay): | |
logging.info(f"Triggered: search_standard_wrapper with window={window_size}, weight={weight:.2f}, decay={decay:.2f}") | |
# Start with a reset state (Includes hiding context area and its buttons) | |
updates_dict = create_reset_updates() | |
try: | |
search_results, query_embedding = perform_search_standard( | |
query, selected_authors, | |
window_size, weight, decay | |
) | |
# Merge updates from the mode-specific UI function (Shows results area) | |
# search_standard_mode_ui now handles updating the new components | |
updates_dict.update(search_standard_mode_ui(search_results, query_embedding)) | |
except Exception as e: | |
logging.error(f"Error in search_standard_wrapper: {e}", exc_info=True) | |
# MODIFIED: Update the new components on error | |
updates_dict[result_accordion] = gr.update(label=f"**Fehler bei der Suche:**", open=False, visible=True) | |
updates_dict[result_metadata_display] = gr.update(value=str(e)) # Display error message in metadata area | |
updates_dict[result_text] = gr.update(value="", visible=True) | |
updates_dict[single_result_group] = gr.update(visible=True) # Ensure the result group is visible | |
updates_dict[direct_embedding_output_holder] = None | |
# --- FIX: Ensure context area and its buttons are hidden when showing search results --- | |
# Although create_reset_updates is called, add explicit updates for robustness | |
updates_dict[context_area] = gr.update(visible=False) | |
updates_dict[load_previous_button] = gr.update(visible=False) | |
updates_dict[load_next_button] = gr.update(visible=False) | |
updates_dict[back_to_results_button] = gr.update(visible=False) | |
# --- END FIX --- | |
# Return the dictionary of updates | |
return updates_dict | |
def search_llm_rerank_wrapper(query, selected_authors, window_size, weight, decay): | |
logging.info(f"Triggered: search_llm_rerank_wrapper with window={window_size}, weight={weight:.2f}, decay={decay:.2f}") | |
# Start with a reset state (Includes hiding context area and its buttons) | |
updates_dict = create_reset_updates() | |
try: | |
llm_results, query_embedding = perform_search_llm( | |
query, selected_authors, | |
window_size, weight, decay | |
) | |
# Merge updates from the mode-specific UI function (Shows LLM results area) | |
# search_llm_rerank_mode_ui now handles updating the new components | |
updates_dict.update(search_llm_rerank_mode_ui(llm_results, query_embedding)) | |
except Exception as e: | |
logging.error(f"Error in search_llm_rerank_wrapper: {e}", exc_info=True) | |
# MODIFIED: Update the new components on error | |
updates_dict[result_accordion] = gr.update(label=f"**Fehler bei der LLM-Suche:**", open=False, visible=True) | |
updates_dict[result_metadata_display] = gr.update(value=str(e)) # Display error message | |
updates_dict[result_text] = gr.update(value="", visible=True) | |
updates_dict[single_result_group] = gr.update(visible=True) # Ensure group is visible | |
updates_dict[direct_embedding_output_holder] = None | |
# --- FIX: Ensure context area and its buttons are hidden when showing LLM results --- | |
# Although create_reset_updates is called, add explicit updates for robustness | |
updates_dict[context_area] = gr.update(visible=False) | |
updates_dict[load_previous_button] = gr.update(visible=False) | |
updates_dict[load_next_button] = gr.update(visible=False) | |
updates_dict[back_to_results_button] = gr.update(visible=False) | |
# --- END FIX --- | |
# Return the dictionary of updates | |
return updates_dict | |
def refresh_best_wrapper(): | |
"""Wrapper for _refresh_best to prepare UI updates.""" | |
logging.info("Triggered: refresh_best_wrapper") | |
# Start with a reset state (Includes hiding context area and its buttons) | |
updates_dict = create_reset_updates() | |
# Ensure status message is hidden on view change | |
updates_dict[status_message] = gr.update(value="", visible=False) | |
try: | |
favs = top_favourites(MAX_FAVOURITES) | |
if not favs: | |
logging.info("No favourites to display.") | |
# MODIFIED: Update the new components for no results | |
updates_dict[result_accordion] = gr.update(label="_Noch keine Favoriten gesammelt.", open=False, visible=True) | |
updates_dict[result_metadata_display] = gr.update(value="") | |
updates_dict[result_text] = gr.update(value="", visible=True) | |
updates_dict[single_result_group] = gr.update(visible=True) # Ensure group is visible | |
updates_dict[best_results_state] = [] | |
updates_dict[best_index_state] = 0 | |
updates_dict[active_view_state] = "favourites" # Set view even if empty | |
else: | |
logging.info(f"Displaying first of {len(favs)} favourite results.") | |
# format_result_display returns (accordion_title, accordion_content_md, text_content) | |
accordion_title, accordion_content_md, text_content = format_result_display(favs[0], 0, len(favs), "favourites") | |
# MODIFIED: Update the new components with formatted data | |
updates_dict[result_accordion] = gr.update(label=accordion_title, open=False, visible=True) | |
updates_dict[result_metadata_display] = gr.update(value=accordion_content_md) | |
updates_dict[result_text] = gr.update(value=text_content, visible=True) | |
updates_dict[single_result_group] = gr.update(visible=True) # Ensure group is visible | |
updates_dict[standard_nav_row] = gr.update(visible=True) | |
updates_dict[previous_result_button] = gr.update(visible=True, interactive=False) # First result is not navigable prev | |
updates_dict[next_result_button] = gr.update(visible=True, interactive=(len(favs) > 1)) # Enable if more than one fav | |
updates_dict[weiterlesen_button] = gr.update(visible=True, interactive=True, value="weiterlesen") # Enable context button | |
updates_dict[best_results_state] = favs | |
updates_dict[best_index_state] = 0 | |
updates_dict[active_view_state] = "favourites" # Set active view state | |
except Exception as e: | |
logging.error(f"Error in refresh_best_wrapper: {e}", exc_info=True) | |
# MODIFIED: Update the new components on error | |
updates_dict[result_accordion] = gr.update(label=f"**Fehler beim Laden der Favoriten:**", open=False, visible=True) | |
updates_dict[result_metadata_display] = gr.update(value=str(e)) # Display error message | |
updates_dict[result_text] = gr.update(value="", visible=True) | |
updates_dict[single_result_group] = gr.update(visible=True) # Ensure group is visible | |
updates_dict[best_results_state] = [] | |
updates_dict[best_index_state] = 0 | |
updates_dict[active_view_state] = "none" # Indicate error state | |
# --- FIX: Ensure context area and its buttons are hidden when showing Favourites --- | |
# Although create_reset_updates is called, add explicit updates for robustness | |
updates_dict[context_area] = gr.update(visible=False) | |
updates_dict[load_previous_button] = gr.update(visible=False) | |
updates_dict[load_next_button] = gr.update(visible=False) | |
updates_dict[back_to_results_button] = gr.update(visible=False) | |
# --- END FIX --- | |
# Return the dictionary of updates | |
return updates_dict | |
def navigate_results_wrapper(direction, current_index, full_results, llm_results, llm_index, best_results, best_index, active_view): | |
logging.info(f"Triggered: navigate_results_wrapper direction={direction}, active_view={active_view}") | |
updates_dict = { | |
# Default updates to preserve relevant state based on active view | |
full_search_results_state: full_results, | |
current_result_index_state: current_index, | |
llm_results_state: llm_results, | |
llm_result_index_state: llm_index, | |
best_results_state: best_results, | |
best_index_state: best_index, | |
active_view_state: active_view, # Preserve active view | |
# MODIFIED: Clear new components when navigating (before displaying the next one) | |
result_accordion: gr.update(label="...", open=False, visible=True), | |
result_metadata_display: gr.update(value=""), | |
result_text: gr.update(value="", visible=True), | |
} | |
try: | |
if active_view == "standard": | |
# navigate_results now updates the new components directly | |
nav_updates = navigate_results(direction, current_index, full_results) | |
updates_dict.update(nav_updates) | |
elif active_view == "llm": | |
# navigate_llm_results now updates the new components directly | |
nav_updates = navigate_llm_results(direction, llm_index, llm_results) | |
updates_dict.update(nav_updates) | |
elif active_view == "favourites": | |
# navigate_best_results now updates the new components directly | |
nav_updates = navigate_best_results(direction, best_index, best_results) | |
updates_dict.update(nav_updates) | |
else: | |
logging.warning(f"Navigation triggered in unexpected view state: {active_view}") | |
# MODIFIED: Update new components on error | |
updates_dict[result_accordion] = gr.update(label="Navigation nicht mΓΆglich.", open=False, visible=True) | |
updates_dict[result_metadata_display] = gr.update(value="UngΓΌltiger Status.") | |
updates_dict[result_text] = gr.update(value="", visible=True) | |
# Hide nav buttons as navigation is not possible | |
updates_dict[previous_result_button] = gr.update(interactive=False) | |
updates_dict[next_result_button] = gr.update(interactive=False) | |
updates_dict[weiterlesen_button] = gr.update(interactive=False) | |
except Exception as e: | |
logging.error(f"Error in navigation wrapper: {e}", exc_info=True) | |
# MODIFIED: Update new components on error | |
updates_dict[result_accordion] = gr.update(label=f"**Navigationsfehler:**", open=False, visible=True) | |
updates_dict[result_metadata_display] = gr.update(value=str(e)) | |
updates_dict[result_text] = gr.update(value="", visible=True) | |
# On error, disable navigation buttons | |
updates_dict[previous_result_button] = gr.update(interactive=False) | |
updates_dict[next_result_button] = gr.update(interactive=False) | |
updates_dict[weiterlesen_button] = gr.update(interactive=False) | |
# Return the dictionary of updates | |
# Note: The individual navigate_* functions within the try/except | |
# already populate the updates_dict with the specifics. | |
# We just handle the top-level error/unexpected state here. | |
return updates_dict | |
def go_back_to_results_wrapper(last_active_view, std_results, std_index, llm_results, llm_index, best_results, best_index, current_fav_signal_value): | |
"""Handles UI updates for returning from the context view to the appropriate results view.""" | |
logging.info(f"Triggered: go_back_to_results_wrapper from view: {last_active_view}") | |
updates_dict = { | |
# Reset context area visibility | |
context_area: gr.update(visible=False), | |
context_display: gr.update(value=""), # Clear context display | |
displayed_context_passages: gr.State([]), # Reset context state | |
# Pass through existing results and indices states | |
full_search_results_state: std_results, current_result_index_state: std_index, | |
llm_results_state: llm_results, llm_result_index_state: llm_index, | |
best_results_state: best_results, best_index_state: best_index, | |
direct_embedding_output_holder: None, # Clear embedding when leaving context | |
fav_signal: gr.update(value=current_fav_signal_value), # <--- Pass through fav_signal state | |
active_view_state: "none", # Reset active view temporarily before setting correct one | |
# Ensure the new result components are initially hidden when returning | |
result_accordion: gr.update(label="Feinabstimmung", open=False, visible=False), | |
result_metadata_display: gr.update(value=""), | |
result_text: gr.update(value="", visible=False), | |
# Ensure shared result row and group are initially hidden | |
standard_nav_row: gr.update(visible=False), | |
single_result_group: gr.update(visible=False), | |
# Also ensure all result nav buttons are hidden initially | |
previous_result_button: gr.update(visible=False, interactive=False), | |
next_result_button: gr.update(visible=False, interactive=False), | |
weiterlesen_button: gr.update(visible=False, interactive=False), | |
} | |
# Hide status message | |
updates_dict[status_message] = gr.update(value="", visible=False) | |
# Determine which result view to show based on where we came from | |
target_view = "none" | |
target_results_list = [] | |
target_index = 0 | |
result_type = "unknown" # Used for formatting | |
if last_active_view == "context_from_standard": | |
target_view = "standard" | |
target_results_list = std_results | |
target_index = std_index | |
result_type = "standard" | |
logging.info("Going back to Standard results.") | |
elif last_active_view == "context_from_llm": | |
target_view = "llm" | |
target_results_list = llm_results | |
target_index = llm_index | |
result_type = "llm" | |
logging.info("Going back to LLM results.") | |
elif last_active_view == "context_from_favourites": | |
target_view = "favourites" | |
target_results_list = best_results | |
target_index = best_index | |
result_type = "favourites" | |
logging.info("Going back to Favourites.") | |
else: | |
logging.warning(f"Back button triggered from unexpected state: {last_active_view}") | |
# Default to showing an error message if view is unknown | |
updates_dict[result_accordion] = gr.update(label="ZurΓΌck aus unbekanntem Zustand.", open=False, visible=True) | |
updates_dict[result_metadata_display] = gr.update(value="Resultate konnten nicht geladen werden.") | |
updates_dict[result_text] = gr.update(value="", visible=True) | |
updates_dict[single_result_group] = gr.update(visible=True) # Ensure group is visible | |
updates_dict[standard_nav_row] = gr.update(visible=True) # Ensure nav row is visible (even if buttons are hidden) | |
target_view = "none" # Stay in error state | |
return updates_dict # Return early on error | |
# Update the active_view state to the results view we returned to | |
updates_dict[active_view_state] = target_view | |
# Show the shared result group and nav row | |
updates_dict[single_result_group] = gr.update(visible=True) | |
updates_dict[standard_nav_row] = gr.update(visible=True) | |
# Update the result display and navigation buttons for the target view | |
if target_results_list and isinstance(target_results_list, list) and 0 <= target_index < len(target_results_list): | |
result_data = target_results_list[target_index] | |
# MODIFIED: Use the combined formatter and update new components | |
accordion_title, accordion_content_md, text_content = format_result_display(result_data, target_index, len(target_results_list), result_type) | |
updates_dict[result_accordion] = gr.update(visible=True, label=accordion_title, open=False) | |
updates_dict[result_metadata_display] = gr.update(value=accordion_content_md) | |
updates_dict[result_text] = gr.update(value=text_content, visible=True) | |
# Update button interactivity based on the selected index and total results | |
updates_dict[previous_result_button] = gr.update(visible=True, interactive=(target_index > 0)) | |
updates_dict[next_result_button] = gr.update(visible=True, interactive=(target_index < len(target_results_list) - 1)) | |
updates_dict[weiterlesen_button] = gr.update(visible=True, interactive=True, value="weiterlesen" if result_type != "llm" else "im Original weiterlesen") | |
else: | |
# If the result list is empty or invalid after returning, show appropriate message | |
error_msg_label = f"_{target_view.capitalize()}-Resultate nicht verfΓΌgbar._" | |
error_msg_content = "" # No content for metadata | |
updates_dict[result_accordion] = gr.update(visible=True, label=error_msg_label, open=False) | |
updates_dict[result_metadata_display] = gr.update(value=error_msg_content) | |
updates_dict[result_text] = gr.update(value="", visible=True) # Clear text area | |
# Hide navigation buttons as there are no results to navigate | |
updates_dict[previous_result_button] = gr.update(visible=False, interactive=False) | |
updates_dict[next_result_button] = gr.update(visible=False, interactive=False) | |
updates_dict[weiterlesen_button] = gr.update(visible=False, interactive=False) | |
return updates_dict | |
def move_to_reading_wrapper(std_results, std_index, llm_results, llm_index, best_results, best_index, active_view, query_embedding_value, current_fav_signal_value): | |
logging.info(f"Triggered: move_to_reading_wrapper active_view={active_view}") | |
updates_dict = { | |
# Preserve all state variables by default | |
full_search_results_state: std_results, current_result_index_state: std_index, | |
llm_results_state: llm_results, llm_result_index_state: llm_index, | |
best_results_state: best_results, best_index_state: best_index, | |
active_view_state: active_view, # Preserve active view temporarily | |
direct_embedding_output_holder: query_embedding_value, | |
fav_signal: gr.update(value=current_fav_signal_value) # <--- Pass through fav_signal state | |
} | |
# Hide status message when changing view | |
updates_dict[status_message] = gr.update(value="", visible=False) | |
try: | |
target_results_list = [] | |
target_index = 0 | |
result_type = "unknown" | |
# Identify which result list and index to use based on active_view | |
if active_view == "standard": | |
target_results_list = std_results | |
target_index = std_index | |
result_type = "standard" | |
elif active_view == "llm": | |
target_results_list = llm_results | |
target_index = llm_index | |
result_type = "llm" | |
elif active_view == "favourites": | |
target_results_list = best_results | |
target_index = best_index | |
result_type = "favourites" | |
else: | |
logging.warning(f"Weiterlesen triggered in unexpected view state: {active_view}") | |
updates_dict[context_display] = gr.update(value="Kann Kontext in diesem Zustand nicht laden.") | |
updates_dict[context_area] = gr.update(visible=True) | |
updates_dict[load_previous_button] = gr.update(interactive=False) | |
updates_dict[load_next_button] = gr.update(interactive=False) | |
updates_dict[back_to_results_button] = gr.update(visible=True, interactive=True) | |
updates_dict[active_view_state] = "none" # Indicate an error/transition state | |
return updates_dict # Return early on error | |
# Call the UI function that fetches and formats the initial context | |
# Pass only the data it needs (index within the target list, the list itself, embedding, and type) | |
# The move_to_reading_area_ui function should return a dictionary of updates for UI components like context_display and displayed_context_passages state | |
read_updates = move_to_reading_area_ui(target_index, target_results_list, query_embedding_value, result_type) | |
# Update the active_view state to reflect entering context mode | |
# This state will be used by load_more and back buttons | |
updates_dict[active_view_state] = f"context_from_{result_type}" | |
# Merge the UI updates returned by move_to_reading_area_ui | |
updates_dict.update(read_updates) | |
except Exception as e: | |
logging.error(f"Error in move_to_reading wrapper: {e}", exc_info=True) | |
updates_dict[context_display] = gr.update(value=f"**Fehler:** Konnte Paragraph nicht in Lesebereich laden: {e}") | |
updates_dict[context_area] = gr.update(visible=True) | |
updates_dict[load_previous_button] = gr.update(interactive=False) | |
updates_dict[load_next_button] = gr.update(interactive=False) | |
updates_dict[back_to_results_button] = gr.update(visible=True, interactive=True) | |
updates_dict[active_view_state] = "error_context" # Indicate an error state | |
return updates_dict | |
# This wrapper function remains the same, it's bound to load_previous_button and load_next_button | |
def load_more_context_wrapper(direction, current_passages_state, query_embedding_value): | |
logging.info(f"Triggered: load_more_context_wrapper direction={direction}") | |
# This function's outputs are only context_display and displayed_context_passages state. | |
# It does NOT affect the overall UI layout or result list navigation buttons. | |
output_components = [context_display, displayed_context_passages] | |
try: | |
context_md, updated_passages_state = load_more_context(direction, current_passages_state, query_embedding_value) | |
# load_more_context returns a tuple (markdown_str, updated_state_list) | |
# Map these directly to the output components | |
updates_list = [ | |
gr.update(value=context_md), # update context_display | |
updated_passages_state # update displayed_context_passages state | |
] | |
logging.debug(f"load_more_context_wrapper: Returning {len(updates_list)} updates.") | |
return updates_list | |
except Exception as e: | |
logging.error(f"Error in load_more_context wrapper: {e}", exc_info=True) | |
# On error, return error message and original state | |
error_md = format_context_markdown(current_passages_state or [], query_embedding_value) + f"\n\n**Fehler beim Laden des nΓ€chsten/vorherigen Paragraphen.**" | |
updates_list = [ | |
gr.update(value=error_md), | |
current_passages_state # Return original state on error | |
] | |
return updates_list | |
# --- Define the combined list of all potential UI outputs --- | |
# This list is needed for functions that can trigger updates across multiple parts of the UI. | |
# We add the direct_embedding_output_holder state as well. | |
# fav_trigger_button is NOT in this list because it's strictly | |
# a hidden signaling component updated only by the fav logic binding's outputs. | |
# This list needs to be defined AFTER all components are defined in the Blocks context | |
all_ui_outputs = [ | |
# States | |
full_search_results_state, current_result_index_state, displayed_context_passages, | |
llm_results_state, llm_result_index_state, active_view_state, | |
direct_embedding_output_holder, | |
best_results_state, best_index_state, | |
fav_signal, | |
# Shared Result UI Containers | |
standard_nav_row, single_result_group, | |
# MODIFIED: New Result UI Components | |
result_accordion, result_metadata_display, result_text, | |
# Tuning Accordion | |
result_tuning_accordion, | |
# Buttons in shared row | |
previous_result_button, next_result_button, weiterlesen_button, | |
# Context Area UI | |
context_area, context_display, load_previous_button, load_next_button, | |
back_to_results_button, | |
# Tuning Sliders (Keep them in the list because wrappers might update their visibility/interactivity, | |
# but the reset function explicitly avoids changing their values) | |
window_size_slider, weight_slider, decay_slider, | |
# Status message | |
status_message, | |
] | |
logging.info(f"Length of all_ui_outputs list (used for comprehensive updates): {len(all_ui_outputs)}") | |
# --- Bindings: Connect UI elements to functions --- | |
# Bind search buttons to their wrapper functions. | |
# These wrappers will return a dictionary of updates for the *entire* UI state. | |
search_button.click( | |
search_standard_wrapper, | |
inputs=[query_input, author_dropdown, window_size_slider, weight_slider, decay_slider], | |
# We must list ALL potential outputs here, including states and UI elements that might change visibility or content. | |
# Gradio will use the dictionary returned by the wrapper to update the matching outputs in this list. | |
outputs=all_ui_outputs | |
) | |
llm_rerank_button.click( | |
search_llm_rerank_wrapper, | |
inputs=[query_input, author_dropdown, window_size_slider, weight_slider, decay_slider], | |
outputs=all_ui_outputs | |
) | |
# Bind the favourites button to its wrapper | |
best_of_button.click( | |
refresh_best_wrapper, | |
inputs=[], # No direct inputs, it fetches from the fav_scores state | |
outputs=all_ui_outputs # It updates results display, navigation, and state | |
) | |
# Bind navigation buttons to a single wrapper that handles different view states | |
# Inputs include all state variables needed to know the current view and data | |
nav_inputs = [ | |
current_result_index_state, full_search_results_state, # Standard state | |
llm_results_state, llm_result_index_state, # LLM state | |
best_results_state, best_index_state, # Favourites state | |
active_view_state # Current view indicator | |
] | |
# Outputs include all UI elements and states that might change during navigation | |
nav_outputs = all_ui_outputs # Navigation can affect the result display and state | |
previous_result_button.click( | |
lambda *args: navigate_results_wrapper("previous", *args), # Pass 'previous' as first arg | |
inputs=nav_inputs, | |
outputs=nav_outputs | |
) | |
next_result_button.click( | |
lambda *args: navigate_results_wrapper("next", *args), # Pass 'next' as first arg | |
inputs=nav_inputs, | |
outputs=nav_outputs | |
) | |
# Bind the "weiterlesen" button to a wrapper that handles different view states | |
# Inputs need state necessary to determine which result (standard, llm, fav) to load context for | |
# We also need fav_signal's current value to pass it through in the outputs. | |
read_inputs = [ | |
full_search_results_state, current_result_index_state, # Standard state | |
llm_results_state, llm_result_index_state, # LLM state | |
best_results_state, best_index_state, # Favourites state | |
active_view_state, # Current view indicator (e.g., 'standard', 'llm', 'favourites') | |
direct_embedding_output_holder, # Embedding for highlighting in context | |
fav_signal # <--- ADDED fav_signal here as an input | |
] | |
# Outputs include all UI elements and states that change when entering context view | |
# This is why all_ui_outputs is used here. | |
read_outputs = all_ui_outputs | |
weiterlesen_button.click( | |
move_to_reading_wrapper, | |
inputs=read_inputs, | |
outputs=read_outputs | |
) | |
# Bind context navigation buttons | |
# load_more_context_wrapper already returns updates as a list [context_display_update, state_update] | |
# These only update the context display and state, not the main results area. | |
load_previous_button.click( | |
load_more_context_wrapper, | |
inputs=[gr.State('previous'), displayed_context_passages, direct_embedding_output_holder], | |
outputs=[context_display, displayed_context_passages], # Only update context display and state | |
scroll_to_output=False | |
) | |
load_next_button.click( | |
load_more_context_wrapper, | |
inputs=[gr.State('next'), displayed_context_passages, direct_embedding_output_holder], | |
outputs=[context_display, displayed_context_passages], # Only update context display and state | |
scroll_to_output=False | |
) | |
# Bind the "ZurΓΌck" button to a wrapper that handles returning to results list | |
# Inputs need states relevant to restoring the correct results view. | |
# We also need fav_signal's current value to pass it through in the outputs. | |
back_inputs = [ | |
active_view_state, # Need to know which view we came from to go back correctly | |
full_search_results_state, current_result_index_state, # Standard state | |
llm_results_state, llm_result_index_state, # LLM state | |
best_results_state, best_index_state, # Favourites state | |
fav_signal # <--- ADDED fav_signal here as an input | |
] | |
# Outputs include all UI elements and states that change when returning to results view | |
back_outputs = all_ui_outputs | |
back_to_results_button.click( | |
go_back_to_results_wrapper, | |
inputs=back_inputs, | |
outputs=back_outputs | |
) | |
# --- Binding for favourite signaling --- | |
# This binding exposes the _on_fav function to the Gradio Client API via api_name="fav". | |
# The JS client will call the backend function associated with this api_name, | |
# providing a value for the component(s) in the 'inputs' list. | |
# _on_fav expects the value of fav_signal as its single argument. | |
# It returns updates for fav_signal (to clear it) and status_message. | |
fav_trigger_button.click( | |
_on_fav, | |
inputs=[fav_signal], # This tells Gradio that the API call for /fav expects ONE input, which should correspond to fav_signal's value. | |
outputs=[fav_signal, status_message], # These are the components _on_fav will update | |
api_name="fav" # <-- Exposes route /fav | |
) | |
# --- Launch the Application --- | |
if __name__ == "__main__": | |
print("\n" + "="*50) | |
print("--- Performing Startup Checks ---") | |
startup_warnings = [] | |
if collection is None: startup_warnings.append("--- ERROR: ChromaDB Collection could not be loaded/initialized.") | |
elif collection.count() == 0: startup_warnings.append("--- WARNUNG: ChromaDB Collection is empty. Search will yield no results.") | |
elif not unique_authors: startup_warnings.append("--- WARNUNG: No unique authors found in DB metadata (check 'author' key). Filter will be empty.") | |
if not API_KEY: startup_warnings.append("--- WARNUNG: GEMINI_API_KEY not found. Embedding/LLM features WILL FAIL.") | |
if API_KEY and llm_rerank_model is None: startup_warnings.append(f"--- WARNUNG: Gemini LLM Re-Rank Model ({LLM_RERANK_MODEL_NAME}) failed to initialize despite API key being present.") | |
if not os.path.exists(PROMPT_LOG_DIR) or not os.path.isdir(PROMPT_LOG_DIR): startup_warnings.append(f"--- WARNUNG: Prompt log directory '{PROMPT_LOG_DIR}' not found or is not a directory.") | |
if startup_warnings: | |
print("!!! Startup Issues Found !!!") | |
for w in startup_warnings: print(w) | |
else: | |
print("--- Configuration checks passed successfully. ---") | |
print("\n" + "--- Configuration Summary ---") | |
print(f"- Embedding Model: {EMBEDDING_MODEL}") | |
print(f"- LLM Re-Rank Model: {LLM_RERANK_MODEL_NAME}") | |
print(f"- Initial DB Fetch Size: {INITIAL_RESULTS_FOR_RERANK}") | |
print(f"- 1st Pass Re-rank Window: +/- {RERANK_WINDOW_SIZE} sentences") | |
print(f"- 1st Pass Re-rank Weight: {RERANK_WEIGHT:.2f}, Decay: {RERANK_DECAY:.2f}") | |
print(f"- LLM Candidate Count: {LLM_RERANK_CANDIDATE_COUNT}") | |
print(f"- LLM Target Result Count: {LLM_RERANK_TARGET_COUNT}") | |
print(f"- Max Results per Author (Final): {MAX_RESULTS_PER_AUTHOR}") | |
print(f"- Max Favourites Displayed: {MAX_FAVOURITES}") | |
print(f"- LLM Prompts logged to: '{PROMPT_LOG_DIR}'") | |
print(f"- Favourites saved to: '{FAV_FILE}'") # Log fav file location | |
print("--- End Summary ---") | |
print("\nStarting Gradio Interface...") | |
print("="*50 + "\n") | |
demo.launch( | |
server_name="0.0.0.0", | |
share=False, | |
debug=True # Keep debug=True for now to see all logs | |
) |