# Source: Hugging Face Space file listing (Space status: Sleeping).
# File size: 5,276 bytes · revision 4e46bd2
import functools
import os
import urllib.parse

import faiss
import fitz  # PyMuPDF for PDF reading
import gradio as gr
import numpy as np
import requests
from huggingface_hub import InferenceClient
from sentence_transformers import SentenceTransformer
# 🔹 Directory layout and chunking configuration
CHUNK_SIZE = 2500  # Characters per text chunk; larger chunks keep more context together
APP_DIR = "./"  # Root app folder (scanned for bundled PDFs)
PDF_DIR = "./pdfs"  # Folder the PDFs are downloaded into

# 🔹 Create the PDF folder up front; a no-op when it already exists
os.makedirs(name=PDF_DIR, exist_ok=True)
# 🔹 Function to Auto-Detect PDFs in App Folder & Generate Download Links
def get_pdf_links(app_dir=None, base_url=None):
    """Map every PDF file found in *app_dir* to its download URL.

    Args:
        app_dir: Directory to scan. Defaults to the module-level ``APP_DIR``.
        base_url: URL prefix the (percent-encoded) file name is appended to.
            Defaults to the ``resolve/main`` endpoint of the Space repository.

    Returns:
        A dict ``{file_name: download_url}`` ordered by file name.
    """
    if app_dir is None:
        app_dir = APP_DIR
    if base_url is None:
        base_url = "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/"
    # Tolerate a caller-supplied prefix with or without the trailing slash.
    base_url = base_url.rstrip("/") + "/"

    pdf_links = {}
    # Sorted so the mapping (and everything derived from it) is reproducible;
    # os.listdir() returns entries in arbitrary order.
    for file in sorted(os.listdir(app_dir)):
        if not file.endswith(".pdf"):
            continue
        # A directory that merely ends in ".pdf" is not downloadable.
        if not os.path.isfile(os.path.join(app_dir, file)):
            continue
        encoded_file = urllib.parse.quote(file)  # Encode spaces etc. for the URL
        pdf_links[file] = f"{base_url}{encoded_file}?download=true"
    return pdf_links
# 🔹 Get List of PDFs & Their Download Links
# Evaluated once at import time: maps each PDF file name found by
# get_pdf_links() to its download URL. download_pdfs() iterates this mapping.
PDF_FILES = get_pdf_links()
# 🔹 Function to Download PDFs
def download_pdfs():
    """Download every PDF listed in ``PDF_FILES`` into ``PDF_DIR``.

    Files that already exist in ``PDF_DIR`` are skipped. Each download is
    streamed to a temporary ``.part`` file and renamed only once complete, so
    an interrupted transfer never leaves a truncated PDF that a later run
    would mistake for a finished one. Failures are reported and counted but
    do not abort the remaining downloads (best-effort behaviour).
    """
    failures = 0
    for filename, url in PDF_FILES.items():
        pdf_path = os.path.join(PDF_DIR, filename)
        if os.path.exists(pdf_path):
            continue

        print(f"📥 Downloading {filename}...")
        tmp_path = pdf_path + ".part"
        try:
            # The context manager releases the connection even on error;
            # the timeout keeps a stalled server from hanging startup forever.
            with requests.get(url, stream=True, timeout=30) as response:
                response.raise_for_status()  # Ensure the request was successful
                with open(tmp_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
            os.replace(tmp_path, pdf_path)
        except (requests.RequestException, OSError) as e:
            failures += 1
            print(f"❌ Error downloading {filename}: {e}")
            # Drop the partial file so the next run retries the download.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
        else:
            print(f"✅ Successfully downloaded {filename}")

    if failures:
        print(f"❌ {failures} PDF(s) could not be downloaded.")
    else:
        print("✅ All PDFs downloaded.")
# 🔹 Function to Extract Text from PDFs
def extract_text_from_pdfs():
    """Return the concatenated plain text of every ``.pdf`` file in ``PDF_DIR``.

    Files are processed in sorted name order so the resulting text (and the
    chunks later cut from it) is the same on every run. The text of each page
    is followed by a newline.

    Returns:
        One string containing the text of all pages of all PDFs; the empty
        string when the directory holds no PDF pages.
    """
    page_texts = []
    for pdf_file in sorted(os.listdir(PDF_DIR)):
        if not pdf_file.endswith(".pdf"):
            continue
        pdf_path = os.path.join(PDF_DIR, pdf_file)
        # Context manager closes the document (and its file handle) promptly.
        with fitz.open(pdf_path) as doc:
            page_texts.extend(page.get_text("text") + "\n" for page in doc)
    # Single join instead of repeated string concatenation.
    return "".join(page_texts)
# 🔹 Initialize FAISS and Embed Text
def initialize_faiss():
    """Download the PDFs, embed their text and build the FAISS index.

    Returns:
        A tuple ``(index, chunks)`` where ``chunks`` is the list of
        ``CHUNK_SIZE``-character text slices and ``index`` holds one embedding
        per chunk, in the same order.

    Raises:
        ValueError: If the PDFs yield no text other than whitespace.
    """
    download_pdfs()
    text_data = extract_text_from_pdfs()
    # Extraction appends a newline per page, so image-only PDFs produce a
    # non-empty but blank string; strip before testing.
    if not text_data.strip():
        raise ValueError("❌ No text extracted from PDFs!")

    # Split text into fixed-size chunks
    chunks = [
        text_data[start:start + CHUNK_SIZE]
        for start in range(0, len(text_data), CHUNK_SIZE)
    ]

    # Generate embeddings in one batched call; FAISS requires float32 input
    model = SentenceTransformer("multi-qa-mpnet-base-dot-v1")
    embeddings = np.asarray(model.encode(chunks), dtype="float32")

    # Create FAISS index. This model is trained for dot-product scoring, so
    # rank by inner product rather than L2 distance.
    index = faiss.IndexFlatIP(embeddings.shape[1])
    index.add(embeddings)
    print("✅ FAISS index initialized.")
    return index, chunks
# 🔹 Initialize FAISS on Startup
# Runs at import time: initialize_faiss() downloads the PDFs, extracts and
# embeds their text. `index` and `chunks` are the module-level state that
# search_policy() reads.
index, chunks = initialize_faiss()
# 🔹 Function to Search FAISS
@functools.lru_cache(maxsize=1)
def _get_query_encoder():
    """Load the sentence-embedding model once and reuse it for every query."""
    return SentenceTransformer("multi-qa-mpnet-base-dot-v1")


def search_policy(query, top_k=3):
    """Return the policy chunks most relevant to *query*.

    Args:
        query: The user's question.
        top_k: Maximum number of chunks to return.

    Returns:
        The matching chunks joined by blank lines, best match first, or an
        empty string when there is nothing to search for or nothing indexed.
    """
    if not query or top_k <= 0:
        return ""
    # Never ask for more neighbours than the index holds.
    k = min(top_k, index.ntotal)
    if k <= 0:
        return ""

    query_embedding = np.asarray(
        _get_query_encoder().encode(query), dtype="float32"
    ).reshape(1, -1)
    _scores, indices = index.search(query_embedding, k)
    # FAISS pads missing results with -1, which would otherwise silently
    # select the last chunk via negative indexing.
    return "\n\n".join(chunks[i] for i in indices[0] if 0 <= i < len(chunks))
# 🔹 Hugging Face LLM Client
# Shared inference client for the zephyr-7b-beta model; respond() calls its
# chat_completion() method with stream=True to generate answers.
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")
# 🔹 Function to Handle Chat Responses
def respond(message, history, system_message, max_tokens, temperature, top_p):
    """Stream an answer to *message*, grounded in retrieved policy text.

    Args:
        message: The user's latest question.
        history: Earlier turns; each item is indexed as ``[0]`` (user text)
            and ``[1]`` (assistant text), either of which may be empty.
        system_message: System prompt placed first in the conversation.
        max_tokens: Maximum number of tokens to generate.
        temperature: Sampling temperature passed to the model.
        top_p: Nucleus-sampling threshold passed to the model.

    Yields:
        The response accumulated so far, once per streamed chunk.
    """
    messages = [{"role": "system", "content": system_message}]
    for turn in history:
        if turn[0]:
            messages.append({"role": "user", "content": turn[0]})
        if turn[1]:
            messages.append({"role": "assistant", "content": turn[1]})

    # 🔹 Retrieve relevant policy info from FAISS
    policy_context = search_policy(message)
    if policy_context:
        # The context is sent once, inside the user turn. Repeating it as a
        # fabricated assistant turn doubled the prompt size and produced two
        # consecutive assistant messages.
        user_content = (
            "\nThe following is the most relevant policy information retrieved "
            "from the official Colorado public assistance policies:\n"
            f"{policy_context}\n"
            "Based on this information, answer the following question:\n"
            f"{message}\n"
        )
    else:
        user_content = message
    messages.append({"role": "user", "content": user_content})

    response = ""
    # Loop variable is deliberately not called `message`, to avoid shadowing
    # the parameter.
    for chunk in client.chat_completion(
        messages,
        max_tokens=max_tokens,
        stream=True,
        temperature=temperature,
        top_p=top_p,
    ):
        if not chunk.choices:
            continue
        token = chunk.choices[0].delta.content
        # Some stream chunks carry no text (content is None).
        if token:
            response += token
        yield response
# 🔹 Gradio Chat Interface
# Chat UI around respond(); the extra widgets below are passed to it, in
# order, as system_message, max_tokens, temperature and top_p.
demo = gr.ChatInterface(
    fn=respond,
    additional_inputs=[
        gr.Textbox(
            label="System message",
            value="You are a knowledgeable chatbot designed to assist Colorado case workers with Medicaid, SNAP, TANF, CHP+, and other programs.",
        ),
        gr.Slider(label="Max new tokens", minimum=1, maximum=2048, step=1, value=512),
        gr.Slider(label="Temperature", minimum=0.1, maximum=4.0, step=0.1, value=0.7),
        gr.Slider(
            label="Top-p (nucleus sampling)",
            minimum=0.1,
            maximum=1.0,
            step=0.05,
            value=0.95,
        ),
    ],
)

# Start the web app only when the file is executed as a script.
if __name__ == "__main__":
    demo.launch()
|