import asyncio
import os
from pathlib import Path
import deep_translator
import pysrt
import tqdm.asyncio
import subtitle_utils
from utils import format_time
# sentence endings for Japanese and for languages using Western punctuation
sentence_endings = ['.', '!', '?', ')', 'よ', 'ね',
'の', 'さ', 'ぞ', 'な', 'か', '!', '。', '」', '…']
# a good separator is a char or string that doesn't change the translation quality but is nearly always preserved at (or near) the same position in the result
separator = " ◌ "
separator_unjoin = separator.replace(' ', '')
chunk_max_chars = 4999
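
# Illustrative round trip (assuming the constants above): join_sentences()
# packs subtitle texts into separator-delimited chunks, e.g.
#   ['Hello.', 'How are', 'you?']  ->  ['Hello. ◌ ', 'How are ◌ you? ◌ ']
# and unjoin_sentences() later splits each translated chunk back on the bare
# separator '◌' to recover one text per original subtitle.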


def translate_srt_file(srt_file_path: Path, translated_subtitle_path: Path, target_lang):
# Load the original SRT file
subs = pysrt.open(srt_file_path, encoding='utf-8')
    # Extract the subtitle content into a list, rejoining any text that was split across multiple lines
sub_content = [' '.join(sub.text.strip().splitlines()) for sub in subs]
    # Build chunks of at most chunk_max_chars characters to stay under Google Translate's public API limit
chunks = join_sentences(sub_content, chunk_max_chars) or []
    # Pre-sized list to store each translated chunk at its original index
translated_chunks = [None] * len(chunks)
tasks = []
    # Limit to 7 concurrently running tasks
semaphore = asyncio.Semaphore(7)
    # Async driver that translates all chunks concurrently
async def translate_async():
async def run_translate(index, chunk, lang):
while True:
try:
async with semaphore:
result = await asyncio.wait_for(translate_chunk(index, chunk, lang), 120)
translated_chunks[index] = result
break
except Exception:
                    # On failure or timeout, wait briefly and retry this chunk
await asyncio.sleep(3)
for index, chunk in enumerate(chunks):
task = asyncio.create_task(
run_translate(index, chunk, target_lang))
tasks.append(task)
for tsk in tqdm.asyncio.tqdm_asyncio.as_completed(tasks, total=len(tasks), desc="Translating", unit="chunks", unit_scale=False, leave=True, bar_format="{desc} {percentage:3.0f}% | {n_fmt}/{total_fmt} | ETA: {remaining} | ⏱: {elapsed}"):
await tsk
    # Create an event loop and run the translation tasks
    asyncio.run(translate_async())
print('Processing translation...', end='')
# Unjoin lines within each chunk that end with a sentence ending
    unjoined_texts = [unjoin_sentences(
        chunk, translated_chunks[i], separator_unjoin) or [] for i, chunk in enumerate(chunks)]
unjoined_texts = [text for sublist in unjoined_texts for text in sublist]
    # Re-split each text into as many lines as the original subtitle had
for i, segment in enumerate(unjoined_texts):
unjoined_texts[i] = "\n".join(subtitle_utils.split_string_to_max_lines(
text=segment, max_width=0, max_lines=len(subs[i].text.splitlines())))
    # Write the translated text back into each subtitle entry
for i, sub in enumerate(subs):
sub.text = unjoined_texts[i]
# Save the translated SRT file
os.makedirs(translated_subtitle_path.parent, exist_ok=True)
subs.save(translated_subtitle_path, encoding='utf-8')
    print('\r' + ' ' * len('Processing translation...'), end='\r')  # clear the status message
return subs


# Async single-chunk translate function
async def translate_chunk(index, chunk, target_lang):
while True:
try:
# Translate the subtitle content of the chunk using Google Translate
translator = deep_translator.google.GoogleTranslator(
source='auto', target=target_lang)
            translated_chunk: str = await asyncio.wait_for(
                asyncio.get_running_loop().run_in_executor(
                    None, translator.translate, chunk), 30)
await asyncio.sleep(0)
            # if nothing is returned, fall back to the original chunk
if translated_chunk is None or len(translated_chunk.replace(separator.strip(), '').split()) == 0:
return chunk
return translated_chunk
        except Exception as e:
            # If an error occurred, wait and retry; the translator is rebuilt on the next attempt
            print(
                f"\r[chunk {index}]: Exception: {e.__doc__} Retrying in 30 seconds...", flush=True)
            await asyncio.sleep(30)


def join_sentences(lines, max_chars):
"""
Joins the given list of strings in a way that each part ends with a sentence ending.
Adds a separator to all lines in the chunk.
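
    Illustrative example (assumes the module-level separator " ◌ "):
        >>> join_sentences(['Hello.', 'How are', 'you?'], 4999)
        ['Hello. ◌ ', 'How are ◌ you? ◌ ']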
"""
joined_lines = []
current_chunk = ""
for line in lines:
        if not line:
line = 'ㅤ' # invisible char (not a simple space)
if len(current_chunk) + len(line) + len(separator) <= max_chars:
current_chunk += line + separator
if any(line.endswith(ending) for ending in sentence_endings):
joined_lines.append(current_chunk)
current_chunk = ""
else:
if current_chunk:
joined_lines.append(current_chunk)
current_chunk = ""
if len(current_chunk) + len(line) + len(separator) <= max_chars:
current_chunk += line + separator
else:
                # if a single line exceeds max_chars, keep as many whole words as possible and discard the rest
end_index = line.rfind(
' ', 0, max_chars - (1 + len(separator)))
                if end_index == -1:
end_index = max_chars - (1 + len(separator))
joined_lines.append(
(line[:end_index] + '…' + separator)[:max_chars])
    # append a final chunk which doesn't end with a formal sentence ending
if current_chunk:
joined_lines.append(current_chunk)
return joined_lines


def unjoin_sentences(original_sentence: str, modified_sentence: str, separator: str):
"""
Splits the original and modified sentences into lines based on the separator.
Tries to match the number of lines between the original and modified sentences.
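
    Illustrative example (assumes the module-level separator_unjoin "◌"):
        >>> unjoin_sentences('Hi. ◌ Bye. ◌ ', 'Oi.◌Tchau.◌', '◌')
        ['Oi.', 'Tchau.']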
"""
    # always return a list: the caller flattens the per-chunk results
    if original_sentence is None:
        return [' ']
# split by separator, remove double spaces and empty or only space strings from list
original_lines = original_sentence.split(separator)
    original_lines = [s.strip().replace('  ', ' ').lstrip(" ,.:;)") or s
                      for s in original_lines if s.strip()]
    original_lines = [s for s in original_lines if s.strip()]
    if modified_sentence is None:
        return original_lines or [' ']
    # fix odd formatting sometimes returned by Google Translate (strings are
    # immutable, so the result must be reassigned)
    modified_sentence = modified_sentence.replace(f"{separator_unjoin} ", f"{separator_unjoin}").replace(f" {separator_unjoin}", f"{separator_unjoin}").replace(
        f"{separator_unjoin}.", f".{separator_unjoin}").replace(f"{separator_unjoin},", f",{separator_unjoin}")
# split by separator, remove double spaces and empty or only space strings from list
modified_lines = modified_sentence.split(separator_unjoin)
    modified_lines = [s.strip().replace('  ', ' ').lstrip(" ,.:;)") or s
                      for s in modified_lines if s.strip()]
    modified_lines = [s for s in modified_lines if s.strip()]
    # if the original is just a "silence" sign, return it untranslated
    if [s.strip() for s in original_lines] in (['...'], ['…']):
        return original_lines
# all ok, return lines
if len(original_lines) == len(modified_lines):
return modified_lines
# zero words? return original sentence, removing separator
original_word_count = sum(len(line.strip().split())
for line in original_lines)
modified_word_count = len(' '.join(modified_lines).strip().split())
if original_word_count == 0 or modified_word_count == 0:
        return [original_sentence.replace(separator, ' ').replace('  ', ' ')]
# calculate proportion of words between original and translated
modified_words_proportion = modified_word_count / original_word_count
# list all modified words
    modified_words = ' '.join(modified_lines).replace(separator, "").replace(
        separator_unjoin, "").replace("  ", " ").strip().split(' ')
new_modified_lines = []
current_index = 0
# reconstruct lines based on proportion of original and translated words
for i in range(len(original_lines)):
# Calculate the number of words for the current modified sentence
num_words = int(
round(len(original_lines[i].strip().split()) * modified_words_proportion))
# Extract words from modified list
generated_line = ' '.join(
modified_words[current_index:current_index+num_words])
# Update the current index
current_index += num_words
        # on the last iteration, append any remaining words
        if i == len(original_lines) - 1:
            generated_line = ' '.join([generated_line, ' '.join(
                modified_words[current_index:])])
# Add modified sentence to the new list
        new_modified_lines.append(generated_line.replace("  ", " ").strip())
    # if the result is still shorter than the original, pad by repeating the last line
while len(new_modified_lines) < len(original_lines):
new_modified_lines.append(new_modified_lines[-1])
    return new_modified_lines or original_lines or [' ']
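

if __name__ == '__main__':
    # Minimal usage sketch (illustrative only: the paths and target language
    # below are assumptions, not part of this module)
    translate_srt_file(Path('movie.srt'), Path('translated/movie.srt'), 'pt')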