{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "8lw0EgLex-YZ"
},
"source": [
"# SoniTranslate embedded app\n",
"\n",
"`This notebook embeds the Gradio app directly into a cell, allowing you to interact with it without needing to open a separate browser window or navigate to a public/local URL.`\n",
"\n",
"| Description | Link |\n",
"| ----------- | ---- |\n",
"| 🎉 Repository | [](https://github.com/R3gm/SoniTranslate/) |\n",
"| 🚀 Online Demo in HF | [](https://huggingface.co/spaces/r3gm/SoniTranslate_translate_audio_of_a_video_content) |\n",
"\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "LUgwm0rfx0_J"
},
"outputs": [],
"source": [
"# @title Install requirements for SoniTranslate\n",
"!git clone https://github.com/r3gm/SoniTranslate.git\n",
"%cd SoniTranslate\n",
"\n",
"!pip uninstall chex pandas-stubs ibis-framework albumentations albucore -y -q\n",
"!python -m pip install -q pip==23.1.2\n",
"!apt install git-lfs\n",
"!git lfs install\n",
"\n",
"!sed -i 's|git+https://github.com/R3gm/whisperX.git@cuda_11_8|git+https://github.com/R3gm/whisperX.git@cuda_12_x|' requirements_base.txt\n",
"!pip install -q -r requirements_base.txt\n",
"!pip install -q -r requirements_extra.txt\n",
"!pip install -q ort-nightly-gpu --index-url=https://aiinfra.pkgs.visualstudio.com/PublicPackages/_packaging/ort-cuda-12-nightly/pypi/simple/\n",
"\n",
"Install_PIPER_TTS = True # @param {type:\"boolean\"}\n",
"\n",
"if Install_PIPER_TTS:\n",
" !pip install -q piper-tts==1.2.0\n",
"\n",
"Install_Coqui_XTTS = True # @param {type:\"boolean\"}\n",
"\n",
"if Install_Coqui_XTTS:\n",
" !pip install -q -r requirements_xtts.txt\n",
" !pip install -q TTS==0.21.1 --no-deps"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "LTaTstXPXNg2"
},
"source": [
"One important step is to accept the license agreement for using Pyannote. You need to have an account on Hugging Face and `accept the license to use the models`: https://huggingface.co/pyannote/speaker-diarization and https://huggingface.co/pyannote/segmentation\n",
"\n",
"\n",
"Get your KEY TOKEN here: https://hf.co/settings/tokens\n",
"\n",
"When you are creating the new Access Token in Hugging Face, make sure to tick \"Read access to contents of all public gated repos you can access\"."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "NRAsK95dJSgq"
},
"source": [
"Directory output: /content/SoniTranslate/outputs"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "XkhXfaFw4R4J"
},
"outputs": [],
"source": [
"#@markdown # `RUN THE WEB APP`\n",
"YOUR_HF_TOKEN = \"\" #@param {type:'string'}\n",
"%env YOUR_HF_TOKEN={YOUR_HF_TOKEN}\n",
"theme_var = \"Taithrah/Minimal\" # @param [\"Taithrah/Minimal\", \"aliabid94/new-theme\", \"gstaff/xkcd\", \"ParityError/LimeFace\", \"abidlabs/pakistan\", \"rottenlittlecreature/Moon_Goblin\", \"ysharma/llamas\", \"gradio/dracula_revamped\"]\n",
"interface_language_var = \"english\" # @param ['afrikaans', 'arabic', 'azerbaijani', 'chinese_zh_cn', 'english', 'french', 'german', 'hindi', 'indonesian', 'italian', 'japanese', 'korean', 'marathi', 'persian', 'polish', 'portuguese', 'russian', 'spanish', 'swedish', 'turkish', 'ukrainian', 'vietnamese']\n",
"verbosity_level_var = \"error\" # @param [\"debug\", \"info\", \"warning\", \"error\", \"critical\"]\n",
"#@markdown ### `The interface will appear down here 👇`\n",
"\n",
"%cd /content/SoniTranslate\n",
"import gradio as gr\n",
"from soni_translate.logging_setup import (\n",
" logger,\n",
" set_logging_level,\n",
" configure_logging_libs,\n",
"); configure_logging_libs() # noqa\n",
"import whisperx\n",
"import torch\n",
"import os\n",
"from soni_translate.audio_segments import create_translated_audio\n",
"from soni_translate.text_to_speech import (\n",
" audio_segmentation_to_voice,\n",
" edge_tts_voices_list,\n",
" coqui_xtts_voices_list,\n",
" piper_tts_voices_list,\n",
" create_wav_file_vc,\n",
" accelerate_segments,\n",
")\n",
"from soni_translate.translate_segments import (\n",
" translate_text,\n",
" TRANSLATION_PROCESS_OPTIONS,\n",
" DOCS_TRANSLATION_PROCESS_OPTIONS\n",
")\n",
"from soni_translate.preprocessor import (\n",
" audio_video_preprocessor,\n",
" audio_preprocessor,\n",
")\n",
"from soni_translate.postprocessor import (\n",
" OUTPUT_TYPE_OPTIONS,\n",
" DOCS_OUTPUT_TYPE_OPTIONS,\n",
" sound_separate,\n",
" get_no_ext_filename,\n",
" media_out,\n",
" get_subtitle_speaker,\n",
")\n",
"from soni_translate.language_configuration import (\n",
" LANGUAGES,\n",
" UNIDIRECTIONAL_L_LIST,\n",
" LANGUAGES_LIST,\n",
" BARK_VOICES_LIST,\n",
" VITS_VOICES_LIST,\n",
" OPENAI_TTS_MODELS,\n",
")\n",
"from soni_translate.utils import (\n",
" remove_files,\n",
" download_list,\n",
" upload_model_list,\n",
" download_manager,\n",
" run_command,\n",
" is_audio_file,\n",
" is_subtitle_file,\n",
" copy_files,\n",
" get_valid_files,\n",
" get_link_list,\n",
" remove_directory_contents,\n",
")\n",
"from soni_translate.mdx_net import (\n",
" UVR_MODELS,\n",
" MDX_DOWNLOAD_LINK,\n",
" mdxnet_models_dir,\n",
")\n",
"from soni_translate.speech_segmentation import (\n",
" ASR_MODEL_OPTIONS,\n",
" COMPUTE_TYPE_GPU,\n",
" COMPUTE_TYPE_CPU,\n",
" find_whisper_models,\n",
" transcribe_speech,\n",
" align_speech,\n",
" diarize_speech,\n",
" diarization_models,\n",
")\n",
"from soni_translate.text_multiformat_processor import (\n",
" BORDER_COLORS,\n",
" srt_file_to_segments,\n",
" document_preprocessor,\n",
" determine_chunk_size,\n",
" plain_text_to_segments,\n",
" segments_to_plain_text,\n",
" process_subtitles,\n",
" linguistic_level_segments,\n",
" break_aling_segments,\n",
" doc_to_txtximg_pages,\n",
" page_data_to_segments,\n",
" update_page_data,\n",
" fix_timestamps_docs,\n",
" create_video_from_images,\n",
" merge_video_and_audio,\n",
")\n",
"from soni_translate.languages_gui import language_data, news\n",
"import copy\n",
"import logging\n",
"import json\n",
"from pydub import AudioSegment\n",
"from voice_main import ClassVoices\n",
"import argparse\n",
"import time\n",
"import hashlib\n",
"import sys\n",
"\n",
"directories = [\n",
" \"downloads\",\n",
" \"logs\",\n",
" \"weights\",\n",
" \"clean_song_output\",\n",
" \"_XTTS_\",\n",
" f\"audio2{os.sep}audio\",\n",
" \"audio\",\n",
" \"outputs\",\n",
"]\n",
"[\n",
" os.makedirs(directory)\n",
" for directory in directories\n",
" if not os.path.exists(directory)\n",
"]\n",
"\n",
"\n",
"class TTS_Info:\n",
" def __init__(self, piper_enabled, xtts_enabled):\n",
" self.list_edge = edge_tts_voices_list()\n",
" self.list_bark = list(BARK_VOICES_LIST.keys())\n",
" self.list_vits = list(VITS_VOICES_LIST.keys())\n",
" self.list_openai_tts = OPENAI_TTS_MODELS\n",
" self.piper_enabled = piper_enabled\n",
" self.list_vits_onnx = (\n",
" piper_tts_voices_list() if self.piper_enabled else []\n",
" )\n",
" self.xtts_enabled = xtts_enabled\n",
"\n",
" def tts_list(self):\n",
" self.list_coqui_xtts = (\n",
" coqui_xtts_voices_list() if self.xtts_enabled else []\n",
" )\n",
" list_tts = self.list_coqui_xtts + sorted(\n",
" self.list_edge\n",
" + self.list_bark\n",
" + self.list_vits\n",
" + self.list_openai_tts\n",
" + self.list_vits_onnx\n",
" )\n",
" return list_tts\n",
"\n",
"\n",
"def prog_disp(msg, percent, is_gui, progress=None):\n",
" logger.info(msg)\n",
" if is_gui:\n",
" progress(percent, desc=msg)\n",
"\n",
"\n",
"def warn_disp(wrn_lang, is_gui):\n",
" logger.warning(wrn_lang)\n",
" if is_gui:\n",
" gr.Warning(wrn_lang)\n",
"\n",
"\n",
"class SoniTrCache:\n",
" def __init__(self):\n",
" self.cache = {\n",
" 'media': [[]],\n",
" 'refine_vocals': [],\n",
" 'transcript_align': [],\n",
" 'break_align': [],\n",
" 'diarize': [],\n",
" 'translate': [],\n",
" 'subs_and_edit': [],\n",
" 'tts': [],\n",
" 'acc_and_vc': [],\n",
" 'mix_aud': [],\n",
" 'output': []\n",
" }\n",
"\n",
" self.cache_data = {\n",
" 'media': [],\n",
" 'refine_vocals': [],\n",
" 'transcript_align': [],\n",
" 'break_align': [],\n",
" 'diarize': [],\n",
" 'translate': [],\n",
" 'subs_and_edit': [],\n",
" 'tts': [],\n",
" 'acc_and_vc': [],\n",
" 'mix_aud': [],\n",
" 'output': []\n",
" }\n",
"\n",
" self.cache_keys = list(self.cache.keys())\n",
" self.first_task = self.cache_keys[0]\n",
" self.last_task = self.cache_keys[-1]\n",
"\n",
" self.pre_step = None\n",
" self.pre_params = []\n",
"\n",
" def set_variable(self, variable_name, value):\n",
" setattr(self, variable_name, value)\n",
"\n",
" def task_in_cache(self, step: str, params: list, previous_step_data: dict):\n",
"\n",
" self.pre_step_cache = None\n",
"\n",
" if step == self.first_task:\n",
" self.pre_step = None\n",
"\n",
" if self.pre_step:\n",
" self.cache[self.pre_step] = self.pre_params\n",
"\n",
" # Fill data in cache\n",
" self.cache_data[self.pre_step] = copy.deepcopy(previous_step_data)\n",
"\n",
" self.pre_params = params\n",
" # logger.debug(f\"Step: {str(step)}, Cache params: {str(self.cache)}\")\n",
" if params == self.cache[step]:\n",
" logger.debug(f\"In cache: {str(step)}\")\n",
"\n",
" # Set the var needed for next step\n",
" # Recovery from cache_data the current step\n",
" for key, value in self.cache_data[step].items():\n",
" self.set_variable(key, copy.deepcopy(value))\n",
" logger.debug(\n",
" f\"Chache load: {str(key)}\"\n",
" )\n",
"\n",
" self.pre_step = step\n",
" return True\n",
"\n",
" else:\n",
" logger.debug(f\"Flush next and caching {str(step)}\")\n",
" selected_index = self.cache_keys.index(step)\n",
"\n",
" for idx, key in enumerate(self.cache.keys()):\n",
" if idx >= selected_index:\n",
" self.cache[key] = []\n",
" self.cache_data[key] = {}\n",
"\n",
" # The last is now previous\n",
" self.pre_step = step\n",
" return False\n",
"\n",
" def clear_cache(self, media, force=False):\n",
"\n",
" self.cache[\"media\"] = (\n",
" self.cache[\"media\"] if len(self.cache[\"media\"]) else [[]]\n",
" )\n",
"\n",
" if media != self.cache[\"media\"][0] or force:\n",
"\n",
" # Clear cache\n",
" self.cache = {key: [] for key in self.cache}\n",
" self.cache[\"media\"] = [[]]\n",
"\n",
" logger.info(\"Cache flushed\")\n",
"\n",
"\n",
"def get_hash(filepath):\n",
" with open(filepath, 'rb') as f:\n",
" file_hash = hashlib.blake2b()\n",
" while chunk := f.read(8192):\n",
" file_hash.update(chunk)\n",
"\n",
" return file_hash.hexdigest()[:18]\n",
"\n",
"\n",
"def check_openai_api_key():\n",
" if not os.environ.get(\"OPENAI_API_KEY\"):\n",
" raise ValueError(\n",
" \"To use GPT for translation, please set up your OpenAI API key \"\n",
" \"as an environment variable in Linux as follows: \"\n",
" \"export OPENAI_API_KEY='your-api-key-here'. Or change the \"\n",
" \"translation process in Advanced settings.\"\n",
" )\n",
"\n",
"\n",
"class SoniTranslate(SoniTrCache):\n",
" def __init__(self, cpu_mode=False):\n",
" super().__init__()\n",
" if cpu_mode:\n",
" os.environ[\"SONITR_DEVICE\"] = \"cpu\"\n",
" else:\n",
" os.environ[\"SONITR_DEVICE\"] = (\n",
" \"cuda\" if torch.cuda.is_available() else \"cpu\"\n",
" )\n",
"\n",
" self.device = os.environ.get(\"SONITR_DEVICE\")\n",
" self.result_diarize = None\n",
" self.align_language = None\n",
" self.result_source_lang = None\n",
" self.edit_subs_complete = False\n",
" self.voiceless_id = None\n",
" self.burn_subs_id = None\n",
"\n",
" self.vci = ClassVoices(only_cpu=cpu_mode)\n",
"\n",
" self.tts_voices = self.get_tts_voice_list()\n",
"\n",
" logger.info(f\"Working in: {self.device}\")\n",
"\n",
" def get_tts_voice_list(self):\n",
" try:\n",
" from piper import PiperVoice # noqa\n",
"\n",
" piper_enabled = True\n",
" logger.info(\"PIPER TTS enabled\")\n",
" except Exception as error:\n",
" logger.debug(str(error))\n",
" piper_enabled = False\n",
" logger.info(\"PIPER TTS disabled\")\n",
" try:\n",
" from TTS.api import TTS # noqa\n",
"\n",
" xtts_enabled = True\n",
" logger.info(\"Coqui XTTS enabled\")\n",
" logger.info(\n",
" \"In this app, by using Coqui TTS (text-to-speech), you \"\n",
" \"acknowledge and agree to the license.\\n\"\n",
" \"You confirm that you have read, understood, and agreed \"\n",
" \"to the Terms and Conditions specified at the following \"\n",
" \"link:\\nhttps://coqui.ai/cpml.txt.\"\n",
" )\n",
" os.environ[\"COQUI_TOS_AGREED\"] = \"1\"\n",
" except Exception as error:\n",
" logger.debug(str(error))\n",
" xtts_enabled = False\n",
" logger.info(\"Coqui XTTS disabled\")\n",
"\n",
" self.tts_info = TTS_Info(piper_enabled, xtts_enabled)\n",
"\n",
" return self.tts_info.tts_list()\n",
"\n",
" def batch_multilingual_media_conversion(self, *kwargs):\n",
" # logger.debug(str(kwargs))\n",
"\n",
" media_file_arg = kwargs[0] if kwargs[0] is not None else []\n",
"\n",
" link_media_arg = kwargs[1]\n",
" link_media_arg = [x.strip() for x in link_media_arg.split(',')]\n",
" link_media_arg = get_link_list(link_media_arg)\n",
"\n",
" path_arg = kwargs[2]\n",
" path_arg = [x.strip() for x in path_arg.split(',')]\n",
" path_arg = get_valid_files(path_arg)\n",
"\n",
" edit_text_arg = kwargs[31]\n",
" get_text_arg = kwargs[32]\n",
"\n",
" is_gui_arg = kwargs[-1]\n",
"\n",
" kwargs = kwargs[3:]\n",
"\n",
" media_batch = media_file_arg + link_media_arg + path_arg\n",
" media_batch = list(filter(lambda x: x != \"\", media_batch))\n",
" media_batch = media_batch if media_batch else [None]\n",
" logger.debug(str(media_batch))\n",
"\n",
" remove_directory_contents(\"outputs\")\n",
"\n",
" if edit_text_arg or get_text_arg:\n",
" return self.multilingual_media_conversion(\n",
" media_batch[0], \"\", \"\", *kwargs\n",
" )\n",
"\n",
" if \"SET_LIMIT\" == os.getenv(\"DEMO\"):\n",
" media_batch = [media_batch[0]]\n",
"\n",
" result = []\n",
" for media in media_batch:\n",
" # Call the nested function with the parameters\n",
" output_file = self.multilingual_media_conversion(\n",
" media, \"\", \"\", *kwargs\n",
" )\n",
"\n",
" if isinstance(output_file, str):\n",
" output_file = [output_file]\n",
" result.extend(output_file)\n",
"\n",
" if is_gui_arg and len(media_batch) > 1:\n",
" gr.Info(f\"Done: {os.path.basename(output_file[0])}\")\n",
"\n",
" return result\n",
"\n",
" def multilingual_media_conversion(\n",
" self,\n",
" media_file=None,\n",
" link_media=\"\",\n",
" directory_input=\"\",\n",
" YOUR_HF_TOKEN=\"\",\n",
" preview=False,\n",
" transcriber_model=\"large-v3\",\n",
" batch_size=4,\n",
" compute_type=\"auto\",\n",
" origin_language=\"Automatic detection\",\n",
" target_language=\"English (en)\",\n",
" min_speakers=1,\n",
" max_speakers=1,\n",
" tts_voice00=\"en-US-EmmaMultilingualNeural-Female\",\n",
" tts_voice01=\"en-US-AndrewMultilingualNeural-Male\",\n",
" tts_voice02=\"en-US-AvaMultilingualNeural-Female\",\n",
" tts_voice03=\"en-US-BrianMultilingualNeural-Male\",\n",
" tts_voice04=\"de-DE-SeraphinaMultilingualNeural-Female\",\n",
" tts_voice05=\"de-DE-FlorianMultilingualNeural-Male\",\n",
" tts_voice06=\"fr-FR-VivienneMultilingualNeural-Female\",\n",
" tts_voice07=\"fr-FR-RemyMultilingualNeural-Male\",\n",
" tts_voice08=\"en-US-EmmaMultilingualNeural-Female\",\n",
" tts_voice09=\"en-US-AndrewMultilingualNeural-Male\",\n",
" tts_voice10=\"en-US-EmmaMultilingualNeural-Female\",\n",
" tts_voice11=\"en-US-AndrewMultilingualNeural-Male\",\n",
" video_output_name=\"\",\n",
" mix_method_audio=\"Adjusting volumes and mixing audio\",\n",
" max_accelerate_audio=2.1,\n",
" acceleration_rate_regulation=False,\n",
" volume_original_audio=0.25,\n",
" volume_translated_audio=1.80,\n",
" output_format_subtitle=\"srt\",\n",
" get_translated_text=False,\n",
" get_video_from_text_json=False,\n",
" text_json=\"{}\",\n",
" avoid_overlap=False,\n",
" vocal_refinement=False,\n",
" literalize_numbers=True,\n",
" segment_duration_limit=15,\n",
" diarization_model=\"pyannote_2.1\",\n",
" translate_process=\"google_translator_batch\",\n",
" subtitle_file=None,\n",
" output_type=\"video (mp4)\",\n",
" voiceless_track=False,\n",
" voice_imitation=False,\n",
" voice_imitation_max_segments=3,\n",
" voice_imitation_vocals_dereverb=False,\n",
" voice_imitation_remove_previous=True,\n",
" voice_imitation_method=\"freevc\",\n",
" dereverb_automatic_xtts=True,\n",
" text_segmentation_scale=\"sentence\",\n",
" divide_text_segments_by=\"\",\n",
" soft_subtitles_to_video=True,\n",
" burn_subtitles_to_video=False,\n",
" enable_cache=True,\n",
" custom_voices=False,\n",
" custom_voices_workers=1,\n",
" is_gui=False,\n",
" progress=gr.Progress(),\n",
" ):\n",
" if not YOUR_HF_TOKEN:\n",
" YOUR_HF_TOKEN = os.getenv(\"YOUR_HF_TOKEN\")\n",
" if diarization_model == \"disable\" or max_speakers == 1:\n",
" if YOUR_HF_TOKEN is None:\n",
" YOUR_HF_TOKEN = \"\"\n",
" elif not YOUR_HF_TOKEN:\n",
" raise ValueError(\"No valid Hugging Face token\")\n",
" else:\n",
" os.environ[\"YOUR_HF_TOKEN\"] = YOUR_HF_TOKEN\n",
"\n",
" if (\n",
" \"gpt\" in translate_process\n",
" or transcriber_model == \"OpenAI_API_Whisper\"\n",
" or \"OpenAI-TTS\" in tts_voice00\n",
" ):\n",
" check_openai_api_key()\n",
"\n",
" if media_file is None:\n",
" media_file = (\n",
" directory_input\n",
" if os.path.exists(directory_input)\n",
" else link_media\n",
" )\n",
" media_file = (\n",
" media_file if isinstance(media_file, str) else media_file.name\n",
" )\n",
"\n",
" if is_subtitle_file(media_file):\n",
" subtitle_file = media_file\n",
" media_file = \"\"\n",
"\n",
" if media_file is None:\n",
" media_file = \"\"\n",
"\n",
" if not origin_language:\n",
" origin_language = \"Automatic detection\"\n",
"\n",
" if origin_language in UNIDIRECTIONAL_L_LIST and not subtitle_file:\n",
" raise ValueError(\n",
" f\"The language '{origin_language}' \"\n",
" \"is not supported for transcription (ASR).\"\n",
" )\n",
"\n",
" if get_translated_text:\n",
" self.edit_subs_complete = False\n",
" if get_video_from_text_json:\n",
" if not self.edit_subs_complete:\n",
" raise ValueError(\"Generate the transcription first.\")\n",
"\n",
" if (\n",
" (\"sound\" in output_type or output_type == \"raw media\")\n",
" and (get_translated_text or get_video_from_text_json)\n",
" ):\n",
" raise ValueError(\n",
" \"Please disable 'edit generate subtitles' \"\n",
" f\"first to acquire the {output_type}.\"\n",
" )\n",
"\n",
" TRANSLATE_AUDIO_TO = LANGUAGES[target_language]\n",
" SOURCE_LANGUAGE = LANGUAGES[origin_language]\n",
"\n",
" if (\n",
" transcriber_model == \"OpenAI_API_Whisper\"\n",
" and SOURCE_LANGUAGE == \"zh-TW\"\n",
" ):\n",
" logger.warning(\n",
" \"OpenAI API Whisper only supports Chinese (Simplified).\"\n",
" )\n",
" SOURCE_LANGUAGE = \"zh\"\n",
"\n",
" if (\n",
" text_segmentation_scale in [\"word\", \"character\"]\n",
" and \"subtitle\" not in output_type\n",
" ):\n",
" wrn_lang = (\n",
" \"Text segmentation by words or characters is typically\"\n",
" \" used for generating subtitles. If subtitles are not the\"\n",
" \" intended output, consider selecting 'sentence' \"\n",
" \"segmentation method to ensure optimal results.\"\n",
"\n",
" )\n",
" warn_disp(wrn_lang, is_gui)\n",
"\n",
" if tts_voice00[:2].lower() != TRANSLATE_AUDIO_TO[:2].lower():\n",
" wrn_lang = (\n",
" \"Make sure to select a 'TTS Speaker' suitable for\"\n",
" \" the translation language to avoid errors with the TTS.\"\n",
" )\n",
" warn_disp(wrn_lang, is_gui)\n",
"\n",
" if \"_XTTS_\" in tts_voice00 and voice_imitation:\n",
" wrn_lang = (\n",
" \"When you select XTTS, it is advisable \"\n",
" \"to disable Voice Imitation.\"\n",
" )\n",
" warn_disp(wrn_lang, is_gui)\n",
"\n",
" if custom_voices and voice_imitation:\n",
" wrn_lang = (\n",
" \"When you use R.V.C. models, it is advisable\"\n",
" \" to disable Voice Imitation.\"\n",
" )\n",
" warn_disp(wrn_lang, is_gui)\n",
"\n",
" if not media_file and not subtitle_file:\n",
" raise ValueError(\n",
" \"Specifify a media or SRT file in advanced settings\"\n",
" )\n",
"\n",
" if subtitle_file:\n",
" subtitle_file = (\n",
" subtitle_file\n",
" if isinstance(subtitle_file, str)\n",
" else subtitle_file.name\n",
" )\n",
"\n",
" if subtitle_file and SOURCE_LANGUAGE == \"Automatic detection\":\n",
" raise Exception(\n",
" \"To use an SRT file, you need to specify its \"\n",
" \"original language (Source language)\"\n",
" )\n",
"\n",
" if not media_file and subtitle_file:\n",
" diarization_model = \"disable\"\n",
" media_file = \"audio_support.wav\"\n",
" if not get_video_from_text_json:\n",
" remove_files(media_file)\n",
" srt_data = srt_file_to_segments(subtitle_file)\n",
" total_duration = srt_data[\"segments\"][-1][\"end\"] + 30.\n",
" support_audio = AudioSegment.silent(\n",
" duration=int(total_duration * 1000)\n",
" )\n",
" support_audio.export(\n",
" media_file, format=\"wav\"\n",
" )\n",
" logger.info(\"Supporting audio for the SRT file, created.\")\n",
"\n",
" if \"SET_LIMIT\" == os.getenv(\"DEMO\"):\n",
" preview = True\n",
" mix_method_audio = \"Adjusting volumes and mixing audio\"\n",
" transcriber_model = \"medium\"\n",
" logger.info(\n",
" \"DEMO; set preview=True; Generation is limited to \"\n",
" \"10 seconds to prevent CPU errors. No limitations with GPU.\\n\"\n",
" \"DEMO; set Adjusting volumes and mixing audio\\n\"\n",
" \"DEMO; set whisper model to medium\"\n",
" )\n",
"\n",
" # Check GPU\n",
" if self.device == \"cpu\" and compute_type not in COMPUTE_TYPE_CPU:\n",
" logger.info(\"Compute type changed to float32\")\n",
" compute_type = \"float32\"\n",
"\n",
" base_video_file = \"Video.mp4\"\n",
" base_audio_wav = \"audio.wav\"\n",
" dub_audio_file = \"audio_dub_solo.ogg\"\n",
" vocals_audio_file = \"audio_Vocals_DeReverb.wav\"\n",
" voiceless_audio_file = \"audio_Voiceless.wav\"\n",
" mix_audio_file = \"audio_mix.mp3\"\n",
" vid_subs = \"video_subs_file.mp4\"\n",
" video_output_file = \"video_dub.mp4\"\n",
"\n",
" if os.path.exists(media_file):\n",
" media_base_hash = get_hash(media_file)\n",
" else:\n",
" media_base_hash = media_file\n",
" self.clear_cache(media_base_hash, force=(not enable_cache))\n",
"\n",
" if not get_video_from_text_json:\n",
" self.result_diarize = (\n",
" self.align_language\n",
" ) = self.result_source_lang = None\n",
" if not self.task_in_cache(\"media\", [media_base_hash, preview], {}):\n",
" if is_audio_file(media_file):\n",
" prog_disp(\n",
" \"Processing audio...\", 0.15, is_gui, progress=progress\n",
" )\n",
" audio_preprocessor(preview, media_file, base_audio_wav)\n",
" else:\n",
" prog_disp(\n",
" \"Processing video...\", 0.15, is_gui, progress=progress\n",
" )\n",
" audio_video_preprocessor(\n",
" preview, media_file, base_video_file, base_audio_wav\n",
" )\n",
" logger.debug(\"Set file complete.\")\n",
"\n",
" if \"sound\" in output_type:\n",
" prog_disp(\n",
" \"Separating sounds in the file...\",\n",
" 0.50,\n",
" is_gui,\n",
" progress=progress\n",
" )\n",
" separate_out = sound_separate(base_audio_wav, output_type)\n",
" final_outputs = []\n",
" for out in separate_out:\n",
" final_name = media_out(\n",
" media_file,\n",
" f\"{get_no_ext_filename(out)}\",\n",
" video_output_name,\n",
" \"wav\",\n",
" file_obj=out,\n",
" )\n",
" final_outputs.append(final_name)\n",
" logger.info(f\"Done: {str(final_outputs)}\")\n",
" return final_outputs\n",
"\n",
" if output_type == \"raw media\":\n",
" output = media_out(\n",
" media_file,\n",
" \"raw_media\",\n",
" video_output_name,\n",
" \"wav\" if is_audio_file(media_file) else \"mp4\",\n",
" file_obj=base_audio_wav if is_audio_file(media_file) else base_video_file,\n",
" )\n",
" logger.info(f\"Done: {output}\")\n",
" return output\n",
"\n",
" if not self.task_in_cache(\"refine_vocals\", [vocal_refinement], {}):\n",
" self.vocals = None\n",
" if vocal_refinement:\n",
" try:\n",
" from soni_translate.mdx_net import process_uvr_task\n",
" _, _, _, _, file_vocals = process_uvr_task(\n",
" orig_song_path=base_audio_wav,\n",
" main_vocals=False,\n",
" dereverb=True,\n",
" remove_files_output_dir=True,\n",
" )\n",
" remove_files(vocals_audio_file)\n",
" copy_files(file_vocals, \".\")\n",
" self.vocals = vocals_audio_file\n",
" except Exception as error:\n",
" logger.error(str(error))\n",
"\n",
" if not self.task_in_cache(\"transcript_align\", [\n",
" subtitle_file,\n",
" SOURCE_LANGUAGE,\n",
" transcriber_model,\n",
" compute_type,\n",
" batch_size,\n",
" literalize_numbers,\n",
" segment_duration_limit,\n",
" (\n",
" \"l_unit\"\n",
" if text_segmentation_scale in [\"word\", \"character\"]\n",
" and subtitle_file\n",
" else \"sentence\"\n",
" )\n",
" ], {\"vocals\": self.vocals}):\n",
" if subtitle_file:\n",
" prog_disp(\n",
" \"From SRT file...\", 0.30, is_gui, progress=progress\n",
" )\n",
" audio = whisperx.load_audio(\n",
" base_audio_wav if not self.vocals else self.vocals\n",
" )\n",
" self.result = srt_file_to_segments(subtitle_file)\n",
" self.result[\"language\"] = SOURCE_LANGUAGE\n",
" else:\n",
" prog_disp(\n",
" \"Transcribing...\", 0.30, is_gui, progress=progress\n",
" )\n",
" SOURCE_LANGUAGE = (\n",
" None\n",
" if SOURCE_LANGUAGE == \"Automatic detection\"\n",
" else SOURCE_LANGUAGE\n",
" )\n",
" audio, self.result = transcribe_speech(\n",
" base_audio_wav if not self.vocals else self.vocals,\n",
" transcriber_model,\n",
" compute_type,\n",
" batch_size,\n",
" SOURCE_LANGUAGE,\n",
" literalize_numbers,\n",
" segment_duration_limit,\n",
" )\n",
" logger.debug(\n",
" \"Transcript complete, \"\n",
" f\"segments count {len(self.result['segments'])}\"\n",
" )\n",
"\n",
" self.align_language = self.result[\"language\"]\n",
" if (\n",
" not subtitle_file\n",
" or text_segmentation_scale in [\"word\", \"character\"]\n",
" ):\n",
" prog_disp(\"Aligning...\", 0.45, is_gui, progress=progress)\n",
" try:\n",
" if self.align_language in [\"vi\"]:\n",
" logger.info(\n",
" \"Deficient alignment for the \"\n",
" f\"{self.align_language} language, skipping the\"\n",
" \" process. It is suggested to reduce the \"\n",
" \"duration of the segments as an alternative.\"\n",
" )\n",
" else:\n",
" self.result = align_speech(audio, self.result)\n",
" logger.debug(\n",
" \"Align complete, \"\n",
" f\"segments count {len(self.result['segments'])}\"\n",
" )\n",
" except Exception as error:\n",
" logger.error(str(error))\n",
"\n",
" if self.result[\"segments\"] == []:\n",
" raise ValueError(\"No active speech found in audio\")\n",
"\n",
" if not self.task_in_cache(\"break_align\", [\n",
" divide_text_segments_by,\n",
" text_segmentation_scale,\n",
" self.align_language\n",
" ], {\n",
" \"result\": self.result,\n",
" \"align_language\": self.align_language\n",
" }):\n",
" if self.align_language in [\"ja\", \"zh\", \"zh-TW\"]:\n",
" divide_text_segments_by += \"|!|?|...|。\"\n",
" if text_segmentation_scale in [\"word\", \"character\"]:\n",
" self.result = linguistic_level_segments(\n",
" self.result,\n",
" text_segmentation_scale,\n",
" )\n",
" elif divide_text_segments_by:\n",
" try:\n",
" self.result = break_aling_segments(\n",
" self.result,\n",
" break_characters=divide_text_segments_by,\n",
" )\n",
" except Exception as error:\n",
" logger.error(str(error))\n",
"\n",
" if not self.task_in_cache(\"diarize\", [\n",
" min_speakers,\n",
" max_speakers,\n",
" YOUR_HF_TOKEN[:len(YOUR_HF_TOKEN)//2],\n",
" diarization_model\n",
" ], {\n",
" \"result\": self.result\n",
" }):\n",
" prog_disp(\"Diarizing...\", 0.60, is_gui, progress=progress)\n",
" diarize_model_select = diarization_models[diarization_model]\n",
" self.result_diarize = diarize_speech(\n",
" base_audio_wav if not self.vocals else self.vocals,\n",
" self.result,\n",
" min_speakers,\n",
" max_speakers,\n",
" YOUR_HF_TOKEN,\n",
" diarize_model_select,\n",
" )\n",
" logger.debug(\"Diarize complete\")\n",
" self.result_source_lang = copy.deepcopy(self.result_diarize)\n",
"\n",
" if not self.task_in_cache(\"translate\", [\n",
" TRANSLATE_AUDIO_TO,\n",
" translate_process\n",
" ], {\n",
" \"result_diarize\": self.result_diarize\n",
" }):\n",
" prog_disp(\"Translating...\", 0.70, is_gui, progress=progress)\n",
" lang_source = (\n",
" self.align_language\n",
" if self.align_language\n",
" else SOURCE_LANGUAGE\n",
" )\n",
" self.result_diarize[\"segments\"] = translate_text(\n",
" self.result_diarize[\"segments\"],\n",
" TRANSLATE_AUDIO_TO,\n",
" translate_process,\n",
" chunk_size=1800,\n",
" source=lang_source,\n",
" )\n",
" logger.debug(\"Translation complete\")\n",
" logger.debug(self.result_diarize)\n",
"\n",
" if get_translated_text:\n",
"\n",
" json_data = []\n",
" for segment in self.result_diarize[\"segments\"]:\n",
" start = segment[\"start\"]\n",
" text = segment[\"text\"]\n",
" speaker = int(segment.get(\"speaker\", \"SPEAKER_00\")[-2:]) + 1\n",
" json_data.append(\n",
" {\"start\": start, \"text\": text, \"speaker\": speaker}\n",
" )\n",
"\n",
" # Convert list of dictionaries to a JSON string with indentation\n",
" json_string = json.dumps(json_data, indent=2)\n",
" logger.info(\"Done\")\n",
" self.edit_subs_complete = True\n",
" return json_string.encode().decode(\"unicode_escape\")\n",
"\n",
" if get_video_from_text_json:\n",
"\n",
" if self.result_diarize is None:\n",
" raise ValueError(\"Generate the transcription first.\")\n",
" # with open('text_json.json', 'r') as file:\n",
" text_json_loaded = json.loads(text_json)\n",
" for i, segment in enumerate(self.result_diarize[\"segments\"]):\n",
" segment[\"text\"] = text_json_loaded[i][\"text\"]\n",
" segment[\"speaker\"] = \"SPEAKER_{:02d}\".format(\n",
" int(text_json_loaded[i][\"speaker\"]) - 1\n",
" )\n",
"\n",
" # Write subtitle\n",
" if not self.task_in_cache(\"subs_and_edit\", [\n",
" copy.deepcopy(self.result_diarize),\n",
" output_format_subtitle,\n",
" TRANSLATE_AUDIO_TO\n",
" ], {\n",
" \"result_diarize\": self.result_diarize\n",
" }):\n",
" if output_format_subtitle == \"disable\":\n",
" self.sub_file = \"sub_tra.srt\"\n",
" elif output_format_subtitle != \"ass\":\n",
" self.sub_file = process_subtitles(\n",
" self.result_source_lang,\n",
" self.align_language,\n",
" self.result_diarize,\n",
" output_format_subtitle,\n",
" TRANSLATE_AUDIO_TO,\n",
" )\n",
"\n",
" # Need task\n",
" if output_format_subtitle != \"srt\":\n",
" _ = process_subtitles(\n",
" self.result_source_lang,\n",
" self.align_language,\n",
" self.result_diarize,\n",
" \"srt\",\n",
" TRANSLATE_AUDIO_TO,\n",
" )\n",
"\n",
" if output_format_subtitle == \"ass\":\n",
" convert_ori = \"ffmpeg -i sub_ori.srt sub_ori.ass -y\"\n",
" convert_tra = \"ffmpeg -i sub_tra.srt sub_tra.ass -y\"\n",
" self.sub_file = \"sub_tra.ass\"\n",
" run_command(convert_ori)\n",
" run_command(convert_tra)\n",
"\n",
" format_sub = (\n",
" output_format_subtitle\n",
" if output_format_subtitle != \"disable\"\n",
" else \"srt\"\n",
" )\n",
"\n",
" if output_type == \"subtitle\":\n",
"\n",
" out_subs = []\n",
" tra_subs = media_out(\n",
" media_file,\n",
" TRANSLATE_AUDIO_TO,\n",
" video_output_name,\n",
" format_sub,\n",
" file_obj=self.sub_file,\n",
" )\n",
" out_subs.append(tra_subs)\n",
"\n",
" ori_subs = media_out(\n",
" media_file,\n",
" self.align_language,\n",
" video_output_name,\n",
" format_sub,\n",
" file_obj=f\"sub_ori.{format_sub}\",\n",
" )\n",
" out_subs.append(ori_subs)\n",
" logger.info(f\"Done: {out_subs}\")\n",
" return out_subs\n",
"\n",
" if output_type == \"subtitle [by speaker]\":\n",
" output = get_subtitle_speaker(\n",
" media_file,\n",
" result=self.result_diarize,\n",
" language=TRANSLATE_AUDIO_TO,\n",
" extension=format_sub,\n",
" base_name=video_output_name,\n",
" )\n",
" logger.info(f\"Done: {str(output)}\")\n",
" return output\n",
"\n",
" if \"video [subtitled]\" in output_type:\n",
" output = media_out(\n",
" media_file,\n",
" TRANSLATE_AUDIO_TO + \"_subtitled\",\n",
" video_output_name,\n",
" \"wav\" if is_audio_file(media_file) else (\n",
" \"mkv\" if \"mkv\" in output_type else \"mp4\"\n",
" ),\n",
" file_obj=base_audio_wav if is_audio_file(media_file) else base_video_file,\n",
" soft_subtitles=False if is_audio_file(media_file) else True,\n",
" subtitle_files=output_format_subtitle,\n",
" )\n",
" msg_out = output[0] if isinstance(output, list) else output\n",
" logger.info(f\"Done: {msg_out}\")\n",
" return output\n",
"\n",
" if not self.task_in_cache(\"tts\", [\n",
" TRANSLATE_AUDIO_TO,\n",
" tts_voice00,\n",
" tts_voice01,\n",
" tts_voice02,\n",
" tts_voice03,\n",
" tts_voice04,\n",
" tts_voice05,\n",
" tts_voice06,\n",
" tts_voice07,\n",
" tts_voice08,\n",
" tts_voice09,\n",
" tts_voice10,\n",
" tts_voice11,\n",
" dereverb_automatic_xtts\n",
" ], {\n",
" \"sub_file\": self.sub_file\n",
" }):\n",
" prog_disp(\"Text to speech...\", 0.80, is_gui, progress=progress)\n",
" self.valid_speakers = audio_segmentation_to_voice(\n",
" self.result_diarize,\n",
" TRANSLATE_AUDIO_TO,\n",
" is_gui,\n",
" tts_voice00,\n",
" tts_voice01,\n",
" tts_voice02,\n",
" tts_voice03,\n",
" tts_voice04,\n",
" tts_voice05,\n",
" tts_voice06,\n",
" tts_voice07,\n",
" tts_voice08,\n",
" tts_voice09,\n",
" tts_voice10,\n",
" tts_voice11,\n",
" dereverb_automatic_xtts,\n",
" )\n",
"\n",
" if not self.task_in_cache(\"acc_and_vc\", [\n",
" max_accelerate_audio,\n",
" acceleration_rate_regulation,\n",
" voice_imitation,\n",
" voice_imitation_max_segments,\n",
" voice_imitation_remove_previous,\n",
" voice_imitation_vocals_dereverb,\n",
" voice_imitation_method,\n",
" custom_voices,\n",
" custom_voices_workers,\n",
" copy.deepcopy(self.vci.model_config),\n",
" avoid_overlap\n",
" ], {\n",
" \"valid_speakers\": self.valid_speakers\n",
" }):\n",
" audio_files, speakers_list = accelerate_segments(\n",
" self.result_diarize,\n",
" max_accelerate_audio,\n",
" self.valid_speakers,\n",
" acceleration_rate_regulation,\n",
" )\n",
"\n",
" # Voice Imitation (Tone color converter)\n",
" if voice_imitation:\n",
" prog_disp(\n",
" \"Voice Imitation...\", 0.85, is_gui, progress=progress\n",
" )\n",
" from soni_translate.text_to_speech import toneconverter\n",
"\n",
" try:\n",
" toneconverter(\n",
" copy.deepcopy(self.result_diarize),\n",
" voice_imitation_max_segments,\n",
" voice_imitation_remove_previous,\n",
" voice_imitation_vocals_dereverb,\n",
" voice_imitation_method,\n",
" )\n",
" except Exception as error:\n",
" logger.error(str(error))\n",
"\n",
" # custom voice\n",
" if custom_voices:\n",
" prog_disp(\n",
" \"Applying customized voices...\",\n",
" 0.90,\n",
" is_gui,\n",
" progress=progress,\n",
" )\n",
"\n",
" try:\n",
" self.vci(\n",
" audio_files,\n",
" speakers_list,\n",
" overwrite=True,\n",
" parallel_workers=custom_voices_workers,\n",
" )\n",
" self.vci.unload_models()\n",
" except Exception as error:\n",
" logger.error(str(error))\n",
"\n",
" prog_disp(\n",
" \"Creating final translated video...\",\n",
" 0.95,\n",
" is_gui,\n",
" progress=progress,\n",
" )\n",
" remove_files(dub_audio_file)\n",
" create_translated_audio(\n",
" self.result_diarize,\n",
" audio_files,\n",
" dub_audio_file,\n",
" False,\n",
" avoid_overlap,\n",
" )\n",
"\n",
" # Voiceless track, change with file\n",
" hash_base_audio_wav = get_hash(base_audio_wav)\n",
" if voiceless_track:\n",
" if self.voiceless_id != hash_base_audio_wav:\n",
" from soni_translate.mdx_net import process_uvr_task\n",
"\n",
" try:\n",
" # voiceless_audio_file_dir = \"clean_song_output/voiceless\"\n",
" remove_files(voiceless_audio_file)\n",
" uvr_voiceless_audio_wav, _ = process_uvr_task(\n",
" orig_song_path=base_audio_wav,\n",
" song_id=\"voiceless\",\n",
" only_voiceless=True,\n",
" remove_files_output_dir=False,\n",
" )\n",
" copy_files(uvr_voiceless_audio_wav, \".\")\n",
" base_audio_wav = voiceless_audio_file\n",
" self.voiceless_id = hash_base_audio_wav\n",
"\n",
" except Exception as error:\n",
" logger.error(str(error))\n",
" else:\n",
" base_audio_wav = voiceless_audio_file\n",
"\n",
" if not self.task_in_cache(\"mix_aud\", [\n",
" mix_method_audio,\n",
" volume_original_audio,\n",
" volume_translated_audio,\n",
" voiceless_track\n",
" ], {}):\n",
" # TYPE MIX AUDIO\n",
" remove_files(mix_audio_file)\n",
" command_volume_mix = f'ffmpeg -y -i {base_audio_wav} -i {dub_audio_file} -filter_complex \"[0:0]volume={volume_original_audio}[a];[1:0]volume={volume_translated_audio}[b];[a][b]amix=inputs=2:duration=longest\" -c:a libmp3lame {mix_audio_file}'\n",
" command_background_mix = f'ffmpeg -i {base_audio_wav} -i {dub_audio_file} -filter_complex \"[1:a]asplit=2[sc][mix];[0:a][sc]sidechaincompress=threshold=0.003:ratio=20[bg]; [bg][mix]amerge[final]\" -map [final] {mix_audio_file}'\n",
" if mix_method_audio == \"Adjusting volumes and mixing audio\":\n",
" # volume mix\n",
" run_command(command_volume_mix)\n",
" else:\n",
" try:\n",
" # background mix\n",
" run_command(command_background_mix)\n",
" except Exception as error_mix:\n",
" # volume mix except\n",
" logger.error(str(error_mix))\n",
" run_command(command_volume_mix)\n",
"\n",
" if \"audio\" in output_type or is_audio_file(media_file):\n",
" output = media_out(\n",
" media_file,\n",
" TRANSLATE_AUDIO_TO,\n",
" video_output_name,\n",
" \"wav\" if \"wav\" in output_type else (\n",
" \"ogg\" if \"ogg\" in output_type else \"mp3\"\n",
" ),\n",
" file_obj=mix_audio_file,\n",
" subtitle_files=output_format_subtitle,\n",
" )\n",
" msg_out = output[0] if isinstance(output, list) else output\n",
" logger.info(f\"Done: {msg_out}\")\n",
" return output\n",
"\n",
" hash_base_video_file = get_hash(base_video_file)\n",
"\n",
" if burn_subtitles_to_video:\n",
" hashvideo_text = [\n",
" hash_base_video_file,\n",
" [seg[\"text\"] for seg in self.result_diarize[\"segments\"]]\n",
" ]\n",
" if self.burn_subs_id != hashvideo_text:\n",
" try:\n",
" logger.info(\"Burn subtitles\")\n",
" remove_files(vid_subs)\n",
" command = f\"ffmpeg -i {base_video_file} -y -vf subtitles=sub_tra.srt -max_muxing_queue_size 9999 {vid_subs}\"\n",
" run_command(command)\n",
" base_video_file = vid_subs\n",
" self.burn_subs_id = hashvideo_text\n",
" except Exception as error:\n",
" logger.error(str(error))\n",
" else:\n",
" base_video_file = vid_subs\n",
"\n",
" if not self.task_in_cache(\"output\", [\n",
" hash_base_video_file,\n",
" hash_base_audio_wav,\n",
" burn_subtitles_to_video\n",
" ], {}):\n",
" # Merge new audio + video\n",
" remove_files(video_output_file)\n",
" run_command(\n",
" f\"ffmpeg -i {base_video_file} -i {mix_audio_file} -c:v copy -c:a copy -map 0:v -map 1:a -shortest {video_output_file}\"\n",
" )\n",
"\n",
" output = media_out(\n",
" media_file,\n",
" TRANSLATE_AUDIO_TO,\n",
" video_output_name,\n",
" \"mkv\" if \"mkv\" in output_type else \"mp4\",\n",
" file_obj=video_output_file,\n",
" soft_subtitles=soft_subtitles_to_video,\n",
" subtitle_files=output_format_subtitle,\n",
" )\n",
" msg_out = output[0] if isinstance(output, list) else output\n",
" logger.info(f\"Done: {msg_out}\")\n",
"\n",
" return output\n",
"\n",
" def hook_beta_processor(\n",
" self,\n",
" document,\n",
" tgt_lang,\n",
" translate_process,\n",
" ori_lang,\n",
" tts,\n",
" name_final_file,\n",
" custom_voices,\n",
" custom_voices_workers,\n",
" output_type,\n",
" chunk_size,\n",
" width,\n",
" height,\n",
" start_page,\n",
" end_page,\n",
" bcolor,\n",
" is_gui,\n",
" progress\n",
" ):\n",
" prog_disp(\"Processing pages...\", 0.10, is_gui, progress=progress)\n",
" doc_data = doc_to_txtximg_pages(document, width, height, start_page, end_page, bcolor)\n",
" result_diarize = page_data_to_segments(doc_data, 1700)\n",
"\n",
" prog_disp(\"Translating...\", 0.20, is_gui, progress=progress)\n",
" result_diarize[\"segments\"] = translate_text(\n",
" result_diarize[\"segments\"],\n",
" tgt_lang,\n",
" translate_process,\n",
" chunk_size=0,\n",
" source=ori_lang,\n",
" )\n",
" chunk_size = (\n",
" chunk_size if chunk_size else determine_chunk_size(tts)\n",
" )\n",
" doc_data = update_page_data(result_diarize, doc_data)\n",
"\n",
" prog_disp(\"Text to speech...\", 0.30, is_gui, progress=progress)\n",
" result_diarize = page_data_to_segments(doc_data, chunk_size)\n",
" valid_speakers = audio_segmentation_to_voice(\n",
" result_diarize,\n",
" tgt_lang,\n",
" is_gui,\n",
" tts,\n",
" )\n",
"\n",
" # fix format and set folder output\n",
" audio_files, speakers_list = accelerate_segments(\n",
" result_diarize,\n",
" 1.0,\n",
" valid_speakers,\n",
" )\n",
"\n",
" # custom voice\n",
" if custom_voices:\n",
" prog_disp(\n",
" \"Applying customized voices...\",\n",
" 0.60,\n",
" is_gui,\n",
" progress=progress,\n",
" )\n",
" self.vci(\n",
" audio_files,\n",
" speakers_list,\n",
" overwrite=True,\n",
" parallel_workers=custom_voices_workers,\n",
" )\n",
" self.vci.unload_models()\n",
"\n",
" # Update time segments and not concat\n",
" result_diarize = fix_timestamps_docs(result_diarize, audio_files)\n",
" final_wav_file = \"audio_book.wav\"\n",
" remove_files(final_wav_file)\n",
"\n",
" prog_disp(\"Creating audio file...\", 0.70, is_gui, progress=progress)\n",
" create_translated_audio(\n",
" result_diarize, audio_files, final_wav_file, False\n",
" )\n",
"\n",
" prog_disp(\"Creating video file...\", 0.80, is_gui, progress=progress)\n",
" video_doc = create_video_from_images(\n",
" doc_data,\n",
" result_diarize\n",
" )\n",
"\n",
" # Merge video and audio\n",
" prog_disp(\"Merging...\", 0.90, is_gui, progress=progress)\n",
" vid_out = merge_video_and_audio(video_doc, final_wav_file)\n",
"\n",
" # End\n",
" output = media_out(\n",
" document,\n",
" tgt_lang,\n",
" name_final_file,\n",
" \"mkv\" if \"mkv\" in output_type else \"mp4\",\n",
" file_obj=vid_out,\n",
" )\n",
" logger.info(f\"Done: {output}\")\n",
" return output\n",
"\n",
" def multilingual_docs_conversion(\n",
" self,\n",
" string_text=\"\", # string\n",
" document=None, # doc path gui\n",
" directory_input=\"\", # doc path\n",
" origin_language=\"English (en)\",\n",
" target_language=\"English (en)\",\n",
" tts_voice00=\"en-US-EmmaMultilingualNeural-Female\",\n",
" name_final_file=\"\",\n",
" translate_process=\"google_translator\",\n",
" output_type=\"audio\",\n",
" chunk_size=None,\n",
" custom_voices=False,\n",
" custom_voices_workers=1,\n",
" start_page=1,\n",
" end_page=99999,\n",
" width=1280,\n",
" height=720,\n",
" bcolor=\"dynamic\",\n",
" is_gui=False,\n",
" progress=gr.Progress(),\n",
" ):\n",
" if \"gpt\" in translate_process:\n",
" check_openai_api_key()\n",
"\n",
" SOURCE_LANGUAGE = LANGUAGES[origin_language]\n",
" if translate_process != \"disable_translation\":\n",
" TRANSLATE_AUDIO_TO = LANGUAGES[target_language]\n",
" else:\n",
" TRANSLATE_AUDIO_TO = SOURCE_LANGUAGE\n",
" logger.info(\"No translation\")\n",
" if tts_voice00[:2].lower() != TRANSLATE_AUDIO_TO[:2].lower():\n",
" logger.debug(\n",
" \"Make sure to select a 'TTS Speaker' suitable for the \"\n",
" \"translation language to avoid errors with the TTS.\"\n",
" )\n",
"\n",
" self.clear_cache(string_text, force=True)\n",
"\n",
" is_string = False\n",
" if document is None:\n",
" if os.path.exists(directory_input):\n",
" document = directory_input\n",
" else:\n",
" document = string_text\n",
" is_string = True\n",
" document = document if isinstance(document, str) else document.name\n",
" if not document:\n",
" raise Exception(\"No data found\")\n",
"\n",
" if \"videobook\" in output_type:\n",
" if not document.lower().endswith(\".pdf\"):\n",
" raise ValueError(\n",
" \"Videobooks are only compatible with PDF files.\"\n",
" )\n",
"\n",
" return self.hook_beta_processor(\n",
" document,\n",
" TRANSLATE_AUDIO_TO,\n",
" translate_process,\n",
" SOURCE_LANGUAGE,\n",
" tts_voice00,\n",
" name_final_file,\n",
" custom_voices,\n",
" custom_voices_workers,\n",
" output_type,\n",
" chunk_size,\n",
" width,\n",
" height,\n",
" start_page,\n",
" end_page,\n",
" bcolor,\n",
" is_gui,\n",
" progress\n",
" )\n",
"\n",
" # audio_wav = \"audio.wav\"\n",
" final_wav_file = \"audio_book.wav\"\n",
"\n",
" prog_disp(\"Processing text...\", 0.15, is_gui, progress=progress)\n",
" result_file_path, result_text = document_preprocessor(\n",
" document, is_string, start_page, end_page\n",
" )\n",
"\n",
" if (\n",
" output_type == \"book (txt)\"\n",
" and translate_process == \"disable_translation\"\n",
" ):\n",
" return result_file_path\n",
"\n",
" if \"SET_LIMIT\" == os.getenv(\"DEMO\"):\n",
" result_text = result_text[:50]\n",
" logger.info(\n",
" \"DEMO; Generation is limited to 50 characters to prevent \"\n",
" \"CPU errors. No limitations with GPU.\\n\"\n",
" )\n",
"\n",
" if translate_process != \"disable_translation\":\n",
" # chunks text for translation\n",
" result_diarize = plain_text_to_segments(result_text, 1700)\n",
" prog_disp(\"Translating...\", 0.30, is_gui, progress=progress)\n",
" # not or iterative with 1700 chars\n",
" result_diarize[\"segments\"] = translate_text(\n",
" result_diarize[\"segments\"],\n",
" TRANSLATE_AUDIO_TO,\n",
" translate_process,\n",
" chunk_size=0,\n",
" source=SOURCE_LANGUAGE,\n",
" )\n",
"\n",
" txt_file_path, result_text = segments_to_plain_text(result_diarize)\n",
"\n",
" if output_type == \"book (txt)\":\n",
" return media_out(\n",
" result_file_path if is_string else document,\n",
" TRANSLATE_AUDIO_TO,\n",
" name_final_file,\n",
" \"txt\",\n",
" file_obj=txt_file_path,\n",
" )\n",
"\n",
" # (TTS limits) plain text to result_diarize\n",
" chunk_size = (\n",
" chunk_size if chunk_size else determine_chunk_size(tts_voice00)\n",
" )\n",
" result_diarize = plain_text_to_segments(result_text, chunk_size)\n",
" logger.debug(result_diarize)\n",
"\n",
" prog_disp(\"Text to speech...\", 0.45, is_gui, progress=progress)\n",
" valid_speakers = audio_segmentation_to_voice(\n",
" result_diarize,\n",
" TRANSLATE_AUDIO_TO,\n",
" is_gui,\n",
" tts_voice00,\n",
" )\n",
"\n",
" # fix format and set folder output\n",
" audio_files, speakers_list = accelerate_segments(\n",
" result_diarize,\n",
" 1.0,\n",
" valid_speakers,\n",
" )\n",
"\n",
" # custom voice\n",
" if custom_voices:\n",
" prog_disp(\n",
" \"Applying customized voices...\",\n",
" 0.80,\n",
" is_gui,\n",
" progress=progress,\n",
" )\n",
" self.vci(\n",
" audio_files,\n",
" speakers_list,\n",
" overwrite=True,\n",
" parallel_workers=custom_voices_workers,\n",
" )\n",
" self.vci.unload_models()\n",
"\n",
" prog_disp(\n",
" \"Creating final audio file...\", 0.90, is_gui, progress=progress\n",
" )\n",
" remove_files(final_wav_file)\n",
" create_translated_audio(\n",
" result_diarize, audio_files, final_wav_file, True\n",
" )\n",
"\n",
" output = media_out(\n",
" result_file_path if is_string else document,\n",
" TRANSLATE_AUDIO_TO,\n",
" name_final_file,\n",
" \"mp3\" if \"mp3\" in output_type else (\n",
" \"ogg\" if \"ogg\" in output_type else \"wav\"\n",
" ),\n",
" file_obj=final_wav_file,\n",
" )\n",
"\n",
" logger.info(f\"Done: {output}\")\n",
"\n",
" return output\n",
"\n",
"\n",
"title = \"