|
import gradio as gr |
|
from datetime import datetime |
|
import random |
|
from transformers import pipeline |
|
from transformers.pipelines.audio_utils import ffmpeg_read |
|
from moviepy import ( |
|
ImageClip, |
|
VideoFileClip, |
|
TextClip, |
|
CompositeVideoClip, |
|
AudioFileClip, |
|
concatenate_videoclips |
|
) |
|
import subprocess |
|
import speech_recognition as sr |
|
import json |
|
from nltk.tokenize import sent_tokenize |
|
import logging |
|
from textblob import TextBlob |
|
import whisper |
|
|
|
import sqlite3 |
|
|
|
|
|
# Secret phrase that, when submitted as feedback, returns the raw DB file.
PASSCODE = "show_feedback_db"


def handle_feedback(feedback):
    """Persist a user's feedback, or expose the database on the passcode.

    Returns a ``(message, file_path)`` tuple. ``file_path`` is ``None``
    except when the passcode was entered, in which case it points at the
    SQLite database file so the UI can offer it for download.
    """
    text = feedback.strip()
    if not text:
        return "Feedback cannot be empty.", None

    if text == PASSCODE:
        # Hidden admin hook: the passcode grants a download of the raw DB.
        return "Access granted! Download the database file below.", "feedback.db"

    with sqlite3.connect("feedback.db") as conn:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS studio_feedback (id INTEGER PRIMARY KEY, comment TEXT)"
        )
        # Parameterized insert of the stripped comment (no SQL injection).
        conn.execute("INSERT INTO studio_feedback (comment) VALUES (?)", (text,))
        conn.commit()
    return "Thank you for your feedback!", None
|
|
|
|
|
# Configure root logging once at import time; DEBUG level surfaces the
# detailed transcription/translation traces emitted throughout this module.
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")

# Module-level logger used by all functions below.
logger = logging.getLogger(__name__)
|
|
|
def list_available_fonts():
    """Return the font file paths known to fontconfig (via ``fc-list``).

    Returns:
        list[str]: one font file path per line of ``fc-list`` output, or an
        empty list when ``fc-list`` fails or is not installed on the host.
    """
    try:
        result = subprocess.run(
            ["fc-list", "--format", "%{file}\\n"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=True
        )
    except FileNotFoundError:
        # fc-list is absent (e.g. non-Linux host without fontconfig);
        # previously this escaped uncaught — degrade gracefully instead.
        logger.error("fc-list executable not found; no fonts listed")
        return []
    except subprocess.CalledProcessError as e:
        # Lazy %-style args avoid formatting cost when the level is disabled.
        logger.error("Error while listing fonts: %s", e.stderr)
        return []
    fonts = result.stdout.splitlines()
    logger.debug("Available fonts:\n%s", fonts)
    return fonts
|
|
|
def split_into_sentences(text):
    """Split *text* into sentences using TextBlob's sentence tokenizer."""
    sentences = TextBlob(text).sentences
    return list(map(str, sentences))
|
|
|
def transcribe_video(video_path):
    """Extract the audio track of *video_path* and transcribe it with Whisper.

    Writes the audio to ``audio.wav`` in the working directory as a side
    effect.

    Returns:
        list[dict]: segments shaped as ``{"start", "end", "text"}`` with
        start/end times in seconds, taken from Whisper's segment output.
    """
    audio_path = "audio.wav"
    video = VideoFileClip(video_path)
    try:
        video.audio.write_audiofile(audio_path)
    finally:
        # Release the ffmpeg reader; previously the clip was never closed.
        video.close()

    # "base" trades accuracy for speed/memory; word_timestamps keeps
    # per-word timing available even though only segments are returned.
    model = whisper.load_model("base")
    result = model.transcribe(audio_path, word_timestamps=True)

    return [
        {
            "start": segment["start"],
            "end": segment["end"],
            "text": segment["text"]
        }
        for segment in result["segments"]
    ]
|
|
|
|
|
def get_translation_model(target_language):
    """Map a target-language code to a Helsinki-NLP translation model id.

    Unknown codes fall back to the English->Chinese model.
    """
    fallback = "Helsinki-NLP/opus-mt-en-zh"
    known_models = {
        "es": "Helsinki-NLP/opus-mt-en-es",
        "fr": "Helsinki-NLP/opus-mt-en-fr",
        "zh": "Helsinki-NLP/opus-mt-en-zh",
    }
    return known_models.get(target_language, fallback)
|
|
|
def translate_text(transcription_json, target_language):
    """Translate each transcript segment's text into *target_language*.

    Args:
        transcription_json: segments shaped as ``{"start", "end", "text"}``.
        target_language: language code used to pick the translation model.

    Returns:
        list[dict]: segments shaped as
        ``{"start", "original", "translated", "end"}``.
    """
    translation_model_id = get_translation_model(target_language)
    logger.debug(f"Translation model: {translation_model_id}")
    translator = pipeline("translation", model=translation_model_id)

    translated_json = []
    for segment in transcription_json:
        source_text = segment["text"]
        target_text = translator(source_text)[0]['translation_text']
        translated_json.append({
            "start": segment["start"],
            "original": source_text,
            "translated": target_text,
            "end": segment["end"],
        })
        logger.debug("Adding to translated_json: start=%s, original=%s, translated=%s, end=%s",
                     segment["start"], source_text, target_text, segment["end"])

    return translated_json
|
|
|
def add_transcript_to_video(video_path, translated_json, output_path):
    """Burn translated subtitles into a video and write it to *output_path*.

    Args:
        video_path: source video file path.
        translated_json: entries with at least ``"translated"``, ``"start"``
            and ``"end"`` keys (as produced by ``translate_text``).
        output_path: destination for the rendered H.264/AAC video.

    Raises:
        ValueError: if an entry is not a dict with a ``"translated"`` key.
    """
    # CJK-capable font so Chinese translations render correctly.
    font_path = "./NotoSansSC-Regular.ttf"
    logger.debug("Full translated_json: %s", translated_json)

    video = VideoFileClip(video_path)
    try:
        text_clips = []
        # Single pass over the entries (previously the list was iterated
        # twice, once only to log each entry).
        for entry in translated_json:
            logger.debug("Processing entry: %s", entry)
            if not (isinstance(entry, dict) and "translated" in entry):
                raise ValueError(f"Invalid entry format: {entry}")
            txt_clip = (
                TextClip(
                    text=entry["translated"], font=font_path, method='caption',
                    color='yellow', size=video.size
                )
                .with_start(entry["start"])
                .with_duration(entry["end"] - entry["start"])
                .with_position(('bottom'))
                .with_opacity(0.7)
            )
            text_clips.append(txt_clip)

        final_video = CompositeVideoClip([video] + text_clips)
        final_video.write_videofile(output_path, codec='libx264', audio_codec='aac')
    finally:
        # Release the ffmpeg reader even when validation or rendering fails;
        # previously the clip was never closed.
        video.close()
|
|
|
|
|
def mock_post_to_platform(platform, content_title):
    """Pretend to publish *content_title* on *platform*; return a success message."""
    return "Content '{}' successfully posted on {}!".format(content_title, platform)
|
|
|
def mock_analytics(): |
|
return { |
|
"YouTube": {"Views": random.randint(1000, 5000), "Engagement Rate": f"{random.uniform(5, 15):.2f}%"}, |
|
"Instagram": {"Views": random.randint(500, 3000), "Engagement Rate": f"{random.uniform(10, 20):.2f}%"}, |
|
} |
|
|
|
import json |
|
|
|
def update_translations(file, edited_table):
    """
    Update the translations based on user edits in the Gradio Dataframe.

    Args:
        file: uploaded video file object (Gradio ``File``; ``file.name`` is
            the path on disk).
        edited_table: pandas DataFrame with columns
            ``start`` / ``original`` / ``translated`` / ``end``.

    Returns:
        str: path of the re-rendered video (``output_video.mp4``).

    Raises:
        ValueError: wrapping any failure while rebuilding or rendering,
        with the original exception chained as the cause.
    """
    output_video_path = "output_video.mp4"
    logger.debug(f"Editable Table: {edited_table}")

    try:
        # Rebuild the segment dicts expected by add_transcript_to_video.
        updated_translations = [
            {
                "start": row["start"],
                "original": row["original"],
                "translated": row["translated"],
                "end": row["end"]
            }
            for _, row in edited_table.iterrows()
        ]

        add_transcript_to_video(file.name, updated_translations, output_video_path)

        return output_video_path

    except Exception as e:
        # Chain the cause so the real traceback isn't lost (was `raise ...`
        # without `from e`).
        raise ValueError(f"Error updating translations: {e}") from e
|
|
|
|
|
def upload_and_manage(file, language):
    """Full pipeline: transcribe the upload, translate it, burn subtitles.

    Args:
        file: uploaded video/audio file object (Gradio ``File``), or ``None``.
        language: target-language code for translation.

    Returns:
        tuple: ``(translated_json, editable_table_rows, output_video_path)``.
        When no file was supplied, an error message takes the first slot and
        the other two are ``None``.
    """
    if file is None:
        # Exactly three values: this handler feeds three Gradio outputs
        # (previously a fourth None was returned here, mismatching them).
        return "Please upload a video/audio file.", None, None

    output_video_path = "output_video.mp4"

    # Logged purely to help debug missing-font issues on the host.
    list_available_fonts()

    transcription_json = transcribe_video(file.name)
    translated_json = translate_text(transcription_json, language)
    add_transcript_to_video(file.name, translated_json, output_video_path)

    # Rows for the editable Dataframe: [start, original, translated, end].
    editable_table = [
        [float(entry["start"]), entry["original"], entry["translated"], float(entry["end"])]
        for entry in translated_json
    ]

    return translated_json, editable_table, output_video_path
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def build_interface():
    """Build the Gradio Blocks UI and wire its event handlers.

    Layout: upload/language column, an editable translation table with a
    re-render button, and a feedback column. Returns the Blocks app
    (not yet launched).
    """
    with gr.Blocks() as demo:

        gr.Markdown("## Video Localization")
        with gr.Row():
            # Left column: file upload, language choice, and the main trigger.
            with gr.Column(scale=4):
                file_input = gr.File(label="Upload Video/Audio File")

                language_input = gr.Dropdown(["en", "es", "fr", "zh"], label="Select Language")
                submit_button = gr.Button("Post and Process")
                # Holds the raw translated_json produced by upload_and_manage.
                editable_translations = gr.State(value=[])

            # Middle column: editable translations and the processed video.
            with gr.Column(scale=8):

                gr.Markdown("## Edit Translations")


                # Only the text columns are editable; timestamps are locked.
                editable_table = gr.Dataframe(
                    value=[],
                    headers=["start", "original", "translated", "end"],
                    datatype=["number", "str", "str", "number"],
                    row_count=1,
                    col_count=4,
                    interactive=[False, True, True, False],
                    label="Edit Translations",
                )
                save_changes_button = gr.Button("Save Changes")
                processed_video_output = gr.File(label="Download Processed Video", interactive=True)

            # Right column: feedback box backed by handle_feedback / SQLite.
            with gr.Column(scale=1):
                gr.Markdown("**Feedback**")
                feedback_input = gr.Textbox(
                    placeholder="Leave your feedback here...",
                    label=None,
                    lines=3,
                )
                # NOTE(review): "Feeback" is a typo in the button label —
                # left unchanged here since it's a runtime string.
                feedback_btn = gr.Button("Submit Feeback")
                response_message = gr.Textbox(label=None, lines=1, interactive=False)
                # Hidden until handle_feedback grants DB access (passcode).
                db_download = gr.File(label="Download Database File", visible=False)


        def feedback_submission(feedback):
            # Reveal the DB download component only when a file path came back.
            message, file_path = handle_feedback(feedback)
            if file_path:
                return message, gr.update(value=file_path, visible=True)
            return message, gr.update(visible=False)

        # Re-render the video with the user-edited translation table.
        save_changes_button.click(
            update_translations,
            inputs=[file_input, editable_table],
            outputs=[processed_video_output]
        )

        # Run the full transcribe -> translate -> subtitle pipeline.
        submit_button.click(
            upload_and_manage,
            inputs=[file_input, language_input],
            outputs=[editable_translations, editable_table, processed_video_output]
        )


        feedback_btn.click(
            feedback_submission,
            inputs=[feedback_input],
            outputs=[response_message, db_download]
        )


    return demo
|
|
|
|
|
# Keep `demo` at module level so hosts that import the module can find the app.
demo = build_interface()

if __name__ == "__main__":
    # Launch only when run as a script; importing stays side-effect free.
    demo.launch()