File size: 7,904 Bytes
92e41ba
 
4f2cb88
 
7ee11d0
c6a2a56
c21d29c
4f2cb88
c6a2a56
 
92e41ba
c6a2a56
 
 
4f2cb88
c6a2a56
 
9fd6e20
 
92e41ba
4f2cb88
 
92e41ba
c21d29c
 
c6a2a56
92e41ba
c21d29c
4f2cb88
 
 
92e41ba
 
4f2cb88
 
 
 
 
 
 
 
 
 
92e41ba
4f2cb88
 
92e41ba
4f2cb88
 
92e41ba
4f2cb88
 
 
 
6445846
92e41ba
4f2cb88
 
 
7ee11d0
92e41ba
c21d29c
7ee11d0
92e41ba
4f2cb88
92e41ba
4f2cb88
 
 
7ee11d0
92e41ba
c21d29c
7ee11d0
92e41ba
4f2cb88
92e41ba
4f2cb88
 
 
7ee11d0
 
92e41ba
7ee11d0
 
92e41ba
4f2cb88
 
 
 
7ee11d0
 
 
92e41ba
4f2cb88
06d7b2d
 
 
 
7ee11d0
4f2cb88
92e41ba
 
 
7ee11d0
 
92e41ba
7ee11d0
06d7b2d
7ee11d0
92e41ba
7ee11d0
 
92e41ba
7ee11d0
92e41ba
 
7ee11d0
 
 
 
 
 
92e41ba
7ee11d0
06d7b2d
7ee11d0
 
 
 
06d7b2d
4f2cb88
92e41ba
7ee11d0
 
 
 
6f98944
 
 
 
 
7ee11d0
 
06d7b2d
6f98944
7ee11d0
 
 
 
92e41ba
7ee11d0
 
92e41ba
 
7ee11d0
 
 
 
 
92e41ba
7ee11d0
06d7b2d
 
92e41ba
4f2cb88
 
 
 
92e41ba
4f2cb88
 
c21d29c
92e41ba
 
7ee11d0
c21d29c
 
92e41ba
c21d29c
92e41ba
 
7ee11d0
 
 
 
 
 
 
 
 
92e41ba
 
7ee11d0
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
from asyncio import create_task
from dependency_injector.resources import AsyncResource
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
from pymongo.operations import SearchIndexModel
from loguru import logger
from pydantic import BaseModel, PrivateAttr
from typing import Any, Dict, Optional, Self

from ctp_slack_bot.core.config import Settings
from ctp_slack_bot.utils import sanitize_mongo_db_uri

class MongoDB(BaseModel):
    """
    MongoDB connection manager using Motor for async operations.

    The Motor client and database handle are stored as pydantic private
    attributes. Both default to ``None`` so every method can safely inspect
    them before ``connect()`` has been called.
    """
    settings: Settings
    # Private attributes default to None; without a default they would be unset
    # and reading them (e.g. in close()) would raise AttributeError.
    _client: Optional[AsyncIOMotorClient] = PrivateAttr(default=None)
    _db: Any = PrivateAttr(default=None)

    class Config:
        arbitrary_types_allowed = True

    def __init__(self: Self, **data: Any) -> None:
        """Build the model from keyword arguments and log its creation."""
        super().__init__(**data)
        logger.debug("Created {}", self.__class__.__name__)

    def connect(self: Self) -> None:
        """
        Initialize the Motor client and select the configured database.

        Reads ``MONGODB_URI`` and ``MONGODB_NAME`` from ``self.settings``.

        Raises:
            Exception: Any error raised while building the client is logged
                and re-raised after the client and database are reset to None.
        """
        try:
            connection_string = self.settings.MONGODB_URI.get_secret_value()
            # Only the sanitized form of the URI is written to the log.
            logger.debug("Connecting to MongoDB using URI: {}", sanitize_mongo_db_uri(connection_string))

            # Create client with appropriate settings
            self._client = AsyncIOMotorClient(
                connection_string,
                serverSelectionTimeoutMS=5000,
                connectTimeoutMS=10000,
                socketTimeoutMS=45000,
                maxPoolSize=100,
                retryWrites=True,
                w="majority"
            )

            # Set database
            db_name = self.settings.MONGODB_NAME

            self._db = self._client[db_name]
            logger.debug("MongoDB client initialized for database: {}", db_name)

        except Exception as e:
            logger.error("Failed to initialize MongoDB client: {}", e)
            # Leave the manager in a clean "not connected" state.
            self._client = None
            self._db = None
            raise

    @property
    def client(self: Self) -> AsyncIOMotorClient:
        """
        Get the MongoDB client instance, calling ``connect()`` first if needed.

        Raises:
            ConnectionError: If the client is still unset after ``connect()``.
        """
        if self._client is None:
            logger.warning("MongoDB client not initialized. Attempting to initialize…")
            self.connect()
            if self._client is None:
                raise ConnectionError("Failed to initialize MongoDB client.")
        return self._client

    @property
    def db(self: Self) -> Any:
        """
        Get the MongoDB database instance, calling ``connect()`` first if needed.

        Raises:
            ConnectionError: If the database is still unset after ``connect()``.
        """
        # Compare against None explicitly rather than relying on truthiness.
        if self._db is None:
            logger.warning("MongoDB database not initialized. Attempting to initialize client…")
            self.connect()
            if self._db is None:
                raise ConnectionError("Failed to initialize MongoDB database.")
        return self._db

    async def ping(self: Self) -> bool:
        """
        Check if MongoDB connection is alive.

        Returns:
            True when the ``ping`` admin command succeeds; False when any
            exception is raised (the exception is logged, not propagated).
        """
        try:
            # Get client to ensure we're connected
            client = self.client

            # Try a simple ping command
            await client.admin.command('ping')
            logger.debug("MongoDB connection is active!")
            return True
        except (ConnectionFailure, ServerSelectionTimeoutError) as e:
            logger.error("MongoDB connection failed: {}", e)
            return False
        except Exception as e:
            logger.error("Unexpected error during MongoDB ping: {}", e)
            return False

    async def get_collection(self: Self, name: str) -> Any:
        """
        Get a collection by name with validation.
        Creates the collection if it doesn't exist.

        Args:
            name: Name of the collection to fetch or create.

        Returns:
            The collection object obtained from ``self.db[name]``.

        Raises:
            ConnectionError: If ``ping()`` reports the connection is unavailable.
            Exception: Any error raised while listing or creating collections
                is logged and re-raised.
        """
        # First ensure we can connect at all
        if not await self.ping():
            logger.error("Cannot get collection '{}' because a MongoDB connection is not available.", name)
            raise ConnectionError("MongoDB connection is not available.")

        try:
            # Get all collection names to check if this one exists
            logger.debug("Checking if collection '{}' exists…", name)
            collection_names = await self.db.list_collection_names()

            if name not in collection_names:
                logger.info("Collection '{}' does not exist. Creating it…", name)
                # Create the collection
                await self.db.create_collection(name)
                logger.debug("Successfully created collection: {}", name)
            else:
                logger.debug("Collection '{}' already exists!", name)

            # Get and return the collection
            return self.db[name]
        except Exception as e:
            logger.error("Error accessing collection '{}': {}", name, e)
            raise

    async def create_indexes(self: Self, collection_name: str) -> None:
        """
        Create a vector search index on a collection.

        The index targets the ``embedding`` path with cosine similarity and
        ``settings.VECTOR_DIMENSION`` dimensions. If creation fails with an
        error whose text contains "command not found", a standard index on
        ``embedding`` is created instead.

        Args:
            collection_name: Name of the collection

        Raises:
            ConnectionError: Propagated from ``get_collection``.
            Exception: Any other index-creation error is logged and re-raised.
        """
        collection = await self.get_collection(collection_name)

        try:
            # Create search index model using MongoDB's recommended approach
            search_index_model = SearchIndexModel(
                definition={
                    "fields": [
                        {
                            "type": "vector",
                            "path": "embedding",
                            "numDimensions": self.settings.VECTOR_DIMENSION,
                            "similarity": "cosine",
                            "quantization": "scalar"
                        }
                    ]
                },
                name=f"{collection_name}_vector_index",
                type="vectorSearch"
            )

            # Create the search index using the motor collection
            result = await collection.create_search_index(search_index_model)
            logger.info("Vector search index '{}' created for collection {}.", result, collection_name)

        except Exception as e:
            # The error text is the only signal used here to detect missing support.
            if "command not found" in str(e).lower():
                logger.warning("Vector search not supported by this MongoDB instance. Some functionality may be limited.")
                # Create a fallback standard index on embedding field
                await collection.create_index("embedding")
                logger.info("Created standard index on 'embedding' field as fallback.")
            else:
                logger.error("Failed to create vector index: {}", e)
                raise

    async def close(self: Self) -> None:
        """
        Close MongoDB connection.

        Does nothing when no client has been created, so it is safe to call
        before ``connect()`` or more than once.
        """
        if self._client is not None:
            self._client.close()
            logger.info("Closed MongoDB connection.")
            self._client = None
            self._db = None

class MongoDBResource(AsyncResource):
    """Async resource that builds a ``MongoDB`` manager on init and closes it on shutdown."""

    async def init(self: Self, settings: Settings) -> MongoDB:
        """
        Create a ``MongoDB`` manager, connect it, and run a connection test.

        Args:
            settings: Application settings passed to the ``MongoDB`` constructor.

        Returns:
            The connected ``MongoDB`` manager. A failed ping is logged by the
            connection test but does not prevent the manager being returned.
        """
        logger.info("Initializing MongoDB connection for database: {}", settings.MONGODB_NAME)
        manager = MongoDB(settings=settings)
        manager.connect()
        await self._test_connection(manager)
        return manager

    async def _test_connection(self: Self, mongo_db: MongoDB) -> None:
        """Ping MongoDB through the manager and log whether the test passed."""
        try:
            if await mongo_db.ping():
                logger.info("MongoDB connection test successful!")
                return
            logger.error("MongoDB connection test failed!")
        except Exception as error:
            logger.error("Error testing MongoDB connection: {}", error)
            raise

    async def shutdown(self: Self, mongo_db: MongoDB) -> None:
        """Close the manager's connection; errors are logged and not re-raised."""
        try:
            await mongo_db.close()
        except Exception as error:
            logger.error("Error closing MongoDB connection: {}", error)