"""Streamlit app that chunks an uploaded document and stores the chunks in MongoDB."""

import os
import tempfile
from datetime import datetime
from pathlib import Path
from urllib.parse import quote_plus

import streamlit as st
from pymongo import MongoClient

from document_chunker import DocumentChunker

# Sentinel entry appended to the category dropdown.
NEW_CATEGORY_LABEL = "Create New Category"
# Collection that receives every chunk.
CHUNK_COLLECTION = "final_chunks"
# Number of chunks previewed after a successful insert.
PREVIEW_COUNT = 3

# Kept ahead of every other Streamlit call in the script.
st.set_page_config(page_title="Doc Chunker", layout="wide")


# === MongoDB connection via Hugging Face secrets ===
def _require_env(name):
    """Return the value of environment variable *name*.

    Raises:
        RuntimeError: if the variable is unset or empty, so a missing
            deployment secret is reported by name instead of surfacing as a
            ``TypeError`` from ``quote_plus(None)``.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


@st.cache_resource(show_spinner=False)
def _get_mongo_client(uri):
    """Return a MongoClient for *uri*, cached so script reruns reuse it.

    Streamlit re-executes the whole script on each interaction; without the
    cache every rerun would construct a fresh client.
    """
    # NOTE(review): tlsAllowInvalidCertificates=True turns off certificate
    # verification. Kept to preserve current behaviour; confirm it is still
    # needed before relying on this connection for sensitive data.
    return MongoClient(
        uri,
        tls=True,
        tlsAllowInvalidCertificates=True,
        serverSelectionTimeoutMS=20000,
    )


user = quote_plus(_require_env("MONGO_USER"))
password = quote_plus(_require_env("MONGO_PASS"))
cluster = _require_env("MONGO_CLUSTER")
db_name = os.environ.get("MONGO_DB", "grant_docs")

mongo_uri = f"mongodb+srv://{user}:{password}@{cluster}/{db_name}?retryWrites=true&w=majority&tls=true&tlsAllowInvalidCertificates=true"
client = _get_mongo_client(mongo_uri)
db = client[db_name]


def gate_ui():
    """Render the password gate and report whether the session may proceed.

    The password is read from ``st.secrets["APP_PASSWORD"]``, falling back to
    the ``APP_PASSWORD`` environment variable. When neither yields a
    non-empty value the app is treated as open and the session is marked
    authenticated.

    Returns:
        True when the session is authenticated (or no password is
        configured), False while the login form is being shown.
    """
    env_password = os.getenv("APP_PASSWORD")
    try:
        configured = st.secrets.get("APP_PASSWORD", env_password)
    except FileNotFoundError:
        # No secrets file in this deployment; rely on the environment only.
        configured = env_password
    # `or ""` guards the unset case, where the original called .strip() on None.
    app_password = str(configured or "").strip()

    if "authed" not in st.session_state:
        st.session_state.authed = False

    if not app_password:
        st.session_state.authed = True
        return True

    if st.session_state.authed:
        return True

    st.title("🔒 Document Chunker Login")
    pwd = st.text_input("Enter password", type="password")
    if st.button("Login"):
        if pwd == app_password:
            st.session_state.authed = True
            st.rerun()
        else:
            st.error("Incorrect password.")
    return False


def _select_category():
    """Render the category picker and return the chosen, non-empty name.

    Halts the script run via ``st.stop()`` while the user has chosen to
    create a new category but has not typed a name yet.
    """
    try:
        stored = db[CHUNK_COLLECTION].distinct("collection_category") or []
    except Exception:
        # Deliberate best-effort: if the lookup fails the user can still
        # type a new category name.
        stored = []

    options = sorted(c for c in stored if c) + [NEW_CATEGORY_LABEL]
    choice = st.selectbox(
        "Choose Category (collection_category)",
        options,
        index=options.index(NEW_CATEGORY_LABEL),
    )
    if choice == NEW_CATEGORY_LABEL:
        # Strip so " Grants " and "Grants" do not become distinct categories.
        choice = st.sidebar.text_input("Enter Category Name:").strip()

    if not choice:
        st.warning("⚠️ Enter a category name to proceed.")
        st.stop()
    return choice


def _chunk_upload(uploaded_file):
    """Write the upload to a private temp directory, chunk it, and clean up.

    The original base name (and therefore its extension) is preserved, but
    any directory components in the client-supplied name are dropped. The
    directory and file are removed as soon as chunking returns.
    """
    safe_name = Path(uploaded_file.name).name
    with tempfile.TemporaryDirectory() as workdir:
        temp_path = Path(workdir) / safe_name
        temp_path.write_bytes(uploaded_file.getbuffer())
        return DocumentChunker().process_document(str(temp_path))


def _render_previews(chunks):
    """Show the first few chunks and a count of the remainder."""
    # Assumes each chunk carries 'text' plus metadata 'topics', 'category'
    # and 'confidence_score' -- TODO confirm against DocumentChunker.
    for position, chunk in enumerate(chunks[:PREVIEW_COUNT], start=1):
        meta = chunk['metadata']
        st.subheader(f"Chunk {position}: {meta.get('header') or 'No Header'}")
        st.markdown(chunk['text'][:400] + "...")
        st.caption(f"Topics: {', '.join(meta['topics'])} | Category: {meta['category']}")
        st.progress(meta['confidence_score'])

    if len(chunks) > PREVIEW_COUNT:
        st.info(f"... and {len(chunks)-PREVIEW_COUNT} more chunks processed.")


# === Streamlit UI ===
def main():
    """Run the app: gate access, collect settings, chunk and store an upload.

    A file whose name already exists under the selected category is skipped.
    Otherwise its chunks are tagged with the category, title, upload time and
    grant-application flag, inserted into MongoDB, and previewed.
    """
    if not gate_ui():
        return

    st.title("📄 Document Chunker & Uploader")

    with st.sidebar:
        st.header("Settings")
        selected_category = _select_category()
        is_grant_app = st.toggle("Is this a Grant Application?", value=False)

    uploaded_file = st.file_uploader(
        "Upload a DOCX, TXT, or PDF file", type=["docx", "txt", "pdf"]
    )
    if not uploaded_file:
        return

    st.success(f"Uploaded `{uploaded_file.name}`")
    modified_time = datetime.now().isoformat()
    collection = db[CHUNK_COLLECTION]

    already = collection.find_one({
        "metadata.title": uploaded_file.name,
        "collection_category": selected_category,
    })
    if already:
        st.warning(f"⚠️ `{uploaded_file.name}` already exists in category `{selected_category}`. Skipping…")
        return

    st.write("⏳ Processing with DocumentChunker...")
    chunks = _chunk_upload(uploaded_file)
    if not chunks:
        st.warning("⚠️ No chunks were generated.")
        return

    for chunk in chunks:
        chunk['collection_category'] = selected_category
        chunk['metadata'].update({
            "title": uploaded_file.name,
            "uploaded_at": modified_time,
            "is_grant_app": is_grant_app,
        })
    # One round trip instead of one insert per chunk.
    collection.insert_many(chunks)
    st.success(f"✅ {len(chunks)} chunks inserted into `{CHUNK_COLLECTION}` (category: `{selected_category}`)")

    _render_previews(chunks)


if __name__ == "__main__":
    main()