import random
import matplotlib.pyplot as plt
import nltk
from nltk.tokenize import word_tokenize, sent_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.text import Text
from nltk.probability import FreqDist
from cleantext import clean
import textract
import urllib.request
from io import BytesIO
import sys
import pandas as pd
import cv2
import re
from wordcloud import WordCloud, ImageColorGenerator
from textblob import TextBlob
from PIL import Image
import os
import gradio as gr
from dotenv import load_dotenv
import groq
import json
import traceback
import numpy as np
import unidecode
import contractions
from sklearn.feature_extraction.text import TfidfVectorizer
load_dotenv()

import ssl
def ensure_nltk_resources():
    try:
        nltk.data.find('tokenizers/punkt')
        nltk.data.find('corpora/stopwords')
    except LookupError:
        print("NLTK resources not found. Downloading...")
        try:
            # Work around SSL certificate issues that break nltk.download on some systems
            _create_unverified_https_context = ssl._create_unverified_context
        except AttributeError:
            pass
        else:
            ssl._create_default_https_context = _create_unverified_https_context
        nltk.download(['stopwords', 'wordnet', 'words'])
        nltk.download('punkt')
        nltk.download('punkt_tab')
        print("NLTK resources downloaded successfully.")

ensure_nltk_resources()
# Initialize Groq client
groq_api_key = os.getenv("GROQ_API_KEY")
groq_client = groq.Groq(api_key=groq_api_key) if groq_api_key else None
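# The key is read from a local .env file via python-dotenv; a minimal example (with a
# placeholder, not a real key) is a single line such as:
#   GROQ_API_KEY=your-key-here
# If the key is missing, groq_client stays None and the LLM-backed features return a
# friendly message instead of failing.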
# Stopword customization: extend NLTK's English stopwords with tokens that show up
# as noise in these manifestos (PDF-extraction fragments, stray numerals, etc.)
stop_words = set(stopwords.words('english'))
stop_words.update({'ask', 'much', 'thank', 'etc.', 'e', 'We', 'In', 'ed', 'pa', 'This', 'also', 'A', 'fu', 'To', '5', 'ing', 'er', '2'})
# --- Parsing & Preprocessing Functions ---
def Parsing(parsed_text):
    try:
        if hasattr(parsed_text, 'name'):
            file_path = parsed_text.name
        else:
            file_path = parsed_text
        # textract returns raw bytes; decode to str (ignoring undecodable bytes) before cleaning
        raw_party = textract.process(file_path)
        decoded_text = raw_party.decode('utf-8', errors='ignore')
        return clean(decoded_text)
    except Exception as e:
        print(f"Error parsing PDF: {e}")
        return f"Error parsing PDF: {e}"
def clean_text(text):
    text = text.encode("ascii", errors="ignore").decode("ascii")
    text = unidecode.unidecode(text)
    text = contractions.fix(text)
    text = re.sub(r"\n", " ", text)
    text = re.sub(r"\t", " ", text)
    text = re.sub(r"/ ", " ", text)
    text = text.strip()
    text = re.sub(" +", " ", text).strip()
    text = [word for word in text.split() if word not in stop_words]
    return ' '.join(text)
def Preprocess(textParty):
    text1Party = re.sub('[^A-Za-z0-9]+', ' ', textParty)
    pattern = re.compile(r'\b(' + r'|'.join(stopwords.words('english')) + r')\b\s*')
    text2Party = pattern.sub('', text1Party)
    return text2Party
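# Illustrative (hypothetical) input/output for Preprocess:
#   Preprocess("Tax-relief for the farmers!")  ->  roughly "Tax relief farmers "
# Punctuation collapses to spaces and lower-case English stopwords are stripped;
# capitalized words are untouched because the pattern is case-sensitive.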
# --- Core Analysis Functions ---
def generate_summary(text):
    if not groq_client:
        return "Summarization is not available. Please set up your GROQ_API_KEY in the .env file."
    if len(text) > 10000:
        text = text[:10000]
    try:
        completion = groq_client.chat.completions.create(
            model="llama3-8b-8192",  # Or your preferred model
            messages=[
                {"role": "system", "content": "You are a helpful assistant that summarizes political manifestos. Provide a concise, objective summary that captures the key policy proposals, themes, and promises in the manifesto."},
                {"role": "user", "content": f"Please summarize the following political manifesto text in about 300-500 words, focusing on the main policy areas, promises, and themes:\n{text}"}
            ],
            temperature=0.3,
            max_tokens=800
        )
        return completion.choices[0].message.content
    except Exception as e:
        return f"Error generating summary: {str(e)}"
def fDistance(text2Party):
    word_tokens_party = word_tokenize(text2Party)
    fdistance = FreqDist(word_tokens_party).most_common(10)
    mem = {x[0]: x[1] for x in fdistance}
    vectorizer = TfidfVectorizer(max_features=15, stop_words='english')
    try:
        sentences = sent_tokenize(text2Party)
        tfidf_matrix = vectorizer.fit_transform(sentences)
        feature_names = vectorizer.get_feature_names_out()
        # Average each feature word's TF-IDF score across all sentences
        tfidf_scores = {}
        for i, word in enumerate(feature_names):
            scores = [tfidf_matrix[j, i] for j in range(tfidf_matrix.shape[0])]
            if scores:
                tfidf_scores[word] = sum(scores) / len(scores)
        # Combine normalized raw frequency (weight 0.3) with normalized TF-IDF (weight 0.7)
        combined_scores = {}
        all_words = set(list(mem.keys()) + list(tfidf_scores.keys()))
        max_freq = max(mem.values()) if mem else 1
        max_tfidf = max(tfidf_scores.values()) if tfidf_scores else 1
        for word in all_words:
            freq_score = mem.get(word, 0) / max_freq
            tfidf_score = tfidf_scores.get(word, 0) / max_tfidf
            combined_scores[word] = (freq_score * 0.3) + (tfidf_score * 0.7)
        top_words = dict(sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)[:10])
        return normalize(top_words)
    except ValueError as ve:
        # TF-IDF can fail on empty or degenerate input; fall back to normalized frequency
        print(f"Warning: TF-IDF failed, using only frequency: {ve}")
        if mem:
            max_freq = max(mem.values())
            return {k: v / max_freq for k, v in list(mem.items())[:10]}
        return {}
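# Worked (hypothetical) example of the combined score: a word at both the frequency
# maximum and the TF-IDF maximum scores 0.3*1.0 + 0.7*1.0 = 1.0, while a word at half
# of both maxima scores 0.3*0.5 + 0.7*0.5 = 0.5, before normalize() rescales the top-10 dict.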
def normalize(d, target=1.0):
    raw = sum(d.values())
    factor = target / raw if raw != 0 else 0
    return {key: value * factor for key, value in d.items()}
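# Example: normalize({'tax': 2.0, 'jobs': 2.0}) returns {'tax': 0.5, 'jobs': 0.5}, so the
# scores shown in the gr.Label component read as relative weights that sum to 1.0.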
# --- Visualization Functions with Error Handling ---
def safe_plot(func, *args, **kwargs):
    """Executes a plotting function and returns the rendered image, handling errors."""
    buf = None  # Initialized up front so it can be closed safely on error
    try:
        fig = plt.figure()  # Start from a clean, explicitly created figure
        func(*args, **kwargs)
        buf = BytesIO()
        # Try saving with bbox_inches='tight', but catch the known apply_aspect error
        try:
            plt.savefig(buf, format='png', bbox_inches='tight')
        except AttributeError as ae:
            if "apply_aspect" in str(ae):
                print(f"Warning: bbox_inches='tight' failed ({ae}), saving without it.")
                buf = BytesIO()  # Fresh buffer; the old one may be partially written
                plt.savefig(buf, format='png')
            else:
                raise  # Different AttributeError; re-raise
        buf.seek(0)
        img = Image.open(buf)
        plt.close(fig)  # Explicitly close the figure we created
        return img
    except Exception as e:
        print(f"Plotting error in safe_plot: {e}")
        if buf:
            buf.close()  # Ensure the buffer is closed if it was created
        traceback.print_exc()
        plt.close('all')  # Aggressive cleanup on error
        return None
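# Usage sketch (illustrative only): any callable that draws onto the current matplotlib
# figure can be wrapped, e.g.
#   img = safe_plot(lambda: pd.Series([3, 1, 2]).plot(kind='bar', title='Demo'))
# which yields a PIL.Image on success or None if plotting or saving fails.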
def fDistancePlot(text2Party):
    """Generates the frequency distribution plot."""
    def plot_func():
        tokens = word_tokenize(text2Party)
        if not tokens:
            # Nothing to plot; draw a placeholder message in axes coordinates
            plt.text(0.5, 0.5, "No data to plot", ha='center', va='center', transform=plt.gca().transAxes)
            return
        fdist = FreqDist(tokens)
        fdist.plot(15, title='Frequency Distribution')
        plt.xticks(rotation=45, ha='right')
        plt.tight_layout()
    return safe_plot(plot_func)
def DispersionPlot(textParty):
    """Generates the word dispersion plot."""
    buf = None
    try:
        word_tokens_party = word_tokenize(textParty)
        if not word_tokens_party:
            print("Warning: No tokens found for dispersion plot.")
            return None
        moby = Text(word_tokens_party)
        fdistance = FreqDist(word_tokens_party)
        # Take up to five of the most common words; fewer if the text is short
        common_words = fdistance.most_common(6)
        if len(common_words) < 5:
            word_Lst = [word for word, _ in common_words]
        else:
            word_Lst = [common_words[x][0] for x in range(5)]
        if not word_Lst:
            print("Warning: No common words found for dispersion plot.")
            return None
        # Manage the figure explicitly; nltk's dispersion_plot draws on the current
        # figure, so no 'ax' argument is passed
        fig = plt.figure(figsize=(10, 5))
        plt.title('Dispersion Plot')
        moby.dispersion_plot(word_Lst)
        plt.tight_layout()
        buf = BytesIO()
        # Fall back if bbox_inches='tight' triggers the known apply_aspect AttributeError
        try:
            fig.savefig(buf, format='png', bbox_inches='tight')
        except AttributeError as ae:
            if "apply_aspect" in str(ae):
                print(f"Warning: bbox_inches='tight' failed for Dispersion Plot ({ae}), saving without it.")
                buf = BytesIO()  # Fresh buffer; the old one may be partially written
                fig.savefig(buf, format='png')
            else:
                raise
        buf.seek(0)
        img = Image.open(buf)
        plt.close(fig)  # Close the specific figure created here
        return img
    except Exception as e:
        print(f"Dispersion plot error: {e}")
        if buf:
            buf.close()
        traceback.print_exc()
        plt.close('all')
        return None
def word_cloud_generator(parsed_text_name, text_Party):
    """Generates the word cloud image."""
    buf = None
    try:
        filename_lower = ""
        if hasattr(parsed_text_name, 'name') and parsed_text_name.name:
            filename_lower = parsed_text_name.name.lower()
        elif isinstance(parsed_text_name, str):
            filename_lower = parsed_text_name.lower()
        # Pick a party-specific mask image if the filename suggests one
        mask_path = None
        if 'bjp' in filename_lower:
            mask_path = 'bjpImg2.jpeg'
        elif 'congress' in filename_lower:
            mask_path = 'congress3.jpeg'
        elif 'aap' in filename_lower:
            mask_path = 'aapMain2.jpg'
        if text_Party.strip() == "":
            raise ValueError("Text for word cloud is empty")
        # Generate the word cloud, with or without a shape mask
        if mask_path and os.path.exists(mask_path):
            orgImg = Image.open(mask_path)
            if orgImg.mode != 'RGB':
                orgImg = orgImg.convert('RGB')
            mask = np.array(orgImg)
            wordcloud = WordCloud(max_words=3000, mask=mask, background_color='white', mode='RGBA').generate(text_Party)
        else:
            wordcloud = WordCloud(max_words=2000, background_color='white', mode='RGBA').generate(text_Party)
        # Explicitly manage the figure and axes for the word cloud
        fig, ax = plt.subplots(figsize=(8, 6))
        ax.imshow(wordcloud, interpolation='bilinear')
        ax.axis("off")
        fig.tight_layout(pad=0)  # Remove padding
        buf = BytesIO()
        # Fall back if bbox_inches='tight' triggers the known apply_aspect AttributeError
        try:
            fig.savefig(buf, format='png', bbox_inches='tight', dpi=300, facecolor='white')
        except AttributeError as ae:
            if "apply_aspect" in str(ae):
                print(f"Warning: bbox_inches='tight' failed for Word Cloud ({ae}), saving without it.")
                buf = BytesIO()
                fig.savefig(buf, format='png', dpi=300, facecolor='white')
            else:
                raise
        buf.seek(0)
        img = Image.open(buf)
        plt.close(fig)
        return img
    except Exception as e:
        print(f"Word cloud error: {e}")
        if buf:
            buf.close()
        traceback.print_exc()
        plt.close('all')
        return None
# Concordance-based search (the initial, non-LLM design)
def get_all_phases_containing_tar_wrd(target_word, tar_passage, left_margin=10, right_margin=10, numLins=4):
    """Returns the phrases in a text/passage that contain the target word."""
    if not target_word or target_word.strip() == "":
        return "Please enter a search term"
    tokens = nltk.word_tokenize(tar_passage)
    text = nltk.Text(tokens)
    c = nltk.ConcordanceIndex(text.tokens, key=lambda s: s.lower())
    offsets = c.offsets(target_word)
    if not offsets:
        return f"Word '{target_word}' not found."
    concordance_txt = [
        text.tokens[max(0, offset - left_margin):offset + right_margin]
        for offset in offsets[:numLins]
    ]
    result = [' '.join(con_sub) for con_sub in concordance_txt]
    return '\n'.join(result)  # Newlines read better in the textbox
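# Illustrative (hypothetical) call: get_all_phases_containing_tar_wrd("farmers", raw_text, numLins=2)
# returns up to two newline-separated snippets of roughly 20 tokens centered on "farmers",
# or a "not found" message. analysis() below uses the LLM-based search instead; this
# function remains as a simple non-LLM alternative.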
def get_contextual_search_result(target_word, tar_passage, groq_client_instance, max_context_length=8000):
    """Uses the LLM to provide contextual information about the target word within the passage."""
    if not target_word or target_word.strip() == "":
        return "Please enter a search term."
    if not groq_client_instance:
        return "Contextual search requires the LLM API. Please set up your GROQ_API_KEY."
    # No substring pre-check is done here; the LLM is asked to state clearly when the
    # term is absent, which avoids false negatives from simple string matching.
    # Truncate the passage if it is too long for the model's context window
    original_length = len(tar_passage)
    if original_length > max_context_length:
        # Simple truncation; could be improved to keep sentences whole
        tar_passage_truncated = tar_passage[:max_context_length]
        print(f"Warning: Passage truncated for LLM search context from {original_length} to {max_context_length} characters.")
    else:
        tar_passage_truncated = tar_passage
    prompt = f"""
You are an expert political analyst. You have been given a section of a political manifesto and a specific search term.
Your task is to extract and summarize all information related to the search term from the provided text.
Focus on:
1. Specific policies, promises, or statements related to the term.
2. The context in which the term is used.
3. Any key details, figures, or commitments mentioned.
Present your findings concisely. If the term is not relevant or not found in the provided text section, state that clearly.
Search Term: {target_word}
Manifesto Text Section:
{tar_passage_truncated}
Relevant Information:
"""
    try:
        completion = groq_client_instance.chat.completions.create(
            model="llama3-8b-8192",  # Use the same or a similarly capable model
            messages=[
                {"role": "system", "content": "You are a helpful assistant skilled at analyzing political texts and extracting relevant information based on a search query. Provide clear, concise summaries."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.2,  # Low temperature for more factual extraction
            max_tokens=1000   # Adjust based on expected output length
        )
        result = completion.choices[0].message.content.strip()
        # Flag the result if the input passage was truncated
        if original_length > max_context_length:
            result = f"(Note: Analysis based on the first {max_context_length} characters of the manifesto.)\n\n" + result
        return result if result else f"No specific context for '{target_word}' could be generated from the provided text section."
    except Exception as e:
        error_msg = f"Error during contextual search for '{target_word}': {str(e)}"
        print(error_msg)
        traceback.print_exc()
        # A concordance fallback could be used here instead of returning the error:
        # return get_all_phases_containing_tar_wrd(target_word, tar_passage)
        return error_msg
def analysis(Manifesto, Search):
    try:
        if Manifesto is None:
            # Return order must match the Gradio outputs list
            return "No file uploaded", {}, None, None, None, None, None, "No file uploaded"
        if Search.strip() == "":
            Search = "government"
        raw_party = Parsing(Manifesto)
        if isinstance(raw_party, str) and raw_party.startswith("Error"):
            return raw_party, {}, None, None, None, None, None, "Parsing failed"
        text_Party = clean_text(raw_party)
        text_Party_processed = Preprocess(text_Party)
        # Run the contextual search on the ORIGINAL text so the LLM sees the fullest context
        searChRes = get_contextual_search_result(Search, raw_party, groq_client)
        # Summarize the raw text as well, again for richer context
        summary = generate_summary(raw_party)
        # --- Sentiment Analysis ---
        if not text_Party_processed.strip():
            # Text is empty after preprocessing; fall back to neutral defaults
            df_dummy = pd.DataFrame({'Polarity_Label': ['Neutral'], 'Subjectivity_Label': ['Low']})
            polarity_val = 0.0
            subjectivity_val = 0.0
        else:
            polarity_val = TextBlob(text_Party_processed).sentiment.polarity
            subjectivity_val = TextBlob(text_Party_processed).sentiment.subjectivity
            polarity_label = 'Positive' if polarity_val > 0 else 'Negative' if polarity_val < 0 else 'Neutral'
            subjectivity_label = 'High' if subjectivity_val > 0.5 else 'Low'
            df_dummy = pd.DataFrame({'Polarity_Label': [polarity_label], 'Subjectivity_Label': [subjectivity_label]})
        # --- Generate plots via safe_plot; the plotting functions handle empty text themselves ---
        sentiment_plot = safe_plot(lambda: df_dummy['Polarity_Label'].value_counts().plot(kind='bar', color="#FF9F45", title='Sentiment Analysis'))
        subjectivity_plot = safe_plot(lambda: df_dummy['Subjectivity_Label'].value_counts().plot(kind='bar', color="#B667F1", title='Subjectivity Analysis'))
        freq_plot = fDistancePlot(text_Party_processed)
        dispersion_plot = DispersionPlot(text_Party_processed)
        wordcloud = word_cloud_generator(Manifesto, text_Party_processed)  # Pass the uploaded file object for mask selection
        fdist_Party = fDistance(text_Party_processed)
        return searChRes, fdist_Party, sentiment_plot, subjectivity_plot, wordcloud, freq_plot, dispersion_plot, summary
    except Exception as e:
        error_msg = f"Critical error in analysis function: {str(e)}"
        print(error_msg)
        traceback.print_exc()
        # Return error values in the same order as the outputs list
        return error_msg, {}, None, None, None, None, None, "Analysis failed"
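# For reference: TextBlob polarity lies in [-1.0, 1.0] and subjectivity in [0.0, 1.0];
# with the thresholds above, a hypothetical polarity of 0.12 is labeled 'Positive' and
# a subjectivity of 0.41 is labeled 'Low'.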
# --- Gradio Interface ---
with gr.Blocks(title='Manifesto Analysis') as demo:
    gr.Markdown("# Manifesto Analysis")

    # Input Section
    with gr.Row():
        with gr.Column(scale=1):
            file_input = gr.File(label="Upload Manifesto PDF", file_types=[".pdf"])
        with gr.Column(scale=1):
            search_input = gr.Textbox(label="Search Term", placeholder="Enter a term to search in the manifesto")
    submit_btn = gr.Button("Analyze Manifesto", variant='primary')

    # Output Section using Tabs
    with gr.Tabs():
        with gr.TabItem("Summary"):
            summary_output = gr.Textbox(label='AI-Generated Summary', lines=10, interactive=False)
        with gr.TabItem("Search Results"):
            search_output = gr.Textbox(label='Context Based Search Results', lines=15, interactive=False, max_lines=20)
        with gr.TabItem("Key Topics"):
            topics_output = gr.Label(label="Most Relevant Topics (LLM Enhanced)", num_top_classes=10)
        with gr.TabItem("Visualizations"):
            with gr.Row():  # Row 1: Sentiment & Subjectivity
                with gr.Column():
                    sentiment_output = gr.Image(label='Sentiment Analysis', interactive=False, height=400)
                with gr.Column():
                    subjectivity_output = gr.Image(label='Subjectivity Analysis', interactive=False, height=400)
            with gr.Row():  # Row 2: Word Cloud & Frequency Distribution
                with gr.Column():
                    wordcloud_output = gr.Image(label='Word Cloud', interactive=False, height=400)
                with gr.Column():
                    freq_output = gr.Image(label='Frequency Distribution', interactive=False, height=400)
            with gr.Row():  # Row 3: Dispersion Plot (full width)
                with gr.Column():
                    dispersion_output = gr.Image(label='Dispersion Plot', interactive=False, height=400)
    # Wire the button to the analysis function; the outputs order must match the function's return order
    submit_btn.click(
        fn=analysis,
        inputs=[file_input, search_input],
        outputs=[
            search_output,        # 1: contextual search results
            topics_output,        # 2: key topics
            sentiment_output,     # 3: sentiment plot
            subjectivity_output,  # 4: subjectivity plot
            wordcloud_output,     # 5: word cloud
            freq_output,          # 6: frequency distribution
            dispersion_output,    # 7: dispersion plot
            summary_output        # 8: summary
        ],
        concurrency_limit=1  # Limit concurrent analyses if needed
    )
    # Examples must reference the output components predefined in the layout above
    gr.Examples(
        examples=[
            ["Example/AAP_Manifesto_2019.pdf", "government"],
            ["Example/Bjp_Manifesto_2019.pdf", "environment"],
            ["Example/Congress_Manifesto_2019.pdf", "safety"]
        ],
        inputs=[file_input, search_input],
        outputs=[search_output, topics_output, sentiment_output, subjectivity_output, wordcloud_output, freq_output, dispersion_output, summary_output],
        fn=analysis  # Run the analysis when an example is selected
    )
if __name__ == "__main__":
    demo.launch(debug=True, share=False, show_error=True)