import html
import os
import re
import signal
import subprocess
import threading
import time
from collections import deque

import psutil
import telebot
from fastapi import FastAPI

# Initialize bot with your token
TOKEN = os.environ["BOT_TOKEN"]
bot = telebot.TeleBot(TOKEN)

# Store authorized chat IDs, parsed from the CHAT_IDS environment variable.
# The optional leading "-" is kept so negative chat IDs can be authorized;
# the look-behind stops a "-" used as a separator between two numbers from
# being read as a sign.
AUTHORIZED_USERS = [
    int(raw_id)
    for raw_id in re.findall(r"(?<!\d)-?\d+", os.getenv("CHAT_IDS", ""))
]

# Store active command processes (chat_id -> state dict, see stream_command)
active_processes = {}

# Store logs history (for each chat_id)
logs_history = {}

# Maximum number of characters placed in a single log chunk
MESSAGE_LIMIT = 4000

# Maximum number of log lines to keep in history
MAX_LOG_HISTORY = 1000

# Maximum number of command history entries
MAX_COMMAND_HISTORY = 20

# Store command history (for each chat_id)
command_history = {}


def is_authorized(chat_id):
    """Check if user is authorized"""
    return chat_id in AUTHORIZED_USERS


def initialize_logs_history(chat_id):
    """Initialize logs history for a chat"""
    if chat_id not in logs_history:
        logs_history[chat_id] = deque(maxlen=MAX_LOG_HISTORY)


def add_to_logs_history(chat_id, log_line):
    """Add log line to history"""
    initialize_logs_history(chat_id)
    logs_history[chat_id].append(log_line)


def initialize_command_history(chat_id):
    """Initialize command history for a chat"""
    if chat_id not in command_history:
        command_history[chat_id] = deque(maxlen=MAX_COMMAND_HISTORY)


def add_to_command_history(chat_id, command):
    """Add a command to the history"""
    initialize_command_history(chat_id)
    command_history[chat_id].append(command)


def get_command_history(chat_id):
    """Get the command history for a chat"""
    if chat_id in command_history:
        return list(command_history[chat_id])
    return []


def get_latest_logs(chat_id):
    """Get the latest logs for a chat"""
    if chat_id in logs_history:
        return "\n".join(logs_history[chat_id])
    return ""


def sanitize_log(text):
    """Sanitize text for HTML formatting"""
    return html.escape(text)


def split_logs(logs, latest_only=False):
    """Split logs into chunks of at most MESSAGE_LIMIT characters.

    Every line is re-terminated with a newline.  A line that is too long to
    fit in one chunk is hard-wrapped, so no returned chunk ever exceeds
    MESSAGE_LIMIT and no chunk is empty.

    If `latest_only` is True, only the last chunk is returned (as a
    one-element list).
    """
    chunks = []
    current_chunk = ""
    # Each piece gets a trailing newline, so reserve one character for it.
    max_piece = MESSAGE_LIMIT - 1

    for line in logs.split('\n'):
        # An empty line still has to contribute its newline, hence `or [""]`.
        pieces = [
            line[start:start + max_piece]
            for start in range(0, len(line), max_piece)
        ] or [""]
        for piece in pieces:
            # Only flush a non-empty chunk; flushing an empty one would
            # produce a blank message.
            if current_chunk and len(current_chunk) + len(piece) + 1 > MESSAGE_LIMIT:
                chunks.append(current_chunk)
                current_chunk = ""
            current_chunk += piece + '\n'

    if current_chunk:
        chunks.append(current_chunk)

    # If latest_only is True, return only the last chunk
    if latest_only and chunks:
        return [chunks[-1]]
    return chunks


def create_log_keyboard(is_live=True):
    """Create inline keyboard for log control.

    A live log gets pause/resume/refresh/clear buttons; a historical log
    only gets a button that clears the stored history.
    """
    keyboard = telebot.types.InlineKeyboardMarkup(row_width=2)
    if is_live:
        keyboard.add(
            telebot.types.InlineKeyboardButton("βΈοΈ Pause", callback_data="stop_logs"),
            telebot.types.InlineKeyboardButton("βΆοΈ Resume", callback_data="resume_logs"),
            telebot.types.InlineKeyboardButton("π Refresh", callback_data="refresh_logs"),
            telebot.types.InlineKeyboardButton("β Clear", callback_data="clear_logs")
        )
    else:
        keyboard.add(
            telebot.types.InlineKeyboardButton("β Clear log history", callback_data="clear_logs")
        )
    return keyboard


def stream_command(chat_id, command):
    """Run `command` in a shell and stream its output to the chat.

    The process is registered in `active_processes[chat_id]` while it runs.
    Each output line is appended to the chat's log history and to the live
    buffer; the live message is refreshed at most every UPDATE_INTERVAL
    seconds while logs are not paused.  When the command ends, the remaining
    buffer is flushed, the exit code is reported and the registration is
    removed.
    """
    initialize_logs_history(chat_id)
    add_to_command_history(chat_id, command)

    try:
        # SECURITY: `command` is chat text executed verbatim by the shell.
        # This is the purpose of the bot, so it is only as safe as the
        # authorization check performed by the caller.
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=True,
            text=True,
            bufsize=1,
        )
    except (OSError, ValueError) as e:
        bot.send_message(chat_id, f"β Error: {sanitize_log(str(e))}", parse_mode='HTML')
        return

    # Keep a direct reference to the state dict: another thread may remove or
    # replace the registry entry while this thread is still reading output.
    info = {
        'process': process,
        'last_message_id': None,
        'buffer': "",
        'last_update': time.time(),
        'show_logs': True,
        'paused_at': None
    }
    active_processes[chat_id] = info

    UPDATE_INTERVAL = 2

    try:
        # readline() returns "" only at end-of-file, so the loop ends when the
        # output pipe is closed instead of spinning on empty reads.
        for line in iter(process.stdout.readline, ""):
            add_to_logs_history(chat_id, line.strip())
            info['buffer'] += line

            current_time = time.time()
            if info['show_logs'] and current_time - info['last_update'] >= UPDATE_INTERVAL:
                send_log_update(chat_id)
                info['last_update'] = current_time
        # The pipe is closed; wait so the exit code is available below.
        process.wait()
    except Exception as e:
        # The message is sent as HTML, so the error text must be escaped.
        bot.send_message(chat_id, f"β Error: {sanitize_log(str(e))}", parse_mode='HTML')
    finally:
        process.stdout.close()
        # Only clean up if the registry still points at *this* command; the
        # entry may have been removed, or replaced by a newer command.
        if active_processes.get(chat_id) is info:
            try:
                if info['buffer'] and info['show_logs']:
                    send_log_update(chat_id)
                if process.poll() is not None:
                    bot.send_message(
                        chat_id,
                        f"β Command completed with exit code: {process.returncode}",
                        parse_mode='HTML'
                    )
            finally:
                # Always unregister, even if reporting failed, so the chat is
                # not left stuck with a phantom "running" command.
                active_processes.pop(chat_id, None)


def send_log_update(chat_id):
    """Send or update the log message for the chat's active command.

    While the process is running, the newest chunk of the buffer is shown in
    a single message that is edited in place (or created on first use).
    Once the process has exited, the whole buffer is sent as a series of
    "final" messages and the buffer is emptied.  Does nothing when the chat
    has no registered command.
    """
    process_info = active_processes.get(chat_id)
    if process_info is None:
        return

    chunks = split_logs(process_info['buffer'])

    if process_info['process'].poll() is not None:
        # Process finished: emit everything that is left, then reset.
        for i, chunk in enumerate(chunks, 1):
            formatted_chunk = (
                f"π Final Log Part {i}/{len(chunks)}\n"
                f"{sanitize_log(chunk)}"
            )
            bot.send_message(chat_id, formatted_chunk, parse_mode='HTML')
        process_info['buffer'] = ""
        return

    total_pages = len(chunks)
    # Determine the content of the current page (latest log chunk)
    current_chunk = chunks[-1] if chunks else ""
    paused = not process_info['show_logs']

    # Add the page count to the log header
    status = "π Live Log (paused)" if paused else "π Live Log (updating...)"
    header = f"{status} (Page {total_pages}/{total_pages})\n"
    formatted_message = f"{header}\n{sanitize_log(current_chunk)}"

    if paused:
        formatted_message += "\nLogs are paused. Click Resume to continue showing logs."
        # Append a timestamp to force a slight change in the content when paused
        formatted_message += f"\n\nLast paused at: {time.strftime('%H:%M:%S')}"

    try:
        if process_info['last_message_id']:
            # Edit the existing live message in place
            bot.edit_message_text(
                formatted_message,
                chat_id=chat_id,
                message_id=process_info['last_message_id'],
                parse_mode='HTML',
                reply_markup=create_log_keyboard()
            )
        else:
            # Send a new message if there isn't an existing one
            msg = bot.send_message(
                chat_id,
                formatted_message,
                parse_mode='HTML',
                reply_markup=create_log_keyboard()
            )
            process_info['last_message_id'] = msg.message_id
    except telebot.apihelper.ApiException as e:
        # An edit with identical content is rejected with "message is not
        # modified"; that is harmless and skipped.  Anything else is reported.
        if "message is not modified" not in str(e):
            bot.send_message(chat_id, f"Error updating message: {str(e)}")
@bot.callback_query_handler(func=lambda call: True)
def handle_callback(call):
    """Handle presses of the inline log-control buttons.

    Recognised callback data:
      * "stop_logs"    - pause live updates and redraw the log message
      * "resume_logs"  - resume live updates and redraw the log message
      * "refresh_logs" - redraw the log message
      * "clear_logs"   - empty the stored history and the live buffer

    Every callback query is answered so the client stops showing its
    loading indicator, including unauthorized and unrecognised ones.
    """
    chat_id = call.message.chat.id

    # Same gate as the message handlers: only authorized chats may act.
    if not is_authorized(chat_id):
        bot.answer_callback_query(call.id, "You are not authorized to use this bot.")
        return

    # Look the entry up once; it can disappear when the command finishes.
    process_info = active_processes.get(chat_id)

    if call.data == "clear_logs":
        # Clearing works with or without a running command.
        if chat_id in logs_history:
            logs_history[chat_id].clear()
        if process_info is not None:
            process_info['buffer'] = ""
        bot.answer_callback_query(call.id, "Logs cleared")
        return

    if process_info is None:
        bot.answer_callback_query(call.id, "No active process")
        return

    if call.data == "stop_logs":
        process_info['show_logs'] = False
        bot.answer_callback_query(call.id, "Logs paused")
    elif call.data == "resume_logs":
        process_info['show_logs'] = True
        bot.answer_callback_query(call.id, "Logs resumed")
    elif call.data == "refresh_logs":
        bot.answer_callback_query(call.id, "Logs refreshed")
    else:
        # Unknown button: acknowledge it, but change nothing.
        bot.answer_callback_query(call.id)
        return

    send_log_update(chat_id)
@bot.message_handler(commands=['logs'])
def show_logs(message):
    """Reply to /logs with the live buffer or the stored log history.

    With a running command, only the newest chunk of its buffer is sent and
    that message becomes the one future live updates edit.  Without a
    running command, the whole stored history is sent in numbered parts.
    """
    chat_id = message.chat.id

    # Reject chats that are not on the allow-list
    if not is_authorized(chat_id):
        bot.reply_to(message, "β You are not authorized to use this bot.")
        return

    # Make sure a history container exists for this chat
    initialize_logs_history(chat_id)

    # Pick the source: live buffer while a command runs, history otherwise
    is_live = chat_id in active_processes
    if is_live:
        logs_content = active_processes[chat_id]['buffer']
    else:
        logs_content = get_latest_logs(chat_id)

    # Nothing to show in either mode
    if not logs_content:
        bot.reply_to(message, "No logs available.")
        return

    if is_live:
        # Only the most recent chunk is relevant for a live view
        latest = split_logs(logs_content, latest_only=True)
        if latest:
            label = "π Live Logs"
            text = f"{label} (Current Live Logs)\n{sanitize_log(latest[0])}"
            sent = bot.send_message(
                chat_id,
                text,
                parse_mode='HTML',
                reply_markup=create_log_keyboard(is_live)
            )
            # Remember the message so later updates edit it in place
            active_processes[chat_id]['last_message_id'] = sent.message_id
        return

    # Historical view: send every part, split to respect the length limit
    parts = split_logs(logs_content)
    for number, part in enumerate(parts, 1):
        label = "π Historical Logs"
        text = f"{label} (Part {number}/{len(parts)})\n{sanitize_log(part)}"
        bot.send_message(
            chat_id,
            text,
            parse_mode='HTML',
            reply_markup=create_log_keyboard(is_live)
        )
@bot.message_handler(commands=['stop'])
def stop_command(message):
    """Handle /stop: terminate the chat's running command and its children.

    Log updates are disabled first, then the whole process tree is asked to
    terminate.  Anything still alive after a 3 second grace period is
    killed.  The registry entry is removed when it still belongs to the
    command that was stopped.
    """
    chat_id = message.chat.id

    if not is_authorized(chat_id):
        bot.reply_to(message, "β You are not authorized to use this bot.")
        return

    # Look the entry up once; the streaming thread may remove it at any time.
    process_info = active_processes.get(chat_id)
    if process_info is None:
        bot.reply_to(message, "No active command to stop.")
        return

    process = process_info['process']

    try:
        # 1. First, disable log updates
        process_info['show_logs'] = False
        process_info['buffer'] = ""

        # 2. Terminate the whole process tree through psutil
        try:
            parent = psutil.Process(process.pid)
            # Children first, parent last
            targets = parent.children(recursive=True) + [parent]
            for proc in targets:
                try:
                    proc.terminate()
                except psutil.NoSuchProcess:
                    # Already gone; keep going with the rest of the tree.
                    pass

            # 3. Give them a grace period, then force-kill the survivors
            _gone, alive = psutil.wait_procs(targets, timeout=3)
            for proc in alive:
                try:
                    proc.kill()
                except psutil.NoSuchProcess:
                    pass
        except psutil.Error:
            # Fallback: the tree could not be inspected (process vanished or
            # access denied), so try to terminate the shell directly.
            try:
                process.terminate()
            except OSError:
                pass

        # 4. Send success message
        bot.reply_to(message, "π Command stopped successfully.")
    except Exception as e:
        bot.reply_to(message, f"β Error stopping command: {str(e)}")
    finally:
        # 5. Clean up: one final attempt to kill if still running
        try:
            if process.poll() is None:
                process.kill()
        except OSError:
            pass
        # Remove the entry only if it is still the one that was stopped.
        if active_processes.get(chat_id) is process_info:
            active_processes.pop(chat_id, None)
@bot.message_handler(commands=['cmd', 'cmds'])
def show_command_history(message):
    """Reply with the numbered list of commands recently run in this chat.

    Registered for both /cmd and /cmds.  The reply is sent with HTML parse
    mode, so each command is escaped first: shell text routinely contains
    "<", ">" and "&", which would otherwise be parsed as markup.
    """
    chat_id = message.chat.id

    if not is_authorized(chat_id):
        bot.reply_to(message, "β You are not authorized to use this bot.")
        return

    commands = get_command_history(chat_id)
    if not commands:
        bot.reply_to(message, "No command history available.")
        return

    formatted_commands = "\n".join(
        f"{number}. {sanitize_log(cmd)}" for number, cmd in enumerate(commands, 1)
    )
    bot.reply_to(
        message,
        f"π Command History (Last {len(commands)} commands):\n{formatted_commands}",
        parse_mode='HTML'
    )
@bot.message_handler(commands=['start'])
def send_welcome(message):
    """Reply to /start with a short usage summary of the bot's commands."""
    bot.reply_to(
        message,
        "π Welcome! Send me any command to execute and monitor logs.\n"
        "Use /stop to stop the current command.\n"
        "Use /logs to view the current or stored logs.\n"
        # /cmd is the registered command name; the count follows the
        # configured history size instead of a hard-coded number.
        f"Use /cmd to see the last {MAX_COMMAND_HISTORY} executed commands."
    )
@bot.message_handler(func=lambda message: True)
def execute_command(message):
    """Catch-all handler: run the message text as a command.

    Unauthorized chats are refused, and a chat with a command already
    registered in `active_processes` is told to stop it first.  Otherwise
    the text is echoed back and handed to `stream_command` on a new thread.
    """
    chat_id = message.chat.id

    # Guard 1: the chat must be on the allow-list
    authorized = is_authorized(chat_id)
    if not authorized:
        bot.reply_to(message, "β You are not authorized to use this bot.")
        return

    # Guard 2: one command per chat at a time
    already_running = chat_id in active_processes
    if already_running:
        bot.reply_to(
            message,
            "β οΈ A command is already running. Use /stop to stop it first."
        )
        return

    command = message.text
    bot.reply_to(message, f"βΆοΈ Executing command: {command}")

    # Stream on a separate thread so the handler returns immediately
    worker = threading.Thread(target=stream_command, args=(chat_id, command))
    worker.start()
# # Start the bot
# try:
# bot.polling(none_stop=True)
# except Exception as e:
# print(f"Bot polling error: {e}")
# ─────────────────────────────────────────────────── FastAPI ───
# from telegram_preview import router as telegram_preview_router
# Project-local module, imported here rather than at the top of the file.
import telegram_preview
# ASGI application object; the __main__ guard at the end of the file serves
# it through Uvicorn as "app:app".
app = FastAPI()
# NOTE(review): per the trailing comment this call mounts "/" and "/static".
# The health route registered right after this is also bound to "/" -
# confirm which of the two actually serves "/".
telegram_preview.include_in_app(app) # mounts / and /static
@app.get("/")
def health():
    """Health-check endpoint: return a static payload showing the app is up."""
    return {"status": "ok"}
@app.on_event("startup")
def startup():
    """Start Telegram polling in the background once the app has started."""
    # Daemon thread, so polling never keeps the server process alive.
    # Launched from the startup hook so it begins *after* Uvicorn is up.
    polling_thread = threading.Thread(target=bot.infinity_polling, daemon=True)
    polling_thread.start()
if __name__ == "__main__":
    # Imported lazily: only needed when the file is run as a script.
    import uvicorn

    # Listen on all interfaces, port 7860; the app is referenced by its
    # import string "app:app".
    server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run("app:app", **server_options)