|
import numpy as np |
|
import re |
|
import concurrent.futures |
|
import gradio as gr |
|
from datetime import datetime |
|
import random |
|
import moviepy |
|
from transformers import pipeline |
|
from transformers.pipelines.audio_utils import ffmpeg_read |
|
from moviepy.editor import ( |
|
VideoFileClip, |
|
TextClip, |
|
CompositeVideoClip, |
|
CompositeAudioClip, |
|
AudioFileClip, |
|
concatenate_videoclips, |
|
concatenate_audioclips |
|
) |
|
from moviepy.audio.AudioClip import AudioArrayClip |
|
import subprocess |
|
import speech_recognition as sr |
|
import json |
|
from nltk.tokenize import sent_tokenize |
|
import logging |
|
from textblob import TextBlob |
|
import whisperx |
|
import time |
|
import os |
|
import openai |
|
from openai import OpenAI |
|
import traceback |
|
from TTS.api import TTS |
|
import torch |
|
from TTS.tts.configs.xtts_config import XttsConfig |
|
|
|
|
|
# Set before the model is constructed; presumably this pre-accepts the Coqui
# model licence so loading does not stop at an interactive prompt — confirm
# against the installed TTS package.
os.environ["COQUI_TOS_AGREED"] = "1"

# Allowlist XttsConfig for torch's restricted unpickler so the XTTS
# checkpoint (which embeds its config object) can be deserialized.
torch.serialization.add_safe_globals([XttsConfig])

# Load the voice-cloning model once at import time; every voiceover request
# reuses this module-level ``tts`` instance. A failure here is fatal: the
# traceback is printed and the original exception propagates.
try:
    print("🔄 Loading XTTS model...")
    tts = TTS(model_name="tts_models/multilingual/multi-dataset/xtts_v2")
    print("✅ XTTS model loaded successfully.")
except Exception:
    print("❌ Error loading XTTS model:")
    traceback.print_exc()
    raise

# API credentials are read from the environment; both values are None when
# the corresponding variable is not set.
client = OpenAI(
    api_key=os.environ.get("openAI_api_key"),
)
hf_api_key = os.environ.get("hf_token")
|
|
|
def silence(duration, fps=44100):
    """
    Build a two-channel clip of digital silence.

    Parameters:
    - duration (float): Length of the clip in seconds.
    - fps (int): Audio sample rate; defaults to 44100.

    Returns:
    - AudioArrayClip: A clip backed by an all-zero array with
      ``int(fps * duration)`` rows and 2 columns.
    """
    sample_count = int(fps * duration)
    stereo_zeros = np.zeros((sample_count, 2))
    return AudioArrayClip(stereo_zeros, fps=fps)
|
|
|
def count_words_or_characters(text):
    """
    Count ASCII alphanumeric words plus individual CJK ideographs in ``text``.

    A "word" is a run of ``[a-zA-Z0-9]`` delimited by regex word boundaries;
    every character in the range U+4E00..U+9FFF counts as one unit.

    Returns:
    - int: Sum of both counts.
    """
    unit_patterns = (
        r'\b[a-zA-Z0-9]+\b',   # whole ASCII alphanumeric words
        r'[\u4e00-\u9fff]',    # single Chinese characters
    )
    return sum(len(re.findall(pattern, text)) for pattern in unit_patterns)
|
|
|
|
|
# Magic string compared against submitted feedback in handle_feedback(); an
# exact match returns the feedback database path instead of storing a comment.
# NOTE(review): this is a hard-coded secret in source — consider reading it
# from an environment variable.
PASSCODE = "show_feedback_db"

# Custom stylesheet handed to gr.Blocks(css=...): fixes the dataframe row
# height, wraps long cell text, sets the widths of the first three columns
# and hides the fourth column.
css = """
/* Adjust row height */
.dataframe-container tr {
    height: 50px !important;
}

/* Ensure text wrapping and prevent overflow */
.dataframe-container td {
    white-space: normal !important;
    word-break: break-word !important;
}

/* Set column widths */
[data-testid="block-container"] .scrolling-dataframe th:nth-child(1),
[data-testid="block-container"] .scrolling-dataframe td:nth-child(1) {
    width: 6%; /* Start column */
}

[data-testid="block-container"] .scrolling-dataframe th:nth-child(2),
[data-testid="block-container"] .scrolling-dataframe td:nth-child(2) {
    width: 47%; /* Original text */
}

[data-testid="block-container"] .scrolling-dataframe th:nth-child(3),
[data-testid="block-container"] .scrolling-dataframe td:nth-child(3) {
    width: 47%; /* Translated text */
}

[data-testid="block-container"] .scrolling-dataframe th:nth-child(4),
[data-testid="block-container"] .scrolling-dataframe td:nth-child(4) {
    display: none !important;
}
"""
|
|
|
|
|
def handle_feedback(feedback):
    """
    Store a feedback comment, or unlock the feedback database download.

    Parameters:
    - feedback (str): Raw text from the feedback box. Surrounding whitespace
      is ignored and ``None`` is treated as empty.

    Returns:
    - tuple: ``(message, file_path)``. ``file_path`` is ``"feedback.db"``
      only when the text equals ``PASSCODE``; otherwise it is ``None``.
    """
    # Imported here because the module's import block does not provide them.
    import sqlite3
    from contextlib import closing

    feedback = (feedback or "").strip()
    if not feedback:
        return "Feedback cannot be empty.", None

    if feedback == PASSCODE:
        return "Access granted! Download the database file below.", "feedback.db"

    # ``with conn`` only commits or rolls back; ``closing`` is what actually
    # releases the connection handle.
    with closing(sqlite3.connect("feedback.db")) as conn:
        with conn:
            conn.execute(
                "CREATE TABLE IF NOT EXISTS studio_feedback (id INTEGER PRIMARY KEY, comment TEXT)"
            )
            conn.execute(
                "INSERT INTO studio_feedback (comment) VALUES (?)", (feedback,)
            )
    return "Thank you for your feedback!", None
|
|
|
|
|
# Configure the root logger once for the whole app: DEBUG level with
# timestamped "time - level - message" records.
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")

# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)

# Record the installed MoviePy version at startup.
logger.info(f"MoviePy Version: {moviepy.__version__}")
|
|
|
def transcribe_video_with_speakers(video_path):
    """
    Transcribe a video's audio track and label every segment with a speaker.

    The audio is written to a temporary ``audio.wav``, transcribed with the
    WhisperX "medium" model, word-aligned, and diarized. For each speaker a
    reference sample of at most 30 seconds is written to
    ``speaker_<label>_sample.wav`` in the working directory.

    Parameters:
    - video_path (str): Path of the video file to process.

    Returns:
    - tuple: ``(transcript_with_speakers, detected_language)`` where the
      first item is a list of dicts with ``start``, ``end``, ``text`` and
      ``speaker`` keys, and the second is the language code reported by
      WhisperX.

    Raises:
    - ValueError: If the video has no audio track.
    - Exception: Any WhisperX failure is logged and re-raised, so the caller
      sees the real cause rather than an unbound-variable error further down.
    """
    # Label used when a segment comes back without a "speaker" key.
    unknown_speaker = "UNKNOWN"

    audio_path = "audio.wav"
    video = VideoFileClip(video_path)
    audio_clip = None
    try:
        if video.audio is None:
            raise ValueError("The uploaded video has no audio track to transcribe.")
        video.audio.write_audiofile(audio_path)
        logger.info(f"Audio extracted from video: {audio_path}")

        device = "cuda" if torch.cuda.is_available() else "cpu"
        logger.info(f"Using device: {device}")

        try:
            model = whisperx.load_model("medium", device=device, compute_type="float32")
            logger.info("WhisperX model loaded")

            result = model.transcribe(audio_path)
            logger.info("Audio transcription completed")

            # Capture the language now: the aligned result replaces ``result``.
            detected_language = result["language"]
            logger.debug(f"Detected language: {detected_language}")

            model_a, metadata = whisperx.load_align_model(language_code=detected_language, device=device)
            result = whisperx.align(result["segments"], model_a, metadata, audio_path, device)
            logger.info("Transcription alignment completed")

            diarize_model = whisperx.DiarizationPipeline(use_auth_token=hf_api_key, device=device)
            diarize_segments = diarize_model(audio_path)
            logger.info("Speaker diarization completed")

            result = whisperx.assign_word_speakers(diarize_segments, result)
            logger.info("Speakers assigned to transcribed segments")
        except Exception as e:
            # Nothing below can work without ``result``; log and propagate.
            logger.exception(f"❌ WhisperX pipeline failed: {e}")
            raise

        transcript_with_speakers = [
            {
                "start": segment["start"],
                "end": segment["end"],
                "text": segment["text"],
                "speaker": segment.get("speaker", unknown_speaker),
            }
            for segment in result["segments"]
        ]

        # Group the (start, end) spans by speaker.
        speaker_spans = {}
        for item in transcript_with_speakers:
            speaker_spans.setdefault(item["speaker"], []).append((item["start"], item["end"]))

        # Write one reference sample per speaker, capped at 30 seconds.
        audio_clip = AudioFileClip(audio_path)
        for speaker, spans in speaker_spans.items():
            speaker_clips = [audio_clip.subclip(start, end) for start, end in spans]
            combined_clip = concatenate_audioclips(speaker_clips)
            truncated_clip = combined_clip.subclip(0, min(30, combined_clip.duration))
            sample_path = f"speaker_{speaker}_sample.wav"
            truncated_clip.write_audiofile(sample_path)
            logger.info(f"Created sample for {speaker}: {sample_path}")

        return transcript_with_speakers, detected_language
    finally:
        # Release file handles and the temporary wav on success and failure.
        video.close()
        if audio_clip is not None:
            audio_clip.close()
        if os.path.exists(audio_path):
            os.remove(audio_path)
|
|
|
|
|
def get_translation_model(source_language, target_language):
    """
    Resolve the Helsinki-NLP OPUS-MT model name for a language pair.

    Parameters:
    - source_language (str): Language code of the input text (e.g., 'en').
      Falls back to 'en' when the code is not in the supported list.
    - target_language (str): Language code to translate into (e.g., 'es').
      Falls back to 'zh' when the code is not in the supported list.

    If both codes end up equal, the pair is reset to 'en' -> 'zh'.

    Returns:
    - str: The translation model identifier.
    """
    allowable_languages = ["en", "es", "fr", "zh", "de", "it", "pt", "ja", "ko", "ru"]
    fallback_source, fallback_target = "en", "zh"

    source_supported = source_language in allowable_languages
    if not source_supported:
        logger.debug(f"Invalid source language '{source_language}'. Supported languages are: {', '.join(allowable_languages)}")
        source_language = fallback_source

    target_supported = target_language in allowable_languages
    if not target_supported:
        logger.debug(f"Invalid target language '{target_language}'. Supported languages are: {', '.join(allowable_languages)}")
        target_language = fallback_target

    # An identical pair has no translation model; use the default pair.
    if source_language == target_language:
        source_language, target_language = fallback_source, fallback_target

    model_id = f"Helsinki-NLP/opus-mt-{source_language}-{target_language}"
    return model_id
|
|
|
def translate_single_entry(entry, translator):
    """
    Translate one transcript segment.

    Parameters:
    - entry (dict): Segment with ``start``, ``end`` and ``text`` keys, and
      optionally ``speaker``.
    - translator (callable): Called with the text; must return a list whose
      first item has a ``translation_text`` key.

    Returns:
    - dict: ``start``, ``original``, ``translated``, ``end`` and ``speaker``.
      ``speaker`` is carried over from ``entry`` (``None`` if absent) because
      the table and voiceover steps in this file read it from the translated
      entry.
    """
    original_text = entry["text"]
    translated_text = translator(original_text)[0]['translation_text']
    return {
        "start": entry["start"],
        "original": original_text,
        "translated": translated_text,
        "end": entry["end"],
        "speaker": entry.get("speaker"),
    }
|
|
|
def translate_text(transcription_json, source_language, target_language):
    """
    Translate every transcript segment using a thread pool.

    Parameters:
    - transcription_json (list): Segment dicts accepted by
      ``translate_single_entry``.
    - source_language (str): Language code of the transcript.
    - target_language (str): Language code to translate into.

    Returns:
    - list: Translated entries ordered by their ``start`` value.
    """
    translation_model_id = get_translation_model(source_language, target_language)
    logger.debug(f"Translation model: {translation_model_id}")
    translator = pipeline("translation", model=translation_model_id)

    def _translate(entry):
        # Bind the shared translator so the pool only has to pass the entry.
        return translate_single_entry(entry, translator)

    with concurrent.futures.ThreadPoolExecutor() as pool:
        translated_json = sorted(
            pool.map(_translate, transcription_json),
            key=lambda item: item["start"],
        )

    for item in translated_json:
        logger.debug("Added to translated_json: start=%s, original=%s, translated=%s, end=%s",
                     item["start"], item["original"], item["translated"], item["end"])

    return translated_json
|
|
|
def update_translations(file, edited_table, mode):
    """
    Re-render the video from the user-edited translations table.

    Parameters:
    - file: Uploaded file object; its ``name`` attribute is used as the
      video path.
    - edited_table: Table exposing ``iterrows()`` with ``start``,
      ``original``, ``translated`` and ``end`` columns, and optionally
      ``speaker``.
    - mode (str): Voiceover is added when this equals
      "Transcription with Voiceover".

    Returns:
    - tuple: ``(output_video_path, status_message)``. The same two-item shape
      is returned when no file was uploaded, matching the two outputs this
      handler is wired to.

    Raises:
    - ValueError: If rendering fails; the original exception is chained.
    """
    output_video_path = "output_video.mp4"
    logger.debug(f"Editable Table: {edited_table}")

    if file is None:
        logger.info("No file uploaded. Please upload a video/audio file.")
        return None, "No file uploaded. Please upload a video/audio file."

    try:
        start_time = time.time()

        updated_translations = []
        for _, row in edited_table.iterrows():
            item = {
                "start": row["start"],
                "original": row["original"],
                "translated": row["translated"],
                "end": row["end"],
            }
            # Keep the speaker label when present; the voiceover step uses it
            # to pick the reference sample.
            if "speaker" in row:
                item["speaker"] = row["speaker"]
            updated_translations.append(item)

        # NOTE(review): no target language reaches this handler, so the
        # callee's default ("en") applies to voiceovers — confirm intent.
        add_transcript_voiceover(file.name, updated_translations, output_video_path, mode == "Transcription with Voiceover")

        elapsed_time = time.time() - start_time
        elapsed_time_display = f"Updates applied successfully in {elapsed_time:.2f} seconds."

        return output_video_path, elapsed_time_display

    except Exception as e:
        raise ValueError(f"Error updating translations: {e}") from e
|
|
|
def _retime_clip(clip, **settings):
    """Apply clip settings via ``with_<name>`` if available, else ``set_<name>``.

    The file mixes MoviePy 1.x spellings (``moviepy.editor``, ``subclip``)
    with 2.x-style ``with_*`` calls; resolving the method at runtime keeps
    the code working under either naming scheme.
    """
    for name, value in settings.items():
        method = getattr(clip, f"with_{name}", None) or getattr(clip, f"set_{name}")
        clip = method(value)
    return clip


def process_entry(entry, i, video_width, video_height, add_voiceover, target_language, speaker_sample_paths=None):
    """
    Build the subtitle clip, and optionally the voiceover clip, for one segment.

    Parameters:
    - entry (dict): Segment with ``start``, ``end`` and ``translated`` keys,
      and optionally ``speaker``.
    - i (int): Segment index; returned unchanged so results can be re-ordered.
    - video_width (int): Frame width (currently unused).
    - video_height (int): Frame height; the font size is ``video_height // 20``.
    - add_voiceover (bool): Whether to synthesize cloned-voice audio.
    - target_language (str): Language code passed to the voice cloner.
    - speaker_sample_paths (dict | None): Optional speaker label -> sample
      path mapping. When missing, ``speaker_<label>_sample.wav`` is used.

    Returns:
    - tuple: ``(i, txt_clip, audio_segment)``. ``audio_segment`` is ``None``
      when voiceover is disabled or could not be generated.
    """
    logger.debug(f"Processing entry {i}: {entry}")

    desired_duration = entry["end"] - entry["start"]

    txt_clip = _retime_clip(
        TextClip(
            txt=entry["translated"],
            font="./NotoSansSC-Regular.ttf",
            color='yellow',
            stroke_color='black',
            stroke_width=2,
            fontsize=int(video_height // 20),
        ),
        start=entry["start"],
        duration=desired_duration,
        position='bottom',
        opacity=0.8,
    )

    if not add_voiceover:
        return i, txt_clip, None

    segment_audio_path = f"segment_{i}_voiceover.wav"
    speaker_id = entry.get("speaker")
    speaker_wav_path = (speaker_sample_paths or {}).get(
        speaker_id, f"speaker_{speaker_id}_sample.wav"
    )

    generated_path, status = generate_voiceover_clone(
        [entry], desired_duration, target_language, speaker_wav_path, segment_audio_path
    )
    if generated_path is None:
        # Keep the subtitle even when synthesis fails, instead of crashing
        # on a missing wav file and losing the whole entry.
        logger.error(f"Voiceover skipped for entry {i}: {status}")
        return i, txt_clip, None

    audio_clip = AudioFileClip(segment_audio_path)
    logger.debug(f"Audio clip duration: {audio_clip.duration}, Desired duration: {desired_duration}")

    if audio_clip.duration < desired_duration:
        # Pad short speech so the segment fills its subtitle slot.
        silence_duration = desired_duration - audio_clip.duration
        audio_clip = concatenate_audioclips([audio_clip, silence(duration=silence_duration)])
        logger.info(f"Padded audio with {silence_duration} seconds of silence.")

    audio_segment = _retime_clip(audio_clip, start=entry["start"], duration=desired_duration)

    return i, txt_clip, audio_segment
|
|
|
def _with_clip_attr(clip, name, value):
    """Call ``clip.with_<name>(value)`` if it exists, else ``clip.set_<name>(value)``.

    Bridges the MoviePy 1.x (``set_*``) and 2.x (``with_*``) method names.
    """
    method = getattr(clip, f"with_{name}", None) or getattr(clip, f"set_{name}")
    return method(value)


def add_transcript_voiceover(video_path, translated_json, output_path, add_voiceover=False, target_language="en", speaker_sample_paths=None):
    """
    Add transcript and voiceover to a video, segment by segment.

    Parameters:
    - video_path (str): Source video path.
    - translated_json (list): Segment dicts consumed by ``process_entry``.
    - output_path (str): Where the rendered video is written (H.264 / AAC).
    - add_voiceover (bool): Replace the audio with generated voiceover
      segments when True.
    - target_language (str): Language code forwarded to ``process_entry``.
    - speaker_sample_paths (dict | None): Forwarded to ``process_entry``.

    Segments whose processing raises are logged and left out of the result.
    If voiceover is requested but no segment produced audio, the composite is
    rendered without replacing its audio.
    """
    video = VideoFileClip(video_path)
    try:
        results = []
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(process_entry, entry, i, video.w, video.h, add_voiceover, target_language, speaker_sample_paths)
                for i, entry in enumerate(translated_json)
            ]
            for future in concurrent.futures.as_completed(futures):
                try:
                    results.append(future.result())
                except Exception as e:
                    logger.error(f"Error processing entry: {e}")

        # Futures complete in arbitrary order; restore segment order.
        results.sort(key=lambda item: item[0])

        text_clips = [txt_clip for _, txt_clip, _ in results]
        final_video = CompositeVideoClip([video] + text_clips)

        if add_voiceover:
            audio_segments = [segment for _, _, segment in results if segment is not None]
            if audio_segments:
                final_audio = CompositeAudioClip(audio_segments)
                final_audio = _with_clip_attr(final_audio, "duration", video.duration)
                final_video = _with_clip_attr(final_video, "audio", final_audio)
            else:
                # An empty composite cannot be built; skip audio replacement.
                logger.warning("No voiceover segments were generated; audio left unchanged.")

        logger.info(f"Saving the final video to: {output_path}")
        final_video.write_videofile(output_path, codec="libx264", audio_codec="aac")

        logger.info("Video processing completed successfully.")
    finally:
        # Release the source reader on success and on failure.
        video.close()
|
|
|
|
|
def generate_voiceover_clone(translated_json, desired_duration, target_language, speaker_wav_path, output_audio_path):
    """
    Synthesize the translated text in a cloned voice and write it to a wav file.

    Parameters:
    - translated_json (list): Entries whose ``translated`` texts are joined
      with spaces and spoken as one utterance.
    - desired_duration (float): Target length in seconds; used to derive the
      speaking speed.
    - target_language (str): Language code of the text. ``"zh"`` is passed to
      the model as ``"zh-cn"``.
    - speaker_wav_path (str): Reference recording of the voice to clone.
    - output_audio_path (str): Destination wav path.

    Returns:
    - tuple: ``(output_audio_path, message)`` on success, ``(None, message)``
      when the speaker file is missing or synthesis raises.
    """
    try:
        # Validate first so a missing sample is reported as such.
        if not speaker_wav_path or not os.path.exists(speaker_wav_path):
            return None, "❌ Please upload a valid speaker audio file."

        full_text = " ".join(entry["translated"] for entry in translated_json)
        speed_tts = calculate_speed(full_text, desired_duration)

        # XTTS-v2 names Mandarin "zh-cn", while the rest of this file uses
        # "zh" — confirm against the installed model's language list.
        tts_language = "zh-cn" if target_language == "zh" else target_language

        print(f"📥 Received text: {full_text}")
        print(f"📁 Speaker audio path: {speaker_wav_path}")
        print(f"🌐 Selected language: {target_language}")
        print(f"⏱️ Target speed: {speed_tts}")

        tts.tts_to_file(
            text=full_text,
            speaker_wav=speaker_wav_path,
            language=tts_language,
            file_path=output_audio_path,
            speed=speed_tts
        )
        print("✅ Voice cloning completed.")
        return output_audio_path, "✅ Voice cloning completed successfully."

    except Exception as e:
        print("❌ Error during voice cloning:")
        traceback.print_exc()
        error_msg = f"❌ An error occurred: {str(e)}"
        return None, error_msg
|
|
|
def truncated_linear(x):
    """
    Map ``x`` to a value between 1 and 1.3 with a clamped linear ramp.

    Returns 1 below 15, 1.3 above 25, and interpolates linearly in between.
    """
    low_x, high_x = 15, 25
    low_y, high_y = 1, 1.3

    if x < low_x:
        return low_y
    if x > high_x:
        return high_y

    slope = (high_y - low_y) / (high_x - low_x)
    return low_y + slope * (x - low_x)
|
|
|
def calculate_speed(text, desired_duration):
    """
    Derive a speech-speed factor from how much text must fit in a time slot.

    Parameters:
    - text (str): Text to be spoken; its length in characters is used.
    - desired_duration (float): Available time in seconds.

    Returns:
    - The result of ``truncated_linear`` applied to characters per second.
    """
    # The 0.001 offset keeps a zero-length slot from dividing by zero.
    padded_duration = desired_duration + 0.001
    return truncated_linear(len(text) / padded_duration)
|
|
|
def upload_and_manage(file, target_language, mode="transcription"):
    """
    Run the full pipeline for an upload: transcribe, translate, render.

    Parameters:
    - file: Uploaded file object; its ``name`` attribute is used as the path.
    - target_language (str): Language code to translate into.
    - mode (str): Voiceover is added when this equals
      "Transcription with Voiceover".

    Returns:
    - tuple: ``(translated_json, editable_table, output_video_path, message)``.
      On a missing file or any failure the tuple is
      ``(None, [], None, message)`` with the reason in ``message``.
    """
    if file is None:
        logger.info("No file uploaded. Please upload a video/audio file.")
        return None, [], None, "No file uploaded. Please upload a video/audio file."

    try:
        start_time = time.time()
        logger.info(f"Started processing file: {file.name}")

        audio_path = "audio.wav"
        output_video_path = "output_video.mp4"
        voiceover_path = "voiceover.wav"
        logger.info(f"Using audio path: {audio_path}, output video path: {output_video_path}, voiceover path: {voiceover_path}")

        logger.info("Transcribing audio...")
        transcription_json, source_language = transcribe_video_with_speakers(file.name)
        logger.info(f"Transcription completed. Detected source language: {source_language}")

        logger.info(f"Translating transcription from {source_language} to {target_language}...")
        translated_json = translate_text(transcription_json, source_language, target_language)
        logger.info(f"Translation completed. Number of translated segments: {len(translated_json)}")

        logger.info("Adding translated transcript to video...")
        add_transcript_voiceover(file.name, translated_json, output_video_path, mode == "Transcription with Voiceover", target_language)
        logger.info(f"Transcript added to video. Output video saved at {output_video_path}")

        logger.info("Converting translated JSON into editable table format...")
        # Translated entries may not carry a "speaker" key; use .get so a
        # missing label does not discard the already-rendered video.
        editable_table = [
            [float(entry["start"]), entry["original"], entry["translated"], float(entry["end"]), entry.get("speaker")]
            for entry in translated_json
        ]

        elapsed_time = time.time() - start_time
        elapsed_time_display = f"Processing completed in {elapsed_time:.2f} seconds."
        logger.info(f"Processing completed in {elapsed_time:.2f} seconds.")

        return translated_json, editable_table, output_video_path, elapsed_time_display

    except Exception as e:
        # logger.exception keeps the traceback, which the UI message drops.
        logger.exception(f"An error occurred: {str(e)}")
        return None, [], None, f"An error occurred: {str(e)}"
|
|
|
def build_interface():
    """
    Assemble the Gradio Blocks UI and wire its three button handlers.

    Components created: an upload/settings column, a translations-editing
    column with the result download, and a feedback column.
    NOTE(review): the source's indentation was lost, so the nesting of the
    columns inside the row is reconstructed — confirm against the original.

    Returns:
    - gr.Blocks: The constructed (not yet launched) app.
    """
    with gr.Blocks(css=css) as demo:
        gr.Markdown("## Video Localization")
        with gr.Row():
            # Upload, target language and processing mode.
            with gr.Column(scale=4):
                file_input = gr.File(label="Upload Video/Audio File")
                language_input = gr.Dropdown(["en", "es", "fr", "zh"], label="Select Language")
                process_mode = gr.Radio(choices=["Transcription", "Transcription with Voiceover"], label="Choose Processing Type", value="Transcription")
                submit_button = gr.Button("Post and Process")
                # Receives the first value returned by upload_and_manage.
                editable_translations = gr.State(value=[])

            # Editable translations table and rendered-video download.
            with gr.Column(scale=8):
                gr.Markdown("## Edit Translations")

                # NOTE(review): ``interactive`` is given a per-column list —
                # confirm the installed Gradio version accepts that.
                editable_table = gr.Dataframe(
                    value=[],
                    headers=["start", "original", "translated", "end", "speaker"],
                    datatype=["number", "str", "str", "number", "str"],
                    row_count=1,
                    col_count=5,
                    interactive=[False, True, True, False, False],
                    label="Edit Translations",
                    wrap=True
                )
                save_changes_button = gr.Button("Save Changes")
                processed_video_output = gr.File(label="Download Processed Video", interactive=True)
                elapsed_time_display = gr.Textbox(label="Elapsed Time", lines=1, interactive=False)

            # Feedback form; the download widget is created hidden.
            with gr.Column(scale=1):
                gr.Markdown("**Feedback**")
                feedback_input = gr.Textbox(
                    placeholder="Leave your feedback here...",
                    label=None,
                    lines=3,
                )
                feedback_btn = gr.Button("Submit Feedback")
                response_message = gr.Textbox(label=None, lines=1, interactive=False)
                db_download = gr.File(label="Download Database File", visible=False)

        def feedback_submission(feedback):
            # Show the download widget only when handle_feedback returns a path.
            message, file_path = handle_feedback(feedback)
            if file_path:
                return message, gr.update(value=file_path, visible=True)
            return message, gr.update(visible=False)

        # Re-render the video from the edited table (two outputs).
        save_changes_button.click(
            update_translations,
            inputs=[file_input, editable_table, process_mode],
            outputs=[processed_video_output, elapsed_time_display]
        )

        # Full pipeline for a new upload (four outputs).
        submit_button.click(
            upload_and_manage,
            inputs=[file_input, language_input, process_mode],
            outputs=[editable_translations, editable_table, processed_video_output, elapsed_time_display]
        )

        # Store feedback or reveal the database download.
        feedback_btn.click(
            feedback_submission,
            inputs=[feedback_input],
            outputs=[response_message, db_download]
        )

    return demo
|
|
|
|
|
# Build the UI when the module is loaded and start the Gradio server with
# its default launch settings.
demo = build_interface()
demo.launch()