|
import streamlit as st |
|
import os |
|
import sys |
|
import tempfile |
|
from datetime import datetime |
|
from typing import List, Dict, Any |
|
import time |
|
import logging |
|
|
|
|
|
# Module-level logging; basicConfig is a no-op if the host process has
# already configured logging handlers.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Make the project root importable when this file is run directly
# (three directory levels up from this file).
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

try:
    from app.core.agent import AssistantAgent
    from app.core.ingestion import DocumentProcessor
    from app.utils.helpers import get_document_path, format_sources, save_conversation
    from app.config import LLM_MODEL, EMBEDDING_MODEL
except ImportError:
    # Fallback: assume the app was launched from the repository root.
    sys.path.append(os.path.abspath('.'))
    from app.core.agent import AssistantAgent
    from app.core.ingestion import DocumentProcessor
    from app.utils.helpers import get_document_path, format_sources, save_conversation
    from app.config import LLM_MODEL, EMBEDDING_MODEL

# NOTE: set_page_config must be the first Streamlit command executed.
st.set_page_config(
    page_title="Personal AI Assistant (Hugging Face)",
    page_icon="🤗",
    layout="wide"
)
|
|
|
|
|
@st.cache_resource
def get_agent():
    """Return the process-wide AssistantAgent.

    Streamlit's resource cache guarantees the agent is constructed only once
    per server process. If construction fails, an inert stand-in is returned
    so the UI keeps rendering instead of crashing on every rerun.
    """

    class _FallbackAgent:
        """No-op replacement used when the real agent cannot start."""

        def query(self, question):
            # Mirrors the shape of a real response so downstream code works.
            return {
                "answer": "I'm having trouble starting up. Please try refreshing the page.",
                "sources": [],
            }

        def add_conversation_to_memory(self, *args, **kwargs):
            pass

    logger.info("Initializing AssistantAgent (should only happen once)")
    try:
        return AssistantAgent()
    except Exception as e:
        logger.error(f"Error initializing agent: {e}")
        st.error(f"Could not initialize AI assistant: {str(e)}")
        return _FallbackAgent()
|
|
|
|
|
@st.cache_resource
def get_document_processor(_agent):
    """Return the process-wide DocumentProcessor bound to *_agent*'s memory.

    The leading underscore on ``_agent`` tells Streamlit not to hash this
    (unhashable) parameter. On failure, a stub that reports a dummy id is
    returned so uploads degrade gracefully instead of crashing the page.
    """

    class _FallbackProcessor:
        """No-op replacement used when the real processor cannot start."""

        def ingest_file(self, *args, **kwargs):
            return ["dummy-id"]

        def ingest_text(self, *args, **kwargs):
            return ["dummy-id"]

    logger.info("Initializing DocumentProcessor (should only happen once)")
    try:
        return DocumentProcessor(_agent.memory_manager)
    except Exception as e:
        logger.error(f"Error initializing document processor: {e}")
        st.error(f"Could not initialize document processor: {str(e)}")
        return _FallbackProcessor()
|
|
|
|
|
# Conversation history lives in session state so it survives Streamlit's
# rerun-the-whole-script execution model.
if "messages" not in st.session_state:
    st.session_state.messages = []

# Both are @st.cache_resource singletons: built once per server process,
# served from cache on every subsequent rerun.
agent = get_agent()
document_processor = get_document_processor(agent)

st.title("🤗 Personal AI Assistant (Hugging Face)")
|
|
|
|
|
with st.sidebar:
    st.header("Upload Documents")
    uploaded_file = st.file_uploader("Choose a file", type=["pdf", "txt", "csv"])

    if uploaded_file is not None:
        if st.button("Process Document"):
            with st.spinner("Processing document..."):
                tmp_path = None
                try:
                    # Keep a permanent copy of the upload in the app's
                    # document directory.
                    doc_path = get_document_path(uploaded_file.name)
                    with open(doc_path, "wb") as f:
                        f.write(uploaded_file.getvalue())

                    # Ingest from a temp file. Created only when the button
                    # is actually clicked (previously one was created on
                    # every rerun) and always removed in `finally`, even if
                    # ingestion raises.
                    suffix = f".{uploaded_file.name.split('.')[-1]}"
                    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
                        tmp.write(uploaded_file.getvalue())
                        tmp_path = tmp.name

                    document_processor.ingest_file(tmp_path, {"original_name": uploaded_file.name})

                    st.success(f"Document {uploaded_file.name} processed successfully!")
                except Exception as e:
                    st.error(f"Error processing document: {str(e)}")
                finally:
                    # Clean up the temp file on both success and failure.
                    if tmp_path and os.path.exists(tmp_path):
                        os.unlink(tmp_path)

    st.header("Raw Text Input")
    text_input = st.text_area("Enter text to add to the knowledge base")

    if st.button("Add Text"):
        if text_input:
            with st.spinner("Adding text to knowledge base..."):
                try:
                    metadata = {
                        "type": "manual_input",
                        "timestamp": str(datetime.now())
                    }
                    document_processor.ingest_text(text_input, metadata)
                    st.success("Text added to knowledge base successfully!")
                except Exception as e:
                    st.error(f"Error adding text: {str(e)}")

    st.header("Models")
    st.write(f"**LLM**: [{LLM_MODEL}](https://huggingface.co/{LLM_MODEL})")
    st.write(f"**Embeddings**: [{EMBEDDING_MODEL}](https://huggingface.co/{EMBEDDING_MODEL})")

    st.header("Deployment")
    st.write("This app can be easily deployed to [Hugging Face Spaces](https://huggingface.co/spaces) for free hosting.")

    st.markdown("""
    <div style="text-align: center; margin-top: 20px;">
        <a href="https://huggingface.co" target="_blank">
            <img src="https://huggingface.co/front/assets/huggingface_logo.svg" width="200" alt="Hugging Face">
        </a>
    </div>
    """, unsafe_allow_html=True)
|
|
|
|
|
# Replay the stored conversation on every rerun of the script.
for msg in st.session_state.messages:
    role = msg["role"]
    with st.chat_message(role):
        st.write(msg["content"])

        # Only assistant turns carry retrieval sources worth showing.
        if role != "assistant" or "sources" not in msg:
            continue
        with st.expander("View Sources"):
            source_list = msg["sources"]
            if not source_list:
                st.write("No specific sources used.")
            else:
                for idx, src in enumerate(source_list, 1):
                    page_part = f" (Page {src['page']})" if src.get('page') else ""
                    st.write(f"{idx}. {src['file_name']}" + page_part)
                    st.text(src['content'])
|
|
|
|
|
def _render_source_list(sources):
    """Render retrieval sources inside an already-open expander."""
    if not sources:
        st.write("No specific sources used.")
        return
    for i, source in enumerate(sources, 1):
        st.write(f"{i}. {source['file_name']}" + (f" (Page {source['page']})" if source.get('page') else ""))
        st.text(source['content'])


if prompt := st.chat_input("Ask a question..."):
    # Record and echo the user's turn.
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.write(prompt)

    with st.chat_message("assistant"):
        with st.spinner("Thinking..."):
            try:
                # The local vector store allows a single reader; a rerun can
                # briefly race a previous one, so retry only on that
                # specific conflict and re-raise anything else.
                max_retries = 3
                for attempt in range(max_retries):
                    try:
                        response = agent.query(prompt)
                        break
                    except Exception as e:
                        if "already accessed by another instance" in str(e) and attempt < max_retries - 1:
                            logger.warning(f"Vector store access conflict, retrying ({attempt+1}/{max_retries})...")
                            time.sleep(1)
                        else:
                            raise

                answer = response["answer"]
                # Tolerate agents that omit the "sources" key entirely.
                sources = response.get("sources", [])

                st.write(answer)

                with st.expander("View Sources"):
                    _render_source_list(sources)

                # Persist the exchange (app-level conversation log).
                save_conversation(prompt, answer, sources)

                st.session_state.messages.append({
                    "role": "assistant",
                    "content": answer,
                    "sources": sources
                })

                # Feed the exchange back into the agent's long-term memory.
                agent.add_conversation_to_memory(prompt, answer)

            except Exception as e:
                error_msg = f"Error generating response: {str(e)}"
                # logger.exception keeps the traceback; logger.error dropped it.
                logger.exception(error_msg)
                st.error(error_msg)
                st.session_state.messages.append({
                    "role": "assistant",
                    "content": "I'm sorry, I encountered an error while processing your request. Please try again or refresh the page.",
                    "sources": []
                })
|
|
|
|
|
# Page footer, rendered on every run.
st.markdown("---")
st.markdown("Built with LangChain, Hugging Face, and Qdrant")

# Streamlit executes this file top-to-bottom via `streamlit run`; there is
# no separate entry point, so the main guard is intentionally a no-op.
if __name__ == "__main__":
    pass