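"""Gradio demo: transcribe an uploaded video, translate the transcript sentence
by sentence, overlay the translated captions on the video, and mock posting the
result to a social platform along with mock analytics.

Assumed (not pinned in the original source) dependencies: gradio, transformers,
moviepy 1.x, SpeechRecognition, nltk.
"""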
import json
import logging
import random

import gradio as gr
import nltk
import speech_recognition as sr
from nltk.tokenize import sent_tokenize
from transformers import pipeline
# moviepy 1.x import path; the clip calls below (TextClip(..., fontsize=...),
# set_start/set_duration/set_position/set_opacity) use the 1.x API.
from moviepy.editor import CompositeVideoClip, TextClip, VideoFileClip

# sent_tokenize needs the punkt tokenizer data; fetch it once if missing.
nltk.download("punkt", quiet=True)

logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


def transcribe_video(video_path):
    """Extract the audio track, transcribe it, and return per-sentence timestamps."""
    video = VideoFileClip(video_path)
    audio_path = "audio.wav"
    video.audio.write_audiofile(audio_path)

    recognizer = sr.Recognizer()
    with sr.AudioFile(audio_path) as source:
        audio_text = recognizer.record(source)
    # Uses the free Google Web Speech API, so an internet connection is required.
    transcript = recognizer.recognize_google(audio_text)

    sentences = sent_tokenize(transcript)

    # Rough timing: spread the sentences evenly over the audio.
    # frame_data is raw PCM, so duration in seconds = bytes / (sample_rate * sample_width).
    audio_duration = len(audio_text.frame_data) / (audio_text.sample_rate * audio_text.sample_width)
    duration_per_sentence = audio_duration / len(sentences)

    timestamps = []
    for i, sentence in enumerate(sentences):
        start_time = i * duration_per_sentence
        timestamps.append({"start": start_time, "text": sentence})

    return timestamps
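
# transcribe_video returns a list shaped like the following (values are illustrative):
# [
#     {"start": 0.0, "text": "Hello and welcome."},
#     {"start": 2.5, "text": "Today we look at the basics."},
# ]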


def get_translation_model(target_language):
    """Map a target language code to a Helsinki-NLP translation model."""
    model_map = {
        "es": "Helsinki-NLP/opus-mt-en-es",
        "fr": "Helsinki-NLP/opus-mt-en-fr",
        "zh": "Helsinki-NLP/opus-mt-en-zh",
    }
    # Fall back to English -> French if the requested language is not mapped.
    return model_map.get(target_language, "Helsinki-NLP/opus-mt-en-fr")


def translate_text(transcription_json, target_language):
    """Translate each transcribed sentence and return the result as a JSON string."""
    translation_model_id = get_translation_model(target_language)
    logger.debug(f"Translation model: {translation_model_id}")
    # Model weights are downloaded on first use, so the initial call can be slow.
    translator = pipeline("translation", model=translation_model_id)

    translated_json = []
    for entry in transcription_json:
        original_text = entry["text"]
        translated_text = translator(original_text)[0]["translation_text"]
        translated_json.append({
            "start": entry["start"],
            "original": original_text,
            "translated": translated_text,
        })

    return json.dumps(translated_json, indent=4)
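
# translate_text returns a JSON string shaped like the following (values are illustrative):
# [
#     {"start": 0.0, "original": "Hello and welcome.", "translated": "Hola y bienvenidos."},
# ]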


def add_transcript_to_video(video_path, timestamps, output_path):
    """Overlay each caption on the video, starting at its timestamp."""
    video = VideoFileClip(video_path)

    text_clips = []
    for entry in timestamps:
        txt_clip = TextClip(entry["text"], fontsize=24, color='white', bg_color='black', size=video.size)
        # Each caption is shown for a fixed 3 seconds at the bottom of the frame.
        txt_clip = txt_clip.set_start(entry["start"]).set_duration(3).set_position('bottom').set_opacity(0.7)
        text_clips.append(txt_clip)

    final_video = CompositeVideoClip([video] + text_clips)
    final_video.write_videofile(output_path, codec='libx264', audio_codec='aac')


def mock_post_to_platform(platform, content_title):
    return f"Content '{content_title}' successfully posted on {platform}!"


def mock_analytics():
    return {
        "YouTube": {"Views": random.randint(1000, 5000), "Engagement Rate": f"{random.uniform(5, 15):.2f}%"},
        "Instagram": {"Views": random.randint(500, 3000), "Engagement Rate": f"{random.uniform(10, 20):.2f}%"},
    }


def upload_and_manage(file, platform, language):
    """Run the full pipeline: transcribe, translate, caption, mock-post, mock analytics."""
    if file is None:
        return "Please upload a video/audio file.", None, None, None

    output_video_path = "output_video.mp4"

    # file is assumed to be a tempfile-like object with a .name path (Gradio 3.x gr.File behavior).
    transcription_json = transcribe_video(file.name)
    translated_json = translate_text(transcription_json, language)

    # translate_text returns a JSON string; the caption overlay expects
    # {"start", "text"} entries, so rebuild that structure from the translations.
    captions = [
        {"start": seg["start"], "text": seg["translated"]}
        for seg in json.loads(translated_json)
    ]
    add_transcript_to_video(file.name, captions, output_video_path)

    post_message = mock_post_to_platform(platform, file.name)
    analytics = mock_analytics()

    return post_message, transcription_json, translated_json, analytics


def generate_dashboard(analytics):
    if not analytics:
        return "No analytics available."

    dashboard = "Platform Analytics:\n"
    for platform, data in analytics.items():
        dashboard += f"\n{platform}:\n"
        for metric, value in data.items():
            dashboard += f"  {metric}: {value}\n"
    return dashboard


def build_interface():
    with gr.Blocks() as demo:
        # Shared state so the analytics produced on upload can feed the dashboard tab.
        analytics_state = gr.State()

        with gr.Tab("Content Management"):
            gr.Markdown("## Integrated Content Management")
            with gr.Row():
                file_input = gr.File(label="Upload Video/Audio File")
                platform_input = gr.Dropdown(["YouTube", "Instagram"], label="Select Platform")
                language_input = gr.Dropdown(["es", "fr", "zh"], label="Select Language")

            submit_button = gr.Button("Post and Process")

            with gr.Row():
                post_output = gr.Textbox(label="Posting Status", interactive=False)
                transcription_output = gr.Textbox(label="Transcription JSON File", interactive=False)
                translated_output = gr.Textbox(label="Translated JSON File", interactive=False)

            submit_button.click(upload_and_manage,
                                inputs=[file_input, platform_input, language_input],
                                outputs=[post_output, transcription_output, translated_output, analytics_state])

        with gr.Tab("Analytics Dashboard"):
            gr.Markdown("## Content Performance Analytics")
            analytics_output = gr.Textbox(label="Dashboard", interactive=False)
            generate_dashboard_button = gr.Button("Generate Dashboard")

            generate_dashboard_button.click(generate_dashboard, inputs=[analytics_state], outputs=[analytics_output])

    return demo


if __name__ == "__main__":
    demo = build_interface()
    demo.launch()
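
# To run locally (assuming this file is saved as app.py): `python app.py`,
# then open the local URL Gradio prints (http://127.0.0.1:7860 by default).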