# Hibiki-simple / app.py
import gradio as gr
import spaces
import glob
import os
import shutil
import subprocess
import tempfile
from pydub import AudioSegment
from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips, ImageClip
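# True when running on the shared Hugging Face Space. SPACE_ID is set
# automatically by Spaces and is absent locally; the flag is used below to
# cap input duration on the shared deployment.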
is_shared_ui = "fffiloni/Hibiki-simple" in os.environ.get("SPACE_ID", "")
def extract_audio_as_mp3(video_path: str) -> str:
"""
Extracts the audio from a video file and saves it as a temporary MP3 file.
:param video_path: Path to the input video file.
:return: Path to the temporary MP3 file.
"""
# Load the video
video = VideoFileClip(video_path)
# Create a temporary file for the extracted audio
temp_audio = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
# Extract and export the audio as MP3
    video.audio.write_audiofile(temp_audio.name, codec="mp3")
    video.close()  # Release the clip's file handle
    return temp_audio.name  # Return the temp file path
def process_audio(input_file):
"""
Processes the input audio file:
- Converts it to MP3 format
- Trims the audio to 1 minute if in shared UI mode
Args:
input_file (str): Path to the audio file (WAV/MP3/etc.)
Returns:
str: Path to the converted MP3 file
"""
# Load the audio file
audio = AudioSegment.from_file(input_file)
    # Build the output path with an .mp3 extension
    output_file = os.path.splitext(input_file)[0] + ".mp3"
if is_shared_ui:
# Limit duration to 1 minute (60,000 ms)
if len(audio) > 60000:
audio = audio[:60000] # Trim to 60 seconds
# Export as MP3
audio.export(output_file, format="mp3")
return output_file
def cleanup_old_audio():
"""Remove old audio files before starting a new inference."""
files_to_remove = glob.glob("out_en-*.wav") + glob.glob("final_output.wav")
if files_to_remove:
print(f"Cleaning up {len(files_to_remove)} old audio files...")
for file in files_to_remove:
try:
os.remove(file)
print(f"Deleted: {file}")
except Exception as e:
print(f"Error deleting {file}: {e}")
else:
print("No old audio files found.")
def find_audio_chunks():
"""Finds all out_en-*.wav files, sorts them, and returns the file paths."""
wav_files = glob.glob("out_en-*.wav")
    # Sort numerically by the chunk index embedded in each filename (out_en-<n>.wav)
    wav_files.sort(key=lambda x: int(x.split('-')[-1].split('.')[0]))
print(f"Found {len(wav_files)} audio chunks: {wav_files}")
return wav_files # Returning the list of file paths
def concatenate_audio():
    """
    Collects the audio chunks produced by the translation model.

    Finds all chunk files matching the `out_en-*.wav` pattern, sorts them
    numerically, and moves them into a temporary directory so that later
    runs cannot pick them up again. The chunks are returned individually;
    no actual concatenation is performed.

    Returns:
        Tuple[Optional[str], List[str]]: Path to the first chunk (or None
        if no chunks were found) and the list of all chunk paths.
    """
    wav_files = find_audio_chunks()  # Get sorted audio file paths
    if not wav_files:
        print("No audio files found.")
        return None, []
    # Create a temporary directory and move every chunk into it
    temp_dir = tempfile.mkdtemp()
    temp_wav_files = []
    for file in wav_files:
        temp_file_path = os.path.join(temp_dir, os.path.basename(file))
        shutil.move(file, temp_file_path)
        temp_wav_files.append(temp_file_path)
    return temp_wav_files[0], temp_wav_files  # First chunk + full list
@spaces.GPU()
def infer(audio_input_path):
"""
Perform translation inference on an audio file using the Hibiki model.
This function orchestrates the full processing and translation pipeline:
- Cleans up old audio files from previous runs
- Processes the input audio (trims it and ensures correct format)
- Executes the Hibiki translation model via subprocess
- Concatenates translated audio chunks
- Returns the translated audio and UI updates
Args:
audio_input_path (str): Path to the input audio file (e.g., MP3 or WAV)
Returns:
A tuple with:
- Path to the first translated audio chunk (WAV file)
- Updated Gradio Dropdown component with all chunk file choices
- Visibility settings for additional audio controls and result components
"""
cleanup_old_audio()
audio_input_path = process_audio(audio_input_path)
print(f"Processed file saved as: {audio_input_path}")
    # Run the Hibiki translation model in a subprocess; it writes numbered
    # chunk files (out_en-*.wav) into the working directory.
    command = [
        "python", "-m", "moshi.run_inference",
        audio_input_path, "out_en.wav",
        "--hf-repo", "kyutai/hibiki-1b-pytorch-bf16"
    ]
    result = subprocess.run(command, capture_output=True, text=True)
# Print the standard output and error
print("STDOUT:", result.stdout)
print("STDERR:", result.stderr)
# Check if the command was successful
    if result.returncode == 0:
        print("Command executed successfully.")
        first_out, file_list = concatenate_audio()
        if not file_list:
            raise gr.Error("Inference finished but produced no audio chunks.")
        return (
            first_out,                                                        # output_result
            gr.update(choices=file_list, value=file_list[0], visible=True),  # dropdown_wav_selector
            gr.update(visible=True),                                          # result_accordion
            gr.update(value=file_list, visible=True),                         # wav_list
            gr.update(visible=True),                                          # choose_this_btn
            gr.update(visible=True),                                          # volume_reduction
            gr.update(visible=True),                                          # cut_start
        )
    else:
        print("Error executing command.")
        raise gr.Error("Error executing command")
def load_chosen_audio(audio_path):
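    """Load the selected translated chunk into the result audio player."""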
return audio_path
def overlay_audio(
original_mp3: str,
translated_wav: str,
volume_reduction_db: int = 10,
cut_start: float = 0.0
) -> str:
"""
Overlays translated audio on top of the original, reduces the original volume,
and ensures the final audio lasts as long as the longer of the two tracks.
:param original_mp3: Path to the original MP3 file.
:param translated_wav: Path to the translated WAV file.
    :param volume_reduction_db: How many dB to lower the original track by (default: 10).
:param cut_start: Number of seconds to trim from the start of the translated audio (default: 0.0).
:return: Path to the temporary output WAV file.
"""
    # Load the original MP3 and normalize to 16 kHz mono so both tracks match
    original = AudioSegment.from_mp3(original_mp3).set_frame_rate(16000).set_channels(1)
# Lower the volume
original = original - volume_reduction_db
# Load the translated WAV
translated = AudioSegment.from_wav(translated_wav).set_frame_rate(16000).set_channels(1)
# Trim the start of the translated audio if needed
if cut_start > 0:
cut_ms = int(cut_start * 1000) # Convert seconds to milliseconds
translated = translated[cut_ms:]
# Determine the final length (longer of the two)
final_length = max(len(original), len(translated))
# Extend the shorter track with silence to match the longer track
if len(original) < final_length:
original += AudioSegment.silent(duration=final_length - len(original))
if len(translated) < final_length:
translated += AudioSegment.silent(duration=final_length - len(translated))
# Overlay the translated speech over the original
combined = original.overlay(translated)
# Create a temporary file to save the output
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
combined.export(temp_file.name, format="wav")
print(f"Final audio saved at: {temp_file.name}")
return temp_file.name
def process_final_combination(audio_in, chosen_translated, volume, cut_start, video_input):
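    """
    Re-process the original audio, overlay the chosen translated chunk on it,
    and reveal the "apply to video" button only when a video was provided.
    """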
audio_in = process_audio(audio_in)
temp_output_path = overlay_audio(audio_in, chosen_translated, volume, cut_start)
if video_input:
return gr.update(value=temp_output_path, visible=True), gr.update(visible=True)
else:
return gr.update(value=temp_output_path, visible=True), gr.update(visible=False)
def replace_video_audio(video_path: str, new_audio_path: str) -> str:
"""
Replaces a video's audio with new translated audio.
If the new audio is longer than the video, extends the video
by freezing the last frame.
Args:
video_path (str): Path to the video file
new_audio_path (str): Path to the new audio file
Returns:
gr.update: Gradio update pointing to the new video file path
"""
# Debugging: Ensure video_path is a string
print(f"DEBUG: video_path = {video_path}, type = {type(video_path)}")
if not isinstance(video_path, str):
raise ValueError(f"video_path must be a string, got {type(video_path)}")
# Load video and audio
video = VideoFileClip(video_path)
new_audio = AudioFileClip(new_audio_path)
# Extend video if new audio is longer
if new_audio.duration > video.duration:
        # Grab the last frame by iterating through the whole clip; slower than
        # seeking, but robust when the reported duration is imprecise.
        print("Extending video to match longer audio...")
        last_frame = None
        for frame in video.iter_frames():
            last_frame = frame
if last_frame is None:
raise RuntimeError("Failed to extract last frame from video.")
freeze_duration = new_audio.duration - video.duration
freeze_frame = ImageClip(last_frame).set_duration(freeze_duration).set_fps(video.fps)
video = concatenate_videoclips([video, freeze_frame])
# Set the new audio track
video = video.set_audio(new_audio)
# Create a temp file for output
temp_video = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
# Save the final video
video.write_videofile(
temp_video.name,
codec="libx264",
audio_codec="aac",
fps=video.fps,
preset="medium"
)
# Clean up resources
video.close()
new_audio.close()
# Return path to new video
return gr.update(value=temp_video.name, visible=True)
def clean_previous_video_input():
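    """Clear any previously uploaded video when new audio is uploaded directly."""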
return gr.update(value=None)
def show_upcoming_component():
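    """Reveal the next component in the workflow."""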
return gr.update(visible=True)
def hide_previous():
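    """Hide every result component left over from a previous run."""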
    # One hidden-state update for each of the nine components wired to this handler
    return tuple(gr.update(visible=False) for _ in range(9))
css="""
div#col-container{
margin: 0 auto;
max-width: 1200px;
}
"""
with gr.Blocks(css=css) as demo:
with gr.Column(elem_id="col-container"):
gr.Markdown("# Hibiki ")
gr.Markdown("This is a simple demo for Kyutai's Hibiki translation models • Currently supports French to English only.")
gr.HTML("""
<div style="display:flex;column-gap:4px;">
<a href="https://huggingface.co/spaces/fffiloni/Hibiki-simple?duplicate=true">
<img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/duplicate-this-space-sm.svg" alt="Duplicate this Space">
</a>
</div>
""")
with gr.Row():
with gr.Column(scale=2):
video_input = gr.Video(label="Video IN (Optional)")
audio_input = gr.Audio(label="Audio IN", type="filepath")
submit_btn = gr.Button("Generate translations")
gr.Examples(
examples = [
"./examples/sample_fr_hibiki_intro.mp3",
"./examples/sample_fr_hibiki_crepes.mp3",
"./examples/sample_fr_hibiki_monologue_otis.mp3"
],
inputs = [audio_input]
)
with gr.Column(scale=3):
output_result = gr.Audio(label="Translated result")
with gr.Row():
dropdown_wav_selector = gr.Dropdown(
label="Pick a generated translated audio to load",
value = None,
visible=False,
scale=2
)
choose_this_btn = gr.Button("Apply and check this one as translated audio overlay", scale=1, visible=False)
with gr.Row():
                    volume_reduction = gr.Slider(label="Original audio volume reduction (dB)", minimum=0, maximum=60, step=1, value=30, visible=False)
cut_start = gr.Slider(label="Reduce translator delay (seconds)", minimum=0.0, maximum=4.0, step=0.1, value=2.0, visible=False)
                combined_output = gr.Audio(label="Combined Audio", type="filepath", visible=False, show_download_button=True)
apply_to_video_btn = gr.Button("Apply this combination to your video", visible=False)
final_video_out = gr.Video(label="Video + Translated Audio", visible=False)
                with gr.Accordion("Downloadable audio output list", open=False, visible=False) as result_accordion:
wav_list = gr.Files(label="Output Audio List", visible=False)
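    # When audio is uploaded directly, drop any previously loaded video so the
    # two input sources cannot get out of sync.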
audio_input.upload(
fn = clean_previous_video_input,
inputs = None,
outputs = [video_input],
queue=False,
show_api=False
)
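    # When a video is uploaded, extract its soundtrack into the audio input.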
video_input.upload(
fn = extract_audio_as_mp3,
inputs = [video_input],
outputs = [audio_input],
queue=False,
show_api=False
)
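    # Picking a chunk from the dropdown loads it into the result player.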
dropdown_wav_selector.select(
fn = load_chosen_audio,
inputs = [dropdown_wav_selector],
outputs = [output_result],
queue = False,
show_api=False
)
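    # First reveal the overlay player, then build the original + translation mix.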
choose_this_btn.click(
fn = show_upcoming_component,
inputs=None,
outputs=[combined_output],
queue=False,
show_api=False
).then(
fn = process_final_combination,
inputs = [audio_input, dropdown_wav_selector, volume_reduction, cut_start, video_input],
outputs = [combined_output, apply_to_video_btn],
show_api=False
)
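    # First reveal the video player, then mux the combined audio into the video.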
apply_to_video_btn.click(
fn = show_upcoming_component,
inputs=None,
outputs=[final_video_out],
queue=False,
show_api=False
).then(
fn = replace_video_audio,
inputs = [video_input, combined_output],
outputs = [final_video_out],
show_api=False
)
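    # Hide stale results from the previous run, then launch a new inference.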
submit_btn.click(
fn = hide_previous,
inputs = None,
outputs = [dropdown_wav_selector, result_accordion, wav_list, choose_this_btn, combined_output, apply_to_video_btn, final_video_out, volume_reduction, cut_start],
show_api=False
).then(
fn = infer,
inputs = [audio_input],
outputs = [output_result, dropdown_wav_selector, result_accordion, wav_list, choose_this_btn, volume_reduction, cut_start],
show_api=True
)
demo.queue().launch(show_api=True, show_error=True, ssr_mode=False, mcp_server=True)