AstraOS commited on
Commit
697a015
·
verified ·
1 Parent(s): 2acbcce

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +48 -22
app.py CHANGED
@@ -14,12 +14,13 @@ app = FastAPI()
14
  # -------------------------------------------------------------------
15
  # Configuration & Global Variables
16
  # -------------------------------------------------------------------
17
- # (No explicit token usage here; all responses are returned as JSON.)
 
18
  # Conversation state
19
  user_inputs = {}
20
- # Conversation fields depend on mode:
21
- # Simple mode (default): Only "input_url" and "output_url" are required.
22
- # Advanced mode (triggered by /setting): Additional fields are required.
23
  conversation_fields = []
24
  current_step = None
25
  advanced_mode = False
@@ -33,8 +34,8 @@ default_settings = {
33
  }
34
 
35
  # Streaming state & statistics
36
- streaming_state = "idle" # Possible values: "idle", "streaming", "paused", "stopped"
37
- stream_chat_id = None # Chat ID for the session
38
  stream_start_time = None
39
  frames_encoded = 0
40
  bytes_sent = 0
@@ -44,11 +45,9 @@ video_stream = None
44
  audio_stream_in = None
45
  output_stream = None
46
 
47
- # Thread references
48
- stream_thread = None
49
- # (No live log updater thread is launched because outgoing calls are not allowed.)
50
- # Instead, live logs are maintained in a global variable.
51
  live_log_lines = [] # Rolling list (max 50 lines)
 
52
 
53
  # -------------------------------------------------------------------
54
  # Enhanced Logging Setup
@@ -79,11 +78,11 @@ logger.addHandler(list_handler)
79
  # Utility Functions & UI Helpers
80
  # -------------------------------------------------------------------
81
  def create_html_message(text: str):
82
- # Returns a response dict with HTML parse mode and wrapped in <pre> for monospaced output.
83
  return {"parse_mode": "HTML", "text": f"<pre>{text}</pre>"}
84
 
85
  def get_inline_keyboard_for_stream():
86
- # Inline keyboard for controlling the stream.
87
  return {
88
  "inline_keyboard": [
89
  [
@@ -98,7 +97,7 @@ def get_inline_keyboard_for_stream():
98
  }
99
 
100
  def get_inline_keyboard_for_start():
101
- # Inline keyboard with a single "Start Streaming" button.
102
  return {
103
  "inline_keyboard": [
104
  [
@@ -144,30 +143,35 @@ def get_uptime():
144
  return "0"
145
 
146
  def validate_inputs():
147
- # Ensure that every required field is provided.
148
  missing = [field for field in conversation_fields if field not in user_inputs or not user_inputs[field]]
149
  if missing:
150
  return False, f"Missing fields: {', '.join(missing)}"
151
  return True, ""
152
 
153
  # -------------------------------------------------------------------
154
- # Error Handling Helper
155
  # -------------------------------------------------------------------
156
  def notify_error(chat_id, error_message):
157
- # Instead of making an outgoing call, we simply log the error.
158
- # The error will appear in the global log history (and can be retrieved via /logs).
159
- logging.error(f"Notify user (chat {chat_id}) about error: {error_message}")
 
 
160
 
161
  # -------------------------------------------------------------------
162
  # Logs History Handler (/logs)
163
  # -------------------------------------------------------------------
164
  def logs_history(chat_id):
165
- # Return the last 50 log lines in HTML format.
166
- log_text = "<pre>" + "\n".join(live_log_lines[-50:]) + "</pre>"
 
 
 
167
  return {
168
  "method": "sendMessage",
169
  "chat_id": chat_id,
170
- "text": log_text,
171
  "parse_mode": "HTML"
172
  }
173
 
@@ -176,6 +180,7 @@ def logs_history(chat_id):
176
  # -------------------------------------------------------------------
177
  def handle_start(chat_id):
178
  global current_step, user_inputs, conversation_fields, advanced_mode
 
179
  user_inputs = {}
180
  if not advanced_mode:
181
  conversation_fields = ["input_url", "output_url"]
@@ -229,6 +234,7 @@ def handle_conversation(chat_id, text):
229
  else:
230
  user_inputs[current_step] = text.strip()
231
  logging.info(f"Received {current_step}: {text.strip()}")
 
232
  idx = conversation_fields.index(current_step)
233
  if idx < len(conversation_fields) - 1:
234
  current_step = conversation_fields[idx + 1]
@@ -264,13 +270,16 @@ def stream_to_youtube(input_url, quality_settings, video_codec, audio_codec, out
264
  try:
265
  streaming_state = "streaming"
266
  reset_statistics()
 
267
  input_stream = av.open(input_url)
268
  output_stream = av.open(output_url, mode='w', format='flv')
 
269
  video_stream = output_stream.add_stream(video_codec, rate=30)
270
  video_stream.width = input_stream.streams.video[0].width
271
  video_stream.height = input_stream.streams.video[0].height
272
  video_stream.pix_fmt = input_stream.streams.video[0].format.name
273
  video_stream.codec_context.options.update({'g': '30'})
 
274
  if quality_settings.lower() == "high":
275
  video_stream.bit_rate = 3000000
276
  video_stream.bit_rate_tolerance = 1000000
@@ -280,11 +289,15 @@ def stream_to_youtube(input_url, quality_settings, video_codec, audio_codec, out
280
  elif quality_settings.lower() == "low":
281
  video_stream.bit_rate = 800000
282
  video_stream.bit_rate_tolerance = 200000
 
283
  audio_stream_in = input_stream.streams.audio[0]
284
  out_audio_stream = output_stream.add_stream(audio_codec, rate=audio_stream_in.rate)
285
  out_audio_stream.layout = "stereo"
 
286
  video_stream.codec_context.time_base = fractions.Fraction(1, video_stream.rate)
 
287
  logging.info("Streaming started successfully.")
 
288
  while streaming_state in ["streaming", "paused"]:
289
  for packet in input_stream.demux():
290
  if streaming_state == "stopped":
@@ -308,12 +321,15 @@ def stream_to_youtube(input_url, quality_settings, video_codec, audio_codec, out
308
  output_stream.mux(out_packet)
309
  if hasattr(out_packet, "size"):
310
  bytes_sent += out_packet.size
 
311
  for out_packet in video_stream.encode():
312
  output_stream.mux(out_packet)
313
  for out_packet in out_audio_stream.encode():
314
  output_stream.mux(out_packet)
 
315
  if streaming_state == "paused":
316
  time.sleep(1)
 
317
  try:
318
  video_stream.close()
319
  out_audio_stream.close()
@@ -321,6 +337,7 @@ def stream_to_youtube(input_url, quality_settings, video_codec, audio_codec, out
321
  input_stream.close()
322
  except Exception as cleanup_error:
323
  logging.error(f"Error during cleanup: {cleanup_error}")
 
324
  logging.info("Streaming complete, resources cleaned up.")
325
  streaming_state = "idle"
326
  except Exception as e:
@@ -350,10 +367,11 @@ def start_streaming(chat_id):
350
  stream_thread.daemon = True
351
  stream_thread.start()
352
  logging.info("Streaming thread started.")
 
353
  return {
354
  "method": "sendMessage",
355
  "chat_id": chat_id,
356
- "text": "🚀 *Streaming initiated!* Live logs are available via the /logs command. Use the inline keyboard to control the stream.",
357
  "reply_markup": get_inline_keyboard_for_stream(),
358
  "parse_mode": "Markdown"
359
  }
@@ -427,9 +445,12 @@ def stream_status(chat_id):
427
  async def telegram_webhook(request: Request):
428
  update = await request.json()
429
  logging.debug(f"Received update: {update}")
 
 
430
  if "message" in update:
431
  chat_id = update["message"]["chat"]["id"]
432
  text = update["message"].get("text", "").strip()
 
433
  if text.startswith("/setting"):
434
  return handle_setting(chat_id)
435
  elif text.startswith("/start"):
@@ -448,10 +469,13 @@ async def telegram_webhook(request: Request):
448
  return stream_status(chat_id)
449
  else:
450
  return handle_conversation(chat_id, text)
 
 
451
  elif "callback_query" in update:
452
  callback_data = update["callback_query"]["data"]
453
  chat_id = update["callback_query"]["message"]["chat"]["id"]
454
  message_id = update["callback_query"]["message"]["message_id"]
 
455
  if callback_data == "pause":
456
  response = pause_stream(chat_id)
457
  elif callback_data == "resume":
@@ -464,7 +488,9 @@ async def telegram_webhook(request: Request):
464
  response = start_streaming(chat_id)
465
  else:
466
  response = send_guide_message(chat_id, "❓ Unknown callback command.")
 
467
  response["method"] = "editMessageText"
468
  response["message_id"] = message_id
469
  return response
 
470
  return {"status": "ok"}
 
14
  # -------------------------------------------------------------------
15
  # Configuration & Global Variables
16
  # -------------------------------------------------------------------
17
+ # (The bot token is assumed to be configured externally via the webhook URL.)
18
+ # No token is used in any outgoing call functions.
19
  # Conversation state
20
  user_inputs = {}
21
+ # The conversation fields will depend on the mode.
22
+ # Simple mode (default): Only "input_url" and "output_url" are required.
23
+ # Advanced mode (if user sends /setting): Additional fields are required.
24
  conversation_fields = []
25
  current_step = None
26
  advanced_mode = False
 
34
  }
35
 
36
  # Streaming state & statistics
37
+ streaming_state = "idle" # "idle", "streaming", "paused", "stopped"
38
+ stream_chat_id = None # Chat ID for responses
39
  stream_start_time = None
40
  frames_encoded = 0
41
  bytes_sent = 0
 
45
  audio_stream_in = None
46
  output_stream = None
47
 
48
+ # Live logging globals
 
 
 
49
  live_log_lines = [] # Rolling list (max 50 lines)
50
+ last_error_notification = None # Stores error messages (if any)
51
 
52
  # -------------------------------------------------------------------
53
  # Enhanced Logging Setup
 
78
  # Utility Functions & UI Helpers
79
  # -------------------------------------------------------------------
80
  def create_html_message(text: str):
81
+ # Wrap text in <pre> tags for monospaced output (HTML parse mode)
82
  return {"parse_mode": "HTML", "text": f"<pre>{text}</pre>"}
83
 
84
  def get_inline_keyboard_for_stream():
85
+ # Inline keyboard for streaming controls after the stream has started
86
  return {
87
  "inline_keyboard": [
88
  [
 
97
  }
98
 
99
  def get_inline_keyboard_for_start():
100
+ # Inline keyboard with a start button for when conversation is complete.
101
  return {
102
  "inline_keyboard": [
103
  [
 
143
  return "0"
144
 
145
  def validate_inputs():
146
+ # Ensure all fields in conversation_fields have been provided
147
  missing = [field for field in conversation_fields if field not in user_inputs or not user_inputs[field]]
148
  if missing:
149
  return False, f"Missing fields: {', '.join(missing)}"
150
  return True, ""
151
 
152
  # -------------------------------------------------------------------
153
+ # Error Notification Helper
154
  # -------------------------------------------------------------------
155
  def notify_error(chat_id, error_message):
156
+ global last_error_notification
157
+ last_error_notification = error_message
158
+ logging.error(f"Notifying user (chat {chat_id}) of error: {error_message}")
159
+ # Because we are not making outgoing calls, this error message will be delivered
160
+ # when the user next sends a command (e.g. /logs or /status).
161
 
162
  # -------------------------------------------------------------------
163
  # Logs History Handler (/logs)
164
  # -------------------------------------------------------------------
165
  def logs_history(chat_id):
166
+ global last_error_notification
167
+ log_text = ""
168
+ if last_error_notification:
169
+ log_text += "⚠️ *Error Notification:*\n" + last_error_notification + "\n\n"
170
+ log_text += "\n".join(live_log_lines[-50:])
171
  return {
172
  "method": "sendMessage",
173
  "chat_id": chat_id,
174
+ "text": f"<pre>{log_text}</pre>",
175
  "parse_mode": "HTML"
176
  }
177
 
 
180
  # -------------------------------------------------------------------
181
  def handle_start(chat_id):
182
  global current_step, user_inputs, conversation_fields, advanced_mode
183
+ # By default, simple mode (unless advanced_mode was set via /setting)
184
  user_inputs = {}
185
  if not advanced_mode:
186
  conversation_fields = ["input_url", "output_url"]
 
234
  else:
235
  user_inputs[current_step] = text.strip()
236
  logging.info(f"Received {current_step}: {text.strip()}")
237
+
238
  idx = conversation_fields.index(current_step)
239
  if idx < len(conversation_fields) - 1:
240
  current_step = conversation_fields[idx + 1]
 
270
  try:
271
  streaming_state = "streaming"
272
  reset_statistics()
273
+
274
  input_stream = av.open(input_url)
275
  output_stream = av.open(output_url, mode='w', format='flv')
276
+
277
  video_stream = output_stream.add_stream(video_codec, rate=30)
278
  video_stream.width = input_stream.streams.video[0].width
279
  video_stream.height = input_stream.streams.video[0].height
280
  video_stream.pix_fmt = input_stream.streams.video[0].format.name
281
  video_stream.codec_context.options.update({'g': '30'})
282
+
283
  if quality_settings.lower() == "high":
284
  video_stream.bit_rate = 3000000
285
  video_stream.bit_rate_tolerance = 1000000
 
289
  elif quality_settings.lower() == "low":
290
  video_stream.bit_rate = 800000
291
  video_stream.bit_rate_tolerance = 200000
292
+
293
  audio_stream_in = input_stream.streams.audio[0]
294
  out_audio_stream = output_stream.add_stream(audio_codec, rate=audio_stream_in.rate)
295
  out_audio_stream.layout = "stereo"
296
+
297
  video_stream.codec_context.time_base = fractions.Fraction(1, video_stream.rate)
298
+
299
  logging.info("Streaming started successfully.")
300
+
301
  while streaming_state in ["streaming", "paused"]:
302
  for packet in input_stream.demux():
303
  if streaming_state == "stopped":
 
321
  output_stream.mux(out_packet)
322
  if hasattr(out_packet, "size"):
323
  bytes_sent += out_packet.size
324
+
325
  for out_packet in video_stream.encode():
326
  output_stream.mux(out_packet)
327
  for out_packet in out_audio_stream.encode():
328
  output_stream.mux(out_packet)
329
+
330
  if streaming_state == "paused":
331
  time.sleep(1)
332
+
333
  try:
334
  video_stream.close()
335
  out_audio_stream.close()
 
337
  input_stream.close()
338
  except Exception as cleanup_error:
339
  logging.error(f"Error during cleanup: {cleanup_error}")
340
+
341
  logging.info("Streaming complete, resources cleaned up.")
342
  streaming_state = "idle"
343
  except Exception as e:
 
367
  stream_thread.daemon = True
368
  stream_thread.start()
369
  logging.info("Streaming thread started.")
370
+
371
  return {
372
  "method": "sendMessage",
373
  "chat_id": chat_id,
374
+ "text": "🚀 *Streaming initiated!* Use the inline keyboard to control the stream. (Use /logs to view live log history.)",
375
  "reply_markup": get_inline_keyboard_for_stream(),
376
  "parse_mode": "Markdown"
377
  }
 
445
  async def telegram_webhook(request: Request):
446
  update = await request.json()
447
  logging.debug(f"Received update: {update}")
448
+
449
+ # Process messages from users
450
  if "message" in update:
451
  chat_id = update["message"]["chat"]["id"]
452
  text = update["message"].get("text", "").strip()
453
+
454
  if text.startswith("/setting"):
455
  return handle_setting(chat_id)
456
  elif text.startswith("/start"):
 
469
  return stream_status(chat_id)
470
  else:
471
  return handle_conversation(chat_id, text)
472
+
473
+ # Process inline keyboard callback queries
474
  elif "callback_query" in update:
475
  callback_data = update["callback_query"]["data"]
476
  chat_id = update["callback_query"]["message"]["chat"]["id"]
477
  message_id = update["callback_query"]["message"]["message_id"]
478
+
479
  if callback_data == "pause":
480
  response = pause_stream(chat_id)
481
  elif callback_data == "resume":
 
488
  response = start_streaming(chat_id)
489
  else:
490
  response = send_guide_message(chat_id, "❓ Unknown callback command.")
491
+
492
  response["method"] = "editMessageText"
493
  response["message_id"] = message_id
494
  return response
495
+
496
  return {"status": "ok"}