# Ported from https://github.com/openai/whisper/blob/main/whisper/utils.py
import json
import os
import re
import sys
import zlib
from typing import Callable, List, Optional, TextIO, Union, Dict, Tuple
from datetime import datetime
from modules.whisper.data_classes import Segment, Word
from .files_manager import read_file
# Zero GPU
import spaces
def format_timestamp(
    seconds: float, always_include_hours: bool = True, decimal_marker: str = ","
) -> str:
    """Render a non-negative duration as ``[HH:]MM:SS<marker>mmm``.

    Args:
        seconds: Duration in seconds; rounded to the nearest millisecond.
        always_include_hours: When False, the ``HH:`` prefix is omitted if
            the hour component is zero.
        decimal_marker: Separator placed between the seconds and the
            milliseconds.

    Returns:
        The formatted timestamp string.

    Raises:
        AssertionError: If ``seconds`` is negative.
    """
    assert seconds >= 0, "non-negative timestamp expected"
    remaining_ms = round(seconds * 1000.0)

    # Peel off each unit in turn; the remainder carries down to the next one.
    hours, remaining_ms = divmod(remaining_ms, 3_600_000)
    minutes, remaining_ms = divmod(remaining_ms, 60_000)
    whole_seconds, milliseconds = divmod(remaining_ms, 1_000)

    hours_marker = f"{hours:02d}:" if always_include_hours or hours > 0 else ""
    return (
        f"{hours_marker}{minutes:02d}:{whole_seconds:02d}"
        f"{decimal_marker}{milliseconds:03d}"
    )
def time_str_to_seconds(time_str: str, decimal_marker: str = ",") -> float:
    """Parse a ``[HH:]MM:SS<marker>fraction`` timestamp into seconds.

    Args:
        time_str: Timestamp with either two (``MM:SS``) or three
            (``HH:MM:SS``) colon-separated fields. Surrounding whitespace is
            ignored.
        decimal_marker: Separator between whole seconds and the fractional
            part. A timestamp without a fractional part is accepted and
            treated as having a fraction of zero.

    Returns:
        The timestamp expressed in seconds.

    Raises:
        ValueError: If the string does not have two or three colon-separated
            fields, or a field is not numeric.
    """
    times = time_str.strip().split(":")

    if len(times) == 3:
        hours_str, minutes_str, rest = times
    else:
        # Two-field form: no hour component.
        hours_str = "0"
        minutes_str, rest = times

    # partition() (unlike split + unpack) tolerates a missing decimal marker.
    seconds_str, _, fractional = rest.partition(decimal_marker)

    hours = int(hours_str)
    minutes = int(minutes_str)
    seconds = int(seconds_str)
    fractional_seconds = float("0." + (fractional.strip() or "0"))

    return hours * 3600 + minutes * 60 + seconds + fractional_seconds
def get_start(segments: List[dict]) -> Optional[float]:
    """Return the earliest available start time of a transcript.

    The ``"start"`` of the first word found across ``segments`` wins. When no
    segment carries any word, the first segment's own ``"start"`` is used.

    Args:
        segments: Segment dicts with a ``"start"`` key and, optionally, a
            ``"words"`` list of dicts that have a ``"start"`` key. A missing
            or ``None`` ``"words"`` entry is treated as an empty list.

    Returns:
        The start time, or ``None`` when ``segments`` is empty.
    """
    return next(
        (w["start"] for s in segments for w in (s.get("words") or ())),
        segments[0]["start"] if segments else None,
    )
def get_end(segments: List[dict]) -> Optional[float]:
    """Return the latest available end time of a transcript.

    Segments and their words are scanned from the back; the ``"end"`` of the
    first word found wins. When no segment carries any word, the last
    segment's own ``"end"`` is used.

    Args:
        segments: Segment dicts with an ``"end"`` key and, optionally, a
            ``"words"`` list of dicts that have an ``"end"`` key. A missing
            or ``None`` ``"words"`` entry is treated as an empty list.

    Returns:
        The end time, or ``None`` when ``segments`` is empty.
    """
    return next(
        (
            w["end"]
            for s in reversed(segments)
            for w in reversed(s.get("words") or ())
        ),
        segments[-1]["end"] if segments else None,
    )
class ResultWriter:
    """Base class for writers that serialise a transcription result to a file.

    Subclasses set ``extension`` and implement ``write_result``. Calling an
    instance opens ``<output_dir>/<output_file_name>.<extension>`` for writing
    (UTF-8) and delegates the actual formatting to ``write_result``.
    """

    # File extension without the leading dot; set by each concrete subclass.
    extension: str

    def __init__(self, output_dir: str):
        """Remember the directory that output files are written into."""
        self.output_dir = output_dir

    def __call__(
        self, result: Union[dict, List[Segment]], output_file_name: str,
        options: Optional[dict] = None, **kwargs
    ):
        """Write ``result`` to ``<output_dir>/<output_file_name>.<extension>``.

        Args:
            result: Either a dict with a ``"segments"`` list, or a list of
                segments. ``Segment`` items in a list are converted with
                ``model_dump()``; other items are passed through unchanged.
            output_file_name: File name without extension.
            options: Passed through to ``write_result``.
            **kwargs: Passed through to ``write_result``.
        """
        if isinstance(result, list):
            # Normalise to the dict shape every write_result implementation
            # indexes into; this also covers an empty list, which would
            # otherwise reach ``result["segments"]`` and raise TypeError.
            result = {
                "segments": [
                    seg.model_dump() if isinstance(seg, Segment) else seg
                    for seg in result
                ]
            }

        output_path = os.path.join(
            self.output_dir, output_file_name + "." + self.extension
        )

        with open(output_path, "w", encoding="utf-8") as f:
            self.write_result(result, file=f, options=options, **kwargs)

    def write_result(
        self, result: dict, file: TextIO, options: Optional[dict] = None, **kwargs
    ):
        """Format ``result`` into ``file``; must be overridden by subclasses."""
        raise NotImplementedError
class WriteTXT(ResultWriter):
    """Writer that emits the plain text of each segment, one per line."""

    extension: str = "txt"

    def write_result(
        self, result: Union[Dict, List[Segment]], file: TextIO, options: Optional[dict] = None, **kwargs
    ):
        """Print the stripped ``"text"`` of every segment to ``file``.

        ``options`` and ``**kwargs`` are accepted for interface compatibility
        and are not used here.
        """
        for segment in result["segments"]:
            print(segment["text"].strip(), file=file, flush=True)
class SubtitlesWriter(ResultWriter):
    """Shared cue-generation logic for the timestamped subtitle writers.

    Subclasses set ``always_include_hours`` and ``decimal_marker`` to control
    how timestamps are rendered, and consume ``iterate_result``.
    """

    # Whether rendered timestamps always carry the ``HH:`` prefix.
    always_include_hours: bool
    # Separator between seconds and milliseconds in rendered timestamps.
    decimal_marker: str

    def iterate_result(
        self,
        result: dict,
        options: Optional[dict] = None,
        max_line_width: Optional[int] = None,
        max_line_count: Optional[int] = None,
        highlight_words: bool = False,
        align_lrc_words: bool = False,
        max_words_per_line: Optional[int] = None,
    ):
        """Yield ``(start, end, text)`` cues for ``result``.

        Each explicit argument falls back to the same-named key in ``options``
        when it is falsy. If the first segment has a non-empty ``"words"``
        list, cues are built from word timings; otherwise one cue is yielded
        per segment, with ``"-->"`` in the text replaced by ``"->"``.

        Args:
            result: Dict with a ``"segments"`` list of segment dicts.
            options: Optional dict of fallback values for the arguments below.
            max_line_width: Maximum characters per line before wrapping.
            max_line_count: Maximum lines per cue. Segment boundaries are only
                ignored when both this and ``max_line_width`` are given.
            highlight_words: Additionally yield one cue per word with that
                word wrapped in ``<u>...</u>``.
            align_lrc_words: Yield ``(None, None, text)`` where ``text`` holds
                every word prefixed by its own bracketed start timestamp and
                the last word also followed by its end timestamp.
            max_words_per_line: Maximum number of words taken from a segment
                per chunk.

        Yields:
            Tuples of formatted start timestamp, formatted end timestamp and
            cue text (timestamps are ``None`` in the ``align_lrc_words`` case).
        """
        options = options or {}
        max_line_width = max_line_width or options.get("max_line_width")
        max_line_count = max_line_count or options.get("max_line_count")
        highlight_words = highlight_words or options.get("highlight_words", False)
        align_lrc_words = align_lrc_words or options.get("align_lrc_words", False)
        max_words_per_line = max_words_per_line or options.get("max_words_per_line")
        preserve_segments = max_line_count is None or max_line_width is None
        max_line_width = max_line_width or 1000
        max_words_per_line = max_words_per_line or 1000

        def iterate_subtitles():
            line_len = 0
            line_count = 1
            # the next subtitle to yield (a list of word timings with whitespace)
            subtitle: List[dict] = []
            last: float = get_start(result["segments"]) or 0.0
            for segment in result["segments"]:
                # A later segment may carry no word timings; treat as empty.
                words = segment.get("words") or []
                for chunk_index in range(0, len(words), max_words_per_line):
                    chunk = words[chunk_index:chunk_index + max_words_per_line]
                    for i, original_timing in enumerate(chunk):
                        timing = original_timing.copy()
                        long_pause = (
                            not preserve_segments and timing["start"] - last > 3.0
                        )
                        has_room = line_len + len(timing["word"]) <= max_line_width
                        # ``i`` restarts with every chunk, so each chunk start
                        # forces a new subtitle when segments are preserved.
                        seg_break = i == 0 and len(subtitle) > 0 and preserve_segments
                        if (
                            line_len > 0
                            and has_room
                            and not long_pause
                            and not seg_break
                        ):
                            # line continuation
                            line_len += len(timing["word"])
                        else:
                            # new line
                            timing["word"] = timing["word"].strip()
                            if (
                                len(subtitle) > 0
                                and max_line_count is not None
                                and (long_pause or line_count >= max_line_count)
                            ) or seg_break:
                                # subtitle break
                                yield subtitle
                                subtitle = []
                                line_count = 1
                            elif line_len > 0:
                                # line break
                                line_count += 1
                                timing["word"] = "\n" + timing["word"]
                            line_len = len(timing["word"].strip())
                        subtitle.append(timing)
                        last = timing["start"]
            if len(subtitle) > 0:
                yield subtitle

        segments = result["segments"]
        has_word_timings = (
            len(segments) > 0 and "words" in segments[0] and segments[0]["words"]
        )

        if has_word_timings:
            for subtitle in iterate_subtitles():
                subtitle_start = self.format_timestamp(subtitle[0]["start"])
                subtitle_end = self.format_timestamp(subtitle[-1]["end"])
                subtitle_text = "".join([word["word"] for word in subtitle])
                if highlight_words:
                    previous_end = subtitle_start
                    all_words = [timing["word"] for timing in subtitle]
                    for i, this_word in enumerate(subtitle):
                        start = self.format_timestamp(this_word["start"])
                        end = self.format_timestamp(this_word["end"])
                        if previous_end != start:
                            # Fill the gap before this word with the plain text.
                            yield previous_end, start, subtitle_text

                        # Underline only the current word, keeping any leading
                        # whitespace/newline outside the tag.
                        yield start, end, "".join(
                            re.sub(r"^(\s*)(.*)$", r"\1<u>\2</u>", word)
                            if j == i
                            else word
                            for j, word in enumerate(all_words)
                        )
                        previous_end = end

                # NOTE(review): when highlight_words is set and align_lrc_words
                # is not, the plain cue below is emitted after the per-word
                # cues for the same time range — confirm this is intended.
                if align_lrc_words:
                    lrc_aligned_words = [f"[{self.format_timestamp(sub['start'])}]{sub['word']}" for sub in subtitle]
                    l_start, l_end = self.format_timestamp(subtitle[-1]['start']), self.format_timestamp(subtitle[-1]['end'])
                    lrc_aligned_words[-1] = f"[{l_start}]{subtitle[-1]['word']}[{l_end}]"
                    lrc_aligned_words = ' '.join(lrc_aligned_words)
                    yield None, None, lrc_aligned_words
                else:
                    yield subtitle_start, subtitle_end, subtitle_text
        else:
            for segment in segments:
                segment_start = self.format_timestamp(segment["start"])
                segment_end = self.format_timestamp(segment["end"])
                # "-->" separates cue times in the output, so it is replaced
                # in the text.
                segment_text = segment["text"].strip().replace("-->", "->")
                yield segment_start, segment_end, segment_text

    def format_timestamp(self, seconds: float):
        """Format ``seconds`` using this writer's hour and decimal settings."""
        return format_timestamp(
            seconds=seconds,
            always_include_hours=self.always_include_hours,
            decimal_marker=self.decimal_marker,
        )
class WriteVTT(SubtitlesWriter):
    """Writer/reader for WebVTT subtitle files."""

    extension: str = "vtt"
    always_include_hours: bool = False
    decimal_marker: str = "."

    def write_result(
        self, result: dict, file: TextIO, options: Optional[dict] = None, **kwargs
    ):
        """Write the ``WEBVTT`` header followed by one block per cue."""
        print("WEBVTT\n", file=file)
        for start, end, text in self.iterate_result(result, options, **kwargs):
            print(f"{start} --> {end}\n{text}\n", file=file, flush=True)

    def to_segments(self, file_path: str) -> List[Segment]:
        """Parse the VTT file at ``file_path`` into a list of ``Segment``.

        Blocks are separated by blank lines. The header block and any block
        without a ``-->`` timing line are skipped. Lines before the timing
        line and anything after the end timestamp on the timing line are
        ignored; the remaining lines are joined with spaces as the text.
        """
        segments = []

        # Normalise CRLF so that blank-line splitting works on such files.
        content = read_file(file_path).replace("\r\n", "\n")

        for block in content.split('\n\n'):
            block = block.strip()
            if block == '' or block.startswith("WEBVTT"):
                continue

            lines = block.split('\n')
            timing_index = next(
                (i for i, line in enumerate(lines) if "-->" in line), None
            )
            if timing_index is None:
                continue

            start_str, _, end_part = lines[timing_index].partition("-->")
            end_tokens = end_part.split()
            if not end_tokens:
                continue

            start = time_str_to_seconds(start_str.strip(), self.decimal_marker)
            end = time_str_to_seconds(end_tokens[0], self.decimal_marker)
            sentence = ' '.join(lines[timing_index + 1:])

            segments.append(Segment(start=start, end=end, text=sentence))

        return segments
class WriteSRT(SubtitlesWriter):
    """Writer/reader for SubRip (``.srt``) subtitle files."""

    extension: str = "srt"
    always_include_hours: bool = True
    decimal_marker: str = ","

    def write_result(
        self, result: dict, file: TextIO, options: Optional[dict] = None, **kwargs
    ):
        """Write each cue as a numbered block (numbering starts at 1)."""
        for i, (start, end, text) in enumerate(
            self.iterate_result(result, options, **kwargs), start=1
        ):
            print(f"{i}\n{start} --> {end}\n{text}\n", file=file, flush=True)

    def to_segments(self, file_path: str) -> List[Segment]:
        """Parse the SRT file at ``file_path`` into a list of ``Segment``.

        Blocks are separated by blank lines. In each block the line containing
        ``-->`` supplies the start and end times, lines before it (the cue
        number) are ignored, and the lines after it are joined with spaces as
        the text. Blocks without a timing line are skipped.
        """
        segments = []

        # Normalise CRLF so that blank-line splitting works on such files.
        content = read_file(file_path).replace("\r\n", "\n")

        for block in content.split('\n\n'):
            block = block.strip()
            if block == '':
                continue

            lines = block.split('\n')
            timing_index = next(
                (i for i, line in enumerate(lines) if "-->" in line), None
            )
            if timing_index is None:
                continue

            start_str, _, end_str = lines[timing_index].partition("-->")
            start = time_str_to_seconds(start_str.strip(), self.decimal_marker)
            end = time_str_to_seconds(end_str.strip(), self.decimal_marker)
            sentence = ' '.join(lines[timing_index + 1:])

            segments.append(Segment(start=start, end=end, text=sentence))

        return segments
class WriteLRC(SubtitlesWriter):
    """Writer/reader for LRC lyric files with bracketed timestamps."""

    extension: str = "lrc"
    always_include_hours: bool = False
    decimal_marker: str = "."

    def write_result(
        self, result: dict, file: TextIO, options: Optional[dict] = None, **kwargs
    ):
        """Write one line per cue.

        Cues whose timestamps are both ``None`` already carry their per-word
        timestamps inline and are written as-is; all other cues are written
        as ``[start]text[end]``.
        """
        for start, end, text in self.iterate_result(result, options, **kwargs):
            # Checking the cue itself (rather than kwargs) also covers
            # align_lrc_words being supplied through ``options``.
            if start is None and end is None:
                print(f"{text}\n", file=file, flush=True)
            else:
                print(f"[{start}]{text}[{end}]\n", file=file, flush=True)

    def to_segments(self, file_path: str) -> List[Segment]:
        """Parse the LRC file at ``file_path`` into a list of ``Segment``.

        Every line is read as a sequence of ``[timestamp]text`` pairs. Each
        non-empty text becomes a segment that starts at the timestamp before
        it and ends at the timestamp after it. Text that has no following
        timestamp on the same line is skipped, because no end time is
        available for it.
        """
        segments = []

        for line in read_file(file_path).split('\n'):
            line = line.strip()
            if line == '':
                continue

            # [(timestamp, text following it), ...] in order of appearance.
            stamped = re.findall(r'\[(.*?)\]([^\[]*)', line)

            # Pair each entry with its successor to obtain the end time.
            for (start_str, text), (end_str, _) in zip(stamped, stamped[1:]):
                text = text.strip()
                if not text:
                    continue
                start = time_str_to_seconds(start_str, self.decimal_marker)
                end = time_str_to_seconds(end_str, self.decimal_marker)
                segments.append(Segment(start=start, end=end, text=text))

        return segments
class WriteTSV(ResultWriter):
    """
    Write a transcript to a file in TSV (tab-separated values) format: a
    header line followed by one line per segment holding the start time in
    integer milliseconds, the end time in integer milliseconds, and the text.

    Using integer milliseconds as start and end times means there's no chance of interference from
    an environment setting a language encoding that causes the decimal in a floating point number
    to appear as a comma; also is faster and more efficient to parse & store, e.g., in C++.
    """

    extension: str = "tsv"

    def write_result(
        self, result: dict, file: TextIO, options: Optional[dict] = None, **kwargs
    ):
        """Write the header row and one tab-separated row per segment.

        ``options`` and ``**kwargs`` are accepted for interface compatibility
        and are not used here.
        """
        print("start", "end", "text", sep="\t", file=file)
        for segment in result["segments"]:
            print(round(1000 * segment["start"]), file=file, end="\t")
            print(round(1000 * segment["end"]), file=file, end="\t")
            # Tabs inside the text would add columns, so they become spaces.
            print(segment["text"].strip().replace("\t", " "), file=file, flush=True)
class WriteJSON(ResultWriter):
    """Writer that dumps the whole result dict as JSON."""

    extension: str = "json"

    def write_result(
        self, result: dict, file: TextIO, options: Optional[dict] = None, **kwargs
    ):
        """Serialise ``result`` to ``file`` with ``json.dump``.

        ``options`` and ``**kwargs`` are accepted for interface compatibility
        and are not used here.
        """
        json.dump(result, file)
def get_writer(
    output_format: str, output_dir: str
) -> Callable[[dict, TextIO, dict], None]:
    """Return a writer for ``output_format`` bound to ``output_dir``.

    Args:
        output_format: One of ``txt``, ``vtt`` (or ``webvtt``), ``srt``,
            ``tsv``, ``json``, ``lrc`` or ``all``. Case, surrounding
            whitespace and dots are ignored.
        output_dir: Directory the returned writer writes into.

    Returns:
        A writer instance, or for ``"all"`` a function that invokes every
        writer in turn with the arguments it is given.

    Raises:
        KeyError: If ``output_format`` is not supported.
    """
    output_format = output_format.strip().lower().replace(".", "")
    # Accept the same alias that generate_file accepts.
    if output_format == "webvtt":
        output_format = "vtt"

    writers = {
        "txt": WriteTXT,
        "vtt": WriteVTT,
        "srt": WriteSRT,
        "tsv": WriteTSV,
        "json": WriteJSON,
        "lrc": WriteLRC
    }

    if output_format == "all":
        all_writers = [writer(output_dir) for writer in writers.values()]

        def write_all(
            result: dict, file: TextIO, options: Optional[dict] = None, **kwargs
        ):
            # ``file`` is forwarded positionally, i.e. it lands in each
            # writer's ``output_file_name`` parameter.
            for writer in all_writers:
                writer(result, file, options, **kwargs)

        return write_all

    try:
        writer_cls = writers[output_format]
    except KeyError:
        supported = ", ".join([*writers, "all"])
        raise KeyError(
            f"Unsupported output format {output_format!r}; expected one of: {supported}"
        ) from None
    return writer_cls(output_dir)
def generate_file(
    output_format: str, output_dir: str, result: Union[dict, List[Segment]], output_file_name: str,
    add_timestamp: bool = True, **kwargs
) -> Tuple[str, str]:
    """Write ``result`` in ``output_format`` and return the file's content.

    Args:
        output_format: Target format; case, surrounding whitespace and dots
            are ignored and ``webvtt`` is treated as ``vtt``.
        output_dir: Directory the file is written into.
        result: Transcription result handed to the writer.
        output_file_name: File name without extension.
        add_timestamp: When True, ``-<MMDDhhmmss>`` of the current local time
            is appended to the file name.
        **kwargs: Forwarded to the writer. For LRC output a truthy
            ``highlight_words`` is converted into ``align_lrc_words=True``.

    Returns:
        A ``(content, file_path)`` tuple: the written file read back via
        ``read_file`` and the path it was written to.
    """
    output_format = output_format.strip().lower().replace(".", "")
    if output_format == "webvtt":
        output_format = "vtt"

    if add_timestamp:
        timestamp = datetime.now().strftime("%m%d%H%M%S")
        output_file_name += f"-{timestamp}"

    # NOTE(review): for output_format "all" this path ends in ".all", which
    # none of the writers produce — confirm whether "all" is expected here.
    file_path = os.path.join(output_dir, f"{output_file_name}.{output_format}")
    file_writer = get_writer(output_format=output_format, output_dir=output_dir)

    # LRC output swaps word highlighting for inline per-word timestamps.
    if isinstance(file_writer, WriteLRC) and kwargs.get("highlight_words", False):
        kwargs["highlight_words"], kwargs["align_lrc_words"] = False, True

    file_writer(result=result, output_file_name=output_file_name, **kwargs)
    content = read_file(file_path)
    return content, file_path
def safe_filename(name, max_length: int = 20):
    """Return ``name`` with invalid filename characters replaced and length capped.

    Characters matching ``<>:"/\\|?*`` and control characters ``\\x00-\\x1f``
    are replaced by ``_``. If the result is longer than ``max_length`` it is
    shortened; the text after the last ``.`` is kept as the extension when it
    fits, otherwise the name is simply cut at ``max_length`` characters.

    Args:
        name: Proposed file name.
        max_length: Maximum length of the returned name (default 20).

    Returns:
        The sanitised, possibly truncated file name.
    """
    INVALID_FILENAME_CHARS = r'[<>:"/\\|?*\x00-\x1f]'
    safe_name = re.sub(INVALID_FILENAME_CHARS, '_', name)

    if len(safe_name) <= max_length:
        return safe_name

    file_extension = safe_name.split('.')[-1]
    if len(file_extension) + 1 < max_length:
        # Shorten the stem so that ".<extension>" still fits in the budget.
        stem = safe_name[:max_length - len(file_extension) - 1]
        return stem + '.' + file_extension

    # No dot, or an extension too long to keep: hard truncate.
    return safe_name[:max_length]