Spaces:
Runtime error
Runtime error
File size: 5,736 Bytes
4f2cb88 c6a2a56 9fd6e20 4f2cb88 c6a2a56 4f2cb88 c6a2a56 9fd6e20 4f2cb88 c6a2a56 4f2cb88 c6a2a56 4f2cb88 c6a2a56 4f2cb88 6445846 4f2cb88 6445846 4f2cb88 6445846 4f2cb88 6445846 4f2cb88 6445846 4f2cb88 06d7b2d 4f2cb88 06d7b2d 4f2cb88 06d7b2d 4f2cb88 06d7b2d 4f2cb88 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
from loguru import logger
from pydantic import BaseModel, model_validator, PrivateAttr
from typing import Any, Dict, Optional, Self
import asyncio
from ctp_slack_bot.core.config import Settings
class MongoDB(BaseModel):
    """
    MongoDB connection manager using Motor for async operations.

    The Motor client and database handle are built from ``settings`` as soon
    as the model has been validated. After ``close()`` the ``client`` and
    ``db`` properties rebuild them on demand.
    """

    settings: Settings

    # Both handles default to None so that "not initialized" is an ordinary,
    # checkable state instead of an AttributeError on first access.
    _client: Optional[AsyncIOMotorClient] = PrivateAttr(default=None)
    _db: Optional[Any] = PrivateAttr(default=None)

    class Config:
        arbitrary_types_allowed = True

    @model_validator(mode='after')
    def post_init(self: Self) -> Self:
        """Initialize MongoDB connection after model creation."""
        self._initialize_client()
        logger.debug("Created {}", self.__class__.__name__)
        return self

    def _initialize_client(self: Self) -> None:
        """
        Initialize MongoDB client with settings.

        Reads ``MONGODB_URI`` and ``MONGODB_NAME`` from ``settings``. On any
        failure both handles are reset to ``None`` and the original exception
        is re-raised.
        """
        try:
            connection_string = self.settings.MONGODB_URI.get_secret_value()

            # Create client with appropriate settings
            self._client = AsyncIOMotorClient(
                connection_string,
                serverSelectionTimeoutMS=5000,
                connectTimeoutMS=10000,
                socketTimeoutMS=45000,
                maxPoolSize=100,
                retryWrites=True,
                w="majority",
            )

            # Set database
            db_name = self.settings.MONGODB_NAME
            self._db = self._client[db_name]
            logger.debug("MongoDB client initialized for database: {}", db_name)
        except Exception as e:
            logger.error("Failed to initialize MongoDB client: {}", e)
            self._client = None
            self._db = None
            raise

    @property
    def client(self: Self) -> AsyncIOMotorClient:
        """
        Get the MongoDB client instance.

        Re-initializes the client first when it is currently unset.

        Raises:
            ConnectionError: If the client is still unset after the
                initialization attempt.
        """
        if self._client is None:
            logger.warning("MongoDB client not initialized. Attempting to initialize.")
            self._initialize_client()
            if self._client is None:
                raise ConnectionError("Failed to initialize MongoDB client")
        return self._client

    @property
    def db(self: Self) -> Any:
        """
        Get the MongoDB database instance.

        Re-initializes the client (and with it the database handle) first
        when the handle is currently unset.

        Raises:
            ConnectionError: If the database handle is still unset after the
                initialization attempt.
        """
        # Compare against None explicitly: the handle is never tested for
        # truthiness anywhere in this class.
        if self._db is None:
            logger.warning("MongoDB database not initialized. Attempting to initialize client.")
            self._initialize_client()
            if self._db is None:
                raise ConnectionError("Failed to initialize MongoDB database")
        return self._db

    async def ping(self: Self) -> bool:
        """
        Check if MongoDB connection is alive.

        Returns:
            True when the ``ping`` admin command succeeds, False when it
            fails with ``ConnectionFailure`` or
            ``ServerSelectionTimeoutError``. Other exceptions propagate.
        """
        try:
            await self.client.admin.command('ping')
            return True
        except (ConnectionFailure, ServerSelectionTimeoutError) as e:
            logger.error("MongoDB connection failed: {}", e)
            return False

    async def get_collection(self: Self, name: str) -> Any:
        """
        Get a collection by name with validation.

        Creates the collection if it doesn't exist.

        Args:
            name: Name of the collection to fetch.

        Returns:
            The collection handle obtained from the database.

        Raises:
            ConnectionError: If ``ping()`` reports the server as unreachable.
        """
        if not await self.ping():
            raise ConnectionError("MongoDB connection is not available")

        # Get all collection names to check if this one exists
        collection_names = await self.db.list_collection_names()
        if name not in collection_names:
            logger.info("Collection {} does not exist. Creating it.", name)
            # Create the collection
            await self.db.create_collection(name)
        return self.db[name]

    async def create_indexes(self: Self, collection_name: str, indexes: Optional[list] = None) -> None:
        """
        Create indexes on a collection.

        If ``indexes`` is given and non-empty, those indexes are created on
        the collection. Otherwise a vector search index named
        ``<collection_name>_vector_index`` is created on the ``embedding``
        field, using ``settings.VECTOR_DIMENSION`` and cosine similarity.

        Args:
            collection_name: Collection to index; created if missing.
            indexes: Index models passed to the collection's
                ``create_indexes``; ``None`` or empty selects the vector
                search index path.

        Raises:
            ConnectionError: If the server is unreachable.
            Exception: Any error from the vector index command is logged and
                re-raised unchanged.
        """
        collection = await self.get_collection(collection_name)

        if indexes:
            await collection.create_indexes(indexes)
            logger.info("Created custom indexes for collection {}: {}", collection_name, indexes)
            return

        # Create vector search index using settings from config
        try:
            # Create the vector search index with the proper MongoDB format
            vector_search_index = {
                "mappings": {
                    "dynamic": True,
                    "fields": {
                        "embedding": {
                            "type": "knnVector",
                            "dimensions": self.settings.VECTOR_DIMENSION,
                            "similarity": "cosine",
                        }
                    },
                }
            }

            # The database command is ``createSearchIndexes`` and takes a list
            # of index specs; ``createSearchIndex`` is only the shell helper
            # name and is rejected when sent as a command.
            await self.db.command({
                "createSearchIndexes": collection_name,
                "indexes": [
                    {
                        "name": f"{collection_name}_vector_index",
                        "definition": vector_search_index,
                    }
                ],
            })
            logger.info("Created vector search index for collection {}", collection_name)
        except Exception as e:
            logger.error("Failed to create vector index: {}", e)
            raise

    async def close(self: Self) -> None:
        """
        Close MongoDB connection.

        Does nothing when no client is set. After closing, both handles are
        reset to ``None`` so the ``client`` / ``db`` properties rebuild them
        on next access.
        """
        if self._client is not None:
            self._client.close()
            logger.info("MongoDB connection closed")
            self._client = None
            self._db = None
|