Spaces:
Build error
Build error
import streamlit as st | |
import os | |
import json | |
import requests | |
import pdfplumber | |
import chromadb | |
import re | |
from langchain.document_loaders import PDFPlumberLoader | |
from langchain_huggingface import HuggingFaceEmbeddings | |
from langchain_experimental.text_splitter import SemanticChunker | |
from langchain_chroma import Chroma | |
from langchain.chains import LLMChain | |
from langchain.prompts import PromptTemplate | |
from langchain_groq import ChatGroq | |
from prompts import rag_prompt, relevancy_prompt, relevant_context_picker_prompt, response_synth | |
# ----------------- Streamlit UI Setup -----------------
st.set_page_config(page_title="Blah-1", layout="centered")

# ----------------- API Keys -----------------
# The Groq key is read from Streamlit secrets (empty string when absent) and
# exported through the environment before the ChatGroq clients are built.
groq_api_key = st.secrets.get("GROQ_API_KEY", "")
os.environ["GROQ_API_KEY"] = groq_api_key

# Load LLM models: one model judges/filters context, the other writes answers.
llm_judge = ChatGroq(model="deepseek-r1-distill-llama-70b")
rag_llm = ChatGroq(model="mixtral-8x7b-32768")
for chat_model in (llm_judge, rag_llm):
    chat_model.verbose = True

# Clear ChromaDB cache to fix tenant issue
chromadb.api.client.SharedSystemClient.clear_system_cache()

# ----------------- ChromaDB Persistent Directory -----------------
CHROMA_DB_DIR = "/mnt/data/chroma_db"
os.makedirs(CHROMA_DB_DIR, exist_ok=True)

# ----------------- Initialize Session State -----------------
# Each key is seeded only when missing, so values set during an earlier run of
# the script in the same session are kept.
session_defaults = {
    "pdf_loaded": False,
    "chunked": False,
    "vector_created": False,
    "processed_chunks": None,
    "vector_store": None,
}
for state_key, initial_value in session_defaults.items():
    if state_key not in st.session_state:
        st.session_state[state_key] = initial_value
# ----------------- Metadata Extraction ----------------- | |
def _default_metadata():
    """Return the placeholder metadata used when the LLM reply is unusable."""
    return {
        "Title": "Unknown",
        "Author": "Unknown",
        "Emails": "No emails found",
        "Affiliations": "No affiliations found",
    }


def _parse_metadata_json(raw_text):
    """Pull a JSON object out of an LLM reply; return a dict, or None on failure."""
    if not isinstance(raw_text, str):
        return None
    # Drop any <think>...</think> preamble (emitted by some reasoning models)
    # so that braces inside it cannot be mistaken for the answer.
    cleaned = re.sub(r"<think>.*?</think>", "", raw_text, flags=re.DOTALL)
    # Keep only the outermost {...} span; this also discards markdown fences
    # and any prose placed around the JSON object.
    start = cleaned.find("{")
    end = cleaned.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        parsed = json.loads(cleaned[start:end + 1])
    except json.JSONDecodeError:
        return None
    # Only a JSON object is usable; the caller relies on dict methods.
    return parsed if isinstance(parsed, dict) else None


def extract_metadata_llm(pdf_path):
    """Extracts metadata using LLM instead of regex and logs progress in Streamlit UI.

    The text of the PDF's first page is sent to ``llm_judge`` with a prompt
    asking for Title / Author / Emails / Affiliations as JSON. Intermediate
    values (page text, LLM input, raw reply, final result) are rendered in
    the Streamlit page.

    Args:
        pdf_path: Path of the PDF file to open with pdfplumber.

    Returns:
        A dict that always contains the keys "Title", "Author", "Emails" and
        "Affiliations". Placeholder values are used when the LLM call fails
        or its reply cannot be parsed as a JSON object.
    """
    with pdfplumber.open(pdf_path) as pdf:
        if pdf.pages:
            first_page_text = pdf.pages[0].extract_text() or "No text found."
        else:
            first_page_text = "No text found."

    # Streamlit Debugging: Show extracted text
    st.subheader("π Extracted First Page Text for Metadata")
    st.text_area("First Page Text:", first_page_text, height=200)

    # Define metadata prompt. The literal braces of the JSON example are
    # doubled so that the template's only placeholder is {text}.
    metadata_prompt = PromptTemplate(
        input_variables=["text"],
        template="""
    Given the following first page of a research paper, extract metadata **strictly in JSON format**.
    - If no data is found for a field, return `"Unknown"` instead.
    - Ensure the output is valid JSON (do not include markdown syntax).
    Example output:
    {{
    "Title": "Example Paper Title",
    "Author": "John Doe, Jane Smith",
    "Emails": "[email protected], [email protected]",
    "Affiliations": "School of AI, University of Example"
    }}
    Now, extract the metadata from this document:
    {text}
    """
    )

    # Run LLM Metadata Extraction
    metadata_chain = LLMChain(llm=llm_judge, prompt=metadata_prompt, output_key="metadata")

    # Debugging: Log the LLM input
    st.subheader("π LLM Input for Metadata Extraction")
    st.json({"text": first_page_text})

    try:
        metadata_response = metadata_chain.invoke({"text": first_page_text})
        # Debugging: Log raw LLM response
        st.subheader("π Raw LLM Response")
        st.json(metadata_response)
        # Handle JSON extraction from LLM response
        metadata_dict = _parse_metadata_json(metadata_response["metadata"])
        if metadata_dict is None:
            metadata_dict = _default_metadata()
    except Exception as e:
        # UI boundary: report the failure and continue with placeholders
        # instead of aborting the whole page.
        st.error(f"β LLM Metadata Extraction Failed: {e}")
        metadata_dict = _default_metadata()

    # Ensure all required fields exist
    required_fields = ["Title", "Author", "Emails", "Affiliations"]
    for field in required_fields:
        metadata_dict.setdefault(field, "Unknown")

    # Streamlit Debugging: Display Final Extracted Metadata
    st.subheader("β Extracted Metadata")
    st.json(metadata_dict)
    return metadata_dict
# ----------------- Step 1: Choose PDF Source -----------------
# Streamlit re-executes this script on every interaction. The identity of the
# PDF currently on disk is remembered in st.session_state["pdf_source_id"] so
# the processing flags are reset only when the user supplies a *different*
# file or URL, not on every rerun.
pdf_source = st.radio(
    "Upload or provide a link to a PDF:",
    ["Upload a PDF file", "Enter a PDF URL"],
    index=0,
    horizontal=True,
)

if pdf_source == "Upload a PDF file":
    uploaded_file = st.file_uploader("Upload your PDF file", type=["pdf"])
    if uploaded_file:
        upload_id = f"upload:{uploaded_file.name}:{uploaded_file.size}"
        # The uploader keeps returning the same file on later reruns; skip
        # the rewrite/reset unless the file actually changed.
        if st.session_state.get("pdf_source_id") != upload_id:
            st.session_state.pdf_path = "/mnt/data/temp.pdf"
            with open(st.session_state.pdf_path, "wb") as f:
                f.write(uploaded_file.getbuffer())
            st.session_state.pdf_source_id = upload_id
            st.session_state.pdf_loaded = False
            st.session_state.chunked = False
            st.session_state.vector_created = False
elif pdf_source == "Enter a PDF URL":
    pdf_url = st.text_input("Enter PDF URL:")
    url_id = f"url:{pdf_url}"
    # Download when a URL is present and differs from the source already
    # loaded; a failed download leaves the id unset so it is retried.
    if pdf_url and st.session_state.get("pdf_source_id") != url_id:
        with st.spinner("π Downloading PDF..."):
            try:
                # A timeout keeps an unresponsive host from hanging the app.
                response = requests.get(pdf_url, timeout=30)
                if response.status_code == 200:
                    st.session_state.pdf_path = "/mnt/data/temp.pdf"
                    with open(st.session_state.pdf_path, "wb") as f:
                        f.write(response.content)
                    st.session_state.pdf_source_id = url_id
                    st.session_state.pdf_loaded = False
                    st.session_state.chunked = False
                    st.session_state.vector_created = False
                    st.success("β PDF Downloaded Successfully!")
                else:
                    st.error("β Failed to download PDF. Check the URL.")
            except (requests.RequestException, OSError) as e:
                st.error(f"Error downloading PDF: {e}")
# ----------------- Process PDF -----------------
# Runs once per newly supplied PDF: loads the pages, extracts metadata with
# the LLM, builds the embedding model and semantically chunks the document.
if not st.session_state.pdf_loaded and "pdf_path" in st.session_state:
    # Local import: needed to wrap the metadata in a real Document object.
    from langchain.schema import Document

    with st.spinner("π Processing document... Please wait."):
        loader = PDFPlumberLoader(st.session_state.pdf_path)
        docs = loader.load()
        # Guard the debug display: the loader may return no pages.
        if docs:
            st.json(docs[0].metadata)
        else:
            st.warning("No pages could be loaded from the PDF.")

        # Extract metadata
        metadata = extract_metadata_llm(st.session_state.pdf_path)

        # Display extracted-metadata
        if isinstance(metadata, dict):
            st.subheader("π Extracted Document Metadata")
            st.write(f"**Title:** {metadata.get('Title', 'Unknown')}")
            st.write(f"**Author:** {metadata.get('Author', 'Unknown')}")
            st.write(f"**Emails:** {metadata.get('Emails', 'No emails found')}")
            st.write(f"**Affiliations:** {metadata.get('Affiliations', 'No affiliations found')}")
        else:
            st.error("Metadata extraction failed. Check the LLM response format.")

        # Embedding Model
        model_name = "nomic-ai/modernbert-embed-base"
        embedding_model = HuggingFaceEmbeddings(
            model_name=model_name,
            model_kwargs={"device": "cpu"},
            encode_kwargs={'normalize_embeddings': False},
        )

        # Convert metadata into a retrievable chunk. It must be a Document
        # whose page_content is a string (here: the metadata as JSON text) so
        # it can sit in the same list as the chunks from the text splitter.
        metadata_doc = Document(
            page_content=json.dumps(metadata, ensure_ascii=False),
            metadata={"source": "metadata"},
        )

        # Prevent unnecessary re-chunking
        if not st.session_state.chunked:
            text_splitter = SemanticChunker(embedding_model)
            document_chunks = text_splitter.split_documents(docs)
            document_chunks.insert(0, metadata_doc)  # Insert metadata as a retrievable document
            st.session_state.processed_chunks = document_chunks
            st.session_state.chunked = True

        st.session_state.pdf_loaded = True
    st.success("β Document processed and chunked successfully!")
# ----------------- Setup Vector Store -----------------
# Build the Chroma collection once per processed document, then index the
# chunks held in session state.
if st.session_state.processed_chunks and not st.session_state.vector_created:
    with st.spinner("π Initializing Vector Store..."):
        chroma_store = Chroma(
            embedding_function=embedding_model,
            collection_name="deepseek_collection",
            collection_metadata={"hnsw:space": "cosine"},
            persist_directory=CHROMA_DB_DIR,  # <-- Ensures persistence
        )
        # Publish the store to session state before indexing, then add chunks.
        st.session_state.vector_store = chroma_store
        chroma_store.add_documents(st.session_state.processed_chunks)
        st.session_state.vector_created = True
    st.success("β Vector store initialized successfully!")
# ----------------- Query Input -----------------
# Retrieves the top-5 similar chunks for the question, lets the judge LLM
# score and select the relevant ones, then asks the RAG LLM for the answer.
query = st.text_input("π Ask a question about the document:")
if query and st.session_state.vector_store is None:
    # No document has been indexed yet, so there is nothing to search.
    st.warning("Please load a PDF before asking a question.")
elif query:
    with st.spinner("π Retrieving relevant context..."):
        retriever = st.session_state.vector_store.as_retriever(
            search_type="similarity", search_kwargs={"k": 5}
        )
        retrieved_docs = retriever.invoke(query)
        context = [d.page_content for d in retrieved_docs]
    st.success("β Context retrieved successfully!")

    # ----------------- Run Individual Chains Explicitly -----------------
    context_relevancy_chain = LLMChain(
        llm=llm_judge,
        prompt=PromptTemplate(input_variables=["retriever_query", "context"], template=relevancy_prompt),
        output_key="relevancy_response",
    )
    relevant_context_chain = LLMChain(
        llm=llm_judge,
        prompt=PromptTemplate(input_variables=["relevancy_response"], template=relevant_context_picker_prompt),
        output_key="context_number",
    )
    relevant_contexts_chain = LLMChain(
        llm=llm_judge,
        prompt=PromptTemplate(input_variables=["context_number", "context"], template=response_synth),
        output_key="relevant_contexts",
    )
    response_chain = LLMChain(
        llm=rag_llm,
        prompt=PromptTemplate(input_variables=["query", "context"], template=rag_prompt),
        output_key="final_response",
    )

    # Each chain's output feeds the next one.
    response_crisis = context_relevancy_chain.invoke({"context": context, "retriever_query": query})
    relevant_response = relevant_context_chain.invoke({"relevancy_response": response_crisis["relevancy_response"]})
    contexts = relevant_contexts_chain.invoke({"context_number": relevant_response["context_number"], "context": context})
    final_response = response_chain.invoke({"query": query, "context": contexts["relevant_contexts"]})

    # ----------------- Display All Outputs -----------------
    st.markdown("### Context Relevancy Evaluation")
    st.json(response_crisis["relevancy_response"])
    st.markdown("### Picked Relevant Contexts")
    st.json(relevant_response["context_number"])
    st.markdown("### Extracted Relevant Contexts")
    st.json(contexts["relevant_contexts"])

    # Each intermediate value lives in the result of the chain that produced
    # it; final_response is only indexed by the RAG chain's own output key.
    st.subheader("context_relevancy_evaluation_chain Statement")
    st.json(response_crisis["relevancy_response"])
    st.subheader("pick_relevant_context_chain Statement")
    st.json(relevant_response["context_number"])
    st.subheader("relevant_contexts_chain Statement")
    st.json(contexts["relevant_contexts"])
    st.subheader("RAG Response Statement")
    st.json(final_response["final_response"])