|
import logging |
|
import tempfile |
|
from pathlib import Path |
|
from typing import BinaryIO, List |
|
|
|
from injector import inject, singleton |
|
from llama_index import ( |
|
ServiceContext, |
|
StorageContext, |
|
) |
|
from llama_index.node_parser import SentenceWindowNodeParser |
|
|
|
from private_gpt.components.embedding.embedding_component import EmbeddingComponent |
|
from private_gpt.components.ingest.ingest_component import get_ingestion_component |
|
from private_gpt.components.llm.llm_component import LLMComponent |
|
from private_gpt.components.node_store.node_store_component import NodeStoreComponent |
|
from private_gpt.components.vector_store.vector_store_component import ( |
|
VectorStoreComponent, |
|
) |
|
from private_gpt.server.ingest.model import IngestedDoc |
|
from private_gpt.settings.settings import settings |
|
|
|
# Module-level logger, named after this module so records can be filtered by it.
logger = logging.getLogger(__name__)
|
|
|
|
|
@singleton
class IngestService:
    """Ingest documents through the ingestion component and manage them.

    The service builds a storage context and a service context from the
    injected components, hands both to the ingestion component, and exposes
    helpers to ingest, list and delete documents.
    """

    @inject
    def __init__(
        self,
        llm_component: LLMComponent,
        vector_store_component: VectorStoreComponent,
        embedding_component: EmbeddingComponent,
        node_store_component: NodeStoreComponent,
    ) -> None:
        """Wire the storage/service contexts and create the ingestion component.

        :param llm_component: provides the ``llm`` used by the service context
        :param vector_store_component: provides the ``vector_store``
        :param embedding_component: provides the ``embedding_model``
        :param node_store_component: provides the ``doc_store`` and ``index_store``
        """
        self.llm_service = llm_component
        self.storage_context = StorageContext.from_defaults(
            vector_store=vector_store_component.vector_store,
            docstore=node_store_component.doc_store,
            index_store=node_store_component.index_store,
        )
        node_parser = SentenceWindowNodeParser.from_defaults()
        self.ingest_service_context = ServiceContext.from_defaults(
            llm=self.llm_service.llm,
            embed_model=embedding_component.embedding_model,
            node_parser=node_parser,
            # The embedding model is listed as a transformation right after
            # the node parser, so it is applied to the parsed nodes.
            transformations=[node_parser, embedding_component.embedding_model],
        )

        self.ingest_component = get_ingestion_component(
            self.storage_context, self.ingest_service_context, settings=settings()
        )

    def ingest(self, file_name: str, file_data: Path) -> list[IngestedDoc]:
        """Ingest a single file located at ``file_data``.

        :param file_name: name under which the file is ingested
        :param file_data: path of the file to read
        :return: one ``IngestedDoc`` per document returned by the ingestion
            component
        """
        logger.info("Ingesting file_name=%s", file_name)
        documents = self.ingest_component.ingest(file_name, file_data)
        return [IngestedDoc.from_document(document) for document in documents]

    def ingest_bin_data(
        self, file_name: str, raw_file_data: BinaryIO
    ) -> list[IngestedDoc]:
        """Ingest the content of an open stream via a temporary file.

        The stream is read fully, written to a temporary file, ingested with
        :meth:`ingest`, and the temporary file is always removed afterwards.

        :param file_name: name under which the data is ingested
        :param raw_file_data: readable stream holding the content
        :return: the documents produced by :meth:`ingest`
        """
        logger.debug("Ingesting binary data with file_name=%s", file_name)
        file_data = raw_file_data.read()
        logger.debug("Got file data of size=%s to ingest", len(file_data))

        # delete=False keeps the file on disk after the handle is closed, so
        # it can be rewritten and read by path; it is removed explicitly below.
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            path_to_tmp = Path(tmp.name)

        # The handle is closed and ``path_to_tmp`` is bound before the ``try``,
        # so the cleanup in ``finally`` can never hit an unbound name.
        try:
            if isinstance(file_data, bytes):
                path_to_tmp.write_bytes(file_data)
            else:
                path_to_tmp.write_text(str(file_data))
            return self.ingest(file_name, path_to_tmp)
        finally:
            # missing_ok: a cleanup failure must not mask an ingestion error.
            path_to_tmp.unlink(missing_ok=True)

    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[IngestedDoc]:
        """Ingest several files in one call.

        :param files: ``(file_name, path)`` pairs to ingest
        :return: one ``IngestedDoc`` per document returned by the ingestion
            component
        """
        logger.info("Ingesting file_names=%s", [f[0] for f in files])
        documents = self.ingest_component.bulk_ingest(files)
        return [IngestedDoc.from_document(document) for document in documents]

    def list_ingested(self) -> list[IngestedDoc]:
        """List the documents referenced by the nodes in the doc store.

        A ``ValueError`` raised while reading the doc store is logged and the
        documents collected so far (possibly none) are returned.

        :return: one ``IngestedDoc`` per distinct ``ref_doc_id``; its
            ``doc_metadata`` is ``None`` when no metadata is available
        """
        ingested_docs: list[IngestedDoc] = []
        try:
            docstore = self.storage_context.docstore
            # Several nodes can point at the same source document.
            ingested_docs_ids: set[str] = {
                node.ref_doc_id
                for node in docstore.docs.values()
                if node.ref_doc_id is not None
            }

            for doc_id in ingested_docs_ids:
                ref_doc_info = docstore.get_ref_doc_info(ref_doc_id=doc_id)
                doc_metadata = None
                if ref_doc_info is not None and ref_doc_info.metadata is not None:
                    doc_metadata = IngestedDoc.curate_metadata(ref_doc_info.metadata)
                ingested_docs.append(
                    IngestedDoc(
                        object="ingest.document",
                        doc_id=doc_id,
                        doc_metadata=doc_metadata,
                    )
                )
        except ValueError:
            # Best effort: report the problem and return what was gathered.
            logger.warning("Got an exception when getting list of docs", exc_info=True)
        logger.debug("Found count=%s ingested documents", len(ingested_docs))
        return ingested_docs

    def delete(self, doc_id: str) -> None:
        """Delete an ingested document.

        :raises ValueError: if the document does not exist
        """
        logger.info(
            "Deleting the ingested document=%s in the doc and index store", doc_id
        )
        self.ingest_component.delete(doc_id)

    def list_ingested_filenames(self) -> List[str]:
        """List the distinct ``file_name`` values of the ingested documents.

        Documents without metadata, or whose metadata has no ``file_name``
        entry, contribute an empty string.

        :return: the distinct file names, in no particular order
        """
        unique_filenames = {
            # doc_metadata is None when list_ingested found no metadata.
            (doc.doc_metadata or {}).get("file_name", "")
            for doc in self.list_ingested()
        }
        return list(unique_filenames)
|
|