LiKenun's picture
Make all code participate in dependency injection
c6a2a56
raw
history blame
4.75 kB
#from motor.motor_asyncio import AsyncIOMotorClient
from loguru import logger
from pydantic import BaseModel, model_validator
#from pymongo import IndexModel, ASCENDING
from typing import Optional, Self
from ctp_slack_bot.core.config import Settings
class MongoDB(BaseModel):
"""
MongoDB connection and initialization class.
Handles connection to MongoDB, database selection, and index creation.
"""
settings: Settings
@model_validator(mode='after')
def post_init(self: Self) -> Self:
logger.debug("Created {}", self.__class__.__name__)
return self
def __init__(self: Self, settings: Settings) -> Self:
#self.client: Optional[AsyncIOMotorClient] = None
#self.db = None
#self.vector_collection = None
#self.initialized = False
pass # The above initialization needs to be done some other way.
# async def connect(self):
# """
# Connect to MongoDB using connection string from settings.
# """
# if self.client is not None:
# return
# if not settings.MONGODB_URI:
# raise ValueError("MONGODB_URI is not set in environment variables")
# try:
# # Create MongoDB connection
# self.client = AsyncIOMotorClient(settings.MONGODB_URI.get_secret_value())
# self.db = self.client[settings.MONGODB_DB_NAME]
# self.vector_collection = self.db["vector_store"]
# logger.info(f"Connected to MongoDB: {settings.MONGODB_DB_NAME}")
# except Exception as e:
# logger.error(f"Error connecting to MongoDB: {str(e)}")
# raise
# async def initialize(self):
# """
# Initialize MongoDB with required collections and indexes.
# """
# if self.initialized:
# return
# if not self.client:
# await self.connect()
# try:
# # Create vector index for similarity search
# await self.create_vector_index()
# self.initialized = True
# logger.info("MongoDB initialized successfully")
# except Exception as e:
# logger.error(f"Error initializing MongoDB: {str(e)}")
# raise
# async def create_vector_index(self):
# """
# Create vector index for similarity search using MongoDB Atlas Vector Search.
# """
# try:
# # Check if index already exists
# existing_indexes = await self.vector_collection.list_indexes().to_list(length=None)
# index_names = [index.get('name') for index in existing_indexes]
# if "vector_index" not in index_names:
# # Create vector search index
# index_definition = {
# "mappings": {
# "dynamic": True,
# "fields": {
# "embedding": {
# "dimensions": settings.VECTOR_DIMENSION,
# "similarity": "cosine",
# "type": "knnVector"
# }
# }
# }
# }
# # Create the index
# await self.db.command({
# "createIndexes": self.vector_collection.name,
# "indexes": [
# {
# "name": "vector_index",
# "key": {"embedding": "vector"},
# "weights": {"embedding": 1},
# "vectorSearchOptions": index_definition
# }
# ]
# })
# # Create additional metadata indexes for filtering
# await self.vector_collection.create_index([("metadata.source", ASCENDING)])
# await self.vector_collection.create_index([("metadata.timestamp", ASCENDING)])
# logger.info("Vector search index created")
# else:
# logger.info("Vector search index already exists")
# except Exception as e:
# logger.error(f"Error creating vector index: {str(e)}")
# raise
# async def close(self):
# """
# Close MongoDB connection.
# """
# if self.client:
# self.client.close()
# self.client = None
# self.db = None
# self.vector_collection = None
# self.initialized = False
# logger.info("MongoDB connection closed")