# Studio_V0 / app.py
# qqwjq1981's picture
# Update app.py
# 99fac43 verified
# raw
# history blame
# 16.7 kB
import concurrent.futures
import gradio as gr
from datetime import datetime
import random
import moviepy
from transformers import pipeline
from transformers.pipelines.audio_utils import ffmpeg_read
from moviepy import (
ImageClip,
VideoFileClip,
TextClip,
CompositeVideoClip,
AudioFileClip,
concatenate_videoclips
)
from gtts import gTTS
import subprocess
import speech_recognition as sr
import json
from nltk.tokenize import sent_tokenize
import logging
from textblob import TextBlob
import whisper
import time
import sqlite3
# Magic string: when this exact text is submitted through the feedback box,
# handle_feedback returns the feedback database file for download instead of
# storing the text as a comment.
# NOTE(review): this is a secret hard-coded in source — consider reading it from
# an environment variable or the hosting platform's secret store instead.
PASSCODE = "show_feedback_db"
# Custom stylesheet handed to gr.Blocks(css=...) in build_interface. It sets the
# dataframe row height, enables text wrapping, assigns widths to the first three
# columns and hides the fourth column.
css = """
/* Adjust row height */
.dataframe-container tr {
height: 50px !important;
}
/* Ensure text wrapping and prevent overflow */
.dataframe-container td {
white-space: normal !important;
word-break: break-word !important;
}
/* Set column widths */
[data-testid="block-container"] .scrolling-dataframe th:nth-child(1),
[data-testid="block-container"] .scrolling-dataframe td:nth-child(1) {
width: 6%; /* Start column */
}
[data-testid="block-container"] .scrolling-dataframe th:nth-child(2),
[data-testid="block-container"] .scrolling-dataframe td:nth-child(2) {
width: 47%; /* Original text */
}
[data-testid="block-container"] .scrolling-dataframe th:nth-child(3),
[data-testid="block-container"] .scrolling-dataframe td:nth-child(3) {
width: 47%; /* Translated text */
}
[data-testid="block-container"] .scrolling-dataframe th:nth-child(4),
[data-testid="block-container"] .scrolling-dataframe td:nth-child(4) {
display: none !important;
}
"""
# Function to save feedback or provide access to the database file
def handle_feedback(feedback):
    """
    Store a feedback comment, or hand out the feedback database on passcode.

    Parameters:
    - feedback (str | None): Text typed by the user. Leading/trailing
      whitespace is ignored; None is treated like an empty string.

    Returns:
    - tuple: (status message, file path or None). The path is "feedback.db"
      only when the text equals PASSCODE; otherwise it is None.
    """
    # Local import keeps this block self-contained (contextlib is stdlib).
    from contextlib import closing

    feedback = (feedback or "").strip()  # Clean up leading/trailing whitespace
    if not feedback:
        return "Feedback cannot be empty.", None

    is_passcode = feedback == PASSCODE

    # sqlite3's own context manager only commits/rolls back the transaction;
    # closing() is what actually releases the connection handle.
    with closing(sqlite3.connect("feedback.db")) as conn:
        with conn:
            # Always make sure the database and table exist, so the download
            # path below points at a real file even before any feedback exists.
            conn.execute(
                "CREATE TABLE IF NOT EXISTS studio_feedback (id INTEGER PRIMARY KEY, comment TEXT)"
            )
            if not is_passcode:
                conn.execute(
                    "INSERT INTO studio_feedback (comment) VALUES (?)", (feedback,)
                )

    if is_passcode:
        # Provide access to the feedback.db file
        return "Access granted! Download the database file below.", "feedback.db"
    return "Thank you for your feedback!", None
# Set up root logging (DEBUG and above, timestamped) and the module logger,
# then record which MoviePy release is in use.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)
logger.info(f"MoviePy Version: {moviepy.__version__}")
def list_available_fonts():
    """
    List the font files known to fontconfig by running `fc-list`.

    Returns:
    - list[str]: One entry per line of `fc-list` output, or an empty list
      when the command fails or cannot be started.
    """
    command = ["fc-list", "--format", "%{file}\\n"]
    try:
        # Run the 'fc-list' command to list fonts
        result = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        logger.error(f"Error while listing fonts: {e.stderr}")
        return []
    except OSError as e:
        # Raised when 'fc-list' is missing or not executable. This listing is
        # purely diagnostic, so it must not abort the caller's pipeline.
        logger.error(f"Error while listing fonts: {e}")
        return []

    fonts = result.stdout.splitlines()
    logger.debug(f"Available fonts:\n{fonts}")
    return fonts
def split_into_sentences(text):
    """Split `text` into sentences with TextBlob and return them as plain strings."""
    sentences = TextBlob(text).sentences
    return list(map(str, sentences))
def transcribe_video(video_path):
    """
    Extract the audio track of a video and transcribe it with Whisper.

    Parameters:
    - video_path (str): Path of the media file to open with VideoFileClip.

    Returns:
    - tuple: (segments, detected_language) where segments is a list of dicts
      with "start", "end" and "text" taken from Whisper's segment output, and
      detected_language is the "language" field of the Whisper result.

    Raises:
    - ValueError: if the opened clip has no audio track.
    """
    audio_path = "audio.wav"

    # Load the video file and extract audio; always release the clip's
    # reader resources, even when extraction fails.
    video = VideoFileClip(video_path)
    try:
        if video.audio is None:
            raise ValueError(f"No audio track found in '{video_path}'.")
        video.audio.write_audiofile(audio_path)
    finally:
        video.close()

    # Load Whisper model
    model = whisper.load_model("base")  # Options: tiny, base, small, medium, large

    # Transcribe with Whisper. Word timestamps are requested, but only the
    # segment-level fields are used below.
    result = model.transcribe(audio_path, word_timestamps=True)

    # Extract timestamps and text
    transcript_with_timestamps = [
        {
            "start": segment["start"],
            "end": segment["end"],
            "text": segment["text"],
        }
        for segment in result["segments"]
    ]

    # Get the detected language
    detected_language = result["language"]
    logger.debug(f"Detected language:\n{detected_language}")
    return transcript_with_timestamps, detected_language
# Function to get the appropriate translation model based on target language
def get_translation_model(source_language, target_language):
    """
    Build the Helsinki-NLP model identifier for a language pair.

    Parameters:
    - source_language (str): Language code of the input content (e.g., 'en').
      Falls back to 'en' when it is not in the supported list.
    - target_language (str): Language code to translate into (e.g., 'es', 'fr').
      Falls back to 'zh' when it is not in the supported list.

    If both codes end up equal, the pair is reset to 'en' -> 'zh'.

    Returns:
    - str: The translation model identifier.
    """
    # Supported language codes, in the order they are reported in log messages
    allowable_languages = ["en", "es", "fr", "zh", "de", "it", "pt", "ja", "ko", "ru"]
    fallback_source, fallback_target = "en", "zh"

    source_ok = source_language in allowable_languages
    if not source_ok:
        logger.debug(f"Invalid source language '{source_language}'. Supported languages are: {', '.join(allowable_languages)}")
        source_language = fallback_source

    target_ok = target_language in allowable_languages
    if not target_ok:
        logger.debug(f"Invalid target language '{target_language}'. Supported languages are: {', '.join(allowable_languages)}")
        target_language = fallback_target

    # A same-language pair has no translation model; use the default pair instead
    if source_language == target_language:
        source_language, target_language = fallback_source, fallback_target

    return "Helsinki-NLP/opus-mt-{}-{}".format(source_language, target_language)
def translate_text(transcription_json, source_language, target_language):
    """
    Translate every transcript segment and keep its timing.

    Parameters:
    - transcription_json: Iterable of dicts with "start", "end" and "text".
    - source_language (str): Language code of the transcript.
    - target_language (str): Language code to translate into.

    Returns:
    - list[dict]: One dict per segment with keys "start", "original",
      "translated" and "end".
    """
    # Resolve and load the translation pipeline for this language pair
    model_name = get_translation_model(source_language, target_language)
    logger.debug(f"Translation model: {model_name}")
    translator = pipeline("translation", model=model_name)

    results = []
    for segment in transcription_json:
        source_text = segment["text"]
        outputs = translator(source_text)
        target_text = outputs[0]['translation_text']

        record = {
            "start": segment["start"],
            "original": source_text,
            "translated": target_text,
            "end": segment["end"],
        }
        results.append(record)

        # Log the components that were just added
        logger.debug("Adding to translated_json: start=%s, original=%s, translated=%s, end=%s",
                     segment["start"], source_text, target_text, segment["end"])

    # The translated segments are returned as a list of dicts
    return results
def update_translations(file, edited_table):
    """
    Re-render the video using the translations edited in the Gradio Dataframe.

    Parameters:
    - file: Uploaded file object; its `.name` attribute is used as the video path.
    - edited_table: Table object providing `iterrows()` whose rows can be
      indexed by "start", "original", "translated" and "end"
      (presumably the pandas DataFrame delivered by gr.Dataframe).

    Returns:
    - tuple: (output video path, status message including the elapsed time).

    Raises:
    - ValueError: if no file was provided or any processing step fails; the
      underlying exception is chained as the cause.
    """
    if file is None:
        raise ValueError("Error updating translations: no file uploaded")

    output_video_path = "output_video.mp4"
    logger.debug(f"Editable Table: {edited_table}")
    try:
        start_time = time.time()  # Start the timer

        # Convert the edited table rows back to a list of dictionaries
        updated_translations = [
            {
                "start": row["start"],  # Access by column name
                "original": row["original"],
                "translated": row["translated"],
                "end": row["end"],
            }
            for _, row in edited_table.iterrows()
        ]

        # Call the function to process the video with updated translations
        add_transcript_voiceover(file.name, updated_translations, output_video_path)

        # Calculate elapsed time
        elapsed_time = time.time() - start_time
        elapsed_time_display = f"Updates applied successfully in {elapsed_time:.2f} seconds."
        return output_video_path, elapsed_time_display
    except Exception as e:
        # Keep the original traceback attached for debugging
        raise ValueError(f"Error updating translations: {e}") from e
def process_entry(entry, i, video_width, video_height, font_path, add_voiceover, target_language):
    """
    Build the subtitle clip (and optionally the voiceover clip) for one segment.

    The signature matches the way add_transcript_voiceover submits this
    function to the thread pool; the video dimensions are passed in explicitly
    because no video object is in scope here.

    Parameters:
    - entry (dict): Segment with "start", "end" and "translated".
    - i (int): Segment index, used to name the per-segment audio file.
    - video_width (int): Width of the source video in pixels.
    - video_height (int): Height of the source video in pixels.
    - font_path (str): Font file used to render the subtitle.
    - add_voiceover (bool): Whether to synthesize audio for this segment.
    - target_language (str): Language code passed to generate_voiceover.

    Returns:
    - tuple: (subtitle TextClip, voiceover AudioFileClip or None).
    """
    logger.debug(f"Processing entry {i}: {entry}")
    segment_duration = entry["end"] - entry["start"]

    # Create text clip for subtitles
    txt_clip = (
        TextClip(
            text=entry["translated"],
            font=font_path,
            method='caption',
            color='yellow',
            stroke_color='black',  # Border color
            stroke_width=2,  # Border thickness
            font_size=int(video_height // 20),
            size=(int(video_width * 0.8), None),
        )
        .with_start(entry["start"])
        .with_duration(segment_duration)
        .with_position('bottom')
        .with_opacity(0.8)
    )

    audio_segment = None
    if add_voiceover:
        segment_audio_path = f"segment_{i}_voiceover.wav"
        generate_voiceover([entry], target_language, segment_audio_path)
        audio_segment = AudioFileClip(segment_audio_path)
        # Trim speech that would spill past the subtitle slot; never stretch a
        # clip beyond the audio that actually exists in the file.
        if audio_segment.duration is not None and audio_segment.duration > segment_duration:
            audio_segment = audio_segment.subclipped(0, segment_duration)
        # Place the speech at the same moment as its subtitle
        audio_segment = audio_segment.with_start(entry["start"])
    return txt_clip, audio_segment


def add_transcript_voiceover(video_path, translated_json, output_path, add_voiceover=False, target_language="en"):
    """
    Add transcript and voiceover to a video, segment by segment.

    Parameters:
    - video_path (str): Path of the source video.
    - translated_json (list[dict]): Segments with "start", "end", "translated".
    - output_path (str): Where the rendered video is written.
    - add_voiceover (bool): When True, replace the audio track with
      synthesized speech for each segment.
    - target_language (str): Language code used for the voiceover.

    Segments that fail to process are logged and skipped so that one bad
    segment does not abort the whole render.
    """
    # Local import: CompositeAudioClip is not part of the file-level import list.
    from moviepy import CompositeAudioClip

    video = VideoFileClip(video_path)
    audio_segments = []
    try:
        font_path = "./NotoSansSC-Regular.ttf"
        text_clips = []

        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(process_entry, entry, i, video.w, video.h, font_path, add_voiceover, target_language)
                for i, entry in enumerate(translated_json)
            ]
            # Collect in submission order so results stay aligned with the
            # transcript regardless of which worker finishes first.
            for future in futures:
                try:
                    txt_clip, audio_segment = future.result()
                except Exception as e:
                    logger.error(f"Error processing entry: {e}")
                    continue
                text_clips.append(txt_clip)
                if add_voiceover and audio_segment is not None:
                    audio_segments.append(audio_segment)

        # Sort text clips based on their start times
        text_clips.sort(key=lambda clip: clip.start)
        final_video = CompositeVideoClip([video] + text_clips)

        if add_voiceover and audio_segments:
            # Each segment already carries its own start time, so compositing
            # keeps the speech aligned with the subtitles.
            final_audio = CompositeAudioClip(audio_segments).with_duration(video.duration)
            final_video = final_video.with_audio(final_audio)

        logger.info(f"Saving the final video to: {output_path}")
        final_video.write_videofile(output_path, codec="libx264", audio_codec="aac")
        logger.info("Video processing completed successfully.")
    finally:
        # Release file readers held by the clips
        for segment in audio_segments:
            segment.close()
        video.close()
def generate_voiceover(translated_json, language, output_audio_path):
    """
    Synthesize speech for the translated text and write it to a file.

    Parameters:
    - translated_json: Iterable of dicts carrying a "translated" string.
    - language (str): Language code handed to gTTS.
    - output_audio_path (str): Destination path for the generated audio.
    """
    # Join all translated segments into one space-separated string
    pieces = [segment["translated"] for segment in translated_json]
    full_text = " ".join(pieces)

    # Generate speech and save to file
    gTTS(text=full_text, lang=language).save(output_audio_path)
def upload_and_manage(file, target_language, mode="transcription"):
    """
    Run the full pipeline on an uploaded file: transcribe, translate, render.

    Parameters:
    - file: Uploaded file object (its `.name` is the media path), or None.
    - target_language (str): Language code to translate into.
    - mode (str): Voiceover is added only when this equals
      "Transcription with Voiceover".

    Returns:
    - tuple: (translated segments, table rows, output video path, status text).
      On a missing file or any error the first three are None, [], None and
      the status text describes the problem.
    """
    if file is None:
        logger.info("No file uploaded. Please upload a video/audio file.")
        return None, [], None, "No file uploaded. Please upload a video/audio file."

    try:
        started_at = time.time()  # Start the timer
        logger.info(f"Started processing file: {file.name}")

        # Define paths for audio and output files
        audio_path = "audio.wav"
        output_video_path = "output_video.mp4"
        voiceover_path = "voiceover.wav"
        logger.info(f"Using audio path: {audio_path}, output video path: {output_video_path}, voiceover path: {voiceover_path}")
        list_available_fonts()

        # Step 1: Transcribe audio from uploaded media file and get timestamps
        logger.info("Transcribing audio...")
        transcription_json, source_language = transcribe_video(file.name)
        logger.info(f"Transcription completed. Detected source language: {source_language}")

        # Step 2: Translate the transcription
        logger.info(f"Translating transcription from {source_language} to {target_language}...")
        translated_json = translate_text(transcription_json, source_language, target_language)
        logger.info(f"Translation completed. Number of translated segments: {len(translated_json)}")

        # Step 3: Add transcript to video based on timestamps
        logger.info("Adding translated transcript to video...")
        with_voiceover = mode == "Transcription with Voiceover"
        add_transcript_voiceover(file.name, translated_json, output_video_path, with_voiceover, target_language)
        logger.info(f"Transcript added to video. Output video saved at {output_video_path}")

        # Convert translated JSON into rows for the editable table
        logger.info("Converting translated JSON into editable table format...")
        editable_table = []
        for segment in translated_json:
            editable_table.append(
                [float(segment["start"]), segment["original"], segment["translated"], float(segment["end"])]
            )

        # Calculate elapsed time
        elapsed_time = time.time() - started_at
        elapsed_time_display = f"Processing completed in {elapsed_time:.2f} seconds."
        logger.info(f"Processing completed in {elapsed_time:.2f} seconds.")
        return translated_json, editable_table, output_video_path, elapsed_time_display
    except Exception as e:
        failure_message = f"An error occurred: {str(e)}"
        logger.error(failure_message)
        return None, [], None, failure_message
# Gradio Interface with Tabs
def build_interface():
    """
    Assemble the Gradio Blocks UI and wire its buttons to the handlers.

    The page is a single row with three columns: upload/settings, the editable
    translations table with outputs, and a feedback form.
    NOTE(review): the source this was rebuilt from carried no indentation, so
    the nesting of components is reconstructed — confirm against the live app.

    Returns:
    - gr.Blocks: The constructed (not yet launched) interface.
    """
    language_choices = ["en", "es", "fr", "zh"]
    mode_choices = ["Transcription", "Transcription with Voiceover"]

    with gr.Blocks(css=css) as demo:
        gr.Markdown("## Video Localization")
        with gr.Row():
            # Left column: upload and processing options
            with gr.Column(scale=4):
                file_input = gr.File(label="Upload Video/Audio File")
                language_input = gr.Dropdown(language_choices, label="Select Language")  # Language codes
                process_mode = gr.Radio(
                    choices=mode_choices,
                    label="Choose Processing Type",
                    value="Transcription",
                )
                submit_button = gr.Button("Post and Process")
                editable_translations = gr.State(value=[])

            # Middle column: editable table and processing outputs
            with gr.Column(scale=8):
                gr.Markdown("## Edit Translations")
                # Editable JSON Data
                editable_table = gr.Dataframe(
                    value=[],  # Default to an empty list to avoid undefined values
                    headers=["start", "original", "translated", "end"],
                    datatype=["number", "str", "str", "number"],
                    row_count=1,  # Initially empty
                    col_count=4,
                    interactive=[False, True, True, False],  # Control editability
                    label="Edit Translations",
                    wrap=True,  # Enables text wrapping if supported
                )
                save_changes_button = gr.Button("Save Changes")
                processed_video_output = gr.File(label="Download Processed Video", interactive=True)  # Download button
                elapsed_time_display = gr.Textbox(label="Elapsed Time", lines=1, interactive=False)

            # Right column: feedback form
            with gr.Column(scale=1):
                gr.Markdown("**Feedback**")
                feedback_input = gr.Textbox(
                    placeholder="Leave your feedback here...",
                    label=None,
                    lines=3,
                )
                feedback_btn = gr.Button("Submit Feedback")
                response_message = gr.Textbox(label=None, lines=1, interactive=False)
                db_download = gr.File(label="Download Database File", visible=False)

        # Link the feedback handling
        def feedback_submission(feedback):
            """Forward feedback to handle_feedback; reveal the download only when a file is returned."""
            status_text, db_path = handle_feedback(feedback)
            if not db_path:
                return status_text, gr.update(visible=False)
            return status_text, gr.update(value=db_path, visible=True)

        # Re-render the video from the edited table
        save_changes_button.click(
            update_translations,
            inputs=[file_input, editable_table],
            outputs=[processed_video_output, elapsed_time_display],
        )
        # Run the full transcribe/translate/render pipeline
        submit_button.click(
            upload_and_manage,
            inputs=[file_input, language_input, process_mode],
            outputs=[editable_translations, editable_table, processed_video_output, elapsed_time_display],
        )
        # Connect the feedback button to the feedback handler
        feedback_btn.click(
            feedback_submission,
            inputs=[feedback_input],
            outputs=[response_message, db_download],
        )
    return demo
# Build the interface once when this module is executed and start the Gradio
# server with its default launch settings.
demo = build_interface()
demo.launch()