import os
from dotenv import load_dotenv

load_dotenv()

import requests
from bs4 import BeautifulSoup
from newsapi import NewsApiClient
import pandas as pd
import torch
import soundfile as sf
import gradio as gr
from transformers import (
    AutoModelForSequenceClassification, AutoTokenizer, pipeline,
    BartTokenizer, BartForConditionalGeneration,
    MarianMTModel, MarianTokenizer,
    BarkModel, AutoProcessor
)
import librosa
import re
# -------------------------
# Global Setup and Environment Variables
# -------------------------
NEWS_API_KEY = os.getenv("NEWS_API_KEY")  # Set this in your .env file
device = "cpu"  # Force CPU; this Space runs on CPU-only hardware
# -------------------------
# News Extraction Functions
# -------------------------
def fetch_and_scrape_news(company, api_key, count=1, output_file='news_articles.xlsx'):
    print("Starting news fetch from NewsAPI...")
    newsapi = NewsApiClient(api_key=api_key)
    all_articles = newsapi.get_everything(q=company, language='en', sort_by='relevancy', page_size=count)
    articles = all_articles.get('articles', [])
    scraped_data = []
    print(f"Found {len(articles)} articles. Starting scraping individual articles...")
    for i, article in enumerate(articles):
        url = article.get('url')
        if url:
            print(f"Scraping article {i+1}: {url}")
            scraped_article = scrape_news(url)
            if scraped_article:
                scraped_article['url'] = url
                scraped_data.append(scraped_article)
    df = pd.DataFrame(scraped_data)
    df.to_excel(output_file, index=False, header=True)
    print(f"News scraping complete. Data saved to {output_file}")
    return df
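
# Illustrative usage (assumes a valid NEWS_API_KEY; not executed as part of the app):
#   df = fetch_and_scrape_news("Tesla", NEWS_API_KEY, count=3, output_file="tesla_news.xlsx")
# Each row of the returned DataFrame has the columns "headline", "content", and "url".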

def scrape_news(url):
    headers = {"User-Agent": "Mozilla/5.0"}
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
    except Exception as e:
        print(f"Failed to fetch the page: {url} ({e})")
        return None
    soup = BeautifulSoup(response.text, "html.parser")
    headline = soup.find("h1").get_text(strip=True) if soup.find("h1") else "No headline found"
    paragraphs = soup.find_all("p")
    article_text = " ".join(p.get_text(strip=True) for p in paragraphs)
    return {"headline": headline, "content": article_text}
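
# On success, scrape_news returns a dict shaped like (values are illustrative):
#   {"headline": "Company X announces ...", "content": "Text joined from every <p> tag on the page ..."}
# On a failed request it returns None, which fetch_and_scrape_news skips over.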

# -------------------------
# Sentiment Analysis Setup
# -------------------------
print("Loading sentiment analysis model...")
sentiment_model_name = "cross-encoder/nli-distilroberta-base"
sentiment_model = AutoModelForSequenceClassification.from_pretrained(
    sentiment_model_name,
    torch_dtype=torch.float32
)
sentiment_tokenizer = AutoTokenizer.from_pretrained(sentiment_model_name)
classifier = pipeline("zero-shot-classification", model=sentiment_model, tokenizer=sentiment_tokenizer, device=-1)
labels = ["positive", "negative", "neutral"]
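
# The zero-shot pipeline returns the candidate labels sorted by score, e.g. (scores illustrative):
#   classifier("Quarterly profits rose sharply.", labels)
#   -> {"sequence": "...", "labels": ["positive", "neutral", "negative"], "scores": [0.91, 0.06, 0.03]}
# process_company() below takes labels[0], the highest-scoring label, as the article sentiment.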

# -------------------------
# Summarization Setup
# -------------------------
print("Loading summarization model (BART)...")
bart_tokenizer = BartTokenizer.from_pretrained('facebook/bart-large-cnn')
bart_model = BartForConditionalGeneration.from_pretrained('facebook/bart-large-cnn')

def split_into_chunks(text, tokenizer, max_tokens=1024):
    words = text.split()
    chunks = []
    current_chunk = []
    current_length = 0
    for word in words:
        tokenized_word = tokenizer.encode(word, add_special_tokens=False)
        if current_length + len(tokenized_word) <= max_tokens:
            current_chunk.append(word)
            current_length += len(tokenized_word)
        else:
            chunks.append(' '.join(current_chunk))
            current_chunk = [word]
            current_length = len(tokenized_word)
    if current_chunk:
        chunks.append(' '.join(current_chunk))
    return chunks
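
# Sketch of the chunking behaviour (token counts are approximate, since words are tokenized
# one at a time without special tokens):
#   chunks = split_into_chunks(long_article_text, bart_tokenizer, max_tokens=1024)
# Each returned chunk stays at or under max_tokens BART tokens, so it fits the 1024-token encoder limit.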

# -------------------------
# Translation Setup (English to Hindi)
# -------------------------
print("Loading translation model (MarianMT)...")
translation_model_name = 'Helsinki-NLP/opus-mt-en-hi'
trans_tokenizer = MarianTokenizer.from_pretrained(translation_model_name)
trans_model = MarianMTModel.from_pretrained(translation_model_name)

def translate_text(text):
    tokens = trans_tokenizer(text, return_tensors="pt", padding=True)
    translated = trans_model.generate(**tokens)
    return trans_tokenizer.decode(translated[0], skip_special_tokens=True)
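
# Illustrative usage (the Hindi output shown is a rough paraphrase, not an actual model output):
#   translate_text("The company reported strong quarterly earnings.")
#   -> "कंपनी ने मजबूत तिमाही आय दर्ज की।"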

# -------------------------
# Bark TTS Setup (Hindi)
# -------------------------
print("Loading Bark TTS model...")
bark_model = BarkModel.from_pretrained("suno/bark-small")
bark_model.to(device)
processor = AutoProcessor.from_pretrained("suno/bark-small")  # match the processor to the bark-small checkpoint
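
# Optional: Bark can be nudged toward a specific Hindi voice with a preset (an assumption here,
# based on the published Bark voice library), e.g.:
#   inputs = processor(text, voice_preset="v2/hi_speaker_2")
# The pipeline below sticks with the default voice.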

# -------------------------
# Helper Functions for Audio and Text Preprocessing
# -------------------------
def normalize_text(text):
    # Lowercase, strip punctuation, and trim surrounding whitespace
    return re.sub(r"[^\w\s]", "", text.lower()).strip()

def resample_audio(audio_array, orig_sr, target_sr=16000):
    # Resample only when the source and target sampling rates differ
    if orig_sr != target_sr:
        audio_array = librosa.resample(audio_array, orig_sr=orig_sr, target_sr=target_sr)
    return audio_array
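
# Illustrative usage; these helpers are not called by the main pipeline below, which saves the
# Bark audio at its native rate (bark_model.generation_config.sample_rate):
#   audio_16k = resample_audio(speech_array, orig_sr=bark_model.generation_config.sample_rate)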

# -------------------------
# Main Pipeline Function
# -------------------------
def process_company(company):
    print(f"Processing company: {company}")
    # Step 1: Fetch and scrape news (this also writes news_articles.xlsx as a side effect)
    print("Fetching and scraping news...")
    df = fetch_and_scrape_news(company, NEWS_API_KEY)
    print("Scraped Articles:")
    print(df)
    articles_data = []
    for index, row in df.iterrows():
        print(f"Processing article {index+1}...")
        article_text = row.get("content", "")
        title = row.get("headline", "No title")
        url = row.get("url", "")
        # Summarize each chunk separately so long articles fit BART's 1024-token limit
        chunks = split_into_chunks(article_text, bart_tokenizer)
        chunk_summaries = []
        for i, chunk in enumerate(chunks):
            print(f"Summarizing chunk {i+1}/{len(chunks)}...")
            inputs = bart_tokenizer([chunk], max_length=1024, return_tensors='pt', truncation=True)
            summary_ids = bart_model.generate(inputs.input_ids, num_beams=2, max_length=50, min_length=30, early_stopping=True)
            chunk_summary = bart_tokenizer.decode(summary_ids[0], skip_special_tokens=True)
            chunk_summaries.append(chunk_summary)
        final_summary = ' '.join(chunk_summaries)
        print("Performing sentiment analysis...")
        sentiment_result = classifier(final_summary, labels)
        sentiment = sentiment_result["labels"][0]  # highest-scoring label
        articles_data.append({
            "Title": title,
            "Summary": final_summary,
            "Sentiment": sentiment,
            "URL": url
        })
    # Comparative Analysis: Build a simple sentiment distribution
    sentiment_distribution = {"Positive": 0, "Negative": 0, "Neutral": 0}
    for article in articles_data:
        key = article["Sentiment"].capitalize()
        sentiment_distribution[key] += 1
    print("Sentiment distribution computed.")
    # Step 2: Translate summaries and generate Hindi speech
    print("Translating summaries to Hindi...")
    translated_summaries = [translate_text(article["Summary"]) for article in articles_data]
    final_translated_text = "\n\n".join(translated_summaries)
    print("Generating Hindi speech with Bark TTS...")
    # Note: Bark is tuned for short prompts, so very long combined summaries may be truncated
    # or lose quality; keeping the article count small helps.
    inputs = processor(final_translated_text, return_tensors="pt")
    speech_output = bark_model.generate(**inputs)
    audio_path = "final_summary.wav"
    sf.write(audio_path, speech_output[0].cpu().numpy(), bark_model.generation_config.sample_rate)
    print("Audio generated and saved.")
    # Build final report
    report = {
        "Company": company,
        "Articles": articles_data,
        "Comparative Sentiment Score": {
            "Sentiment Distribution": sentiment_distribution,
            "Coverage Differences": "Detailed comparative analysis not implemented",
            "Topic Overlap": "Topic extraction not implemented"
        },
        "Final Sentiment Analysis": "Overall sentiment analysis not fully computed",
        "Audio": audio_path
    }
    print("Final report prepared.")
    return report, audio_path
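
# Quick local check (illustrative; requires NEWS_API_KEY and downloads every model on first run):
#   report, audio_path = process_company("Microsoft")
#   print(report["Comparative Sentiment Score"]["Sentiment Distribution"])
#   -> e.g. {"Positive": 1, "Negative": 0, "Neutral": 0} for the default single-article fetch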

# -------------------------
# Gradio Interface Function
# -------------------------
def gradio_interface(company):
    print(f"Received input: {company}")
    report, audio_path = process_company(company)
    return report, audio_path

# -------------------------
# Gradio UI Setup
# -------------------------
iface = gr.Interface(
    fn=gradio_interface,
    inputs=gr.Textbox(label="Enter Company Name"),
    outputs=[
        gr.JSON(label="News Sentiment Report"),
        gr.Audio(type="filepath", label="Hindi Summary Audio")
    ],
    title="News Summarization & Text-to-Speech",
    description="Enter a company name to fetch news articles, perform sentiment analysis, and listen to a Hindi TTS summary."
)

if __name__ == "__main__":
    iface.launch()