Spaces:
Runtime error
Runtime error
File size: 4,150 Bytes
92e41ba bb7c9a3 4f2cb88 c6a2a56 bb7c9a3 c6a2a56 bb7c9a3 92e41ba c6a2a56 bb5dde5 bb7c9a3 c6a2a56 4f2cb88 c6a2a56 92e41ba bb7c9a3 92e41ba bb7c9a3 92e41ba c21d29c 4f2cb88 bb7c9a3 92e41ba bb5dde5 4f2cb88 92e41ba bb5dde5 bb7c9a3 92e41ba 4f2cb88 92e41ba 4f2cb88 a1a6d79 92e41ba 4f2cb88 bb5dde5 92e41ba 4f2cb88 7ee11d0 bb5dde5 92e41ba bb5dde5 06d7b2d bb5dde5 06d7b2d 7ee11d0 f0fe0fd 7ee11d0 f0fe0fd 7ee11d0 f0fe0fd a1a6d79 92e41ba bb5dde5 4f2cb88 92e41ba 4f2cb88 c21d29c bb7c9a3 92e41ba bb7c9a3 c21d29c 92e41ba c21d29c 92e41ba 7ee11d0 f0fe0fd 92e41ba 7ee11d0 f0fe0fd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
from dependency_injector.resources import AsyncResource
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase, AsyncIOMotorCollection
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
from loguru import logger
from pydantic import ConfigDict
from typing import Any, Self
from ctp_slack_bot.core import HealthReportingApplicationComponentBase, Settings
from ctp_slack_bot.utils import sanitize_mongo_db_uri
class MongoDB(HealthReportingApplicationComponentBase):
"""
MongoDB connection manager using Motor for async operations.
"""
model_config = ConfigDict(frozen=True)
settings: Settings
_client: AsyncIOMotorClient
_db: AsyncIOMotorDatabase
def connect(self: Self) -> None:
"""Initialize MongoDB client with settings."""
try:
connection_string = self.settings.mongodb_uri.get_secret_value()
logger.debug("Connecting to MongoDB using URI: {}", sanitize_mongo_db_uri(connection_string))
# Create client with appropriate settings.
self._client = AsyncIOMotorClient(
connection_string,
serverSelectionTimeoutMS=5000,
connectTimeoutMS=10000,
socketTimeoutMS=45000,
maxPoolSize=100,
retryWrites=True,
w="majority"
)
# Get the database name.
db_name = self.settings.mongodb_name
self._db = self._client[db_name]
logger.debug("MongoDB client initialized for database: {}", db_name)
except Exception as e:
logger.error("Failed to initialize MongoDB client: {}", e)
self._client = None
self._db = None
raise e
async def ping(self: Self) -> bool:
"""Check if MongoDB connection is alive."""
try:
await self._client.admin.command("ping")
logger.debug("MongoDB connection is active!")
return True
except (ConnectionFailure, ServerSelectionTimeoutError) as e:
logger.error("MongoDB connection failed: {}", e)
except Exception as e:
logger.error("Unexpected error during MongoDB ping: {}", e)
return False
async def get_collection(self: Self, name: str) -> AsyncIOMotorCollection:
"""
Get a collection by name or creates it if it doesn’t exist.
"""
try:
if name not in await self._db.list_collection_names():
collection = await self._db.create_collection(name)
logger.debug("Created previously nonexistent collection, {}.", name)
else:
collection = self._db[name]
logger.debug("Retrieved collection, {}.", name)
return collection
except Exception as e:
logger.error("Error accessing collection, {}: {}", name, e)
raise e
def close(self: Self) -> None:
"""Close the MongoDB connection."""
if self._client:
self._client.close()
logger.info("Closed MongoDB connection.")
self._client = None
self._db = None
@property
def name(self: Self) -> str:
return "mongo_db"
async def is_healthy(self: Self) -> bool:
return await self.ping()
class MongoDBResource(AsyncResource):
async def init(self: Self, settings: Settings) -> MongoDB:
logger.info("Initializing MongoDB connection for database: {}", settings.mongodb_name)
mongo_db = MongoDB(settings=settings)
mongo_db.connect()
await self._test_connection(mongo_db)
return mongo_db
async def _test_connection(self: Self, mongo_db: MongoDB) -> None:
"""Test MongoDB connection and log the result."""
if await mongo_db.ping():
logger.info("MongoDB connection test successful!")
else:
logger.error("MongoDB connection test failed!")
async def shutdown(self: Self, mongo_db: MongoDB) -> None:
"""Close MongoDB connection on shutdown."""
mongo_db.close()
|