import glob
import os
import shutil
import subprocess
import tempfile

import gradio as gr
from pydub import AudioSegment
from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips, ImageClip

# Detect whether we are running on the shared Hugging Face Space
# (use .get() so local runs without SPACE_ID don't raise a KeyError).
is_shared_ui = "fffiloni/Hibiki-simple" in os.environ.get("SPACE_ID", "")


def extract_audio_as_mp3(video_path: str) -> str:
    """
    Extracts the audio from a video file and saves it as a temporary MP3 file.

    :param video_path: Path to the input video file.
    :return: Path to the temporary MP3 file.
    """
    # Load the video
    video = VideoFileClip(video_path)

    # Create a temporary file for the extracted audio
    temp_audio = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")

    # Extract and export the audio as MP3
    video.audio.write_audiofile(temp_audio.name, codec="mp3")
    video.close()

    return temp_audio.name  # Return the temp file path


def process_audio(input_file):
    # Load the audio file
    audio = AudioSegment.from_file(input_file)

    # Ensure it's in MP3 format
    output_file = os.path.splitext(input_file)[0] + ".mp3"

    if is_shared_ui:
        # Limit duration to 1 minute (60,000 ms) on the shared Space
        if len(audio) > 60000:
            audio = audio[:60000]  # Trim to 60 seconds

    # Export as MP3
    audio.export(output_file, format="mp3")

    return output_file


def cleanup_old_audio():
    """Remove old audio files before starting a new inference."""
    files_to_remove = glob.glob("out_en-*.wav") + glob.glob("final_output.wav")

    if files_to_remove:
        print(f"Cleaning up {len(files_to_remove)} old audio files...")
        for file in files_to_remove:
            try:
                os.remove(file)
                print(f"Deleted: {file}")
            except Exception as e:
                print(f"Error deleting {file}: {e}")
    else:
        print("No old audio files found.")


def find_audio_chunks():
    """Finds all out_en-*.wav files, sorts them numerically, and returns the file paths."""
    wav_files = glob.glob("out_en-*.wav")

    # Sort by chunk number so out_en-10.wav sorts after out_en-2.wav
    wav_files.sort(key=lambda x: int(x.split('-')[-1].split('.')[0]))

    print(f"Found {len(wav_files)} audio chunks: {wav_files}")
    return wav_files  # Returning the list of file paths


def concatenate_audio(output_filename="final_output.wav"):
    """Moves all audio chunks into a temporary directory and returns the first
    chunk path plus the full list of temp paths. (Concatenating the chunks into
    a single `output_filename` is currently disabled.)"""
    wav_files = find_audio_chunks()  # Get sorted audio file paths

    if not wav_files:
        print("No audio files found.")
        return None, []

    # Create a temporary directory and move each chunk into it
    temp_dir = tempfile.mkdtemp()
    temp_wav_files = []
    for file in wav_files:
        temp_file_path = os.path.join(temp_dir, os.path.basename(file))
        shutil.move(file, temp_file_path)
        temp_wav_files.append(temp_file_path)

    return temp_wav_files[0], temp_wav_files  # Returning temp paths


def infer(audio_input_path):
    cleanup_old_audio()
    audio_input_path = process_audio(audio_input_path)
    print(f"Processed file saved as: {audio_input_path}")

    command = [
        "python", "-m", "moshi.run_inference",
        audio_input_path, "out_en.wav",
        "--hf-repo", "kyutai/hibiki-1b-pytorch-bf16"
    ]

    result = subprocess.run(command, capture_output=True, text=True)

    # Print the standard output and error
    print("STDOUT:", result.stdout)
    print("STDERR:", result.stderr)

    # Check if the command was successful
    if result.returncode == 0:
        print("Command executed successfully.")
        first_out, file_list = concatenate_audio()
        if not file_list:
            raise gr.Error("Inference finished but produced no audio chunks")
        return first_out, gr.update(choices=file_list, value=file_list[0], visible=True), gr.update(visible=True), gr.update(value=file_list, visible=True), gr.update(visible=True), gr.update(visible=True), gr.update(visible=True)
    else:
        print("Error executing command.")
        raise gr.Error("Error executing command")
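
# Note on output naming (an observation from this script, not guaranteed across
# moshi versions): the CLI above is given "out_en.wav" as its output argument,
# but the rest of the script globs for numbered files ("out_en-0.wav",
# "out_en-1.wav", ...) via cleanup_old_audio() and find_audio_chunks(), i.e. it
# assumes moshi.run_inference suffixes each generated output with an index.
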
successfully.") first_out, file_list = concatenate_audio() return first_out, gr.update(choices=file_list, value=file_list[0], visible=True), gr.update(visible=True), gr.update(value=file_list, visible=True), gr.update(visible=True), gr.update(visible=True), gr.update(visible=True) else: print("Error executing command.") raise gr.Error("Error executing command") def load_chosen_audio(audio_path): return audio_path def overlay_audio( original_mp3: str, translated_wav: str, volume_reduction_db: int = 10, cut_start: float = 0.0 ) -> str: """ Overlays translated audio on top of the original, reduces the original volume, and ensures the final audio lasts as long as the longer of the two tracks. :param original_mp3: Path to the original MP3 file. :param translated_wav: Path to the translated WAV file. :param volume_reduction_db: Volume reduction in dB (default is -10 dB). :param cut_start: Number of seconds to trim from the start of the translated audio (default: 0.0). :return: Path to the temporary output WAV file. """ # Load original MP3 and convert to WAV original = AudioSegment.from_mp3(original_mp3).set_frame_rate(16000).set_channels(1) # Lower the volume original = original - volume_reduction_db # Load the translated WAV translated = AudioSegment.from_wav(translated_wav).set_frame_rate(16000).set_channels(1) # Trim the start of the translated audio if needed if cut_start > 0: cut_ms = int(cut_start * 1000) # Convert seconds to milliseconds translated = translated[cut_ms:] # Determine the final length (longer of the two) final_length = max(len(original), len(translated)) # Extend the shorter track with silence to match the longer track if len(original) < final_length: original += AudioSegment.silent(duration=final_length - len(original)) if len(translated) < final_length: translated += AudioSegment.silent(duration=final_length - len(translated)) # Overlay the translated speech over the original combined = original.overlay(translated) # Create a temporary file to save the output temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") combined.export(temp_file.name, format="wav") print(f"Final audio saved at: {temp_file.name}") return temp_file.name def process_final_combination(audio_in, chosen_translated, volume, cut_start, video_input): audio_in = process_audio(audio_in) temp_output_path = overlay_audio(audio_in, chosen_translated, volume, cut_start) if video_input: return gr.update(value=temp_output_path, visible=True), gr.update(visible=True) else: return gr.update(value=temp_output_path, visible=True), gr.update(visible=False) def replace_video_audio(video_path: str, new_audio_path: str) -> str: """Replaces the original audio in the video and extends it if the new audio is longer. Returns the path to a temporary video file. 
""" # Debugging: Ensure video_path is a string print(f"DEBUG: video_path = {video_path}, type = {type(video_path)}") if not isinstance(video_path, str): raise ValueError(f"video_path must be a string, got {type(video_path)}") # Load video video = VideoFileClip(video_path) # Load new audio new_audio = AudioFileClip(new_audio_path) # Extend video if new audio is longer if new_audio.duration > video.duration: last_frame = video.get_frame(video.duration - 0.1) # Extract last frame freeze_frame = ImageClip(last_frame).set_duration(new_audio.duration - video.duration) freeze_frame = freeze_frame.set_fps(video.fps) # Maintain video frame rate video = concatenate_videoclips([video, freeze_frame]) # Set new audio video = video.set_audio(new_audio) # Create a temp file temp_video = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") # Save video (explicit codec settings for MoviePy <2.0) video.write_videofile( temp_video.name, codec="libx264", audio_codec="aac", fps=video.fps, # Ensure FPS is set correctly preset="medium" # Optional: Can be "slow", "medium", or "fast" ) return gr.update(value=temp_video.name, visible=True) # Return path to temp video file def clean_previous_video_input(): return gr.update(value=None) def show_upcoming_component(): return gr.update(visible=True) def hide_previous(): return gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False) css=""" div#col-container{ margin: 0 auto; max-width: 1200px; } """ with gr.Blocks(css=css) as demo: with gr.Column(elem_id="col-container"): gr.Markdown("# Hibiki ") gr.Markdown("This is a simple demo for Kyutai's Hibiki translation models • Currently supports French to English only.") gr.HTML("""
""") with gr.Row(): with gr.Column(scale=2): video_input = gr.Video(label="Video IN (Optional)") audio_input = gr.Audio(label="Audio IN", type="filepath") submit_btn = gr.Button("Generate translations") gr.Examples( examples = [ "./examples/sample_fr_hibiki_intro.mp3", "./examples/sample_fr_hibiki_crepes.mp3", "./examples/sample_fr_hibiki_monologue_otis.mp3" ], inputs = [audio_input] ) with gr.Column(scale=3): output_result = gr.Audio(label="Translated result") with gr.Row(): dropdown_wav_selector = gr.Dropdown( label="Pick a generated translated audio to load", value = None, visible=False, scale=2 ) choose_this_btn = gr.Button("Apply and check this one as translated audio overlay", scale=1, visible=False) with gr.Row(): volume_reduction = gr.Slider(label="Original audio Volume reduction", minimum=0, maximum=60, step=1, value=30, visible=False) cut_start = gr.Slider(label="Reduce translator delay (seconds)", minimum=0.0, maximum=4.0, step=0.1, value=2.0, visible=False) combined_output = gr.Audio(label="Combinated Audio", type="filepath", visible=False, show_download_button=True) apply_to_video_btn = gr.Button("Apply this combination to your video", visible=False) final_video_out = gr.Video(label="Video + Translated Audio", visible=False) with gr.Accordion("Downloadable audio Output list", open=False, visible=False) as result_accordion: wav_list = gr.Files(label="Output Audio List", visible=False) audio_input.upload( fn = clean_previous_video_input, inputs = None, outputs = [video_input] ) video_input.upload( fn = extract_audio_as_mp3, inputs = [video_input], outputs = [audio_input] ) dropdown_wav_selector.select( fn = load_chosen_audio, inputs = [dropdown_wav_selector], outputs = [output_result], queue = False ) choose_this_btn.click( fn = show_upcoming_component, inputs=None, outputs=[combined_output] ).then( fn = process_final_combination, inputs = [audio_input, dropdown_wav_selector, volume_reduction, cut_start, video_input], outputs = [combined_output, apply_to_video_btn] ) apply_to_video_btn.click( fn = show_upcoming_component, inputs=None, outputs=[final_video_out] ).then( fn = replace_video_audio, inputs = [video_input, combined_output], outputs = [final_video_out] ) submit_btn.click( fn = hide_previous, inputs = None, outputs = [dropdown_wav_selector, result_accordion, wav_list, choose_this_btn, combined_output, apply_to_video_btn, final_video_out, volume_reduction, cut_start] ).then( fn = infer, inputs = [audio_input], outputs = [output_result, dropdown_wav_selector, result_accordion, wav_list, choose_this_btn, volume_reduction, cut_start] ) demo.queue().launch(show_api=False, show_error=True, ssr_mode=False)