Hibiki-simple / app.py
fffiloni's picture
Fixes 500 errors for some users (#1)
ca1eb53 verified
import gradio as gr
import glob
import os
import shutil
import tempfile
from pydub import AudioSegment
from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips, ImageClip
is_shared_ui = True if "fffiloni/Hibiki-simple" in os.environ['SPACE_ID'] else False
def extract_audio_as_mp3(video_path: str) -> str:
"""
Extracts the audio from a video file and saves it as a temporary MP3 file.
:param video_path: Path to the input video file.
:return: Path to the temporary MP3 file.
"""
# Load the video
video = VideoFileClip(video_path)
# Create a temporary file for the extracted audio
temp_audio = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
# Extract and export the audio as MP3
video.audio.write_audiofile(temp_audio.name, codec="mp3")
return temp_audio.name # Return the temp file path
def process_audio(input_file):
# Load the audio file
audio = AudioSegment.from_file(input_file)
# Ensure it's in MP3 format
output_file = os.path.splitext(input_file)[0] + ".mp3"
if is_shared_ui:
# Limit duration to 1 minute (60,000 ms)
if len(audio) > 60000:
audio = audio[:60000] # Trim to 60 seconds
# Export as MP3
audio.export(output_file, format="mp3")
return output_file
def cleanup_old_audio():
"""Remove old audio files before starting a new inference."""
files_to_remove = glob.glob("out_en-*.wav") + glob.glob("final_output.wav")
if files_to_remove:
print(f"Cleaning up {len(files_to_remove)} old audio files...")
for file in files_to_remove:
try:
os.remove(file)
print(f"Deleted: {file}")
except Exception as e:
print(f"Error deleting {file}: {e}")
else:
print("No old audio files found.")
def find_audio_chunks():
"""Finds all out_en-*.wav files, sorts them, and returns the file paths."""
wav_files = glob.glob("out_en-*.wav")
# Extract numbers and sort properly
wav_files.sort(key=lambda x: int(x.split('-')[-1].split('.')[0]))
print(f"Found {len(wav_files)} audio chunks: {wav_files}")
return wav_files # Returning the list of file paths
def concatenate_audio(output_filename="final_output.wav"):
"""Concatenates all audio chunks and saves them to a final output file in a temporary directory."""
wav_files = find_audio_chunks() # Get sorted audio file paths
if not wav_files:
print("No audio files found.")
return []
# Create a temporary directory
temp_dir = tempfile.mkdtemp()
# Load and concatenate all audio files
#combined = AudioSegment.empty()
temp_wav_files = []
for file in wav_files:
#audio = AudioSegment.from_wav(file)
#combined += audio
# Move individual files to the temp directory
temp_file_path = os.path.join(temp_dir, os.path.basename(file))
shutil.move(file, temp_file_path)
temp_wav_files.append(temp_file_path)
# Define the final output path in the temporary directory
#temp_output_path = os.path.join(temp_dir, output_filename)
# Export the final combined audio
#combined.export(temp_output_path, format="wav")
#print(f"Concatenated audio saved at {temp_output_path}")
return temp_wav_files[0], temp_wav_files # Returning temp paths
def infer(audio_input_path):
cleanup_old_audio()
audio_input_path = process_audio(audio_input_path)
print(f"Processed file saved as: {audio_input_path}")
import subprocess
command = [
"python", "-m", "moshi.run_inference",
f"{audio_input_path}", "out_en.wav",
"--hf-repo", "kyutai/hibiki-1b-pytorch-bf16"
]
result = subprocess.run(command, capture_output=True, text=True)
# Print the standard output and error
print("STDOUT:", result.stdout)
print("STDERR:", result.stderr)
# Check if the command was successful
if result.returncode == 0:
print("Command executed successfully.")
first_out, file_list = concatenate_audio()
return first_out, gr.update(choices=file_list, value=file_list[0], visible=True), gr.update(visible=True), gr.update(value=file_list, visible=True), gr.update(visible=True), gr.update(visible=True), gr.update(visible=True)
else:
print("Error executing command.")
raise gr.Error("Error executing command")
def load_chosen_audio(audio_path):
return audio_path
def overlay_audio(
original_mp3: str,
translated_wav: str,
volume_reduction_db: int = 10,
cut_start: float = 0.0
) -> str:
"""
Overlays translated audio on top of the original, reduces the original volume,
and ensures the final audio lasts as long as the longer of the two tracks.
:param original_mp3: Path to the original MP3 file.
:param translated_wav: Path to the translated WAV file.
:param volume_reduction_db: Volume reduction in dB (default is -10 dB).
:param cut_start: Number of seconds to trim from the start of the translated audio (default: 0.0).
:return: Path to the temporary output WAV file.
"""
# Load original MP3 and convert to WAV
original = AudioSegment.from_mp3(original_mp3).set_frame_rate(16000).set_channels(1)
# Lower the volume
original = original - volume_reduction_db
# Load the translated WAV
translated = AudioSegment.from_wav(translated_wav).set_frame_rate(16000).set_channels(1)
# Trim the start of the translated audio if needed
if cut_start > 0:
cut_ms = int(cut_start * 1000) # Convert seconds to milliseconds
translated = translated[cut_ms:]
# Determine the final length (longer of the two)
final_length = max(len(original), len(translated))
# Extend the shorter track with silence to match the longer track
if len(original) < final_length:
original += AudioSegment.silent(duration=final_length - len(original))
if len(translated) < final_length:
translated += AudioSegment.silent(duration=final_length - len(translated))
# Overlay the translated speech over the original
combined = original.overlay(translated)
# Create a temporary file to save the output
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
combined.export(temp_file.name, format="wav")
print(f"Final audio saved at: {temp_file.name}")
return temp_file.name
def process_final_combination(audio_in, chosen_translated, volume, cut_start, video_input):
audio_in = process_audio(audio_in)
temp_output_path = overlay_audio(audio_in, chosen_translated, volume, cut_start)
if video_input:
return gr.update(value=temp_output_path, visible=True), gr.update(visible=True)
else:
return gr.update(value=temp_output_path, visible=True), gr.update(visible=False)
def replace_video_audio(video_path: str, new_audio_path: str) -> str:
"""Replaces the original audio in the video and extends it if the new audio is longer.
Returns the path to a temporary video file.
"""
# Debugging: Ensure video_path is a string
print(f"DEBUG: video_path = {video_path}, type = {type(video_path)}")
if not isinstance(video_path, str):
raise ValueError(f"video_path must be a string, got {type(video_path)}")
# Load video
video = VideoFileClip(video_path)
# Load new audio
new_audio = AudioFileClip(new_audio_path)
# Extend video if new audio is longer
if new_audio.duration > video.duration:
last_frame = video.get_frame(video.duration - 0.1) # Extract last frame
freeze_frame = ImageClip(last_frame).set_duration(new_audio.duration - video.duration)
freeze_frame = freeze_frame.set_fps(video.fps) # Maintain video frame rate
video = concatenate_videoclips([video, freeze_frame])
# Set new audio
video = video.set_audio(new_audio)
# Create a temp file
temp_video = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
# Save video (explicit codec settings for MoviePy <2.0)
video.write_videofile(
temp_video.name,
codec="libx264",
audio_codec="aac",
fps=video.fps, # Ensure FPS is set correctly
preset="medium" # Optional: Can be "slow", "medium", or "fast"
)
return gr.update(value=temp_video.name, visible=True) # Return path to temp video file
def clean_previous_video_input():
return gr.update(value=None)
def show_upcoming_component():
return gr.update(visible=True)
def hide_previous():
return gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
css="""
div#col-container{
margin: 0 auto;
max-width: 1200px;
}
"""
with gr.Blocks(css=css) as demo:
with gr.Column(elem_id="col-container"):
gr.Markdown("# Hibiki ")
gr.Markdown("This is a simple demo for Kyutai's Hibiki translation models • Currently supports French to English only.")
gr.HTML("""
<div style="display:flex;column-gap:4px;">
<a href="https://huggingface.co/spaces/fffiloni/Hibiki-simple?duplicate=true">
<img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/duplicate-this-space-sm.svg" alt="Duplicate this Space">
</a>
</div>
""")
with gr.Row():
with gr.Column(scale=2):
video_input = gr.Video(label="Video IN (Optional)")
audio_input = gr.Audio(label="Audio IN", type="filepath")
submit_btn = gr.Button("Generate translations")
gr.Examples(
examples = [
"./examples/sample_fr_hibiki_intro.mp3",
"./examples/sample_fr_hibiki_crepes.mp3",
"./examples/sample_fr_hibiki_monologue_otis.mp3"
],
inputs = [audio_input]
)
with gr.Column(scale=3):
output_result = gr.Audio(label="Translated result")
with gr.Row():
dropdown_wav_selector = gr.Dropdown(
label="Pick a generated translated audio to load",
value = None,
visible=False,
scale=2
)
choose_this_btn = gr.Button("Apply and check this one as translated audio overlay", scale=1, visible=False)
with gr.Row():
volume_reduction = gr.Slider(label="Original audio Volume reduction", minimum=0, maximum=60, step=1, value=30, visible=False)
cut_start = gr.Slider(label="Reduce translator delay (seconds)", minimum=0.0, maximum=4.0, step=0.1, value=2.0, visible=False)
combined_output = gr.Audio(label="Combinated Audio", type="filepath", visible=False, show_download_button=True)
apply_to_video_btn = gr.Button("Apply this combination to your video", visible=False)
final_video_out = gr.Video(label="Video + Translated Audio", visible=False)
with gr.Accordion("Downloadable audio Output list", open=False, visible=False) as result_accordion:
wav_list = gr.Files(label="Output Audio List", visible=False)
audio_input.upload(
fn = clean_previous_video_input,
inputs = None,
outputs = [video_input]
)
video_input.upload(
fn = extract_audio_as_mp3,
inputs = [video_input],
outputs = [audio_input]
)
dropdown_wav_selector.select(
fn = load_chosen_audio,
inputs = [dropdown_wav_selector],
outputs = [output_result],
queue = False
)
choose_this_btn.click(
fn = show_upcoming_component,
inputs=None,
outputs=[combined_output]
).then(
fn = process_final_combination,
inputs = [audio_input, dropdown_wav_selector, volume_reduction, cut_start, video_input],
outputs = [combined_output, apply_to_video_btn]
)
apply_to_video_btn.click(
fn = show_upcoming_component,
inputs=None,
outputs=[final_video_out]
).then(
fn = replace_video_audio,
inputs = [video_input, combined_output],
outputs = [final_video_out]
)
submit_btn.click(
fn = hide_previous,
inputs = None,
outputs = [dropdown_wav_selector, result_accordion, wav_list, choose_this_btn, combined_output, apply_to_video_btn, final_video_out, volume_reduction, cut_start]
).then(
fn = infer,
inputs = [audio_input],
outputs = [output_result, dropdown_wav_selector, result_accordion, wav_list, choose_this_btn, volume_reduction, cut_start]
)
demo.queue().launch(show_api=False, show_error=True, ssr_mode=False)