import os import gradio as gr import pandas as pd from datetime import datetime from pydantic import BaseModel, Field from typing import List, Dict, Any, Optional import numpy as np from mistralai import Mistral from openai import OpenAI import re import json import logging import time import concurrent.futures from concurrent.futures import ThreadPoolExecutor import threading import sqlite3 # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', handlers=[ logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class HallucinationJudgment(BaseModel): hallucination_detected: bool = Field(description="Whether a hallucination is detected across the responses") confidence_score: float = Field(description="Confidence score between 0-1 for the hallucination judgment") conflicting_facts: List[Dict[str, Any]] = Field(description="List of conflicting facts found in the responses") reasoning: str = Field(description="Detailed reasoning for the judgment") summary: str = Field(description="A summary of the analysis") class PAS2: """Paraphrase-based Approach for Scrutinizing Systems - Using model-as-judge""" def __init__(self, mistral_api_key=None, openai_api_key=None, progress_callback=None): """Initialize the PAS2 with API keys""" # For Hugging Face Spaces, we prioritize getting API keys from HF_* environment variables # which are set from the Secrets tab in the Space settings self.mistral_api_key = mistral_api_key or os.environ.get("HF_MISTRAL_API_KEY") or os.environ.get("MISTRAL_API_KEY") self.openai_api_key = openai_api_key or os.environ.get("HF_OPENAI_API_KEY") or os.environ.get("OPENAI_API_KEY") self.progress_callback = progress_callback if not self.mistral_api_key: raise ValueError("Mistral API key is required. Set it via HF_MISTRAL_API_KEY in Hugging Face Spaces secrets or pass it as a parameter.") if not self.openai_api_key: raise ValueError("OpenAI API key is required. Set it via HF_OPENAI_API_KEY in Hugging Face Spaces secrets or pass it as a parameter.") self.mistral_client = Mistral(api_key=self.mistral_api_key) self.openai_client = OpenAI(api_key=self.openai_api_key) self.mistral_model = "mistral-large-latest" self.openai_model = "o3-mini" logger.info("PAS2 initialized with Mistral model: %s and OpenAI model: %s", self.mistral_model, self.openai_model) def generate_paraphrases(self, query: str, n_paraphrases: int = 3) -> List[str]: """Generate paraphrases of the input query using Mistral API""" logger.info("Generating %d paraphrases for query: %s", n_paraphrases, query) start_time = time.time() messages = [ { "role": "system", "content": f"You are an expert at creating semantically equivalent paraphrases. Generate {n_paraphrases} different paraphrases of the given query that preserve the original meaning but vary in wording and structure. Return a JSON array of strings, each containing one paraphrase." }, { "role": "user", "content": query } ] try: logger.info("Sending paraphrase generation request to Mistral API...") response = self.mistral_client.chat.complete( model=self.mistral_model, messages=messages, response_format={"type": "json_object"} ) content = response.choices[0].message.content logger.debug("Received raw paraphrase response: %s", content) paraphrases_data = json.loads(content) # Handle different possible JSON structures if isinstance(paraphrases_data, dict) and "paraphrases" in paraphrases_data: paraphrases = paraphrases_data["paraphrases"] elif isinstance(paraphrases_data, dict) and "results" in paraphrases_data: paraphrases = paraphrases_data["results"] elif isinstance(paraphrases_data, list): paraphrases = paraphrases_data else: # Try to extract a list from any field for key, value in paraphrases_data.items(): if isinstance(value, list) and len(value) > 0: paraphrases = value break else: logger.warning("Could not extract paraphrases from response: %s", content) raise ValueError(f"Could not extract paraphrases from response: {content}") # Ensure we have the right number of paraphrases paraphrases = paraphrases[:n_paraphrases] # Add the original query as the first item all_queries = [query] + paraphrases elapsed_time = time.time() - start_time logger.info("Generated %d paraphrases in %.2f seconds", len(paraphrases), elapsed_time) for i, p in enumerate(paraphrases, 1): logger.info("Paraphrase %d: %s", i, p) return all_queries except Exception as e: logger.error("Error generating paraphrases: %s", str(e), exc_info=True) # Return original plus simple paraphrases as fallback fallback_paraphrases = [ query, f"Could you tell me about {query.strip('?')}?", f"I'd like to know: {query}", f"Please provide information on {query.strip('?')}." ][:n_paraphrases+1] logger.info("Using fallback paraphrases due to error") for i, p in enumerate(fallback_paraphrases[1:], 1): logger.info("Fallback paraphrase %d: %s", i, p) return fallback_paraphrases def _get_single_response(self, query: str, index: int = None) -> str: """Get a single response from Mistral API for a query""" try: query_description = f"Query {index}: {query}" if index is not None else f"Query: {query}" logger.info("Getting response for %s", query_description) start_time = time.time() messages = [ { "role": "system", "content": "You are a helpful AI assistant. Provide accurate, factual information in response to questions." }, { "role": "user", "content": query } ] response = self.mistral_client.chat.complete( model=self.mistral_model, messages=messages ) result = response.choices[0].message.content elapsed_time = time.time() - start_time logger.info("Received response for %s (%.2f seconds)", query_description, elapsed_time) logger.debug("Response content for %s: %s", query_description, result[:100] + "..." if len(result) > 100 else result) return result except Exception as e: error_msg = f"Error getting response for query '{query}': {e}" logger.error(error_msg, exc_info=True) return f"Error: Failed to get response for this query." def get_responses(self, queries: List[str]) -> List[str]: """Get responses from Mistral API for each query in parallel""" logger.info("Getting responses for %d queries in parallel", len(queries)) start_time = time.time() # Use ThreadPoolExecutor for parallel API calls with ThreadPoolExecutor(max_workers=min(len(queries), 5)) as executor: # Submit tasks and map them to their original indices future_to_index = { executor.submit(self._get_single_response, query, i): i for i, query in enumerate(queries) } # Prepare a list with the correct length responses = [""] * len(queries) # Counter for completed responses completed_count = 0 # Collect results as they complete for future in concurrent.futures.as_completed(future_to_index): index = future_to_index[future] try: responses[index] = future.result() # Update completion count and report progress completed_count += 1 if self.progress_callback: self.progress_callback("responses_progress", completed_responses=completed_count, total_responses=len(queries)) except Exception as e: logger.error("Error processing response for index %d: %s", index, str(e)) responses[index] = f"Error: Failed to get response for query {index}." # Still update completion count even for errors completed_count += 1 if self.progress_callback: self.progress_callback("responses_progress", completed_responses=completed_count, total_responses=len(queries)) elapsed_time = time.time() - start_time logger.info("Received all %d responses in %.2f seconds total", len(responses), elapsed_time) return responses def detect_hallucination(self, query: str, n_paraphrases: int = 3) -> Dict: """ Detect hallucinations by comparing responses to paraphrased queries using a judge model Returns: Dict containing hallucination judgment and all responses """ logger.info("Starting hallucination detection for query: %s", query) start_time = time.time() # Report progress if self.progress_callback: self.progress_callback("starting", query=query) # Generate paraphrases logger.info("Step 1: Generating paraphrases") if self.progress_callback: self.progress_callback("generating_paraphrases", query=query) all_queries = self.generate_paraphrases(query, n_paraphrases) if self.progress_callback: self.progress_callback("paraphrases_complete", query=query, count=len(all_queries)) # Get responses to all queries logger.info("Step 2: Getting responses to all %d queries", len(all_queries)) if self.progress_callback: self.progress_callback("getting_responses", query=query, total=len(all_queries)) all_responses = [] for i, q in enumerate(all_queries): logger.info("Getting response %d/%d for query: %s", i+1, len(all_queries), q) if self.progress_callback: self.progress_callback("responses_progress", query=query, completed=i, total=len(all_queries)) response = self._get_single_response(q, index=i) all_responses.append(response) if self.progress_callback: self.progress_callback("responses_complete", query=query) # Judge the responses for hallucinations logger.info("Step 3: Judging for hallucinations") if self.progress_callback: self.progress_callback("judging", query=query) # The first query is the original, rest are paraphrases original_query = all_queries[0] original_response = all_responses[0] paraphrased_queries = all_queries[1:] if len(all_queries) > 1 else [] paraphrased_responses = all_responses[1:] if len(all_responses) > 1 else [] # Judge the responses judgment = self.judge_hallucination( original_query=original_query, original_response=original_response, paraphrased_queries=paraphrased_queries, paraphrased_responses=paraphrased_responses ) # Assemble the results results = { "original_query": original_query, "original_response": original_response, "paraphrased_queries": paraphrased_queries, "paraphrased_responses": paraphrased_responses, "hallucination_detected": judgment.hallucination_detected, "confidence_score": judgment.confidence_score, "conflicting_facts": judgment.conflicting_facts, "reasoning": judgment.reasoning, "summary": judgment.summary } # Report completion if self.progress_callback: self.progress_callback("complete", query=query) logger.info("Hallucination detection completed in %.2f seconds", time.time() - start_time) return results def judge_hallucination(self, original_query: str, original_response: str, paraphrased_queries: List[str], paraphrased_responses: List[str]) -> HallucinationJudgment: """ Use OpenAI's o3-mini as a judge to detect hallucinations in the responses """ logger.info("Judging hallucinations with OpenAI's %s model", self.openai_model) start_time = time.time() # Prepare the context for the judge context = f""" Original Question: {original_query} Original Response: {original_response} Paraphrased Questions and their Responses: """ for i, (query, response) in enumerate(zip(paraphrased_queries, paraphrased_responses), 1): context += f"\nParaphrased Question {i}: {query}\n\nResponse {i}:\n{response}\n" system_prompt = """ You are a judge evaluating whether an AI is hallucinating across different responses to semantically equivalent questions. Analyze all responses carefully to identify any factual inconsistencies or contradictions. Focus on factual discrepancies, not stylistic differences. A hallucination is when the AI states different facts in response to questions that are asking for the same information. Your response should be a JSON with the following fields: - hallucination_detected: boolean indicating whether hallucinations were found - confidence_score: number between 0 and 1 representing your confidence in the judgment - conflicting_facts: an array of objects describing any conflicting information found - reasoning: detailed explanation for your judgment - summary: a concise summary of your analysis """ try: logger.info("Sending judgment request to OpenAI API...") response = self.openai_client.chat.completions.create( model=self.openai_model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": f"Evaluate these responses for hallucinations:\n\n{context}"} ], response_format={"type": "json_object"} ) result_json = json.loads(response.choices[0].message.content) logger.debug("Received judgment response: %s", result_json) # Create the HallucinationJudgment object from the JSON response judgment = HallucinationJudgment( hallucination_detected=result_json.get("hallucination_detected", False), confidence_score=result_json.get("confidence_score", 0.0), conflicting_facts=result_json.get("conflicting_facts", []), reasoning=result_json.get("reasoning", "No reasoning provided."), summary=result_json.get("summary", "No summary provided.") ) elapsed_time = time.time() - start_time logger.info("Judgment completed in %.2f seconds", elapsed_time) return judgment except Exception as e: logger.error("Error in hallucination judgment: %s", str(e), exc_info=True) # Return a fallback judgment return HallucinationJudgment( hallucination_detected=False, confidence_score=0.0, conflicting_facts=[], reasoning="Failed to obtain judgment from the model.", summary="Analysis failed due to API error." ) class HallucinationDetectorApp: def __init__(self): self.pas2 = None # Use the default HF Spaces persistent storage location self.data_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data") self.db_path = os.path.join(self.data_dir, "feedback.db") logger.info("Initializing HallucinationDetectorApp") self._initialize_database() self.progress_callback = None def _initialize_database(self): """Initialize SQLite database for feedback storage in persistent directory""" try: # Create data directory if it doesn't exist os.makedirs(self.data_dir, exist_ok=True) logger.info(f"Ensuring data directory exists at {self.data_dir}") conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Create table if it doesn't exist cursor.execute(''' CREATE TABLE IF NOT EXISTS feedback ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT, original_query TEXT, original_response TEXT, paraphrased_queries TEXT, paraphrased_responses TEXT, hallucination_detected INTEGER, confidence_score REAL, conflicting_facts TEXT, reasoning TEXT, summary TEXT, user_feedback TEXT ) ''') conn.commit() conn.close() logger.info(f"Database initialized successfully at {self.db_path}") except Exception as e: logger.error(f"Error initializing database: {str(e)}", exc_info=True) # Fallback to temporary directory if /data is not accessible temp_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "temp_data") os.makedirs(temp_dir, exist_ok=True) self.db_path = os.path.join(temp_dir, "feedback.db") logger.warning(f"Using fallback database location: {self.db_path}") # Try creating database in fallback location try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS feedback ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT, original_query TEXT, original_response TEXT, paraphrased_queries TEXT, paraphrased_responses TEXT, hallucination_detected INTEGER, confidence_score REAL, conflicting_facts TEXT, reasoning TEXT, summary TEXT, user_feedback TEXT ) ''') conn.commit() conn.close() logger.info(f"Database initialized in fallback location") except Exception as fallback_error: logger.error(f"Critical error: Could not initialize database in fallback location: {str(fallback_error)}", exc_info=True) raise def set_progress_callback(self, callback): """Set the progress callback function""" self.progress_callback = callback def initialize_api(self, mistral_api_key, openai_api_key): """Initialize the PAS2 with API keys""" try: logger.info("Initializing PAS2 with API keys") self.pas2 = PAS2( mistral_api_key=mistral_api_key, openai_api_key=openai_api_key, progress_callback=self.progress_callback ) logger.info("API initialization successful") return "API keys set successfully! You can now use the application." except Exception as e: logger.error("Error initializing API: %s", str(e), exc_info=True) return f"Error initializing API: {str(e)}" def process_query(self, query: str): """Process the query using PAS2""" if not self.pas2: logger.error("PAS2 not initialized") return { "error": "Please set API keys first before processing queries." } if not query.strip(): logger.warning("Empty query provided") return { "error": "Please enter a query." } try: # Set the progress callback if needed if self.progress_callback and self.pas2.progress_callback != self.progress_callback: self.pas2.progress_callback = self.progress_callback # Process the query logger.info("Processing query with PAS2: %s", query) results = self.pas2.detect_hallucination(query) logger.info("Query processing completed successfully") return results except Exception as e: logger.error("Error processing query: %s", str(e), exc_info=True) return { "error": f"Error processing query: {str(e)}" } def save_feedback(self, results, feedback): """Save results and user feedback to SQLite database""" try: logger.info("Saving user feedback: %s", feedback) conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Prepare data data = ( datetime.now().strftime("%Y-%m-%d %H:%M:%S"), results.get('original_query', ''), results.get('original_response', ''), str(results.get('paraphrased_queries', [])), str(results.get('paraphrased_responses', [])), 1 if results.get('hallucination_detected', False) else 0, results.get('confidence_score', 0.0), str(results.get('conflicting_facts', [])), results.get('reasoning', ''), results.get('summary', ''), feedback ) # Insert data cursor.execute(''' INSERT INTO feedback ( timestamp, original_query, original_response, paraphrased_queries, paraphrased_responses, hallucination_detected, confidence_score, conflicting_facts, reasoning, summary, user_feedback ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', data) conn.commit() conn.close() logger.info("Feedback saved successfully to database") return "Feedback saved successfully!" except Exception as e: logger.error("Error saving feedback: %s", str(e), exc_info=True) return f"Error saving feedback: {str(e)}" def get_feedback_stats(self): """Get statistics about collected feedback""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Get total feedback count cursor.execute("SELECT COUNT(*) FROM feedback") total_count = cursor.fetchone()[0] # Get hallucination detection stats cursor.execute(""" SELECT hallucination_detected, COUNT(*) FROM feedback GROUP BY hallucination_detected """) detection_stats = dict(cursor.fetchall()) # Get average confidence score cursor.execute("SELECT AVG(confidence_score) FROM feedback") avg_confidence = cursor.fetchone()[0] or 0 conn.close() return { "total_feedback": total_count, "hallucinations_detected": detection_stats.get(1, 0), "no_hallucinations": detection_stats.get(0, 0), "average_confidence": round(avg_confidence, 2) } except Exception as e: logger.error("Error getting feedback stats: %s", str(e), exc_info=True) return None # Progress tracking for UI updates class ProgressTracker: """Tracks progress of hallucination detection for UI updates""" STAGES = { "idle": {"status": "Ready", "progress": 0, "color": "#757575"}, "starting": {"status": "Starting process...", "progress": 5, "color": "#2196F3"}, "generating_paraphrases": {"status": "Generating paraphrases...", "progress": 15, "color": "#2196F3"}, "paraphrases_complete": {"status": "Paraphrases generated", "progress": 30, "color": "#2196F3"}, "getting_responses": {"status": "Getting responses (0/0)...", "progress": 35, "color": "#2196F3"}, "responses_progress": {"status": "Getting responses ({completed}/{total})...", "progress": 40, "color": "#2196F3"}, "responses_complete": {"status": "All responses received", "progress": 65, "color": "#2196F3"}, "judging": {"status": "Analyzing responses for hallucinations...", "progress": 70, "color": "#2196F3"}, "complete": {"status": "Analysis complete!", "progress": 100, "color": "#4CAF50"}, "error": {"status": "Error: {error_message}", "progress": 100, "color": "#F44336"} } def __init__(self): self.stage = "idle" self.stage_data = self.STAGES[self.stage].copy() self.query = "" self.completed_responses = 0 self.total_responses = 0 self.error_message = "" self._lock = threading.Lock() self._status_callback = None self._stop_event = threading.Event() self._update_thread = None def register_callback(self, callback_fn): """Register callback function to update UI""" self._status_callback = callback_fn def update_stage(self, stage, **kwargs): """Update the current stage and trigger callback""" with self._lock: if stage in self.STAGES: self.stage = stage self.stage_data = self.STAGES[stage].copy() # Update with any additional parameters for key, value in kwargs.items(): if key == 'query': self.query = value elif key == 'completed_responses': self.completed_responses = value elif key == 'total_responses': self.total_responses = value elif key == 'error_message': self.error_message = value # Format status message if stage == 'responses_progress': self.stage_data['status'] = self.stage_data['status'].format( completed=self.completed_responses, total=self.total_responses ) elif stage == 'error': self.stage_data['status'] = self.stage_data['status'].format( error_message=self.error_message ) if self._status_callback: self._status_callback(self.get_html_status()) def get_html_status(self): """Get HTML representation of current status""" progress_width = f"{self.stage_data['progress']}%" status_text = self.stage_data['status'] color = self.stage_data['color'] query_info = f'
{summary}
Reasoning:
{reasoning_safe}
Conflicting Facts:
{conflicting_facts_text_safe}
This tool detects hallucinations in AI responses by comparing answers to semantically equivalent questions and using a specialized judge model.