LiKenun committed · Commit dd69fa3 · 2 Parent(s): 20f38b2 fdc3cf0

Merge branch health-check into alt

Dockerfile CHANGED
@@ -28,5 +28,10 @@ USER appuser
 # Expose a volume mount for logs ― Hugging Face Spaces requires specifically /data.
 VOLUME /data
 
+# Temporary block for the health server fix:
+COPY scripts/run.sh ./scripts/
+COPY temporary_health_check_server.py ./
+CMD ["./scripts/run.sh"]
+
 # Run the application.
-CMD ["python", "-m", "ctp_slack_bot.app"]
+#CMD ["python", "-m", "ctp_slack_bot.app"]
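
Note: this hunk temporarily reroutes container startup through scripts/run.sh (added below), which launches the health-check server alongside the bot; the original CMD is kept as a comment so it can be restored once the workaround is removed.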
README.md CHANGED
@@ -7,6 +7,7 @@ sdk: docker
 pinned: false
 license: mit
 short_description: Spring 2025 CTP Slack Bot RAG system
+app_port: 8080
 ---
 
 
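Note: for Docker-based Spaces, the app_port front-matter field tells Hugging Face which container port to expose; 8080 matches the port bound by temporary_health_check_server.py (added below).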
 
scripts/run.sh ADDED
@@ -0,0 +1,7 @@
+#!/bin/bash
+
+parent_path=$(cd "$(dirname "${BASH_SOURCE[0]}")"; pwd -P)
+
+cd "${parent_path}/.."
+
+python "temporary_health_check_server.py" & python -m ctp_slack_bot.app
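
Note: the script backgrounds the health-check server with & and leaves the bot in the foreground, so the container's lifetime tracks the bot process; nothing restarts the health server if it exits first.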
src/ctp_slack_bot/containers.py CHANGED
@@ -7,7 +7,7 @@ from slack_bolt.async_app import AsyncApp
 
 from ctp_slack_bot.core.config import Settings
 from ctp_slack_bot.db.mongo_db import MongoDBResource
-from ctp_slack_bot.db.repositories.mongo_db_vectorized_chunk_repository import MongoVectorizedChunkRepository
+from ctp_slack_bot.db.repositories.mongo_db_vectorized_chunk_repository import MongoVectorizedChunkRepositoryResource
 from ctp_slack_bot.mime_type_handlers.base import MimeTypeHandlerMeta
 from ctp_slack_bot.services.answer_retrieval_service import AnswerRetrievalService
 from ctp_slack_bot.services.content_ingestion_service import ContentIngestionService
@@ -34,13 +34,13 @@ def __load_plugins(plugin_dir) -> None:
 __load_plugins("ctp_slack_bot/mime_type_handlers")
 
 
-class Container(DeclarativeContainer):
+class Container(DeclarativeContainer): # TODO: audit for potential async-related bugs.
     settings = Singleton(Settings)
     event_brokerage_service = Singleton(EventBrokerageService)
     schedule_service = Resource(ScheduleServiceResource, settings=settings)
     mongo_db = Resource(MongoDBResource, settings=settings) # TODO: generalize to any database.
-    vectorized_chunk_repository = Singleton(MongoVectorizedChunkRepository, mongo_db=mongo_db)
-    vector_database_service = Singleton(VectorDatabaseService, settings=settings, mongo_db=mongo_db)
+    vectorized_chunk_repository = Resource(MongoVectorizedChunkRepositoryResource, settings=settings, mongo_db=mongo_db)
+    vector_database_service = Singleton(VectorDatabaseService, settings=settings, vectorized_chunk_repository=vectorized_chunk_repository)
     embeddings_model_service = Singleton(EmbeddingsModelService, settings=settings)
     vectorization_service = Singleton(VectorizationService, settings=settings, embeddings_model_service=embeddings_model_service)
     content_ingestion_service = Singleton(ContentIngestionService, settings=settings, event_brokerage_service=event_brokerage_service, vector_database_service=vector_database_service, vectorization_service=vectorization_service)
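
Because vectorized_chunk_repository is now a Resource rather than a Singleton, it must be initialized before first use. A minimal startup sketch (not part of this commit, assuming dependency-injector's standard resource lifecycle API):

    import asyncio

    from ctp_slack_bot.containers import Container

    async def main() -> None:
        container = Container()
        await container.init_resources()    # runs each AsyncResource's init(), e.g. MongoVectorizedChunkRepositoryResource
        try:
            # Singleton wired with the initialized repository resource.
            vector_database_service = container.vector_database_service()
        finally:
            await container.shutdown_resources()    # runs the matching shutdown() hooks

    asyncio.run(main())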
src/ctp_slack_bot/db/mongo_db.py CHANGED
@@ -1,15 +1,14 @@
-from asyncio import create_task
 from dependency_injector.resources import AsyncResource
-from motor.motor_asyncio import AsyncIOMotorClient
+from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorCollection
 from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
-from pymongo.operations import SearchIndexModel
 from loguru import logger
 from pydantic import BaseModel, PrivateAttr
-from typing import Any, Dict, Optional, Self
+from typing import Any, Dict, Self
 
 from ctp_slack_bot.core.config import Settings
 from ctp_slack_bot.utils import sanitize_mongo_db_uri
 
+
 class MongoDB(BaseModel):
     """
     MongoDB connection manager using Motor for async operations.
@@ -19,6 +18,7 @@ class MongoDB(BaseModel):
     _db: PrivateAttr = PrivateAttr()
 
     class Config:
+        frozen=True
         arbitrary_types_allowed = True
 
     def __init__(self: Self, **data: Dict[str, Any]) -> None:
@@ -31,7 +31,7 @@ class MongoDB(BaseModel):
         connection_string = self.settings.MONGODB_URI.get_secret_value()
         logger.debug("Connecting to MongoDB using URI: {}", sanitize_mongo_db_uri(connection_string))
 
-        # Create client with appropriate settings
+        # Create client with appropriate settings.
         self._client = AsyncIOMotorClient(
             connection_string,
             serverSelectionTimeoutMS=5000,
@@ -42,7 +42,7 @@ class MongoDB(BaseModel):
             w="majority"
         )
 
-        # Set database
+        # Get the database name.
         db_name = self.settings.MONGODB_NAME
 
         self._db = self._client[db_name]
@@ -54,116 +54,50 @@ class MongoDB(BaseModel):
             self._db = None
             raise
 
-    @property
-    def client(self: Self) -> AsyncIOMotorClient:
-        """Get the MongoDB client instance."""
-        if not hasattr(self, '_client') or self._client is None:
-            logger.warning("MongoDB client not initialized. Attempting to initialize…")
-            self.connect()
-            if not hasattr(self, '_client') or self._client is None:
-                raise ConnectionError("Failed to initialize MongoDB client.")
-        return self._client
-
-    @property
-    def db(self: Self) -> Any:
-        """Get the MongoDB database instance."""
-        if not hasattr(self, '_db') or self._db is None:
-            logger.warning("MongoDB database not initialized. Attempting to initialize client…")
-            self.connect()
-            if not hasattr(self, '_db') or self._db is None:
-                raise ConnectionError("Failed to initialize MongoDB database.")
-        return self._db
-
     async def ping(self: Self) -> bool:
         """Check if MongoDB connection is alive."""
         try:
-            # Get client to ensure we're connected
-            client = self.client
-
-            # Try a simple ping command
-            await client.admin.command('ping')
+            await self._client.admin.command("ping")
             logger.debug("MongoDB connection is active!")
             return True
         except (ConnectionFailure, ServerSelectionTimeoutError) as e:
             logger.error("MongoDB connection failed: {}", e)
-            return False
         except Exception as e:
             logger.error("Unexpected error during MongoDB ping: {}", e)
-            return False
+        return False
 
-    async def get_collection(self: Self, name: str) -> Any:
+    async def get_collection(self: Self, name: str) -> AsyncIOMotorCollection:
         """
-        Get a collection by name with validation.
-        Creates the collection if it doesn't exist.
+        Get a collection by name or creates it if it doesn’t exist.
         """
-        # First ensure we can connect at all
+        # First ensure we can connect at all.
         if not await self.ping():
             logger.error("Cannot get collection '{}' because a MongoDB connection is not available.", name)
             raise ConnectionError("MongoDB connection is not available.")
 
         try:
-            # Get all collection names to check if this one exists
+            # Get all collection names to check if this one exists.
             logger.debug("Checking if collection '{}' exists…", name)
-            collection_names = await self.db.list_collection_names()
+            collection_names = await self._db.list_collection_names()
 
             if name not in collection_names:
                 logger.info("Collection '{}' does not exist. Creating it…", name)
-                # Create the collection
-                await self.db.create_collection(name)
+
+                # Create the collection.
+                await self._db.create_collection(name)
                 logger.debug("Successfully created collection: {}", name)
             else:
                 logger.debug("Collection '{}' already exists!", name)
 
-            # Get and return the collection
-            collection = self.db[name]
+            # Get and return the collection.
+            collection = self._db[name]
             return collection
         except Exception as e:
             logger.error("Error accessing collection '{}': {}", name, e)
             raise
 
-    async def create_indexes(self: Self, collection_name: str) -> None:
-        """
-        Create a vector search index on a collection.
-
-        Args:
-            collection_name: Name of the collection
-        """
-        collection = await self.get_collection(collection_name)
-
-        try:
-            # Create search index model using MongoDB's recommended approach
-            search_index_model = SearchIndexModel(
-                definition={
-                    "fields": [
-                        {
-                            "type": "vector",
-                            "path": "embedding",
-                            "numDimensions": self.settings.VECTOR_DIMENSION,
-                            "similarity": "cosine",
-                            "quantization": "scalar"
-                        }
-                    ]
-                },
-                name=f"{collection_name}_vector_index",
-                type="vectorSearch"
-            )
-
-            # Create the search index using the motor collection
-            result = await collection.create_search_index(search_index_model)
-            logger.info("Vector search index '{}' created for collection {}.", result, collection_name)
-
-        except Exception as e:
-            if "command not found" in str(e).lower():
-                logger.warning("Vector search not supported by this MongoDB instance. Some functionality may be limited.")
-                # Create a fallback standard index on embedding field
-                await collection.create_index("embedding")
-                logger.info("Created standard index on 'embedding' field as fallback.")
-            else:
-                logger.error("Failed to create vector index: {}", e)
-                raise
-
-    async def close(self: Self) -> None:
-        """Close MongoDB connection."""
+    def close(self: Self) -> None:
+        """Close the MongoDB connection."""
         if self._client:
             self._client.close()
             logger.info("Closed MongoDB connection.")
@@ -193,6 +127,6 @@ class MongoDBResource(AsyncResource):
     async def shutdown(self: Self, mongo_db: MongoDB) -> None:
         """Close MongoDB connection on shutdown."""
         try:
-            await mongo_db.close()
+            mongo_db.close()
         except Exception as e:
             logger.error("Error closing MongoDB connection: {}", e)
src/ctp_slack_bot/db/repositories/__init__.py CHANGED
@@ -1,2 +1,3 @@
 from ctp_slack_bot.db.repositories.mongo_db_vectorized_chunk_repository import MongoVectorizedChunkRepository
 from ctp_slack_bot.db.repositories.vectorized_chunk_repository import VectorizedChunkRepository
+from ctp_slack_bot.db.repositories.vector_repository_base import VectorRepositoryBase
src/ctp_slack_bot/db/repositories/mongo_db_vectorized_chunk_repository.py CHANGED
@@ -1,65 +1,161 @@
-from typing import List, Optional, Dict, Any
-import pymongo
-from bson import ObjectId
+from dependency_injector.resources import AsyncResource
+from loguru import logger
+from pymongo import ASCENDING, ReturnDocument
+from typing import Any, Collection, Dict, Iterable, Mapping, Optional, Self, Sequence, Set
 
-from ctp_slack_bot.db import MongoDB
+from ctp_slack_bot.core import Settings
+from ctp_slack_bot.models import Chunk, VectorizedChunk, VectorQuery
+from ctp_slack_bot.db.mongo_db import MongoDB
 from ctp_slack_bot.db.repositories.vectorized_chunk_repository import VectorizedChunkRepository
-from ctp_slack_bot.models.base import VectorizedChunk
-
-class MongoVectorizedChunkRepository(VectorizedChunkRepository):
-    """MongoDB implementation of VectorizedChunkRepository."""
-
-    def __init__(self, mongo_db: MongoDB):
-        self.mongo_db = mongo_db
-        self.collection = self.mongo_db.db.get_collection("vectorized_chunks")
-
-        # Create indexes for efficient queries
-        self.collection.create_index("chunk_id")
-        self.collection.create_index("parent_id")
-
-    async def find_by_id(self, id: str) -> Optional[VectorizedChunk]:
-        doc = await self.collection.find_one({"_id": ObjectId(id)})
-        return self._map_to_entity(doc) if doc else None
-
-    async def find_all(self) -> List[VectorizedChunk]:
-        cursor = self.collection.find({})
-        return [self._map_to_entity(doc) async for doc in cursor]
-
-    async def find_by_parent_id(self, parent_id: str) -> List[VectorizedChunk]:
+from ctp_slack_bot.db.repositories.vector_repository_base import VectorRepositoryBase
+
+
+class MongoVectorizedChunkRepository(VectorRepositoryBase, VectorizedChunkRepository):
+    """MongoDB implementation of VectorizedChunkRepository"""
+
+    def __init__(self: Self, **data: Dict[str, Any]) -> None:
+        super().__init__(**data)
+        logger.debug("Created {}", self.__class__.__name__)
+
+    async def count_by_id(self: Self, parent_id: str, chunk_id: Optional[str] = None) -> int:
+        if chunk_id is None:
+            return await self.collection.count_documents({"parent_id": parent_id})
+        else:
+            return await self.collection.count_documents({"parent_id": parent_id, "chunk_id": chunk_id})
+
+    async def find_all(self: Self) -> Collection[VectorizedChunk]:
+        cursor = self.collection.find()
+        return [VectorizedChunk(**document) async for document in cursor] # TODO: mutable until async support is extended to tuples
+
+    async def find_by_metadata(self: Self, metadata_query: Mapping[str, Any]) -> Collection[VectorizedChunk]:
+        query = {f"metadata.{key}": value for key, value in metadata_query.items()}
+        cursor = self.collection.find(query)
+        return [VectorizedChunk(**document) async for document in cursor] # TODO: mutable until async support is extended to tuples
+
+    async def find_by_parent_id(self: Self, parent_id: str) -> Collection[VectorizedChunk]:
         cursor = self.collection.find({"parent_id": parent_id})
-        return [self._map_to_entity(doc) async for doc in cursor]
-
-    async def save(self, chunk: VectorizedChunk) -> VectorizedChunk:
-        doc = self._map_to_document(chunk)
-
-        if "_id" in doc and doc["_id"]:
-            # Update existing document
-            await self.collection.replace_one({"_id": doc["_id"]}, doc)
-        else:
-            # Insert new document
-            result = await self.collection.insert_one(doc)
-            doc["_id"] = result.inserted_id
-
-        return self._map_to_entity(doc)
-
-    async def delete(self, id: str) -> bool:
-        result = await self.collection.delete_one({"_id": ObjectId(id)})
-        return result.deleted_count > 0
-
-    async def find_by_metadata(self, metadata_query: Dict[str, Any]) -> List[VectorizedChunk]:
-        # Convert the metadata query to MongoDB query format
-        query = {f"metadata.{k}": v for k, v in metadata_query.items()}
-        cursor = self.collection.find(query)
-        return [self._map_to_entity(doc) async for doc in cursor]
-
-    def _map_to_document(self, chunk: VectorizedChunk) -> Dict[str, Any]:
-        """Convert a VectorizedChunk to a MongoDB document."""
-        doc = chunk.model_dump()
-        # Handle any special conversions needed
-        return doc
-
-    def _map_to_entity(self, doc: Dict[str, Any]) -> VectorizedChunk:
-        """Convert a MongoDB document to a VectorizedChunk."""
-        if "_id" in doc:
-            doc["id"] = str(doc.pop("_id"))
-        return VectorizedChunk(**doc)
+        return [VectorizedChunk(**document) async for document in cursor] # TODO: mutable until async support is extended to tuples
+
+    async def find_by_parent_and_chunk_ids(self: Self, parent_id: str, chunk_id: str) -> Optional[VectorizedChunk]:
+        document = await self.collection.find_one({"parent_id": parent_id, "chunk_id": chunk_id})
+        return VectorizedChunk(**document) if document else None
+
+    async def find_by_vector(self: Self, query_embedding: Sequence[float], k: int = 5, score_threshold: float = 0.7) -> Sequence[VectorizedChunk]:
+        pipeline = [
+            {
+                "$vectorSearch": {
+                    "index": "vector_index",
+                    "path": "embedding",
+                    "queryVector": query_embedding,
+                    "numCandidates": k * 2,
+                    "limit": k,
+                    "score": {"$meta": "vectorSearchScore"}
+                }
+            },
+            {"$match": {"score": {"$gte": score_threshold}}}
+        ]
+        cursor = self.collection.aggregate(pipeline)
+        return [VectorizedChunk(**document) async for document in cursor] # TODO: mutable until async support is extended to tuples
+
+    async def find_by_vector(self: Self, query: VectorQuery) -> Sequence[Chunk]:
+        """
+        Query the vector database for similar documents.
+
+        Args:
+            query: VectorQuery object with search parameters
+
+        Returns:
+            Sequence[Chunk]: List of similar chunks
+        """
+        # Build aggregation pipeline for vector search using official MongoDB format.
+        pipeline = [
+            {
+                "$vectorSearch": {
+                    "index": f"{self.collection.name}_vector_index",
+                    "path": "embedding",
+                    "queryVector": query.query_embeddings,
+                    "numCandidates": query.k * 10,
+                    "limit": query.k
+                }
+            },
+            {
+                "$project": {
+                    "text": 1,
+                    "metadata": 1,
+                    "parent_id": 1,
+                    "chunk_id": 1,
+                    "score": { "$meta": "vectorSearchScore" }
+                }
+            },
+            {
+                "$match": {
+                    "score": { "$gte": query.score_threshold }
+                }
+            }
+        ]
+        if query.filter_metadata: # Add metadata filters if provided.
+            metadata_filter = {f"metadata.{key}": value for key, value in query.filter_metadata.items()}
+            pipeline.insert(1, {"$match": metadata_filter})
+
+        # Execute the vector search pipeline.
+        results = await self.collection.aggregate(pipeline).to_list(length=query.k)
+
+        # Convert results to Chunk objects ― don’t care about the embeddings.
+        return tuple(Chunk(text=result["text"],
+                           parent_id=result["parent_id"],
+                           chunk_id=result["chunk_id"],
+                           metadata={**result["metadata"], "similarity_score": result.get("score", 0)})
+                     for result
+                     in results)
+
+    async def insert_one(self, chunk: VectorizedChunk) -> str:
+        document = chunk.model_dump()
+        result = await self.collection.insert_one(document)
+        return str(result.inserted_id)
+
+    async def insert_many(self, chunks: Iterable[VectorizedChunk]) -> Set[str]:
+        documents = [chunk.model_dump() for chunk in chunks]
+        result = await self.collection.insert_many(documents)
+        return frozenset(map(str, result.inserted_ids))
+
+    async def replace_all(self: Self, chunks: Iterable[VectorizedChunk]) -> Set[str]:
+        parent_ids = set()
+        documents = []
+        for chunk in chunks:
+            parent_ids.add(chunk.parent_id)
+            documents.append(chunk.model_dump())
+        async with await self.collection.database.client.start_session() as session:
+            async with session.start_transaction():
+                delete_result = await self.collection.delete_many({"parent_id": {"$in": tuple(parent_ids)}}, session=session)
+                insert_result = await self.collection.insert_many(documents, session=session)
+                return frozenset(map(str, insert_result.inserted_ids))
+
+    async def replace_one(self: Self, chunk: VectorizedChunk) -> str:
+        result = await self.collection.find_one_and_replace(
+            {"parent_id": chunk.parent_id, "chunk_id": chunk.chunk_id},
+            chunk.model_dump(),
+            upsert=True,
+            return_document=ReturnDocument.AFTER
+        )
+        return result["_id"]
+
+    async def delete(self: Self, parent_id: str, chunk_id: Optional[str] = None) -> int:
+        if chunk_id is not None:
+            result = await self.collection.delete_one({"parent_id": parent_id, "chunk_id": chunk_id})
+        else:
+            result = await self.collection.delete_many({"parent_id": parent_id})
+        return result.deleted_count
+
+    async def ensure_indices_exist(self: Self) -> None:
+        await super().ensure_indices_exist()
+        index_name = "parent_chunk_unique"
+        existing_indices = await self.collection.index_information()
+        if index_name not in existing_indices:
+            await self.collection.create_index([("parent_id", ASCENDING), ("chunk_id", ASCENDING)], unique=True, name=index_name)
+
+class MongoVectorizedChunkRepositoryResource(AsyncResource):
+    async def init(self: Self, settings: Settings, mongo_db: MongoDB) -> MongoVectorizedChunkRepository:
+        vectorized_chunk_collection = await mongo_db.get_collection("vectorized_chunks")
+        vectorized_chunk_repository = MongoVectorizedChunkRepository(settings=settings, collection=vectorized_chunk_collection)
+        await vectorized_chunk_repository.ensure_indices_exist()
+        return vectorized_chunk_repository
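
Note: find_by_vector is defined twice in the new class body; Python binds the second definition at class-creation time, so only the VectorQuery-based variant takes effect and the (query_embedding, k, score_threshold) overload is dead code.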
 
 
 
 
 
 
 
 
 
 
 
src/ctp_slack_bot/db/repositories/vector_repository_base.py ADDED
@@ -0,0 +1,62 @@
+from abc import ABC
+from loguru import logger
+from motor.motor_asyncio import AsyncIOMotorCollection
+from pydantic import BaseModel
+from pymongo.operations import SearchIndexModel
+from typing import Self
+
+from ctp_slack_bot.core import Settings
+
+class VectorRepositoryBase(ABC, BaseModel):
+    """MongoDB implementation of VectorizedChunkRepository"""
+
+    settings: Settings
+    collection: AsyncIOMotorCollection
+
+    class Config:
+        frozen=True
+        arbitrary_types_allowed = True
+
+    async def ensure_indices_exist(self: Self) -> None:
+        """Ensure that indices exist."""
+        await self.ensure_search_index_exists()
+
+    async def ensure_search_index_exists(self: Self) -> None:
+        """
+        Ensure that a vector search index exists.
+        """
+        index_name = f"{self.collection.name}_vector_index"
+        try:
+            existing_indexes = [index["name"] async for index in self.collection.list_search_indexes()]
+            logger.debug("{} existing indices were found: {}", len(existing_indexes), existing_indexes)
+            if index_name in existing_indexes:
+                logger.debug("Index '{}' already exists; duplicate index will not be created.", index_name)
+                return
+
+            # Create search index model using MongoDB's recommended approach.
+            search_index_model = SearchIndexModel(
+                definition={
+                    "fields": [
+                        {
+                            "type": "vector",
+                            "path": "embedding",
+                            "numDimensions": self.settings.VECTOR_DIMENSION,
+                            "similarity": "cosine",
+                            "quantization": "scalar"
+                        }
+                    ]
+                },
+                name=index_name,
+                type="vectorSearch"
+            )
+            result = await self.collection.create_search_index(search_index_model)
+            logger.info("Vector search index '{}' created for collection {}.", result, self.collection.name)
+        except Exception as e:
+            if "command not found" in str(e).lower():
+                logger.warning("Vector search not supported by this MongoDB instance. Some functionality may be limited.")
+                # Create a fallback standard index on embedding field.
+                await self.collection.create_index("embedding")
+                logger.info("Created standard index on 'embedding' field as fallback.")
+            else:
+                logger.error("Failed to create vector index: {}", e)
+                raise
src/ctp_slack_bot/db/repositories/vectorized_chunk_repository.py CHANGED
@@ -1,30 +1,52 @@
-from typing import List, Optional, Dict, Any
+from abc import ABC, abstractmethod
+from pydantic import BaseModel
+from typing import Any, Collection, Iterable, Mapping, Optional, Self, Sequence, Set
 
-from ctp_slack_bot.models.base import VectorizedChunk
+from ctp_slack_bot.models import Chunk, VectorizedChunk, VectorQuery
 
-class VectorizedChunkRepository:
+class VectorizedChunkRepository(ABC, BaseModel):
     """Repository interface for VectorizedChunk entities."""
 
-    async def find_by_id(self, id: str) -> Optional[VectorizedChunk]:
-        """Find a chunk by its ID."""
+    @abstractmethod
+    async def count_by_id(self: Self, parent_id: str, chunk_id: Optional[str] = None) -> int:
         pass
 
-    async def find_all(self) -> List[VectorizedChunk]:
-        """Find all chunks."""
+    @abstractmethod
+    async def find_all(self: Self) -> Collection[VectorizedChunk]:
         pass
 
-    async def find_by_parent_id(self, parent_id: str) -> List[VectorizedChunk]:
-        """Find chunks by parent document ID."""
+    @abstractmethod
+    async def find_by_metadata(self: Self, metadata_query: Mapping[str, Any]) -> Collection[VectorizedChunk]:
         pass
 
-    async def save(self, chunk: VectorizedChunk) -> VectorizedChunk:
-        """Save a chunk to the database."""
+    @abstractmethod
+    async def find_by_parent_id(self: Self, parent_id: str) -> Collection[VectorizedChunk]:
         pass
 
-    async def delete(self, id: str) -> bool:
-        """Delete a chunk by its ID."""
+    @abstractmethod
+    async def find_by_parent_and_chunk_ids(self: Self, parent_id: str, chunk_id: str) -> Optional[VectorizedChunk]:
         pass
 
-    async def find_by_metadata(self, metadata_query: Dict[str, Any]) -> List[VectorizedChunk]:
-        """Find chunks by metadata criteria."""
+    @abstractmethod
+    async def find_by_vector(self: Self, query: VectorQuery) -> Sequence[Chunk]:
+        pass
+
+    @abstractmethod
+    async def insert_one(self, chunk: VectorizedChunk) -> str:
+        pass
+
+    @abstractmethod
+    async def insert_many(self, chunks: Iterable[VectorizedChunk]) -> Set[str]:
+        pass
+
+    @abstractmethod
+    async def replace_all(self: Self, chunks: Iterable[VectorizedChunk]) -> Set[str]:
+        pass
+
+    @abstractmethod
+    async def replace_one(self: Self, chunk: VectorizedChunk) -> str:
+        pass
+
+    @abstractmethod
+    async def delete(self: Self, parent_id: str, chunk_id: Optional[str] = None) -> int:
         pass
src/ctp_slack_bot/models/base.py CHANGED
@@ -1,6 +1,8 @@
 from abc import ABC, abstractmethod
-from pydantic import BaseModel, ConfigDict, Field
-from typing import Any, final, Mapping, Self, Sequence, Optional
+from pydantic import BaseModel, ConfigDict, Field, field_validator
+from typing import Any, final, Mapping, Optional, Self
+
+from ctp_slack_bot.utils import to_deep_immutable
 
 
 class Chunk(BaseModel):
@@ -9,10 +11,15 @@ class Chunk(BaseModel):
     text: str # The text representation
     parent_id: str # The source content’s identity
     chunk_id: str # This chunk’s identity—unique within the source content
-    metadata: Mapping[str, Any]
+    metadata: Mapping[str, Any] = Field(default_factory=dict)
 
     model_config = ConfigDict(frozen=True)
 
+    @field_validator('metadata')
+    @classmethod
+    def __make_metadata_readonly(cls, value: Mapping[str, Any]) -> Mapping[str, Any]:
+        return to_deep_immutable(value)
+
 
 @final
 class VectorQuery(BaseModel):
@@ -25,19 +32,24 @@ class VectorQuery(BaseModel):
         filter_metadata: Optional filters for metadata fields
     """
 
-    query_embeddings: Sequence[float]
+    query_embeddings: tuple[float, ...]
     k: int
     score_threshold: float = Field(default=0.7)
-    filter_metadata: Optional[Mapping[str, Any]] = None
+    filter_metadata: Mapping[str, Any] = Field(default_factory=dict)
 
     model_config = ConfigDict(frozen=True)
 
+    @field_validator('filter_metadata')
+    @classmethod
+    def __make_metadata_readonly(cls, value: Mapping[str, Any]) -> Mapping[str, Any]:
+        return to_deep_immutable(value)
+
 
 @final
 class VectorizedChunk(Chunk):
     """A class representing a vectorized chunk of content."""
 
-    embedding: Sequence[float] # The vector representation
+    embedding: tuple[float, ...] # The vector representation
 
 
 class Content(ABC, BaseModel):
@@ -50,7 +62,7 @@ class Content(ABC, BaseModel):
         pass
 
     @abstractmethod
-    def get_chunks(self: Self) -> Sequence[Chunk]:
+    def get_chunks(self: Self) -> tuple[Chunk, ...]:
         pass
 
     @abstractmethod
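
A minimal behavior sketch (not part of the commit) of what the new metadata validator does, assuming the models above:

    from ctp_slack_bot.models.base import Chunk

    chunk = Chunk(text="hello", parent_id="doc-1", chunk_id="0",
                  metadata={"tags": ["intro", "greeting"]})

    print(type(chunk.metadata))    # <class 'mappingproxy'>; the validator wrapped the dict
    print(chunk.metadata["tags"])  # ('intro', 'greeting'); the inner list became a tuple
    chunk.metadata["tags"] = ()    # TypeError: 'mappingproxy' object does not support item assignment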
src/ctp_slack_bot/models/slack.py CHANGED
@@ -2,7 +2,7 @@ from datetime import datetime
 from json import dumps
 from pydantic import BaseModel, ConfigDict, PositiveInt, PrivateAttr
 from types import MappingProxyType
-from typing import Any, Dict, Literal, Mapping, Optional, Self, Sequence
+from typing import Any, Literal, Mapping, Optional, Self
 
 from ctp_slack_bot.models.base import Chunk, Content
 
@@ -23,7 +23,7 @@ class SlackEvent(BaseModel):
     type: str
     event_id: str
     event_time: int
-    authed_users: Sequence[str]
+    authed_users: tuple[str, ...]
 
     model_config = ConfigDict(frozen=True)
 
@@ -40,7 +40,7 @@ class SlackReaction(BaseModel):
 
     name: str
     count: PositiveInt
-    users: Sequence[str]
+    users: tuple[str, ...]
 
     model_config = ConfigDict(frozen=True)
 
@@ -61,14 +61,14 @@ class SlackMessage(Content):
     deleted_ts: Optional[str] = None
     hidden: bool = False
     is_starred: Optional[bool] = None
-    pinned_to: Optional[Sequence[str]] = None
-    reactions: Optional[Sequence[SlackReaction]] = None
+    pinned_to: Optional[tuple[str, ...]] = None
+    reactions: Optional[tuple[SlackReaction, ...]] = None
 
     def get_id(self: Self) -> str:
         """Unique identifier for this message."""
         return f"slack-message:{self.channel}:{self.ts}"
 
-    def get_chunks(self: Self) -> Sequence[Chunk]:
+    def get_chunks(self: Self) -> tuple[Chunk]:
         return (Chunk(text=self.text, parent_id=self.get_id(), chunk_id="", metadata=self.get_metadata()), )
 
     def get_metadata(self: Self) -> Mapping[str, Any]:
src/ctp_slack_bot/models/webvtt.py CHANGED
@@ -1,14 +1,13 @@
 from datetime import datetime, timedelta
 from io import BytesIO
-from itertools import starmap
-from json import dumps
 from more_itertools import windowed
-from pydantic import BaseModel, ConfigDict, Field, PositiveInt, PrivateAttr
+from pydantic import BaseModel, ConfigDict, Field, field_validator
 from types import MappingProxyType
-from typing import Any, Dict, Literal, Mapping, Optional, Self, Sequence
+from typing import Any, Literal, Mapping, Optional, Self
 from webvtt import Caption, WebVTT
 
 from ctp_slack_bot.models.base import Chunk, Content
+from ctp_slack_bot.utils import to_deep_immutable
 
 
 CHUNK_FRAMES_OVERLAP = 1
@@ -45,12 +44,12 @@ class WebVTTContent(Content):
     id: str
     metadata: Mapping[str, Any] = Field(default_factory=dict)
     start_time: Optional[datetime]
-    frames: Sequence[WebVTTFrame]
+    frames: tuple[WebVTTFrame, ...]
 
     def get_id(self: Self) -> str:
         return self.id
 
-    def get_chunks(self: Self) -> Sequence[Chunk]:
+    def get_chunks(self: Self) -> tuple[Chunk]:
         windows = (tuple(filter(None, window))
                    for window
                    in windowed(self.frames, CHUNK_FRAMES_WINDOW, step=CHUNK_FRAMES_WINDOW-CHUNK_FRAMES_OVERLAP))
@@ -62,7 +61,7 @@ class WebVTTContent(Content):
                        metadata={
                            "start": self.start_time + frames[0].start if self.start_time else None,
                            "end": self.start_time + frames[-1].end if self.start_time else None,
-                           "speakers": tuple(frame.speaker for frame in frames if frame.speaker)
+                           "speakers": (frame.speaker for frame in frames if frame.speaker)
                        })
                  for frames
                  in windows)
src/ctp_slack_bot/services/content_ingestion_service.py CHANGED
@@ -30,9 +30,9 @@ class ContentIngestionService(BaseModel):
 
     async def process_incoming_content(self: Self, content: Content) -> None:
         logger.debug("Content ingestion service received content with metadata: {}", content.get_metadata())
-        # if self.vector_database_service.has_content(content.get_id()) # TODO
-        #     logger.debug("Ignored content with ID {} because it already exists in the database.", content.get_id())
-        #     return
+        if self.vector_database_service.content_exists(content.get_id()):
+            logger.debug("Ignored content with identifier, {}, because it already exists in the database.", content.get_id())
+            return
         chunks = content.get_chunks()
         await self.__vectorize_and_store_chunks_in_database(chunks)
         logger.debug("Stored {} vectorized chunk(s) in the database.", len(chunks))
@@ -44,6 +44,5 @@ class ContentIngestionService(BaseModel):
         logger.debug("Stored {} vectorized chunk(s) in the database.", len(chunks))
 
     async def __vectorize_and_store_chunks_in_database(self: Self, chunks: Sequence[Chunk]) -> None:
-        vectorized_chunks = self.vectorization_service.vectorize(chunks) # TODO
-        await self.vector_database_service.store(vectorized_chunks) # TODO
-
+        vectorized_chunks = self.vectorization_service.vectorize(chunks)
+        await self.vector_database_service.store(vectorized_chunks)
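
Note: content_exists is declared async on VectorDatabaseService, but the new guard above calls it without await; a coroutine object is always truthy, so the check would fire unconditionally. The awaited form would presumably be: if await self.vector_database_service.content_exists(content.get_id()):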
 
src/ctp_slack_bot/services/context_retrieval_service.py CHANGED
@@ -34,33 +34,23 @@ class ContextRetrievalService(BaseModel):
         Returns:
             Sequence[Chunk]: List of retrieved context items with similarity scores
         """
-        # Extract chunks from the message
-        message_chunks = message.get_chunks()
-
-        # Vectorize the chunks
-        vectorized_chunks = self.vectorization_service.vectorize(message_chunks)
-
-        # Create vector query using the first chunk's embedding (typically there's only one chunk for a message)
-        if not vectorized_chunks:
-            logger.warning("No vectorized chunks were created for message")
-            return []
-
+        message_chunks = message.get_chunks() # Guaranteed to have exactly 1 chunk
+
+        try:
+            vectorized_message_chunks = self.vectorization_service.vectorize(message_chunks)
+        except Exception as e:
+            logger.error("An error occurred while vectorizing the question, “{}”: {}", message.text, e)
+
         query = VectorQuery(
-            query_embeddings=vectorized_chunks[0].embedding,
+            query_embeddings=vectorized_message_chunks[0].embedding,
             k=self.settings.TOP_K_MATCHES,
             score_threshold=self.settings.SCORE_THRESHOLD,
-            filter_metadata=None # Can be expanded to include filters based on message metadata
+            filter_metadata={} # Can be expanded to include filters based on message metadata
         )
-
-        # Perform similarity search
+
         try:
-            results = await self.vector_database_service.search_by_similarity(query)
-            # logger.info(f"Retrieved {len(results)} context chunks for query")
+            results = await self.vector_database_service.find_by_vector(query)
            return results
         except Exception as e:
-            logger.error(f"Error retrieving context: {str(e)}")
-            return []
-
-        # test return statement
-        # return (VectorizedChunk(text="Mock context chunk", parent_id="lol", chunk_id="no", metadata={}, embedding=tuple()),
-        #         VectorizedChunk(text="Moar mock context chunk", parent_id="lol", chunk_id="wut", metadata={}, embedding=tuple()))
+            logger.error("An error occurred while searching the vector database for context: {}", e)
+            return ()
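
Note: if vectorize raises, the new except block logs the error but neither returns nor re-raises, so vectorized_message_chunks is unbound when VectorQuery is constructed and the original failure resurfaces as a NameError.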
 
 
 
 
src/ctp_slack_bot/services/vector_database_service.py CHANGED
@@ -1,17 +1,18 @@
 from loguru import logger
 from pydantic import BaseModel
-from typing import Any, Collection, Dict, List, Optional, Self, Sequence
+from typing import Iterable, Optional, Self, Sequence
 
 from ctp_slack_bot.core import Settings
-from ctp_slack_bot.db import MongoDB
+from ctp_slack_bot.db.repositories import VectorizedChunkRepository
 from ctp_slack_bot.models import Chunk, VectorizedChunk, VectorQuery
 
 class VectorDatabaseService(BaseModel): # TODO: this should not rely specifically on MongoDB.
     """
     Service for storing and retrieving vector embeddings from MongoDB.
     """
+
     settings: Settings
-    mongo_db: MongoDB
+    vectorized_chunk_repository: VectorizedChunkRepository
 
     class Config:
         frozen=True
@@ -19,157 +20,48 @@ class VectorDatabaseService(BaseModel): # TODO: this should not rely specifically on MongoDB.
     def __init__(self: Self, **data) -> None:
         super().__init__(**data)
         logger.debug("Created {}", self.__class__.__name__)
-
-    async def store(self: Self, chunks: Collection[VectorizedChunk]) -> None:
-        """
-        Stores vectorized chunks and their embedding vectors in the database.
-
-        Args:
-            chunks: Collection of VectorizedChunk objects to store
-
-        Returns: None
-        """
-        if not chunks:
-            logger.debug("No chunks to store")
-            return
-
-        try:
-            # Get the vector collection - this will create it if it doesn't exist
-            logger.debug("Getting vectors collection for storing {} chunks", len(chunks))
-            vector_collection = await self.mongo_db.get_collection("vectors")
-
-            # Ensure vector search index exists
-            logger.debug("Creating vector search index for vectors collection")
-            await self.mongo_db.create_indexes("vectors")
-
-            # Create documents to store, ensuring compatibility with BSON
-            documents = []
-            for chunk in chunks:
-                # Convert embedding to standard list format (important for BSON compatibility)
-                embedding = list(chunk.embedding) if not isinstance(chunk.embedding, list) else chunk.embedding
-
-                # Build document with proper structure
-                document = {
-                    "text": chunk.text,
-                    "embedding": embedding,
-                    "metadata": chunk.metadata,
-                    "parent_id": chunk.parent_id,
-                    "chunk_id": chunk.chunk_id
-                }
-                documents.append(document)
-
-            # Insert into collection as a batch
-            logger.debug("Inserting {} documents into vectors collection", len(documents))
-            result = await vector_collection.insert_many(documents)
-            logger.info("Stored {} vector chunks in database", len(result.inserted_ids))
-
-        except Exception as e:
-            logger.error("Error storing vector embeddings: {}", str(e))
-            # Include more diagnostic information
-            logger.debug("MongoDB connection info: URI defined: {}, DB name: {}",
-                         bool(self.settings.MONGODB_URI), self.settings.MONGODB_NAME)
-            raise
 
-    async def content_exists(self: Self, key: str)-> bool: # TODO: implement this.
-        """
-        Check if content exists in the database.
-
-        Args:
-            key: The key to check for content existence
-        """
-        pass
+    async def content_exists(self: Self, parent_id: str, chunk_id: Optional[str] = None)-> bool:
+        """
+        Check if the content identified by the parent and optionally the chunk identifiers exist in the database.
+
+        Args:
+            parent_id: the identifier of the source content
+            chunk_id: the identifier of the chunk within the source content
+        """
+        matching_chunk_count = await self.vectorized_chunk_repository.count_by_id(parent_id, chunk_id)
+        return 0 < matching_chunk_count
 
-    async def search_by_similarity(self: Self, query: VectorQuery) -> Sequence[Chunk]:
-        """
-        Query the vector database for similar documents.
-
-        Args:
-            query: VectorQuery object with search parameters
-
-        Returns:
-            Sequence[Chunk]: List of similar chunks
-        """
-        try:
-            # Get the vector collection
-            logger.debug("Getting vectors collection for similarity search")
-            vector_collection = await self.mongo_db.get_collection("vectors")
-
-            # Build aggregation pipeline for vector search using official MongoDB format
-            logger.debug("Building vector search pipeline with query embedding dimension: {}", len(query.query_embeddings))
-            pipeline = [
-                {
-                    "$vectorSearch": {
-                        "index": "vectors_vector_index",
-                        "path": "embedding",
-                        "queryVector": query.query_embeddings, #list(query.query_embeddings),
-                        "numCandidates": query.k * 10,
-                        "limit": query.k
-                    }
-                },
-                {
-                    "$project": {
-                        "text": 1,
-                        "metadata": 1,
-                        "parent_id": 1,
-                        "chunk_id": 1,
-                        "score": { "$meta": "vectorSearchScore" }
-                    }
-                }
-            ]
-
-            # Add metadata filters if provided
-            if query.filter_metadata:
-                metadata_filter = {f"metadata.{k}": v for k, v in query.filter_metadata.items()}
-                pipeline.insert(1, {"$match": metadata_filter})
-                logger.debug("Added metadata filters to search: {}", query.filter_metadata)
-
-            # Add score threshold filter if needed
-            if query.score_threshold > 0:
-                pipeline.append({
-                    "$match": {
-                        "score": { "$gte": query.score_threshold }
-                    }
-                })
-                logger.debug("Added score threshold filter: {}", query.score_threshold)
-
-            try:
-                # Execute the vector search pipeline
-                logger.debug("Executing vector search pipeline")
-                results = await vector_collection.aggregate(pipeline).to_list(length=query.k)
-                logger.debug("Vector search returned {} results", len(results))
-            except Exception as e:
-                logger.warning("Vector search failed: {}. Falling back to basic text search.", str(e))
-                # Fall back to basic filtering with limit
-                query_filter = {}
-                if query.filter_metadata:
-                    query_filter.update({f"metadata.{k}": v for k, v in query.filter_metadata.items()})
-
-                logger.debug("Executing fallback basic search with filter: {}", query_filter)
-                results = await vector_collection.find(query_filter).limit(query.k).to_list(length=query.k)
-                logger.debug("Fallback search returned {} results", len(results))
-
-            # Convert results to Chunk objects
-            chunks = []
-            for result in results:
-                chunk = Chunk(
-                    text=result["text"],
-                    parent_id=result["parent_id"],
-                    chunk_id=result["chunk_id"],
-                    metadata={
-                        **result["metadata"],
-                        "similarity_score": result.get("score", 0)
-                    }
-                )
-                chunks.append(chunk)
-
-            logger.info("Found {} similar chunks with similarity search", len(chunks))
-            return chunks
-
-        except Exception as e:
-            logger.error("Error in similarity search: {}", str(e))
-            # Include additional diagnostic information
-            logger.debug("MongoDB connection info: URI defined: {}, DB name: {}",
-                         bool(self.settings.MONGODB_URI), self.settings.MONGODB_NAME)
-            logger.debug("Query details: k={}, dimension={}",
-                         query.k, len(query.query_embeddings) if query.query_embeddings else "None")
-            raise
+    async def find_by_vector(self: Self, query: VectorQuery) -> Sequence[Chunk]:
+        """
+        Query the vector database for similar chunks.
+
+        Args:
+            query: the query criteria
+
+        Returns:
+            Sequence[Chunk]: an ordered collection of similar chunks
+        """
+        try:
+            result = await self.vectorized_chunk_repository.find_by_vector(query)
+            logger.debug("Found {} chunks in the database by similarity search.", len(result))
+            return result
+        except Exception as e:
+            logger.error("Error finding chunks by vector: {}", str(e))
+            raise
+
+    async def store(self: Self, chunks: Iterable[VectorizedChunk]) -> None:
+        """
+        Stores vectorized chunks and their embedding vectors in the database.
+
+        Args:
+            chunks: a collection of vectorized chunks to store
+
+        Returns: None
+        """
+        try:
+            inserted_ids = await self.vectorized_chunk_repository.insert_many(chunks)
+            logger.debug("Stored {} vectorized chunks in the database.", len(inserted_ids))
+        except Exception as e:
+            logger.error("Error storing vectorized chunks: {}", str(e))
+            raise
src/ctp_slack_bot/utils/__init__.py CHANGED
@@ -1 +1,2 @@
+from ctp_slack_bot.utils.immutable import to_deep_immutable
 from ctp_slack_bot.utils.secret_stripper import sanitize_mongo_db_uri
src/ctp_slack_bot/utils/immutable.py ADDED
@@ -0,0 +1,22 @@
+from types import MappingProxyType
+from collections.abc import Iterable, Mapping, Sequence, Set
+from typing import Any
+
+
+def to_deep_immutable(obj: Any):
+    """Recursively convert mutable containers to immutable equivalents."""
+
+    # Handle mappings (dict-like).
+    if isinstance(obj, Mapping):
+        return MappingProxyType({to_deep_immutable(key): to_deep_immutable(value) for key, value in obj.items()})
+
+    # Handle sets.
+    if isinstance(obj, Set):
+        return frozenset(to_deep_immutable(item) for item in obj)
+
+    # Handle sequences (list/tuple-like).
+    if isinstance(obj, (Iterable, Sequence)) and not isinstance(obj, (str, bytes)):
+        return tuple(to_deep_immutable(item) for item in obj)
+
+    # Return anything else as-is.
+    return obj
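
A quick usage sketch (not part of the commit); note that the Iterable branch also materializes one-shot iterators such as generators into tuples:

    from ctp_slack_bot.utils.immutable import to_deep_immutable

    frozen = to_deep_immutable({"a": [1, {2, 3}], "b": {"c": [4]}})
    # mappingproxy({'a': (1, frozenset({2, 3})), 'b': mappingproxy({'c': (4,)})})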
temporary_health_check_server.py ADDED
@@ -0,0 +1,11 @@
+from aiohttp import web
+
+async def aliveness_handler(request):
+    return web.Response(text="Server is alive and kicking!")
+
+app = web.Application()
+app.router.add_get('/', aliveness_handler)
+app.router.add_get('/health', aliveness_handler)
+
+if __name__ == "__main__":
+    web.run_app(app, host='0.0.0.0', port=8080)
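
A quick local probe of the temporary server (not part of the commit):

    from urllib.request import urlopen

    with urlopen("http://localhost:8080/health") as response:
        print(response.status, response.read().decode())  # 200 Server is alive and kicking!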