"""Gradio chatbot that answers policy questions with FAISS retrieval over a local policy text file."""
import os
import shutil

import faiss
import gradio as gr
import numpy as np
from huggingface_hub import HfApi, InferenceClient, hf_hub_download, login
from sentence_transformers import SentenceTransformer
# Hugging Face repository that is queried for prebuilt FAISS artifacts.
HF_REPO_ID = "tstone87/repo"
# API token read from the environment; the script refuses to start without it.
HF_TOKEN = os.environ.get("HF_TOKEN")

if HF_TOKEN:
    # Authenticate this process with the Hugging Face Hub.
    login(token=HF_TOKEN)
else:
    raise ValueError("β ERROR: Hugging Face token not found. Add it as a secret in the Hugging Face Space settings.")
# Names of the artifacts this script reads and writes in the working directory.
EMBEDDINGS_FILE = "policy_embeddings.npy"
INDEX_FILE = "faiss_index.bin"
TEXT_FILE = "combined_text_documents.txt"

# Read the whole policy corpus into memory; fall back to an empty string
# (and report the problem) when the text file is missing.
POLICY_TEXT = ""
if not os.path.exists(TEXT_FILE):
    print("β ERROR: combined_text_documents.txt not found! Ensure it's uploaded.")
else:
    with open(TEXT_FILE, "r", encoding="utf-8") as f:
        POLICY_TEXT = f.read()
    print("β Loaded policy text from combined_text_documents.txt")
# Sentence-embedding model used for both the policy chunks and user queries.
model = SentenceTransformer("all-MiniLM-L6-v2")

# Cut the policy text into consecutive fixed-width character windows.
# An empty POLICY_TEXT produces an empty range and therefore an empty list.
chunk_size = 500
chunks = [
    POLICY_TEXT[start:start + chunk_size]
    for start in range(0, len(POLICY_TEXT), chunk_size)
]
def download_faiss_from_hf():
    """Fetch any missing FAISS artifact from the Hugging Face Hub.

    The embeddings file and the index file are each downloaded into the
    current directory, but only when not already present locally.

    Returns:
        True when no download raised; False when any exception occurred,
        in which case the caller is expected to rebuild the index.
    """
    wanted = (
        (EMBEDDINGS_FILE, "π₯ Downloading FAISS embeddings from Hugging Face..."),
        (INDEX_FILE, "π₯ Downloading FAISS index from Hugging Face..."),
    )
    try:
        for filename, notice in wanted:
            if os.path.exists(filename):
                continue
            print(notice)
            hf_hub_download(
                repo_id=HF_REPO_ID,
                filename=filename,
                local_dir=".",
                token=HF_TOKEN,
            )
        print("β FAISS files downloaded from Hugging Face.")
        return True
    except Exception as err:
        # Best-effort: any failure falls back to recomputing the index.
        print(f"β οΈ FAISS files not found in Hugging Face repo. Recomputing... ({err})")
        return False
# Obtain the FAISS index and embeddings: prefer artifacts already on disk,
# then artifacts from the Hugging Face Hub, and finally build them from
# the chunked policy text.
if os.path.exists(EMBEDDINGS_FILE) and os.path.exists(INDEX_FILE):
    print("β FAISS files found locally. Loading from disk...")
    embeddings = np.load(EMBEDDINGS_FILE)
    index = faiss.read_index(INDEX_FILE)
elif download_faiss_from_hf():
    embeddings = np.load(EMBEDDINGS_FILE)
    index = faiss.read_index(INDEX_FILE)
else:
    print("π No FAISS files found. Creating new index...")
    if chunks:
        # Encode every chunk in one batched call instead of one call per
        # chunk, and force float32, the dtype FAISS expects.
        embeddings = np.asarray(model.encode(chunks), dtype="float32")
        # Save embeddings for future use.
        np.save(EMBEDDINGS_FILE, embeddings)

        d = embeddings.shape[1]
        # IVF training clusters the vectors into nlist cells and fails when
        # there are fewer vectors than cells, so cap nlist for small corpora.
        nlist = min(10, len(chunks))
        index = faiss.IndexIVFFlat(faiss.IndexFlatL2(d), d, nlist)
        index.train(embeddings)
        index.add(embeddings)
        # Probe only a few cells per query (speed over exhaustiveness),
        # never more cells than exist.
        index.nprobe = min(2, nlist)

        faiss.write_index(index, INDEX_FILE)
        print("β FAISS index created and saved.")
    else:
        print("β ERROR: No text to index. Check combined_text_documents.txt.")
        # search_policy() treats None as "index unavailable".
        index = None
def search_policy(query, top_k=3):
    """Return the policy chunks nearest to ``query`` in the FAISS index.

    Args:
        query: Text to embed and look up.
        top_k: Maximum number of neighbours requested from the index.

    Returns:
        The matching chunks joined by blank lines (possibly an empty
        string), or an error message string when no index is loaded.
    """
    if index is None:
        return "Error: FAISS index is not available."

    # FAISS expects a 2-D float32 array with one row per query.
    query_embedding = np.asarray(model.encode(query), dtype="float32").reshape(1, -1)
    _, indices = index.search(query_embedding, top_k)

    # FAISS pads missing neighbours with -1. Without the lower bound, -1
    # would pass the range check and silently select the last chunk.
    return "\n\n".join(chunks[i] for i in indices[0] if 0 <= i < len(chunks))
# Inference client for the chat model that respond() streams completions from.
# InferenceClient is imported with the other huggingface_hub names at the top of the file.
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")
def respond(message, history, system_message, max_tokens, temperature, top_p):
    """Stream a chat answer for ``message``, grounded in retrieved policy text.

    Args:
        message: The user's latest question.
        history: Earlier turns; each item is indexed as
            ``(user_text, assistant_text)`` and empty entries are skipped.
        system_message: Content of the leading system message.
        max_tokens: Forwarded to the chat completion call.
        temperature: Forwarded to the chat completion call.
        top_p: Forwarded to the chat completion call.

    Yields:
        The response accumulated so far, once per streamed chunk.
    """
    messages = [{"role": "system", "content": system_message}]
    for user_turn, assistant_turn in history:
        if user_turn:
            messages.append({"role": "user", "content": user_turn})
        if assistant_turn:
            messages.append({"role": "assistant", "content": assistant_turn})

    # Only retrieve when an index is loaded; otherwise search_policy() returns
    # an error string that would be mistaken for policy context below.
    policy_context = search_policy(message) if index is not None else ""

    if policy_context:
        # Give the model the retrieved context as a prior assistant turn.
        # This is part of the prompt only; it is not rendered in the chat UI.
        messages.append({"role": "assistant", "content": f"π **Relevant Policy Context:**\n\n{policy_context}"})
        # Restate the context together with the question so the model answers from it.
        user_query_with_context = (
            "\n"
            "The following is the most relevant policy information retrieved from the official Colorado public assistance policies:\n"
            f"{policy_context}\n"
            "Based on this information, answer the following question:\n"
            f"{message}\n"
        )
        messages.append({"role": "user", "content": user_query_with_context})
    else:
        # Nothing retrieved: send the question unchanged.
        messages.append({"role": "user", "content": message})

    response = ""
    # Separate loop name so the `message` parameter is not shadowed.
    for chunk in client.chat_completion(
        messages,
        max_tokens=max_tokens,
        stream=True,
        temperature=temperature,
        top_p=top_p,
    ):
        if not chunk.choices:
            continue
        # Some stream chunks carry no text (content is None); treat that as
        # empty instead of failing on `str += None`.
        response += chunk.choices[0].delta.content or ""
        yield response
# Chat UI driven by respond(). The additional inputs are passed to respond()
# after (message, history), in the order listed here.
demo = gr.ChatInterface(
    fn=respond,
    additional_inputs=[
        gr.Textbox(
            label="System message",
            value="You are a knowledgeable and professional chatbot designed to assist Colorado case workers in determining eligibility for public assistance programs.",
        ),
        gr.Slider(label="Max new tokens", minimum=1, maximum=2048, step=1, value=512),
        gr.Slider(label="Temperature", minimum=0.1, maximum=4.0, step=0.1, value=0.7),
        gr.Slider(label="Top-p (nucleus sampling)", minimum=0.1, maximum=1.0, step=0.05, value=0.95),
    ],
)
def download_faiss_files(dest_dir="/mnt/data"):
    """Copy the FAISS embeddings and index files into ``dest_dir``.

    Args:
        dest_dir: Directory that receives the copies. It is created when
            missing, so a fresh environment does not fail the copy.

    Returns:
        A status message for display in the UI.
    """
    if not (os.path.exists(EMBEDDINGS_FILE) and os.path.exists(INDEX_FILE)):
        return "β FAISS files not found. Run the chatbot first to generate them."

    try:
        os.makedirs(dest_dir, exist_ok=True)
        shutil.copy(EMBEDDINGS_FILE, os.path.join(dest_dir, "policy_embeddings.npy"))
        shutil.copy(INDEX_FILE, os.path.join(dest_dir, "faiss_index.bin"))
    except OSError as err:
        # Report the failure to the UI rather than raising out of the click handler.
        return f"ERROR: could not copy FAISS files to {dest_dir}: {err}"

    return "β FAISS files ready for download! Check the 'Files' tab in your Hugging Face Space."
# Small standalone UI: one button that runs download_faiss_files() and a
# textbox that shows the status message it returns.
with gr.Blocks() as file_download:
    gr.Markdown(value="### π½ Download FAISS Files to Your Computer")
    download_button = gr.Button(value="Prepare FAISS Files for Download")
    output_text = gr.Textbox()
    download_button.click(download_faiss_files, inputs=None, outputs=output_text)
if __name__ == "__main__":
    # launch() blocks while its server is running, so launching the two UIs
    # one after the other would leave the second unreachable. Serve both
    # from a single app as separate tabs instead.
    gr.TabbedInterface(
        [demo, file_download],
        tab_names=["Policy Chat", "Download FAISS Files"],
    ).launch()