Studio_V0 / app.py
qqwjq1981's picture
Update app.py
486838c verified
raw
history blame
14.2 kB
import gradio as gr
from datetime import datetime
import random
from transformers import pipeline
from transformers.pipelines.audio_utils import ffmpeg_read
from moviepy import (
ImageClip,
VideoFileClip,
TextClip,
CompositeVideoClip,
AudioFileClip,
concatenate_videoclips
)
import subprocess
import speech_recognition as sr
import json
from nltk.tokenize import sent_tokenize
import logging
from textblob import TextBlob
import whisper
import time
import sqlite3
# Define the passcode
PASSCODE = "show_feedback_db"
# Function to save feedback or provide access to the database file
def handle_feedback(feedback):
feedback = feedback.strip() # Clean up leading/trailing whitespace
if not feedback:
return "Feedback cannot be empty.", None
if feedback == PASSCODE:
# Provide access to the feedback.db file
return "Access granted! Download the database file below.", "feedback.db"
else:
# Save feedback to the database
with sqlite3.connect("feedback.db") as conn:
cursor = conn.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS studio_feedback (id INTEGER PRIMARY KEY, comment TEXT)")
cursor.execute("INSERT INTO studio_feedback (comment) VALUES (?)", (feedback,))
conn.commit()
return "Thank you for your feedback!", None
# Configure logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
def list_available_fonts():
try:
# Run the 'fc-list' command to list fonts
result = subprocess.run(
["fc-list", "--format", "%{file}\\n"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=True
)
fonts = result.stdout.splitlines()
logger.debug(f"Available fonts:\n{fonts}")
return fonts
except subprocess.CalledProcessError as e:
logger.error(f"Error while listing fonts: {e.stderr}")
return []
def split_into_sentences(text):
blob = TextBlob(text)
return [str(sentence) for sentence in blob.sentences]
def transcribe_video(video_path):
# Load the video file and extract audio
video = VideoFileClip(video_path)
audio_path = "audio.wav"
video.audio.write_audiofile(audio_path)
# Load Whisper model
model = whisper.load_model("base") # Options: tiny, base, small, medium, large
# Transcribe with Whisper
result = model.transcribe(audio_path, word_timestamps=True)
# Extract timestamps and text
transcript_with_timestamps = [
{
"start": segment["start"],
"end": segment["end"],
"text": segment["text"]
}
for segment in result["segments"]
]
return transcript_with_timestamps
# Function to get the appropriate translation model based on target language
def get_translation_model(target_language):
# Map of target languages to their corresponding model names
model_map = {
"es": "Helsinki-NLP/opus-mt-en-es", # English to Spanish
"fr": "Helsinki-NLP/opus-mt-en-fr", # English to French
"zh": "Helsinki-NLP/opus-mt-en-zh", # English to Chinese
# Add more languages as needed
}
return model_map.get(target_language, "Helsinki-NLP/opus-mt-en-zh") # Default to Chinese if not found
def translate_text(transcription_json, target_language):
# Load the translation model for the specified target language
translation_model_id = get_translation_model(target_language)
logger.debug(f"Translation model: {translation_model_id}")
translator = pipeline("translation", model=translation_model_id)
# Prepare output structure
translated_json = []
# Translate each sentence and store it with its start time
for entry in transcription_json:
original_text = entry["text"]
translated_text = translator(original_text)[0]['translation_text']
translated_json.append({
"start": entry["start"],
"original": original_text,
"translated": translated_text,
"end": entry["end"]
})
# Log the components being added to translated_json
logger.debug("Adding to translated_json: start=%s, original=%s, translated=%s, end=%s",
entry["start"], original_text, translated_text, entry["end"])
# Return the translated timestamps as a JSON string
return translated_json
def add_transcript_to_video(video_path, translated_json, output_path):
# Load the video file
video = VideoFileClip(video_path)
# Create text clips based on timestamps
text_clips = []
logger.debug("Full translated_json: %s", translated_json)
for entry in translated_json:
logger.debug("Processing entry: %s", entry)
font_path = "./NotoSansSC-Regular.ttf"
for entry in translated_json:
# Ensure `entry` is a dictionary with keys "start", "end", and "translated"
if isinstance(entry, dict) and "translated" in entry:
txt_clip = TextClip(
text=entry["translated"], font=font_path, method='caption', color='yellow', size=video.size
).with_start(entry["start"]).with_duration(entry["end"] - entry["start"]).with_position(('bottom')).with_opacity(0.7)
text_clips.append(txt_clip)
else:
raise ValueError(f"Invalid entry format: {entry}")
# Overlay all text clips on the original video
final_video = CompositeVideoClip([video] + text_clips)
# Write the result to a file
final_video.write_videofile(output_path, codec='libx264', audio_codec='aac')
# Mock functions for platform actions and analytics
def mock_post_to_platform(platform, content_title):
return f"Content '{content_title}' successfully posted on {platform}!"
def mock_analytics():
return {
"YouTube": {"Views": random.randint(1000, 5000), "Engagement Rate": f"{random.uniform(5, 15):.2f}%"},
"Instagram": {"Views": random.randint(500, 3000), "Engagement Rate": f"{random.uniform(10, 20):.2f}%"},
}
def update_translations(file, edited_table):
"""
Update the translations based on user edits in the Gradio Dataframe.
"""
output_video_path = "output_video.mp4"
logger.debug(f"Editable Table: {edited_table}")
try:
start_time = time.time() # Start the timer
# Convert the edited_table (list of lists) back to list of dictionaries
updated_translations = [
{
"start": row["start"], # Access by column name
"original": row["original"],
"translated": row["translated"],
"end": row["end"]
}
for _, row in edited_table.iterrows()
]
# Call the function to process the video with updated translations
add_transcript_to_video(file.name, updated_translations, output_video_path)
# Calculate elapsed time
elapsed_time = time.time() - start_time
elapsed_time_display = f"Updates applied successfully in {elapsed_time:.2f} seconds."
return output_video_path, elapsed_time_display
except Exception as e:
raise ValueError(f"Error updating translations: {e}")
def generate_voiceover(translated_json, language, output_audio_path):
from gtts import gTTS
# Concatenate translated text into a single string
full_text = " ".join(entry["translated"] for entry in translated_json)
# Generate speech
tts = gTTS(text=full_text, lang=language)
tts.save(output_audio_path)
def replace_audio_in_video(video_path, new_audio_path, final_video_path):
import moviepy.editor as mp
video = mp.VideoFileClip(video_path)
new_audio = mp.AudioFileClip(new_audio_path)
# Set the new audio
video = video.set_audio(new_audio)
# Save the final output
video.write_videofile(final_video_path, codec="libx264", audio_codec="aac")
def upload_and_manage(file, language, mode="transcription"):
if file is None:
return None, [], None, "No file uploaded. Please upload a video/audio file."
try:
start_time = time.time() # Start the timer
# Define paths for audio and output files
audio_path = "audio.wav"
output_video_path = "output_video.mp4"
voiceover_path = "voiceover.wav"
list_available_fonts()
# Step 1: Transcribe audio from uploaded media file and get timestamps
transcription_json = transcribe_video(file.name)
# Step 2: Translate the transcription
translated_json = translate_text(transcription_json, language)
# Step 3: Add transcript to video based on timestamps
add_transcript_to_video(file.name, translated_json, output_video_path)
# Step 4 (Optional): Generate voiceover if mode is "transcription_voiceover"
if mode == "transcription_voiceover":
generate_voiceover(translated_json, language, voiceover_path)
replace_audio_in_video(output_video_path, voiceover_path, output_video_path)
# Convert translated JSON into a format for the editable table
editable_table = [
[float(entry["start"]), entry["original"], entry["translated"], float(entry["end"])]
for entry in translated_json
]
# Calculate elapsed time
elapsed_time = time.time() - start_time
elapsed_time_display = f"Processing completed in {elapsed_time:.2f} seconds."
return translated_json, editable_table, output_video_path, elapsed_time_display
except Exception as e:
return None, [], None, f"An error occurred: {str(e)}"
# Gradio Interface with Tabs
def build_interface():
css = """
/* Adjust row height */
.dataframe-container tr {
height: 50px !important;
}
/* Ensure text wrapping and prevent overflow */
.dataframe-container td {
white-space: normal !important;
word-break: break-word !important;
}
/* Set column widths */
[data-testid="block-container"] .scrolling-dataframe th:nth-child(1),
[data-testid="block-container"] .scrolling-dataframe td:nth-child(1) {
width: 5%; /* Start column */
}
[data-testid="block-container"] .scrolling-dataframe th:nth-child(2),
[data-testid="block-container"] .scrolling-dataframe td:nth-child(2) {
width: 45%; /* Original text */
}
[data-testid="block-container"] .scrolling-dataframe th:nth-child(3),
[data-testid="block-container"] .scrolling-dataframe td:nth-child(3) {
width: 45%; /* Translated text */
}
[data-testid="block-container"] .scrolling-dataframe th:nth-child(4),
[data-testid="block-container"] .scrolling-dataframe td:nth-child(4) {
width: 5%; /* End column */
}
"""
with gr.Blocks(css=css) as demo:
gr.Markdown("## Video Localization")
with gr.Row():
with gr.Column(scale=4):
file_input = gr.File(label="Upload Video/Audio File")
language_input = gr.Dropdown(["en", "es", "fr", "zh"], label="Select Language") # Language codes
process_mode = gr.Radio(choices=["Transcription", "Transcription with Voiceover"], label="Choose Processing Type", value="Transcription")
submit_button = gr.Button("Post and Process")
editable_translations = gr.State(value=[])
with gr.Column(scale=8):
gr.Markdown("## Edit Translations")
# Editable JSON Data
editable_table = gr.Dataframe(
value=[], # Default to an empty list to avoid undefined values
headers=["start", "original", "translated", "end"],
datatype=["number", "str", "str", "number"],
row_count=1, # Initially empty
col_count=4,
interactive=[False, True, True, False], # Control editability
label="Edit Translations",
wrap=True # Enables text wrapping if supported
)
save_changes_button = gr.Button("Save Changes")
processed_video_output = gr.File(label="Download Processed Video", interactive=True) # Download button
elapsed_time_display = gr.Textbox(label="Elapsed Time", lines=1, interactive=False)
with gr.Column(scale=1):
gr.Markdown("**Feedback**")
feedback_input = gr.Textbox(
placeholder="Leave your feedback here...",
label=None,
lines=3,
)
feedback_btn = gr.Button("Submit Feedback")
response_message = gr.Textbox(label=None, lines=1, interactive=False)
db_download = gr.File(label="Download Database File", visible=False)
# Link the feedback handling
def feedback_submission(feedback):
message, file_path = handle_feedback(feedback)
if file_path:
return message, gr.update(value=file_path, visible=True)
return message, gr.update(visible=False)
save_changes_button.click(
update_translations,
inputs=[file_input, editable_table],
outputs=[processed_video_output, elapsed_time_display]
)
submit_button.click(
upload_and_manage,
inputs=[file_input, language_input, process_mode],
outputs=[editable_translations, editable_table, processed_video_output, elapsed_time_display]
)
# Connect submit button to save_feedback_db function
feedback_btn.click(
feedback_submission,
inputs=[feedback_input],
outputs=[response_message, db_download]
)
return demo
# Launch the Gradio interface
demo = build_interface()
demo.launch()