import gradio as gr
import glob
import os
import shutil
import subprocess
import tempfile
from pydub import AudioSegment
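
# Assumed dependencies for this Space (a sketch, not a pinned list): gradio and pydub
# from requirements.txt, the `moshi` package providing `python -m moshi.run_inference`,
# and an ffmpeg binary on PATH, which pydub needs for MP3 decoding/encoding.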
def process_audio(input_file):
    """Convert the uploaded audio to MP3 and trim it to at most 60 seconds."""
    # Load the audio file (pydub relies on ffmpeg for non-WAV formats)
    audio = AudioSegment.from_file(input_file)

    # Ensure it's in MP3 format
    output_file = os.path.splitext(input_file)[0] + ".mp3"

    # Limit duration to 1 minute (60,000 ms)
    if len(audio) > 60000:
        audio = audio[:60000]  # Trim to 60 seconds

    # Export as MP3
    audio.export(output_file, format="mp3")

    return output_file

def cleanup_old_audio():
    """Remove old audio files before starting a new inference."""
    files_to_remove = glob.glob("out_en-*.wav") + glob.glob("final_output.wav")

    if files_to_remove:
        print(f"Cleaning up {len(files_to_remove)} old audio files...")
        for file in files_to_remove:
            try:
                os.remove(file)
                print(f"Deleted: {file}")
            except Exception as e:
                print(f"Error deleting {file}: {e}")
    else:
        print("No old audio files found.")

def find_audio_chunks():
    """Finds all out_en-*.wav files, sorts them, and returns the file paths."""
    wav_files = glob.glob("out_en-*.wav")

    # Extract the trailing chunk number and sort numerically (so 10 comes after 9, not after 1)
    wav_files.sort(key=lambda x: int(x.split('-')[-1].split('.')[0]))

    print(f"Found {len(wav_files)} audio chunks: {wav_files}")
    return wav_files  # Returning the list of file paths

def concatenate_audio(output_filename="final_output.wav"):
    """Concatenates all audio chunks and saves the result to a final output file in a temporary directory."""
    wav_files = find_audio_chunks()  # Get sorted audio file paths

    if not wav_files:
        print("No audio files found.")
        return None, []  # Keep the return shape consistent with the success case

    # Create a temporary directory
    temp_dir = tempfile.mkdtemp()

    # Load and concatenate all audio files
    combined = AudioSegment.empty()
    temp_wav_files = []
    for file in wav_files:
        audio = AudioSegment.from_wav(file)
        combined += audio

        # Move individual chunks to the temp directory so they can be offered for
        # download and are not picked up by the next cleanup_old_audio() call
        temp_file_path = os.path.join(temp_dir, os.path.basename(file))
        shutil.move(file, temp_file_path)
        temp_wav_files.append(temp_file_path)

    # Define the final output path in the temporary directory
    temp_output_path = os.path.join(temp_dir, output_filename)

    # Export the final combined audio
    combined.export(temp_output_path, format="wav")

    print(f"Concatenated audio saved at {temp_output_path}")
    return temp_output_path, temp_wav_files  # Returning temp paths

def infer(audio_input_path):
    cleanup_old_audio()

    # Normalize the input to a trimmed MP3 before running inference
    audio_input_path = process_audio(audio_input_path)
    print(f"Processed file saved as: {audio_input_path}")

    # Run the Hibiki translation model via the moshi CLI
    command = [
        "python", "-m", "moshi.run_inference",
        audio_input_path, "out_en.wav",
        "--hf-repo", "kyutai/hibiki-1b-pytorch-bf16"
    ]
    result = subprocess.run(command, capture_output=True, text=True)

    # Print the standard output and error
    print("STDOUT:", result.stdout)
    print("STDERR:", result.stderr)

    # Check if the command was successful
    if result.returncode == 0:
        print("Command executed successfully.")
        concat_out, file_list = concatenate_audio()
        return (
            concat_out,
            gr.update(choices=file_list, value=file_list[0] if file_list else None, visible=True),
            gr.update(visible=True),
            gr.update(value=file_list, visible=True),
            gr.update(visible=True),
        )
    else:
        print("Error executing command.")
        raise gr.Error("Error executing command")

def load_chosen_audio(audio_path):
    """Load the selected generated chunk into the result player."""
    return audio_path

def overlay_audio(original_mp3: str, translated_wav: str, volume_reduction_db: int = 10) -> str:
    """
    Overlays the translated audio on top of the original, reduces the original volume,
    and ensures the final audio lasts as long as the longer of the two tracks.

    :param original_mp3: Path to the original MP3 file.
    :param translated_wav: Path to the translated WAV file.
    :param volume_reduction_db: How much to attenuate the original, in dB (default 10 dB).
    :return: Path to the temporary output WAV file.
    """
    # Load original MP3 and resample to 16 kHz mono
    original = AudioSegment.from_mp3(original_mp3).set_frame_rate(16000).set_channels(1)

    # Lower the original's volume
    original = original - volume_reduction_db

    # Load the translated WAV with the same rate and channel layout
    translated = AudioSegment.from_wav(translated_wav).set_frame_rate(16000).set_channels(1)

    # Determine the final length (the longer of the two)
    final_length = max(len(original), len(translated))

    # Extend the shorter track with silence to match the longer track
    if len(original) < final_length:
        original = original + AudioSegment.silent(duration=final_length - len(original))
    if len(translated) < final_length:
        translated = translated + AudioSegment.silent(duration=final_length - len(translated))

    # Overlay the translated speech over the original
    combined = original.overlay(translated)

    # Create a temporary file to save the output
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
    combined.export(temp_file.name, format="wav")

    print(f"Final audio saved at: {temp_file.name}")
    return temp_file.name  # Return the temporary file path

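# Example call with hypothetical paths:
#   mixed_path = overlay_audio("interview_fr.mp3", "out_en-0.wav", volume_reduction_db=12)
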
def process_final_combination(audio_in, chosen_translated):
    """Mix the chosen translated chunk over the (attenuated) original input."""
    audio_in = process_audio(audio_in)
    temp_output_path = overlay_audio(audio_in, chosen_translated)
    return gr.update(value=temp_output_path, visible=True)

def hide_previous():
    """Hide the result widgets from the previous run while a new inference starts."""
    return gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)

css = """
div#col-container{
    margin: 0 auto;
    max-width: 720px;
}
"""

with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.Markdown("# Hibiki")
        gr.Markdown("This is a simple demo for Kyutai's Hibiki translation models • Currently supports French to English only.")

        audio_input = gr.Audio(label="Audio IN", type="filepath")
        submit_btn = gr.Button("Submit")
        output_result = gr.Audio(label="Translated result")

        with gr.Row():
            dropdown_wav_selector = gr.Dropdown(
                label="Pick a generated audio to load",
                value=None,
                visible=False,
                scale=2
            )
            choose_this_btn = gr.Button("Use this one", scale=1, visible=False)
        combined_output = gr.Audio(label="Combined Output", visible=False)
        with gr.Accordion("Downloadable audio output list", open=False, visible=False) as result_accordion:
            wav_list = gr.Files(label="Output Audio List", visible=False)

        gr.Examples(
            examples=[
                "./examples/sample_fr_hibiki_intro.mp3",
                "./examples/sample_fr_hibiki_crepes.mp3",
                "./examples/sample_fr_hibiki_monologue_otis.mp3"
            ],
            inputs=[audio_input]
        )

    dropdown_wav_selector.select(
        fn=load_chosen_audio,
        inputs=[dropdown_wav_selector],
        outputs=[output_result],
        queue=False
    )

    choose_this_btn.click(
        fn=process_final_combination,
        inputs=[audio_input, dropdown_wav_selector],
        outputs=[combined_output]
    )

    submit_btn.click(
        fn=hide_previous,
        inputs=None,
        outputs=[dropdown_wav_selector, result_accordion, wav_list, choose_this_btn, combined_output]
    ).then(
        fn=infer,
        inputs=[audio_input],
        outputs=[output_result, dropdown_wav_selector, result_accordion, wav_list, choose_this_btn]
    )

demo.queue().launch(show_api=False, show_error=True)
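
# On Hugging Face Spaces this file serves as the app entry point; to try it locally
# (assuming the dependencies noted near the imports are installed), run: python app.py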