Hussam committed on
Commit
fcc0368
·
1 Parent(s): 76a0d0a

refactored mongo init, context and vectordb services to be synchronous

Browse files
src/ctp_slack_bot/db/mongo_db.py CHANGED
@@ -1,8 +1,7 @@
1
- #from motor.motor_asyncio import AsyncIOMotorClient
2
  from loguru import logger
3
- from pydantic import BaseModel, model_validator
4
- #from pymongo import IndexModel, ASCENDING
5
- from typing import Optional, Self
6
 
7
  from ctp_slack_bot.core.config import Settings
8
 
@@ -13,115 +12,114 @@ class MongoDB(BaseModel):
13
  """
14
 
15
  settings: Settings
 
 
 
 
 
 
16
 
17
  @model_validator(mode='after')
18
  def post_init(self: Self) -> Self:
19
  logger.debug("Created {}", self.__class__.__name__)
20
  return self
21
 
22
- def __init__(self: Self, settings: Settings) -> Self:
23
- #self.client: Optional[AsyncIOMotorClient] = None
24
- #self.db = None
25
- #self.vector_collection = None
26
- #self.initialized = False
27
- pass # The above initialization needs to be done some other way.
28
 
29
- # async def connect(self):
30
- # """
31
- # Connect to MongoDB using connection string from settings.
32
- # """
33
- # if self.client is not None:
34
- # return
35
 
36
- # if not settings.MONGODB_URI:
37
- # raise ValueError("MONGODB_URI is not set in environment variables")
 
 
 
 
 
 
 
38
 
39
- # try:
40
- # # Create MongoDB connection
41
- # self.client = AsyncIOMotorClient(settings.MONGODB_URI.get_secret_value())
42
- # self.db = self.client[settings.MONGODB_DB_NAME]
43
- # self.vector_collection = self.db["vector_store"]
44
- # logger.info(f"Connected to MongoDB: {settings.MONGODB_DB_NAME}")
45
- # except Exception as e:
46
- # logger.error(f"Error connecting to MongoDB: {str(e)}")
47
- # raise
48
-
49
- # async def initialize(self):
50
- # """
51
- # Initialize MongoDB with required collections and indexes.
52
- # """
53
- # if self.initialized:
54
- # return
55
 
56
- # if not self.client:
57
- # await self.connect()
58
 
59
- # try:
60
- # # Create vector index for similarity search
61
- # await self.create_vector_index()
62
- # self.initialized = True
63
- # logger.info("MongoDB initialized successfully")
64
- # except Exception as e:
65
- # logger.error(f"Error initializing MongoDB: {str(e)}")
66
- # raise
67
 
68
- # async def create_vector_index(self):
69
- # """
70
- # Create vector index for similarity search using MongoDB Atlas Vector Search.
71
- # """
72
- # try:
73
- # # Check if index already exists
74
- # existing_indexes = await self.vector_collection.list_indexes().to_list(length=None)
75
- # index_names = [index.get('name') for index in existing_indexes]
76
 
77
- # if "vector_index" not in index_names:
78
- # # Create vector search index
79
- # index_definition = {
80
- # "mappings": {
81
- # "dynamic": True,
82
- # "fields": {
83
- # "embedding": {
84
- # "dimensions": settings.VECTOR_DIMENSION,
85
- # "similarity": "cosine",
86
- # "type": "knnVector"
87
- # }
88
- # }
89
- # }
90
- # }
91
 
92
- # # Create the index
93
- # await self.db.command({
94
- # "createIndexes": self.vector_collection.name,
95
- # "indexes": [
96
- # {
97
- # "name": "vector_index",
98
- # "key": {"embedding": "vector"},
99
- # "weights": {"embedding": 1},
100
- # "vectorSearchOptions": index_definition
101
- # }
102
- # ]
103
- # })
104
 
105
- # # Create additional metadata indexes for filtering
106
- # await self.vector_collection.create_index([("metadata.source", ASCENDING)])
107
- # await self.vector_collection.create_index([("metadata.timestamp", ASCENDING)])
108
 
109
- # logger.info("Vector search index created")
110
- # else:
111
- # logger.info("Vector search index already exists")
112
 
113
- # except Exception as e:
114
- # logger.error(f"Error creating vector index: {str(e)}")
115
- # raise
116
 
117
- # async def close(self):
118
- # """
119
- # Close MongoDB connection.
120
- # """
121
- # if self.client:
122
- # self.client.close()
123
- # self.client = None
124
- # self.db = None
125
- # self.vector_collection = None
126
- # self.initialized = False
127
- # logger.info("MongoDB connection closed")
 
1
+ from pymongo import MongoClient, ASCENDING
2
  from loguru import logger
3
+ from pydantic import BaseModel, model_validator, ConfigDict
4
+ from typing import Optional, Self, Any
 
5
 
6
  from ctp_slack_bot.core.config import Settings
7
 
 
12
  """
13
 
14
  settings: Settings
15
+ client: Optional[MongoClient] = None
16
+ db: Optional[Any] = None
17
+ vector_collection: Optional[Any] = None
18
+ initialized: bool = False
19
+
20
+ model_config = ConfigDict(arbitrary_types_allowed=True)
21
 
22
  @model_validator(mode='after')
23
  def post_init(self: Self) -> Self:
24
  logger.debug("Created {}", self.__class__.__name__)
25
  return self
26
 
27
def connect(self):
    """
    Connect to MongoDB using the connection string from settings.

    Does nothing when a client is already set. The client, database and
    collection handles are stored on the instance only after all three have
    been created, so a failure part-way through cannot leave `client` set
    while `db` / `vector_collection` are still None (which would make every
    later call return early from the guard below).

    Raises:
        ValueError: If `settings.MONGODB_URI` is empty or unset.
        Exception: Any error raised while creating the client or the
            handles is logged and re-raised unchanged.
    """
    if self.client is not None:
        return

    if not self.settings.MONGODB_URI:
        raise ValueError("MONGODB_URI is not set in environment variables")

    client = None
    try:
        # NOTE(review): MongoClient is documented to connect lazily, so
        # reaching the end of this block presumably does not prove the
        # server is reachable — confirm before relying on the log line.
        client = MongoClient(self.settings.MONGODB_URI.get_secret_value())
        db = client[self.settings.MONGODB_NAME]
        vector_collection = db["vector_store"]
    except Exception as e:
        logger.error(f"Error connecting to MongoDB: {str(e)}")
        # Do not leak a half-built client when a later step failed.
        if client is not None:
            client.close()
        raise

    # Publish the handles together, only once everything succeeded.
    self.client = client
    self.db = db
    self.vector_collection = vector_collection
    logger.info(f"Connected to MongoDB: {self.settings.MONGODB_NAME}")
46
 
47
def initialize(self):
    """
    Initialize MongoDB with the required collections and indexes.

    Does nothing when `initialized` is already True. Connects first when no
    client is set, then calls `create_vector_index()`. `initialized` is set
    to True only after that call returns without raising.

    Raises:
        Exception: Any error from `create_vector_index()` is logged and
            re-raised unchanged; errors from `connect()` propagate as-is.
    """
    if self.initialized:
        return

    # Compare against None, as connect() does, instead of relying on the
    # truth value of the client object.
    if self.client is None:
        self.connect()

    try:
        # Create vector index for similarity search
        self.create_vector_index()
    except Exception as e:
        logger.error(f"Error initializing MongoDB: {str(e)}")
        raise

    self.initialized = True
    logger.info("MongoDB initialized successfully")
65
 
66
def create_vector_index(self):
    """
    Ensure the Atlas search index and the metadata indexes exist.

    The search index named "vector_index" is looked up and created through
    the collection's search-index helpers: Atlas search indexes are not
    regular indexes, so they are not reported by `list_indexes()` and are
    not created by the `createIndexes` command. The ascending indexes on
    `metadata.source` and `metadata.timestamp` are ensured on every call,
    whether or not the search index already existed.

    Expects `connect()` to have populated `vector_collection`.

    Raises:
        Exception: Any error is logged and re-raised unchanged.
    """
    try:
        # Check if the search index already exists
        existing_names = {
            index.get("name")
            for index in self.vector_collection.list_search_indexes()
        }

        if "vector_index" not in existing_names:
            # Search index definition for the embedding field
            index_definition = {
                "mappings": {
                    "dynamic": True,
                    "fields": {
                        "embedding": {
                            "dimensions": self.settings.VECTOR_DIMENSION,
                            "similarity": "cosine",
                            "type": "knnVector"
                        }
                    }
                }
            }

            # Create the search index
            self.vector_collection.create_search_index({
                "name": "vector_index",
                "definition": index_definition,
            })
            logger.info("Vector search index created")
        else:
            logger.info("Vector search index already exists")

        # Metadata indexes for filtering. Requesting an index that already
        # exists with the same specification is a no-op, so this is safe
        # to run on every call.
        self.vector_collection.create_index([("metadata.source", ASCENDING)])
        self.vector_collection.create_index([("metadata.timestamp", ASCENDING)])

    except Exception as e:
        logger.error(f"Error creating vector index: {str(e)}")
        raise
114
 
115
def close(self):
    """
    Close the MongoDB connection and reset the connection state.

    Does nothing when no client is set. `client`, `db`,
    `vector_collection` and `initialized` are reset even when closing the
    client raises, so the instance can be connected again afterwards. The
    "closed" message is logged only when the close call succeeded.
    """
    # Compare against None, as connect() does, instead of relying on the
    # truth value of the client object.
    if self.client is None:
        return

    try:
        self.client.close()
    finally:
        self.client = None
        self.db = None
        self.vector_collection = None
        self.initialized = False

    logger.info("MongoDB connection closed")
src/ctp_slack_bot/services/context_retrieval_service.py CHANGED
@@ -1,6 +1,6 @@
1
  from loguru import logger
2
  from pydantic import BaseModel, model_validator
3
- from typing import Any, Dict, List, Optional, Self
4
 
5
  from ctp_slack_bot.core.config import Settings
6
  from ctp_slack_bot.models import RetreivedContext, SlackMessage, VectorQuery
@@ -21,13 +21,7 @@ class ContextRetrievalService(BaseModel):
21
  logger.debug("Created {}", self.__class__.__name__)
22
  return self
23
 
24
- async def initialize(self):
25
- """
26
- Initialize the required services.
27
- """
28
- await self.vector_database_service.initialize()
29
-
30
- async def get_context(self, message: SlackMessage) -> List[RetreivedContext]:
31
  """
32
  Retrieve relevant context for a given Slack message.
33
 
@@ -59,12 +53,12 @@ class ContextRetrievalService(BaseModel):
59
  # Create vector query
60
  vector_query = VectorQuery(
61
  query_text=message.text,
62
- k=settings.TOP_K_MATCHES,
63
  score_threshold=0.7 # Minimum similarity threshold
64
  )
65
 
66
  # Search for similar content in vector database
67
- context_results = await self.vector_database_service.search_by_similarity(
68
  query=vector_query,
69
  query_embedding=query_embedding
70
  )
 
1
  from loguru import logger
2
  from pydantic import BaseModel, model_validator
3
+ from typing import Self, List
4
 
5
  from ctp_slack_bot.core.config import Settings
6
  from ctp_slack_bot.models import RetreivedContext, SlackMessage, VectorQuery
 
21
  logger.debug("Created {}", self.__class__.__name__)
22
  return self
23
 
24
+ def get_context(self, message: SlackMessage) -> List[RetreivedContext]:
 
 
 
 
 
 
25
  """
26
  Retrieve relevant context for a given Slack message.
27
 
 
53
  # Create vector query
54
  vector_query = VectorQuery(
55
  query_text=message.text,
56
+ k=self.settings.TOP_K_MATCHES,
57
  score_threshold=0.7 # Minimum similarity threshold
58
  )
59
 
60
  # Search for similar content in vector database
61
+ context_results = self.vector_database_service.search_by_similarity(
62
  query=vector_query,
63
  query_embedding=query_embedding
64
  )
src/ctp_slack_bot/services/vector_database_service.py CHANGED
@@ -1,6 +1,6 @@
1
  from loguru import logger
2
  from pydantic import BaseModel, model_validator
3
- from typing import Any, Dict, List, Optional, Self
4
 
5
  from ctp_slack_bot.core import Settings
6
  from ctp_slack_bot.db import MongoDB
@@ -10,7 +10,7 @@ class VectorDatabaseService(BaseModel): # TODO: this should not rely specificall
10
  """
11
  Service for storing and retrieving vector embeddings from MongoDB.
12
  """
13
-
14
  settings: Settings
15
  mongo_db: MongoDB
16
 
@@ -19,13 +19,7 @@ class VectorDatabaseService(BaseModel): # TODO: this should not rely specificall
19
  logger.debug("Created {}", self.__class__.__name__)
20
  return self
21
 
22
- async def initialize(self):
23
- """
24
- Initialize the database connection.
25
- """
26
- await mongodb.initialize()
27
-
28
- async def store(self, text: str, embedding: List[float], metadata: Dict[str, Any]) -> str:
29
  """
30
  Store text and its embedding vector in the database.
31
 
@@ -37,8 +31,8 @@ class VectorDatabaseService(BaseModel): # TODO: this should not rely specificall
37
  Returns:
38
  str: The ID of the stored document
39
  """
40
- if not mongodb.initialized:
41
- await mongodb.initialize()
42
 
43
  try:
44
  # Create document to store
@@ -49,7 +43,7 @@ class VectorDatabaseService(BaseModel): # TODO: this should not rely specificall
49
  }
50
 
51
  # Insert into collection
52
- result = await mongodb.vector_collection.insert_one(document)
53
  logger.debug(f"Stored document with ID: {result.inserted_id}")
54
 
55
  return str(result.inserted_id)
@@ -57,7 +51,7 @@ class VectorDatabaseService(BaseModel): # TODO: this should not rely specificall
57
  logger.error(f"Error storing embedding: {str(e)}")
58
  raise
59
 
60
- async def search_by_similarity(self, query: VectorQuery, query_embedding: List[float]) -> List[RetreivedContext]:
61
  """
62
  Query the vector database for similar documents.
63
 
@@ -68,8 +62,8 @@ class VectorDatabaseService(BaseModel): # TODO: this should not rely specificall
68
  Returns:
69
  List[RetreivedContext]: List of similar documents with similarity scores
70
  """
71
- if not mongodb.initialized:
72
- await mongodb.initialize()
73
 
74
  try:
75
  # Build aggregation pipeline for vector search
@@ -100,7 +94,7 @@ class VectorDatabaseService(BaseModel): # TODO: this should not rely specificall
100
  pipeline.insert(1, {"$match": metadata_filter})
101
 
102
  # Execute the pipeline
103
- results = await mongodb.vector_collection.aggregate(pipeline).to_list(length=query.k)
104
 
105
  # Convert to RetreivedContext objects directly
106
  context_results = []
 
1
  from loguru import logger
2
  from pydantic import BaseModel, model_validator
3
+ from typing import Any, Dict, List, Self
4
 
5
  from ctp_slack_bot.core import Settings
6
  from ctp_slack_bot.db import MongoDB
 
10
  """
11
  Service for storing and retrieving vector embeddings from MongoDB.
12
  """
13
+
14
  settings: Settings
15
  mongo_db: MongoDB
16
 
 
19
  logger.debug("Created {}", self.__class__.__name__)
20
  return self
21
 
22
+ def store(self, text: str, embedding: List[float], metadata: Dict[str, Any]) -> str:
 
 
 
 
 
 
23
  """
24
  Store text and its embedding vector in the database.
25
 
 
31
  Returns:
32
  str: The ID of the stored document
33
  """
34
+ if not self.mongo_db.initialized:
35
+ self.mongo_db.initialize()
36
 
37
  try:
38
  # Create document to store
 
43
  }
44
 
45
  # Insert into collection
46
+ result = self.mongo_db.vector_collection.insert_one(document)
47
  logger.debug(f"Stored document with ID: {result.inserted_id}")
48
 
49
  return str(result.inserted_id)
 
51
  logger.error(f"Error storing embedding: {str(e)}")
52
  raise
53
 
54
+ def search_by_similarity(self, query: VectorQuery, query_embedding: List[float]) -> List[RetreivedContext]:
55
  """
56
  Query the vector database for similar documents.
57
 
 
62
  Returns:
63
  List[RetreivedContext]: List of similar documents with similarity scores
64
  """
65
+ if not self.mongo_db.initialized:
66
+ self.mongo_db.initialize()
67
 
68
  try:
69
  # Build aggregation pipeline for vector search
 
94
  pipeline.insert(1, {"$match": metadata_filter})
95
 
96
  # Execute the pipeline
97
+ results = list(self.mongo_db.vector_collection.aggregate(pipeline, maxTimeMS=30000))
98
 
99
  # Convert to RetreivedContext objects directly
100
  context_results = []