#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright 2020-2024 (c) Randy W @xtdevs, @xtsea # # from : https://github.com/TeamKillerX # Channel : @RendyProjects # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import asyncio import importlib import math import os import base64 import re import uuid import shlex import subprocess import sys import time import traceback from gpytranslate import SyncTranslator from io import BytesIO from types import ModuleType from typing import Dict, List, Optional, Tuple, Union import requests import time import json import io from PIL import Image from pyrogram import * from pyrogram.enums import ChatMemberStatus from pyrogram import enums from pyrogram import Client, filters from pyrogram.types import * from pyrogram.errors import * from config import * from database import db from database import users_collection from logger import LOGS from akenoai import * from akenoai.types import DifferentAPIDefault from google import genai from google.genai import types as ty trans = SyncTranslator() js = AkenoXJs(DifferentAPIDefault()).connect() message_memory_state = {} def obfuscate(word): return word.lower()\ .replace("a", "[a@4]")\ .replace("b", "[b8]")\ .replace("e", "[e3]")\ .replace("g", "[g69]")\ .replace("i", "[i1!|]")\ .replace("o", "[o0]")\ .replace("s", "[s5$]")\ .replace("t", "[t7+]")\ .replace("l", "[l1|]") def regex_all_blacklist(text): with open("blacklist.txt", "r", encoding="utf-8") as f: words = [obfuscate(w.strip()) for w in f if w.strip()] pattern = r"\b(" + "|".join(words) + r")\b" if re.search(pattern, text, re.IGNORECASE): return True else: return False async def geni_files_delete(name: str): url = f"https://generativelanguage.googleapis.com/v1beta/{name}" params = {"key": GOOGLE_API_KEY} response = requests.delete(url, params=params) if response.status_code != 200: return None return response.text async def remove_bg_myapi(return_image): with open(return_image, "rb") as image_file: encoded_string = base64.b64encode(image_file.read()).decode("utf-8") payload = { "image_base64": encoded_string } output_bg = "output.png" response = requests.post("https://randydev-meta-ai.hf.space/remove-bg-base64", json=payload) if response.status_code == 200: result_base64 = response.json().get("output_image_base64") with open(output_bg, "wb") as out_file: out_file.write(base64.b64decode(result_base64)) return output_bg else: return None CREDITS_DEVELOPER = """ ๐Ÿ—“๏ธ 2 May 2025 ๐Ÿค– @GeminiAIDev_Bot โ€” Crafted with passion by @xtdevs ๐Ÿง  Language: Python ๐Ÿ”’ Source Code: Not publicly available โœจ About: Youโ€™ve entered a realm where your imagination meets intelligence. From โœ๏ธ text, ๐Ÿ–ผ๏ธ images etc, to full ๐Ÿ—ฃ๏ธ conversations โ€” Gemini is your AI companion. ๐Ÿงพ Credits: Google Devs, OpenAI Engineers """ GEMINI_START_TEXT = """ Welcome to Gemini AI [Dev] Youโ€™ve entered the realm of limitless possibilities. From text, photos, to full conversations โ€” Gemini is your digital partner. Speak. Create. Explore. Let your thoughts ignite the machine. Type anything to begin. """ gen = genai.Client(api_key=GOOGLE_API_KEY) class TaskManager: def __init__(self): self.running_tasks = {} async def add_task(self, chat_id, coro): task = asyncio.create_task(coro) self.running_tasks[chat_id] = task try: await task finally: self.running_tasks.pop(chat_id, None) async def cancel_task(self, chat_id): task = self.running_tasks.get(chat_id) if task: task.cancel() del self.running_tasks[chat_id] return True return False def list_tasks(self): return list(self.running_tasks.keys()) task_manager = TaskManager() async def progress(current, total, message, start, type_of_ps, file_name=None): """Progress Bar For Showing Progress While Uploading / Downloading File - Normal""" now = time.time() diff = now - start buttons = [ [ InlineKeyboardButton( text="Cancel", callback_data="cancels" ) ], ] if round(diff % 10.00) == 0 or current == total: percentage = current * 100 / total speed = current / diff elapsed_time = round(diff) * 1000 if elapsed_time == 0: return time_to_completion = round((total - current) / speed) * 1000 estimated_total_time = elapsed_time + time_to_completion progress_str = f"{''.join(['โ–ฐ' for i in range(math.floor(percentage / 10))])}" progress_str += ( f"{''.join(['โ–ฑ' for i in range(10 - math.floor(percentage / 10))])}" ) progress_str += f"{round(percentage, 2)}%\n" tmp = f"{progress_str}{humanbytes(current)} of {humanbytes(total)}\n" tmp += f"ETA: {time_formatter(estimated_total_time)}" if file_name: try: await message.edit( f"{type_of_ps}\n**File Name:** `{file_name}`\n{tmp}" ) except FloodWait as e: await asyncio.sleep(e.x) except MessageNotModified: pass else: try: await message.edit_text( f"{type_of_ps}\n{tmp}", parse_mode=enums.ParseMode.MARKDOWN, reply_markup=InlineKeyboardMarkup(buttons) ) except FloodWait as e: await asyncio.sleep(e.x) except MessageNotModified: pass def humanbytes(size): """Convert Bytes To Bytes So That Human Can Read It""" if not size: return "" power = 2**10 raised_to_pow = 0 dict_power_n = {0: "", 1: "Ki", 2: "Mi", 3: "Gi", 4: "Ti"} while size > power: size /= power raised_to_pow += 1 return str(round(size, 2)) + " " + dict_power_n[raised_to_pow] + "B" def time_formatter(milliseconds: int) -> str: """Time Formatter""" seconds, milliseconds = divmod(int(milliseconds), 1000) minutes, seconds = divmod(seconds, 60) hours, minutes = divmod(minutes, 60) days, hours = divmod(hours, 24) tmp = ( ((str(days) + " day(s), ") if days else "") + ((str(hours) + " hour(s), ") if hours else "") + ((str(minutes) + " minute(s), ") if minutes else "") + ((str(seconds) + " second(s), ") if seconds else "") + ((str(milliseconds) + " millisecond(s), ") if milliseconds else "") ) return tmp[:-2] # Original: @GeminiAIDev_Bot # Code by @xtdevs def replace_with_markdown_list(text, pattern=r"^\s*\*\s*(.*)$"): def replace(match): text = match.group(1) text = re.sub(r"\b(\w+)\b", r"**\1**", text) return "- " + text return re.sub(pattern, replace, text, flags=re.MULTILINE) like_counts = {} unlike_counts = {} TELEGRAM_MAX_LENGTH = 4096 def split_message(text: str) -> List[str]: if len(text) <= TELEGRAM_MAX_LENGTH: return [text] parts = [] while len(text) > 0: if len(text) > TELEGRAM_MAX_LENGTH: part = text[:TELEGRAM_MAX_LENGTH] parts.append(part) text = text[TELEGRAM_MAX_LENGTH:] else: parts.append(text) text = "" return parts @Client.on_chat_member_updated( filters.channel ) async def check_steal_channel_id(client, event: ChatMemberUpdated): old_status = event.old_chat_member.status if event.old_chat_member else None new_status = event.new_chat_member.status if event.new_chat_member else None user_id = event.new_chat_member.promoted_by.id if event.new_chat_member else None if not user_id or event.chat.username is None: return keyboard = [] keyboard.append([ InlineKeyboardButton("๐Ÿ”Ž View Channel", url=f"https://t.me/{event.chat.username}"), InlineKeyboardButton("โŒ Close", callback_data="closedd") ]) try: await db.backup_chatbot.update_one( {"user_id": user_id}, {"$set": { "channel_username": event.chat.username, "channel_id": event.chat.id, "can_post_messages": event.new_chat_member.privileges.can_post_messages }}, upsert=True ) await client.send_message( user_id, f"Successfully connected to your [`{event.chat.id}`] channel", reply_markup=InlineKeyboardMarkup(keyboard) ) # LOGS.info(f"Update Channel: {event.new_chat_member}") except Exception as e: LOGS.error(f"Error check_steal_channel_id: {str(e)}") @Client.on_callback_query(filters.regex("^cancels")) async def cancel_(client, callback_query): chat_id = callback_query.message.chat.id if await task_manager.cancel_task(chat_id): spam_chats.remove(chat_id) await callback_query.edit_message_text("โœ… Processing canceled.") else: await callback_query.edit_message_text("โš ๏ธ No active task to cancel.") @Client.on_callback_query(filters.regex("^deletemydatagm$")) async def deletemydata(client, callback): user_id = callback.from_user.id delm = await db._clear_chatbot_database(user_id) if delm: await callback.edit_message_text(delm) else: await callback.answer(delm, True) @Client.on_callback_query(filters.regex("^menutools_(\d+)$")) async def menu_tools(client, callback): user_id = int(callback.matches[0].group(1)) keyboard = [] keyboard.append([ InlineKeyboardButton("๐Ÿ†• Post to Channel", callback_data=f"autopost_{user_id}"), InlineKeyboardButton("๐Ÿงฑ Back To Menu", callback_data=f"response_{user_id}") ]) keyboard.append([ InlineKeyboardButton(f"๐Ÿ’พ", callback_data=f"memory_{user_id}") ]) try: await callback.edit_message_reply_markup( reply_markup=InlineKeyboardMarkup(keyboard) ) await callback.answer("Menu Tools", False) except Exception as e: LOGS.error(f"Exception menu_tools: {str(e)}") await callback.answer("Server busy try again later", True) @Client.on_callback_query(filters.regex("^autopost_(\d+)$")) async def auto_post_channel(client, callback): user_id = int(callback.matches[0].group(1)) try: backup_chat = await db._get_chatbot_chat_from_db(user_id) data = await db.backup_chatbot.find_one({"user_id": user_id}) if not data: return await callback.answer("Can't found user", True) file_id = data.get("file_id", None) if data.get("is_channel_photo", False): if not file_id: return await callback.answer("Can't found file_id", True) who_post = data.get("channel_id", None) if not who_post: return await callback.answer("@GeminiAIDev_bot add to your channel as admin!", True) if data.get("can_post_messages", False): await callback.answer("Posted to Your Channel Check Now", True) await client.send_photo( who_post, photo=file_id, caption=data.get("translate_text", "") ) keyboard = create_keyboard( user_id=user_id, chat=data.get("channel_username", "RendyProjects"), is_menu=True ) await callback.edit_message_reply_markup(reply_markup=keyboard) return await callback.answer("Permissions: can_post_messages", True) return await callback.answer("Need Channel as admin", True) return except Exception as e: LOGS.error(f"auto_post failed: {str(e)}") await callback.answer("Server busy try again later", True) @Client.on_callback_query(filters.regex("^genprompt_(\d+)$")) async def geminigen_prompt(client, callback): user_id = int(callback.matches[0].group(1)) captions = "" file_path = "gemini-native-image.png" try: backup_chat = await db._get_chatbot_chat_from_db(user_id) data = await db.backup_chatbot.find_one({"user_id": user_id}) if not data: return await callback.answer("Can't found user", True) get_response = data.get("prompt_image", None) if not get_response: return await callback.answer("Server busy try again later", True) if regex_all_blacklist(get_response): return await callback.answer("You been blocked blacklisted", True) await callback.message.edit_text("....") await client.send_chat_action(callback.message.chat.id, enums.ChatAction.UPLOAD_PHOTO) await asyncio.sleep(2.0) backup_chat.append({"role": "user", "parts": [{"text": get_response}]}) response = await gen.aio.models.generate_content( model="gemini-2.0-flash-exp-image-generation", contents=get_response, config=ty.GenerateContentConfig( response_modalities=['TEXT', 'IMAGE'] ) ) for part in response.candidates[0].content.parts: if part.text is not None: captions += part.text elif part.inline_data is not None: image = Image.open(BytesIO((part.inline_data.data))) image.save(file_path) keyboard = create_keyboard(user_id=user_id) await callback.message.edit_media( media=InputMediaPhoto( media=file_path, caption=captions ), reply_markup=keyboard ) await db.backup_chatbot.update_one( {"user_id": user_id}, {"$set": {"translate_text": captions}}, upsert=True ) backup_chat.append({"role": "model", "parts": [{"text": captions}]}) await db._update_chatbot_chat_in_db(user_id, backup_chat) await client.send_chat_action(callback.message.chat.id, enums.ChatAction.CANCEL) return except Exception as e: LOGS.error(f"geminigen_prompt failed: {str(e)}") return await callback.message.edit_media( media=InputMediaPhoto( media="error.png", caption="Server busy try again later" ), reply_markup=None ) finally: if file_path: try: os.remove(file_path) except: pass @Client.on_callback_query(filters.regex("^fluxprompt_(\d+)$")) async def flux_prompt(client, callback): user_id = int(callback.matches[0].group(1)) file_path = None file_photo = None try: backup_chat = await db._get_chatbot_chat_from_db(user_id) data = await db.backup_chatbot.find_one({"user_id": user_id}) if not data: return await callback.answer("Can't found user", True) get_response = data.get("prompt_image", None) if not get_response: return await callback.answer("Server busy try again later", True) if regex_all_blacklist(get_response): return await callback.answer("You been blocked blacklisted", True) await callback.message.edit_text("....") await client.send_chat_action(callback.message.chat.id, enums.ChatAction.UPLOAD_PHOTO) await asyncio.sleep(2.0) backup_chat.append({"role": "user", "parts": [{"text": get_response}]}) response_js = await js.image.create( "black-forest-labs/flux-1-schnell", image_read=True, params_data={"query": get_response}, ) file_path = "randydev.jpg" with open(file_path, "wb") as f: f.write(response_js) file_photo = await gen.aio.files.upload(file=file_path) while file_photo.state.name == "PROCESSING": await client.send_chat_action(callback.message.chat.id, enums.ChatAction.UPLOAD_PHOTO) await asyncio.sleep(10) file_photo = await gen.aio.files.get(name=file_photo.name) if file_photo.state.name == "FAILED": return await callback.message.reply_text(f"Error: {file_photo.state.name}") response = await gen.aio.models.generate_content( model="gemini-2.0-flash", contents=[ "You are a picture making a story use small don't be too many", file_photo ] ) keyboard = create_keyboard( user_id=user_id, chat=data.get("channel_username", "RendyProjects"), is_menu=True ) view_button_user = anonymous_user(user_id=user_id) await callback.message.edit_media( media=InputMediaPhoto( media=file_path, caption=response.text ), reply_markup=keyboard ) photo = await client.send_photo( "LicenseAknBotDB", file_path, caption=f"User Mention: {callback.from_user.mention}\n" f"User ID: `{user_id}`", reply_markup=view_button_user ) await db.backup_chatbot.update_one( {"user_id": user_id}, {"$set": { "translate_text": response.text, "file_id": photo.photo.file_id, "is_channel_photo": True }}, upsert=True ) backup_chat.append({"role": "model", "parts": [{"text": response.text}]}) await db._update_chatbot_chat_in_db(user_id, backup_chat) await client.send_chat_action(callback.message.chat.id, enums.ChatAction.CANCEL) return except Exception as e: LOGS.error(f"flux_prompt failed: {str(e)}") return await callback.message.edit_media( media=InputMediaPhoto( media="error.png", caption="Server busy try again later" ), reply_markup=None ) finally: if file_path and file_photo: try: os.remove(file_path) await gen.aio.files.delete(name=file_photo.name) except: pass @Client.on_callback_query(filters.regex("^removerbg_(\d+)$")) async def remover_bg(client, callback): user_id = int(callback.matches[0].group(1)) try: data = await db.backup_chatbot.find_one({"user_id": int(user_id)}) if not data: return await callback.answer("Can't found user", True) get_response = data.get("background_file_id", None) if not get_response: return await callback.answer("Server busy try again later", True) media = await client.download_media(get_response) await callback.message.edit_text(".....") output_bg = await remove_bg_myapi(media) if not output_bg: return await callback.answer("Server busy try again later", True) await callback.message.edit_media( media=InputMediaPhoto( media=output_bg, caption="Uploaded now" ) ) await callback.answer() except Exception as e: LOGS.error(f"remover_bg failed: {str(e)}") return await callback.message.edit_media( media=InputMediaPhoto( media="error.png", caption="Server busy try again later" ), reply_markup=None ) @Client.on_callback_query(filters.regex("^refreshch$")) async def reshch(client, callback): await callback.answer("Coming Soon", True) @Client.on_callback_query(filters.regex("^closedd$")) async def closeed(client, callback): await callback.message.delete() @Client.on_callback_query(filters.regex("^memory_(\d+)$")) async def memory_updated(client, callback): user_id = int(callback.matches[0].group(1)) if message_memory_state.get(callback.message.id, False): return await callback.answer("Memory already updated!", show_alert=False) try: if user_id == 0: return await callback.answer("Server busy, try again later", show_alert=True) data_tr = await db.backup_chatbot.find_one({"user_id": user_id}) if not data_tr: return await callback.answer("User data not found", show_alert=True) backup_chat = await db._get_chatbot_chat_from_db(user_id) if not backup_chat: backup_chat = [] get_response = data_tr.get("translate_text") if not get_response: return await callback.answer("No translation available", show_alert=True) backup_chat.append({ "role": "model", "parts": [{"text": get_response}] }) await asyncio.gather( db._update_chatbot_chat_in_db(user_id, backup_chat), callback.answer("Memory updated successfully!") ) message_memory_state[callback.message.id] = True except Exception as e: LOGS.error(f"Memory update failed: {str(e)}") await callback.answer("Server error", show_alert=True) @Client.on_callback_query(filters.regex("^tr_(\d+)_(\w+)$")) async def terjemahkan(client, callback): user_id, targetlang = callback.matches[0].groups() try: if user_id == 0: return await callback.answer("Server busy try again later", True) data_tr = await db.backup_chatbot.find_one({"user_id": int(user_id)}) if not data_tr: return await callback.answer("Can't found user", True) get_response = data_tr.get("translate_text", None) if not get_response: return await callback.answer("Server busy try again later", True) translation = trans( get_response, sourcelang="auto", targetlang=str(targetlang) ) keyboard = create_keyboard(user_id=int(user_id)) await callback.edit_message_text( translation.text, reply_markup=keyboard ) await callback.answer("Translate Updated Now", False) except Exception as e: LOGS.error(f"Exception translate: {str(e)}") await callback.answer("Server busy try again later", True) @Client.on_callback_query(filters.regex("^trmulti_(\d+)$")) async def multiple_langagues(client, callback): user_id = int(callback.matches[0].group(1)) keyboard = [] keyboard.append([ InlineKeyboardButton("๐Ÿ‡ฆ๐Ÿ‡ช", callback_data=f"tr_{user_id}_ar"), InlineKeyboardButton("๐Ÿ‡ฆ๐Ÿ‡ฟ", callback_data=f"tr_{user_id}_az") ]) keyboard.append([ InlineKeyboardButton("๐Ÿ‡ง๐Ÿ‡พ", callback_data=f"tr_{user_id}_be"), InlineKeyboardButton("๐Ÿ‡ช๐Ÿ‡ธ", callback_data=f"tr_{user_id}_es") ]) keyboard.append([ InlineKeyboardButton("๐Ÿ‡น๐Ÿ‡ท", callback_data=f"tr_{user_id}_tr"), InlineKeyboardButton("๐Ÿ‡ป๐Ÿ‡ณ", callback_data=f"tr_{user_id}_vi") ]) keyboard.append([ InlineKeyboardButton("๐Ÿ‡ฎ๐Ÿ‡ฉ", callback_data=f"tr_{user_id}_id"), InlineKeyboardButton("๐Ÿ‡บ๐Ÿ‡ธ", callback_data=f"tr_{user_id}_en") ]) keyboard.append([ InlineKeyboardButton("๐Ÿ‡ฎ๐Ÿ‡ณ", callback_data=f"tr_{user_id}_hi"), InlineKeyboardButton("๐Ÿ‡ฏ๐Ÿ‡ต", callback_data=f"tr_{user_id}_ja") ]) keyboard.append([ InlineKeyboardButton("๐Ÿ‡ฒ๐Ÿ‡พ", callback_data=f"tr_{user_id}_ms"), InlineKeyboardButton("๐Ÿ‡ท๐Ÿ‡บ", callback_data=f"tr_{user_id}_ru"), ]) keyboard.append([ InlineKeyboardButton(f"๐Ÿ’พ", callback_data=f"memory_{user_id}") ]) try: await callback.edit_message_reply_markup( reply_markup=InlineKeyboardMarkup(keyboard) ) await callback.answer("Translate List Now", False) except Exception as e: LOGS.error(f"Exception multiple_langagues: {str(e)}") await callback.answer("Server busy try again later", True) def anonymous_user(user_id: int = None): keyboard = InlineKeyboardMarkup( [ [ InlineKeyboardButton(f"๐Ÿ‘€ View", url=f"tg://user?id={user_id}"), ] ] ) return keyboard @Client.on_callback_query(filters.regex("^response_(\d+)$")) async def response_call(client, callback): user_id = int(callback.matches[0].group(1)) keyboard = InlineKeyboardMarkup( [ [ InlineKeyboardButton(f"๐Ÿ‘ {likes}", callback_data="like"), InlineKeyboardButton(f"๐Ÿ‘Ž {unlikes}", callback_data="unlike"), InlineKeyboardButton(f"๐Ÿ—ƒ๏ธ", callback_data=f"menutools_{user_id}"), InlineKeyboardButton(f"โ™พ๏ธ", callback_data=f"trmulti_{user_id}"), InlineKeyboardButton(f"๐Ÿ’พ", callback_data=f"memory_{user_id}"), ] ] ) await callback.edit_message_reply_markup( reply_markup=keyboard ) return keyboard def create_keyboard( likes: int = 0, unlikes: int = 0, user_id: int = None, chat: str = None, is_menu: bool = False ): if is_menu: keyboard = InlineKeyboardMarkup( [ [ InlineKeyboardButton(f"๐Ÿ—ƒ๏ธ", callback_data=f"menutools_{user_id}"), InlineKeyboardButton(f"๐Ÿ”Ž", url=f"https://t.me/{chat}"), InlineKeyboardButton(f"โ™พ๏ธ", callback_data=f"trmulti_{user_id}"), InlineKeyboardButton(f"๐Ÿ’พ", callback_data=f"memory_{user_id}"), ], [ InlineKeyboardButton(f"๐Ÿ‘ {likes}", callback_data="like"), InlineKeyboardButton(f"๐Ÿ‘Ž {unlikes}", callback_data="unlike"), ] ] ) else: keyboard = InlineKeyboardMarkup( [ [ InlineKeyboardButton(f"๐Ÿ‘ {likes}", callback_data="like"), InlineKeyboardButton(f"๐Ÿ‘Ž {unlikes}", callback_data="unlike"), InlineKeyboardButton(f"โ™พ๏ธ", callback_data=f"trmulti_{user_id}"), InlineKeyboardButton(f"๐Ÿ”", callback_data=f"refreshch"), InlineKeyboardButton(f"๐Ÿ’พ", callback_data=f"memory_{user_id}"), ] ] ) return keyboard @Client.on_callback_query() async def vote(client, callback_query): message_id = callback_query.message.id user_id = callback_query.from_user.id action = callback_query.data if message_id not in like_counts: like_counts[message_id] = set() if message_id not in unlike_counts: unlike_counts[message_id] = set() if action == "like": if user_id in like_counts[message_id]: like_counts[message_id].remove(user_id) else: like_counts[message_id].add(user_id) await callback_query.answer("Thank you for your feedback!", False) if user_id in unlike_counts[message_id]: unlike_counts[message_id].remove(user_id) elif action == "unlike": if user_id in unlike_counts[message_id]: unlike_counts[message_id].remove(user_id) else: unlike_counts[message_id].add(user_id) if user_id in like_counts[message_id]: like_counts[message_id].remove(user_id) likes = len(like_counts[message_id]) unlikes = len(unlike_counts[message_id]) updated_keyboard = create_keyboard(likes=likes, unlikes=unlikes) await callback_query.edit_message_reply_markup(reply_markup=updated_keyboard) await callback_query.answer() @Client.on_message( ~filters.scheduled & filters.command(["start"]) & ~filters.forwarded ) async def startbot(client: Client, message: Message): buttons = [ [ InlineKeyboardButton( text="Try this Gemini OLD", url=f"https://t.me/ryuzaki_asstbot" ), ], [ InlineKeyboardButton( text="Developer", url=f"https://t.me/xtdevs" ), InlineKeyboardButton( text="Channel", url='https://t.me/RendyProjects' ), ] ] await db.backup_chatbot.update_one( {"bot_id": client.me.id}, {"$addToSet": {"broadcast": message.from_user.id}}, upsert=True ) await message.reply_text( text=GEMINI_START_TEXT, disable_web_page_preview=True, reply_markup=InlineKeyboardMarkup(buttons) ) @Client.on_message( filters.incoming & ( filters.text | filters.photo | filters.video | filters.audio | filters.voice | filters.sticker | filters.document ) & filters.private & ~filters.command(["start", "setmodel"]) & ~filters.forwarded ) async def chatbot_talk(client: Client, message: Message): user = await users_collection.find_one({"user_id": message.from_user.id}) model_ = user.get("model") if user else "gemini-2.0-flash" if message.reply_to_message and message.reply_to_message.from_user: if message.reply_to_message.from_user.id != client.me.id: return if message.photo: file_photo = None captions = "" file_path = "gemini-native-image.png" try: caption = message.caption or "What this?" await client.send_chat_action(message.chat.id, enums.ChatAction.UPLOAD_PHOTO) await asyncio.sleep(1.5) if regex_all_blacklist(caption): return await message.reply_text("You been blocked blacklisted") if re.findall(r"\b(This is a picture of me)\b", caption, re.IGNORECASE): if caption == "picture" or len(caption) > 300: return await message.reply_text( "You can't write **picture** small, it has to be long OR short first\n" "OR Caption Too many `MAX_CAPTION: 300`" ) try: backup_chat = await db._get_chatbot_chat_from_db(message.from_user.id) backup_chat.append({"role": "user", "parts": [{"text": caption}]}) images = Image.open(await message.download()) response = await gen.aio.models.generate_content( model="gemini-2.0-flash-exp-image-generation", contents=[str(caption), images], config=ty.GenerateContentConfig( response_modalities=['TEXT', 'IMAGE'] ) ) for part in response.candidates[0].content.parts: if part.text is not None: captions += part.text elif part.inline_data is not None: image = Image.open(BytesIO(part.inline_data.data)) image.save(file_path) keyboard_like = create_keyboard(user_id=message.from_user.id) await message.reply_photo( file_path, caption=captions, reply_markup=keyboard_like ) await db.backup_chatbot.update_one( {"user_id": message.from_user.id}, {"$set": {"translate_text": captions}}, upsert=True ) backup_chat.append({"role": "model", "parts": [{"text": captions}]}) await db._update_chatbot_chat_in_db(message.from_user.id, backup_chat) await client.send_chat_action(message.chat.id, enums.ChatAction.CANCEL) return except Exception as e: LOGS.error(f"Error: Gemini Edit Image: {str(e)}") return await message.reply_text("Server busy try again later") finally: if file_path: try: os.remove(file_path) except: pass if re.findall(r"\b(pro:editimage)\b", caption, re.IGNORECASE): await db.backup_chatbot.update_one( {"user_id": message.from_user.id}, {"$set": {"background_file_id": message.photo.file_id}}, upsert=True ) buttons = [ [ InlineKeyboardButton( text="โœ‚๏ธ Background Remover", callback_data=f"removerbg_{message.from_user.id}" ) ], [ InlineKeyboardButton( text="โŒ Cancel", callback_data="closedd" ) ] ] await message.reply_photo( message.photo.file_id, caption="Are you sure you want to image this edit image?", reply_markup=InlineKeyboardMarkup(buttons) ) return backup_chat = await db._get_chatbot_chat_from_db(message.from_user.id) backup_chat.append({"role": "user", "parts": [{"text": caption}]}) file_photo = await gen.aio.files.upload(file=await message.download()) while file_photo.state.name == "PROCESSING": await client.send_chat_action(message.chat.id, enums.ChatAction.UPLOAD_PHOTO) await asyncio.sleep(10) file_photo = await gen.aio.files.get(name=file_photo.name) if file_photo.state.name == "FAILED": return await message.reply_text(f"Error: {file_photo.state.name}") response = await gen.aio.models.generate_content( model=model_, contents=[str(caption), file_photo] ) if len(response.text) > 4096: text_parts = split_message(response.text) for part in text_parts: await message.reply_text(part) else: keyboard_like = create_keyboard(user_id=message.from_user.id) await message.reply_text( response.text, disable_web_page_preview=True, reply_markup=keyboard_like ) await db.backup_chatbot.update_one( {"user_id": message.from_user.id}, {"$set": {"translate_text": response.text}}, upsert=True ) backup_chat.append({"role": "model", "parts": [{"text": response.text}]}) await db._update_chatbot_chat_in_db(message.from_user.id, backup_chat) await client.send_chat_action(message.chat.id, enums.ChatAction.CANCEL) return except Exception as e: LOGS.error(f"Error: message.text: {str(e)}") return await message.reply_text("Server busy try again later") finally: if file_photo: await gen.aio.files.delete(name=file_photo.name) if message.video: video_file = None try: await client.send_chat_action(message.chat.id, enums.ChatAction.UPLOAD_VIDEO) await asyncio.sleep(2.0) caption = message.caption or "What this?" if regex_all_blacklist(caption): return await message.reply_text("You been blocked blacklisted") backup_chat = await db._get_chatbot_chat_from_db(message.from_user.id) backup_chat.append({"role": "user", "parts": [{"text": caption}]}) video_file = await gen.aio.files.upload(file=await message.download()) while video_file.state.name == "PROCESSING": await client.send_chat_action(message.chat.id, enums.ChatAction.UPLOAD_VIDEO) await asyncio.sleep(10) video_file = await gen.aio.files.get(name=video_file.name) if video_file.state.name == "FAILED": return await message.reply_text(f"Error: {video_file.state.name}") response = await gen.aio.models.generate_content( model=model_, contents=[str(caption), video_file] ) if len(response.text) > 4096: text_parts = split_message(response.text) for part in text_parts: await message.reply_text(part) else: keyboard_like = create_keyboard(user_id=message.from_user.id) await message.reply_text( response.text, disable_web_page_preview=True, reply_markup=keyboard_like ) await db.backup_chatbot.update_one( {"user_id": message.from_user.id}, {"$set": {"translate_text": response.text}}, upsert=True ) backup_chat.append({"role": "model", "parts": [{"text": response.text}]}) await db._update_chatbot_chat_in_db(message.from_user.id, backup_chat) await client.send_chat_action(message.chat.id, enums.ChatAction.CANCEL) return except Exception as e: LOGS.error(f"Error: message.text: {str(e)}") return await message.reply_text("Server busy try again later") finally: if video_file: await gen.aio.files.delete(name=video_file.name) if message.sticker: sticker_file = None media = None try: f = message.sticker.file_id backup_chat = await db._get_chatbot_chat_from_db(message.from_user.id) await client.send_chat_action(message.chat.id, enums.ChatAction.CHOOSE_STICKER) await asyncio.sleep(2.5) media = await client.download_media(f, file_name=f"{str(uuid.uuid4())[:8]}.jpg") if media.endswith(".jpg"): sticker_file = await gen.aio.files.upload(file=media) while sticker_file.state.name == "PROCESSING": await client.send_chat_action(message.chat.id, enums.ChatAction.CHOOSE_STICKER) await asyncio.sleep(10) sticker_file = await gen.aio.files.get(name=sticker_file.name) if sticker_file.state.name == "FAILED": return await message.reply_text(f"Error: {sticker_file.state.name}") response = await gen.aio.models.generate_content( model=model_, contents=["funny random meme sticker words", sticker_file] ) if len(response.text) > 4096: text_parts = split_message(response.text) for part in text_parts: await message.reply_text(part) else: keyboard_like = create_keyboard(user_id=message.from_user.id) await message.reply_text( response.text, disable_web_page_preview=True, reply_markup=keyboard_like ) await db.backup_chatbot.update_one( {"user_id": message.from_user.id}, {"$set": {"translate_text": response.text}}, upsert=True ) backup_chat.append({"role": "model", "parts": [{"text": response.text}]}) await db._update_chatbot_chat_in_db(message.from_user.id, backup_chat) await client.send_chat_action(message.chat.id, enums.ChatAction.CANCEL) return except Exception as e: LOGS.error(f"Error: message.text: {str(e)}") return await message.reply_text("Server busy try again later") finally: if sticker_file: await gen.aio.files.delete(name=sticker_file.name) try: os.remove(media) except: pass if message.text: await client.send_chat_action(message.chat.id, enums.ChatAction.TYPING) await asyncio.sleep(1.5) query = message.text.strip() query_base = query captions = "" file_path = "gemini-native-image.png" try: if regex_all_blacklist(query_base): return await message.reply_text("You been blocked blacklisted") if query_base in ["/", "/help"]: await client.send_chat_action(message.chat.id, enums.ChatAction.CANCEL) return if re.findall(r"\b(about:developer)\b", query_base, re.IGNORECASE): await message.reply_text(CREDITS_DEVELOPER) return if re.findall(r"\b(image)\b", query_base, re.IGNORECASE): if query_base == "image" or len(query_base) > 350: return await message.reply_text( "You can't write **image** small, it has to be long OR short first\n" "OR Text Too many `MAX_TOO_MANY_TEXT: 350`" ) try: buttons = [ [ InlineKeyboardButton( text="๐ŸŽจ Gemini AI Generate Image", callback_data=f"genprompt_{message.from_user.id}" ) ], [ InlineKeyboardButton( text="๐Ÿ–Œ๏ธ FLUX AI Generate Image", callback_data=f"fluxprompt_{message.from_user.id}" ) ], [ InlineKeyboardButton( text="โŒ Cancel", callback_data="closedd" ) ] ] await db.backup_chatbot.update_one( {"user_id": message.from_user.id}, {"$set": {"prompt_image": query_base}}, upsert=True ) await message.reply_photo( photo="loading.jpg", caption="Are you sure you want to prompt this Image generate?", reply_markup=InlineKeyboardMarkup(buttons) ) return except Exception as e: LOGS.error(f"Error: Gemini Image: {str(e)}") return await message.reply_text("Server busy try again later") if re.findall(r"\b(enabled:chatsystem)\b", query_base, re.IGNORECASE): await db.backup_chatbot.update_one( {"user_id": message.from_user.id}, {"$set": {"is_system": True}}, upsert=True ) await message.reply_text( "Ok set the system successfully" ) return if re.findall(r"\b(disabled:chatsystem)\b", query_base, re.IGNORECASE): await db.backup_chatbot.update_one( {"user_id": message.from_user.id}, {"$set": {"is_system": False}}, upsert=True ) await message.reply_text( "Ok successfully deleted the system" ) return check_is_system = await db.backup_chatbot.find_one({"user_id": message.from_user.id}) if not check_is_system: await db.backup_chatbot.update_one( {"user_id": message.from_user.id}, {"$set": {"is_system": False}}, upsert=True ) await message.reply_text( "Ok Updated Now Type anything to begin." ) return if check_is_system.get("is_system", False): backup_chat = await db._get_chatbot_chat_from_db(message.from_user.id) backup_chat.append({"role": "user", "parts": [{"text": query_base}]}) response = await gen.aio.models.generate_content( model=model_, contents=query_base, config=ty.GenerateContentConfig( system_instruction="You are my name Randy Dev and friends coding language Indonesia and English", temperature=0, top_p=0.95, top_k=20, candidate_count=1, seed=5, max_output_tokens=4096, stop_sequences=['STOP!'], presence_penalty=0.0, frequency_penalty=0.0, ) ) if len(response.text) > 4096: text_parts = split_message(response.text) for part in text_parts: await message.reply_text(part) else: keyboard_like = create_keyboard(user_id=message.from_user.id) await message.reply_text( response.text, disable_web_page_preview=True, reply_markup=keyboard_like ) await db.backup_chatbot.update_one( {"user_id": message.from_user.id}, {"$set": {"translate_text": response.text}}, upsert=True ) backup_chat.append({"role": "model", "parts": [{"text": response.text}]}) await db._update_chatbot_chat_in_db(message.from_user.id, backup_chat) await client.send_chat_action(message.chat.id, enums.ChatAction.CANCEL) return if re.findall(r"\b(I want to delete my data history)\b", query_base, re.IGNORECASE): buttons = [ [ InlineKeyboardButton( text="๐Ÿ’€ Delete my data history", callback_data="deletemydatagm" ) ], [ InlineKeyboardButton( text="โŒ Cancel", callback_data="closedd" ) ] ] await message.reply_text( "Are you sure you want to delete this data history?", reply_markup=InlineKeyboardMarkup(buttons) ) return backup_chat = await db._get_chatbot_chat_from_db(message.from_user.id) backup_chat.append({"role": "user", "parts": [{"text": query_base}]}) chat_session = gen.aio.chats.create( model=model_, history=backup_chat, config=ty.GenerateContentConfig( system_instruction="You are my name Randy Dev and friends coding language Indonesia and English", temperature=0, top_p=0.95, top_k=20, candidate_count=1, seed=5, max_output_tokens=4096, stop_sequences=['STOP!'], presence_penalty=0.0, frequency_penalty=0.0, ) ) response_data = await chat_session.send_message(query_base) output = response_data.text if len(output) > 4096: text_parts = split_message(output) for part in text_parts: await message.reply_text(part) else: keyboard_like = create_keyboard(user_id=message.from_user.id) await message.reply_text( output, disable_web_page_preview=True, reply_markup=keyboard_like ) await db.backup_chatbot.update_one( {"user_id": message.from_user.id}, {"$set": {"translate_text": output}}, upsert=True ) backup_chat.append({"role": "model", "parts": [{"text": output}]}) await db._update_chatbot_chat_in_db(message.from_user.id, backup_chat) await client.send_chat_action(message.chat.id, enums.ChatAction.CANCEL) return except Exception as e: LOGS.error(f"Error: message.text: {str(e)}") return await message.reply_text("Server busy try again later")