import os

import faiss
import gradio as gr
import numpy as np
from sentence_transformers import SentenceTransformer
from huggingface_hub import HfApi, InferenceClient, hf_hub_download, login

# 🔹 Hugging Face Repository Details
HF_REPO_ID = "tstone87/repo"  # Your repo
HF_TOKEN = os.getenv("HF_TOKEN")  # Retrieve token securely from environment variable

if not HF_TOKEN:
    raise ValueError("❌ ERROR: Hugging Face token not found. Add it as a secret in the Hugging Face Space settings.")

# 🔹 Authenticate with Hugging Face
login(token=HF_TOKEN)
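# Note: the token must have write access to HF_REPO_ID, because upload_faiss_to_hf()
# below pushes files to that repo; a read-only token will fail at upload time.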

# 🔹 File Paths
EMBEDDINGS_FILE = "policy_embeddings.npy"
INDEX_FILE = "faiss_index.bin"
TEXT_FILE = "combined_text_documents.txt"

# 🔹 Load policy text from file
if os.path.exists(TEXT_FILE):
    with open(TEXT_FILE, "r", encoding="utf-8") as f:
        POLICY_TEXT = f.read()
    print("✅ Loaded policy text from combined_text_documents.txt")
else:
    print("❌ ERROR: combined_text_documents.txt not found! Ensure it's uploaded.")
    POLICY_TEXT = ""

# 🔹 Sentence Embedding Model (Optimized for Speed)
model = SentenceTransformer("all-MiniLM-L6-v2")

# 🔹 Split policy text into fixed-size chunks for FAISS indexing
chunk_size = 500  # characters per chunk
chunks = [POLICY_TEXT[i:i + chunk_size] for i in range(0, len(POLICY_TEXT), chunk_size)] if POLICY_TEXT else []
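# Note: fixed 500-character chunks can split sentences mid-thought; overlapping
# windows (e.g. stepping by 400 so consecutive chunks share 100 characters) would
# likely improve retrieval quality at the cost of a slightly larger index.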

# 🔹 Function to Upload FAISS Files to Hugging Face Hub
def upload_faiss_to_hf():
    api = HfApi()
    if os.path.exists(EMBEDDINGS_FILE):
        print("📤 Uploading FAISS embeddings to Hugging Face...")
        api.upload_file(
            path_or_fileobj=EMBEDDINGS_FILE,
            path_in_repo=EMBEDDINGS_FILE,
            repo_id=HF_REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
        )
    if os.path.exists(INDEX_FILE):
        print("📤 Uploading FAISS index to Hugging Face...")
        api.upload_file(
            path_or_fileobj=INDEX_FILE,
            path_in_repo=INDEX_FILE,
            repo_id=HF_REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
        )
    print("✅ FAISS files successfully uploaded to Hugging Face.")

# 🔹 Function to Download FAISS Files from Hugging Face Hub if Missing
def download_faiss_from_hf():
    try:
        # repo_type="dataset" must match where upload_faiss_to_hf() stores the files
        if not os.path.exists(EMBEDDINGS_FILE):
            print("📥 Downloading FAISS embeddings from Hugging Face...")
            hf_hub_download(repo_id=HF_REPO_ID, repo_type="dataset", filename=EMBEDDINGS_FILE, local_dir=".", token=HF_TOKEN)
        if not os.path.exists(INDEX_FILE):
            print("📥 Downloading FAISS index from Hugging Face...")
            hf_hub_download(repo_id=HF_REPO_ID, repo_type="dataset", filename=INDEX_FILE, local_dir=".", token=HF_TOKEN)
        print("✅ FAISS files downloaded from Hugging Face.")
    except Exception as e:
        # If the files are not on the Hub yet, report it and let the caller fall back to recomputing
        print(f"⚠️ Could not download FAISS files: {e}")

# 🔹 Check if FAISS Files Exist, Otherwise Download
if os.path.exists(EMBEDDINGS_FILE) and os.path.exists(INDEX_FILE):
    print("✅ FAISS files found locally. Loading from disk...")
    embeddings = np.load(EMBEDDINGS_FILE)
    index = faiss.read_index(INDEX_FILE)
else:
    print("🚀 FAISS files not found! Downloading from Hugging Face...")
    download_faiss_from_hf()
    if os.path.exists(EMBEDDINGS_FILE) and os.path.exists(INDEX_FILE):
        embeddings = np.load(EMBEDDINGS_FILE)
        index = faiss.read_index(INDEX_FILE)
    else:
        print("🚀 No FAISS files found. Recomputing...")
        if chunks:
            embeddings = np.array([model.encode(chunk) for chunk in chunks], dtype="float32")  # FAISS expects float32
            # Save embeddings for future use
            np.save(EMBEDDINGS_FILE, embeddings)
            # Use a FAISS IVF index for faster lookup
            d = embeddings.shape[1]
            nlist = 10  # Number of clusters
            quantizer = faiss.IndexFlatL2(d)  # keep a reference so Python does not garbage-collect it
            index = faiss.IndexIVFFlat(quantizer, d, nlist)
            index.train(embeddings)
            index.add(embeddings)
            index.nprobe = 2  # Search only 2 clusters per query (speed over recall)
            # Save FAISS index
            faiss.write_index(index, INDEX_FILE)
            upload_faiss_to_hf()  # Upload FAISS files to Hugging Face
            print("✅ FAISS index created and saved.")
        else:
            print("❌ ERROR: No text to index. Check combined_text_documents.txt.")
            index = None
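# Note: IVF k-means training needs at least `nlist` vectors (and FAISS warns below
# roughly 39 * nlist training points); on a small corpus, a plain faiss.IndexFlatL2
# would give exact search with no training step and a negligible speed difference.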

# 🔹 Function to Search FAISS
def search_policy(query, top_k=3):
    if index is None:
        return "Error: FAISS index is not available."
    query_embedding = model.encode(query).reshape(1, -1)
    distances, indices = index.search(query_embedding, top_k)
    # FAISS pads missing neighbors with -1, so keep only valid chunk positions
    return "\n\n".join([chunks[i] for i in indices[0] if 0 <= i < len(chunks)])

# 🔹 Hugging Face LLM Client
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")

# 🔹 Function to Handle Chat Responses
def respond(message, history, system_message, max_tokens, temperature, top_p):
    messages = [{"role": "system", "content": system_message}]
    for user_msg, assistant_msg in history:
        if user_msg:
            messages.append({"role": "user", "content": user_msg})
        if assistant_msg:
            messages.append({"role": "assistant", "content": assistant_msg})

    # 🔹 Retrieve relevant policy info from FAISS
    policy_context = search_policy(message)
    if policy_context:
        # 🔹 Display retrieved context in chat
        messages.append({"role": "assistant", "content": f"📄 **Relevant Policy Context:**\n\n{policy_context}"})
        # 🔹 Force the LLM to use the retrieved policy text
        user_query_with_context = f"""
The following is the most relevant policy information retrieved from the official Colorado public assistance policies:

{policy_context}

Based on this information, answer the following question:

{message}
"""
        messages.append({"role": "user", "content": user_query_with_context})
    else:
        # If no relevant policy info is found, use the original message
        messages.append({"role": "user", "content": message})

    response = ""
    # Use a distinct loop variable so streamed chunks do not shadow the `message` argument
    for chunk in client.chat_completion(
        messages,
        max_tokens=max_tokens,
        stream=True,
        temperature=temperature,
        top_p=top_p,
    ):
        token = chunk.choices[0].delta.content or ""  # delta.content can be None on some stream events
        response += token
        yield response
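# Example of driving the generator directly, e.g. as a smoke test (the question is
# hypothetical; any string works):
#   final = ""
#   for final in respond("What documents verify household income?", [], "You are a helpful assistant.", 256, 0.7, 0.95):
#       pass
#   print(final)  # the last yield holds the full accumulated answer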

# 🔹 Gradio Chat Interface
demo = gr.ChatInterface(
    respond,
    additional_inputs=[
        gr.Textbox(
            value="You are a knowledgeable and professional chatbot designed to assist Colorado case workers in determining eligibility for public assistance programs.",
            label="System message",
        ),
        gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"),
        gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
        gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)"),
    ],
)
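# Note: respond() unpacks history as (user, assistant) tuples, which matches the
# classic ChatInterface format; on newer Gradio versions that default to
# type="messages", history arrives as role/content dicts and the loop would need updating.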

if __name__ == "__main__":
    demo.launch()