# (Page header captured along with the source: "Spaces: Running".)
import html
import os
import re
import signal
import subprocess
import threading
import time
from collections import deque

import psutil
import telebot
from fastapi import FastAPI
# Bot token is mandatory: a missing BOT_TOKEN raises KeyError at import time.
TOKEN = os.environ["BOT_TOKEN"]
bot = telebot.TeleBot(TOKEN)

# Chat IDs allowed to use the bot, parsed from the CHAT_IDS environment
# variable (any separator works).  The optional leading minus sign is kept
# because Telegram group and supergroup chat IDs are negative; without it
# such chats could never match message.chat.id.
AUTHORIZED_USERS = [
    int(chat_id) for chat_id in re.findall(r"-?\d+", os.getenv("CHAT_IDS", ""))
]

# Running command per chat: chat_id -> state dict created by stream_command
active_processes = {}

# Log line history per chat: chat_id -> deque of stripped output lines
logs_history = {}

# Maximum characters per log chunk (presumably headroom under Telegram's
# 4096-character message cap -- confirm)
MESSAGE_LIMIT = 4000

# Maximum number of log lines to keep in history
MAX_LOG_HISTORY = 1000

# Maximum number of command history entries
MAX_COMMAND_HISTORY = 20

# Executed command history per chat: chat_id -> deque of command strings
command_history = {}
def is_authorized(chat_id):
    """Return True when chat_id is listed in AUTHORIZED_USERS."""
    allowed_ids = AUTHORIZED_USERS
    return chat_id in allowed_ids
def initialize_logs_history(chat_id):
    """Create an empty, bounded log history for chat_id if none exists yet."""
    if chat_id in logs_history:
        return
    # Bounded deque: the oldest lines fall off once MAX_LOG_HISTORY is reached.
    logs_history[chat_id] = deque(maxlen=MAX_LOG_HISTORY)
def add_to_logs_history(chat_id, log_line):
    """Append log_line to the chat's log history, creating it on first use."""
    initialize_logs_history(chat_id)
    history = logs_history[chat_id]
    history.append(log_line)
def initialize_command_history(chat_id):
    """Create an empty, bounded command history for chat_id if none exists yet."""
    if chat_id in command_history:
        return
    # Bounded deque: only the last MAX_COMMAND_HISTORY commands are retained.
    command_history[chat_id] = deque(maxlen=MAX_COMMAND_HISTORY)
def add_to_command_history(chat_id, command):
    """Record command in the chat's command history, creating it on first use."""
    initialize_command_history(chat_id)
    history = command_history[chat_id]
    history.append(command)
def get_command_history(chat_id):
    """Return the chat's recorded commands as a new list (empty if none)."""
    recorded = command_history.get(chat_id, ())
    return list(recorded)
def get_latest_logs(chat_id):
    """Return the chat's stored log lines joined by newlines ("" if none)."""
    lines = logs_history.get(chat_id)
    if lines is None:
        return ""
    return "\n".join(lines)
def sanitize_log(text):
    """Escape &, <, > and quotes so text is safe inside HTML-formatted messages."""
    escaped = html.escape(text, quote=True)
    return escaped
def split_logs(logs, latest_only=False, limit=None):
    """Split logs into newline-terminated chunks of at most `limit` characters.

    Lines are kept whole whenever they fit.  A single line that is too long
    for one chunk is hard-wrapped into pieces, so no chunk ever exceeds the
    limit and no empty chunk is produced.

    Args:
        logs: Log text; lines are separated by "\n".
        latest_only: If True, return a one-element list holding only the
            last chunk.
        limit: Maximum chunk length in characters, counting the trailing
            newline of each line.  Defaults to MESSAGE_LIMIT.

    Returns:
        List of chunk strings, in order.  Empty input yields ["\n"].

    Raises:
        ValueError: If limit is smaller than 2 (no room for one character
            plus its newline).
    """
    if limit is None:
        limit = MESSAGE_LIMIT
    if limit < 2:
        raise ValueError("limit must be at least 2")

    # Longest piece of text that still fits in a chunk together with its "\n".
    max_piece = limit - 1

    chunks = []
    current_chunk = ""
    for line in logs.split('\n'):
        # Hard-wrap oversize lines; an empty line still contributes one "\n".
        pieces = [
            line[start:start + max_piece]
            for start in range(0, len(line), max_piece)
        ] or [""]
        for piece in pieces:
            # Flush only a non-empty chunk, so an oversize first line cannot
            # emit an empty leading chunk.
            if current_chunk and len(current_chunk) + len(piece) + 1 > limit:
                chunks.append(current_chunk)
                current_chunk = ""
            current_chunk += piece + '\n'
    if current_chunk:
        chunks.append(current_chunk)

    if latest_only and chunks:
        return [chunks[-1]]
    return chunks
def create_log_keyboard(is_live=True):
    """Build the inline keyboard shown under a log message.

    Live logs get Pause/Resume/Refresh/Clear buttons; historical logs only
    get a single clear-history button.
    """
    if is_live:
        button_specs = [
            ("βΈοΈ Pause", "stop_logs"),
            ("βΆοΈ Resume", "resume_logs"),
            ("π Refresh", "refresh_logs"),
            ("β Clear", "clear_logs"),
        ]
    else:
        button_specs = [("β Clear log history", "clear_logs")]

    markup = telebot.types.InlineKeyboardMarkup(row_width=2)
    buttons = [
        telebot.types.InlineKeyboardButton(label, callback_data=action)
        for label, action in button_specs
    ]
    markup.add(*buttons)
    return markup
def stream_command(chat_id, command):
    """Run command in a shell and stream its output to the chat.

    Registers a state dict in active_processes[chat_id], reads the combined
    stdout/stderr line by line, stores every line in the log history and
    the live buffer, and refreshes the live log message at most once every
    two seconds (only when a new line arrives and logs are not paused).
    When the process ends, the remaining buffer and the exit code are sent
    and the state entry is removed.

    Args:
        chat_id: Telegram chat the output is reported to.
        command: Shell command line to execute.
    """
    initialize_logs_history(chat_id)
    initialize_command_history(chat_id)
    add_to_command_history(chat_id, command)

    # NOTE(security): the command text is executed through the shell as-is.
    # That is the purpose of this bot, so access control (is_authorized in
    # the callers) is the only safeguard.
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # interleave stderr with stdout
        shell=True,
        text=True,
        bufsize=1,  # line-buffered so readline() returns promptly
    )

    # Keep a local reference: stop_command may remove (and a later command
    # may replace) active_processes[chat_id] while this thread is running.
    state = {
        'process': process,
        'last_message_id': None,
        'buffer': "",
        'last_update': time.time(),
        'show_logs': True,
        'paused_at': None
    }
    active_processes[chat_id] = state

    update_interval = 2  # minimum seconds between live message refreshes

    try:
        while True:
            line = process.stdout.readline()
            # Empty read plus a finished process means end of output.
            if not line and process.poll() is not None:
                break
            if not line:
                continue

            add_to_logs_history(chat_id, line.strip())
            state['buffer'] += line

            now = time.time()
            # send_log_update looks the state up by chat_id, so only call it
            # while this command still owns the entry.
            if (state['show_logs']
                    and active_processes.get(chat_id) is state
                    and now - state['last_update'] >= update_interval):
                send_log_update(chat_id)
                state['last_update'] = now
    except Exception as e:
        # Escape the text: the message is parsed as HTML and exception
        # messages may contain '<', '>' or '&'.
        bot.send_message(
            chat_id,
            f"β Error: {html.escape(str(e))}",
            parse_mode='HTML'
        )
    finally:
        # Release the pipe once the process is gone.
        if process.poll() is not None:
            process.stdout.close()

        # Only clean up an entry that still belongs to this command.
        if active_processes.get(chat_id) is state:
            try:
                if state['buffer'] and state['show_logs']:
                    send_log_update(chat_id)
                if process.poll() is not None:
                    bot.send_message(
                        chat_id,
                        f"β Command completed with exit code: {process.returncode}",
                        parse_mode='HTML'
                    )
            finally:
                # Always free the slot, even if the final messages failed,
                # so the chat is not stuck with a phantom running command.
                if active_processes.get(chat_id) is state:
                    active_processes.pop(chat_id, None)
def send_log_update(chat_id):
    """Send or update the log message for the chat's running command.

    While the process is running, the latest chunk of the buffer is shown
    in a single message that is edited in place (or sent first if no
    message exists yet).  Once the process has finished, the whole buffer
    is sent as numbered "Final Log" messages and the buffer is cleared.

    Does nothing if no command is registered for chat_id.
    """
    process_info = active_processes.get(chat_id)
    if process_info is None:
        # The command was stopped or finished in the meantime.
        return

    chunks = split_logs(process_info['buffer'])

    # Take a single snapshot of the process state.  Polling twice would let
    # a process that exits mid-update clear the buffer without the final
    # log ever being sent.
    finished = process_info['process'].poll() is not None

    if finished:
        total_parts = len(chunks)
        for part_number, chunk in enumerate(chunks, 1):
            formatted_chunk = (
                f"π Final Log Part {part_number}/{total_parts}\n"
                f"<pre>{sanitize_log(chunk)}</pre>"
            )
            bot.send_message(chat_id, formatted_chunk, parse_mode='HTML')
        process_info['buffer'] = ""
        return

    total_pages = len(chunks)
    # Only the newest chunk is displayed in the live message.
    current_chunk = chunks[-1] if chunks else ""
    paused = not process_info['show_logs']

    status = "π Live Log (paused)" if paused else "π Live Log (updating...)"
    header = f"{status} (Page {total_pages}/{total_pages})\n"
    formatted_message = f"{header}\n<pre>{sanitize_log(current_chunk)}</pre>"
    if paused:
        formatted_message += "\n<pre>Logs are paused. Click Resume to continue showing logs.</pre>"
        # The timestamp changes the content slightly on each paused update.
        formatted_message += f"\n\n<pre>Last paused at: {time.strftime('%H:%M:%S')}</pre>"

    try:
        if process_info['last_message_id']:
            bot.edit_message_text(
                formatted_message,
                chat_id=chat_id,
                message_id=process_info['last_message_id'],
                parse_mode='HTML',
                reply_markup=create_log_keyboard()
            )
        else:
            # No live message yet: send one and remember it for later edits.
            msg = bot.send_message(
                chat_id,
                formatted_message,
                parse_mode='HTML',
                reply_markup=create_log_keyboard()
            )
            process_info['last_message_id'] = msg.message_id
    except telebot.apihelper.ApiException as e:
        # Editing a message with unchanged content is rejected by the API;
        # that case is harmless and is skipped silently.
        if "message is not modified" not in str(e):
            bot.send_message(chat_id, f"Error updating message: {str(e)}")
def handle_callback(call):
    """Handle a press on one of the inline log-control buttons.

    Supported callback data: "stop_logs" (pause), "resume_logs",
    "refresh_logs" and "clear_logs".  All but "clear_logs" require a
    running command for the chat.  Every callback query is answered so the
    client stops showing its loading indicator.
    """
    chat_id = call.message.chat.id

    # Same access rule as the message handlers.
    if not is_authorized(chat_id):
        bot.answer_callback_query(call.id, "β You are not authorized to use this bot.")
        return

    action = call.data
    # Fetch once: the entry may disappear at any time when the command ends.
    process_info = active_processes.get(chat_id)

    if action == "clear_logs":
        if chat_id in logs_history:
            logs_history[chat_id].clear()
        if process_info is not None:
            process_info['buffer'] = ""
        bot.answer_callback_query(call.id, "Logs cleared")
        return

    if process_info is None:
        bot.answer_callback_query(call.id, "No active process")
        return

    if action == "stop_logs":
        process_info['show_logs'] = False
        bot.answer_callback_query(call.id, "Logs paused")
        send_log_update(chat_id)
    elif action == "resume_logs":
        process_info['show_logs'] = True
        bot.answer_callback_query(call.id, "Logs resumed")
        send_log_update(chat_id)
    elif action == "refresh_logs":
        bot.answer_callback_query(call.id, "Logs refreshed")
        send_log_update(chat_id)
    else:
        # Unknown button: acknowledge it without doing anything.
        bot.answer_callback_query(call.id)
def show_logs(message):
    """Reply with the chat's logs.

    With a running command, only the newest chunk of the live buffer is
    sent (with the live keyboard) and becomes the message that later live
    updates edit.  Otherwise the stored history is sent in numbered parts.
    """
    chat_id = message.chat.id

    if not is_authorized(chat_id):
        bot.reply_to(message, "β You are not authorized to use this bot.")
        return

    initialize_logs_history(chat_id)

    # Live buffer while a command runs, stored history otherwise.
    is_live = chat_id in active_processes
    if is_live:
        logs_content = active_processes[chat_id]['buffer']
    else:
        logs_content = get_latest_logs(chat_id)

    if not logs_content:
        bot.reply_to(message, "No logs available.")
        return

    if is_live:
        latest = split_logs(logs_content, latest_only=True)
        if latest:
            status = "π Live Logs"
            text = f"{status} (Current Live Logs)\n<pre>{sanitize_log(latest[0])}</pre>"
            sent = bot.send_message(
                chat_id,
                text,
                parse_mode='HTML',
                reply_markup=create_log_keyboard(is_live)
            )
            # Later live updates edit this message.
            active_processes[chat_id]['last_message_id'] = sent.message_id
        return

    # Historical logs: send every part so no message exceeds the size limit.
    parts = split_logs(logs_content)
    total_parts = len(parts)
    for part_number, part in enumerate(parts, 1):
        status = "π Historical Logs"
        text = f"{status} (Part {part_number}/{total_parts})\n<pre>{sanitize_log(part)}</pre>"
        bot.send_message(
            chat_id,
            text,
            parse_mode='HTML',
            reply_markup=create_log_keyboard(is_live)
        )
def stop_command(message):
    """Stop the chat's running command and its whole process tree.

    Log updates are disabled first, then every child process and the
    parent are asked to terminate; whatever is still alive after three
    seconds is killed.  The chat's entry in active_processes is removed
    in all cases.
    """
    chat_id = message.chat.id
    if not is_authorized(chat_id):
        bot.reply_to(message, "β You are not authorized to use this bot.")
        return

    process_info = active_processes.get(chat_id)
    if process_info is None:
        bot.reply_to(message, "No active command to stop.")
        return

    process = process_info['process']
    try:
        # 1. Disable log updates and drop pending output.
        process_info['show_logs'] = False
        process_info['buffer'] = ""

        # 2. Terminate the process tree (psutil works on Windows too).
        try:
            parent = psutil.Process(process.pid)
            # Children first, parent last.
            targets = parent.children(recursive=True) + [parent]
            for proc in targets:
                try:
                    proc.terminate()
                except psutil.NoSuchProcess:
                    pass  # already gone
            # 3. Give them a moment, then force-kill the survivors.
            _gone, alive = psutil.wait_procs(targets, timeout=3)
            for proc in alive:
                try:
                    proc.kill()
                except psutil.NoSuchProcess:
                    pass
        except (psutil.Error, OSError):
            # Fallback: terminate the direct child only.
            try:
                process.terminate()
            except OSError:
                pass

        # 4. Report success.
        bot.reply_to(message, "π Command stopped successfully.")
    except Exception as e:
        bot.reply_to(message, f"β Error stopping command: {str(e)}")
    finally:
        # 5. Clean up: one final attempt to kill if still running.
        try:
            if process.poll() is None:
                process.kill()
        except OSError:
            pass
        # The streaming thread may already have removed the entry, or a new
        # command may have taken the slot; only remove our own entry.
        if active_processes.get(chat_id) is process_info:
            active_processes.pop(chat_id, None)
def show_command_history(message):
    """Reply with the numbered list of commands recorded for this chat."""
    chat_id = message.chat.id
    if not is_authorized(chat_id):
        bot.reply_to(message, "β You are not authorized to use this bot.")
        return

    commands = get_command_history(chat_id)
    if not commands:
        bot.reply_to(message, "No command history available.")
        return

    # Shell commands routinely contain '<', '>' and '&' (redirections, '&&').
    # The reply is parsed as HTML, so they must be escaped or the message
    # is rejected.
    formatted_commands = "\n".join(
        f"{number}. {sanitize_log(cmd)}"
        for number, cmd in enumerate(commands, 1)
    )
    bot.reply_to(
        message,
        f"π Command History (Last {len(commands)} commands):\n<pre>{formatted_commands}</pre>",
        parse_mode='HTML'
    )
def send_welcome(message):
    """Reply with a short usage summary of the bot."""
    welcome_text = (
        "π Welcome! Send me any command to execute and monitor logs.\n"
        "Use /stop to stop the current command.\n"
        "Use /cmds to see the last 20 executed commands."
    )
    bot.reply_to(message, welcome_text)
def execute_command(message):
    """Run the message text as a command in a background thread.

    Refused for unauthorized chats and while another command is still
    registered for the chat.
    """
    chat_id = message.chat.id

    if not is_authorized(chat_id):
        bot.reply_to(message, "β You are not authorized to use this bot.")
        return

    # One command per chat at a time.
    if chat_id in active_processes:
        bot.reply_to(
            message,
            "β οΈ A command is already running. Use /stop to stop it first."
        )
        return

    command = message.text
    bot.reply_to(message, f"βΆοΈ Executing command: {command}")

    # stream_command blocks until the process ends, so it gets its own thread.
    worker = threading.Thread(target=stream_command, args=(chat_id, command))
    worker.start()
# # Start the bot | |
# try: | |
# bot.polling(none_stop=True) | |
# except Exception as e: | |
# print(f"Bot polling error: {e}") | |
# ----------------------------------------------------------------- FastAPI ---
# from telegram_preview import router as telegram_preview_router
# Project-local module, imported here next to its only use.
import telegram_preview

# ASGI application object; the __main__ guard serves it as "app:app".
app = FastAPI()
# Hand the app to telegram_preview (presumably this registers its routes --
# confirm against that module).
telegram_preview.include_in_app(app)
# Simple health-check endpoint
# NOTE(review): no route decorator is visible on this function in this view;
# confirm it is registered on `app`.
def health():
    """Return a static payload signalling that the service is up."""
    payload = dict(status="ok")
    return payload
# OPTIONAL: if you really want a startup hook, keep it complete: | |
# @app.on_event("startup") | |
# async def startup(): | |
# print("Application started") | |
def startup():
    """Start the bot's polling loop in a background daemon thread.

    Meant to run after Uvicorn has started.
    NOTE(review): no startup-hook decorator is visible on this function in
    this view; confirm something actually calls it.
    """
    # Daemon thread: polling must not keep the process alive on shutdown.
    poller = threading.Thread(target=bot.infinity_polling, daemon=True)
    poller.start()
if __name__ == "__main__":
    # Serve the FastAPI app with Uvicorn on all interfaces, port 7860, when
    # this file is run directly.
    import uvicorn

    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=7860,
    )