Hussam
revised mongo_db index creation, storing, and similarity search using the new Chunk model
06d7b2d
raw
history blame
5.74 kB
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
from loguru import logger
from pydantic import BaseModel, model_validator, PrivateAttr
from typing import Any, Dict, Optional, Self
import asyncio
from ctp_slack_bot.core.config import Settings
class MongoDB(BaseModel):
    """
    MongoDB connection manager using Motor for async operations.

    The Motor client and database handle are created when the model is
    constructed (see ``post_init``) from ``settings.MONGODB_URI`` and
    ``settings.MONGODB_NAME``. After ``close()`` both handles are reset to
    ``None``; the ``client`` and ``db`` properties re-create them on the
    next access.
    """

    settings: Settings

    # Private (non-field) state. ``None`` means "not initialized / closed",
    # which is what the ``client`` and ``db`` properties test for.
    _client: Optional[AsyncIOMotorClient] = PrivateAttr(default=None)
    _db: Any = PrivateAttr(default=None)

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``).
    model_config = {"arbitrary_types_allowed": True}

    @model_validator(mode='after')
    def post_init(self: Self) -> Self:
        """Initialize MongoDB connection after model creation."""
        self._initialize_client()
        logger.debug("Created {}", self.__class__.__name__)
        return self

    def _initialize_client(self: Self) -> None:
        """
        Initialize the Motor client and database handle from settings.

        On any failure both handles are reset to ``None``, the error is
        logged, and the original exception is re-raised.
        """
        try:
            connection_string = self.settings.MONGODB_URI.get_secret_value()

            # Create client with appropriate settings
            self._client = AsyncIOMotorClient(
                connection_string,
                serverSelectionTimeoutMS=5000,
                connectTimeoutMS=10000,
                socketTimeoutMS=45000,
                maxPoolSize=100,
                retryWrites=True,
                w="majority"
            )

            # Set database
            db_name = self.settings.MONGODB_NAME
            self._db = self._client[db_name]
            logger.debug("MongoDB client initialized for database: {}", db_name)
        except Exception as e:
            logger.error("Failed to initialize MongoDB client: {}", e)
            # Never leave a half-initialized pair of handles behind.
            self._client = None
            self._db = None
            raise

    @property
    def client(self: Self) -> AsyncIOMotorClient:
        """
        Get the MongoDB client instance, re-initializing it if necessary.

        Raises:
            ConnectionError: If the client is still unset after the
                re-initialization attempt. Errors raised by
                ``_initialize_client`` itself propagate unchanged.
        """
        if self._client is None:
            logger.warning("MongoDB client not initialized. Attempting to initialize.")
            self._initialize_client()
            if self._client is None:
                raise ConnectionError("Failed to initialize MongoDB client")
        return self._client

    @property
    def db(self: Self) -> Any:
        """
        Get the MongoDB database instance, re-initializing it if necessary.

        Raises:
            ConnectionError: If the database handle is still unset after the
                re-initialization attempt. Errors raised by
                ``_initialize_client`` itself propagate unchanged.
        """
        # Explicit ``is None`` test: do not rely on the truth value of the
        # database handle.
        if self._db is None:
            logger.warning("MongoDB database not initialized. Attempting to initialize client.")
            self._initialize_client()
            if self._db is None:
                raise ConnectionError("Failed to initialize MongoDB database")
        return self._db

    async def ping(self: Self) -> bool:
        """
        Check if MongoDB connection is alive.

        Returns:
            ``True`` if the server answered the ``ping`` admin command,
            ``False`` if a ``ConnectionFailure`` or
            ``ServerSelectionTimeoutError`` was raised (the error is logged).
            Any other exception propagates to the caller.
        """
        try:
            await self.client.admin.command('ping')
            return True
        except (ConnectionFailure, ServerSelectionTimeoutError) as e:
            logger.error("MongoDB connection failed: {}", e)
            return False

    async def get_collection(self: Self, name: str) -> Any:
        """
        Get a collection by name with validation.
        Creates the collection if it doesn't exist.

        Args:
            name: Name of the collection to fetch or create.

        Returns:
            The collection handle obtained from the database.

        Raises:
            ConnectionError: If ``ping()`` reports the server is unreachable.
        """
        if not await self.ping():
            raise ConnectionError("MongoDB connection is not available")

        # Get all collection names to check if this one exists
        collection_names = await self.db.list_collection_names()

        if name not in collection_names:
            logger.info("Collection {} does not exist. Creating it.", name)
            # Create the collection
            await self.db.create_collection(name)

        return self.db[name]

    async def create_indexes(self: Self, collection_name: str, indexes: Optional[list] = None) -> None:
        """
        Create indexes on a collection.

        If ``indexes`` is a non-empty list it is passed to the collection's
        ``create_indexes``. Otherwise (``None`` or empty) a vector search
        index named ``<collection_name>_vector_index`` is created on the
        ``embedding`` field, using ``settings.VECTOR_DIMENSION`` dimensions
        and cosine similarity.

        Args:
            collection_name: Collection to index; created if missing.
            indexes: Optional list of index models for regular indexes.

        Raises:
            ConnectionError: If the server is unreachable.
            Exception: Any error from the search-index command is logged
                and re-raised unchanged.
        """
        collection = await self.get_collection(collection_name)

        if indexes:
            await collection.create_indexes(indexes)
            logger.info("Created custom indexes for collection {}: {}", collection_name, indexes)
            return

        # Create the vector search index with the proper MongoDB format
        vector_search_index = {
            "mappings": {
                "dynamic": True,
                "fields": {
                    "embedding": {
                        "type": "knnVector",
                        "dimensions": self.settings.VECTOR_DIMENSION,
                        "similarity": "cosine"
                    }
                }
            }
        }

        try:
            # The server-side command is ``createSearchIndexes`` (plural) and
            # takes an ``indexes`` array; ``createSearchIndex`` exists only as
            # a shell/driver helper and is rejected as an unknown command.
            await self.db.command({
                "createSearchIndexes": collection_name,
                "indexes": [
                    {
                        "name": f"{collection_name}_vector_index",
                        "definition": vector_search_index
                    }
                ]
            })
            logger.info("Created vector search index for collection {}", collection_name)
        except Exception as e:
            logger.error("Failed to create vector index: {}", e)
            raise

    async def close(self: Self) -> None:
        """Close MongoDB connection and reset the client/database handles."""
        # Identity check rather than truthiness, consistent with the
        # ``client`` and ``db`` properties.
        if self._client is not None:
            self._client.close()
            logger.info("MongoDB connection closed")
            self._client = None
            self._db = None