import os
import sys
import logging
import time
import random
from typing import Any, Dict, List, Optional

from langchain.document_loaders import (
    PyPDFLoader,
    TextLoader,
    CSVLoader,
)
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Add project root to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

from app.config import CHUNK_SIZE, CHUNK_OVERLAP
from app.core.memory import MemoryManager


class DocumentProcessor:
    """Processes documents for ingestion into the vector database."""

    def __init__(self, memory_manager: MemoryManager):
        self.memory_manager = memory_manager
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP,
        )
        logger.info(
            f"DocumentProcessor initialized with chunk size {CHUNK_SIZE}, "
            f"overlap {CHUNK_OVERLAP}"
        )

    def process_file(self, file_path: str) -> List[Document]:
        """Process a file and return a list of document chunks."""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        # Get the file extension
        _, extension = os.path.splitext(file_path)
        extension = extension.lower()
        logger.info(f"Processing file: {file_path} with extension {extension}")

        # Load the file using the appropriate loader
        if extension == '.pdf':
            loader = PyPDFLoader(file_path)
        elif extension == '.txt':
            loader = TextLoader(file_path)
        elif extension == '.csv':
            loader = CSVLoader(file_path)
        else:
            raise ValueError(f"Unsupported file type: {extension}")

        # Load and split the documents
        documents = loader.load()
        chunks = self.text_splitter.split_documents(documents)
        logger.info(f"Split file into {len(chunks)} chunks")
        return chunks

    def _retry_operation(self, operation, max_retries: int = 3):
        """Retry an operation with jittered linear backoff."""
        for attempt in range(max_retries):
            try:
                return operation()
            except Exception as e:
                if "already accessed by another instance" in str(e) and attempt < max_retries - 1:
                    wait_time = random.uniform(0.5, 2.0) * (attempt + 1)
                    logger.warning(
                        f"Vector store access conflict, retrying "
                        f"({attempt + 1}/{max_retries}) in {wait_time:.2f}s..."
                    )
                    time.sleep(wait_time)
                else:
                    # Different error or last attempt, re-raise
                    raise

    def ingest_file(self, file_path: str, metadata: Optional[Dict[str, Any]] = None) -> List[str]:
        """Ingest a file into the vector database."""
        chunks: List[Document] = []
        try:
            # Process the file
            chunks = self.process_file(file_path)

            # Add metadata to each chunk
            if metadata is None:
                metadata = {}

            # Add file path to metadata
            base_metadata = {
                "source": file_path,
                "file_name": os.path.basename(file_path),
            }
            base_metadata.update(metadata)

            # Prepare chunks and metadatas
            texts = [chunk.page_content for chunk in chunks]
            metadatas = []
            for i, chunk in enumerate(chunks):
                chunk_metadata = base_metadata.copy()
                if hasattr(chunk, 'metadata'):
                    chunk_metadata.update(chunk.metadata)
                chunk_metadata["chunk_id"] = i
                metadatas.append(chunk_metadata)

            # Store in vector database with retry mechanism
            logger.info(f"Adding {len(texts)} chunks to vector database")

            def add_to_vectordb():
                return self.memory_manager.add_texts(texts, metadatas)

            ids = self._retry_operation(add_to_vectordb)
            logger.info(f"Successfully added chunks with IDs: {ids[:3]}...")
            return ids
        except Exception as e:
            logger.error(f"Error ingesting file {file_path}: {e}")
            # Return placeholder IDs if there's an error
            return [f"error-{random.randint(1000, 9999)}" for _ in range(len(chunks) if chunks else 1)]

    def ingest_text(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[str]:
        """Ingest raw text into the vector database."""
        chunks: List[str] = []
        try:
            if metadata is None:
                metadata = {}

            # Split the text
            chunks = self.text_splitter.split_text(text)
            logger.info(f"Split text into {len(chunks)} chunks")

            # Prepare metadatas
            metadatas = []
            for i in range(len(chunks)):
                chunk_metadata = metadata.copy()
                chunk_metadata["chunk_id"] = i
                chunk_metadata["source"] = "direct_input"
                metadatas.append(chunk_metadata)

            # Store in vector database with retry mechanism
            def add_to_vectordb():
                return self.memory_manager.add_texts(chunks, metadatas)

            ids = self._retry_operation(add_to_vectordb)
            logger.info(
                f"Successfully added text chunks with IDs: "
                f"{ids[:3] if len(ids) > 3 else ids}..."
            )
            return ids
        except Exception as e:
            logger.error(f"Error ingesting text: {e}")
            # Return placeholder IDs if there's an error
            return [f"error-{random.randint(1000, 9999)}" for _ in range(len(chunks) if chunks else 1)]
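

# ---------------------------------------------------------------------------
# Example usage: a minimal sketch, not part of the module's public API. It
# assumes MemoryManager can be constructed with no arguments and that its
# add_texts(texts, metadatas) returns a list of string IDs, as the methods
# above expect. The file path and metadata values are hypothetical; adjust
# them to your project.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    manager = MemoryManager()  # assumption: no-arg constructor
    processor = DocumentProcessor(manager)

    # Ingest a file; unsupported extensions raise ValueError in process_file
    file_ids = processor.ingest_file("docs/example.pdf", metadata={"category": "demo"})
    print(f"Ingested file chunks: {file_ids[:3]}")

    # Ingest raw text; chunks are tagged with source="direct_input"
    text_ids = processor.ingest_text("Some raw text to index.", metadata={"category": "demo"})
    print(f"Ingested text chunks: {text_ids[:3]}")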