import os
from dotenv import load_dotenv
load_dotenv()
import requests
from bs4 import BeautifulSoup
from newsapi import NewsApiClient
import pandas as pd
import torch
import soundfile as sf
import gradio as gr
from transformers import (
    AutoModelForSequenceClassification, AutoTokenizer, pipeline,
    BartTokenizer, BartForConditionalGeneration,
    MarianMTModel, MarianTokenizer,
    BarkModel, AutoProcessor
)
import librosa
import re

# -------------------------
# Global Setup and Environment Variables
# -------------------------
NEWS_API_KEY = os.getenv("NEWS_API_KEY")  # Set this in your .env file
device = "cpu"  # Force CPU since no GPU is available in Hugging Face Spaces

# -------------------------
# News Extraction Functions
# -------------------------
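# fetch_and_scrape_news queries NewsAPI for recent English articles about the given
# company, scrapes each result's page with scrape_news, and saves the collected
# headline/content/url rows to an Excel file for the downstream pipeline.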
def fetch_and_scrape_news(company, api_key, count=1, output_file='news_articles.xlsx'):
    print("Starting news fetch from NewsAPI...")
    newsapi = NewsApiClient(api_key=api_key)
    all_articles = newsapi.get_everything(q=company, language='en', sort_by='relevancy', page_size=count)
    articles = all_articles.get('articles', [])
    scraped_data = []
    print(f"Found {len(articles)} articles. Starting scraping individual articles...")
    for i, article in enumerate(articles):
        url = article.get('url')
        if url:
            print(f"Scraping article {i+1}: {url}")
            scraped_article = scrape_news(url)
            if scraped_article:
                scraped_article['url'] = url
                scraped_data.append(scraped_article)
    df = pd.DataFrame(scraped_data)
    df.to_excel(output_file, index=False, header=True)
    print(f"News scraping complete. Data saved to {output_file}")
    return df

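# scrape_news fetches a single article page, extracts the first <h1> as the headline
# and the concatenated <p> text as the body; returns None if the request fails.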
def scrape_news(url):
    headers = {"User-Agent": "Mozilla/5.0"}
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
    except Exception as e:
        print(f"Failed to fetch the page: {url} ({e})")
        return None
    soup = BeautifulSoup(response.text, "html.parser")
    headline = soup.find("h1").get_text(strip=True) if soup.find("h1") else "No headline found"
    paragraphs = soup.find_all("p")
    article_text = " ".join(p.get_text(strip=True) for p in paragraphs)
    return {"headline": headline, "content": article_text}

# -------------------------
# Sentiment Analysis Setup
# -------------------------
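# Sentiment is obtained via zero-shot classification: an NLI cross-encoder scores each
# article summary against the candidate labels "positive", "negative", and "neutral",
# and the top-ranked label is taken as the article's sentiment.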
print("Loading sentiment analysis model...") | |
sentiment_model_name = "cross-encoder/nli-distilroberta-base" | |
sentiment_model = AutoModelForSequenceClassification.from_pretrained( | |
sentiment_model_name, | |
torch_dtype=torch.float32 | |
) | |
sentiment_tokenizer = AutoTokenizer.from_pretrained(sentiment_model_name) | |
classifier = pipeline("zero-shot-classification", model=sentiment_model, tokenizer=sentiment_tokenizer, device=-1) | |
labels = ["positive", "negative", "neutral"] | |
# -------------------------
# Summarization Setup
# -------------------------
print("Loading summarization model (BART)...")
bart_tokenizer = BartTokenizer.from_pretrained('facebook/bart-large-cnn')
bart_model = BartForConditionalGeneration.from_pretrained('facebook/bart-large-cnn')

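# split_into_chunks breaks an article into word chunks whose tokenized length stays
# within BART's 1024-token input limit, so each chunk can be summarized separately
# and the partial summaries joined afterwards.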
def split_into_chunks(text, tokenizer, max_tokens=1024):
    words = text.split()
    chunks = []
    current_chunk = []
    current_length = 0
    for word in words:
        tokenized_word = tokenizer.encode(word, add_special_tokens=False)
        if current_length + len(tokenized_word) <= max_tokens:
            current_chunk.append(word)
            current_length += len(tokenized_word)
        else:
            chunks.append(' '.join(current_chunk))
            current_chunk = [word]
            current_length = len(tokenized_word)
    if current_chunk:
        chunks.append(' '.join(current_chunk))
    return chunks

# -------------------------
# Translation Setup (English to Hindi)
# -------------------------
print("Loading translation model (MarianMT)...")
translation_model_name = 'Helsinki-NLP/opus-mt-en-hi'
trans_tokenizer = MarianTokenizer.from_pretrained(translation_model_name)
trans_model = MarianMTModel.from_pretrained(translation_model_name)

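# translate_text runs a single English string through the MarianMT en-hi checkpoint
# and returns the decoded Hindi translation.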
def translate_text(text):
    tokens = trans_tokenizer(text, return_tensors="pt", padding=True)
    translated = trans_model.generate(**tokens)
    return trans_tokenizer.decode(translated[0], skip_special_tokens=True)

# -------------------------
# Bark TTS Setup (Hindi)
# -------------------------
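# Bark handles the Hindi speech synthesis. The smaller "suno/bark-small" checkpoint is
# loaded, presumably to keep CPU-only generation in a Space tractable.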
print("Loading Bark TTS model...") | |
bark_model = BarkModel.from_pretrained("suno/bark-small") | |
bark_model.to(device) | |
processor = AutoProcessor.from_pretrained("suno/bark-small")  # processor matched to the bark-small checkpoint

# -------------------------
# Helper Functions for Audio and Text Preprocessing
# -------------------------
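# normalize_text lowercases a string and strips punctuation; resample_audio converts an
# audio array to a target sample rate with librosa. Neither helper is called by
# process_company below; they are available for optional preprocessing.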
def normalize_text(text):
    return re.sub(r"[^\w\s]", "", text.lower()).strip()

def resample_audio(audio_array, orig_sr, target_sr=16000):
    if orig_sr != target_sr:
        audio_array = librosa.resample(audio_array, orig_sr=orig_sr, target_sr=target_sr)
    return audio_array

# -------------------------
# Main Pipeline Function
# -------------------------
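# process_company runs the full pipeline for one company name:
#   1. fetch and scrape news articles, persisting them to news_articles.xlsx
#   2. summarize each article with BART (chunked to respect the 1024-token limit)
#   3. classify each summary's sentiment via zero-shot classification
#   4. translate the summaries to Hindi and synthesize speech with Bark
#   5. assemble a JSON-style report plus the path to the generated audio file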
def process_company(company):
    print(f"Processing company: {company}")
    # Step 1: Fetch and scrape news
    print("Fetching and scraping news...")
    fetch_and_scrape_news(company, NEWS_API_KEY)
    df = pd.read_excel('news_articles.xlsx')
    print("Scraped Articles:")
    print(df)
    articles_data = []
    for index, row in df.iterrows():
        print(f"Processing article {index+1}...")
        article_text = row.get("content", "")
        title = row.get("headline", "No title")
        url = row.get("url", "")
        chunks = split_into_chunks(article_text, bart_tokenizer)
        chunk_summaries = []
        for i, chunk in enumerate(chunks):
            print(f"Summarizing chunk {i+1}/{len(chunks)}...")
            inputs = bart_tokenizer([chunk], max_length=1024, return_tensors='pt', truncation=True)
            summary_ids = bart_model.generate(inputs.input_ids, num_beams=2, max_length=50, min_length=30, early_stopping=True)
            chunk_summary = bart_tokenizer.decode(summary_ids[0], skip_special_tokens=True)
            chunk_summaries.append(chunk_summary)
        final_summary = ' '.join(chunk_summaries)
        print("Performing sentiment analysis...")
        sentiment_result = classifier(final_summary, labels)
        sentiment = sentiment_result["labels"][0]
        articles_data.append({
            "Title": title,
            "Summary": final_summary,
            "Sentiment": sentiment,
            "URL": url
        })
    # Comparative Analysis: Build a simple sentiment distribution
    sentiment_distribution = {"Positive": 0, "Negative": 0, "Neutral": 0}
    for article in articles_data:
        key = article["Sentiment"].capitalize()
        sentiment_distribution[key] += 1
    print("Sentiment distribution computed.")
    # Step 2: Translate summaries and generate Hindi speech
    print("Translating summaries to Hindi...")
    translated_summaries = [translate_text(article["Summary"]) for article in articles_data]
    final_translated_text = "\n\n".join(translated_summaries)
    print("Generating Hindi speech with Bark TTS...")
    inputs = processor(final_translated_text, return_tensors="pt")
    speech_output = bark_model.generate(**inputs)
    audio_path = "final_summary.wav"
    sf.write(audio_path, speech_output[0].cpu().numpy(), bark_model.generation_config.sample_rate)
    print("Audio generated and saved.")
    # Build final report
    report = {
        "Company": company,
        "Articles": articles_data,
        "Comparative Sentiment Score": {
            "Sentiment Distribution": sentiment_distribution,
            "Coverage Differences": "Detailed comparative analysis not implemented",
            "Topic Overlap": "Topic extraction not implemented"
        },
        "Final Sentiment Analysis": "Overall sentiment analysis not fully computed",
        "Audio": audio_path
    }
    print("Final report prepared.")
    return report, audio_path

# -------------------------
# Gradio Interface Function
# -------------------------
def gradio_interface(company):
    print(f"Received input: {company}")
    report, audio_path = process_company(company)
    return report, audio_path

# -------------------------
# Gradio UI Setup
# -------------------------
iface = gr.Interface(
    fn=gradio_interface,
    inputs=gr.Textbox(label="Enter Company Name"),
    outputs=[
        gr.JSON(label="News Sentiment Report"),
        gr.Audio(type="filepath", label="Hindi Summary Audio")
    ],
    title="News Summarization & Text-to-Speech",
    description="Enter a company name to fetch news articles, perform sentiment analysis, and listen to a Hindi TTS summary."
)

if __name__ == "__main__":
    iface.launch()