Spaces:
Running
Running
import telebot | |
import subprocess | |
import threading | |
import time | |
import re | |
import html | |
from collections import deque | |
import os | |
import re | |
import signal | |
import psutil | |
from fastapi import FastAPI, status | |
from fastapi.responses import RedirectResponse | |
from fastapi.responses import HTMLResponse | |
# Initialize the bot; a missing BOT_TOKEN fails fast with KeyError at import.
TOKEN = os.environ["BOT_TOKEN"]
bot = telebot.TeleBot(TOKEN)

# Chat IDs allowed to use the bot, parsed from the CHAT_IDS environment
# variable (any separator between IDs works). An optional leading "-" is
# kept because Telegram group/channel chat IDs are negative; the lookbehind
# stops a dash used as a separator ("123-456") from negating the next ID.
AUTHORIZED_USERS = [
    int(raw_id) for raw_id in re.findall(r"(?<!\d)-?\d+", os.getenv("CHAT_IDS", ""))
]

# Per-chat state of the currently running command (keyed by chat_id).
active_processes = {}

# Per-chat log history (keyed by chat_id).
logs_history = {}

# Maximum characters of log text placed into a single Telegram message.
MESSAGE_LIMIT = 4000

# Maximum number of log lines to keep in history.
MAX_LOG_HISTORY = 1000

# Maximum number of command history entries.
MAX_COMMAND_HISTORY = 20

# Per-chat command history (keyed by chat_id).
command_history = {}
def is_authorized(chat_id):
    """Return True when `chat_id` is listed in AUTHORIZED_USERS."""
    allowed = chat_id in AUTHORIZED_USERS
    return allowed
def initialize_logs_history(chat_id):
    """Ensure `chat_id` has a bounded log deque in `logs_history`."""
    # setdefault leaves an existing deque untouched and only stores the new
    # one when the chat has no history yet.
    logs_history.setdefault(chat_id, deque(maxlen=MAX_LOG_HISTORY))
def add_to_logs_history(chat_id, log_line):
    """Append `log_line` to the chat's log history, creating it if needed."""
    initialize_logs_history(chat_id)
    chat_logs = logs_history[chat_id]
    chat_logs.append(log_line)
def initialize_command_history(chat_id):
    """Ensure `chat_id` has a bounded command deque in `command_history`."""
    # An existing deque is kept as-is; a new one is stored only on first use.
    command_history.setdefault(chat_id, deque(maxlen=MAX_COMMAND_HISTORY))
def add_to_command_history(chat_id, command):
    """Record `command` in the chat's command history, creating it if needed."""
    initialize_command_history(chat_id)
    chat_commands = command_history[chat_id]
    chat_commands.append(command)
def get_command_history(chat_id):
    """Return a list copy of the chat's command history (oldest first).

    An empty list is returned when the chat has no recorded commands.
    """
    return list(command_history.get(chat_id, ()))
def get_latest_logs(chat_id):
    """Return the chat's stored log lines joined by newlines.

    An empty string is returned when the chat has no log history.
    """
    return "\n".join(logs_history.get(chat_id, ()))
def sanitize_log(text):
    """Escape `text` so it can be embedded in an HTML-formatted message."""
    escaped = html.escape(text)
    return escaped
def split_logs(logs, latest_only=False, limit=None):
    """Split `logs` into newline-terminated chunks of at most `limit` chars.

    Lines are kept whole whenever they fit; a single line longer than the
    limit is hard-split so no chunk ever exceeds the limit. Empty chunks are
    never produced, and empty input yields an empty list.

    Args:
        logs: Log text with lines separated by "\n".
        latest_only: When True, return only the last chunk (in a list).
        limit: Maximum chunk length; defaults to MESSAGE_LIMIT.

    Returns:
        A list of chunk strings, oldest first.

    Raises:
        ValueError: If the effective limit is smaller than 2 (a chunk needs
            room for at least one character plus its newline).
    """
    max_len = MESSAGE_LIMIT if limit is None else limit
    if max_len < 2:
        raise ValueError("limit must be at least 2")
    if not logs:
        return []

    chunks = []
    current_chunk = ""
    for line in logs.split('\n'):
        # A line that cannot fit in a chunk on its own is cut into pieces,
        # each emitted as its own chunk.
        while len(line) + 1 > max_len:
            if current_chunk:
                chunks.append(current_chunk)
                current_chunk = ""
            chunks.append(line[:max_len - 1] + '\n')
            line = line[max_len - 1:]

        # Here the line fits by itself, so overflow implies a non-empty
        # current chunk and no empty chunk can be appended.
        if len(current_chunk) + len(line) + 1 > max_len:
            chunks.append(current_chunk)
            current_chunk = ""
        current_chunk += line + '\n'

    if current_chunk:
        chunks.append(current_chunk)

    if latest_only and chunks:
        return [chunks[-1]]
    return chunks
def create_log_keyboard(is_live=True):
    """Build the inline keyboard attached to log messages.

    Live logs get Pause/Resume/Refresh/Clear buttons; historical logs only
    get a button that clears the stored log history.
    """
    make_button = telebot.types.InlineKeyboardButton
    if is_live:
        buttons = [
            make_button("βΈοΈ Pause", callback_data="stop_logs"),
            make_button("βΆοΈ Resume", callback_data="resume_logs"),
            make_button("π Refresh", callback_data="refresh_logs"),
            make_button("β Clear", callback_data="clear_logs"),
        ]
    else:
        buttons = [
            make_button("β Clear log history", callback_data="clear_logs"),
        ]

    markup = telebot.types.InlineKeyboardMarkup(row_width=2)
    markup.add(*buttons)
    return markup
def stream_command(chat_id, command):
    """Run `command` in a shell and stream its output to the chat.

    The command is recorded in the chat's command history, started with
    stdout and stderr merged, and registered in `active_processes`. Each
    output line is added to the log history and to the live buffer; while
    logs are shown, the live message is refreshed at most every
    `update_interval` seconds. When the process ends, the remaining buffer
    and the exit code are sent, and the chat's entry is removed.

    Blocks until the process finishes, so it is meant to run in its own
    thread.

    Args:
        chat_id: Chat that issued the command and receives the output.
        command: Shell command line to execute.
    """
    initialize_logs_history(chat_id)
    initialize_command_history(chat_id)
    add_to_command_history(chat_id, command)

    # NOTE: shell=True runs the raw message text through the shell. This is
    # the bot's purpose, but it means every authorized chat has full shell
    # access to the host.
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        shell=True,
        text=True,
        bufsize=1,
    )

    # Keep a local reference to this run's state: another thread may remove
    # or replace active_processes[chat_id] while output is still streaming.
    state = {
        'process': process,
        'last_message_id': None,
        'buffer': "",
        'last_update': time.time(),
        'show_logs': True,
        'paused_at': None
    }
    active_processes[chat_id] = state
    update_interval = 2

    try:
        while True:
            line = process.stdout.readline()
            if not line:
                # EOF on the pipe: wait for the exit status instead of
                # spinning on readline() until poll() reports it.
                process.wait()
                break

            add_to_logs_history(chat_id, line.strip())
            state['buffer'] += line

            now = time.time()
            still_registered = active_processes.get(chat_id) is state
            if (still_registered and state['show_logs'] and
                    now - state['last_update'] >= update_interval):
                send_log_update(chat_id)
                state['last_update'] = now
    except Exception as e:
        # The message is sent with HTML parse mode, so the error text must
        # be escaped or characters such as "<" would make the send fail.
        bot.send_message(chat_id, f"β Error: {sanitize_log(str(e))}", parse_mode='HTML')
    finally:
        process.stdout.close()
        try:
            # Only report and clean up if the registry still points at this
            # run; otherwise it was stopped, or a newer command owns the slot.
            if active_processes.get(chat_id) is state:
                if state['buffer'] and state['show_logs']:
                    send_log_update(chat_id)
                if process.poll() is not None:
                    bot.send_message(
                        chat_id,
                        f"β Command completed with exit code: {process.returncode}",
                        parse_mode='HTML'
                    )
        finally:
            # Always free the slot, even if sending the final messages
            # failed, so the chat is not stuck with a phantom running command.
            if active_processes.get(chat_id) is state:
                active_processes.pop(chat_id, None)
def send_log_update(chat_id):
    """Send or refresh the log message for the chat's active process.

    While the process is running, the newest chunk of the buffer is shown in
    a single message, which is edited in place once its id is known. After
    the process has exited, the whole buffer is sent as numbered "Final Log"
    parts and the buffer is emptied.

    Raises:
        KeyError: If `chat_id` has no entry in `active_processes`.
    """
    info = active_processes[chat_id]
    proc = info['process']
    pages = split_logs(info['buffer'])

    if proc.poll() is not None:
        # Process finished: deliver everything that is buffered.
        page_total = len(pages)
        for page_no, page in enumerate(pages, 1):
            text = (
                f"π Final Log Part {page_no}/{page_total}\n"
                f"<pre>{sanitize_log(page)}</pre>"
            )
            bot.send_message(chat_id, text, parse_mode='HTML')
    else:
        page_total = len(pages)
        latest_page = pages[-1] if pages else ""
        paused = not info['show_logs']

        label = "π Live Log (paused)" if paused else "π Live Log (updating...)"
        text = f"{label} (Page {page_total}/{page_total})\n\n<pre>{sanitize_log(latest_page)}</pre>"
        if paused:
            text += "\n<pre>Logs are paused. Click Resume to continue showing logs.</pre>"
            # The timestamp makes the content differ from the previous edit
            # while paused.
            text += f"\n\n<pre>Last paused at: {time.strftime('%H:%M:%S')}</pre>"

        try:
            if info['last_message_id']:
                bot.edit_message_text(
                    text,
                    chat_id=chat_id,
                    message_id=info['last_message_id'],
                    parse_mode='HTML',
                    reply_markup=create_log_keyboard()
                )
            else:
                # No live message yet: send one and remember its id so later
                # updates can edit it.
                sent = bot.send_message(
                    chat_id,
                    text,
                    parse_mode='HTML',
                    reply_markup=create_log_keyboard()
                )
                info['last_message_id'] = sent.message_id
        except telebot.apihelper.ApiException as e:
            # An unchanged message is not worth reporting; anything else is.
            if "message is not modified" not in str(e):
                bot.send_message(chat_id, f"Error updating message: {str(e)}")

    if proc.poll() is not None:
        info['buffer'] = ""
def handle_callback(call):
    """Handle presses on the inline log-control buttons.

    Supported `call.data` values: "stop_logs", "resume_logs",
    "refresh_logs" (all require an active process) and "clear_logs".

    NOTE(review): no callback-handler registration (decorator) is visible
    for this function here — confirm it is registered with the bot.
    """
    chat_id = call.message.chat.id

    # Same gate as the message handlers: only authorized chats may control logs.
    if not is_authorized(chat_id):
        bot.answer_callback_query(call.id, "β You are not authorized to use this bot.")
        return

    action = call.data

    if action == "clear_logs":
        if chat_id in logs_history:
            logs_history[chat_id].clear()
        process_info = active_processes.get(chat_id)
        if process_info is not None:
            process_info['buffer'] = ""
        bot.answer_callback_query(call.id, "Logs cleared")
        return

    # Fetch once: the entry can disappear at any moment when the command
    # finishes or is stopped from another thread.
    process_info = active_processes.get(chat_id)
    if process_info is None:
        bot.answer_callback_query(call.id, "No active process")
        return

    if action == "stop_logs":
        process_info['show_logs'] = False
        bot.answer_callback_query(call.id, "Logs paused")
    elif action == "resume_logs":
        process_info['show_logs'] = True
        bot.answer_callback_query(call.id, "Logs resumed")
    elif action == "refresh_logs":
        bot.answer_callback_query(call.id, "Logs refreshed")
    else:
        # Unknown button: acknowledge so the client stops waiting.
        bot.answer_callback_query(call.id)
        return

    try:
        send_log_update(chat_id)
    except KeyError:
        # The process ended between the lookup above and the update;
        # there is nothing left to refresh.
        pass
def show_logs(message):
    """Reply with the chat's logs: live buffer if a command runs, else history.

    For a running command only the newest chunk is sent, and that message
    becomes the one future live updates edit. Without a running command the
    stored history is sent in numbered parts.

    NOTE(review): no message-handler registration (decorator) is visible for
    this function here — confirm it is registered with the bot.
    """
    chat_id = message.chat.id

    if not is_authorized(chat_id):
        bot.reply_to(message, "β You are not authorized to use this bot.")
        return

    initialize_logs_history(chat_id)

    is_live = chat_id in active_processes
    if is_live:
        logs_content = active_processes[chat_id]['buffer']
    else:
        logs_content = get_latest_logs(chat_id)

    # Nothing buffered and nothing stored.
    if not logs_content:
        bot.reply_to(message, "No logs available.")
        return

    if is_live:
        newest = split_logs(logs_content, latest_only=True)
        if newest:
            text = f"π Live Logs (Current Live Logs)\n<pre>{sanitize_log(newest[0])}</pre>"
            sent = bot.send_message(
                chat_id,
                text,
                parse_mode='HTML',
                reply_markup=create_log_keyboard(is_live)
            )
            # Later live updates edit this message.
            active_processes[chat_id]['last_message_id'] = sent.message_id
        return

    # Historical logs: split to respect the message length limit and send
    # every part.
    parts = split_logs(logs_content)
    part_total = len(parts)
    for part_no, part in enumerate(parts, 1):
        text = f"π Historical Logs (Part {part_no}/{part_total})\n<pre>{sanitize_log(part)}</pre>"
        bot.send_message(
            chat_id,
            text,
            parse_mode='HTML',
            reply_markup=create_log_keyboard(is_live)
        )
def stop_command(message):
    """Stop the chat's running command together with its child processes.

    Log updates are disabled first, then the whole process tree is asked to
    terminate; whatever is still alive after a short grace period is killed.
    The chat's entry in `active_processes` is removed in every case.

    NOTE(review): no message-handler registration (decorator) is visible for
    this function here — confirm it is registered with the bot.
    """
    chat_id = message.chat.id
    if not is_authorized(chat_id):
        bot.reply_to(message, "β You are not authorized to use this bot.")
        return

    process_info = active_processes.get(chat_id)
    if process_info is None:
        bot.reply_to(message, "No active command to stop.")
        return

    process = process_info['process']
    try:
        # 1. Disable log updates so the streaming thread stops posting.
        process_info['show_logs'] = False
        process_info['buffer'] = ""

        # 2. Terminate the process tree (children first, then the parent).
        try:
            parent = psutil.Process(process.pid)
            targets = parent.children(recursive=True) + [parent]
            for proc in targets:
                try:
                    proc.terminate()
                except psutil.NoSuchProcess:
                    # Already gone; keep going with the rest of the tree.
                    pass

            # 3. Give them a moment, then force-kill survivors so no child
            #    is left running in the background.
            _, alive = psutil.wait_procs(targets, timeout=3)
            for proc in alive:
                try:
                    proc.kill()
                except psutil.NoSuchProcess:
                    pass
        except (psutil.Error, OSError):
            # Fallback: terminate the direct child only.
            try:
                process.terminate()
            except OSError:
                pass

        # 4. Report success.
        bot.reply_to(message, "π Command stopped successfully.")
    except Exception as e:
        bot.reply_to(message, f"β Error stopping command: {str(e)}")
    finally:
        # 5. Clean up: one final attempt to kill if still running.
        try:
            if process.poll() is None:
                process.kill()
        except OSError:
            pass
        # Remove only our own entry; pop() tolerates the streaming thread
        # having removed it concurrently.
        if active_processes.get(chat_id) is process_info:
            active_processes.pop(chat_id, None)
def show_command_history(message):
    """Reply with the numbered list of the chat's recent commands.

    NOTE(review): no message-handler registration (decorator) is visible for
    this function here — confirm it is registered with the bot.
    """
    chat_id = message.chat.id
    if not is_authorized(chat_id):
        bot.reply_to(message, "β You are not authorized to use this bot.")
        return

    commands = get_command_history(chat_id)
    if not commands:
        bot.reply_to(message, "No command history available.")
        return

    # The reply uses HTML parse mode, so each command must be escaped:
    # shell text routinely contains "<", ">" and "&" (redirections, "&&"),
    # which would otherwise make the message fail to parse.
    formatted_commands = "\n".join(
        f"{index}. {sanitize_log(cmd)}" for index, cmd in enumerate(commands, 1)
    )
    bot.reply_to(
        message,
        f"π Command History (Last {len(commands)} commands):\n<pre>{formatted_commands}</pre>",
        parse_mode='HTML'
    )
def send_welcome(message):
    """Reply with a short usage summary of the bot.

    NOTE(review): no message-handler registration (decorator) is visible for
    this function here — confirm it is registered with the bot.
    """
    welcome_lines = (
        "π Welcome! Send me any command to execute and monitor logs.",
        "Use /stop to stop the current command.",
        "Use /cmds to see the last 20 executed commands.",
    )
    bot.reply_to(message, "\n".join(welcome_lines))
def execute_command(message):
    """Run the message text as a shell command in a background thread.

    Only one command per chat may run at a time; a second request is
    rejected until the first finishes or is stopped.

    NOTE(review): no message-handler registration (decorator) is visible for
    this function here — confirm it is registered with the bot.
    """
    chat_id = message.chat.id

    if not is_authorized(chat_id):
        bot.reply_to(message, "β You are not authorized to use this bot.")
        return

    already_running = chat_id in active_processes
    if already_running:
        bot.reply_to(
            message,
            "β οΈ A command is already running. Use /stop to stop it first."
        )
        return

    command = message.text
    bot.reply_to(message, f"βΆοΈ Executing command: {command}")

    # stream_command blocks until the process exits, so it gets its own thread.
    worker = threading.Thread(target=stream_command, args=(chat_id, command))
    worker.start()
# # Start the bot | |
# try: | |
# bot.polling(none_stop=True) | |
# except Exception as e: | |
# print(f"Bot polling error: {e}") | |
# βββββββββββββββββββββββββββββββββββββββββββββββββββ FastAPI βββ | |
# app = FastAPI() | |
# @app.get("/") | |
# def root(): # β‘ healthβcheck hits this β must return 200 quickly | |
# # return {"status": "ok"} | |
# return RedirectResponse( | |
# url="https://t.me/python3463_bot", | |
# status_code=status.HTTP_302_FOUND # 302 is fine too | |
# ) | |
from fastapi import FastAPI, HTTPException | |
from fastapi.responses import HTMLResponse, FileResponse | |
import requests, os, time, pathlib | |
# ββββββββββββββββββββββββββββββ | |
# Config | |
# ββββββββββββββββββββββββββββββ | |
# The bot token itself is read once, earlier in this file, into TOKEN
# (from the BOT_TOKEN environment variable).
BOT_USERNAME = os.getenv("BOT_USERNAME", "live_logger_bot")
AVATAR_TTL = int(os.getenv("AVATAR_TTL", "3600"))  # seconds to keep avatar fresh
DATA_DIR = pathlib.Path("/tmp")  # change if you prefer
AVATAR_PATH = DATA_DIR / "bot_avatar.jpg"
AVATAR_META = {"ts": 0}  # keeps timestamp of last download

# Guards against an empty token; the message names the variable that is
# actually read (BOT_TOKEN), not the Python name it is stored in.
if not TOKEN:
    raise RuntimeError("BOT_TOKEN environment variable not set")

app = FastAPI(title="Telegram Bot Preview")
# ββββββββββββββββββββββββββββββ | |
# Helper: refresh avatar if needed | |
# ββββββββββββββββββββββββββββββ | |
def refresh_avatar(force: bool = False) -> None:
    """Download the bot's avatar if it is missing or stale.

    Saves the image to AVATAR_PATH and updates AVATAR_META['ts']. Any
    failure is logged and otherwise ignored so a previously downloaded
    avatar keeps being served.

    Args:
        force: Download even if the cached avatar is still within AVATAR_TTL.
    """
    now = time.time()
    if not force and AVATAR_PATH.exists() and now - AVATAR_META["ts"] < AVATAR_TTL:
        return  # still fresh

    api_base = f"https://api.telegram.org/bot{TOKEN}"
    try:
        # 1. get bot id (with a timeout, like every other call here, so a
        #    stalled connection cannot hang the caller)
        me = requests.get(f"{api_base}/getMe", timeout=5).json()
        user_id = me["result"]["id"]

        # 2. get latest profile photo
        photos = requests.get(
            f"{api_base}/getUserProfilePhotos",
            params={"user_id": user_id, "limit": 1},
            timeout=5,
        ).json()
        if photos["result"]["total_count"] == 0:
            return  # bot has no avatar
        file_id = photos["result"]["photos"][0][-1]["file_id"]  # biggest size

        # 3. resolve file path
        file_obj = requests.get(
            f"{api_base}/getFile",
            params={"file_id": file_id},
            timeout=5,
        ).json()
        file_path = file_obj["result"]["file_path"]

        # 4. download the actual image; an HTTP error must not be written
        #    to disk as if it were the avatar
        file_url = f"https://api.telegram.org/file/bot{TOKEN}/{file_path}"
        download = requests.get(file_url, timeout=10)
        download.raise_for_status()

        DATA_DIR.mkdir(parents=True, exist_ok=True)
        with open(AVATAR_PATH, "wb") as f:
            f.write(download.content)
        AVATAR_META["ts"] = now
        print("Avatar refreshed")
    except Exception as exc:
        # Best effort: if something goes wrong, keep the old avatar.
        # Request errors can embed the URL, which contains the bot token,
        # so redact it before it reaches the logs.
        print("Avatar refresh failed:", str(exc).replace(TOKEN, "<redacted>"))
# ββββββββββββββββββββββββββββββ | |
# Helper: bot description (once) | |
# ββββββββββββββββββββββββββββββ | |
def get_bot_description() -> str:
    """Return the bot's short description, or "" if it cannot be fetched.

    The description is requested from the Telegram API on every call. A
    network error, an invalid response body, or an API error response all
    yield an empty string instead of raising.
    """
    try:
        resp = requests.get(
            f"https://api.telegram.org/bot{TOKEN}/getMyShortDescription",
            timeout=5,
        ).json()
    except (requests.RequestException, ValueError):
        # ValueError covers a response body that is not valid JSON.
        return ""

    # An error response carries no "result" object.
    result = resp.get("result") or {}
    return result.get("short_description", "")
# ββββββββββββββββββββββββββββββ | |
# Startup: fetch avatar once | |
# ββββββββββββββββββββββββββββββ | |
def on_startup():
    """Fetch the bot avatar unconditionally, ignoring any cached copy.

    NOTE(review): no startup-hook registration (decorator) is visible for
    this function here — confirm it is wired to the app.
    """
    force_download = True
    refresh_avatar(force=force_download)
# ββββββββββββββββββββββββββββββ | |
# Route: preview card | |
# ββββββββββββββββββββββββββββββ | |
def preview():
    """Render an HTML card with the bot's avatar, name, and description.

    Returns:
        An HTMLResponse containing the preview page.

    NOTE(review): no route registration (decorator) is visible for this
    function here — confirm it is mounted on the app.
    """
    refresh_avatar()  # refresh if stale
    avatar_src = "/avatar.jpg" if AVATAR_PATH.exists() else "https://telegram.org/img/t_logo.png"

    # Escape everything interpolated into the markup; the description comes
    # from the Telegram API and the username from the environment. The page
    # is built in `page` (not `html`) so the `html` module stays usable here.
    username = html.escape(BOT_USERNAME)
    description = html.escape(get_bot_description())

    page = f"""
<html>
<head>
<title>{username}</title>
<style>
body {{
font-family: sans-serif; display: flex; justify-content: center;
padding: 40px; background: #f7f7f7;
}}
.card {{
max-width: 420px; background: #fff; padding: 24px; text-align: center;
border-radius: 12px; box-shadow: 0 4px 12px rgba(0,0,0,.1);
}}
.avatar {{
width: 120px; height: 120px; border-radius: 50%; object-fit: cover;
}}
.btn {{
display: inline-block; margin-top: 16px; padding: 12px 24px;
background: #2AABEE; color: #fff; border-radius: 8px;
text-decoration: none; font-weight: bold;
}}
</style>
</head>
<body>
<div class="card">
<img src="{avatar_src}" alt="avatar" class="avatar">
<h2>@{username}</h2>
<p>{description}</p>
<a class="btn" href="https://t.me/{username}" target="_blank">StartΒ Bot</a>
</div>
</body>
</html>
"""
    return HTMLResponse(page)
# ββββββββββββββββββββββββββββββ | |
# Route: serve avatar file | |
# ββββββββββββββββββββββββββββββ | |
def avatar():
    """Serve the cached avatar image, refreshing it first if it is stale.

    Raises:
        HTTPException: 404 when no avatar file exists after the refresh.

    NOTE(review): no route registration (decorator) is visible for this
    function here — confirm it is mounted on the app.
    """
    refresh_avatar()  # make sure it's fresh
    if AVATAR_PATH.exists():
        return FileResponse(AVATAR_PATH, media_type="image/jpeg")
    raise HTTPException(status_code=404, detail="Avatar not found")
def startup():
    """Start the bot's polling loop in a daemon thread.

    NOTE(review): the original comment says this should run after Uvicorn
    has started, but no startup-hook registration (decorator) is visible
    here — confirm it is wired to the app.
    """
    polling_thread = threading.Thread(target=bot.infinity_polling, daemon=True)
    polling_thread.start()
if __name__ == "__main__":
    # Script entry point: serve the FastAPI app with Uvicorn on all
    # interfaces, port 7860.
    import uvicorn
    # "app:app" is an import string; presumably this file is app.py and the
    # second "app" is the FastAPI instance defined above — TODO confirm.
    uvicorn.run("app:app", host="0.0.0.0", port=7860)