Hussam commited on
Commit
7ee11d0
·
1 Parent(s): 6f98944

W.I.P: mongo vector index formatting

Browse files
src/ctp_slack_bot/db/mongo_db.py CHANGED
@@ -1,6 +1,7 @@
1
  from dependency_injector.resources import Resource
2
  from motor.motor_asyncio import AsyncIOMotorClient
3
  from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
 
4
  from loguru import logger
5
  from pydantic import BaseModel, PrivateAttr
6
  from typing import Any, Dict, Optional, Self
@@ -27,6 +28,10 @@ class MongoDB(BaseModel):
27
  """Initialize MongoDB client with settings."""
28
  try:
29
  connection_string = self.settings.MONGODB_URI.get_secret_value()
 
 
 
 
30
 
31
  # Create client with appropriate settings
32
  self._client = AsyncIOMotorClient(
@@ -54,92 +59,108 @@ class MongoDB(BaseModel):
54
  @property
55
  def client(self: Self) -> AsyncIOMotorClient:
56
  """Get the MongoDB client instance."""
57
- if self._client is None:
58
  logger.warning("MongoDB client not initialized. Attempting to initialize.")
59
  self.connect()
60
- if self._client is None:
61
  raise ConnectionError("Failed to initialize MongoDB client")
62
  return self._client
63
 
64
  @property
65
  def db(self: Self) -> Any:
66
  """Get the MongoDB database instance."""
67
- if self._db is None:
68
  logger.warning("MongoDB database not initialized. Attempting to initialize client.")
69
  self.connect()
70
- if self._db is None:
71
  raise ConnectionError("Failed to initialize MongoDB database")
72
  return self._db
73
 
74
  async def ping(self: Self) -> bool:
75
  """Check if MongoDB connection is alive."""
76
  try:
77
- await self.client.admin.command('ping')
 
 
 
 
 
78
  return True
79
  except (ConnectionFailure, ServerSelectionTimeoutError) as e:
80
  logger.error("MongoDB connection failed: {}", e)
81
  return False
 
 
 
82
 
83
  async def get_collection(self: Self, name: str) -> Any:
84
  """
85
  Get a collection by name with validation.
86
  Creates the collection if it doesn't exist.
87
  """
 
88
  if not await self.ping():
 
89
  raise ConnectionError("MongoDB connection is not available")
90
 
91
- # Get all collection names to check if this one exists
92
- collection_names = await self.db.list_collection_names()
93
- if name not in collection_names:
94
- logger.info(f"Collection {name} does not exist. Creating it.")
95
- # Create the collection
96
- await self.db.create_collection(name)
97
 
98
- return self.db[name]
 
 
 
 
 
 
 
 
 
 
 
 
 
99
 
100
- async def create_indexes(self: Self, collection_name: str, indexes: list = None) -> None:
101
  """
102
- Create indexes on a collection.
103
- If no indexes provided and collection needs vector search capability,
104
- creates a vector search index using config settings.
 
105
  """
106
  collection = await self.get_collection(collection_name)
107
 
108
- if indexes:
109
- await collection.create_indexes(indexes)
110
- logger.info("Created custom indexes for collection {}: {}", collection_name, indexes)
111
-
112
- else: # Create vector search index using settings from config
113
- try:
114
- # Create the vector search index with the proper MongoDB format
115
- vector_search_index = {
116
  "fields": [
117
  {
118
  "type": "vector",
119
  "path": "embedding",
120
  "numDimensions": self.settings.VECTOR_DIMENSION,
121
- "similarity": "cosine"
 
122
  }
123
  ]
124
- }
125
-
126
- try:
127
- # Using createSearchIndex command which is the proper way to create vector search indexes
128
- await self.db.command({
129
- "createSearchIndex": collection_name,
130
- "name": f"{collection_name}_vector_index",
131
- "definition": vector_search_index
132
- })
133
- logger.info("Created vector search index for collection {}", collection_name)
134
- except Exception as e:
135
- if "command not found" in str(e).lower():
136
- logger.warning("Vector search not supported by this MongoDB instance. Some functionality may be limited.")
137
- # Create a fallback standard index on embedding field
138
- await collection.create_index("embedding")
139
- logger.info("Created standard index on 'embedding' field as fallback")
140
- else:
141
- raise
142
- except Exception as e:
143
  logger.error("Failed to create vector index: {}", e)
144
  raise
145
 
@@ -153,6 +174,30 @@ class MongoDB(BaseModel):
153
 
154
  class MongoDBResource(Resource):
155
  def init(self: Self, settings: Settings) -> MongoDB:
 
156
  mongo_db = MongoDB(settings=settings)
157
  mongo_db.connect()
 
 
 
 
158
  return mongo_db
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  from dependency_injector.resources import Resource
2
  from motor.motor_asyncio import AsyncIOMotorClient
3
  from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
4
+ from pymongo.operations import SearchIndexModel
5
  from loguru import logger
6
  from pydantic import BaseModel, PrivateAttr
7
  from typing import Any, Dict, Optional, Self
 
28
  """Initialize MongoDB client with settings."""
29
  try:
30
  connection_string = self.settings.MONGODB_URI.get_secret_value()
31
+ logger.debug("Connecting to MongoDB using URI: {}", connection_string.replace(
32
+ connection_string.split('@')[-1].split('/')[0] if '@' in connection_string else '',
33
+ '[REDACTED]'
34
+ ))
35
 
36
  # Create client with appropriate settings
37
  self._client = AsyncIOMotorClient(
 
59
  @property
60
  def client(self: Self) -> AsyncIOMotorClient:
61
  """Get the MongoDB client instance."""
62
+ if not hasattr(self, '_client') or self._client is None:
63
  logger.warning("MongoDB client not initialized. Attempting to initialize.")
64
  self.connect()
65
+ if not hasattr(self, '_client') or self._client is None:
66
  raise ConnectionError("Failed to initialize MongoDB client")
67
  return self._client
68
 
69
  @property
70
  def db(self: Self) -> Any:
71
  """Get the MongoDB database instance."""
72
+ if not hasattr(self, '_db') or self._db is None:
73
  logger.warning("MongoDB database not initialized. Attempting to initialize client.")
74
  self.connect()
75
+ if not hasattr(self, '_db') or self._db is None:
76
  raise ConnectionError("Failed to initialize MongoDB database")
77
  return self._db
78
 
79
  async def ping(self: Self) -> bool:
80
  """Check if MongoDB connection is alive."""
81
  try:
82
+ # Get client to ensure we're connected
83
+ client = self.client
84
+
85
+ # Try a simple ping command
86
+ await client.admin.command('ping')
87
+ logger.debug("MongoDB connection is active")
88
  return True
89
  except (ConnectionFailure, ServerSelectionTimeoutError) as e:
90
  logger.error("MongoDB connection failed: {}", e)
91
  return False
92
+ except Exception as e:
93
+ logger.error("Unexpected error during MongoDB ping: {}", e)
94
+ return False
95
 
96
  async def get_collection(self: Self, name: str) -> Any:
97
  """
98
  Get a collection by name with validation.
99
  Creates the collection if it doesn't exist.
100
  """
101
+ # First ensure we can connect at all
102
  if not await self.ping():
103
+ logger.error("Cannot get collection '{}' - MongoDB connection is not available", name)
104
  raise ConnectionError("MongoDB connection is not available")
105
 
106
+ try:
107
+ # Get all collection names to check if this one exists
108
+ logger.debug("Checking if collection '{}' exists", name)
109
+ collection_names = await self.db.list_collection_names()
 
 
110
 
111
+ if name not in collection_names:
112
+ logger.info("Collection '{}' does not exist. Creating it.", name)
113
+ # Create the collection
114
+ await self.db.create_collection(name)
115
+ logger.debug("Successfully created collection '{}'", name)
116
+ else:
117
+ logger.debug("Collection '{}' already exists", name)
118
+
119
+ # Get and return the collection
120
+ collection = self.db[name]
121
+ return collection
122
+ except Exception as e:
123
+ logger.error("Error accessing collection '{}': {}", name, e)
124
+ raise
125
 
126
+ async def create_indexes(self: Self, collection_name: str) -> None:
127
  """
128
+ Create a vector search index on a collection.
129
+
130
+ Args:
131
+ collection_name: Name of the collection
132
  """
133
  collection = await self.get_collection(collection_name)
134
 
135
+ try:
136
+ # Create search index model using MongoDB's recommended approach
137
+ search_index_model = SearchIndexModel(
138
+ definition={
 
 
 
 
139
  "fields": [
140
  {
141
  "type": "vector",
142
  "path": "embedding",
143
  "numDimensions": self.settings.VECTOR_DIMENSION,
144
+ "similarity": "cosine",
145
+ "quantization": "scalar"
146
  }
147
  ]
148
+ },
149
+ name=f"{collection_name}_vector_index",
150
+ type="vectorSearch"
151
+ )
152
+
153
+ # Create the search index using the motor collection
154
+ result = await collection.create_search_index(search_index_model)
155
+ logger.info("Vector search index '{}' created for collection {}", result, collection_name)
156
+
157
+ except Exception as e:
158
+ if "command not found" in str(e).lower():
159
+ logger.warning("Vector search not supported by this MongoDB instance. Some functionality may be limited.")
160
+ # Create a fallback standard index on embedding field
161
+ await collection.create_index("embedding")
162
+ logger.info("Created standard index on 'embedding' field as fallback")
163
+ else:
 
 
 
164
  logger.error("Failed to create vector index: {}", e)
165
  raise
166
 
 
174
 
175
  class MongoDBResource(Resource):
176
  def init(self: Self, settings: Settings) -> MongoDB:
177
+ logger.info("Initializing MongoDB connection for database: {}", settings.MONGODB_NAME)
178
  mongo_db = MongoDB(settings=settings)
179
  mongo_db.connect()
180
+
181
+ # Test the connection asynchronously - this will run after init returns
182
+ asyncio.create_task(self._test_connection(mongo_db))
183
+
184
  return mongo_db
185
+
186
+ async def _test_connection(self, mongo_db: MongoDB) -> None:
187
+ """Test MongoDB connection and log the result."""
188
+ try:
189
+ is_connected = await mongo_db.ping()
190
+ if is_connected:
191
+ logger.info("MongoDB connection test successful!")
192
+ else:
193
+ logger.error("MongoDB connection test failed!")
194
+ except Exception as e:
195
+ logger.error("Error testing MongoDB connection: {}", e)
196
+
197
+ async def shutdown(self: Self, mongo_db: MongoDB) -> None:
198
+ """Close MongoDB connection on shutdown."""
199
+ try:
200
+ logger.info("Closing MongoDB connection...")
201
+ await mongo_db.close()
202
+ except Exception as e:
203
+ logger.error("Error closing MongoDB connection: {}", e)
src/ctp_slack_bot/services/language_model_service.py CHANGED
@@ -34,22 +34,22 @@ class LanguageModelService(BaseModel):
34
  str: Generated answer
35
  """
36
  logger.debug("Generating response for question “{}” using {} context chunks…", question, len(context))
37
- # messages = [
38
- # {"role": "system", "content": self.settings.SYSTEM_PROMPT},
39
- # {"role": "user", "content":
40
- # f"""Student Question: {question}
41
-
42
- # Context from class materials and transcripts:
43
- # {'\n'.join(chunk.text for chunk in context)}
44
-
45
- # Please answer the Student Question based on the Context from class materials and transcripts. If the context doesn’t contain relevant information, acknowledge that and suggest asking the professor."""}
46
- # ]
47
- # response: ChatCompletion = self._open_ai_client.chat.completions.create(
48
- # model=self.settings.CHAT_MODEL,
49
- # messages=messages,
50
- # max_tokens=self.settings.MAX_TOKENS,
51
- # temperature=self.settings.TEMPERATURE
52
- # )
53
 
54
- # return response.choices[0].message.content
55
- return f"Mock response to “{question}”"
 
34
  str: Generated answer
35
  """
36
  logger.debug("Generating response for question “{}” using {} context chunks…", question, len(context))
37
+ messages = [
38
+ {"role": "system", "content": self.settings.SYSTEM_PROMPT},
39
+ {"role": "user", "content":
40
+ f"""Student Question: {question}
41
+
42
+ Context from class materials and transcripts:
43
+ {'\n'.join(chunk.text for chunk in context)}
44
+
45
+ Please answer the Student Question based on the Context from class materials and transcripts. If the context doesn’t contain relevant information, acknowledge that and suggest asking the professor."""}
46
+ ]
47
+ response: ChatCompletion = self._open_ai_client.chat.completions.create(
48
+ model=self.settings.CHAT_MODEL,
49
+ messages=messages,
50
+ max_tokens=self.settings.MAX_TOKENS,
51
+ temperature=self.settings.TEMPERATURE
52
+ )
53
 
54
+ return response.choices[0].message.content
55
+ # return f"Mock response to “{question}”"
src/ctp_slack_bot/services/vector_database_service.py CHANGED
@@ -35,9 +35,11 @@ class VectorDatabaseService(BaseModel): # TODO: this should not rely specificall
35
 
36
  try:
37
  # Get the vector collection - this will create it if it doesn't exist
 
38
  vector_collection = await self.mongo_db.get_collection("vectors")
39
 
40
  # Ensure vector search index exists
 
41
  await self.mongo_db.create_indexes("vectors")
42
 
43
  # Create documents to store, ensuring compatibility with BSON
@@ -57,11 +59,15 @@ class VectorDatabaseService(BaseModel): # TODO: this should not rely specificall
57
  documents.append(document)
58
 
59
  # Insert into collection as a batch
 
60
  result = await vector_collection.insert_many(documents)
61
- logger.info(f"Stored {len(result.inserted_ids)} vector chunks in database")
62
 
63
  except Exception as e:
64
- logger.error(f"Error storing vector embeddings: {str(e)}")
 
 
 
65
  raise
66
 
67
  async def content_exists(self: Self, key: str)-> bool: # TODO: implement this.
@@ -85,15 +91,17 @@ class VectorDatabaseService(BaseModel): # TODO: this should not rely specificall
85
  """
86
  try:
87
  # Get the vector collection
 
88
  vector_collection = await self.mongo_db.get_collection("vectors")
89
 
90
  # Build aggregation pipeline for vector search using official MongoDB format
 
91
  pipeline = [
92
  {
93
  "$vectorSearch": {
94
  "index": "vectors_vector_index",
95
- "queryVector": list(query.query_embeddings),
96
  "path": "embedding",
 
97
  "numCandidates": query.k * 10,
98
  "limit": query.k
99
  }
@@ -113,6 +121,7 @@ class VectorDatabaseService(BaseModel): # TODO: this should not rely specificall
113
  if query.filter_metadata:
114
  metadata_filter = {f"metadata.{k}": v for k, v in query.filter_metadata.items()}
115
  pipeline.insert(1, {"$match": metadata_filter})
 
116
 
117
  # Add score threshold filter if needed
118
  if query.score_threshold > 0:
@@ -121,18 +130,23 @@ class VectorDatabaseService(BaseModel): # TODO: this should not rely specificall
121
  "score": { "$gte": query.score_threshold }
122
  }
123
  })
 
124
 
125
  try:
126
  # Execute the vector search pipeline
 
127
  results = await vector_collection.aggregate(pipeline).to_list(length=query.k)
 
128
  except Exception as e:
129
- logger.warning(f"Vector search failed: {str(e)}. Falling back to basic text search.")
130
  # Fall back to basic filtering with limit
131
  query_filter = {}
132
  if query.filter_metadata:
133
  query_filter.update({f"metadata.{k}": v for k, v in query.filter_metadata.items()})
134
 
 
135
  results = await vector_collection.find(query_filter).limit(query.k).to_list(length=query.k)
 
136
 
137
  # Convert results to Chunk objects
138
  chunks = []
@@ -148,9 +162,14 @@ class VectorDatabaseService(BaseModel): # TODO: this should not rely specificall
148
  )
149
  chunks.append(chunk)
150
 
151
- logger.info(f"Found {len(chunks)} similar chunks with similarity search")
152
  return chunks
153
 
154
  except Exception as e:
155
- logger.error(f"Error in similarity search: {str(e)}")
 
 
 
 
 
156
  raise
 
35
 
36
  try:
37
  # Get the vector collection - this will create it if it doesn't exist
38
+ logger.debug("Getting vectors collection for storing {} chunks", len(chunks))
39
  vector_collection = await self.mongo_db.get_collection("vectors")
40
 
41
  # Ensure vector search index exists
42
+ logger.debug("Creating vector search index for vectors collection")
43
  await self.mongo_db.create_indexes("vectors")
44
 
45
  # Create documents to store, ensuring compatibility with BSON
 
59
  documents.append(document)
60
 
61
  # Insert into collection as a batch
62
+ logger.debug("Inserting {} documents into vectors collection", len(documents))
63
  result = await vector_collection.insert_many(documents)
64
+ logger.info("Stored {} vector chunks in database", len(result.inserted_ids))
65
 
66
  except Exception as e:
67
+ logger.error("Error storing vector embeddings: {}", str(e))
68
+ # Include more diagnostic information
69
+ logger.debug("MongoDB connection info: URI defined: {}, DB name: {}",
70
+ bool(self.settings.MONGODB_URI), self.settings.MONGODB_NAME)
71
  raise
72
 
73
  async def content_exists(self: Self, key: str)-> bool: # TODO: implement this.
 
91
  """
92
  try:
93
  # Get the vector collection
94
+ logger.debug("Getting vectors collection for similarity search")
95
  vector_collection = await self.mongo_db.get_collection("vectors")
96
 
97
  # Build aggregation pipeline for vector search using official MongoDB format
98
+ logger.debug("Building vector search pipeline with query embedding dimension: {}", len(query.query_embeddings))
99
  pipeline = [
100
  {
101
  "$vectorSearch": {
102
  "index": "vectors_vector_index",
 
103
  "path": "embedding",
104
+ "queryVector": query.query_embeddings, #list(query.query_embeddings),
105
  "numCandidates": query.k * 10,
106
  "limit": query.k
107
  }
 
121
  if query.filter_metadata:
122
  metadata_filter = {f"metadata.{k}": v for k, v in query.filter_metadata.items()}
123
  pipeline.insert(1, {"$match": metadata_filter})
124
+ logger.debug("Added metadata filters to search: {}", query.filter_metadata)
125
 
126
  # Add score threshold filter if needed
127
  if query.score_threshold > 0:
 
130
  "score": { "$gte": query.score_threshold }
131
  }
132
  })
133
+ logger.debug("Added score threshold filter: {}", query.score_threshold)
134
 
135
  try:
136
  # Execute the vector search pipeline
137
+ logger.debug("Executing vector search pipeline")
138
  results = await vector_collection.aggregate(pipeline).to_list(length=query.k)
139
+ logger.debug("Vector search returned {} results", len(results))
140
  except Exception as e:
141
+ logger.warning("Vector search failed: {}. Falling back to basic text search.", str(e))
142
  # Fall back to basic filtering with limit
143
  query_filter = {}
144
  if query.filter_metadata:
145
  query_filter.update({f"metadata.{k}": v for k, v in query.filter_metadata.items()})
146
 
147
+ logger.debug("Executing fallback basic search with filter: {}", query_filter)
148
  results = await vector_collection.find(query_filter).limit(query.k).to_list(length=query.k)
149
+ logger.debug("Fallback search returned {} results", len(results))
150
 
151
  # Convert results to Chunk objects
152
  chunks = []
 
162
  )
163
  chunks.append(chunk)
164
 
165
+ logger.info("Found {} similar chunks with similarity search", len(chunks))
166
  return chunks
167
 
168
  except Exception as e:
169
+ logger.error("Error in similarity search: {}", str(e))
170
+ # Include additional diagnostic information
171
+ logger.debug("MongoDB connection info: URI defined: {}, DB name: {}",
172
+ bool(self.settings.MONGODB_URI), self.settings.MONGODB_NAME)
173
+ logger.debug("Query details: k={}, dimension={}",
174
+ query.k, len(query.query_embeddings) if query.query_embeddings else "None")
175
  raise