import numpy as np import re import concurrent.futures import gradio as gr from datetime import datetime import random import moviepy from transformers import pipeline from transformers.pipelines.audio_utils import ffmpeg_read from moviepy.editor import ( ImageClip, VideoFileClip, TextClip, CompositeVideoClip, CompositeAudioClip, AudioFileClip, concatenate_videoclips, concatenate_audioclips ) from PIL import Image, ImageDraw, ImageFont from moviepy.audio.AudioClip import AudioArrayClip import subprocess import speech_recognition as sr import json from nltk.tokenize import sent_tokenize import logging import whisperx import time import os import openai from openai import OpenAI import traceback from TTS.api import TTS import torch from TTS.tts.configs.xtts_config import XttsConfig from pydub import AudioSegment from pyannote.audio import Pipeline import traceback import wave logger = logging.getLogger(__name__) # Accept license terms for Coqui XTTS os.environ["COQUI_TOS_AGREED"] = "1" # torch.serialization.add_safe_globals([XttsConfig]) # Load XTTS model try: print("🔄 Loading XTTS model...") tts = TTS(model_name="tts_models/multilingual/multi-dataset/xtts_v2") print("✅ XTTS model loaded successfully.") except Exception as e: print("❌ Error loading XTTS model:") traceback.print_exc() raise e logger.info(gr.__version__) client = OpenAI( api_key= os.environ.get("openAI_api_key"), # This is the default and can be omitted ) hf_api_key = os.environ.get("hf_token") # def silence(duration, fps=44100): # """ # Returns a silent AudioClip of the specified duration. # """ # return AudioArrayClip(np.zeros((int(fps*duration), 2)), fps=fps) # def count_words_or_characters(text): # # Count non-Chinese words # non_chinese_words = len(re.findall(r'\b[a-zA-Z0-9]+\b', text)) # # Count Chinese characters # chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', text)) # return non_chinese_words + chinese_chars # # Define the passcode # PASSCODE = "show_feedback_db" # css = """ # /* Adjust row height */ # .dataframe-container tr { # height: 50px !important; # } # /* Ensure text wrapping and prevent overflow */ # .dataframe-container td { # white-space: normal !important; # word-break: break-word !important; # } # /* Set column widths */ # [data-testid="block-container"] .scrolling-dataframe th:nth-child(1), # [data-testid="block-container"] .scrolling-dataframe td:nth-child(1) { # width: 6%; /* Start column */ # } # [data-testid="block-container"] .scrolling-dataframe th:nth-child(2), # [data-testid="block-container"] .scrolling-dataframe td:nth-child(2) { # width: 47%; /* Original text */ # } # [data-testid="block-container"] .scrolling-dataframe th:nth-child(3), # [data-testid="block-container"] .scrolling-dataframe td:nth-child(3) { # width: 47%; /* Translated text */ # } # [data-testid="block-container"] .scrolling-dataframe th:nth-child(4), # [data-testid="block-container"] .scrolling-dataframe td:nth-child(4) { # display: none !important; # } # """ # # Function to save feedback or provide access to the database file # def handle_feedback(feedback): # feedback = feedback.strip() # Clean up leading/trailing whitespace # if not feedback: # return "Feedback cannot be empty.", None # if feedback == PASSCODE: # # Provide access to the feedback.db file # return "Access granted! Download the database file below.", "feedback.db" # else: # # Save feedback to the database # with sqlite3.connect("feedback.db") as conn: # cursor = conn.cursor() # cursor.execute("CREATE TABLE IF NOT EXISTS studio_feedback (id INTEGER PRIMARY KEY, comment TEXT)") # cursor.execute("INSERT INTO studio_feedback (comment) VALUES (?)", (feedback,)) # conn.commit() # return "Thank you for your feedback!", None # # Configure logging # logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s") # logger = logging.getLogger(__name__) # logger.info(f"MoviePy Version: {moviepy.__version__}") # # def segment_background_audio(audio_path, output_path="background_segments.wav"): # # # Step 2: Initialize pyannote voice activity detection pipeline (you need Hugging Face token) # # pipeline = Pipeline.from_pretrained( # # "pyannote/voice-activity-detection", # # use_auth_token=hf_api_key # # ) # # # Step 3: Run VAD to get speech segments # # vad_result = pipeline(audio_path) # # print(f"Detected speech segments: {vad_result}") # # # Step 4: Load full audio and subtract speech segments # # full_audio = AudioSegment.from_wav(audio_path) # # background_audio = AudioSegment.silent(duration=len(full_audio)) # # for segment in vad_result.itersegments(): # # start_ms = int(segment.start * 1000) # # end_ms = int(segment.end * 1000) # # # Remove speech by muting that portion # # background_audio = background_audio.overlay(AudioSegment.silent(duration=end_ms - start_ms), position=start_ms) # # # Step 5: Subtract background_audio from full_audio # # result_audio = full_audio.overlay(background_audio) # # # Step 6: Export non-speech segments # # result_audio.export(output_path, format="wav") # # print(f"Saved non-speech (background) audio to: {output_path}") # # return True # def transcribe_video_with_speakers(video_path): # # Extract audio from video # video = VideoFileClip(video_path) # audio_path = "audio.wav" # video.audio.write_audiofile(audio_path) # logger.info(f"Audio extracted from video: {audio_path}") # # segment_result = segment_background_audio(audio_path) # # print(f"Saved non-speech (background) audio to local") # # Set up device # device = "cuda" if torch.cuda.is_available() else "cpu" # logger.info(f"Using device: {device}") # try: # # Load a medium model with float32 for broader compatibility # model = whisperx.load_model("medium", device=device, compute_type="float32") # logger.info("WhisperX model loaded") # # Transcribe # result = model.transcribe(audio_path, chunk_size=5, print_progress = True) # logger.info("Audio transcription completed") # # Get the detected language # detected_language = result["language"] # logger.debug(f"Detected language: {detected_language}") # # Alignment # model_a, metadata = whisperx.load_align_model(language_code=result["language"], device=device) # result = whisperx.align(result["segments"], model_a, metadata, audio_path, device) # logger.info("Transcription alignment completed") # # Diarization (works independently of Whisper model size) # diarize_model = whisperx.DiarizationPipeline(use_auth_token=hf_api_key, device=device) # diarize_segments = diarize_model(audio_path) # logger.info("Speaker diarization completed") # # Assign speakers # result = whisperx.assign_word_speakers(diarize_segments, result) # logger.info("Speakers assigned to transcribed segments") # except Exception as e: # logger.error(f"❌ WhisperX pipeline failed: {e}") # # Extract timestamps, text, and speaker IDs # transcript_with_speakers = [ # { # "start": segment["start"], # "end": segment["end"], # "text": segment["text"], # "speaker": segment["speaker"] # } # for segment in result["segments"] # ] # # Collect audio for each speaker # speaker_audio = {} # for segment in result["segments"]: # speaker = segment["speaker"] # if speaker not in speaker_audio: # speaker_audio[speaker] = [] # speaker_audio[speaker].append((segment["start"], segment["end"])) # # Collapse and truncate speaker audio # speaker_sample_paths = {} # audio_clip = AudioFileClip(audio_path) # for speaker, segments in speaker_audio.items(): # speaker_clips = [audio_clip.subclip(start, end) for start, end in segments] # combined_clip = concatenate_audioclips(speaker_clips) # truncated_clip = combined_clip.subclip(0, min(30, combined_clip.duration)) # sample_path = f"speaker_{speaker}_sample.wav" # truncated_clip.write_audiofile(sample_path) # speaker_sample_paths[speaker] = sample_path # logger.info(f"Created sample for {speaker}: {sample_path}") # # Clean up # video.close() # audio_clip.close() # os.remove(audio_path) # return transcript_with_speakers, detected_language # # Function to get the appropriate translation model based on target language # def get_translation_model(source_language, target_language): # """ # Get the translation model based on the source and target language. # Parameters: # - target_language (str): The language to translate the content into (e.g., 'es', 'fr'). # - source_language (str): The language of the input content (default is 'en' for English). # Returns: # - str: The translation model identifier. # """ # # List of allowable languages # allowable_languages = ["en", "es", "fr", "zh", "de", "it", "pt", "ja", "ko", "ru"] # # Validate source and target languages # if source_language not in allowable_languages: # logger.debug(f"Invalid source language '{source_language}'. Supported languages are: {', '.join(allowable_languages)}") # # Return a default model if source language is invalid # source_language = "en" # Default to 'en' # if target_language not in allowable_languages: # logger.debug(f"Invalid target language '{target_language}'. Supported languages are: {', '.join(allowable_languages)}") # # Return a default model if target language is invalid # target_language = "zh" # Default to 'zh' # if source_language == target_language: # source_language = "en" # Default to 'en' # target_language = "zh" # Default to 'zh' # # Return the model using string concatenation # return f"Helsinki-NLP/opus-mt-{source_language}-{target_language}" # def translate_single_entry(entry, translator): # original_text = entry["text"] # translated_text = translator(original_text)[0]['translation_text'] # return { # "start": entry["start"], # "original": original_text, # "translated": translated_text, # "end": entry["end"], # "speaker": entry["speaker"] # } # def translate_text(transcription_json, source_language, target_language): # # Load the translation model for the specified target language # translation_model_id = get_translation_model(source_language, target_language) # logger.debug(f"Translation model: {translation_model_id}") # translator = pipeline("translation", model=translation_model_id) # # Use ThreadPoolExecutor to parallelize translations # with concurrent.futures.ThreadPoolExecutor() as executor: # # Submit all translation tasks and collect results # translate_func = lambda entry: translate_single_entry(entry, translator) # translated_json = list(executor.map(translate_func, transcription_json)) # # Sort the translated_json by start time # translated_json.sort(key=lambda x: x["start"]) # # Log the components being added to translated_json # for entry in translated_json: # logger.debug("Added to translated_json: start=%s, original=%s, translated=%s, end=%s, speaker=%s", # entry["start"], entry["original"], entry["translated"], entry["end"], entry["speaker"]) # return translated_json # def update_translations(file, edited_table, mode): # """ # Update the translations based on user edits in the Gradio Dataframe. # """ # output_video_path = "output_video.mp4" # logger.debug(f"Editable Table: {edited_table}") # if file is None: # logger.info("No file uploaded. Please upload a video/audio file.") # return None, [], None, "No file uploaded. Please upload a video/audio file." # try: # start_time = time.time() # Start the timer # # Convert the edited_table (list of lists) back to list of dictionaries # updated_translations = [ # { # "start": row["start"], # Access by column name # "original": row["original"], # "translated": row["translated"], # "end": row["end"] # } # for _, row in edited_table.iterrows() # ] # # Call the function to process the video with updated translations # add_transcript_voiceover(file.name, updated_translations, output_video_path, mode=="Transcription with Voiceover") # # Calculate elapsed time # elapsed_time = time.time() - start_time # elapsed_time_display = f"Updates applied successfully in {elapsed_time:.2f} seconds." # return output_video_path, elapsed_time_display # except Exception as e: # raise ValueError(f"Error updating translations: {e}") # def create_subtitle_clip_pil(text, start_time, end_time, video_width, video_height, font_path): # try: # subtitle_width = int(video_width * 0.8) # subtitle_font_size = int(video_height // 20) # font = ImageFont.truetype(font_path, subtitle_font_size) # dummy_img = Image.new("RGBA", (subtitle_width, 1), (0, 0, 0, 0)) # draw = ImageDraw.Draw(dummy_img) # lines = [] # line = "" # for word in text.split(): # test_line = f"{line} {word}".strip() # bbox = draw.textbbox((0, 0), test_line, font=font) # w = bbox[2] - bbox[0] # if w <= subtitle_width - 10: # line = test_line # else: # lines.append(line) # line = word # lines.append(line) # line_heights = [draw.textbbox((0, 0), l, font=font)[3] - draw.textbbox((0, 0), l, font=font)[1] for l in lines] # total_height = sum(line_heights) + (len(lines) - 1) * 5 # img = Image.new("RGBA", (subtitle_width, total_height), (0, 0, 0, 0)) # draw = ImageDraw.Draw(img) # y = 0 # for idx, line in enumerate(lines): # bbox = draw.textbbox((0, 0), line, font=font) # w = bbox[2] - bbox[0] # draw.text(((subtitle_width - w) // 2, y), line, font=font, fill="yellow") # y += line_heights[idx] + 5 # img_np = np.array(img) # <- ✅ Fix: convert to NumPy # txt_clip = ImageClip(img_np).set_start(start_time).set_duration(end_time - start_time).set_position("bottom").set_opacity(0.8) # return txt_clip # except Exception as e: # logger.error(f"\u274c Failed to create subtitle clip: {e}") # return None # def process_entry(entry, i, video_width, video_height, add_voiceover, target_language, font_path, speaker_sample_paths=None): # logger.debug(f"Processing entry {i}: {entry}") # error_message = None # try: # txt_clip = create_subtitle_clip_pil(entry["translated"], entry["start"], entry["end"], video_width, video_height, font_path) # except Exception as e: # error_message = f"❌ Failed to create subtitle clip for entry {i}: {e}" # logger.error(error_message) # txt_clip = None # audio_segment = None # if add_voiceover: # try: # segment_audio_path = f"segment_{i}_voiceover.wav" # desired_duration = entry["end"] - entry["start"] # speaker = entry.get("speaker", "default") # speaker_wav_path = f"speaker_{speaker}_sample.wav" # output_path, status_msg, tts_error = generate_voiceover_clone([entry], desired_duration, target_language, speaker_wav_path, segment_audio_path) # if tts_error: # error_message = error_message + " | " + tts_error if error_message else tts_error # if not output_path or not os.path.exists(segment_audio_path): # raise FileNotFoundError(f"Voiceover file not generated at: {segment_audio_path}") # audio_clip = AudioFileClip(segment_audio_path) # logger.debug(f"Audio clip duration: {audio_clip.duration}, Desired duration: {desired_duration}") # if audio_clip.duration < desired_duration: # silence_duration = desired_duration - audio_clip.duration # audio_clip = concatenate_audioclips([audio_clip, silence(duration=silence_duration)]) # logger.info(f"Padded audio with {silence_duration} seconds of silence.") # audio_segment = audio_clip.set_start(entry["start"]).set_duration(desired_duration) # except Exception as e: # err = f"❌ Failed to generate audio segment for entry {i}: {e}" # logger.error(err) # error_message = error_message + " | " + err if error_message else err # audio_segment = None # return i, txt_clip, audio_segment, error_message # def add_transcript_voiceover(video_path, translated_json, output_path, add_voiceover=False, target_language="en", speaker_sample_paths=None): # video = VideoFileClip(video_path) # font_path = "./NotoSansSC-Regular.ttf" # text_clips = [] # audio_segments = [] # error_messages = [] # with concurrent.futures.ThreadPoolExecutor() as executor: # futures = [executor.submit(process_entry, entry, i, video.w, video.h, add_voiceover, target_language, font_path, speaker_sample_paths) # for i, entry in enumerate(translated_json)] # results = [] # for future in concurrent.futures.as_completed(futures): # try: # i, txt_clip, audio_segment, error = future.result() # results.append((i, txt_clip, audio_segment)) # if error: # error_messages.append(f"[Entry {i}] {error}") # except Exception as e: # err = f"❌ Unexpected error in future result: {e}" # logger.error(err) # error_messages.append(err) # # Sort by entry index to ensure order # results.sort(key=lambda x: x[0]) # text_clips = [clip for _, clip, _ in results if clip] # if add_voiceover: # audio_segments = [segment for _, _, segment in results if segment] # final_video = CompositeVideoClip([video] + text_clips) # if add_voiceover: # if audio_segments: # final_audio = CompositeAudioClip(audio_segments).set_duration(video.duration) # final_video = final_video.set_audio(final_audio) # else: # logger.warning("⚠️ No audio segments available. Adding silent fallback.") # silent_audio = AudioClip(lambda t: 0, duration=video.duration) # final_video = final_video.set_audio(silent_audio) # logger.info(f"Saving the final video to: {output_path}") # final_video.write_videofile(output_path, codec="libx264", audio_codec="aac") # logger.info("Video processing completed successfully.") # # Optional: return errors # if error_messages: # logger.warning("⚠️ Errors encountered during processing:") # for msg in error_messages: # logger.warning(msg) # return error_messages # # Initialize TTS model only once (outside the function) # tts = TTS(model_name="tts_models/multilingual/multi-dataset/xtts_v2") # def generate_voiceover_clone(translated_json, desired_duration, target_language, speaker_wav_path, output_audio_path): # try: # full_text = " ".join(entry["translated"] for entry in translated_json if "translated" in entry and entry["translated"].strip()) # if not full_text.strip(): # msg = "❌ Translated text is empty." # logger.error(msg) # return None, msg, msg # if not speaker_wav_path or not os.path.exists(speaker_wav_path): # msg = f"❌ Speaker audio not found: {speaker_wav_path}" # logger.error(msg) # return None, msg, msg # # # Truncate text based on max token assumption (~60 tokens) # # MAX_TTS_TOKENS = 60 # # tokens = full_text.split() # crude token count # # if len(tokens) > MAX_TTS_TOKENS: # # logger.warning(f"⚠️ Text too long for TTS model ({len(tokens)} tokens). Truncating to {MAX_TTS_TOKENS} tokens.") # # full_text = " ".join(tokens[:MAX_TTS_TOKENS]) # speed_tts = calibrated_speed(full_text, desired_duration) # tts.tts_to_file( # text=full_text, # speaker_wav=speaker_wav_path, # language=target_language, # file_path=output_audio_path, # speed=speed_tts, # split_sentences=True # ) # if not os.path.exists(output_audio_path): # msg = f"❌ Voiceover file not generated at: {output_audio_path}" # logger.error(msg) # return None, msg, msg # msg = "✅ Voice cloning completed successfully." # logger.info(msg) # return output_audio_path, msg, None # except Exception as e: # err_msg = f"❌ An error occurred: {str(e)}" # logger.error("❌ Error during voice cloning:") # logger.error(traceback.format_exc()) # return None, err_msg, err_msg # def calibrated_speed(text, desired_duration): # """ # Compute a speed factor to help TTS fit audio into desired duration, # using a simple truncated linear function of characters per second. # """ # char_count = len(text.strip()) # if char_count == 0 or desired_duration <= 0: # return 1.0 # fallback # cps = char_count / desired_duration # characters per second # # Truncated linear mapping # if cps < 10: # return 1.0 # elif cps > 25: # return 1.4 # else: # # Linearly scale between cps 10 -> 25 and speed 1.0 -> 1.3 # slope = (1.4 - 1.0) / (25 - 10) # return 1.0 + slope * (cps - 10) # def upload_and_manage(file, target_language, mode="transcription"): # if file is None: # logger.info("No file uploaded. Please upload a video/audio file.") # return None, [], None, "No file uploaded. Please upload a video/audio file." # try: # start_time = time.time() # Start the timer # logger.info(f"Started processing file: {file.name}") # # Define paths for audio and output files # audio_path = "audio.wav" # output_video_path = "output_video.mp4" # voiceover_path = "voiceover.wav" # logger.info(f"Using audio path: {audio_path}, output video path: {output_video_path}, voiceover path: {voiceover_path}") # # Step 1: Transcribe audio from uploaded media file and get timestamps # logger.info("Transcribing audio...") # transcription_json, source_language = transcribe_video_with_speakers(file.name) # logger.info(f"Transcription completed. Detected source language: {source_language}") # # Step 2: Translate the transcription # logger.info(f"Translating transcription from {source_language} to {target_language}...") # translated_json = translate_text(transcription_json, source_language, target_language) # logger.info(f"Translation completed. Number of translated segments: {len(translated_json)}") # # Step 3: Add transcript to video based on timestamps # logger.info("Adding translated transcript to video...") # add_transcript_voiceover(file.name, translated_json, output_video_path, mode == "Transcription with Voiceover", target_language) # logger.info(f"Transcript added to video. Output video saved at {output_video_path}") # # Convert translated JSON into a format for the editable table # logger.info("Converting translated JSON into editable table format...") # editable_table = [ # [float(entry["start"]), entry["original"], entry["translated"], float(entry["end"]), entry["speaker"]] # for entry in translated_json # ] # # Calculate elapsed time # elapsed_time = time.time() - start_time # elapsed_time_display = f"Processing completed in {elapsed_time:.2f} seconds." # logger.info(f"Processing completed in {elapsed_time:.2f} seconds.") # return translated_json, editable_table, output_video_path, elapsed_time_display # except Exception as e: # logger.error(f"An error occurred: {str(e)}") # return None, [], None, f"An error occurred: {str(e)}" # # Gradio Interface with Tabs # def build_interface(): # with gr.Blocks(css=css) as demo: # gr.Markdown("## Video Localization") # with gr.Row(): # with gr.Column(scale=4): # file_input = gr.File(label="Upload Video/Audio File") # language_input = gr.Dropdown(["en", "es", "fr", "zh"], label="Select Language") # Language codes # process_mode = gr.Radio(choices=["Transcription", "Transcription with Voiceover"], label="Choose Processing Type", value="Transcription") # submit_button = gr.Button("Post and Process") # editable_translations = gr.State(value=[]) # with gr.Column(scale=8): # gr.Markdown("## Edit Translations") # # Editable JSON Data # editable_table = gr.Dataframe( # value=[], # Default to an empty list to avoid undefined values # headers=["start", "original", "translated", "end", "speaker"], # datatype=["number", "str", "str", "number", "str"], # row_count=1, # Initially empty # col_count=5, # interactive=[False, True, True, False, False], # Control editability # label="Edit Translations", # wrap=True # Enables text wrapping if supported # ) # save_changes_button = gr.Button("Save Changes") # processed_video_output = gr.File(label="Download Processed Video", interactive=True) # Download button # elapsed_time_display = gr.Textbox(label="Elapsed Time", lines=1, interactive=False) # with gr.Column(scale=1): # gr.Markdown("**Feedback**") # feedback_input = gr.Textbox( # placeholder="Leave your feedback here...", # label=None, # lines=3, # ) # feedback_btn = gr.Button("Submit Feedback") # response_message = gr.Textbox(label=None, lines=1, interactive=False) # db_download = gr.File(label="Download Database File", visible=False) # # Link the feedback handling # def feedback_submission(feedback): # message, file_path = handle_feedback(feedback) # if file_path: # return message, gr.update(value=file_path, visible=True) # return message, gr.update(visible=False) # save_changes_button.click( # update_translations, # inputs=[file_input, editable_table, process_mode], # outputs=[processed_video_output, elapsed_time_display] # ) # submit_button.click( # upload_and_manage, # inputs=[file_input, language_input, process_mode], # outputs=[editable_translations, editable_table, processed_video_output, elapsed_time_display] # ) # # Connect submit button to save_feedback_db function # feedback_btn.click( # feedback_submission, # inputs=[feedback_input], # outputs=[response_message, db_download] # ) # return demo # # Launch the Gradio interface # demo = build_interface() # demo.launch() import gradio as gr def dummy_func(x): return x, "Success" with gr.Blocks() as demo: inp = gr.Textbox() out1 = gr.Textbox() out2 = gr.Textbox() btn = gr.Button("Run") btn.click(dummy_func, inputs=inp, outputs=[out1, out2]) demo.launch()