import os
import subprocess
import torchaudio
from typing import Dict, Literal

log_file = 'amt/log.txt'


def _download_youtube_audio(url, simulate: bool = False) -> str:
    """Run yt-dlp to extract the audio of ``url`` as mp3 and return its path.

    The combined stdout/stderr of yt-dlp is echoed with ``print``. Lines
    containing the Google device-authorization URL, "Authorization
    successful" or "Video unavailable" are additionally written to
    ``log_file``, which is truncated at the start of every call.

    Args:
        url: Video URL handed to yt-dlp.
        simulate: When true, ``-s`` is appended to the yt-dlp command.

    Returns:
        The output path ``'./downloaded/yt_audio.mp3'``.
    """
    audio_stem = './downloaded/yt_audio'
    audio_file = audio_stem + '.mp3'

    # Remove the previous download so that a failed run cannot silently hand
    # back the old track. The path must match the yt-dlp output path below.
    # Skipped when simulating, since that run is not expected to write a
    # replacement file.
    if not simulate and os.path.exists(audio_file):
        os.remove(audio_file)

    command = [
        'yt-dlp', '-x', url,
        '-f', 'bestaudio',
        '-o', audio_stem,
        '--audio-format', 'mp3',
        '--restrict-filenames',
        '--extractor-retries', '10',
        '--force-overwrites',
        '--username', 'oauth2',
        '--password', '',
        '-v',
    ]
    if simulate:
        command.append('-s')

    with open(log_file, 'w') as lf:
        # The context manager closes the pipe and waits for the child even
        # if processing a line raises.
        with subprocess.Popen(command,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT,
                              text=True) as process:
            for line in process.stdout:
                print(line)
                if "www.google.com/device" in line:
                    # Colorize the URL and the last token of the line with
                    # ANSI escapes (presumably the device code -- confirm
                    # against yt-dlp's oauth2 output).
                    hl_text = line.replace(
                        "https://www.google.com/device",
                        "\033[93mhttps://www.google.com/device\x1b[0m",
                    ).split()
                    hl_text[-1] = "\x1b[31;1m" + hl_text[-1] + "\x1b[0m"
                    lf.write(' '.join(hl_text))
                    lf.flush()
                elif "Authorization successful" in line or "Video unavailable" in line:
                    lf.write(line)
                    lf.flush()
    # NOTE: the yt-dlp exit status is not checked here; the caller goes on to
    # open the returned path regardless.
    return audio_file


def prepare_media(source_path_or_url: os.PathLike,
                  source_type: Literal['audio_filepath', 'youtube_url'],
                  delete_video: bool = True,
                  simulate: bool = False) -> Dict:
    """Prepare media from a local path or a YouTube URL and return audio info.

    Args:
        source_path_or_url: Path of an existing audio file, or a YouTube URL.
        source_type: ``'audio_filepath'`` to use the path as is, or
            ``'youtube_url'`` to download the audio with yt-dlp first.
        delete_video: Accepted for compatibility; not used by this function.
        simulate: Forwarded to the YouTube download (adds ``-s`` to yt-dlp).

    Returns:
        A dict with the keys ``filepath``, ``track_name``, ``sample_rate``,
        ``bits_per_sample``, ``num_channels``, ``num_frames``, ``duration``
        (``num_frames / sample_rate`` truncated to int) and ``encoding``
        (lower-cased), read via ``torchaudio.info``.

    Raises:
        ValueError: If ``source_type`` is not one of the two supported values.
    """
    # Get audio_file
    if source_type == 'audio_filepath':
        audio_file = source_path_or_url
    elif source_type == 'youtube_url':
        audio_file = _download_youtube_audio(source_path_or_url, simulate)
    else:
        raise ValueError(source_type)

    # Create info
    info = torchaudio.info(audio_file)
    return {
        "filepath": audio_file,
        "track_name": os.path.basename(audio_file).split('.')[0],
        "sample_rate": int(info.sample_rate),
        "bits_per_sample": int(info.bits_per_sample),
        "num_channels": int(info.num_channels),
        "num_frames": int(info.num_frames),
        "duration": int(info.num_frames / info.sample_rate),
        "encoding": str.lower(info.encoding),
    }