import numpy as np
import re
import sqlite3
import concurrent.futures
import gradio as gr
from datetime import datetime
import random
import moviepy
from transformers import pipeline
from transformers.pipelines.audio_utils import ffmpeg_read
from moviepy.editor import (
    VideoFileClip,
    TextClip,
    CompositeVideoClip,
    CompositeAudioClip,
    AudioFileClip,
    concatenate_videoclips,
    concatenate_audioclips,
)
from moviepy.audio.AudioClip import AudioArrayClip
from gtts import gTTS
import subprocess
import speech_recognition as sr
import json
from nltk.tokenize import sent_tokenize
import logging
from textblob import TextBlob
import whisperx
import time
import os
import openai
from openai import OpenAI
import traceback
from TTS.api import TTS
import torch
from TTS.tts.configs.xtts_config import XttsConfig

# Accept license terms for Coqui XTTS
os.environ["COQUI_TOS_AGREED"] = "1"
torch.serialization.add_safe_globals([XttsConfig])

# Load XTTS model
try:
    print("🔄 Loading XTTS model...")
    tts = TTS(model_name="tts_models/multilingual/multi-dataset/xtts_v2")
    print("✅ XTTS model loaded successfully.")
except Exception as e:
    print("❌ Error loading XTTS model:")
    traceback.print_exc()
    raise e

client = OpenAI(
    api_key=os.environ.get("openAI_api_key"),  # API key read from the environment
)

# Hugging Face token for the pyannote speaker-diarization pipeline
# (assumed to be provided via this environment variable).
hf_api_key = os.environ.get("hf_api_key")

def silence(duration, fps=44100):
    """
    Returns a silent AudioClip of the specified duration.
    """
    return AudioArrayClip(np.zeros((int(fps * duration), 2)), fps=fps)

def count_words_or_characters(text):
    # Count non-Chinese words
    non_chinese_words = len(re.findall(r'\b[a-zA-Z0-9]+\b', text))
    # Count Chinese characters
    chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', text))
    return non_chinese_words + chinese_chars

# Define the passcode
PASSCODE = "show_feedback_db"

css = """
/* Adjust row height */
.dataframe-container tr {
    height: 50px !important;
}

/* Ensure text wrapping and prevent overflow */
.dataframe-container td {
    white-space: normal !important;
    word-break: break-word !important;
}

/* Set column widths */
[data-testid="block-container"] .scrolling-dataframe th:nth-child(1),
[data-testid="block-container"] .scrolling-dataframe td:nth-child(1) {
    width: 6%; /* Start column */
}

[data-testid="block-container"] .scrolling-dataframe th:nth-child(2),
[data-testid="block-container"] .scrolling-dataframe td:nth-child(2) {
    width: 47%; /* Original text */
}

[data-testid="block-container"] .scrolling-dataframe th:nth-child(3),
[data-testid="block-container"] .scrolling-dataframe td:nth-child(3) {
    width: 47%; /* Translated text */
}

[data-testid="block-container"] .scrolling-dataframe th:nth-child(4),
[data-testid="block-container"] .scrolling-dataframe td:nth-child(4) {
    display: none !important;
}
"""

# Function to save feedback or provide access to the database file
def handle_feedback(feedback):
    feedback = feedback.strip()  # Clean up leading/trailing whitespace
    if not feedback:
        return "Feedback cannot be empty.", None

    if feedback == PASSCODE:
        # Provide access to the feedback.db file
        return "Access granted! Download the database file below.", "feedback.db"
    else:
        # Save feedback to the database
        with sqlite3.connect("feedback.db") as conn:
            cursor = conn.cursor()
            cursor.execute("CREATE TABLE IF NOT EXISTS studio_feedback (id INTEGER PRIMARY KEY, comment TEXT)")
            cursor.execute("INSERT INTO studio_feedback (comment) VALUES (?)", (feedback,))
            conn.commit()
        return "Thank you for your feedback!", None

# Configure logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
logger.info(f"MoviePy Version: {moviepy.__version__}")

def transcribe_video_with_speakers(video_path):
    # Extract audio from video
    video = VideoFileClip(video_path)
    audio_path = "audio.wav"
    video.audio.write_audiofile(audio_path)
    logger.info(f"Audio extracted from video: {audio_path}")

    # Set up device
    device = "cuda" if torch.cuda.is_available() else "cpu"
    logger.info(f"Using device: {device}")

    # Load WhisperX model
    model = whisperx.load_model("large-v2", device)
    logger.info("WhisperX model loaded")

    # Transcribe with WhisperX
    result = model.transcribe(audio_path)
    logger.info("Audio transcription completed")

    # Capture the detected language now; the aligned result no longer carries it
    detected_language = result["language"]

    # Align transcription
    model_a, metadata = whisperx.load_align_model(language_code=result["language"], device=device)
    result = whisperx.align(result["segments"], model_a, metadata, audio_path, device)
    logger.info("Transcription alignment completed")

    # Perform speaker diarization
    diarize_model = whisperx.DiarizationPipeline(use_auth_token=hf_api_key, device=device)
    diarize_segments = diarize_model(audio_path)
    logger.info("Speaker diarization completed")

    # Assign speakers to transcribed segments
    result = whisperx.assign_word_speakers(diarize_segments, result)
    logger.info("Speakers assigned to transcribed segments")

    # Extract timestamps, text, and speaker IDs
    transcript_with_speakers = [
        {
            "start": segment["start"],
            "end": segment["end"],
            "text": segment["text"],
            "speaker": segment["speaker"]
        }
        for segment in result["segments"]
    ]

    # Collect audio for each speaker
    speaker_audio = {}
    for segment in result["segments"]:
        speaker = segment["speaker"]
        if speaker not in speaker_audio:
            speaker_audio[speaker] = []
        speaker_audio[speaker].append((segment["start"], segment["end"]))

    # Collapse and truncate speaker audio
    speaker_sample_paths = {}
    audio_clip = AudioFileClip(audio_path)
    for speaker, segments in speaker_audio.items():
        speaker_clips = [audio_clip.subclip(start, end) for start, end in segments]
        combined_clip = concatenate_audioclips(speaker_clips)
        truncated_clip = combined_clip.subclip(0, min(30, combined_clip.duration))
        sample_path = f"speaker_{speaker}_sample.wav"
        truncated_clip.write_audiofile(sample_path)
        speaker_sample_paths[speaker] = sample_path
        logger.info(f"Created sample for {speaker}: {sample_path}")

    # Log the detected language
    logger.debug(f"Detected language: {detected_language}")

    # Clean up
    video.close()
    audio_clip.close()
    os.remove(audio_path)

    return transcript_with_speakers, detected_language

# Function to get the appropriate translation model based on source and target language
def get_translation_model(source_language, target_language):
    """
    Get the translation model based on the source and target language.

    Parameters:
    - source_language (str): The language of the input content (e.g., 'en' for English).
    - target_language (str): The language to translate the content into (e.g., 'es', 'fr').

    Returns:
    - str: The translation model identifier.
    """
    # List of allowable languages
    allowable_languages = ["en", "es", "fr", "zh", "de", "it", "pt", "ja", "ko", "ru"]

    # Validate source and target languages
    if source_language not in allowable_languages:
        logger.debug(f"Invalid source language '{source_language}'. Supported languages are: {', '.join(allowable_languages)}")
        # Fall back to a default source language
        source_language = "en"  # Default to 'en'

    if target_language not in allowable_languages:
        logger.debug(f"Invalid target language '{target_language}'. Supported languages are: {', '.join(allowable_languages)}")
        # Fall back to a default target language
        target_language = "zh"  # Default to 'zh'

    if source_language == target_language:
        # Identical source and target is not a valid translation pair; fall back to en -> zh
        source_language = "en"  # Default to 'en'
        target_language = "zh"  # Default to 'zh'

    # Return the Helsinki-NLP OPUS-MT model identifier for this language pair
    return f"Helsinki-NLP/opus-mt-{source_language}-{target_language}"

def translate_single_entry(entry, translator):
    original_text = entry["text"]
    translated_text = translator(original_text)[0]['translation_text']
    return {
        "start": entry["start"],
        "original": original_text,
        "translated": translated_text,
        "end": entry["end"],
        # Carry the speaker ID through so voiceover and the editable table can use it
        "speaker": entry["speaker"]
    }

def translate_text(transcription_json, source_language, target_language):
    # Load the translation model for the specified language pair
    translation_model_id = get_translation_model(source_language, target_language)
    logger.debug(f"Translation model: {translation_model_id}")
    translator = pipeline("translation", model=translation_model_id)

    # Use ThreadPoolExecutor to parallelize translations
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit all translation tasks and collect results
        translate_func = lambda entry: translate_single_entry(entry, translator)
        translated_json = list(executor.map(translate_func, transcription_json))

    # Sort the translated_json by start time
    translated_json.sort(key=lambda x: x["start"])

    # Log the components being added to translated_json
    for entry in translated_json:
        logger.debug("Added to translated_json: start=%s, original=%s, translated=%s, end=%s",
                     entry["start"], entry["original"], entry["translated"], entry["end"])

    return translated_json

def update_translations(file, edited_table, mode):
    """
    Update the translations based on user edits in the Gradio Dataframe.
    """
    output_video_path = "output_video.mp4"
    logger.debug(f"Editable Table: {edited_table}")

    if file is None:
        logger.info("No file uploaded. Please upload a video/audio file.")
        return None, "No file uploaded. Please upload a video/audio file."

    try:
        start_time = time.time()  # Start the timer

        # Convert the edited table (a pandas DataFrame) back to a list of dictionaries
        updated_translations = [
            {
                "start": row["start"],  # Access by column name
                "original": row["original"],
                "translated": row["translated"],
                "end": row["end"],
                "speaker": row["speaker"]
            }
            for _, row in edited_table.iterrows()
        ]

        # Call the function to process the video with updated translations
        add_transcript_voiceover(file.name, updated_translations, output_video_path, mode == "Transcription with Voiceover")

        # Calculate elapsed time
        elapsed_time = time.time() - start_time
        elapsed_time_display = f"Updates applied successfully in {elapsed_time:.2f} seconds."
        return output_video_path, elapsed_time_display

    except Exception as e:
        raise ValueError(f"Error updating translations: {e}")

def process_entry(entry, i, video_width, video_height, add_voiceover, target_language, speaker_sample_paths=None):
    logger.debug(f"Processing entry {i}: {entry}")

    # Create text clip for subtitles
    txt_clip = TextClip(
        text=entry["translated"],
        font="./NotoSansSC-Regular.ttf",
        method='caption',
        color='yellow',
        stroke_color='black',  # Border color
        stroke_width=2,        # Border thickness
        font_size=int(video_height // 20),
        size=(int(video_width * 0.8), None)
    ).with_start(entry["start"]).with_duration(entry["end"] - entry["start"]).with_position('bottom').with_opacity(0.8)

    audio_segment = None
    if add_voiceover:
        segment_audio_path = f"segment_{i}_voiceover.wav"
        desired_duration = entry["end"] - entry["start"]
        speaker_id = entry["speaker"]  # Extract the speaker ID
        speaker_wav_path = f"speaker_{speaker_id}_sample.wav"
        # Pass the per-segment values explicitly so the cloning call cannot break on missing state.
        generate_voiceover_clone([entry], desired_duration, target_language, speaker_wav_path, segment_audio_path)

        audio_clip = AudioFileClip(segment_audio_path)

        # Get and log all methods in AudioFileClip (debug aid)
        logger.info("Methods in AudioFileClip:")
        for method in dir(audio_clip):
            logger.info(method)

        # Log duration of the audio clip and the desired duration for debugging.
        logger.debug(f"Audio clip duration: {audio_clip.duration}, Desired duration: {desired_duration}")

        if audio_clip.duration < desired_duration:
            # Pad with silence if audio is too short
            silence_duration = desired_duration - audio_clip.duration
            # Concatenate the original audio and silence
            audio_clip = concatenate_audioclips([audio_clip, silence(duration=silence_duration)])
            logger.info(f"Padded audio with {silence_duration} seconds of silence.")

        # Place the audio segment at the entry's start time with the required duration.
        audio_segment = audio_clip.with_start(entry["start"]).with_duration(desired_duration)

    return i, txt_clip, audio_segment

def add_transcript_voiceover(video_path, translated_json, output_path, add_voiceover=False, target_language="en", speaker_sample_paths=None):
    """
    Add transcript and voiceover to a video, segment by segment.
    """
    video = VideoFileClip(video_path)
    font_path = "./NotoSansSC-Regular.ttf"

    text_clips = []
    audio_segments = []

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(process_entry, entry, i, video.w, video.h, add_voiceover, target_language, speaker_sample_paths)
                   for i, entry in enumerate(translated_json)]

        # Collect results with original index i
        results = []
        for future in concurrent.futures.as_completed(futures):
            try:
                i, txt_clip, audio_segment = future.result()
                results.append((i, txt_clip, audio_segment))
            except Exception as e:
                logger.error(f"Error processing entry: {e}")

    # Sort by original index i
    results.sort(key=lambda x: x[0])

    # Extract sorted clips
    text_clips = [clip for i, clip, segment in results]

    final_video = CompositeVideoClip([video] + text_clips)

    logger.info("Methods in CompositeVideoClip:")
    for method in dir(final_video):
        logger.info(method)

    if add_voiceover:
        audio_segments = [segment for i, clip, segment in results if segment is not None]
        final_audio = CompositeAudioClip(audio_segments)  # Critical fix
        final_audio = final_audio.with_duration(video.duration)
        final_video = final_video.with_audio(final_audio)

    logger.info(f"Saving the final video to: {output_path}")
    final_video.write_videofile(output_path, codec="libx264", audio_codec="aac")

    logger.info("Video processing completed successfully.")

# Voice cloning function with debug and error handling
def generate_voiceover_clone(translated_json, desired_duration, target_language, speaker_wav_path, output_audio_path):
    try:
        full_text = " ".join(entry["translated"] for entry in translated_json)
        speed_tts = calculate_speed(full_text, desired_duration)

        if not speaker_wav_path or not os.path.exists(speaker_wav_path):
            return None, "❌ Please upload a valid speaker audio file."

        print(f"📥 Received text: {full_text}")
        print(f"📁 Speaker audio path: {speaker_wav_path}")
        print(f"🌐 Selected language: {target_language}")
        print(f"⏱️ Target speed: {speed_tts}")

        # Run TTS with speed control (if supported by the model)
        tts.tts_to_file(
            text=full_text,
            speaker_wav=speaker_wav_path,
            language=target_language,
            file_path=output_audio_path,
            speed=speed_tts  # <- speed control
        )
        print("✅ Voice cloning completed.")
        return output_audio_path, "✅ Voice cloning completed successfully."

    except Exception as e:
        print("❌ Error during voice cloning:")
        traceback.print_exc()
        error_msg = f"❌ An error occurred: {str(e)}"
        return None, error_msg

def truncated_linear(x):
    # Map characters-per-second to a speed multiplier:
    # 1.0 below 15 cps, 1.3 above 25 cps, linear in between.
    if x < 15:
        return 1
    elif x > 25:
        return 1.3
    else:
        slope = (1.3 - 1) / (25 - 15)
        return 1 + slope * (x - 15)

def calculate_speed(text, desired_duration):
    # Calculate characters per second
    char_count = len(text)
    chars_per_second = char_count / (desired_duration + 0.001)

    # Apply truncated linear function to get speed
    speed = truncated_linear(chars_per_second)
    return speed

def upload_and_manage(file, target_language, mode="transcription"):
    if file is None:
        logger.info("No file uploaded. Please upload a video/audio file.")
        return None, [], None, "No file uploaded. Please upload a video/audio file."
    try:
        start_time = time.time()  # Start the timer
        logger.info(f"Started processing file: {file.name}")

        # Define paths for audio and output files
        audio_path = "audio.wav"
        output_video_path = "output_video.mp4"
        voiceover_path = "voiceover.wav"
        logger.info(f"Using audio path: {audio_path}, output video path: {output_video_path}, voiceover path: {voiceover_path}")

        # Step 1: Transcribe audio from uploaded media file and get timestamps
        logger.info("Transcribing audio...")
        transcription_json, source_language = transcribe_video_with_speakers(file.name)
        logger.info(f"Transcription completed. Detected source language: {source_language}")

        # Step 2: Translate the transcription
        logger.info(f"Translating transcription from {source_language} to {target_language}...")
        translated_json = translate_text(transcription_json, source_language, target_language)
        logger.info(f"Translation completed. Number of translated segments: {len(translated_json)}")

        # Step 3: Add transcript to video based on timestamps
        # (speaker samples are looked up on disk by speaker ID inside process_entry)
        logger.info("Adding translated transcript to video...")
        add_transcript_voiceover(file.name, translated_json, output_video_path, mode == "Transcription with Voiceover", target_language, None)
        logger.info(f"Transcript added to video. Output video saved at {output_video_path}")

        # Convert translated JSON into a format for the editable table
        logger.info("Converting translated JSON into editable table format...")
        editable_table = [
            [float(entry["start"]), entry["original"], entry["translated"], float(entry["end"]), entry["speaker"]]
            for entry in translated_json
        ]

        # Calculate elapsed time
        elapsed_time = time.time() - start_time
        elapsed_time_display = f"Processing completed in {elapsed_time:.2f} seconds."
        logger.info(f"Processing completed in {elapsed_time:.2f} seconds.")

        return translated_json, editable_table, output_video_path, elapsed_time_display

    except Exception as e:
        logger.error(f"An error occurred: {str(e)}")
        return None, [], None, f"An error occurred: {str(e)}"

# Gradio Interface with Tabs
def build_interface():
    with gr.Blocks(css=css) as demo:
        gr.Markdown("## Video Localization")
        with gr.Row():
            with gr.Column(scale=4):
                file_input = gr.File(label="Upload Video/Audio File")
                language_input = gr.Dropdown(["en", "es", "fr", "zh"], label="Select Language")  # Language codes
                process_mode = gr.Radio(choices=["Transcription", "Transcription with Voiceover"], label="Choose Processing Type", value="Transcription")
                submit_button = gr.Button("Post and Process")
                editable_translations = gr.State(value=[])

            with gr.Column(scale=8):
                gr.Markdown("## Edit Translations")

                # Editable table of translated segments
                editable_table = gr.Dataframe(
                    value=[],  # Default to an empty list to avoid undefined values
                    headers=["start", "original", "translated", "end", "speaker"],
                    datatype=["number", "str", "str", "number", "str"],
                    row_count=1,  # Initially empty
                    col_count=5,
                    interactive=[False, True, True, False, False],  # Control editability
                    label="Edit Translations",
                    wrap=True  # Enables text wrapping if supported
                )

                save_changes_button = gr.Button("Save Changes")
                processed_video_output = gr.File(label="Download Processed Video", interactive=True)  # Download button
                elapsed_time_display = gr.Textbox(label="Elapsed Time", lines=1, interactive=False)

            with gr.Column(scale=1):
                gr.Markdown("**Feedback**")
                feedback_input = gr.Textbox(
                    placeholder="Leave your feedback here...",
                    label=None,
                    lines=3,
                )
                feedback_btn = gr.Button("Submit Feedback")
                response_message = gr.Textbox(label=None, lines=1, interactive=False)
                db_download = gr.File(label="Download Database File", visible=False)

        # Link the feedback handling
        def feedback_submission(feedback):
            message, file_path = handle_feedback(feedback)
            if file_path:
                return message, gr.update(value=file_path, visible=True)
            return message, gr.update(visible=False)

        save_changes_button.click(
            update_translations,
            inputs=[file_input, editable_table, process_mode],
            outputs=[processed_video_output, elapsed_time_display]
        )

        submit_button.click(
            upload_and_manage,
            inputs=[file_input, language_input, process_mode],
            outputs=[editable_translations, editable_table, processed_video_output, elapsed_time_display]
        )

        # Connect the feedback button to the feedback handler
        feedback_btn.click(
            feedback_submission,
            inputs=[feedback_input],
            outputs=[response_message, db_download]
        )

    return demo

# Launch the Gradio interface
demo = build_interface()
demo.launch()