import logging
import time
import re
import random
import requests
import json
import ssl
from urllib.parse import urlencode
from bs4 import BeautifulSoup
from SPARQLWrapper import SPARQLWrapper, JSON
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED

from utils.api_utils import api_error_handler, safe_json_parse
from utils.models import get_nlp_model
from modules.claim_extraction import shorten_claim_for_evidence, extract_claims
from modules.rss_feed import retrieve_evidence_from_rss
from modules.semantic_analysis import analyze_evidence_relevance, select_diverse_evidence
from config import SOURCE_CREDIBILITY, NEWS_API_KEY, FACTCHECK_API_KEY

# Import the performance tracker
from utils.performance import PerformanceTracker

performance_tracker = PerformanceTracker()

logger = logging.getLogger("misinformation_detector")
# Define early analysis function at the module level so it's available everywhere
def analyze_early_evidence(claim, source_name, source_evidence):
    """Pre-analyze evidence while waiting for other sources to complete"""
    try:
        if not source_evidence:
            return None

        logger.info(f"Pre-analyzing {len(source_evidence)} evidence items from {source_name}")

        # Do a quick relevance check using similarity scoring
        nlp_model = get_nlp_model()
        claim_doc = nlp_model(claim)

        relevant_evidence = []
        for evidence in source_evidence:
            if not isinstance(evidence, str):
                continue

            # Look for direct keyword matches first (fast check)
            is_related = False
            keywords = [word.lower() for word in claim.split() if len(word) > 3]
            for keyword in keywords:
                if keyword in evidence.lower():
                    is_related = True
                    break

            # If no keywords match, do a basic entity check
            if not is_related:
                # Check if claim and evidence share any entities
                evidence_doc = nlp_model(evidence[:500])  # Limit for speed
                claim_entities = [ent.text.lower() for ent in claim_doc.ents]
                evidence_entities = [ent.text.lower() for ent in evidence_doc.ents]
                common_entities = set(claim_entities).intersection(set(evidence_entities))
                if common_entities:
                    is_related = True

            if is_related:
                relevant_evidence.append(evidence)

        logger.info(f"Found {len(relevant_evidence)} relevant items out of {len(source_evidence)} from {source_name}")
        return relevant_evidence
    except Exception as e:
        logger.error(f"Error in early evidence analysis: {e}")
        return source_evidence  # On error, return original evidence
# New function to get recent date range for filtering news
def get_recent_date_range():
    """Return date range for recent news filtering - last 3 days"""
    today = datetime.now()
    three_days_ago = today - timedelta(days=3)
    return three_days_ago.strftime('%Y-%m-%d'), today.strftime('%Y-%m-%d')
def retrieve_evidence_from_wikipedia(claim):
    """Retrieve evidence from Wikipedia for a given claim"""
    logger.info(f"Retrieving evidence from Wikipedia for: {claim}")

    # Ensure shortened_claim is a string
    try:
        shortened_claim = shorten_claim_for_evidence(claim)
    except Exception as e:
        logger.error(f"Error in claim shortening: {e}")
        shortened_claim = claim  # Fallback to original claim

    # Ensure query_parts is a list of strings
    query_parts = str(shortened_claim).split()
    evidence = []
    source_count = {"wikipedia": 0}

    for i in range(len(query_parts), 0, -1):  # Start with full query, shorten iteratively
        try:
            # Safely join and encode query
            current_query = "+".join(query_parts[:i])
            search_url = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={current_query}&format=json"
            logger.info(f"Wikipedia search URL: {search_url}")

            headers = {
                "User-Agent": "MisinformationDetectionResearchBot/1.0 (Research Project)"
            }

            # Make the search request with reduced timeout
            response = requests.get(search_url, headers=headers, timeout=7)
            response.raise_for_status()

            # Safely parse JSON
            search_data = safe_json_parse(response, "wikipedia")

            # Safely extract search results
            search_results = search_data.get("query", {}).get("search", [])

            # Ensure search_results is a list
            if not isinstance(search_results, list):
                logger.warning(f"Unexpected search results type: {type(search_results)}")
                search_results = []

            # Use ThreadPoolExecutor to fetch page content in parallel
            with ThreadPoolExecutor(max_workers=3) as executor:
                # Submit up to 3 page requests in parallel
                futures = []
                for idx, result in enumerate(search_results[:3]):
                    # Ensure result is a dictionary
                    if not isinstance(result, dict):
                        logger.warning(f"Skipping non-dictionary result: {type(result)}")
                        continue

                    # Safely extract title
                    page_title = result.get("title", "")
                    if not page_title:
                        continue

                    page_url = f"https://en.wikipedia.org/wiki/{page_title.replace(' ', '_')}"

                    # Submit the page request task to executor
                    futures.append(executor.submit(
                        fetch_wikipedia_page_content,
                        page_url,
                        page_title,
                        headers
                    ))

                # Process completed futures as they finish
                for future in as_completed(futures):
                    try:
                        page_result = future.result()
                        if page_result:
                            evidence.append(page_result)
                            source_count["wikipedia"] += 1
                    except Exception as e:
                        logger.error(f"Error processing Wikipedia page: {e}")

            # Stop if we found any evidence
            if evidence:
                break

        except Exception as e:
            logger.error(f"Error retrieving from Wikipedia: {str(e)}")
            continue

    # Ensure success is a boolean
    success = bool(evidence)

    # Safely log evidence retrieval
    try:
        performance_tracker.log_evidence_retrieval(success, source_count)
    except Exception as e:
        logger.error(f"Error logging evidence retrieval: {e}")

    if not evidence:
        logger.warning("No evidence found from Wikipedia.")

    return evidence
def fetch_wikipedia_page_content(page_url, page_title, headers):
    """Helper function to fetch and parse Wikipedia page content"""
    try:
        # Get page content with reduced timeout
        page_response = requests.get(page_url, headers=headers, timeout=5)
        page_response.raise_for_status()

        # Extract relevant sections using BeautifulSoup
        soup = BeautifulSoup(page_response.text, 'html.parser')
        paragraphs = soup.find_all('p', limit=3)  # Limit to first 3 paragraphs
        content = " ".join([para.get_text(strip=True) for para in paragraphs])

        # Truncate content to reduce token usage earlier in the pipeline
        if len(content) > 300:
            content = content[:297] + "..."

        if content.strip():  # Ensure content is not empty
            return f"Title: {page_title}, URL: {page_url}, Content: {content}"
        return None
    except Exception as e:
        logger.error(f"Error fetching Wikipedia page {page_url}: {e}")
        return None
# Update the WikiData function to fix SSL issues
def retrieve_evidence_from_wikidata(claim):
    """Retrieve evidence from WikiData for a given claim"""
    logger.info(f"Retrieving evidence from WikiData for: {claim}")

    # Prepare entities for SPARQL query
    shortened_claim = shorten_claim_for_evidence(claim)
    query_terms = shortened_claim.split()

    # Initialize SPARQLWrapper for WikiData
    sparql = SPARQLWrapper("https://query.wikidata.org/sparql")

    # Use a more conservative user agent to avoid blocks
    sparql.addCustomHttpHeader("User-Agent", "MisinformationDetectionResearchBot/1.0")

    # Fix SSL issues by disabling SSL verification for this specific request
    try:
        # Create a context where we don't verify SSL certs
        import ssl
        import urllib.request

        # Create a context that doesn't verify certificates
        ssl_context = ssl._create_unverified_context()

        # Monkey patch the opener for SPARQLWrapper
        opener = urllib.request.build_opener(urllib.request.HTTPSHandler(context=ssl_context))
        urllib.request.install_opener(opener)
    except Exception as e:
        logger.error(f"Error setting up SSL context: {str(e)}")

    # Construct basic SPARQL query for relevant entities
    query = """
    SELECT ?item ?itemLabel ?description ?article WHERE {
      SERVICE wikibase:mwapi {
        bd:serviceParam wikibase:api "EntitySearch" .
        bd:serviceParam wikibase:endpoint "www.wikidata.org" .
        bd:serviceParam mwapi:search "%s" .
        bd:serviceParam mwapi:language "en" .
        ?item wikibase:apiOutputItem mwapi:item .
      }
      ?item schema:description ?description .
      FILTER(LANG(?description) = "en")
      OPTIONAL {
        ?article schema:about ?item .
        ?article schema:isPartOf <https://en.wikipedia.org/> .
      }
      SERVICE wikibase:label { bd:serviceParam wikibase:language "en" . }
    }
    LIMIT 5
    """ % " ".join(query_terms)

    sparql.setQuery(query)
    sparql.setReturnFormat(JSON)

    try:
        results = sparql.query().convert()

        wikidata_evidence = []
        for result in results["results"]["bindings"]:
            entity_label = result.get("itemLabel", {}).get("value", "Unknown")
            description = result.get("description", {}).get("value", "No description")
            article_url = result.get("article", {}).get("value", "")

            # Truncate description to reduce token usage
            if len(description) > 200:
                description = description[:197] + "..."

            evidence_text = f"Entity: {entity_label}, Description: {description}"
            if article_url:
                evidence_text += f", URL: {article_url}"

            wikidata_evidence.append(evidence_text)

        logger.info(f"Retrieved {len(wikidata_evidence)} WikiData entities")
        return wikidata_evidence

    except Exception as e:
        logger.error(f"Error retrieving from WikiData: {str(e)}")
        return []
def retrieve_evidence_from_openalex(claim):
    """Retrieve evidence from OpenAlex for a given claim (replacement for Semantic Scholar)"""
    logger.info(f"Retrieving evidence from OpenAlex for: {claim}")

    try:
        shortened_claim = shorten_claim_for_evidence(claim)
        query = shortened_claim.replace(" ", "+")

        # OpenAlex API endpoint
        api_url = f"https://api.openalex.org/works?search={query}&filter=is_paratext:false&per_page=3"

        headers = {
            "Accept": "application/json",
            "User-Agent": "MisinformationDetectionResearchBot/1.0 ([email protected])",
        }

        scholarly_evidence = []

        try:
            # Request with reduced timeout
            response = requests.get(api_url, headers=headers, timeout=8)

            # Check response status
            if response.status_code == 200:
                # Successfully retrieved data
                data = safe_json_parse(response, "openalex")
                papers = data.get("results", [])

                for paper in papers:
                    title = paper.get("title", "Unknown Title")
                    abstract = paper.get("abstract_inverted_index", None)

                    # OpenAlex stores abstracts in an inverted index format, so we need to reconstruct it
                    abstract_text = "No abstract available"
                    if abstract:
                        try:
                            # Simple approach to reconstruct from inverted index
                            # For a production app, implement a proper reconstruction algorithm
                            words = list(abstract.keys())
                            abstract_text = " ".join(words[:30]) + "..."
                        except Exception as e:
                            logger.error(f"Error reconstructing abstract: {e}")

                    url = paper.get("doi", "")
                    if url and not url.startswith("http"):
                        url = f"https://doi.org/{url}"

                    year = ""
                    publication_date = paper.get("publication_date", "")
                    if publication_date:
                        year = publication_date.split("-")[0]

                    # Truncate abstract to reasonable length
                    if len(abstract_text) > 250:
                        abstract_text = abstract_text[:247] + "..."

                    evidence_text = f"Title: {title}, Year: {year}, Abstract: {abstract_text}, URL: {url}"
                    scholarly_evidence.append(evidence_text)
            else:
                logger.error(f"OpenAlex API error: {response.status_code}")

        except requests.exceptions.Timeout:
            logger.warning("OpenAlex request timed out")
        except requests.exceptions.ConnectionError:
            logger.warning("OpenAlex connection error")
        except Exception as e:
            logger.error(f"Unexpected error in OpenAlex request: {str(e)}")

        logger.info(f"Retrieved {len(scholarly_evidence)} scholarly papers from OpenAlex")
        return scholarly_evidence

    except Exception as e:
        logger.error(f"Fatal error in OpenAlex retrieval: {str(e)}")
        return []
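
# The simple abstract handling above just joins the inverted-index keys, which scrambles
# word order. If a faithful abstract is needed, a helper like the sketch below could be
# used instead. It assumes the usual OpenAlex `abstract_inverted_index` shape (a dict
# mapping each word to the list of positions where it occurs). This helper is illustrative
# only and is not currently wired into retrieve_evidence_from_openalex.
def reconstruct_abstract_from_inverted_index(inverted_index, max_words=60):
    """Rebuild abstract text from an OpenAlex-style inverted index (illustrative sketch)."""
    try:
        # Collect (position, word) pairs, then sort by position to restore word order
        positioned_words = [
            (position, word)
            for word, positions in inverted_index.items()
            for position in positions
        ]
        positioned_words.sort(key=lambda pair: pair[0])
        ordered_words = [word for _, word in positioned_words[:max_words]]
        suffix = "..." if len(positioned_words) > max_words else ""
        return " ".join(ordered_words) + suffix
    except Exception as e:
        logger.error(f"Error reconstructing abstract from inverted index: {e}")
        return "No abstract available"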
def retrieve_evidence_from_claimreview(claim):
    """Retrieve evidence from Google's ClaimReview for a given claim"""
    logger.info(f"Retrieving evidence from ClaimReview for: {claim}")

    factcheck_api_key = FACTCHECK_API_KEY

    # Safely shorten claim
    try:
        shortened_claim = shorten_claim_for_evidence(claim)
    except Exception as e:
        logger.error(f"Error shortening claim: {e}")
        shortened_claim = claim

    query_parts = str(shortened_claim).split()
    factcheck_results = []
    source_count = {"factcheck": 0}

    for i in range(len(query_parts), 0, -1):  # Iteratively try shorter queries
        try:
            current_query = " ".join(query_parts[:i])
            encoded_query = urlencode({"query": current_query})
            factcheck_url = f"https://factchecktools.googleapis.com/v1alpha1/claims:search?{encoded_query}&key={factcheck_api_key}"
            # Avoid logging the raw API key (same redaction pattern as the NewsAPI request)
            log_url = factcheck_url.replace(str(factcheck_api_key), "API_KEY_REDACTED") if factcheck_api_key else factcheck_url
            logger.info(f"Factcheck URL: {log_url}")

            # Make request with reduced timeout
            response = requests.get(factcheck_url, timeout=7)
            response.raise_for_status()
            data = safe_json_parse(response, "factcheck")

            # Safely extract claims
            claims = data.get("claims", [])
            if not isinstance(claims, list):
                logger.warning(f"Unexpected claims type: {type(claims)}")
                claims = []

            if claims:  # If results found
                logger.info(f"Results found for query '{current_query}'.")

                for item in claims:
                    try:
                        # Ensure item is a dictionary
                        if not isinstance(item, dict):
                            logger.warning(f"Skipping non-dictionary item: {type(item)}")
                            continue

                        claim_text = str(item.get("text", ""))

                        # Truncate claim text
                        if len(claim_text) > 200:
                            claim_text = claim_text[:197] + "..."

                        reviews = item.get("claimReview", [])

                        # Ensure reviews is a list
                        if not isinstance(reviews, list):
                            logger.warning(f"Unexpected reviews type: {type(reviews)}")
                            reviews = []

                        for review in reviews:
                            # Ensure review is a dictionary
                            if not isinstance(review, dict):
                                logger.warning(f"Skipping non-dictionary review: {type(review)}")
                                continue

                            publisher = str(review.get("publisher", {}).get("name", "Unknown Source"))
                            rating = str(review.get("textualRating", "Unknown"))
                            review_url = str(review.get("url", ""))

                            if claim_text:
                                factcheck_results.append(
                                    f"Claim: {claim_text}, Rating: {rating}, " +
                                    f"Source: {publisher}, URL: {review_url}"
                                )
                                source_count["factcheck"] += 1
                    except Exception as e:
                        logger.error(f"Error processing FactCheck result: {e}")

                break  # Break once we have results
            else:
                logger.info(f"No results for query '{current_query}', trying shorter version.")
        except Exception as e:
            logger.error(f"Error in FactCheck retrieval: {e}")

    # Safely log evidence retrieval
    try:
        success = bool(factcheck_results)
        performance_tracker.log_evidence_retrieval(success, source_count)
    except Exception as e:
        logger.error(f"Error logging evidence retrieval: {e}")

    if not factcheck_results:
        logger.warning("No factcheck evidence found after trying all query variants.")

    return factcheck_results
def retrieve_news_articles(claim):
    """Retrieve evidence from NewsAPI for a given claim with improved single-request approach"""
    logger.info(f"Retrieving evidence from News API for: {claim}")

    # Get API key
    news_api_key = NEWS_API_KEY
    if not news_api_key:
        logger.error("No NewsAPI key available")
        return []

    news_results = []
    source_count = {"news": 0}

    # Get date range for recent news
    from_date, to_date = get_recent_date_range()
    logger.info(f"Filtering for news from {from_date} to {to_date}")

    try:
        # Extract a simplified claim for better matching
        shortened_claim = shorten_claim_for_evidence(claim)

        # Use a single endpoint with proper parameters
        encoded_query = urlencode({"q": shortened_claim})

        # Use the 'everything' endpoint as it's more comprehensive
        news_api_url = f"https://newsapi.org/v2/everything?{encoded_query}&apiKey={news_api_key}&language=en&pageSize=5&sortBy=publishedAt&from={from_date}&to={to_date}"
        log_url = news_api_url.replace(news_api_key, "API_KEY_REDACTED")
        logger.info(f"Requesting: {log_url}")

        # Make a single request with proper headers and reduced timeout
        headers = {
            "User-Agent": "MisinformationDetectionResearchBot/1.0",
            "X-Api-Key": news_api_key,
            "Accept": "application/json"
        }

        response = requests.get(
            news_api_url,
            headers=headers,
            timeout=8
        )

        logger.info(f"Response status: {response.status_code}")

        if response.status_code == 200:
            data = safe_json_parse(response, "newsapi")

            if data.get("status") == "ok":
                articles = data.get("articles", [])
                logger.info(f"Found {len(articles)} articles")

                for article in articles:
                    try:
                        # Robust article parsing
                        title = str(article.get("title", ""))
                        description = str(article.get("description", ""))
                        content = str(article.get("content", ""))
                        source_name = str(article.get("source", {}).get("name", "Unknown"))
                        url = str(article.get("url", ""))
                        published_at = str(article.get("publishedAt", ""))

                        # Parse date to prioritize recent content
                        article_date = None
                        try:
                            if published_at:
                                article_date = datetime.strptime(published_at.split('T')[0], '%Y-%m-%d')
                        except Exception as date_error:
                            logger.warning(f"Could not parse date: {published_at}")

                        # Calculate recency score (higher = more recent)
                        recency_score = 1.0  # Default
                        if article_date:
                            days_old = (datetime.now() - article_date).days
                            if days_old == 0:  # Today
                                recency_score = 3.0
                            elif days_old == 1:  # Yesterday
                                recency_score = 2.0

                        # Use description if content is empty or too short
                        if not content or len(content) < 50:
                            content = description

                        # Truncate content to reduce token usage
                        if len(content) > 250:
                            content = content[:247] + "..."

                        # Ensure meaningful content
                        if title and (content or description):
                            news_item = {
                                "text": (
                                    f"Title: {title}, " +
                                    f"Source: {source_name}, " +
                                    f"Date: {published_at}, " +
                                    f"URL: {url}, " +
                                    f"Content: {content}"
                                ),
                                "recency_score": recency_score,
                                "date": article_date
                            }
                            news_results.append(news_item)
                            source_count["news"] += 1
                            logger.info(f"Added article: {title}")
                    except Exception as article_error:
                        logger.error(f"Error processing article: {article_error}")

        # Sort results by recency
        if news_results:
            news_results.sort(key=lambda x: x.get('recency_score', 0), reverse=True)

    except Exception as query_error:
        logger.error(f"Error processing query: {query_error}")

    # Convert to plain text list for compatibility with existing code
    news_texts = [item["text"] for item in news_results]

    # Log evidence retrieval
    try:
        success = bool(news_texts)
        performance_tracker.log_evidence_retrieval(success, source_count)
    except Exception as log_error:
        logger.error(f"Error logging evidence retrieval: {log_error}")

    # Log results
    if news_texts:
        logger.info(f"Retrieved {len(news_texts)} news articles")
    else:
        logger.warning("No news articles found")

    return news_texts
def retrieve_combined_evidence(claim):
    """
    Retrieve evidence from multiple sources in parallel and analyze relevance using semantic similarity,
    with category-aware source prioritization and optimized parallel processing.
    """
    logger.info(f"Starting evidence retrieval for: {claim}")
    start_time = time.time()

    # Use the category detector to prioritize sources
    from modules.category_detection import get_prioritized_sources, get_category_specific_rss_feeds

    # Get source priorities based on claim category
    priorities = get_prioritized_sources(claim)
    claim_category = priorities.get("category", "general")
    requires_recent_evidence = priorities.get("requires_recent", False)

    logger.info(f"Detected claim category: {claim_category} (recent: {requires_recent_evidence})")
    # Initialize results dictionary
    results = {
        "wikipedia": [],
        "wikidata": [],
        "claimreview": [],
        "news": [],
        "scholarly": [],
        "rss": []
    }

    # Track source counts, relevant evidence, and which sources have completed
    source_counts = {}
    relevant_evidence = {}
    total_evidence_count = 0
    relevant_evidence_count = 0
    # Initialize here so the sequential fallback below can safely reference it
    # even if parallel retrieval fails before any source completes
    completed_sources = set()

    # Define primary and secondary sources outside the try block
    # so they're available in the except block
    primary_sources = []
for source_name in priorities.get("primary", []): | |
if source_name == "wikipedia": | |
primary_sources.append(("wikipedia", retrieve_evidence_from_wikipedia, claim)) | |
elif source_name == "wikidata": | |
primary_sources.append(("wikidata", retrieve_evidence_from_wikidata, claim)) | |
elif source_name == "claimreview": | |
primary_sources.append(("claimreview", retrieve_evidence_from_claimreview, claim)) | |
elif source_name == "news": | |
primary_sources.append(("news", retrieve_news_articles, claim)) | |
elif source_name == "scholarly": | |
primary_sources.append(("scholarly", retrieve_evidence_from_openalex, claim)) | |
elif source_name == "rss": | |
# Get category-specific RSS max count | |
max_results = 8 if requires_recent_evidence else 5 | |
# If the claim is science or technology related and we need to optimize | |
# use category-specific RSS feeds | |
if claim_category in ["science", "technology", "politics"]: | |
# Get specialized RSS module to temporarily use category-specific feeds | |
category_feeds = get_category_specific_rss_feeds(claim_category) | |
if category_feeds: | |
primary_sources.append(("rss", retrieve_evidence_from_rss, claim, max_results, category_feeds)) | |
else: | |
primary_sources.append(("rss", retrieve_evidence_from_rss, claim, max_results)) | |
else: | |
primary_sources.append(("rss", retrieve_evidence_from_rss, claim, max_results)) | |
# Prepare secondary sources | |
secondary_sources = [] | |
for source_name in priorities.get("secondary", []): | |
if source_name == "wikipedia": | |
secondary_sources.append(("wikipedia", retrieve_evidence_from_wikipedia, claim)) | |
elif source_name == "wikidata": | |
secondary_sources.append(("wikidata", retrieve_evidence_from_wikidata, claim)) | |
elif source_name == "claimreview": | |
secondary_sources.append(("claimreview", retrieve_evidence_from_claimreview, claim)) | |
elif source_name == "news": | |
secondary_sources.append(("news", retrieve_news_articles, claim)) | |
elif source_name == "scholarly": | |
secondary_sources.append(("scholarly", retrieve_evidence_from_openalex, claim)) | |
elif source_name == "rss": | |
max_results = 5 if requires_recent_evidence else 3 | |
# Use category-specific feeds if available | |
if claim_category in ["science", "technology", "politics"]: | |
category_feeds = get_category_specific_rss_feeds(claim_category) | |
if category_feeds: | |
secondary_sources.append(("rss", retrieve_evidence_from_rss, claim, max_results, category_feeds)) | |
else: | |
secondary_sources.append(("rss", retrieve_evidence_from_rss, claim, max_results)) | |
else: | |
secondary_sources.append(("rss", retrieve_evidence_from_rss, claim, max_results)) | |
    # Optimize parallel processing for evidence retrieval with early results processing
    try:
        # Define function to safely retrieve evidence
        def safe_retrieve(source_name, retrieval_func, *args):
            try:
                source_result = retrieval_func(*args) or []
                return source_name, source_result
            except Exception as e:
                logger.error(f"Error retrieving from {source_name}: {str(e)}")
                return source_name, []

        # Define function to analyze evidence relevance
        def analyze_evidence_quick(evidence_items, claim_text):
            if not evidence_items or not claim_text:
                return []

            # Extract important keywords from claim
            keywords = [word.lower() for word in claim_text.split() if len(word) > 3]

            # Check for direct relevance
            relevant_items = []
            for evidence in evidence_items:
                if not isinstance(evidence, str):
                    continue

                evidence_lower = evidence.lower()

                # Check if evidence contains any important keywords from claim
                if any(keyword in evidence_lower for keyword in keywords):
                    relevant_items.append(evidence)
                    continue

                # Check for claim subject in evidence (e.g. "earth" in "earth is flat")
                claim_parts = claim_text.split()
                if len(claim_parts) > 0 and claim_parts[0].lower() in evidence_lower:
                    relevant_items.append(evidence)
                    continue

            return relevant_items

        # Use ThreadPoolExecutor with a reasonable number of workers.
        # Start with primary sources first - use all available sources in parallel
        # (max(1, ...) guards against an empty primary source list)
        with ThreadPoolExecutor(max_workers=max(1, min(4, len(primary_sources)))) as executor:
            # Submit all primary source tasks
            futures_to_source = {
                executor.submit(safe_retrieve, source_name, func, *args): source_name
                for source_name, func, *args in primary_sources
            }

            # Process results as they complete using as_completed for early processing
            for future in as_completed(futures_to_source):
                try:
                    source_name, source_results = future.result()
                    results[source_name] = source_results
                    source_counts[source_name] = len(source_results)
                    completed_sources.add(source_name)

                    logger.info(f"Retrieved {len(source_results)} results from {source_name}")

                    # Quick relevance analysis
                    if source_results:
                        relevant_items = analyze_evidence_quick(source_results, claim)
                        relevant_evidence[source_name] = relevant_items
                        total_evidence_count += len(source_results)
                        relevant_evidence_count += len(relevant_items)

                        logger.info(f"Found {len(relevant_items)} relevant items out of {len(source_results)} from {source_name}")

                        # Start background pre-analysis while waiting for other sources
                        try:
                            executor.submit(
                                analyze_early_evidence,
                                claim,
                                source_name,
                                source_results
                            )
                        except Exception as e:
                            logger.error(f"Error in early evidence analysis: {e}")
                except Exception as e:
                    logger.error(f"Error processing future result: {str(e)}")
        # Check if we have sufficient RELEVANT evidence from primary sources.
        # If not enough relevant evidence, query secondary sources
        # in parallel even if we have a lot of total evidence
        if relevant_evidence_count < 2:
            logger.info(f"Only found {relevant_evidence_count} relevant evidence items, querying secondary sources")

            # Add Wikipedia and Wikidata if they weren't in primary sources and haven't been queried yet
            must_check_sources = []
            if "wikipedia" not in completed_sources:
                must_check_sources.append(("wikipedia", retrieve_evidence_from_wikipedia, claim))
            if "wikidata" not in completed_sources:
                must_check_sources.append(("wikidata", retrieve_evidence_from_wikidata, claim))

            # Combine with other secondary sources
            remaining_sources = must_check_sources + [
                (source_name, func, *args) for source_name, func, *args in secondary_sources
                if source_name not in completed_sources
            ]

            # Only spin up an executor if there is anything left to query
            if remaining_sources:
                with ThreadPoolExecutor(max_workers=min(3, len(remaining_sources))) as executor:
                    # Submit all secondary source tasks
                    futures_to_source = {
                        executor.submit(safe_retrieve, source_name, func, *args): source_name
                        for source_name, func, *args in remaining_sources
                    }

                    # Process results as they complete
                    for future in as_completed(futures_to_source):
                        try:
                            source_name, source_results = future.result()
                            results[source_name] = source_results
                            source_counts[source_name] = len(source_results)

                            logger.info(f"Retrieved {len(source_results)} results from {source_name}")

                            # Quick relevance analysis for these as well
                            if source_results:
                                relevant_items = analyze_evidence_quick(source_results, claim)
                                relevant_evidence[source_name] = relevant_items
                                total_evidence_count += len(source_results)
                                relevant_evidence_count += len(relevant_items)

                                logger.info(f"Found {len(relevant_items)} relevant items out of {len(source_results)} from {source_name}")
                        except Exception as e:
                            logger.error(f"Error processing future result: {str(e)}")
    except Exception as e:
        logger.error(f"Error in parallel evidence retrieval: {str(e)}")

        # Fall back to sequential retrieval as a last resort
        try:
            logger.warning("Falling back to sequential retrieval due to parallel execution failure")

            # Sequential retrieval as fallback method - primary_sources is in scope
            for source_name, func, *args in primary_sources:
                try:
                    results[source_name] = func(*args) or []
                    source_counts[source_name] = len(results[source_name])
                except Exception as source_error:
                    logger.error(f"Error in sequential {source_name} retrieval: {str(source_error)}")

            # For sequential retrieval, always check Wikipedia and Wikidata as fallbacks
            if "wikipedia" not in completed_sources:
                try:
                    results["wikipedia"] = retrieve_evidence_from_wikipedia(claim) or []
                    source_counts["wikipedia"] = len(results["wikipedia"])
                except Exception as e:
                    logger.error(f"Error in fallback Wikipedia retrieval: {e}")

            if "wikidata" not in completed_sources:
                try:
                    results["wikidata"] = retrieve_evidence_from_wikidata(claim) or []
                    source_counts["wikidata"] = len(results["wikidata"])
                except Exception as e:
                    logger.error(f"Error in fallback Wikidata retrieval: {e}")
        except Exception as fallback_error:
            logger.error(f"Error in fallback sequential retrieval: {str(fallback_error)}")
    # Gather all evidence
    all_evidence = []
    for source, items in results.items():
        if isinstance(items, list):
            for item in items:
                if item and isinstance(item, str):
                    all_evidence.append(item)

    # Skip processing if no evidence
    if not all_evidence:
        logger.warning("No evidence collected")

        # Fallback: try direct search for the claim subject
        try:
            logger.info("No evidence found, trying fallback subject search")

            # Extract the main subject using NLP
            nlp = get_nlp_model()
            doc = nlp(claim)

            # Find main subject entities or nouns
            subjects = []
            for ent in doc.ents:
                if ent.label_ in ["PERSON", "ORG", "GPE"]:
                    subjects.append(ent.text)

            # If no entities found, use first noun phrase
            if not subjects:
                for chunk in doc.noun_chunks:
                    subjects.append(chunk.text)
                    break

            if subjects:
                # Try a direct search with just the subject
                logger.info(f"Trying fallback search with subject: {subjects[0]}")

                # Make sure we try Wikipedia for the subject regardless of priorities
                try:
                    wiki_evidence = retrieve_evidence_from_wikipedia(subjects[0]) or []
                    all_evidence.extend(wiki_evidence)
                    logger.info(f"Retrieved {len(wiki_evidence)} results from fallback Wikipedia search")
                except Exception as e:
                    logger.error(f"Error in fallback Wikipedia search: {e}")

                # If still no evidence, try other sources
                if not all_evidence:
                    # Do fallback searches in parallel
                    with ThreadPoolExecutor(max_workers=2) as executor:
                        fallback_futures = {
                            "news": executor.submit(retrieve_news_articles, subjects[0]),
                            "wikidata": executor.submit(retrieve_evidence_from_wikidata, subjects[0])
                        }

                        # Process results as they complete
                        for source, future in fallback_futures.items():
                            try:
                                fallback_results = future.result() or []
                                if fallback_results:
                                    all_evidence.extend(fallback_results[:2])  # Add up to 2 results from each
                                    logger.info(f"Retrieved {len(fallback_results)} results from fallback {source} search")
                            except Exception as e:
                                logger.error(f"Error in fallback {source} search: {str(e)}")
        except Exception as subj_error:
            logger.error(f"Error in fallback subject search: {str(subj_error)}")

        # If still no evidence, return empty list
        if not all_evidence:
            return []
    # Use semantic analysis to score and select the most relevant evidence
    try:
        # For science and technology claims, boost the weight of scholarly sources
        if claim_category in ["science", "technology"]:
            from config import SOURCE_CREDIBILITY

            # Create a temporary copy with boosted reliability for relevant sources
            enhanced_credibility = dict(SOURCE_CREDIBILITY)

            # Add enhanced weights for scientific sources
            from modules.category_detection import SOURCE_RELIABILITY_BY_CATEGORY
            for domain, reliability in SOURCE_RELIABILITY_BY_CATEGORY.get(claim_category, {}).items():
                enhanced_credibility[domain] = reliability

            # Use the enhanced credibility for evidence analysis
            analyzed_evidence = analyze_evidence_relevance(claim, all_evidence, enhanced_credibility)
        else:
            # Analyze evidence relevance using semantic similarity with default weights
            from config import SOURCE_CREDIBILITY
            analyzed_evidence = analyze_evidence_relevance(claim, all_evidence, SOURCE_CREDIBILITY)

        # Log evidence scoring
        logger.info(f"Analyzed {len(analyzed_evidence)} evidence items")

        # Select diverse, relevant evidence items
        final_evidence = select_diverse_evidence(analyzed_evidence, max_items=5)

        # Log source distribution and selected count
        logger.info(f"Evidence source distribution: {source_counts}")
        logger.info(f"Selected evidence count: {len(final_evidence)}")

        # Return maximum 5 evidence items (to control API costs)
        return final_evidence[:5]
    except Exception as e:
        logger.error(f"Error in evidence analysis: {str(e)}")
        # Fallback to simple selection (top 5 items)
        return all_evidence[:5]
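
# Minimal usage sketch, assuming the surrounding package layout (config, utils, modules)
# is importable and the required API keys are configured; the sample claim below is
# illustrative only and not part of the original module.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    sample_claim = "The Eiffel Tower is located in Paris"
    evidence_items = retrieve_combined_evidence(sample_claim)
    for idx, item in enumerate(evidence_items, start=1):
        print(f"[{idx}] {item}")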