# Spaces: Sleeping (page-status residue captured along with the file; not part of the program)
import os | |
import re | |
import streamlit as st | |
import google.generativeai as genai | |
from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer | |
from db import get_entry_by_index # For fetching a specific entry from MongoDB | |
# Configure the Gemini client.
# The API key is read from the "gemini_api" environment variable; when it is
# absent the problem is reported in the Streamlit UI instead of configuring
# the client.
GEMINI_API_KEY = os.getenv("gemini_api")
if GEMINI_API_KEY:
    genai.configure(api_key=GEMINI_API_KEY)
else:
    st.error("⚠️ Google API key is missing! Set it in Hugging Face Secrets.")
# Load the pre-trained sentiment analysis model and wrap it in a pipeline.
MODEL_NAME = "cardiffnlp/twitter-roberta-base-sentiment"
try:
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
    sentiment_pipeline = pipeline("sentiment-analysis", model=model, tokenizer=tokenizer)
except Exception as e:
    # Loading failures are reported in the UI rather than aborting the import.
    # NOTE: `sentiment_pipeline` is left unbound in that case, so any later
    # use of it raises NameError.
    st.error(f"❌ Error loading sentiment model: {e}")
# Load the zero-shot classification model used for topic extraction.
try:
    topic_pipeline = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")
except Exception as e:
    # Loading failures are reported in the UI rather than aborting the import.
    # NOTE: `topic_pipeline` is left unbound in that case, so any later use of
    # it raises NameError.
    st.error(f"❌ Error loading topic extraction model: {e}")
# Candidate labels handed to the zero-shot classifier when extracting a topic.
TOPIC_LABELS = [
    "Technology",
    "Politics",
    "Business",
    "Sports",
    "Entertainment",
    "Health",
    "Science",
    "Education",
    "Finance",
    "Travel",
    "Food",
]
def analyze_sentiment(text):
    """Classify the sentiment of *text* with the module-level sentiment pipeline.

    Args:
        text: The text to classify.

    Returns:
        A ``(label, score)`` tuple. ``label`` is "Negative", "Neutral" or
        "Positive" ("Unknown" when the pipeline emits a label that is not in
        the mapping) and ``score`` is the score reported by the pipeline.
        If anything fails, the tuple is
        ``("Error analyzing sentiment: <details>", None)`` -- this function
        never raises.
    """
    # Raw pipeline labels mapped to human-readable names.
    sentiment_mapping = {
        "LABEL_0": "Negative",
        "LABEL_1": "Neutral",
        "LABEL_2": "Positive",
    }
    try:
        # Truncate explicitly: inputs longer than the model's 512-token limit
        # would otherwise fail inside the pipeline and be reported as an
        # error. max_length is passed rather than relying on the tokenizer's
        # configured limit.
        result = sentiment_pipeline(text, truncation=True, max_length=512)[0]
        return sentiment_mapping.get(result["label"], "Unknown"), result["score"]
    except Exception as e:
        return f"Error analyzing sentiment: {e}", None
def extract_topic(text):
    """Pick a topic for *text* from ``TOPIC_LABELS`` via the topic pipeline.

    Returns:
        ``(topic, confidence)`` taken from the first entries of the pipeline
        result's "labels" and "scores" lists, which this code treats as the
        top-ranked topic. On any failure the tuple is
        ``("Error extracting topic: <details>", None)``.
    """
    try:
        outcome = topic_pipeline(text, TOPIC_LABELS)
        return outcome["labels"][0], outcome["scores"][0]
    except Exception as err:
        return f"Error extracting topic: {err}", None
# Helper: extract an entry index from a query string.
# For example, "data entry 1" or "entry 2" will return index 0 or 1 respectively.
def extract_entry_index(prompt):
    """Return the 0-based entry index requested in *prompt*, or ``None``.

    The prompt is searched (case-insensitively) for the whole word "entry"
    followed by whitespace and a number; "data entry 3" is covered because it
    contains "entry 3". Entry numbers in the prompt are 1-based.

    Args:
        prompt: The user's query text. ``None`` or an empty string is allowed.

    Returns:
        The entry number minus one, or ``None`` when the prompt does not
        reference an entry or references a number below 1 (which has no
        valid 0-based index).
    """
    if not prompt:
        return None
    # \b keeps words that merely end in "entry" ("sentry 5", "reentry 3")
    # from being mistaken for an entry request.
    match = re.search(r'\bentry\s+(\d+)', prompt, re.IGNORECASE)
    if match is None:
        return None
    number = int(match.group(1))
    if number < 1:
        # "entry 0" would otherwise become index -1.
        return None
    return number - 1  # Convert to 0-based index
def chatbot_response(user_prompt):
    """Answer *user_prompt* and analyze the sentiment and topic of the relevant text.

    If the prompt references a dataset entry (as detected by
    ``extract_entry_index``), that entry is fetched with
    ``get_entry_by_index`` and a static summary of its text, user and date is
    returned; sentiment and topic analysis run on the entry's text.
    Otherwise the prompt is sent to the Gemini generative model and the
    analysis runs on the prompt itself.

    Args:
        user_prompt: The user's query. Empty, ``None`` or whitespace-only
            input produces no response.

    Returns:
        A 5-tuple ``(ai_response, sentiment_label, sentiment_confidence,
        topic_label, topic_confidence)``. All five are ``None`` for an empty
        prompt. When the entry is missing or any step raises, the first item
        is an error message and the remaining four are ``None``.
    """
    # Whitespace-only prompts carry no query, so treat them like empty ones.
    if not user_prompt or (isinstance(user_prompt, str) and not user_prompt.strip()):
        return None, None, None, None, None
    try:
        # Check if the user query asks for a specific dataset entry.
        entry_index = extract_entry_index(user_prompt)
        if entry_index is not None:
            # Fetch the requested entry from MongoDB.
            entry = get_entry_by_index(entry_index)
            if entry is None:
                return "❌ No entry found for the requested index.", None, None, None, None
            # Extract the required fields, with placeholders for missing ones.
            entry_text = entry.get("text", "No text available.")
            entry_user = entry.get("user", "Unknown")
            entry_date = entry.get("date", "Unknown")
            # Build a static response message with only the desired parts.
            ai_response = (
                "Let's break down this tweet-like MongoDB entry:\n\n"
                f"Text: {entry_text}\n"
                f"User: {entry_user}\n"
                f"Date: {entry_date}"
            )
            analysis_text = entry_text
        else:
            # For other queries, use the generative model flow.
            model_gen = genai.GenerativeModel("gemini-1.5-pro")
            ai_response = model_gen.generate_content(user_prompt).text
            analysis_text = user_prompt
        # Sentiment and topic analysis run on the entry text for entry
        # lookups and on the prompt itself otherwise.
        sentiment_label, sentiment_confidence = analyze_sentiment(analysis_text)
        topic_label, topic_confidence = extract_topic(analysis_text)
        return ai_response, sentiment_label, sentiment_confidence, topic_label, topic_confidence
    except Exception as e:
        # Top-level boundary for the UI: report the failure as the response.
        return f"❌ Error: {e}", None, None, None, None