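"""News bias and sentiment explorer.

Fetches recent articles for a topic from NewsAPI, cleans them, scores sentiment
with VADER, annotates each outlet's editorial lean, ranks articles by how
"extreme" they are (sentiment strength plus bias strength), and summarises each
one with a DistilBART model.
"""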
import os

import gradio as gr
import nltk
import pandas as pd
import requests
from datetime import datetime, timedelta
from dotenv import load_dotenv
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from transformers import pipeline

# One-off setup: the VADER lexicon for sentiment scoring and a DistilBART summariser.
nltk.download('vader_lexicon')
summarizer = pipeline("summarization", model="sshleifer/distilbart-cnn-12-6")

# The NewsAPI key is read from a .env file rather than hard-coded.
load_dotenv()
api_key = os.getenv("API_KEY")
if not api_key:
    raise ValueError("API_KEY not found. Make sure to set it in the .env file.")
SOURCE_BIAS_MAP = {
    "fox news": "right",
    "breitbart": "right",
    "new york post": "right",
    "the wall street journal": "center-right",
    "reuters": "center",
    "associated press": "center",
    "bloomberg": "center",
    "npr": "center-left",
    "cnn": "left",
    "msnbc": "left",
    "the new york times": "left",
    "the washington post": "left",
    "the guardian": "left",
    "bbc news": "center",
    "sky news": "center-right",
    "the telegraph": "right",
    "the times": "center-right",
    "daily mail": "right",
    "the independent": "center-left",
    "the sun": "right",
    "financial times": "center",
}
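# Numeric lean scores (-1 = left, +1 = right) used when computing extremity.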
BIAS_SCORE_MAP = {
    "left": -1,
    "center-left": -0.5,
    "center": 0,
    "center-right": 0.5,
    "right": 1,
    "unknown": 0,
}
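# Fetch articles for a topic from NewsAPI's /v2/everything endpoint,
# restricted to the last seven days.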
def query(query, sort_by="popularity"):
    if not query:
        print("Topic needs to be passed in")
        return pd.DataFrame()

    today = datetime.today()
    seven_days_ago = today - timedelta(days=7)
    from_date = seven_days_ago.strftime('%Y-%m-%d')
    to_date = today.strftime('%Y-%m-%d')

    base_url = "https://newsapi.org/v2/everything"
    url = f"{base_url}?q={query}&from={from_date}&to={to_date}&sortBy={sort_by}&apiKey={api_key}"

    news = None
    try:
        news_response = requests.get(url, timeout=10)
        if news_response.status_code == 200:
            news = news_response.json()
        else:
            print("API error has occurred:", news_response.status_code)
    except requests.RequestException as e:
        print(f"Request failed: {e}")

    # Bail out with an empty frame if the request failed, instead of indexing None.
    if not news:
        return pd.DataFrame()

    extracted_data = []
    for article in news.get("articles", []):
        extracted_data.append({
            "title": article.get("title", "N/A"),
            "description": article.get("description", "N/A"),
            "source_name": article.get("source", {}).get("name", "N/A"),
            "url": article.get("url", "N/A"),
            "publishedAt": article.get("publishedAt", "N/A"),
        })
    return pd.DataFrame(extracted_data)
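# Drop articles with missing or blank titles/descriptions, de-duplicate, and
# build a combined text field for sentiment analysis and summarisation.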
def process_data(df):
    df_cleaned = df.dropna(subset=["title", "description"])
    df_cleaned = df_cleaned[df_cleaned["title"].str.strip() != ""]
    df_cleaned = df_cleaned[df_cleaned["description"].str.strip() != ""]
    df_cleaned = df_cleaned.drop_duplicates(subset=["title", "url"]).copy()
    # Combine title and description (space-separated, lowercased) into one text field.
    df_cleaned["text"] = (df_cleaned["title"] + " " + df_cleaned["description"]).str.lower()
    return df_cleaned
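# Score each article's combined text with VADER; the compound score is mapped
# to a positive/negative/neutral label using the conventional +/-0.05 cutoffs.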
def analyse_sentiment(df):
    analyser = SentimentIntensityAnalyzer()
    # Score each text once and unpack the individual VADER components.
    scores = df['text'].apply(analyser.polarity_scores)
    df['compound'] = scores.apply(lambda s: s['compound'])
    df['neg'] = scores.apply(lambda s: s['neg'])
    df['neu'] = scores.apply(lambda s: s['neu'])
    df['pos'] = scores.apply(lambda s: s['pos'])

    def label_sentiment(score):
        if score >= 0.05:
            return "positive"
        elif score <= -0.05:
            return "negative"
        else:
            return "neutral"

    df['sentiment_label'] = df['compound'].apply(label_sentiment)
    return df
def get_bias_label(source_name):
    source = source_name.strip().lower()
    return SOURCE_BIAS_MAP.get(source, "unknown")

def add_bias_annotation(df):
    df['bias_label'] = df['source_name'].apply(get_bias_label)
    return df
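# Rank articles by "extremity": absolute VADER compound score plus absolute
# bias score, rescaled to a 0-100 percentage (the maximum raw score is 2).
# The top_n highest-scoring articles are flagged as extreme.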
def set_article_extremity(df, top_n=5):
    def get_bias_extremity(label):
        return BIAS_SCORE_MAP.get(label, 0)

    df['bias_score'] = df['bias_label'].apply(get_bias_extremity)
    df['extremity_score'] = df['compound'].abs() + df['bias_score'].abs()
    df['extremity_pct'] = ((df['extremity_score'] / 2) * 100).round(1)
    df = df.sort_values(by='extremity_score', ascending=False)
    df['extreme'] = False
    df.loc[df.index[:top_n], 'extreme'] = True
    return df
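# Summarise one article with DistilBART. Generation length is derived from the
# input length so short texts never request a summary longer than the source;
# any failure falls back to a placeholder row.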
def summarise_text(row, max_tokens=512):
    try:
        text = row['text'] if 'text' in row and pd.notna(row['text']) else ''
        source_name = row['source_name'] if 'source_name' in row and pd.notna(row['source_name']) else 'unknown'

        # Keep the summary shorter than the input text.
        input_length = len(text.split())
        if input_length < 40:
            max_length = max(10, int(input_length / 2))
        else:
            max_length = min(input_length - 10, max_tokens)
        min_length = max(10, max_length - 10)

        summary = summarizer(text, max_length=max_length, min_length=min_length, do_sample=False)
        summary_text = summary[0]['summary_text']
        bias_label = get_bias_label(source_name)

        # Return the textual lean under 'bias_label' so it does not overwrite
        # the numeric bias_score column set in set_article_extremity.
        return pd.Series({
            'summary': summary_text,
            'bias_label': bias_label,
            'source': source_name
        })
    except Exception as e:
        print(f"Error summarising row: {e}")
        return pd.Series({
            'summary': 'Summary unavailable',
            'bias_label': 'unknown',
            'source': 'unknown'
        })
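# Apply the summariser row-by-row and attach the resulting columns to the frame.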
def add_article_summaries(df, max_tokens=512):
    summary_df = df.apply(summarise_text, axis=1, max_tokens=max_tokens)
    df[['summary', 'bias_label', 'source']] = summary_df
    return df
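# End-to-end pipeline: fetch -> clean -> sentiment -> bias -> extremity -> summaries.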
def main():
    raw_df = query("Tesla")
    if raw_df.empty:
        print("No articles returned.")
        return None
    processed_df = process_data(raw_df)
    sentiment_df = analyse_sentiment(processed_df)
    bias_df = add_bias_annotation(sentiment_df)
    extremity_df = set_article_extremity(bias_df)
    final_df = add_article_summaries(extremity_df)
    return final_df

if __name__ == "__main__":
    main()