import gradio as gr
import random
import subprocess
import logging

from transformers import pipeline
from moviepy import VideoFileClip, TextClip, CompositeVideoClip
from textblob import TextBlob
import whisper
# Configure logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

def list_available_fonts():
    try:
        # Run the 'fc-list' command to list fonts installed on the system
        result = subprocess.run(
            ["fc-list", "--format", "%{file}\\n"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=True
        )
        fonts = result.stdout.splitlines()
        logger.debug(f"Available fonts:\n{fonts}")
        return fonts
    except subprocess.CalledProcessError as e:
        logger.error(f"Error while listing fonts: {e.stderr}")
        return []
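
# With the "%{file}\n" format string, each line of fc-list output is a font
# file path, e.g. (illustrative):
#   /usr/share/fonts/truetype/dejavu/DejaVuSans.ttf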

def split_into_sentences(text):
    blob = TextBlob(text)
    return [str(sentence) for sentence in blob.sentences]
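
# Example (hypothetical input):
#   split_into_sentences("Hello world. How are you?")
#   # -> ["Hello world.", "How are you?"]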

def transcribe_video(video_path):
    # Load the video file and extract its audio track
    video = VideoFileClip(video_path)
    audio_path = "audio.wav"
    video.audio.write_audiofile(audio_path)

    # Load Whisper model
    model = whisper.load_model("base")  # Options: tiny, base, small, medium, large

    # Transcribe with Whisper
    result = model.transcribe(audio_path, word_timestamps=True)

    # Extract timestamps and text
    transcript_with_timestamps = [
        {
            "start": segment["start"],
            "end": segment["end"],
            "text": segment["text"]
        }
        for segment in result["segments"]
    ]

    return transcript_with_timestamps
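
# For reference, each returned segment is a dict of this shape (timestamps and
# text are illustrative, assuming a local "sample.mp4"):
#   transcribe_video("sample.mp4")
#   # -> [{"start": 0.0, "end": 2.5, "text": " Hello there."}, ...]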

# Function to get the appropriate translation model based on target language
def get_translation_model(target_language):
    # Map of target languages to their corresponding model names
    model_map = {
        "es": "Helsinki-NLP/opus-mt-en-es",  # English to Spanish
        "fr": "Helsinki-NLP/opus-mt-en-fr",  # English to French
        "zh": "Helsinki-NLP/opus-mt-en-zh",  # English to Chinese
        # Add more languages as needed
    }
    return model_map.get(target_language, "Helsinki-NLP/opus-mt-en-zh")  # Default to Chinese if not found
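
# Example lookups:
#   get_translation_model("fr")  # -> "Helsinki-NLP/opus-mt-en-fr"
#   get_translation_model("de")  # unmapped, falls back to "Helsinki-NLP/opus-mt-en-zh"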

def translate_text(transcription_json, target_language):
    # Load the translation model for the specified target language
    translation_model_id = get_translation_model(target_language)
    logger.debug(f"Translation model: {translation_model_id}")
    translator = pipeline("translation", model=translation_model_id)

    # Translate each segment and keep its original timestamps
    translated_json = []
    for entry in transcription_json:
        original_text = entry["text"]
        translated_text = translator(original_text)[0]['translation_text']
        translated_json.append({
            "start": entry["start"],
            "original": original_text,
            "translated": translated_text,
            "end": entry["end"]
        })

        # Log the components being added to translated_json
        logger.debug("Adding to translated_json: start=%s, original=%s, translated=%s, end=%s",
                     entry["start"], original_text, translated_text, entry["end"])

    # Return the translated segments with their timestamps
    return translated_json
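
# Each element of the returned list pairs a source segment with its
# translation, e.g. (illustrative values):
#   {"start": 0.0, "original": " Hello there.", "translated": "你好。", "end": 2.5}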

def add_transcript_to_video(video_path, translated_json, output_path):
    # Load the video file
    video = VideoFileClip(video_path)
    logger.debug("Full translated_json: %s", translated_json)

    font_path = "./NotoSansSC-Regular.ttf"

    # Create one subtitle TextClip per translated segment
    text_clips = []
    for entry in translated_json:
        logger.debug("Processing entry: %s", entry)
        # Each entry must be a dict with "start", "end", and "translated" keys
        if isinstance(entry, dict) and "translated" in entry:
            txt_clip = (
                TextClip(
                    text=entry["translated"], font=font_path, method='caption',
                    color='yellow', size=video.size
                )
                .with_start(entry["start"])
                .with_duration(entry["end"] - entry["start"])
                .with_position('bottom')
                .with_opacity(0.7)
            )
            text_clips.append(txt_clip)
        else:
            raise ValueError(f"Invalid entry format: {entry}")

    # Overlay all text clips on the original video
    final_video = CompositeVideoClip([video] + text_clips)

    # Write the result to a file
    final_video.write_videofile(output_path, codec='libx264', audio_codec='aac')
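
# The three steps above can also be chained manually (paths are placeholders):
#   segments = transcribe_video("input.mp4")
#   translated = translate_text(segments, "zh")
#   add_transcript_to_video("input.mp4", translated, "output_video.mp4")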

# Mock functions for platform actions and analytics
def mock_post_to_platform(platform, content_title):
    return f"Content '{content_title}' successfully posted on {platform}!"

def mock_analytics():
    return {
        "YouTube": {"Views": random.randint(1000, 5000), "Engagement Rate": f"{random.uniform(5, 15):.2f}%"},
        "Instagram": {"Views": random.randint(500, 3000), "Engagement Rate": f"{random.uniform(10, 20):.2f}%"},
    }

# Core functionalities
def upload_and_manage(file, language):
    if file is None:
        raise gr.Error("Please upload a video/audio file.")

    # Define the path for the output video
    output_video_path = "output_video.mp4"

    list_available_fonts()

    # Transcribe audio from the uploaded media file and get timestamps
    transcription_json = transcribe_video(file.name)
    translated_json = translate_text(transcription_json, language)

    # Burn the translated transcript into the video based on timestamps
    add_transcript_to_video(file.name, translated_json, output_video_path)

    # Mock posting action (can be wired in as needed)
    # post_message = mock_post_to_platform(platform, file.name)

    return transcription_json, translated_json, output_video_path

# def generate_dashboard():
#     # Mock analytics generation
#     analytics = mock_analytics()
#     if not analytics:
#         return "No analytics available."
#     dashboard = "Platform Analytics:\n"
#     for platform, data in analytics.items():
#         dashboard += f"\n{platform}:\n"
#         for metric, value in data.items():
#             dashboard += f"  {metric}: {value}\n"
#     return dashboard

# Gradio Interface with Tabs
def build_interface():
    with gr.Blocks() as demo:
        # with gr.Tab("Content Management"):
        gr.Markdown("## Video Localization")
        with gr.Row():
            file_input = gr.File(label="Upload Video/Audio File")
            # platform_input = gr.Dropdown(["YouTube", "Instagram"], label="Select Platform")
            language_input = gr.Dropdown(["es", "fr", "zh"], label="Select Language")  # Target language codes (see get_translation_model)
        submit_button = gr.Button("Post and Process")
        with gr.Row():
            # post_output = gr.Textbox(label="Posting Status", interactive=False)
            transcription_output = gr.JSON(label="Transcription JSON File")
            translated_output = gr.JSON(label="Translated JSON File")
        with gr.Row():
            processed_video_output = gr.File(label="Download Processed Video", interactive=False)  # Download button
        submit_button.click(
            upload_and_manage,
            inputs=[file_input, language_input],
            outputs=[transcription_output, translated_output, processed_video_output]
        )
        # with gr.Tab("Analytics Dashboard"):
        #     gr.Markdown("## Content Performance Analytics")
        #     analytics_output = gr.Textbox(label="Dashboard", interactive=False)
        #     generate_dashboard_button = gr.Button("Generate Dashboard")
        #     generate_dashboard_button.click(generate_dashboard, outputs=[analytics_output])
    return demo
# Launch the Gradio interface
demo = build_interface()
demo.launch()
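
# Note: launch() defaults are fine on Hugging Face Spaces; when running
# locally you can pass standard Gradio options, e.g.
# demo.launch(server_name="0.0.0.0") to listen on all interfaces.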