AstraOS commited on
Commit
27999cb
Β·
verified Β·
1 Parent(s): 697a015

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +85 -30
app.py CHANGED
@@ -5,6 +5,7 @@ import time
5
  import datetime
6
  import traceback
7
  import fractions
 
8
 
9
  from fastapi import FastAPI, Request
10
  import av
@@ -14,8 +15,9 @@ app = FastAPI()
14
  # -------------------------------------------------------------------
15
  # Configuration & Global Variables
16
  # -------------------------------------------------------------------
17
- # (The bot token is assumed to be configured externally via the webhook URL.)
18
- # No token is used in any outgoing call functions.
 
19
  # Conversation state
20
  user_inputs = {}
21
  # The conversation fields will depend on the mode.
@@ -35,7 +37,7 @@ default_settings = {
35
 
36
  # Streaming state & statistics
37
  streaming_state = "idle" # "idle", "streaming", "paused", "stopped"
38
- stream_chat_id = None # Chat ID for responses
39
  stream_start_time = None
40
  frames_encoded = 0
41
  bytes_sent = 0
@@ -45,9 +47,13 @@ video_stream = None
45
  audio_stream_in = None
46
  output_stream = None
47
 
 
 
 
 
48
  # Live logging globals
49
  live_log_lines = [] # Rolling list (max 50 lines)
50
- last_error_notification = None # Stores error messages (if any)
51
 
52
  # -------------------------------------------------------------------
53
  # Enhanced Logging Setup
@@ -83,7 +89,7 @@ def create_html_message(text: str):
83
 
84
  def get_inline_keyboard_for_stream():
85
  # Inline keyboard for streaming controls after the stream has started
86
- return {
87
  "inline_keyboard": [
88
  [
89
  {"text": "⏸ Pause", "callback_data": "pause"},
@@ -95,16 +101,18 @@ def get_inline_keyboard_for_stream():
95
  ]
96
  ]
97
  }
 
98
 
99
  def get_inline_keyboard_for_start():
100
  # Inline keyboard with a start button for when conversation is complete.
101
- return {
102
  "inline_keyboard": [
103
  [
104
  {"text": "πŸš€ Start Streaming", "callback_data": "start_stream"}
105
  ]
106
  ]
107
  }
 
108
 
109
  def help_text():
110
  return (
@@ -123,6 +131,7 @@ def help_text():
123
 
124
  def send_guide_message(chat_id, message):
125
  logging.info(f"Sending message to chat {chat_id}: {message}")
 
126
  return {
127
  "method": "sendMessage",
128
  "chat_id": chat_id,
@@ -150,28 +159,55 @@ def validate_inputs():
150
  return True, ""
151
 
152
  # -------------------------------------------------------------------
153
- # Error Notification Helper
154
  # -------------------------------------------------------------------
155
  def notify_error(chat_id, error_message):
156
- global last_error_notification
157
- last_error_notification = error_message
158
- logging.error(f"Notifying user (chat {chat_id}) of error: {error_message}")
159
- # Because we are not making outgoing calls, this error message will be delivered
160
- # when the user next sends a command (e.g. /logs or /status).
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
161
 
162
  # -------------------------------------------------------------------
163
  # Logs History Handler (/logs)
164
  # -------------------------------------------------------------------
165
  def logs_history(chat_id):
166
- global last_error_notification
167
- log_text = ""
168
- if last_error_notification:
169
- log_text += "⚠️ *Error Notification:*\n" + last_error_notification + "\n\n"
170
- log_text += "\n".join(live_log_lines[-50:])
171
  return {
172
  "method": "sendMessage",
173
  "chat_id": chat_id,
174
- "text": f"<pre>{log_text}</pre>",
175
  "parse_mode": "HTML"
176
  }
177
 
@@ -187,12 +223,10 @@ def handle_start(chat_id):
187
  else:
188
  conversation_fields = ["input_url", "quality_settings", "video_codec", "audio_codec", "output_url"]
189
  current_step = conversation_fields[0]
190
- text = (
191
- "πŸ‘‹ *Welcome to the Stream Bot!*\n\n"
192
- "Let's set up your stream.\n"
193
- f"Please enter the *{current_step.replace('_', ' ')}*"
194
- f"{' (no default)' if current_step not in default_settings else f' _(default: {default_settings[current_step]})_'}:"
195
- )
196
  logging.info(f"/start command from chat {chat_id} (advanced_mode={advanced_mode})")
197
  return {
198
  "method": "sendMessage",
@@ -207,7 +241,8 @@ def handle_setting(chat_id):
207
  conversation_fields = ["input_url", "quality_settings", "video_codec", "audio_codec", "output_url"]
208
  user_inputs = {}
209
  current_step = conversation_fields[0]
210
- text = "βš™οΈ *Advanced Mode Activated!*\n\nPlease enter the *input url*:"
 
211
  logging.info(f"/setting command from chat {chat_id} - advanced mode enabled")
212
  return {
213
  "method": "sendMessage",
@@ -228,6 +263,7 @@ def handle_help(chat_id):
228
  def handle_conversation(chat_id, text):
229
  global current_step, user_inputs, conversation_fields
230
  if current_step:
 
231
  if text.strip() == "" and current_step in default_settings:
232
  user_inputs[current_step] = default_settings[current_step]
233
  logging.info(f"Using default for {current_step}: {default_settings[current_step]}")
@@ -243,14 +279,17 @@ def handle_conversation(chat_id, text):
243
  prompt += f" _(default: {default_settings[current_step]})_"
244
  return send_guide_message(chat_id, prompt)
245
  else:
 
246
  current_step = None
247
  valid, msg = validate_inputs()
248
  if not valid:
249
  return send_guide_message(chat_id, f"Validation error: {msg}")
 
250
  if not advanced_mode:
251
  user_inputs.setdefault("quality_settings", default_settings["quality_settings"])
252
  user_inputs.setdefault("video_codec", default_settings["video_codec"])
253
  user_inputs.setdefault("audio_codec", default_settings["audio_codec"])
 
254
  return {
255
  "method": "sendMessage",
256
  "chat_id": chat_id,
@@ -274,6 +313,7 @@ def stream_to_youtube(input_url, quality_settings, video_codec, audio_codec, out
274
  input_stream = av.open(input_url)
275
  output_stream = av.open(output_url, mode='w', format='flv')
276
 
 
277
  video_stream = output_stream.add_stream(video_codec, rate=30)
278
  video_stream.width = input_stream.streams.video[0].width
279
  video_stream.height = input_stream.streams.video[0].height
@@ -290,6 +330,7 @@ def stream_to_youtube(input_url, quality_settings, video_codec, audio_codec, out
290
  video_stream.bit_rate = 800000
291
  video_stream.bit_rate_tolerance = 200000
292
 
 
293
  audio_stream_in = input_stream.streams.audio[0]
294
  out_audio_stream = output_stream.add_stream(audio_codec, rate=audio_stream_in.rate)
295
  out_audio_stream.layout = "stereo"
@@ -298,6 +339,7 @@ def stream_to_youtube(input_url, quality_settings, video_codec, audio_codec, out
298
 
299
  logging.info("Streaming started successfully.")
300
 
 
301
  while streaming_state in ["streaming", "paused"]:
302
  for packet in input_stream.demux():
303
  if streaming_state == "stopped":
@@ -322,6 +364,7 @@ def stream_to_youtube(input_url, quality_settings, video_codec, audio_codec, out
322
  if hasattr(out_packet, "size"):
323
  bytes_sent += out_packet.size
324
 
 
325
  for out_packet in video_stream.encode():
326
  output_stream.mux(out_packet)
327
  for out_packet in out_audio_stream.encode():
@@ -330,6 +373,7 @@ def stream_to_youtube(input_url, quality_settings, video_codec, audio_codec, out
330
  if streaming_state == "paused":
331
  time.sleep(1)
332
 
 
333
  try:
334
  video_stream.close()
335
  out_audio_stream.close()
@@ -344,15 +388,18 @@ def stream_to_youtube(input_url, quality_settings, video_codec, audio_codec, out
344
  error_message = f"An error occurred during streaming: {str(e)}\n\n{traceback.format_exc()}"
345
  logging.error(error_message)
346
  streaming_state = "idle"
347
- notify_error(chat_id, error_message)
 
348
 
349
  def start_streaming(chat_id):
350
- global stream_thread, stream_chat_id
351
  valid, msg = validate_inputs()
352
  if not valid:
353
  return send_guide_message(chat_id, f"Validation error: {msg}")
 
354
  stream_chat_id = chat_id
355
  try:
 
356
  stream_thread = threading.Thread(
357
  target=stream_to_youtube,
358
  args=(
@@ -368,18 +415,24 @@ def start_streaming(chat_id):
368
  stream_thread.start()
369
  logging.info("Streaming thread started.")
370
 
 
 
 
 
 
 
 
371
  return {
372
  "method": "sendMessage",
373
  "chat_id": chat_id,
374
- "text": "πŸš€ *Streaming initiated!* Use the inline keyboard to control the stream. (Use /logs to view live log history.)",
375
  "reply_markup": get_inline_keyboard_for_stream(),
376
  "parse_mode": "Markdown"
377
  }
378
  except Exception as e:
379
  error_message = f"Failed to start streaming: {str(e)}"
380
  logging.error(error_message)
381
- notify_error(chat_id, error_message)
382
- return send_guide_message(chat_id, error_message)
383
 
384
  # -------------------------------------------------------------------
385
  # Stream Control Handlers
@@ -468,6 +521,7 @@ async def telegram_webhook(request: Request):
468
  elif text.startswith("/status"):
469
  return stream_status(chat_id)
470
  else:
 
471
  return handle_conversation(chat_id, text)
472
 
473
  # Process inline keyboard callback queries
@@ -489,6 +543,7 @@ async def telegram_webhook(request: Request):
489
  else:
490
  response = send_guide_message(chat_id, "❓ Unknown callback command.")
491
 
 
492
  response["method"] = "editMessageText"
493
  response["message_id"] = message_id
494
  return response
 
5
  import datetime
6
  import traceback
7
  import fractions
8
+ import requests
9
 
10
  from fastapi import FastAPI, Request
11
  import av
 
15
  # -------------------------------------------------------------------
16
  # Configuration & Global Variables
17
  # -------------------------------------------------------------------
18
+ # (No external token or outgoing call configuration is required now.)
19
+ # All functions will simply return JSON responses from the webhook endpoint.
20
+
21
  # Conversation state
22
  user_inputs = {}
23
  # The conversation fields will depend on the mode.
 
37
 
38
  # Streaming state & statistics
39
  streaming_state = "idle" # "idle", "streaming", "paused", "stopped"
40
+ stream_chat_id = None # Chat ID for periodic updates
41
  stream_start_time = None
42
  frames_encoded = 0
43
  bytes_sent = 0
 
47
  audio_stream_in = None
48
  output_stream = None
49
 
50
+ # Thread references
51
+ stream_thread = None
52
+ live_log_thread = None
53
+
54
  # Live logging globals
55
  live_log_lines = [] # Rolling list (max 50 lines)
56
+ live_log_message_id = None # Dummy value used for simulation
57
 
58
  # -------------------------------------------------------------------
59
  # Enhanced Logging Setup
 
89
 
90
  def get_inline_keyboard_for_stream():
91
  # Inline keyboard for streaming controls after the stream has started
92
+ keyboard = {
93
  "inline_keyboard": [
94
  [
95
  {"text": "⏸ Pause", "callback_data": "pause"},
 
101
  ]
102
  ]
103
  }
104
+ return keyboard
105
 
106
  def get_inline_keyboard_for_start():
107
  # Inline keyboard with a start button for when conversation is complete.
108
+ keyboard = {
109
  "inline_keyboard": [
110
  [
111
  {"text": "πŸš€ Start Streaming", "callback_data": "start_stream"}
112
  ]
113
  ]
114
  }
115
+ return keyboard
116
 
117
  def help_text():
118
  return (
 
131
 
132
  def send_guide_message(chat_id, message):
133
  logging.info(f"Sending message to chat {chat_id}: {message}")
134
+ # Return a JSON response that would be sent back to Telegram via webhook.
135
  return {
136
  "method": "sendMessage",
137
  "chat_id": chat_id,
 
159
  return True, ""
160
 
161
  # -------------------------------------------------------------------
162
+ # Notify Error Helper
163
  # -------------------------------------------------------------------
164
  def notify_error(chat_id, error_message):
165
+ # Instead of making an outgoing call, simply return a JSON response
166
+ # (In practice, this error notification would be sent back via the webhook response.)
167
+ payload = {
168
+ "method": "sendMessage",
169
+ "chat_id": chat_id,
170
+ "text": f"⚠️ *Streaming Error Occurred:*\n\n{error_message}\n\nPlease check the live logs for details.",
171
+ "parse_mode": "Markdown"
172
+ }
173
+ logging.error(f"Notifying error to chat {chat_id}: {error_message}")
174
+ return payload
175
+
176
+ # -------------------------------------------------------------------
177
+ # Live Logging Updater (Background Thread)
178
+ # -------------------------------------------------------------------
179
+ def live_log_updater(chat_id):
180
+ global live_log_message_id, streaming_state
181
+ try:
182
+ # Simulate sending an initial live log message in HTML format
183
+ payload = {
184
+ "chat_id": chat_id,
185
+ "text": "<pre>Live Logs:\nInitializing...</pre>",
186
+ "parse_mode": "HTML"
187
+ }
188
+ # Since we are not making an outgoing call, simply log and assign a dummy message ID.
189
+ live_log_message_id = 1 # Dummy value for simulation
190
+ logging.info("Simulated live log message sent with dummy id 1")
191
+ # Update live log every 1 second until streaming stops
192
+ while streaming_state in ["streaming", "paused"]:
193
+ # Prepare the log text from the last 15 log lines
194
+ log_text = "<pre>" + "\n".join(live_log_lines[-15:]) + "</pre>"
195
+ # Instead of calling an external API, log the update that would be sent.
196
+ logging.info(f"Live log update for chat {chat_id}: {log_text}")
197
+ time.sleep(1)
198
+ except Exception as e:
199
+ logging.error(f"Error in live log updater: {e}")
200
 
201
  # -------------------------------------------------------------------
202
  # Logs History Handler (/logs)
203
  # -------------------------------------------------------------------
204
  def logs_history(chat_id):
205
+ # Return the last 50 log lines in HTML format as a JSON response
206
+ log_text = "<pre>" + "\n".join(live_log_lines[-50:]) + "</pre>"
 
 
 
207
  return {
208
  "method": "sendMessage",
209
  "chat_id": chat_id,
210
+ "text": log_text,
211
  "parse_mode": "HTML"
212
  }
213
 
 
223
  else:
224
  conversation_fields = ["input_url", "quality_settings", "video_codec", "audio_codec", "output_url"]
225
  current_step = conversation_fields[0]
226
+ text = ("πŸ‘‹ *Welcome to the Stream Bot!*\n\n"
227
+ "Let's set up your stream.\n"
228
+ f"Please enter the *{current_step.replace('_', ' ')}*"
229
+ f"{' (no default)' if current_step not in default_settings else f' _(default: {default_settings[current_step]})_'}:")
 
 
230
  logging.info(f"/start command from chat {chat_id} (advanced_mode={advanced_mode})")
231
  return {
232
  "method": "sendMessage",
 
241
  conversation_fields = ["input_url", "quality_settings", "video_codec", "audio_codec", "output_url"]
242
  user_inputs = {}
243
  current_step = conversation_fields[0]
244
+ text = ("βš™οΈ *Advanced Mode Activated!*\n\n"
245
+ "Please enter the *input url*:")
246
  logging.info(f"/setting command from chat {chat_id} - advanced mode enabled")
247
  return {
248
  "method": "sendMessage",
 
263
  def handle_conversation(chat_id, text):
264
  global current_step, user_inputs, conversation_fields
265
  if current_step:
266
+ # If the response is empty and a default exists, use the default.
267
  if text.strip() == "" and current_step in default_settings:
268
  user_inputs[current_step] = default_settings[current_step]
269
  logging.info(f"Using default for {current_step}: {default_settings[current_step]}")
 
279
  prompt += f" _(default: {default_settings[current_step]})_"
280
  return send_guide_message(chat_id, prompt)
281
  else:
282
+ # All inputs have been collected.
283
  current_step = None
284
  valid, msg = validate_inputs()
285
  if not valid:
286
  return send_guide_message(chat_id, f"Validation error: {msg}")
287
+ # In simple mode, fill in advanced fields with defaults.
288
  if not advanced_mode:
289
  user_inputs.setdefault("quality_settings", default_settings["quality_settings"])
290
  user_inputs.setdefault("video_codec", default_settings["video_codec"])
291
  user_inputs.setdefault("audio_codec", default_settings["audio_codec"])
292
+ # Instead of asking the user to type "start", send an inline button.
293
  return {
294
  "method": "sendMessage",
295
  "chat_id": chat_id,
 
313
  input_stream = av.open(input_url)
314
  output_stream = av.open(output_url, mode='w', format='flv')
315
 
316
+ # Configure video stream
317
  video_stream = output_stream.add_stream(video_codec, rate=30)
318
  video_stream.width = input_stream.streams.video[0].width
319
  video_stream.height = input_stream.streams.video[0].height
 
330
  video_stream.bit_rate = 800000
331
  video_stream.bit_rate_tolerance = 200000
332
 
333
+ # Configure audio stream
334
  audio_stream_in = input_stream.streams.audio[0]
335
  out_audio_stream = output_stream.add_stream(audio_codec, rate=audio_stream_in.rate)
336
  out_audio_stream.layout = "stereo"
 
339
 
340
  logging.info("Streaming started successfully.")
341
 
342
+ # Stream loop: process packets until state changes
343
  while streaming_state in ["streaming", "paused"]:
344
  for packet in input_stream.demux():
345
  if streaming_state == "stopped":
 
364
  if hasattr(out_packet, "size"):
365
  bytes_sent += out_packet.size
366
 
367
+ # Flush remaining packets
368
  for out_packet in video_stream.encode():
369
  output_stream.mux(out_packet)
370
  for out_packet in out_audio_stream.encode():
 
373
  if streaming_state == "paused":
374
  time.sleep(1)
375
 
376
+ # Clean up resources
377
  try:
378
  video_stream.close()
379
  out_audio_stream.close()
 
388
  error_message = f"An error occurred during streaming: {str(e)}\n\n{traceback.format_exc()}"
389
  logging.error(error_message)
390
  streaming_state = "idle"
391
+ # Notify the user about the error by returning a JSON response with the error details.
392
+ return notify_error(chat_id, error_message)
393
 
394
  def start_streaming(chat_id):
395
+ global stream_thread, live_log_thread, stream_chat_id
396
  valid, msg = validate_inputs()
397
  if not valid:
398
  return send_guide_message(chat_id, f"Validation error: {msg}")
399
+
400
  stream_chat_id = chat_id
401
  try:
402
+ # Start the background streaming thread
403
  stream_thread = threading.Thread(
404
  target=stream_to_youtube,
405
  args=(
 
415
  stream_thread.start()
416
  logging.info("Streaming thread started.")
417
 
418
+ # Start the live log updater thread (updates every 1 second in HTML format)
419
+ live_log_thread = threading.Thread(target=live_log_updater, args=(chat_id,))
420
+ live_log_thread.daemon = True
421
+ live_log_thread.start()
422
+ logging.info("Live log updater started.")
423
+
424
+ # Immediately inform the user that streaming has started
425
  return {
426
  "method": "sendMessage",
427
  "chat_id": chat_id,
428
+ "text": "πŸš€ *Streaming initiated!* Live logs are now updating below. Use the inline keyboard to control the stream.",
429
  "reply_markup": get_inline_keyboard_for_stream(),
430
  "parse_mode": "Markdown"
431
  }
432
  except Exception as e:
433
  error_message = f"Failed to start streaming: {str(e)}"
434
  logging.error(error_message)
435
+ return notify_error(chat_id, error_message)
 
436
 
437
  # -------------------------------------------------------------------
438
  # Stream Control Handlers
 
521
  elif text.startswith("/status"):
522
  return stream_status(chat_id)
523
  else:
524
+ # Process conversation setup inputs
525
  return handle_conversation(chat_id, text)
526
 
527
  # Process inline keyboard callback queries
 
543
  else:
544
  response = send_guide_message(chat_id, "❓ Unknown callback command.")
545
 
546
+ # Edit the original message with updated information
547
  response["method"] = "editMessageText"
548
  response["message_id"] = message_id
549
  return response