|
import asyncio
import json
import os
from datetime import datetime, timezone
from typing import Dict, List

import httpx
import numpy as np
import pandas as pd
import streamlit as st
from dotenv import load_dotenv
from openai import OpenAI
from pymongo import MongoClient

load_dotenv()

PERPLEXITY_API_KEY = os.getenv("PERPLEXITY_KEY")
MONGODB_URI = os.getenv("MONGO_URI")
OPENAI_API_KEY = os.getenv("OPENAI_KEY")
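# Expected .env contents (placeholder values shown; the key names match the
# os.getenv() calls above):
#   PERPLEXITY_KEY=pplx-...
#   MONGO_URI=mongodb+srv://<user>:<password>@<cluster>/
#   OPENAI_KEY=sk-...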
|
|
|
|
|
client = MongoClient(MONGODB_URI)
db = client["document_analysis"]
vectors_collection = db["document_vectors"]

openai_client = OpenAI(api_key=OPENAI_API_KEY)


class GoalAnalyzer:
    def __init__(self):
        self.api_key = PERPLEXITY_API_KEY
        self.base_url = "https://api.perplexity.ai/chat/completions"
|
|
|
    def clean_json_string(self, content: str) -> str:
        """Best-effort extraction of a JSON object from an LLM response"""
        # Strip a Markdown code fence if the model wrapped its answer in one.
        if "```json" in content:
            content = content.split("```json")[1].split("```")[0]
        elif "```" in content:
            content = content.split("```")[1]

        # Keep only the outermost {...} span.
        start_idx = content.find("{")
        end_idx = content.rfind("}") + 1
        if start_idx != -1 and end_idx > 0:
            content = content[start_idx:end_idx]

        content = content.strip()
        # Only apply the lossy fix-ups when the content is not already valid
        # JSON: blindly replacing every single quote would corrupt apostrophes
        # inside otherwise well-formed string values.
        try:
            json.loads(content)
        except json.JSONDecodeError:
            content = content.replace("\n", "").replace("'", '"')

        return content
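    # Illustrative round trip (hypothetical model output):
    #   clean_json_string('Sure:\n```json\n{"themes": ["x"]}\n```')
    #   -> '{"themes": ["x"]}'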
|
|
|
    async def get_perplexity_analysis(self, text: str, goal: str) -> Dict:
        """Get structured analysis from the Perplexity chat completions API"""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

        prompt = f"""
        Analyze the following text in context of the goal: {goal}

        Text: {text}

        Provide analysis in the following JSON format:
        {{
            "themes": ["theme1", "theme2"],
            "subthemes": {{"theme1": ["subtheme1", "subtheme2"], "theme2": ["subtheme3"]}},
            "keywords": ["keyword1", "keyword2"],
            "relevance_score": 0-100
        }}
        """

        try:
            async with httpx.AsyncClient() as client:
                payload = {
                    "model": "llama-3.1-sonar-small-128k-chat",
                    "messages": [
                        {
                            "role": "system",
                            "content": "You are an AI assistant that analyzes documents and provides structured analysis.",
                        },
                        {"role": "user", "content": prompt},
                    ],
                    "max_tokens": 1024,
                }

                with st.expander("Debug Info", expanded=False):
                    st.write("Request payload:", payload)

                response = await client.post(
                    self.base_url, headers=headers, json=payload, timeout=30.0
                )

                with st.expander("Response Info", expanded=False):
                    st.write("Response status:", response.status_code)
                    st.write("Response headers:", dict(response.headers))
                    st.write("Response content:", response.text)

                if response.status_code != 200:
                    # response.json() can itself raise on a non-JSON error
                    # body, so report the raw text instead.
                    error_detail = response.text or "No error details"
                    raise Exception(
                        f"API returned status code {response.status_code}. Details: {error_detail}"
                    )

                result = response.json()
                content = (
                    result.get("choices", [{}])[0].get("message", {}).get("content", "")
                )

                cleaned_content = self.clean_json_string(content)

                try:
                    analysis = json.loads(cleaned_content)

                    # Backfill any missing fields with sensible empty values;
                    # note that "subthemes" is a mapping, not a list.
                    defaults = {
                        "themes": [],
                        "subthemes": {},
                        "keywords": [],
                        "relevance_score": 0,
                    }
                    for field, default in defaults.items():
                        if field not in analysis:
                            analysis[field] = default

                    return analysis

                except json.JSONDecodeError as e:
                    st.error(f"JSON parsing error: {str(e)}")
                    st.error(f"Failed content: {cleaned_content}")
                    return {
                        "themes": ["Error parsing themes"],
                        "subthemes": {"Error": ["Failed to parse subthemes"]},
                        "keywords": ["parsing-error"],
                        "relevance_score": 0,
                    }

        except Exception as e:
            st.error(f"API Error: {str(e)}")
            return None
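    # Illustrative return value on success (hypothetical numbers):
    #   {
    #       "themes": ["Pricing"],
    #       "subthemes": {"Pricing": ["Discounts"]},
    #       "keywords": ["revenue", "margin"],
    #       "relevance_score": 72,
    #   }
    # On API failure the method returns None, which
    # display_analysis_results() treats as "nothing to show".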
|
|
|
    def extract_text_from_file(self, file) -> str:
        """Extract text content from an uploaded file"""
        try:
            text = ""
            file_type = file.type

            if file_type == "text/plain":
                text = file.getvalue().decode("utf-8")
            elif file_type == "application/pdf":
                import PyPDF2

                pdf_reader = PyPDF2.PdfReader(file)
                for page in pdf_reader.pages:
                    # extract_text() may return None for pages without a
                    # text layer, so guard the concatenation.
                    text += page.extract_text() or ""
            elif (
                file_type
                == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
            ):
                import docx

                doc = docx.Document(file)
                text = " ".join(paragraph.text for paragraph in doc.paragraphs)
            else:
                st.warning(f"Unsupported file type: {file_type}")

            return text
        except Exception as e:
            st.error(f"Error extracting text: {str(e)}")
            return ""
|
|
|
|
|
class DocumentVectorizer:
    def __init__(self):
        self.model = "text-embedding-ada-002"
        self.client = MongoClient(MONGODB_URI)
        self.db = self.client["document_analysis"]
        self.vectors_collection = self.db["document_vectors"]

        # A 2dsphere index is for GeoJSON data and cannot index a
        # 1536-dimensional embedding, and pymongo's create_index() takes
        # index options as keyword arguments, not a positional dict, so the
        # original vector-index attempt could never succeed. vector_search()
        # below computes cosine similarity in Python and needs no special
        # index; a unique index on "name" supports the upsert in
        # store_vector(). Server-side $vectorSearch would instead require an
        # Atlas Vector Search index created through Atlas itself.
        try:
            self.vectors_collection.create_index("name", unique=True)
        except Exception as e:
            st.warning(f"Index creation skipped: {e}")
|
|
|
    def get_embedding(self, text: str) -> list:
        """Get embedding vector for text using OpenAI"""
        try:
            # text-embedding-ada-002 returns a 1536-dimensional vector.
            response = openai_client.embeddings.create(model=self.model, input=text)
            return response.data[0].embedding
        except Exception as e:
            st.error(f"Error getting embedding: {str(e)}")
            return None
|
|
|
|
|
    def vector_exists(self, doc_name: str) -> bool:
        """Check if a vector already exists for the named document"""
        return self.vectors_collection.count_documents({"name": doc_name}) > 0
|
|
|
|
|
    def store_vector(self, doc_name: str, vector: list, text: str, goal: str = None):
        """Store a document or goal vector in MongoDB using upsert"""
        try:
            vector_doc = {
                "name": doc_name,
                "vector": vector,
                "text": text,
                "type": "document" if goal is None else "goal",
                "goal": goal,
                # timezone-aware replacement for the deprecated utcnow()
                "updated_at": datetime.now(timezone.utc),
            }

            self.vectors_collection.update_one(
                {"name": doc_name},
                {
                    "$set": vector_doc,
                    "$setOnInsert": {"created_at": datetime.now(timezone.utc)},
                },
                upsert=True,
            )

        except Exception as e:
            st.error(f"Error storing vector: {str(e)}")
|
|
|
|
|
    def vector_search(self, query_vector: List[float], limit: int = 5) -> List[Dict]:
        """Search for similar documents using vector similarity"""
        try:
            # Brute-force scan: load every document vector and score it in
            # Python. This is O(n) per query, which is fine for small
            # collections but would be replaced by a server-side index
            # (e.g. Atlas $vectorSearch) at scale.
            documents = list(self.vectors_collection.find({"type": "document"}))

            similarities = []
            for doc in documents:
                similarity = self.calculate_similarity(query_vector, doc["vector"])
                similarities.append(
                    {
                        "name": doc["name"],
                        "text": doc["text"],
                        "similarity": similarity,
                        "similarity_display": f"{similarity*100:.1f}%",
                    }
                )

            return sorted(
                similarities,
                key=lambda x: x["similarity"],
                reverse=True,
            )[:limit]

        except Exception as e:
            st.error(f"Vector search error: {str(e)}")
            return []
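    # Sketch of the server-side equivalent on MongoDB Atlas, assuming an
    # Atlas Vector Search index (named "vector_index" here as a placeholder)
    # has been created on the "vector" field:
    #
    #   pipeline = [{
    #       "$vectorSearch": {
    #           "index": "vector_index",
    #           "path": "vector",
    #           "queryVector": query_vector,
    #           "numCandidates": 100,
    #           "limit": limit,
    #       }
    #   }]
    #   results = self.vectors_collection.aggregate(pipeline)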
|
|
|
    def find_similar_documents(self, text: str, limit: int = 5) -> List[Dict]:
        """Find documents similar to the given text"""
        vector = self.get_embedding(text)
        if vector:
            return self.vector_search(vector, limit)
        return []
|
|
|
    def calculate_similarity(self, vector1: list, vector2: list) -> float:
        """Calculate cosine similarity between two vectors"""
        return np.dot(vector1, vector2) / (
            np.linalg.norm(vector1) * np.linalg.norm(vector2)
        )
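    # Quick sanity check (illustrative): identical vectors score 1.0 and
    # orthogonal vectors score 0.0, e.g.
    #   calculate_similarity([1.0, 0.0], [0.0, 1.0]) -> 0.0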
|
|
|
|
|
def display_analysis_results(analysis: Dict):
    """Display analysis results in the Streamlit UI"""
    if not analysis:
        return

    st.subheader("Themes")
    for theme in analysis.get("themes", []):
        with st.expander(f"🎯 {theme}"):
            subthemes = analysis.get("subthemes", {}).get(theme, [])
            if subthemes:
                st.write("**Subthemes:**")
                for subtheme in subthemes:
                    st.write(f"- {subtheme}")

    st.subheader("Keywords")
    keywords = analysis.get("keywords", [])
    st.write(" | ".join(f"🔑 {keyword}" for keyword in keywords))

    score = analysis.get("relevance_score", 0)
    st.metric("Relevance Score", f"{score}%")
|
|
|
|
|
def display_analyst_dashboard():
    st.title("Multi-Goal Document Analysis")

    with st.sidebar:
        st.markdown("### Input Section")
        tab1, tab2 = st.tabs(["Document Analysis", "Similarity Search"])

        with tab1:
            num_goals = st.number_input("Number of goals:", min_value=1, value=1)
            goals = []
            for i in range(num_goals):
                goal = st.text_area(f"Goal {i+1}:", key=f"goal_{i}", height=100)
                if goal:
                    goals.append(goal)

            uploaded_files = st.file_uploader(
                "Upload documents",
                accept_multiple_files=True,
                type=["txt", "pdf", "docx"],
            )
            analyze_button = (
                st.button("Analyze Documents") if goals and uploaded_files else None
            )

        with tab2:
            search_text = st.text_area("Enter text to find similar documents:")
            search_limit = st.slider("Number of results", 1, 10, 5)
            search_button = st.button("Search Similar") if search_text else None

        if st.button("Logout", use_container_width=True):
            # Copy the keys first; deleting from session_state while
            # iterating over it raises a RuntimeError.
            for key in list(st.session_state.keys()):
                del st.session_state[key]
            st.rerun()

    if analyze_button:
        analyzer = GoalAnalyzer()
        vectorizer = DocumentVectorizer()

        doc_vectors = {}
        goal_vectors = {}

        with st.spinner("Processing goals..."):
            for i, goal in enumerate(goals):
                vector = vectorizer.get_embedding(goal)
                if vector:
                    goal_vectors[f"Goal {i+1}"] = vector
                    vectorizer.store_vector(f"Goal {i+1}", vector, goal, goal)

        with st.spinner("Processing documents..."):
            for file in uploaded_files:
                st.markdown(f"### Analysis for {file.name}")

                if vectorizer.vector_exists(file.name):
                    st.info(f"Vector already exists for {file.name}")
                    existing_doc = vectorizer.vectors_collection.find_one(
                        {"name": file.name}
                    )
                    doc_vectors[file.name] = existing_doc["vector"]
                    # Reuse the stored text; without this, the analysis below
                    # would run on stale text from a previous file (or fail
                    # outright on the first one).
                    text = existing_doc.get("text", "")
                else:
                    text = analyzer.extract_text_from_file(file)
                    if not text:
                        st.warning(f"Could not extract text from {file.name}")
                        continue

                    vector = vectorizer.get_embedding(text)
                    if not vector:
                        # Skip scoring when the embedding call failed;
                        # otherwise the doc_vectors lookup below would raise
                        # a KeyError.
                        continue
                    doc_vectors[file.name] = vector
                    vectorizer.store_vector(file.name, vector, text)

                st.subheader("Goal Relevance Scores")
                col1, col2 = st.columns([1, 2])

                with col1:
                    for goal_name, goal_vector in goal_vectors.items():
                        similarity = (
                            vectorizer.calculate_similarity(
                                doc_vectors[file.name], goal_vector
                            )
                            * 100
                        )
                        st.metric(f"{goal_name}", f"{similarity:.1f}%")

                with col2:
                    analysis = asyncio.run(
                        analyzer.get_perplexity_analysis(text, " | ".join(goals))
                    )
                    display_analysis_results(analysis)

                st.divider()

        if len(doc_vectors) > 1:
            st.markdown("### Document Similarity Matrix")
            files = list(doc_vectors.keys())
            similarity_matrix = []

            for file1 in files:
                row = []
                for file2 in files:
                    similarity = vectorizer.calculate_similarity(
                        doc_vectors[file1], doc_vectors[file2]
                    )
                    row.append(similarity)
                similarity_matrix.append(row)

            df = pd.DataFrame(similarity_matrix, columns=files, index=files)
            st.dataframe(df.style.background_gradient(cmap="RdYlGn"))

            st.markdown("### Goal-Document Similarity Matrix")
            goal_doc_matrix = []
            goal_names = list(goal_vectors.keys())

            for file in files:
                row = []
                for goal in goal_names:
                    similarity = vectorizer.calculate_similarity(
                        doc_vectors[file], goal_vectors[goal]
                    )
                    row.append(similarity)
                goal_doc_matrix.append(row)

            df_goals = pd.DataFrame(
                goal_doc_matrix, columns=goal_names, index=files
            )
            st.dataframe(df_goals.style.background_gradient(cmap="RdYlGn"))

    elif search_button:
        vectorizer = DocumentVectorizer()
        with st.spinner("Searching similar documents..."):
            query_vector = vectorizer.get_embedding(search_text)
            if query_vector:
                similar_docs = vectorizer.vector_search(query_vector, search_limit)

                if similar_docs:
                    st.markdown("### Similar Documents Found")

                    df = pd.DataFrame(similar_docs)
                    styled_df = df[["name", "similarity"]].style.background_gradient(
                        cmap="RdYlGn", subset=["similarity"]
                    )
                    styled_df = styled_df.format({"similarity": "{:.1%}"})
                    st.dataframe(styled_df)

                    for doc in similar_docs:
                        with st.expander(
                            f"📄 {doc['name']} (Similarity: {doc['similarity_display']})"
                        ):
                            st.text(
                                doc["text"][:20] + "..."
                                if len(doc["text"]) > 20
                                else doc["text"]
                            )
                else:
                    st.info("No similar documents found")
            else:
                st.error("Could not process search query")
|
|
|
|
|
def main():
    # All UI and analysis logic lives in display_analyst_dashboard().
    display_analyst_dashboard()
|
|
|
|
|
if __name__ == "__main__":
    main()
|
|