Spaces:
Runtime error
Runtime error
#from motor.motor_asyncio import AsyncIOMotorClient | |
from loguru import logger | |
from pydantic import BaseModel, model_validator | |
#from pymongo import IndexModel, ASCENDING | |
from typing import Optional, Self | |
from ctp_slack_bot.core.config import Settings | |
class MongoDB(BaseModel):
    """
    MongoDB connection and initialization class.

    Intended to handle connection to MongoDB, database selection, and index
    creation. At present the model only validates and stores the application
    settings; the connection/index logic that follows this class body is
    still commented out.

    Attributes:
        settings: Application configuration. The disabled connection code
            appears to read ``MONGODB_URI``, ``MONGODB_DB_NAME`` and
            ``VECTOR_DIMENSION`` from it -- confirm against ``Settings``.
    """

    settings: Settings

    @model_validator(mode="after")
    def post_init(self: Self) -> Self:
        """
        Log that the instance was created.

        Registered as a Pydantic "after" validator so it runs automatically
        once field validation has succeeded.

        Returns:
            The validated instance, unchanged.
        """
        logger.debug("Created {}", self.__class__.__name__)
        return self

    def __init__(self: Self, settings: Settings) -> None:
        """
        Build the model from an already-loaded settings object.

        Accepts ``settings`` positionally or by keyword.

        Args:
            settings: Application configuration to validate and store.
        """
        # BaseModel.__init__ performs validation and populates the field
        # storage; without this call the instance has no usable attributes.
        super().__init__(settings=settings)
        # TODO: the disabled methods below expect mutable connection state
        # (client, db, vector_collection, initialized). Plain attribute
        # assignment is rejected for undeclared names on a Pydantic model, so
        # this state needs to be declared some other way (e.g. as private
        # attributes) before that code is re-enabled.
# async def connect(self): | |
# """ | |
# Connect to MongoDB using connection string from settings. | |
# """ | |
# if self.client is not None: | |
# return | |
# if not settings.MONGODB_URI: | |
# raise ValueError("MONGODB_URI is not set in environment variables") | |
# try: | |
# # Create MongoDB connection | |
# self.client = AsyncIOMotorClient(settings.MONGODB_URI.get_secret_value()) | |
# self.db = self.client[settings.MONGODB_DB_NAME] | |
# self.vector_collection = self.db["vector_store"] | |
# logger.info(f"Connected to MongoDB: {settings.MONGODB_DB_NAME}") | |
# except Exception as e: | |
# logger.error(f"Error connecting to MongoDB: {str(e)}") | |
# raise | |
# async def initialize(self): | |
# """ | |
# Initialize MongoDB with required collections and indexes. | |
# """ | |
# if self.initialized: | |
# return | |
# if not self.client: | |
# await self.connect() | |
# try: | |
# # Create vector index for similarity search | |
# await self.create_vector_index() | |
# self.initialized = True | |
# logger.info("MongoDB initialized successfully") | |
# except Exception as e: | |
# logger.error(f"Error initializing MongoDB: {str(e)}") | |
# raise | |
# async def create_vector_index(self): | |
# """ | |
# Create vector index for similarity search using MongoDB Atlas Vector Search. | |
# """ | |
# try: | |
# # Check if index already exists | |
# existing_indexes = await self.vector_collection.list_indexes().to_list(length=None) | |
# index_names = [index.get('name') for index in existing_indexes] | |
# if "vector_index" not in index_names: | |
# # Create vector search index | |
# index_definition = { | |
# "mappings": { | |
# "dynamic": True, | |
# "fields": { | |
# "embedding": { | |
# "dimensions": settings.VECTOR_DIMENSION, | |
# "similarity": "cosine", | |
# "type": "knnVector" | |
# } | |
# } | |
# } | |
# } | |
# # Create the index | |
# await self.db.command({ | |
# "createIndexes": self.vector_collection.name, | |
# "indexes": [ | |
# { | |
# "name": "vector_index", | |
# "key": {"embedding": "vector"}, | |
# "weights": {"embedding": 1}, | |
# "vectorSearchOptions": index_definition | |
# } | |
# ] | |
# }) | |
# # Create additional metadata indexes for filtering | |
# await self.vector_collection.create_index([("metadata.source", ASCENDING)]) | |
# await self.vector_collection.create_index([("metadata.timestamp", ASCENDING)]) | |
# logger.info("Vector search index created") | |
# else: | |
# logger.info("Vector search index already exists") | |
# except Exception as e: | |
# logger.error(f"Error creating vector index: {str(e)}") | |
# raise | |
# async def close(self): | |
# """ | |
# Close MongoDB connection. | |
# """ | |
# if self.client: | |
# self.client.close() | |
# self.client = None | |
# self.db = None | |
# self.vector_collection = None | |
# self.initialized = False | |
# logger.info("MongoDB connection closed") | |