Spaces:
Running
Running
import os | |
import time | |
import json | |
import asyncio | |
import io | |
import os | |
import re | |
import logging | |
import string | |
import random | |
from pyrogram import * | |
from pyrogram.types import * | |
from pyrogram.errors import * | |
from database import db | |
from logger import LOGS | |
from pyrogram import Client, filters | |
from pyrogram.enums import ChatMemberStatus, ChatMembersFilter, ChatType | |
from pyrogram.types import ( | |
CallbackQuery, | |
InlineKeyboardMarkup, | |
InlineKeyboardButton, | |
Message | |
) | |
from PIL import Image, ImageDraw, ImageFont | |
# Configure root logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# User ids of non-bot chat administrators; join_request appends to this list.
user_list: list = []

# Pending CAPTCHA state: maps a user id to the CAPTCHA text that user is
# expected to pick. join_request also stores the id of the chat being joined
# under the literal key "chat_id".
captcha_texts: dict = {}
async def remove_captcha_after_timeout(client, user_id, delay=300):
    """Expire a user's pending CAPTCHA once ``delay`` seconds have passed.

    After sleeping, the entry for ``user_id`` is removed from
    ``captcha_texts`` (if it is still there) and the user is told that the
    verification expired. If the entry is already gone, nothing happens.

    Args:
        client: Client used to send the expiry notice; only ``send_message``
            is called on it.
        user_id: Key of the pending entry in ``captcha_texts``; also used as
            the chat the notice is sent to.
        delay: Number of seconds to wait before expiring the CAPTCHA.
    """
    await asyncio.sleep(delay)

    if user_id not in captcha_texts:
        # Already verified, cancelled or otherwise cleared in the meantime.
        return

    del captcha_texts[user_id]
    # Log before notifying so the expiry is recorded even if the notice fails.
    logger.info(f"CAPTCHA for user {user_id} has expired and been removed.")

    try:
        await client.send_message(user_id, "β° Your CAPTCHA verification has expired. Please try again.")
    except Exception:
        # join_request schedules this coroutine as a background task, so an
        # escaping error would only show up as an unretrieved task exception.
        # The state is already cleaned up; record the failed notice and move on.
        logger.exception(f"Could not notify user {user_id} about the expired CAPTCHA.")
def generate_captcha_multiple_choice():
    """Build a CAPTCHA image together with three shuffled answer options.

    A random 5-character code (uppercase letters and digits) is drawn in black
    on a white 300x60 image and saved as ``captcha_<code>.png`` in the current
    working directory. Two further distinct random codes serve as decoys.

    Returns:
        tuple: ``(captcha_text, img_path, choices)`` - the correct code, the
        path of the saved PNG, and a shuffled list holding the correct code
        plus the two decoys.
    """
    alphabet = string.ascii_uppercase + string.digits

    def random_code():
        # Five independent draws from the alphabet.
        return ''.join(random.choice(alphabet) for _ in range(5))

    captcha_text = random_code()

    # Keep drawing decoys until there are three distinct options in total.
    options = [captcha_text]
    while len(options) < 3:
        decoy = random_code()
        if decoy not in options:
            options.append(decoy)
    random.shuffle(options)

    canvas = Image.new('RGB', (300, 60), color=(255, 255, 255))
    try:
        typeface = ImageFont.truetype("arial.ttf", 40)
    except IOError:
        # Arial could not be loaded; fall back to PIL's built-in font.
        typeface = ImageFont.load_default()
    ImageDraw.Draw(canvas).text((10, 10), captcha_text, font=typeface, fill=(0, 0, 0))

    img_path = f"captcha_{captcha_text}.png"
    canvas.save(img_path)
    return captcha_text, img_path, options
# Strong references to the scheduled expiry tasks. The event loop only keeps
# weak references to tasks, so a task that nothing else references may be
# garbage-collected before its timeout fires.
_captcha_expiry_tasks = set()


async def join_request(client: Client, event: ChatJoinRequest):
    """Challenge a user who requested to join a chat with a CAPTCHA.

    Steps:
      1. Check that the bot is an administrator of the chat; if not, say so
         in the chat and stop.
      2. Record the chat's non-bot administrators in ``user_list``.
      3. For supergroups only: generate a CAPTCHA, remember the expected
         answer in ``captcha_texts``, send the image with answer buttons to
         the requesting user, and schedule the CAPTCHA's expiry.

    Args:
        client: The running Pyrogram client.
        event: The join request being handled.

    Returns:
        The message sent to the chat when the bot is not an administrator,
        otherwise ``None``.
    """
    member = await client.get_chat_member(event.chat.id, "me")
    if member.status != ChatMemberStatus.ADMINISTRATOR:
        return await client.send_message(event.chat.id, text="I am not an administrator in this group.")

    async for m in client.get_chat_members(event.chat.id, filter=ChatMembersFilter.ADMINISTRATORS):
        # This handler runs for every join request, so skip ids that are
        # already recorded instead of growing the list with duplicates.
        if not m.user.is_bot and m.user.id not in user_list:
            user_list.append(m.user.id)

    # The CAPTCHA is only ever delivered for supergroups. Bail out before
    # generating anything so no image file or pending state is left behind.
    if event.chat.type != ChatType.SUPERGROUP:
        return

    user_id = event.from_user.id
    captcha_text, img_path, choices = generate_captcha_multiple_choice()
    captcha_texts[user_id] = captcha_text
    # NOTE(review): "chat_id" is a single shared entry, not one per user, so
    # concurrent requests for different chats overwrite each other here.
    captcha_texts["chat_id"] = event.chat.id

    keyboard = InlineKeyboardMarkup(
        [
            [InlineKeyboardButton(choice, callback_data=f"verify_{user_id}_{choice}")]
            for choice in choices
        ] + [
            [InlineKeyboardButton("π Refresh CAPTCHA", callback_data="refresh_captcha")],
            [InlineKeyboardButton("β Cancel", callback_data="cancel_captcha")],
        ]
    )

    try:
        await client.send_photo(
            user_id,
            photo=img_path,
            caption="βοΈ **Verify that you are human!**\n\nβ Please select the correct CAPTCHA text shown in the image below.",
            reply_markup=keyboard,
        )
    except Exception as e:
        # The CAPTCHA never reached the user, so there is nothing to verify.
        captcha_texts.pop(user_id, None)
        logger.error(str(e))
        await client.send_message(event.chat.id, text=str(e))
    else:
        task = asyncio.create_task(remove_captcha_after_timeout(client, user_id))
        _captcha_expiry_tasks.add(task)
        task.add_done_callback(_captcha_expiry_tasks.discard)
    finally:
        # Delete the temporary image whether or not it was delivered.
        if os.path.exists(img_path):
            os.remove(img_path)
async def cancel_captcha_callback(client: Client, cb: CallbackQuery):
    """Handle the "Cancel" button: drop the caller's pending CAPTCHA.

    If the user who pressed the button has an entry in ``captcha_texts`` it
    is removed, the CAPTCHA message is replaced by a cancellation notice and
    the callback is answered. Otherwise an alert reports that no verification
    is active.

    Args:
        client: The running Pyrogram client (unused here).
        cb: The callback query raised by the button press.
    """
    requester_id = cb.from_user.id

    # Guard clause: nothing to cancel for this user.
    if requester_id not in captcha_texts:
        await cb.answer("No active CAPTCHA verification found.", show_alert=True)
        return

    captcha_texts.pop(requester_id)
    logger.info(f"User {requester_id} has canceled CAPTCHA verification.")
    await cb.edit_message_text(
        "β CAPTCHA verification has been canceled. If you wish to try again,",
        disable_web_page_preview=True,
    )
    await cb.answer("CAPTCHA verification canceled.", show_alert=False)
async def refresh_captcha_callback(client: Client, cb: CallbackQuery):
    """Handle the "Refresh" button: swap the CAPTCHA for a newly generated one.

    A new CAPTCHA is generated, its text replaces the stored answer for the
    user who pressed the button, and the message is edited in place to show
    the new image with one answer button per choice, followed by the
    Refresh / Verify / Cancel buttons.

    Args:
        client: The running Pyrogram client (unused here).
        cb: The callback query raised by the button press.
    """
    user_id = cb.from_user.id
    captcha_text, img_path, choices = generate_captcha_multiple_choice()
    # Plain assignment overwrites any previous answer stored for this user.
    captcha_texts[user_id] = captcha_text

    # Offer the new answers with the same callback data format that
    # join_request uses for its choice buttons; without these rows the
    # refreshed message would give the user nothing to pick from.
    choice_rows = [
        [InlineKeyboardButton(choice, callback_data=f"verify_{user_id}_{choice}")]
        for choice in choices
    ]
    keyboard = InlineKeyboardMarkup(
        choice_rows + [
            [InlineKeyboardButton("π Refresh CAPTCHA", callback_data="refresh_captcha")],
            [InlineKeyboardButton("β Verify", callback_data="verify_captcha")],
            [InlineKeyboardButton("β Cancel", callback_data="cancel_captcha")],
        ]
    )

    try:
        await cb.edit_message_media(
            # NOTE(review): the replacement media is given the same instruction
            # text join_request sends, on the understanding that
            # InputMediaPhoto's caption defaults to empty - confirm.
            media=InputMediaPhoto(
                img_path,
                caption="βοΈ **Verify that you are human!**\n\nβ Please select the correct CAPTCHA text shown in the image below.",
            ),
            reply_markup=keyboard,
        )
    finally:
        # Delete the temporary image even when the edit fails.
        if os.path.exists(img_path):
            os.remove(img_path)

    await cb.answer("CAPTCHA refreshed!", show_alert=False)
async def verify_captcha_multiple_choice_callback(client: Client, cb: CallbackQuery):
    """Handle a press on one of the CAPTCHA answer buttons.

    The callback data is expected to look like ``verify_<user_id>_<choice>``.
    A correct choice approves the user's join request; a wrong choice
    declines it. Either way the user's pending CAPTCHA is consumed, so each
    CAPTCHA can be answered only once.

    Args:
        client: The running Pyrogram client, used to approve or decline the
            join request.
        cb: The callback query raised by the button press.
    """
    data = cb.data.split("_")
    if len(data) != 3:
        await cb.answer("Invalid data format.", show_alert=True)
        return

    _, user_id_str, user_choice = data
    try:
        user_id = int(user_id_str)
    except ValueError:
        await cb.answer("Invalid user ID.", show_alert=True)
        return

    if user_id not in captcha_texts:
        await cb.answer("βοΈ Please start the CAPTCHA verification first.", show_alert=True)
        logger.warning(f"User {user_id} attempted to verify CAPTCHA without starting verification.")
        return

    # Consume the pending CAPTCHA up front so the state is cleared even if
    # one of the Telegram calls below raises.
    correct_captcha = captcha_texts.pop(user_id)
    # The "chat_id" entry is shared by all pending users (join_request writes
    # it on every request), so it is only read here. Deleting it would leave
    # every other user who is still verifying without a chat to be approved in.
    chat_id = captcha_texts.get("chat_id")
    logger.info(f"User {user_id} selected CAPTCHA: {user_choice} (Expected: {correct_captcha})")

    if chat_id is None:
        logger.error(f"No chat recorded for the join request of user {user_id}.")
        await cb.answer(
            "Could not determine which chat you requested to join. Please send a new join request.",
            show_alert=True,
        )
        return

    # Acknowledge the button press so the client stops showing its progress
    # indicator while the join request is being processed.
    await cb.answer()

    if user_choice == correct_captcha:
        await cb.edit_message_text("β CAPTCHA verification successful!")
        logger.info(f"User {user_id} successfully verified CAPTCHA.")
        await client.approve_chat_join_request(chat_id=chat_id, user_id=user_id)
    else:
        await cb.edit_message_text("β Incorrect CAPTCHA. Please try again")
        logger.info(f"User {user_id} failed CAPTCHA verification.")
        await client.decline_chat_join_request(chat_id=chat_id, user_id=user_id)
async def verify_captcha_callback(client: Client, cb: CallbackQuery):
    """Handle the "Verify" button: ask the user to type the CAPTCHA text.

    If the user who pressed the button has a pending entry in
    ``captcha_texts``, a prompt is sent as a reply to the CAPTCHA message and
    the callback is acknowledged. Otherwise an alert asks the user to start
    the verification first.

    Args:
        client: The running Pyrogram client (unused here).
        cb: The callback query raised by the button press.
    """
    requester_id = cb.from_user.id

    if requester_id in captcha_texts:
        await cb.message.reply_text("π **Please enter the CAPTCHA text exactly as shown in the image:**")
        await cb.answer()
        logger.info(f"User {requester_id} is attempting to verify CAPTCHA.")
    else:
        await cb.answer("βοΈ Please start the CAPTCHA verification first", show_alert=True)
        logger.warning(f"User {requester_id} attempted to verify CAPTCHA without starting verification.")