Spaces: Running on Zero
| import gradio as gr | |
| import glob | |
| import os | |
| import shutil | |
| import tempfile | |
| from pydub import AudioSegment | |
def process_audio(input_file):
    """Convert an uploaded audio file to MP3, trimmed to at most one minute.

    :param input_file: Path to the source audio file (any pydub-readable format).
    :return: Path of the exported MP3, written next to the input file.
    """
    segment = AudioSegment.from_file(input_file)
    base, _ = os.path.splitext(input_file)
    mp3_path = base + ".mp3"
    # pydub measures duration in milliseconds; cap the clip at 60 seconds.
    max_ms = 60000
    if len(segment) > max_ms:
        segment = segment[:max_ms]
    segment.export(mp3_path, format="mp3")
    return mp3_path
def cleanup_old_audio():
    """Delete leftover audio files from a previous inference run.

    Removes every out_en-*.wav chunk plus final_output.wav from the
    current working directory, logging each deletion (or failure).
    """
    stale = glob.glob("out_en-*.wav") + glob.glob("final_output.wav")
    if not stale:
        print("No old audio files found.")
        return
    print(f"Cleaning up {len(stale)} old audio files...")
    for path in stale:
        try:
            os.remove(path)
        except Exception as e:
            # Best-effort cleanup: log and keep going.
            print(f"Error deleting {path}: {e}")
        else:
            print(f"Deleted: {path}")
def find_audio_chunks():
    """Return all out_en-*.wav chunk paths, ordered by their numeric suffix."""
    def chunk_index(path):
        # "out_en-12.wav" -> 12, so "out_en-2" sorts before "out_en-10".
        return int(path.split('-')[-1].split('.')[0])

    chunks = sorted(glob.glob("out_en-*.wav"), key=chunk_index)
    print(f"Found {len(chunks)} audio chunks: {chunks}")
    return chunks
def concatenate_audio(output_filename="final_output.wav"):
    """Concatenate all out_en-*.wav chunks into one file in a temp directory.

    The individual chunks are moved into the same temp directory so the
    working directory is clean for the next run.

    :param output_filename: Basename of the combined output WAV.
    :return: Tuple ``(final_path, chunk_paths)``. When no chunks exist,
        returns ``(None, [])`` — the original ``return []`` broke the
        caller's two-value unpack (``out, files = concatenate_audio()``)
        with a ValueError.
    """
    wav_files = find_audio_chunks()  # sorted chunk paths
    if not wav_files:
        print("No audio files found.")
        return None, []

    temp_dir = tempfile.mkdtemp()
    combined = AudioSegment.empty()
    temp_wav_files = []
    for file in wav_files:
        combined += AudioSegment.from_wav(file)
        # Move each chunk into the temp dir so the next run starts clean.
        temp_file_path = os.path.join(temp_dir, os.path.basename(file))
        shutil.move(file, temp_file_path)
        temp_wav_files.append(temp_file_path)

    temp_output_path = os.path.join(temp_dir, output_filename)
    combined.export(temp_output_path, format="wav")
    print(f"Concatenated audio saved at {temp_output_path}")
    return temp_output_path, temp_wav_files
def infer(audio_input_path):
    """Run Hibiki speech translation on the input audio.

    :param audio_input_path: Path to the user's uploaded audio.
    :return: Tuple of (translated_audio_path, dropdown update, accordion
        update, file-list update, button update) for the Gradio outputs.
    :raises gr.Error: If the inference subprocess fails or yields no chunks.
    """
    cleanup_old_audio()
    # Normalize to MP3 and trim to 60 s before feeding the model.
    audio_input_path = process_audio(audio_input_path)
    print(f"Processed file saved as: {audio_input_path}")

    import subprocess
    command = [
        "python", "-m", "moshi.run_inference",
        audio_input_path, "out_en.wav",
        "--hf-repo", "kyutai/hibiki-1b-pytorch-bf16",
    ]
    result = subprocess.run(command, capture_output=True, text=True)
    print("STDOUT:", result.stdout)
    print("STDERR:", result.stderr)

    if result.returncode != 0:
        print("Error executing command.")
        raise gr.Error("Error executing command")

    print("Command executed successfully.")
    outcome = concatenate_audio()
    # Guard: a successful run may still produce no chunks; the original
    # unpacked and indexed file_list[0] unconditionally (ValueError /
    # IndexError instead of a user-visible message).
    if not outcome or not outcome[1]:
        raise gr.Error("No translated audio chunks were produced")
    concat_out, file_list = outcome
    return (
        concat_out,
        gr.update(choices=file_list, value=file_list[0], visible=True),
        gr.update(visible=True),
        gr.update(value=file_list, visible=True),
        gr.update(visible=True),
    )
def load_chosen_audio(audio_path):
    """Pass the selected chunk's file path straight through to the player."""
    chosen = audio_path
    return chosen
def overlay_audio(original_mp3: str, translated_wav: str, volume_reduction_db: int = 10) -> str:
    """
    Overlays translated audio on top of the original, reduces the original volume,
    and ensures the final audio lasts as long as the longer of the two tracks.

    :param original_mp3: Path to the original MP3 file.
    :param translated_wav: Path to the translated WAV file.
    :param volume_reduction_db: Volume reduction in dB (default is -10 dB).
    :return: Path to the temporary output WAV file.
    """
    # Load original MP3, downmix to mono 16 kHz to match the translation.
    original = AudioSegment.from_mp3(original_mp3).set_frame_rate(16000).set_channels(1)
    # Lower the original's volume so the translation is audible over it.
    original = original - volume_reduction_db
    translated = AudioSegment.from_wav(translated_wav).set_frame_rate(16000).set_channels(1)

    # Pad the shorter track with silence so both span the full duration.
    final_length = max(len(original), len(translated))
    if len(original) < final_length:
        original = original + AudioSegment.silent(duration=final_length - len(original))
    if len(translated) < final_length:
        translated = translated + AudioSegment.silent(duration=final_length - len(translated))

    combined = original.overlay(translated)

    # BUG FIX: the original used NamedTemporaryFile(delete=False) and never
    # closed the handle, leaking a file descriptor per call (and keeping the
    # file open while pydub wrote to it). mkstemp + close avoids both.
    fd, out_path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    combined.export(out_path, format="wav")
    print(f"Final audio saved at: {out_path}")
    return out_path
def process_final_combination(audio_in, chosen_translated):
    """Mix the volume-lowered original with the chosen translation and show it."""
    normalized = process_audio(audio_in)
    mixed_path = overlay_audio(normalized, chosen_translated)
    return gr.update(value=mixed_path, visible=True)
def hide_previous():
    """Hide the five result widgets before a new inference run starts."""
    return tuple(gr.update(visible=False) for _ in range(5))
# Page styling: center the main column and cap its width.
css = """
div#col-container{
margin: 0 auto;
max-width: 720px;
}
"""

with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.Markdown("# Hibiki ")
        gr.Markdown("This is a simple demo for Kyutai's Hibiki translation models • Currently supports French to English only.")
        audio_input = gr.Audio(label="Audio IN", type="filepath")
        submit_btn = gr.Button("Submit")
        output_result = gr.Audio(label="Translated result")
        with gr.Row():
            dropdown_wav_selector = gr.Dropdown(
                label="Pick a generated audio to load",
                value=None,
                visible=False,
                scale=2,
            )
            choose_this_btn = gr.Button("Use this one", scale=1, visible=False)
        # BUG FIX: gr.Audio's first positional argument is `value` (a media
        # source), not `label`; the original passed the label text
        # "Combined Outpu" positionally (also a typo). Use the keyword.
        combined_output = gr.Audio(label="Combined Output", visible=False)
        with gr.Accordion("Downloadable audio Output list", open=False, visible=False) as result_accordion:
            wav_list = gr.Files(label="Output Audio List", visible=False)
        gr.Examples(
            examples=[
                "./examples/sample_fr_hibiki_intro.mp3",
                "./examples/sample_fr_hibiki_crepes.mp3",
                "./examples/sample_fr_hibiki_monologue_otis.mp3",
            ],
            inputs=[audio_input],
        )

    # Load whichever generated chunk the user picks into the result player.
    dropdown_wav_selector.select(
        fn=load_chosen_audio,
        inputs=[dropdown_wav_selector],
        outputs=[output_result],
        queue=False,
    )

    # Overlay the chosen translation onto the original input audio.
    choose_this_btn.click(
        fn=process_final_combination,
        inputs=[audio_input, dropdown_wav_selector],
        outputs=[combined_output],
    )

    # Hide stale results first, then run inference on the new input.
    submit_btn.click(
        fn=hide_previous,
        inputs=None,
        outputs=[dropdown_wav_selector, result_accordion, wav_list, choose_this_btn, combined_output],
    ).then(
        fn=infer,
        inputs=[audio_input],
        outputs=[output_result, dropdown_wav_selector, result_accordion, wav_list, choose_this_btn],
    )

demo.queue().launch(show_api=False, show_error=True)