import logging
from typing import Dict, Any, Optional

from transformers import pipeline
import numpy as np
import nltk
from nltk.tokenize import sent_tokenize

logger = logging.getLogger(__name__)


class EvidenceAnalyzer:
    def __init__(self, use_ai: bool = True, model_registry: Optional[Any] = None):
        """
        Initialize evidence analyzer with LLM and traditional approaches.

        Args:
            use_ai: Boolean indicating whether to use AI-powered analysis (True)
                or traditional analysis (False)
            model_registry: Optional shared model registry for better performance
        """
        self.use_ai = use_ai
        self.llm_available = False
        self.model_registry = model_registry

        if use_ai:
            try:
                if model_registry and model_registry.is_available:
                    # Use shared models
                    self.classifier = model_registry.zero_shot
                    self.llm_available = True
                    logger.info("Using shared model pipeline for evidence analysis")
                else:
                    # Initialize own pipeline
                    self.classifier = pipeline(
                        "zero-shot-classification",
                        model="facebook/bart-large-mnli",
                        device=-1,
                        batch_size=8
                    )
                    self.llm_available = True
                    logger.info("Initialized dedicated model pipeline for evidence analysis")
            except Exception as e:
                logger.warning(f"Failed to initialize LLM pipeline: {str(e)}")
                self.llm_available = False
        else:
            logger.info("Initializing evidence analyzer in traditional mode")

        # Traditional markers for fallback (defined unconditionally so the
        # traditional path still works when LLM initialization fails)
        self.citation_markers = [
            "according to", "said", "reported", "stated", "shows",
            "found", "study", "research", "data", "evidence"
        ]
        self.vague_markers = [
            "some say", "many believe", "people think", "experts claim",
            "sources say", "it is believed", "reportedly", "allegedly"
        ]

    def _analyze_with_llm(self, text: str) -> Optional[Dict[str, Any]]:
        """Analyze evidence using LLM. Returns None on failure so the caller
        can fall back to traditional analysis."""
        try:
            logger.info("\n" + "=" * 50)
            logger.info("EVIDENCE ANALYSIS STARTED")
            logger.info("=" * 50)

            # Clean the text of formatting markers
            logger.info("Cleaning and preparing text...")
            cleaned_text = text.replace('$!/$', '').replace('##', '').replace('#', '')
            cleaned_text = '\n'.join(
                line for line in cleaned_text.split('\n')
                if not line.startswith('[') and not line.startswith('More on')
            )
            logger.info(f"Text prepared - Length: {len(cleaned_text)} characters")

            # Download NLTK data if needed
            try:
                nltk.data.find('tokenizers/punkt')
            except LookupError:
                logger.info("Downloading required NLTK data...")
                nltk.download('punkt')

            # Split text into chunks
            chunks = [cleaned_text[i:i + 2000] for i in range(0, len(cleaned_text), 2000)]
            logger.info(f"Split text into {len(chunks)} chunks for processing")

            # Categories for evidence classification
            evidence_categories = [
                "factual statement with source",
                "verifiable claim",
                "expert opinion",
                "data-backed claim",
                "unsubstantiated claim",
                "opinion statement"
            ]
            logger.info("\nUsing evidence categories:")
            for cat in evidence_categories:
                logger.info(f"  - {cat}")

            chunk_scores = []
            flagged_phrases = []

            for i, chunk in enumerate(chunks, 1):
                logger.info(f"\n{'-' * 30}")
                logger.info(f"Processing chunk {i}/{len(chunks)}")
                logger.info(f"Chunk length: {len(chunk)} characters")

                # Analyze each sentence in the chunk
                sentences = sent_tokenize(chunk)
                logger.info(f"Found {len(sentences)} sentences to analyze")

                sentence_count = 0
                strong_evidence_count = 0

                for sentence in sentences:
                    if len(sentence.strip()) > 10:
                        sentence_count += 1

                        # Classify the type of evidence
                        result = self.classifier(
                            sentence.strip(),
                            evidence_categories,
                            multi_label=True
                        )

                        # Calculate evidence score for the sentence
                        evidence_scores = {
                            label: score
                            for label, score in zip(result['labels'], result['scores'])
                        }

                        # Strong evidence indicators
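                        # Worked example (hypothetical classifier output): scores of
                        # 0.9, 0.8 and 0.7 for the three strong categories average to
                        # (0.9 + 0.8 + 0.7) / 3 = 0.8, which clears the 0.7 flagging
                        # threshold used below.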
                        strong_evidence = sum([
                            evidence_scores.get("factual statement with source", 0),
                            evidence_scores.get("data-backed claim", 0),
                            evidence_scores.get("expert opinion", 0)
                        ]) / 3  # Average the strong evidence scores

                        # Weak or no evidence indicators
                        weak_evidence = sum([
                            evidence_scores.get("unsubstantiated claim", 0),
                            evidence_scores.get("opinion statement", 0)
                        ]) / 2  # Average the weak evidence scores

                        # Store scores for overall calculation
                        chunk_scores.append({
                            'strong_evidence': strong_evidence,
                            'weak_evidence': weak_evidence
                        })

                        # Flag high-quality evidence
                        if strong_evidence > 0.7 and not any(
                            marker in sentence.lower()
                            for marker in ['more on this story', 'click here', 'read more']
                        ):
                            strong_evidence_count += 1
                            logger.info(f"Found strong evidence (score: {strong_evidence:.3f}):")
                            logger.info(f"  \"{sentence.strip()}\"")
                            flagged_phrases.append({
                                'text': sentence.strip(),
                                'type': 'strong_evidence',
                                'score': strong_evidence
                            })

                logger.info(f"Processed {sentence_count} sentences in chunk {i}")
                logger.info(f"Found {strong_evidence_count} sentences with strong evidence")

            # Calculate overall evidence score
            logger.info("\nCalculating final evidence scores...")
            if chunk_scores:
                avg_strong = np.mean([s['strong_evidence'] for s in chunk_scores])
                avg_weak = np.mean([s['weak_evidence'] for s in chunk_scores])
                logger.info("Average evidence scores:")
                logger.info(f"  - Strong evidence: {avg_strong:.3f}")
                logger.info(f"  - Weak evidence: {avg_weak:.3f}")

                # Evidence score formula:
                # - Reward strong evidence (70% weight)
                # - Penalize weak/unsubstantiated claims (30% weight)
                # - Ensure score is between 0 and 100
                evidence_score = min(100, (
                    (avg_strong * 0.7) + ((1 - avg_weak) * 0.3)
                ) * 100)
            else:
                evidence_score = 0
                logger.warning("No scores available, defaulting to 0")

            logger.info(f"Final evidence score: {evidence_score:.1f}")

            # Sort and select top evidence phrases
            sorted_phrases = sorted(
                flagged_phrases,
                key=lambda x: x['score'],
                reverse=True
            )

            # Filter out formatting text and duplicates
            unique_phrases = []
            seen = set()
            for phrase in sorted_phrases:
                clean_text = phrase['text'].strip()
                if clean_text not in seen and not any(
                    marker in clean_text.lower()
                    for marker in ['more on this story', 'click here', 'read more']
                ):
                    unique_phrases.append(clean_text)
                    seen.add(clean_text)
                if len(unique_phrases) >= 5:
                    break

            logger.info(f"\nFlagged {len(unique_phrases)} unique evidence-based phrases")
            logger.info("\nEvidence analysis completed successfully")

            return {
                "evidence_based_score": round(evidence_score, 1),
                "flagged_phrases": unique_phrases
            }

        except Exception as e:
            logger.error(f"LLM analysis failed: {str(e)}")
            return None

    def _analyze_traditional(self, text: str) -> Dict[str, Any]:
        """Traditional evidence analysis as fallback."""
        try:
            text_lower = text.lower()

            # Find citations and evidence
            evidence_phrases = []
            for marker in self.citation_markers:
                index = text_lower.find(marker)
                while index != -1:
                    # Get the sentence containing the marker
                    start = max(0, text_lower.rfind('.', 0, index) + 1)
                    end = text_lower.find('.', index)
                    if end == -1:
                        end = len(text_lower)
                    evidence_phrases.append(text[start:end].strip())
                    index = text_lower.find(marker, end)

            # Count vague references
            vague_count = sum(1 for marker in self.vague_markers if marker in text_lower)

            # Calculate score: each citation adds 20 points (capped at 100),
            # each vague reference subtracts 10 (floored at 0)
            citation_count = len(evidence_phrases)
            base_score = min(citation_count * 20, 100)
            penalty = vague_count * 10
            evidence_score = max(0, base_score - penalty)

            return {
                "evidence_based_score": evidence_score,
                "flagged_phrases": list(set(evidence_phrases))[:5]  # Limit to top 5 unique phrases
            }

        except Exception as e:
            logger.error(f"Traditional analysis failed: {str(e)}")
            return {
                "evidence_based_score": 0,
                "flagged_phrases": []
            }

    def analyze(self, text: str) -> Dict[str, Any]:
        """Analyze evidence using LLM with fallback to traditional method."""
        try:
            # Try LLM analysis if enabled and available
            if self.use_ai and self.llm_available:
                llm_result = self._analyze_with_llm(text)
                if llm_result:
                    return llm_result

            # Use traditional analysis
            logger.info("Using traditional evidence analysis")
            return self._analyze_traditional(text)
        except Exception as e:
            logger.error(f"Error in evidence analysis: {str(e)}")
            return {
                "evidence_based_score": 0,
                "flagged_phrases": []
            }
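

# A minimal usage sketch (illustrative only; the sample text is hypothetical).
# Traditional mode is used so the example runs without downloading the
# zero-shot model; pass use_ai=True to exercise the LLM path instead.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    sample_text = (
        "According to a 2023 study, commute times fell by 12 percent. "
        "Experts claim the trend may reverse, but ridership data shows "
        "steady growth, the transit agency reported."
    )

    analyzer = EvidenceAnalyzer(use_ai=False)
    result = analyzer.analyze(sample_text)
    # Expected shape: {"evidence_based_score": <0-100>, "flagged_phrases": [...]}
    print(result)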