"""Telegram bot that runs shell commands for authorized chats and streams output.

Every plain text message from an authorized chat is executed as a shell
command.  Its combined stdout/stderr is shown in a single "live log" message
that is edited in place while the command runs, and is sent as numbered
"final log" parts once it finishes.  A small FastAPI app exposes a health
check and starts the bot's polling loop on startup.
"""

import html
import os
import re
import signal
import subprocess
import threading
import time
from collections import deque

import psutil
import telebot
from fastapi import FastAPI

# The bot token is mandatory: a missing BOT_TOKEN raises KeyError at import.
TOKEN = os.environ["BOT_TOKEN"]
bot = telebot.TeleBot(TOKEN)

# Chat IDs allowed to use the bot, parsed from the CHAT_IDS environment
# variable.  The optional leading "-" must be kept: Telegram group and
# supergroup chat IDs are negative numbers.
AUTHORIZED_USERS = list(map(int, re.findall(r"-?\d+", str(os.getenv("CHAT_IDS", "")))))

# chat_id -> state dict of the command currently running for that chat
# (keys: 'process', 'last_message_id', 'buffer', 'last_update', 'show_logs',
# 'paused_at').
active_processes = {}

# chat_id -> deque of recent log lines
logs_history = {}

# Maximum number of log characters placed in one Telegram message
MESSAGE_LIMIT = 4000

# Maximum number of log lines to keep in history
MAX_LOG_HISTORY = 1000

# Maximum number of command history entries
MAX_COMMAND_HISTORY = 20

# chat_id -> deque of recently executed commands
command_history = {}

# Minimum number of seconds between two edits of the live log message
UPDATE_INTERVAL = 2


def is_authorized(chat_id: int) -> bool:
    """Return True when *chat_id* is listed in AUTHORIZED_USERS."""
    return chat_id in AUTHORIZED_USERS


def initialize_logs_history(chat_id: int) -> None:
    """Create the bounded log history for *chat_id* if it does not exist yet."""
    if chat_id not in logs_history:
        logs_history[chat_id] = deque(maxlen=MAX_LOG_HISTORY)


def add_to_logs_history(chat_id: int, log_line: str) -> None:
    """Append *log_line* to the chat's log history, creating it if needed."""
    initialize_logs_history(chat_id)
    logs_history[chat_id].append(log_line)


def initialize_command_history(chat_id: int) -> None:
    """Create the bounded command history for *chat_id* if it does not exist yet."""
    if chat_id not in command_history:
        command_history[chat_id] = deque(maxlen=MAX_COMMAND_HISTORY)


def add_to_command_history(chat_id: int, command: str) -> None:
    """Append *command* to the chat's command history, creating it if needed."""
    initialize_command_history(chat_id)
    command_history[chat_id].append(command)


def get_command_history(chat_id: int) -> list:
    """Return the chat's recorded commands, oldest first (empty list if none)."""
    return list(command_history.get(chat_id, ()))


def get_latest_logs(chat_id: int) -> str:
    """Return the chat's stored log lines joined by newlines ("" if none)."""
    return "\n".join(logs_history.get(chat_id, ()))


def sanitize_log(text: str) -> str:
    """Escape *text* so it can be embedded in a parse_mode='HTML' message."""
    return html.escape(text)


def _pre(text: str) -> str:
    """Return *text* HTML-escaped and wrapped in a ``<pre>`` block.

    NOTE(review): the markup that wrapped log bodies is not visible in the
    reviewed source (only line breaks remain where it used to be); ``<pre>``
    is assumed here -- confirm against the intended rendering.
    """
    return f"<pre>{sanitize_log(text)}</pre>"


def split_logs(logs, latest_only=False):
    """Split *logs* into newline-terminated chunks that respect MESSAGE_LIMIT.

    Lines are kept whole whenever possible.  A single line that cannot fit in
    one chunk is hard-wrapped so that no chunk ever exceeds MESSAGE_LIMIT
    characters, and no empty chunk is produced.

    Args:
        logs: The log text to split.
        latest_only: When True, return a list holding only the last chunk.

    Returns:
        A list of chunk strings.
    """
    chunks = []
    current_chunk = ""

    for line in logs.split('\n'):
        # Hard-wrap lines that would not fit in a chunk even on their own
        # (the line plus its trailing newline must stay within the limit).
        while len(line) >= MESSAGE_LIMIT:
            if current_chunk:
                chunks.append(current_chunk)
                current_chunk = ""
            chunks.append(line[:MESSAGE_LIMIT - 1] + '\n')
            line = line[MESSAGE_LIMIT - 1:]

        if len(current_chunk) + len(line) + 1 > MESSAGE_LIMIT:
            # current_chunk is non-empty here because the line alone fits.
            chunks.append(current_chunk)
            current_chunk = ""
        current_chunk += line + '\n'

    if current_chunk:
        chunks.append(current_chunk)

    if latest_only and chunks:
        return [chunks[-1]]
    return chunks


def create_log_keyboard(is_live=True):
    """Build the inline keyboard attached to log messages.

    Args:
        is_live: True for the live-log controls (pause / resume / refresh /
            clear); False for the single "clear history" button.

    Returns:
        A ``telebot.types.InlineKeyboardMarkup``.
    """
    button = telebot.types.InlineKeyboardButton
    keyboard = telebot.types.InlineKeyboardMarkup(row_width=2)
    if is_live:
        keyboard.add(
            button("⏸️ Pause", callback_data="stop_logs"),
            button("▶️ Resume", callback_data="resume_logs"),
            button("🔄 Refresh", callback_data="refresh_logs"),
            button("❌ Clear", callback_data="clear_logs"),
        )
    else:
        keyboard.add(
            button("❌ Clear log history", callback_data="clear_logs"),
        )
    return keyboard


def stream_command(chat_id, command):
    """Run *command* in a shell and stream its output to *chat_id*.

    Intended to run in its own thread.  The command is recorded in the chat's
    command history, registered in ``active_processes`` while it runs, and its
    combined stdout/stderr is appended to both the log history and the live
    buffer.  The live message is refreshed at most every UPDATE_INTERVAL
    seconds while logs are not paused.

    Args:
        chat_id: Chat that requested the command and receives the output.
        command: Shell command line to execute.
    """
    initialize_logs_history(chat_id)
    add_to_command_history(chat_id, command)

    # SECURITY: the command text is executed verbatim through the shell.  That
    # is the purpose of this bot; access is gated only by is_authorized() in
    # the message handlers.
    try:
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=True,
            text=True,
            bufsize=1,
        )
    except OSError as e:
        bot.send_message(chat_id, f"❌ Error: {html.escape(str(e))}", parse_mode='HTML')
        return

    info = {
        'process': process,
        'last_message_id': None,
        'buffer': "",
        'last_update': time.time(),
        'show_logs': True,
        'paused_at': None,
    }
    active_processes[chat_id] = info

    try:
        # The context manager closes the pipe once streaming is over.
        with process.stdout:
            for line in process.stdout:
                if active_processes.get(chat_id) is not info:
                    # /stop removed our entry: stop streaming quietly.
                    return
                add_to_logs_history(chat_id, line.strip())
                info['buffer'] += line

                current_time = time.time()
                if (info['show_logs']
                        and current_time - info['last_update'] >= UPDATE_INTERVAL):
                    send_log_update(chat_id)
                    info['last_update'] = current_time
        # stdout reached EOF; block until the exit code is available instead
        # of polling in a busy loop.
        process.wait()
    except Exception as e:
        bot.send_message(chat_id, f"❌ Error: {html.escape(str(e))}", parse_mode='HTML')
    finally:
        # Finalize only while the registry entry is still ours; it may have
        # been removed by /stop (and even replaced by a newer command).
        if active_processes.get(chat_id) is info:
            try:
                if info['buffer'] and info['show_logs']:
                    send_log_update(chat_id)
                if process.poll() is not None:
                    bot.send_message(
                        chat_id,
                        f"✅ Command completed with exit code: {process.returncode}",
                        parse_mode='HTML'
                    )
            finally:
                # Always free the slot, even if sending failed, so the chat
                # is not stuck with "A command is already running".
                if active_processes.get(chat_id) is info:
                    active_processes.pop(chat_id, None)


def _send_live_page(chat_id, process_info, chunks):
    """Send or edit the live-log message so it shows the latest chunk."""
    total_pages = len(chunks)
    current_chunk = chunks[-1] if chunks else ""
    paused = not process_info['show_logs']

    status = "📋 Live Log (paused)" if paused else "📋 Live Log (updating...)"
    header = f"{status} (Page {total_pages}/{total_pages})\n"
    formatted_message = f"{header}\n{_pre(current_chunk)}"
    if paused:
        formatted_message += (
            "\n<i>Logs are paused. Click Resume to continue showing logs.</i>"
        )
        # The timestamp makes the text differ between edits while paused.
        formatted_message += f"\n\n<i>Last paused at: {time.strftime('%H:%M:%S')}</i>"

    try:
        if process_info['last_message_id']:
            bot.edit_message_text(
                formatted_message,
                chat_id=chat_id,
                message_id=process_info['last_message_id'],
                parse_mode='HTML',
                reply_markup=create_log_keyboard()
            )
        else:
            msg = bot.send_message(
                chat_id,
                formatted_message,
                parse_mode='HTML',
                reply_markup=create_log_keyboard()
            )
            process_info['last_message_id'] = msg.message_id
    except telebot.apihelper.ApiException as e:
        # An edit with unchanged content is rejected by Telegram; that case
        # is harmless and is skipped silently.
        if "message is not modified" not in str(e):
            bot.send_message(chat_id, f"Error updating message: {str(e)}")


def send_log_update(chat_id):
    """Send or update the log message for the chat's active command.

    While the process is running, the live message is created or edited to
    show the most recent chunk of the buffer.  Once the process has exited,
    the whole buffer is sent as numbered "Final Log" parts and then cleared.
    Does nothing when the chat has no active command.
    """
    process_info = active_processes.get(chat_id)
    if process_info is None:
        return

    log_content = process_info['buffer']

    if process_info['process'].poll() is None:
        _send_live_page(chat_id, process_info, split_logs(log_content))
        return

    if log_content:
        chunks = split_logs(log_content)
        for i, chunk in enumerate(chunks, 1):
            formatted_chunk = f"📋 Final Log Part {i}/{len(chunks)}\n{_pre(chunk)}"
            bot.send_message(chat_id, formatted_chunk, parse_mode='HTML')
    # Cleared only after the final parts went out, so output is never dropped.
    process_info['buffer'] = ""


@bot.callback_query_handler(func=lambda call: True)
def handle_callback(call):
    """Handle the inline keyboard buttons attached to log messages."""
    chat_id = call.message.chat.id

    if not is_authorized(chat_id):
        bot.answer_callback_query(call.id, "❌ You are not authorized to use this bot.")
        return

    process_info = active_processes.get(chat_id)

    if call.data == "clear_logs":
        # Clearing works with or without a running command.
        if chat_id in logs_history:
            logs_history[chat_id].clear()
        if process_info is not None:
            process_info['buffer'] = ""
        bot.answer_callback_query(call.id, "Logs cleared")
        return

    if process_info is None:
        bot.answer_callback_query(call.id, "No active process")
        return

    if call.data == "stop_logs":
        process_info['show_logs'] = False
        bot.answer_callback_query(call.id, "Logs paused")
        send_log_update(chat_id)
    elif call.data == "resume_logs":
        process_info['show_logs'] = True
        bot.answer_callback_query(call.id, "Logs resumed")
        send_log_update(chat_id)
    elif call.data == "refresh_logs":
        bot.answer_callback_query(call.id, "Logs refreshed")
        send_log_update(chat_id)
    else:
        # Acknowledge unknown buttons so the client stops its loading state.
        bot.answer_callback_query(call.id)


@bot.message_handler(commands=['logs'])
def show_logs(message):
    """Handle /logs: show the live buffer, or the stored history if idle."""
    chat_id = message.chat.id

    if not is_authorized(chat_id):
        bot.reply_to(message, "❌ You are not authorized to use this bot.")
        return

    initialize_logs_history(chat_id)

    process_info = active_processes.get(chat_id)
    is_live = process_info is not None
    logs_content = process_info['buffer'] if is_live else get_latest_logs(chat_id)

    if not logs_content:
        bot.reply_to(message, "No logs available.")
        return

    if is_live:
        # Only the most recent chunk is shown while the command is running.
        chunks = split_logs(logs_content, latest_only=True)
        if chunks:
            formatted_chunk = f"📋 Live Logs (Current Live Logs)\n{_pre(chunks[0])}"
            msg = bot.send_message(
                chat_id,
                formatted_chunk,
                parse_mode='HTML',
                reply_markup=create_log_keyboard(is_live)
            )
            # Subsequent live updates edit this new message.
            process_info['last_message_id'] = msg.message_id
        return

    chunks = split_logs(logs_content)
    for i, chunk in enumerate(chunks, 1):
        formatted_chunk = f"📋 Historical Logs (Part {i}/{len(chunks)})\n{_pre(chunk)}"
        bot.send_message(
            chat_id,
            formatted_chunk,
            parse_mode='HTML',
            reply_markup=create_log_keyboard(is_live)
        )


def _terminate_process_tree(process, timeout=3):
    """Terminate *process* and its descendants, killing any that survive.

    Children are terminated before the parent.  Processes still alive after
    *timeout* seconds are killed.
    """
    try:
        parent = psutil.Process(process.pid)
        procs = parent.children(recursive=True) + [parent]
    except psutil.Error:
        # The process could not be inspected (e.g. it already exited); fall
        # back to signalling it directly through the Popen object.
        try:
            process.terminate()
        except OSError:
            pass
        return

    for proc in procs:
        try:
            proc.terminate()
        except psutil.Error:
            # One process vanishing must not prevent terminating the others.
            pass

    _, alive = psutil.wait_procs(procs, timeout=timeout)
    for proc in alive:
        try:
            proc.kill()
        except psutil.Error:
            pass


@bot.message_handler(commands=['stop'])
def stop_command(message):
    """Handle /stop: terminate the chat's running command and its children."""
    chat_id = message.chat.id

    if not is_authorized(chat_id):
        bot.reply_to(message, "❌ You are not authorized to use this bot.")
        return

    process_info = active_processes.get(chat_id)
    if process_info is None:
        bot.reply_to(message, "No active command to stop.")
        return

    process = process_info['process']
    try:
        # Silence log updates and drop pending output before terminating.
        process_info['show_logs'] = False
        process_info['buffer'] = ""

        _terminate_process_tree(process)
        bot.reply_to(message, "🛑 Command stopped successfully.")
    except Exception as e:
        bot.reply_to(message, f"❌ Error stopping command: {str(e)}")
    finally:
        # Last resort if the process is somehow still running.
        try:
            if process.poll() is None:
                process.kill()
        except OSError:
            pass
        # Remove only our own entry; the streaming thread may already have
        # cleaned up.
        if active_processes.get(chat_id) is process_info:
            active_processes.pop(chat_id, None)


# '/cmds' is the name advertised by /start; '/cmd' is kept for compatibility.
@bot.message_handler(commands=['cmd', 'cmds'])
def show_command_history(message):
    """Handle /cmd and /cmds: list the chat's most recent commands."""
    chat_id = message.chat.id

    if not is_authorized(chat_id):
        bot.reply_to(message, "❌ You are not authorized to use this bot.")
        return

    commands = get_command_history(chat_id)
    if not commands:
        bot.reply_to(message, "No command history available.")
        return

    formatted_commands = "\n".join(f"{i}. {cmd}" for i, cmd in enumerate(commands, 1))
    # _pre() escapes the text: shell commands routinely contain &, < and >,
    # which Telegram would otherwise reject as malformed HTML.
    bot.reply_to(
        message,
        f"📜 Command History (Last {len(commands)} commands):\n{_pre(formatted_commands)}",
        parse_mode='HTML'
    )


@bot.message_handler(commands=['start'])
def send_welcome(message):
    """Handle /start: send a short usage message."""
    bot.reply_to(
        message,
        "👋 Welcome! Send me any command to execute and monitor logs.\n"
        "Use /stop to stop the current command.\n"
        "Use /cmds to see the last 20 executed commands."
    )


@bot.message_handler(func=lambda message: True)
def execute_command(message):
    """Catch-all handler: run the message text as a shell command.

    Registered last so the slash-command handlers above take precedence.
    Only one command may run per chat at a time.
    """
    chat_id = message.chat.id

    if not is_authorized(chat_id):
        bot.reply_to(message, "❌ You are not authorized to use this bot.")
        return

    if chat_id in active_processes:
        bot.reply_to(
            message,
            "⚠️ A command is already running. Use /stop to stop it first."
        )
        return

    command = message.text
    bot.reply_to(message, f"▶️ Executing command: {command}")

    # Stream in a separate thread so the handler returns immediately.
    thread = threading.Thread(
        target=stream_command,
        args=(chat_id, command)
    )
    thread.start()


# ─────────────────────────────────────────────────── FastAPI ───
# Imported here, after the bot and its handlers exist, exactly as in the
# original module order.
import telegram_preview

app = FastAPI()
telegram_preview.include_in_app(app)


# Simple health-check endpoint
@app.get("/health")
def health():
    """Return a static payload confirming the web app is up."""
    return {"status": "ok"}


@app.on_event("startup")
def startup():
    """Start the bot's polling loop in a daemon thread once the app starts."""
    threading.Thread(target=bot.infinity_polling, daemon=True).start()


if __name__ == "__main__":
    import uvicorn

    uvicorn.run("app:app", host="0.0.0.0", port=7860)