realtime-rag-pipeline / retriever /
gourisankar85's picture
Update retriever/
67adc64 verified
history blame
4.07 kB
'''import os
import logging
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from config import ConfigConstants
def embed_documents(documents, embedding_path="embeddings.faiss"):
embedding_model = HuggingFaceEmbeddings(model_name=ConfigConstants.EMBEDDING_MODEL_NAME)
if os.path.exists(embedding_path):"Loading embeddings from local file")
vector_store = FAISS.load_local(embedding_path, embedding_model, allow_dangerous_deserialization=True)
else:"Generating and saving embeddings")
vector_store = FAISS.from_texts([doc['text'] for doc in documents], embedding_model)
return vector_store'''
import os
import logging
import hashlib
from typing import List, Dict
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from config import ConfigConstants
def embed_documents(documents: List[Dict], embedding_path: str = ConfigConstants.DATA_SET_PATH + "embeddings/embeddings.faiss", metadata_path: str = ConfigConstants.DATA_SET_PATH + "embeddings/metadata.json") -> FAISS:"Total documents got :{len(documents)}")
os.makedirs(os.path.dirname(embedding_path), exist_ok=True)
os.makedirs(os.path.dirname(metadata_path), exist_ok=True)
embedding_model = HuggingFaceEmbeddings(model_name=ConfigConstants.EMBEDDING_MODEL_NAME)
if os.path.exists(embedding_path) and os.path.exists(metadata_path):"Loading embeddings and metadata from local files")
vector_store = FAISS.load_local(embedding_path, embedding_model, allow_dangerous_deserialization=True)
existing_metadata = _load_metadata(metadata_path)
# Initialize FAISS with at least one document to avoid the IndexError
if documents:
vector_store = FAISS.from_texts([documents[0]['text']], embedding_model)
# If no documents are provided, initialize an empty FAISS index with a dummy document
vector_store = FAISS.from_texts(["dummy document"], embedding_model)
existing_metadata = {}
# Identify new or modified documents
new_documents = []
for doc in documents:
doc_hash = _generate_document_hash(doc['text'])
if doc_hash not in existing_metadata:
existing_metadata[doc_hash] = True # Mark as processed
if new_documents:"Generating embeddings for {len(new_documents)} new documents")
with ThreadPoolExecutor() as executor:
futures = []
for doc in new_documents:
futures.append(executor.submit(_embed_single_document, doc, embedding_model))
for future in tqdm(futures, desc="Generating embeddings", unit="doc"):
# Save updated embeddings and metadata
_save_metadata(metadata_path, existing_metadata)
else:"No new documents to process. Using existing embeddings.")
return vector_store
def _embed_single_document(doc: Dict, embedding_model: HuggingFaceEmbeddings) -> str:
return doc['text']
def _generate_document_hash(text: str) -> str:
"""Generate a unique hash for a document based on its text."""
return hashlib.sha256(text.encode()).hexdigest()
def _load_metadata(metadata_path: str) -> Dict[str, bool]:
"""Load metadata from a file."""
import json
if os.path.exists(metadata_path):
with open(metadata_path, "r") as f:
return json.load(f)
return {}
def _save_metadata(metadata_path: str, metadata: Dict[str, bool]):
"""Save metadata to a file."""
import json
with open(metadata_path, "w") as f:
json.dump(metadata, f)