2B / app /ui /streamlit_app.py
37-AN
Fix Streamlit cache_resource unhashable parameter error
48a1a2b
raw
history blame
9.66 kB
import streamlit as st
import os
import sys
import tempfile
from datetime import datetime
from typing import List, Dict, Any
import time
import logging
# Logging: INFO-level root configuration plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Put the directory three levels above this file on the import path so the
# ``app`` package can be resolved.
_project_root = os.path.abspath(__file__)
for _ in range(3):
    _project_root = os.path.dirname(_project_root)
sys.path.append(_project_root)

# Import the project modules. If the first attempt fails with ImportError,
# add the current working directory to the import path and try once more.
try:
    from app.core.agent import AssistantAgent
    from app.core.ingestion import DocumentProcessor
    from app.utils.helpers import (
        get_document_path,
        format_sources,
        save_conversation,
    )
    from app.config import LLM_MODEL, EMBEDDING_MODEL
except ImportError:
    _working_dir = os.path.abspath('.')
    sys.path.append(_working_dir)
    from app.core.agent import AssistantAgent
    from app.core.ingestion import DocumentProcessor
    from app.utils.helpers import (
        get_document_path,
        format_sources,
        save_conversation,
    )
    from app.config import LLM_MODEL, EMBEDDING_MODEL
# Page-level settings: browser tab title, tab icon and a wide layout.
_page_settings = {
    "page_title": "Personal AI Assistant (Hugging Face)",
    "page_icon": "🤗",
    "layout": "wide",
}
st.set_page_config(**_page_settings)
# Function to initialize the agent safely
class _DummyAgent:
    """Stand-in returned when the real AssistantAgent cannot be constructed.

    It exposes the two methods this UI calls on the agent (``query`` and
    ``add_conversation_to_memory``) so the page keeps rendering.
    """

    def query(self, question):
        """Return a fixed apology answer with no sources."""
        return {
            "answer": "I'm having trouble starting up. Please try refreshing the page.",
            "sources": []
        }

    def add_conversation_to_memory(self, *args, **kwargs):
        """Accept and ignore any arguments; there is no memory to update."""
        pass


@st.cache_resource
def _load_agent():
    """Construct the AssistantAgent; a successful result is cached by Streamlit.

    Raises:
        Exception: whatever ``AssistantAgent()`` raises. A call that raises
            stores nothing in the cache, so the next call tries again.
    """
    logger.info("Initializing AssistantAgent (should only happen once)")
    return AssistantAgent()


def get_agent():
    """Return the cached AssistantAgent, or a dummy agent if start-up fails.

    Only a successfully built agent is cached. The fallback is created
    outside the cached helper on purpose: caching it would pin the dummy for
    the lifetime of the process, and refreshing the page (as the fallback
    answer suggests) could never recover.

    Returns:
        The AssistantAgent instance, or a ``_DummyAgent`` when construction
        raised.
    """
    try:
        return _load_agent()
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Error initializing agent: %s", e)
        st.error(f"Could not initialize AI assistant: {str(e)}")
        # Return a dummy agent as fallback (deliberately not cached)
        return _DummyAgent()
# Function to initialize document processor safely
class _DummyProcessor:
    """Stand-in returned when the real DocumentProcessor cannot be built.

    Both ingestion methods accept any arguments and return a placeholder id
    list without storing anything.
    """

    def ingest_file(self, *args, **kwargs):
        """Ignore the input and return a placeholder id list."""
        return ["dummy-id"]

    def ingest_text(self, *args, **kwargs):
        """Ignore the input and return a placeholder id list."""
        return ["dummy-id"]


@st.cache_resource
def _load_document_processor(_agent):
    """Construct the DocumentProcessor; a successful result is cached.

    The leading underscore in ``_agent`` tells Streamlit not to hash this
    parameter (the agent is unhashable).

    Raises:
        Exception: whatever reading ``_agent.memory_manager`` or constructing
            ``DocumentProcessor`` raises. A call that raises stores nothing
            in the cache, so the next call tries again.
    """
    logger.info("Initializing DocumentProcessor (should only happen once)")
    return DocumentProcessor(_agent.memory_manager)


def get_document_processor(_agent):
    """Return the cached DocumentProcessor, or a dummy processor on failure.

    Only a successfully built processor is cached. The fallback is created
    outside the cached helper so that a transient failure does not leave the
    app permanently ingesting into a no-op processor.

    Args:
        _agent: object whose ``memory_manager`` attribute is handed to
            ``DocumentProcessor``. The parameter name is kept with its
            leading underscore for compatibility with existing callers.

    Returns:
        The DocumentProcessor instance, or a ``_DummyProcessor`` when
        construction raised.
    """
    try:
        return _load_document_processor(_agent)
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Error initializing document processor: %s", e)
        st.error(f"Could not initialize document processor: {str(e)}")
        # Return a dummy processor as fallback (deliberately not cached)
        return _DummyProcessor()
# Make sure the chat history list exists in the session state before use.
if "messages" not in st.session_state:
    st.session_state["messages"] = []

# Obtain the agent and the document processor through their factory
# functions so that instances are reused instead of being rebuilt.
agent = get_agent()
document_processor = get_document_processor(agent)

# Main page heading
st.title("🤗 Personal AI Assistant (Hugging Face)")
# Create a sidebar for uploading documents and settings
with st.sidebar:
    # --- Document upload -------------------------------------------------
    st.header("Upload Documents")
    uploaded_file = st.file_uploader("Choose a file", type=["pdf", "txt", "csv"])

    # The button is only shown once a file has been selected. The temporary
    # copy is created inside the click handler: Streamlit reruns the script
    # on every interaction, so creating it earlier would leave one stray
    # temp file behind per rerun.
    if uploaded_file is not None and st.button("Process Document"):
        with st.spinner("Processing document..."):
            tmp_path = None
            try:
                file_bytes = uploaded_file.getvalue()

                # Keep a copy of the upload in the documents directory
                doc_path = get_document_path(uploaded_file.name)
                with open(doc_path, "wb") as f:
                    f.write(file_bytes)

                # Write a temporary copy that keeps the original extension
                # (splitext yields "" when the name has no extension).
                suffix = os.path.splitext(uploaded_file.name)[1]
                with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
                    # Remember the path first so cleanup works even if the
                    # write below fails.
                    tmp_path = tmp.name
                    tmp.write(file_bytes)

                # Ingest the document
                document_processor.ingest_file(tmp_path, {"original_name": uploaded_file.name})
                st.success(f"Document {uploaded_file.name} processed successfully!")
            except Exception as e:
                st.error(f"Error processing document: {str(e)}")
            finally:
                # Always remove the temporary file, including on failure
                if tmp_path is not None:
                    try:
                        os.unlink(tmp_path)
                    except OSError as cleanup_error:
                        logger.warning(
                            "Could not remove temporary file %s: %s",
                            tmp_path,
                            cleanup_error,
                        )

    # --- Free-text input -------------------------------------------------
    st.header("Raw Text Input")
    text_input = st.text_area("Enter text to add to the knowledge base")
    if st.button("Add Text"):
        if text_input:
            with st.spinner("Adding text to knowledge base..."):
                try:
                    # Create metadata
                    metadata = {
                        "type": "manual_input",
                        "timestamp": str(datetime.now())
                    }
                    # Ingest the text
                    document_processor.ingest_text(text_input, metadata)
                    st.success("Text added to knowledge base successfully!")
                except Exception as e:
                    st.error(f"Error adding text: {str(e)}")

    # --- Model information -----------------------------------------------
    st.header("Models")
    st.write(f"**LLM**: [{LLM_MODEL}](https://huggingface.co/{LLM_MODEL})")
    st.write(f"**Embeddings**: [{EMBEDDING_MODEL}](https://huggingface.co/{EMBEDDING_MODEL})")

    # --- Hugging Face deployment info ------------------------------------
    st.header("Deployment")
    st.write("This app can be easily deployed to [Hugging Face Spaces](https://huggingface.co/spaces) for free hosting.")

    # Link to Hugging Face (raw HTML, hence unsafe_allow_html)
    st.markdown("""
<div style="text-align: center; margin-top: 20px;">
<a href="https://huggingface.co" target="_blank">
<img src="https://huggingface.co/front/assets/huggingface_logo.svg" width="200" alt="Hugging Face">
</a>
</div>
""", unsafe_allow_html=True)
# Replay every message stored in the session state, in order.
for past_message in st.session_state.messages:
    speaker = past_message["role"]
    with st.chat_message(speaker):
        st.write(past_message["content"])

        # Only assistant entries that carry a "sources" key get an expander.
        has_sources_field = speaker == "assistant" and "sources" in past_message
        if has_sources_field:
            with st.expander("View Sources"):
                cited = past_message["sources"]
                if not cited:
                    st.write("No specific sources used.")
                else:
                    for position, entry in enumerate(cited, 1):
                        # "<n>. <file name>", plus the page when one is set
                        label = f"{position}. {entry['file_name']}"
                        if entry.get('page'):
                            label += f" (Page {entry['page']})"
                        st.write(label)
                        st.text(entry['content'])
# Chat input: runs once per submitted question.
if prompt := st.chat_input("Ask a question..."):
    # Add user message to chat history
    st.session_state.messages.append({"role": "user", "content": prompt})
    # Display user message
    with st.chat_message("user"):
        st.write(prompt)

    # Generate response
    with st.chat_message("assistant"):
        with st.spinner("Thinking..."):
            try:
                # Retry the query when the vector store reports that another
                # instance holds it; any other error, or the final failed
                # attempt, is re-raised to the outer handler.
                max_retries = 3
                for attempt in range(max_retries):
                    try:
                        response = agent.query(prompt)
                        break
                    except Exception as e:
                        if "already accessed by another instance" in str(e) and attempt < max_retries - 1:
                            logger.warning(f"Vector store access conflict, retrying ({attempt+1}/{max_retries})...")
                            time.sleep(1)  # Wait before retrying
                        else:
                            raise

                answer = response["answer"]
                sources = response["sources"]

                # Display the response
                st.write(answer)

                # Display sources in an expander
                with st.expander("View Sources"):
                    if sources:
                        for i, source in enumerate(sources, 1):
                            st.write(f"{i}. {source['file_name']}" + (f" (Page {source['page']})" if source.get('page') else ""))
                            st.text(source['content'])
                    else:
                        st.write("No specific sources used.")

                # Record the answer in the chat history as soon as it has
                # been rendered, so a later persistence failure cannot drop
                # it from the history.
                st.session_state.messages.append({
                    "role": "assistant",
                    "content": answer,
                    "sources": sources
                })
            except Exception as e:
                error_msg = f"Error generating response: {str(e)}"
                # logger.exception keeps the traceback alongside the message.
                logger.exception(error_msg)
                st.error(error_msg)
                st.session_state.messages.append({
                    "role": "assistant",
                    "content": "I'm sorry, I encountered an error while processing your request. Please try again or refresh the page.",
                    "sources": []
                })
            else:
                # Persistence is best-effort: the answer is already shown
                # and stored in the history, so a failure here is reported
                # as a warning rather than as a failed response.

                # Save conversation
                try:
                    save_conversation(prompt, answer, sources)
                except Exception as e:
                    logger.exception("Error saving conversation: %s", e)
                    st.warning(f"Could not save the conversation: {str(e)}")

                # Update the agent's memory
                try:
                    agent.add_conversation_to_memory(prompt, answer)
                except Exception as e:
                    logger.exception("Error updating agent memory: %s", e)
                    st.warning(f"Could not update the assistant's memory: {str(e)}")
# Footer: a horizontal rule followed by the technology credits.
for _footer_line in ("---", "Built with LangChain, Hugging Face, and Qdrant"):
    st.markdown(_footer_line)

if __name__ == "__main__":
    # Nothing extra happens when the file is executed directly.
    pass