Spaces:
Runtime error
Runtime error
| from pathlib import Path | |
| import random | |
| from typing import List | |
| import tempfile | |
| import subprocess | |
| import argbind | |
| from tqdm import tqdm | |
| import argbind | |
| from vampnet.interface import Interface | |
| import audiotools as at | |
| Interface: Interface = argbind.bind(Interface) | |
| def calculate_bitrate( | |
| interface, num_codebooks, | |
| downsample_factor | |
| ): | |
| bit_width = 10 | |
| sr = interface.codec.sample_rate | |
| hop = interface.codec.hop_size | |
| rate = (sr / hop) * ((bit_width * num_codebooks) / downsample_factor) | |
| return rate | |
| def baseline(sig, interface): | |
| return interface.preprocess(sig) | |
| def reconstructed(sig, interface): | |
| return interface.to_signal( | |
| interface.encode(sig) | |
| ) | |
| def coarse2fine(sig, interface): | |
| z = interface.encode(sig) | |
| z = z[:, :interface.c2f.n_conditioning_codebooks, :] | |
| z = interface.coarse_to_fine(z) | |
| return interface.to_signal(z) | |
| def coarse2fine_argmax(sig, interface): | |
| z = interface.encode(sig) | |
| z = z[:, :interface.c2f.n_conditioning_codebooks, :] | |
| z = interface.coarse_to_fine(z, | |
| sample="argmax", sampling_steps=1, | |
| temperature=1.0 | |
| ) | |
| return interface.to_signal(z) | |
| class CoarseCond: | |
| def __init__(self, num_codebooks, downsample_factor): | |
| self.num_codebooks = num_codebooks | |
| self.downsample_factor = downsample_factor | |
| def __call__(self, sig, interface): | |
| n_conditioning_codebooks = interface.coarse.n_codebooks - self.num_codebooks | |
| zv = interface.coarse_vamp_v2(sig, | |
| n_conditioning_codebooks=n_conditioning_codebooks, | |
| downsample_factor=self.downsample_factor | |
| ) | |
| zv = interface.coarse_to_fine(zv) | |
| return interface.to_signal(zv) | |
| def opus(sig, interface, bitrate=128): | |
| sig = interface.preprocess(sig) | |
| with tempfile.NamedTemporaryFile(suffix=".wav") as f: | |
| sig.write(f.name) | |
| opus_name = Path(f.name).with_suffix(".opus") | |
| # convert to opus | |
| cmd = [ | |
| "ffmpeg", "-y", "-i", f.name, | |
| "-c:a", "libopus", | |
| "-b:a", f"{bitrate}", | |
| opus_name | |
| ] | |
| subprocess.run(cmd, check=True) | |
| # convert back to wav | |
| output_name = Path(f"{f.name}-opus").with_suffix(".wav") | |
| cmd = [ | |
| "ffmpeg", "-y", "-i", opus_name, | |
| output_name | |
| ] | |
| subprocess.run(cmd, check=True) | |
| sig = at.AudioSignal( | |
| output_name, | |
| sample_rate=sig.sample_rate | |
| ) | |
| return sig | |
| COARSE_SAMPLE_CONDS ={ | |
| "baseline": baseline, | |
| "reconstructed": reconstructed, | |
| "coarse2fine": coarse2fine, | |
| **{ | |
| f"{n}_codebooks_downsampled_{x}x": CoarseCond(num_codebooks=n, downsample_factor=x) | |
| for (n, x) in ( | |
| (4, 2), # 4 codebooks, downsampled 2x, | |
| (2, 2), # 2 codebooks, downsampled 2x | |
| (1, None), # 1 codebook, no downsampling | |
| (4, 4), # 4 codebooks, downsampled 4x | |
| (1, 2), # 1 codebook, downsampled 2x, | |
| (4, 6), # 4 codebooks, downsampled 6x | |
| (4, 8), # 4 codebooks, downsampled 8x | |
| (4, 16), # 4 codebooks, downsampled 16x | |
| (4, 32), # 4 codebooks, downsampled 16x | |
| ) | |
| }, | |
| } | |
| OPUS_JAZZPOP_SAMPLE_CONDS = { | |
| f"opus_{bitrate}": lambda sig, interface: opus(sig, interface, bitrate=bitrate) | |
| for bitrate in [5620, 1875, 1250, 625] | |
| } | |
| OPUS_SPOTDL_SAMPLE_CONDS = { | |
| f"opus_{bitrate}": lambda sig, interface: opus(sig, interface, bitrate=bitrate) | |
| for bitrate in [8036, 2296, 1148, 574] | |
| } | |
| C2F_SAMPLE_CONDS = { | |
| "baseline": baseline, | |
| "reconstructed": reconstructed, | |
| "coarse2fine": coarse2fine, | |
| "coarse2fine_argmax": coarse2fine_argmax, | |
| } | |
| def main( | |
| sources=[ | |
| "/data/spotdl/audio/val", "/data/spotdl/audio/test" | |
| ], | |
| output_dir: str = "./samples", | |
| max_excerpts: int = 5000, | |
| exp_type: str = "coarse", | |
| seed: int = 0, | |
| ): | |
| at.util.seed(seed) | |
| interface = Interface() | |
| output_dir = Path(output_dir) | |
| output_dir.mkdir(exist_ok=True, parents=True) | |
| from audiotools.data.datasets import AudioLoader, AudioDataset | |
| loader = AudioLoader(sources=sources, shuffle_state=seed) | |
| dataset = AudioDataset(loader, | |
| sample_rate=interface.codec.sample_rate, | |
| duration=interface.coarse.chunk_size_s, | |
| n_examples=max_excerpts, | |
| without_replacement=True, | |
| ) | |
| if exp_type == "opus-jazzpop": | |
| SAMPLE_CONDS = OPUS_JAZZPOP_SAMPLE_CONDS | |
| elif exp_type == "opus-spotdl": | |
| SAMPLE_CONDS = OPUS_SPOTDL_SAMPLE_CONDS | |
| elif exp_type == "coarse": | |
| SAMPLE_CONDS = COARSE_SAMPLE_CONDS | |
| elif exp_type == "c2f": | |
| SAMPLE_CONDS = C2F_SAMPLE_CONDS | |
| else: | |
| raise ValueError(f"Unknown exp_type {exp_type}") | |
| indices = list(range(max_excerpts)) | |
| random.shuffle(indices) | |
| for i in tqdm(indices): | |
| # if all our files are already there, skip | |
| # done = [] | |
| # for name in SAMPLE_CONDS: | |
| # o_dir = Path(output_dir) / name | |
| # done.append((o_dir / f"{i}.wav").exists()) | |
| # if all(done): | |
| # continue | |
| sig = dataset[i]["signal"] | |
| results = { | |
| name: cond(sig, interface).cpu() | |
| for name, cond in SAMPLE_CONDS.items() | |
| } | |
| for name, sig in results.items(): | |
| o_dir = Path(output_dir) / name | |
| o_dir.mkdir(exist_ok=True, parents=True) | |
| sig.write(o_dir / f"{i}.wav") | |
| if __name__ == "__main__": | |
| args = argbind.parse_args() | |
| with argbind.scope(args): | |
| main() | |