import gradio as gr import subprocess import os import shutil import tempfile import torch import logging import numpy as np import re from concurrent.futures import ThreadPoolExecutor from functools import lru_cache # 로깅 설정 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('yue_generation.log'), logging.StreamHandler() ] ) def analyze_lyrics(lyrics, repeat_chorus=2): lines = [line.strip() for line in lyrics.split('\n') if line.strip()] sections = { 'verse': 0, 'chorus': 0, 'bridge': 0, 'total_lines': len(lines) } current_section = None section_lines = { 'verse': [], 'chorus': [], 'bridge': [] } for line in lines: lower_line = line.lower() if '[verse]' in lower_line: current_section = 'verse' sections['verse'] += 1 continue elif '[chorus]' in lower_line: current_section = 'chorus' sections['chorus'] += 1 continue elif '[bridge]' in lower_line: current_section = 'bridge' sections['bridge'] += 1 continue # 현재 섹션에 라인 추가 if current_section: section_lines[current_section].append(line) # 만약 코러스가 1회만 있고, repeat_chorus > 1이면 반복해서 붙이기 # chorus 섹션 전체 블록을 복제 if sections['chorus'] == 1 and repeat_chorus > 1: chorus_block = section_lines['chorus'][:] for _ in range(repeat_chorus - 1): section_lines['chorus'].extend(chorus_block) # 라인 수 재계산 new_total_lines = sum(len(section_lines[sec]) for sec in section_lines) return sections, (sections['verse'] + sections['chorus'] + sections['bridge']), new_total_lines, { 'verse': len(section_lines['verse']), 'chorus': len(section_lines['chorus']), 'bridge': len(section_lines['bridge']) } def calculate_generation_params(lyrics): sections, total_sections, total_lines, section_lines = analyze_lyrics(lyrics) # 기본 시간 계산 (초 단위) time_per_line = { 'verse': 4, # verse는 한 줄당 4초 'chorus': 6, # chorus는 한 줄당 6초 'bridge': 5 # bridge는 한 줄당 5초 } # 각 섹션별 예상 시간 계산 section_durations = { 'verse': section_lines['verse'] * time_per_line['verse'], 'chorus': section_lines['chorus'] * time_per_line['chorus'], 'bridge': section_lines['bridge'] * time_per_line['bridge'] } total_duration = sum(section_durations.values()) total_duration = max(60, total_duration) # 최소 60초 # 토큰 계산 (더 보수적인 값 사용) base_tokens = 3000 # 기본 토큰 수 tokens_per_line = 200 # 줄당 토큰 수 total_tokens = base_tokens + (total_lines * tokens_per_line) # 섹션 기반 세그먼트 수 계산 if sections['chorus'] > 0: num_segments = 3 # 코러스가 있는 경우 3개 세그먼트 else: num_segments = 2 # 코러스가 없는 경우 2개 세그먼트 # 토큰 수 제한 max_tokens = min(8000, total_tokens) # 최대 8000 토큰으로 제한 return { 'max_tokens': max_tokens, 'num_segments': num_segments, 'sections': sections, 'section_lines': section_lines, 'estimated_duration': total_duration, 'section_durations': section_durations, 'has_chorus': sections['chorus'] > 0 } def get_audio_duration(file_path): try: import librosa duration = librosa.get_duration(path=file_path) return duration except Exception as e: logging.error(f"Failed to get audio duration: {e}") return None # 언어 감지 및 모델 선택 함수 def detect_and_select_model(text): if re.search(r'[\u3131-\u318E\uAC00-\uD7A3]', text): # 한글 return "m-a-p/YuE-s1-7B-anneal-jp-kr-cot" elif re.search(r'[\u4e00-\u9fff]', text): # 중국어 return "m-a-p/YuE-s1-7B-anneal-zh-cot" elif re.search(r'[\u3040-\u309F\u30A0-\u30FF]', text): # 일본어 return "m-a-p/YuE-s1-7B-anneal-jp-kr-cot" else: # 영어/기타 return "m-a-p/YuE-s1-7B-anneal-en-cot" # GPU 설정 최적화 def optimize_gpu_settings(): if torch.cuda.is_available(): torch.backends.cuda.matmul.allow_tf32 = True torch.backends.cudnn.benchmark = True torch.backends.cudnn.deterministic = False torch.backends.cudnn.enabled = True torch.cuda.empty_cache() torch.cuda.set_device(0) logging.info(f"Using GPU: {torch.cuda.get_device_name(0)}") logging.info(f"Available GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.2f} GB") else: logging.warning("GPU not available!") def install_flash_attn(): try: if not torch.cuda.is_available(): logging.warning("GPU not available, skipping flash-attn installation") return False cuda_version = torch.version.cuda if cuda_version is None: logging.warning("CUDA not available, skipping flash-attn installation") return False logging.info(f"Detected CUDA version: {cuda_version}") try: import flash_attn logging.info("flash-attn already installed") return True except ImportError: logging.info("Installing flash-attn...") try: subprocess.run( ["pip", "install", "flash-attn", "--no-build-isolation"], check=True, capture_output=True ) logging.info("flash-attn installed successfully!") return True except subprocess.CalledProcessError: logging.warning("Failed to install flash-attn via pip, skipping...") return False except Exception as e: logging.warning(f"Failed to install flash-attn: {e}") return False def initialize_system(): optimize_gpu_settings() has_flash_attn = install_flash_attn() from huggingface_hub import snapshot_download folder_path = './inference/xcodec_mini_infer' os.makedirs(folder_path, exist_ok=True) logging.info(f"Created folder at: {folder_path}") snapshot_download( repo_id="m-a-p/xcodec_mini_infer", local_dir="./inference/xcodec_mini_infer", resume_download=True ) try: os.chdir("./inference") logging.info(f"Working directory changed to: {os.getcwd()}") except FileNotFoundError as e: logging.error(f"Directory error: {e}") raise @lru_cache(maxsize=50) def get_cached_file_path(content_hash, prefix): return create_temp_file(content_hash, prefix) def empty_output_folder(output_dir): try: shutil.rmtree(output_dir) os.makedirs(output_dir) logging.info(f"Output folder cleaned: {output_dir}") except Exception as e: logging.error(f"Error cleaning output folder: {e}") raise def create_temp_file(content, prefix, suffix=".txt"): temp_file = tempfile.NamedTemporaryFile(delete=False, mode="w", prefix=prefix, suffix=suffix) content = content.strip() + "\n\n" content = content.replace("\r\n", "\n").replace("\r", "\n") temp_file.write(content) temp_file.close() logging.debug(f"Temporary file created: {temp_file.name}") return temp_file.name def get_last_mp3_file(output_dir): mp3_files = [f for f in os.listdir(output_dir) if f.endswith('.mp3')] if not mp3_files: logging.warning("No MP3 files found") return None mp3_files_with_path = [os.path.join(output_dir, f) for f in mp3_files] mp3_files_with_path.sort(key=os.path.getmtime, reverse=True) return mp3_files_with_path[0] def optimize_model_selection(lyrics, genre): model_path = detect_and_select_model(lyrics) params = calculate_generation_params(lyrics) # 코러스 존재 여부에 따른 설정 조정 has_chorus = params['sections']['chorus'] > 0 # 토큰 수 계산 tokens_per_segment = params['max_tokens'] // params['num_segments'] model_config = { "m-a-p/YuE-s1-7B-anneal-en-cot": { "max_tokens": params['max_tokens'], "temperature": 0.8, "batch_size": 8, "num_segments": params['num_segments'], "estimated_duration": params['estimated_duration'] }, "m-a-p/YuE-s1-7B-anneal-jp-kr-cot": { "max_tokens": params['max_tokens'], "temperature": 0.7, "batch_size": 8, "num_segments": params['num_segments'], "estimated_duration": params['estimated_duration'] }, "m-a-p/YuE-s1-7B-anneal-zh-cot": { "max_tokens": params['max_tokens'], "temperature": 0.7, "batch_size": 8, "num_segments": params['num_segments'], "estimated_duration": params['estimated_duration'] } } # 코러스가 있는 경우 토큰 수 증가 if has_chorus: for config in model_config.values(): config['max_tokens'] = int(config['max_tokens'] * 1.5) # 50% 더 많은 토큰 할당 return model_path, model_config[model_path], params def infer(genre_txt_content, lyrics_txt_content, num_segments, max_new_tokens): genre_txt_path = None lyrics_txt_path = None try: # 모델 선택 및 설정 model_path, config, params = optimize_model_selection(lyrics_txt_content, genre_txt_content) logging.info(f"Selected model: {model_path}") logging.info(f"Lyrics analysis: {params}") # 코러스 섹션 확인 및 로깅 has_chorus = params['sections']['chorus'] > 0 estimated_duration = params.get('estimated_duration', 90) # 토큰 수와 세그먼트 수 조정 if has_chorus: actual_max_tokens = min(8000, int(config['max_tokens'] * 1.2)) # 20% 증가, 최대 8000 actual_num_segments = 3 else: actual_max_tokens = config['max_tokens'] actual_num_segments = 2 logging.info(f"Estimated duration: {estimated_duration} seconds") logging.info(f"Has chorus sections: {has_chorus}") logging.info(f"Using segments: {actual_num_segments}, tokens: {actual_max_tokens}") # 임시 파일 생성 genre_txt_path = create_temp_file(genre_txt_content, prefix="genre_") lyrics_txt_path = create_temp_file(lyrics_txt_content, prefix="lyrics_") output_dir = "./output" os.makedirs(output_dir, exist_ok=True) empty_output_folder(output_dir) # 기본 명령어 구성 command = [ "python", "infer.py", "--stage1_model", model_path, "--stage2_model", "m-a-p/YuE-s2-1B-general", "--genre_txt", genre_txt_path, "--lyrics_txt", lyrics_txt_path, "--run_n_segments", str(actual_num_segments), "--stage2_batch_size", "4", # 배치 사이즈 감소 "--output_dir", output_dir, "--cuda_idx", "0", "--max_new_tokens", str(actual_max_tokens) ] # GPU 설정 if torch.cuda.is_available(): command.append("--disable_offload_model") # GPU 설정 # CUDA 환경 변수 설정 env = os.environ.copy() if torch.cuda.is_available(): env.update({ "CUDA_VISIBLE_DEVICES": "0", "CUDA_HOME": "/usr/local/cuda", "PATH": f"/usr/local/cuda/bin:{env.get('PATH', '')}", "LD_LIBRARY_PATH": f"/usr/local/cuda/lib64:{env.get('LD_LIBRARY_PATH', '')}", "PYTORCH_CUDA_ALLOC_CONF": f"max_split_size_mb:512" }) # transformers 캐시 마이그레이션 처리 try: from transformers.utils import move_cache move_cache() except Exception as e: logging.warning(f"Cache migration warning (non-critical): {e}") # 명령 실행 process = subprocess.run( command, env=env, check=False, capture_output=True, text=True ) # 실행 결과 로깅 logging.info(f"Command output: {process.stdout}") if process.stderr: logging.error(f"Command error: {process.stderr}") if process.returncode != 0: logging.error(f"Command failed with return code: {process.returncode}") logging.error(f"Command: {' '.join(command)}") raise RuntimeError(f"Inference failed: {process.stderr}") # 결과 처리 last_mp3 = get_last_mp3_file(output_dir) if last_mp3: try: duration = get_audio_duration(last_mp3) logging.info(f"Generated audio file: {last_mp3}") if duration: logging.info(f"Audio duration: {duration:.2f} seconds") logging.info(f"Expected duration: {estimated_duration} seconds") # 생성된 음악이 너무 짧은 경우 경고 if duration < estimated_duration * 0.8: logging.warning(f"Generated audio is shorter than expected: {duration:.2f}s < {estimated_duration:.2f}s") except Exception as e: logging.warning(f"Failed to get audio duration: {e}") return last_mp3 else: logging.warning("No output audio file generated") return None except Exception as e: logging.error(f"Inference error: {e}") raise finally: # 임시 파일 정리 if genre_txt_path and os.path.exists(genre_txt_path): try: os.remove(genre_txt_path) logging.debug(f"Removed temporary file: {genre_txt_path}") except Exception as e: logging.warning(f"Failed to remove temporary file {genre_txt_path}: {e}") if lyrics_txt_path and os.path.exists(lyrics_txt_path): try: os.remove(lyrics_txt_path) logging.debug(f"Removed temporary file: {lyrics_txt_path}") except Exception as e: logging.warning(f"Failed to remove temporary file {lyrics_txt_path}: {e}") def main(): # Gradio 인터페이스 with gr.Blocks() as demo: with gr.Column(): gr.Markdown("# Open SUNO: Full-Song Generation (Multi-Language Support)") with gr.Row(): with gr.Column(): genre_txt = gr.Textbox( label="Genre", placeholder="Enter music genre and style descriptions..." ) lyrics_txt = gr.Textbox( label="Lyrics (Supports English, Korean, Japanese, Chinese)", placeholder="Enter song lyrics with [verse], [chorus], [bridge] tags...", lines=10 ) with gr.Column(): num_segments = gr.Number( label="Number of Song Segments (Auto-adjusted based on lyrics)", value=2, minimum=1, maximum=4, step=1, interactive=False ) max_new_tokens = gr.Slider( label="Max New Tokens (Auto-adjusted based on lyrics)", minimum=500, maximum=32000, step=500, value=4000, interactive=False ) with gr.Row(): duration_info = gr.Label(label="Estimated Duration") sections_info = gr.Label(label="Section Information") submit_btn = gr.Button("Generate Music", variant="primary") music_out = gr.Audio(label="Generated Audio") # 다국어 예제 gr.Examples( examples=[ # 영어 예제 [ "female blues airy vocal bright vocal piano sad romantic guitar jazz", """[verse] In the quiet of the evening, shadows start to fall Whispers of the night wind echo through the hall Lost within the silence, I hear your gentle voice Guiding me back homeward, making my heart rejoice [chorus] Don't let this moment fade, hold me close tonight With you here beside me, everything's alright Can't imagine life alone, don't want to let you go Stay with me forever, let our love just flow [verse] In the quiet of the evening, shadows start to fall Whispers of the night wind echo through the hall Lost within the silence, I hear your gentle voice Guiding me back homeward, making my heart rejoice [chorus] Don't let this moment fade, hold me close tonight With you here beside me, everything's alright Can't imagine life alone, don't want to let you go Stay with me forever, let our love just flow """ ], # 한국어 예제 [ "K-pop bright energetic synth dance electronic", """[verse] 언젠가 마주한 눈빛 속에서 우린 서로를 알아보았지 [chorus] 다시 한 번 내게 말해줘 너의 진심을 숨기지 말아 줘 [verse] 어두운 밤을 지날 때마다 너의 목소리를 떠올려 [chorus] 다시 한 번 내게 말해줘 너의 진심을 숨기지 말아 줘 """ ] ], inputs=[genre_txt, lyrics_txt] ) # 시스템 초기화 initialize_system() def update_info(lyrics): if not lyrics: return "No lyrics entered", "No sections detected" params = calculate_generation_params(lyrics) duration = params['estimated_duration'] sections = params['sections'] return ( f"Estimated duration: {duration:.1f} seconds", f"Verses: {sections['verse']}, Chorus: {sections['chorus']} (Expected full length including chorus)" ) # 이벤트 핸들러 lyrics_txt.change( fn=update_info, inputs=[lyrics_txt], outputs=[duration_info, sections_info] ) submit_btn.click( fn=infer, inputs=[genre_txt, lyrics_txt, num_segments, max_new_tokens], outputs=[music_out] ) return demo if __name__ == "__main__": demo = main() demo.queue(max_size=20).launch( server_name="0.0.0.0", server_port=7860, share=True, show_api=True, show_error=True, max_threads=2 )