Hussam committed on
Commit
6f98944
·
1 Parent(s): 3da2136

WIP: fixing vector index format and search pipeline

Browse files
src/ctp_slack_bot/db/mongo_db.py CHANGED
@@ -113,26 +113,32 @@ class MongoDB(BaseModel):
113
  try:
114
  # Create the vector search index with the proper MongoDB format
115
  vector_search_index = {
116
- "mappings": {
117
- "dynamic": True,
118
- "fields": {
119
- "embedding": {
120
- "type": "knnVector",
121
- "dimensions": self.settings.VECTOR_DIMENSION,
122
- "similarity": "cosine"
123
- }
124
  }
125
- }
126
  }
127
 
128
- # Using createSearchIndex command which is the proper way to create vector search indexes
129
- await self.db.command({
130
- "createSearchIndex": collection_name,
131
- "name": f"{collection_name}_vector_index",
132
- "definition": vector_search_index
133
- })
134
-
135
- logger.info("Created vector search index for collection {}", collection_name)
 
 
 
 
 
 
 
 
136
  except Exception as e:
137
  logger.error("Failed to create vector index: {}", e)
138
  raise
 
113
  try:
114
  # Create the vector search index with the proper MongoDB format
115
  vector_search_index = {
116
+ "fields": [
117
+ {
118
+ "type": "vector",
119
+ "path": "embedding",
120
+ "numDimensions": self.settings.VECTOR_DIMENSION,
121
+ "similarity": "cosine"
 
 
122
  }
123
+ ]
124
  }
125
 
126
+ try:
127
+ # Using createSearchIndex command which is the proper way to create vector search indexes
128
+ await self.db.command({
129
+ "createSearchIndex": collection_name,
130
+ "name": f"{collection_name}_vector_index",
131
+ "definition": vector_search_index
132
+ })
133
+ logger.info("Created vector search index for collection {}", collection_name)
134
+ except Exception as e:
135
+ if "command not found" in str(e).lower():
136
+ logger.warning("Vector search not supported by this MongoDB instance. Some functionality may be limited.")
137
+ # Create a fallback standard index on embedding field
138
+ await collection.create_index("embedding")
139
+ logger.info("Created standard index on 'embedding' field as fallback")
140
+ else:
141
+ raise
142
  except Exception as e:
143
  logger.error("Failed to create vector index: {}", e)
144
  raise
src/ctp_slack_bot/services/content_ingestion_service.py CHANGED
@@ -34,16 +34,16 @@ class ContentIngestionService(BaseModel):
34
  # logger.debug("Ignored content with ID {} because it already exists in the database.", content.id)
35
  # return
36
  chunks = content.get_chunks()
37
- self.__vectorize_and_store_chunks_in_database(chunks)
38
  logger.debug("Stored {} vectorized chunk(s) in the database.", len(chunks))
39
 
40
  async def process_incoming_slack_message(self: Self, slack_message: SlackMessage) -> None:
41
  logger.debug("Content ingestion service received a Slack message: {}", slack_message.text)
42
  chunks = slack_message.get_chunks()
43
- self.__vectorize_and_store_chunks_in_database(chunks)
44
  logger.debug("Stored {} vectorized chunk(s) in the database.", len(chunks))
45
 
46
- def __vectorize_and_store_chunks_in_database(self: Self, chunks: Sequence[Chunk]) -> None:
47
- # vectorized_chunks = self.vectorization_service.vectorize(chunks) # TODO
48
- # self.vector_database_service.store(vectorized_chunks) # TODO
49
- pass
 
34
  # logger.debug("Ignored content with ID {} because it already exists in the database.", content.id)
35
  # return
36
  chunks = content.get_chunks()
37
+ await self.__vectorize_and_store_chunks_in_database(chunks)
38
  logger.debug("Stored {} vectorized chunk(s) in the database.", len(chunks))
39
 
40
  async def process_incoming_slack_message(self: Self, slack_message: SlackMessage) -> None:
41
  logger.debug("Content ingestion service received a Slack message: {}", slack_message.text)
42
  chunks = slack_message.get_chunks()
43
+ await self.__vectorize_and_store_chunks_in_database(chunks)
44
  logger.debug("Stored {} vectorized chunk(s) in the database.", len(chunks))
45
 
46
+ async def __vectorize_and_store_chunks_in_database(self: Self, chunks: Sequence[Chunk]) -> None:
47
+ vectorized_chunks = self.vectorization_service.vectorize(chunks) # TODO
48
+ await self.vector_database_service.store(vectorized_chunks) # TODO
49
+
src/ctp_slack_bot/services/vector_database_service.py CHANGED
@@ -87,16 +87,15 @@ class VectorDatabaseService(BaseModel): # TODO: this should not rely specificall
87
  # Get the vector collection
88
  vector_collection = await self.mongo_db.get_collection("vectors")
89
 
90
- # Build aggregation pipeline for vector search
91
  pipeline = [
92
  {
93
- "$search": {
94
  "index": "vectors_vector_index",
95
- "knnBeta": {
96
- "vector": list(query.query_embeddings),
97
- "path": "embedding",
98
- "k": query.k
99
- }
100
  }
101
  },
102
  {
@@ -105,7 +104,7 @@ class VectorDatabaseService(BaseModel): # TODO: this should not rely specificall
105
  "metadata": 1,
106
  "parent_id": 1,
107
  "chunk_id": 1,
108
- "score": { "$meta": "searchScore" }
109
  }
110
  }
111
  ]
@@ -115,7 +114,7 @@ class VectorDatabaseService(BaseModel): # TODO: this should not rely specificall
115
  metadata_filter = {f"metadata.{k}": v for k, v in query.filter_metadata.items()}
116
  pipeline.insert(1, {"$match": metadata_filter})
117
 
118
- # Add score threshold filter
119
  if query.score_threshold > 0:
120
  pipeline.append({
121
  "$match": {
@@ -123,8 +122,17 @@ class VectorDatabaseService(BaseModel): # TODO: this should not rely specificall
123
  }
124
  })
125
 
126
- # Execute the pipeline
127
- results = await vector_collection.aggregate(pipeline).to_list(length=query.k)
 
 
 
 
 
 
 
 
 
128
 
129
  # Convert results to Chunk objects
130
  chunks = []
 
87
  # Get the vector collection
88
  vector_collection = await self.mongo_db.get_collection("vectors")
89
 
90
+ # Build aggregation pipeline for vector search using official MongoDB format
91
  pipeline = [
92
  {
93
+ "$vectorSearch": {
94
  "index": "vectors_vector_index",
95
+ "queryVector": list(query.query_embeddings),
96
+ "path": "embedding",
97
+ "numCandidates": query.k * 10,
98
+ "limit": query.k
 
99
  }
100
  },
101
  {
 
104
  "metadata": 1,
105
  "parent_id": 1,
106
  "chunk_id": 1,
107
+ "score": { "$meta": "vectorSearchScore" }
108
  }
109
  }
110
  ]
 
114
  metadata_filter = {f"metadata.{k}": v for k, v in query.filter_metadata.items()}
115
  pipeline.insert(1, {"$match": metadata_filter})
116
 
117
+ # Add score threshold filter if needed
118
  if query.score_threshold > 0:
119
  pipeline.append({
120
  "$match": {
 
122
  }
123
  })
124
 
125
+ try:
126
+ # Execute the vector search pipeline
127
+ results = await vector_collection.aggregate(pipeline).to_list(length=query.k)
128
+ except Exception as e:
129
+ logger.warning(f"Vector search failed: {str(e)}. Falling back to basic text search.")
130
+ # Fall back to basic filtering with limit
131
+ query_filter = {}
132
+ if query.filter_metadata:
133
+ query_filter.update({f"metadata.{k}": v for k, v in query.filter_metadata.items()})
134
+
135
+ results = await vector_collection.find(query_filter).limit(query.k).to_list(length=query.k)
136
 
137
  # Convert results to Chunk objects
138
  chunks = []