Hussam committed on
Commit
b6ce87e
·
1 Parent(s): 3799925

added vectorDB and context retrieval services, vectorquery model and MongoDB initialization

Browse files
src/ctp_slack_bot/db/MongoDB.py ADDED
@@ -0,0 +1,122 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from motor.motor_asyncio import AsyncIOMotorClient
2
+ from pymongo import IndexModel, ASCENDING
3
+ import logging
4
+ from typing import Optional
5
+
6
+ from ctp_slack_bot.core.config import settings
7
+
8
+ logger = logging.getLogger(__name__)
9
+
10
class MongoDB:
    """
    MongoDB connection and initialization class.

    Handles client creation, database/collection selection, and creation of
    the Atlas Search index (used for similarity search) together with the
    regular metadata indexes (used for filtering).
    """

    # Collection that holds the stored text and its embedding vectors.
    VECTOR_COLLECTION_NAME = "vector_store"
    # Name of the Atlas Search index queried by the similarity search.
    VECTOR_INDEX_NAME = "vector_index"

    def __init__(self):
        self.client: Optional[AsyncIOMotorClient] = None
        self.db = None
        self.vector_collection = None
        self.initialized = False

    async def connect(self):
        """
        Create the MongoDB client using the connection string from settings.

        Does nothing when a client already exists. Instance state is only
        updated once every step has succeeded, so a failed attempt can be
        retried by calling this method again.

        Raises:
            ValueError: If ``settings.MONGODB_URI`` is not set.
            Exception: Any error raised while building the client is logged
                and re-raised unchanged.
        """
        if self.client is not None:
            return

        if not settings.MONGODB_URI:
            raise ValueError("MONGODB_URI is not set in environment variables")

        client = None
        try:
            # Build everything on locals first; see the docstring.
            client = AsyncIOMotorClient(settings.MONGODB_URI.get_secret_value())
            db = client[settings.MONGODB_DB_NAME]
            vector_collection = db[self.VECTOR_COLLECTION_NAME]
        except Exception as e:
            if client is not None:
                # Do not leak a half-configured client.
                client.close()
            logger.error(f"Error connecting to MongoDB: {str(e)}")
            raise

        self.client = client
        self.db = db
        self.vector_collection = vector_collection
        logger.info(f"Connected to MongoDB: {settings.MONGODB_DB_NAME}")

    async def initialize(self):
        """
        Initialize MongoDB with required collections and indexes.

        Connects first if no client exists yet. Safe to call repeatedly:
        once ``self.initialized`` is set, further calls return immediately.

        Raises:
            Exception: Any error from connecting or index creation is logged
                and re-raised unchanged.
        """
        if self.initialized:
            return

        # Compare with None (as connect() does) instead of relying on the
        # truth value of a driver object.
        if self.client is None:
            await self.connect()

        try:
            await self.create_vector_index()
            self.initialized = True
            logger.info("MongoDB initialized successfully")
        except Exception as e:
            logger.error(f"Error initializing MongoDB: {str(e)}")
            raise

    async def create_vector_index(self):
        """
        Ensure the metadata indexes and the Atlas Search vector index exist.

        The metadata indexes are ensured on every call. The search index is
        only created when no search index with the expected name is listed.

        Raises:
            Exception: Any error from the server is logged and re-raised
                unchanged.
        """
        try:
            # Regular indexes used for metadata filtering. These are ensured
            # first: create_index is expected to create the collection when
            # it is missing, which the search-index command below needs.
            await self.vector_collection.create_index([("metadata.source", ASCENDING)])
            await self.vector_collection.create_index([("metadata.timestamp", ASCENDING)])

            # Atlas Search indexes are not reported by list_indexes(); they
            # are enumerated with the $listSearchIndexes aggregation stage.
            cursor = self.vector_collection.aggregate([{"$listSearchIndexes": {}}])
            existing_indexes = await cursor.to_list(length=None)
            search_index_names = {index.get("name") for index in existing_indexes}

            if self.VECTOR_INDEX_NAME in search_index_names:
                logger.info("Vector search index already exists")
                return

            # Atlas Search definition: "embedding" is mapped as a knnVector
            # field, which is what the knnBeta operator searches against.
            index_definition = {
                "mappings": {
                    "dynamic": True,
                    "fields": {
                        "embedding": {
                            "dimensions": settings.VECTOR_DIMENSION,
                            "similarity": "cosine",
                            "type": "knnVector",
                        }
                    },
                }
            }

            # Search indexes are created with createSearchIndexes, not with
            # the createIndexes command used for regular indexes.
            await self.db.command({
                "createSearchIndexes": self.vector_collection.name,
                "indexes": [
                    {
                        "name": self.VECTOR_INDEX_NAME,
                        "definition": index_definition,
                    }
                ],
            })
            logger.info("Vector search index created")

        except Exception as e:
            logger.error(f"Error creating vector index: {str(e)}")
            raise

    async def close(self):
        """
        Close the MongoDB connection and reset all instance state.

        Does nothing when no client exists.
        """
        if self.client is not None:
            self.client.close()
            self.client = None
            self.db = None
            self.vector_collection = None
            self.initialized = False
            logger.info("MongoDB connection closed")
120
+
121
# Module-level singleton: other modules import `mongodb` so that they all
# share this one MongoDB instance instead of constructing their own.
mongodb = MongoDB()
src/ctp_slack_bot/models/VectorQuery.py ADDED
@@ -0,0 +1,17 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pydantic import BaseModel, Field, validator
2
+ from typing import Optional, List, Dict, Any
3
+ from ctp_slack_bot.core.config import settings
4
+
5
class VectorQuery(BaseModel):
    """Model for vector database similarity search queries.

    Attributes:
        query_text: The text to be vectorized and used for similarity search
        k: Number of similar documents to retrieve; must be greater than zero
        score_threshold: Minimum similarity score for inclusion in results;
            must lie between 0 and 1 inclusive
        filter_metadata: Optional filters for metadata fields
    """
    query_text: str
    # A non-positive k cannot describe a valid "top k" request, so reject it
    # when the model is built rather than when the query reaches the database.
    k: int = Field(default=settings.TOP_K_MATCHES, gt=0)
    # NOTE(review): the [0, 1] bound assumes the search backend reports
    # normalized scores — confirm against the operator in use.
    score_threshold: float = Field(default=0.7, ge=0.0, le=1.0)
    filter_metadata: Optional[Dict[str, Any]] = None
src/ctp_slack_bot/services/ContextRetrievalService.py ADDED
@@ -0,0 +1,76 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ from typing import List, Dict, Any, Optional
3
+
4
+ from ctp_slack_bot.models.slack import SlackMessage
5
+ from ctp_slack_bot.models.content import RetreivedContext
6
+ from ctp_slack_bot.models.VectorQuery import VectorQuery
7
+ from ctp_slack_bot.services.VectorizationService import VectorizationService
8
+ from ctp_slack_bot.services.VectorDatabaseService import VectorDatabaseService
9
+ from ctp_slack_bot.core.config import settings
10
+
11
+ logger = logging.getLogger(__name__)
12
+
13
class ContextRetrievalService:
    """
    Service for retrieving relevant context from the vector database based on user questions.
    """

    # Minimum similarity score a stored item must reach to be returned.
    SCORE_THRESHOLD = 0.7

    def __init__(self):
        self.vectorization_service = VectorizationService()
        self.vector_db_service = VectorDatabaseService()

    async def initialize(self):
        """
        Initialize the required services.
        """
        await self.vector_db_service.initialize()

    async def get_context(self, message: SlackMessage) -> List[RetreivedContext]:
        """
        Retrieve relevant context for a given Slack message.

        This function:
        1. Skips messages that are not questions
        2. Vectorizes the message text using VectorizationService
        3. Queries VectorDatabaseService for similar context
        4. Returns the relevant context as a list of RetreivedContext objects

        Retrieval is best-effort: any failure is logged (with traceback) and
        an empty list is returned instead of propagating the exception.

        Args:
            message: The SlackMessage containing the user's question

        Returns:
            List[RetreivedContext]: List of retrieved context items with similarity scores
        """
        if not message.is_question:
            logger.debug(f"Message {message.key} is not a question, skipping context retrieval")
            return []

        try:
            # Vectorize the message text
            embeddings = self.vectorization_service.get_embeddings([message.text])
            if embeddings is None or len(embeddings) == 0:
                logger.error(f"Failed to generate embedding for message: {message.key}")
                return []

            # Accept array-like embeddings exposing .tolist() as well as
            # plain sequences of floats.
            first_embedding = embeddings[0]
            if hasattr(first_embedding, "tolist"):
                query_embedding = first_embedding.tolist()
            else:
                query_embedding = list(first_embedding)

            # Create vector query
            vector_query = VectorQuery(
                query_text=message.text,
                k=settings.TOP_K_MATCHES,
                score_threshold=self.SCORE_THRESHOLD,
            )

            # Search for similar content in vector database
            context_results = await self.vector_db_service.search_by_similarity(
                query=vector_query,
                query_embedding=query_embedding,
            )

            logger.info(f"Retrieved {len(context_results)} context items for message: {message.key}")
            return context_results

        except Exception as e:
            # exc_info keeps the traceback, which str(e) alone would lose.
            logger.error(
                f"Error retrieving context for message {message.key}: {str(e)}",
                exc_info=True,
            )
            return []
76
+
src/ctp_slack_bot/services/VectorDatabaseService.py ADDED
@@ -0,0 +1,124 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ from typing import List, Dict, Any, Optional
3
+ # import numpy as np
4
+
5
+ from ctp_slack_bot.db.MongoDB import mongodb
6
+ from ctp_slack_bot.models.VectorQuery import VectorQuery
7
+ from ctp_slack_bot.models.content import RetreivedContext
8
+
9
+ logger = logging.getLogger(__name__)
10
+
11
class VectorDatabaseService:
    """
    Service for storing and retrieving vector embeddings from MongoDB.
    """

    async def initialize(self):
        """
        Initialize the database connection.
        """
        await mongodb.initialize()

    async def store(self, text: str, embedding: List[float], metadata: Dict[str, Any]) -> str:
        """
        Store text and its embedding vector in the database.

        Args:
            text: The text content to store
            embedding: The vector embedding of the text
            metadata: Additional metadata about the text (source, timestamp, etc.)

        Returns:
            str: The ID of the stored document

        Raises:
            Exception: Any database error is logged and re-raised unchanged.
        """
        if not mongodb.initialized:
            await mongodb.initialize()

        try:
            # Create document to store
            document = {
                "text": text,
                "embedding": embedding,
                "metadata": metadata,
            }

            # Insert into collection
            result = await mongodb.vector_collection.insert_one(document)
            logger.debug(f"Stored document with ID: {result.inserted_id}")

            return str(result.inserted_id)
        except Exception as e:
            logger.error(f"Error storing embedding: {str(e)}")
            raise

    async def search_by_similarity(self, query: VectorQuery, query_embedding: List[float]) -> List[RetreivedContext]:
        """
        Query the vector database for similar documents.

        Args:
            query: VectorQuery object with search parameters
            query_embedding: The vector embedding of the query text

        Returns:
            List[RetreivedContext]: Documents whose score is at least
            ``query.score_threshold``, each with its similarity score

        Raises:
            Exception: Any database or conversion error is logged and
                re-raised unchanged.
        """
        if not mongodb.initialized:
            await mongodb.initialize()

        try:
            # Build aggregation pipeline for vector search
            pipeline = [
                {
                    "$search": {
                        "index": "vector_index",
                        "knnBeta": {
                            "vector": query_embedding,
                            "path": "embedding",
                            "k": query.k,
                        },
                    }
                },
                {
                    "$project": {
                        "_id": 0,
                        "text": 1,
                        "metadata": 1,
                        "score": {"$meta": "searchScore"},
                    }
                },
            ]

            # Add metadata filters if provided. The $match stage runs after
            # $search has already picked its k candidates, so a filtered
            # query may return fewer than k documents.
            if query.filter_metadata:
                metadata_filter = {f"metadata.{k}": v for k, v in query.filter_metadata.items()}
                pipeline.insert(1, {"$match": metadata_filter})

            # Execute the pipeline
            results = await mongodb.vector_collection.aggregate(pipeline).to_list(length=query.k)

            # Convert to RetreivedContext objects directly
            context_results = []
            for result in results:
                # The search score is used as returned; no rescaling is
                # applied here. A missing score counts as 0.
                score = result.get("score", 0)

                # Skip if below threshold
                if score < query.score_threshold:
                    continue

                # store() accepts any metadata value, including None, so
                # fall back to an empty mapping rather than failing on .get().
                metadata = result.get("metadata") or {}

                context_results.append(
                    RetreivedContext(
                        contextual_text=result["text"],
                        metadata_source=metadata.get("source", "unknown"),
                        similarity_score=score,
                        said_by=metadata.get("speaker", None),
                        in_reation_to_question=metadata.get("related_question", None),
                    )
                )

            logger.debug(f"Found {len(context_results)} similar documents")
            return context_results

        except Exception as e:
            logger.error(f"Error in similarity search: {str(e)}")
            raise