askveracity / modules /semantic_analysis.py
ankanghosh's picture
Upload 12 files
5dc3509 verified
raw
history blame
21.1 kB
import logging
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from datetime import datetime, timedelta
import re
# Import the centralized NLP model handler
from utils.models import get_nlp_model
logger = logging.getLogger("misinformation_detector")
def extract_entities(text):
"""Extract named entities from text"""
if not text:
return []
try:
# Use centralized NLP model
nlp_model = get_nlp_model()
doc = nlp_model(text)
entities = [
{
"text": ent.text,
"label": ent.label_,
"start": ent.start_char,
"end": ent.end_char
}
for ent in doc.ents
]
return entities
except Exception as e:
logger.error(f"Error extracting entities: {str(e)}")
return []
def get_vector_representation(text):
"""Get vector representation of text using spaCy"""
if not text:
return None
try:
# Use centralized NLP model
nlp_model = get_nlp_model()
doc = nlp_model(text)
# Return document vector if available
if doc.has_vector:
return doc.vector
# Fallback: average of token vectors
vectors = [token.vector for token in doc if token.has_vector]
if vectors:
return np.mean(vectors, axis=0)
return None
except Exception as e:
logger.error(f"Error getting vector representation: {str(e)}")
return None
def calculate_similarity(text1, text2):
"""Calculate semantic similarity between two texts"""
if not text1 or not text2:
return 0.0
try:
vec1 = get_vector_representation(text1)
vec2 = get_vector_representation(text2)
if vec1 is None or vec2 is None:
return 0.0
# Reshape vectors for cosine_similarity
vec1 = vec1.reshape(1, -1)
vec2 = vec2.reshape(1, -1)
# Calculate cosine similarity
similarity = cosine_similarity(vec1, vec2)[0][0]
return float(similarity)
except Exception as e:
logger.error(f"Error calculating similarity: {str(e)}")
return 0.0
def extract_date_from_evidence(evidence_text):
"""Extract date from evidence text"""
if not evidence_text:
return None
try:
# Look for date patterns in text
date_patterns = [
r'Date: (\d{4}-\d{2}-\d{2})', # ISO format
r'published.*?(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})', # published on MM/DD/YYYY
r'(\d{1,2}\s+(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{4})', # DD Month YYYY
r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}' # Month DD, YYYY
]
for pattern in date_patterns:
match = re.search(pattern, evidence_text)
if match:
date_str = match.group(1)
# Parse date string based on format
try:
if '-' in date_str:
return datetime.strptime(date_str, '%Y-%m-%d')
elif '/' in date_str or '-' in date_str:
formats = ['%m/%d/%Y', '%d/%m/%Y', '%m-%d-%Y', '%d-%m-%Y']
for fmt in formats:
try:
return datetime.strptime(date_str, fmt)
except ValueError:
continue
else:
# Try different month formats
formats = ['%d %B %Y', '%B %d, %Y', '%B %d %Y']
for fmt in formats:
try:
return datetime.strptime(date_str, fmt)
except ValueError:
continue
except Exception:
pass
return None
except Exception as e:
logger.error(f"Error extracting date from evidence: {str(e)}")
return None
def is_temporally_relevant(evidence_text, claim_text, max_days_old=30):
"""Check if evidence is temporally relevant to the claim"""
# Check if claim seems to require recent evidence
temporal_terms = ["today", "now", "current", "currently", "recent", "recently", "latest", "just", "this week", "this month", "this year"]
requires_recent = any(term in claim_text.lower() for term in temporal_terms)
# If claim doesn't specify temporality, consider evidence relevant
if not requires_recent:
return True
# Extract date from evidence
date = extract_date_from_evidence(evidence_text)
if not date:
return True # If we can't determine date, assume it's relevant
# Check if evidence is recent enough
cutoff = datetime.now() - timedelta(days=max_days_old)
return date >= cutoff
def has_authority_signal(evidence_text):
"""Check if evidence contains authority signals"""
authority_signals = {
"scientific_consensus": ["consensus", "scientists agree", "research shows", "studies confirm", "experts agree"],
"fact_check": ["fact check", "rated false", "rated true", "debunked", "confirmed", "verification"],
"high_authority": ["nasa", "world health organization", "who", "cdc", "national academy",
"oxford", "harvard", "stanford", "mit", "cambridge", "yale",
"princeton", "government", "official", "authorities", "minister",
"ministry", "department", "administration", "university", "professor"]
}
evidence_lower = evidence_text.lower()
authority_type = None
authority_score = 1.0
for signal_type, phrases in authority_signals.items():
if any(phrase in evidence_lower for phrase in phrases):
if signal_type == "scientific_consensus":
authority_score = 1.8
authority_type = "scientific_consensus"
elif signal_type == "fact_check":
authority_score = 1.5
authority_type = "fact_check"
elif signal_type == "high_authority":
authority_score = 1.3
authority_type = "high_authority"
break
return authority_score, authority_type
def analyze_evidence_relevance(claim, evidence_list, source_credibility=None):
"""
Analyze evidence relevance to claim using semantic similarity with improved handling
for claims requiring strong evidence
Args:
claim (str): The claim being verified
evidence_list (list): List of evidence items
source_credibility (dict): Dictionary mapping source domains to credibility scores
Returns:
list: Sorted list of evidence items with relevance scores
"""
if not evidence_list:
return []
# Ensure evidence_list is a list of strings
if not isinstance(evidence_list, list):
evidence_list = [str(evidence_list)]
# Filter out None or empty items
evidence_list = [item for item in evidence_list if item]
# Check if claim contains strong assertions that would require specific evidence
strong_assertion_markers = [
"solved", "cured", "discovered", "breakthrough", "revolutionary",
"first ever", "confirmed", "definitive", "conclusive", "proven",
"groundbreaking", "unprecedented", "remarkable", "extends lifespan",
"extends life", "definitively", "successfully"
]
# Determine if claim contains strong assertions
claim_has_strong_assertions = any(marker in claim.lower() for marker in strong_assertion_markers)
# Log detection result
if claim_has_strong_assertions:
logger.info(f"Evidence analysis: Detected claim with strong assertions requiring specific evidence")
# Extract named entities from claim
claim_entities = extract_entities(claim)
claim_entity_texts = [entity["text"].lower() for entity in claim_entities]
# Process each evidence item
analyzed_evidence = []
# Track domains found in evidence to identify source diversity
found_domains = set()
for evidence in evidence_list:
if not isinstance(evidence, str):
continue
# Calculate semantic similarity
similarity = calculate_similarity(claim, evidence)
# Check for entity overlap
evidence_entities = extract_entities(evidence)
evidence_entity_texts = [entity["text"].lower() for entity in evidence_entities]
# Calculate entity overlap
common_entities = set(claim_entity_texts).intersection(set(evidence_entity_texts))
entity_overlap = len(common_entities) / max(1, len(claim_entity_texts))
# Check temporal relevance
temporal_relevance = 1.0
if is_temporally_relevant(evidence, claim):
temporal_relevance = 1.2
else:
# Penalty for temporally irrelevant evidence
temporal_relevance = 0.7
# Check for authority signals
authority_score, authority_type = has_authority_signal(evidence)
# Extract source from evidence if available
source_boost = 1.0
domain = None
if source_credibility:
# Try to extract domain from URL in evidence
domain_match = re.search(r'URL: https?://(?:www\.)?([^/]+)', evidence)
if domain_match:
domain = domain_match.group(1)
# Check if domain or its parent domain is in credibility list
for cred_domain, cred_score in source_credibility.items():
if cred_domain in domain:
try:
source_boost = float(cred_score)
break
except (ValueError, TypeError):
pass
# Track this domain for source diversity
if domain:
found_domains.add(domain)
# For claims with strong assertions: check if evidence specifically addresses assertions
claim_specificity_match = 1.0
evidence_specificity_match = 1.0
if claim_has_strong_assertions:
# Check if evidence provides specific confirmation or contradiction
direct_contradiction_terms = [
"not yet", "has not", "have not", "cannot", "can't", "doesn't", "don't",
"unlikely", "challenging", "remains a challenge", "in the future",
"experimental", "in development", "proposed", "theoretical",
"preliminary", "hypothesized", "potential", "promising but"
]
# Check for contradictions to strong assertions
if any(term in evidence.lower() for term in direct_contradiction_terms):
# This evidence likely contradicts the strong assertion
evidence_specificity_match = 2.0 # Boost relevance of contradicting evidence
logger.debug(f"Found contradiction to strong assertion in evidence")
# For claims with strong assertions, check if evidence specifically confirms
direct_confirmation_terms = [
"successfully demonstrated", "breakthrough", "solved", "cured",
"confirmed", "definitive evidence", "conclusive results", "proven",
"revolutionary results", "milestone achievement", "groundbreaking results"
]
# If evidence confirms the strong assertion, adjust relevance
if any(term in evidence.lower() for term in direct_confirmation_terms):
# Apply higher scoring for evidence that specifically confirms
evidence_specificity_match = 1.8
logger.debug(f"Found confirmation of strong assertion in evidence")
# For claims with strong assertions, check for high-quality sources
high_quality_source_markers = [
"journal", "doi.org", "research", "university", "institute",
"laboratory", "professor", "study", "publication", "published in"
]
is_high_quality = any(term in evidence.lower() for term in high_quality_source_markers)
quality_boost = 1.4 if is_high_quality else 1.0
# Apply the quality boost
source_boost *= quality_boost
# Calculate final relevance score with improvements for all claim types
if claim_has_strong_assertions:
relevance_score = (
(similarity * 0.35) + # Semantic similarity
(entity_overlap * 0.25) + # Entity overlap
(0.25) # Base value to ensure all evidence has some relevance
) * temporal_relevance * authority_score * source_boost * claim_specificity_match * evidence_specificity_match
else:
# Original formula for regular claims
relevance_score = (
(similarity * 0.4) + # Semantic similarity
(entity_overlap * 0.3) + # Entity overlap
(0.3) # Base value to ensure all evidence has some relevance
) * temporal_relevance * authority_score * source_boost
# Add metadata and relevance score
analyzed_evidence.append({
"text": evidence,
"relevance_score": relevance_score,
"similarity": similarity,
"entity_overlap": entity_overlap,
"temporal_relevance": temporal_relevance,
"authority_score": authority_score,
"authority_type": authority_type,
"source_boost": source_boost,
"domain": domain
})
# Sort by relevance score (descending)
analyzed_evidence.sort(key=lambda x: x["relevance_score"], reverse=True)
# Ensure we have diverse sources in top results for all claims
if len(found_domains) > 1:
# Try to promote evidence from reliable sources if we haven't selected any yet
reliable_sources_seen = False
# Check if top 3 results contain any reliable sources
for item in analyzed_evidence[:3]:
domain = item.get("domain", "")
if domain and source_credibility and any(cred_domain in domain for cred_domain in source_credibility):
reliable_sources_seen = True
break
# If no reliable sources in top results, promote one if available
if not reliable_sources_seen:
for i, item in enumerate(analyzed_evidence[3:]):
domain = item.get("domain", "")
if domain and source_credibility and any(cred_domain in domain for cred_domain in source_credibility):
# Swap this item into the top 3
analyzed_evidence.insert(2, analyzed_evidence.pop(i+3))
break
return analyzed_evidence
def select_diverse_evidence(analyzed_evidence, max_items=5):
"""
Select diverse evidence items based on relevance, source diversity and claim characteristics
Args:
analyzed_evidence (list): List of evidence items with relevance scores
max_items (int): Maximum number of evidence items to return
Returns:
list: Selected diverse evidence items
"""
if not analyzed_evidence:
return []
# Check if top evidence suggests claim has strong assertions
strong_assertion_markers = [
"solved", "cured", "discovered", "breakthrough", "revolutionary",
"first ever", "confirmed", "definitive", "conclusive", "proven",
"groundbreaking", "unprecedented", "extends lifespan", "definitively"
]
# Determine if this is a claim with strong assertions by checking evidence text
has_strong_assertions = False
for item in analyzed_evidence[:3]: # Check just the top items for efficiency
if "text" in item:
item_text = item["text"].lower()
if any(f"claim {marker}" in item_text or f"claim has {marker}" in item_text
for marker in strong_assertion_markers):
has_strong_assertions = True
break
# Also check for contradiction markers in evidence which can indicate a strong assertion
contradiction_markers = [
"not yet solved", "hasn't been proven", "no evidence that",
"remains unsolved", "has not been confirmed", "remains theoretical"
]
if not has_strong_assertions:
for item in analyzed_evidence[:3]:
if "text" in item:
item_text = item["text"].lower()
if any(marker in item_text for marker in contradiction_markers):
has_strong_assertions = True
break
# Ensure we don't select more than available
max_items = min(max_items, len(analyzed_evidence))
# Initialize selected items with the most relevant item
selected = [analyzed_evidence[0]]
remaining = analyzed_evidence[1:]
# Track sources to ensure diversity
selected_sources = set()
for item in selected:
# Try to extract source from evidence
source_match = re.search(r'Source: ([^,]+)', item["text"])
if source_match:
selected_sources.add(source_match.group(1))
# For all claims, track if we have high-quality sources yet
has_quality_source = False
quality_source_markers = ["journal", "doi.org", "research", "university",
"institute", "laboratory", "professor", "study"]
# Check if our top item is already from a quality source
if any(marker in selected[0]["text"].lower() for marker in quality_source_markers):
has_quality_source = True
# Select remaining items balancing relevance and diversity
while len(selected) < max_items and remaining:
best_item = None
best_score = -1
for i, item in enumerate(remaining):
# Base score is the item's relevance
score = item["relevance_score"]
# Extract source if available
source = None
source_match = re.search(r'Source: ([^,]+)', item["text"])
if source_match:
source = source_match.group(1)
# Apply diversity bonus if source is new
if source and source not in selected_sources:
score *= 1.2 # Diversity bonus
# For claims with strong assertions, apply bonus for contradicting evidence
if has_strong_assertions:
# Check for contradiction markers in the text
if any(marker in item["text"].lower() for marker in contradiction_markers):
score *= 1.3 # Bonus for evidence that may contradict strong assertions
# For any claim, apply bonus for high-quality sources if we don't have one yet
if not has_quality_source:
is_item_quality = any(marker in item["text"].lower() for marker in quality_source_markers)
if is_item_quality:
score *= 1.5 # Significant bonus for quality sources
if score > best_score:
best_score = score
best_item = (i, item)
if best_item:
idx, item = best_item
selected.append(item)
remaining.pop(idx)
# Add source to selected sources
source_match = re.search(r'Source: ([^,]+)', item["text"])
if source_match:
selected_sources.add(source_match.group(1))
# Check if we found a quality source
if not has_quality_source:
if any(marker in item["text"].lower() for marker in quality_source_markers):
has_quality_source = True
else:
break
# For any claim with strong assertions, ensure we have at least one quality source if available
if has_strong_assertions and not has_quality_source and remaining:
for i, item in enumerate(remaining):
if any(marker in item["text"].lower() for marker in quality_source_markers):
# Replace the least relevant selected item with this quality one
selected.sort(key=lambda x: x["relevance_score"])
selected[0] = item
break
# Return only the text portion
return [item["text"] for item in selected]