import numpy as np
import re
import concurrent.futures
import gradio as gr
from datetime import datetime
import random
import moviepy
from transformers import pipeline
from transformers.pipelines.audio_utils import ffmpeg_read
from moviepy.editor import (
    ImageClip,
    VideoFileClip,
    TextClip,
    CompositeVideoClip,
    CompositeAudioClip,
    AudioFileClip,
    concatenate_videoclips,
    concatenate_audioclips
)
from PIL import Image, ImageDraw, ImageFont
from moviepy.audio.AudioClip import AudioArrayClip, AudioClip
import subprocess
import speech_recognition as sr
import json
from nltk.tokenize import sent_tokenize
import logging
import whisperx
import time
import os
import openai
from openai import OpenAI
import traceback
from TTS.api import TTS
import torch
from TTS.tts.configs.xtts_config import XttsConfig
from pydub import AudioSegment
from pyannote.audio import Pipeline
import wave

logger = logging.getLogger(__name__)

# Accept license terms for Coqui XTTS
os.environ["COQUI_TOS_AGREED"] = "1"
# torch.serialization.add_safe_globals([XttsConfig])

# Load XTTS model
try:
    print("Loading XTTS model...")
    tts = TTS(model_name="tts_models/multilingual/multi-dataset/xtts_v2")
    print("✅ XTTS model loaded successfully.")
except Exception as e:
    print("❌ Error loading XTTS model:")
    traceback.print_exc()
    raise e
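
# Optional sketch: move the XTTS model to GPU when one is available. This assumes the
# installed Coqui TTS version exposes a torch-style .to() (recent releases do); the
# hasattr guard keeps it a no-op on versions that do not.
device = "cuda" if torch.cuda.is_available() else "cpu"
if hasattr(tts, "to"):
    tts.to(device)
    print(f"XTTS model placed on {device}.")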

logger.info(gr.__version__)

client = OpenAI(
    api_key=os.environ.get("openAI_api_key"),  # custom env var name; the client does not pick this up automatically
)
hf_api_key = os.environ.get("hf_token")
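
# Illustrative startup check: the names below are the environment variables read above;
# warn early if either is missing so configuration problems surface at launch rather
# than midway through a processing run.
for _required_env in ("openAI_api_key", "hf_token"):
    if not os.environ.get(_required_env):
        logger.warning(f"Environment variable '{_required_env}' is not set; dependent features may fail.")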

# def silence(duration, fps=44100):
#     """
#     Returns a silent AudioClip of the specified duration.
#     """
#     return AudioArrayClip(np.zeros((int(fps*duration), 2)), fps=fps)
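# # Example (illustrative): silence(0.5) yields a 0.5 s stereo AudioArrayClip of zeros
# # at 44.1 kHz; process_entry below relies on it to pad voiceover audio that runs
# # shorter than its subtitle slot.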
# def count_words_or_characters(text):
#     # Count non-Chinese words
#     non_chinese_words = len(re.findall(r'\b[a-zA-Z0-9]+\b', text))
#     # Count Chinese characters
#     chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', text))
#     return non_chinese_words + chinese_chars

# # Define the passcode
# PASSCODE = "show_feedback_db"

# css = """
# /* Adjust row height */
# .dataframe-container tr {
#     height: 50px !important;
# }
# /* Ensure text wrapping and prevent overflow */
# .dataframe-container td {
#     white-space: normal !important;
#     word-break: break-word !important;
# }
# /* Set column widths */
# [data-testid="block-container"] .scrolling-dataframe th:nth-child(1),
# [data-testid="block-container"] .scrolling-dataframe td:nth-child(1) {
#     width: 6%; /* Start column */
# }
# [data-testid="block-container"] .scrolling-dataframe th:nth-child(2),
# [data-testid="block-container"] .scrolling-dataframe td:nth-child(2) {
#     width: 47%; /* Original text */
# }
# [data-testid="block-container"] .scrolling-dataframe th:nth-child(3),
# [data-testid="block-container"] .scrolling-dataframe td:nth-child(3) {
#     width: 47%; /* Translated text */
# }
# [data-testid="block-container"] .scrolling-dataframe th:nth-child(4),
# [data-testid="block-container"] .scrolling-dataframe td:nth-child(4) {
#     display: none !important;
# }
# """

# # Function to save feedback or provide access to the database file
# def handle_feedback(feedback):
#     feedback = feedback.strip()  # Clean up leading/trailing whitespace
#     if not feedback:
#         return "Feedback cannot be empty.", None
#     if feedback == PASSCODE:
#         # Provide access to the feedback.db file
#         return "Access granted! Download the database file below.", "feedback.db"
#     else:
#         # Save feedback to the database
#         with sqlite3.connect("feedback.db") as conn:
#             cursor = conn.cursor()
#             cursor.execute("CREATE TABLE IF NOT EXISTS studio_feedback (id INTEGER PRIMARY KEY, comment TEXT)")
#             cursor.execute("INSERT INTO studio_feedback (comment) VALUES (?)", (feedback,))
#             conn.commit()
#         return "Thank you for your feedback!", None

# # Configure logging
# logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
# logger = logging.getLogger(__name__)
# logger.info(f"MoviePy Version: {moviepy.__version__}")

# # def segment_background_audio(audio_path, output_path="background_segments.wav"):
# #     # Step 2: Initialize pyannote voice activity detection pipeline (you need Hugging Face token)
# #     pipeline = Pipeline.from_pretrained(
# #         "pyannote/voice-activity-detection",
# #         use_auth_token=hf_api_key
# #     )
# #     # Step 3: Run VAD to get speech segments
# #     vad_result = pipeline(audio_path)
# #     print(f"Detected speech segments: {vad_result}")
# #     # Step 4: Load full audio and subtract speech segments
# #     full_audio = AudioSegment.from_wav(audio_path)
# #     background_audio = AudioSegment.silent(duration=len(full_audio))
# #     for segment in vad_result.itersegments():
# #         start_ms = int(segment.start * 1000)
# #         end_ms = int(segment.end * 1000)
# #         # Remove speech by muting that portion
# #         background_audio = background_audio.overlay(AudioSegment.silent(duration=end_ms - start_ms), position=start_ms)
# #     # Step 5: Subtract background_audio from full_audio
# #     result_audio = full_audio.overlay(background_audio)
# #     # Step 6: Export non-speech segments
# #     result_audio.export(output_path, format="wav")
# #     print(f"Saved non-speech (background) audio to: {output_path}")
# #     return True

# def transcribe_video_with_speakers(video_path):
#     # Extract audio from video
#     video = VideoFileClip(video_path)
#     audio_path = "audio.wav"
#     video.audio.write_audiofile(audio_path)
#     logger.info(f"Audio extracted from video: {audio_path}")
#     # segment_result = segment_background_audio(audio_path)
#     # print(f"Saved non-speech (background) audio to local")
#     # Set up device
#     device = "cuda" if torch.cuda.is_available() else "cpu"
#     logger.info(f"Using device: {device}")
#     try:
#         # Load a medium model with float32 for broader compatibility
#         model = whisperx.load_model("medium", device=device, compute_type="float32")
#         logger.info("WhisperX model loaded")
#         # Transcribe
#         result = model.transcribe(audio_path, chunk_size=5, print_progress=True)
#         logger.info("Audio transcription completed")
#         # Get the detected language
#         detected_language = result["language"]
#         logger.debug(f"Detected language: {detected_language}")
#         # Alignment
#         model_a, metadata = whisperx.load_align_model(language_code=result["language"], device=device)
#         result = whisperx.align(result["segments"], model_a, metadata, audio_path, device)
#         logger.info("Transcription alignment completed")
#         # Diarization (works independently of Whisper model size)
#         diarize_model = whisperx.DiarizationPipeline(use_auth_token=hf_api_key, device=device)
#         diarize_segments = diarize_model(audio_path)
#         logger.info("Speaker diarization completed")
#         # Assign speakers
#         result = whisperx.assign_word_speakers(diarize_segments, result)
#         logger.info("Speakers assigned to transcribed segments")
#     except Exception as e:
#         logger.error(f"❌ WhisperX pipeline failed: {e}")
#     # Extract timestamps, text, and speaker IDs
#     transcript_with_speakers = [
#         {
#             "start": segment["start"],
#             "end": segment["end"],
#             "text": segment["text"],
#             "speaker": segment["speaker"]
#         }
#         for segment in result["segments"]
#     ]
#     # Collect audio for each speaker
#     speaker_audio = {}
#     for segment in result["segments"]:
#         speaker = segment["speaker"]
#         if speaker not in speaker_audio:
#             speaker_audio[speaker] = []
#         speaker_audio[speaker].append((segment["start"], segment["end"]))
#     # Collapse and truncate speaker audio
#     speaker_sample_paths = {}
#     audio_clip = AudioFileClip(audio_path)
#     for speaker, segments in speaker_audio.items():
#         speaker_clips = [audio_clip.subclip(start, end) for start, end in segments]
#         combined_clip = concatenate_audioclips(speaker_clips)
#         truncated_clip = combined_clip.subclip(0, min(30, combined_clip.duration))
#         sample_path = f"speaker_{speaker}_sample.wav"
#         truncated_clip.write_audiofile(sample_path)
#         speaker_sample_paths[speaker] = sample_path
#         logger.info(f"Created sample for {speaker}: {sample_path}")
#     # Clean up
#     video.close()
#     audio_clip.close()
#     os.remove(audio_path)
#     return transcript_with_speakers, detected_language

# # Function to get the appropriate translation model based on target language
# def get_translation_model(source_language, target_language):
#     """
#     Get the translation model based on the source and target language.
#     Parameters:
#     - target_language (str): The language to translate the content into (e.g., 'es', 'fr').
#     - source_language (str): The language of the input content (default is 'en' for English).
#     Returns:
#     - str: The translation model identifier.
#     """
#     # List of allowable languages
#     allowable_languages = ["en", "es", "fr", "zh", "de", "it", "pt", "ja", "ko", "ru"]
#     # Validate source and target languages
#     if source_language not in allowable_languages:
#         logger.debug(f"Invalid source language '{source_language}'. Supported languages are: {', '.join(allowable_languages)}")
#         # Return a default model if source language is invalid
#         source_language = "en"  # Default to 'en'
#     if target_language not in allowable_languages:
#         logger.debug(f"Invalid target language '{target_language}'. Supported languages are: {', '.join(allowable_languages)}")
#         # Return a default model if target language is invalid
#         target_language = "zh"  # Default to 'zh'
#     if source_language == target_language:
#         source_language = "en"  # Default to 'en'
#         target_language = "zh"  # Default to 'zh'
#     # Return the model using string concatenation
#     return f"Helsinki-NLP/opus-mt-{source_language}-{target_language}"
# def translate_single_entry(entry, translator):
#     original_text = entry["text"]
#     translated_text = translator(original_text)[0]['translation_text']
#     return {
#         "start": entry["start"],
#         "original": original_text,
#         "translated": translated_text,
#         "end": entry["end"],
#         "speaker": entry["speaker"]
#     }

# def translate_text(transcription_json, source_language, target_language):
#     # Load the translation model for the specified target language
#     translation_model_id = get_translation_model(source_language, target_language)
#     logger.debug(f"Translation model: {translation_model_id}")
#     translator = pipeline("translation", model=translation_model_id)
#     # Use ThreadPoolExecutor to parallelize translations
#     with concurrent.futures.ThreadPoolExecutor() as executor:
#         # Submit all translation tasks and collect results
#         translate_func = lambda entry: translate_single_entry(entry, translator)
#         translated_json = list(executor.map(translate_func, transcription_json))
#     # Sort the translated_json by start time
#     translated_json.sort(key=lambda x: x["start"])
#     # Log the components being added to translated_json
#     for entry in translated_json:
#         logger.debug("Added to translated_json: start=%s, original=%s, translated=%s, end=%s, speaker=%s",
#                      entry["start"], entry["original"], entry["translated"], entry["end"], entry["speaker"])
#     return translated_json

# def update_translations(file, edited_table, mode):
#     """
#     Update the translations based on user edits in the Gradio Dataframe.
#     """
#     output_video_path = "output_video.mp4"
#     logger.debug(f"Editable Table: {edited_table}")
#     if file is None:
#         logger.info("No file uploaded. Please upload a video/audio file.")
#         return None, "No file uploaded. Please upload a video/audio file."
#     try:
#         start_time = time.time()  # Start the timer
#         # Convert the edited table (a DataFrame) back to a list of dictionaries
#         updated_translations = [
#             {
#                 "start": row["start"],  # Access by column name
#                 "original": row["original"],
#                 "translated": row["translated"],
#                 "end": row["end"]
#             }
#             for _, row in edited_table.iterrows()
#         ]
#         # Call the function to process the video with updated translations
#         add_transcript_voiceover(file.name, updated_translations, output_video_path, mode == "Transcription with Voiceover")
#         # Calculate elapsed time
#         elapsed_time = time.time() - start_time
#         elapsed_time_display = f"Updates applied successfully in {elapsed_time:.2f} seconds."
#         return output_video_path, elapsed_time_display
#     except Exception as e:
#         raise ValueError(f"Error updating translations: {e}")

# def create_subtitle_clip_pil(text, start_time, end_time, video_width, video_height, font_path):
#     try:
#         subtitle_width = int(video_width * 0.8)
#         subtitle_font_size = int(video_height // 20)
#         font = ImageFont.truetype(font_path, subtitle_font_size)
#         dummy_img = Image.new("RGBA", (subtitle_width, 1), (0, 0, 0, 0))
#         draw = ImageDraw.Draw(dummy_img)
#         lines = []
#         line = ""
#         for word in text.split():
#             test_line = f"{line} {word}".strip()
#             bbox = draw.textbbox((0, 0), test_line, font=font)
#             w = bbox[2] - bbox[0]
#             if w <= subtitle_width - 10:
#                 line = test_line
#             else:
#                 lines.append(line)
#                 line = word
#         lines.append(line)
#         line_heights = [draw.textbbox((0, 0), l, font=font)[3] - draw.textbbox((0, 0), l, font=font)[1] for l in lines]
#         total_height = sum(line_heights) + (len(lines) - 1) * 5
#         img = Image.new("RGBA", (subtitle_width, total_height), (0, 0, 0, 0))
#         draw = ImageDraw.Draw(img)
#         y = 0
#         for idx, line in enumerate(lines):
#             bbox = draw.textbbox((0, 0), line, font=font)
#             w = bbox[2] - bbox[0]
#             draw.text(((subtitle_width - w) // 2, y), line, font=font, fill="yellow")
#             y += line_heights[idx] + 5
#         img_np = np.array(img)  # Convert to a NumPy array so ImageClip accepts the frame
#         txt_clip = ImageClip(img_np).set_start(start_time).set_duration(end_time - start_time).set_position("bottom").set_opacity(0.8)
#         return txt_clip
#     except Exception as e:
#         logger.error(f"\u274c Failed to create subtitle clip: {e}")
#         return None

# def process_entry(entry, i, video_width, video_height, add_voiceover, target_language, font_path, speaker_sample_paths=None):
#     logger.debug(f"Processing entry {i}: {entry}")
#     error_message = None
#     try:
#         txt_clip = create_subtitle_clip_pil(entry["translated"], entry["start"], entry["end"], video_width, video_height, font_path)
#     except Exception as e:
#         error_message = f"❌ Failed to create subtitle clip for entry {i}: {e}"
#         logger.error(error_message)
#         txt_clip = None
#     audio_segment = None
#     if add_voiceover:
#         try:
#             segment_audio_path = f"segment_{i}_voiceover.wav"
#             desired_duration = entry["end"] - entry["start"]
#             speaker = entry.get("speaker", "default")
#             speaker_wav_path = f"speaker_{speaker}_sample.wav"
#             output_path, status_msg, tts_error = generate_voiceover_clone([entry], desired_duration, target_language, speaker_wav_path, segment_audio_path)
#             if tts_error:
#                 error_message = error_message + " | " + tts_error if error_message else tts_error
#             if not output_path or not os.path.exists(segment_audio_path):
#                 raise FileNotFoundError(f"Voiceover file not generated at: {segment_audio_path}")
#             audio_clip = AudioFileClip(segment_audio_path)
#             logger.debug(f"Audio clip duration: {audio_clip.duration}, Desired duration: {desired_duration}")
#             if audio_clip.duration < desired_duration:
#                 silence_duration = desired_duration - audio_clip.duration
#                 audio_clip = concatenate_audioclips([audio_clip, silence(duration=silence_duration)])
#                 logger.info(f"Padded audio with {silence_duration} seconds of silence.")
#             audio_segment = audio_clip.set_start(entry["start"]).set_duration(desired_duration)
#         except Exception as e:
#             err = f"❌ Failed to generate audio segment for entry {i}: {e}"
#             logger.error(err)
#             error_message = error_message + " | " + err if error_message else err
#             audio_segment = None
#     return i, txt_clip, audio_segment, error_message

# def add_transcript_voiceover(video_path, translated_json, output_path, add_voiceover=False, target_language="en", speaker_sample_paths=None):
#     video = VideoFileClip(video_path)
#     font_path = "./NotoSansSC-Regular.ttf"
#     text_clips = []
#     audio_segments = []
#     error_messages = []
#     with concurrent.futures.ThreadPoolExecutor() as executor:
#         futures = [executor.submit(process_entry, entry, i, video.w, video.h, add_voiceover, target_language, font_path, speaker_sample_paths)
#                    for i, entry in enumerate(translated_json)]
#         results = []
#         for future in concurrent.futures.as_completed(futures):
#             try:
#                 i, txt_clip, audio_segment, error = future.result()
#                 results.append((i, txt_clip, audio_segment))
#                 if error:
#                     error_messages.append(f"[Entry {i}] {error}")
#             except Exception as e:
#                 err = f"❌ Unexpected error in future result: {e}"
#                 logger.error(err)
#                 error_messages.append(err)
#     # Sort by entry index to ensure order
#     results.sort(key=lambda x: x[0])
#     text_clips = [clip for _, clip, _ in results if clip]
#     if add_voiceover:
#         audio_segments = [segment for _, _, segment in results if segment]
#     final_video = CompositeVideoClip([video] + text_clips)
#     if add_voiceover:
#         if audio_segments:
#             final_audio = CompositeAudioClip(audio_segments).set_duration(video.duration)
#             final_video = final_video.set_audio(final_audio)
#         else:
#             logger.warning("⚠️ No audio segments available. Adding silent fallback.")
#             silent_audio = AudioClip(lambda t: 0, duration=video.duration)
#             final_video = final_video.set_audio(silent_audio)
#     logger.info(f"Saving the final video to: {output_path}")
#     final_video.write_videofile(output_path, codec="libx264", audio_codec="aac")
#     logger.info("Video processing completed successfully.")
#     # Optional: return errors
#     if error_messages:
#         logger.warning("⚠️ Errors encountered during processing:")
#         for msg in error_messages:
#             logger.warning(msg)
#     return error_messages

# # Initialize TTS model only once (outside the function)
# tts = TTS(model_name="tts_models/multilingual/multi-dataset/xtts_v2")

# def generate_voiceover_clone(translated_json, desired_duration, target_language, speaker_wav_path, output_audio_path):
#     try:
#         full_text = " ".join(entry["translated"] for entry in translated_json if "translated" in entry and entry["translated"].strip())
#         if not full_text.strip():
#             msg = "❌ Translated text is empty."
#             logger.error(msg)
#             return None, msg, msg
#         if not speaker_wav_path or not os.path.exists(speaker_wav_path):
#             msg = f"❌ Speaker audio not found: {speaker_wav_path}"
#             logger.error(msg)
#             return None, msg, msg
#         # # Truncate text based on max token assumption (~60 tokens)
#         # MAX_TTS_TOKENS = 60
#         # tokens = full_text.split()  # crude token count
#         # if len(tokens) > MAX_TTS_TOKENS:
#         #     logger.warning(f"⚠️ Text too long for TTS model ({len(tokens)} tokens). Truncating to {MAX_TTS_TOKENS} tokens.")
#         #     full_text = " ".join(tokens[:MAX_TTS_TOKENS])
#         speed_tts = calibrated_speed(full_text, desired_duration)
#         tts.tts_to_file(
#             text=full_text,
#             speaker_wav=speaker_wav_path,
#             language=target_language,
#             file_path=output_audio_path,
#             speed=speed_tts,
#             split_sentences=True
#         )
#         if not os.path.exists(output_audio_path):
#             msg = f"❌ Voiceover file not generated at: {output_audio_path}"
#             logger.error(msg)
#             return None, msg, msg
#         msg = "✅ Voice cloning completed successfully."
#         logger.info(msg)
#         return output_audio_path, msg, None
#     except Exception as e:
#         err_msg = f"❌ An error occurred: {str(e)}"
#         logger.error("❌ Error during voice cloning:")
#         logger.error(traceback.format_exc())
#         return None, err_msg, err_msg
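# # Return contract (as implemented above): (audio_path, status_message, error_message);
# # error_message is None on success, while audio_path is None whenever cloning fails.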

# def calibrated_speed(text, desired_duration):
#     """
#     Compute a speed factor to help TTS fit audio into desired duration,
#     using a simple truncated linear function of characters per second.
#     """
#     char_count = len(text.strip())
#     if char_count == 0 or desired_duration <= 0:
#         return 1.0  # fallback
#     cps = char_count / desired_duration  # characters per second
#     # Truncated linear mapping
#     if cps < 10:
#         return 1.0
#     elif cps > 25:
#         return 1.4
#     else:
#         # Linearly scale between cps 10 -> 25 and speed 1.0 -> 1.4
#         slope = (1.4 - 1.0) / (25 - 10)
#         return 1.0 + slope * (cps - 10)
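# # Worked example (illustrative): 100 characters spoken over 5 s gives cps = 20,
# # so speed = 1.0 + (0.4 / 15) * (20 - 10) ≈ 1.27.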

# def upload_and_manage(file, target_language, mode="transcription"):
#     if file is None:
#         logger.info("No file uploaded. Please upload a video/audio file.")
#         return None, [], None, "No file uploaded. Please upload a video/audio file."
#     try:
#         start_time = time.time()  # Start the timer
#         logger.info(f"Started processing file: {file.name}")
#         # Define paths for audio and output files
#         audio_path = "audio.wav"
#         output_video_path = "output_video.mp4"
#         voiceover_path = "voiceover.wav"
#         logger.info(f"Using audio path: {audio_path}, output video path: {output_video_path}, voiceover path: {voiceover_path}")
#         # Step 1: Transcribe audio from uploaded media file and get timestamps
#         logger.info("Transcribing audio...")
#         transcription_json, source_language = transcribe_video_with_speakers(file.name)
#         logger.info(f"Transcription completed. Detected source language: {source_language}")
#         # Step 2: Translate the transcription
#         logger.info(f"Translating transcription from {source_language} to {target_language}...")
#         translated_json = translate_text(transcription_json, source_language, target_language)
#         logger.info(f"Translation completed. Number of translated segments: {len(translated_json)}")
#         # Step 3: Add transcript to video based on timestamps
#         logger.info("Adding translated transcript to video...")
#         add_transcript_voiceover(file.name, translated_json, output_video_path, mode == "Transcription with Voiceover", target_language)
#         logger.info(f"Transcript added to video. Output video saved at {output_video_path}")
#         # Convert translated JSON into a format for the editable table
#         logger.info("Converting translated JSON into editable table format...")
#         editable_table = [
#             [float(entry["start"]), entry["original"], entry["translated"], float(entry["end"]), entry["speaker"]]
#             for entry in translated_json
#         ]
#         # Calculate elapsed time
#         elapsed_time = time.time() - start_time
#         elapsed_time_display = f"Processing completed in {elapsed_time:.2f} seconds."
#         logger.info(f"Processing completed in {elapsed_time:.2f} seconds.")
#         return translated_json, editable_table, output_video_path, elapsed_time_display
#     except Exception as e:
#         logger.error(f"An error occurred: {str(e)}")
#         return None, [], None, f"An error occurred: {str(e)}"

# # Gradio interface
# def build_interface():
#     with gr.Blocks(css=css) as demo:
#         gr.Markdown("## Video Localization")
#         with gr.Row():
#             with gr.Column(scale=4):
#                 file_input = gr.File(label="Upload Video/Audio File")
#                 language_input = gr.Dropdown(["en", "es", "fr", "zh"], label="Select Language")  # Language codes
#                 process_mode = gr.Radio(choices=["Transcription", "Transcription with Voiceover"], label="Choose Processing Type", value="Transcription")
#                 submit_button = gr.Button("Post and Process")
#                 editable_translations = gr.State(value=[])
#             with gr.Column(scale=8):
#                 gr.Markdown("## Edit Translations")
#                 # Editable JSON data
#                 editable_table = gr.Dataframe(
#                     value=[],  # Default to an empty list to avoid undefined values
#                     headers=["start", "original", "translated", "end", "speaker"],
#                     datatype=["number", "str", "str", "number", "str"],
#                     row_count=1,  # Initially empty
#                     col_count=5,
#                     interactive=[False, True, True, False, False],  # Control editability
#                     label="Edit Translations",
#                     wrap=True  # Enables text wrapping if supported
#                 )
#                 save_changes_button = gr.Button("Save Changes")
#                 processed_video_output = gr.File(label="Download Processed Video", interactive=True)  # Download button
#                 elapsed_time_display = gr.Textbox(label="Elapsed Time", lines=1, interactive=False)
#             with gr.Column(scale=1):
#                 gr.Markdown("**Feedback**")
#                 feedback_input = gr.Textbox(
#                     placeholder="Leave your feedback here...",
#                     label=None,
#                     lines=3,
#                 )
#                 feedback_btn = gr.Button("Submit Feedback")
#                 response_message = gr.Textbox(label=None, lines=1, interactive=False)
#                 db_download = gr.File(label="Download Database File", visible=False)
#         # Link the feedback handling
#         def feedback_submission(feedback):
#             message, file_path = handle_feedback(feedback)
#             if file_path:
#                 return message, gr.update(value=file_path, visible=True)
#             return message, gr.update(visible=False)
#         save_changes_button.click(
#             update_translations,
#             inputs=[file_input, editable_table, process_mode],
#             outputs=[processed_video_output, elapsed_time_display]
#         )
#         submit_button.click(
#             upload_and_manage,
#             inputs=[file_input, language_input, process_mode],
#             outputs=[editable_translations, editable_table, processed_video_output, elapsed_time_display]
#         )
#         # Connect the feedback button to the feedback handler
#         feedback_btn.click(
#             feedback_submission,
#             inputs=[feedback_input],
#             outputs=[response_message, db_download]
#         )
#     return demo

# # Launch the Gradio interface
# demo = build_interface()
# demo.launch()

def dummy_func(x):
    return x, "Success"

with gr.Blocks() as demo:
    inp = gr.Textbox()
    out1 = gr.Textbox()
    out2 = gr.Textbox()
    btn = gr.Button("Run")
    btn.click(dummy_func, inputs=inp, outputs=[out1, out2])

demo.launch()