Spaces:
Sleeping
Sleeping
import gradio as gr | |
import faiss | |
import numpy as np | |
import os | |
from sentence_transformers import SentenceTransformer | |
from huggingface_hub import InferenceClient | |
# Sentence-embedding model; the rest of the script calls model.encode() on
# both the policy chunks and incoming user queries.
model = SentenceTransformer(model_name_or_path="all-MiniLM-L6-v2")
# On-disk artifacts: the raw policy corpus, plus the cached embeddings and
# FAISS index derived from it.
TEXT_FILE, EMBEDDINGS_FILE, INDEX_FILE = (
    "combined_text_documents.txt",
    "policy_embeddings.npy",
    "faiss_index.bin",
)
# Read the entire policy corpus into memory. A missing file is reported on
# stdout and leaves POLICY_TEXT as an empty string rather than aborting.
if not os.path.exists(TEXT_FILE):
    print("β ERROR: combined_text_documents.txt not found! Ensure it's uploaded.")
    POLICY_TEXT = ""
else:
    with open(TEXT_FILE, mode="r", encoding="utf-8") as f:
        POLICY_TEXT = f.read()
    print("β Loaded policy text from combined_text_documents.txt")
# Cut the corpus into consecutive, non-overlapping windows of at most
# chunk_size characters. An empty corpus yields an empty list because the
# range below is then empty.
chunk_size = 500
chunks = [
    POLICY_TEXT[start:start + chunk_size]
    for start in range(0, len(POLICY_TEXT), chunk_size)
]
# Obtain the FAISS index used for retrieval: reuse the cached artifacts when
# both files exist, otherwise embed every chunk, build an IVF index, and cache
# both to disk. `index` is left as None when there is no text to index.
if os.path.exists(EMBEDDINGS_FILE) and os.path.exists(INDEX_FILE):
    print("β Loading precomputed FAISS index and embeddings...")
    embeddings = np.load(EMBEDDINGS_FILE)
    index = faiss.read_index(INDEX_FILE)
    if index.ntotal != len(chunks):
        # Search hits are mapped back to `chunks` by position, so a cache
        # built from different text silently returns the wrong passages.
        print(
            f"WARNING: cached index holds {index.ntotal} vectors but the text "
            f"has {len(chunks)} chunks; delete {EMBEDDINGS_FILE} and "
            f"{INDEX_FILE} to rebuild."
        )
else:
    print("π Generating embeddings and FAISS index (First-time setup)...")
    if chunks:
        # Encode all chunks in one batched call instead of one call per
        # chunk; FAISS expects a 2-D float32 matrix.
        embeddings = np.asarray(model.encode(chunks), dtype="float32")
        np.save(EMBEDDINGS_FILE, embeddings)  # Save for future runs

        d = embeddings.shape[1]
        # IVF training runs k-means with `nlist` centroids and fails when
        # there are fewer vectors than clusters, so clamp for small corpora.
        nlist = min(10, embeddings.shape[0])
        quantizer = faiss.IndexFlatL2(d)
        index = faiss.IndexIVFFlat(quantizer, d, nlist)
        index.train(embeddings)
        index.add(embeddings)
        # Probe at most 2 clusters per query (speed over recall), never more
        # than exist.
        index.nprobe = min(2, nlist)
        faiss.write_index(index, INDEX_FILE)  # Save FAISS index
        print("β FAISS index created and saved.")
    else:
        print("β ERROR: No text to index. Check combined_text_documents.txt.")
        index = None
def search_policy(query, top_k=3):
    """Return the policy chunks most similar to *query*.

    Args:
        query: Free-text question to embed and look up in the FAISS index.
        top_k: Maximum number of chunks to retrieve.

    Returns:
        The matching chunks joined by blank lines (an empty string when
        nothing matches), or an error message when no index is loaded.
    """
    if index is None:
        return "Error: FAISS index is not available."

    # FAISS expects a 2-D float32 matrix with one row per query.
    query_embedding = np.asarray(model.encode(query), dtype="float32").reshape(1, -1)
    _, indices = index.search(query_embedding, top_k)

    # FAISS pads a short result list with -1. Without the lower bound, Python
    # would treat -1 as "last chunk" and return an unrelated passage.
    return "\n\n".join(chunks[i] for i in indices[0] if 0 <= i < len(chunks))
# Hugging Face inference client for the chat model that writes the replies.
client = InferenceClient(model="HuggingFaceH4/zephyr-7b-beta")
def respond(
    message,
    history: list[tuple[str, str]],
    system_message,
    max_tokens,
    temperature,
    top_p,
):
    """Stream a reply to *message*, adding retrieved policy text as context.

    Args:
        message: The user's latest message.
        history: Earlier (user, assistant) turns; empty entries are skipped.
        system_message: System prompt placed first in the conversation.
        max_tokens: Forwarded to the chat completion call.
        temperature: Forwarded to the chat completion call.
        top_p: Forwarded to the chat completion call.

    Yields:
        The reply accumulated so far, each time new text arrives.
    """
    messages = [{"role": "system", "content": system_message}]

    for turn in history:
        if turn[0]:
            messages.append({"role": "user", "content": turn[0]})
        if turn[1]:
            messages.append({"role": "assistant", "content": turn[1]})

    # Retrieve supporting policy text and add it as an extra system message.
    policy_context = search_policy(message)
    if policy_context:
        messages.append(
            {"role": "system", "content": f"Relevant Policy Info:\n{policy_context}"}
        )

    messages.append({"role": "user", "content": message})

    response = ""
    # Use a distinct loop name so the `message` parameter is not shadowed.
    for chunk in client.chat_completion(
        messages,
        max_tokens=max_tokens,
        stream=True,
        temperature=temperature,
        top_p=top_p,
    ):
        if not chunk.choices:
            continue
        token = chunk.choices[0].delta.content
        # Streamed deltas may carry no text (content can be None, e.g. on the
        # closing chunk); concatenating None would raise TypeError.
        if token:
            response += token
            yield response
# Gradio chat UI driven by `respond`. The additional inputs are listed in the
# same order as respond's system_message, max_tokens, temperature and top_p
# parameters.
demo = gr.ChatInterface(
    fn=respond,
    additional_inputs=[
        gr.Textbox(
            label="System message",
            value=(
                "You are a knowledgeable and professional chatbot designed to assist "
                "Colorado case workers in determining eligibility for public assistance "
                "programs. Your primary role is to provide accurate, up-to-date, and "
                "policy-compliant information on Medicaid, SNAP, TANF, CHP+, and other "
                "state and federal assistance programs. Responses should be clear, "
                "concise, and structured based on eligibility criteria, income limits, "
                "deductions, federal poverty level guidelines, and program-specific "
                "requirements."
            ),
        ),
        gr.Slider(label="Max new tokens", minimum=1, maximum=2048, step=1, value=512),
        gr.Slider(label="Temperature", minimum=0.1, maximum=4.0, step=0.1, value=0.7),
        gr.Slider(
            label="Top-p (nucleus sampling)",
            minimum=0.1,
            maximum=1.0,
            step=0.05,
            value=0.95,
        ),
    ],
)

# Start the web app only when the file is run as a script.
if __name__ == "__main__":
    demo.launch()