Spaces:
Running
Running
import os | |
from dotenv import load_dotenv | |
load_dotenv() | |
import requests | |
from bs4 import BeautifulSoup | |
from newsapi import NewsApiClient | |
import pandas as pd | |
import torch | |
import soundfile as sf | |
import gradio as gr | |
from transformers import ( | |
AutoModelForSequenceClassification, AutoTokenizer, pipeline, | |
BartTokenizer, BartForConditionalGeneration, | |
MarianMTModel, MarianTokenizer, | |
BarkModel, AutoProcessor | |
) | |
# ------------------------- | |
# Global Setup and Environment Variables | |
# ------------------------- | |
NEWS_API_KEY = os.getenv("NEWS_API_KEY") # Set this in your .env file | |
device = "cuda:0" if torch.cuda.is_available() else "cpu" | |
# ------------------------- | |
# News Extraction Functions | |
# ------------------------- | |
def fetch_and_scrape_news(company, api_key, count=11, output_file='news_articles.xlsx'): | |
newsapi = NewsApiClient(api_key=api_key) | |
all_articles = newsapi.get_everything(q=company, language='en', sort_by='relevancy', page_size=count) | |
articles = all_articles.get('articles', []) | |
scraped_data = [] | |
for article in articles: | |
url = article.get('url') | |
if url: | |
scraped_article = scrape_news(url) | |
if scraped_article: | |
scraped_article['url'] = url | |
scraped_data.append(scraped_article) | |
df = pd.DataFrame(scraped_data) | |
df.to_excel(output_file, index=False, header=True) | |
print(f"News scraping complete. Data saved to {output_file}") | |
return df | |
def scrape_news(url): | |
headers = {"User-Agent": "Mozilla/5.0"} | |
response = requests.get(url, headers=headers) | |
if response.status_code != 200: | |
print(f"Failed to fetch the page: {url}") | |
return None | |
soup = BeautifulSoup(response.text, "html.parser") | |
headline = soup.find("h1").get_text(strip=True) if soup.find("h1") else "No headline found" | |
paragraphs = soup.find_all("p") | |
article_text = " ".join(p.get_text(strip=True) for p in paragraphs) | |
return {"headline": headline, "content": article_text} | |
# ------------------------- | |
# Sentiment Analysis Setup | |
# ------------------------- | |
sentiment_model_name = "cross-encoder/nli-distilroberta-base" | |
sentiment_model = AutoModelForSequenceClassification.from_pretrained( | |
sentiment_model_name, | |
torch_dtype=torch.float16, | |
device_map="auto" | |
) | |
sentiment_tokenizer = AutoTokenizer.from_pretrained(sentiment_model_name) | |
classifier = pipeline("zero-shot-classification", model=sentiment_model, tokenizer=sentiment_tokenizer) | |
labels = ["positive", "negative", "neutral"] | |
# ------------------------- | |
# Summarization Setup | |
# ------------------------- | |
bart_tokenizer = BartTokenizer.from_pretrained('facebook/bart-large-cnn') | |
bart_model = BartForConditionalGeneration.from_pretrained('facebook/bart-large-cnn') | |
def split_into_chunks(text, tokenizer, max_tokens=1024): | |
words = text.split() | |
chunks = [] | |
current_chunk = [] | |
current_length = 0 | |
for word in words: | |
tokenized_word = tokenizer.encode(word, add_special_tokens=False) | |
if current_length + len(tokenized_word) <= max_tokens: | |
current_chunk.append(word) | |
current_length += len(tokenized_word) | |
else: | |
chunks.append(' '.join(current_chunk)) | |
current_chunk = [word] | |
current_length = len(tokenized_word) | |
if current_chunk: | |
chunks.append(' '.join(current_chunk)) | |
return chunks | |
# ------------------------- | |
# Translation Setup (English to Hindi) | |
# ------------------------- | |
translation_model_name = 'Helsinki-NLP/opus-mt-en-hi' | |
trans_tokenizer = MarianTokenizer.from_pretrained(translation_model_name) | |
trans_model = MarianMTModel.from_pretrained(translation_model_name) | |
def translate_text(text): | |
tokens = trans_tokenizer(text, return_tensors="pt", padding=True) | |
translated = trans_model.generate(**tokens) | |
return trans_tokenizer.decode(translated[0], skip_special_tokens=True) | |
# ------------------------- | |
# Bark TTS Setup (Hindi) | |
# ------------------------- | |
bark_model = BarkModel.from_pretrained("suno/bark-small").to(device) | |
processor = AutoProcessor.from_pretrained("suno/bark") | |
# ------------------------- | |
# Main Pipeline Function | |
# ------------------------- | |
def process_company(company): | |
# Step 1: Fetch and scrape news | |
fetch_and_scrape_news(company, NEWS_API_KEY) | |
df = pd.read_excel('news_articles.xlsx') | |
print("Scraped Articles:") | |
print(df) | |
articles_data = [] | |
for index, row in df.iterrows(): | |
article_text = row.get("content", "") | |
title = row.get("headline", "No title") | |
url = row.get("url", "") | |
chunks = split_into_chunks(article_text, bart_tokenizer) | |
chunk_summaries = [] | |
for chunk in chunks: | |
inputs = bart_tokenizer([chunk], max_length=1024, return_tensors='pt', truncation=True) | |
summary_ids = bart_model.generate(inputs.input_ids, num_beams=4, max_length=130, min_length=30, early_stopping=True) | |
chunk_summary = bart_tokenizer.decode(summary_ids[0], skip_special_tokens=True) | |
chunk_summaries.append(chunk_summary) | |
final_summary = ' '.join(chunk_summaries) | |
sentiment_result = classifier(final_summary, labels) | |
sentiment = sentiment_result["labels"][0] | |
articles_data.append({ | |
"Title": title, | |
"Summary": final_summary, | |
"Sentiment": sentiment, | |
"URL": url | |
}) | |
# Comparative Analysis: Build a simple sentiment distribution | |
sentiment_distribution = {"Positive": 0, "Negative": 0, "Neutral": 0} | |
for article in articles_data: | |
key = article["Sentiment"].capitalize() | |
sentiment_distribution[key] += 1 | |
# Step 2: Translate summaries and generate Hindi speech | |
translated_summaries = [translate_text(article["Summary"]) for article in articles_data] | |
final_translated_text = "\n\n".join(translated_summaries) | |
inputs = processor(final_translated_text, return_tensors="pt").to(device) | |
speech_output = bark_model.generate(**inputs) | |
audio_path = "final_summary.wav" | |
sf.write(audio_path, speech_output[0].cpu().numpy(), bark_model.generation_config.sample_rate) | |
# Build final report | |
report = { | |
"Company": company, | |
"Articles": articles_data, | |
"Comparative Sentiment Score": { | |
"Sentiment Distribution": sentiment_distribution, | |
"Coverage Differences": "Detailed comparative analysis not implemented", | |
"Topic Overlap": "Topic extraction not implemented" | |
}, | |
"Final Sentiment Analysis": "Overall sentiment analysis not fully computed", | |
"Audio": audio_path | |
} | |
return report, audio_path | |
# Gradio Interface Function | |
def gradio_interface(company): | |
report, audio_path = process_company(company) | |
return report, audio_path | |
# ------------------------- | |
# Gradio UI Setup | |
# ------------------------- | |
iface = gr.Interface( | |
fn=gradio_interface, | |
inputs=gr.Textbox(label="Enter Company Name"), | |
outputs=[ | |
gr.JSON(label="News Sentiment Report"), | |
gr.Audio(type="filepath", label="Hindi Summary Audio") | |
], | |
title="News Summarization & Text-to-Speech", | |
description="Enter a company name to fetch news articles, perform sentiment analysis, and listen to a Hindi TTS summary." | |
) | |
if __name__ == "__main__": | |
iface.launch() | |