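"""Document ingestion utilities: loads PDF, TXT, and CSV files, splits them
into overlapping chunks, and stores the chunks in the vector database via
MemoryManager."""
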
import os
import sys
import logging
import time
import random
from typing import Any, Callable, Dict, List, Optional
from langchain.document_loaders import (
    PyPDFLoader,
    TextLoader,
    CSVLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Add project root to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from app.config import CHUNK_SIZE, CHUNK_OVERLAP
from app.core.memory import MemoryManager

class DocumentProcessor:
    """Processes documents for ingestion into the vector database."""
    
    def __init__(self, memory_manager: MemoryManager):
        self.memory_manager = memory_manager
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP
        )
        logger.info(f"DocumentProcessor initialized with chunk size {CHUNK_SIZE}, overlap {CHUNK_OVERLAP}")
    
    def process_file(self, file_path: str) -> List[Document]:
        """Process a file and return a list of document chunks."""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")
        
        # Get the file extension
        _, extension = os.path.splitext(file_path)
        extension = extension.lower()
        
        logger.info(f"Processing file: {file_path} with extension {extension}")
        
        # Load the file using the appropriate loader
        if extension == '.pdf':
            loader = PyPDFLoader(file_path)
        elif extension == '.txt':
            loader = TextLoader(file_path)
        elif extension == '.csv':
            loader = CSVLoader(file_path)
        else:
            raise ValueError(f"Unsupported file type: {extension}")
        
        # Load and split the documents
        documents = loader.load()
        chunks = self.text_splitter.split_documents(documents)
        
        logger.info(f"Split file into {len(chunks)} chunks")
        return chunks
    
    def _retry_operation(self, operation: Callable[[], Any], max_retries: int = 3) -> Any:
        """Retry an operation with jittered exponential backoff on vector store access conflicts."""
        for attempt in range(max_retries):
            try:
                return operation()
            except Exception as e:
                # Only retry concurrent-access conflicts (e.g. Chroma's
                # "already accessed by another instance" error), and never
                # on the final attempt.
                if "already accessed by another instance" in str(e) and attempt < max_retries - 1:
                    # Jittered exponential backoff: 0.5-2.0s, then 1.0-4.0s, ...
                    wait_time = random.uniform(0.5, 2.0) * (2 ** attempt)
                    logger.warning(f"Vector store access conflict, retrying ({attempt+1}/{max_retries}) in {wait_time:.2f}s...")
                    time.sleep(wait_time)
                else:
                    # Different error or last attempt, re-raise
                    raise
    
    def ingest_file(self, file_path: str, metadata: Optional[Dict[str, Any]] = None) -> List[str]:
        """Ingest a file into the vector database."""
        try:
            # Process the file
            chunks = self.process_file(file_path)
            
            # Add metadata to each chunk
            if metadata is None:
                metadata = {}
            
            # Add file path to metadata
            base_metadata = {
                "source": file_path,
                "file_name": os.path.basename(file_path)
            }
            base_metadata.update(metadata)
            
            # Prepare chunks and metadatas
            texts = [chunk.page_content for chunk in chunks]
            metadatas = []
            
            for i, chunk in enumerate(chunks):
                chunk_metadata = base_metadata.copy()
                if hasattr(chunk, 'metadata'):
                    chunk_metadata.update(chunk.metadata)
                chunk_metadata["chunk_id"] = i
                metadatas.append(chunk_metadata)
            
            # Store in vector database with retry mechanism
            logger.info(f"Adding {len(texts)} chunks to vector database")
            
            def add_to_vectordb():
                return self.memory_manager.add_texts(texts, metadatas)
                
            ids = self._retry_operation(add_to_vectordb)
            logger.info(f"Successfully added chunks with IDs: {ids[:3]}...")
            
            return ids
        except Exception as e:
            logger.error(f"Error ingesting file {file_path}: {str(e)}")
            # Return placeholder error IDs so the caller still receives a
            # list of the expected length (one per chunk, or a single entry
            # if processing failed before chunking)
            return [f"error-{random.randint(1000, 9999)}" for _ in range(len(chunks) if 'chunks' in locals() else 1)]
    
    def ingest_text(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[str]:
        """Ingest raw text into the vector database."""
        try:
            if metadata is None:
                metadata = {}
            
            # Split the text
            chunks = self.text_splitter.split_text(text)
            logger.info(f"Split text into {len(chunks)} chunks")
            
            # Prepare metadatas
            metadatas = []
            for i in range(len(chunks)):
                chunk_metadata = metadata.copy()
                chunk_metadata["chunk_id"] = i
                chunk_metadata["source"] = "direct_input"
                metadatas.append(chunk_metadata)
            
            # Store in vector database with retry mechanism
            def add_to_vectordb():
                return self.memory_manager.add_texts(chunks, metadatas)
                
            ids = self._retry_operation(add_to_vectordb)
            logger.info(f"Successfully added text chunks with IDs: {ids[:3] if len(ids) > 3 else ids}...")
            
            return ids
        except Exception as e:
            logger.error(f"Error ingesting text: {str(e)}")
            # Return placeholder error IDs so the caller still receives a
            # list of the expected length
            return [f"error-{random.randint(1000, 9999)}" for _ in range(len(chunks) if 'chunks' in locals() else 1)]