Spaces:
Running
Running
File size: 4,765 Bytes
b9ae50a 23a7785 b9ae50a dc82c6a b9ae50a f8d2230 dc82c6a b9ae50a 23a7785 3246e10 23a7785 3246e10 23a7785 13def26 bb1806c b9ae50a bb1806c 9ec318a bb1806c 23a7785 13def26 b9ae50a 13def26 dc82c6a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
import os
import streamlit as st
import tempfile
from pymongo import MongoClient
from datetime import datetime
from pathlib import Path
from document_chunker import DocumentChunker
from urllib.parse import quote_plus
# === MongoDB connection via Hugging Face secrets ===
user = quote_plus(os.getenv("MONGO_USER"))
password = quote_plus(os.getenv("MONGO_PASS"))
cluster = os.getenv("MONGO_CLUSTER")
db_name = os.environ.get("MONGO_DB", "grant_docs")
mongo_uri = f"mongodb+srv://{user}:{password}@{cluster}/{db_name}?retryWrites=true&w=majority&tls=true&tlsAllowInvalidCertificates=true"
client = MongoClient(mongo_uri, tls=True, tlsAllowInvalidCertificates=True, serverSelectionTimeoutMS=20000)
db = client[db_name]
st.set_page_config(page_title="Doc Chunker", layout="wide")
def gate_ui():
APP_PASSWORD=st.secrets.get("APP_PASSWORD", os.getenv("APP_PASSWORD")).strip()
if "authed" not in st.session_state:
st.session_state.authed = False
if not APP_PASSWORD:
st.session_state.authed = True
return True
if st.session_state.authed:
return True
st.title("🔒 Document Chunker Login")
pwd=st.text_input("Enter password", type="password")
if st.button("Login"):
if pwd==APP_PASSWORD:
st.session_state.authed=True
st.rerun()
else:
st.error("Incorrect password.")
return False
# === Streamlit UI ===
def main():
if not gate_ui():
return
st.title("📄 Document Chunker & Uploader")
with st.sidebar:
st.header("Settings")
# Fetch collection names for dropdown
try:
existing_categories = db["final_chunks"].distinct("collection_category") or []
except Exception:
existing_categories = []
existing_categories=sorted([c for c in existing_categories if c])+["Create New Category"]
selected_category = st.selectbox(
"Choose Category (collection_category)",
existing_categories,
index=existing_categories.index("Create New Category") if "Create New Category" in existing_categories else 0
)
if selected_category == "Create New Category":
selected_category = st.sidebar.text_input("Enter Category Name:")
if not selected_category:
st.warning("⚠️ Enter a category name to proceed.")
st.stop()
is_grant_app = st.toggle("Is this a Grant Application?", value=False)
uploaded_file = st.file_uploader("Upload a DOCX, TXT, or PDF file", type=["docx", "txt", "pdf"])
if uploaded_file:
temp_path = Path(tempfile.gettempdir()) / uploaded_file.name
with open(temp_path, "wb") as f:
f.write(uploaded_file.getbuffer())
st.success(f"Uploaded `{uploaded_file.name}`")
modified_time = datetime.now().isoformat()
collection = db['final_chunks']
already = collection.find_one({
"metadata.title": uploaded_file.name,
"collection_category": selected_category
})
if already:
st.warning(f"⚠️ `{uploaded_file.name}` already exists in category `{selected_category}`. Skipping…")
else:
st.write("⏳ Processing with DocumentChunker...")
chunker = DocumentChunker()
chunks = chunker.process_document(str(temp_path))
if chunks:
for chunk in chunks:
chunk['collection_category']=selected_category
chunk['metadata'].update({
"title": uploaded_file.name,
"uploaded_at": modified_time,
"is_grant_app": is_grant_app,
})
collection.insert_one(chunk)
st.success(f"✅ {len(chunks)} chunks inserted into `final_chunks` (category: `{selected_category}`)")
# Show a few previews
for i, c in enumerate(chunks[:3]):
st.subheader(f"Chunk {i+1}: {c['metadata'].get('header') or 'No Header'}")
st.markdown(c['text'][:400] + "...")
st.caption(f"Topics: {', '.join(c['metadata']['topics'])} | Category: {c['metadata']['category']}")
st.progress(c['metadata']['confidence_score'])
if len(chunks) > 3:
st.info(f"... and {len(chunks)-3} more chunks processed.")
else:
st.warning("⚠️ No chunks were generated.")
if __name__ == "__main__":
main()
# try:
# os.remove(temp_path)
# except Exception as e:
# st.warning(f"⚠️ Could not delete temp file: {e}")
|