import gradio as gr
import librosa
import numpy as np
import re
import os
import time
import struct
import subprocess
import soundfile as sf
import matplotlib.font_manager as fm
from PIL import ImageFont
from typing import Any, Dict, List, Optional, Set, Tuple
from mutagen.flac import FLAC
from moviepy import CompositeVideoClip, TextClip, VideoClip, AudioFileClip, ImageClip


def get_font_display_name(font_path: str) -> Tuple[Optional[str], Optional[str]]:
    """
    A minimal TTF/TTC binary parser that reads the font's 'name' table to find
    the localized "Full Font Name" (nameID=4).
    Returns a tuple of (display_name, language_tag), where the tag is one of
    'zh'/'ja'/'ko'/'en'/'other', or (None, None) if parsing fails.
    """
    def decode_name_string(name_bytes: bytes, platform_id: int, encoding_id: int) -> Optional[str]:
        """Decodes a name record's string based on its platform and encoding IDs."""
        try:
            if platform_id == 3 and encoding_id in [1, 10]:  # Windows, UTF-16 BE
                return name_bytes.decode('utf_16_be').strip('\x00')
            elif platform_id == 1 and encoding_id == 0:      # Macintosh, Roman
                return name_bytes.decode('mac_roman').strip('\x00')
            elif platform_id == 0:                           # Unicode
                return name_bytes.decode('utf_16_be').strip('\x00')
            else:
                return name_bytes.decode('utf_8', errors='ignore').strip('\x00')
        except Exception:
            return None

    try:
        with open(font_path, 'rb') as f:
            data = f.read()

        def read_ushort(offset):
            return struct.unpack('>H', data[offset:offset+2])[0]

        def read_ulong(offset):
            return struct.unpack('>I', data[offset:offset+4])[0]

        # TrueType Collections (.ttc) start with a 'ttcf' header that lists
        # the offset of each contained font; plain .ttf files start at 0.
        font_offsets = [0]
        if data[:4] == b'ttcf':
            num_fonts = read_ulong(8)
            font_offsets = [read_ulong(12 + i * 4) for i in range(num_fonts)]

        # Only the first font of a collection is inspected.
        font_offset = font_offsets[0]

        num_tables = read_ushort(font_offset + 4)
        name_table_offset = -1

        for i in range(num_tables):
            entry_offset = font_offset + 12 + i * 16
            tag = data[entry_offset:entry_offset+4]
            if tag == b'name':
                name_table_offset = read_ulong(entry_offset + 8)
                break

        if name_table_offset == -1:
            return None, None

        count, string_offset = read_ushort(name_table_offset + 2), read_ushort(name_table_offset + 4)
        name_candidates = {}

        for i in range(count):
            rec_offset = name_table_offset + 6 + i * 12
            platform_id, encoding_id, language_id, name_id, length, offset = struct.unpack('>HHHHHH', data[rec_offset:rec_offset+12])

            if name_id == 4:  # Full Font Name
                string_pos = name_table_offset + string_offset + offset
                value = decode_name_string(data[string_pos : string_pos + length], platform_id, encoding_id)

                if value:
                    # Windows language IDs: Chinese variants, Japanese, Korean, English/neutral.
                    if language_id in [1028, 2052, 3076, 4100, 5124]:
                        name_candidates["zh"] = value
                    elif language_id == 1041:
                        name_candidates["ja"] = value
                    elif language_id == 1042:
                        name_candidates["ko"] = value
                    elif language_id in [1033, 0]:
                        name_candidates["en"] = value
                    elif "other" not in name_candidates:
                        name_candidates["other"] = value

        # Preference order for the display name: zh, ja, ko, other, en.
        for lang_tag in ("zh", "ja", "ko", "other", "en"):
            if name_candidates.get(lang_tag):
                return name_candidates[lang_tag], lang_tag
        return None, None

    except Exception:
        return None, None
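
# 'name' table layout assumed by the parser above (per the OpenType spec):
#   uint16 format; uint16 count; uint16 stringOffset;
#   followed by `count` 12-byte records of
#   (platformID, encodingID, languageID, nameID, length, offset),
# with the string data beginning at name_table_offset + stringOffset.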


def get_font_data() -> Tuple[Dict[str, str], List[str]]:
    """
    Scans system fonts, parses their display names, and returns a sorted list
    of names with a corresponding name-to-path map.
    """
    font_map = {}
    found_names = []

    # On recent matplotlib, fontext='ttf' already matches .ttf/.ttc/.otf, so
    # the second call is redundant but harmless; the set() deduplicates.
    ttf_files = fm.findSystemFonts(fontpaths=None, fontext='ttf')
    ttc_files = fm.findSystemFonts(fontpaths=None, fontext='ttc')
    all_font_files = list(set(ttf_files + ttc_files))

    for path in all_font_files:
        display_name, lang_tag = get_font_display_name(path)
        is_fallback = display_name is None

        if is_fallback:
            # Fall back to a prettified filename when the 'name' table is unreadable.
            display_name = os.path.splitext(os.path.basename(path))[0].replace('-', ' ').replace('_', ' ').title()
            lang_tag = 'fallback'

        if display_name and display_name not in font_map:
            font_map[display_name] = path
            found_names.append((display_name, is_fallback, lang_tag))

    # Sort by language group first, then alphabetically within each group.
    sort_order = {'zh': 0, 'ja': 1, 'ko': 2, 'en': 3, 'other': 4, 'fallback': 5}
    found_names.sort(key=lambda x: (sort_order.get(x[2], 99), x[0]))

    sorted_display_names = [name for name, _, _ in found_names]
    return font_map, sorted_display_names


print("Scanning system fonts and parsing names...")
SYSTEM_FONTS_MAP, FONT_DISPLAY_NAMES = get_font_data()
print(f"Scan complete. Found {len(FONT_DISPLAY_NAMES)} available fonts.")


def cue_time_to_seconds(time_str: str) -> float:
    """Converts a CUE 'MM:SS:FF' timestamp (75 frames per second) to seconds."""
    try:
        minutes, seconds, frames = map(int, time_str.split(':'))
        return minutes * 60 + seconds + frames / 75.0
    except ValueError:
        return 0.0
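
# e.g. cue_time_to_seconds("03:21:45") == 3 * 60 + 21 + 45 / 75 == 201.6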


def parse_cue_sheet_manually(cue_data: str) -> List[Dict[str, Any]]:
    """Extracts (title, start_time) pairs from an embedded CUE sheet."""
    tracks = []
    current_track_info = None
    for line in cue_data.splitlines():
        line = line.strip()
        if line.upper().startswith('TRACK'):
            # A new TRACK command: commit the previous track if it is complete.
            if current_track_info and 'title' in current_track_info and 'start_time' in current_track_info:
                tracks.append(current_track_info)
            current_track_info = {}
            continue
        if current_track_info is not None:
            title_match = re.search(r'TITLE\s+"(.*?)"', line, re.IGNORECASE)
            if title_match:
                current_track_info['title'] = title_match.group(1)
                continue
            index_match = re.search(r'INDEX\s+01\s+(\d{2}:\d{2}:\d{2})', line, re.IGNORECASE)
            if index_match:
                current_track_info['start_time'] = cue_time_to_seconds(index_match.group(1))
                continue
    # Commit the final track.
    if current_track_info and 'title' in current_track_info and 'start_time' in current_track_info:
        tracks.append(current_track_info)
    return tracks
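
# A minimal example of the CUE fragment the parser above understands:
#
#   TRACK 01 AUDIO
#     TITLE "First Song"
#     INDEX 01 00:00:00
#   TRACK 02 AUDIO
#     TITLE "Second Song"
#     INDEX 01 04:32:33
#
# -> [{'title': 'First Song', 'start_time': 0.0},
#     {'title': 'Second Song', 'start_time': 272.44}]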


def increase_video_framerate(input_path: str, output_path: str, target_fps: int = 24):
    """
    Uses FFmpeg to raise the video's framerate by re-encoding it through the
    fps filter (which duplicates frames), while copying the audio stream
    untouched. This is far cheaper than rendering every frame in MoviePy.

    Args:
        input_path (str): Path to the low-framerate video file.
        output_path (str): Path for the final, high-framerate video file.
        target_fps (int): The desired output framerate.
    """
    print(f"Increasing framerate of '{input_path}' to {target_fps} FPS...")

    command = [
        'ffmpeg',
        '-y',                        # overwrite output without asking
        '-i', input_path,
        '-map', '0',                 # keep all streams from the input
        '-vf', f'fps={target_fps}',  # duplicate frames up to the target rate
        '-c:v', 'libx264',
        '-preset', 'fast',
        '-crf', '18',
        '-c:a', 'copy',              # pass the audio through untouched
        output_path
    ]

    try:
        subprocess.run(command, check=True, capture_output=True, text=True)
        print("Framerate increase successful.")
    except FileNotFoundError:
        raise gr.Error("FFmpeg not found. Please ensure FFmpeg is installed and accessible in your system's PATH.")
    except subprocess.CalledProcessError as e:
        print("FFmpeg error output:\n", e.stderr)
        raise gr.Error(f"FFmpeg failed to increase the framerate. See console for details. Error: {e.stderr}")


def parse_track_ranges(range_str: str) -> Set[int]:
    """Parses a string like '1-4, 7, 10-13' into a set of integers."""
    if not range_str:
        return set()

    indices = set()
    parts = range_str.split(',')
    for part in parts:
        part = part.strip()
        if not part:
            continue
        if '-' in part:
            try:
                start, end = map(int, part.split('-'))
                indices.update(range(start, end + 1))
            except ValueError:
                print(f"Warning: Could not parse range '{part}'. Skipping.")
        else:
            try:
                indices.add(int(part))
            except ValueError:
                print(f"Warning: Could not parse track number '{part}'. Skipping.")
    return indices
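
# Example: parse_track_ranges("1-4, 7, 10-13")
# -> {1, 2, 3, 4, 7, 10, 11, 12, 13}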


def process_audio_to_video(*args, progress=gr.Progress(track_tqdm=True)):
    # The UI passes a flat argument list:
    #   args[0]                              -> list of uploaded audio files
    #   args[1 : 1+MAX_GROUPS]               -> per-group track-range strings
    #   args[1+MAX_GROUPS : 1+2*MAX_GROUPS]  -> per-group image lists
    #   args[1+2*MAX_GROUPS :]               -> the remaining scalar options, in order
    MAX_GROUPS = 10

    audio_files = args[0]

    all_track_strs = args[1 : 1 + MAX_GROUPS]
    all_image_lists = args[1 + MAX_GROUPS : 1 + MAX_GROUPS * 2]

    group_definitions = []
    for i in range(MAX_GROUPS):
        group_definitions.append({
            "tracks_str": all_track_strs[i],
            "images": all_image_lists[i]
        })

    arg_offset = 1 + MAX_GROUPS * 2
    fallback_images = args[arg_offset]
    format_double_digits = args[arg_offset + 1]
    video_width = args[arg_offset + 2]
    video_height = args[arg_offset + 3]
    spec_fg_color = args[arg_offset + 4]
    spec_bg_color = args[arg_offset + 5]

    # Spectrogram style options.
    n_bands = int(args[arg_offset + 6])
    bar_spacing = int(args[arg_offset + 7])
    mirror_mode = args[arg_offset + 8]
    bar_style = args[arg_offset + 9]
    num_blocks = int(args[arg_offset + 10])

    # Text overlay options.
    font_name = args[arg_offset + 11]
    font_size = args[arg_offset + 12]
    font_color = args[arg_offset + 13]
    font_bg_color = args[arg_offset + 14]
    font_bg_alpha = args[arg_offset + 15]
    pos_h = args[arg_offset + 16]
    pos_v = args[arg_offset + 17]

    if not audio_files:
        raise gr.Error("Please upload at least one audio file.")
    if not font_name:
        raise gr.Error("Please select a font from the list.")

    progress(0, desc="Initializing...")

    timestamp = int(time.time())
    temp_fps1_path = f"temp_{timestamp}_fps1.mp4"
    temp_audio_path = f"temp_combined_audio_{timestamp}.wav"
    final_output_path = f"final_video_{timestamp}_fps24.mp4"

    WIDTH, HEIGHT = int(video_width), int(video_height)
    RENDER_FPS = 1
    PLAYBACK_FPS = 24
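
    # Rendering strategy: drawing spectrogram frames in Python is slow, so the
    # base video is rendered at RENDER_FPS (1 FPS) and then re-encoded to
    # PLAYBACK_FPS with FFmpeg (see increase_video_framerate), which simply
    # duplicates frames.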

    def parse_color_to_rgb(color_str: str) -> Tuple[int, int, int]:
        """
        Parses a color string in hex format (#RRGGBB or #RGB) or
        rgb format (e.g., "rgb(255, 128, 0)").
        Returns a tuple of (R, G, B).
        """
        color_str = color_str.strip()
        if color_str.startswith('#'):
            hex_val = color_str.lstrip('#')
            if len(hex_val) == 3:
                # Expand shorthand like "#f80" to "ff8800".
                hex_val = "".join([c*2 for c in hex_val])
            return tuple(int(hex_val[i:i+2], 16) for i in (0, 2, 4))
        elif color_str.startswith('rgb'):
            try:
                numbers = re.findall(r'\d+', color_str)
                return tuple(int(n) for n in numbers[:3])
            except (ValueError, IndexError):
                raise ValueError(f"Could not parse rgb color string: {color_str}")
        else:
            raise ValueError(f"Unknown color format: {color_str}")

    fg_rgb, bg_rgb = parse_color_to_rgb(spec_fg_color), parse_color_to_rgb(spec_bg_color)
    # A slightly lighter shade of the background, used for the grid lines.
    grid_rgb = tuple(min(c + 40, 255) for c in bg_rgb)

    try:
        TOTAL_STEPS = 5

        master_track_list, y_accumulator, current_sr = [], [], None
        total_duration, global_track_counter = 0.0, 0

        # Stage 1: load every file, normalize to stereo at a common sample
        # rate, and build a global track list (from CUE sheets where present).
        for file_idx, audio_path in enumerate(progress.tqdm(audio_files, desc=f"Stage 1/{TOTAL_STEPS}: Analyzing Audio Files")):
            y, sr = librosa.load(audio_path, sr=None, mono=False)

            if y.ndim == 1:
                print(f" - Converting mono file to stereo: {os.path.basename(audio_path)}")
                y = np.stack([y, y])

            if current_sr is None:
                current_sr = sr
            if current_sr != sr:
                print(f"Warning: Sample rate mismatch for {os.path.basename(audio_path)}. Expected {current_sr}Hz, found {sr}Hz.")
                print(f"Resampling from {sr}Hz to {current_sr}Hz...")
                y = librosa.resample(y, orig_sr=sr, target_sr=current_sr)

            y_accumulator.append(y)

            file_duration = librosa.get_duration(y=y[0], sr=current_sr)

            # Look for an embedded CUE sheet (FLAC only).
            cue_tracks = []
            if audio_path.lower().endswith('.flac'):
                try:
                    audio_meta = FLAC(audio_path)
                    if 'cuesheet' in audio_meta.tags:
                        cue_tracks = parse_cue_sheet_manually(audio_meta.tags['cuesheet'][0])
                        print(f"Successfully parsed {len(cue_tracks)} tracks from CUE sheet.")
                except Exception as e:
                    print(f"Warning: Could not parse CUE sheet for {os.path.basename(audio_path)}: {e}")

            if cue_tracks:
                # Each track ends where the next begins; the last ends with the file.
                for track_idx, track in enumerate(cue_tracks):
                    global_track_counter += 1
                    start_time = track.get('start_time', 0)
                    end_time = cue_tracks[track_idx+1].get('start_time', file_duration) if track_idx + 1 < len(cue_tracks) else file_duration
                    master_track_list.append({"global_index": global_track_counter, "title": track.get('title', 'Unknown'), "start_time": total_duration + start_time, "end_time": total_duration + end_time})
            else:
                # No CUE sheet: treat the whole file as a single track named after it.
                global_track_counter += 1
                master_track_list.append({"global_index": global_track_counter, "title": os.path.splitext(os.path.basename(audio_path))[0], "start_time": total_duration, "end_time": total_duration + file_duration})

            total_duration += file_duration

        # Concatenate all files into one stereo array and write it to a temp WAV.
        y_combined = np.concatenate(y_accumulator, axis=1)
        duration = total_duration

        sf.write(temp_audio_path, y_combined.T, current_sr)
        print(f"Combined all audio files into one. Total duration: {duration:.2f}s")

        progress(1 / TOTAL_STEPS, desc=f"Stage 2/{TOTAL_STEPS}: Mapping Images to Tracks")

        # Stage 2: resolve each track's image set. A track gets the first
        # group whose range contains its global index, else the fallback set.
        parsed_groups = [parse_track_ranges(g['tracks_str']) for g in group_definitions]
        track_to_images_map = {}
        for track_info in master_track_list:
            track_idx = track_info['global_index']
            assigned = False
            for i, group_indices in enumerate(parsed_groups):
                if track_idx in group_indices:
                    track_to_images_map[track_idx] = group_definitions[i]['images']
                    assigned = True
                    break
            if not assigned:
                track_to_images_map[track_idx] = fallback_images

        # Build background image clips. Consecutive tracks that share the same
        # image set are merged into one block, and the block's duration is
        # divided evenly among its images.
        image_clips = []
        if any(track_to_images_map.values()):
            current_track_cursor = 0
            while current_track_cursor < len(master_track_list):
                start_track_info = master_track_list[current_track_cursor]
                image_set_for_block = track_to_images_map.get(start_track_info['global_index'])

                # Extend the block while the next track uses the same image set.
                end_track_cursor = current_track_cursor
                while (end_track_cursor + 1 < len(master_track_list) and
                       track_to_images_map.get(master_track_list[end_track_cursor + 1]['global_index']) == image_set_for_block):
                    end_track_cursor += 1

                end_track_info = master_track_list[end_track_cursor]

                block_start_time = start_track_info['start_time']
                block_end_time = end_track_info['end_time']
                block_duration = block_end_time - block_start_time

                if image_set_for_block and block_duration > 0:
                    print(f"Creating image block for tracks {start_track_info['global_index']}-{end_track_info['global_index']} (Time: {block_start_time:.2f}s - {block_end_time:.2f}s)")
                    time_per_image = block_duration / len(image_set_for_block)
                    for i, img_path in enumerate(image_set_for_block):
                        def create_image_layer(path, start, dur):
                            try:
                                img = ImageClip(path)
                                # Letterbox: scale to fit inside the frame, centered.
                                scale = min(WIDTH/img.w, HEIGHT/img.h)
                                resized_img = img.resized(scale)
                                return CompositeVideoClip([resized_img.with_position("center")], size=(WIDTH, HEIGHT)).with_duration(dur).with_start(start)
                            except Exception as e:
                                print(f"Warning: Failed to process image '{path}'. Skipping. Error: {e}")
                                return None

                        clip = create_image_layer(img_path, block_start_time + i * time_per_image, time_per_image)
                        if clip:
                            image_clips.append(clip)

                current_track_cursor = end_track_cursor + 1

        progress(2 / TOTAL_STEPS, desc=f"Stage 3/{TOTAL_STEPS}: Generating Text & Spectrogram")

        # Stage 3a: one TextClip per track, shown for the track's duration.
        text_clips = []

        font_path = SYSTEM_FONTS_MAP.get(font_name)
        if not font_path:
            raise gr.Error(f"Font path for '{font_name}' not found!")

        font_bg_rgb = parse_color_to_rgb(font_bg_color)

        position = (pos_h.lower(), pos_v.lower())

        print(f"Using font: {font_name}, Size: {font_size}, Position: {position}")

        # RGBA background for the text box.
        bg_color_tuple = (font_bg_rgb[0], font_bg_rgb[1], font_bg_rgb[2], int(font_bg_alpha * 255))

        caption_width = int(WIDTH * 0.9)

        # Derive a bottom margin from the font's descent so descenders
        # (g, y, p, ...) are not clipped by the text box.
        try:
            pil_font = ImageFont.truetype(font_path, size=font_size)
            _, descent = pil_font.getmetrics()
            bottom_margin = int(descent * 0.5) + 2
            print(f"Font '{font_name}' descent: {descent}. Applying dynamic bottom margin of {bottom_margin}px.")
        except Exception as e:
            print(f"Warning: Could not get font metrics for '{font_name}'. Using fixed margin. Error: {e}")
            bottom_margin = int(WIDTH * 0.01)

        for track in master_track_list:
            text_duration = track['end_time'] - track['start_time']
            if text_duration <= 0:
                continue

            num_str = f"{track['global_index']:02d}" if format_double_digits else str(track['global_index'])
            display_text = f"{num_str}. {track['title']}"

            txt_clip = TextClip(
                text=display_text.strip(),
                font_size=font_size,
                color=font_color,
                font=font_path,
                bg_color=bg_color_tuple,
                method='caption',
                size=(caption_width, None),
                margin=(0, 0, 0, bottom_margin)
            ).with_position(position).with_duration(text_duration).with_start(track['start_time'])

            text_clips.append(txt_clip)

        # Stage 3b: precompute a mel spectrogram (in dB) for the whole mix;
        # the frame generator below samples one column of it per video frame.
        N_FFT, HOP_LENGTH = 2048, 512
        MIN_DB, MAX_DB = -80.0, 0.0

        y_mono_for_spec = librosa.to_mono(y_combined)
        S_mel = librosa.feature.melspectrogram(y=y_mono_for_spec, sr=current_sr, n_fft=N_FFT, hop_length=HOP_LENGTH, n_mels=n_bands, fmax=current_sr/2)
        S_mel_db = librosa.power_to_db(S_mel, ref=np.max)
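
        # Each dB value is later mapped to a bar height via
        #   norm = clip((dB - MIN_DB) / (MAX_DB - MIN_DB), 0, 1)
        # so -80 dB (or quieter) draws nothing and 0 dB a full-length bar.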

        # Precompute block geometry for the 'Stacked Blocks' style. The
        # drawable axis depends on the mirror mode: bars grow from the center
        # line in mirrored modes, so only half the axis is available.
        BLOCK_SPACING = 2
        if bar_style == 'Stacked Blocks':
            if mirror_mode == 'Vertical (Left/Right)':
                drawable_size = WIDTH // 2
            elif mirror_mode == 'Horizontal (Top/Bottom)':
                drawable_size = HEIGHT // 2
            else:
                drawable_size = HEIGHT
            total_block_pixel_size = drawable_size - ((num_blocks - 1) * BLOCK_SPACING)
            single_block_size = total_block_pixel_size / num_blocks
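
            # e.g. drawable_size=1080, num_blocks=20, BLOCK_SPACING=2:
            #   total_block_pixel_size = 1080 - 19 * 2 = 1042
            #   single_block_size = 1042 / 20 = 52.1 px per block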

        def frame_generator(t):
            # Draw one spectrogram frame at time t as an (H, W, 3) uint8 array.
            # When image clips exist, the background stays black because this
            # layer is later blended over the images at 50% opacity.
            frame_bg = bg_rgb if not image_clips else (0, 0, 0)
            frame = np.full((HEIGHT, WIDTH, 3), frame_bg, dtype=np.uint8)

            # Faint horizontal grid lines, only on plain-color backgrounds.
            if not image_clips:
                for i in range(1, 9):
                    y_pos = int(i * (HEIGHT / 9))
                    frame[y_pos-1:y_pos, :] = grid_rgb

            if S_mel_db.shape[1] == 0:
                return frame

            # Map the frame time to the nearest spectrogram column.
            time_idx = librosa.time_to_frames(t, sr=current_sr, hop_length=HOP_LENGTH)
            time_idx = min(time_idx, S_mel_db.shape[1] - 1)

            if mirror_mode == 'Vertical (Left/Right)':
                # Bars are horizontal rows, growing symmetrically from the
                # vertical center line; band 0 sits at the bottom.
                center_x = WIDTH // 2
                max_pixel_length = WIDTH // 2
                bar_height = HEIGHT / n_bands

                for i in range(n_bands):
                    energy_db = S_mel_db[i, time_idx]
                    norm_height = np.clip((energy_db - MIN_DB) / (MAX_DB - MIN_DB), 0, 1)
                    if norm_height == 0:
                        continue

                    y_start = int(HEIGHT - (i + 1) * bar_height)
                    y_end = int(HEIGHT - i * bar_height)

                    y_start_with_spacing = y_start + bar_spacing

                    if y_start_with_spacing >= y_end:
                        continue

                    if bar_style == 'Stacked Blocks':
                        blocks_to_draw = int(norm_height * num_blocks)
                        if blocks_to_draw == 0:
                            continue

                        for j in range(blocks_to_draw):
                            block_left_x = center_x + (j * (single_block_size + BLOCK_SPACING))
                            block_right_x = block_left_x + single_block_size
                            # Right half, then its mirror image on the left half.
                            frame[y_start_with_spacing:y_end, int(block_left_x):int(block_right_x)] = fg_rgb
                            frame[y_start_with_spacing:y_end, int(center_x - (block_right_x - center_x)):int(center_x - (block_left_x - center_x))] = fg_rgb
                    else:
                        bar_pixel_length = int(norm_height * max_pixel_length)
                        if bar_pixel_length < 1:
                            continue

                        # Solid bar on both sides of the center line.
                        frame[y_start_with_spacing:y_end, center_x : center_x + bar_pixel_length] = fg_rgb
                        frame[y_start_with_spacing:y_end, center_x - bar_pixel_length : center_x] = fg_rgb

            else:
                # Bars are vertical columns; band 0 sits at the left.
                bar_width = WIDTH / n_bands
                is_horizontal_mirror = (mirror_mode == 'Horizontal (Top/Bottom)')

                if is_horizontal_mirror:
                    center_y = HEIGHT // 2
                    max_pixel_height = HEIGHT // 2
                else:
                    center_y = HEIGHT
                    max_pixel_height = HEIGHT

                for i in range(n_bands):
                    energy_db = S_mel_db[i, time_idx]
                    norm_height = np.clip((energy_db - MIN_DB) / (MAX_DB - MIN_DB), 0, 1)

                    if norm_height == 0:
                        continue

                    x_start = int(i * bar_width)
                    x_end = int((i + 1) * bar_width - bar_spacing)

                    if bar_style == 'Stacked Blocks':
                        blocks_to_draw = int(norm_height * num_blocks)
                        if blocks_to_draw == 0:
                            continue

                        for j in range(blocks_to_draw):
                            # Blocks stack upward from the center (or bottom) line.
                            block_bottom_y = center_y - (j * (single_block_size + BLOCK_SPACING))
                            block_top_y = block_bottom_y - single_block_size
                            frame[int(block_top_y):int(block_bottom_y), x_start:x_end] = fg_rgb

                            if is_horizontal_mirror:
                                frame[int(center_y + (center_y - block_bottom_y)):int(center_y + (center_y - block_top_y)), x_start:x_end] = fg_rgb
                    else:
                        bar_pixel_height = int(norm_height * max_pixel_height)

                        if bar_pixel_height < 1:
                            continue

                        frame[center_y - bar_pixel_height : center_y, x_start:x_end] = fg_rgb

                        if is_horizontal_mirror:
                            frame[center_y : center_y + bar_pixel_height, x_start:x_end] = fg_rgb
            return frame

        video_clip = VideoClip(frame_function=frame_generator, duration=duration)

        # When images are present, blend the spectrogram over them.
        if image_clips:
            print("Applying 50% opacity to spectrogram layer.")
            video_clip = video_clip.with_opacity(0.5)

        progress(3 / TOTAL_STEPS, desc=f"Stage 4/{TOTAL_STEPS}: Rendering Base Video")

        audio_clip = AudioFileClip(temp_audio_path)

        # Layer order (bottom to top): images, spectrogram, text.
        final_layers = image_clips + [video_clip] + text_clips
        final_clip = CompositeVideoClip(final_layers, size=(WIDTH, HEIGHT)).with_audio(audio_clip)

        print(f"Step 1/2: Rendering base video at {RENDER_FPS} FPS...")
        try:
            # Try passing the audio stream through untouched first.
            print("Attempting to copy audio stream directly...")
            final_clip.write_videofile(
                temp_fps1_path, codec="libx264", audio_codec="copy", fps=RENDER_FPS,
                logger='bar', threads=os.cpu_count(), preset='ultrafast'
            )
            print("Audio stream successfully copied!")
        except Exception:
            print("Direct audio copy failed, falling back to high-quality AAC encoding...")
            final_clip.write_videofile(
                temp_fps1_path, codec="libx264", audio_codec="aac",
                audio_bitrate="320k", fps=RENDER_FPS,
                logger='bar', threads=os.cpu_count(), preset='ultrafast')
            print("High-quality AAC audio encoding complete.")

        final_clip.close()

        print(f"\nStep 2/2: Re-encoding video to {PLAYBACK_FPS} FPS...")

        progress(4 / TOTAL_STEPS, desc=f"Stage 5/{TOTAL_STEPS}: Finalizing Video")

        increase_video_framerate(temp_fps1_path, final_output_path, target_fps=PLAYBACK_FPS)

        return final_output_path

    finally:
        # Always remove intermediate files, even on failure.
        for f in [temp_fps1_path, temp_audio_path]:
            if os.path.exists(f):
                print(f"Cleaning up temporary file: {f}")
                os.remove(f)


with gr.Blocks(title="Spectrogram Video Generator") as iface:
    gr.Markdown("# Spectrogram Video Generator")
    with gr.Row():
        with gr.Column(scale=1):
            audio_inputs = gr.Files(
                label="Upload Audio File(s)",
                file_count="multiple",
                file_types=["audio"]
            )

            with gr.Accordion("Grouped Image Backgrounds (Advanced)", open=False):
                gr.Markdown("Define groups of tracks and assign specific images to them. Tracks are numbered globally starting from 1 across all uploaded files.")

                MAX_GROUPS = 10
                group_track_inputs = []
                group_image_inputs = []
                group_accordions = []

                def update_group_visibility(target_count: int):
                    """Updates the visibility of all group accordions and the state of the control buttons."""
                    target_count = max(1, min(target_count, MAX_GROUPS))

                    # visible_groups_state is defined below; the reference resolves at call time.
                    updates = {visible_groups_state: target_count}

                    for i in range(MAX_GROUPS):
                        updates[group_accordions[i]] = gr.update(visible=(i < target_count))

                    # Hide "add" at the cap; disable "remove" at one group.
                    updates[add_group_btn] = gr.update(visible=(target_count < MAX_GROUPS))
                    updates[remove_group_btn] = gr.update(interactive=(target_count > 1))

                    return updates
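
                # Note: returning a dict keyed by components lets one handler
                # update a variable number of outputs; every key must still be
                # listed in the event's `outputs`.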

                def add_group(current_count: int):
                    return update_group_visibility(current_count + 1)

                def remove_group(current_count: int):
                    return update_group_visibility(current_count - 1)

                for i in range(MAX_GROUPS):
                    with gr.Accordion(f"Image Group {i+1}", open=False, visible=(i==0)) as acc:
                        track_input = gr.Textbox(label=f"Tracks for Group {i+1} (e.g., '1-4, 7')")
                        image_input = gr.Files(label=f"Images for Group {i+1}", file_count="multiple", file_types=[".png", ".jpg", ".jpeg", ".webp", ".avif"])
                        group_track_inputs.append(track_input)
                        group_image_inputs.append(image_input)
                        group_accordions.append(acc)

                visible_groups_state = gr.State(1)

                with gr.Row():
                    remove_group_btn = gr.Button("- Remove Last Group", variant="secondary", interactive=False)
                    add_group_btn = gr.Button("+ Add Image Group", variant="secondary")

            with gr.Accordion("Fallback / Default Images", open=True):
                gr.Markdown("These images will be used for any tracks not assigned to a specific group above.")
                fallback_image_input = gr.Files(label="Fallback Images", file_count="multiple", file_types=[".png", ".jpg", ".jpeg", ".webp", ".avif"])

            with gr.Accordion("General Visualizer Options", open=True):
                with gr.Row():
                    width_input = gr.Number(value=1920, label="Video Width (px)", precision=0)
                    height_input = gr.Number(value=1080, label="Video Height (px)", precision=0)
                fg_color = gr.ColorPicker(value="#71808c", label="Spectrogram Bar Color")
                bg_color = gr.ColorPicker(value="#2C3E50", label="Background Color (if no images)")

            with gr.Accordion("Spectrogram Bar Style", open=True):
                n_bands_slider = gr.Slider(minimum=8, maximum=256, value=64, step=1, label="Number of Spectrogram Bars")
                bar_spacing_slider = gr.Slider(minimum=0, maximum=10, value=2, step=1, label="Bar/Block Spacing (px)")

                mirror_mode_radio = gr.Radio(
                    choices=["Off", "Horizontal (Top/Bottom)", "Vertical (Left/Right)"],
                    value="Off",
                    label="Symmetry / Mirror Mode"
                )

                with gr.Row():
                    bar_style_radio = gr.Radio(
                        choices=["Solid Bars", "Stacked Blocks"],
                        value="Solid Bars",
                        label="Bar Style"
                    )
                    num_blocks_slider = gr.Slider(
                        minimum=5, maximum=50, value=20, step=1,
                        label="Number of Blocks per Bar",
                        visible=False
                    )

                # The block-count slider only applies to the 'Stacked Blocks' style.
                def update_block_slider_visibility(bar_style):
                    return gr.update(visible=(bar_style == "Stacked Blocks"))

                bar_style_radio.change(
                    fn=update_block_slider_visibility,
                    inputs=bar_style_radio,
                    outputs=num_blocks_slider
                )

            with gr.Accordion("Text Overlay Options", open=True):
                gr.Markdown(
                    "**Note:** The title overlay feature automatically detects if a file has an embedded CUE sheet. If not, the filename will be used as the title."
                )
                gr.Markdown("---")

                format_double_digits_checkbox = gr.Checkbox(label="Format track numbers as double digits (e.g., 01, 05-09)", value=True)
                gr.Markdown("If the CUE sheet or filenames contain non-English characters, please select a compatible font.")

                # Pick a CJK-friendly default font when one is available.
                preferred_fonts = [
                    "Yu Gothic", "游ゴシック",
                    "MS Gothic", "MS ゴシック",
                    "Meiryo", "メイリオ",
                    "Hiragino Kaku Gothic ProN",
                    "Microsoft JhengHei",
                    "Arial"
                ]
                default_font = None

                for font in preferred_fonts:
                    for candidate in FONT_DISPLAY_NAMES:
                        if font in candidate:
                            default_font = candidate
                            break
                    if default_font:
                        break

                # Otherwise fall back to the first font found on the system.
                if not default_font and FONT_DISPLAY_NAMES:
                    default_font = FONT_DISPLAY_NAMES[0]

                font_name_dd = gr.Dropdown(choices=FONT_DISPLAY_NAMES, value=default_font, label="Font Family")

                with gr.Row():
                    font_size_slider = gr.Slider(minimum=12, maximum=256, value=80, step=1, label="Font Size")
                    font_color_picker = gr.ColorPicker(value="#FFFFFF", label="Font Color")

                with gr.Row():
                    font_bg_color_picker = gr.ColorPicker(value="#000000", label="Text BG Color")
                    font_bg_alpha_slider = gr.Slider(minimum=0.0, maximum=1.0, value=0.6, step=0.05, label="Text BG Opacity")

                gr.Markdown("Text Position")
                with gr.Row():
                    pos_h_radio = gr.Radio(["left", "center", "right"], value="center", label="Horizontal Align")
                    pos_v_radio = gr.Radio(["top", "center", "bottom"], value="bottom", label="Vertical Align")

            submit_btn = gr.Button("Generate Video", variant="primary")

        with gr.Column(scale=2):
            video_output = gr.Video(label="Generated Video")

    group_update_outputs = [visible_groups_state, add_group_btn, remove_group_btn] + group_accordions

    add_group_btn.click(
        fn=add_group,
        inputs=visible_groups_state,
        outputs=group_update_outputs
    )

    remove_group_btn.click(
        fn=remove_group,
        inputs=visible_groups_state,
        outputs=group_update_outputs
    )

    # Input order must match the *args unpacking in process_audio_to_video.
    all_inputs = [audio_inputs] + group_track_inputs + group_image_inputs + [
        fallback_image_input,
        format_double_digits_checkbox,
        width_input, height_input,
        fg_color, bg_color,
        n_bands_slider, bar_spacing_slider, mirror_mode_radio,
        bar_style_radio, num_blocks_slider,
        font_name_dd, font_size_slider, font_color_picker,
        font_bg_color_picker, font_bg_alpha_slider,
        pos_h_radio, pos_v_radio
    ]

    submit_btn.click(
        fn=process_audio_to_video,
        inputs=all_inputs,
        outputs=video_output,
        show_progress="full"
    )

if __name__ == "__main__":
    iface.launch(inbrowser=True)