ccr-colorado / app.py
tstone87's picture
Update app.py
cbe5279 verified
raw
history blame
4.89 kB
import os
import urllib.parse
import fitz # PyMuPDF for PDF reading
import faiss
import numpy as np
import gradio as gr
from sentence_transformers import SentenceTransformer
from huggingface_hub import hf_hub_download, InferenceClient
# 🔹 Hugging Face Space repository details
HF_REPO_ID = "tstone87/ccr-colorado"
# 🔹 Load embedding model (optimized for QA retrieval); the same model embeds
#    both the PDF chunks and every incoming query.
model = SentenceTransformer("multi-qa-mpnet-base-dot-v1")
# 🔹 Define PDF directory and chunk size
PDF_DIR = "./pdfs" # Local folder for downloaded PDFs
CHUNK_SIZE = 2500 # Characters per chunk; larger chunks for better context
# 🔹 Ensure the PDF directory exists before anything is written into it
os.makedirs(PDF_DIR, exist_ok=True)
# 🔹 Function to download PDFs from the Hugging Face Space (handles file names containing spaces)
def download_pdfs(pdf_files=None):
    """Fetch the policy PDFs from the Hugging Face Space into ``PDF_DIR``.

    Files that already exist locally are skipped. A failed download is
    reported and does not stop the remaining files from being fetched.

    Args:
        pdf_files: Optional iterable of file names exactly as they are stored
            in the repository (spaces included, not URL-encoded). Defaults to
            the two bundled policy documents.

    Returns:
        None. Progress and errors are printed to stdout.
    """
    if pdf_files is None:
        pdf_files = [
            "SNAP 10 CCR 2506-1 .pdf",
            "Med 10 CCR 2505-10 8.100.pdf",
        ]

    failed = []
    for pdf_file in pdf_files:
        pdf_path = os.path.join(PDF_DIR, pdf_file)
        if os.path.exists(pdf_path):  # Already present, nothing to do
            continue

        print(f"πŸ“₯ Downloading {pdf_file}...")
        try:
            # Pass the raw file name: hf_hub_download URL-encodes it itself, so
            # pre-quoting would request (and save) a differently named file.
            # repo_type must be "space" because the default is a model repo.
            hf_hub_download(
                repo_id=HF_REPO_ID,
                repo_type="space",
                filename=pdf_file,
                local_dir=PDF_DIR,
            )
        except Exception as e:  # Best effort: keep going with the other files
            failed.append(pdf_file)
            print(f"❌ Error downloading {pdf_file}: {e}")
        else:
            print(f"βœ… Successfully downloaded {pdf_file}")

    if failed:
        print(f"❌ Could not download {len(failed)} PDF(s): {', '.join(failed)}")
    else:
        print("βœ… All PDFs downloaded.")
# 🔹 Function to extract text from PDFs
def extract_text_from_pdfs():
    """Concatenate the plain text of every PDF found in ``PDF_DIR``.

    Returns:
        str: The text of every page, each page followed by a newline. Empty
        when the directory contains no PDF files.
    """
    page_texts = []
    # os.listdir order is arbitrary; sort so the text (and therefore the chunk
    # boundaries derived from it) is reproducible between runs.
    for pdf_file in sorted(os.listdir(PDF_DIR)):
        if not pdf_file.lower().endswith(".pdf"):
            continue
        pdf_path = os.path.join(PDF_DIR, pdf_file)
        # Context manager closes the document even if a page fails to parse.
        with fitz.open(pdf_path) as doc:
            page_texts.extend(page.get_text("text") for page in doc)
    # Single join instead of repeated string concatenation.
    return "".join(f"{text}\n" for text in page_texts)
# 🔹 Initialize FAISS and embed text
def initialize_faiss():
    """Download the PDFs, chunk their text, embed it and build a FAISS index.

    Returns:
        tuple: ``(index, chunks)`` where ``chunks`` is the list of text pieces
        and ``index`` holds one embedding per chunk, in the same order.

    Raises:
        ValueError: If no usable text could be extracted from the PDFs.
    """
    download_pdfs()
    text_data = extract_text_from_pdfs()
    # Pages without a text layer still contribute a newline each, so test for
    # real content rather than for a non-empty string.
    if not text_data or not text_data.strip():
        raise ValueError("❌ No text extracted from PDFs!")

    # Split into fixed-size character windows and drop whitespace-only ones.
    pieces = [text_data[i:i + CHUNK_SIZE] for i in range(0, len(text_data), CHUNK_SIZE)]
    chunks = [piece for piece in pieces if piece.strip()]

    # Encode all chunks in one batched call; FAISS needs contiguous float32.
    embeddings = np.ascontiguousarray(model.encode(chunks), dtype=np.float32)

    # The embedding model is the dot-product variant, so rank by inner product
    # rather than by L2 distance.
    index = faiss.IndexFlatIP(embeddings.shape[1])
    index.add(embeddings)
    print("βœ… FAISS index initialized.")
    return index, chunks
# 🔹 Initialize FAISS on startup. This runs at import time: initialize_faiss()
#    downloads the PDFs, extracts their text and embeds it. `index` and
#    `chunks` are then read as module-level globals by search_policy().
index, chunks = initialize_faiss()
# 🔹 Function to search FAISS
def search_policy(query, top_k=3):
    """Return the policy chunks most relevant to ``query``.

    Args:
        query: Free-text question to look up.
        top_k: Maximum number of chunks to return.

    Returns:
        str: The matching chunks joined by blank lines, best match first, or
        an empty string when there is nothing to search for or in.
    """
    if not query or not query.strip() or top_k <= 0 or not chunks:
        return ""

    query_embedding = np.asarray(model.encode(query), dtype=np.float32).reshape(1, -1)
    # Never ask for more neighbours than there are chunks.
    _, indices = index.search(query_embedding, min(top_k, len(chunks)))
    # FAISS pads missing results with -1; without the lower bound that would
    # silently index the last chunk.
    hits = [chunks[i] for i in indices[0] if 0 <= i < len(chunks)]
    return "\n\n".join(hits)
# 🔹 Hugging Face LLM client; respond() streams chat completions through it.
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")
# 🔹 Function to handle chat responses
def _history_to_messages(history):
    """Convert Gradio chat history into chat-completion message dicts."""
    converted = []
    for turn in history or []:
        if isinstance(turn, dict):
            # Dict-style history: entries already carry role and content.
            role, content = turn.get("role"), turn.get("content")
            if role and isinstance(content, str) and content:
                converted.append({"role": role, "content": content})
            continue
        # Pair-style history: (user_text, assistant_text), either may be empty.
        user_text, assistant_text = turn[0], turn[1]
        if user_text:
            converted.append({"role": "user", "content": user_text})
        if assistant_text:
            converted.append({"role": "assistant", "content": assistant_text})
    return converted


def respond(message, history, system_message, max_tokens, temperature, top_p):
    """Stream an answer to ``message``, grounded in retrieved policy text.

    Args:
        message: The user's latest question.
        history: Previous turns, as (user, assistant) pairs or role/content dicts.
        system_message: System prompt placed first in the conversation.
        max_tokens: Maximum number of tokens to generate.
        temperature: Sampling temperature passed to the model.
        top_p: Nucleus-sampling threshold passed to the model.

    Yields:
        str: The response accumulated so far, once per received text token.
    """
    messages = [{"role": "system", "content": system_message}]
    messages.extend(_history_to_messages(history))

    # Retrieve relevant policy text and attach it to the question once; a
    # second copy as a fabricated assistant turn only doubles the prompt size.
    policy_context = search_policy(message)
    if policy_context:
        user_content = (
            "\n"
            "The following is the most relevant policy information retrieved "
            "from the official Colorado public assistance policies:\n"
            f"{policy_context}\n"
            "Based on this information, answer the following question:\n"
            f"{message}\n"
        )
    else:
        user_content = message
    messages.append({"role": "user", "content": user_content})

    response = ""
    stream = client.chat_completion(
        messages,
        max_tokens=max_tokens,
        stream=True,
        temperature=temperature,
        top_p=top_p,
    )
    for chunk in stream:
        if not chunk.choices:
            continue
        token = chunk.choices[0].delta.content
        # Some stream events (e.g. the final one) carry no text.
        if token:
            response += token
            yield response
# 🔹 Gradio chat interface
# Extra controls shown under the chat box. Their values are passed to
# respond() after (message, history), in this order: system message,
# max tokens, temperature, top-p.
_additional_inputs = [
    gr.Textbox(
        label="System message",
        value="You are a knowledgeable chatbot designed to assist Colorado case workers with Medicaid, SNAP, TANF, CHP+, and other programs.",
    ),
    gr.Slider(label="Max new tokens", minimum=1, maximum=2048, value=512, step=1),
    gr.Slider(label="Temperature", minimum=0.1, maximum=4.0, value=0.7, step=0.1),
    gr.Slider(label="Top-p (nucleus sampling)", minimum=0.1, maximum=1.0, value=0.95, step=0.05),
]

demo = gr.ChatInterface(respond, additional_inputs=_additional_inputs)
# Start the Gradio server only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()