# Hibiki-simple — Gradio demo app (Hugging Face Space, runs on an A10G GPU).
import glob
import os
import shutil
import subprocess
import sys
import tempfile

import gradio as gr
from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips, ImageClip
from pydub import AudioSegment
# True when running on the original shared Hugging Face Space (where input
# duration is limited). SPACE_ID is absent when running locally, so use
# .get() with a default instead of raising KeyError.
is_shared_ui = "fffiloni/Hibiki-simple" in os.environ.get("SPACE_ID", "")
def extract_audio_as_mp3(video_path: str) -> str:
    """
    Extract the audio track from a video file and save it as a temporary MP3.

    :param video_path: Path to the input video file.
    :return: Path to the temporary MP3 file.
    """
    video = VideoFileClip(video_path)
    try:
        # Close the temp-file handle immediately: we only need a unique path,
        # moviepy writes the data to it by name.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio:
            temp_audio_path = temp_audio.name
        video.audio.write_audiofile(temp_audio_path, codec="mp3")
    finally:
        # Release the ffmpeg reader / file handles held by moviepy.
        video.close()
    return temp_audio_path
def process_audio(input_file):
    """Re-export the input as MP3; on the shared Space, cap it at 60 seconds."""
    segment = AudioSegment.from_file(input_file)
    mp3_path = os.path.splitext(input_file)[0] + ".mp3"
    # Shared demo Space only: trim to one minute (pydub lengths are in ms).
    if is_shared_ui and len(segment) > 60000:
        segment = segment[:60000]
    segment.export(mp3_path, format="mp3")
    return mp3_path
def cleanup_old_audio():
    """Remove old audio files before starting a new inference."""
    stale_files = glob.glob("out_en-*.wav") + glob.glob("final_output.wav")
    if not stale_files:
        print("No old audio files found.")
        return
    print(f"Cleaning up {len(stale_files)} old audio files...")
    for path in stale_files:
        try:
            os.remove(path)
        except Exception as e:
            # Best-effort cleanup: report and keep going.
            print(f"Error deleting {path}: {e}")
        else:
            print(f"Deleted: {path}")
def find_audio_chunks():
    """Return every out_en-*.wav chunk path, ordered by its numeric suffix."""
    # Numeric sort ("out_en-10" after "out_en-2"), not lexicographic.
    chunk_paths = sorted(
        glob.glob("out_en-*.wav"),
        key=lambda p: int(p.split('-')[-1].split('.')[0]),
    )
    print(f"Found {len(chunk_paths)} audio chunks: {chunk_paths}")
    return chunk_paths
def concatenate_audio(output_filename="final_output.wav"):
    """
    Relocate the generated audio chunks into a fresh temporary directory.

    Despite the name, no combined file is currently produced — the chunks are
    only moved out of the working directory so a later run cannot clobber them.

    :param output_filename: Kept for backward compatibility; currently unused.
    :return: Tuple of (first chunk path or None, list of all chunk paths).
    """
    wav_files = find_audio_chunks()  # sorted chunk paths
    if not wav_files:
        print("No audio files found.")
        # Return a 2-tuple so callers that unpack the result do not crash.
        return None, []
    temp_dir = tempfile.mkdtemp()
    temp_wav_files = []
    for file in wav_files:
        # Move each chunk out of the cwd so cleanup_old_audio() won't touch it.
        temp_file_path = os.path.join(temp_dir, os.path.basename(file))
        shutil.move(file, temp_file_path)
        temp_wav_files.append(temp_file_path)
    return temp_wav_files[0], temp_wav_files
def infer(audio_input_path):
    """
    Run Hibiki translation inference on an audio file.

    :param audio_input_path: Path to the input audio (any pydub-readable format).
    :return: Tuple of (first translated chunk path, then gr.update objects that
        reveal the selector/accordion/file-list/buttons/sliders in the UI).
    :raises gr.Error: If the inference subprocess exits with a non-zero code.
    """
    cleanup_old_audio()
    audio_input_path = process_audio(audio_input_path)
    print(f"Processed file saved as: {audio_input_path}")
    # Use the current interpreter rather than whatever "python" resolves to.
    command = [
        sys.executable, "-m", "moshi.run_inference",
        audio_input_path, "out_en.wav",
        "--hf-repo", "kyutai/hibiki-1b-pytorch-bf16",
    ]
    result = subprocess.run(command, capture_output=True, text=True)
    print("STDOUT:", result.stdout)
    print("STDERR:", result.stderr)
    if result.returncode != 0:
        print("Error executing command.")
        raise gr.Error("Error executing command")
    print("Command executed successfully.")
    first_out, file_list = concatenate_audio()
    return first_out, gr.update(choices=file_list, value=file_list[0], visible=True), gr.update(visible=True), gr.update(value=file_list, visible=True), gr.update(visible=True), gr.update(visible=True), gr.update(visible=True)
def load_chosen_audio(audio_path):
    """Pass the selected file path straight through to the audio player."""
    chosen = audio_path
    return chosen
def overlay_audio(
    original_mp3: str,
    translated_wav: str,
    volume_reduction_db: int = 10,
    cut_start: float = 0.0
) -> str:
    """
    Overlay the translated audio on top of the (attenuated) original track.

    The output lasts as long as the longer of the two tracks; the shorter one
    is padded with silence before overlaying.

    :param original_mp3: Path to the original MP3 file.
    :param translated_wav: Path to the translated WAV file.
    :param volume_reduction_db: How many dB to attenuate the original by.
    :param cut_start: Seconds to trim from the start of the translated audio.
    :return: Path to the temporary output WAV file.
    """
    # Normalize both tracks to 16 kHz mono so they overlay cleanly.
    original = AudioSegment.from_mp3(original_mp3).set_frame_rate(16000).set_channels(1)
    # Lower the original's volume so the translation stays intelligible.
    original = original - volume_reduction_db
    translated = AudioSegment.from_wav(translated_wav).set_frame_rate(16000).set_channels(1)
    if cut_start > 0:
        cut_ms = int(cut_start * 1000)  # pydub slices in milliseconds
        translated = translated[cut_ms:]
    # Pad the shorter track with silence so both match the longer length.
    final_length = max(len(original), len(translated))
    if len(original) < final_length:
        original += AudioSegment.silent(duration=final_length - len(original))
    if len(translated) < final_length:
        translated += AudioSegment.silent(duration=final_length - len(translated))
    combined = original.overlay(translated)
    # Close the temp-file handle right away (avoids an fd leak); pydub
    # re-opens the path by name to write the data.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_file:
        output_path = temp_file.name
    combined.export(output_path, format="wav")
    print(f"Final audio saved at: {output_path}")
    return output_path
def process_final_combination(audio_in, chosen_translated, volume, cut_start, video_input):
    """Produce the combined audio and toggle the apply-to-video button."""
    audio_in = process_audio(audio_in)
    temp_output_path = overlay_audio(audio_in, chosen_translated, volume, cut_start)
    # The "apply to video" button only makes sense when a video was provided.
    show_apply_button = bool(video_input)
    return (
        gr.update(value=temp_output_path, visible=True),
        gr.update(visible=show_apply_button),
    )
def replace_video_audio(video_path: str, new_audio_path: str):
    """
    Replace the video's audio track, freezing the last frame if the new
    audio outlasts the video.

    :param video_path: Path to the input video file.
    :param new_audio_path: Path to the replacement audio file.
    :return: gr.update carrying the temporary output video path and making
        the component visible.
    :raises ValueError: If video_path is not a string.
    """
    # Debugging: Ensure video_path is a string
    print(f"DEBUG: video_path = {video_path}, type = {type(video_path)}")
    if not isinstance(video_path, str):
        raise ValueError(f"video_path must be a string, got {type(video_path)}")
    video = VideoFileClip(video_path)
    new_audio = AudioFileClip(new_audio_path)
    try:
        # Extend the video with a frozen last frame if the audio is longer.
        if new_audio.duration > video.duration:
            last_frame = video.get_frame(video.duration - 0.1)
            freeze_frame = ImageClip(last_frame).set_duration(new_audio.duration - video.duration)
            freeze_frame = freeze_frame.set_fps(video.fps)  # keep the original frame rate
            video = concatenate_videoclips([video, freeze_frame])
        video = video.set_audio(new_audio)
        # Only the unique path is needed; moviepy writes to it by name.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video:
            temp_video_path = temp_video.name
        # Explicit codec settings for MoviePy <2.0.
        video.write_videofile(
            temp_video_path,
            codec="libx264",
            audio_codec="aac",
            fps=video.fps,
            preset="medium"
        )
    finally:
        # Release the ffmpeg reader processes / file handles.
        video.close()
        new_audio.close()
    return gr.update(value=temp_video_path, visible=True)
def clean_previous_video_input():
    """Clear the video input (used when a fresh audio file is uploaded)."""
    cleared = gr.update(value=None)
    return cleared
def show_upcoming_component():
    """Reveal the next component in the workflow."""
    shown = gr.update(visible=True)
    return shown
def hide_previous():
    """Hide all nine downstream result components before a new run."""
    return tuple(gr.update(visible=False) for _ in range(9))
# Page styling: center the main column and cap its width.
css="""
div#col-container{
    margin: 0 auto;
    max-width: 1200px;
}
"""
# ---------------------------------------------------------------------------
# Gradio UI: layout first, then event wiring.
# ---------------------------------------------------------------------------
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.Markdown("# Hibiki ")
        gr.Markdown("This is a simple demo for Kyutai's Hibiki translation models • Currently supports French to English only.")
        gr.HTML("""
        <div style="display:flex;column-gap:4px;">
            <a href="https://huggingface.co/spaces/fffiloni/Hibiki-simple?duplicate=true">
                <img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/duplicate-this-space-sm.svg" alt="Duplicate this Space">
            </a>
        </div>
        """)
        with gr.Row():
            # Left column: inputs.
            with gr.Column(scale=2):
                video_input = gr.Video(label="Video IN (Optional)")
                audio_input = gr.Audio(label="Audio IN", type="filepath")
                submit_btn = gr.Button("Generate translations")
                gr.Examples(
                    examples = [
                        "./examples/sample_fr_hibiki_intro.mp3",
                        "./examples/sample_fr_hibiki_crepes.mp3",
                        "./examples/sample_fr_hibiki_monologue_otis.mp3"
                    ],
                    inputs = [audio_input]
                )
            # Right column: results. Most components start hidden and are
            # revealed by infer()'s gr.update return values.
            with gr.Column(scale=3):
                output_result = gr.Audio(label="Translated result")
                with gr.Row():
                    dropdown_wav_selector = gr.Dropdown(
                        label="Pick a generated translated audio to load",
                        value = None,
                        visible=False,
                        scale=2
                    )
                    choose_this_btn = gr.Button("Apply and check this one as translated audio overlay", scale=1, visible=False)
                with gr.Row():
                    volume_reduction = gr.Slider(label="Original audio Volume reduction", minimum=0, maximum=60, step=1, value=30, visible=False)
                    cut_start = gr.Slider(label="Reduce translator delay (seconds)", minimum=0.0, maximum=4.0, step=0.1, value=2.0, visible=False)
                combined_output = gr.Audio(label="Combinated Audio", type="filepath", visible=False, show_download_button=True)
                apply_to_video_btn = gr.Button("Apply this combination to your video", visible=False)
                final_video_out = gr.Video(label="Video + Translated Audio", visible=False)
                with gr.Accordion("Downloadable audio Output list", open=False, visible=False) as result_accordion:
                    wav_list = gr.Files(label="Output Audio List", visible=False)

    # Uploading raw audio invalidates any previously uploaded video.
    audio_input.upload(
        fn = clean_previous_video_input,
        inputs = None,
        outputs = [video_input]
    )
    # Uploading a video auto-extracts its soundtrack into the audio input.
    video_input.upload(
        fn = extract_audio_as_mp3,
        inputs = [video_input],
        outputs = [audio_input]
    )
    # Picking a chunk from the dropdown loads it into the result player.
    dropdown_wav_selector.select(
        fn = load_chosen_audio,
        inputs = [dropdown_wav_selector],
        outputs = [output_result],
        queue = False
    )
    # Reveal the combined-audio player, then build the overlay mix.
    choose_this_btn.click(
        fn = show_upcoming_component,
        inputs=None,
        outputs=[combined_output]
    ).then(
        fn = process_final_combination,
        inputs = [audio_input, dropdown_wav_selector, volume_reduction, cut_start, video_input],
        outputs = [combined_output, apply_to_video_btn]
    )
    # Reveal the video player, then mux the combined audio into the video.
    apply_to_video_btn.click(
        fn = show_upcoming_component,
        inputs=None,
        outputs=[final_video_out]
    ).then(
        fn = replace_video_audio,
        inputs = [video_input, combined_output],
        outputs = [final_video_out]
    )
    # Hide all stale result components, then run inference.
    submit_btn.click(
        fn = hide_previous,
        inputs = None,
        outputs = [dropdown_wav_selector, result_accordion, wav_list, choose_this_btn, combined_output, apply_to_video_btn, final_video_out, volume_reduction, cut_start]
    ).then(
        fn = infer,
        inputs = [audio_input],
        outputs = [output_result, dropdown_wav_selector, result_accordion, wav_list, choose_this_btn, volume_reduction, cut_start]
    )

demo.queue().launch(show_api=False, show_error=True, ssr_mode=False)