approve-dev / chatbot /plugins /join_request.py
randydev's picture
Update chatbot/plugins/join_request.py
974a56d verified
raw
history blame
7.29 kB
import os
import time
import json
import asyncio
import io
import os
import re
import logging
import string
import random
from pyrogram import *
from pyrogram.types import *
from pyrogram.errors import *
from database import db
from logger import LOGS
from pyrogram import Client, filters
from pyrogram.enums import ChatMemberStatus, ChatMembersFilter, ChatType
from pyrogram.types import (
CallbackQuery,
InlineKeyboardMarkup,
InlineKeyboardButton,
Message
)
from PIL import Image, ImageDraw, ImageFont
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
user_list = []
captcha_texts = {}
async def remove_captcha_after_timeout(client, user_id, delay=300):
await asyncio.sleep(delay)
if user_id in captcha_texts:
del captcha_texts[user_id]
await client.send_message(user_id, "⏰ Your CAPTCHA verification has expired. Please try again.")
logger.info(f"CAPTCHA for user {user_id} has expired and been removed.")
def generate_captcha_multiple_choice():
letters = string.ascii_uppercase + string.digits
captcha_text = ''.join(random.choice(letters) for _ in range(5))
choices = [captcha_text]
for _ in range(2):
wrong_choice = ''.join(random.choice(letters) for _ in range(5))
while wrong_choice in choices:
wrong_choice = ''.join(random.choice(letters) for _ in range(5))
choices.append(wrong_choice)
random.shuffle(choices)
img = Image.new('RGB', (300, 60), color=(255, 255, 255))
d = ImageDraw.Draw(img)
try:
font = ImageFont.truetype("arial.ttf", 40)
except IOError:
font = ImageFont.load_default()
d.text((10, 10), captcha_text, font=font, fill=(0, 0, 0))
img_path = f"captcha_{captcha_text}.png"
img.save(img_path)
return captcha_text, img_path, choices
@Client.on_chat_join_request(filters.chat("KillerXSupport"))
async def join_request(client: Client, event: ChatJoinRequest):
member = await client.get_chat_member(event.chat.id, "me")
if member.status != ChatMemberStatus.ADMINISTRATOR:
return await client.send_message(event.chat.id, text="I am not an administrator in this group.")
async for m in client.get_chat_members(event.chat.id, filter=ChatMembersFilter.ADMINISTRATORS):
if not m.user.is_bot:
user_list.append(m.user.id)
captcha_text, img_path, choices = generate_captcha_multiple_choice()
captcha_texts[event.from_user.id] = captcha_text
captcha_texts["chat_id"] = event.chat.id
keyboard = InlineKeyboardMarkup(
[
[InlineKeyboardButton(choice, callback_data=f"verify_{event.from_user.id}_{choice}")]
for choice in choices
] + [
[InlineKeyboardButton("πŸ”„ Refresh CAPTCHA", callback_data="refresh_captcha")],
[InlineKeyboardButton("❌ Cancel", callback_data="cancel_captcha")]
]
)
if event.chat.type == ChatType.SUPERGROUP:
try:
await client.send_photo(
event.from_user.id,
photo=img_path,
caption=f"❗️ **Verify that you are human!**\n\n❔ Please select the correct CAPTCHA text shown in the image below.",
reply_markup=keyboard
)
os.remove(img_path)
asyncio.create_task(remove_captcha_after_timeout(client, event.from_user.id))
except Exception as e:
await client.send_message(
event.chat.id,
text=str(e)
)
logger.error(str(e))
@Client.on_callback_query(filters.regex("^cancel_captcha$"))
async def cancel_captcha_callback(client: Client, cb: CallbackQuery):
user_id = cb.from_user.id
if user_id in captcha_texts:
del captcha_texts[user_id]
logger.info(f"User {user_id} has canceled CAPTCHA verification.")
await cb.edit_message_text(
"❌ CAPTCHA verification has been canceled. If you wish to try again,",
disable_web_page_preview=True
)
await cb.answer("CAPTCHA verification canceled.", show_alert=False)
else:
await cb.answer("No active CAPTCHA verification found.", show_alert=True)
@Client.on_callback_query(filters.regex("^refresh_captcha$"))
async def refresh_captcha_callback(client: Client, cb: CallbackQuery):
user_id = cb.from_user.id
if user_id in captcha_texts:
del captcha_texts[user_id]
captcha_text, img_path, choices = generate_captcha_multiple_choice()
captcha_texts[user_id] = captcha_text
keyboard = InlineKeyboardMarkup(
[
[InlineKeyboardButton("πŸ”„ Refresh CAPTCHA", callback_data="refresh_captcha")],
[InlineKeyboardButton("βœ… Verify", callback_data="verify_captcha")],
[InlineKeyboardButton("❌ Cancel", callback_data="cancel_captcha")]
]
)
await cb.edit_message_media(
media=InputMediaPhoto(img_path),
reply_markup=keyboard
)
os.remove(img_path)
await cb.answer("CAPTCHA refreshed!", show_alert=False)
@Client.on_callback_query(filters.regex("^verify_"))
async def verify_captcha_multiple_choice_callback(client: Client, cb: CallbackQuery):
data = cb.data.split("_")
if len(data) != 3:
await cb.answer("Invalid data format.", show_alert=True)
return
_, user_id_str, user_choice = data
try:
user_id = int(user_id_str)
except ValueError:
await cb.answer("Invalid user ID.", show_alert=True)
return
if user_id not in captcha_texts:
await cb.answer("❗️ Please start the CAPTCHA verification first.", show_alert=True)
logger.warning(f"User {user_id} attempted to verify CAPTCHA without starting verification.")
return
correct_captcha = captcha_texts.get(user_id)
logger.info(f"User {user_id} selected CAPTCHA: {user_choice} (Expected: {correct_captcha})")
if user_choice == correct_captcha:
await cb.edit_message_text("βœ… CAPTCHA verification successful!")
logger.info(f"User {user_id} successfully verified CAPTCHA.")
await client.approve_chat_join_request(
chat_id=captcha_texts.get("chat_id"),
user_id=user_id
)
del captcha_texts[user_id]
del captcha_texts["chat_id"]
else:
await cb.edit_message_text("❌ Incorrect CAPTCHA. Please try again")
logger.info(f"User {user_id} failed CAPTCHA verification.")
await client.decline_chat_join_request(
chat_id=captcha_texts.get("chat_id"),
user_id=user_id
)
del captcha_texts[user_id]
del captcha_texts["chat_id"]
@Client.on_callback_query(filters.regex("^verify_captcha$"))
async def verify_captcha_callback(client: Client, cb: CallbackQuery):
user_id = cb.from_user.id
if user_id not in captcha_texts:
await cb.answer("❗️ Please start the CAPTCHA verification first", show_alert=True)
logger.warning(f"User {user_id} attempted to verify CAPTCHA without starting verification.")
return
await cb.message.reply_text("πŸ” **Please enter the CAPTCHA text exactly as shown in the image:**")
await cb.answer()
logger.info(f"User {user_id} is attempting to verify CAPTCHA.")