import os
import fitz # PyMuPDF for PDF reading
import faiss
import numpy as np
import gradio as gr
from sentence_transformers import SentenceTransformer
from huggingface_hub import hf_hub_download, InferenceClient
# 🔹 Hugging Face Space Repository Details
HF_REPO_ID = "tstone87/ccr-colorado"

# 🔹 Load Embedding Model (Optimized for QA Retrieval)
model = SentenceTransformer("multi-qa-mpnet-base-dot-v1")

# 🔹 Define PDF Directory and Chunk Size
PDF_DIR = "./pdfs"  # Local folder for downloaded PDFs
CHUNK_SIZE = 2500  # Larger chunks for better context

# 🔹 Ensure Directory Exists
os.makedirs(PDF_DIR, exist_ok=True)
# 🔹 Function to Download PDFs from Hugging Face Space (Handles Spaces in Filenames)
def download_pdfs():
    pdf_files = [
        "SNAP 10 CCR 2506-1 .pdf",
        "Med 10 CCR 2505-10 8.100.pdf",
    ]
    for pdf_file in pdf_files:
        pdf_path = os.path.join(PDF_DIR, pdf_file)
        if not os.path.exists(pdf_path):  # Download if not already present
            print(f"📥 Downloading {pdf_file}...")
            try:
                # Pass the raw filename: hf_hub_download URL-encodes it internally,
                # so pre-encoding the spaces would request (and save) the wrong file.
                hf_hub_download(
                    repo_id=HF_REPO_ID,
                    repo_type="space",  # the PDFs live in the Space repo, not a model repo
                    filename=pdf_file,
                    local_dir=PDF_DIR,
                )
                print(f"✅ Successfully downloaded {pdf_file}")
            except Exception as e:
                print(f"❌ Error downloading {pdf_file}: {e}")
    print("✅ All PDFs downloaded.")
# 🔹 Function to Extract Text from PDFs
def extract_text_from_pdfs():
    all_text = ""
    for pdf_file in os.listdir(PDF_DIR):
        if pdf_file.endswith(".pdf"):
            pdf_path = os.path.join(PDF_DIR, pdf_file)
            doc = fitz.open(pdf_path)
            for page in doc:
                all_text += page.get_text("text") + "\n"
            doc.close()  # release the file handle
    return all_text
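
# Note: this concatenates every PDF into one string, so a retrieved chunk cannot
# be traced back to its source document. A sketch of one alternative (the
# (filename, page text) pairing is an assumption, not part of the original app):
#     pages = [(pdf_file, page.get_text("text")) for page in doc]
# Keeping the filename alongside each chunk would let answers cite which CCR
# volume a passage came from.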
# 🔹 Initialize FAISS and Embed Text
def initialize_faiss():
    download_pdfs()
    text_data = extract_text_from_pdfs()
    if not text_data:
        raise ValueError("❌ No text extracted from PDFs!")
    # Split text into fixed-size character chunks
    chunks = [text_data[i:i + CHUNK_SIZE] for i in range(0, len(text_data), CHUNK_SIZE)]
    # Generate embeddings in one batched call (equivalent to encoding chunk by chunk)
    embeddings = model.encode(chunks, convert_to_numpy=True)
    # Create FAISS index over L2 distance
    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(embeddings)
    print("✅ FAISS index initialized.")
    return index, chunks
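
# Fixed-size slicing can cut a regulation mid-sentence. A minimal sketch of
# overlapping chunks (the 200-character overlap is an arbitrary assumption):
#     OVERLAP = 200
#     chunks = [text_data[i:i + CHUNK_SIZE]
#               for i in range(0, len(text_data), CHUNK_SIZE - OVERLAP)]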
# 🔹 Initialize FAISS on Startup
index, chunks = initialize_faiss()
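
# Optional: persist the index so Space restarts skip re-embedding. A sketch only;
# the file name is an assumption, and the chunks list must be saved alongside the
# index (e.g. with json or pickle) for the returned ids to stay meaningful:
#     faiss.write_index(index, "policy.index")
#     ...
#     index = faiss.read_index("policy.index")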
# 🔹 Function to Search FAISS
def search_policy(query, top_k=3):
    query_embedding = model.encode(query).reshape(1, -1)
    distances, indices = index.search(query_embedding, top_k)
    # FAISS pads with -1 when fewer than top_k neighbors exist, so bound-check both ends
    return "\n\n".join([chunks[i] for i in indices[0] if 0 <= i < len(chunks)])
# 🔹 Hugging Face LLM Client
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")
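
# If the endpoint requires authentication, InferenceClient accepts a token, e.g.
#     InferenceClient("HuggingFaceH4/zephyr-7b-beta", token=os.environ["HF_TOKEN"])
# (the HF_TOKEN variable name is an assumption about this Space's secrets).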
# 🔹 Function to Handle Chat Responses
def respond(message, history, system_message, max_tokens, temperature, top_p):
    messages = [{"role": "system", "content": system_message}]
    for val in history:
        if val[0]:
            messages.append({"role": "user", "content": val[0]})
        if val[1]:
            messages.append({"role": "assistant", "content": val[1]})

    # 🔹 Retrieve relevant policy info from FAISS
    policy_context = search_policy(message)

    if policy_context:
        messages.append({"role": "assistant", "content": f"📌 **Relevant Policy Context:**\n\n{policy_context}"})
        user_query_with_context = f"""
The following is the most relevant policy information retrieved from the official Colorado public assistance policies:

{policy_context}

Based on this information, answer the following question:
{message}
"""
        messages.append({"role": "user", "content": user_query_with_context})
    else:
        messages.append({"role": "user", "content": message})
response = ""
for message in client.chat_completion(
messages,
max_tokens=max_tokens,
stream=True,
temperature=temperature,
top_p=top_p,
):
token = message.choices[0].delta.content
response += token
yield response
# 🔹 Gradio Chat Interface
demo = gr.ChatInterface(
    respond,
    additional_inputs=[
        gr.Textbox(
            value="You are a knowledgeable chatbot designed to assist Colorado case workers with Medicaid, SNAP, TANF, CHP+, and other programs.",
            label="System message",
        ),
        gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"),
        gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
        gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)"),
    ],
)
if __name__ == "__main__":
    demo.launch()