media-trust-analyser / media_trust.py
ModelForge's picture
article count changed
a7a3f24 verified
raw
history blame
6.7 kB
import requests
import pandas as pd
import gradio as gr
import datetime
import nltk
from datetime import datetime, timedelta
from nltk.sentiment.vader import SentimentIntensityAnalyzer
# Look for the VADER lexicon in nltk's local data first; download it only
# when that lookup fails, so repeated start-ups do not re-fetch it.
try:
    nltk.data.find('sentiment/vader_lexicon')
except LookupError:
    nltk.download('vader_lexicon')
from transformers import pipeline
# Module-wide summarisation pipeline, created once at import time and shared
# by every call to summarise_text().
summarizer = pipeline(
    task="summarization",
    model="sshleifer/distilbart-cnn-12-6",
)
from concurrent.futures import ThreadPoolExecutor
from dotenv import load_dotenv
import os
from concurrent.futures import ThreadPoolExecutor
# Read the .env file (if any), then refuse to start without the key that
# query() sends with every request.
load_dotenv()
api_key = os.getenv("API_KEY")
if api_key is None or api_key == "":
    raise ValueError("API_KEY not found. Make sure to set it in the .env file.")
# Editorial leaning assigned to each known outlet. Keys are lower-cased outlet
# names; the lookups in this module strip and lower-case a source name before
# consulting the table and fall back to "unknown" for anything not listed.
SOURCE_BIAS_MAP = {
    "fox news": "right",
    "breitbart": "right",
    "new york post": "right",
    "the wall street journal": "center-right",
    "reuters": "center",
    "associated press": "center",
    "bloomberg": "center",
    "npr": "center-left",
    "cnn": "left",
    "msnbc": "left",
    "the new york times": "left",
    "the washington post": "left",
    "the guardian": "left",
    "bbc news": "center",
    "sky news": "center-right",
    "the telegraph": "right",
    "the times": "center-right",
    "daily mail": "right",
    "the independent": "center-left",
    "the sun": "right",
    "financial times": "center",
}
# Numeric position of each bias label on a left (-1) to right (+1) axis.
# "unknown" is scored 0, the same as "center".
BIAS_SCORE_MAP = {
    "left": -1,
    "center-left": -0.5,
    "center": 0,
    "center-right": 0.5,
    "right": 1,
    "unknown": 0,
}
def query(topic, sort_by="popularity", max_tokens=100):
    """Fetch up to 20 NewsAPI articles about *topic* from the last seven days.

    Args:
        topic: Search phrase sent as the ``q`` parameter. ``None``, empty and
            whitespace-only values are rejected without contacting the API.
        sort_by: Value forwarded as NewsAPI's ``sortBy`` parameter.
        max_tokens: Unused by this function; kept so existing callers that
            pass it keep working.

    Returns:
        A DataFrame with one row per returned article (columns ``title``,
        ``description``, ``source_name``, ``url``, ``publishedAt``), or
        ``None`` when the topic is missing, the request fails, the API
        answers with a non-200 status, or it reports zero results.
    """
    if not topic or not str(topic).strip():
        print("Topic must be provided.")
        return None

    today = datetime.today()
    week_ago = today - timedelta(days=7)

    # Let requests build the query string so characters such as '&', '#' or
    # '+' inside the topic are escaped instead of corrupting the URL.
    params = {
        "q": topic,
        "from": week_ago.strftime("%Y-%m-%d"),
        "to": today.strftime("%Y-%m-%d"),
        "sortBy": sort_by,
        "pageSize": 20,
    }
    # The key travels in a header rather than the URL: request exceptions
    # embed the URL in their message, and that message is printed below.
    headers = {"X-Api-Key": api_key}

    try:
        response = requests.get(
            "https://newsapi.org/v2/everything",
            params=params,
            headers=headers,
            timeout=10,
        )
        if response.status_code != 200:
            print(f"API returned error: {response.status_code}")
            return None
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # RequestException covers network/timeout failures; ValueError covers
        # a body that is not valid JSON.
        print(f"An error occurred: {e}")
        return None

    if not isinstance(data, dict) or data.get("totalResults", 0) == 0:
        print("No articles found for the given query and date range.")
        return None

    extracted = []
    for article in data.get("articles") or []:
        # "source" may be present but null; treat that like a missing source
        # instead of failing the whole batch.
        source = article.get("source") or {}
        extracted.append(
            {
                "title": article.get("title", "N/A"),
                "description": article.get("description", "N/A"),
                "source_name": source.get("name", "N/A"),
                "url": article.get("url", "N/A"),
                "publishedAt": article.get("publishedAt", "N/A"),
            }
        )
    return pd.DataFrame(extracted)
def process_data(df):
    """Clean raw article rows and add a combined, lower-cased ``text`` column.

    Rows with a missing or blank title/description are dropped, duplicates
    are removed, and ``text`` is built as ``"<title> <description>"`` in
    lower case.

    Args:
        df: DataFrame that must contain ``title`` and ``description``
            columns; a ``url`` column, when present, takes part in
            de-duplication.

    Returns:
        A new cleaned DataFrame (the input is not modified), or an empty
        DataFrame when *df* is ``None``, empty, or lacks a required column.
    """
    required = ["title", "description"]
    if df is None or df.empty or not all(col in df.columns for col in required):
        print("Invalid or empty DataFrame passed to process_data()")
        return pd.DataFrame()

    cleaned = df.dropna(subset=required)
    has_title = cleaned["title"].str.strip() != ""
    has_description = cleaned["description"].str.strip() != ""
    cleaned = cleaned[has_title & has_description]

    # Only de-duplicate on columns that exist: "url" is not one of the
    # required columns, so it may legitimately be absent.
    dedupe_keys = [col for col in ("title", "url") if col in cleaned.columns]
    # .copy() so the column added below lands on an independent frame rather
    # than on a filtered view of the caller's data.
    cleaned = cleaned.drop_duplicates(subset=dedupe_keys).copy()

    # Join with a space (otherwise the last title word fuses with the first
    # description word) and lower-case the whole string, not just the
    # description.
    cleaned["text"] = (cleaned["title"] + " " + cleaned["description"]).str.lower()
    return cleaned
def analyse_sentiment(df, analyser=None):
    """Score each row's ``text`` with VADER and label the overall sentiment.

    Adds the columns ``compound``, ``neg``, ``neu``, ``pos`` and
    ``sentiment_label`` to *df* in place.

    Args:
        df: DataFrame with a ``text`` column.
        analyser: Optional object exposing ``polarity_scores(text)`` that
            returns a mapping with ``compound``/``neg``/``neu``/``pos`` keys.
            A new ``SentimentIntensityAnalyzer`` is created when omitted.
            Accepting it here keeps the two-argument call in ``main()``
            valid.

    Returns:
        The same DataFrame object, with the sentiment columns added.
    """
    if analyser is None:
        analyser = SentimentIntensityAnalyzer()

    # Scoring is pure-Python CPU work, so it runs sequentially: a thread pool
    # would add overhead without running the scores in parallel.
    scores = [analyser.polarity_scores(text) for text in df['text']]

    # Assign column by column so an empty frame still receives all four
    # (empty) score columns instead of failing on a shape mismatch.
    for key in ('compound', 'neg', 'neu', 'pos'):
        df[key] = pd.Series([score[key] for score in scores], index=df.index, dtype=float)

    def label_sentiment(score):
        # Thresholds at +/-0.05 on the compound score.
        if score >= 0.05:
            return "positive"
        if score <= -0.05:
            return "negative"
        return "neutral"

    df['sentiment_label'] = df['compound'].apply(label_sentiment)
    return df
def get_bias_label(source_name):
    """Return the bias label for an outlet name, or ``"unknown"``.

    Args:
        source_name: Outlet name; surrounding whitespace and letter case are
            ignored. Non-string values (``None``, NaN, numbers) are treated
            as an unknown outlet rather than raising.

    Returns:
        The label stored in ``SOURCE_BIAS_MAP`` for the normalised name, or
        ``"unknown"`` when the outlet is not listed.
    """
    if not isinstance(source_name, str):
        return "unknown"
    return SOURCE_BIAS_MAP.get(source_name.strip().lower(), "unknown")
def add_bias_annotation(df):
    """Add a ``bias_label`` column derived from each row's ``source_name``.

    Source names are stripped and lower-cased before being looked up in
    ``SOURCE_BIAS_MAP``; names with no entry are labelled ``"unknown"``.
    The column is added to *df* in place and the same object is returned.
    """
    lookup = pd.Series(SOURCE_BIAS_MAP)
    normalised_names = df['source_name'].str.strip().str.lower()
    labels = normalised_names.map(lookup)
    df['bias_label'] = labels.fillna("unknown")
    return df
def set_article_extremity(df, top_n=5):
    """Rank articles by extremity and flag the *top_n* most extreme.

    The extremity score is ``|compound| + |bias_score|``; ``extremity_pct``
    expresses it as a percentage of 2, rounded to one decimal place.

    Args:
        df: DataFrame with ``bias_label`` and ``compound`` columns. The
            ``bias_score``, ``extremity_score`` and ``extremity_pct`` columns
            are added to it in place.
        top_n: Number of leading rows to flag. Values of zero or less flag
            nothing; values above the row count flag every row.

    Returns:
        A new DataFrame sorted by ``extremity_score`` (highest first) with an
        added boolean ``extreme`` column.
    """
    # Labels missing from BIAS_SCORE_MAP score 0.
    df['bias_score'] = df['bias_label'].apply(lambda label: BIAS_SCORE_MAP.get(label, 0))
    df['extremity_score'] = df['compound'].abs() + df['bias_score'].abs()
    df['extremity_pct'] = ((df['extremity_score'] / 2) * 100).round(1)

    # A stable sort keeps tied rows in their incoming order, so which of them
    # end up inside the top_n cut-off is reproducible.
    ranked = df.sort_values(by='extremity_score', ascending=False, kind='stable')

    # Flag by position, not by index label: with repeated index labels a
    # label-based assignment would mark more than top_n rows.
    flagged = max(0, min(top_n, len(ranked)))
    ranked['extreme'] = [True] * flagged + [False] * (len(ranked) - flagged)
    return ranked
def summarise_text(row, max_tokens=512):
    """Summarise one article row with the module-level ``summarizer``.

    Args:
        row: Mapping or Series that may provide ``text`` and ``source_name``.
        max_tokens: Upper bound passed to the summariser as ``max_length``.

    Returns:
        A Series with the keys ``summary``, ``bias_score`` and ``source``.
        When summarisation fails or the text is too short, ``summary`` is
        ``'Summary unavailable'``; the source and bias values found before
        the failure are still reported.

    NOTE(review): ``bias_score`` carries the bias *label* string here, not a
    number. The key is kept as is because callers may read it by that name.
    """
    source_name = 'unknown'
    bias_label = 'unknown'
    try:
        if 'source_name' in row and pd.notna(row['source_name']):
            source_name = row['source_name']
            bias_label = get_bias_label(source_name)

        text = row['text'] if 'text' in row and pd.notna(row['text']) else ''
        word_count = len(text.split())

        # Aim for a summary about ten words shorter than the input, capped
        # at max_tokens.
        max_length = min(word_count - 10, max_tokens)
        if max_length < 1:
            raise ValueError("text is too short to summarise")
        # Never request a minimum above the maximum (this happened for any
        # input where max_length came out below 10).
        min_length = min(max_length, max(10, max_length - 10))

        summary = summarizer(text, max_length=max_length, min_length=min_length, do_sample=False)
        summary_text = summary[0]['summary_text']
    except Exception as e:
        # Deliberate per-row best effort: one bad article must not abort the
        # whole batch.
        print(f"Error summarising row: {e}")
        return pd.Series({'summary': 'Summary unavailable', 'bias_score': bias_label, 'source': source_name})
    return pd.Series({'summary': summary_text, 'bias_score': bias_label, 'source': source_name})
def add_article_summaries(df, max_tokens=512):
    """Summarise every row and attach the results to *df*.

    Args:
        df: DataFrame whose rows are passed to ``summarise_text``.
        max_tokens: Forwarded to ``summarise_text`` for each row.

    Returns:
        The same DataFrame object with ``summary``, ``bias_score`` and
        ``source`` columns set.

    NOTE(review): ``bias_score`` is set to the label string returned by
    ``summarise_text`` and overwrites any existing column of that name.
    """
    rows = df.to_dict('records')
    with ThreadPoolExecutor(max_workers=4) as executor:
        # executor.map yields results in input order, so summaries[i]
        # belongs to the i-th row of df.
        summaries = list(executor.map(lambda row: summarise_text(row, max_tokens), rows))

    # Assign plain lists so values are matched to rows by position. Assigning
    # a DataFrame with a fresh 0..n-1 index would align on index labels and
    # attach summaries to the wrong rows once df has been filtered or sorted.
    # It also raises when there are no rows at all.
    for column in ('summary', 'bias_score', 'source'):
        df[column] = [summary[column] for summary in summaries]
    return df
def main():
    """Run the whole pipeline for the topic "Tesla" and print the first rows.

    Steps: fetch articles, clean them, score sentiment, annotate bias, rank
    by extremity, then summarise. Prints ``No data found!`` and returns early
    when no articles are fetched or none survive cleaning.
    """
    raw_df = query("Tesla")
    if raw_df is None or raw_df.empty:
        print("No data found!")
        return

    processed_df = process_data(raw_df)
    # Cleaning can drop every row; the later stages need a "text" column.
    if processed_df.empty:
        print("No data found!")
        return

    # analyse_sentiment builds its own analyser, so none is passed in.
    sentiment_df = analyse_sentiment(processed_df)
    bias_df = add_bias_annotation(sentiment_df)
    extremity_df = set_article_extremity(bias_df)
    final_df = add_article_summaries(extremity_df)
    print(final_df.head())
# Run the demo pipeline only when this file is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    main()