approve-dev / chatbot /plugins /autobanch.py
randydev's picture
Update chatbot/plugins/autobanch.py
f4d0d61 verified
raw
history blame
4.54 kB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2020-2024 (c) Randy W @xtdevs, @xtsea
#
# from : https://github.com/TeamKillerX
# Channel : @RendyProjects
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import time
import json
import asyncio
import io
import os
import re
import logging
from pyrogram import *
from pyrogram.enums import ChatMemberStatus, ChatType
from pyrogram import enums
from pyrogram import Client, filters
from pyrogram.types import *
from pyrogram.errors import *
from database import db
from logger import LOGS
# Configure the root logger at INFO level and create the module-level logger
# used by the handlers in this file.
# NOTE(review): `LOGS` is imported from the project's `logger` module above but
# is not used in the code visible here — confirm which logger is intended.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@Client.on_callback_query(filters.regex("^unbanch_"))
async def unbanch_usert(client: Client, cb: CallbackQuery):
    """Handle the "Unban" inline button and lift a ban in RendyProjects.

    The callback data must have the form ``unbanch_<user_id>``. Only the
    hard-coded developer account is allowed to perform the unban; every other
    caller gets an alert and the attempt is logged.

    Args:
        client: The Pyrogram client that received the callback.
        cb: The callback query; ``cb.data`` carries the target user id.
    """
    data = cb.data.split("_")
    if len(data) != 2:
        await cb.answer("Invalid data format.", True)
        return
    try:
        user_id = int(data[1])
    except ValueError:
        await cb.answer("Invalid user ID.", True)
        return

    try:
        # Guard clause: reject everyone except the developer account.
        if cb.from_user.id != 1191668125:
            await cb.answer("Only the Developer can perform this action.", True)
            logger.warning(f"Unauthorized unban attempt by user {cb.from_user.id}.")
            return

        await client.unban_chat_member(
            "RendyProjects",
            user_id,
        )
        await cb.edit_message_text(
            f"User {user_id} has been unbanned from Rendy Projects.",
            disable_web_page_preview=True,
        )
        logger.info(f"User {user_id} has been unbanned by admin {cb.from_user.id}.")
    except Exception as e:
        # Log first (with traceback) so the failure is recorded even if the
        # alert below cannot be delivered.
        logger.exception(f"Error unbanning user {user_id}: {e}")
        # Telegram limits callback answer text to 200 characters; a longer
        # exception message would make this handler raise in turn.
        await cb.answer(f"Error: {e}"[:200], True)
def create_button_unban(user_id):
    """Build a single-button inline keyboard offering to unban ``user_id``.

    The button's callback data is ``unbanch_<user_id>``, which matches the
    ``^unbanch_`` pattern handled by the callback-query handler in this file.
    """
    unban_button = InlineKeyboardButton(
        text="⚠️ Unban",
        callback_data=f"unbanch_{user_id}",
    )
    # One row containing one button.
    keyboard_rows = [[unban_button]]
    return InlineKeyboardMarkup(keyboard_rows)
@Client.on_chat_member_updated(
    filters.chat("RendyProjects")
)
async def auto_banned_ch(client: Client, event: ChatMemberUpdated):
    """React to membership changes in the RendyProjects chat.

    Two independent rules are applied to each update:

    1. A plain member who leaves the channel is banned, and a notice with an
       "Unban" button is sent to KillerXSupport.
    2. A user who becomes a member and is globally banned according to
       ``db.is_gbanned`` is banned, and a notice is sent to KillerXSupport.

    Failures in either rule are logged and reported to KillerXSupport instead
    of being raised.

    Args:
        client: The Pyrogram client that received the update.
        event: The chat-member update; either of ``old_chat_member`` and
            ``new_chat_member`` may be absent.
    """
    logger.info(f"Chat member update: {event}")
    old_member = event.old_chat_member
    new_member = event.new_chat_member
    old_status = old_member.status if old_member else None
    new_status = new_member.status if new_member else None

    # Rule 1: ban members who left. Only a transition from MEMBER to "gone"
    # counts; other transitions away from MEMBER (e.g. a promotion, or a ban
    # issued by an admin) must not trigger a ban. A departure is treated as
    # either a missing new_chat_member or an explicit LEFT status.
    has_left = new_status is None or new_status == ChatMemberStatus.LEFT
    if (
        old_status == ChatMemberStatus.MEMBER
        and has_left
        and event.chat.type == ChatType.CHANNEL
    ):
        # Target the affected member. Per Pyrogram's documentation,
        # event.from_user is the performer of the action, so it is used only
        # as a fallback when the member record carries no user.
        leaver = old_member.user or event.from_user
        if leaver is not None:
            leaver_id = leaver.id
            try:
                text_ban = f"User {leaver.first_name} (ID: {leaver_id}) was banned from {event.chat.title}."
                await client.ban_chat_member(
                    event.chat.id,
                    leaver_id,
                )
                await client.send_message(
                    "KillerXSupport",
                    text_ban,
                    reply_markup=create_button_unban(leaver_id)
                )
                logger.info(f"User {leaver_id} has been banned for leaving the chat.")
            except Exception as e:
                # Log before notifying so the failure is recorded even if the
                # notification itself cannot be sent.
                logger.exception(f"Error banning user {leaver_id}: {e}")
                await client.send_message("KillerXSupport", f"Error banning user {leaver_id}: {e}")

    # Rule 2: enforce the global ban list on anyone who is now a member.
    if new_status == ChatMemberStatus.MEMBER:
        joiner = new_member.user
        if joiner is None:
            # Without a user there is nobody to look up or ban.
            return
        user_id = joiner.id
        user_first_name = joiner.first_name
        try:
            if await db.is_gbanned(user_id):
                text_ban = f"User {user_first_name} (ID: {user_id}) was banned from {event.chat.title} due to global ban."
                await client.ban_chat_member(
                    event.chat.id,
                    user_id,
                )
                await client.send_message("KillerXSupport", text_ban)
                logger.info(f"User {user_id} was globally banned and has been banned from the chat.")
        except Exception as e:
            logger.exception(f"Error banning user {user_id}: {e}")
            await client.send_message("KillerXSupport", f"Error banning user {user_id}: {e}")