Spaces: Running on Zero
| import gradio as gr | |
| import spaces | |
| import glob | |
| import os | |
| import shutil | |
| import tempfile | |
| from pydub import AudioSegment | |
| from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips, ImageClip | |
# True when running inside the shared "fffiloni/Hibiki-simple" Space (enables input trimming).
# Use .get() so a missing SPACE_ID env var (e.g. local runs) does not raise KeyError.
is_shared_ui = "fffiloni/Hibiki-simple" in os.environ.get("SPACE_ID", "")
def extract_audio_as_mp3(video_path: str) -> str:
    """
    Extract the audio track from a video file and save it as a temporary MP3.

    :param video_path: Path to the input video file.
    :return: Path to the temporary MP3 file.
    :raises ValueError: If the video has no audio track.
    """
    video = VideoFileClip(video_path)
    try:
        if video.audio is None:
            # Guard: write_audiofile on a None audio would raise an opaque AttributeError.
            raise ValueError(f"No audio track found in video: {video_path}")
        # Close the handle immediately so moviepy can reopen the file by name
        # (required on Windows, harmless elsewhere).
        temp_audio = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
        temp_audio.close()
        video.audio.write_audiofile(temp_audio.name, codec="mp3")
        return temp_audio.name
    finally:
        # Release the reader/ffmpeg resources held by moviepy (was leaked before).
        video.close()
def process_audio(input_file):
    """
    Convert an input audio file to MP3, trimming it to one minute when the
    app runs in shared-UI mode.

    Args:
        input_file (str): Path to the source audio file (WAV/MP3/etc.)

    Returns:
        str: Path to the exported MP3 file (same basename as the input).
    """
    ONE_MINUTE_MS = 60000
    segment = AudioSegment.from_file(input_file)
    mp3_path = os.path.splitext(input_file)[0] + ".mp3"
    # On the shared Space we cap processing cost by trimming long inputs.
    if is_shared_ui and len(segment) > ONE_MINUTE_MS:
        segment = segment[:ONE_MINUTE_MS]
    segment.export(mp3_path, format="mp3")
    return mp3_path
def cleanup_old_audio():
    """Delete leftover translation chunks and the combined output from a previous run."""
    stale_files = glob.glob("out_en-*.wav") + glob.glob("final_output.wav")
    if not stale_files:
        print("No old audio files found.")
        return
    print(f"Cleaning up {len(stale_files)} old audio files...")
    for path in stale_files:
        try:
            os.remove(path)
            print(f"Deleted: {path}")
        except Exception as e:
            # Best-effort cleanup: report and keep going.
            print(f"Error deleting {path}: {e}")
def find_audio_chunks():
    """Return all out_en-*.wav chunk paths in the cwd, sorted by numeric suffix."""
    def chunk_index(path):
        # "out_en-12.wav" -> 12
        return int(path.rsplit('-', 1)[-1].split('.')[0])

    chunks = sorted(glob.glob("out_en-*.wav"), key=chunk_index)
    print(f"Found {len(chunks)} audio chunks: {chunks}")
    return chunks
def concatenate_audio(output_filename="final_output.wav"):
    """
    Collect the translated audio chunks produced by the model.

    Finds all files matching ``out_en-*.wav``, sorts them numerically, and
    moves them into a fresh temporary directory so a later run cannot pick
    them up again.

    Args:
        output_filename (str): Kept for backward compatibility; unused now
            that chunks are no longer merged into a single file.

    Returns:
        Tuple[Optional[str], List[str]]: Path to the first chunk (or None
        when no chunks exist) and the list of all chunk paths.
    """
    wav_files = find_audio_chunks()  # sorted chunk paths
    if not wav_files:
        # Fix: previously returned a bare [] although the docstring and the
        # caller (`first, files = concatenate_audio()`) expect a 2-tuple.
        print("No audio files found.")
        return None, []
    temp_dir = tempfile.mkdtemp()
    temp_wav_files = []
    for src in wav_files:
        # Move each chunk out of the cwd into the temp directory.
        dst = os.path.join(temp_dir, os.path.basename(src))
        shutil.move(src, dst)
        temp_wav_files.append(dst)
    return temp_wav_files[0], temp_wav_files
def infer(audio_input_path):
    """
    Run the Hibiki translation model on an audio file.

    Pipeline:
      - Remove chunk files left over from a previous run
      - Normalize the input audio (MP3 conversion, shared-UI trimming)
      - Invoke ``moshi.run_inference`` in a subprocess
      - Collect the translated chunks and update the UI

    Args:
        audio_input_path (str): Path to the input audio file (e.g., MP3 or WAV).

    Returns:
        Tuple with the first translated chunk path plus Gradio component
        updates (dropdown choices, visibility toggles).

    Raises:
        gr.Error: If the inference subprocess exits with a non-zero status.
    """
    import subprocess
    import sys

    cleanup_old_audio()
    audio_input_path = process_audio(audio_input_path)
    print(f"Processed file saved as: {audio_input_path}")
    command = [
        # sys.executable (not "python") so the subprocess uses the same
        # interpreter/venv as the app.
        sys.executable, "-m", "moshi.run_inference",
        audio_input_path, "out_en.wav",
        "--hf-repo", "kyutai/hibiki-1b-pytorch-bf16",
    ]
    result = subprocess.run(command, capture_output=True, text=True)
    print("STDOUT:", result.stdout)
    print("STDERR:", result.stderr)
    if result.returncode != 0:
        print("Error executing command.")
        raise gr.Error("Error executing command")
    print("Command executed successfully.")
    first_out, file_list = concatenate_audio()
    return (
        first_out,
        gr.update(choices=file_list, value=file_list[0], visible=True),
        gr.update(visible=True),
        gr.update(value=file_list, visible=True),
        gr.update(visible=True),
        gr.update(visible=True),
        gr.update(visible=True),
    )
def load_chosen_audio(audio_path):
    """Pass the selected chunk path straight through to the audio player."""
    return audio_path
def overlay_audio(
    original_mp3: str,
    translated_wav: str,
    volume_reduction_db: int = 10,
    cut_start: float = 0.0
) -> str:
    """
    Mix the translated speech over a quieter copy of the original track.

    Both tracks are resampled to 16 kHz mono; the original is attenuated by
    ``volume_reduction_db``, the translated track is optionally trimmed at
    its start, the shorter track is padded with silence so the mix spans the
    longer one, and the result is written to a temporary WAV file.

    :param original_mp3: Path to the original MP3 file.
    :param translated_wav: Path to the translated WAV file.
    :param volume_reduction_db: How many dB to attenuate the original by.
    :param cut_start: Seconds to drop from the start of the translated track.
    :return: Path to the temporary mixed WAV file.
    """
    base = AudioSegment.from_mp3(original_mp3).set_frame_rate(16000).set_channels(1)
    base = base - volume_reduction_db  # quieter bed under the translation
    speech = AudioSegment.from_wav(translated_wav).set_frame_rate(16000).set_channels(1)
    if cut_start > 0:
        speech = speech[int(cut_start * 1000):]  # seconds -> milliseconds
    # Pad whichever track is shorter so the overlay covers the longer one.
    target_ms = max(len(base), len(speech))
    if len(base) < target_ms:
        base += AudioSegment.silent(duration=target_ms - len(base))
    if len(speech) < target_ms:
        speech += AudioSegment.silent(duration=target_ms - len(speech))
    mixed = base.overlay(speech)
    out = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
    mixed.export(out.name, format="wav")
    print(f"Final audio saved at: {out.name}")
    return out.name
def process_final_combination(audio_in, chosen_translated, volume, cut_start, video_input):
    """
    Build the final overlay mix and decide whether the "apply to video"
    button should be shown (only when a video was uploaded).
    """
    mixed_path = overlay_audio(process_audio(audio_in), chosen_translated, volume, cut_start)
    show_video_btn = bool(video_input)
    return gr.update(value=mixed_path, visible=True), gr.update(visible=show_video_btn)
| import tempfile | |
| import gradio as gr | |
| from moviepy.editor import VideoFileClip, AudioFileClip, ImageClip, concatenate_videoclips | |
def replace_video_audio(video_path: str, new_audio_path: str) -> str:
    """
    Replace a video's audio track with the translated audio.

    If the new audio is longer than the video, the video is extended by
    freezing its last frame for the remaining duration.

    Args:
        video_path (str): Path to the video file.
        new_audio_path (str): Path to the replacement audio file.

    Returns:
        gr.update: Update pointing the output component at the new MP4 path.

    Raises:
        ValueError: If ``video_path`` is not a string.
    """
    # Debugging: ensure video_path is a string (Gradio can pass dicts for videos).
    print(f"DEBUG: video_path = {video_path}, type = {type(video_path)}")
    if not isinstance(video_path, str):
        raise ValueError(f"video_path must be a string, got {type(video_path)}")
    video = VideoFileClip(video_path)
    new_audio = AudioFileClip(new_audio_path)
    try:
        if new_audio.duration > video.duration:
            print("Extending video to match longer audio...")
            # Seek directly to the last frame instead of decoding every frame
            # (the old loop was O(n) in video length just to keep one frame).
            last_frame = video.get_frame(max(0.0, video.duration - 1.0 / video.fps))
            freeze_duration = new_audio.duration - video.duration
            freeze_frame = ImageClip(last_frame).set_duration(freeze_duration).set_fps(video.fps)
            video = concatenate_videoclips([video, freeze_frame])
        # Attach the translated audio track.
        video = video.set_audio(new_audio)
        # Close the temp handle right away so write_videofile can reopen it by name.
        temp_video = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
        temp_video.close()
        video.write_videofile(
            temp_video.name,
            codec="libx264",
            audio_codec="aac",
            fps=video.fps,
            preset="medium",
        )
    finally:
        # Release ffmpeg readers even if writing fails (was leaked on error).
        video.close()
        new_audio.close()
    return gr.update(value=temp_video.name, visible=True)
def clean_previous_video_input():
    """Reset the video input component when a standalone audio file is uploaded."""
    return gr.update(value=None)
def show_upcoming_component():
    """Reveal the next step's component in the UI."""
    return gr.update(visible=True)
def hide_previous():
    """Hide all downstream result components before a new inference run."""
    # Nine components get reset: dropdown, accordion, file list, apply button,
    # combined audio, apply-to-video button, final video, and the two sliders.
    return tuple(gr.update(visible=False) for _ in range(9))
# Global CSS: center the main column and cap its width.
css="""
div#col-container{
margin: 0 auto;
max-width: 1200px;
}
"""
# ---------------------------------------------------------------------------
# Gradio UI: inputs (optional video + audio) on the left, translated results
# and the overlay/remux controls on the right. Most result components start
# hidden and are revealed progressively by the event handlers below.
# ---------------------------------------------------------------------------
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.Markdown("# Hibiki ")
        gr.Markdown("This is a simple demo for Kyutai's Hibiki translation models • Currently supports French to English only.")
        gr.HTML("""
<div style="display:flex;column-gap:4px;">
<a href="https://huggingface.co/spaces/fffiloni/Hibiki-simple?duplicate=true">
<img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/duplicate-this-space-sm.svg" alt="Duplicate this Space">
</a>
</div>
""")
        with gr.Row():
            with gr.Column(scale=2):
                # Left column: inputs and launcher.
                video_input = gr.Video(label="Video IN (Optional)")
                audio_input = gr.Audio(label="Audio IN", type="filepath")
                submit_btn = gr.Button("Generate translations")
                gr.Examples(
                    examples = [
                        "./examples/sample_fr_hibiki_intro.mp3",
                        "./examples/sample_fr_hibiki_crepes.mp3",
                        "./examples/sample_fr_hibiki_monologue_otis.mp3"
                    ],
                    inputs = [audio_input]
                )
            with gr.Column(scale=3):
                # Right column: results, revealed after inference.
                output_result = gr.Audio(label="Translated result")
                with gr.Row():
                    dropdown_wav_selector = gr.Dropdown(
                        label="Pick a generated translated audio to load",
                        value = None,
                        visible=False,
                        scale=2
                    )
                    choose_this_btn = gr.Button("Apply and check this one as translated audio overlay", scale=1, visible=False)
                with gr.Row():
                    volume_reduction = gr.Slider(label="Original audio Volume reduction", minimum=0, maximum=60, step=1, value=30, visible=False)
                    cut_start = gr.Slider(label="Reduce translator delay (seconds)", minimum=0.0, maximum=4.0, step=0.1, value=2.0, visible=False)
                combined_output = gr.Audio(label="Combinated Audio", type="filepath", visible=False, show_download_button=True)
                apply_to_video_btn = gr.Button("Apply this combination to your video", visible=False)
                final_video_out = gr.Video(label="Video + Translated Audio", visible=False)
                with gr.Accordion("Downloadable audio Output list", open=False, visible=False) as result_accordion:
                    wav_list = gr.Files(label="Output Audio List", visible=False)
    # Uploading a standalone audio file clears any previously-loaded video.
    audio_input.upload(
        fn = clean_previous_video_input,
        inputs = None,
        outputs = [video_input],
        queue=False,
        show_api=False
    )
    # Uploading a video auto-extracts its audio into the audio input.
    video_input.upload(
        fn = extract_audio_as_mp3,
        inputs = [video_input],
        outputs = [audio_input],
        queue=False,
        show_api=False
    )
    # Selecting a chunk in the dropdown loads it into the result player.
    dropdown_wav_selector.select(
        fn = load_chosen_audio,
        inputs = [dropdown_wav_selector],
        outputs = [output_result],
        queue = False,
        show_api=False
    )
    # "Apply" reveals the combined-audio player, then computes the overlay mix.
    choose_this_btn.click(
        fn = show_upcoming_component,
        inputs=None,
        outputs=[combined_output],
        queue=False,
        show_api=False
    ).then(
        fn = process_final_combination,
        inputs = [audio_input, dropdown_wav_selector, volume_reduction, cut_start, video_input],
        outputs = [combined_output, apply_to_video_btn],
        show_api=False
    )
    # Remux: reveal the video output, then replace the video's audio track.
    apply_to_video_btn.click(
        fn = show_upcoming_component,
        inputs=None,
        outputs=[final_video_out],
        queue=False,
        show_api=False
    ).then(
        fn = replace_video_audio,
        inputs = [video_input, combined_output],
        outputs = [final_video_out],
        show_api=False
    )
    # New run: hide all stale result components first, then run inference.
    submit_btn.click(
        fn = hide_previous,
        inputs = None,
        outputs = [dropdown_wav_selector, result_accordion, wav_list, choose_this_btn, combined_output, apply_to_video_btn, final_video_out, volume_reduction, cut_start],
        show_api=False
    ).then(
        fn = infer,
        inputs = [audio_input],
        outputs = [output_result, dropdown_wav_selector, result_accordion, wav_list, choose_this_btn, volume_reduction, cut_start],
        show_api=True
    )
demo.queue().launch(show_api=True, show_error=True, ssr_mode=False, mcp_server=True)