Spaces:
Sleeping
Sleeping
import os | |
import faiss | |
import numpy as np | |
from sentence_transformers import SentenceTransformer | |
from huggingface_hub import HfApi, hf_hub_download, login, whoami | |
# 🔹 Hugging Face Repository Details
# Repository on the Hugging Face Hub used by the upload/download helpers.
HF_REPO_ID = "tstone87/repo"  # Your repo

# The access token comes from the environment (e.g. a Space secret) so it is
# never hard-coded in this file.
HF_TOKEN = os.environ.get("HF_TOKEN")

if HF_TOKEN:
    # Authenticate this process with the Hugging Face Hub.
    login(token=HF_TOKEN)
else:
    # Fail at start-up: nothing below can reach the Hub without a token.
    raise ValueError("β ERROR: Hugging Face token not found. Add it as a secret in the Hugging Face Space settings.")
# 🔹 File Paths
# Artifact file names; all are resolved relative to the working directory.
EMBEDDINGS_FILE = "policy_embeddings.npy"
INDEX_FILE = "faiss_index.bin"
TEXT_FILE = "combined_text_documents.txt"

# Read the whole policy corpus into memory. A missing file is reported but is
# not fatal: POLICY_TEXT then falls back to an empty string.
if not os.path.exists(TEXT_FILE):
    print("β ERROR: combined_text_documents.txt not found! Ensure it's uploaded.")
    POLICY_TEXT = ""
else:
    with open(TEXT_FILE, encoding="utf-8") as f:
        POLICY_TEXT = f.read()
    print("β Loaded policy text from combined_text_documents.txt")
# 🔹 Sentence Embedding Model (Optimized for Speed)
# Embedding model used both to build the index and to encode queries.
model = SentenceTransformer("all-MiniLM-L6-v2")

# Cut the policy text into consecutive, non-overlapping windows of
# `chunk_size` characters; an empty text yields an empty list.
chunk_size = 500
_chunk_starts = range(0, len(POLICY_TEXT), chunk_size) if POLICY_TEXT else ()
chunks = [POLICY_TEXT[begin:begin + chunk_size] for begin in _chunk_starts]
# 🔹 Function to Upload FAISS Files to Hugging Face Hub
def upload_faiss_to_hf():
    """Push the locally saved FAISS artifacts to the Hugging Face dataset repo.

    The embeddings file and the index file are each uploaded only when they
    exist on disk; a missing file is skipped without comment. The closing
    message is printed once both files have been considered.
    """
    hub = HfApi()
    pending = (
        (EMBEDDINGS_FILE, "π€ Uploading FAISS embeddings to Hugging Face..."),
        (INDEX_FILE, "π€ Uploading FAISS index to Hugging Face..."),
    )
    for artifact, notice in pending:
        if not os.path.exists(artifact):
            continue
        print(notice)
        # The file keeps the same name in the repo as it has locally.
        hub.upload_file(
            path_or_fileobj=artifact,
            path_in_repo=artifact,
            repo_id=HF_REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
        )
    print("β FAISS files successfully uploaded to Hugging Face.")
# 🔹 Function to Download FAISS Files from Hugging Face Hub if Missing
def download_faiss_from_hf():
    """Fetch any missing FAISS artifacts from the Hugging Face dataset repo.

    Files already present in the working directory are left untouched. The
    download is best-effort: when the Hub does not have a file, or the request
    is rejected, the problem is reported and the function returns normally so
    the caller can fall back to rebuilding the index locally.
    """
    # Local import keeps this block self-contained; both exception classes
    # are exported by huggingface_hub.utils.
    from huggingface_hub.utils import EntryNotFoundError, HfHubHTTPError

    wanted = (
        (EMBEDDINGS_FILE, "π₯ Downloading FAISS embeddings from Hugging Face..."),
        (INDEX_FILE, "π₯ Downloading FAISS index from Hugging Face..."),
    )
    fetched_all = True
    for artifact, notice in wanted:
        if os.path.exists(artifact):
            continue
        print(notice)
        try:
            hf_hub_download(
                repo_id=HF_REPO_ID,
                filename=artifact,
                # upload_faiss_to_hf() stores these files in a *dataset*
                # repo; without repo_type the Hub looks in a model repo.
                repo_type="dataset",
                local_dir=".",
                token=HF_TOKEN,
            )
        except (EntryNotFoundError, HfHubHTTPError) as err:
            fetched_all = False
            print(f"WARNING: Could not download {artifact} from Hugging Face: {err}")

    # Only claim success when every requested file actually arrived.
    if fetched_all:
        print("β FAISS files downloaded from Hugging Face.")
# 🔹 Check if FAISS Files Exist, Otherwise Download
# Obtain `embeddings` and `index`: prefer files already on disk, then try the
# Hub, and as a last resort rebuild both from the chunked policy text.
# NOTE(review): cached files are loaded without checking that they were built
# from the current `chunks`; delete them after changing the text file.
if os.path.exists(EMBEDDINGS_FILE) and os.path.exists(INDEX_FILE):
    print("β FAISS files found locally. Loading from disk...")
else:
    print("π FAISS files not found! Downloading from Hugging Face...")
    download_faiss_from_hf()

if os.path.exists(EMBEDDINGS_FILE) and os.path.exists(INDEX_FILE):
    embeddings = np.load(EMBEDDINGS_FILE)
    index = faiss.read_index(INDEX_FILE)
else:
    print("π No FAISS files found. Recomputing...")
    if chunks:
        # Encode every chunk in one batched call; FAISS expects float32 rows.
        embeddings = np.asarray(model.encode(chunks), dtype="float32")

        # Save embeddings for future use.
        np.save(EMBEDDINGS_FILE, embeddings)

        d = embeddings.shape[1]
        # IVF training runs k-means with `nlist` centroids and fails when
        # there are fewer vectors than centroids, so cap it for tiny corpora.
        nlist = min(10, len(embeddings))
        quantizer = faiss.IndexFlatL2(d)
        index = faiss.IndexIVFFlat(quantizer, d, nlist)
        index.train(embeddings)
        index.add(embeddings)
        # Probe at most two clusters per query (never more than exist).
        index.nprobe = min(2, nlist)

        # Persist the index locally, then publish both files to the Hub.
        faiss.write_index(index, INDEX_FILE)
        upload_faiss_to_hf()
        print("β FAISS index created and saved.")
    else:
        print("β ERROR: No text to index. Check combined_text_documents.txt.")
        # Define both names so later code can test them against None.
        embeddings = None
        index = None
# 🔹 Function to Search FAISS
def search_policy(query, top_k=3):
    """Return the policy chunks most similar to *query*, joined by blank lines.

    Args:
        query: Free-text question to embed and look up in the index.
        top_k: Maximum number of chunks to retrieve.

    Returns:
        The matching chunks separated by a blank line (an empty string when
        no usable hit exists), or an error-message string when no index is
        available.
    """
    if index is None:
        return "Error: FAISS index is not available."

    # FAISS searches a 2-D batch of vectors, so present the query as one row.
    query_embedding = model.encode(query).reshape(1, -1)
    _distances, indices = index.search(query_embedding, top_k)

    # FAISS pads the result with -1 when it finds fewer than top_k
    # neighbours. A bare `i < len(chunks)` would let -1 through and silently
    # return the last chunk, so the lower bound is checked as well.
    hits = [chunks[i] for i in indices[0] if 0 <= i < len(chunks)]
    return "\n\n".join(hits)
# 🔹 Hugging Face LLM Client
from huggingface_hub import InferenceClient

# Client for the hosted chat model; `respond` streams completions from it.
client = InferenceClient(model="HuggingFaceH4/zephyr-7b-beta")
# 🔹 Function to Handle Chat Responses
def respond(message, history, system_message, max_tokens, temperature, top_p):
    """Stream an answer to *message*, grounded in retrieved policy text.

    Args:
        message: The user's latest question.
        history: Earlier turns as (user_text, assistant_text) pairs; empty
            entries are skipped.
        system_message: System prompt placed first in the conversation.
        max_tokens: Generation limit forwarded to the chat-completion call.
        temperature: Sampling temperature forwarded to the call.
        top_p: Nucleus-sampling value forwarded to the call.

    Yields:
        The response text accumulated so far, once per streamed chunk.
    """
    messages = [{"role": "system", "content": system_message}]
    for user_text, assistant_text in history:
        if user_text:
            messages.append({"role": "user", "content": user_text})
        if assistant_text:
            messages.append({"role": "assistant", "content": assistant_text})

    # Retrieve relevant policy text from the FAISS index.
    policy_context = search_policy(message)
    if policy_context:
        # NOTE(review): this adds the retrieved text as an assistant turn in
        # the prompt sent to the model; the same text is repeated in the user
        # turn below. Kept as-is to preserve the existing prompt layout.
        messages.append({"role": "assistant", "content": f"π **Relevant Policy Context:**\n\n{policy_context}"})

        # Wrap the question so the model is told to answer from the context.
        user_query_with_context = (
            "\nThe following is the most relevant policy information retrieved "
            "from the official Colorado public assistance policies:\n"
            f"{policy_context}\n"
            "Based on this information, answer the following question:\n"
            f"{message}\n"
        )
        messages.append({"role": "user", "content": user_query_with_context})
    else:
        # If no relevant policy info is found, use the original message.
        messages.append({"role": "user", "content": message})

    response = ""
    # `chunk` (not `message`) so the loop does not shadow the parameter.
    for chunk in client.chat_completion(
        messages,
        max_tokens=max_tokens,
        stream=True,
        temperature=temperature,
        top_p=top_p,
    ):
        if not chunk.choices:
            continue
        # Some stream chunks (e.g. the final one) carry no text; their
        # content is None, which must not be concatenated to a str.
        token = chunk.choices[0].delta.content
        response += token or ""
        yield response
# 🔹 Gradio Chat Interface
import gradio as gr

# Extra controls shown with the chat box. Their order matches the trailing
# parameters of `respond`: system_message, max_tokens, temperature, top_p.
_system_prompt_box = gr.Textbox(
    label="System message",
    value="You are a knowledgeable and professional chatbot designed to assist Colorado case workers in determining eligibility for public assistance programs.",
)
_max_tokens_slider = gr.Slider(label="Max new tokens", minimum=1, maximum=2048, value=512, step=1)
_temperature_slider = gr.Slider(label="Temperature", minimum=0.1, maximum=4.0, value=0.7, step=0.1)
_top_p_slider = gr.Slider(label="Top-p (nucleus sampling)", minimum=0.1, maximum=1.0, value=0.95, step=0.05)

# Chat UI that routes each submitted message to `respond`.
demo = gr.ChatInterface(
    respond,
    additional_inputs=[
        _system_prompt_box,
        _max_tokens_slider,
        _temperature_slider,
        _top_p_slider,
    ],
)

# Start the web app only when the file is run as a script.
if __name__ == "__main__":
    demo.launch()