import asyncio
import os
from pathlib import Path

import deep_translator
import pysrt
import tqdm.asyncio
import subtitle_utils
from utils import format_time

# sentence endings for Japanese and for languages with Western-style punctuation
sentence_endings = ['.', '!', '?', ')', 'よ', 'ね',
                    'の', 'さ', 'ぞ', 'な', 'か', '!', '。', '」', '…']

# a good separator is a character or string that doesn't affect translation quality and is almost always preserved at (or near) its original position in the result
separator = " ◌ "
separator_unjoin = separator.replace(' ', '')
chunk_max_chars = 4999
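
# Illustrative sketch (comments only): the lines ["How", "are you?"] are
# joined into the chunk "How ◌ are you? ◌ " before translation; the
# translated result is split back on "◌" so each subtitle line recovers
# its own translated text.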


def translate_srt_file(srt_file_path: Path, translated_subtitle_path: Path, target_lang):
    # Load the original SRT file
    subs = pysrt.open(srt_file_path, encoding='utf-8')

    # Extract the subtitle content into a list, rejoining any split lines
    sub_content = [' '.join(sub.text.strip().splitlines()) for sub in subs]

    # Make chunks of at most $chunk_max_chars characters to stay under the Google Translate public API limit
    chunks = join_sentences(sub_content, chunk_max_chars) or []

    # Preallocate one slot per chunk so translations can be stored by index
    translated_chunks = [None] * len(chunks)

    tasks = []
    # Limit to 7 concurrently running tasks
    semaphore = asyncio.Semaphore(7)

    # Translate all chunks concurrently
    async def translate_async():
        async def run_translate(index, chunk, lang):
            while True:
                try:
                    async with semaphore:
                        result = await asyncio.wait_for(translate_chunk(index, chunk, lang), 120)
                    translated_chunks[index] = result
                    break
                except Exception:
                    # Timed out or failed: wait briefly and retry the task
                    await asyncio.sleep(3)

        for index, chunk in enumerate(chunks):
            task = asyncio.create_task(
                run_translate(index, chunk, target_lang))
            tasks.append(task)

        for tsk in tqdm.asyncio.tqdm_asyncio.as_completed(tasks, total=len(tasks), desc="Translating", unit="chunks", unit_scale=False, leave=True, bar_format="{desc} {percentage:3.0f}% | {n_fmt}/{total_fmt} | ETA: {remaining} | ⏱: {elapsed}"):
            await tsk

    # Create an event loop and run all translation tasks to completion
    asyncio.run(translate_async())

    print('Processing translation...', end='')

    # Unjoin each translated chunk back into its original lines
    unjoined_texts = [unjoin_sentences(
        chunk, translated_chunks[i], separator_unjoin) or "" for i, chunk in enumerate(chunks)]
    unjoined_texts = [text for sublist in unjoined_texts for text in sublist]

    # Split each segment back into the same number of lines as the original subtitle
    for i, segment in enumerate(unjoined_texts):
        unjoined_texts[i] = "\n".join(subtitle_utils.split_string_to_max_lines(
            text=segment, max_width=0, max_lines=len(subs[i].text.splitlines())))

    # Combine the original and translated subtitle content
    for i, sub in enumerate(subs):
        sub.text = unjoined_texts[i]

    # Save the translated SRT file
    os.makedirs(translated_subtitle_path.parent, exist_ok=True)
    subs.save(translated_subtitle_path, encoding='utf-8')

    print('\r                         ', end='\r')

    return subs
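
# Example call (a minimal sketch; the paths and target language below are
# hypothetical):
#   translate_srt_file(Path("movie.srt"), Path("out/movie.srt"), "pt")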

# Translate a single chunk, retrying until it succeeds


async def translate_chunk(index, chunk, target_lang):
    while True:
        try:
            # Translate the subtitle content of the chunk using Google Translate
            translator = deep_translator.google.GoogleTranslator(
                source='auto', target=target_lang)
            translated_chunk: str = await asyncio.wait_for(asyncio.get_running_loop().run_in_executor(None, translator.translate, chunk), 30)
            await asyncio.sleep(0)

            # if nothing is returned, fall back to the original chunk
            if translated_chunk is None or len(translated_chunk.replace(separator.strip(), '').split()) == 0:
                return chunk

            return translated_chunk
        except Exception as e:
            # On any error, report it and retry after a pause
            print(
                f"\r[chunk {index}]: Exception: {e} Retrying in 30 seconds...", flush=True)
            await asyncio.sleep(30)
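
# Sketch of a direct call (hypothetical chunk text; requires network access):
#   asyncio.run(translate_chunk(0, "How ◌ are you? ◌ ", "pt"))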


def join_sentences(lines, max_chars):
    """

    Joins the given list of strings in a way that each part ends with a sentence ending.

    Adds a separator to all lines in the chunk.

    """
    joined_lines = []
    current_chunk = ""

    for line in lines:
        if not line:
            line = 'ㅤ'  # invisible character (not a plain space)

        if len(current_chunk) + len(line) + len(separator) <= max_chars:
            current_chunk += line + separator
            if any(line.endswith(ending) for ending in sentence_endings):
                joined_lines.append(current_chunk)
                current_chunk = ""
        else:
            if current_chunk:
                joined_lines.append(current_chunk)
                current_chunk = ""
            if len(current_chunk) + len(line) + len(separator) <= max_chars:
                current_chunk += line + separator
            else:
                # if a single line exceeds max_chars, keep as many whole words
                # as possible and discard the remainder
                end_index = line.rfind(
                    ' ', 0, max_chars - (1 + len(separator)))

                # rfind returns -1 when no space is found: hard-cut at the limit
                if end_index == -1:
                    end_index = max_chars - (1 + len(separator))

                joined_lines.append(
                    (line[:end_index] + '…' + separator)[:max_chars])

    # append the final chunk even if it doesn't end with a sentence ending
    if current_chunk:
        joined_lines.append(current_chunk)

    return joined_lines
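
# Illustrative behaviour (a sketch, not asserted output):
#   join_sentences(["Hello.", "How", "are you?"], 4999)
#   -> ["Hello. ◌ ", "How ◌ are you? ◌ "]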


def unjoin_sentences(original_sentence: str, modified_sentence: str, separator: str):
    """

    Splits the original and modified sentences into lines based on the separator.

    Tries to match the number of lines between the original and modified sentences.

    """

    if original_sentence is None:
        return [' ']

    # split by the separator, collapse double spaces, and drop empty or
    # whitespace-only strings from the list
    original_lines = original_sentence.split(separator)
    original_lines = [s.strip().replace('  ', ' ').lstrip(" ,.:;)") if s.strip().replace('  ', ' ').lstrip(" ,.:;)") else s
                      for s in original_lines if s.strip()]

    if modified_sentence is None:
        return original_lines or [' ']

    # fix odd formatting around the separator sometimes returned by Google Translate
    modified_sentence = modified_sentence.replace(f"{separator_unjoin} ", f"{separator_unjoin}").replace(f" {separator_unjoin}", f"{separator_unjoin}").replace(
        f"{separator_unjoin}.", f".{separator_unjoin}").replace(f"{separator_unjoin},", f",{separator_unjoin}")

    # split by the separator, collapse double spaces, and drop empty or
    # whitespace-only strings from the list
    modified_lines = modified_sentence.split(separator_unjoin)
    modified_lines = [s.strip().replace('  ', ' ').lstrip(" ,.:;)") if s.strip().replace('  ', ' ').lstrip(" ,.:;)") else s
                      for s in modified_lines if s.strip()]

    # if the original is just a "silence" marker, keep it untranslated
    if original_lines in (["..."], ["…"]):
        return original_lines

    # all ok, return lines
    if len(original_lines) == len(modified_lines):
        return modified_lines

    # zero words on either side? fall back to the original lines (separator already removed)
    original_word_count = sum(len(line.strip().split())
                              for line in original_lines)
    modified_word_count = len(' '.join(modified_lines).strip().split())
    if original_word_count == 0 or modified_word_count == 0:
        return original_lines or [' ']

    # calculate proportion of words between original and translated
    modified_words_proportion = modified_word_count / original_word_count
    # list all modified words
    modified_words = ' '.join(modified_lines).replace(separator, "").replace(
        separator_unjoin, "").replace("  ", " ").strip().split(' ')

    new_modified_lines = []
    current_index = 0

    # reconstruct lines based on proportion of original and translated words
    for i in range(len(original_lines)):
        # Calculate the number of words for the current modified sentence
        num_words = int(
            round(len(original_lines[i].strip().split()) * modified_words_proportion))

        # Extract words from modified list
        generated_line = ' '.join(
            modified_words[current_index:current_index+num_words])

        # Update the current index
        current_index += num_words

        # on the last iteration, append any words that are still left over
        if i == len(original_lines) - 1:
            generated_line = ' '.join([generated_line, ' '.join(
                modified_words[current_index:])])

        # Add modified sentence to the new list
        new_modified_lines.append(generated_line.replace("  ", " ").strip())

    # if the result is still shorter than the original, pad by repeating the last line
    while len(new_modified_lines) < len(original_lines):
        new_modified_lines.append(new_modified_lines[-1])

    return new_modified_lines or original_lines or [' ']
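

# A minimal smoke test when the module is run directly; it exercises only the
# pure helpers, since real translation needs network access. The sample lines
# are hypothetical.
if __name__ == "__main__":
    demo = join_sentences(["Hello there.", "How are you?"], chunk_max_chars)
    print(demo)  # expected: ['Hello there. ◌ ', 'How are you? ◌ ']
    # Round trip: pretend the "translation" is identical to the original chunk
    print(unjoin_sentences(demo[0], demo[0], separator_unjoin))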