AstraOS commited on
Commit
e0d8f08
Β·
verified Β·
1 Parent(s): 7fb4476

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +58 -75
app.py CHANGED
@@ -4,7 +4,6 @@ import time
4
  import datetime
5
  import traceback
6
  import fractions
7
- import collections
8
 
9
  from fastapi import FastAPI, Request
10
  import av
@@ -12,7 +11,7 @@ import av
12
  app = FastAPI()
13
 
14
  # -------------------------------------------------------------------
15
- # CONFIGURATION & GLOBAL VARIABLES
16
  # -------------------------------------------------------------------
17
  # (No token or outgoing HTTP calls are used in this version.)
18
  # Conversation state
@@ -33,7 +32,7 @@ default_settings = {
33
  }
34
 
35
  # Streaming state & statistics
36
- streaming_state = "idle" # Possible values: "idle", "streaming", "paused", "stopped"
37
  stream_chat_id = None # Chat ID for periodic updates
38
  stream_start_time = None
39
  frames_encoded = 0
@@ -48,13 +47,13 @@ output_stream = None
48
  stream_thread = None
49
  live_log_thread = None
50
 
51
- # Live logging globals using collections.deque for efficiency
52
- live_log_lines = collections.deque(maxlen=50) # Efficient log storage (max 50 lines)
53
- live_log_display = "" # Updated by the live log updater thread
54
- error_notification = "" # Holds error details if any error occurs
55
 
56
  # -------------------------------------------------------------------
57
- # ENHANCED LOGGING SETUP
58
  # -------------------------------------------------------------------
59
  logging.basicConfig(
60
  level=logging.DEBUG,
@@ -62,33 +61,31 @@ logging.basicConfig(
62
  datefmt="%Y-%m-%d %H:%M:%S",
63
  )
64
  logger = logging.getLogger()
65
- logger.setLevel(logging.DEBUG)
66
 
67
  def append_live_log(line: str):
68
- """ Append a log line to the global deque. """
69
  live_log_lines.append(line)
 
 
70
 
71
  class ListHandler(logging.Handler):
72
- """ Custom logging handler to store logs in the deque for live display. """
73
  def emit(self, record):
74
  log_entry = self.format(record)
75
  append_live_log(log_entry)
76
 
77
- # Attach custom logging handler
78
  list_handler = ListHandler()
79
- list_handler.setLevel(logging.DEBUG)
80
- list_handler.setFormatter(logging.Formatter("%Y-%m-%d %H:%M:%S [%(levelname)s] %(message)s"))
81
  logger.addHandler(list_handler)
82
 
83
  # -------------------------------------------------------------------
84
- # UTILITY FUNCTIONS & UI HELPERS
85
  # -------------------------------------------------------------------
86
  def create_html_message(text: str):
87
- """ Returns a message dictionary with HTML formatting. """
88
  return {"parse_mode": "HTML", "text": f"<pre>{text}</pre>"}
89
 
90
  def get_inline_keyboard_for_stream():
91
- """ Returns the inline keyboard for stream controls. """
92
  keyboard = {
93
  "inline_keyboard": [
94
  [
@@ -104,7 +101,7 @@ def get_inline_keyboard_for_stream():
104
  return keyboard
105
 
106
  def get_inline_keyboard_for_start():
107
- """ Returns the inline keyboard with a Start Streaming button. """
108
  keyboard = {
109
  "inline_keyboard": [
110
  [
@@ -115,13 +112,12 @@ def get_inline_keyboard_for_start():
115
  return keyboard
116
 
117
  def help_text():
118
- """ Returns the help text message. """
119
  return (
120
  "*Stream Bot Help*\n\n"
121
  "*/start* - Begin setup for streaming (simple mode: only Input & Output URL)\n"
122
  "*/setting* - Enter advanced settings (Input URL, Quality Settings, Video Codec, Audio Codec, Output URL)\n"
123
  "*/help* - Display this help text\n"
124
- "*/logs* - Show the live log display\n\n"
125
  "After inputs are collected, press the inline *Start Streaming* button.\n\n"
126
  "While streaming, you can use inline buttons or commands:\n"
127
  "*/pause* - Pause the stream\n"
@@ -131,7 +127,7 @@ def help_text():
131
  )
132
 
133
  def send_guide_message(chat_id, message):
134
- """ Returns a response dictionary for sending a guide message. """
135
  logging.info(f"Sending message to chat {chat_id}: {message}")
136
  return {
137
  "method": "sendMessage",
@@ -141,58 +137,55 @@ def send_guide_message(chat_id, message):
141
  }
142
 
143
  def reset_statistics():
144
- """ Resets streaming statistics. """
145
  global stream_start_time, frames_encoded, bytes_sent
146
  stream_start_time = datetime.datetime.now()
147
  frames_encoded = 0
148
  bytes_sent = 0
149
 
150
  def get_uptime():
151
- """ Returns the uptime as a formatted string. """
152
  if stream_start_time:
153
  uptime = datetime.datetime.now() - stream_start_time
154
  return str(uptime).split('.')[0]
155
  return "0"
156
 
157
  def validate_inputs():
158
- """ Validates that all required conversation fields have been provided. """
159
  missing = [field for field in conversation_fields if field not in user_inputs or not user_inputs[field]]
160
  if missing:
161
  return False, f"Missing fields: {', '.join(missing)}"
162
  return True, ""
163
 
164
  # -------------------------------------------------------------------
165
- # ERROR NOTIFICATION VIA GLOBAL VARIABLE
166
  # -------------------------------------------------------------------
167
  def notify_error(chat_id, error_message):
168
- """ Stores the error message in a global variable and logs it. """
169
  global error_notification
170
  error_notification = error_message
171
  logging.error(f"Error for chat {chat_id}: {error_message}")
172
 
173
  # -------------------------------------------------------------------
174
- # LIVE LOG UPDATER THREAD
175
  # -------------------------------------------------------------------
176
  def live_log_updater():
177
- """ Continuously updates the global live_log_display with the latest log lines. """
178
  global live_log_display, streaming_state
179
  try:
180
  while streaming_state in ["streaming", "paused"]:
181
- # Update with all lines from the deque, joined by newlines
182
- live_log_display = "<pre>" + "\n".join(list(live_log_lines)) + "</pre>"
 
183
  time.sleep(1)
184
  except Exception as e:
185
- logging.exception("Error in live log updater")
186
 
187
  # -------------------------------------------------------------------
188
- # LOGS HISTORY HANDLER (/logs)
189
  # -------------------------------------------------------------------
190
  def logs_history(chat_id):
191
- """ Returns the current live log display, prepending any error notification if present. """
192
  global live_log_display, error_notification
 
193
  log_text = live_log_display if live_log_display else "<pre>No logs available yet.</pre>"
 
194
  if error_notification:
195
- # Prepend the error message to the log display
196
  if log_text.startswith("<pre>"):
197
  log_text = f"<pre>ERROR: {error_notification}\n\n" + log_text[5:]
198
  else:
@@ -205,23 +198,21 @@ def logs_history(chat_id):
205
  }
206
 
207
  # -------------------------------------------------------------------
208
- # CONVERSATION HANDLERS
209
  # -------------------------------------------------------------------
210
  def handle_start(chat_id):
211
  global current_step, user_inputs, conversation_fields, advanced_mode
212
- # Default to simple mode unless /setting has been issued
213
  user_inputs = {}
214
  if not advanced_mode:
215
  conversation_fields = ["input_url", "output_url"]
216
  else:
217
  conversation_fields = ["input_url", "quality_settings", "video_codec", "audio_codec", "output_url"]
218
  current_step = conversation_fields[0]
219
- text = (
220
- "πŸ‘‹ *Welcome to the Stream Bot!*\n\n"
221
- "Let's set up your stream.\n"
222
- f"Please enter the *{current_step.replace('_', ' ')}*"
223
- f"{' (no default)' if current_step not in default_settings else f' _(default: {default_settings[current_step]})_'}:"
224
- )
225
  logging.info(f"/start command from chat {chat_id} (advanced_mode={advanced_mode})")
226
  return {
227
  "method": "sendMessage",
@@ -236,10 +227,8 @@ def handle_setting(chat_id):
236
  conversation_fields = ["input_url", "quality_settings", "video_codec", "audio_codec", "output_url"]
237
  user_inputs = {}
238
  current_step = conversation_fields[0]
239
- text = (
240
- "βš™οΈ *Advanced Mode Activated!*\n\n"
241
- "Please enter the *input url*:"
242
- )
243
  logging.info(f"/setting command from chat {chat_id} - advanced mode enabled")
244
  return {
245
  "method": "sendMessage",
@@ -260,13 +249,13 @@ def handle_help(chat_id):
260
  def handle_conversation(chat_id, text):
261
  global current_step, user_inputs, conversation_fields
262
  if current_step:
 
263
  if text.strip() == "" and current_step in default_settings:
264
  user_inputs[current_step] = default_settings[current_step]
265
  logging.info(f"Using default for {current_step}: {default_settings[current_step]}")
266
  else:
267
  user_inputs[current_step] = text.strip()
268
  logging.info(f"Received {current_step}: {text.strip()}")
269
-
270
  idx = conversation_fields.index(current_step)
271
  if idx < len(conversation_fields) - 1:
272
  current_step = conversation_fields[idx + 1]
@@ -275,18 +264,21 @@ def handle_conversation(chat_id, text):
275
  prompt += f" _(default: {default_settings[current_step]})_"
276
  return send_guide_message(chat_id, prompt)
277
  else:
 
278
  current_step = None
279
  valid, msg = validate_inputs()
280
  if not valid:
281
  return send_guide_message(chat_id, f"Validation error: {msg}")
 
282
  if not advanced_mode:
283
  user_inputs.setdefault("quality_settings", default_settings["quality_settings"])
284
  user_inputs.setdefault("video_codec", default_settings["video_codec"])
285
  user_inputs.setdefault("audio_codec", default_settings["audio_codec"])
 
286
  return {
287
  "method": "sendMessage",
288
  "chat_id": chat_id,
289
- "text": "All inputs received. Press *πŸš€ Start Streaming* to begin.",
290
  "reply_markup": get_inline_keyboard_for_start(),
291
  "parse_mode": "Markdown"
292
  }
@@ -294,7 +286,7 @@ def handle_conversation(chat_id, text):
294
  return send_guide_message(chat_id, "Unrecognized input. Type /help for available commands.")
295
 
296
  # -------------------------------------------------------------------
297
- # BACKGROUND STREAMING FUNCTIONS
298
  # -------------------------------------------------------------------
299
  def stream_to_youtube(input_url, quality_settings, video_codec, audio_codec, output_url, chat_id):
300
  global video_stream, audio_stream_in, output_stream, streaming_state, frames_encoded, bytes_sent
@@ -302,17 +294,14 @@ def stream_to_youtube(input_url, quality_settings, video_codec, audio_codec, out
302
  try:
303
  streaming_state = "streaming"
304
  reset_statistics()
305
-
306
  input_stream = av.open(input_url)
307
  output_stream = av.open(output_url, mode='w', format='flv')
308
-
309
  # Configure video stream
310
  video_stream = output_stream.add_stream(video_codec, rate=30)
311
  video_stream.width = input_stream.streams.video[0].width
312
  video_stream.height = input_stream.streams.video[0].height
313
  video_stream.pix_fmt = input_stream.streams.video[0].format.name
314
  video_stream.codec_context.options.update({'g': '30'})
315
-
316
  if quality_settings.lower() == "high":
317
  video_stream.bit_rate = 3000000
318
  video_stream.bit_rate_tolerance = 1000000
@@ -322,24 +311,20 @@ def stream_to_youtube(input_url, quality_settings, video_codec, audio_codec, out
322
  elif quality_settings.lower() == "low":
323
  video_stream.bit_rate = 800000
324
  video_stream.bit_rate_tolerance = 200000
325
-
326
  # Configure audio stream
327
  audio_stream_in = input_stream.streams.audio[0]
328
  out_audio_stream = output_stream.add_stream(audio_codec, rate=audio_stream_in.rate)
329
  out_audio_stream.layout = "stereo"
330
-
331
  video_stream.codec_context.time_base = fractions.Fraction(1, video_stream.rate)
332
-
333
  logging.info("Streaming started successfully.")
334
-
335
- # Start live log updater thread if not already running
336
  global live_log_thread
337
  if live_log_thread is None or not live_log_thread.is_alive():
338
- live_log_thread = threading.Thread(target=live_log_updater, daemon=True)
 
339
  live_log_thread.start()
340
  logging.info("Live log updater thread started.")
341
-
342
- # Main streaming loop
343
  while streaming_state in ["streaming", "paused"]:
344
  for packet in input_stream.demux():
345
  if streaming_state == "stopped":
@@ -363,16 +348,13 @@ def stream_to_youtube(input_url, quality_settings, video_codec, audio_codec, out
363
  output_stream.mux(out_packet)
364
  if hasattr(out_packet, "size"):
365
  bytes_sent += out_packet.size
366
-
367
- # Flush any remaining packets
368
  for out_packet in video_stream.encode():
369
  output_stream.mux(out_packet)
370
  for out_packet in out_audio_stream.encode():
371
  output_stream.mux(out_packet)
372
-
373
  if streaming_state == "paused":
374
  time.sleep(1)
375
-
376
  # Clean up resources
377
  try:
378
  video_stream.close()
@@ -381,13 +363,13 @@ def stream_to_youtube(input_url, quality_settings, video_codec, audio_codec, out
381
  input_stream.close()
382
  except Exception as cleanup_error:
383
  logging.error(f"Error during cleanup: {cleanup_error}")
384
-
385
  logging.info("Streaming complete, resources cleaned up.")
386
  streaming_state = "idle"
387
  except Exception as e:
388
  error_message = f"An error occurred during streaming: {str(e)}\n\n{traceback.format_exc()}"
389
  logging.error(error_message)
390
  streaming_state = "idle"
 
391
  notify_error(chat_id, error_message)
392
 
393
  def start_streaming(chat_id):
@@ -395,7 +377,6 @@ def start_streaming(chat_id):
395
  valid, msg = validate_inputs()
396
  if not valid:
397
  return send_guide_message(chat_id, f"Validation error: {msg}")
398
-
399
  stream_chat_id = chat_id
400
  try:
401
  stream_thread = threading.Thread(
@@ -412,11 +393,12 @@ def start_streaming(chat_id):
412
  stream_thread.daemon = True
413
  stream_thread.start()
414
  logging.info("Streaming thread started.")
415
-
 
416
  return {
417
  "method": "sendMessage",
418
  "chat_id": chat_id,
419
- "text": "πŸš€ *Streaming initiated!* Live logs are now updating. Use the inline keyboard to control the stream.",
420
  "reply_markup": get_inline_keyboard_for_stream(),
421
  "parse_mode": "Markdown"
422
  }
@@ -427,7 +409,7 @@ def start_streaming(chat_id):
427
  return send_guide_message(chat_id, error_message)
428
 
429
  # -------------------------------------------------------------------
430
- # STREAM CONTROL HANDLERS
431
  # -------------------------------------------------------------------
432
  def pause_stream(chat_id):
433
  global streaming_state
@@ -484,7 +466,7 @@ def stream_status(chat_id):
484
  }
485
 
486
  # -------------------------------------------------------------------
487
- # FASTAPI WEBHOOK ENDPOINT FOR TELEGRAM UPDATES
488
  # -------------------------------------------------------------------
489
  @app.post("/webhook")
490
  async def telegram_webhook(request: Request):
@@ -495,7 +477,6 @@ async def telegram_webhook(request: Request):
495
  if "message" in update:
496
  chat_id = update["message"]["chat"]["id"]
497
  text = update["message"].get("text", "").strip()
498
-
499
  if text.startswith("/setting"):
500
  return handle_setting(chat_id)
501
  elif text.startswith("/start"):
@@ -513,14 +494,13 @@ async def telegram_webhook(request: Request):
513
  elif text.startswith("/status"):
514
  return stream_status(chat_id)
515
  else:
 
516
  return handle_conversation(chat_id, text)
517
-
518
  # Process inline keyboard callback queries
519
  elif "callback_query" in update:
520
  callback_data = update["callback_query"]["data"]
521
  chat_id = update["callback_query"]["message"]["chat"]["id"]
522
  message_id = update["callback_query"]["message"]["message_id"]
523
-
524
  if callback_data == "pause":
525
  response = pause_stream(chat_id)
526
  elif callback_data == "resume":
@@ -533,9 +513,12 @@ async def telegram_webhook(request: Request):
533
  response = start_streaming(chat_id)
534
  else:
535
  response = send_guide_message(chat_id, "❓ Unknown callback command.")
536
-
 
 
 
 
537
  response["method"] = "editMessageText"
538
  response["message_id"] = message_id
539
  return response
540
-
541
  return {"status": "ok"}
 
4
  import datetime
5
  import traceback
6
  import fractions
 
7
 
8
  from fastapi import FastAPI, Request
9
  import av
 
11
  app = FastAPI()
12
 
13
  # -------------------------------------------------------------------
14
+ # Configuration & Global Variables
15
  # -------------------------------------------------------------------
16
  # (No token or outgoing HTTP calls are used in this version.)
17
  # Conversation state
 
32
  }
33
 
34
  # Streaming state & statistics
35
+ streaming_state = "idle" # "idle", "streaming", "paused", "stopped"
36
  stream_chat_id = None # Chat ID for periodic updates
37
  stream_start_time = None
38
  frames_encoded = 0
 
47
  stream_thread = None
48
  live_log_thread = None
49
 
50
+ # Live logging globals
51
+ live_log_lines = [] # Rolling list (max 50 log lines)
52
+ live_log_display = "" # Global variable updated every second by live_log_updater
53
+ error_notification = "" # Global variable to hold error details if any
54
 
55
  # -------------------------------------------------------------------
56
+ # Enhanced Logging Setup
57
  # -------------------------------------------------------------------
58
  logging.basicConfig(
59
  level=logging.DEBUG,
 
61
  datefmt="%Y-%m-%d %H:%M:%S",
62
  )
63
  logger = logging.getLogger()
 
64
 
65
  def append_live_log(line: str):
66
+ global live_log_lines
67
  live_log_lines.append(line)
68
+ if len(live_log_lines) > 50:
69
+ live_log_lines.pop(0)
70
 
71
  class ListHandler(logging.Handler):
 
72
  def emit(self, record):
73
  log_entry = self.format(record)
74
  append_live_log(log_entry)
75
 
 
76
  list_handler = ListHandler()
77
+ list_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S"))
 
78
  logger.addHandler(list_handler)
79
 
80
  # -------------------------------------------------------------------
81
+ # Utility Functions & UI Helpers
82
  # -------------------------------------------------------------------
83
  def create_html_message(text: str):
84
+ # Wrap text in <pre> tags for monospaced output using HTML parse mode
85
  return {"parse_mode": "HTML", "text": f"<pre>{text}</pre>"}
86
 
87
  def get_inline_keyboard_for_stream():
88
+ # Inline keyboard for streaming controls (when streaming is active)
89
  keyboard = {
90
  "inline_keyboard": [
91
  [
 
101
  return keyboard
102
 
103
  def get_inline_keyboard_for_start():
104
+ # Inline keyboard with a start button (when inputs are complete)
105
  keyboard = {
106
  "inline_keyboard": [
107
  [
 
112
  return keyboard
113
 
114
  def help_text():
 
115
  return (
116
  "*Stream Bot Help*\n\n"
117
  "*/start* - Begin setup for streaming (simple mode: only Input & Output URL)\n"
118
  "*/setting* - Enter advanced settings (Input URL, Quality Settings, Video Codec, Audio Codec, Output URL)\n"
119
  "*/help* - Display this help text\n"
120
+ "*/logs* - Show the live log history\n\n"
121
  "After inputs are collected, press the inline *Start Streaming* button.\n\n"
122
  "While streaming, you can use inline buttons or commands:\n"
123
  "*/pause* - Pause the stream\n"
 
127
  )
128
 
129
  def send_guide_message(chat_id, message):
130
+ # Return a response dictionary to be sent as the webhook reply
131
  logging.info(f"Sending message to chat {chat_id}: {message}")
132
  return {
133
  "method": "sendMessage",
 
137
  }
138
 
139
  def reset_statistics():
 
140
  global stream_start_time, frames_encoded, bytes_sent
141
  stream_start_time = datetime.datetime.now()
142
  frames_encoded = 0
143
  bytes_sent = 0
144
 
145
  def get_uptime():
 
146
  if stream_start_time:
147
  uptime = datetime.datetime.now() - stream_start_time
148
  return str(uptime).split('.')[0]
149
  return "0"
150
 
151
  def validate_inputs():
152
+ # Ensure all fields in conversation_fields have been provided
153
  missing = [field for field in conversation_fields if field not in user_inputs or not user_inputs[field]]
154
  if missing:
155
  return False, f"Missing fields: {', '.join(missing)}"
156
  return True, ""
157
 
158
  # -------------------------------------------------------------------
159
+ # Error Notification Helper
160
  # -------------------------------------------------------------------
161
  def notify_error(chat_id, error_message):
 
162
  global error_notification
163
  error_notification = error_message
164
  logging.error(f"Error for chat {chat_id}: {error_message}")
165
 
166
  # -------------------------------------------------------------------
167
+ # Live Log Updater (Background Thread)
168
  # -------------------------------------------------------------------
169
  def live_log_updater():
 
170
  global live_log_display, streaming_state
171
  try:
172
  while streaming_state in ["streaming", "paused"]:
173
+ # Include a header with the current update time in the log display.
174
+ current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
175
+ live_log_display = f"<pre>Live Logs (Updated at {current_time}):\n" + "\n".join(live_log_lines[-15:]) + "</pre>"
176
  time.sleep(1)
177
  except Exception as e:
178
+ logging.error(f"Error in live log updater: {e}")
179
 
180
  # -------------------------------------------------------------------
181
+ # Logs History Handler (/logs)
182
  # -------------------------------------------------------------------
183
  def logs_history(chat_id):
 
184
  global live_log_display, error_notification
185
+ # Use the global live_log_display; if empty, show a default message.
186
  log_text = live_log_display if live_log_display else "<pre>No logs available yet.</pre>"
187
+ # If an error occurred, prepend the error notification.
188
  if error_notification:
 
189
  if log_text.startswith("<pre>"):
190
  log_text = f"<pre>ERROR: {error_notification}\n\n" + log_text[5:]
191
  else:
 
198
  }
199
 
200
  # -------------------------------------------------------------------
201
+ # Conversation Handlers
202
  # -------------------------------------------------------------------
203
  def handle_start(chat_id):
204
  global current_step, user_inputs, conversation_fields, advanced_mode
205
+ # By default, simple mode is used unless advanced_mode is set via /setting.
206
  user_inputs = {}
207
  if not advanced_mode:
208
  conversation_fields = ["input_url", "output_url"]
209
  else:
210
  conversation_fields = ["input_url", "quality_settings", "video_codec", "audio_codec", "output_url"]
211
  current_step = conversation_fields[0]
212
+ text = ("πŸ‘‹ *Welcome to the Stream Bot!*\n\n"
213
+ "Let's set up your stream.\n"
214
+ f"Please enter the *{current_step.replace('_', ' ')}*"
215
+ f"{' (no default)' if current_step not in default_settings else f' _(default: {default_settings[current_step]})_'}:")
 
 
216
  logging.info(f"/start command from chat {chat_id} (advanced_mode={advanced_mode})")
217
  return {
218
  "method": "sendMessage",
 
227
  conversation_fields = ["input_url", "quality_settings", "video_codec", "audio_codec", "output_url"]
228
  user_inputs = {}
229
  current_step = conversation_fields[0]
230
+ text = ("βš™οΈ *Advanced Mode Activated!*\n\n"
231
+ "Please enter the *input url*:")
 
 
232
  logging.info(f"/setting command from chat {chat_id} - advanced mode enabled")
233
  return {
234
  "method": "sendMessage",
 
249
  def handle_conversation(chat_id, text):
250
  global current_step, user_inputs, conversation_fields
251
  if current_step:
252
+ # If the response is empty and a default exists, use the default.
253
  if text.strip() == "" and current_step in default_settings:
254
  user_inputs[current_step] = default_settings[current_step]
255
  logging.info(f"Using default for {current_step}: {default_settings[current_step]}")
256
  else:
257
  user_inputs[current_step] = text.strip()
258
  logging.info(f"Received {current_step}: {text.strip()}")
 
259
  idx = conversation_fields.index(current_step)
260
  if idx < len(conversation_fields) - 1:
261
  current_step = conversation_fields[idx + 1]
 
264
  prompt += f" _(default: {default_settings[current_step]})_"
265
  return send_guide_message(chat_id, prompt)
266
  else:
267
+ # All inputs have been collected.
268
  current_step = None
269
  valid, msg = validate_inputs()
270
  if not valid:
271
  return send_guide_message(chat_id, f"Validation error: {msg}")
272
+ # In simple mode, fill in advanced fields with defaults.
273
  if not advanced_mode:
274
  user_inputs.setdefault("quality_settings", default_settings["quality_settings"])
275
  user_inputs.setdefault("video_codec", default_settings["video_codec"])
276
  user_inputs.setdefault("audio_codec", default_settings["audio_codec"])
277
+ # Instead of asking the user to type "start", send an inline button.
278
  return {
279
  "method": "sendMessage",
280
  "chat_id": chat_id,
281
+ "text": "All inputs received. Press *πŸš€ Start Streaming* to begin.\n\n" + live_log_display,
282
  "reply_markup": get_inline_keyboard_for_start(),
283
  "parse_mode": "Markdown"
284
  }
 
286
  return send_guide_message(chat_id, "Unrecognized input. Type /help for available commands.")
287
 
288
  # -------------------------------------------------------------------
289
+ # Background Streaming Functions
290
  # -------------------------------------------------------------------
291
  def stream_to_youtube(input_url, quality_settings, video_codec, audio_codec, output_url, chat_id):
292
  global video_stream, audio_stream_in, output_stream, streaming_state, frames_encoded, bytes_sent
 
294
  try:
295
  streaming_state = "streaming"
296
  reset_statistics()
 
297
  input_stream = av.open(input_url)
298
  output_stream = av.open(output_url, mode='w', format='flv')
 
299
  # Configure video stream
300
  video_stream = output_stream.add_stream(video_codec, rate=30)
301
  video_stream.width = input_stream.streams.video[0].width
302
  video_stream.height = input_stream.streams.video[0].height
303
  video_stream.pix_fmt = input_stream.streams.video[0].format.name
304
  video_stream.codec_context.options.update({'g': '30'})
 
305
  if quality_settings.lower() == "high":
306
  video_stream.bit_rate = 3000000
307
  video_stream.bit_rate_tolerance = 1000000
 
311
  elif quality_settings.lower() == "low":
312
  video_stream.bit_rate = 800000
313
  video_stream.bit_rate_tolerance = 200000
 
314
  # Configure audio stream
315
  audio_stream_in = input_stream.streams.audio[0]
316
  out_audio_stream = output_stream.add_stream(audio_codec, rate=audio_stream_in.rate)
317
  out_audio_stream.layout = "stereo"
 
318
  video_stream.codec_context.time_base = fractions.Fraction(1, video_stream.rate)
 
319
  logging.info("Streaming started successfully.")
320
+ # Start the live log updater in a background thread if not already running.
 
321
  global live_log_thread
322
  if live_log_thread is None or not live_log_thread.is_alive():
323
+ live_log_thread = threading.Thread(target=live_log_updater)
324
+ live_log_thread.daemon = True
325
  live_log_thread.start()
326
  logging.info("Live log updater thread started.")
327
+ # Stream loop: process packets until state changes
 
328
  while streaming_state in ["streaming", "paused"]:
329
  for packet in input_stream.demux():
330
  if streaming_state == "stopped":
 
348
  output_stream.mux(out_packet)
349
  if hasattr(out_packet, "size"):
350
  bytes_sent += out_packet.size
351
+ # Flush remaining packets
 
352
  for out_packet in video_stream.encode():
353
  output_stream.mux(out_packet)
354
  for out_packet in out_audio_stream.encode():
355
  output_stream.mux(out_packet)
 
356
  if streaming_state == "paused":
357
  time.sleep(1)
 
358
  # Clean up resources
359
  try:
360
  video_stream.close()
 
363
  input_stream.close()
364
  except Exception as cleanup_error:
365
  logging.error(f"Error during cleanup: {cleanup_error}")
 
366
  logging.info("Streaming complete, resources cleaned up.")
367
  streaming_state = "idle"
368
  except Exception as e:
369
  error_message = f"An error occurred during streaming: {str(e)}\n\n{traceback.format_exc()}"
370
  logging.error(error_message)
371
  streaming_state = "idle"
372
+ # Notify the user about the error by storing it in a global variable
373
  notify_error(chat_id, error_message)
374
 
375
  def start_streaming(chat_id):
 
377
  valid, msg = validate_inputs()
378
  if not valid:
379
  return send_guide_message(chat_id, f"Validation error: {msg}")
 
380
  stream_chat_id = chat_id
381
  try:
382
  stream_thread = threading.Thread(
 
393
  stream_thread.daemon = True
394
  stream_thread.start()
395
  logging.info("Streaming thread started.")
396
+ # Immediately inform the user that streaming has started, along with current live logs.
397
+ response_text = "πŸš€ *Streaming initiated!* Live logs are now updating below:\n\n" + live_log_display
398
  return {
399
  "method": "sendMessage",
400
  "chat_id": chat_id,
401
+ "text": response_text,
402
  "reply_markup": get_inline_keyboard_for_stream(),
403
  "parse_mode": "Markdown"
404
  }
 
409
  return send_guide_message(chat_id, error_message)
410
 
411
  # -------------------------------------------------------------------
412
+ # Stream Control Handlers
413
  # -------------------------------------------------------------------
414
  def pause_stream(chat_id):
415
  global streaming_state
 
466
  }
467
 
468
  # -------------------------------------------------------------------
469
+ # FastAPI Webhook Endpoint for Telegram Updates
470
  # -------------------------------------------------------------------
471
  @app.post("/webhook")
472
  async def telegram_webhook(request: Request):
 
477
  if "message" in update:
478
  chat_id = update["message"]["chat"]["id"]
479
  text = update["message"].get("text", "").strip()
 
480
  if text.startswith("/setting"):
481
  return handle_setting(chat_id)
482
  elif text.startswith("/start"):
 
494
  elif text.startswith("/status"):
495
  return stream_status(chat_id)
496
  else:
497
+ # Process conversation setup inputs
498
  return handle_conversation(chat_id, text)
 
499
  # Process inline keyboard callback queries
500
  elif "callback_query" in update:
501
  callback_data = update["callback_query"]["data"]
502
  chat_id = update["callback_query"]["message"]["chat"]["id"]
503
  message_id = update["callback_query"]["message"]["message_id"]
 
504
  if callback_data == "pause":
505
  response = pause_stream(chat_id)
506
  elif callback_data == "resume":
 
513
  response = start_streaming(chat_id)
514
  else:
515
  response = send_guide_message(chat_id, "❓ Unknown callback command.")
516
+ # Ensure the inline keyboard remains in the updated message.
517
+ if streaming_state in ["streaming", "paused"]:
518
+ response["reply_markup"] = get_inline_keyboard_for_stream()
519
+ elif current_step is None:
520
+ response["reply_markup"] = get_inline_keyboard_for_start()
521
  response["method"] = "editMessageText"
522
  response["message_id"] = message_id
523
  return response
 
524
  return {"status": "ok"}