Spaces:
Paused
Paused
| import gradio as gr | |
| import subprocess | |
| import os | |
| import shutil | |
| import tempfile | |
| import torch | |
| import logging | |
| import numpy as np | |
| import re | |
| from concurrent.futures import ThreadPoolExecutor | |
| from functools import lru_cache | |
# Logging configuration: every record goes both to a log file in the current
# working directory and to the console stream.
_log_handlers = [
    logging.FileHandler('yue_generation.log'),
    logging.StreamHandler(),
]
logging.basicConfig(
    handlers=_log_handlers,
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
| def analyze_lyrics(lyrics, repeat_chorus=2): | |
| lines = [line.strip() for line in lyrics.split('\n') if line.strip()] | |
| sections = { | |
| 'verse': 0, | |
| 'chorus': 0, | |
| 'bridge': 0, | |
| 'total_lines': len(lines) | |
| } | |
| current_section = None | |
| section_lines = { | |
| 'verse': [], | |
| 'chorus': [], | |
| 'bridge': [] | |
| } | |
| for line in lines: | |
| lower_line = line.lower() | |
| if '[verse]' in lower_line: | |
| current_section = 'verse' | |
| sections['verse'] += 1 | |
| continue | |
| elif '[chorus]' in lower_line: | |
| current_section = 'chorus' | |
| sections['chorus'] += 1 | |
| continue | |
| elif '[bridge]' in lower_line: | |
| current_section = 'bridge' | |
| sections['bridge'] += 1 | |
| continue | |
| # ํ์ฌ ์น์ ์ ๋ผ์ธ ์ถ๊ฐ | |
| if current_section: | |
| section_lines[current_section].append(line) | |
| # ๋ง์ฝ ์ฝ๋ฌ์ค๊ฐ 1ํ๋ง ์๊ณ , repeat_chorus > 1์ด๋ฉด ๋ฐ๋ณตํด์ ๋ถ์ด๊ธฐ | |
| # chorus ์น์ ์ ์ฒด ๋ธ๋ก์ ๋ณต์ | |
| if sections['chorus'] == 1 and repeat_chorus > 1: | |
| chorus_block = section_lines['chorus'][:] | |
| for _ in range(repeat_chorus - 1): | |
| section_lines['chorus'].extend(chorus_block) | |
| # ๋ผ์ธ ์ ์ฌ๊ณ์ฐ | |
| new_total_lines = sum(len(section_lines[sec]) for sec in section_lines) | |
| return sections, (sections['verse'] + sections['chorus'] + sections['bridge']), new_total_lines, { | |
| 'verse': len(section_lines['verse']), | |
| 'chorus': len(section_lines['chorus']), | |
| 'bridge': len(section_lines['bridge']) | |
| } | |
def calculate_generation_params(lyrics):
    """Derive generation settings (token budget, segments, duration) from lyrics.

    Args:
        lyrics: Tagged lyrics text, passed to ``analyze_lyrics``.

    Returns:
        A dict with keys ``max_tokens`` (3000 plus 200 per lyric line, capped
        at 8000), ``num_segments`` (3 when a chorus tag is present, else 2),
        ``sections``, ``section_lines``, ``estimated_duration`` (seconds, at
        least 60), ``section_durations`` and ``has_chorus``.
    """
    sections, _section_count, total_lines, section_lines = analyze_lyrics(lyrics)

    # Seconds budgeted per lyric line for each section type.
    seconds_per_line = {'verse': 4, 'chorus': 6, 'bridge': 5}
    section_durations = {
        name: section_lines[name] * per_line
        for name, per_line in seconds_per_line.items()
    }
    # Never estimate less than one minute.
    estimated_duration = max(60, sum(section_durations.values()))

    # Token budget: fixed base plus a per-line allowance, capped at 8000.
    token_budget = 3000 + total_lines * 200
    has_chorus = sections['chorus'] > 0

    return {
        'max_tokens': min(8000, token_budget),
        'num_segments': 3 if has_chorus else 2,
        'sections': sections,
        'section_lines': section_lines,
        'estimated_duration': estimated_duration,
        'section_durations': section_durations,
        'has_chorus': has_chorus,
    }
def get_audio_duration(file_path):
    """Return the duration of an audio file as reported by librosa.

    Args:
        file_path: Path of the audio file to inspect.

    Returns:
        The value of ``librosa.get_duration(path=file_path)``, or ``None``
        when librosa cannot be imported or the call raises (the failure is
        logged).
    """
    try:
        # Imported lazily: any import failure is handled like a read failure.
        import librosa
        return librosa.get_duration(path=file_path)
    except Exception as e:
        logging.error(f"Failed to get audio duration: {e}")
    return None
# Language detection and stage-1 model selection.
_HANGUL_RE = re.compile(r'[\u3131-\u318E\uAC00-\uD7A3]')  # Hangul jamo + syllables
_KANA_RE = re.compile(r'[\u3040-\u309F\u30A0-\u30FF]')    # Hiragana + Katakana
_HAN_RE = re.compile(r'[\u4e00-\u9fff]')                  # CJK unified ideographs


def detect_and_select_model(text):
    """Pick the YuE stage-1 checkpoint that matches the script used in ``text``.

    Kana is tested before Han ideographs: Japanese text normally mixes kana
    with kanji, and kanji fall inside the Han range, so testing Han first
    would route Japanese lyrics to the Chinese model.

    Args:
        text: Lyrics text to inspect.

    Returns:
        The model id string: the jp-kr model for text containing Hangul or
        kana, the zh model for text containing Han ideographs only, and the
        en model for everything else.
    """
    if _HANGUL_RE.search(text) or _KANA_RE.search(text):
        return "m-a-p/YuE-s1-7B-anneal-jp-kr-cot"
    if _HAN_RE.search(text):
        return "m-a-p/YuE-s1-7B-anneal-zh-cot"
    return "m-a-p/YuE-s1-7B-anneal-en-cot"
# GPU tuning.
def optimize_gpu_settings():
    """Apply process-wide PyTorch CUDA settings and log the GPU in use.

    When CUDA is available: allows TF32 matmul, turns on cuDNN with benchmark
    mode and non-deterministic algorithms, empties the CUDA cache, selects
    device 0 and logs its name and total memory. Otherwise only a warning is
    logged.
    """
    if not torch.cuda.is_available():
        logging.warning("GPU not available!")
        return

    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.benchmark = True
    torch.backends.cudnn.deterministic = False
    torch.backends.cudnn.enabled = True
    torch.cuda.empty_cache()
    torch.cuda.set_device(0)

    device_name = torch.cuda.get_device_name(0)
    logging.info(f"Using GPU: {device_name}")
    # The logged figure is the device's total memory, not the free amount.
    logging.info(f"Available GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.2f} GB")
def install_flash_attn():
    """Make sure the ``flash_attn`` package is importable, installing it if needed.

    Installation is skipped when no CUDA device or CUDA build of torch is
    present. pip is invoked as ``<current interpreter> -m pip`` so the package
    lands in the environment this process runs in, rather than whichever
    ``pip`` happens to be first on PATH.

    Returns:
        ``True`` when flash-attn was already importable or was installed
        successfully, ``False`` otherwise. Never raises; failures are logged.
    """
    import sys

    try:
        if not torch.cuda.is_available():
            logging.warning("GPU not available, skipping flash-attn installation")
            return False

        cuda_version = torch.version.cuda
        if cuda_version is None:
            logging.warning("CUDA not available, skipping flash-attn installation")
            return False
        logging.info(f"Detected CUDA version: {cuda_version}")

        try:
            import flash_attn  # noqa: F401  (imported only to probe availability)
        except ImportError:
            logging.info("Installing flash-attn...")
        else:
            logging.info("flash-attn already installed")
            return True

        try:
            subprocess.run(
                [sys.executable or "python", "-m", "pip",
                 "install", "flash-attn", "--no-build-isolation"],
                check=True,
                capture_output=True,
                text=True,
            )
        except subprocess.CalledProcessError as e:
            logging.warning("Failed to install flash-attn via pip, skipping...")
            # Keep pip's diagnostics available without flooding normal logs.
            logging.debug("pip stderr: %s", e.stderr)
            return False

        logging.info("flash-attn installed successfully!")
        return True
    except Exception as e:
        logging.warning(f"Failed to install flash-attn: {e}")
        return False
def initialize_system():
    """Prepare the runtime: GPU settings, flash-attn, codec download, cwd.

    Runs ``optimize_gpu_settings`` and ``install_flash_attn``, downloads the
    ``m-a-p/xcodec_mini_infer`` snapshot into ``./inference/xcodec_mini_infer``
    and finally changes the process working directory to ``./inference``.

    Raises:
        FileNotFoundError: If the working directory cannot be changed (the
            error is logged before being re-raised).
    """
    optimize_gpu_settings()
    install_flash_attn()  # result is informational only; failure is not fatal here

    from huggingface_hub import snapshot_download

    codec_dir = './inference/xcodec_mini_infer'
    os.makedirs(codec_dir, exist_ok=True)
    logging.info(f"Created folder at: {codec_dir}")

    snapshot_download(
        repo_id="m-a-p/xcodec_mini_infer",
        local_dir=codec_dir,
        resume_download=True,
    )

    # Later relative paths (e.g. "infer.py", "./output") resolve from here.
    try:
        os.chdir("./inference")
        logging.info(f"Working directory changed to: {os.getcwd()}")
    except FileNotFoundError as e:
        logging.error(f"Directory error: {e}")
        raise
def get_cached_file_path(content_hash, prefix):
    """Write ``content_hash`` to a new temporary file and return its path.

    Thin wrapper around ``create_temp_file`` using that function's default
    suffix.

    NOTE(review): despite the name, nothing is cached here and the first
    argument is written to the file as-is -- confirm the intended behaviour.
    """
    temp_path = create_temp_file(content_hash, prefix)
    return temp_path
def empty_output_folder(output_dir):
    """Leave ``output_dir`` existing and empty.

    Any existing directory tree at ``output_dir`` is deleted and an empty
    directory is created in its place. A directory that does not exist yet is
    simply created, so callers need not pre-create it.

    Args:
        output_dir: Path of the directory to reset.

    Raises:
        Exception: Any error from removing or creating the directory is
            logged and re-raised.
    """
    try:
        try:
            shutil.rmtree(output_dir)
        except FileNotFoundError:
            # Nothing to delete: the directory is created just below.
            pass
        os.makedirs(output_dir, exist_ok=True)
        logging.info(f"Output folder cleaned: {output_dir}")
    except Exception as e:
        logging.error(f"Error cleaning output folder: {e}")
        raise
def create_temp_file(content, prefix, suffix=".txt"):
    """Write ``content`` to a persistent temporary text file and return its path.

    The text is stripped, line endings are normalised to ``\\n`` and two
    trailing newlines are appended. The file is written as UTF-8 with
    untranslated newlines so non-ASCII lyrics (Korean, Japanese, Chinese)
    are stored identically regardless of the platform's locale encoding.

    Args:
        content: Text to store.
        prefix: File-name prefix for the temporary file.
        suffix: File-name suffix (extension) for the temporary file.

    Returns:
        Path of the created file. The file is not deleted automatically;
        the caller is responsible for removing it.
    """
    normalized = content.strip().replace("\r\n", "\n").replace("\r", "\n") + "\n\n"
    # The context manager closes the handle even if the write fails.
    with tempfile.NamedTemporaryFile(
        delete=False,
        mode="w",
        prefix=prefix,
        suffix=suffix,
        encoding="utf-8",
        newline="\n",
    ) as temp_file:
        temp_file.write(normalized)
    logging.debug(f"Temporary file created: {temp_file.name}")
    return temp_file.name
def get_last_mp3_file(output_dir):
    """Return the most recently modified ``.mp3`` file directly in ``output_dir``.

    Args:
        output_dir: Directory to scan (not recursive).

    Returns:
        Path (``output_dir`` joined with the file name) of the ``.mp3`` file
        with the latest modification time, or ``None`` when the directory
        holds no ``.mp3`` file (a warning is logged).
    """
    candidates = [
        os.path.join(output_dir, entry)
        for entry in os.listdir(output_dir)
        if entry.endswith('.mp3')
    ]
    if candidates:
        return max(candidates, key=os.path.getmtime)

    logging.warning("No MP3 files found")
    return None
def optimize_model_selection(lyrics, genre):
    """Choose the stage-1 model and its generation settings for the lyrics.

    Args:
        lyrics: Lyrics text; drives both model choice and parameter sizing.
        genre: Genre description. Currently unused by this function.

    Returns:
        A tuple ``(model_path, config, params)`` where ``config`` holds
        ``max_tokens``, ``temperature``, ``batch_size``, ``num_segments`` and
        ``estimated_duration`` for the selected model, and ``params`` is the
        dict returned by ``calculate_generation_params``.

    Raises:
        KeyError: If the detected model id has no configured temperature.
    """
    model_path = detect_and_select_model(lyrics)
    params = calculate_generation_params(lyrics)

    # Sampling temperature per supported stage-1 checkpoint.
    temperature_by_model = {
        "m-a-p/YuE-s1-7B-anneal-en-cot": 0.8,
        "m-a-p/YuE-s1-7B-anneal-jp-kr-cot": 0.7,
        "m-a-p/YuE-s1-7B-anneal-zh-cot": 0.7,
    }

    token_limit = params['max_tokens']
    if params['sections']['chorus'] > 0:
        # Lyrics with a chorus get 50% more tokens.
        token_limit = int(token_limit * 1.5)

    selected_config = {
        "max_tokens": token_limit,
        "temperature": temperature_by_model[model_path],
        "batch_size": 8,
        "num_segments": params['num_segments'],
        "estimated_duration": params['estimated_duration'],
    }
    return model_path, selected_config, params
def _build_inference_command(model_path, genre_txt_path, lyrics_txt_path,
                             num_segments, max_tokens, output_dir):
    """Assemble the argv list that runs ``infer.py`` for one generation."""
    import sys

    command = [
        # Use the interpreter running this app so the child sees the same
        # installed packages; fall back to PATH lookup if it is unknown.
        sys.executable or "python", "infer.py",
        "--stage1_model", model_path,
        "--stage2_model", "m-a-p/YuE-s2-1B-general",
        "--genre_txt", genre_txt_path,
        "--lyrics_txt", lyrics_txt_path,
        "--run_n_segments", str(num_segments),
        "--stage2_batch_size", "4",  # reduced batch size
        "--output_dir", output_dir,
        "--cuda_idx", "0",
        "--max_new_tokens", str(max_tokens),
    ]
    if torch.cuda.is_available():
        command.append("--disable_offload_model")
    return command


def _build_inference_env():
    """Return a copy of the environment, with CUDA variables set when a GPU exists."""
    env = os.environ.copy()
    if torch.cuda.is_available():
        env.update({
            "CUDA_VISIBLE_DEVICES": "0",
            "CUDA_HOME": "/usr/local/cuda",
            "PATH": f"/usr/local/cuda/bin:{env.get('PATH', '')}",
            "LD_LIBRARY_PATH": f"/usr/local/cuda/lib64:{env.get('LD_LIBRARY_PATH', '')}",
            "PYTORCH_CUDA_ALLOC_CONF": "max_split_size_mb:512",
        })
    return env


def _remove_temp_file(path):
    """Delete ``path`` if it exists; log (never raise) on failure."""
    if not path or not os.path.exists(path):
        return
    try:
        os.remove(path)
        logging.debug(f"Removed temporary file: {path}")
    except Exception as e:
        logging.warning(f"Failed to remove temporary file {path}: {e}")


def infer(genre_txt_content, lyrics_txt_content, num_segments, max_new_tokens):
    """Generate a song from genre and lyrics text by running ``infer.py``.

    The model, segment count and token budget are derived from the lyrics via
    ``optimize_model_selection``; the ``num_segments`` and ``max_new_tokens``
    arguments are accepted for the UI binding but are not used.

    Args:
        genre_txt_content: Genre/style description text.
        lyrics_txt_content: Tagged lyrics text.
        num_segments: Ignored (value comes from lyrics analysis).
        max_new_tokens: Ignored (value comes from lyrics analysis).

    Returns:
        Path of the newest ``.mp3`` in ``./output``, or ``None`` when the
        run produced no MP3 file.

    Raises:
        RuntimeError: If the inference subprocess exits with a non-zero code.
        Exception: Any other failure is logged with its traceback and
            re-raised.
    """
    genre_txt_path = None
    lyrics_txt_path = None
    try:
        # Model selection and generation settings.
        model_path, config, params = optimize_model_selection(lyrics_txt_content, genre_txt_content)
        logging.info(f"Selected model: {model_path}")
        logging.info(f"Lyrics analysis: {params}")

        has_chorus = params['sections']['chorus'] > 0
        estimated_duration = params.get('estimated_duration', 90)

        # Final token budget and segment count.
        if has_chorus:
            actual_max_tokens = min(8000, int(config['max_tokens'] * 1.2))  # +20%, capped at 8000
            actual_num_segments = 3
        else:
            actual_max_tokens = config['max_tokens']
            actual_num_segments = 2

        logging.info(f"Estimated duration: {estimated_duration} seconds")
        logging.info(f"Has chorus sections: {has_chorus}")
        logging.info(f"Using segments: {actual_num_segments}, tokens: {actual_max_tokens}")

        # Inputs are handed to infer.py as temporary text files.
        genre_txt_path = create_temp_file(genre_txt_content, prefix="genre_")
        lyrics_txt_path = create_temp_file(lyrics_txt_content, prefix="lyrics_")

        output_dir = "./output"
        os.makedirs(output_dir, exist_ok=True)
        empty_output_folder(output_dir)

        command = _build_inference_command(
            model_path, genre_txt_path, lyrics_txt_path,
            actual_num_segments, actual_max_tokens, output_dir,
        )
        env = _build_inference_env()

        # transformers cache migration; a failure here is not critical.
        try:
            from transformers.utils import move_cache
            move_cache()
        except Exception as e:
            logging.warning(f"Cache migration warning (non-critical): {e}")

        # Run inference; the exit code is checked explicitly below.
        process = subprocess.run(
            command,
            env=env,
            check=False,
            capture_output=True,
            text=True,
        )

        logging.info(f"Command output: {process.stdout}")
        if process.stderr:
            logging.error(f"Command error: {process.stderr}")

        if process.returncode != 0:
            logging.error(f"Command failed with return code: {process.returncode}")
            logging.error(f"Command: {' '.join(command)}")
            raise RuntimeError(f"Inference failed: {process.stderr}")

        last_mp3 = get_last_mp3_file(output_dir)
        if not last_mp3:
            logging.warning("No output audio file generated")
            return None

        # Duration reporting is best-effort and never fails the request.
        try:
            duration = get_audio_duration(last_mp3)
            logging.info(f"Generated audio file: {last_mp3}")
            if duration:
                logging.info(f"Audio duration: {duration:.2f} seconds")
                logging.info(f"Expected duration: {estimated_duration} seconds")
                # Warn when the result is much shorter than estimated.
                if duration < estimated_duration * 0.8:
                    logging.warning(f"Generated audio is shorter than expected: {duration:.2f}s < {estimated_duration:.2f}s")
        except Exception as e:
            logging.warning(f"Failed to get audio duration: {e}")
        return last_mp3

    except Exception as e:
        # logging.exception keeps the traceback alongside the message.
        logging.exception(f"Inference error: {e}")
        raise
    finally:
        # Always remove the temporary input files.
        for temp_path in (genre_txt_path, lyrics_txt_path):
            _remove_temp_file(temp_path)
def main():
    """Build the Gradio interface, initialise the system and return the app.

    Creates the genre/lyrics inputs, the read-only segment and token
    controls, the duration/section labels, the generate button and the audio
    output; registers the example inputs; calls ``initialize_system``; and
    wires the lyrics-change and button-click handlers.

    Returns:
        The constructed ``gr.Blocks`` application (not yet launched).
    """
    # Gradio interface
    with gr.Blocks() as demo:
        with gr.Column():
            gr.Markdown("# Open SUNO: Full-Song Generation (Multi-Language Support)")

            with gr.Row():
                with gr.Column():
                    genre_txt = gr.Textbox(
                        label="Genre",
                        placeholder="Enter music genre and style descriptions..."
                    )
                    lyrics_txt = gr.Textbox(
                        label="Lyrics (Supports English, Korean, Japanese, Chinese)",
                        placeholder="Enter song lyrics with [verse], [chorus], [bridge] tags...",
                        lines=10
                    )

                with gr.Column():
                    num_segments = gr.Number(
                        label="Number of Song Segments (Auto-adjusted based on lyrics)",
                        value=2,
                        minimum=1,
                        maximum=4,
                        step=1,
                        interactive=False
                    )
                    max_new_tokens = gr.Slider(
                        label="Max New Tokens (Auto-adjusted based on lyrics)",
                        minimum=500,
                        maximum=32000,
                        step=500,
                        value=4000,
                        interactive=False
                    )
                    with gr.Row():
                        duration_info = gr.Label(label="Estimated Duration")
                        sections_info = gr.Label(label="Section Information")
                    submit_btn = gr.Button("Generate Music", variant="primary")
                    music_out = gr.Audio(label="Generated Audio")

            # Multi-language examples
            gr.Examples(
                examples=[
                    # English example
                    [
                        "female blues airy vocal bright vocal piano sad romantic guitar jazz",
                        """[verse]
In the quiet of the evening, shadows start to fall
Whispers of the night wind echo through the hall
Lost within the silence, I hear your gentle voice
Guiding me back homeward, making my heart rejoice
[chorus]
Don't let this moment fade, hold me close tonight
With you here beside me, everything's alright
Can't imagine life alone, don't want to let you go
Stay with me forever, let our love just flow
[verse]
In the quiet of the evening, shadows start to fall
Whispers of the night wind echo through the hall
Lost within the silence, I hear your gentle voice
Guiding me back homeward, making my heart rejoice
[chorus]
Don't let this moment fade, hold me close tonight
With you here beside me, everything's alright
Can't imagine life alone, don't want to let you go
Stay with me forever, let our love just flow
"""
                    ],
                    # Korean example (text restored from a mis-encoded copy;
                    # NOTE(review): confirm against the original source).
                    [
                        "K-pop bright energetic synth dance electronic",
                        """[verse]
언젠가 마주한 눈빛 속에서
우린 서로를 알아보았지
[chorus]
다시 한 번 내게 말해줘
너의 진심을 숨기지 말아 줘
[verse]
어두운 밤을 지날 때마다
너의 목소리를 떠올려
[chorus]
다시 한 번 내게 말해줘
너의 진심을 숨기지 말아 줘
"""
                    ]
                ],
                inputs=[genre_txt, lyrics_txt]
            )

            # System initialisation
            initialize_system()

            def update_info(lyrics):
                """Return the duration and section summary labels for ``lyrics``."""
                if not lyrics:
                    return "No lyrics entered", "No sections detected"
                params = calculate_generation_params(lyrics)
                duration = params['estimated_duration']
                sections = params['sections']
                return (
                    f"Estimated duration: {duration:.1f} seconds",
                    f"Verses: {sections['verse']}, Chorus: {sections['chorus']} (Expected full length including chorus)"
                )

            # Event handlers
            lyrics_txt.change(
                fn=update_info,
                inputs=[lyrics_txt],
                outputs=[duration_info, sections_info]
            )
            submit_btn.click(
                fn=infer,
                inputs=[genre_txt, lyrics_txt, num_segments, max_new_tokens],
                outputs=[music_out]
            )
    return demo
if __name__ == "__main__":
    # Build the app, then serve it through a bounded request queue.
    demo = main()
    launch_options = {
        "server_name": "0.0.0.0",  # listen on all network interfaces
        "server_port": 7860,
        "share": True,
        "show_api": True,
        "show_error": True,
        "max_threads": 2,
    }
    demo.queue(max_size=20).launch(**launch_options)