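"""FAISS embedding utilities with hash-based incremental caching."""
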
# Previous implementation, kept for reference:
#
# import os
# import logging
# from langchain_huggingface import HuggingFaceEmbeddings
# from langchain_community.vectorstores import FAISS
#
# from config import ConfigConstants
#
# def embed_documents(documents, embedding_path="embeddings.faiss"):
#     embedding_model = HuggingFaceEmbeddings(model_name=ConfigConstants.EMBEDDING_MODEL_NAME)
#
#     if os.path.exists(embedding_path):
#         logging.info("Loading embeddings from local file")
#         vector_store = FAISS.load_local(embedding_path, embedding_model, allow_dangerous_deserialization=True)
#     else:
#         logging.info("Generating and saving embeddings")
#         vector_store = FAISS.from_texts([doc['text'] for doc in documents], embedding_model)
#         vector_store.save_local(embedding_path)
#
#     return vector_store

import os
import json
import logging
import hashlib
from typing import List, Dict

from tqdm import tqdm
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings

from config import ConfigConstants


def embed_documents(documents: List[Dict], embedding_path: str = "embeddings.faiss", metadata_path: str = "metadata.json") -> FAISS:
    """Embed documents into a FAISS vector store, reusing cached embeddings when possible.

    Each document is a dict with at least a 'text' key. A SHA-256 hash of each
    text is recorded in the metadata file, so documents already seen on a
    previous run are not embedded again.
    """
    logging.info(f"Received {len(documents)} documents")
    embedding_model = HuggingFaceEmbeddings(model_name=ConfigConstants.EMBEDDING_MODEL_NAME)
    
    if os.path.exists(embedding_path) and os.path.exists(metadata_path):
        logging.info("Loading embeddings and metadata from local files")
        vector_store = FAISS.load_local(embedding_path, embedding_model, allow_dangerous_deserialization=True)
        existing_metadata = _load_metadata(metadata_path)
        index_is_new = False
    else:
        # FAISS.from_texts() requires at least one text, so seed the new index
        # with the first document and record its hash so the loop below does
        # not add it a second time.
        if not documents:
            raise ValueError("embed_documents() requires at least one document")
        vector_store = FAISS.from_texts([documents[0]['text']], embedding_model)
        existing_metadata = {_generate_document_hash(documents[0]['text']): True}
        index_is_new = True
    
    # Identify new or modified documents
    new_documents = []
    for doc in documents:
        doc_hash = _generate_document_hash(doc['text'])
        if doc_hash not in existing_metadata:
            new_documents.append(doc)
            existing_metadata[doc_hash] = True  # Mark as processed
    
    if new_documents:
        logging.info(f"Generating embeddings for {len(new_documents)} new documents")
        # Embed in batches: each add_texts() call embeds its whole batch at once,
        # which is far faster than adding documents one at a time.
        new_texts = [doc['text'] for doc in new_documents]
        batch_size = 32
        for start in tqdm(range(0, len(new_texts), batch_size), desc="Generating embeddings", unit="batch"):
            vector_store.add_texts(new_texts[start:start + batch_size])
    else:
        logging.info("No new documents to process. Using existing embeddings.")

    if new_documents or index_is_new:
        # Save updated embeddings and metadata
        vector_store.save_local(embedding_path)
        _save_metadata(metadata_path, existing_metadata)
    
    return vector_store


def _generate_document_hash(text: str) -> str:
    """Generate a unique hash for a document based on its text."""
    return hashlib.sha256(text.encode()).hexdigest()

def _load_metadata(metadata_path: str) -> Dict[str, bool]:
    """Load the processed-document hash metadata from a JSON file."""
    if os.path.exists(metadata_path):
        with open(metadata_path, "r") as f:
            return json.load(f)
    return {}

def _save_metadata(metadata_path: str, metadata: Dict[str, bool]):
    """Save the processed-document hash metadata to a JSON file."""
    with open(metadata_path, "w") as f:
        json.dump(metadata, f)
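

# Usage sketch (illustrative only, not part of the module): assumes each
# document is a dict with a 'text' key and that ConfigConstants.EMBEDDING_MODEL_NAME
# names a model that HuggingFaceEmbeddings can load.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    sample_docs = [
        {'text': "FAISS is a library for efficient similarity search."},
        {'text': "LangChain exposes FAISS as a vector store."},
    ]
    store = embed_documents(sample_docs)
    # A second call with the same texts reuses the cached index and metadata.
    for hit in store.similarity_search("library for similarity search", k=1):
        print(hit.page_content)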