Spaces:
Sleeping
Sleeping
import abc | |
import itertools | |
import logging | |
import multiprocessing | |
import multiprocessing.pool | |
import os | |
import threading | |
from pathlib import Path | |
from typing import Any | |
from llama_index import ( | |
Document, | |
ServiceContext, | |
StorageContext, | |
VectorStoreIndex, | |
load_index_from_storage, | |
) | |
from llama_index.data_structs import IndexDict | |
from llama_index.indices.base import BaseIndex | |
from llama_index.ingestion import run_transformations | |
from private_gpt.components.ingest.ingest_helper import IngestionHelper | |
from private_gpt.paths import local_data_path | |
from private_gpt.settings.settings import Settings | |
logger = logging.getLogger(__name__) | |
class BaseIngestComponent(abc.ABC):
    """Abstract interface shared by every ingestion strategy.

    Stores the storage and service contexts handed in at construction time
    and declares the three operations a concrete component has to provide:
    ``ingest``, ``bulk_ingest`` and ``delete``. They are abstract so that a
    subclass that forgets one fails at instantiation instead of silently
    returning ``None``.
    """

    def __init__(
        self,
        storage_context: StorageContext,
        service_context: ServiceContext,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Keep references to the two contexts.

        Args:
            storage_context: stored as ``self.storage_context``.
            service_context: stored as ``self.service_context``.
            *args: accepted for signature compatibility; unused here.
            **kwargs: accepted for signature compatibility; unused here.
        """
        logger.debug("Initializing base ingest component type=%s", type(self).__name__)
        self.storage_context = storage_context
        self.service_context = service_context

    @abc.abstractmethod
    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
        """Ingest a single file and return the documents created from it."""

    @abc.abstractmethod
    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
        """Ingest several ``(file_name, file_data)`` pairs and return the documents."""

    @abc.abstractmethod
    def delete(self, doc_id: str) -> None:
        """Remove the document identified by ``doc_id``."""
class BaseIngestComponentWithIndex(BaseIngestComponent, abc.ABC):
    """Ingest component that owns an index backed by the storage context.

    On construction the index is loaded from the storage context, or created
    empty when loading raises ``ValueError``. Writes to the index are
    serialised with a thread lock.
    """

    def __init__(
        self,
        storage_context: StorageContext,
        service_context: ServiceContext,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Set up progress reporting, the index lock and the index itself."""
        super().__init__(storage_context, service_context, *args, **kwargs)

        self.show_progress = True
        # Thread lock! Not Multiprocessing lock
        self._index_thread_lock = threading.Lock()
        self._index = self._initialize_index()

    def _initialize_index(self) -> BaseIndex[IndexDict]:
        """Initialize the index from the storage context."""
        # Both the load path and the create path take the same options.
        # store_nodes_override=True forces the nodes to be stored in the index
        # and document stores, which is what makes deleting them possible.
        index_options = {
            "storage_context": self.storage_context,
            "service_context": self.service_context,
            "store_nodes_override": True,
            "show_progress": self.show_progress,
        }
        try:
            return load_index_from_storage(**index_options)
        except ValueError:
            # There is no index in the storage context yet: create an empty
            # one and persist it right away.
            logger.info("Creating a new vector store index")
            fresh_index = VectorStoreIndex.from_documents([], **index_options)
            fresh_index.storage_context.persist(persist_dir=local_data_path)
            return fresh_index

    def _save_index(self) -> None:
        """Persist the index's storage context under ``local_data_path``."""
        self._index.storage_context.persist(persist_dir=local_data_path)

    def delete(self, doc_id: str) -> None:
        """Delete ``doc_id`` from the index and doc store, then persist."""
        with self._index_thread_lock:
            self._index.delete_ref_doc(doc_id, delete_from_docstore=True)
            self._save_index()
class SimpleIngestComponent(BaseIngestComponentWithIndex):
    """Sequential ingestion: documents are inserted into the index one by one."""

    def __init__(
        self,
        storage_context: StorageContext,
        service_context: ServiceContext,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Delegate entirely to the indexed base component."""
        super().__init__(storage_context, service_context, *args, **kwargs)

    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
        """Turn one file into documents and store them in the index.

        Returns:
            The documents produced from the file.
        """
        logger.info("Ingesting file_name=%s", file_name)
        file_documents = IngestionHelper.transform_file_into_documents(
            file_name, file_data
        )
        logger.info(
            "Transformed file=%s into count=%s documents",
            file_name,
            len(file_documents),
        )
        logger.debug("Saving the documents in the index and doc store")
        return self._save_docs(file_documents)

    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
        """Ingest the files in order and return all saved documents.

        The generator is consumed lazily, so each file is transformed and
        saved before the next one is read.
        """
        per_file_results = (
            self._save_docs(
                IngestionHelper.transform_file_into_documents(name, data)
            )
            for name, data in files
        )
        return list(itertools.chain.from_iterable(per_file_results))

    def _save_docs(self, documents: list[Document]) -> list[Document]:
        """Insert every document into the index, persist, and return them."""
        logger.debug("Transforming count=%s documents into nodes", len(documents))
        # Hold the lock for the inserts and the persist so that concurrent
        # callers cannot interleave their writes.
        with self._index_thread_lock:
            for doc in documents:
                self._index.insert(doc, show_progress=True)
            logger.debug("Persisting the index and nodes")
            self._save_index()
            logger.debug("Persisted the index and nodes")
        return documents
class BatchIngestComponent(BaseIngestComponentWithIndex):
    """Parallelize the file reading and parsing on multiple CPU cores.

    This also makes the embeddings be computed in batches (on GPU or CPU).
    The process pool created here is shut down when the object is deleted.
    """

    def __init__(
        self,
        storage_context: StorageContext,
        service_context: ServiceContext,
        count_workers: int,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Create the component and its file-parsing process pool.

        Args:
            storage_context: forwarded to the base component.
            service_context: forwarded to the base component; must expose at
                least two transformations.
            count_workers: number of processes in the parsing pool, > 0.
            *args: forwarded to the base component.
            **kwargs: forwarded to the base component.

        Raises:
            AssertionError: if fewer than two transformations are configured
                or ``count_workers`` is not positive.
        """
        super().__init__(storage_context, service_context, *args, **kwargs)
        # Make an efficient use of the CPU and GPU, the embedding
        # must be in the transformations
        assert (
            len(self.service_context.transformations) >= 2
        ), "Embeddings must be in the transformations"
        assert count_workers > 0, "count_workers must be > 0"
        self.count_workers = count_workers

        self._file_to_documents_work_pool = multiprocessing.Pool(
            processes=self.count_workers
        )

    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
        """Turn one file into documents (in this process) and store them."""
        logger.info("Ingesting file_name=%s", file_name)
        documents = IngestionHelper.transform_file_into_documents(file_name, file_data)
        logger.info(
            "Transformed file=%s into count=%s documents", file_name, len(documents)
        )
        logger.debug("Saving the documents in the index and doc store")
        return self._save_docs(documents)

    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
        """Parse all files through the process pool, then save them in one batch."""
        documents = list(
            itertools.chain.from_iterable(
                self._file_to_documents_work_pool.starmap(
                    IngestionHelper.transform_file_into_documents, files
                )
            )
        )
        logger.info(
            "Transformed count=%s files into count=%s documents",
            len(files),
            len(documents),
        )
        return self._save_docs(documents)

    def _save_docs(self, documents: list[Document]) -> list[Document]:
        """Run the transformations, insert the nodes, persist, return the documents."""
        logger.debug("Transforming count=%s documents into nodes", len(documents))
        nodes = run_transformations(
            documents,  # type: ignore[arg-type]
            self.service_context.transformations,
            show_progress=self.show_progress,
        )
        # Locking the index to avoid concurrent writes
        with self._index_thread_lock:
            logger.info("Inserting count=%s nodes in the index", len(nodes))
            # Honour the same progress setting as the transformation step.
            self._index.insert_nodes(nodes, show_progress=self.show_progress)
            for document in documents:
                self._index.docstore.set_document_hash(
                    document.get_doc_id(), document.hash
                )
            logger.debug("Persisting the index and nodes")
            self._save_index()
            logger.debug("Persisted the index and nodes")
        return documents

    def __del__(self) -> None:
        """Shut down the process pool when the component is deleted.

        The attribute may be missing when ``__init__`` failed before the pool
        was created, hence the ``getattr`` guard. The root logger is used so
        that the message does not depend on the module logger still existing.
        """
        pool = getattr(self, "_file_to_documents_work_pool", None)
        if pool is None:
            return
        logging.debug("Closing the file to documents work pool")
        pool.close()
        pool.join()
        pool.terminate()
class ParallelizedIngestComponent(BaseIngestComponentWithIndex):
    """Parallelize the file ingestion (file reading, embeddings, and index insertion).

    This uses the CPU and GPU in parallel (both running at the same time), and
    reduces the memory pressure by not loading all the files in memory at the
    same time.
    """

    def __init__(
        self,
        storage_context: StorageContext,
        service_context: ServiceContext,
        count_workers: int,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Create the component with a thread pool and a process pool.

        Args:
            storage_context: forwarded to the base component.
            service_context: forwarded to the base component; must expose at
                least two transformations.
            count_workers: size of both pools, > 0.
            *args: forwarded to the base component.
            **kwargs: forwarded to the base component.

        Raises:
            AssertionError: if fewer than two transformations are configured
                or ``count_workers`` is not positive.
        """
        super().__init__(storage_context, service_context, *args, **kwargs)
        # To make an efficient use of the CPU and GPU, the embeddings
        # must be in the transformations (to be computed in batches)
        assert (
            len(self.service_context.transformations) >= 2
        ), "Embeddings must be in the transformations"
        assert count_workers > 0, "count_workers must be > 0"
        self.count_workers = count_workers
        # We are doing our own multiprocessing.
        # To avoid colliding with the multiprocessing of huggingface, we disable it
        os.environ["TOKENIZERS_PARALLELISM"] = "false"

        self._ingest_work_pool = multiprocessing.pool.ThreadPool(
            processes=self.count_workers
        )
        self._file_to_documents_work_pool = multiprocessing.Pool(
            processes=self.count_workers
        )

    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
        """Parse one file in a worker process, then store its documents."""
        logger.info("Ingesting file_name=%s", file_name)
        # Running in a single (1) process to release the current
        # thread, and take a dedicated CPU core for computation
        documents = self._file_to_documents_work_pool.apply(
            IngestionHelper.transform_file_into_documents, (file_name, file_data)
        )
        logger.info(
            "Transformed file=%s into count=%s documents", file_name, len(documents)
        )
        logger.debug("Saving the documents in the index and doc store")
        return self._save_docs(documents)

    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
        """Run ``ingest`` for every file on the thread pool and flatten the results."""
        # Lightweight threads, used to parallelize the
        # underlying IO calls made in the ingestion
        documents = list(
            itertools.chain.from_iterable(
                self._ingest_work_pool.starmap(self.ingest, files)
            )
        )
        return documents

    def _save_docs(self, documents: list[Document]) -> list[Document]:
        """Run the transformations, insert the nodes, persist, return the documents."""
        logger.debug("Transforming count=%s documents into nodes", len(documents))
        nodes = run_transformations(
            documents,  # type: ignore[arg-type]
            self.service_context.transformations,
            show_progress=self.show_progress,
        )
        # Locking the index to avoid concurrent writes
        with self._index_thread_lock:
            logger.info("Inserting count=%s nodes in the index", len(nodes))
            # Honour the same progress setting as the transformation step.
            self._index.insert_nodes(nodes, show_progress=self.show_progress)
            for document in documents:
                self._index.docstore.set_document_hash(
                    document.get_doc_id(), document.hash
                )
            logger.debug("Persisting the index and nodes")
            self._save_index()
            logger.debug("Persisted the index and nodes")
        return documents

    def __del__(self) -> None:
        """Clean up the multiprocessing pools when the object is deleted.

        ``__del__`` also runs for objects whose ``__init__`` raised before the
        pools were created, so each pool is looked up with ``getattr`` and
        skipped when absent. The root logger is used to avoid the module
        logger being deleted before the pools.
        """
        pools = (
            ("_ingest_work_pool", "ingest work pool"),
            ("_file_to_documents_work_pool", "file to documents work pool"),
        )
        for attribute, label in pools:
            pool = getattr(self, attribute, None)
            if pool is None:
                continue
            logging.debug("Closing the %s", label)
            pool.close()
            pool.join()
            pool.terminate()
def get_ingestion_component(
    storage_context: StorageContext,
    service_context: ServiceContext,
    settings: Settings,
) -> BaseIngestComponent:
    """Build the ingest component selected by ``settings.embedding.ingest_mode``.

    ``"batch"`` and ``"parallel"`` select the batch and parallelized
    components, both sized by ``settings.embedding.count_workers``. Any other
    value yields the simple component.
    """
    mode = settings.embedding.ingest_mode

    if mode == "batch":
        return BatchIngestComponent(
            storage_context, service_context, settings.embedding.count_workers
        )

    if mode == "parallel":
        return ParallelizedIngestComponent(
            storage_context, service_context, settings.embedding.count_workers
        )

    # Fallback for every other mode.
    return SimpleIngestComponent(storage_context, service_context)