import logging
import re
from datetime import datetime, timedelta

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

# Import the centralized NLP model handler
from utils.models import get_nlp_model

logger = logging.getLogger("misinformation_detector")

def extract_entities(text):
    """Extract named entities from text."""
    if not text:
        return []
    try:
        # Use the centralized NLP model
        nlp_model = get_nlp_model()
        doc = nlp_model(text)
        entities = [
            {
                "text": ent.text,
                "label": ent.label_,
                "start": ent.start_char,
                "end": ent.end_char
            }
            for ent in doc.ents
        ]
        return entities
    except Exception as e:
        logger.error(f"Error extracting entities: {str(e)}")
        return []
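
# Illustrative output (entity labels and offsets depend on whichever spaCy model
# get_nlp_model() loads; this is an assumption, not captured output):
#   extract_entities("NASA announced the mission in 2022") would typically include
#   {"text": "NASA", "label": "ORG", ...} and {"text": "2022", "label": "DATE", ...}.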

def get_vector_representation(text):
    """Get a vector representation of text using spaCy."""
    if not text:
        return None
    try:
        # Use the centralized NLP model
        nlp_model = get_nlp_model()
        doc = nlp_model(text)
        # Return the document vector if available
        if doc.has_vector:
            return doc.vector
        # Fallback: average of token vectors
        vectors = [token.vector for token in doc if token.has_vector]
        if vectors:
            return np.mean(vectors, axis=0)
        return None
    except Exception as e:
        logger.error(f"Error getting vector representation: {str(e)}")
        return None

def calculate_similarity(text1, text2):
    """Calculate semantic similarity between two texts."""
    if not text1 or not text2:
        return 0.0
    try:
        vec1 = get_vector_representation(text1)
        vec2 = get_vector_representation(text2)
        if vec1 is None or vec2 is None:
            return 0.0
        # Reshape vectors to 2-D arrays for cosine_similarity
        vec1 = vec1.reshape(1, -1)
        vec2 = vec2.reshape(1, -1)
        # Calculate cosine similarity
        similarity = cosine_similarity(vec1, vec2)[0][0]
        return float(similarity)
    except Exception as e:
        logger.error(f"Error calculating similarity: {str(e)}")
        return 0.0
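
# Illustrative usage (assumes get_nlp_model() loads a spaCy pipeline with word
# vectors, e.g. en_core_web_md; exact values depend on the model):
#   calculate_similarity("The vaccine is safe", "Studies show the vaccine is safe")
#   returns a value close to 1.0, while unrelated sentences score noticeably lower.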

def extract_date_from_evidence(evidence_text):
    """Extract a publication date from evidence text."""
    if not evidence_text:
        return None
    try:
        # Look for date patterns in the text
        date_patterns = [
            r'Date: (\d{4}-\d{2}-\d{2})',  # ISO format
            r'published.*?(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})',  # published on MM/DD/YYYY
            r'(\d{1,2}\s+(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{4})',  # DD Month YYYY
            r'((?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4})'  # Month DD, YYYY
        ]
        for pattern in date_patterns:
            match = re.search(pattern, evidence_text)
            if match:
                date_str = match.group(1)
                # Parse the date string based on its format
                try:
                    if re.match(r'\d{4}-\d{2}-\d{2}$', date_str):
                        # ISO format
                        return datetime.strptime(date_str, '%Y-%m-%d')
                    elif '/' in date_str or '-' in date_str:
                        # Numeric formats with slashes or dashes
                        formats = ['%m/%d/%Y', '%d/%m/%Y', '%m-%d-%Y', '%d-%m-%Y',
                                   '%m/%d/%y', '%d/%m/%y']
                        for fmt in formats:
                            try:
                                return datetime.strptime(date_str, fmt)
                            except ValueError:
                                continue
                    else:
                        # Month-name formats
                        formats = ['%d %B %Y', '%B %d, %Y', '%B %d %Y']
                        for fmt in formats:
                            try:
                                return datetime.strptime(date_str, fmt)
                            except ValueError:
                                continue
                except Exception:
                    pass
        return None
    except Exception as e:
        logger.error(f"Error extracting date from evidence: {str(e)}")
        return None
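
# Illustrative behavior (dates are made-up examples):
#   extract_date_from_evidence("Date: 2024-03-01 ...")          -> datetime(2024, 3, 1)
#   extract_date_from_evidence("... published on 3/1/2024 ...") -> datetime(2024, 3, 1)
#   extract_date_from_evidence("... on March 1, 2024 ...")      -> datetime(2024, 3, 1)
#   extract_date_from_evidence("no date here")                  -> None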

def is_temporally_relevant(evidence_text, claim_text, max_days_old=30):
    """Check if evidence is temporally relevant to the claim."""
    # Check if the claim seems to require recent evidence
    temporal_terms = ["today", "now", "current", "currently", "recent", "recently",
                      "latest", "just", "this week", "this month", "this year"]
    requires_recent = any(term in claim_text.lower() for term in temporal_terms)
    # If the claim doesn't specify temporality, consider the evidence relevant
    if not requires_recent:
        return True
    # Extract a date from the evidence
    date = extract_date_from_evidence(evidence_text)
    if not date:
        return True  # If we can't determine the date, assume it's relevant
    # Check if the evidence is recent enough
    cutoff = datetime.now() - timedelta(days=max_days_old)
    return date >= cutoff
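
# Illustrative behavior: for a claim like "COVID cases are currently rising", evidence
# dated more than max_days_old days ago is treated as stale (returns False); for a claim
# with no temporal wording, or evidence with no extractable date, it returns True.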

def has_authority_signal(evidence_text):
    """Check if evidence contains authority signals."""
    authority_signals = {
        "scientific_consensus": ["consensus", "scientists agree", "research shows",
                                 "studies confirm", "experts agree"],
        "fact_check": ["fact check", "rated false", "rated true", "debunked",
                       "confirmed", "verification"],
        "high_authority": ["nasa", "world health organization", "who", "cdc", "national academy",
                           "oxford", "harvard", "stanford", "mit", "cambridge", "yale",
                           "princeton", "government", "official", "authorities", "minister",
                           "ministry", "department", "administration", "university", "professor"]
    }
    evidence_lower = evidence_text.lower()
    authority_type = None
    authority_score = 1.0
    for signal_type, phrases in authority_signals.items():
        if any(phrase in evidence_lower for phrase in phrases):
            if signal_type == "scientific_consensus":
                authority_score = 1.8
                authority_type = "scientific_consensus"
            elif signal_type == "fact_check":
                authority_score = 1.5
                authority_type = "fact_check"
            elif signal_type == "high_authority":
                authority_score = 1.3
                authority_type = "high_authority"
            break
    return authority_score, authority_type
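
# Illustrative return values (example sentences, not real evidence):
#   has_authority_signal("Scientists agree the climate is warming")     -> (1.8, "scientific_consensus")
#   has_authority_signal("The claim was rated false by fact checkers")  -> (1.5, "fact_check")
#   has_authority_signal("An anonymous blog post said so")              -> (1.0, None)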

def analyze_evidence_relevance(claim, evidence_list, source_credibility=None):
    """
    Analyze evidence relevance to the claim using semantic similarity, with improved
    handling for claims that require strong evidence.

    Args:
        claim (str): The claim being verified
        evidence_list (list): List of evidence items
        source_credibility (dict): Dictionary mapping source domains to credibility scores

    Returns:
        list: Sorted list of evidence items with relevance scores
    """
    if not evidence_list:
        return []

    # Ensure evidence_list is a list of strings
    if not isinstance(evidence_list, list):
        evidence_list = [str(evidence_list)]

    # Filter out None or empty items
    evidence_list = [item for item in evidence_list if item]

    # Check if the claim contains strong assertions that would require specific evidence
    strong_assertion_markers = [
        "solved", "cured", "discovered", "breakthrough", "revolutionary",
        "first ever", "confirmed", "definitive", "conclusive", "proven",
        "groundbreaking", "unprecedented", "remarkable", "extends lifespan",
        "extends life", "definitively", "successfully"
    ]

    # Determine whether the claim contains strong assertions
    claim_has_strong_assertions = any(marker in claim.lower() for marker in strong_assertion_markers)

    # Log the detection result
    if claim_has_strong_assertions:
        logger.info("Evidence analysis: Detected claim with strong assertions requiring specific evidence")

    # Extract named entities from the claim
    claim_entities = extract_entities(claim)
    claim_entity_texts = [entity["text"].lower() for entity in claim_entities]

    # Process each evidence item
    analyzed_evidence = []

    # Track domains found in evidence to identify source diversity
    found_domains = set()
    for evidence in evidence_list:
        if not isinstance(evidence, str):
            continue

        # Calculate semantic similarity
        similarity = calculate_similarity(claim, evidence)

        # Check for entity overlap
        evidence_entities = extract_entities(evidence)
        evidence_entity_texts = [entity["text"].lower() for entity in evidence_entities]

        # Calculate entity overlap
        common_entities = set(claim_entity_texts).intersection(set(evidence_entity_texts))
        entity_overlap = len(common_entities) / max(1, len(claim_entity_texts))

        # Check temporal relevance
        if is_temporally_relevant(evidence, claim):
            temporal_relevance = 1.2
        else:
            # Penalty for temporally irrelevant evidence
            temporal_relevance = 0.7

        # Check for authority signals
        authority_score, authority_type = has_authority_signal(evidence)

        # Extract the source from the evidence if available
        source_boost = 1.0
        domain = None
        if source_credibility:
            # Try to extract a domain from a URL in the evidence
            domain_match = re.search(r'URL: https?://(?:www\.)?([^/]+)', evidence)
            if domain_match:
                domain = domain_match.group(1)
                # Check if the domain or its parent domain is in the credibility list
                for cred_domain, cred_score in source_credibility.items():
                    if cred_domain in domain:
                        try:
                            source_boost = float(cred_score)
                            break
                        except (ValueError, TypeError):
                            pass

        # Track this domain for source diversity
        if domain:
            found_domains.add(domain)

        # For claims with strong assertions: check if the evidence specifically addresses them
        # (claim_specificity_match is currently left at 1.0)
        claim_specificity_match = 1.0
        evidence_specificity_match = 1.0

        if claim_has_strong_assertions:
            # Check if the evidence provides specific confirmation or contradiction
            direct_contradiction_terms = [
                "not yet", "has not", "have not", "cannot", "can't", "doesn't", "don't",
                "unlikely", "challenging", "remains a challenge", "in the future",
                "experimental", "in development", "proposed", "theoretical",
                "preliminary", "hypothesized", "potential", "promising but"
            ]

            # Check for contradictions of the strong assertion
            if any(term in evidence.lower() for term in direct_contradiction_terms):
                # This evidence likely contradicts the strong assertion
                evidence_specificity_match = 2.0  # Boost relevance of contradicting evidence
                logger.debug("Found contradiction to strong assertion in evidence")

            # Check if the evidence specifically confirms the strong assertion
            direct_confirmation_terms = [
                "successfully demonstrated", "breakthrough", "solved", "cured",
                "confirmed", "definitive evidence", "conclusive results", "proven",
                "revolutionary results", "milestone achievement", "groundbreaking results"
            ]

            # If the evidence confirms the strong assertion, adjust relevance
            if any(term in evidence.lower() for term in direct_confirmation_terms):
                # Apply higher scoring for evidence that specifically confirms
                evidence_specificity_match = 1.8
                logger.debug("Found confirmation of strong assertion in evidence")

            # For claims with strong assertions, check for high-quality sources
            high_quality_source_markers = [
                "journal", "doi.org", "research", "university", "institute",
                "laboratory", "professor", "study", "publication", "published in"
            ]
            is_high_quality = any(term in evidence.lower() for term in high_quality_source_markers)
            quality_boost = 1.4 if is_high_quality else 1.0

            # Apply the quality boost
            source_boost *= quality_boost
        # Calculate the final relevance score, with adjustments for claims with strong assertions
        if claim_has_strong_assertions:
            relevance_score = (
                (similarity * 0.35) +      # Semantic similarity
                (entity_overlap * 0.25) +  # Entity overlap
                0.25                       # Base value so all evidence has some relevance
            ) * temporal_relevance * authority_score * source_boost * claim_specificity_match * evidence_specificity_match
        else:
            # Original formula for regular claims
            relevance_score = (
                (similarity * 0.4) +       # Semantic similarity
                (entity_overlap * 0.3) +   # Entity overlap
                0.3                        # Base value so all evidence has some relevance
            ) * temporal_relevance * authority_score * source_boost

        # Add metadata and the relevance score
        analyzed_evidence.append({
            "text": evidence,
            "relevance_score": relevance_score,
            "similarity": similarity,
            "entity_overlap": entity_overlap,
            "temporal_relevance": temporal_relevance,
            "authority_score": authority_score,
            "authority_type": authority_type,
            "source_boost": source_boost,
            "domain": domain
        })

    # Sort by relevance score (descending)
    analyzed_evidence.sort(key=lambda x: x["relevance_score"], reverse=True)

    # Ensure diverse sources appear in the top results
    if len(found_domains) > 1:
        # Try to promote evidence from credible sources if none has been selected yet
        reliable_sources_seen = False

        # Check whether the top 3 results contain any credible sources
        for item in analyzed_evidence[:3]:
            domain = item.get("domain", "")
            if domain and source_credibility and any(cred_domain in domain for cred_domain in source_credibility):
                reliable_sources_seen = True
                break

        # If no credible sources appear in the top results, promote one if available
        if not reliable_sources_seen:
            for i, item in enumerate(analyzed_evidence[3:]):
                domain = item.get("domain", "")
                if domain and source_credibility and any(cred_domain in domain for cred_domain in source_credibility):
                    # Move this item into the top 3
                    analyzed_evidence.insert(2, analyzed_evidence.pop(i + 3))
                    break

    return analyzed_evidence
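
# Worked example of the regular-claim formula above (numbers chosen for illustration):
# with similarity = 0.6, entity_overlap = 0.5, temporal_relevance = 1.2,
# authority_score = 1.5 and source_boost = 1.0, the relevance score is
# ((0.6 * 0.4) + (0.5 * 0.3) + 0.3) * 1.2 * 1.5 * 1.0 = 0.69 * 1.8 = 1.242.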

def select_diverse_evidence(analyzed_evidence, max_items=5):
    """
    Select diverse evidence items based on relevance, source diversity and claim characteristics.

    Args:
        analyzed_evidence (list): List of evidence items with relevance scores
        max_items (int): Maximum number of evidence items to return

    Returns:
        list: Selected diverse evidence items
    """
    if not analyzed_evidence:
        return []

    # Markers suggesting the underlying claim makes strong assertions
    strong_assertion_markers = [
        "solved", "cured", "discovered", "breakthrough", "revolutionary",
        "first ever", "confirmed", "definitive", "conclusive", "proven",
        "groundbreaking", "unprecedented", "extends lifespan", "definitively"
    ]

    # Determine whether this is a claim with strong assertions by checking the evidence text
    has_strong_assertions = False
    for item in analyzed_evidence[:3]:  # Check just the top items for efficiency
        if "text" in item:
            item_text = item["text"].lower()
            if any(f"claim {marker}" in item_text or f"claim has {marker}" in item_text
                   for marker in strong_assertion_markers):
                has_strong_assertions = True
                break

    # Also check for contradiction markers in the evidence, which can indicate a strong assertion
    contradiction_markers = [
        "not yet solved", "hasn't been proven", "no evidence that",
        "remains unsolved", "has not been confirmed", "remains theoretical"
    ]
    if not has_strong_assertions:
        for item in analyzed_evidence[:3]:
            if "text" in item:
                item_text = item["text"].lower()
                if any(marker in item_text for marker in contradiction_markers):
                    has_strong_assertions = True
                    break
    # Ensure we don't select more items than are available
    max_items = min(max_items, len(analyzed_evidence))

    # Initialize the selection with the most relevant item
    selected = [analyzed_evidence[0]]
    remaining = analyzed_evidence[1:]

    # Track sources to ensure diversity
    selected_sources = set()
    for item in selected:
        # Try to extract the source from the evidence
        source_match = re.search(r'Source: ([^,]+)', item["text"])
        if source_match:
            selected_sources.add(source_match.group(1))

    # Track whether we have a high-quality source yet
    has_quality_source = False
    quality_source_markers = ["journal", "doi.org", "research", "university",
                              "institute", "laboratory", "professor", "study"]

    # Check if the top item is already from a quality source
    if any(marker in selected[0]["text"].lower() for marker in quality_source_markers):
        has_quality_source = True

    # Select the remaining items, balancing relevance and diversity
    while len(selected) < max_items and remaining:
        best_item = None
        best_score = -1

        for i, item in enumerate(remaining):
            # The base score is the item's relevance
            score = item["relevance_score"]

            # Extract the source if available
            source = None
            source_match = re.search(r'Source: ([^,]+)', item["text"])
            if source_match:
                source = source_match.group(1)

            # Apply a diversity bonus if the source is new
            if source and source not in selected_sources:
                score *= 1.2  # Diversity bonus

            # For claims with strong assertions, apply a bonus for contradicting evidence
            if has_strong_assertions:
                # Check for contradiction markers in the text
                if any(marker in item["text"].lower() for marker in contradiction_markers):
                    score *= 1.3  # Bonus for evidence that may contradict strong assertions

            # For any claim, apply a bonus for high-quality sources if we don't have one yet
            if not has_quality_source:
                is_item_quality = any(marker in item["text"].lower() for marker in quality_source_markers)
                if is_item_quality:
                    score *= 1.5  # Significant bonus for quality sources

            if score > best_score:
                best_score = score
                best_item = (i, item)

        if best_item:
            idx, item = best_item
            selected.append(item)
            remaining.pop(idx)

            # Add the source to the selected sources
            source_match = re.search(r'Source: ([^,]+)', item["text"])
            if source_match:
                selected_sources.add(source_match.group(1))

            # Check whether we found a quality source
            if not has_quality_source:
                if any(marker in item["text"].lower() for marker in quality_source_markers):
                    has_quality_source = True
        else:
            break
    # For claims with strong assertions, ensure at least one quality source if available
    if has_strong_assertions and not has_quality_source and remaining:
        for i, item in enumerate(remaining):
            if any(marker in item["text"].lower() for marker in quality_source_markers):
                # Replace the least relevant selected item with this quality one
                selected.sort(key=lambda x: x["relevance_score"])
                selected[0] = item
                # Restore the most-relevant-first ordering after the swap
                selected.sort(key=lambda x: x["relevance_score"], reverse=True)
                break

    # Return only the text portion
    return [item["text"] for item in selected]