import base64
import concurrent.futures
import io
import json
import logging
import os
import random
import re
import sqlite3
import subprocess
import time
import traceback
import wave
from datetime import datetime

import numpy as np
import cvxpy as cp
import gradio as gr
import moviepy
import torch
import whisperx
from openai import OpenAI
from transformers import pipeline
from transformers.pipelines.audio_utils import ffmpeg_read
from moviepy.editor import (
    ImageClip,
    VideoFileClip,
    TextClip,
    CompositeVideoClip,
    CompositeAudioClip,
    AudioFileClip,
    concatenate_videoclips,
    concatenate_audioclips
)
from moviepy.audio.AudioClip import AudioArrayClip
from PIL import Image, ImageDraw, ImageFont
from pydub import AudioSegment
from pyannote.audio import Pipeline
from TTS.api import TTS
# Configure logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
logger.info(f"MoviePy Version: {moviepy.__version__}")
logger.info(f"Gradio Version: {gr.__version__}")

# Accept license terms for Coqui XTTS
os.environ["COQUI_TOS_AGREED"] = "1"
# torch.serialization.add_safe_globals([XttsConfig])
client = OpenAI(
    api_key=os.environ.get("openAI_api_key"),
)
hf_api_key = os.environ.get("hf_token")
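
# Processing pipeline overview:
#   1. transcribe_video_with_speakers  - WhisperX transcription, alignment, and speaker diarization
#   2. translate_text                  - Helsinki-NLP OPUS-MT translation of each segment
#   3. add_transcript_voiceover        - subtitles, optional TTS voiceover, background audio, final render
# upload_and_manage() wires these steps to the Gradio UI built in build_interface().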
def silence(duration, fps=44100):
    """
    Returns a silent AudioClip of the specified duration.
    """
    return AudioArrayClip(np.zeros((int(fps * duration), 2)), fps=fps)

def count_words_or_characters(text):
    # Count non-Chinese words
    non_chinese_words = len(re.findall(r'\b[a-zA-Z0-9]+\b', text))
    # Count Chinese characters
    chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', text))
    return non_chinese_words + chinese_chars
# Define the passcode
PASSCODE = "show_feedback_db"

css = """
/* Adjust row height */
.dataframe-container tr {
    height: 50px !important;
}

/* Ensure text wrapping and prevent overflow */
.dataframe-container td {
    white-space: normal !important;
    word-break: break-word !important;
}

/* Set column widths */
[data-testid="block-container"] .scrolling-dataframe th:nth-child(1),
[data-testid="block-container"] .scrolling-dataframe td:nth-child(1) {
    width: 6%; /* Start column */
}

[data-testid="block-container"] .scrolling-dataframe th:nth-child(2),
[data-testid="block-container"] .scrolling-dataframe td:nth-child(2) {
    width: 47%; /* Original text */
}

[data-testid="block-container"] .scrolling-dataframe th:nth-child(3),
[data-testid="block-container"] .scrolling-dataframe td:nth-child(3) {
    width: 47%; /* Translated text */
}

[data-testid="block-container"] .scrolling-dataframe th:nth-child(4),
[data-testid="block-container"] .scrolling-dataframe td:nth-child(4) {
    display: none !important;
}
"""
# Function to save feedback or provide access to the database file
def handle_feedback(feedback):
    feedback = feedback.strip()  # Clean up leading/trailing whitespace
    if not feedback:
        return "Feedback cannot be empty.", None

    if feedback == PASSCODE:
        # Provide access to the feedback.db file
        return "Access granted! Download the database file below.", "feedback.db"
    else:
        # Save feedback to the database
        with sqlite3.connect("feedback.db") as conn:
            cursor = conn.cursor()
            cursor.execute("CREATE TABLE IF NOT EXISTS studio_feedback (id INTEGER PRIMARY KEY, comment TEXT)")
            cursor.execute("INSERT INTO studio_feedback (comment) VALUES (?)", (feedback,))
            conn.commit()
        return "Thank you for your feedback!", None
def segment_background_audio(audio_path, background_audio_path="background_segments.wav"):
    pipeline = Pipeline.from_pretrained("pyannote/voice-activity-detection", use_auth_token=hf_api_key)
    vad_result = pipeline(audio_path)

    full_audio = AudioSegment.from_wav(audio_path)
    full_duration_sec = len(full_audio) / 1000.0

    current_time = 0.0
    result_audio = AudioSegment.empty()

    for segment in vad_result.itersegments():
        # Background segment before the speech
        if current_time < segment.start:
            bg = full_audio[int(current_time * 1000):int(segment.start * 1000)]
            result_audio += bg
        # Add silence for the speech duration
        silence_duration = segment.end - segment.start
        result_audio += AudioSegment.silent(duration=int(silence_duration * 1000))
        current_time = segment.end

    # Handle any remaining background after the last speech
    if current_time < full_duration_sec:
        result_audio += full_audio[int(current_time * 1000):]

    result_audio.export(background_audio_path, format="wav")
    return background_audio_path
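
# Function to transcribe the uploaded media with WhisperX, align word timestamps, and label
# speakers via diarization; also writes a <=30 s reference sample per speaker
# (speaker_<id>_sample.wav) for later voice cloning.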
def transcribe_video_with_speakers(video_path):
    # Extract audio from video
    video = VideoFileClip(video_path)
    audio_path = "audio.wav"
    video.audio.write_audiofile(audio_path)
    logger.info(f"Audio extracted from video: {audio_path}")

    segment_result = segment_background_audio(audio_path)
    logger.info(f"Saved non-speech (background) audio to: {segment_result}")

    # Set up device
    device = "cuda" if torch.cuda.is_available() else "cpu"
    logger.info(f"Using device: {device}")

    try:
        # Load the model with float32 for broader compatibility
        model = whisperx.load_model("large-v3", device=device, compute_type="float32")
        logger.info("WhisperX model loaded")

        # Transcribe
        result = model.transcribe(audio_path, chunk_size=6, print_progress=True)
        logger.info("Audio transcription completed")

        # Get the detected language
        detected_language = result["language"]
        logger.debug(f"Detected language: {detected_language}")

        # Alignment
        model_a, metadata = whisperx.load_align_model(language_code=result["language"], device=device)
        result = whisperx.align(result["segments"], model_a, metadata, audio_path, device)
        logger.info("Transcription alignment completed")

        # Diarization (works independently of Whisper model size)
        diarize_model = whisperx.DiarizationPipeline(use_auth_token=hf_api_key, device=device)
        diarize_segments = diarize_model(audio_path)
        logger.info("Speaker diarization completed")

        # Assign speakers
        result = whisperx.assign_word_speakers(diarize_segments, result)
        logger.info("Speakers assigned to transcribed segments")
    except Exception as e:
        logger.error(f"❌ WhisperX pipeline failed: {e}")
        raise
    # Extract timestamps, text, and speaker IDs
    transcript_with_speakers = [
        {
            "start": segment["start"],
            "end": segment["end"],
            "text": segment["text"],
            "speaker": segment["speaker"]
        }
        for segment in result["segments"]
    ]

    # Collect audio for each speaker
    speaker_audio = {}
    for segment in result["segments"]:
        speaker = segment["speaker"]
        if speaker not in speaker_audio:
            speaker_audio[speaker] = []
        speaker_audio[speaker].append((segment["start"], segment["end"]))

    # Collapse and truncate speaker audio
    speaker_sample_paths = {}
    audio_clip = AudioFileClip(audio_path)
    for speaker, segments in speaker_audio.items():
        speaker_clips = [audio_clip.subclip(start, end) for start, end in segments]
        combined_clip = concatenate_audioclips(speaker_clips)
        truncated_clip = combined_clip.subclip(0, min(30, combined_clip.duration))
        sample_path = f"speaker_{speaker}_sample.wav"
        truncated_clip.write_audiofile(sample_path)
        speaker_sample_paths[speaker] = sample_path
        logger.info(f"Created sample for {speaker}: {sample_path}")

    # Clean up
    video.close()
    audio_clip.close()
    os.remove(audio_path)

    return transcript_with_speakers, detected_language
# Function to get the appropriate translation model based on source and target language
def get_translation_model(source_language, target_language):
    """
    Get the translation model based on the source and target language.

    Parameters:
    - source_language (str): The language of the input content (e.g., 'en', 'fr').
    - target_language (str): The language to translate the content into (e.g., 'es', 'zh').

    Returns:
    - str: The translation model identifier.
    """
    # List of allowable languages
    allowable_languages = ["en", "es", "fr", "zh", "de", "it", "pt", "ja", "ko", "ru"]

    # Validate source and target languages
    if source_language not in allowable_languages:
        logger.debug(f"Invalid source language '{source_language}'. Supported languages are: {', '.join(allowable_languages)}")
        # Fall back to a default source language
        source_language = "en"  # Default to 'en'

    if target_language not in allowable_languages:
        logger.debug(f"Invalid target language '{target_language}'. Supported languages are: {', '.join(allowable_languages)}")
        # Fall back to a default target language
        target_language = "zh"  # Default to 'zh'

    if source_language == target_language:
        source_language = "en"  # Default to 'en'
        target_language = "zh"  # Default to 'zh'

    # Return the Helsinki-NLP OPUS-MT model identifier for this language pair
    return f"Helsinki-NLP/opus-mt-{source_language}-{target_language}"
def translate_single_entry(entry, translator):
    original_text = entry["text"]
    translated_text = translator(original_text)[0]['translation_text']
    return {
        "start": entry["start"],
        "original": original_text,
        "translated": translated_text,
        "end": entry["end"],
        "speaker": entry["speaker"]
    }

def translate_text(transcription_json, source_language, target_language):
    # Load the translation model for the specified target language
    translation_model_id = get_translation_model(source_language, target_language)
    logger.debug(f"Translation model: {translation_model_id}")
    translator = pipeline("translation", model=translation_model_id)

    # Use ThreadPoolExecutor to parallelize translations
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit all translation tasks and collect results
        translate_func = lambda entry: translate_single_entry(entry, translator)
        translated_json = list(executor.map(translate_func, transcription_json))

    # Sort the translated_json by start time
    translated_json.sort(key=lambda x: x["start"])

    # Log the components being added to translated_json
    for entry in translated_json:
        logger.debug("Added to translated_json: start=%s, original=%s, translated=%s, end=%s, speaker=%s",
                     entry["start"], entry["original"], entry["translated"], entry["end"], entry["speaker"])

    return translated_json
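
# Function to re-render the output video after the user edits translations in the table.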
def update_translations(file, edited_table, process_mode):
    """
    Update the translations based on user edits in the Gradio Dataframe.
    """
    output_video_path = "output_video.mp4"
    logger.debug(f"Editable Table: {edited_table}")

    if file is None:
        logger.info("No file uploaded. Please upload a video/audio file.")
        return None, "No file uploaded. Please upload a video/audio file."

    try:
        start_time = time.time()  # Start the timer

        # Convert the edited Dataframe back to a list of dictionaries
        updated_translations = [
            {
                "start": row["start"],  # Access by column name
                "original": row["original"],
                "translated": row["translated"],
                "end": row["end"]
            }
            for _, row in edited_table.iterrows()
        ]

        # Call the function to process the video with updated translations
        add_transcript_voiceover(file.name, updated_translations, output_video_path, process_mode)

        # Calculate elapsed time
        elapsed_time = time.time() - start_time
        elapsed_time_display = f"Updates applied successfully in {elapsed_time:.2f} seconds."

        return output_video_path, elapsed_time_display

    except Exception as e:
        raise ValueError(f"Error updating translations: {e}")
def create_subtitle_clip_pil(text, start_time, end_time, video_width, video_height, font_path):
    try:
        subtitle_width = int(video_width * 0.8)
        aspect_ratio = video_height / video_width
        if aspect_ratio > 1.2:  # Portrait video
            subtitle_font_size = int(video_width // 22)
        else:  # Landscape video
            subtitle_font_size = int(video_height // 24)

        font = ImageFont.truetype(font_path, subtitle_font_size)

        dummy_img = Image.new("RGBA", (subtitle_width, 1), (0, 0, 0, 0))
        draw = ImageDraw.Draw(dummy_img)

        lines = []
        line = ""
        for word in text.split():
            test_line = f"{line} {word}".strip()
            bbox = draw.textbbox((0, 0), test_line, font=font)
            w = bbox[2] - bbox[0]
            if w <= subtitle_width - 10:
                line = test_line
            else:
                lines.append(line)
                line = word
        lines.append(line)

        line_heights = [draw.textbbox((0, 0), l, font=font)[3] - draw.textbbox((0, 0), l, font=font)[1] for l in lines]
        total_height = sum(line_heights) + (len(lines) - 1) * 5

        img = Image.new("RGBA", (subtitle_width, total_height), (0, 0, 0, 0))
        draw = ImageDraw.Draw(img)

        y = 0
        for idx, line in enumerate(lines):
            bbox = draw.textbbox((0, 0), line, font=font)
            w = bbox[2] - bbox[0]
            draw.text(((subtitle_width - w) // 2, y), line, font=font, fill="yellow")
            y += line_heights[idx] + 5

        img_np = np.array(img)  # Convert the PIL image to a NumPy array for ImageClip
        txt_clip = ImageClip(img_np).set_start(start_time).set_duration(end_time - start_time).set_position("bottom").set_opacity(0.8)
        return txt_clip
    except Exception as e:
        logger.error(f"❌ Failed to create subtitle clip: {e}")
        return None
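
# Alignment model: choose new start times s_i that minimize the squared distance between each
# segment's new midpoint (s_i + d_i / 2) and its original midpoint, subject to s_0 >= 0,
# no overlap (s_i + d_i <= s_{i+1}), and the last segment ending exactly at total_duration.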
def solve_optimal_alignment(original_segments, generated_durations, total_duration):
    """
    Aligns speech segments using quadratic programming. If optimization fails,
    applies greedy fallback: center shorter segments, stretch longer ones.
    """
    N = len(original_segments)
    d = np.array(generated_durations)
    m = np.array([(seg['start'] + seg['end']) / 2 for seg in original_segments])

    try:
        s = cp.Variable(N)
        objective = cp.Minimize(cp.sum_squares(s + d / 2 - m))

        constraints = [s[0] >= 0]
        for i in range(N - 1):
            constraints.append(s[i] + d[i] <= s[i + 1])
        constraints.append(s[N - 1] + d[N - 1] == total_duration)

        problem = cp.Problem(objective, constraints)
        problem.solve()

        if s.value is None:
            raise ValueError("Solver failed")

        for i in range(N):
            original_segments[i]['start'] = round(s.value[i], 3)
            original_segments[i]['end'] = round(s.value[i] + d[i], 3)

    except Exception as e:
        print(f"⚠️ Optimization failed: {e}, falling back to greedy alignment.")
        for i in range(N):
            orig_start = original_segments[i]['start']
            orig_end = original_segments[i]['end']
            orig_mid = (orig_start + orig_end) / 2
            gen_duration = generated_durations[i]
            orig_duration = orig_end - orig_start

            if gen_duration <= orig_duration:
                new_start = orig_mid - gen_duration / 2
                new_end = orig_mid + gen_duration / 2
            else:
                extra = (gen_duration - orig_duration) / 2
                new_start = orig_start - extra
                new_end = orig_end + extra

            # Prevent overlap with previous
            if i > 0:
                prev_end = original_segments[i - 1]['end']
                new_start = max(new_start, prev_end + 0.01)

            # Prevent overlap with next
            if i < N - 1:
                next_start = original_segments[i + 1]['start']
                new_end = min(new_end, next_start - 0.01)

            if new_end <= new_start:
                new_start = orig_start
                new_end = orig_start + gen_duration

            original_segments[i]['start'] = round(new_start, 3)
            original_segments[i]['end'] = round(new_end, 3)

    return original_segments
def get_frame_image_bytes(video, t):
    frame = video.get_frame(t)
    img = Image.fromarray(frame)
    buf = io.BytesIO()
    img.save(buf, format='JPEG')
    return buf.getvalue()
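
# Function to optionally polish a segment with GPT-4o, passing the video frame at the segment
# midpoint as visual context (the call site in upload_and_manage is currently commented out).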
def post_edit_segment(entry, image_bytes):
    try:
        system_prompt = """You are a multilingual assistant helping polish subtitles and voiceover content.
Your job is to fix punctuation, validate meaning, improve tone, and ensure the translation matches the speaker's intended message."""

        user_prompt = f"""
Original (source) transcript: {entry.get("original", "")}
Translated version: {entry.get("translated", "")}
Speaker ID: {entry.get("speaker", "")}
Time: {entry.get("start")} - {entry.get("end")}

Please:
1. Add correct punctuation and sentence boundaries.
2. Improve fluency and tone of the translated text.
3. Ensure the meaning is preserved from the original.
4. Use the attached image frame to infer emotion or setting.

Return the revised original and translated texts in the following format:
Original: <edited original>
Translated: <edited translation>
"""

        # Send the prompts plus the frame (as a base64 data URI) to GPT-4o
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": system_prompt},
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": user_prompt},
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/jpeg;base64,{base64.b64encode(image_bytes).decode()}"}
                        }
                    ]
                }
            ]
        )

        output = response.choices[0].message.content.strip()
        lines = output.splitlines()
        for line in lines:
            if line.startswith("Original:"):
                entry['original'] = line[len("Original:"):].strip()
            elif line.startswith("Translated:"):
                entry['translated'] = line[len("Translated:"):].strip()

        return entry
    except Exception as e:
        print(f"Post-editing failed for segment: {e}")
        return entry
def post_edit_translated_segments(translated_json, video_path):
    video = VideoFileClip(video_path)

    def process(entry):
        mid_time = (entry['start'] + entry['end']) / 2
        image_bytes = get_frame_image_bytes(video, mid_time)
        entry = post_edit_segment(entry, image_bytes)
        return entry

    with concurrent.futures.ThreadPoolExecutor() as executor:
        edited = list(executor.map(process, translated_json))

    video.close()
    return edited
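
# Function to process a single segment: build its subtitle clip and, for modes 2/3, generate
# its voiceover audio; returns the segment index so parallel results can be re-ordered.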
def process_entry(entry, i, tts_model, video_width, video_height, process_mode, target_language, font_path, speaker_sample_paths=None):
    logger.debug(f"Processing entry {i}: {entry}")
    error_message = None

    try:
        txt_clip = create_subtitle_clip_pil(entry["translated"], entry["start"], entry["end"], video_width, video_height, font_path)
    except Exception as e:
        error_message = f"❌ Failed to create subtitle clip for entry {i}: {e}"
        logger.error(error_message)
        txt_clip = None

    audio_segment = None
    actual_duration = 0.0
    if process_mode > 1:
        try:
            segment_audio_path = f"segment_{i}_voiceover.wav"
            desired_duration = entry["end"] - entry["start"]
            desired_speed = calibrated_speed(entry['translated'], desired_duration)
            speaker = entry.get("speaker", "default")
            speaker_wav_path = f"speaker_{speaker}_sample.wav"

            # Only use voice cloning when the clone model is loaded, a speaker sample exists,
            # and the target language is supported by the model.
            use_clone = False
            if process_mode > 2 and tts_model is not None and os.path.exists(speaker_wav_path):
                supported_languages = tts_model.synthesizer.tts_model.language_manager.name_to_id.keys()
                use_clone = target_language in supported_languages

            if use_clone:
                generate_voiceover_clone(entry['translated'], tts_model, desired_speed, target_language, speaker_wav_path, segment_audio_path)
            else:
                generate_voiceover_OpenAI(entry['translated'], target_language, desired_speed, segment_audio_path)

            if not segment_audio_path or not os.path.exists(segment_audio_path):
                raise FileNotFoundError(f"Voiceover file not generated at: {segment_audio_path}")

            audio_clip = AudioFileClip(segment_audio_path)
            actual_duration = audio_clip.duration
            audio_segment = audio_clip  # Do not set start here, alignment happens later

        except Exception as e:
            err = f"❌ Failed to generate audio segment for entry {i}: {e}"
            logger.error(err)
            error_message = error_message + " | " + err if error_message else err
            audio_segment = None

    return i, txt_clip, audio_segment, actual_duration, error_message
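
# Function to assemble the final video: process all segments in parallel, re-time them with
# solve_optimal_alignment, composite subtitles over the video, and mix the voiceover with the
# preserved background audio before rendering.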
def add_transcript_voiceover(video_path, translated_json, output_path, process_mode, target_language="en", speaker_sample_paths=None, background_audio_path="background_segments.wav"):
    video = VideoFileClip(video_path)
    font_path = "./NotoSansSC-Regular.ttf"

    text_clips = []
    audio_segments = []
    actual_durations = []
    error_messages = []

    if process_mode == 3:
        global tts_model
        if tts_model is None:
            try:
                print("🔄 Loading TTS voice-cloning model...")
                tts_model = TTS(model_name="tts_models/multilingual/multi-dataset/your_tts")
                print("✅ TTS voice-cloning model loaded successfully.")
            except Exception as e:
                print("❌ Error loading TTS voice-cloning model:")
                traceback.print_exc()
                return f"Error loading TTS model: {e}"

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(process_entry, entry, i, tts_model, video.w, video.h, process_mode, target_language, font_path, speaker_sample_paths)
                   for i, entry in enumerate(translated_json)]

        results = []
        for future in concurrent.futures.as_completed(futures):
            try:
                i, txt_clip, audio_segment, actual_duration, error = future.result()
                results.append((i, txt_clip, audio_segment, actual_duration))
                if error:
                    error_messages.append(f"[Entry {i}] {error}")
            except Exception as e:
                err = f"❌ Unexpected error in future result: {e}"
                error_messages.append(err)

    results.sort(key=lambda x: x[0])
    text_clips = [clip for _, clip, _, _ in results if clip]

    # Use the generated voiceover duration when available; otherwise keep the original
    # segment duration so the list stays aligned one-to-one with translated_json.
    generated_durations = [
        dur if dur > 0 else (translated_json[i]["end"] - translated_json[i]["start"])
        for i, _, _, dur in results
    ]

    # Align using optimization (modifies translated_json in-place)
    translated_json = solve_optimal_alignment(translated_json, generated_durations, video.duration)

    # Set aligned timings
    audio_segments = []
    for i, entry in enumerate(translated_json):
        segment = results[i][2]  # AudioFileClip
        if segment:
            segment = segment.set_start(entry['start']).set_duration(entry['end'] - entry['start'])
            audio_segments.append(segment)

    final_video = CompositeVideoClip([video] + text_clips)

    if process_mode > 1 and audio_segments:
        try:
            voice_audio = CompositeAudioClip(audio_segments).set_duration(video.duration)

            if background_audio_path and os.path.exists(background_audio_path):
                background_audio = AudioFileClip(background_audio_path).set_duration(video.duration)
                final_audio = CompositeAudioClip([voice_audio, background_audio])
            else:
                final_audio = voice_audio

            final_video = final_video.set_audio(final_audio)
        except Exception as e:
            print(f"❌ Failed to set audio: {e}")

    final_video.write_videofile(output_path, codec="libx264", audio_codec="aac")
    return error_messages
def generate_voiceover_OpenAI(full_text, language, desired_speed, output_audio_path):
    """
    Generate voiceover from translated text for a given language using OpenAI TTS API.
    """
    # Define the voice based on the language (for now, use 'alloy' as default)
    voice = "alloy"  # Adjust based on language if needed

    # Define the model (use tts-1 for real-time applications)
    model = "tts-1"

    max_retries = 3
    retry_count = 0

    while retry_count < max_retries:
        try:
            # Create the speech using OpenAI TTS API
            response = client.audio.speech.create(
                model=model,
                voice=voice,
                input=full_text,
                speed=desired_speed
            )
            # Save the audio to the specified path
            with open(output_audio_path, 'wb') as f:
                for chunk in response.iter_bytes():
                    f.write(chunk)
            logging.info(f"Voiceover generated successfully for {output_audio_path}")
            break
        except Exception as e:
            retry_count += 1
            logging.error(f"Error generating voiceover (retry {retry_count}/{max_retries}): {e}")
            time.sleep(5)  # Wait 5 seconds before retrying

    if retry_count == max_retries:
        raise ValueError(f"Failed to generate voiceover after {max_retries} retries.")
def generate_voiceover_clone(full_text, tts_model, desired_speed, target_language, speaker_wav_path, output_audio_path):
    try:
        tts_model.tts_to_file(
            text=full_text,
            speaker_wav=speaker_wav_path,
            language=target_language,
            file_path=output_audio_path,
            speed=desired_speed,
            split_sentences=True
        )
        msg = "✅ Voice cloning completed successfully."
        logger.info(msg)
        return output_audio_path, msg, None
    except Exception as e:
        err_msg = f"❌ Voice cloning failed: {str(e)}; falling back to premium voice."
        logger.error(traceback.format_exc())
        generate_voiceover_OpenAI(full_text, target_language, desired_speed, output_audio_path)
        return output_audio_path, err_msg, err_msg
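
# Example: 60 characters over a 3 s slot is 20 cps, giving speed ≈ 1.0 + (20 - 14) / 16 ≈ 1.375.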
def calibrated_speed(text, desired_duration):
    """
    Compute a speed factor to help TTS fit audio into desired duration,
    using a simple truncated linear function of characters per second.
    """
    char_count = len(text.strip())
    if char_count == 0 or desired_duration <= 0:
        return 1.0  # fallback

    cps = char_count / desired_duration  # characters per second

    # Truncated linear mapping
    if cps < 14:
        return 1.0
    elif cps > 30:
        return 2.0
    else:
        slope = (2.0 - 1.0) / (30 - 14)
        return 1.0 + slope * (cps - 14)
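
# Function to handle the Gradio submit button end to end: transcribe, translate, render the
# output video, and return the editable table plus timing info.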
def upload_and_manage(file, target_language, process_mode):
    if file is None:
        logger.info("No file uploaded. Please upload a video/audio file.")
        return [], None, "No file uploaded. Please upload a video/audio file."

    try:
        start_time = time.time()  # Start the timer
        logger.info(f"Started processing file: {file.name}")

        # Define paths for audio and output files
        audio_path = "audio.wav"
        output_video_path = "output_video.mp4"
        voiceover_path = "voiceover.wav"
        logger.info(f"Using audio path: {audio_path}, output video path: {output_video_path}, voiceover path: {voiceover_path}")

        # Step 1: Transcribe audio from uploaded media file and get timestamps
        logger.info("Transcribing audio...")
        transcription_json, source_language = transcribe_video_with_speakers(file.name)
        logger.info(f"Transcription completed. Detected source language: {source_language}")

        # Step 2: Translate the transcription
        logger.info(f"Translating transcription from {source_language} to {target_language}...")
        translated_json = translate_text(transcription_json, source_language, target_language)
        logger.info(f"Translation completed. Number of translated segments: {len(translated_json)}")
        # translated_json = post_edit_translated_segments(translated_json, file.name)

        # Step 3: Add transcript to video based on timestamps
        logger.info("Adding translated transcript to video...")
        add_transcript_voiceover(file.name, translated_json, output_video_path, process_mode, target_language)
        logger.info(f"Transcript added to video. Output video saved at {output_video_path}")

        # Convert translated JSON into a format for the editable table
        logger.info("Converting translated JSON into editable table format...")
        editable_table = [
            [float(entry["start"]), entry["original"], entry["translated"], float(entry["end"]), entry["speaker"]]
            for entry in translated_json
        ]

        # Calculate elapsed time
        elapsed_time = time.time() - start_time
        elapsed_time_display = f"Processing completed in {elapsed_time:.2f} seconds."
        logger.info(f"Processing completed in {elapsed_time:.2f} seconds.")

        return editable_table, output_video_path, elapsed_time_display

    except Exception as e:
        logger.error(f"An error occurred: {str(e)}")
        return [], None, f"An error occurred: {str(e)}"
# Gradio interface
def build_interface():
    with gr.Blocks(css=css) as demo:
        gr.Markdown("## Video Localization")

        with gr.Row():
            with gr.Column(scale=4):
                file_input = gr.File(label="Upload Video/Audio File")
                language_input = gr.Dropdown(["en", "es", "fr", "zh"], label="Select Language")  # Language codes
                process_mode = gr.Radio(
                    choices=[
                        ("Transcription Only", 1),
                        ("Transcription with Premium Voice", 2),
                        ("Transcription with Voice Clone", 3)
                    ],
                    label="Choose Processing Type",
                    value=1
                )
                submit_button = gr.Button("Post and Process")

            with gr.Column(scale=8):
                gr.Markdown("## Edit Translations")

                # Editable translation table
                editable_table = gr.Dataframe(
                    value=[],  # Default to an empty list to avoid undefined values
                    headers=["start", "original", "translated", "end", "speaker"],
                    datatype=["number", "str", "str", "number", "str"],
                    row_count=1,  # Initially empty
                    col_count=5,
                    interactive=[False, True, True, False, False],  # Control editability
                    label="Edit Translations",
                    wrap=True  # Enables text wrapping if supported
                )

                save_changes_button = gr.Button("Save Changes")
                processed_video_output = gr.File(label="Download Processed Video", interactive=True)  # Download button
                elapsed_time_display = gr.Textbox(label="Elapsed Time", lines=1, interactive=False)

            with gr.Column(scale=1):
                gr.Markdown("**Feedback**")
                feedback_input = gr.Textbox(
                    placeholder="Leave your feedback here...",
                    label=None,
                    lines=3,
                )
                feedback_btn = gr.Button("Submit Feedback")
                response_message = gr.Textbox(label=None, lines=1, interactive=False)
                db_download = gr.File(label="Download Database File", visible=False)

        # Link the feedback handling
        def feedback_submission(feedback):
            message, file_path = handle_feedback(feedback)
            if file_path:
                return message, gr.update(value=file_path, visible=True)
            return message, gr.update(visible=False)

        save_changes_button.click(
            update_translations,
            inputs=[file_input, editable_table, process_mode],
            outputs=[processed_video_output, elapsed_time_display]
        )

        submit_button.click(
            upload_and_manage,
            inputs=[file_input, language_input, process_mode],
            outputs=[editable_table, processed_video_output, elapsed_time_display]
        )

        # Connect the feedback button to handle_feedback via feedback_submission
        feedback_btn.click(
            feedback_submission,
            inputs=[feedback_input],
            outputs=[response_message, db_download]
        )

    return demo

# Voice-clone TTS model; loaded lazily in add_transcript_voiceover when process_mode == 3
tts_model = None

# Launch the Gradio interface
demo = build_interface()
demo.launch()