import os

import faiss
import gradio as gr
import numpy as np
from huggingface_hub import HfApi, InferenceClient, hf_hub_download, login
from sentence_transformers import SentenceTransformer

# 🔹 Hugging Face Repository Details
HF_REPO_ID = "tstone87/repo"  # Your repo
HF_TOKEN = os.getenv("HF_TOKEN")  # Retrieve the token securely from an environment variable
if not HF_TOKEN:
    raise ValueError("❌ ERROR: Hugging Face token not found. Add it as a secret in the Hugging Face Space settings.")

# 🔹 Authenticate with Hugging Face
login(token=HF_TOKEN)

# 🔹 File Paths
EMBEDDINGS_FILE = "policy_embeddings.npy"
INDEX_FILE = "faiss_index.bin"
TEXT_FILE = "combined_text_documents.txt"

# 🔹 Load policy text from file
if os.path.exists(TEXT_FILE):
    with open(TEXT_FILE, "r", encoding="utf-8") as f:
        POLICY_TEXT = f.read()
    print("✅ Loaded policy text from combined_text_documents.txt")
else:
    print("❌ ERROR: combined_text_documents.txt not found! Ensure it's uploaded.")
    POLICY_TEXT = ""

# 🔹 Sentence Embedding Model (Optimized for Speed)
model = SentenceTransformer("all-MiniLM-L6-v2")

# 🔹 Split policy text into fixed-size chunks for FAISS indexing
chunk_size = 500
chunks = [POLICY_TEXT[i:i + chunk_size] for i in range(0, len(POLICY_TEXT), chunk_size)] if POLICY_TEXT else []


# 🔹 Upload FAISS files to the Hugging Face Hub
def upload_faiss_to_hf():
    api = HfApi()
    if os.path.exists(EMBEDDINGS_FILE):
        print("📤 Uploading FAISS embeddings to Hugging Face...")
        api.upload_file(
            path_or_fileobj=EMBEDDINGS_FILE,
            path_in_repo=EMBEDDINGS_FILE,
            repo_id=HF_REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
        )
    if os.path.exists(INDEX_FILE):
        print("📤 Uploading FAISS index to Hugging Face...")
        api.upload_file(
            path_or_fileobj=INDEX_FILE,
            path_in_repo=INDEX_FILE,
            repo_id=HF_REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
        )
    print("✅ FAISS files successfully uploaded to Hugging Face.")


# 🔹 Download FAISS files from the Hugging Face Hub if they are missing locally
def download_faiss_from_hf():
    if not os.path.exists(EMBEDDINGS_FILE):
        print("📥 Downloading FAISS embeddings from Hugging Face...")
        hf_hub_download(repo_id=HF_REPO_ID, filename=EMBEDDINGS_FILE, repo_type="dataset", local_dir=".", token=HF_TOKEN)
    if not os.path.exists(INDEX_FILE):
        print("📥 Downloading FAISS index from Hugging Face...")
        hf_hub_download(repo_id=HF_REPO_ID, filename=INDEX_FILE, repo_type="dataset", local_dir=".", token=HF_TOKEN)
    print("✅ FAISS files downloaded from Hugging Face.")


# 🔹 Load FAISS files from disk, download them, or rebuild the index from scratch
if os.path.exists(EMBEDDINGS_FILE) and os.path.exists(INDEX_FILE):
    print("✅ FAISS files found locally. Loading from disk...")
    embeddings = np.load(EMBEDDINGS_FILE)
    index = faiss.read_index(INDEX_FILE)
else:
    print("🚀 FAISS files not found! Downloading from Hugging Face...")
    download_faiss_from_hf()
    if os.path.exists(EMBEDDINGS_FILE) and os.path.exists(INDEX_FILE):
        embeddings = np.load(EMBEDDINGS_FILE)
        index = faiss.read_index(INDEX_FILE)
    else:
        print("🚀 No FAISS files found. Recomputing...")
        if chunks:
            # Encode all chunks in one batch; FAISS expects float32 vectors
            embeddings = np.asarray(model.encode(chunks), dtype="float32")
            # Save embeddings for future use
            np.save(EMBEDDINGS_FILE, embeddings)

            # Use an IVF index for faster lookup; keep a reference to the quantizer
            # so it is not garbage-collected while the IVF index still uses it
            d = embeddings.shape[1]
            nlist = min(10, len(chunks))  # Number of clusters (cannot exceed the number of vectors)
            quantizer = faiss.IndexFlatL2(d)
            index = faiss.IndexIVFFlat(quantizer, d, nlist)
            index.train(embeddings)
            index.add(embeddings)
            index.nprobe = 2  # Speed optimization

            # Save the FAISS index and upload both files to Hugging Face
            faiss.write_index(index, INDEX_FILE)
            upload_faiss_to_hf()
            print("✅ FAISS index created and saved.")
        else:
            print("❌ ERROR: No text to index. Check combined_text_documents.txt.")
            index = None


# 🔹 Search FAISS for the chunks most relevant to a query
def search_policy(query, top_k=3):
    if index is None:
        return ""  # No index available; respond() falls back to the raw user message
    query_embedding = model.encode(query).reshape(1, -1)
    distances, indices = index.search(query_embedding, top_k)
    # FAISS pads missing results with -1, so keep only valid chunk indices
    return "\n\n".join(chunks[i] for i in indices[0] if 0 <= i < len(chunks))


# 🔹 Hugging Face LLM Client
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")


# 🔹 Handle chat responses
def respond(message, history, system_message, max_tokens, temperature, top_p):
    messages = [{"role": "system", "content": system_message}]
    for user_msg, assistant_msg in history:
        if user_msg:
            messages.append({"role": "user", "content": user_msg})
        if assistant_msg:
            messages.append({"role": "assistant", "content": assistant_msg})

    # 🔹 Retrieve relevant policy info from FAISS
    policy_context = search_policy(message)

    if policy_context:
        # 🔹 Show the retrieved context in the chat
        messages.append({"role": "assistant", "content": f"📄 **Relevant Policy Context:**\n\n{policy_context}"})

        # 🔹 Ask the LLM to answer using the retrieved policy text
        user_query_with_context = f"""
        The following is the most relevant policy information retrieved from the official Colorado public assistance policies:

        {policy_context}

        Based on this information, answer the following question:
        {message}
        """
        messages.append({"role": "user", "content": user_query_with_context})
    else:
        # If no relevant policy info is found, use the original message
        messages.append({"role": "user", "content": message})

    response = ""
    for chunk in client.chat_completion(
        messages,
        max_tokens=max_tokens,
        stream=True,
        temperature=temperature,
        top_p=top_p,
    ):
        token = chunk.choices[0].delta.content
        if token:  # The final streamed chunk may carry no content
            response += token
        yield response


# 🔹 Gradio Chat Interface
demo = gr.ChatInterface(
    respond,
    additional_inputs=[
        gr.Textbox(
            value=(
                "You are a knowledgeable and professional chatbot designed to assist "
                "Colorado case workers in determining eligibility for public assistance programs."
            ),
            label="System message",
        ),
        gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"),
        gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
        gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)"),
    ],
)

if __name__ == "__main__":
    demo.launch()
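
# 🔹 Usage note (illustrative sketch, not part of the app itself)
# Assuming this file is saved as app.py and the FAISS index above was built
# successfully, the retrieval step can be exercised on its own from a Python
# shell; the query string below is purely hypothetical:
#
#     from app import search_policy
#     print(search_policy("What are the SNAP income limits?", top_k=3))
#
# Launching the chat UI requires HF_TOKEN in the environment, for example
# `HF_TOKEN=<your token> python app.py`, since the script raises a ValueError
# when the token is missing.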