import gradio as gr
from datetime import datetime
import random
from transformers import pipeline
from transformers.pipelines.audio_utils import ffmpeg_read
from moviepy import (
    ImageClip,
    VideoFileClip,
    TextClip,
    CompositeVideoClip,
    AudioFileClip,
    concatenate_videoclips,
)
import subprocess
import speech_recognition as sr
import json
from nltk.tokenize import sent_tokenize
import logging
from textblob import TextBlob
import whisper
import time
import sqlite3

# Define the passcode
PASSCODE = "show_feedback_db"


# Function to save feedback or provide access to the database file
def handle_feedback(feedback):
    feedback = feedback.strip()  # Clean up leading/trailing whitespace
    if not feedback:
        return "Feedback cannot be empty.", None

    if feedback == PASSCODE:
        # Provide access to the feedback.db file
        return "Access granted! Download the database file below.", "feedback.db"
    else:
        # Save feedback to the database
        with sqlite3.connect("feedback.db") as conn:
            cursor = conn.cursor()
            cursor.execute(
                "CREATE TABLE IF NOT EXISTS studio_feedback (id INTEGER PRIMARY KEY, comment TEXT)"
            )
            cursor.execute("INSERT INTO studio_feedback (comment) VALUES (?)", (feedback,))
            conn.commit()
        return "Thank you for your feedback!", None


# Configure logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


def list_available_fonts():
    try:
        # Run the 'fc-list' command to list fonts
        result = subprocess.run(
            ["fc-list", "--format", "%{file}\\n"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=True,
        )
        fonts = result.stdout.splitlines()
        logger.debug(f"Available fonts:\n{fonts}")
        return fonts
    except subprocess.CalledProcessError as e:
        logger.error(f"Error while listing fonts: {e.stderr}")
        return []


def split_into_sentences(text):
    blob = TextBlob(text)
    return [str(sentence) for sentence in blob.sentences]


def transcribe_video(video_path):
    # Load the video file and extract audio
    video = VideoFileClip(video_path)
    audio_path = "audio.wav"
    video.audio.write_audiofile(audio_path)

    # Load Whisper model
    model = whisper.load_model("base")  # Options: tiny, base, small, medium, large

    # Transcribe with Whisper
    result = model.transcribe(audio_path, word_timestamps=True)

    # Extract timestamps and text
    transcript_with_timestamps = [
        {
            "start": segment["start"],
            "end": segment["end"],
            "text": segment["text"],
        }
        for segment in result["segments"]
    ]

    return transcript_with_timestamps


# Function to get the appropriate translation model based on target language
def get_translation_model(target_language):
    # Map of target languages to their corresponding model names
    model_map = {
        "es": "Helsinki-NLP/opus-mt-en-es",  # English to Spanish
        "fr": "Helsinki-NLP/opus-mt-en-fr",  # English to French
        "zh": "Helsinki-NLP/opus-mt-en-zh",  # English to Chinese
        # Add more languages as needed
    }
    return model_map.get(target_language, "Helsinki-NLP/opus-mt-en-zh")  # Default to Chinese if not found


def translate_text(transcription_json, target_language):
    # Load the translation model for the specified target language
    translation_model_id = get_translation_model(target_language)
    logger.debug(f"Translation model: {translation_model_id}")
    translator = pipeline("translation", model=translation_model_id)

    # Prepare output structure
    translated_json = []

    # Translate each sentence and store it with its start time
    for entry in transcription_json:
        original_text = entry["text"]
        translated_text = translator(original_text)[0]["translation_text"]
        translated_json.append({
            "start": entry["start"],
            "original": original_text,
            "translated": translated_text,
            "end": entry["end"],
        })

        # Log the components being added to translated_json
        logger.debug(
            "Adding to translated_json: start=%s, original=%s, translated=%s, end=%s",
            entry["start"], original_text, translated_text, entry["end"],
        )

    # Return the list of translated segments with timestamps
    return translated_json


def add_transcript_to_video(video_path, translated_json, output_path):
    # Load the video file
    video = VideoFileClip(video_path)

    # Create text clips based on timestamps
    text_clips = []

    logger.debug("Full translated_json: %s", translated_json)
    for entry in translated_json:
        logger.debug("Processing entry: %s", entry)

    font_path = "./NotoSansSC-Regular.ttf"

    for entry in translated_json:
        # Ensure `entry` is a dictionary with keys "start", "end", and "translated"
        if isinstance(entry, dict) and "translated" in entry:
            txt_clip = TextClip(
                text=entry["translated"],
                font=font_path,
                method="caption",
                color="yellow",
                size=video.size,
            ).with_start(entry["start"]).with_duration(entry["end"] - entry["start"]).with_position("bottom").with_opacity(0.7)
            text_clips.append(txt_clip)
        else:
            raise ValueError(f"Invalid entry format: {entry}")

    # Overlay all text clips on the original video
    final_video = CompositeVideoClip([video] + text_clips)

    # Write the result to a file
    final_video.write_videofile(output_path, codec="libx264", audio_codec="aac")


# Mock functions for platform actions and analytics
def mock_post_to_platform(platform, content_title):
    return f"Content '{content_title}' successfully posted on {platform}!"


def mock_analytics():
    return {
        "YouTube": {"Views": random.randint(1000, 5000), "Engagement Rate": f"{random.uniform(5, 15):.2f}%"},
        "Instagram": {"Views": random.randint(500, 3000), "Engagement Rate": f"{random.uniform(10, 20):.2f}%"},
    }


def update_translations(file, edited_table):
    """
    Update the translations based on user edits in the Gradio Dataframe.
    """
    output_video_path = "output_video.mp4"
    logger.debug(f"Editable Table: {edited_table}")

    try:
        start_time = time.time()  # Start the timer

        # Convert the edited table (a pandas DataFrame) back to a list of dictionaries
        updated_translations = [
            {
                "start": row["start"],  # Access by column name
                "original": row["original"],
                "translated": row["translated"],
                "end": row["end"],
            }
            for _, row in edited_table.iterrows()
        ]

        # Call the function to process the video with updated translations
        add_transcript_to_video(file.name, updated_translations, output_video_path)

        # Calculate elapsed time
        elapsed_time = time.time() - start_time
        elapsed_time_display = f"Updates applied successfully in {elapsed_time:.2f} seconds."

        return output_video_path, elapsed_time_display
    except Exception as e:
        raise ValueError(f"Error updating translations: {e}")


def generate_voiceover(translated_json, language, output_audio_path):
    from gtts import gTTS

    # Concatenate translated text into a single string
    full_text = " ".join(entry["translated"] for entry in translated_json)

    # Generate speech
    tts = gTTS(text=full_text, lang=language)
    tts.save(output_audio_path)


def replace_audio_in_video(video_path, new_audio_path, final_video_path):
    video = VideoFileClip(video_path)
    new_audio = AudioFileClip(new_audio_path)

    # Set the new audio track (MoviePy 2.x uses with_audio; moviepy.editor/set_audio were removed)
    video = video.with_audio(new_audio)

    # Save the final output
    video.write_videofile(final_video_path, codec="libx264", audio_codec="aac")


def upload_and_manage(file, language, mode="transcription"):
    if file is None:
        return None, [], None, "No file uploaded. Please upload a video/audio file."

    try:
        start_time = time.time()  # Start the timer

        # Define paths for audio and output files
        audio_path = "audio.wav"
        output_video_path = "output_video.mp4"
        voiceover_path = "voiceover.mp3"  # gTTS writes MP3 data

        # Log the fonts available in the environment (debug aid)
        list_available_fonts()

        # Step 1: Transcribe audio from the uploaded media file and get timestamps
        transcription_json = transcribe_video(file.name)

        # Step 2: Translate the transcription
        translated_json = translate_text(transcription_json, language)

        # Step 3: Add the transcript to the video based on timestamps
        add_transcript_to_video(file.name, translated_json, output_video_path)

        # Step 4 (Optional): Generate a voiceover and replace the audio track
        if mode == "Transcription with Voiceover":
            generate_voiceover(translated_json, language, voiceover_path)
            # Write the dubbed result to a separate file so the subtitled video is not
            # overwritten while it is still being read
            dubbed_video_path = "output_video_voiceover.mp4"
            replace_audio_in_video(output_video_path, voiceover_path, dubbed_video_path)
            output_video_path = dubbed_video_path

        # Convert translated JSON into a format for the editable table
        editable_table = [
            [float(entry["start"]), entry["original"], entry["translated"], float(entry["end"])]
            for entry in translated_json
        ]

        # Calculate elapsed time
        elapsed_time = time.time() - start_time
        elapsed_time_display = f"Processing completed in {elapsed_time:.2f} seconds."

        return translated_json, editable_table, output_video_path, elapsed_time_display
    except Exception as e:
        return None, [], None, f"An error occurred: {str(e)}"


# Gradio Interface with Tabs
def build_interface():
    css = """
    /* Adjust row height */
    .dataframe-container tr {
        height: 50px !important;
    }

    /* Ensure text wrapping and prevent overflow */
    .dataframe-container td {
        white-space: normal !important;
        word-break: break-word !important;
    }

    /* Set column widths */
    [data-testid="block-container"] .scrolling-dataframe th:nth-child(1),
    [data-testid="block-container"] .scrolling-dataframe td:nth-child(1) {
        width: 5%; /* Start column */
    }

    [data-testid="block-container"] .scrolling-dataframe th:nth-child(2),
    [data-testid="block-container"] .scrolling-dataframe td:nth-child(2) {
        width: 45%; /* Original text */
    }

    [data-testid="block-container"] .scrolling-dataframe th:nth-child(3),
    [data-testid="block-container"] .scrolling-dataframe td:nth-child(3) {
        width: 45%; /* Translated text */
    }

    [data-testid="block-container"] .scrolling-dataframe th:nth-child(4),
    [data-testid="block-container"] .scrolling-dataframe td:nth-child(4) {
        width: 5%; /* End column */
    }
    """
    with gr.Blocks(css=css) as demo:
        gr.Markdown("## Video Localization")

        with gr.Row():
            with gr.Column(scale=4):
                file_input = gr.File(label="Upload Video/Audio File")
                language_input = gr.Dropdown(["en", "es", "fr", "zh"], label="Select Language")  # Language codes
                process_mode = gr.Radio(
                    choices=["Transcription", "Transcription with Voiceover"],
                    label="Choose Processing Type",
                    value="Transcription",
                )
                submit_button = gr.Button("Post and Process")
                editable_translations = gr.State(value=[])

            with gr.Column(scale=8):
                gr.Markdown("## Edit Translations")

                # Editable JSON Data
                editable_table = gr.Dataframe(
                    value=[],  # Default to an empty list to avoid undefined values
                    headers=["start", "original", "translated", "end"],
                    datatype=["number", "str", "str", "number"],
                    row_count=1,  # Initially empty
                    col_count=4,
                    interactive=[False, True, True, False],  # Control editability
                    label="Edit Translations",
                    wrap=True,  # Enables text wrapping if supported
                )
                save_changes_button = gr.Button("Save Changes")
                processed_video_output = gr.File(label="Download Processed Video", interactive=True)  # Download button
                elapsed_time_display = gr.Textbox(label="Elapsed Time", lines=1, interactive=False)

            with gr.Column(scale=1):
                gr.Markdown("**Feedback**")
                feedback_input = gr.Textbox(
                    placeholder="Leave your feedback here...",
                    label=None,
                    lines=3,
                )
                feedback_btn = gr.Button("Submit Feedback")
                response_message = gr.Textbox(label=None, lines=1, interactive=False)
                db_download = gr.File(label="Download Database File", visible=False)

        # Link the feedback handling: return the database file only when the passcode is entered
        def feedback_submission(feedback):
            message, file_path = handle_feedback(feedback)
            if file_path:
                return message, gr.update(value=file_path, visible=True)
            return message, gr.update(visible=False)

        save_changes_button.click(
            update_translations,
            inputs=[file_input, editable_table],
            outputs=[processed_video_output, elapsed_time_display],
        )

        submit_button.click(
            upload_and_manage,
            inputs=[file_input, language_input, process_mode],
            outputs=[editable_translations, editable_table, processed_video_output, elapsed_time_display],
        )

        # Connect the feedback button to the feedback handler
        feedback_btn.click(
            feedback_submission,
            inputs=[feedback_input],
            outputs=[response_message, db_download],
        )

    return demo


# Launch the Gradio interface
demo = build_interface()
demo.launch()
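

# Rough dependency sketch (assumed; nothing is pinned in this script): the imports above
# correspond to the pip packages gradio, moviepy (2.x API), openai-whisper, transformers
# (plus sentencepiece for the Helsinki-NLP MarianMT models), textblob, nltk,
# SpeechRecognition, and gTTS. The script also shells out to fc-list (fontconfig) and
# relies on ffmpeg being available on the system for audio/video handling.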