Spaces:
Sleeping
Sleeping
import os | |
import shutil | |
import faiss | |
import numpy as np | |
import gradio as gr | |
from sentence_transformers import SentenceTransformer | |
from huggingface_hub import HfApi, hf_hub_download, login | |
# Hugging Face dataset repository that stores the FAISS artifacts.
HF_REPO_ID = "tstone87/repo"  # Your dataset repo

# The access token is taken from the environment rather than hard-coded.
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    raise ValueError("β ERROR: Hugging Face token not found. Add it as a secret in the Hugging Face Space settings.")

# Authenticate this process with the Hugging Face Hub.
login(token=HF_TOKEN)

# Names of the local artifacts and of the source text file.
EMBEDDINGS_FILE = "policy_embeddings.npy"
INDEX_FILE = "faiss_index.bin"
TEXT_FILE = "combined_text_documents.txt"

# Read the policy corpus; an absent file leaves POLICY_TEXT empty so the
# script can still start and report the problem.
if not os.path.exists(TEXT_FILE):
    print("β ERROR: combined_text_documents.txt not found! Ensure it's uploaded.")
    POLICY_TEXT = ""
else:
    with open(TEXT_FILE, "r", encoding="utf-8") as f:
        POLICY_TEXT = f.read()
    print("β Loaded policy text from combined_text_documents.txt")

# Sentence-embedding model used for both indexing and querying.
model = SentenceTransformer("all-MiniLM-L6-v2")

# Cut the corpus into fixed-size character windows for indexing.
# An empty POLICY_TEXT produces an empty range and therefore an empty list.
chunk_size = 500
chunks = [
    POLICY_TEXT[start:start + chunk_size]
    for start in range(0, len(POLICY_TEXT), chunk_size)
]
# Upload the locally generated FAISS artifacts to the Hugging Face Hub.
def upload_faiss_to_hf():
    """Upload the embeddings and index files to the Hugging Face dataset repo.

    ``EMBEDDINGS_FILE`` and ``INDEX_FILE`` are each uploaded to ``HF_REPO_ID``
    (as a dataset repo) under their own file name, but only if the file
    exists locally. The success message is printed only when at least one
    file was actually uploaded, so the log never claims an upload that did
    not happen.
    """
    api = HfApi()
    uploads = (
        (EMBEDDINGS_FILE, "π€ Uploading FAISS embeddings to Hugging Face..."),
        (INDEX_FILE, "π€ Uploading FAISS index to Hugging Face..."),
    )

    uploaded_any = False
    for local_path, message in uploads:
        if not os.path.exists(local_path):
            continue
        print(message)
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=local_path,
            repo_id=HF_REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        uploaded_any = True

    if uploaded_any:
        print("β FAISS files successfully uploaded to Hugging Face.")
    else:
        print("No FAISS files found locally; nothing was uploaded.")
# Download any missing FAISS artifacts from the Hugging Face Hub.
def download_faiss_from_hf():
    """Fetch the embeddings and index files from the Hub if absent locally.

    Files that already exist on disk are not downloaded again. The download
    targets the dataset repo ``HF_REPO_ID`` (``repo_type="dataset"``), the
    same repo type the upload function writes to; without it the Hub lookup
    uses the default model namespace.

    The download is best-effort: if a file cannot be fetched (for example
    because it was never uploaded), the failure is reported and the function
    returns normally, so the caller can check for the files and fall back to
    recomputing them.
    """
    wanted = (
        (EMBEDDINGS_FILE, "π₯ Downloading FAISS embeddings from Hugging Face..."),
        (INDEX_FILE, "π₯ Downloading FAISS index from Hugging Face..."),
    )

    for filename, message in wanted:
        if os.path.exists(filename):
            continue
        print(message)
        try:
            hf_hub_download(
                repo_id=HF_REPO_ID,
                filename=filename,
                repo_type="dataset",
                local_dir=".",
                token=HF_TOKEN,
            )
        except Exception as exc:  # best-effort boundary: report and let the caller fall back
            print(f"Could not download {filename} from Hugging Face: {exc}")
            return

    print("β FAISS files downloaded from Hugging Face.")
# Obtain the FAISS index: load it from disk, otherwise try the Hub,
# otherwise rebuild it from the policy text chunks.
if os.path.exists(EMBEDDINGS_FILE) and os.path.exists(INDEX_FILE):
    print("β FAISS files found locally. Loading from disk...")
    embeddings = np.load(EMBEDDINGS_FILE)
    index = faiss.read_index(INDEX_FILE)
else:
    print("π FAISS files not found! Downloading from Hugging Face...")
    download_faiss_from_hf()
    if os.path.exists(EMBEDDINGS_FILE) and os.path.exists(INDEX_FILE):
        embeddings = np.load(EMBEDDINGS_FILE)
        index = faiss.read_index(INDEX_FILE)
    else:
        print("π No FAISS files found. Recomputing...")
        if chunks:
            # Encode all chunks in a single batched call instead of one
            # model invocation per chunk.
            embeddings = np.asarray(model.encode(chunks), dtype="float32")
            # Save embeddings for future use
            np.save(EMBEDDINGS_FILE, embeddings)

            d = embeddings.shape[1]
            # IVF training clusters the vectors into nlist cells and fails
            # when there are fewer vectors than cells, so cap the cell count
            # at the number of chunks for small corpora.
            nlist = min(10, embeddings.shape[0])
            quantizer = faiss.IndexFlatL2(d)
            index = faiss.IndexIVFFlat(quantizer, d, nlist)
            index.train(embeddings)
            index.add(embeddings)
            index.nprobe = 2  # Number of cells visited per query (speed/recall trade-off)

            # Persist the index locally, then publish both files to the Hub.
            faiss.write_index(index, INDEX_FILE)
            upload_faiss_to_hf()
            print("β FAISS index created and saved.")
        else:
            print("β ERROR: No text to index. Check combined_text_documents.txt.")
            index = None
# Retrieve the policy chunks most similar to a query.
def search_policy(query, top_k=3):
    """Return the policy text chunks nearest to ``query``.

    Args:
        query: Text to embed with the module-level ``model`` and look up.
        top_k: Maximum number of chunks to return.

    Returns:
        The matching chunks joined by blank lines, or an error message
        string when no index is available.
    """
    if index is None:
        return "Error: FAISS index is not available."

    query_embedding = model.encode(query).reshape(1, -1)
    _, indices = index.search(query_embedding, top_k)

    # FAISS fills unfilled result slots with -1. Without the lower bound,
    # -1 would pass the length check and silently select the last chunk.
    return "\n\n".join(chunks[i] for i in indices[0] if 0 <= i < len(chunks))
# Gradio callback: stage the FAISS artifacts for download.
def prepare_faiss_files():
    """Copy the embeddings and index files into ``/mnt/data``.

    Returns:
        A status message for the UI: success, files missing, or the reason
        the copy failed.
    """
    if not (os.path.exists(EMBEDDINGS_FILE) and os.path.exists(INDEX_FILE)):
        return "β FAISS files not found. Try running the chatbot first to generate them."

    try:
        # shutil.copy does not create the destination directory itself.
        os.makedirs("/mnt/data", exist_ok=True)
        shutil.copy(EMBEDDINGS_FILE, "/mnt/data/policy_embeddings.npy")
        shutil.copy(INDEX_FILE, "/mnt/data/faiss_index.bin")
    except OSError as exc:
        # Report the failure in the textbox instead of raising into Gradio.
        return f"Could not copy FAISS files to /mnt/data: {exc}"

    return "β FAISS files are ready for download. Go to the 'Files' tab in Hugging Face Space and download them."
# Gradio UI: one button that stages the FAISS files and a textbox for the
# resulting status message.
with gr.Blocks() as download_ui:
    gr.Markdown("### π½ Download FAISS Files")
    download_button = gr.Button("Prepare FAISS Files for Download")
    output_text = gr.Textbox()
    download_button.click(fn=prepare_faiss_files, outputs=output_text)

# Report index status before starting the server: launch() normally blocks
# the main thread when run as a script, so a message placed after it would
# only appear at shutdown. Only claim success when an index actually exists.
if index is not None:
    print("β FAISS index successfully loaded.")

download_ui.launch()