File size: 5,736 Bytes
4f2cb88
 
c6a2a56
9fd6e20
4f2cb88
 
c6a2a56
 
 
 
 
4f2cb88
c6a2a56
 
9fd6e20
 
4f2cb88
 
 
 
c6a2a56
 
4f2cb88
 
c6a2a56
 
4f2cb88
 
 
 
 
c6a2a56
4f2cb88
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6445846
4f2cb88
 
 
 
6445846
4f2cb88
 
6445846
4f2cb88
 
 
 
 
 
6445846
4f2cb88
 
6445846
4f2cb88
 
 
 
 
 
 
 
 
 
 
 
 
06d7b2d
 
 
 
4f2cb88
 
06d7b2d
 
 
 
 
 
 
 
4f2cb88
 
06d7b2d
 
 
 
 
 
4f2cb88
06d7b2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4f2cb88
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import asyncio
from typing import Any, Dict, Optional, Self

from loguru import logger
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel, ConfigDict, PrivateAttr, model_validator
from pymongo.errors import (
    CollectionInvalid,
    ConnectionFailure,
    OperationFailure,
    ServerSelectionTimeoutError,
)

from ctp_slack_bot.core.config import Settings

class MongoDB(BaseModel):
    """
    MongoDB connection manager using Motor for async operations.

    The Motor client and database handle are built from ``settings`` right
    after the model is constructed. After ``close()`` they are reset to
    ``None`` and rebuilt on the next access to ``client`` or ``db``.
    """
    settings: Settings
    # Private attributes default to None so the "is None" checks in the
    # properties and in close() work even if initialization never ran.
    _client: Optional[AsyncIOMotorClient] = PrivateAttr(default=None)
    _db: Any = PrivateAttr(default=None)

    # Pydantic v2 configuration (replaces the deprecated inner ``Config`` class).
    model_config = ConfigDict(arbitrary_types_allowed=True)

    @model_validator(mode='after')
    def post_init(self: Self) -> Self:
        """Initialize MongoDB connection after model creation."""
        self._initialize_client()
        logger.debug("Created {}", self.__class__.__name__)
        return self

    def _initialize_client(self: Self) -> None:
        """
        Build the Motor client and select the configured database.

        Reads ``settings.MONGODB_URI`` (a secret) and ``settings.MONGODB_NAME``.

        Raises:
            Exception: Any error raised while reading the settings or building
                the client is logged and re-raised; ``_client`` and ``_db``
                are reset to ``None`` first.
        """
        try:
            connection_string = self.settings.MONGODB_URI.get_secret_value()

            # Create client with appropriate settings
            self._client = AsyncIOMotorClient(
                connection_string,
                serverSelectionTimeoutMS=5000,
                connectTimeoutMS=10000,
                socketTimeoutMS=45000,
                maxPoolSize=100,
                retryWrites=True,
                w="majority"
            )

            # Set database
            db_name = self.settings.MONGODB_NAME

            self._db = self._client[db_name]
            logger.debug("MongoDB client initialized for database: {}", db_name)

        except Exception as e:
            logger.error("Failed to initialize MongoDB client: {}", e)
            self._client = None
            self._db = None
            raise

    @property
    def client(self: Self) -> AsyncIOMotorClient:
        """
        Get the MongoDB client instance, re-initializing it if needed.

        Raises:
            ConnectionError: If the client is still unset after re-initialization.
        """
        if self._client is None:
            logger.warning("MongoDB client not initialized. Attempting to initialize.")
            self._initialize_client()
            if self._client is None:
                raise ConnectionError("Failed to initialize MongoDB client")
        return self._client

    @property
    def db(self: Self) -> Any:
        """
        Get the MongoDB database instance, re-initializing the client if needed.

        Raises:
            ConnectionError: If the database is still unset after re-initialization.
        """
        if self._db is None:
            logger.warning("MongoDB database not initialized. Attempting to initialize client.")
            self._initialize_client()
            if self._db is None:
                raise ConnectionError("Failed to initialize MongoDB database")
        return self._db

    async def ping(self: Self) -> bool:
        """
        Check if MongoDB connection is alive.

        Returns:
            True if the ``ping`` admin command succeeds; False if it fails with
            ``ConnectionFailure`` or ``ServerSelectionTimeoutError`` (the
            failure is logged). Other exceptions propagate.
        """
        try:
            await self.client.admin.command('ping')
            return True
        except (ConnectionFailure, ServerSelectionTimeoutError) as e:
            logger.error("MongoDB connection failed: {}", e)
            return False

    async def get_collection(self: Self, name: str) -> Any:
        """
        Get a collection by name with validation.
        Creates the collection if it doesn't exist.

        Args:
            name: Name of the collection to fetch.

        Returns:
            The collection handle ``self.db[name]``.

        Raises:
            ConnectionError: If ``ping()`` reports the server as unavailable.
        """
        if not await self.ping():
            raise ConnectionError("MongoDB connection is not available")

        # Get all collection names to check if this one exists
        collection_names = await self.db.list_collection_names()
        if name not in collection_names:
            logger.info("Collection {} does not exist. Creating it.", name)
            try:
                await self.db.create_collection(name)
            except CollectionInvalid:
                # Another caller created it between the listing and the create.
                logger.debug("Collection {} was created concurrently.", name)
            except OperationFailure as e:
                # 48 == NamespaceExists: same race, reported by the server.
                if e.code != 48:
                    raise
                logger.debug("Collection {} was created concurrently.", name)

        return self.db[name]

    async def create_indexes(self: Self, collection_name: str, indexes: Optional[list] = None) -> None:
        """
        Create indexes on a collection.

        If ``indexes`` is given (and non-empty) they are created as regular
        indexes. Otherwise a search index with a ``knnVector`` mapping on the
        ``embedding`` field is created, sized by ``settings.VECTOR_DIMENSION``
        and named ``<collection_name>_vector_index``.

        Args:
            collection_name: Collection to index; created if it doesn't exist.
            indexes: Index models passed to ``collection.create_indexes``.

        Raises:
            ConnectionError: If the server is unavailable.
            Exception: Any failure creating the search index is logged and re-raised.
        """
        collection = await self.get_collection(collection_name)

        if indexes:
            await collection.create_indexes(indexes)
            logger.info("Created custom indexes for collection {}: {}", collection_name, indexes)

        else: # Create vector search index using settings from config
            try:
                # Create the vector search index with the proper MongoDB format
                vector_search_index = {
                    "mappings": {
                        "dynamic": True,
                        "fields": {
                            "embedding": {
                                "type": "knnVector",
                                "dimensions": self.settings.VECTOR_DIMENSION,
                                "similarity": "cosine"
                            }
                        }
                    }
                }

                # The server command is ``createSearchIndexes`` (plural) and takes
                # a list of index specs; ``createSearchIndex`` is only a shell /
                # driver helper name and is rejected as an unknown command.
                await self.db.command({
                    "createSearchIndexes": collection_name,
                    "indexes": [
                        {
                            "name": f"{collection_name}_vector_index",
                            "definition": vector_search_index,
                        }
                    ],
                })

                logger.info("Created vector search index for collection {}", collection_name)
            except Exception as e:
                logger.error("Failed to create vector index: {}", e)
                raise

    async def close(self: Self) -> None:
        """Close MongoDB connection and drop the cached client and database handles."""
        # Compare against None explicitly instead of relying on the client's truthiness.
        if self._client is not None:
            self._client.close()
            logger.info("MongoDB connection closed")
            self._client = None
            self._db = None