LiKenun commited on
Commit
4f2cb88
·
1 Parent(s): 8ec2c5a

Test MongoDB code

Browse files
src/ctp_slack_bot/containers.py CHANGED
@@ -4,6 +4,7 @@ from slack_bolt.async_app import AsyncApp
4
 
5
  from ctp_slack_bot.core.config import Settings
6
  from ctp_slack_bot.db.mongo_db import MongoDB
 
7
  from ctp_slack_bot.services.answer_retrieval_service import AnswerRetrievalService
8
  from ctp_slack_bot.services.content_ingestion_service import ContentIngestionService
9
  from ctp_slack_bot.services.context_retrieval_service import ContextRetrievalService
@@ -20,12 +21,11 @@ class Container(DeclarativeContainer):
20
  settings = Singleton(Settings)
21
  event_brokerage_service = Singleton(EventBrokerageService)
22
  slack_bolt_app = Factory(AsyncApp, token=settings.provided.SLACK_BOT_TOKEN().get_secret_value())
23
- mongo_db = Singleton(MongoDB, settings=settings) # TODO: we could really use less commitment to MongoDB.
24
- # Repositories
25
- # transcript_repository = Factory(
26
- # # Your transcript repository class
27
- # mongo_db=mongo_db
28
- # )
29
  vector_database_service = Singleton(VectorDatabaseService, settings=settings, mongo_db=mongo_db)
30
  embeddings_model_service = Singleton(EmbeddingsModelService, settings=settings)
31
  vectorization_service = Singleton(VectorizationService, settings=settings, embeddings_model_service=embeddings_model_service)
@@ -35,4 +35,4 @@ class Container(DeclarativeContainer):
35
  answer_retrieval_service = Singleton(AnswerRetrievalService, settings=settings, event_brokerage_service=event_brokerage_service, language_model_service=language_model_service)
36
  question_dispatch_service = Singleton(QuestionDispatchService, settings=settings, event_brokerage_service=event_brokerage_service, content_ingestion_service=content_ingestion_service, context_retrieval_service=context_retrieval_service, answer_retrieval_service=answer_retrieval_service)
37
  slack_service = Singleton(SlackService, event_brokerage_service=event_brokerage_service, slack_bolt_app=slack_bolt_app)
38
- primordial_services = List(settings, event_brokerage_service, slack_bolt_app, slack_service, question_dispatch_service, content_ingestion_service)
 
4
 
5
  from ctp_slack_bot.core.config import Settings
6
  from ctp_slack_bot.db.mongo_db import MongoDB
7
+ from ctp_slack_bot.db.repositories import MongoVectorizedChunkRepository
8
  from ctp_slack_bot.services.answer_retrieval_service import AnswerRetrievalService
9
  from ctp_slack_bot.services.content_ingestion_service import ContentIngestionService
10
  from ctp_slack_bot.services.context_retrieval_service import ContextRetrievalService
 
21
  settings = Singleton(Settings)
22
  event_brokerage_service = Singleton(EventBrokerageService)
23
  slack_bolt_app = Factory(AsyncApp, token=settings.provided.SLACK_BOT_TOKEN().get_secret_value())
24
+ mongo_db = Singleton(MongoDB, settings=settings) # TODO: generalize to any database.
25
+ vectorized_chunk_repository = Singleton(
26
+ MongoVectorizedChunkRepository,
27
+ mongo_db=mongo_db
28
+ )
 
29
  vector_database_service = Singleton(VectorDatabaseService, settings=settings, mongo_db=mongo_db)
30
  embeddings_model_service = Singleton(EmbeddingsModelService, settings=settings)
31
  vectorization_service = Singleton(VectorizationService, settings=settings, embeddings_model_service=embeddings_model_service)
 
35
  answer_retrieval_service = Singleton(AnswerRetrievalService, settings=settings, event_brokerage_service=event_brokerage_service, language_model_service=language_model_service)
36
  question_dispatch_service = Singleton(QuestionDispatchService, settings=settings, event_brokerage_service=event_brokerage_service, content_ingestion_service=content_ingestion_service, context_retrieval_service=context_retrieval_service, answer_retrieval_service=answer_retrieval_service)
37
  slack_service = Singleton(SlackService, event_brokerage_service=event_brokerage_service, slack_bolt_app=slack_bolt_app)
38
+ primordial_services = List(settings, event_brokerage_service, slack_bolt_app, slack_service, vectorized_chunk_repository, question_dispatch_service, content_ingestion_service)
src/ctp_slack_bot/db/mongo_db.py CHANGED
@@ -1,131 +1,102 @@
1
- #from motor.motor_asyncio import AsyncIOMotorClient
 
2
  from loguru import logger
3
  from pydantic import BaseModel, model_validator, PrivateAttr
4
- #from pymongo import IndexModel, ASCENDING
5
- from typing import Optional, Self
6
 
7
  from ctp_slack_bot.core.config import Settings
8
 
9
  class MongoDB(BaseModel):
10
  """
11
- MongoDB connection and initialization class.
12
- Handles connection to MongoDB, database selection, and index creation.
13
  """
14
-
15
  settings: Settings
16
  _client: PrivateAttr = PrivateAttr()
17
  _db: PrivateAttr = PrivateAttr()
18
- _vector_collection: PrivateAttr = PrivateAttr()
19
- _initialized: PrivateAttr = PrivateAttr()
20
-
21
- def __init__(self: Self, **data) -> None:
22
- super().__init__(**data)
23
- self._client = None
24
- self._db = None
25
- self._vector_collection = None
26
- self._initialized = False
27
-
28
  @model_validator(mode='after')
29
  def post_init(self: Self) -> Self:
 
 
30
  logger.debug("Created {}", self.__class__.__name__)
31
  return self
32
-
33
- # async def connect(self):
34
- # """
35
- # Connect to MongoDB using connection string from settings.
36
- # """
37
- # if self.client is not None:
38
- # return
39
-
40
- # if not settings.MONGODB_URI:
41
- # raise ValueError("MONGODB_URI is not set in environment variables")
42
-
43
- # try:
44
- # # Create MongoDB connection
45
- # self.client = AsyncIOMotorClient(settings.MONGODB_URI.get_secret_value())
46
- # self.db = self.client[settings.MONGODB_DB_NAME]
47
- # self.vector_collection = self.db["vector_store"]
48
- # logger.info(f"Connected to MongoDB: {settings.MONGODB_DB_NAME}")
49
- # except Exception as e:
50
- # logger.error(f"Error connecting to MongoDB: {str(e)}")
51
- # raise
52
-
53
- # async def initialize(self):
54
- # """
55
- # Initialize MongoDB with required collections and indexes.
56
- # """
57
- # if self.initialized:
58
- # return
59
-
60
- # if not self.client:
61
- # await self.connect()
62
-
63
- # try:
64
- # # Create vector index for similarity search
65
- # await self.create_vector_index()
66
- # self.initialized = True
67
- # logger.info("MongoDB initialized successfully")
68
- # except Exception as e:
69
- # logger.error(f"Error initializing MongoDB: {str(e)}")
70
- # raise
71
-
72
- # async def create_vector_index(self):
73
- # """
74
- # Create vector index for similarity search using MongoDB Atlas Vector Search.
75
- # """
76
- # try:
77
- # # Check if index already exists
78
- # existing_indexes = await self.vector_collection.list_indexes().to_list(length=None)
79
- # index_names = [index.get('name') for index in existing_indexes]
80
 
81
- # if "vector_index" not in index_names:
82
- # # Create vector search index
83
- # index_definition = {
84
- # "mappings": {
85
- # "dynamic": True,
86
- # "fields": {
87
- # "embedding": {
88
- # "dimensions": settings.VECTOR_DIMENSION,
89
- # "similarity": "cosine",
90
- # "type": "knnVector"
91
- # }
92
- # }
93
- # }
94
- # }
95
-
96
- # # Create the index
97
- # await self.db.command({
98
- # "createIndexes": self.vector_collection.name,
99
- # "indexes": [
100
- # {
101
- # "name": "vector_index",
102
- # "key": {"embedding": "vector"},
103
- # "weights": {"embedding": 1},
104
- # "vectorSearchOptions": index_definition
105
- # }
106
- # ]
107
- # })
108
-
109
- # # Create additional metadata indexes for filtering
110
- # await self.vector_collection.create_index([("metadata.source", ASCENDING)])
111
- # await self.vector_collection.create_index([("metadata.timestamp", ASCENDING)])
112
-
113
- # logger.info("Vector search index created")
114
- # else:
115
- # logger.info("Vector search index already exists")
116
-
117
- # except Exception as e:
118
- # logger.error(f"Error creating vector index: {str(e)}")
119
- # raise
120
-
121
- # async def close(self):
122
- # """
123
- # Close MongoDB connection.
124
- # """
125
- # if self.client:
126
- # self.client.close()
127
- # self.client = None
128
- # self.db = None
129
- # self.vector_collection = None
130
- # self.initialized = False
131
- # logger.info("MongoDB connection closed")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from motor.motor_asyncio import AsyncIOMotorClient
2
+ from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
3
  from loguru import logger
4
  from pydantic import BaseModel, model_validator, PrivateAttr
5
+ from typing import Any, Dict, Optional, Self
6
+ import asyncio
7
 
8
  from ctp_slack_bot.core.config import Settings
9
 
10
  class MongoDB(BaseModel):
11
  """
12
+ MongoDB connection manager using Motor for async operations.
 
13
  """
 
14
  settings: Settings
15
  _client: PrivateAttr = PrivateAttr()
16
  _db: PrivateAttr = PrivateAttr()
17
+
18
+ class Config:
19
+ arbitrary_types_allowed = True
20
+
 
 
 
 
 
 
21
  @model_validator(mode='after')
22
  def post_init(self: Self) -> Self:
23
+ """Initialize MongoDB connection after model creation."""
24
+ self._initialize_client()
25
  logger.debug("Created {}", self.__class__.__name__)
26
  return self
27
+
28
+ def _initialize_client(self: Self) -> None:
29
+ """Initialize MongoDB client with settings."""
30
+ try:
31
+ connection_string = self.settings.MONGODB_URI.get_secret_value()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32
 
33
+ # Create client with appropriate settings
34
+ self._client = AsyncIOMotorClient(
35
+ connection_string,
36
+ serverSelectionTimeoutMS=5000,
37
+ connectTimeoutMS=10000,
38
+ socketTimeoutMS=45000,
39
+ maxPoolSize=100,
40
+ retryWrites=True,
41
+ w="majority"
42
+ )
43
+
44
+ # Set database
45
+ db_name = self.settings.MONGODB_NAME
46
+
47
+ self._db = self._client[db_name]
48
+ logger.debug("MongoDB client initialized for database: {}", db_name)
49
+
50
+ except Exception as e:
51
+ logger.error("Failed to initialize MongoDB client: {}", e)
52
+ self._client = None
53
+ self._db = None
54
+
55
+ @property
56
+ def client(self: Self) -> AsyncIOMotorClient:
57
+ """Get the MongoDB client instance."""
58
+ if not self._client:
59
+ logger.warning("MongoDB client not initialized. Attempting to initialize.")
60
+ self._initialize_client()
61
+ if not self._client:
62
+ raise ConnectionError("Failed to initialize MongoDB client")
63
+ return self._client
64
+
65
+ @property
66
+ def db(self: Self) -> Any:
67
+ """Get the MongoDB database instance."""
68
+ if not self._db:
69
+ logger.warning("MongoDB database not initialized. Attempting to initialize client.")
70
+ self._initialize_client()
71
+ if not self._db:
72
+ raise ConnectionError("Failed to initialize MongoDB database")
73
+ return self._db
74
+
75
+ async def ping(self: Self) -> bool:
76
+ """Check if MongoDB connection is alive."""
77
+ try:
78
+ await self.client.admin.command('ping')
79
+ return True
80
+ except (ConnectionFailure, ServerSelectionTimeoutError) as e:
81
+ logger.error("MongoDB connection failed: {}", e)
82
+ return False
83
+
84
+ async def get_collection(self: Self, name: str) -> Any:
85
+ """Get a collection by name with validation."""
86
+ if not await self.ping():
87
+ raise ConnectionError("MongoDB connection is not available")
88
+ return self.db[name]
89
+
90
+ async def create_indexes(self: Self, collection_name: str, indexes: list) -> None:
91
+ """Create indexes on a collection."""
92
+ collection = await self.get_collection(collection_name)
93
+ await collection.create_indexes(indexes)
94
+ logger.info("Created indexes for collection {}: {}", collection_name, indexes)
95
+
96
+ async def close(self: Self) -> None:
97
+ """Close MongoDB connection."""
98
+ if self._client:
99
+ self._client.close()
100
+ logger.info("MongoDB connection closed")
101
+ self._client = None
102
+ self._db = None
src/ctp_slack_bot/db/repositories/__init__.py CHANGED
@@ -0,0 +1,2 @@
 
 
 
1
+ from ctp_slack_bot.db.repositories.mongo_db_vectorized_chunk_repository import MongoVectorizedChunkRepository
2
+ from ctp_slack_bot.db.repositories.vectorized_chunk_repository import VectorizedChunkRepository
src/ctp_slack_bot/db/repositories/mongo_db_vectorized_chunk_repository.py ADDED
@@ -0,0 +1,65 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import List, Optional, Dict, Any
2
+ import pymongo
3
+ from bson import ObjectId
4
+
5
+ from ctp_slack_bot.db import MongoDB
6
+ from ctp_slack_bot.db.repositories.vectorized_chunk_repository import VectorizedChunkRepository
7
+ from ctp_slack_bot.models.base import VectorizedChunk
8
+
9
+ class MongoVectorizedChunkRepository(VectorizedChunkRepository):
10
+ """MongoDB implementation of VectorizedChunkRepository."""
11
+
12
+ def __init__(self, mongo_db: MongoDB):
13
+ self.mongo_db = mongo_db
14
+ self.collection = self.mongo_db.client.get_database().get_collection("vectorized_chunks")
15
+
16
+ # Create indexes for efficient queries
17
+ self.collection.create_index("chunk_id")
18
+ self.collection.create_index("parent_id")
19
+
20
+ async def find_by_id(self, id: str) -> Optional[VectorizedChunk]:
21
+ doc = await self.collection.find_one({"_id": ObjectId(id)})
22
+ return self._map_to_entity(doc) if doc else None
23
+
24
+ async def find_all(self) -> List[VectorizedChunk]:
25
+ cursor = self.collection.find({})
26
+ return [self._map_to_entity(doc) async for doc in cursor]
27
+
28
+ async def find_by_parent_id(self, parent_id: str) -> List[VectorizedChunk]:
29
+ cursor = self.collection.find({"parent_id": parent_id})
30
+ return [self._map_to_entity(doc) async for doc in cursor]
31
+
32
+ async def save(self, chunk: VectorizedChunk) -> VectorizedChunk:
33
+ doc = self._map_to_document(chunk)
34
+
35
+ if "_id" in doc and doc["_id"]:
36
+ # Update existing document
37
+ await self.collection.replace_one({"_id": doc["_id"]}, doc)
38
+ else:
39
+ # Insert new document
40
+ result = await self.collection.insert_one(doc)
41
+ doc["_id"] = result.inserted_id
42
+
43
+ return self._map_to_entity(doc)
44
+
45
+ async def delete(self, id: str) -> bool:
46
+ result = await self.collection.delete_one({"_id": ObjectId(id)})
47
+ return result.deleted_count > 0
48
+
49
+ async def find_by_metadata(self, metadata_query: Dict[str, Any]) -> List[VectorizedChunk]:
50
+ # Convert the metadata query to MongoDB query format
51
+ query = {f"metadata.{k}": v for k, v in metadata_query.items()}
52
+ cursor = self.collection.find(query)
53
+ return [self._map_to_entity(doc) async for doc in cursor]
54
+
55
+ def _map_to_document(self, chunk: VectorizedChunk) -> Dict[str, Any]:
56
+ """Convert a VectorizedChunk to a MongoDB document."""
57
+ doc = chunk.model_dump()
58
+ # Handle any special conversions needed
59
+ return doc
60
+
61
+ def _map_to_entity(self, doc: Dict[str, Any]) -> VectorizedChunk:
62
+ """Convert a MongoDB document to a VectorizedChunk."""
63
+ if "_id" in doc:
64
+ doc["id"] = str(doc.pop("_id"))
65
+ return VectorizedChunk(**doc)
src/ctp_slack_bot/db/repositories/vectorized_chunk_repository.py ADDED
@@ -0,0 +1,30 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import List, Optional, Dict, Any
2
+
3
+ from ctp_slack_bot.models.base import VectorizedChunk
4
+
5
+ class VectorizedChunkRepository:
6
+ """Repository interface for VectorizedChunk entities."""
7
+
8
+ async def find_by_id(self, id: str) -> Optional[VectorizedChunk]:
9
+ """Find a chunk by its ID."""
10
+ pass
11
+
12
+ async def find_all(self) -> List[VectorizedChunk]:
13
+ """Find all chunks."""
14
+ pass
15
+
16
+ async def find_by_parent_id(self, parent_id: str) -> List[VectorizedChunk]:
17
+ """Find chunks by parent document ID."""
18
+ pass
19
+
20
+ async def save(self, chunk: VectorizedChunk) -> VectorizedChunk:
21
+ """Save a chunk to the database."""
22
+ pass
23
+
24
+ async def delete(self, id: str) -> bool:
25
+ """Delete a chunk by its ID."""
26
+ pass
27
+
28
+ async def find_by_metadata(self, metadata_query: Dict[str, Any]) -> List[VectorizedChunk]:
29
+ """Find chunks by metadata criteria."""
30
+ pass