File size: 4,150 Bytes
92e41ba
bb7c9a3
4f2cb88
c6a2a56
bb7c9a3
 
c6a2a56
bb7c9a3
92e41ba
c6a2a56
bb5dde5
bb7c9a3
c6a2a56
4f2cb88
c6a2a56
92e41ba
bb7c9a3
92e41ba
bb7c9a3
 
 
92e41ba
c21d29c
4f2cb88
 
bb7c9a3
92e41ba
 
bb5dde5
4f2cb88
 
 
 
 
 
 
 
 
92e41ba
bb5dde5
bb7c9a3
92e41ba
4f2cb88
 
92e41ba
4f2cb88
 
 
 
a1a6d79
92e41ba
4f2cb88
 
 
bb5dde5
92e41ba
4f2cb88
 
 
7ee11d0
 
bb5dde5
92e41ba
bb5dde5
06d7b2d
bb5dde5
06d7b2d
7ee11d0
f0fe0fd
 
 
7ee11d0
f0fe0fd
 
7ee11d0
 
f0fe0fd
a1a6d79
92e41ba
bb5dde5
 
4f2cb88
 
92e41ba
4f2cb88
 
c21d29c
bb7c9a3
 
 
 
 
 
 
 
92e41ba
 
bb7c9a3
c21d29c
 
92e41ba
c21d29c
92e41ba
 
7ee11d0
f0fe0fd
 
 
 
92e41ba
7ee11d0
 
f0fe0fd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
from dependency_injector.resources import AsyncResource
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase, AsyncIOMotorCollection
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
from loguru import logger
from pydantic import ConfigDict
from typing import Any, Self

from ctp_slack_bot.core import HealthReportingApplicationComponentBase, Settings
from ctp_slack_bot.utils import sanitize_mongo_db_uri


class MongoDB(HealthReportingApplicationComponentBase):
    """
    MongoDB connection manager using Motor for async operations.

    The component is configured from ``settings``. The Motor client and
    database handles live in private attributes that are ``None`` until
    ``connect()`` succeeds and are reset to ``None`` by ``close()``.
    """

    model_config = ConfigDict(frozen=True)

    settings: Settings
    # Both handles default to None so ping() and close() are safe to call
    # before connect() or after close().
    _client: AsyncIOMotorClient | None = None
    _db: AsyncIOMotorDatabase | None = None

    def connect(self: Self) -> None:
        """
        Initialize the MongoDB client and database handle from settings.

        Raises:
            Exception: Any error raised while reading the settings or building
                the client/database handle. It is logged, both handles are
                reset to ``None``, and the original exception is re-raised.
        """
        client: AsyncIOMotorClient | None = None
        try:
            connection_string = self.settings.mongodb_uri.get_secret_value()
            logger.debug("Connecting to MongoDB using URI: {}", sanitize_mongo_db_uri(connection_string))

            # Create client with appropriate settings.
            client = AsyncIOMotorClient(
                connection_string,
                serverSelectionTimeoutMS=5000,
                connectTimeoutMS=10000,
                socketTimeoutMS=45000,
                maxPoolSize=100,
                retryWrites=True,
                w="majority",
            )

            # Get the database name.
            db_name = self.settings.mongodb_name
            database = client[db_name]
        except Exception as e:
            logger.error("Failed to initialize MongoDB client: {}", e)
            if client is not None:
                # A later step failed after the client was built; close it
                # instead of silently dropping the reference.
                client.close()
            self._client = None
            self._db = None
            raise

        # Publish the handles only once every step has succeeded.
        self._client = client
        self._db = database
        logger.debug("MongoDB client initialized for database: {}", db_name)

    async def ping(self: Self) -> bool:
        """
        Check if MongoDB connection is alive.

        Returns:
            True when the server answers the ``ping`` admin command; False when
            no client is initialized or the command fails (the failure is logged).
        """
        if self._client is None:
            logger.error("MongoDB ping attempted without an initialized client.")
            return False
        try:
            await self._client.admin.command("ping")
            logger.debug("MongoDB connection is active!")
            return True
        except (ConnectionFailure, ServerSelectionTimeoutError) as e:
            logger.error("MongoDB connection failed: {}", e)
        except Exception as e:
            logger.error("Unexpected error during MongoDB ping: {}", e)
        return False

    async def get_collection(self: Self, name: str) -> AsyncIOMotorCollection:
        """
        Get a collection by name, creating it first if it doesn’t exist.

        Args:
            name: Name of the collection to retrieve or create.

        Returns:
            The Motor collection handle for ``name``.

        Raises:
            Exception: Any error raised while listing, creating, or accessing
                the collection. It is logged and then re-raised unchanged.
        """
        # Imported locally so this method is self-contained; pymongo.errors is
        # already a dependency of this module.
        from pymongo.errors import CollectionInvalid, OperationFailure

        namespace_exists_code = 48  # MongoDB server error code "NamespaceExists".
        try:
            if name in await self._db.list_collection_names():
                collection = self._db[name]
                logger.debug("Retrieved collection, {}.", name)
                return collection

            try:
                collection = await self._db.create_collection(name)
            except CollectionInvalid:
                # Someone else created the collection between the listing and
                # the create call; just use the existing one.
                collection = self._db[name]
                logger.debug("Retrieved collection, {}.", name)
            except OperationFailure as e:
                # Same race, but detected by the server rather than the driver.
                if e.code != namespace_exists_code:
                    raise
                collection = self._db[name]
                logger.debug("Retrieved collection, {}.", name)
            else:
                logger.debug("Created previously nonexistent collection, {}.", name)
            return collection
        except Exception as e:
            logger.error("Error accessing collection, {}: {}", name, e)
            raise

    def close(self: Self) -> None:
        """Close the MongoDB connection, if one is open, and reset both handles."""
        # Compare with None explicitly rather than relying on the truth value
        # of a driver object.
        if self._client is not None:
            self._client.close()
            logger.info("Closed MongoDB connection.")
            self._client = None
            self._db = None

    @property
    def name(self: Self) -> str:
        """Return the fixed identifier of this component, ``"mongo_db"``."""
        return "mongo_db"

    async def is_healthy(self: Self) -> bool:
        """Report health by delegating to ``ping()``."""
        return await self.ping()


class MongoDBResource(AsyncResource):
    """Async resource that creates a ``MongoDB`` component on init and closes it on shutdown."""

    async def init(self: Self, settings: Settings) -> MongoDB:
        """Build a ``MongoDB`` from ``settings``, connect it, probe it once, and return it."""
        logger.info("Initializing MongoDB connection for database: {}", settings.mongodb_name)
        database = MongoDB(settings=settings)
        database.connect()
        await self._test_connection(database)
        return database

    async def _test_connection(self: Self, mongo_db: MongoDB) -> None:
        """Ping the server once and log the outcome; a failed ping is only logged."""
        reachable = await mongo_db.ping()
        if not reachable:
            logger.error("MongoDB connection test failed!")
            return
        logger.info("MongoDB connection test successful!")

    async def shutdown(self: Self, mongo_db: MongoDB) -> None:
        """Close the connection held by the given ``MongoDB`` component."""
        mongo_db.close()