import gradio as gr
from datetime import datetime
import random
from transformers import pipeline
from transformers.pipelines.audio_utils import ffmpeg_read
from moviepy import (
ImageClip,
VideoFileClip,
TextClip,
CompositeVideoClip,
AudioFileClip,
concatenate_videoclips
)
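# Note: the `from moviepy import ...` form and the with_start/with_duration/
# with_position/with_opacity clip methods used below assume MoviePy 2.x;
# MoviePy 1.x exposes these via moviepy.editor and the set_* equivalents instead.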
import subprocess
import speech_recognition as sr
import json
from nltk.tokenize import sent_tokenize
import logging
from textblob import TextBlob
import whisper
import sqlite3
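# Runtime assumptions: ffmpeg must be available for Whisper/MoviePy audio handling,
# fontconfig's `fc-list` is used for font discovery, and a NotoSansSC-Regular.ttf
# font file is expected in the working directory for subtitle rendering.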
# Define the passcode
PASSCODE = "show_feedback_db"
# Function to save feedback or provide access to the database file
def handle_feedback(feedback):
feedback = feedback.strip() # Clean up leading/trailing whitespace
if not feedback:
return "Feedback cannot be empty.", None
if feedback == PASSCODE:
# Provide access to the feedback.db file
return "Access granted! Download the database file below.", "feedback.db"
else:
# Save feedback to the database
with sqlite3.connect("feedback.db") as conn:
cursor = conn.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS studio_feedback (id INTEGER PRIMARY KEY, comment TEXT)")
cursor.execute("INSERT INTO studio_feedback (comment) VALUES (?)", (feedback,))
conn.commit()
return "Thank you for your feedback!", None
# Configure logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
def list_available_fonts():
try:
# Run the 'fc-list' command to list fonts
result = subprocess.run(
["fc-list", "--format", "%{file}\\n"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=True
)
fonts = result.stdout.splitlines()
logger.debug(f"Available fonts:\n{fonts}")
return fonts
except subprocess.CalledProcessError as e:
logger.error(f"Error while listing fonts: {e.stderr}")
return []
def split_into_sentences(text):
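    """Split raw text into sentences using TextBlob's sentence tokenizer."""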
blob = TextBlob(text)
return [str(sentence) for sentence in blob.sentences]
def transcribe_video(video_path):
# Load the video file and extract audio
video = VideoFileClip(video_path)
audio_path = "audio.wav"
video.audio.write_audiofile(audio_path)
# Load Whisper model
model = whisper.load_model("base") # Options: tiny, base, small, medium, large
# Transcribe with Whisper
result = model.transcribe(audio_path, word_timestamps=True)
# Extract timestamps and text
transcript_with_timestamps = [
{
"start": segment["start"],
"end": segment["end"],
"text": segment["text"]
}
for segment in result["segments"]
]
return transcript_with_timestamps
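# Illustrative shape of the transcript returned above (timestamps in seconds, values are examples):
# [{"start": 0.0, "end": 2.4, "text": " Hello and welcome."}, ...]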
# Function to get the appropriate translation model based on target language
def get_translation_model(target_language):
# Map of target languages to their corresponding model names
model_map = {
"es": "Helsinki-NLP/opus-mt-en-es", # English to Spanish
"fr": "Helsinki-NLP/opus-mt-en-fr", # English to French
"zh": "Helsinki-NLP/opus-mt-en-zh", # English to Chinese
# Add more languages as needed
}
return model_map.get(target_language, "Helsinki-NLP/opus-mt-en-zh") # Default to Chinese if not found
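# e.g. get_translation_model("fr") -> "Helsinki-NLP/opus-mt-en-fr";
# an unmapped code such as "de" falls back to the English-to-Chinese model.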
def translate_text(transcription_json, target_language):
# Load the translation model for the specified target language
translation_model_id = get_translation_model(target_language)
logger.debug(f"Translation model: {translation_model_id}")
translator = pipeline("translation", model=translation_model_id)
# Prepare output structure
translated_json = []
# Translate each sentence and store it with its start time
for entry in transcription_json:
original_text = entry["text"]
translated_text = translator(original_text)[0]['translation_text']
translated_json.append({
"start": entry["start"],
"original": original_text,
"translated": translated_text,
"end": entry["end"]
})
# Log the components being added to translated_json
logger.debug("Adding to translated_json: start=%s, original=%s, translated=%s, end=%s",
entry["start"], original_text, translated_text, entry["end"])
# Return the translated timestamps as a JSON string
return translated_json
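# Illustrative shape of translated_json (one entry per Whisper segment, values are examples):
# [{"start": 0.0, "original": " Hello and welcome.", "translated": "...", "end": 2.4}, ...]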
def add_transcript_to_video(video_path, translated_json, output_path):
# Load the video file
video = VideoFileClip(video_path)
# Create text clips based on timestamps
text_clips = []
logger.debug("Full translated_json: %s", translated_json)
    font_path = "./NotoSansSC-Regular.ttf"
    for entry in translated_json:
        logger.debug("Processing entry: %s", entry)
        # Ensure `entry` is a dictionary with keys "start", "end", and "translated"
        if isinstance(entry, dict) and "translated" in entry:
            txt_clip = TextClip(
                text=entry["translated"], font=font_path, method='caption', color='yellow', size=video.size
            ).with_start(entry["start"]).with_duration(entry["end"] - entry["start"]).with_position('bottom').with_opacity(0.7)
            text_clips.append(txt_clip)
        else:
            raise ValueError(f"Invalid entry format: {entry}")
# Overlay all text clips on the original video
final_video = CompositeVideoClip([video] + text_clips)
# Write the result to a file
final_video.write_videofile(output_path, codec='libx264', audio_codec='aac')
# Mock functions for platform actions and analytics
def mock_post_to_platform(platform, content_title):
return f"Content '{content_title}' successfully posted on {platform}!"
def mock_analytics():
return {
"YouTube": {"Views": random.randint(1000, 5000), "Engagement Rate": f"{random.uniform(5, 15):.2f}%"},
"Instagram": {"Views": random.randint(500, 3000), "Engagement Rate": f"{random.uniform(10, 20):.2f}%"},
}
def update_translations(file, edited_table):
"""
Update the translations based on user edits in the Gradio Dataframe.
"""
output_video_path = "output_video.mp4"
logger.debug(f"Editable Table: {edited_table}")
try:
        # Convert the edited table (a pandas DataFrame from gr.Dataframe) back to a list of dictionaries
updated_translations = [
{
"start": row["start"], # Access by column name
"original": row["original"],
"translated": row["translated"],
"end": row["end"]
}
for _, row in edited_table.iterrows()
]
# Call the function to process the video with updated translations
add_transcript_to_video(file.name, updated_translations, output_video_path)
return output_video_path
except Exception as e:
raise ValueError(f"Error updating translations: {e}")
# Core functionalities
def upload_and_manage(file, language):
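    """
    End-to-end pipeline for an uploaded file: transcribe with Whisper, translate each
    segment, burn the translated captions into the video, and return the translations,
    an editable table, and the path to the processed video.
    """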
if file is None:
return "Please upload a video/audio file.", None, None, None
# Define paths for audio and output files
audio_path = "audio.wav"
output_video_path = "output_video.mp4"
list_available_fonts()
# Transcribe audio from uploaded media file and get timestamps
    transcription_json = transcribe_video(file.name)
    translated_json = translate_text(transcription_json, language)
# Add transcript to video based on timestamps
add_transcript_to_video(file.name, translated_json, output_video_path)
# Mock posting action (you can implement this as needed)
# post_message = mock_post_to_platform(platform, file.name)
# Convert the translated JSON into a format for the editable table
editable_table = [
[float(entry["start"]), entry["original"], entry["translated"], float(entry["end"])]
for entry in translated_json
]
return translated_json, editable_table, output_video_path
# def generate_dashboard():
# # Mock analytics generation
# analytics = mock_analytics()
# if not analytics:
# return "No analytics available."
# dashboard = "Platform Analytics:\n"
# for platform, data in analytics.items():
# dashboard += f"\n{platform}:\n"
# for metric, value in data.items():
# dashboard += f" {metric}: {value}\n"
# return dashboard
# Gradio Interface with Tabs
def build_interface():
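    """Build the Gradio Blocks UI: upload/translate controls, an editable translation table, and a feedback box."""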
with gr.Blocks() as demo:
# with gr.Tab("Content Management"):
gr.Markdown("## Video Localization")
with gr.Row():
with gr.Column(scale=4):
file_input = gr.File(label="Upload Video/Audio File")
# platform_input = gr.Dropdown(["YouTube", "Instagram"], label="Select Platform")
language_input = gr.Dropdown(["en", "es", "fr", "zh"], label="Select Language") # Language codes
submit_button = gr.Button("Post and Process")
editable_translations = gr.State(value=[])
with gr.Column(scale=8):
gr.Markdown("## Edit Translations")
# Editable JSON Data
editable_table = gr.Dataframe(
value=[], # Default to an empty list to avoid undefined values
headers=["start", "original", "translated", "end"],
datatype=["number", "str", "str", "number"],
row_count=1, # Initially empty
col_count=4,
interactive=[False, True, True, False], # Control editability
label="Edit Translations",
)
save_changes_button = gr.Button("Save Changes")
processed_video_output = gr.File(label="Download Processed Video", interactive=True) # Download button
with gr.Column(scale=1):
gr.Markdown("**Feedback**")
feedback_input = gr.Textbox(
placeholder="Leave your feedback here...",
label=None,
lines=3,
)
                feedback_btn = gr.Button("Submit Feedback")
response_message = gr.Textbox(label=None, lines=1, interactive=False)
db_download = gr.File(label="Download Database File", visible=False)
# Link the feedback handling
def feedback_submission(feedback):
message, file_path = handle_feedback(feedback)
if file_path:
return message, gr.update(value=file_path, visible=True)
return message, gr.update(visible=False)
save_changes_button.click(
update_translations,
inputs=[file_input, editable_table],
outputs=[processed_video_output]
)
submit_button.click(
upload_and_manage,
inputs=[file_input, language_input],
outputs=[editable_translations, editable_table, processed_video_output]
)
# Connect submit button to save_feedback_db function
feedback_btn.click(
feedback_submission,
inputs=[feedback_input],
outputs=[response_message, db_download]
)
# with gr.Tab("Analytics Dashboard"):
# gr.Markdown("## Content Performance Analytics")
# analytics_output = gr.Textbox(label="Dashboard", interactive=False)
# generate_dashboard_button = gr.Button("Generate Dashboard")
# generate_dashboard_button.click(generate_dashboard, outputs=[analytics_output])
return demo
# Launch the Gradio interface
demo = build_interface()
demo.launch()