import os
import subprocess
import json
from datetime import timedelta
import tempfile
import re
import gradio as gr
import groq
from groq import Groq
# setup groq
# Module-level Groq API client; the key is read from the environment at import time.
# NOTE(review): env var names are case-sensitive on Linux — confirm deployments
# really export "Groq_Api_Key" (the conventional name would be GROQ_API_KEY).
client = Groq(api_key=os.environ.get("Groq_Api_Key"))
def handle_groq_error(e, model_name):
    """Convert a Groq API exception into a user-facing gr.Error.

    Extracts the human-readable message from the exception payload when it
    matches the expected ``{'error': {'message': ...}}`` shape. Always raises
    gr.Error and never returns normally — the original silently fell through
    (returned None) for auth/rate-limit errors with an unexpected payload,
    letting the caller continue as if nothing had failed.

    Args:
        e: The exception raised by the Groq SDK.
        model_name: Name of the model in use (kept for interface compatibility).

    Raises:
        gr.Error: always.
    """
    # e.args can be empty for some exception types; fall back to str(e).
    error_data = e.args[0] if e.args else str(e)
    if isinstance(error_data, str):
        # Use regex to extract the JSON part of the string
        json_match = re.search(r'(\{.*\})', error_data)
        if json_match:
            json_str = json_match.group(1)
            # Best-effort fix-up: the payload is rendered with single quotes.
            json_str = json_str.replace("'", '"')
            try:
                error_data = json.loads(json_str)
            except json.JSONDecodeError:
                # Quote replacement breaks on messages containing apostrophes;
                # keep the raw string and use the generic error below.
                pass
    if isinstance(e, groq.AuthenticationError):
        if isinstance(error_data, dict) and 'error' in error_data and 'message' in error_data['error']:
            raise gr.Error(error_data['error']['message'])
        raise gr.Error(f"Error during Groq API call: {e}")
    if isinstance(e, groq.RateLimitError):
        if isinstance(error_data, dict) and 'error' in error_data and 'message' in error_data['error']:
            error_message = error_data['error']['message']
            # Censor organization IDs so they are never shown to end users.
            error_message = re.sub(r'org_[a-zA-Z0-9]+', 'org_(censored)', error_message)
            raise gr.Error(error_message)
        raise gr.Error(f"Error during Groq API call: {e}")
    raise gr.Error(f"Error during Groq API call: {e}")
# language codes for subtitle maker
# Maps human-readable language names (shown in the UI dropdown) to the
# ISO-639-1-style codes expected by the transcription API.
LANGUAGE_CODES = {
    "English": "en",
    "Chinese": "zh",
    "German": "de",
    "Spanish": "es",
    "Russian": "ru",
    "Korean": "ko",
    "French": "fr",
    "Japanese": "ja",
    "Portuguese": "pt",
    "Turkish": "tr",
    "Polish": "pl",
    "Catalan": "ca",
    "Dutch": "nl",
    "Arabic": "ar",
    "Swedish": "sv",
    "Italian": "it",
    "Indonesian": "id",
    "Hindi": "hi",
    "Finnish": "fi",
    "Vietnamese": "vi",
    "Hebrew": "he",
    "Ukrainian": "uk",
    "Greek": "el",
    "Malay": "ms",
    "Czech": "cs",
    "Romanian": "ro",
    "Danish": "da",
    "Hungarian": "hu",
    "Tamil": "ta",
    "Norwegian": "no",
    "Thai": "th",
    "Urdu": "ur",
    "Croatian": "hr",
    "Bulgarian": "bg",
    "Lithuanian": "lt",
    "Latin": "la",
    "Māori": "mi",
    "Malayalam": "ml",
    "Welsh": "cy",
    "Slovak": "sk",
    "Telugu": "te",
    "Persian": "fa",
    "Latvian": "lv",
    "Bengali": "bn",
    "Serbian": "sr",
    "Azerbaijani": "az",
    "Slovenian": "sl",
    "Kannada": "kn",
    "Estonian": "et",
    "Macedonian": "mk",
    "Breton": "br",
    "Basque": "eu",
    "Icelandic": "is",
    "Armenian": "hy",
    "Nepali": "ne",
    "Mongolian": "mn",
    "Bosnian": "bs",
    "Kazakh": "kk",
    "Albanian": "sq",
    "Swahili": "sw",
    "Galician": "gl",
    "Marathi": "mr",
    "Panjabi": "pa",
    "Sinhala": "si",
    "Khmer": "km",
    "Shona": "sn",
    "Yoruba": "yo",
    "Somali": "so",
    "Afrikaans": "af",
    "Occitan": "oc",
    "Georgian": "ka",
    "Belarusian": "be",
    "Tajik": "tg",
    "Sindhi": "sd",
    "Gujarati": "gu",
    "Amharic": "am",
    "Yiddish": "yi",
    "Lao": "lo",
    "Uzbek": "uz",
    "Faroese": "fo",
    "Haitian": "ht",
    "Pashto": "ps",
    "Turkmen": "tk",
    "Norwegian Nynorsk": "nn",
    "Maltese": "mt",
    "Sanskrit": "sa",
    "Luxembourgish": "lb",
    "Burmese": "my",
    "Tibetan": "bo",
    "Tagalog": "tl",
    "Malagasy": "mg",
    "Assamese": "as",
    "Tatar": "tt",
    "Hawaiian": "haw",
    "Lingala": "ln",
    "Hausa": "ha",
    "Bashkir": "ba",
    "jw": "jw",  # NOTE(review): "jw" is presumably Javanese — the display name looks like a leftover code; confirm before renaming (the key is user-facing).
    "Sundanese": "su",
}
# helper functions
def split_audio(input_file_path, chunk_size_mb):
    """Split a file into raw byte chunks of at most ``chunk_size_mb`` MB.

    Chunks are written next to the input file as ``<base>_partNNN.mp3`` with a
    zero-padded counter so lexical order matches chunk order.

    Returns:
        list[str]: paths of the chunk files, in order.
    """
    max_bytes = chunk_size_mb * 1024 * 1024
    base_name = os.path.splitext(input_file_path)[0]
    chunk_paths = []
    part_number = 1
    with open(input_file_path, 'rb') as source:
        # Read fixed-size slices until the file is exhausted.
        while data := source.read(max_bytes):
            chunk_path = f"{base_name}_part{part_number:03}.mp3"
            with open(chunk_path, 'wb') as destination:
                destination.write(data)
            chunk_paths.append(chunk_path)
            part_number += 1
    return chunk_paths
def merge_audio(chunks, output_file_path):
    """Concatenate media chunks into one file via ffmpeg's concat demuxer.

    Writes a temporary list file describing the inputs, stream-copies them
    (no re-encode) into ``output_file_path``, then deletes the chunks.
    Fix: the list file is now removed in a ``finally`` block, so it no longer
    leaks into the working directory when ffmpeg fails.

    Args:
        chunks: ordered list of chunk file paths (consumed on success).
        output_file_path: destination path for the merged file.

    Raises:
        gr.Error: if ffmpeg exits with a non-zero status.
    """
    list_path = "temp_list.txt"
    with open(list_path, "w") as f:
        for file in chunks:
            # concat demuxer syntax: one "file '<path>'" line per input
            f.write(f"file '{file}'\n")
    try:
        subprocess.run(
            [
                "ffmpeg",
                "-f", "concat",
                "-safe", "0",          # allow arbitrary (absolute) paths in the list
                "-i", list_path,
                "-c", "copy",          # stream copy: fast, no re-encoding
                "-y",
                output_file_path,
            ],
            check=True,
        )
        # Only delete the source chunks once the merge has succeeded.
        for chunk in chunks:
            os.remove(chunk)
    except subprocess.CalledProcessError as e:
        raise gr.Error(f"Error during audio merging: {e}")
    finally:
        # Clean up the list file whether or not ffmpeg succeeded.
        if os.path.exists(list_path):
            os.remove(list_path)
# Checks file extension, size, and downsamples or splits if needed.
ALLOWED_FILE_EXTENSIONS = ["mp3", "mp4", "mpeg", "mpga", "m4a", "wav", "webm"]
MAX_FILE_SIZE_MB = 25   # upload size limit enforced before calling the API
CHUNK_SIZE_MB = 25      # chunk size used when a downsampled file is still too big

def check_file(input_file_path):
    """Validate an uploaded file and make it fit the API size limit.

    Returns:
        tuple: ``(file_path, None)`` when the file is usable as-is (possibly
        after downsampling), or ``(list_of_chunk_paths, "split")`` when the
        downsampled file still exceeded the limit and had to be byte-split.

    Raises:
        gr.Error: for a missing file, a disallowed extension, or an ffmpeg
        failure during downsampling.
    """
    if not input_file_path:
        raise gr.Error("Please upload an audio/video file.")
    file_size_mb = os.path.getsize(input_file_path) / (1024 * 1024)
    # Fix: use os.path.splitext instead of split(".")[-1], which broke on
    # dotted directory names (e.g. /tmp/my.dir/file) and extensionless paths.
    file_extension = os.path.splitext(input_file_path)[1].lstrip(".").lower()
    if file_extension not in ALLOWED_FILE_EXTENSIONS:
        raise gr.Error(f"Invalid file type (.{file_extension}). Allowed types: {', '.join(ALLOWED_FILE_EXTENSIONS)}")
    if file_size_mb > MAX_FILE_SIZE_MB:
        gr.Warning(
            f"File size too large ({file_size_mb:.2f} MB). Attempting to downsample to 16kHz MP3 128kbps. Maximum size allowed: {MAX_FILE_SIZE_MB} MB"
        )
        output_file_path = os.path.splitext(input_file_path)[0] + "_downsampled.mp3"
        try:
            subprocess.run(
                [
                    "ffmpeg",
                    "-i", input_file_path,
                    "-ar", "16000",   # 16 kHz is sufficient for speech transcription
                    "-ab", "128k",
                    "-ac", "1",       # mono
                    "-f", "mp3",
                    "-y", output_file_path,
                ],
                check=True,
            )
            # Check size again after downsampling
            downsampled_size_mb = os.path.getsize(output_file_path) / (1024 * 1024)
            if downsampled_size_mb > MAX_FILE_SIZE_MB:
                gr.Warning(f"File still too large after downsampling ({downsampled_size_mb:.2f} MB). Splitting into {CHUNK_SIZE_MB} MB chunks.")
                return split_audio(output_file_path, CHUNK_SIZE_MB), "split"
            return output_file_path, None
        except subprocess.CalledProcessError as e:
            raise gr.Error(f"Error during downsampling: {e}")
    return input_file_path, None
# subtitle maker
def format_time(seconds_float):
    """Format a float number of seconds as an SRT timestamp ``HH:MM:SS,mmm``."""
    whole_seconds = int(seconds_float)
    # Milliseconds are truncated (not rounded), matching SRT convention here.
    millis = int((seconds_float - whole_seconds) * 1000)
    minutes, secs = divmod(whole_seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02}:{minutes:02}:{secs:02},{millis:03}"
def json_to_srt(transcription_json):
    """Render segment-level transcription entries as an SRT document string.

    Each entry must carry ``id``, ``start``, ``end``, and ``text``; cue
    numbers are the 0-based ``id`` shifted to SRT's 1-based numbering.
    """
    cues = [
        f"{segment['id'] + 1}\n"
        f"{format_time(segment['start'])} --> {format_time(segment['end'])}\n"
        f"{segment['text']}\n"
        for segment in transcription_json
    ]
    return '\n'.join(cues)
def words_json_to_srt(words_data, starting_id=0):
    """Render word-level timestamps as SRT cues, preventing cue overlaps.

    A cue's start is clamped so it never precedes the previous cue's
    (possibly adjusted) end, and zero/negative-length cues are stretched to a
    50 ms minimum so players don't drop or flash them. ``starting_id``
    offsets the 1-based cue numbering, which is needed when stitching
    chunked transcriptions into a single SRT.
    """
    minimum_duration = 0.050  # 50 ms floor for any single cue
    cues = []
    last_end = 0.0  # adjusted end time of the previous cue
    for index, word_entry in enumerate(words_data):
        # Clamp the start so it never overlaps the previous cue.
        start = max(word_entry['start'], last_end)
        end = word_entry['end']
        # Clamping can push start past end; enforce the minimum duration.
        if end <= start:
            end = start + minimum_duration
        cue_number = starting_id + index + 1
        cues.append(
            f"{cue_number}\n"
            f"{format_time(start)} --> {format_time(end)}\n"
            f"{word_entry['word']}\n"
        )
        # The next cue must start no earlier than this *adjusted* end.
        last_end = end
    return '\n'.join(cues)
def generate_subtitles(input_file, prompt, timestamp_granularities_str, language, auto_detect_language, model, include_video, font_selection, font_file, font_color, font_size, outline_thickness, outline_color):
    """Transcribe an uploaded file with the Groq API and produce an SRT file.

    Oversized inputs are downsampled by check_file and, if still too large,
    split into byte chunks that are transcribed one by one; per-chunk
    timestamps and cue IDs are offset so the merged SRT is continuous.
    When include_video is set and the input is MP4/WebM, the subtitles are
    burned into the video with ffmpeg's subtitles filter.

    Args:
        input_file: path of the uploaded audio/video file.
        prompt: optional transcription prompt forwarded to the API.
        timestamp_granularities_str: comma-separated granularities
            ("word", "segment"); "word" wins when both are present.
        language: language code used when auto_detect_language is False.
        auto_detect_language: if True, the API auto-detects the language.
        model: Groq transcription model name.
        include_video: whether to embed the subtitles into a video output.
        font_selection, font_file, font_color, font_size, outline_thickness,
            outline_color: styling for the burned-in subtitles.

    Returns:
        tuple: (srt_path_or_None, video_path_or_None).

    Raises:
        gr.Error: via handle_groq_error on auth/rate-limit failures, or on
        unexpected errors during single-file processing.
    """
    input_file_path = input_file
    processed_path, split_status = check_file(input_file_path)
    full_srt_content = ""  # Used for accumulating SRT content string for split files
    srt_chunks_paths = []  # Used to store paths of individual SRT chunk files for merging
    video_chunks = []  # Used to store paths of video chunks with embedded subs
    total_duration = 0  # Cumulative duration for timestamp adjustment in split files
    srt_entry_offset = 0  # Cumulative SRT entry count (words or segments) for ID adjustment
    # transforms the gradio dropdown choice str to a python list needed for the groq api
    timestamp_granularities_list = [gran.strip() for gran in timestamp_granularities_str.split(',') if gran.strip()]
    # Determine primary granularity for logic (prefer word if both specified, else segment)
    primary_granularity = "word" if "word" in timestamp_granularities_list else "segment"
    # handling splitted files or single ones
    if split_status == "split":
        for i, chunk_path in enumerate(processed_path):
            chunk_srt_content = ""  # SRT content for the current chunk
            temp_srt_path = f"{os.path.splitext(chunk_path)[0]}.srt"  # Path for this chunk's SRT file
            try:
                gr.Info(f"Processing chunk {i+1}/{len(processed_path)}...")
                with open(chunk_path, "rb") as file:
                    transcription_json_response = client.audio.transcriptions.create(
                        file=(os.path.basename(chunk_path), file.read()),
                        model=model,
                        prompt=prompt,
                        response_format="verbose_json",
                        timestamp_granularities=timestamp_granularities_list,
                        language=None if auto_detect_language else language,
                        temperature=0.0,
                    )
                if primary_granularity == "word":
                    word_data = transcription_json_response.words
                    if word_data:
                        # Adjust timestamps BEFORE generating SRT
                        # NOTE(review): .copy() assumes word entries are dicts — confirm the SDK
                        # returns dicts (not objects) for verbose_json word timestamps.
                        adjusted_word_data = []
                        for entry in word_data:
                            adjusted_entry = entry.copy()
                            adjusted_entry['start'] += total_duration
                            adjusted_entry['end'] += total_duration
                            adjusted_word_data.append(adjusted_entry)
                        # Generate SRT using adjusted data and current offset
                        chunk_srt_content = words_json_to_srt(adjusted_word_data, srt_entry_offset)
                        # Update offsets for the *next* chunk
                        total_duration = adjusted_word_data[-1]['end']  # Use adjusted end time
                        srt_entry_offset += len(word_data)  # Increment by number of words in this chunk
                    else:
                        gr.Warning(f"API returned no word timestamps for chunk {i+1}.")
                elif primary_granularity == "segment":
                    segment_data = transcription_json_response.segments
                    if segment_data:
                        # Adjust timestamps and IDs BEFORE generating SRT
                        adjusted_segment_data = []
                        max_original_id = -1
                        for entry in segment_data:
                            adjusted_entry = entry.copy()
                            adjusted_entry['start'] += total_duration
                            adjusted_entry['end'] += total_duration
                            max_original_id = max(max_original_id, adjusted_entry['id'])  # Track max original ID for offset calc
                            adjusted_entry['id'] += srt_entry_offset  # Adjust ID for SRT generation
                            adjusted_segment_data.append(adjusted_entry)
                        # Generate SRT using adjusted data
                        chunk_srt_content = json_to_srt(adjusted_segment_data)  # json_to_srt uses the 'id' field directly
                        # Update offsets for the *next* chunk
                        total_duration = adjusted_segment_data[-1]['end']  # Use adjusted end time
                        srt_entry_offset += (max_original_id + 1)  # Increment by number of segments in this chunk (based on original IDs)
                    else:
                        gr.Warning(f"API returned no segment timestamps for chunk {i+1}.")
                else:
                    # This case should ideally not be reached due to dropdown default/logic
                    gr.Warning(f"Invalid timestamp granularity for chunk {i+1}. Skipping SRT generation for this chunk.")
                # Write and store path for this chunk's SRT file if content exists
                if chunk_srt_content:
                    with open(temp_srt_path, "w", encoding="utf-8") as temp_srt_file:
                        temp_srt_file.write(chunk_srt_content)
                    srt_chunks_paths.append(temp_srt_path)
                    full_srt_content += chunk_srt_content  # Append to the full content string as well
                    # Video embedding for the chunk
                    if include_video and input_file_path.lower().endswith((".mp4", ".webm")):
                        try:
                            output_video_chunk_path = chunk_path.replace(os.path.splitext(chunk_path)[1], "_with_subs" + os.path.splitext(chunk_path)[1])
                            # Handle font selection
                            font_name = None
                            font_dir = None
                            if font_selection == "Custom Font File" and font_file:
                                font_name = os.path.splitext(os.path.basename(font_file.name))[0]
                                font_dir = os.path.dirname(font_file.name)
                            elif font_selection == "Custom Font File" and not font_file:
                                gr.Warning(f"Custom Font File selected but none uploaded. Using default font for chunk {i+1}.")
                            # FFmpeg command for the chunk
                            # NOTE(review): ASS 'BorderStyle' is a style mode (1=outline+shadow,
                            # 3=opaque box), not a thickness — passing outline_thickness here may
                            # not do what the UI suggests; confirm against libass docs.
                            subprocess.run(
                                [
                                    "ffmpeg", "-y", "-i", chunk_path,
                                    "-vf", f"subtitles={temp_srt_path}:fontsdir={font_dir}:force_style='FontName={font_name},Fontsize={int(font_size)},PrimaryColour=&H{font_color[1:]}&,OutlineColour=&H{outline_color[1:]}&,BorderStyle={int(outline_thickness)},Outline=1'",
                                    "-preset", "fast", output_video_chunk_path,
                                ], check=True,
                            )
                            video_chunks.append(output_video_chunk_path)
                        except subprocess.CalledProcessError as e:
                            # Warn but continue processing other chunks
                            gr.Warning(f"Error adding subtitles to video chunk {i+1}: {e}. Skipping video for this chunk.")
                        except Exception as e:  # Catch other potential errors during font handling etc.
                            gr.Warning(f"Error preparing subtitle style for video chunk {i+1}: {e}. Skipping video for this chunk.")
                    elif include_video and i == 0:  # Show warning only once for non-video input
                        gr.Warning(f"Include Video checked, but input isn't MP4/WebM. Only SRT will be generated.", duration=15)
            except groq.AuthenticationError as e:
                handle_groq_error(e, model)  # This will raise gr.Error and stop execution
            except groq.RateLimitError as e:
                handle_groq_error(e, model)  # This will raise gr.Error and stop execution
            except Exception as e:
                gr.Warning(f"Error processing chunk {i+1}: {e}. Skipping this chunk.")
                # Remove potentially incomplete SRT for this chunk if it exists
                if os.path.exists(temp_srt_path):
                    try: os.remove(temp_srt_path)
                    except: pass
                continue  # Move to the next chunk
        # After processing all chunks
        final_srt_path = None
        final_video_path = None
        # Merge SRT chunks if any were created
        if srt_chunks_paths:
            final_srt_path = os.path.splitext(input_file_path)[0] + "_final.srt"
            gr.Info("Merging SRT chunks...")
            with open(final_srt_path, 'w', encoding="utf-8") as outfile:
                # Use the full_srt_content string which ensures correct order and content
                outfile.write(full_srt_content)
            # Clean up individual srt chunks paths
            for srt_chunk_file in srt_chunks_paths:
                try: os.remove(srt_chunk_file)
                except: pass
            # Clean up intermediate audio chunks used for transcription
            for chunk in processed_path:
                try: os.remove(chunk)
                except: pass
        else:
            gr.Warning("No SRT content was generated from any chunk.")
        # Merge video chunks if any were created
        if video_chunks:
            # Check if number of video chunks matches expected number based on successful SRT generation
            if len(video_chunks) != len(srt_chunks_paths):
                gr.Warning("Mismatch between successful SRT chunks and video chunks created. Video merge might be incomplete.")
            final_video_path = os.path.splitext(input_file_path)[0] + '_merged_video_with_subs.mp4'  # More descriptive name
            gr.Info("Merging video chunks...")
            try:
                merge_audio(video_chunks, final_video_path)  # Re-using merge_audio logic for video files
                # video_chunks are removed inside merge_audio if successful
            except Exception as e:
                gr.Error(f"Failed to merge video chunks: {e}")
                final_video_path = None  # Indicate failure
        return final_srt_path, final_video_path
    else:  # Single file processing (no splitting)
        final_srt_path = None
        final_video_path = None
        temp_srt_path = os.path.splitext(processed_path)[0] + ".srt"  # Use processed_path for naming
        try:
            gr.Info("Processing file...")
            with open(processed_path, "rb") as file:
                transcription_json_response = client.audio.transcriptions.create(
                    file=(os.path.basename(processed_path), file.read()),
                    model=model,
                    prompt=prompt,
                    response_format="verbose_json",
                    timestamp_granularities=timestamp_granularities_list,
                    language=None if auto_detect_language else language,
                    temperature=0.0,
                )
            srt_content = ""  # Initialize
            if primary_granularity == "word":
                word_data = transcription_json_response.words
                if word_data:
                    srt_content = words_json_to_srt(word_data, 0)  # Start IDs from 0
                else:
                    gr.Warning("API returned no word timestamps.")
            elif primary_granularity == "segment":
                segment_data = transcription_json_response.segments
                if segment_data:
                    # No need to adjust IDs/timestamps for single file
                    srt_content = json_to_srt(segment_data)
                else:
                    gr.Warning("API returned no segment timestamps.")
            else:
                # Should not happen
                gr.Warning("Invalid timestamp granularity selected. Skipping SRT generation.")
            # Write SRT file if content exists
            if srt_content:
                with open(temp_srt_path, "w", encoding="utf-8") as temp_srt_file:
                    temp_srt_file.write(srt_content)
                final_srt_path = temp_srt_path  # Set the final path
                # Video embedding logic
                if include_video and input_file_path.lower().endswith((".mp4", ".webm")):
                    try:
                        output_video_path = processed_path.replace(
                            os.path.splitext(processed_path)[1], "_with_subs" + os.path.splitext(processed_path)[1]
                        )
                        # Handle font selection
                        font_name = None
                        font_dir = None
                        if font_selection == "Custom Font File" and font_file:
                            font_name = os.path.splitext(os.path.basename(font_file.name))[0]
                            font_dir = os.path.dirname(font_file.name)
                        elif font_selection == "Custom Font File" and not font_file:
                            gr.Warning(f"Custom Font File selected but none uploaded. Using default font.")
                        # FFmpeg command
                        gr.Info("Adding subtitles to video...")
                        subprocess.run(
                            [
                                "ffmpeg", "-y", "-i", processed_path,  # Use processed_path as input
                                "-vf", f"subtitles={temp_srt_path}:fontsdir={font_dir}:force_style='FontName={font_name},Fontsize={int(font_size)},PrimaryColour=&H{font_color[1:]}&,OutlineColour=&H{outline_color[1:]}&,BorderStyle={int(outline_thickness)},Outline=1'",
                                "-preset", "fast", output_video_path,
                            ], check=True,
                        )
                        final_video_path = output_video_path
                    except subprocess.CalledProcessError as e:
                        gr.Error(f"Error during subtitle addition: {e}")
                        # Keep SRT file, but no video output
                        final_video_path = None
                    except Exception as e:
                        gr.Error(f"Error preparing subtitle style for video: {e}")
                        final_video_path = None
                elif include_video:
                    # Warning for non-video input shown once
                    gr.Warning(f"Include Video checked, but input isn't MP4/WebM. Only SRT will be generated.", duration=15)
                # Clean up downsampled file if it was created and different from original input
                if processed_path != input_file_path and os.path.exists(processed_path):
                    try: os.remove(processed_path)
                    except: pass
                return final_srt_path, final_video_path  # Return paths (video might be None)
            else:  # No SRT content generated
                gr.Warning("No SRT content could be generated.")
                # Clean up downsampled file if created
                if processed_path != input_file_path and os.path.exists(processed_path):
                    try: os.remove(processed_path)
                    except: pass
                return None, None  # Return None for both outputs
        except groq.AuthenticationError as e:
            handle_groq_error(e, model)
        except groq.RateLimitError as e:
            handle_groq_error(e, model)
        except Exception as e:  # Catch any other error during single file processing
            # Clean up downsampled file if created
            if processed_path != input_file_path and os.path.exists(processed_path):
                try: os.remove(processed_path)
                except: pass
            # Clean up potentially created empty SRT
            if os.path.exists(temp_srt_path):
                try: os.remove(temp_srt_path)
                except: pass
            raise gr.Error(f"An unexpected error occurred: {e}")
# Gradio theme: Soft base with a sky/blue palette and slightly heavier
# borders so inputs and blocks stand out.
theme = gr.themes.Soft(
    primary_hue="sky",
    secondary_hue="blue",
    neutral_hue="neutral"
).set(
    border_color_primary='*neutral_300',
    block_border_width='1px',
    block_border_width_dark='1px',
    block_title_border_color='*secondary_100',
    block_title_border_color_dark='*secondary_200',
    input_background_fill_focus='*secondary_300',
    input_border_color='*border_color_primary',
    input_border_color_focus='*secondary_500',
    input_border_width='1px',
    input_border_width_dark='1px',
    slider_color='*secondary_500',
    slider_color_dark='*secondary_600'
)
# Custom CSS: widen the app container, center the title, and define
# show/hide classes used for optional option rows in the UI.
css = """
.gradio-container{max-width: 1400px !important}
h1{text-align:center}
.extra-option {
display: none;
}
.extra-option.visible {
display: block;
}
"""
with gr.Blocks(theme=theme, css=css) as interface:
gr.Markdown(
"""
# Fast Subtitle Maker
Inference by Groq API
If you are having API Rate Limit issues, you can retry later based on the [rate limits](https://console.groq.com/docs/rate-limits) or with your own API Key