File size: 5,045 Bytes
92e41ba
bb5dde5
4f2cb88
c6a2a56
c21d29c
bb5dde5
c6a2a56
 
92e41ba
c6a2a56
bb5dde5
c6a2a56
 
4f2cb88
c6a2a56
 
9fd6e20
 
92e41ba
4f2cb88
bb5dde5
4f2cb88
92e41ba
c21d29c
 
c6a2a56
92e41ba
c21d29c
4f2cb88
 
 
92e41ba
 
bb5dde5
4f2cb88
 
 
 
 
 
 
 
 
92e41ba
bb5dde5
4f2cb88
92e41ba
4f2cb88
 
92e41ba
4f2cb88
 
 
 
6445846
92e41ba
4f2cb88
 
 
bb5dde5
92e41ba
4f2cb88
 
 
7ee11d0
 
bb5dde5
92e41ba
bb5dde5
06d7b2d
bb5dde5
06d7b2d
bb5dde5
4f2cb88
92e41ba
 
 
7ee11d0
bb5dde5
92e41ba
bb5dde5
06d7b2d
7ee11d0
92e41ba
bb5dde5
 
 
92e41ba
7ee11d0
92e41ba
 
bb5dde5
 
7ee11d0
 
 
 
92e41ba
bb5dde5
 
4f2cb88
 
92e41ba
4f2cb88
 
c21d29c
92e41ba
 
7ee11d0
c21d29c
 
92e41ba
c21d29c
92e41ba
 
7ee11d0
 
 
 
 
 
 
 
 
92e41ba
 
7ee11d0
 
 
bb5dde5
7ee11d0
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
from dependency_injector.resources import AsyncResource
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorCollection
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
from loguru import logger
from pydantic import BaseModel, PrivateAttr
from typing import Any, Dict, Self

from ctp_slack_bot.core.config import Settings
from ctp_slack_bot.utils import sanitize_mongo_db_uri


class MongoDB(BaseModel):
    """
    MongoDB connection manager using Motor for async operations.
    """
    settings: Settings
    _client: PrivateAttr = PrivateAttr()
    _db: PrivateAttr = PrivateAttr()

    class Config:
        frozen=True
        arbitrary_types_allowed = True

    def __init__(self: Self, **data: Dict[str, Any]) -> None:
        super().__init__(**data)
        logger.debug("Created {}", self.__class__.__name__)

    def connect(self: Self) -> None:
        """Initialize MongoDB client with settings."""
        try:
            connection_string = self.settings.MONGODB_URI.get_secret_value()
            logger.debug("Connecting to MongoDB using URI: {}", sanitize_mongo_db_uri(connection_string))

            # Create client with appropriate settings.
            self._client = AsyncIOMotorClient(
                connection_string,
                serverSelectionTimeoutMS=5000,
                connectTimeoutMS=10000,
                socketTimeoutMS=45000,
                maxPoolSize=100,
                retryWrites=True,
                w="majority"
            )

            # Get the database name.
            db_name = self.settings.MONGODB_NAME

            self._db = self._client[db_name]
            logger.debug("MongoDB client initialized for database: {}", db_name)

        except Exception as e:
            logger.error("Failed to initialize MongoDB client: {}", e)
            self._client = None
            self._db = None
            raise

    async def ping(self: Self) -> bool:
        """Check if MongoDB connection is alive."""
        try:
            await self._client.admin.command("ping")
            logger.debug("MongoDB connection is active!")
            return True
        except (ConnectionFailure, ServerSelectionTimeoutError) as e:
            logger.error("MongoDB connection failed: {}", e)
        except Exception as e:
            logger.error("Unexpected error during MongoDB ping: {}", e)
        return False

    async def get_collection(self: Self, name: str) -> AsyncIOMotorCollection:
        """
        Get a collection by name or creates it if it doesn’t exist.
        """
        # First ensure we can connect at all.
        if not await self.ping():
            logger.error("Cannot get collection '{}' because a MongoDB connection is not available.", name)
            raise ConnectionError("MongoDB connection is not available.")

        try:
            # Get all collection names to check if this one exists.
            logger.debug("Checking if collection '{}' exists…", name)
            collection_names = await self._db.list_collection_names()
            
            if name not in collection_names:
                logger.info("Collection '{}' does not exist. Creating it…", name)

                # Create the collection.
                await self._db.create_collection(name)
                logger.debug("Successfully created collection: {}", name)
            else:
                logger.debug("Collection '{}' already exists!", name)

            # Get and return the collection.
            collection = self._db[name]
            return collection
        except Exception as e:
            logger.error("Error accessing collection '{}': {}", name, e)
            raise

    def close(self: Self) -> None:
        """Close the MongoDB connection."""
        if self._client:
            self._client.close()
            logger.info("Closed MongoDB connection.")
            self._client = None
            self._db = None

class MongoDBResource(AsyncResource):
    async def init(self: Self, settings: Settings) -> MongoDB:
        logger.info("Initializing MongoDB connection for database: {}", settings.MONGODB_NAME)
        mongo_db = MongoDB(settings=settings)
        mongo_db.connect()
        await self._test_connection(mongo_db)
        return mongo_db

    async def _test_connection(self: Self, mongo_db: MongoDB) -> None:
        """Test MongoDB connection and log the result."""
        try:
            is_connected = await mongo_db.ping()
            if is_connected:
                logger.info("MongoDB connection test successful!")
            else:
                logger.error("MongoDB connection test failed!")
        except Exception as e:
            logger.error("Error testing MongoDB connection: {}", e)
            raise

    async def shutdown(self: Self, mongo_db: MongoDB) -> None:
        """Close MongoDB connection on shutdown."""
        try:
            mongo_db.close()
        except Exception as e:
            logger.error("Error closing MongoDB connection: {}", e)