import os
import gradio as gr
import pandas as pd
from datetime import datetime
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
import numpy as np
from mistralai import Mistral
from openai import OpenAI
import random
import re
import json
import logging
import time
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import threading
import pymongo
from pymongo import MongoClient
from bson.objectid import ObjectId
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class HallucinationJudgment(BaseModel):
hallucination_detected: bool = Field(description="Whether a hallucination is detected across the responses")
confidence_score: float = Field(description="Confidence score between 0-1 for the hallucination judgment")
conflicting_facts: List[Dict[str, Any]] = Field(description="List of conflicting facts found in the responses")
reasoning: str = Field(description="Detailed reasoning for the judgment")
summary: str = Field(description="A summary of the analysis")
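# Illustrative sketch (not used by the app's own flow): how a judge's JSON
# payload maps onto HallucinationJudgment. The sample values are hypothetical.
#
#   sample = {
#       "hallucination_detected": True,
#       "confidence_score": 0.85,
#       "conflicting_facts": [{"fact": "release year", "values": ["1969", "1972"]}],
#       "reasoning": "The responses give different years for the same event.",
#       "summary": "One factual conflict found across responses.",
#   }
#   judgment = HallucinationJudgment(**sample)
#   assert judgment.hallucination_detected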
class PAS2:
"""Paraphrase-based Approach for LLM Systems - Using llm-as-judge methods"""
def __init__(self, mistral_api_key=None, openai_api_key=None, xai_api_key=None, qwen_api_key=None, deepseek_api_key=None, gemini_api_key=None, progress_callback=None):
"""Initialize the PAS2 with API keys"""
# For Hugging Face Spaces, we prioritize getting API keys from HF_* environment variables
# which are set from the Secrets tab in the Space settings
self.mistral_api_key = mistral_api_key or os.environ.get("HF_MISTRAL_API_KEY") or os.environ.get("MISTRAL_API_KEY")
self.openai_api_key = openai_api_key or os.environ.get("HF_OPENAI_API_KEY") or os.environ.get("OPENAI_API_KEY")
self.xai_api_key = xai_api_key or os.environ.get("HF_XAI_API_KEY") or os.environ.get("XAI_API_KEY")
self.qwen_api_key = qwen_api_key or os.environ.get("HF_QWEN_API_KEY") or os.environ.get("QWEN_API_KEY")
self.deepseek_api_key = deepseek_api_key or os.environ.get("HF_DEEPSEEK_API_KEY") or os.environ.get("DEEPSEEK_API_KEY")
self.gemini_api_key = gemini_api_key or os.environ.get("HF_GEMINI_API_KEY") or os.environ.get("GEMINI_API_KEY")
self.progress_callback = progress_callback
if not self.mistral_api_key:
raise ValueError("Mistral API key is required. Set it via HF_MISTRAL_API_KEY in Hugging Face Spaces secrets or pass it as a parameter.")
if not self.openai_api_key:
raise ValueError("OpenAI API key is required. Set it via HF_OPENAI_API_KEY in Hugging Face Spaces secrets or pass it as a parameter.")
self.mistral_client = Mistral(api_key=self.mistral_api_key)
self.openai_client = OpenAI(api_key=self.openai_api_key)
self.xai_client = OpenAI(api_key=self.xai_api_key, base_url="https://api.x.ai/v1")
self.qwen_client = OpenAI(api_key=self.qwen_api_key, base_url="https://router.huggingface.co/nebius/v1")
self.deepseek_client = OpenAI(api_key=self.deepseek_api_key, base_url="https://api.deepseek.com")
self.gemini_client = OpenAI(api_key=self.gemini_api_key, base_url="https://generativelanguage.googleapis.com/v1beta/openai/")
# Define model names
self.mistral_model = "mistral-large-latest"
self.openai_o4mini = "o4-mini"
self.openai_4o = "gpt-4o"
self.deepseek_model = "deepseek-reasoner"
self.grok_model = "grok-3-beta"
self.qwen_model = "Qwen/Qwen3-235B-A22B"
self.gemini_model = "gemini-2.5-pro-preview-05-06"
# Create a dictionary mapping model names to their clients and model identifiers
self.model_configs = {
"mistral-large": {
"client": self.mistral_client,
"model_id": self.mistral_model,
"type": "mistral"
},
"o4-mini": {
"client": self.openai_client,
"model_id": self.openai_o4mini,
"type": "openai"
},
"gpt-4o": {
"client": self.openai_client,
"model_id": self.openai_4o,
"type": "openai"
},
"deepseek-reasoner": {
"client": self.deepseek_client,
"model_id": self.deepseek_model,
"type": "openai"
},
"grok-3": {
"client": self.xai_client,
"model_id": self.grok_model,
"type": "openai"
},
"qwen-235b": {
"client": self.qwen_client,
"model_id": self.qwen_model,
"type": "openai"
},
"gemini-2.5-pro": {
"client": self.gemini_client,
"model_id": self.gemini_model,
"type": "openai"
}
}
# Set default models (will be randomized later)
self.generator_model = "mistral-large"
self.judge_model = "o4-mini"
logger.info("PAS2 initialized with available models: %s", ", ".join(self.model_configs.keys()))
def generate_paraphrases(self, query: str, n_paraphrases: int = 3) -> List[str]:
"""Generate paraphrases of the input query using Mistral API"""
logger.info("Generating %d paraphrases for query: %s", n_paraphrases, query)
start_time = time.time()
messages = [
{
"role": "system",
"content": f"You are an expert at creating semantically equivalent paraphrases. Generate {n_paraphrases} different paraphrases of the given query that preserve the original meaning but vary in wording and structure. Return a JSON array of strings, each containing one paraphrase."
},
{
"role": "user",
"content": query
}
]
try:
logger.info("Sending paraphrase generation request to Mistral API...")
response = self.mistral_client.chat.complete(
model=self.mistral_model,
messages=messages,
response_format={"type": "json_object"}
)
content = response.choices[0].message.content
logger.debug("Received raw paraphrase response: %s", content)
paraphrases_data = json.loads(content)
# Handle different possible JSON structures
if isinstance(paraphrases_data, dict) and "paraphrases" in paraphrases_data:
paraphrases = paraphrases_data["paraphrases"]
elif isinstance(paraphrases_data, dict) and "results" in paraphrases_data:
paraphrases = paraphrases_data["results"]
elif isinstance(paraphrases_data, list):
paraphrases = paraphrases_data
else:
# Try to extract a list from any field
for key, value in paraphrases_data.items():
if isinstance(value, list) and len(value) > 0:
paraphrases = value
break
else:
logger.warning("Could not extract paraphrases from response: %s", content)
raise ValueError(f"Could not extract paraphrases from response: {content}")
# Ensure we have the right number of paraphrases
paraphrases = paraphrases[:n_paraphrases]
# Add the original query as the first item
all_queries = [query] + paraphrases
elapsed_time = time.time() - start_time
logger.info("Generated %d paraphrases in %.2f seconds", len(paraphrases), elapsed_time)
for i, p in enumerate(paraphrases, 1):
logger.info("Paraphrase %d: %s", i, p)
return all_queries
except Exception as e:
logger.error("Error generating paraphrases: %s", str(e), exc_info=True)
# Return original plus simple paraphrases as fallback
fallback_paraphrases = [
query,
f"Could you tell me about {query.strip('?')}?",
f"I'd like to know: {query}",
f"Please provide information on {query.strip('?')}."
][:n_paraphrases+1]
logger.info("Using fallback paraphrases due to error")
for i, p in enumerate(fallback_paraphrases[1:], 1):
logger.info("Fallback paraphrase %d: %s", i, p)
return fallback_paraphrases
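# For reference, the parser above accepts any of these response shapes
# (examples are illustrative, not captured API output):
#
#   {"paraphrases": ["q1", "q2", "q3"]}
#   {"results": ["q1", "q2", "q3"]}
#   ["q1", "q2", "q3"]
#   {"anything": ["q1", "q2", "q3"]}   # first non-empty list value is used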
def set_random_model_pair(self):
"""Randomly select a pair of generator and judge models"""
# Get list of available models
available_models = list(self.model_configs.keys())
# Randomly select generator and judge models
self.generator_model = random.choice(available_models)
# Make sure judge is different from generator
judge_options = [m for m in available_models if m != self.generator_model]
self.judge_model = random.choice(judge_options)
logger.info("Randomly selected model pair - Generator: %s, Judge: %s",
self.generator_model, self.judge_model)
return self.generator_model, self.judge_model
def _get_single_response(self, query: str, index: int = None) -> str:
"""Get a single response from the selected generator model for a query"""
try:
query_description = f"Query {index}: {query}" if index is not None else f"Query: {query}"
logger.info("Getting response for %s using %s", query_description, self.generator_model)
start_time = time.time()
# Get the model configuration
model_config = self.model_configs[self.generator_model]
client = model_config["client"]
model_id = model_config["model_id"]
model_type = model_config["type"]
# Customize messages based on model
system_content = "You are a helpful AI assistant. Provide accurate, factual information in response to questions."
user_content = query
# Special handling for deepseek-reasoner: frame the query as a JSON-extraction task
if model_id == "deepseek-reasoner":
user_content = f"Extract the following information and format it as JSON:\n\n{query}"
messages = [
{
"role": "system",
"content": system_content
},
{
"role": "user",
"content": user_content
}
]
# Use the appropriate client and model based on the type
if model_type == "mistral":
response = client.chat.complete(
model=model_id,
messages=messages
)
result = response.choices[0].message.content
else: # openai-compatible API
response = client.chat.completions.create(
model=model_id,
messages=messages
)
result = response.choices[0].message.content
elapsed_time = time.time() - start_time
logger.info("Received response from %s for %s (%.2f seconds)",
self.generator_model, query_description, elapsed_time)
logger.debug("Response content for %s: %s", query_description, result[:100] + "..." if len(result) > 100 else result)
return result
except Exception as e:
error_msg = f"Error getting response for query '{query}' with model {self.generator_model}: {e}"
logger.error(error_msg, exc_info=True)
return f"Error: Failed to get response for this query with model {self.generator_model}."
def get_responses(self, queries: List[str]) -> List[str]:
"""Get responses from Mistral API for each query in parallel"""
logger.info("Getting responses for %d queries in parallel", len(queries))
start_time = time.time()
# Use ThreadPoolExecutor for parallel API calls
with ThreadPoolExecutor(max_workers=min(len(queries), 5)) as executor:
# Submit tasks and map them to their original indices
future_to_index = {
executor.submit(self._get_single_response, query, i): i
for i, query in enumerate(queries)
}
# Prepare a list with the correct length
responses = [""] * len(queries)
# Counter for completed responses
completed_count = 0
# Collect results as they complete
for future in concurrent.futures.as_completed(future_to_index):
index = future_to_index[future]
try:
responses[index] = future.result()
# Update completion count and report progress
completed_count += 1
if self.progress_callback:
self.progress_callback("responses_progress",
completed_responses=completed_count,
total_responses=len(queries))
except Exception as e:
logger.error("Error processing response for index %d: %s", index, str(e))
responses[index] = f"Error: Failed to get response for query {index}."
# Still update completion count even for errors
completed_count += 1
if self.progress_callback:
self.progress_callback("responses_progress",
completed_responses=completed_count,
total_responses=len(queries))
elapsed_time = time.time() - start_time
logger.info("Received all %d responses in %.2f seconds total", len(responses), elapsed_time)
return responses
def detect_hallucination(self, query: str, n_paraphrases: int = 3) -> Dict:
"""
Detect hallucinations by comparing responses to paraphrased queries using a judge model
Returns:
Dict containing hallucination judgment and all responses
"""
logger.info("Starting hallucination detection for query: %s", query)
start_time = time.time()
# Randomly select a model pair for this detection
generator_model, judge_model = self.set_random_model_pair()
logger.info("Using %s as generator and %s as judge for this detection", generator_model, judge_model)
# Report progress
if self.progress_callback:
self.progress_callback("starting", query=query)
# Generate paraphrases
logger.info("Step 1: Generating paraphrases")
if self.progress_callback:
self.progress_callback("generating_paraphrases", query=query)
all_queries = self.generate_paraphrases(query, n_paraphrases)
if self.progress_callback:
self.progress_callback("paraphrases_complete", query=query, count=len(all_queries))
# Get responses to all queries
logger.info("Step 2: Getting responses to all %d queries using %s", len(all_queries), generator_model)
if self.progress_callback:
self.progress_callback("getting_responses", query=query, total=len(all_queries), model=generator_model)
# Responses are gathered sequentially here so that per-response progress
# can be reported between calls (get_responses offers a parallel variant)
all_responses = []
for i, q in enumerate(all_queries):
logger.info("Getting response %d/%d for query: %s", i+1, len(all_queries), q)
if self.progress_callback:
self.progress_callback("responses_progress", query=query, completed=i, total=len(all_queries))
response = self._get_single_response(q, index=i)
all_responses.append(response)
if self.progress_callback:
self.progress_callback("responses_complete", query=query)
# Judge the responses for hallucinations
logger.info("Step 3: Judging for hallucinations using %s", judge_model)
if self.progress_callback:
self.progress_callback("judging", query=query, model=judge_model)
# The first query is the original, rest are paraphrases
original_query = all_queries[0]
original_response = all_responses[0]
paraphrased_queries = all_queries[1:] if len(all_queries) > 1 else []
paraphrased_responses = all_responses[1:] if len(all_responses) > 1 else []
# Judge the responses
judgment = self.judge_hallucination(
original_query=original_query,
original_response=original_response,
paraphrased_queries=paraphrased_queries,
paraphrased_responses=paraphrased_responses
)
# Assemble the results
results = {
"original_query": original_query,
"original_response": original_response,
"paraphrased_queries": paraphrased_queries,
"paraphrased_responses": paraphrased_responses,
"hallucination_detected": judgment.hallucination_detected,
"confidence_score": judgment.confidence_score,
"conflicting_facts": judgment.conflicting_facts,
"reasoning": judgment.reasoning,
"summary": judgment.summary,
"generator_model": generator_model,
"judge_model": judge_model
}
# Report completion
if self.progress_callback:
self.progress_callback("complete", query=query, generator=generator_model, judge=judge_model)
logger.info("Hallucination detection completed in %.2f seconds using %s (generator) and %s (judge)",
time.time() - start_time, generator_model, judge_model)
return results
def judge_hallucination(self,
original_query: str,
original_response: str,
paraphrased_queries: List[str],
paraphrased_responses: List[str]) -> HallucinationJudgment:
"""
Use the selected judge model to detect hallucinations in the responses
"""
logger.info("Judging hallucinations with %s model", self.judge_model)
start_time = time.time()
# Get the model configuration for the judge
model_config = self.model_configs[self.judge_model]
client = model_config["client"]
model_id = model_config["model_id"]
model_type = model_config["type"]
# Prepare the context for the judge
context = f"""
Original Question: {original_query}
Original Response:
{original_response}
Paraphrased Questions and their Responses:
"""
for i, (query, response) in enumerate(zip(paraphrased_queries, paraphrased_responses), 1):
context += f"\nParaphrased Question {i}: {query}\n\nResponse {i}:\n{response}\n"
system_prompt = """
You are a judge evaluating whether an AI is hallucinating across different responses to semantically equivalent questions.
Analyze all responses carefully to identify any factual inconsistencies or contradictions.
Focus on factual discrepancies, not stylistic differences.
A hallucination is when the AI states different facts in response to questions that are asking for the same information.
Your response should be a JSON object with the following fields:
- hallucination_detected: boolean indicating whether hallucinations were found
- confidence_score: number between 0 and 1 representing your confidence in the judgment
- conflicting_facts: an array of objects describing any conflicting information found
- reasoning: detailed explanation for your judgment
- summary: a concise summary of your analysis
"""
try:
logger.info("Sending judgment request to %s...", self.judge_model)
# Customize the system prompt for deepseek-reasoner
customized_system_prompt = system_prompt
user_content = f"Evaluate these responses for hallucinations:\n\n{context}"
# Additional prompt engineering for deepseek-reasoner
if model_id == "deepseek-reasoner":
user_content = f"""Extract the following information and format it as JSON:
Evaluate these responses for hallucinations:\n\n{context}\n\n
- hallucination_detected: boolean indicating whether hallucinations were found
- confidence_score: number between 0 and 1 representing your confidence in the judgment
- conflicting_facts: an array of objects describing any conflicting information found
- reasoning: detailed explanation for your judgment
- summary: a concise summary of your analysis
Respond ONLY with valid JSON and no other text.
"""
# Use the appropriate client and model based on the type
if model_type == "mistral":
response = client.chat.complete(
model=model_id,
messages=[
{"role": "system", "content": customized_system_prompt},
{"role": "user", "content": user_content}
],
response_format={"type": "json_object"}
)
content = response.choices[0].message.content
# Normal JSON parsing for mistral
result_json = json.loads(content)
# deepseek-reasoner is called without response_format; JSON output is
# requested through the prompt instructions instead
elif model_id == "deepseek-reasoner":
response = client.chat.completions.create(
model=model_id,
messages=[
{"role": "system", "content": customized_system_prompt},
{"role": "user", "content": user_content}
],
)
content = response.choices[0].message.content
result_json = json.loads(content)
else: # openai-compatible API
response = client.chat.completions.create(
model=model_id,
messages=[
{"role": "system", "content": customized_system_prompt},
{"role": "user", "content": user_content}
],
response_format={"type": "json_object"}
)
content = response.choices[0].message.content
result_json = json.loads(content)
logger.debug("Received judgment response from %s: %s", self.judge_model, result_json)
# Create the HallucinationJudgment object from the JSON response
judgment = HallucinationJudgment(
hallucination_detected=result_json.get("hallucination_detected", False),
confidence_score=result_json.get("confidence_score", 0.0),
conflicting_facts=result_json.get("conflicting_facts", []),
reasoning=result_json.get("reasoning", "No reasoning provided."),
summary=result_json.get("summary", "No summary provided.")
)
elapsed_time = time.time() - start_time
logger.info("Judgment completed by %s in %.2f seconds", self.judge_model, elapsed_time)
return judgment
except Exception as e:
logger.error("Error in hallucination judgment with %s: %s", self.judge_model, str(e), exc_info=True)
# Return a fallback judgment
return HallucinationJudgment(
hallucination_detected=False,
confidence_score=0.0,
conflicting_facts=[],
reasoning=f"Failed to obtain judgment from the {self.judge_model} model: {str(e)}",
summary="Analysis failed due to API error."
)
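# Minimal usage sketch for PAS2 (assumes the required API keys are present in
# the environment; the query is an arbitrary example):
#
#   pas2 = PAS2()
#   results = pas2.detect_hallucination("Who wrote the novel 1984?", n_paraphrases=3)
#   print(results["hallucination_detected"], results["confidence_score"])
#   print(results["generator_model"], results["judge_model"])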
class HallucinationDetectorApp:
def __init__(self):
self.pas2 = None
logger.info("Initializing HallucinationDetectorApp")
self._initialize_database()
self.progress_callback = None
def _initialize_database(self):
"""Initialize MongoDB connection for persistent feedback storage"""
try:
# Get MongoDB connection string from environment variable
mongo_uri = os.environ.get("MONGODB_URI")
if not mongo_uri:
logger.warning("MONGODB_URI not found in environment variables. Please set it in HuggingFace Spaces secrets.")
logger.warning("Using a placeholder URI for now - connection will fail until proper URI is provided.")
# Use a placeholder - this will fail but allows the app to initialize
mongo_uri = "mongodb+srv://username:[email protected]/?retryWrites=true&w=majority"
# Connect to MongoDB
self.mongo_client = MongoClient(mongo_uri)
# Access or create database
self.db = self.mongo_client["hallucination_detector"]
# Access or create collection
self.feedback_collection = self.db["feedback"]
# Create index on timestamp for faster querying
self.feedback_collection.create_index("timestamp")
# Test connection
self.mongo_client.admin.command('ping')
logger.info("MongoDB connection successful")
except Exception as e:
logger.error(f"Error initializing MongoDB: {str(e)}", exc_info=True)
logger.warning("Proceeding without database connection. Data will not be saved persistently.")
self.mongo_client = None
self.db = None
self.feedback_collection = None
def set_progress_callback(self, callback):
"""Set the progress callback function"""
self.progress_callback = callback
def initialize_api(self, mistral_api_key, openai_api_key):
"""Initialize the PAS2 with API keys"""
try:
logger.info("Initializing PAS2 with API keys")
self.pas2 = PAS2(
mistral_api_key=mistral_api_key,
openai_api_key=openai_api_key,
progress_callback=self.progress_callback
)
logger.info("API initialization successful")
return "API keys set successfully! You can now use the application."
except Exception as e:
logger.error("Error initializing API: %s", str(e), exc_info=True)
return f"Error initializing API: {str(e)}"
def process_query(self, query: str):
"""Process the query using PAS2"""
if not self.pas2:
logger.error("PAS2 not initialized")
return {
"error": "Please set API keys first before processing queries."
}
if not query.strip():
logger.warning("Empty query provided")
return {
"error": "Please enter a query."
}
try:
# Set the progress callback if needed
if self.progress_callback and self.pas2.progress_callback != self.progress_callback:
self.pas2.progress_callback = self.progress_callback
# Process the query
logger.info("Processing query with PAS2: %s", query)
results = self.pas2.detect_hallucination(query)
logger.info("Query processing completed successfully")
return results
except Exception as e:
logger.error("Error processing query: %s", str(e), exc_info=True)
return {
"error": f"Error processing query: {str(e)}"
}
def save_feedback(self, results, feedback):
"""Save results and user feedback to MongoDB"""
try:
logger.info("Saving user feedback: %s", feedback)
if self.feedback_collection is None:
logger.error("MongoDB connection not available. Cannot save feedback.")
return "Database connection not available. Feedback not saved."
# Prepare document for MongoDB
document = {
"timestamp": datetime.now(),
"original_query": results.get('original_query', ''),
"original_response": results.get('original_response', ''),
"paraphrased_queries": results.get('paraphrased_queries', []),
"paraphrased_responses": results.get('paraphrased_responses', []),
"hallucination_detected": results.get('hallucination_detected', False),
"confidence_score": results.get('confidence_score', 0.0),
"conflicting_facts": results.get('conflicting_facts', []),
"reasoning": results.get('reasoning', ''),
"summary": results.get('summary', ''),
"generator_model": results.get('generator_model', 'unknown'),
"judge_model": results.get('judge_model', 'unknown'),
"user_feedback": feedback
}
# Insert document into collection
result = self.feedback_collection.insert_one(document)
# Update model leaderboard scores
self._update_model_scores(
generator=results.get('generator_model', 'unknown'),
judge=results.get('judge_model', 'unknown'),
feedback=feedback,
hallucination_detected=results.get('hallucination_detected', False)
)
logger.info("Feedback saved successfully to MongoDB")
return "Feedback saved successfully!"
except Exception as e:
logger.error("Error saving feedback: %s", str(e), exc_info=True)
return f"Error saving feedback: {str(e)}"
def _update_model_scores(self, generator, judge, feedback, hallucination_detected):
"""Update the ELO scores for the generator and judge models based on feedback"""
try:
if self.db is None:
logger.error("MongoDB connection not available. Cannot update model scores.")
return
# Access or create the models collection
models_collection = self.db.get_collection("model_scores")
# Create indexes if they don't exist
models_collection.create_index("model_name", unique=True)
# Parse the feedback to determine scenario
actual_hallucination = "Yes, there was a hallucination" in feedback
no_hallucination = "No, there was no hallucination" in feedback
judge_correct = "Yes, the judge was correct" in feedback
judge_incorrect = "No, the judge was incorrect" in feedback
# Determine scores based on different scenarios:
# 1. Actual hallucination + Judge correct = positive for judge, negative for generator
# 2. No hallucination + Judge correct = positive for both
# 3. No hallucination + Judge incorrect = negative for judge, positive for generator
# 4. Actual hallucination + Judge incorrect = negative for both
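# The same rules as a compact (generator_score, judge_score) table
# (0.5 marks the neutral outcome used for unsure feedback):
#
#   hallucination,    judge correct   -> (0,   1)
#   no hallucination, judge correct   -> (1,   1)
#   no hallucination, judge incorrect -> (1,   0)
#   hallucination,    judge incorrect -> (0,   0)
#   unsure,           judge correct   -> (0.5, 1)
#   unsure,           judge incorrect -> (0.5, 0)
#   unsure,           unsure          -> (0.5, 0.5)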
if judge_correct:
if actual_hallucination:
# Scenario 1: Judge correctly detected hallucination
judge_score = 1 # Positive for judge
generator_score = 0 # Negative for generator (hallucinated)
logger.info("Judge %s correctly detected hallucination from generator %s", judge, generator)
elif no_hallucination:
# Scenario 2: Judge correctly determined no hallucination
judge_score = 1 # Positive for judge
generator_score = 1 # Positive for generator (didn't hallucinate)
logger.info("Judge %s correctly determined no hallucination from generator %s", judge, generator)
else:
# User unsure about hallucination, but confirmed judge was correct
judge_score = 1 # Positive for judge
generator_score = 0.5 # Neutral for generator (unclear)
logger.info("User confirmed judge %s was correct, but unclear about hallucination from %s", judge, generator)
elif judge_incorrect:
if no_hallucination:
# Scenario 3: Judge incorrectly claimed hallucination (false positive)
judge_score = 0 # Negative for judge
generator_score = 1 # Positive for generator (unfairly accused)
logger.info("Judge %s incorrectly claimed hallucination from generator %s", judge, generator)
elif actual_hallucination:
# Scenario 4: Judge missed actual hallucination (false negative)
judge_score = 0 # Negative for judge
generator_score = 0 # Negative for generator (hallucination went undetected)
logger.info("Judge %s missed actual hallucination from generator %s", judge, generator)
else:
# User unsure about hallucination, but confirmed judge was incorrect
judge_score = 0 # Negative for judge
generator_score = 0.5 # Neutral for generator (unclear)
logger.info("User confirmed judge %s was incorrect, but unclear about hallucination from %s", judge, generator)
else:
# User unsure about judge correctness, don't update scores
judge_score = 0.5 # Neutral for judge (unclear)
generator_score = 0.5 # Neutral for generator (unclear)
logger.info("User unsure about judge %s correctness and generator %s hallucination", judge, generator)
# Update generator model stats with specific score
self._update_model_stats(models_collection, generator, generator_score, "generator")
# Update judge model stats with specific score
self._update_model_stats(models_collection, judge, judge_score, "judge")
# Determine if the detection was correct based on judge correctness
detection_correct = judge_correct
# Determine if there was actually hallucination based on user feedback
actual_hallucination_present = actual_hallucination
# Update model pair stats
self._update_model_pair_stats(generator, judge, detection_correct, actual_hallucination_present,
generator_score, judge_score)
logger.info("Updated model scores based on feedback: generator(%s)=%s, judge(%s)=%s",
generator, generator_score, judge, judge_score)
except Exception as e:
logger.error("Error updating model scores: %s", str(e), exc_info=True)
def _update_model_stats(self, collection, model_name, score, role):
"""Update statistics for a single model"""
# Simplified ELO calculation
K_FACTOR = 32 # Standard K-factor for ELO
# Get current model data or create if not exists
model_data = collection.find_one({"model_name": model_name})
if model_data is None:
# Initialize new model with default values
model_data = {
"model_name": model_name,
"elo_score": 1500, # Starting ELO
"total_samples": 0,
"correct_predictions": 0,
"accuracy": 0.0,
"as_generator": 0,
"as_judge": 0,
"as_generator_correct": 0,
"as_judge_correct": 0,
"neutral_samples": 0 # Add a counter for neutral samples
}
# Skip counting for neutral feedback (0.5)
if score == 0.5:
# Increment neutral samples counter instead
if "neutral_samples" not in model_data:
model_data["neutral_samples"] = 0
model_data["neutral_samples"] += 1
# Expected score based on current rating (vs average rating)
expected_score = 1 / (1 + 10**((1500 - model_data["elo_score"]) / 400))
# For neutral score, use a much smaller K factor to slightly adjust the ELO
# This handles the "unsure" case with minimal impact
model_data["elo_score"] = model_data["elo_score"] + (K_FACTOR/4) * (0.5 - expected_score)
# Update or insert the model data
collection.replace_one(
{"model_name": model_name},
model_data,
upsert=True
)
return
# Update sample counts for non-neutral cases
model_data["total_samples"] += 1
if role == "generator":
model_data["as_generator"] += 1
if score == 1: # Only count as correct if score is 1 (not 0)
model_data["as_generator_correct"] += 1
else: # role == "judge"
model_data["as_judge"] += 1
if score == 1: # Only count as correct if score is 1 (not 0)
model_data["as_judge_correct"] += 1
# Update correct predictions based on score
if score == 1:
model_data["correct_predictions"] += 1
# Calculate new accuracy
model_data["accuracy"] = model_data["correct_predictions"] / model_data["total_samples"]
# Update ELO score based on the specific score value (0 or 1)
# Expected score based on current rating (vs average rating)
expected_score = 1 / (1 + 10**((1500 - model_data["elo_score"]) / 400))
# Use the provided score (0 or 1)
actual_score = score
# New ELO calculation
model_data["elo_score"] = model_data["elo_score"] + K_FACTOR * (actual_score - expected_score)
# Update or insert the model data
collection.replace_one(
{"model_name": model_name},
model_data,
upsert=True
)
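# Worked example of the ELO update above (illustrative numbers):
# with elo_score = 1500, expected_score = 1 / (1 + 10**((1500 - 1500) / 400)) = 0.5,
# so a correct outcome (score = 1) gives 1500 + 32 * (1 - 0.5) = 1516.0 and an
# incorrect one (score = 0) gives 1500 + 32 * (0 - 0.5) = 1484.0. A neutral
# outcome uses K/4: 1500 + 8 * (0.5 - 0.5) = 1500.0 (no change at the baseline).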
def _update_model_pair_stats(self, generator, judge, detection_correct, hallucination_detected,
generator_score, judge_score):
"""Update statistics for a model pair combination"""
try:
# Access or create the model pairs collection
pairs_collection = self.db.get_collection("model_pairs")
# Create compound index if it doesn't exist
pairs_collection.create_index([("generator", 1), ("judge", 1)], unique=True)
# Get current pair data or create if not exists
pair_data = pairs_collection.find_one({
"generator": generator,
"judge": judge
})
if pair_data is None:
# Initialize new pair with default values
pair_data = {
"generator": generator,
"judge": judge,
"elo_score": 1500, # Starting ELO
"total_samples": 0,
"correct_predictions": 0,
"accuracy": 0.0,
"hallucinations_detected": 0,
"generator_performance": 0.0,
"judge_performance": 0.0,
"consistency_score": 0.0
}
# Update sample counts
pair_data["total_samples"] += 1
if detection_correct:
pair_data["correct_predictions"] += 1
if hallucination_detected:
pair_data["hallucinations_detected"] += 1
# Track model-specific performances within the pair
if "generator_correct_count" not in pair_data:
pair_data["generator_correct_count"] = 0
if "judge_correct_count" not in pair_data:
pair_data["judge_correct_count"] = 0
# Update individual performance counters based on scores
if generator_score == 1:
pair_data["generator_correct_count"] += 1
if judge_score == 1:
pair_data["judge_correct_count"] += 1
# Calculate individual performance rates within the pair
pair_data["generator_performance"] = pair_data["generator_correct_count"] / pair_data["total_samples"]
pair_data["judge_performance"] = pair_data["judge_correct_count"] / pair_data["total_samples"]
# Calculate new accuracy for the pair (detection accuracy)
pair_data["accuracy"] = pair_data["correct_predictions"] / pair_data["total_samples"]
# Calculate consistency score - weighted average of individual performances
# Gives more weight to the generator when hallucinations are detected
if hallucination_detected:
# When hallucination is detected, judge's role is more critical
pair_data["consistency_score"] = (0.4 * pair_data["generator_performance"] +
0.6 * pair_data["judge_performance"])
else:
# When no hallucination is detected, both roles are equally important
pair_data["consistency_score"] = (0.5 * pair_data["generator_performance"] +
0.5 * pair_data["judge_performance"])
# Update ELO score (simplified version)
K_FACTOR = 24 # Slightly lower K-factor for pairs
# Expected score based on current rating
expected_score = 1 / (1 + 10**((1500 - pair_data["elo_score"]) / 400))
# Actual score - use the average of both model scores (0-1 range)
# This represents the pair's overall performance
actual_score = (generator_score + judge_score) / 2
# New ELO calculation
pair_data["elo_score"] = pair_data["elo_score"] + K_FACTOR * (actual_score - expected_score)
# Update or insert the pair data
pairs_collection.replace_one(
{"generator": generator, "judge": judge},
pair_data,
upsert=True
)
logger.info("Updated model pair stats for %s (generator) and %s (judge)", generator, judge)
except Exception as e:
logger.error("Error updating model pair stats: %s", str(e), exc_info=True)
return None
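# Worked example of the consistency score above (illustrative numbers):
# with generator_performance = 0.8 and judge_performance = 0.6, a detection
# where a hallucination was flagged scores 0.4 * 0.8 + 0.6 * 0.6 = 0.68,
# while the no-hallucination weighting gives 0.5 * 0.8 + 0.5 * 0.6 = 0.70.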
def get_feedback_stats(self):
"""Get statistics about collected feedback from MongoDB"""
try:
if self.feedback_collection is None:
logger.error("MongoDB connection not available. Cannot get feedback stats.")
return None
# Get total feedback count
total_count = self.feedback_collection.count_documents({})
# Get accuracy stats based on user feedback
correct_predictions = 0
# Fetch all feedback documents
feedback_docs = list(self.feedback_collection.find({}, {"user_feedback": 1}))
# Count correct predictions based on user feedback
for doc in feedback_docs:
if "user_feedback" in doc:
# If feedback starts with "Yes", it's a correct prediction
if doc["user_feedback"].startswith("Yes"):
correct_predictions += 1
# Calculate accuracy percentage
accuracy = correct_predictions / max(total_count, 1)
return {
"total_feedback": total_count,
"correct_predictions": correct_predictions,
"accuracy": accuracy
}
except Exception as e:
logger.error("Error getting feedback stats: %s", str(e), exc_info=True)
return None
def get_model_leaderboard(self):
"""Get the current model leaderboard data"""
try:
if self.db is None:
logger.error("MongoDB connection not available. Cannot get model leaderboard.")
return None
# Access models collection
models_collection = self.db.get_collection("model_scores")
# Get all models and sort by ELO score
models = list(models_collection.find().sort("elo_score", pymongo.DESCENDING))
# Format percentages and convert ObjectId
for model in models:
model["_id"] = str(model["_id"])
model["accuracy"] = round(model["accuracy"] * 100, 1)
if "as_generator" in model and model["as_generator"] > 0:
model["generator_accuracy"] = round((model["as_generator_correct"] / model["as_generator"]) * 100, 1)
else:
model["generator_accuracy"] = 0.0
if "as_judge" in model and model["as_judge"] > 0:
model["judge_accuracy"] = round((model["as_judge_correct"] / model["as_judge"]) * 100, 1)
else:
model["judge_accuracy"] = 0.0
return models
except Exception as e:
logger.error("Error getting model leaderboard: %s", str(e), exc_info=True)
return []
def get_pair_leaderboard(self):
"""Get the current model pair leaderboard data"""
try:
if self.db is None:
logger.error("MongoDB connection not available. Cannot get pair leaderboard.")
return None
# Access model pairs collection
pairs_collection = self.db.get_collection("model_pairs")
# Get all pairs and sort by ELO score
pairs = list(pairs_collection.find().sort("elo_score", pymongo.DESCENDING))
# Format percentages and convert ObjectId
for pair in pairs:
pair["_id"] = str(pair["_id"])
pair["accuracy"] = round(pair["accuracy"] * 100, 1)
pair["consistency_score"] = round(pair["consistency_score"] * 100, 1)
return pairs
except Exception as e:
logger.error("Error getting pair leaderboard: %s", str(e), exc_info=True)
return []
def export_data_to_csv(self, filepath=None):
"""Export all feedback data to a CSV file for analysis"""
try:
if self.feedback_collection is None:
logger.error("MongoDB connection not available. Cannot export data.")
return "Database connection not available. Cannot export data."
# Query all feedback data
cursor = self.feedback_collection.find({})
# Convert cursor to list of dictionaries
records = list(cursor)
# Convert MongoDB documents to pandas DataFrame
# Handle nested arrays and complex objects
for record in records:
# Convert ObjectId to string
record['_id'] = str(record['_id'])
# Convert datetime objects to string
if 'timestamp' in record:
record['timestamp'] = record['timestamp'].strftime("%Y-%m-%d %H:%M:%S")
# Convert lists to strings for CSV storage
if 'paraphrased_queries' in record:
record['paraphrased_queries'] = json.dumps(record['paraphrased_queries'])
if 'paraphrased_responses' in record:
record['paraphrased_responses'] = json.dumps(record['paraphrased_responses'])
if 'conflicting_facts' in record:
record['conflicting_facts'] = json.dumps(record['conflicting_facts'])
# Create DataFrame
df = pd.DataFrame(records)
# Define default filepath if not provided
if not filepath:
filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)),
f"hallucination_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
# Export to CSV
df.to_csv(filepath, index=False)
logger.info(f"Data successfully exported to {filepath}")
return filepath
except Exception as e:
logger.error(f"Error exporting data: {str(e)}", exc_info=True)
return f"Error exporting data: {str(e)}"
def get_recent_queries(self, limit=10):
"""Get most recent queries for display in the UI"""
try:
if self.feedback_collection is None:
logger.error("MongoDB connection not available. Cannot get recent queries.")
return []
# Get most recent queries
cursor = self.feedback_collection.find(
{},
{"original_query": 1, "hallucination_detected": 1, "timestamp": 1}
).sort("timestamp", pymongo.DESCENDING).limit(limit)
# Convert to list of dictionaries
recent_queries = []
for doc in cursor:
recent_queries.append({
"id": str(doc["_id"]),
"query": doc["original_query"],
"hallucination_detected": doc.get("hallucination_detected", False),
"timestamp": doc["timestamp"].strftime("%Y-%m-%d %H:%M:%S") if isinstance(doc["timestamp"], datetime) else doc["timestamp"]
})
return recent_queries
except Exception as e:
logger.error(f"Error getting recent queries: {str(e)}", exc_info=True)
return []
def get_query_details(self, query_id):
"""Get full details for a specific query by ID"""
try:
if self.feedback_collection is None:
logger.error("MongoDB connection not available. Cannot get query details.")
return None
# Convert string ID to ObjectId
obj_id = ObjectId(query_id)
# Find the query by ID
doc = self.feedback_collection.find_one({"_id": obj_id})
if doc is None:
logger.warning(f"No query found with ID {query_id}")
return None
# Convert ObjectId to string for JSON serialization
doc["_id"] = str(doc["_id"])
# Convert timestamp to string
if "timestamp" in doc and isinstance(doc["timestamp"], datetime):
doc["timestamp"] = doc["timestamp"].strftime("%Y-%m-%d %H:%M:%S")
return doc
except Exception as e:
logger.error(f"Error getting query details: {str(e)}", exc_info=True)
return None
# Progress tracking for UI updates
class ProgressTracker:
"""Tracks progress of hallucination detection for UI updates"""
STAGES = {
"idle": {"status": "Ready", "progress": 0, "color": "#757575"},
"starting": {"status": "Starting process...", "progress": 5, "color": "#2196F3"},
"generating_paraphrases": {"status": "Generating paraphrases...", "progress": 15, "color": "#2196F3"},
"paraphrases_complete": {"status": "Paraphrases generated", "progress": 30, "color": "#2196F3"},
"getting_responses": {"status": "Getting responses using {model}...", "progress": 35, "color": "#2196F3"},
"responses_progress": {"status": "Getting responses ({completed}/{total})...", "progress": 40, "color": "#2196F3"},
"responses_complete": {"status": "All responses received", "progress": 65, "color": "#2196F3"},
"judging": {"status": "Analyzing responses for hallucinations using {model}...", "progress": 70, "color": "#2196F3"},
"complete": {"status": "Analysis complete! Using {generator} (generator) and {judge} (judge)", "progress": 100, "color": "#4CAF50"},
"error": {"status": "Error: {error_message}", "progress": 100, "color": "#F44336"}
}
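# Example of how the status templates above are filled in (values are
# illustrative):
#
#   STAGES["responses_progress"]["status"].format(completed=2, total=4)
#   # -> "Getting responses (2/4)..."
#   STAGES["judging"]["status"].format(model="o4-mini")
#   # -> "Analyzing responses for hallucinations using o4-mini..."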
def __init__(self):
self.stage = "idle"
self.stage_data = self.STAGES[self.stage].copy()
self.query = ""
self.completed_responses = 0
self.total_responses = 0
self.error_message = ""
self.generator_model = ""
self.judge_model = ""
self.model = "" # For general model reference in status messages
self._lock = threading.Lock()
self._status_callback = None
self._stop_event = threading.Event()
self._update_thread = None
def register_callback(self, callback_fn):
"""Register callback function to update UI"""
self._status_callback = callback_fn
def update_stage(self, stage, **kwargs):
"""Update the current stage and trigger callback"""
with self._lock:
if stage in self.STAGES:
self.stage = stage
self.stage_data = self.STAGES[stage].copy()
# Update with any additional parameters
for key, value in kwargs.items():
if key == 'query':
self.query = value
elif key == 'completed_responses':
self.completed_responses = value
elif key == 'total_responses':
self.total_responses = value
elif key == 'error_message':
self.error_message = value
elif key == 'model':
self.model = value
elif key == 'generator':
self.generator_model = value
elif key == 'judge':
self.judge_model = value
# Format status message
if stage == 'responses_progress':
self.stage_data['status'] = self.stage_data['status'].format(
completed=self.completed_responses,
total=self.total_responses
)
elif stage == 'getting_responses' and 'model' in kwargs:
self.stage_data['status'] = self.stage_data['status'].format(
model=kwargs.get('model', 'selected model')
)
elif stage == 'judging' and 'model' in kwargs:
self.stage_data['status'] = self.stage_data['status'].format(
model=kwargs.get('model', 'selected model')
)
elif stage == 'complete' and 'generator' in kwargs and 'judge' in kwargs:
self.stage_data['status'] = self.stage_data['status'].format(
generator=self.generator_model,
judge=self.judge_model
)
elif stage == 'error':
self.stage_data['status'] = self.stage_data['status'].format(
error_message=self.error_message
)
if self._status_callback:
self._status_callback(self.get_html_status())
def get_html_status(self):
"""Get HTML representation of current status"""
progress_width = f"{self.stage_data['progress']}%"
status_text = self.stage_data['status']
color = self.stage_data['color']
query_info = f'<div class="query-display">{self.query}</div>' if self.query else ''
# Only show status text if not in idle state
status_display = f'<div class="progress-status" style="color: {color};">{status_text}</div>' if self.stage != "idle" else ''
# Add model information if available and we're not in idle or error state
model_info = ''
if self.stage not in ["idle", "error", "starting"] and (self.generator_model or self.judge_model):
model_info = f'<div class="progress-model-info">'
if self.generator_model:
model_info += f'<div><span style="font-weight: bold;">Generator:</span> {self.generator_model}</div>'
if self.judge_model:
model_info += f'<div><span style="font-weight: bold;">Judge:</span> {self.judge_model}</div>'
model_info += '</div>'
html = f"""
<div class="progress-container">
{query_info}
{status_display}
<div class="progress-bar-container">
<div class="progress-bar" style="width: {progress_width}; background-color: {color};"></div>
</div>
{model_info}
</div>
"""
return html
def start_pulsing(self):
"""Start a pulsing animation for the progress bar during long operations"""
if self._update_thread and self._update_thread.is_alive():
return
self._stop_event.clear()
self._update_thread = threading.Thread(target=self._pulse_progress)
self._update_thread.daemon = True
self._update_thread.start()
def stop_pulsing(self):
"""Stop the pulsing animation"""
self._stop_event.set()
if self._update_thread:
self._update_thread.join(0.5)
def _pulse_progress(self):
"""Animate the progress bar to show activity"""
pulse_stages = ["⋯", "⋯⋯", "⋯⋯⋯", "⋯⋯", "⋯"]
i = 0
while not self._stop_event.is_set():
with self._lock:
if self.stage not in ["idle", "complete", "error"]:
status_base = self.stage_data['status'].split("...")[0] if "..." in self.stage_data['status'] else self.stage_data['status']
self.stage_data['status'] = f"{status_base}... {pulse_stages[i]}"
if self._status_callback:
self._status_callback(self.get_html_status())
i = (i + 1) % len(pulse_stages)
time.sleep(0.3)
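# Minimal usage sketch for ProgressTracker (the callback is a placeholder):
#
#   tracker = ProgressTracker()
#   tracker.register_callback(lambda html: print(html))
#   tracker.update_stage("getting_responses", model="mistral-large")
#   tracker.start_pulsing()
#   ...
#   tracker.stop_pulsing()
#   tracker.update_stage("complete", generator="mistral-large", judge="o4-mini")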
def create_interface():
"""Create Gradio interface"""
detector = HallucinationDetectorApp()
# Initialize Progress Tracker
progress_tracker = ProgressTracker()
# Initialize APIs from environment variables automatically
try:
detector.initialize_api(
mistral_api_key=os.environ.get("HF_MISTRAL_API_KEY"),
openai_api_key=os.environ.get("HF_OPENAI_API_KEY")
)
except Exception as e:
print(f"Warning: Failed to initialize APIs from environment variables: {e}")
print("Please make sure HF_MISTRAL_API_KEY and HF_OPENAI_API_KEY are set in your environment")
# CSS for styling
css = """
/* Base styles */
.container {
max-width: 1000px;
margin: 0 auto;
}
/* Light theme default styles */
.title {
text-align: center;
margin-bottom: 0.5em;
font-weight: 600;
color: #0d47a1;
}
.subtitle {
text-align: center;
margin-bottom: 1.5em;
font-size: 1.2em;
color: #37474f;
}
.section-title {
margin-top: 1em;
margin-bottom: 0.5em;
font-weight: bold;
color: #1565c0;
}
.info-box {
padding: 1.2em;
border-radius: 8px;
margin-bottom: 1em;
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
line-height: 1.5;
border: 1px solid #dee2e6;
border-left: 3px solid #6c757d;
background-color: #f8f9fa;
color: #212529;
}
.info-box p strong {
color: #495057;
font-weight: 600;
}
.hallucination-positive {
padding: 1.2em;
border-radius: 8px;
background-color: #ffeaea;
border-left: 5px solid #e53e3e;
margin-bottom: 1em;
box-shadow: 0 2px 5px rgba(0,0,0,0.05);
color: #742a2a;
}
.hallucination-positive h3 {
color: #e53e3e;
margin-top: 0;
margin-bottom: 0.5em;
}
.hallucination-positive p {
color: #742a2a;
line-height: 1.5;
}
.hallucination-negative {
padding: 1.2em;
border-radius: 8px;
background-color: #f0fff4;
border-left: 5px solid #38a169;
margin-bottom: 1em;
box-shadow: 0 2px 5px rgba(0,0,0,0.05);
color: #22543d;
}
.hallucination-negative h3 {
color: #38a169;
margin-top: 0;
margin-bottom: 0.5em;
}
.hallucination-negative p {
color: #22543d;
line-height: 1.5;
}
.response-box {
padding: 1.2em;
border-radius: 8px;
background-color: #f7fafc;
margin-bottom: 0.8em;
box-shadow: 0 2px 5px rgba(0,0,0,0.05);
color: #2d3748;
line-height: 1.5;
border-left: 3px solid #a0aec0;
}
.example-queries {
display: flex;
flex-wrap: wrap;
gap: 8px;
margin-bottom: 15px;
}
.example-query {
background-color: #ebf8ff;
padding: 8px 15px;
border-radius: 18px;
font-size: 0.9em;
cursor: pointer;
transition: all 0.2s;
border: 1px solid #bee3f8;
color: #2c5282;
}
.example-query:hover {
background-color: #bee3f8;
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
}
.stats-section {
display: flex;
justify-content: space-between;
background-color: #ebf8ff;
padding: 15px;
border-radius: 10px;
margin-bottom: 20px;
margin-top: 5px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
border: 1px solid #bee3f8;
}
.stat-item {
text-align: center;
padding: 10px;
}
.stat-value {
font-size: 2em;
font-weight: bold;
color: #2c5282;
}
.stat-label {
font-size: 0.9em;
font-weight: bold;
color: #3182ce;
}
.feedback-section {
border-top: 1px solid #e2e8f0;
padding-top: 15px;
margin-top: 20px;
}
footer {
text-align: center;
padding: 20px;
margin-top: 30px;
color: #718096;
font-size: 0.9em;
}
.processing-status {
padding: 12px;
background-color: #ebf8ff;
border-left: 4px solid #3182ce;
margin-bottom: 15px;
font-weight: 500;
color: #2c5282;
}
.debug-panel {
background-color: #f7fafc;
border: 1px solid #e2e8f0;
border-radius: 4px;
padding: 10px;
margin-top: 15px;
font-family: monospace;
font-size: 0.9em;
white-space: pre-wrap;
max-height: 200px;
overflow-y: auto;
color: #4a5568;
}
.progress-container {
padding: 15px;
background-color: #ffffff;
border-radius: 8px;
box-shadow: 0 2px 5px rgba(0,0,0,0.05);
margin-bottom: 15px;
border: 1px solid #e2e8f0;
}
.progress-status {
font-weight: 500;
margin-bottom: 8px;
padding: 4px 0;
font-size: 0.95em;
}
.progress-bar-container {
background-color: #edf2f7;
height: 10px;
border-radius: 5px;
overflow: hidden;
margin-bottom: 10px;
box-shadow: inset 0 1px 3px rgba(0,0,0,0.1);
}
.progress-bar {
height: 100%;
transition: width 0.5s ease;
background-image: linear-gradient(to right, #3182ce, #2b6cb0);
}
.query-display {
font-style: italic;
color: #718096;
margin-bottom: 10px;
background-color: #f7fafc;
padding: 8px;
border-radius: 4px;
border-left: 3px solid #3182ce;
}
/* Dark theme styles */
@media (prefers-color-scheme: dark) {
.title {
color: #63b3ed;
}
.subtitle {
color: #a0aec0;
}
.section-title {
color: #90cdf4;
}
.info-box {
background-color: #2d3748;
color: #e2e8f0;
border-color: #4a5568;
border-left-color: #718096;
}
.info-box p strong {
color: #f7fafc;
}
.hallucination-positive {
background-color: #553c39;
color: #fed7d7;
border-left-color: #fc8181;
}
.hallucination-positive h3 {
color: #fc8181;
}
.hallucination-positive p {
color: #fed7d7;
}
.hallucination-negative {
background-color: #22543d;
color: #c6f6d5;
border-left-color: #68d391;
}
.hallucination-negative h3 {
color: #68d391;
}
.hallucination-negative p {
color: #c6f6d5;
}
.response-box {
background-color: #1a202c;
color: #e2e8f0;
border-left-color: #4a5568;
}
.example-query {
background-color: #2a4365;
border-color: #2c5282;
color: #bee3f8;
}
.example-query:hover {
background-color: #3182ce;
}
.stats-section {
background-color: #2a4365;
border-color: #2c5282;
}
.stat-value {
color: #bee3f8;
}
.stat-label {
color: #90cdf4;
}
.feedback-section {
border-top-color: #4a5568;
}
footer {
color: #a0aec0;
}
.processing-status {
background-color: #2a4365;
border-left-color: #90cdf4;
color: #bee3f8;
}
.debug-panel {
background-color: #1a202c;
border-color: #4a5568;
color: #e2e8f0;
}
.progress-container {
background-color: #2d3748;
border-color: #4a5568;
}
.progress-bar-container {
background-color: #4a5568;
}
.progress-bar {
background-image: linear-gradient(to right, #90cdf4, #63b3ed);
}
.query-display {
color: #a0aec0;
background-color: #1a202c;
border-left-color: #90cdf4;
}
}
/* Gradio theme detection fallbacks */
html[data-theme="dark"] .title,
.dark .title {
color: #63b3ed !important;
}
html[data-theme="dark"] .subtitle,
.dark .subtitle {
color: #a0aec0 !important;
}
html[data-theme="dark"] .section-title,
.dark .section-title {
color: #90cdf4 !important;
}
html[data-theme="dark"] .info-box,
.dark .info-box {
background-color: #2d3748 !important;
color: #e2e8f0 !important;
border-color: #4a5568 !important;
border-left-color: #718096 !important;
}
html[data-theme="dark"] .info-box p strong,
.dark .info-box p strong {
color: #f7fafc !important;
}
html[data-theme="dark"] .response-box,
.dark .response-box {
background-color: #1a202c !important;
color: #e2e8f0 !important;
border-left-color: #4a5568 !important;
}
html[data-theme="dark"] .example-query,
.dark .example-query {
background-color: #2a4365 !important;
border-color: #2c5282 !important;
color: #bee3f8 !important;
}
html[data-theme="dark"] .stats-section,
.dark .stats-section {
background-color: #2a4365 !important;
border-color: #2c5282 !important;
}
html[data-theme="dark"] .stat-value,
.dark .stat-value {
color: #bee3f8 !important;
}
html[data-theme="dark"] .stat-label,
.dark .stat-label {
color: #90cdf4 !important;
}
html[data-theme="dark"] .processing-status,
.dark .processing-status {
background-color: #2a4365 !important;
border-left-color: #90cdf4 !important;
color: #bee3f8 !important;
}
html[data-theme="dark"] .debug-panel,
.dark .debug-panel {
background-color: #1a202c !important;
border-color: #4a5568 !important;
color: #e2e8f0 !important;
}
html[data-theme="dark"] .progress-container,
.dark .progress-container {
background-color: #2d3748 !important;
border-color: #4a5568 !important;
}
html[data-theme="dark"] .progress-bar-container,
.dark .progress-bar-container {
background-color: #4a5568 !important;
}
html[data-theme="dark"] .query-display,
.dark .query-display {
color: #a0aec0 !important;
background-color: #1a202c !important;
border-left-color: #90cdf4 !important;
}
/* Additional theme-aware classes */
.model-info-bar {
background-color: #ebf8ff;
padding: 10px 15px;
border-radius: 8px;
margin-bottom: 15px;
display: flex;
justify-content: space-between;
border: 1px solid #bee3f8;
}
.model-info-section {
flex: 1;
text-align: center;
padding-right: 10px;
border-right: 1px solid #bee3f8;
}
.model-info-section:last-child {
border-right: none;
padding-right: 0;
padding-left: 10px;
}
.model-label {
font-weight: bold;
color: #2c5282;
}
.model-name {
font-size: 1.2em;
color: #2b6cb0;
}
.app-title {
font-size: 2.2em;
font-weight: 600;
color: #2c5282;
margin-bottom: 0.2em;
}
.app-subtitle {
font-size: 1.3em;
color: #4a5568;
margin-bottom: 0.8em;
}
.app-description {
font-size: 1.1em;
color: #718096;
max-width: 800px;
margin: 0 auto;
}
.section-meta {
font-size: 0.8em;
color: #718096;
}
.divider-line {
margin-top: 20px;
border-top: 1px dashed #e2e8f0;
padding-top: 15px;
font-size: 0.9em;
color: #718096;
text-align: center;
}
.info-message {
padding: 20px;
background-color: #ebf8ff;
border-radius: 8px;
text-align: center;
margin: 20px 0;
border: 1px solid #bee3f8;
}
.info-message h3 {
margin-top: 0;
color: #2c5282;
}
.error-message {
padding: 20px;
background-color: #ffeaea;
border-radius: 8px;
text-align: center;
margin: 20px 0;
border: 1px solid #fc8181;
}
.error-message h3 {
margin-top: 0;
color: #e53e3e;
}
.perf-metric {
font-weight: 500;
}
.perf-generator {
color: #38a169;
}
.perf-judge {
color: #3182ce;
}
.perf-consistency {
color: #805ad5;
}
.perf-distribution {
color: #d69e2e;
}
/* Dark theme versions */
@media (prefers-color-scheme: dark) {
.model-info-bar {
background-color: #2a4365;
border-color: #2c5282;
}
.model-info-section {
border-right-color: #2c5282;
}
.model-label {
color: #bee3f8;
}
.model-name {
color: #90cdf4;
}
.app-title {
color: #63b3ed;
}
.app-subtitle {
color: #a0aec0;
}
.app-description {
color: #cbd5e0;
}
.section-meta {
color: #a0aec0;
}
.divider-line {
border-top-color: #4a5568;
color: #a0aec0;
}
.info-message {
background-color: #2a4365;
border-color: #2c5282;
}
.info-message h3 {
color: #bee3f8;
}
.error-message {
background-color: #553c39;
border-color: #fc8181;
}
.error-message h3 {
color: #fc8181;
}
.perf-generator {
color: #68d391;
}
.perf-judge {
color: #90cdf4;
}
.perf-consistency {
color: #b794f6;
}
.perf-distribution {
color: #f6e05e;
}
}
/* Gradio fallbacks for new classes */
html[data-theme="dark"] .model-info-bar,
.dark .model-info-bar {
background-color: #2a4365 !important;
border-color: #2c5282 !important;
}
html[data-theme="dark"] .model-label,
.dark .model-label {
color: #bee3f8 !important;
}
html[data-theme="dark"] .model-name,
.dark .model-name {
color: #90cdf4 !important;
}
html[data-theme="dark"] .app-title,
.dark .app-title {
color: #63b3ed !important;
}
html[data-theme="dark"] .app-subtitle,
.dark .app-subtitle {
color: #a0aec0 !important;
}
html[data-theme="dark"] .app-description,
.dark .app-description {
color: #cbd5e0 !important;
}
html[data-theme="dark"] .section-meta,
.dark .section-meta {
color: #a0aec0 !important;
}
html[data-theme="dark"] .divider-line,
.dark .divider-line {
border-top-color: #4a5568 !important;
color: #a0aec0 !important;
}
/* Progress model info styling */
.progress-model-info {
display: flex;
justify-content: space-between;
margin-top: 8px;
font-size: 0.85em;
color: #4a5568;
background-color: #ebf8ff;
padding: 5px 10px;
border-radius: 4px;
border: 1px solid #bee3f8;
}
@media (prefers-color-scheme: dark) {
.progress-model-info {
color: #a0aec0;
background-color: #2a4365;
border-color: #2c5282;
}
}
html[data-theme="dark"] .progress-model-info,
.dark .progress-model-info {
color: #a0aec0 !important;
background-color: #2a4365 !important;
border-color: #2c5282 !important;
}
/* Metrics explanation box styling */
.metrics-explanation {
margin-top: 15px;
padding: 12px;
background-color: #f7fafc;
border-radius: 8px;
font-size: 0.95em;
color: #2d3748;
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
border: 1px solid #e2e8f0;
}
.metrics-explanation p {
margin-bottom: 8px;
color: #2c5282;
}
.metrics-explanation ul {
margin-top: 5px;
padding-left: 20px;
line-height: 1.4;
}
.metrics-explanation strong {
color: #2b6cb0;
}
@media (prefers-color-scheme: dark) {
.metrics-explanation {
background-color: #2d3748;
color: #e2e8f0;
border-color: #4a5568;
}
.metrics-explanation p {
color: #90cdf4;
}
.metrics-explanation strong {
color: #bee3f8;
}
}
html[data-theme="dark"] .metrics-explanation,
.dark .metrics-explanation {
background-color: #2d3748 !important;
color: #e2e8f0 !important;
border-color: #4a5568 !important;
}
html[data-theme="dark"] .metrics-explanation p,
.dark .metrics-explanation p {
color: #90cdf4 !important;
}
html[data-theme="dark"] .metrics-explanation strong,
.dark .metrics-explanation strong {
color: #bee3f8 !important;
}
/* Leaderboard table styling */
.leaderboard-container {
margin: 15px 0;
overflow-x: auto;
}
.leaderboard-table {
width: 100%;
border-collapse: collapse;
font-size: 0.95em;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
border-radius: 8px;
overflow: hidden;
border: 1px solid #e2e8f0;
}
.leaderboard-table thead {
background-color: #3182ce;
color: white;
}
.leaderboard-table th,
.leaderboard-table td {
padding: 12px 15px;
text-align: left;
border-bottom: 1px solid #e2e8f0;
color: #2d3748;
}
.leaderboard-table thead th {
color: white;
border-bottom-color: #2c5282;
}
.leaderboard-table tbody tr {
transition: background-color 0.3s;
background-color: #ffffff;
}
.leaderboard-table tbody tr:nth-child(even) {
background-color: #f7fafc;
}
.leaderboard-table tbody tr:hover {
background-color: #ebf8ff;
}
.leaderboard-table tbody tr.top-rank-1 {
background-color: #f0fff4;
color: #22543d;
font-weight: bold;
}
.leaderboard-table tbody tr.top-rank-2 {
background-color: #fefcbf;
color: #744210;
font-weight: 500;
}
.leaderboard-table tbody tr.top-rank-3 {
background-color: #fed7cc;
color: #7c2d12;
font-weight: 500;
}
/* Dark theme leaderboard */
@media (prefers-color-scheme: dark) {
.leaderboard-table {
border-color: #4a5568;
box-shadow: 0 2px 10px rgba(0,0,0,0.3);
}
.leaderboard-table thead {
background-color: #2c5282;
}
.leaderboard-table th,
.leaderboard-table td {
border-bottom-color: #4a5568;
color: #e2e8f0;
}
.leaderboard-table thead th {
border-bottom-color: #1a365d;
}
.leaderboard-table tbody tr {
background-color: #2d3748;
}
.leaderboard-table tbody tr:nth-child(even) {
background-color: #1a202c;
}
.leaderboard-table tbody tr:hover {
background-color: #2a4365;
}
.leaderboard-table tbody tr.top-rank-1 {
background-color: #22543d;
color: #c6f6d5;
}
.leaderboard-table tbody tr.top-rank-2 {
background-color: #744210;
color: #fefcbf;
}
.leaderboard-table tbody tr.top-rank-3 {
background-color: #7c2d12;
color: #fed7cc;
}
}
/* Gradio fallbacks for leaderboard */
html[data-theme="dark"] .leaderboard-table,
.dark .leaderboard-table {
border-color: #4a5568 !important;
box-shadow: 0 2px 10px rgba(0,0,0,0.3) !important;
}
html[data-theme="dark"] .leaderboard-table thead,
.dark .leaderboard-table thead {
background-color: #2c5282 !important;
}
html[data-theme="dark"] .leaderboard-table th,
html[data-theme="dark"] .leaderboard-table td,
.dark .leaderboard-table th,
.dark .leaderboard-table td {
border-bottom-color: #4a5568 !important;
color: #e2e8f0 !important;
}
html[data-theme="dark"] .leaderboard-table thead th,
.dark .leaderboard-table thead th {
border-bottom-color: #1a365d !important;
color: white !important;
}
html[data-theme="dark"] .leaderboard-table tbody tr,
.dark .leaderboard-table tbody tr {
background-color: #2d3748 !important;
}
html[data-theme="dark"] .leaderboard-table tbody tr:nth-child(even),
.dark .leaderboard-table tbody tr:nth-child(even) {
background-color: #1a202c !important;
}
html[data-theme="dark"] .leaderboard-table tbody tr:hover,
.dark .leaderboard-table tbody tr:hover {
background-color: #2a4365 !important;
}
"""
# Example queries
example_queries = [
"Who was the first person to land on the moon?",
"What is the capital of France?",
"How many planets are in our solar system?",
"Who wrote the novel 1984?",
"What is the speed of light?",
"What was the first computer?"
]
# Function to update the progress display
def update_progress_display(html):
"""Update the progress display with the provided HTML"""
return gr.update(visible=True, value=html)
# Register the callback with the tracker
progress_tracker.register_callback(update_progress_display)
# Register the tracker with the detector
detector.set_progress_callback(progress_tracker.update_stage)
# Helper factory: returns a no-arg callback that fills the query box with a fixed example
def set_example_query(example):
return lambda: example
# Function to show processing is starting
def start_processing(query):
logger.info("Processing query: %s", query)
# Stop any existing pulsing to prepare for incremental progress updates
progress_tracker.stop_pulsing()
# Reset to a processing state without the "Ready" text
# Use "starting" stage but with minimal UI display
progress_tracker.stage = "starting"
progress_tracker.query = query
# Force UI update with clean display
if progress_tracker._status_callback:
progress_tracker._status_callback(progress_tracker.get_html_status())
return [
gr.update(visible=True), # Show the progress display
gr.update(visible=False), # Hide the results accordion
gr.update(visible=False), # Hide the feedback accordion
None # Reset hidden results
]
# Main processing function
def process_query_and_display_results(query, progress=gr.Progress()):
if not query.strip():
logger.warning("Empty query submitted")
progress_tracker.stop_pulsing()
progress_tracker.update_stage("error", error_message="Please enter a query.")
return [
gr.update(visible=True), # Show the progress with error
gr.update(visible=False),
gr.update(visible=False),
None
]
# Check if API is initialized
if not detector.pas2:
try:
# Try to initialize from environment variables
logger.info("Initializing APIs from environment variables")
progress(0.05, desc="Initializing API...")
init_message = detector.initialize_api(
mistral_api_key=os.environ.get("HF_MISTRAL_API_KEY"),
openai_api_key=os.environ.get("HF_OPENAI_API_KEY")
)
if "successfully" not in init_message:
logger.error("Failed to initialize APIs: %s", init_message)
progress_tracker.stop_pulsing()
progress_tracker.update_stage("error", error_message="API keys not found in environment variables.")
return [
gr.update(visible=True),
gr.update(visible=False),
gr.update(visible=False),
None
]
except Exception as e:
logger.error("Error initializing API: %s", str(e), exc_info=True)
progress_tracker.stop_pulsing()
progress_tracker.update_stage("error", error_message=f"Error initializing API: {str(e)}")
return [
gr.update(visible=True),
gr.update(visible=False),
gr.update(visible=False),
None
]
try:
# Process the query
logger.info("Starting hallucination detection process")
start_time = time.time()
# Set up a custom progress callback that uses both the progress_tracker and the gr.Progress
def combined_progress_callback(stage, **kwargs):
# Skip the idle stage, which shows "Ready"
if stage == "idle":
return
progress_tracker.update_stage(stage, **kwargs)
# Map the stages to progress values for the gr.Progress bar
stage_to_progress = {
"starting": 0.05,
"generating_paraphrases": 0.15,
"paraphrases_complete": 0.3,
"getting_responses": 0.35,
"responses_progress": lambda kwargs: 0.35 + (0.3 * (kwargs.get("completed", 0) / max(kwargs.get("total", 1), 1))),
"responses_complete": 0.65,
"judging": 0.7,
"complete": 1.0,
"error": 1.0
}
# Update the gr.Progress bar
if stage in stage_to_progress:
prog_value = stage_to_progress[stage]
if callable(prog_value):
prog_value = prog_value(kwargs)
desc = progress_tracker.STAGES[stage]["status"]
if "{" in desc and "}" in desc:
# Format the description with any kwargs; keep the raw template
# if a placeholder has no matching kwarg
try:
desc = desc.format(**kwargs)
except (KeyError, IndexError):
pass
# Ensure UI updates by adding a small delay
# This forces the progress updates to be rendered
progress(prog_value, desc=desc)
# For certain key stages, add a small sleep to ensure progress is visible
if stage in ["starting", "generating_paraphrases", "paraphrases_complete",
"getting_responses", "responses_complete", "judging", "complete"]:
time.sleep(0.2) # Small delay to ensure UI update is visible
# Use these steps for processing
detector.set_progress_callback(combined_progress_callback)
# Create a wrapper function for detect_hallucination that gives more control over progress updates
def run_detection_with_visible_progress():
# Step 1: Start
combined_progress_callback("starting", query=query)
time.sleep(0.3) # Ensure starting status is visible
# Step 1.5: Randomly select model pair
generator_model, judge_model = detector.pas2.set_random_model_pair()
combined_progress_callback("starting", query=query, generator=generator_model, judge=judge_model)
time.sleep(0.3) # Ensure model info is visible
# Step 2: Generate paraphrases (15-30%)
combined_progress_callback("generating_paraphrases", query=query)
all_queries = detector.pas2.generate_paraphrases(query)
combined_progress_callback("paraphrases_complete", query=query, count=len(all_queries))
# Step 3: Get responses (35-65%)
combined_progress_callback("getting_responses", query=query, total=len(all_queries), model=generator_model)
all_responses = []
for i, q in enumerate(all_queries):
# Show incremental progress for each response
combined_progress_callback("responses_progress", query=query, completed=i, total=len(all_queries))
response = detector.pas2._get_single_response(q, index=i)
all_responses.append(response)
combined_progress_callback("responses_complete", query=query)
# Step 4: Judge hallucinations (70-100%)
combined_progress_callback("judging", query=query, model=judge_model)
# The first query is the original, rest are paraphrases
original_query = all_queries[0]
original_response = all_responses[0]
paraphrased_queries = all_queries[1:] if len(all_queries) > 1 else []
paraphrased_responses = all_responses[1:] if len(all_responses) > 1 else []
# Judge the responses
judgment = detector.pas2.judge_hallucination(
original_query=original_query,
original_response=original_response,
paraphrased_queries=paraphrased_queries,
paraphrased_responses=paraphrased_responses
)
# Assemble the results
results = {
"original_query": original_query,
"original_response": original_response,
"paraphrased_queries": paraphrased_queries,
"paraphrased_responses": paraphrased_responses,
"hallucination_detected": judgment.hallucination_detected,
"confidence_score": judgment.confidence_score,
"conflicting_facts": judgment.conflicting_facts,
"reasoning": judgment.reasoning,
"summary": judgment.summary,
"generator_model": generator_model,
"judge_model": judge_model
}
# Show completion
combined_progress_callback("complete", query=query, generator=generator_model, judge=judge_model)
time.sleep(0.3) # Ensure complete status is visible
return results
# Run the detection process with visible progress
results = run_detection_with_visible_progress()
# Calculate elapsed time
elapsed_time = time.time() - start_time
logger.info("Hallucination detection completed in %.2f seconds", elapsed_time)
# Check for errors
if "error" in results:
logger.error("Error in results: %s", results["error"])
progress_tracker.stop_pulsing()
progress_tracker.update_stage("error", error_message=results["error"])
return [
gr.update(visible=True),
gr.update(visible=False),
gr.update(visible=False),
None
]
# Prepare responses for display
original_query = results["original_query"]
original_response = results["original_response"]
paraphrased_queries = results["paraphrased_queries"]
paraphrased_responses = results["paraphrased_responses"]
hallucination_detected = results["hallucination_detected"]
confidence = results["confidence_score"]
reasoning = results["reasoning"]
summary = results["summary"]
# Format conflicting facts
conflicting_facts = results["conflicting_facts"]
conflicting_facts_text = ""
if conflicting_facts:
for i, fact in enumerate(conflicting_facts, 1):
conflicting_facts_text += f"{i}. "
if isinstance(fact, dict):
for key, value in fact.items():
conflicting_facts_text += f"{key}: {value}, "
conflicting_facts_text = conflicting_facts_text.rstrip(", ")
else:
conflicting_facts_text += str(fact)
conflicting_facts_text += "\n"
# Format responses to escape any backslashes
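# (newlines become <br> so they render in the HTML output; backslashes are
# doubled, presumably so they survive later markdown/HTML processing)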
original_response_safe = original_response.replace('\\', '\\\\').replace('\n', '<br>')
paraphrased_responses_safe = [r.replace('\\', '\\\\').replace('\n', '<br>') for r in paraphrased_responses]
reasoning_safe = reasoning.replace('\\', '\\\\').replace('\n', '<br>')
conflicting_facts_text_safe = conflicting_facts_text.replace('\\', '\\\\').replace('\n', '<br>') if conflicting_facts_text else "<strong>None identified</strong>"
# Get model info from the results
generator_model = results.get("generator_model", "unknown model")
judge_model = results.get("judge_model", "unknown model")
html_output = f"""
<div class="container">
<h2 class="title">Hallucination Detection Results</h2>
<div class="model-info-bar">
<div class="model-info-section">
<div class="model-label">Generator Model</div>
<div class="model-name">{generator_model}</div>
</div>
<div class="model-info-section">
<div class="model-label">Judge Model</div>
<div class="model-name">{judge_model}</div>
</div>
</div>
<div class="stats-section">
<div class="stat-item">
<div class="stat-value">{'Yes' if hallucination_detected else 'No'}</div>
<div class="stat-label">Hallucination Detected</div>
</div>
<div class="stat-item">
<div class="stat-value">{confidence:.2f}</div>
<div class="stat-label">Confidence Score</div>
</div>
<div class="stat-item">
<div class="stat-value">{len(paraphrased_queries)}</div>
<div class="stat-label">Paraphrases Analyzed</div>
</div>
<div class="stat-item">
<div class="stat-value">{elapsed_time:.1f}s</div>
<div class="stat-label">Processing Time</div>
</div>
</div>
<div class="{'hallucination-positive' if hallucination_detected else 'hallucination-negative'}">
<h3>Analysis Summary</h3>
<p>{summary}</p>
</div>
<div class="section-title">Original Query</div>
<div class="response-box">
{original_query}
</div>
<div class="section-title">Original Response <span class="section-meta">(generated by {generator_model})</span></div>
<div class="response-box">
{original_response_safe}
</div>
<div class="section-title">Paraphrased Queries and Responses</div>
"""
for i, (q, r) in enumerate(zip(paraphrased_queries, paraphrased_responses_safe), 1):
html_output += f"""
<div class="section-title">Paraphrased Query {i}</div>
<div class="response-box">
{q}
</div>
<div class="section-title">Response {i} <span class="section-meta">(generated by {generator_model})</span></div>
<div class="response-box">
{r}
</div>
"""
html_output += f"""
<div class="section-title">Detailed Analysis <span class="section-meta">(judged by {judge_model})</span></div>
<div class="info-box">
<p><strong>Reasoning:</strong></p>
<p>{reasoning_safe}</p>
<p><strong>Conflicting Facts:</strong></p>
<p>{conflicting_facts_text_safe}</p>
</div>
<div class="divider-line">
Models randomly selected for this analysis: <strong>{generator_model}</strong> (Generator) and <strong>{judge_model}</strong> (Judge)
</div>
</div>
"""
logger.info("Updating UI with results")
progress_tracker.stop_pulsing()
return [
gr.update(visible=False), # Hide progress display when showing results
gr.update(visible=True, value=html_output),
gr.update(visible=True), # Show feedback accordion after results
results
]
except Exception as e:
logger.error("Error processing query: %s", str(e), exc_info=True)
progress_tracker.stop_pulsing()
progress_tracker.update_stage("error", error_message=f"Error processing query: {str(e)}")
return [
gr.update(visible=True),
gr.update(visible=False),
gr.update(visible=False),
None
]
# Helper function to submit feedback
def combine_feedback(hallucination_present, judge_correct, fb_text, results):
combined_feedback = f"Hallucination: {hallucination_present}, Judge Correct: {judge_correct}"
if fb_text:
combined_feedback += f", Comments: {fb_text}"
if not results:
return "No results to attach feedback to."
response = detector.save_feedback(results, combined_feedback)
# Check if this is a duplicate feedback submission message
is_duplicate = "already provided feedback" in response
notification_color = "#ff9800" if is_duplicate else "#4caf50"
icon = "ℹ" if is_duplicate else "✓"
heading_text = "Note" if is_duplicate else "Thank You!"
message_text = response
status_text = "already submitted" if is_duplicate else "submitted successfully"
# Return a message that will trigger a JS notification
feedback_response = f"""
<div id="feedback-popup-container"></div>
<script>
(function() {{
// Create the notification element
const container = document.getElementById('feedback-popup-container');
const notification = document.createElement('div');
notification.id = 'feedback-notification';
notification.style.cssText = `
position: fixed;
top: 50px;
right: 20px;
background-color: {notification_color};
color: white;
padding: 15px;
border-radius: 5px;
box-shadow: 0 2px 10px rgba(0,0,0,0.2);
z-index: 1000;
opacity: 0;
transform: translateX(50px);
transition: opacity 0.3s, transform 0.3s;
display: flex;
align-items: center;
`;
// Create notification content
const checkmark = document.createElement('div');
checkmark.style.marginRight = '10px';
checkmark.textContent = '{icon}';
const textContainer = document.createElement('div');
const heading = document.createElement('div');
heading.style.fontWeight = 'bold';
heading.textContent = '{heading_text}';
const message = document.createElement('div');
message.textContent = {json.dumps(message_text)};  // JSON string literal keeps quotes in the message from breaking this script
message.style.fontSize = '0.9em';
message.style.marginTop = '2px';
textContainer.appendChild(heading);
textContainer.appendChild(message);
notification.appendChild(checkmark);
notification.appendChild(textContainer);
// Add to document
document.body.appendChild(notification);
// Show notification
setTimeout(function() {{
notification.style.opacity = '1';
notification.style.transform = 'translateX(0)';
// Hide after 3 seconds
setTimeout(function() {{
notification.style.opacity = '0';
notification.style.transform = 'translateX(50px)';
// Remove element after animation
setTimeout(function() {{
notification.remove();
}}, 300);
}}, 3000);
}}, 100);
}})();
</script>
<div>Feedback {status_text}!</div>
"""
return feedback_response
# Create the interface
with gr.Blocks(css=css, theme=gr.themes.Soft()) as interface:
gr.HTML(
"""
<div style="text-align: center; margin-bottom: 1.5rem">
<h1 class="app-title">PAS2 - Hallucination Detector</h1>
<h3 class="app-subtitle">Advanced AI Response Verification Using Model-as-Judge</h3>
<p class="app-description">
This tool detects hallucinations in AI responses by comparing answers to semantically equivalent questions and using a specialized judge model.
</p>
</div>
"""
)
# Main tabs for the application
with gr.Tabs() as tabs:
# Tab 1: Hallucination Detector
with gr.TabItem("Detector"):
with gr.Accordion("About this Tool", open=False):
gr.Markdown(
"""
### How It Works
This tool implements the Paraphrase-based Approach for Scrutinizing Systems (PAS2) with a model-as-judge enhancement:
1. **Paraphrase Generation**: Your question is paraphrased multiple ways while preserving its core meaning
2. **Multiple Responses**: All questions (original + paraphrases) are sent to a randomly selected generator model
3. **Expert Judgment**: A randomly selected judge model analyzes all responses to detect factual inconsistencies
### Why This Approach?
When an AI hallucinates, it often provides different answers to the same question when phrased differently.
By using a separate judge model, we can identify these inconsistencies more effectively than with
metric-based approaches.
### Understanding the Results
- **Confidence Score**: Indicates the judge's confidence in the hallucination detection
- **Conflicting Facts**: Specific inconsistencies found across responses
- **Reasoning**: The judge's detailed analysis explaining its decision
### Privacy Notice
Your queries and the system's responses are saved to help improve hallucination detection.
No personally identifiable information is collected.
"""
)
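# A minimal sketch of the PAS2 flow described in the accordion above, using
# the same detector methods called later in run_detection_with_visible_progress
# (the sample query is illustrative only):
#   queries = detector.pas2.generate_paraphrases("Who wrote the novel 1984?")
#   responses = [detector.pas2._get_single_response(q, index=i)
#                for i, q in enumerate(queries)]
#   judgment = detector.pas2.judge_hallucination(
#       original_query=queries[0], original_response=responses[0],
#       paraphrased_queries=queries[1:], paraphrased_responses=responses[1:])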
with gr.Row():
with gr.Column():
# First define the query input
gr.Markdown("### Enter Your Question")
with gr.Row():
query_input = gr.Textbox(
label="",
placeholder="Ask a factual question (e.g., Who was the first person to land on the moon?)",
lines=3
)
# Now define the example queries
gr.Markdown("### Or Try an Example")
example_row = gr.Row()
with example_row:
for example in example_queries:
example_btn = gr.Button(
example,
elem_classes=["example-query"],
scale=0
)
example_btn.click(
fn=set_example_query(example),
inputs=[],
outputs=[query_input]
)
with gr.Row():
submit_button = gr.Button("Detect Hallucinations", variant="primary", scale=1)
# Error message
error_message = gr.HTML(
label="Status",
visible=False
)
# Progress display
progress_display = gr.HTML(
value=progress_tracker.get_html_status(),
visible=True
)
# Results display
results_accordion = gr.HTML(visible=False)
# Add feedback stats display
feedback_stats = gr.HTML(visible=True)
# Feedback section
with gr.Accordion("Provide Feedback", open=True, elem_id="detector-feedback") as feedback_accordion:
gr.Markdown("### Help Improve the System")
gr.Markdown("Your feedback helps us refine the hallucination detection system.")
hallucination_present = gr.Radio(
label="Was there actually a hallucination in the responses?",
choices=["Yes, there was a hallucination", "No, there was no hallucination", "Not sure"],
value="Not sure"
)
judge_correct = gr.Radio(
label="Did the judge model correctly identify the situation?",
choices=["Yes, the judge was correct", "No, the judge was incorrect", "Not sure"],
value="Not sure"
)
feedback_text = gr.Textbox(
label="Additional comments (optional)",
placeholder="Please provide any additional observations or details...",
lines=2
)
feedback_button = gr.Button("Submit Feedback", variant="secondary")
feedback_status = gr.HTML(visible=True)
# Tab 2: Model Leaderboard
with gr.TabItem("Model Leaderboard", elem_id="model-leaderboard-tab"):
gr.Markdown("## Hallucination Detection Scores")
gr.Markdown("Performance comparison of different Generator + Judge model combinations.")
# Function to generate the HTML for the model pair leaderboard
def generate_pair_leaderboard_html():
try:
# Get leaderboard data
pairs = detector.get_pair_leaderboard() or []
if not pairs:
return (
"<div class=\"info-message\">"
"<h3>No Data Available Yet</h3>"
"<p>Try the detector with more queries to populate the leaderboard!</p>"
"</div>"
)
# Generate table rows
rows = ""
for rank, pair in enumerate(pairs, 1):
# Add special styling for top 3
row_class = ""
if rank == 1:
row_class = "class='top-rank-1'"
elif rank == 2:
row_class = "class='top-rank-2'"
elif rank == 3:
row_class = "class='top-rank-3'"
# Format percentages for display
generator_perf = f"{pair.get('generator_performance', 0) * 100:.1f}%" if 'generator_performance' in pair else "N/A"
judge_perf = f"{pair.get('judge_performance', 0) * 100:.1f}%" if 'judge_performance' in pair else "N/A"
consistency = f"{pair.get('consistency_score', 0)}%" if 'consistency_score' in pair else "N/A"
rows += (
f"<tr {row_class}>"
f"<td>{rank}</td>"
f"<td>{pair.get('generator', 'unknown')}</td>"
f"<td>{pair.get('judge', 'unknown')}</td>"
f"<td>{round(pair.get('elo_score', 0))}</td>"
f"<td>{pair.get('accuracy')}%</td>"
f"<td class='perf-metric perf-generator'>{generator_perf}</td>"
f"<td class='perf-metric perf-judge'>{judge_perf}</td>"
f"<td class='perf-metric perf-consistency'>{consistency}</td>"
f"<td>{pair.get('total_samples', 0)}</td>"
f"</tr>"
)
# Build the full table
html = (
f"<div class=\"leaderboard-container\">"
f"<table class=\"leaderboard-table\">"
f"<thead>"
f"<tr>"
f"<th>Rank</th>"
f"<th>Generator Model</th>"
f"<th>Judge Model</th>"
f"<th>ELO Score</th>"
f"<th>Accuracy</th>"
f"<th>Generator Perf.</th>"
f"<th>Judge Perf.</th>"
f"<th>Consistency</th>"
f"<th>Sample Size</th>"
f"</tr>"
f"</thead>"
f"<tbody>"
f"{rows}"
f"</tbody>"
f"</table>"
f"</div>"
f"<div class='metrics-explanation'>"
f"<p><strong>Model Pair Performance Metrics:</strong></p>"
f"<ul>"
f"<li><strong>Accuracy</strong>: Percentage of correct hallucination judgments based on user feedback</li>"
f"<li><strong>Generator Performance</strong>: How well the generator model avoids hallucinations</li>"
f"<li><strong>Judge Performance</strong>: How accurately the judge model identifies hallucinations</li>"
f"<li><strong>Consistency</strong>: Weighted measure of how well the pair works together</li>"
f"</ul>"
f"</div>"
)
return html
except Exception as e:
logger.error("Error generating leaderboard HTML: %s", str(e), exc_info=True)
return (
f"<div class=\"error-message\">"
f"<h3>Error Loading Leaderboard</h3>"
f"<p>{str(e)}</p>"
f"</div>"
)
# Create leaderboard table for model combinations
model_leaderboard_html = gr.HTML(generate_pair_leaderboard_html())
refresh_leaderboard_btn = gr.Button("Refresh Leaderboard", variant="primary")
refresh_leaderboard_btn.click(
fn=generate_pair_leaderboard_html,
outputs=[model_leaderboard_html]
)
# ELO rating explanation
with gr.Accordion("ELO Rating System Explanation", open=False):
gr.HTML(
"<div style='margin-top: 20px; padding: 15px; background-color: #0d47a1; border-radius: 8px; box-shadow: 0 2px 8px rgba(0,0,0,0.1);'>" +
"<h3 style='margin-top: 0; color: #ffffff;'>ELO Rating System Explanation</h3>" +
"<div style='display: flex; flex-wrap: wrap; gap: 15px; margin-top: 15px;'>" +
"<div style='flex: 1; min-width: 280px; padding: 12px; background-color: #455a64; border-radius: 6px; box-shadow: 0 1px 3px rgba(0,0,0,0.12);'>" +
"<h4 style='margin-top: 0; color: #ffffff;'>How ELO Scores Are Calculated</h4>" +
"<p style='color: #eceff1;'>Our ELO rating system assigns scores to model pairs based on user feedback, using the following formula:</p>" +
"<div style='background-color: #37474f; padding: 12px; border-radius: 5px; color: #eceff1;'>" +
"<code style='color: #80deea;'>ELO_new = ELO_old + K * (S - E)</code><br><br>" +
"Where:<br>* <strong style='color: #b2dfdb;'>ELO_old</strong>: Previous rating of the model combination<br>" +
"* <strong style='color: #b2dfdb;'>K</strong>: Weight factor (24 for model pairs)<br>" +
"* <strong style='color: #b2dfdb;'>S</strong>: Actual score from user feedback (1 for correct, 0 for incorrect)<br>" +
"* <strong style='color: #b2dfdb;'>E</strong>: Expected score based on current rating<br><br>" +
"<em style='color: #80deea;'>E = 1 / (1 + 10<sup>(1500 - ELO_model)/400</sup>)</em></div></div>" +
"<div style='flex: 1; min-width: 280px; padding: 12px; background-color: #455a64; border-radius: 6px; box-shadow: 0 1px 3px rgba(0,0,0,0.12);'>" +
"<h4 style='margin-top: 0; color: #ffffff;'>Available Models</h4>" +
"<p style='color: #eceff1;'>The system randomly selects from these models for each hallucination detection:</p>" +
"<div style='display: flex; flex-wrap: wrap; gap: 10px; margin-top: 10px;'>" +
"<div style='flex: 1; min-width: 120px;'>" +
"<h5 style='margin-top: 0; margin-bottom: 5px; color: #b2dfdb;'>All Models (Used as both Generator & Judge)</h5>" +
"<ul style='margin-bottom: 0; padding-left: 20px; color: #eceff1;'>" +
"<li>mistral-large</li><li>gpt-4o</li><li>qwen-235b</li><li>grok-3</li>" +
"<li>deepseek-reasoner</li><li>o4-mini</li><li>gemini-2.5-pro</li>" +
"</ul></div></div></div></div></div>"
)
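# Worked example of the pair ELO update described above (a sketch; the
# actual update lives in the detector's feedback handling, not here):
#   K = 24                                      # weight factor for model pairs
#   expected = 1 / (1 + 10 ** ((1500 - elo_old) / 400))
#   elo_new = elo_old + K * (score - expected)  # score: 1 correct, 0 incorrect
# e.g. elo_old = 1500 and score = 1 give expected = 0.5 and elo_new = 1512.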
# Tab 3: Individual Models Leaderboard
with gr.TabItem("Individual Models", elem_id="user-feedback-tab"):
gr.Markdown("## Individual Model Performance")
gr.Markdown("Performance ranking of models based on user feedback, showing statistics for both generator and judge roles.")
# Function to generate individual model leaderboard HTML
def generate_model_leaderboard_html():
try:
# Get model scores from MongoDB
models = detector.get_model_leaderboard() or []
if not models:
return (
"<div class=\"info-message\">"
"<h3>No Data Available Yet</h3>"
"<p>Try the detector with more queries to populate the model scores!</p>"
"</div>"
)
# Generate table rows
rows = ""
for rank, model in enumerate(models, 1):
# Add special styling for top 3
row_class = ""
if rank == 1:
row_class = "class='top-rank-1'"
elif rank == 2:
row_class = "class='top-rank-2'"
elif rank == 3:
row_class = "class='top-rank-3'"
# Calculate role distribution
as_generator = model.get('as_generator', 0)
as_judge = model.get('as_judge', 0)
if as_generator + as_judge > 0:
generator_pct = round((as_generator / (as_generator + as_judge)) * 100)
judge_pct = 100 - generator_pct
role_distribution = f"{generator_pct}% / {judge_pct}%"
else:
role_distribution = "N/A"
# Format percentages with better contrast against dark background
generator_acc = f"{model.get('generator_accuracy', 0.0)}%"
judge_acc = f"{model.get('judge_accuracy', 0.0)}%"
rows += (
f"<tr {row_class}>"
f"<td>{rank}</td>"
f"<td>{model.get('model_name', 'unknown')}</td>"
f"<td>{round(model.get('elo_score', 0))}</td>"
f"<td>{model.get('accuracy')}%</td>"
f"<td class='perf-metric perf-generator'>{generator_acc}</td>"
f"<td class='perf-metric perf-judge'>{judge_acc}</td>"
f"<td>{model.get('total_samples', 0)}</td>"
f"<td class='perf-metric perf-distribution'>{role_distribution}</td>"
f"</tr>"
)
# Build the full table
html = (
f"<div class=\"leaderboard-container\">"
f"<table class=\"leaderboard-table\">"
f"<thead>"
f"<tr>"
f"<th>Rank</th>"
f"<th>Model</th>"
f"<th>ELO Score</th>"
f"<th>Overall Accuracy</th>"
f"<th>Generator Accuracy</th>"
f"<th>Judge Accuracy</th>"
f"<th>Sample Size</th>"
f"<th>Generator/Judge Ratio</th>"
f"</tr>"
f"</thead>"
f"<tbody>"
f"{rows}"
f"</tbody>"
f"</table>"
f"</div>"
)
return html
except Exception as e:
logger.error("Error generating model leaderboard HTML: %s", str(e), exc_info=True)
return (
f"<div class=\"error-message\">"
f"<h3>Error Loading Model Leaderboard</h3>"
f"<p>{str(e)}</p>"
f"</div>"
)
# Create leaderboard table for individual models
model_scores_html = gr.HTML(generate_model_leaderboard_html())
refresh_models_btn = gr.Button("Refresh Model Scores", variant="primary")
refresh_models_btn.click(
fn=generate_model_leaderboard_html,
outputs=[model_scores_html]
)
# ELO rating explanation for individual models
with gr.Accordion("ELO Rating Explanation for Individual Models", open=False):
gr.HTML(
"<div style='margin-top: 20px; padding: 15px; background-color: #0d47a1; border-radius: 8px; box-shadow: 0 2px 8px rgba(0,0,0,0.1);'>" +
"<h3 style='margin-top: 0; color: #ffffff;'>Individual Model ELO Rating System</h3>" +
"<div style='display: flex; flex-wrap: wrap; gap: 15px; margin-top: 15px;'>" +
"<div style='flex: 1; min-width: 280px; padding: 12px; background-color: #455a64; border-radius: 6px; box-shadow: 0 1px 3px rgba(0,0,0,0.12);'>" +
"<h4 style='margin-top: 0; color: #ffffff;'>How Individual ELO Scores Are Calculated</h4>" +
"<p style='color: #eceff1;'>Our ELO rating system assigns scores to individual models based on user feedback, using the following formula:</p>" +
"<div style='background-color: #37474f; padding: 12px; border-radius: 5px; color: #eceff1;'>" +
"<code style='color: #80deea;'>ELO_new = ELO_old + K * (S - E)</code><br><br>" +
"Where:<br>* <strong style='color: #b2dfdb;'>ELO_old</strong>: Previous rating of the model<br>" +
"* <strong style='color: #b2dfdb;'>K</strong>: Weight factor (32 for individual models)<br>" +
"* <strong style='color: #b2dfdb;'>S</strong>: Actual score (1 for correct judgment, 0 for incorrect)<br>" +
"* <strong style='color: #b2dfdb;'>E</strong>: Expected score based on current rating<br><br>" +
"<em style='color: #80deea;'>E = 1 / (1 + 10<sup>(1500 - ELO_model)/400</sup>)</em></div>" +
"<p style='color: #eceff1; margin-top: 10px;'>All models start with a base ELO of 1500. Scores are updated after each user evaluation.</p></div>" +
"<div style='flex: 1; min-width: 280px; padding: 12px; background-color: #455a64; border-radius: 6px; box-shadow: 0 1px 3px rgba(0,0,0,0.12);'>" +
"<h4 style='margin-top: 0; color: #ffffff;'>Interpretation Guidelines</h4>" +
"<ul style='margin-bottom: 0; padding-left: 20px; color: #eceff1;'>" +
"<li><strong style='color: #b2dfdb;'>1800+</strong>: Exceptional performance, very rare hallucinations</li>" +
"<li><strong style='color: #b2dfdb;'>1700-1799</strong>: Superior performance, minimal hallucinations</li>" +
"<li><strong style='color: #b2dfdb;'>1600-1699</strong>: Good performance, occasional hallucinations</li>" +
"<li><strong style='color: #b2dfdb;'>1500-1599</strong>: Average performance</li>" +
"<li><strong style='color: #b2dfdb;'>&lt;1500</strong>: Below average, frequent hallucinations</li>" +
"</ul><p style='font-style: italic; color: #b3e5fc; margin-top: 10px;'>" +
"Note: ELO scores are comparative and reflect relative performance between models in our specific hallucination detection tasks.</p>" +
"</div></div></div>"
)
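# Same update as the pair sketch above but with K = 32 for individual models:
# starting from the 1500 baseline, one correct judgment moves a model to
# 1500 + 32 * (1 - 0.5) = 1516.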
# Function to refresh the feedback stats display (not yet wired to any event or timer)
def update_stats():
stats = detector.get_feedback_stats()
if stats:
total = stats['total_feedback']
correct = stats['correct_predictions']
# Get accuracy directly from the stats
accuracy = stats['accuracy']
# Format accuracy percentage
accuracy_pct = f"{accuracy * 100:.1f}%"
stats_html = f"""
<div class="stats-section">
<div class="stat-item">
<div class="stat-value">{total}</div>
<div class="stat-label">Total Responses</div>
</div>
<div class="stat-item">
<div class="stat-value">{accuracy_pct}</div>
<div class="stat-label">Correct Predictions</div>
</div>
</div>
<div class="section-meta" style="text-align: center; margin-top: 10px; font-style: italic;">
Based on user feedback: {correct} correct out of {total} total predictions
</div>
"""
return stats_html
return ""
# Feedback section is now moved directly inside the Detector tab
# Add JavaScript to enhance the tabs
gr.HTML("""
<script>
// Add highlighting to the selected tab and handle feedback section visibility
function setupTabHighlighting() {
// Add hover effects to tabs
const tabs = document.querySelectorAll('.tabs button');
if (tabs.length > 0) {
tabs.forEach(tab => {
tab.addEventListener('mouseover', () => {
if (!tab.classList.contains('selected')) {
tab.style.backgroundColor = '#e8eaf6';
}
});
tab.addEventListener('mouseout', () => {
if (!tab.classList.contains('selected')) {
tab.style.backgroundColor = '';
}
});
// Handle tab click events to manage feedback section visibility
tab.addEventListener('click', function() {
// Use setTimeout to let Gradio UI update first
setTimeout(() => {
// Check if this tab is selected and what its text is
const isDetectorTab = this.classList.contains('selected') &&
!this.textContent.includes('Model') &&
!this.textContent.includes('User');
// Find all accordions in the page
const accordions = document.querySelectorAll('.accordion');
// Loop through all accordions
accordions.forEach(acc => {
// Check if this is the feedback accordion
if (acc.textContent.includes('Provide Feedback') ||
acc.textContent.includes('Help Improve')) {
if (isDetectorTab) {
acc.style.display = 'block';
} else {
acc.style.display = 'none';
}
}
});
}, 100);
});
});
}
}
// Set up all JavaScript enhancements after the page loads
function setupAllEnhancements() {
setupTabHighlighting();
// Simple solution to ensure feedback is only visible in detector tab
setTimeout(() => {
// Get the feedback accordion by ID
const feedbackAccordion = document.getElementById('detector-feedback');
if (!feedbackAccordion) return;
// Get all tabs
const tabs = document.querySelectorAll('.tabs button');
if (tabs.length === 0) return;
// Add click handlers to each tab
tabs.forEach((tab, index) => {
// Check if it's the first tab (Detector)
const isDetectorTab = index === 0;
// When a tab is clicked, toggle the feedback visibility
tab.addEventListener('click', function() {
if (feedbackAccordion) {
// Give time for Gradio to update the UI
setTimeout(() => {
feedbackAccordion.style.display = this.classList.contains('selected') && isDetectorTab ? 'block' : 'none';
}, 100);
}
});
});
// Initial setup - make sure feedback is only visible if detector tab is active
const activeTab = document.querySelector('.tabs button.selected');
const activeTabIndex = Array.from(tabs).indexOf(activeTab);
if (activeTabIndex !== 0) { // If not on detector tab
feedbackAccordion.style.display = 'none';
}
// Also create a style rule for safety
const style = document.createElement('style');
style.textContent = `
.tabs[data-testid*="tab"] button:not(:first-child).selected ~ .tabitem #detector-feedback {
display: none !important;
}
`;
document.head.appendChild(style);
}, 300);
}
if (window.gradio_loaded) {
setupAllEnhancements();
} else {
document.addEventListener('DOMContentLoaded', setupAllEnhancements);
}
</script>
<style>
/* Additional styling for tabs */
.tabs button.selected {
background-color: #3f51b5 !important;
color: white !important;
font-weight: 600;
border-bottom: 3px solid #3f51b5;
}
.tabs button:not(.selected):hover {
background-color: #e8eaf6;
}
/* Add animation to tab transitions */
.tabitem {
animation: fadeIn 0.3s ease-in-out;
}
@keyframes fadeIn {
from { opacity: 0; }
to { opacity: 1; }
}
/* Initial setting - show feedback accordion */
#detector-feedback {
display: block !important;
}
/* Hide when in other tabs using IDs */
#model-leaderboard-tab #detector-feedback,
#user-feedback-tab #detector-feedback {
display: none !important;
}
</style>
""")
# Removed duplicate feedback section (moved to above the stats container)
# Hidden state to store results for feedback
hidden_results = gr.State()
# Set up event handlers
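# Two-step chain: start_processing runs un-queued so the progress UI resets
# immediately; the long-running detection handler then takes over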
submit_button.click(
fn=start_processing,
inputs=[query_input],
outputs=[progress_display, results_accordion, feedback_accordion, hidden_results],
queue=False
).then(
fn=process_query_and_display_results,
inputs=[query_input],
outputs=[progress_display, results_accordion, feedback_accordion, hidden_results]
)
feedback_button.click(
fn=combine_feedback,
inputs=[hallucination_present, judge_correct, feedback_text, hidden_results],
outputs=[feedback_status]
)
# Footer
gr.HTML(
"""<footer><p>Paraphrase-based Approach for Scrutinizing Systems (PAS2) - Advanced Hallucination Detection</p><p>Multiple LLM models tested as generators and judges for optimal hallucination detection</p><p><small>Models in testing: mistral-large, gpt-4o, Qwen3-235B-A22B, grok-3, o4-mini, gemini-2.5-pro, deepseek-r1</small></p></footer>"""
)
return interface
# Add a test function to demonstrate progress bar in isolation
def test_progress():
"""Simple test function to demonstrate progress bar"""
import gradio as gr
import time
def slow_process(progress=gr.Progress()):
progress(0, desc="Starting process...")
time.sleep(0.5)
# Phase 1: Generating paraphrases
progress(0.15, desc="Generating paraphrases...")
time.sleep(1)
progress(0.3, desc="Paraphrases generated")
time.sleep(0.5)
# Phase 2: Getting responses
progress(0.35, desc="Getting responses...")
# Show incremental progress for responses
for i in range(3):
time.sleep(0.8)
prog = 0.35 + (0.3 * ((i+1) / 3))
progress(prog, desc=f"Getting responses ({i+1}/3)...")
progress(0.65, desc="All responses received")
time.sleep(0.5)
# Phase 3: Analyzing
progress(0.7, desc="Analyzing responses for hallucinations...")
time.sleep(2)
# Complete
progress(1.0, desc="Analysis complete!")
return "Process completed successfully!"
with gr.Blocks() as demo:
with gr.Row():
btn = gr.Button("Start Process")
output = gr.Textbox(label="Result")
btn.click(fn=slow_process, outputs=output)
demo.launch()
# Main application entry point
if __name__ == "__main__":
logger.info("Starting PAS2 Hallucination Detector")
interface = create_interface()
logger.info("Launching Gradio interface...")
interface.launch(
server_name="0.0.0.0", # Bind to all interfaces
server_port=7860, # Default Hugging Face Spaces port
show_api=False,
quiet=True, # Changed to True for Hugging Face deployment
share=False,
max_threads=10,
debug=False # Changed to False for production deployment
)
# Uncomment this line to run the test function instead of the main interface
# if __name__ == "__main__":
# test_progress()