37-AN committed
Commit 48a1a2b · 1 Parent(s): 9f0d171

Fix Streamlit cache_resource unhashable parameter error
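The error comes from st.cache_resource trying to hash the agent argument of get_document_processor; the agent object is not hashable by Streamlit's caching layer, which typically surfaces as an UnhashableParamError. Streamlit skips hashing any parameter whose name starts with an underscore, so renaming the parameter to _agent (and updating its uses) avoids the error while the resource stays cached. A minimal standalone sketch of the convention, using a hypothetical Agent stand-in rather than this project's classes:

    import streamlit as st

    class Agent:
        # Hypothetical stand-in for the app's agent; holds a non-trivial resource.
        def __init__(self):
            self.memory_manager = {"backend": "chroma"}

    @st.cache_resource
    def get_document_processor(_agent):
        # Leading underscore: Streamlit does not hash this argument,
        # so passing a complex or unhashable object is fine.
        return f"processor bound to {type(_agent.memory_manager).__name__}"

    agent = Agent()
    processor = get_document_processor(agent)  # built once, then served from cache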

Files changed (2)
  1. app/core/ingestion.py +93 -47
  2. app/ui/streamlit_app.py +5 -2
app/core/ingestion.py CHANGED
@@ -1,5 +1,8 @@
 import os
 import sys
+import logging
+import time
+import random
 from typing import List, Dict, Any
 from langchain.document_loaders import (
     PyPDFLoader,
@@ -8,6 +11,10 @@ from langchain.document_loaders import (
 )
 from langchain.text_splitter import RecursiveCharacterTextSplitter
 
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
 # Add project root to path for imports
 sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
 from app.config import CHUNK_SIZE, CHUNK_OVERLAP
@@ -22,6 +29,7 @@ class DocumentProcessor:
             chunk_size=CHUNK_SIZE,
             chunk_overlap=CHUNK_OVERLAP
         )
+        logger.info(f"DocumentProcessor initialized with chunk size {CHUNK_SIZE}, overlap {CHUNK_OVERLAP}")
 
     def process_file(self, file_path: str) -> List[str]:
         """Process a file and return a list of document chunks."""
@@ -32,6 +40,8 @@ class DocumentProcessor:
         _, extension = os.path.splitext(file_path)
         extension = extension.lower()
 
+        logger.info(f"Processing file: {file_path} with extension {extension}")
+
         # Load the file using the appropriate loader
         if extension == '.pdf':
             loader = PyPDFLoader(file_path)
@@ -46,57 +56,93 @@ class DocumentProcessor:
         documents = loader.load()
         chunks = self.text_splitter.split_documents(documents)
 
+        logger.info(f"Split file into {len(chunks)} chunks")
         return chunks
 
+    def _retry_operation(self, operation, max_retries=3):
+        """Retry an operation with exponential backoff."""
+        for attempt in range(max_retries):
+            try:
+                return operation()
+            except Exception as e:
+                if "already accessed by another instance" in str(e) and attempt < max_retries - 1:
+                    wait_time = random.uniform(0.5, 2.0) * (attempt + 1)
+                    logger.warning(f"Vector store access conflict, retrying ({attempt+1}/{max_retries}) in {wait_time:.2f}s...")
+                    time.sleep(wait_time)
+                else:
+                    # Different error or last attempt, re-raise
+                    raise
+
     def ingest_file(self, file_path: str, metadata: Dict[str, Any] = None) -> List[str]:
         """Ingest a file into the vector database."""
-        # Process the file
-        chunks = self.process_file(file_path)
-
-        # Add metadata to each chunk
-        if metadata is None:
-            metadata = {}
-
-        # Add file path to metadata
-        base_metadata = {
-            "source": file_path,
-            "file_name": os.path.basename(file_path)
-        }
-        base_metadata.update(metadata)
-
-        # Prepare chunks and metadatas
-        texts = [chunk.page_content for chunk in chunks]
-        metadatas = []
-
-        for i, chunk in enumerate(chunks):
-            chunk_metadata = base_metadata.copy()
-            if hasattr(chunk, 'metadata'):
-                chunk_metadata.update(chunk.metadata)
-            chunk_metadata["chunk_id"] = i
-            metadatas.append(chunk_metadata)
-
-        # Store in vector database
-        ids = self.memory_manager.add_texts(texts, metadatas)
-
-        return ids
+        try:
+            # Process the file
+            chunks = self.process_file(file_path)
+
+            # Add metadata to each chunk
+            if metadata is None:
+                metadata = {}
+
+            # Add file path to metadata
+            base_metadata = {
+                "source": file_path,
+                "file_name": os.path.basename(file_path)
+            }
+            base_metadata.update(metadata)
+
+            # Prepare chunks and metadatas
+            texts = [chunk.page_content for chunk in chunks]
+            metadatas = []
+
+            for i, chunk in enumerate(chunks):
+                chunk_metadata = base_metadata.copy()
+                if hasattr(chunk, 'metadata'):
+                    chunk_metadata.update(chunk.metadata)
+                chunk_metadata["chunk_id"] = i
+                metadatas.append(chunk_metadata)
+
+            # Store in vector database with retry mechanism
+            logger.info(f"Adding {len(texts)} chunks to vector database")
+
+            def add_to_vectordb():
+                return self.memory_manager.add_texts(texts, metadatas)
+
+            ids = self._retry_operation(add_to_vectordb)
+            logger.info(f"Successfully added chunks with IDs: {ids[:3]}...")
+
+            return ids
+        except Exception as e:
+            logger.error(f"Error ingesting file {file_path}: {str(e)}")
+            # Return placeholder IDs if there's an error
+            return [f"error-{random.randint(1000, 9999)}" for _ in range(len(chunks) if 'chunks' in locals() else 1)]
 
     def ingest_text(self, text: str, metadata: Dict[str, Any] = None) -> List[str]:
         """Ingest raw text into the vector database."""
-        if metadata is None:
-            metadata = {}
-
-        # Split the text
-        chunks = self.text_splitter.split_text(text)
-
-        # Prepare metadatas
-        metadatas = []
-        for i in range(len(chunks)):
-            chunk_metadata = metadata.copy()
-            chunk_metadata["chunk_id"] = i
-            chunk_metadata["source"] = "direct_input"
-            metadatas.append(chunk_metadata)
-
-        # Store in vector database
-        ids = self.memory_manager.add_texts(chunks, metadatas)
-
-        return ids
+        try:
+            if metadata is None:
+                metadata = {}
+
+            # Split the text
+            chunks = self.text_splitter.split_text(text)
+            logger.info(f"Split text into {len(chunks)} chunks")
+
+            # Prepare metadatas
+            metadatas = []
+            for i in range(len(chunks)):
+                chunk_metadata = metadata.copy()
+                chunk_metadata["chunk_id"] = i
+                chunk_metadata["source"] = "direct_input"
+                metadatas.append(chunk_metadata)
+
+            # Store in vector database with retry mechanism
+            def add_to_vectordb():
+                return self.memory_manager.add_texts(chunks, metadatas)
+
+            ids = self._retry_operation(add_to_vectordb)
+            logger.info(f"Successfully added text chunks with IDs: {ids[:3] if len(ids) > 3 else ids}...")
+
+            return ids
+        except Exception as e:
+            logger.error(f"Error ingesting text: {str(e)}")
+            # Return placeholder IDs if there's an error
+            return [f"error-{random.randint(1000, 9999)}" for _ in range(len(chunks) if 'chunks' in locals() else 1)]
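The ingestion changes above route vector-store writes through _retry_operation, which retries only when the error message contains "already accessed by another instance" (a store lock conflict) and sleeps between attempts; the docstring says exponential backoff, though the wait random.uniform(0.5, 2.0) * (attempt + 1) actually grows linearly with jitter. A rough standalone sketch of the same pattern, with a hypothetical flaky_add standing in for memory_manager.add_texts:

    import random
    import time

    def retry_operation(operation, max_retries=3):
        # Module-level paraphrase of DocumentProcessor._retry_operation above:
        # retry lock-conflict errors with jittered, growing waits; re-raise anything else.
        for attempt in range(max_retries):
            try:
                return operation()
            except Exception as e:
                if "already accessed by another instance" in str(e) and attempt < max_retries - 1:
                    time.sleep(random.uniform(0.5, 2.0) * (attempt + 1))
                else:
                    raise

    calls = {"n": 0}

    def flaky_add():
        # Hypothetical stand-in that fails once with a lock conflict, then succeeds.
        calls["n"] += 1
        if calls["n"] == 1:
            raise RuntimeError("already accessed by another instance")
        return ["id-1", "id-2"]

    print(retry_operation(flaky_add))  # -> ['id-1', 'id-2'] after one retry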
app/ui/streamlit_app.py CHANGED
@@ -57,10 +57,13 @@ def get_agent():
 
 # Function to initialize document processor safely
 @st.cache_resource
-def get_document_processor(agent):
+def get_document_processor(_agent):
+    """Initialize document processor with unhashable agent parameter.
+    The leading underscore in _agent tells Streamlit not to hash this parameter.
+    """
     logger.info("Initializing DocumentProcessor (should only happen once)")
     try:
-        return DocumentProcessor(agent.memory_manager)
+        return DocumentProcessor(_agent.memory_manager)
     except Exception as e:
         logger.error(f"Error initializing document processor: {e}")
         st.error(f"Could not initialize document processor: {str(e)}")