Spaces:
Runtime error
Runtime error
File size: 4,481 Bytes
d0639dc 6dfcd3a d0639dc 6dfcd3a d0639dc 681abd4 6dfcd3a 681abd4 6dfcd3a 681abd4 6dfcd3a 681abd4 d0639dc 6dfcd3a 681abd4 6dfcd3a 681abd4 6dfcd3a 681abd4 6dfcd3a 681abd4 6dfcd3a d0639dc 6dfcd3a 681abd4 d0639dc 681abd4 6dfcd3a d0639dc 681abd4 d0639dc 6dfcd3a d0639dc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
import asyncio
import io
import logging
import select
import time
import traceback
from typing import List
import av
import numpy as np
import streamlit as st
from streamlit_webrtc import WebRtcMode, webrtc_streamer
import pydub
from dotenv import load_dotenv
load_dotenv()
from sample_utils.turn import get_ice_servers
logger = logging.getLogger(__name__)
import subprocess
import os
import ray
from ray.util.queue import Queue
if not ray.is_initialized():
# Try to connect to a running Ray cluster
ray_address = os.getenv('RAY_ADDRESS')
if ray_address:
ray.init(ray_address, namespace="project_charles")
else:
ray.init(namespace="project_charles")
@ray.remote
class FFMpegConverterActor:
def __init__(self, queue: Queue, buffer_size: int = 1920, output_format: str='s16le'):
self.queue = queue
self.buffer_size = 1920
self.output_format = output_format
self.input_pipe = None
self.output_pipe = None
self.process = None
async def run(self):
while True:
chunk = await self.output_pipe.read(self.buffer_size)
await self.queue.put_async(chunk)
async def start_process(self):
cmd = [
'ffmpeg',
'-i', 'pipe:0', # read from stdin
'-f', self.output_format,
'-ar', '48000',
'-ac', '1',
'pipe:1' # write to stdout
]
self.process = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
self.input_pipe = self.process.stdin
self.output_pipe = self.process.stdout
assert self.input_pipe is not None, "input_pipe was not initialized"
print (f"input_pipe: {self.input_pipe}")
async def push_chunk(self, chunk):
try:
self.input_pipe.write(chunk)
# await self.input_pipe.drain()
except BrokenPipeError:
# If the pipe is broken, restart the process.
await self.start_process()
self.input_pipe.write(chunk)
# await self.input_pipe.drain()
# def has_processed_all_data(self):
# return self.process.poll() is not None
def close(self):
self.input_pipe.close()
self.output_pipe.close()
self.process.wait()
def video_frame_callback(
frame: av.VideoFrame,
) -> av.VideoFrame:
return frame
with open("chunks.pkl", "rb") as f:
import pickle
debug_chunks = pickle.load(f)
converter_queue = Queue(maxsize=100)
converter_actor = FFMpegConverterActor.remote(converter_queue)
ray.get(converter_actor.start_process.remote())
converter_actor.run.remote()
for chunk in debug_chunks:
ray.get(converter_actor.push_chunk.remote(chunk))
# emptry array of type int16
sample_buffer = np.zeros((0), dtype=np.int16)
def process_frame(old_frame):
try:
output_channels = 2
output_sample_rate = 44100
required_samples = old_frame.samples
if not converter_queue.empty():
frame_as_bytes = converter_queue.get()
samples = np.frombuffer(frame_as_bytes, dtype=np.int16)
else:
# create a byte array of zeros
samples = np.zeros((required_samples * 2 * 1), dtype=np.int16)
# Duplicate mono channel for stereo
if output_channels == 2:
samples = np.vstack((samples, samples)).reshape((-1,), order='F')
samples = samples.reshape(1, -1)
layout = 'stereo' if output_channels == 2 else 'mono'
new_frame = av.AudioFrame.from_ndarray(samples, format='s16', layout=layout)
new_frame.sample_rate = old_frame.sample_rate
new_frame.pts = old_frame.pts
return new_frame
except Exception as e:
print (e)
traceback.print_exc()
raise(e)
def audio_frame_callback(old_frame: av.AudioFrame) -> av.AudioFrame:
new_frame = process_frame(old_frame)
# print (f"frame: {old_frame}, pts: {old_frame.pts}")
# print (f"new_frame: {new_frame}, pts: {new_frame.pts}")
return new_frame
# return old_frame
webrtc_streamer(
key="delay",
mode=WebRtcMode.SENDRECV,
rtc_configuration={"iceServers": get_ice_servers()},
video_frame_callback=video_frame_callback,
audio_frame_callback=audio_frame_callback,
)
|