Spaces:
Runtime error
Runtime error
File size: 4,753 Bytes
c6a2a56 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
#from motor.motor_asyncio import AsyncIOMotorClient
from loguru import logger
from pydantic import BaseModel, model_validator
#from pymongo import IndexModel, ASCENDING
from typing import Optional, Self
from ctp_slack_bot.core.config import Settings
class MongoDB(BaseModel):
    """
    MongoDB connection and initialization class.

    Intended to handle connection to MongoDB, database selection, and index
    creation. At present the model only carries the application settings;
    the connection state and the async connection/index logic are still
    disabled (see the commented-out attributes in ``__init__``).
    """

    # Application settings, validated by pydantic on construction.
    settings: Settings

    def __init__(self: Self, settings: Settings) -> None:
        """
        Build the model from an application settings object.

        ``settings`` may be passed positionally or by keyword. The value is
        forwarded to ``BaseModel.__init__`` so that pydantic validates and
        stores the field and runs the ``post_init`` validator; without this
        delegation the instance would be left uninitialized and any access
        to ``self.settings`` would fail.
        """
        super().__init__(settings=settings)
        # TODO: the connection state below still needs to be declared some
        # other way (plain attribute assignment is rejected by pydantic for
        # names that are not declared fields).
        #self.client: Optional[AsyncIOMotorClient] = None
        #self.db = None
        #self.vector_collection = None
        #self.initialized = False

    @model_validator(mode='after')
    def post_init(self: Self) -> Self:
        """Log that the instance was created and return it unchanged."""
        logger.debug("Created {}", self.__class__.__name__)
        return self
# async def connect(self):
# """
# Connect to MongoDB using connection string from settings.
# """
# if self.client is not None:
# return
# if not settings.MONGODB_URI:
# raise ValueError("MONGODB_URI is not set in environment variables")
# try:
# # Create MongoDB connection
# self.client = AsyncIOMotorClient(settings.MONGODB_URI.get_secret_value())
# self.db = self.client[settings.MONGODB_DB_NAME]
# self.vector_collection = self.db["vector_store"]
# logger.info(f"Connected to MongoDB: {settings.MONGODB_DB_NAME}")
# except Exception as e:
# logger.error(f"Error connecting to MongoDB: {str(e)}")
# raise
# async def initialize(self):
# """
# Initialize MongoDB with required collections and indexes.
# """
# if self.initialized:
# return
# if not self.client:
# await self.connect()
# try:
# # Create vector index for similarity search
# await self.create_vector_index()
# self.initialized = True
# logger.info("MongoDB initialized successfully")
# except Exception as e:
# logger.error(f"Error initializing MongoDB: {str(e)}")
# raise
# async def create_vector_index(self):
# """
# Create vector index for similarity search using MongoDB Atlas Vector Search.
# """
# try:
# # Check if index already exists
# existing_indexes = await self.vector_collection.list_indexes().to_list(length=None)
# index_names = [index.get('name') for index in existing_indexes]
# if "vector_index" not in index_names:
# # Create vector search index
# index_definition = {
# "mappings": {
# "dynamic": True,
# "fields": {
# "embedding": {
# "dimensions": settings.VECTOR_DIMENSION,
# "similarity": "cosine",
# "type": "knnVector"
# }
# }
# }
# }
# # Create the index
# await self.db.command({
# "createIndexes": self.vector_collection.name,
# "indexes": [
# {
# "name": "vector_index",
# "key": {"embedding": "vector"},
# "weights": {"embedding": 1},
# "vectorSearchOptions": index_definition
# }
# ]
# })
# # Create additional metadata indexes for filtering
# await self.vector_collection.create_index([("metadata.source", ASCENDING)])
# await self.vector_collection.create_index([("metadata.timestamp", ASCENDING)])
# logger.info("Vector search index created")
# else:
# logger.info("Vector search index already exists")
# except Exception as e:
# logger.error(f"Error creating vector index: {str(e)}")
# raise
# async def close(self):
# """
# Close MongoDB connection.
# """
# if self.client:
# self.client.close()
# self.client = None
# self.db = None
# self.vector_collection = None
# self.initialized = False
# logger.info("MongoDB connection closed")
|