import logging
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from datetime import datetime, timedelta
import re

# Import the centralized NLP model handler
from utils.models import get_nlp_model

logger = logging.getLogger("misinformation_detector")


def extract_entities(text):
    """Extract named entities from text"""
    if not text:
        return []
    try:
        # Use centralized NLP model
        nlp_model = get_nlp_model()
        doc = nlp_model(text)
        entities = [
            {
                "text": ent.text,
                "label": ent.label_,
                "start": ent.start_char,
                "end": ent.end_char
            }
            for ent in doc.ents
        ]
        return entities
    except Exception as e:
        logger.error(f"Error extracting entities: {str(e)}")
        return []


def get_vector_representation(text):
    """Get vector representation of text using spaCy"""
    if not text:
        return None
    try:
        # Use centralized NLP model
        nlp_model = get_nlp_model()
        doc = nlp_model(text)
        # Return document vector if available
        if doc.has_vector:
            return doc.vector
        # Fallback: average of token vectors
        vectors = [token.vector for token in doc if token.has_vector]
        if vectors:
            return np.mean(vectors, axis=0)
        return None
    except Exception as e:
        logger.error(f"Error getting vector representation: {str(e)}")
        return None


def calculate_similarity(text1, text2):
    """Calculate semantic similarity between two texts"""
    if not text1 or not text2:
        return 0.0
    try:
        vec1 = get_vector_representation(text1)
        vec2 = get_vector_representation(text2)
        if vec1 is None or vec2 is None:
            return 0.0
        # Reshape vectors to 2D arrays as expected by cosine_similarity
        vec1 = vec1.reshape(1, -1)
        vec2 = vec2.reshape(1, -1)
        # Calculate cosine similarity
        similarity = cosine_similarity(vec1, vec2)[0][0]
        return float(similarity)
    except Exception as e:
        logger.error(f"Error calculating similarity: {str(e)}")
        return 0.0


def extract_date_from_evidence(evidence_text):
    """Extract date from evidence text"""
    if not evidence_text:
        return None
    try:
        # Look for date patterns in text
        date_patterns = [
            r'Date: (\d{4}-\d{2}-\d{2})',  # ISO format
            r'published.*?(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})',  # published on MM/DD/YYYY
            r'(\d{1,2}\s+(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{4})',  # DD Month YYYY
            r'((?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4})'  # Month DD, YYYY
        ]
        for pattern in date_patterns:
            match = re.search(pattern, evidence_text)
            if match:
                date_str = match.group(1)
                # Parse date string based on format
                try:
                    if '-' in date_str or '/' in date_str:
                        # Numeric dates: try ISO format first, then common numeric formats
                        formats = ['%Y-%m-%d', '%m/%d/%Y', '%d/%m/%Y', '%m-%d-%Y', '%d-%m-%Y']
                    else:
                        # Textual month formats
                        formats = ['%d %B %Y', '%B %d, %Y', '%B %d %Y']
                    for fmt in formats:
                        try:
                            return datetime.strptime(date_str, fmt)
                        except ValueError:
                            continue
                except Exception:
                    pass
        return None
    except Exception as e:
        logger.error(f"Error extracting date from evidence: {str(e)}")
        return None
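
# Example (illustrative): extract_date_from_evidence("Date: 2024-03-01 ...") matches
# the ISO pattern and returns datetime(2024, 3, 1); text without a recognizable
# date pattern returns None.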


def is_temporally_relevant(evidence_text, claim_text, max_days_old=30):
    """Check if evidence is temporally relevant to the claim"""
    # Check if claim seems to require recent evidence
    temporal_terms = ["today", "now", "current", "currently", "recent", "recently",
                      "latest", "just", "this week", "this month", "this year"]
    requires_recent = any(term in claim_text.lower() for term in temporal_terms)
    # If claim doesn't specify temporality, consider evidence relevant
    if not requires_recent:
        return True
    # Extract date from evidence
    date = extract_date_from_evidence(evidence_text)
    if not date:
        return True  # If we can't determine date, assume it's relevant
    # Check if evidence is recent enough
    cutoff = datetime.now() - timedelta(days=max_days_old)
    return date >= cutoff


def has_authority_signal(evidence_text):
    """Check if evidence contains authority signals"""
    authority_signals = {
        "scientific_consensus": ["consensus", "scientists agree", "research shows", "studies confirm", "experts agree"],
        "fact_check": ["fact check", "rated false", "rated true", "debunked", "confirmed", "verification"],
        "high_authority": ["nasa", "world health organization", "who", "cdc", "national academy",
                           "oxford", "harvard", "stanford", "mit", "cambridge", "yale",
                           "princeton", "government", "official", "authorities", "minister",
                           "ministry", "department", "administration", "university", "professor"]
    }
    evidence_lower = evidence_text.lower()
    authority_type = None
    authority_score = 1.0
    for signal_type, phrases in authority_signals.items():
        if any(phrase in evidence_lower for phrase in phrases):
            if signal_type == "scientific_consensus":
                authority_score = 1.8
                authority_type = "scientific_consensus"
            elif signal_type == "fact_check":
                authority_score = 1.5
                authority_type = "fact_check"
            elif signal_type == "high_authority":
                authority_score = 1.3
                authority_type = "high_authority"
            break
    return authority_score, authority_type


def analyze_evidence_relevance(claim, evidence_list, source_credibility=None):
    """
    Analyze evidence relevance to claim using semantic similarity, with improved
    handling for claims requiring strong evidence.

    Args:
        claim (str): The claim being verified
        evidence_list (list): List of evidence items
        source_credibility (dict): Dictionary mapping source domains to credibility scores

    Returns:
        list: Sorted list of evidence items with relevance scores
    """
    if not evidence_list:
        return []
    # Ensure evidence_list is a list of strings
    if not isinstance(evidence_list, list):
        evidence_list = [str(evidence_list)]
    # Filter out None or empty items
    evidence_list = [item for item in evidence_list if item]
    # Check if claim contains strong assertions that would require specific evidence
    strong_assertion_markers = [
        "solved", "cured", "discovered", "breakthrough", "revolutionary",
        "first ever", "confirmed", "definitive", "conclusive", "proven",
        "groundbreaking", "unprecedented", "remarkable", "extends lifespan",
        "extends life", "definitively", "successfully"
    ]
    # Determine if claim contains strong assertions
    claim_has_strong_assertions = any(marker in claim.lower() for marker in strong_assertion_markers)
    # Log detection result
    if claim_has_strong_assertions:
        logger.info("Evidence analysis: Detected claim with strong assertions requiring specific evidence")
    # Extract named entities from claim
    claim_entities = extract_entities(claim)
    claim_entity_texts = [entity["text"].lower() for entity in claim_entities]
    # Process each evidence item
    analyzed_evidence = []
    # Track domains found in evidence to identify source diversity
    found_domains = set()
    for evidence in evidence_list:
        if not isinstance(evidence, str):
            continue
        # Calculate semantic similarity
        similarity = calculate_similarity(claim, evidence)
        # Check for entity overlap
        evidence_entities = extract_entities(evidence)
        evidence_entity_texts = [entity["text"].lower() for entity in evidence_entities]
        # Calculate entity overlap
        common_entities = set(claim_entity_texts).intersection(set(evidence_entity_texts))
        entity_overlap = len(common_entities) / max(1, len(claim_entity_texts))
        # Check temporal relevance
        if is_temporally_relevant(evidence, claim):
            temporal_relevance = 1.2
        else:
            # Penalty for temporally irrelevant evidence
            temporal_relevance = 0.7
        # Check for authority signals
        authority_score, authority_type = has_authority_signal(evidence)
        # Extract source from evidence if available
        source_boost = 1.0
        domain = None
        if source_credibility:
            # Try to extract domain from URL in evidence
            domain_match = re.search(r'URL: https?://(?:www\.)?([^/]+)', evidence)
            if domain_match:
                domain = domain_match.group(1)
                # Check if domain or its parent domain is in credibility list
                for cred_domain, cred_score in source_credibility.items():
                    if cred_domain in domain:
                        try:
                            source_boost = float(cred_score)
                            break
                        except (ValueError, TypeError):
                            pass
        # Track this domain for source diversity
        if domain:
            found_domains.add(domain)
        # For claims with strong assertions: check if evidence specifically addresses assertions
        claim_specificity_match = 1.0
        evidence_specificity_match = 1.0
        if claim_has_strong_assertions:
            # Check if evidence provides specific confirmation or contradiction
            direct_contradiction_terms = [
                "not yet", "has not", "have not", "cannot", "can't", "doesn't", "don't",
                "unlikely", "challenging", "remains a challenge", "in the future",
                "experimental", "in development", "proposed", "theoretical",
                "preliminary", "hypothesized", "potential", "promising but"
            ]
            # Check for contradictions to strong assertions
            if any(term in evidence.lower() for term in direct_contradiction_terms):
                # This evidence likely contradicts the strong assertion
                evidence_specificity_match = 2.0  # Boost relevance of contradicting evidence
                logger.debug("Found contradiction to strong assertion in evidence")
            # For claims with strong assertions, check if evidence specifically confirms
            direct_confirmation_terms = [
                "successfully demonstrated", "breakthrough", "solved", "cured",
                "confirmed", "definitive evidence", "conclusive results", "proven",
                "revolutionary results", "milestone achievement", "groundbreaking results"
            ]
            # If evidence confirms the strong assertion, adjust relevance
            if any(term in evidence.lower() for term in direct_confirmation_terms):
                # Apply higher scoring for evidence that specifically confirms
                evidence_specificity_match = 1.8
                logger.debug("Found confirmation of strong assertion in evidence")
            # For claims with strong assertions, check for high-quality sources
            high_quality_source_markers = [
                "journal", "doi.org", "research", "university", "institute",
                "laboratory", "professor", "study", "publication", "published in"
            ]
            is_high_quality = any(term in evidence.lower() for term in high_quality_source_markers)
            quality_boost = 1.4 if is_high_quality else 1.0
            # Apply the quality boost
            source_boost *= quality_boost
        # Calculate final relevance score with improvements for all claim types
        if claim_has_strong_assertions:
            relevance_score = (
                (similarity * 0.35) +      # Semantic similarity
                (entity_overlap * 0.25) +  # Entity overlap
                0.25                       # Base value to ensure all evidence has some relevance
            ) * temporal_relevance * authority_score * source_boost * claim_specificity_match * evidence_specificity_match
        else:
            # Original formula for regular claims
            relevance_score = (
                (similarity * 0.4) +      # Semantic similarity
                (entity_overlap * 0.3) +  # Entity overlap
                0.3                       # Base value to ensure all evidence has some relevance
            ) * temporal_relevance * authority_score * source_boost
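        # Worked example of the scoring above (illustrative numbers only): for a
        # regular claim with similarity 0.6, entity overlap 0.5, temporally
        # relevant evidence (1.2), a fact-check authority signal (1.5), and a
        # neutral source boost (1.0):
        #   ((0.6 * 0.4) + (0.5 * 0.3) + 0.3) * 1.2 * 1.5 * 1.0 = 0.69 * 1.8 = 1.242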
        # Add metadata and relevance score
        analyzed_evidence.append({
            "text": evidence,
            "relevance_score": relevance_score,
            "similarity": similarity,
            "entity_overlap": entity_overlap,
            "temporal_relevance": temporal_relevance,
            "authority_score": authority_score,
            "authority_type": authority_type,
            "source_boost": source_boost,
            "domain": domain
        })
    # Sort by relevance score (descending)
    analyzed_evidence.sort(key=lambda x: x["relevance_score"], reverse=True)
    # Ensure we have diverse sources in top results for all claims
    if len(found_domains) > 1:
        # Try to promote evidence from reliable sources if we haven't selected any yet
        reliable_sources_seen = False
        # Check if top 3 results contain any reliable sources
        for item in analyzed_evidence[:3]:
            domain = item.get("domain", "")
            if domain and source_credibility and any(cred_domain in domain for cred_domain in source_credibility):
                reliable_sources_seen = True
                break
        # If no reliable sources in top results, promote one if available
        if not reliable_sources_seen:
            for i, item in enumerate(analyzed_evidence[3:]):
                domain = item.get("domain", "")
                if domain and source_credibility and any(cred_domain in domain for cred_domain in source_credibility):
                    # Swap this item into the top 3
                    analyzed_evidence.insert(2, analyzed_evidence.pop(i + 3))
                    break
    return analyzed_evidence


def select_diverse_evidence(analyzed_evidence, max_items=5):
    """
    Select diverse evidence items based on relevance, source diversity, and claim characteristics

    Args:
        analyzed_evidence (list): List of evidence items with relevance scores
        max_items (int): Maximum number of evidence items to return

    Returns:
        list: Selected diverse evidence items
    """
    if not analyzed_evidence:
        return []
    # Check if top evidence suggests claim has strong assertions
    strong_assertion_markers = [
        "solved", "cured", "discovered", "breakthrough", "revolutionary",
        "first ever", "confirmed", "definitive", "conclusive", "proven",
        "groundbreaking", "unprecedented", "extends lifespan", "definitively"
    ]
    # Determine if this is a claim with strong assertions by checking evidence text
    has_strong_assertions = False
    for item in analyzed_evidence[:3]:  # Check just the top items for efficiency
        if "text" in item:
            item_text = item["text"].lower()
            if any(f"claim {marker}" in item_text or f"claim has {marker}" in item_text
                   for marker in strong_assertion_markers):
                has_strong_assertions = True
                break
    # Also check for contradiction markers in evidence which can indicate a strong assertion
    contradiction_markers = [
        "not yet solved", "hasn't been proven", "no evidence that",
        "remains unsolved", "has not been confirmed", "remains theoretical"
    ]
    if not has_strong_assertions:
        for item in analyzed_evidence[:3]:
            if "text" in item:
                item_text = item["text"].lower()
                if any(marker in item_text for marker in contradiction_markers):
                    has_strong_assertions = True
                    break
    # Ensure we don't select more than available
    max_items = min(max_items, len(analyzed_evidence))
    # Initialize selected items with the most relevant item
    selected = [analyzed_evidence[0]]
    remaining = analyzed_evidence[1:]
    # Track sources to ensure diversity
    selected_sources = set()
    for item in selected:
        # Try to extract source from evidence
        source_match = re.search(r'Source: ([^,]+)', item["text"])
        if source_match:
            selected_sources.add(source_match.group(1))
    # For all claims, track if we have high-quality sources yet
    has_quality_source = False
    quality_source_markers = ["journal", "doi.org", "research", "university",
                              "institute", "laboratory", "professor", "study"]
    # Check if our top item is already from a quality source
    if any(marker in selected[0]["text"].lower() for marker in quality_source_markers):
        has_quality_source = True
    # Select remaining items balancing relevance and diversity
    while len(selected) < max_items and remaining:
        best_item = None
        best_score = -1
        for i, item in enumerate(remaining):
            # Base score is the item's relevance
            score = item["relevance_score"]
            # Extract source if available
            source = None
            source_match = re.search(r'Source: ([^,]+)', item["text"])
            if source_match:
                source = source_match.group(1)
            # Apply diversity bonus if source is new
            if source and source not in selected_sources:
                score *= 1.2  # Diversity bonus
            # For claims with strong assertions, apply bonus for contradicting evidence
            if has_strong_assertions:
                # Check for contradiction markers in the text
                if any(marker in item["text"].lower() for marker in contradiction_markers):
                    score *= 1.3  # Bonus for evidence that may contradict strong assertions
            # For any claim, apply bonus for high-quality sources if we don't have one yet
            if not has_quality_source:
                is_item_quality = any(marker in item["text"].lower() for marker in quality_source_markers)
                if is_item_quality:
                    score *= 1.5  # Significant bonus for quality sources
            if score > best_score:
                best_score = score
                best_item = (i, item)
        if best_item:
            idx, item = best_item
            selected.append(item)
            remaining.pop(idx)
            # Add source to selected sources
            source_match = re.search(r'Source: ([^,]+)', item["text"])
            if source_match:
                selected_sources.add(source_match.group(1))
            # Check if we found a quality source
            if not has_quality_source:
                if any(marker in item["text"].lower() for marker in quality_source_markers):
                    has_quality_source = True
        else:
            break
    # For any claim with strong assertions, ensure we have at least one quality source if available
    if has_strong_assertions and not has_quality_source and remaining:
        for item in remaining:
            if any(marker in item["text"].lower() for marker in quality_source_markers):
                # Replace the least relevant selected item with this quality one
                selected.sort(key=lambda x: x["relevance_score"])
                selected[0] = item
                # Restore descending relevance order after the swap
                selected.sort(key=lambda x: x["relevance_score"], reverse=True)
                break
    # Return only the text portion
    return [item["text"] for item in selected]
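

# --- Usage sketch (illustrative only) ---
# A minimal example of how these helpers might be chained, assuming evidence
# strings follow the "Source: ..., URL: ..." and "Date: YYYY-MM-DD" conventions
# the regexes above expect. The claim, evidence texts, domains, and credibility
# scores below are hypothetical and exist only to show the call pattern.
if __name__ == "__main__":
    sample_claim = "Scientists have definitively solved the protein folding problem."
    sample_evidence = [
        "Source: Example Journal, URL: https://www.example-journal.org/article, "
        "Date: 2024-01-15. Researchers report major progress, but the problem "
        "has not been fully solved and results remain preliminary.",
        "Source: Example Blog, URL: https://blog.example.com/post. A breakthrough "
        "was confirmed by scientists, according to the post."
    ]
    sample_credibility = {"example-journal.org": 1.5, "blog.example.com": 0.8}

    # Rank evidence by relevance, then pick a diverse subset of the top items
    ranked = analyze_evidence_relevance(sample_claim, sample_evidence, sample_credibility)
    for text in select_diverse_evidence(ranked, max_items=2):
        print(text[:120])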