import logging
import re
from datetime import datetime, timedelta

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

# Import the centralized NLP model handler
from utils.models import get_nlp_model

logger = logging.getLogger("misinformation_detector")


def extract_entities(text):
    """Extract named entities from text"""
    if not text:
        return []

    try:
        # Use centralized NLP model
        nlp_model = get_nlp_model()
        doc = nlp_model(text)

        entities = [
            {
                "text": ent.text,
                "label": ent.label_,
                "start": ent.start_char,
                "end": ent.end_char
            }
            for ent in doc.ents
        ]
        return entities
    except Exception as e:
        logger.error(f"Error extracting entities: {str(e)}")
        return []


def get_vector_representation(text):
    """Get vector representation of text using spaCy"""
    if not text:
        return None

    try:
        # Use centralized NLP model
        nlp_model = get_nlp_model()
        doc = nlp_model(text)

        # Return document vector if available
        if doc.has_vector:
            return doc.vector

        # Fallback: average of token vectors
        vectors = [token.vector for token in doc if token.has_vector]
        if vectors:
            return np.mean(vectors, axis=0)

        return None
    except Exception as e:
        logger.error(f"Error getting vector representation: {str(e)}")
        return None


def calculate_similarity(text1, text2):
    """Calculate semantic similarity between two texts"""
    if not text1 or not text2:
        return 0.0

    try:
        vec1 = get_vector_representation(text1)
        vec2 = get_vector_representation(text2)

        if vec1 is None or vec2 is None:
            return 0.0

        # Reshape vectors for cosine_similarity
        vec1 = vec1.reshape(1, -1)
        vec2 = vec2.reshape(1, -1)

        # Calculate cosine similarity
        similarity = cosine_similarity(vec1, vec2)[0][0]
        return float(similarity)
    except Exception as e:
        logger.error(f"Error calculating similarity: {str(e)}")
        return 0.0


def extract_date_from_evidence(evidence_text):
    """Extract date from evidence text"""
    if not evidence_text:
        return None

    try:
        # Look for date patterns in text
        date_patterns = [
            r'Date: (\d{4}-\d{2}-\d{2})',  # ISO format
            r'published.*?(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})',  # published on MM/DD/YYYY
            r'(\d{1,2}\s+(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{4})',  # DD Month YYYY
            r'((?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4})'  # Month DD, YYYY
        ]

        # Formats corresponding to the patterns above, tried in order
        date_formats = [
            '%Y-%m-%d',                                       # ISO format
            '%m/%d/%Y', '%d/%m/%Y', '%m-%d-%Y', '%d-%m-%Y',   # numeric dates
            '%d %B %Y', '%B %d, %Y', '%B %d %Y'               # month-name dates
        ]

        for pattern in date_patterns:
            match = re.search(pattern, evidence_text)
            if not match:
                continue

            date_str = match.group(1)
            for fmt in date_formats:
                try:
                    return datetime.strptime(date_str, fmt)
                except ValueError:
                    continue

        return None
    except Exception as e:
        logger.error(f"Error extracting date from evidence: {str(e)}")
        return None
def is_temporally_relevant(evidence_text, claim_text, max_days_old=30):
    """Check if evidence is temporally relevant to the claim"""
    # Check if claim seems to require recent evidence
    temporal_terms = ["today", "now", "current", "currently", "recent", "recently",
                      "latest", "just", "this week", "this month", "this year"]

    requires_recent = any(term in claim_text.lower() for term in temporal_terms)

    # If claim doesn't specify temporality, consider evidence relevant
    if not requires_recent:
        return True

    # Extract date from evidence
    date = extract_date_from_evidence(evidence_text)
    if not date:
        return True  # If we can't determine date, assume it's relevant

    # Check if evidence is recent enough
    cutoff = datetime.now() - timedelta(days=max_days_old)
    return date >= cutoff


def has_authority_signal(evidence_text):
    """Check if evidence contains authority signals"""
    authority_signals = {
        "scientific_consensus": ["consensus", "scientists agree", "research shows",
                                 "studies confirm", "experts agree"],
        "fact_check": ["fact check", "rated false", "rated true", "debunked",
                       "confirmed", "verification"],
        "high_authority": ["nasa", "world health organization", "who", "cdc",
                           "national academy", "oxford", "harvard", "stanford",
                           "mit", "cambridge", "yale", "princeton", "government",
                           "official", "authorities", "minister", "ministry",
                           "department", "administration", "university", "professor"]
    }

    evidence_lower = evidence_text.lower()
    authority_type = None
    authority_score = 1.0

    for signal_type, phrases in authority_signals.items():
        if any(phrase in evidence_lower for phrase in phrases):
            if signal_type == "scientific_consensus":
                authority_score = 1.8
                authority_type = "scientific_consensus"
            elif signal_type == "fact_check":
                authority_score = 1.5
                authority_type = "fact_check"
            elif signal_type == "high_authority":
                authority_score = 1.3
                authority_type = "high_authority"
            break

    return authority_score, authority_type
def analyze_evidence_relevance(claim, evidence_list, source_credibility=None):
    """
    Analyze evidence relevance to claim using semantic similarity,
    with improved handling for claims requiring strong evidence

    Args:
        claim (str): The claim being verified
        evidence_list (list): List of evidence items
        source_credibility (dict): Dictionary mapping source domains to credibility scores

    Returns:
        list: Sorted list of evidence items with relevance scores
    """
    if not evidence_list:
        return []

    # Ensure evidence_list is a list of strings
    if not isinstance(evidence_list, list):
        evidence_list = [str(evidence_list)]

    # Filter out None or empty items
    evidence_list = [item for item in evidence_list if item]

    # Check if claim contains strong assertions that would require specific evidence
    strong_assertion_markers = [
        "solved", "cured", "discovered", "breakthrough", "revolutionary",
        "first ever", "confirmed", "definitive", "conclusive", "proven",
        "groundbreaking", "unprecedented", "remarkable", "extends lifespan",
        "extends life", "definitively", "successfully"
    ]

    # Determine if claim contains strong assertions
    claim_has_strong_assertions = any(marker in claim.lower() for marker in strong_assertion_markers)

    # Log detection result
    if claim_has_strong_assertions:
        logger.info("Evidence analysis: detected claim with strong assertions requiring specific evidence")

    # Extract named entities from claim
    claim_entities = extract_entities(claim)
    claim_entity_texts = [entity["text"].lower() for entity in claim_entities]

    # Process each evidence item
    analyzed_evidence = []

    # Track domains found in evidence to identify source diversity
    found_domains = set()

    for evidence in evidence_list:
        if not isinstance(evidence, str):
            continue

        # Calculate semantic similarity
        similarity = calculate_similarity(claim, evidence)

        # Check for entity overlap
        evidence_entities = extract_entities(evidence)
        evidence_entity_texts = [entity["text"].lower() for entity in evidence_entities]

        # Calculate entity overlap
        common_entities = set(claim_entity_texts).intersection(set(evidence_entity_texts))
        entity_overlap = len(common_entities) / max(1, len(claim_entity_texts))

        # Check temporal relevance
        if is_temporally_relevant(evidence, claim):
            temporal_relevance = 1.2
        else:
            # Penalty for temporally irrelevant evidence
            temporal_relevance = 0.7

        # Check for authority signals
        authority_score, authority_type = has_authority_signal(evidence)

        # Extract source from evidence if available
        source_boost = 1.0
        domain = None

        if source_credibility:
            # Try to extract domain from URL in evidence
            domain_match = re.search(r'URL: https?://(?:www\.)?([^/]+)', evidence)
            if domain_match:
                domain = domain_match.group(1)

                # Check if domain or its parent domain is in credibility list
                for cred_domain, cred_score in source_credibility.items():
                    if cred_domain in domain:
                        try:
                            source_boost = float(cred_score)
                            break
                        except (ValueError, TypeError):
                            pass

        # Track this domain for source diversity
        if domain:
            found_domains.add(domain)

        # For claims with strong assertions: check if evidence specifically addresses assertions
        claim_specificity_match = 1.0
        evidence_specificity_match = 1.0

        if claim_has_strong_assertions:
            # Check if evidence provides specific confirmation or contradiction
            direct_contradiction_terms = [
                "not yet", "has not", "have not", "cannot", "can't", "doesn't",
                "don't", "unlikely", "challenging", "remains a challenge",
                "in the future", "experimental", "in development", "proposed",
                "theoretical", "preliminary", "hypothesized", "potential",
                "promising but"
            ]

            # Check for contradictions to strong assertions
            if any(term in evidence.lower() for term in direct_contradiction_terms):
                # This evidence likely contradicts the strong assertion
                evidence_specificity_match = 2.0  # Boost relevance of contradicting evidence
                logger.debug("Found contradiction to strong assertion in evidence")

            # For claims with strong assertions, check if evidence specifically confirms
            direct_confirmation_terms = [
                "successfully demonstrated", "breakthrough", "solved", "cured",
                "confirmed", "definitive evidence", "conclusive results", "proven",
                "revolutionary results", "milestone achievement", "groundbreaking results"
            ]

            # If evidence confirms the strong assertion, adjust relevance
            if any(term in evidence.lower() for term in direct_confirmation_terms):
                # Apply higher scoring for evidence that specifically confirms
                evidence_specificity_match = 1.8
                logger.debug("Found confirmation of strong assertion in evidence")

            # For claims with strong assertions, check for high-quality sources
            high_quality_source_markers = [
                "journal", "doi.org", "research", "university", "institute",
                "laboratory", "professor", "study", "publication", "published in"
            ]

            is_high_quality = any(term in evidence.lower() for term in high_quality_source_markers)
            quality_boost = 1.4 if is_high_quality else 1.0

            # Apply the quality boost
            source_boost *= quality_boost

        # Calculate final relevance score with improvements for all claim types
        if claim_has_strong_assertions:
            relevance_score = (
                (similarity * 0.35) +      # Semantic similarity
                (entity_overlap * 0.25) +  # Entity overlap
                (0.25)                     # Base value to ensure all evidence has some relevance
            ) * temporal_relevance * authority_score * source_boost * claim_specificity_match * evidence_specificity_match
        else:
            # Original formula for regular claims
            relevance_score = (
                (similarity * 0.4) +      # Semantic similarity
                (entity_overlap * 0.3) +  # Entity overlap
                (0.3)                     # Base value to ensure all evidence has some relevance
            ) * temporal_relevance * authority_score * source_boost
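        # Worked example with hypothetical component values for a regular claim:
        # similarity=0.6, entity_overlap=0.5, temporal_relevance=1.2,
        # authority_score=1.5, source_boost=1.0 gives
        # (0.6*0.4 + 0.5*0.3 + 0.3) * 1.2 * 1.5 * 1.0 = 0.69 * 1.8 = 1.242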
"temporal_relevance": temporal_relevance, "authority_score": authority_score, "authority_type": authority_type, "source_boost": source_boost, "domain": domain }) # Sort by relevance score (descending) analyzed_evidence.sort(key=lambda x: x["relevance_score"], reverse=True) # Ensure we have diverse sources in top results for all claims if len(found_domains) > 1: # Try to promote evidence from reliable sources if we haven't selected any yet reliable_sources_seen = False # Check if top 3 results contain any reliable sources for item in analyzed_evidence[:3]: domain = item.get("domain", "") if domain and source_credibility and any(cred_domain in domain for cred_domain in source_credibility): reliable_sources_seen = True break # If no reliable sources in top results, promote one if available if not reliable_sources_seen: for i, item in enumerate(analyzed_evidence[3:]): domain = item.get("domain", "") if domain and source_credibility and any(cred_domain in domain for cred_domain in source_credibility): # Swap this item into the top 3 analyzed_evidence.insert(2, analyzed_evidence.pop(i+3)) break return analyzed_evidence def select_diverse_evidence(analyzed_evidence, max_items=5): """ Select diverse evidence items based on relevance, source diversity and claim characteristics Args: analyzed_evidence (list): List of evidence items with relevance scores max_items (int): Maximum number of evidence items to return Returns: list: Selected diverse evidence items """ if not analyzed_evidence: return [] # Check if top evidence suggests claim has strong assertions strong_assertion_markers = [ "solved", "cured", "discovered", "breakthrough", "revolutionary", "first ever", "confirmed", "definitive", "conclusive", "proven", "groundbreaking", "unprecedented", "extends lifespan", "definitively" ] # Determine if this is a claim with strong assertions by checking evidence text has_strong_assertions = False for item in analyzed_evidence[:3]: # Check just the top items for efficiency if "text" in item: item_text = item["text"].lower() if any(f"claim {marker}" in item_text or f"claim has {marker}" in item_text for marker in strong_assertion_markers): has_strong_assertions = True break # Also check for contradiction markers in evidence which can indicate a strong assertion contradiction_markers = [ "not yet solved", "hasn't been proven", "no evidence that", "remains unsolved", "has not been confirmed", "remains theoretical" ] if not has_strong_assertions: for item in analyzed_evidence[:3]: if "text" in item: item_text = item["text"].lower() if any(marker in item_text for marker in contradiction_markers): has_strong_assertions = True break # Ensure we don't select more than available max_items = min(max_items, len(analyzed_evidence)) # Initialize selected items with the most relevant item selected = [analyzed_evidence[0]] remaining = analyzed_evidence[1:] # Track sources to ensure diversity selected_sources = set() for item in selected: # Try to extract source from evidence source_match = re.search(r'Source: ([^,]+)', item["text"]) if source_match: selected_sources.add(source_match.group(1)) # For all claims, track if we have high-quality sources yet has_quality_source = False quality_source_markers = ["journal", "doi.org", "research", "university", "institute", "laboratory", "professor", "study"] # Check if our top item is already from a quality source if any(marker in selected[0]["text"].lower() for marker in quality_source_markers): has_quality_source = True # Select remaining items balancing relevance and 
    # Select remaining items balancing relevance and diversity
    while len(selected) < max_items and remaining:
        best_item = None
        best_score = -1

        for i, item in enumerate(remaining):
            # Base score is the item's relevance
            score = item["relevance_score"]

            # Extract source if available
            source = None
            source_match = re.search(r'Source: ([^,]+)', item["text"])
            if source_match:
                source = source_match.group(1)

            # Apply diversity bonus if source is new
            if source and source not in selected_sources:
                score *= 1.2  # Diversity bonus

            # For claims with strong assertions, apply bonus for contradicting evidence
            if has_strong_assertions:
                # Check for contradiction markers in the text
                if any(marker in item["text"].lower() for marker in contradiction_markers):
                    score *= 1.3  # Bonus for evidence that may contradict strong assertions

            # For any claim, apply bonus for high-quality sources if we don't have one yet
            if not has_quality_source:
                is_item_quality = any(marker in item["text"].lower() for marker in quality_source_markers)
                if is_item_quality:
                    score *= 1.5  # Significant bonus for quality sources

            if score > best_score:
                best_score = score
                best_item = (i, item)

        if best_item:
            idx, item = best_item
            selected.append(item)
            remaining.pop(idx)

            # Add source to selected sources
            source_match = re.search(r'Source: ([^,]+)', item["text"])
            if source_match:
                selected_sources.add(source_match.group(1))

            # Check if we found a quality source
            if not has_quality_source:
                if any(marker in item["text"].lower() for marker in quality_source_markers):
                    has_quality_source = True
        else:
            break

    # For any claim with strong assertions, ensure we have at least one quality source if available
    if has_strong_assertions and not has_quality_source and remaining:
        for item in remaining:
            if any(marker in item["text"].lower() for marker in quality_source_markers):
                # Replace the least relevant selected item with this quality one,
                # keeping the selection ordered by descending relevance
                selected.sort(key=lambda x: x["relevance_score"], reverse=True)
                selected[-1] = item
                break

    # Return only the text portion
    return [item["text"] for item in selected]
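

# --- Illustrative usage sketch ---
# A minimal example, assuming the spaCy model behind get_nlp_model() is
# installed. The claim, evidence strings, and credibility mapping below are
# hypothetical and exist only to show how the two public helpers combine.
if __name__ == "__main__":
    sample_claim = "Scientists have definitively cured the common cold."
    sample_evidence = [
        "Source: Example Journal, URL: https://journal.example.org/articles/123, "
        "Date: 2024-01-15. Researchers report that a universal cure has not yet "
        "been demonstrated and results remain preliminary.",
        "Source: Example Blog, URL: https://blog.example.com/post. "
        "A breakthrough cure for the common cold was announced today.",
    ]
    sample_credibility = {"journal.example.org": 1.5}  # hypothetical scores

    ranked = analyze_evidence_relevance(sample_claim, sample_evidence, sample_credibility)
    for text in select_diverse_evidence(ranked, max_items=2):
        print(text[:120])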