# OpenSUNO / app.py
# ginipick's picture
# Update app.py
# 3469b26 verified
# raw
# history blame
# 18.7 kB
import gradio as gr
import subprocess
import os
import shutil
import tempfile
import torch
import logging
import numpy as np
import re
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
# Logging configuration: INFO-and-above records go to both a log file
# ('yue_generation.log', relative to the working directory at import time)
# and a stream handler.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('yue_generation.log'),
        logging.StreamHandler(),
    ],
    level=logging.INFO,
)
# Lyrics analysis
def analyze_lyrics(lyrics):
    """Count section tags and lyric lines in a lyrics text.

    A line containing ``[verse]``, ``[chorus]`` or ``[bridge]``
    (case-insensitive, checked in that order) starts a new section of that
    kind. Every other non-blank line is attributed to the section that is
    currently open; lines appearing before the first tag are not attributed
    to any section.

    Args:
        lyrics: The raw lyrics text, with lines separated by ``\\n``.

    Returns:
        A 4-tuple ``(sections, total_sections, total_lines, section_lines)``:
        ``sections`` maps each section kind to the number of tags seen and
        also carries ``'total_lines'``; ``total_sections`` is the sum of the
        three tag counts; ``total_lines`` is the number of non-blank lines
        (tag lines included); ``section_lines`` maps each section kind to the
        number of lyric lines attributed to it.
    """
    kinds = ('verse', 'chorus', 'bridge')

    # Trim every line and drop the blank ones.
    trimmed = (raw.strip() for raw in lyrics.split('\n'))
    lines = [text for text in trimmed if text]

    sections = {kind: 0 for kind in kinds}
    sections['total_lines'] = len(lines)
    section_lines = {kind: 0 for kind in kinds}

    current_section = None
    for line in lines:
        lowered = line.lower()
        # First matching tag wins, in verse -> chorus -> bridge order.
        tag = next((kind for kind in kinds if f'[{kind}]' in lowered), None)
        if tag is not None:
            current_section = tag
            sections[tag] += 1
            continue
        if current_section:
            section_lines[current_section] += 1

    # Total number of section tags encountered.
    total_sections = sum(sections[kind] for kind in kinds)
    return sections, total_sections, len(lines), section_lines
def calculate_generation_params(lyrics):
    """Derive generation parameters (token budget, segments) from lyrics.

    The lyrics are analysed with ``analyze_lyrics``; the number of lyric lines
    per section is converted into an estimated duration, which in turn is
    converted into a token budget.

    Args:
        lyrics: The raw lyrics text.

    Returns:
        A dict with the keys ``max_tokens``, ``num_segments``,
        ``tokens_per_segment``, ``sections``, ``section_lines``,
        ``estimated_duration``, ``section_durations`` and ``has_chorus``.
    """
    sections, total_sections, total_lines, section_lines = analyze_lyrics(lyrics)

    # Seconds allotted to one lyric line of each section kind.
    time_per_line = {
        'verse': 4,
        'chorus': 6,
        'bridge': 5,
    }

    # Estimated duration of each section kind, in seconds.
    section_durations = {
        kind: section_lines[kind] * seconds
        for kind, seconds in time_per_line.items()
    }

    # Never estimate less than 90 seconds in total.
    total_duration = max(90, sum(section_durations.values()))

    # Token budget: 100 tokens per estimated second.
    tokens_per_second = 100
    total_tokens = int(total_duration * tokens_per_second)

    has_chorus = sections['chorus'] > 0
    if has_chorus:
        # Songs with a chorus get an extra allowance proportional to the
        # chorus duration.
        total_tokens += int(section_durations['chorus'] * tokens_per_second * 1.5)

    # Segment count from the section tags: at least 3 when a chorus exists,
    # at least 2 otherwise, and never more than 4.
    if has_chorus:
        num_segments = max(3, sections['verse'] + sections['chorus'])
    else:
        num_segments = max(2, total_sections)
    num_segments = min(4, num_segments)

    # Clamp the token budget to the [8000, 32000] range.
    max_tokens = min(32000, max(8000, total_tokens))

    return {
        'max_tokens': max_tokens,
        'num_segments': num_segments,
        # Consumers of this dict (optimize_model_selection) read this key;
        # num_segments is always >= 2 here, so the division is safe.
        'tokens_per_segment': max_tokens // num_segments,
        'sections': sections,
        'section_lines': section_lines,
        'estimated_duration': total_duration,
        'section_durations': section_durations,
        'has_chorus': has_chorus,
    }
def get_audio_duration(file_path):
    """Return the duration of an audio file as reported by librosa.

    librosa is imported lazily inside the function. Any failure (including a
    failed import) is logged and results in ``None``.

    Args:
        file_path: Path of the audio file to inspect.

    Returns:
        The value returned by ``librosa.get_duration(path=file_path)``, or
        ``None`` if anything went wrong.
    """
    try:
        import librosa
        return librosa.get_duration(path=file_path)
    except Exception as err:
        logging.error(f"Failed to get audio duration: {err}")
    return None
# Language detection and model selection
def detect_and_select_model(text):
    """Pick the stage-1 model id that matches the script used in ``text``.

    Precedence:
      1. Hangul (Korean) or Hiragana/Katakana (Japanese) -> jp-kr model.
      2. Han ideographs with no Hangul/kana (Chinese)    -> zh model.
      3. Anything else (English / other)                 -> en model.

    Kana must be tested before Han ideographs: Japanese text routinely mixes
    kanji (which live in the same U+4E00-U+9FFF block as Chinese characters)
    with kana, so testing Han first would misroute Japanese lyrics to the
    Chinese model.

    Args:
        text: The lyrics text to inspect.

    Returns:
        The Hugging Face model id as a string.
    """
    # Hangul Jamo/Syllables, Hiragana and Katakana.
    if re.search(r'[\u3131-\u318E\uAC00-\uD7A3\u3040-\u309F\u30A0-\u30FF]', text):
        return "m-a-p/YuE-s1-7B-anneal-jp-kr-cot"
    # CJK Unified Ideographs without any kana/Hangul: treat as Chinese.
    if re.search(r'[\u4e00-\u9fff]', text):
        return "m-a-p/YuE-s1-7B-anneal-zh-cot"
    # English / everything else.
    return "m-a-p/YuE-s1-7B-anneal-en-cot"
def optimize_model_selection(lyrics, genre):
    """Choose the stage-1 model and its generation settings for the lyrics.

    Args:
        lyrics: The raw lyrics text; drives both language detection and the
            token/segment budget.
        genre: Genre description. Accepted for interface compatibility; it is
            not read by this function.

    Returns:
        A tuple ``(model_path, config, params)`` where ``model_path`` is the
        selected model id, ``config`` is that model's settings dict
        (``max_tokens``, ``temperature``, ``batch_size``, ``num_segments``,
        ``tokens_per_segment``, ``estimated_duration``) and ``params`` is the
        dict returned by ``calculate_generation_params``.
    """
    model_path = detect_and_select_model(lyrics)
    params = calculate_generation_params(lyrics)

    # Use the per-segment budget from params when it is provided; otherwise
    # derive it here instead of failing with a KeyError on the missing key.
    tokens_per_segment = params.get(
        'tokens_per_segment',
        params['max_tokens'] // max(1, params['num_segments']),
    )

    # Lyrics with a chorus get a 50% larger token budget.
    has_chorus = params['sections']['chorus'] > 0
    max_tokens = params['max_tokens']
    if has_chorus:
        max_tokens = int(max_tokens * 1.5)

    # The per-model settings differ only in sampling temperature.
    temperatures = {
        "m-a-p/YuE-s1-7B-anneal-en-cot": 0.8,
        "m-a-p/YuE-s1-7B-anneal-jp-kr-cot": 0.7,
        "m-a-p/YuE-s1-7B-anneal-zh-cot": 0.7,
    }
    model_config = {
        name: {
            "max_tokens": max_tokens,
            "temperature": temperature,
            "batch_size": 8,
            "num_segments": params['num_segments'],
            "tokens_per_segment": tokens_per_segment,
            "estimated_duration": params['estimated_duration'],
        }
        for name, temperature in temperatures.items()
    }
    return model_path, model_config[model_path], params
# GPU settings
def optimize_gpu_settings():
    """Apply CUDA/cuDNN tuning flags when a GPU is available.

    With CUDA available this enables TF32 matmuls, turns on the cuDNN
    autotuner (non-deterministic mode), empties the CUDA cache, selects
    device 0 and logs its name and total memory. Without CUDA it only logs a
    warning.
    """
    if not torch.cuda.is_available():
        logging.warning("GPU not available!")
        return

    torch.backends.cuda.matmul.allow_tf32 = True
    cudnn = torch.backends.cudnn
    cudnn.benchmark = True
    cudnn.deterministic = False
    cudnn.enabled = True

    torch.cuda.empty_cache()
    torch.cuda.set_device(0)

    device_name = torch.cuda.get_device_name(0)
    logging.info(f"Using GPU: {device_name}")
    total_gib = torch.cuda.get_device_properties(0).total_memory / 1024**3
    logging.info(f"Available GPU memory: {total_gib:.2f} GB")
def install_flash_attn():
    """Ensure the ``flash_attn`` package is importable, installing it if needed.

    Installation is skipped when no GPU is available or when the installed
    torch build reports no CUDA version. The function never raises: every
    failure is logged and reported through the return value.

    Returns:
        ``True`` if ``flash_attn`` was already importable or the pip install
        succeeded; ``False`` otherwise.
    """
    import sys  # local import: only needed to locate the running interpreter

    try:
        if not torch.cuda.is_available():
            logging.warning("GPU not available, skipping flash-attn installation")
            return False

        cuda_version = torch.version.cuda
        if cuda_version is None:
            logging.warning("CUDA not available, skipping flash-attn installation")
            return False
        logging.info(f"Detected CUDA version: {cuda_version}")

        try:
            import flash_attn  # noqa: F401 -- availability probe only
        except ImportError:
            logging.info("Installing flash-attn...")
        else:
            logging.info("flash-attn already installed")
            return True

        try:
            # Run pip through the current interpreter so the package lands in
            # the environment this process actually imports from.
            subprocess.run(
                [sys.executable, "-m", "pip", "install", "flash-attn", "--no-build-isolation"],
                check=True,
                capture_output=True,
                text=True,
            )
        except subprocess.CalledProcessError as e:
            logging.warning("Failed to install flash-attn via pip, skipping...")
            if e.stderr:
                # Keep the tail of pip's output so the failure is diagnosable.
                logging.warning(f"pip stderr (tail): {e.stderr[-2000:]}")
            return False

        logging.info("flash-attn installed successfully!")
        return True
    except Exception as e:
        logging.warning(f"Failed to install flash-attn: {e}")
        return False
def initialize_system():
    """Prepare the runtime: GPU flags, flash-attn, codec download, chdir.

    Steps, in order: apply GPU settings, try to install flash-attn, download
    the ``m-a-p/xcodec_mini_infer`` snapshot into
    ``./inference/xcodec_mini_infer`` and finally change the process working
    directory to ``./inference``.

    Raises:
        FileNotFoundError: If ``./inference`` cannot be entered (logged first).
    """
    optimize_gpu_settings()
    install_flash_attn()  # result is informational only; it is not used here

    from huggingface_hub import snapshot_download

    codec_dir = './inference/xcodec_mini_infer'
    os.makedirs(codec_dir, exist_ok=True)
    logging.info(f"Created folder at: {codec_dir}")

    snapshot_download(
        repo_id="m-a-p/xcodec_mini_infer",
        resume_download=True,
        local_dir=codec_dir,
    )

    try:
        os.chdir("./inference")
        logging.info(f"Working directory changed to: {os.getcwd()}")
    except FileNotFoundError as err:
        logging.error(f"Directory error: {err}")
        raise
@lru_cache(maxsize=50)
def get_cached_file_path(content_hash, prefix):
    """Memoised wrapper around ``create_temp_file`` (LRU, up to 50 entries).

    ``content_hash`` is forwarded as the *content* argument of
    ``create_temp_file``, so it is what gets written to the file. Repeated
    calls with the same arguments return the cached path without creating a
    new file.

    NOTE(review): a cached path is not invalidated if the file is removed
    afterwards -- confirm callers do not delete these files.
    """
    cached_path = create_temp_file(content_hash, prefix)
    return cached_path
def empty_output_folder(output_dir):
    """Delete ``output_dir`` with all its contents and recreate it empty.

    Args:
        output_dir: Path of the directory to reset. It must already exist.

    Raises:
        Exception: Whatever ``shutil.rmtree`` / ``os.makedirs`` raised; the
            error is logged and then re-raised unchanged.
    """
    try:
        shutil.rmtree(output_dir)
        os.makedirs(output_dir)
    except Exception as err:
        logging.error(f"Error cleaning output folder: {err}")
        raise
    else:
        logging.info(f"Output folder cleaned: {output_dir}")
def create_temp_file(content, prefix, suffix=".txt"):
    """Write ``content`` to a new persistent temporary text file.

    The text is stripped of surrounding whitespace, terminated with two
    newlines, and has all ``\\r\\n`` / ``\\r`` line endings normalised to
    ``\\n``. The file is written as UTF-8 so that non-ASCII lyrics (Korean,
    Japanese, Chinese) do not depend on the process locale.

    Args:
        content: Text to write.
        prefix: File-name prefix for the temporary file.
        suffix: File-name suffix (defaults to ``".txt"``).

    Returns:
        The path of the created file. The file is not deleted automatically;
        the caller is responsible for removing it.
    """
    normalized = (content.strip() + "\n\n").replace("\r\n", "\n").replace("\r", "\n")
    # delete=False keeps the file on disk after the handle is closed.
    with tempfile.NamedTemporaryFile(
        delete=False,
        mode="w",
        encoding="utf-8",
        prefix=prefix,
        suffix=suffix,
    ) as temp_file:
        temp_file.write(normalized)
    logging.debug(f"Temporary file created: {temp_file.name}")
    return temp_file.name
def get_last_mp3_file(output_dir):
    """Return the most recently modified ``.mp3`` file in ``output_dir``.

    Args:
        output_dir: Directory to scan (not recursive).

    Returns:
        The path (``output_dir`` joined with the file name) of the ``.mp3``
        file with the latest modification time, or ``None`` -- after logging a
        warning -- when the directory contains no ``.mp3`` files.
    """
    candidates = [
        os.path.join(output_dir, name)
        for name in os.listdir(output_dir)
        if name.endswith('.mp3')
    ]
    if candidates:
        # Newest by modification time.
        return max(candidates, key=os.path.getmtime)
    logging.warning("No MP3 files found")
    return None
def infer(genre_txt_content, lyrics_txt_content, num_segments, max_new_tokens):
    """Generate a song by running ``infer.py`` in a subprocess.

    The model, segment count and token budget are derived from the lyrics via
    ``optimize_model_selection``. The genre and lyrics are written to
    temporary files, ``./output`` is emptied, ``infer.py`` is executed, and
    the newest MP3 in ``./output`` is returned.

    Args:
        genre_txt_content: Genre/style description text.
        lyrics_txt_content: Lyrics text, optionally with section tags.
        num_segments: Accepted for interface compatibility; not read here
            (the segment count is derived from the lyrics).
        max_new_tokens: Accepted for interface compatibility; not read here
            (the token budget is derived from the lyrics).

    Returns:
        Path of the generated MP3 file, or ``None`` if no MP3 was produced.

    Raises:
        RuntimeError: If ``infer.py`` exits with a non-zero return code.
        Exception: Any other error is logged and re-raised.
    """
    # Temp files are recorded as they are created so that the cleanup in
    # ``finally`` never references a name that was not bound yet; otherwise
    # an early failure would be masked by an UnboundLocalError.
    temp_files = []
    try:
        # Model selection and settings.
        model_path, config, params = optimize_model_selection(lyrics_txt_content, genre_txt_content)
        logging.info(f"Selected model: {model_path}")
        logging.info(f"Lyrics analysis: {params}")

        has_chorus = params['has_chorus']
        estimated_duration = params.get('estimated_duration', 90)

        # Token and segment budget. Note that the chorus scaling is applied
        # on top of the value already clamped by calculate_generation_params.
        if has_chorus:
            actual_max_tokens = int(params['max_tokens'] * 1.5)  # 50% more tokens
            actual_num_segments = max(3, params['num_segments'])  # at least 3 segments
        else:
            actual_max_tokens = params['max_tokens']
            actual_num_segments = params['num_segments']
        tokens_per_segment = actual_max_tokens // actual_num_segments

        logging.info(f"Estimated duration: {estimated_duration} seconds")
        logging.info(f"Has chorus sections: {has_chorus}")
        logging.info(f"Using segments: {actual_num_segments}, tokens: {actual_max_tokens}")
        logging.info(f"Tokens per segment: {tokens_per_segment}")

        # Temporary input files for infer.py.
        genre_txt_path = create_temp_file(genre_txt_content, prefix="genre_")
        temp_files.append(genre_txt_path)
        lyrics_txt_path = create_temp_file(lyrics_txt_content, prefix="lyrics_")
        temp_files.append(lyrics_txt_path)

        output_dir = "./output"
        os.makedirs(output_dir, exist_ok=True)
        empty_output_folder(output_dir)

        # Base command line.
        command = [
            "python", "infer.py",
            "--stage1_model", model_path,
            "--stage2_model", "m-a-p/YuE-s2-1B-general",
            "--genre_txt", genre_txt_path,
            "--lyrics_txt", lyrics_txt_path,
            "--run_n_segments", str(actual_num_segments),
            "--stage2_batch_size", str(config['batch_size']),
            "--output_dir", output_dir,
            "--cuda_idx", "0",
            "--max_new_tokens", str(actual_max_tokens)
        ]

        # GPU-only flags and CUDA environment variables.
        env = os.environ.copy()
        if torch.cuda.is_available():
            command.extend([
                "--disable_offload_model",
                "--use_bf16"
            ])
            env.update({
                "CUDA_VISIBLE_DEVICES": "0",
                "CUDA_HOME": "/usr/local/cuda",
                "PATH": f"/usr/local/cuda/bin:{env.get('PATH', '')}",
                "LD_LIBRARY_PATH": f"/usr/local/cuda/lib64:{env.get('LD_LIBRARY_PATH', '')}",
                "PYTORCH_CUDA_ALLOC_CONF": "max_split_size_mb:512"
            })

        # Best-effort transformers cache migration; failures are non-critical.
        try:
            from transformers.utils import move_cache
            move_cache()
        except Exception as e:
            logging.warning(f"Cache migration warning (non-critical): {e}")

        # Run the generation; the return code is inspected manually below.
        process = subprocess.run(
            command,
            env=env,
            check=False,
            capture_output=True,
            text=True
        )

        logging.info(f"Command output: {process.stdout}")
        if process.stderr:
            logging.error(f"Command error: {process.stderr}")
        if process.returncode != 0:
            logging.error(f"Command failed with return code: {process.returncode}")
            logging.error(f"Command: {' '.join(command)}")
            raise RuntimeError(f"Inference failed: {process.stderr}")

        # Result handling.
        last_mp3 = get_last_mp3_file(output_dir)
        if not last_mp3:
            logging.warning("No output audio file generated")
            return None

        try:
            duration = get_audio_duration(last_mp3)
            logging.info(f"Generated audio file: {last_mp3}")
            if duration:
                logging.info(f"Audio duration: {duration:.2f} seconds")
                logging.info(f"Expected duration: {estimated_duration} seconds")
                # Warn when the result is much shorter than estimated.
                if duration < estimated_duration * 0.8:
                    logging.warning(f"Generated audio is shorter than expected: {duration:.2f}s < {estimated_duration:.2f}s")
        except Exception as e:
            logging.warning(f"Failed to get audio duration: {e}")
        return last_mp3
    except Exception as e:
        logging.error(f"Inference error: {e}")
        raise
    finally:
        # Remove whichever temporary files were actually created.
        for file in temp_files:
            try:
                os.remove(file)
                logging.debug(f"Removed temporary file: {file}")
            except Exception as e:
                logging.warning(f"Failed to remove temporary file {file}: {e}")
def main():
    """Build the Gradio Blocks UI and wire its event handlers.

    The layout has a genre box and a lyrics box on one side, read-only
    segment/token controls, two info labels, the generate button and the
    audio output on the other, plus example inputs. ``initialize_system()`` is
    called while the UI is being built. Editing the lyrics refreshes the two
    info labels; the button runs ``infer``.

    Returns:
        The constructed ``gr.Blocks`` application (not yet launched).
    """
    def update_info(lyrics):
        """Return the texts for the duration label and the section label."""
        if not lyrics:
            return "No lyrics entered", "No sections detected"
        params = calculate_generation_params(lyrics)
        duration = params['estimated_duration']
        sections = params['sections']
        duration_text = f"Estimated duration: {duration:.1f} seconds"
        sections_text = f"Verses: {sections['verse']}, Chorus: {sections['chorus']} (Expected full length including chorus)"
        return (duration_text, sections_text)

    # English example
    english_example = [
        "female blues airy vocal bright vocal piano sad romantic guitar jazz",
        """[verse]
In the quiet of the evening, shadows start to fall
Whispers of the night wind echo through the hall
Lost within the silence, I hear your gentle voice
Guiding me back homeward, making my heart rejoice
[chorus]
Don't let this moment fade, hold me close tonight
With you here beside me, everything's alright
Can't imagine life alone, don't want to let you go
Stay with me forever, let our love just flow
"""
    ]
    # Korean example
    korean_example = [
        "K-pop bright energetic synth dance electronic",
        """[verse]
λΉ›λ‚˜λŠ” λ³„λ“€μ²˜λŸΌ 우리의 꿈이
μ € ν•˜λŠ˜μ„ μˆ˜λ†“μ•„ λ°˜μ§μ΄λ„€
ν•¨κ»˜λΌλ©΄ μ–΄λ””λ“  갈 수 μžˆμ–΄
[chorus]
λ‹¬λ €κ°€μž 더 높이 더 멀리
"""
    ]

    # Gradio interface
    with gr.Blocks() as demo:
        with gr.Column():
            gr.Markdown("# Open SUNO: Full-Song Generation (Multi-Language Support)")
            with gr.Row():
                with gr.Column():
                    genre_txt = gr.Textbox(
                        label="Genre",
                        placeholder="Enter music genre and style descriptions..."
                    )
                    lyrics_txt = gr.Textbox(
                        label="Lyrics (Supports English, Korean, Japanese, Chinese)",
                        placeholder="Enter song lyrics with [verse], [chorus], [bridge] tags...",
                        lines=10
                    )
                with gr.Column():
                    num_segments = gr.Number(
                        label="Number of Song Segments (Auto-adjusted based on lyrics)",
                        value=2,
                        minimum=1,
                        maximum=4,
                        step=1,
                        interactive=False
                    )
                    max_new_tokens = gr.Slider(
                        label="Max New Tokens (Auto-adjusted based on lyrics)",
                        minimum=500,
                        maximum=32000,
                        step=500,
                        value=4000,
                        interactive=False
                    )
                    with gr.Row():
                        duration_info = gr.Label(label="Estimated Duration")
                        sections_info = gr.Label(label="Section Information")
                    submit_btn = gr.Button("Generate Music", variant="primary")
                    music_out = gr.Audio(label="Generated Audio")
            # Multi-language examples
            gr.Examples(
                examples=[english_example, korean_example],
                inputs=[genre_txt, lyrics_txt]
            )

        # System initialisation
        initialize_system()

        # Event handlers
        lyrics_txt.change(
            fn=update_info,
            inputs=[lyrics_txt],
            outputs=[duration_info, sections_info]
        )
        submit_btn.click(
            fn=infer,
            inputs=[genre_txt, lyrics_txt, num_segments, max_new_tokens],
            outputs=[music_out]
        )
    return demo
# Script entry point: build the UI, enable a request queue of at most 20
# entries and start the server on all interfaces, port 7860.
if __name__ == "__main__":
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
        "show_api": True,
        "show_error": True,
        "max_threads": 2,
    }
    demo = main()
    demo.queue(max_size=20).launch(**launch_options)