import os import logging import threading import time import datetime import traceback import fractions from fastapi import FastAPI, Request import av app = FastAPI() # ------------------------------------------------------------------- # Configuration & Global Variables # ------------------------------------------------------------------- # (No token or outgoing calls are used in this version.) # Conversation state user_inputs = {} # The conversation fields will depend on the mode. # Simple mode (default): Only "input_url" and "output_url" are required. # Advanced mode (if user sends /setting): Additional fields are required. conversation_fields = [] current_step = None advanced_mode = False # Default settings for advanced fields default_settings = { "quality_settings": "medium", "video_codec": "libx264", "audio_codec": "aac", "output_url": "rtmp://a.rtmp.youtube.com/live2" } # Streaming state & statistics streaming_state = "idle" # "idle", "streaming", "paused", "stopped" stream_chat_id = None # Chat ID for periodic updates stream_start_time = None frames_encoded = 0 bytes_sent = 0 # Stream resource objects video_stream = None audio_stream_in = None output_stream = None # Thread references stream_thread = None live_log_thread = None # Live logging globals live_log_lines = [] # Rolling list (max 50 lines) live_log_display = "" # Global variable updated every second by live_log_updater error_notification = "" # Global variable to hold any error notification message # ------------------------------------------------------------------- # Enhanced Logging Setup # ------------------------------------------------------------------- logging.basicConfig( level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logger = logging.getLogger() def append_live_log(line: str): global live_log_lines live_log_lines.append(line) if len(live_log_lines) > 50: live_log_lines.pop(0) class ListHandler(logging.Handler): def emit(self, record): log_entry = self.format(record) append_live_log(log_entry) list_handler = ListHandler() list_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S")) logger.addHandler(list_handler) # ------------------------------------------------------------------- # Utility Functions & UI Helpers # ------------------------------------------------------------------- def create_html_message(text: str): # Wrap text in
 tags for monospaced output using HTML parse mode
    return {"parse_mode": "HTML", "text": f"
{text}
"} def get_inline_keyboard_for_stream(): # Inline keyboard for streaming controls after the stream has started keyboard = { "inline_keyboard": [ [ {"text": "⏸ Pause", "callback_data": "pause"}, {"text": "▶️ Resume", "callback_data": "resume"}, {"text": "⏹ Abort", "callback_data": "abort"} ], [ {"text": "📊 Status", "callback_data": "status"} ] ] } return keyboard def get_inline_keyboard_for_start(): # Inline keyboard with a start button for when conversation is complete. keyboard = { "inline_keyboard": [ [ {"text": "🚀 Start Streaming", "callback_data": "start_stream"} ] ] } return keyboard def help_text(): return ( "*Stream Bot Help*\n\n" "*/start* - Begin setup for streaming (simple mode: only Input & Output URL)\n" "*/setting* - Enter advanced settings (Input URL, Quality Settings, Video Codec, Audio Codec, Output URL)\n" "*/help* - Display this help text\n" "*/logs* - Show the log history\n\n" "After inputs are collected, press the inline *Start Streaming* button.\n\n" "While streaming, you can use inline buttons or commands:\n" "*/pause* - Pause the stream\n" "*/resume* - Resume a paused stream\n" "*/abort* - Abort the stream\n" "*/status* - Get current stream statistics" ) def send_guide_message(chat_id, message): # Return a response dictionary (to be sent as the webhook response) logging.info(f"Sending message to chat {chat_id}: {message}") return { "method": "sendMessage", "chat_id": chat_id, "text": message, "parse_mode": "Markdown" } def reset_statistics(): global stream_start_time, frames_encoded, bytes_sent stream_start_time = datetime.datetime.now() frames_encoded = 0 bytes_sent = 0 def get_uptime(): if stream_start_time: uptime = datetime.datetime.now() - stream_start_time return str(uptime).split('.')[0] return "0" def validate_inputs(): # Ensure all fields in conversation_fields have been provided missing = [field for field in conversation_fields if field not in user_inputs or not user_inputs[field]] if missing: return False, f"Missing fields: {', '.join(missing)}" return True, "" # ------------------------------------------------------------------- # Error Notification Helper # ------------------------------------------------------------------- def notify_error(chat_id, error_message): global error_notification error_notification = error_message # (This error will be available for the next webhook update; no outgoing call is made.) logging.error(f"Error notified to user in chat {chat_id}: {error_message}") # ------------------------------------------------------------------- # Live Log Updater (Background Thread) # ------------------------------------------------------------------- def live_log_updater(): global live_log_display, streaming_state try: while streaming_state in ["streaming", "paused"]: # Update the live log display variable with the last 15 log lines in HTML format live_log_display = "
" + "\n".join(live_log_lines[-15:]) + "
" time.sleep(1) except Exception as e: logging.error(f"Error in live log updater: {e}") # ------------------------------------------------------------------- # Logs History Handler (/logs) # ------------------------------------------------------------------- def logs_history(chat_id): # Return the last 50 log lines (if available) in HTML format log_text = "
" + "\n".join(live_log_lines[-50:]) + "
" # If an error notification exists, prepend it to the log text. global error_notification if error_notification: log_text = f"
ERROR: {error_notification}\n\n" + log_text[5:]
    return {
         "method": "sendMessage",
         "chat_id": chat_id,
         "text": log_text,
         "parse_mode": "HTML"
    }

# -------------------------------------------------------------------
# Conversation Handlers
# -------------------------------------------------------------------
def handle_start(chat_id):
    global current_step, user_inputs, conversation_fields, advanced_mode
    # By default, simple mode (unless advanced_mode was set via /setting)
    user_inputs = {}
    if not advanced_mode:
        conversation_fields = ["input_url", "output_url"]
    else:
        conversation_fields = ["input_url", "quality_settings", "video_codec", "audio_codec", "output_url"]
    current_step = conversation_fields[0]
    text = ("👋 *Welcome to the Stream Bot!*\n\n"
            "Let's set up your stream.\n"
            f"Please enter the *{current_step.replace('_', ' ')}*"
            f"{' (no default)' if current_step not in default_settings else f' _(default: {default_settings[current_step]})_'}:")
    logging.info(f"/start command from chat {chat_id} (advanced_mode={advanced_mode})")
    return {
        "method": "sendMessage",
        "chat_id": chat_id,
        "text": text,
        "parse_mode": "Markdown"
    }

def handle_setting(chat_id):
    global advanced_mode, conversation_fields, current_step, user_inputs
    advanced_mode = True
    conversation_fields = ["input_url", "quality_settings", "video_codec", "audio_codec", "output_url"]
    user_inputs = {}
    current_step = conversation_fields[0]
    text = ("⚙️ *Advanced Mode Activated!*\n\n"
            "Please enter the *input url*:")
    logging.info(f"/setting command from chat {chat_id} - advanced mode enabled")
    return {
        "method": "sendMessage",
        "chat_id": chat_id,
        "text": text,
        "parse_mode": "Markdown"
    }

def handle_help(chat_id):
    logging.info(f"/help command from chat {chat_id}")
    return {
        "method": "sendMessage",
        "chat_id": chat_id,
        "text": help_text(),
        "parse_mode": "Markdown"
    }

def handle_conversation(chat_id, text):
    global current_step, user_inputs, conversation_fields
    if current_step:
        # If the response is empty and a default exists, use the default.
        if text.strip() == "" and current_step in default_settings:
            user_inputs[current_step] = default_settings[current_step]
            logging.info(f"Using default for {current_step}: {default_settings[current_step]}")
        else:
            user_inputs[current_step] = text.strip()
            logging.info(f"Received {current_step}: {text.strip()}")

        idx = conversation_fields.index(current_step)
        if idx < len(conversation_fields) - 1:
            current_step = conversation_fields[idx + 1]
            prompt = f"Please enter the *{current_step.replace('_', ' ')}*"
            if current_step in default_settings:
                prompt += f" _(default: {default_settings[current_step]})_"
            return send_guide_message(chat_id, prompt)
        else:
            # All inputs have been collected.
            current_step = None
            valid, msg = validate_inputs()
            if not valid:
                return send_guide_message(chat_id, f"Validation error: {msg}")
            # In simple mode, fill in advanced fields with defaults.
            if not advanced_mode:
                user_inputs.setdefault("quality_settings", default_settings["quality_settings"])
                user_inputs.setdefault("video_codec", default_settings["video_codec"])
                user_inputs.setdefault("audio_codec", default_settings["audio_codec"])
            # Instead of asking the user to type "start", send an inline button.
            return {
                "method": "sendMessage",
                "chat_id": chat_id,
                "text": "All inputs received. Press *🚀 Start Streaming* to begin.",
                "reply_markup": get_inline_keyboard_for_start(),
                "parse_mode": "Markdown"
            }
    else:
        return send_guide_message(chat_id, "Unrecognized input. Type /help for available commands.")

# -------------------------------------------------------------------
# Background Streaming Functions
# -------------------------------------------------------------------
def stream_to_youtube(input_url, quality_settings, video_codec, audio_codec, output_url, chat_id):
    global video_stream, audio_stream_in, output_stream, streaming_state, frames_encoded, bytes_sent
    logging.info("Initiating streaming to YouTube")
    try:
        streaming_state = "streaming"
        reset_statistics()

        input_stream = av.open(input_url)
        output_stream = av.open(output_url, mode='w', format='flv')

        # Configure video stream
        video_stream = output_stream.add_stream(video_codec, rate=30)
        video_stream.width = input_stream.streams.video[0].width
        video_stream.height = input_stream.streams.video[0].height
        video_stream.pix_fmt = input_stream.streams.video[0].format.name
        video_stream.codec_context.options.update({'g': '30'})

        if quality_settings.lower() == "high":
            video_stream.bit_rate = 3000000
            video_stream.bit_rate_tolerance = 1000000
        elif quality_settings.lower() == "medium":
            video_stream.bit_rate = 1500000
            video_stream.bit_rate_tolerance = 500000
        elif quality_settings.lower() == "low":
            video_stream.bit_rate = 800000
            video_stream.bit_rate_tolerance = 200000

        # Configure audio stream
        audio_stream_in = input_stream.streams.audio[0]
        out_audio_stream = output_stream.add_stream(audio_codec, rate=audio_stream_in.rate)
        out_audio_stream.layout = "stereo"

        video_stream.codec_context.time_base = fractions.Fraction(1, video_stream.rate)

        logging.info("Streaming started successfully.")

        # Start the live log updater in a background thread
        global live_log_thread
        live_log_thread = threading.Thread(target=live_log_updater)
        live_log_thread.daemon = True
        live_log_thread.start()
        logging.info("Live log updater thread started.")

        # Stream loop: process packets until state changes
        while streaming_state in ["streaming", "paused"]:
            for packet in input_stream.demux():
                if streaming_state == "stopped":
                    break
                if packet.stream == input_stream.streams.video[0]:
                    for frame in packet.decode():
                        if streaming_state == "paused":
                            time.sleep(0.5)
                            continue
                        for out_packet in video_stream.encode(frame):
                            output_stream.mux(out_packet)
                            frames_encoded += 1
                            if hasattr(out_packet, "size"):
                                bytes_sent += out_packet.size
                elif packet.stream == audio_stream_in:
                    for frame in packet.decode():
                        if streaming_state == "paused":
                            time.sleep(0.5)
                            continue
                        for out_packet in out_audio_stream.encode(frame):
                            output_stream.mux(out_packet)
                            if hasattr(out_packet, "size"):
                                bytes_sent += out_packet.size

            # Flush remaining packets
            for out_packet in video_stream.encode():
                output_stream.mux(out_packet)
            for out_packet in out_audio_stream.encode():
                output_stream.mux(out_packet)

            if streaming_state == "paused":
                time.sleep(1)

        # Clean up resources
        try:
            video_stream.close()
            out_audio_stream.close()
            output_stream.close()
            input_stream.close()
        except Exception as cleanup_error:
            logging.error(f"Error during cleanup: {cleanup_error}")

        logging.info("Streaming complete, resources cleaned up.")
        streaming_state = "idle"
    except Exception as e:
        error_message = f"An error occurred during streaming: {str(e)}\n\n{traceback.format_exc()}"
        logging.error(error_message)
        streaming_state = "idle"
        # Notify the user about the error by storing it in a global variable
        notify_error(chat_id, error_message)

def start_streaming(chat_id):
    global stream_thread, stream_chat_id
    valid, msg = validate_inputs()
    if not valid:
        return send_guide_message(chat_id, f"Validation error: {msg}")

    stream_chat_id = chat_id
    try:
        # Start the background streaming thread
        stream_thread = threading.Thread(
            target=stream_to_youtube,
            args=(
                user_inputs["input_url"],
                user_inputs["quality_settings"],
                user_inputs["video_codec"],
                user_inputs["audio_codec"],
                user_inputs["output_url"],
                chat_id,
            )
        )
        stream_thread.daemon = True
        stream_thread.start()
        logging.info("Streaming thread started.")

        # Immediately inform the user that streaming has started;
        # live log updates will be available via the global variable and /logs command.
        return {
            "method": "sendMessage",
            "chat_id": chat_id,
            "text": "🚀 *Streaming initiated!* Live logs are now updating. Use the inline keyboard to control the stream.",
            "reply_markup": get_inline_keyboard_for_stream(),
            "parse_mode": "Markdown"
        }
    except Exception as e:
        error_message = f"Failed to start streaming: {str(e)}"
        logging.error(error_message)
        notify_error(chat_id, error_message)
        return send_guide_message(chat_id, error_message)

# -------------------------------------------------------------------
# Stream Control Handlers
# -------------------------------------------------------------------
def pause_stream(chat_id):
    global streaming_state
    if streaming_state == "streaming":
        streaming_state = "paused"
        logging.info("Streaming paused.")
        return {
            "method": "sendMessage",
            "chat_id": chat_id,
            "text": "⏸ *Streaming paused.*",
            "parse_mode": "Markdown"
        }
    return send_guide_message(chat_id, "Streaming is not active.")

def resume_stream(chat_id):
    global streaming_state
    if streaming_state == "paused":
        streaming_state = "streaming"
        logging.info("Streaming resumed.")
        return {
            "method": "sendMessage",
            "chat_id": chat_id,
            "text": "▶️ *Streaming resumed.*",
            "parse_mode": "Markdown"
        }
    return send_guide_message(chat_id, "Streaming is not paused.")

def abort_stream(chat_id):
    global streaming_state
    if streaming_state in ["streaming", "paused"]:
        streaming_state = "stopped"
        logging.info("Streaming aborted by user.")
        return {
            "method": "sendMessage",
            "chat_id": chat_id,
            "text": "⏹ *Streaming aborted.*",
            "parse_mode": "Markdown"
        }
    return send_guide_message(chat_id, "No active streaming to abort.")

def stream_status(chat_id):
    stats = (
        f"*Stream Status:*\n\n"
        f"• **State:** {streaming_state}\n"
        f"• **Uptime:** {get_uptime()}\n"
        f"• **Frames Encoded:** {frames_encoded}\n"
        f"• **Bytes Sent:** {bytes_sent}\n"
    )
    return {
        "method": "sendMessage",
        "chat_id": chat_id,
        "text": stats,
        "parse_mode": "Markdown"
    }

# -------------------------------------------------------------------
# FastAPI Webhook Endpoint for Telegram Updates
# -------------------------------------------------------------------
@app.post("/webhook")
async def telegram_webhook(request: Request):
    update = await request.json()
    logging.debug(f"Received update: {update}")

    # Process messages from users
    if "message" in update:
        chat_id = update["message"]["chat"]["id"]
        text = update["message"].get("text", "").strip()

        if text.startswith("/setting"):
            return handle_setting(chat_id)
        elif text.startswith("/start"):
            return handle_start(chat_id)
        elif text.startswith("/help"):
            return handle_help(chat_id)
        elif text.startswith("/logs"):
            return logs_history(chat_id)
        elif text.startswith("/pause"):
            return pause_stream(chat_id)
        elif text.startswith("/resume"):
            return resume_stream(chat_id)
        elif text.startswith("/abort"):
            return abort_stream(chat_id)
        elif text.startswith("/status"):
            return stream_status(chat_id)
        else:
            # Process conversation setup inputs
            return handle_conversation(chat_id, text)

    # Process inline keyboard callback queries
    elif "callback_query" in update:
        callback_data = update["callback_query"]["data"]
        chat_id = update["callback_query"]["message"]["chat"]["id"]
        message_id = update["callback_query"]["message"]["message_id"]

        if callback_data == "pause":
            response = pause_stream(chat_id)
        elif callback_data == "resume":
            response = resume_stream(chat_id)
        elif callback_data == "abort":
            response = abort_stream(chat_id)
        elif callback_data == "status":
            response = stream_status(chat_id)
        elif callback_data == "start_stream":
            response = start_streaming(chat_id)
        else:
            response = send_guide_message(chat_id, "❓ Unknown callback command.")

        # Edit the original message with updated information
        response["method"] = "editMessageText"
        response["message_id"] = message_id
        return response

    return {"status": "ok"}