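"""Document ingestion utilities for the vector database.

Loads PDF, TXT, and CSV files, splits them into chunks, and stores the
chunks (with per-chunk metadata) through the MemoryManager.
"""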
import os
import sys
import logging
import time
import random
from typing import List, Dict, Any, Optional
from langchain.document_loaders import (
    PyPDFLoader,
    TextLoader,
    CSVLoader
)
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Add project root to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from app.config import CHUNK_SIZE, CHUNK_OVERLAP
from app.core.memory import MemoryManager
class DocumentProcessor:
    """Processes documents for ingestion into the vector database."""

    def __init__(self, memory_manager: MemoryManager):
        self.memory_manager = memory_manager
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP
        )
        logger.info(f"DocumentProcessor initialized with chunk size {CHUNK_SIZE}, overlap {CHUNK_OVERLAP}")
    def process_file(self, file_path: str) -> List[Document]:
        """Process a file and return a list of document chunks."""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        # Get the file extension
        _, extension = os.path.splitext(file_path)
        extension = extension.lower()
        logger.info(f"Processing file: {file_path} with extension {extension}")

        # Load the file using the appropriate loader
        if extension == '.pdf':
            loader = PyPDFLoader(file_path)
        elif extension == '.txt':
            loader = TextLoader(file_path)
        elif extension == '.csv':
            loader = CSVLoader(file_path)
        else:
            raise ValueError(f"Unsupported file type: {extension}")

        # Load and split the documents
        documents = loader.load()
        chunks = self.text_splitter.split_documents(documents)
        logger.info(f"Split file into {len(chunks)} chunks")
        return chunks
    def _retry_operation(self, operation, max_retries=3):
        """Retry an operation with randomized, linearly increasing backoff."""
        for attempt in range(max_retries):
            try:
                return operation()
            except Exception as e:
                if "already accessed by another instance" in str(e) and attempt < max_retries - 1:
                    wait_time = random.uniform(0.5, 2.0) * (attempt + 1)
                    logger.warning(f"Vector store access conflict, retrying ({attempt+1}/{max_retries}) in {wait_time:.2f}s...")
                    time.sleep(wait_time)
                else:
                    # Different error or last attempt, re-raise
                    raise
    def ingest_file(self, file_path: str, metadata: Optional[Dict[str, Any]] = None) -> List[str]:
        """Ingest a file into the vector database."""
        try:
            # Process the file
            chunks = self.process_file(file_path)

            # Add metadata to each chunk
            if metadata is None:
                metadata = {}

            # Add file path and name to the base metadata
            base_metadata = {
                "source": file_path,
                "file_name": os.path.basename(file_path)
            }
            base_metadata.update(metadata)

            # Prepare chunk texts and per-chunk metadata
            texts = [chunk.page_content for chunk in chunks]
            metadatas = []
            for i, chunk in enumerate(chunks):
                chunk_metadata = base_metadata.copy()
                if hasattr(chunk, 'metadata'):
                    chunk_metadata.update(chunk.metadata)
                chunk_metadata["chunk_id"] = i
                metadatas.append(chunk_metadata)

            # Store in the vector database with the retry mechanism
            logger.info(f"Adding {len(texts)} chunks to vector database")

            def add_to_vectordb():
                return self.memory_manager.add_texts(texts, metadatas)

            ids = self._retry_operation(add_to_vectordb)
            logger.info(f"Successfully added chunks with IDs: {ids[:3]}...")
            return ids
        except Exception as e:
            logger.error(f"Error ingesting file {file_path}: {str(e)}")
            # Return placeholder IDs if there's an error
            return [f"error-{random.randint(1000, 9999)}" for _ in range(len(chunks) if 'chunks' in locals() else 1)]
    def ingest_text(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[str]:
        """Ingest raw text into the vector database."""
        try:
            if metadata is None:
                metadata = {}

            # Split the text
            chunks = self.text_splitter.split_text(text)
            logger.info(f"Split text into {len(chunks)} chunks")

            # Prepare per-chunk metadata
            metadatas = []
            for i in range(len(chunks)):
                chunk_metadata = metadata.copy()
                chunk_metadata["chunk_id"] = i
                chunk_metadata["source"] = "direct_input"
                metadatas.append(chunk_metadata)

            # Store in the vector database with the retry mechanism
            def add_to_vectordb():
                return self.memory_manager.add_texts(chunks, metadatas)

            ids = self._retry_operation(add_to_vectordb)
            logger.info(f"Successfully added text chunks with IDs: {ids[:3] if len(ids) > 3 else ids}...")
            return ids
        except Exception as e:
            logger.error(f"Error ingesting text: {str(e)}")
            # Return placeholder IDs if there's an error
            return [f"error-{random.randint(1000, 9999)}" for _ in range(len(chunks) if 'chunks' in locals() else 1)]
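
# Example usage (a minimal sketch, not part of the module): it assumes that
# MemoryManager() can be constructed with no required arguments and that
# MemoryManager.add_texts(texts, metadatas) returns a list of IDs, as the
# code above expects. Adjust to the actual app.core.memory API.
if __name__ == "__main__":
    manager = MemoryManager()  # assumption: no-arg constructor
    processor = DocumentProcessor(manager)

    # Ingest a file from disk (the path is illustrative)
    file_ids = processor.ingest_file("docs/example.pdf", metadata={"topic": "demo"})
    print(f"Ingested file chunks: {file_ids}")

    # Ingest raw text directly
    text_ids = processor.ingest_text("Some ad-hoc note to remember.", metadata={"topic": "notes"})
    print(f"Ingested text chunks: {text_ids}")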