Spaces:
Runtime error
Runtime error
File size: 5,045 Bytes
92e41ba bb5dde5 4f2cb88 c6a2a56 c21d29c bb5dde5 c6a2a56 92e41ba c6a2a56 bb5dde5 c6a2a56 4f2cb88 c6a2a56 9fd6e20 92e41ba 4f2cb88 bb5dde5 4f2cb88 92e41ba c21d29c c6a2a56 92e41ba c21d29c 4f2cb88 92e41ba bb5dde5 4f2cb88 92e41ba bb5dde5 4f2cb88 92e41ba 4f2cb88 92e41ba 4f2cb88 6445846 92e41ba 4f2cb88 bb5dde5 92e41ba 4f2cb88 7ee11d0 bb5dde5 92e41ba bb5dde5 06d7b2d bb5dde5 06d7b2d bb5dde5 4f2cb88 92e41ba 7ee11d0 bb5dde5 92e41ba bb5dde5 06d7b2d 7ee11d0 92e41ba bb5dde5 92e41ba 7ee11d0 92e41ba bb5dde5 7ee11d0 92e41ba bb5dde5 4f2cb88 92e41ba 4f2cb88 c21d29c 92e41ba 7ee11d0 c21d29c 92e41ba c21d29c 92e41ba 7ee11d0 92e41ba 7ee11d0 bb5dde5 7ee11d0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
from dependency_injector.resources import AsyncResource
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorCollection
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
from loguru import logger
from pydantic import BaseModel, PrivateAttr
from typing import Any, Dict, Self
from ctp_slack_bot.core.config import Settings
from ctp_slack_bot.utils import sanitize_mongo_db_uri
class MongoDB(BaseModel):
"""
MongoDB connection manager using Motor for async operations.
"""
settings: Settings
_client: PrivateAttr = PrivateAttr()
_db: PrivateAttr = PrivateAttr()
class Config:
frozen=True
arbitrary_types_allowed = True
def __init__(self: Self, **data: Dict[str, Any]) -> None:
super().__init__(**data)
logger.debug("Created {}", self.__class__.__name__)
def connect(self: Self) -> None:
"""Initialize MongoDB client with settings."""
try:
connection_string = self.settings.MONGODB_URI.get_secret_value()
logger.debug("Connecting to MongoDB using URI: {}", sanitize_mongo_db_uri(connection_string))
# Create client with appropriate settings.
self._client = AsyncIOMotorClient(
connection_string,
serverSelectionTimeoutMS=5000,
connectTimeoutMS=10000,
socketTimeoutMS=45000,
maxPoolSize=100,
retryWrites=True,
w="majority"
)
# Get the database name.
db_name = self.settings.MONGODB_NAME
self._db = self._client[db_name]
logger.debug("MongoDB client initialized for database: {}", db_name)
except Exception as e:
logger.error("Failed to initialize MongoDB client: {}", e)
self._client = None
self._db = None
raise
async def ping(self: Self) -> bool:
"""Check if MongoDB connection is alive."""
try:
await self._client.admin.command("ping")
logger.debug("MongoDB connection is active!")
return True
except (ConnectionFailure, ServerSelectionTimeoutError) as e:
logger.error("MongoDB connection failed: {}", e)
except Exception as e:
logger.error("Unexpected error during MongoDB ping: {}", e)
return False
async def get_collection(self: Self, name: str) -> AsyncIOMotorCollection:
"""
Get a collection by name or creates it if it doesn’t exist.
"""
# First ensure we can connect at all.
if not await self.ping():
logger.error("Cannot get collection '{}' because a MongoDB connection is not available.", name)
raise ConnectionError("MongoDB connection is not available.")
try:
# Get all collection names to check if this one exists.
logger.debug("Checking if collection '{}' exists…", name)
collection_names = await self._db.list_collection_names()
if name not in collection_names:
logger.info("Collection '{}' does not exist. Creating it…", name)
# Create the collection.
await self._db.create_collection(name)
logger.debug("Successfully created collection: {}", name)
else:
logger.debug("Collection '{}' already exists!", name)
# Get and return the collection.
collection = self._db[name]
return collection
except Exception as e:
logger.error("Error accessing collection '{}': {}", name, e)
raise
def close(self: Self) -> None:
"""Close the MongoDB connection."""
if self._client:
self._client.close()
logger.info("Closed MongoDB connection.")
self._client = None
self._db = None
class MongoDBResource(AsyncResource):
async def init(self: Self, settings: Settings) -> MongoDB:
logger.info("Initializing MongoDB connection for database: {}", settings.MONGODB_NAME)
mongo_db = MongoDB(settings=settings)
mongo_db.connect()
await self._test_connection(mongo_db)
return mongo_db
async def _test_connection(self: Self, mongo_db: MongoDB) -> None:
"""Test MongoDB connection and log the result."""
try:
is_connected = await mongo_db.ping()
if is_connected:
logger.info("MongoDB connection test successful!")
else:
logger.error("MongoDB connection test failed!")
except Exception as e:
logger.error("Error testing MongoDB connection: {}", e)
raise
async def shutdown(self: Self, mongo_db: MongoDB) -> None:
"""Close MongoDB connection on shutdown."""
try:
mongo_db.close()
except Exception as e:
logger.error("Error closing MongoDB connection: {}", e)
|