|
import os |
|
from os import getenv |
|
|
|
from dotenv import load_dotenv |
|
|
|
import datetime |
|
from datetime import datetime as dt |
|
import time |
|
|
|
from motor import motor_asyncio |
|
from motor.core import AgnosticClient |
|
from motor.motor_asyncio import AsyncIOMotorClient |
|
|
|
from logger import LOGS |
|
|
|
load_dotenv() |
|
MONGO_URL = os.environ["MONGO_URL"] |
|
|
|
client_lang = AsyncIOMotorClient(MONGO_URL) |
|
dbs = client_lang["Akeno"] |
|
users_collection = dbs["gemini_model"] |
|
|
|
class Database: |
|
def __init__(self, uri: str) -> None: |
|
self.client: AgnosticClient = motor_asyncio.AsyncIOMotorClient(uri) |
|
self.db = self.client["Akeno"] |
|
|
|
self.user_premium = self.db["user_premium"] |
|
self.user_blacklists = self.db["user_blacklist"] |
|
self.backup_chatbot = self.db["google_genai"] |
|
|
|
async def connect(self): |
|
try: |
|
await self.client.admin.command("ping") |
|
LOGS.info(f"Database Connection Established!") |
|
await self.user_premium.create_index("user_id") |
|
await self.user_premium.create_index("last_reset") |
|
await self.user_premium.create_index("premium_expiry") |
|
await self.user_blacklists.create_index("unfreeze_at") |
|
await self.user_blacklists.create_index("is_frozen") |
|
LOGS.info(f"Database Create index full Connected!") |
|
except Exception as e: |
|
LOGS.info(f"DatabaseErr: {e} ") |
|
quit(1) |
|
|
|
async def _close(self): |
|
await self.client.close() |
|
|
|
def get_datetime(self) -> str: |
|
return datetime.datetime.now().strftime("%d/%m/%Y - %H:%M") |
|
|
|
async def _update_chatbot_chat_in_db(self, user_id, chatbot_chat): |
|
await self.backup_chatbot.update_one( |
|
{"user_id": user_id}, |
|
{"$set": {"chatbot_chat": chatbot_chat}}, |
|
upsert=True |
|
) |
|
|
|
async def _get_chatbot_chat_from_db(self, user_id): |
|
user_data = await self.backup_chatbot.find_one({"user_id": user_id}) |
|
return user_data.get("chatbot_chat", []) if user_data else [] |
|
|
|
async def _clear_chatbot_history_in_db(self, user_id): |
|
unset_clear = {"chatbot_chat": None} |
|
return await self.backup_chatbot.update_one({"user_id": user_id}, {"$unset": unset_clear}) |
|
|
|
async def _clear_chatbot_database(self, user_id): |
|
result = await self._clear_chatbot_history_in_db(user_id) |
|
if result.modified_count > 0: |
|
return "Chat history cleared successfully." |
|
else: |
|
return "No chat history found to clear." |
|
|
|
db = Database(MONGO_URL) |