# Spaces: Sleeping  (page-status residue captured with the source; not part of the program)
# app.py
import hmac
import os
import uuid
from io import BytesIO

import faiss
import streamlit as st
from dotenv import load_dotenv
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.llms import HuggingFaceHub
from langchain_community.vectorstores import FAISS
from PyPDF2 import PdfReader
# Load environment variables
load_dotenv()

# The Hugging Face token is trimmed of surrounding whitespace and defaults to
# an empty string; the access key stays None when the variable is unset.
HUGGINGFACEHUB_API_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN", "").strip()
RAG_ACCESS_KEY = os.getenv("RAG_ACCESS_KEY")

if not HUGGINGFACEHUB_API_TOKEN:
    st.warning("Hugging Face API token not found! Please set HUGGINGFACEHUB_API_TOKEN in your .env file.")

# Initialize session state: each key is seeded only when it is not already
# present, so existing values are left untouched.
for _state_key, _state_default in (
    ("vectorstore", None),
    ("history", []),
    ("authenticated", False),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
# PDF processing logic
def process_input(input_data):
    """Build an in-memory FAISS vector store from an uploaded PDF.

    The PDF is read fully into memory, its page text is split into
    1000-character chunks (100 overlap), and each chunk is embedded with
    ``sentence-transformers/all-mpnet-base-v2`` on the CPU. Progress is
    reported through a Streamlit progress bar and status placeholder.

    Args:
        input_data: Binary file-like object exposing ``read()`` (the caller
            passes the value returned by ``st.file_uploader``).

    Returns:
        The populated ``FAISS`` vector store, one entry per text chunk.

    Raises:
        ValueError: If no text could be extracted from the PDF.
    """
    # Initialize progress bar and status
    progress_bar = st.progress(0)
    status = st.empty()

    # Step 1: Read PDF file in memory
    status.text("Reading PDF file...")
    pdf_reader = PdfReader(BytesIO(input_data.read()))
    # extract_text() may yield nothing for a page; treat that as empty text.
    page_texts = [page.extract_text() or "" for page in pdf_reader.pages]
    # Join with a newline so the last word of one page is not fused with the
    # first word of the next.
    documents = "\n".join(page_texts)
    progress_bar.progress(0.25)

    # Step 2: Split text
    status.text("Splitting text into chunks...")
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
    texts = text_splitter.split_text(documents) if documents.strip() else []
    if not texts:
        # Fail early with a clear message instead of handing an empty batch
        # to the embedding model and the FAISS index.
        progress_bar.empty()
        status.empty()
        raise ValueError(
            "No extractable text was found in the PDF (it may be scanned or image-only)."
        )
    progress_bar.progress(0.50)

    # Step 3: Create embeddings
    status.text("Creating embeddings...")
    hf_embeddings = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-mpnet-base-v2",
        model_kwargs={'device': 'cpu'},
    )
    progress_bar.progress(0.75)

    # Step 4: Initialize FAISS vector store
    status.text("Building vector store...")
    # Probe the model once to learn the embedding width the index needs.
    dimension = len(hf_embeddings.embed_query("test"))
    index = faiss.IndexFlatL2(dimension)
    vector_store = FAISS(
        embedding_function=hf_embeddings,
        index=index,
        docstore=InMemoryDocstore({}),
        index_to_docstore_id={},
    )

    # Add texts to vector store, each under a fresh random id
    uuids = [str(uuid.uuid4()) for _ in texts]
    vector_store.add_texts(texts, ids=uuids)

    # Complete processing: only now is the work actually finished.
    progress_bar.progress(1.0)
    status.text("Processing complete!")
    return vector_store
# Question-answering logic
def answer_question(vectorstore, query):
    """Answer ``query`` from the documents indexed in ``vectorstore``.

    The three most similar chunks are retrieved, stuffed into a short prompt,
    and sent to ``mistralai/Mistral-7B-Instruct-v0.1`` through the Hugging
    Face Hub.

    Args:
        vectorstore: Vector store exposing ``as_retriever`` (the FAISS store
            built by ``process_input``).
        query: The user's question.

    Returns:
        The text following the last ``"Answer:"`` marker in the model output,
        stripped of surrounding whitespace.

    Raises:
        RuntimeError: If no Hugging Face API token is configured.
    """
    if not HUGGINGFACEHUB_API_TOKEN:
        raise RuntimeError("Missing Hugging Face API token. Please set it in your secrets.")

    llm = HuggingFaceHub(
        repo_id="mistralai/Mistral-7B-Instruct-v0.1",
        # The text-generation task limits output with `max_new_tokens`;
        # `max_length` is not one of its documented parameters, so the
        # intended 512-token budget was likely not being applied.
        model_kwargs={"temperature": 0.7, "max_new_tokens": 512},
        huggingfacehub_api_token=HUGGINGFACEHUB_API_TOKEN,
    )
    retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
    prompt_template = PromptTemplate(
        template="Use the context to answer the question concisely:\n\nContext: {context}\n\nQuestion: {question}\n\nAnswer:",
        input_variables=["context", "question"],
    )
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=False,
        chain_type_kwargs={"prompt": prompt_template},
    )

    # `invoke` replaces the deprecated direct call on the chain object.
    result = qa_chain.invoke({"query": query})
    # The output may echo the prompt; keep only what follows the final
    # "Answer:" marker.
    return result["result"].split("Answer:")[-1].strip()
# Sidebar with BSNL logo and authentication
with st.sidebar:
    try:
        st.image("bsnl_logo.png", width=200)
    except Exception:
        # Best-effort branding: a missing or unreadable logo must not stop
        # the rest of the sidebar from rendering.
        st.warning("BSNL logo not found.")

    st.header("RAG Control Panel")
    api_key_input = st.text_input("Enter RAG Access Key", type="password")

    # Blue authenticate button style
    st.markdown("""
        <style>
        .auth-button button {
        background-color: #007BFF !important;
        color: white !important;
        font-weight: bold;
        border-radius: 8px;
        padding: 10px 20px;
        border: none;
        transition: all 0.3s ease;
        width: 100%;
        }
        .auth-button button:hover {
        background-color: #0056b3 !important;
        transform: scale(1.05);
        }
        </style>
    """, unsafe_allow_html=True)

    with st.container():
        # NOTE(review): each st.markdown call is rendered as its own element,
        # so this <div> may not actually wrap the button - confirm the
        # .auth-button styling takes effect.
        st.markdown('<div class="auth-button">', unsafe_allow_html=True)
        if st.button("Authenticate"):
            # Require a non-empty configured key (an unset or empty
            # RAG_ACCESS_KEY must never authenticate a blank input) and
            # compare in constant time. Bytes are used because
            # compare_digest rejects non-ASCII str arguments.
            key_matches = bool(RAG_ACCESS_KEY) and hmac.compare_digest(
                api_key_input.encode("utf-8"),
                RAG_ACCESS_KEY.encode("utf-8"),
            )
            if key_matches:
                st.session_state.authenticated = True
                st.success("Authentication successful!")
            else:
                st.error("Invalid API key.")
        st.markdown('</div>', unsafe_allow_html=True)

    if st.session_state.authenticated:
        input_data = st.file_uploader("Upload a PDF file", type=["pdf"])
        if st.button("Process File"):
            if input_data is None:
                # Previously a click without an upload did nothing at all.
                st.warning("Please upload a PDF file before processing.")
            else:
                try:
                    vector_store = process_input(input_data)
                    st.session_state.vectorstore = vector_store
                    st.success("File processed successfully. You can now ask questions.")
                except PermissionError as e:
                    st.error(f"File upload failed: Permission error - {str(e)}. Check file system access.")
                except OSError as e:
                    st.error(f"File upload failed: OS error - {str(e)}. Check server configuration.")
                except Exception as e:
                    st.error(f"File upload failed: {str(e)} (Exception type: {type(e).__name__}). Please try again or check server logs.")

        # NOTE(review): nesting was reconstructed from a whitespace-stripped
        # source; confirm the history belongs under the authenticated branch.
        st.subheader("Chat History")
        for i, (q, a) in enumerate(st.session_state.history):
            st.write(f"**Q{i+1}:** {q}")
            st.write(f"**A{i+1}:** {a}")
            st.markdown("---")
# Main app UI
def main():
    """Render the main page: theme CSS, title, and the question form.

    Returns early with a notice when the session is not authenticated or no
    vector store has been built. Otherwise a submitted, non-empty question is
    answered via ``answer_question``, appended to the session history, and
    displayed; any exception is reported with ``st.error``.
    """
    page_css = """
        <style>
        @import url('https://fonts.googleapis.com/css2?family=Roboto:wght@400;700&display=swap');
        .stApp {
        background-color: #FFFFFF;
        font-family: 'Roboto', sans-serif;
        color: #333333;
        }
        .stTextInput > div > div > input {
        background-color: #FFFFFF;
        color: #333333;
        border-radius: 8px;
        border: 1px solid #007BFF;
        padding: 10px;
        box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }
        .stButton > button {
        background-color: #007BFF;
        color: white;
        border-radius: 8px;
        padding: 10px 20px;
        border: none;
        transition: all 0.3s ease;
        box-shadow: 0 2px 4px rgba(0,0,0,0.2);
        }
        .stButton > button:hover {
        background-color: #0056b3;
        transform: scale(1.05);
        }
        .stSidebar {
        background-color: #F5F5F5;
        padding: 20px;
        border-right: 2px solid #007BFF;
        }
        </style>
    """
    st.markdown(page_css, unsafe_allow_html=True)
    st.title("RAG Q&A App with Mistral AI")
    st.markdown("Welcome to the BSNL RAG App! Upload a PDF file and ask questions.", unsafe_allow_html=True)

    session = st.session_state

    # Gate the question form behind authentication and a processed PDF.
    if not session.authenticated:
        st.warning("Please authenticate using the sidebar.")
        return
    if session.vectorstore is None:
        st.info("Please upload and process a PDF file.")
        return

    query = st.text_input("Enter your question:")
    submitted = st.button("Submit")
    if not (submitted and query):
        return

    with st.spinner("Generating answer..."):
        try:
            answer = answer_question(session.vectorstore, query)
            session.history.append((query, answer))
            st.write("**Answer:**", answer)
        except Exception as exc:
            st.error(f"Error generating answer: {exc}")


if __name__ == "__main__":
    main()