from __future__ import annotations

import json
import argparse
from typing import Any
from pprint import pprint
from pathlib import Path
from contextlib import suppress
from dataclasses import dataclass, field, asdict

import torch
import pynvml
import numpy as np
from PIL import Image
from transformers.trainer_utils import set_seed
from diffusers import (
    ModelMixin,  # type: ignore
    DiffusionPipeline,  # type: ignore
    AnimateDiffPipeline,  # type: ignore
    DDIMScheduler,  # type: ignore
    MotionAdapter,  # type: ignore
)
from diffusers.utils import export_to_gif  # pyright: reportPrivateImportUsage=false
from zeus.monitor import ZeusMonitor

# Disable torch gradients globally
torch.set_grad_enabled(False)


@dataclass
class Results:
    model: str
    num_parameters: dict[str, int]
    gpu_model: str
    power_limit: int
    batch_size: int
    num_inference_steps: int
    num_frames: int
    num_prompts: int
    total_runtime: float = 0.0
    total_energy: float = 0.0
    average_batch_latency: float = 0.0
    average_generations_per_second: float = 0.0
    average_batch_energy: float = 0.0
    average_power_consumption: float = 0.0
    peak_memory: float = 0.0
    results: list[Result] = field(default_factory=list, repr=False)


@dataclass
class ResultIntermediateBatched:
    batch_latency: float = 0.0
    batch_energy: float = 0.0
    prompts: list[str] = field(default_factory=list)
    frames: np.ndarray | list[list[Image.Image]] = np.empty(0)


@dataclass
class Result:
    batch_latency: float
    sample_energy: float
    prompt: str
    video_path: str | None
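
# `Results` (with its nested `Result` entries) is serialized via `asdict` at the end of
# `benchmark()` into the `+results.json` output file; `ResultIntermediateBatched` only
# holds per-batch measurements while the benchmark loop runs.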


def get_pipeline(model_id: str):
    """Instantiate a Diffusers pipeline from a model's HuggingFace Hub ID."""
    # Load args to give to `from_pretrained` from the model's kwargs.json file
    kwargs = build_kwargs(model_id)

    # Hack for AnimateDiff
    if "animatediff" in model_id:
        adapter = MotionAdapter.from_pretrained(model_id, **kwargs)
        sd_model_id = "emilianJR/epiCRealism"
        sd_kwargs = build_kwargs(sd_model_id)
        pipeline = AnimateDiffPipeline.from_pretrained(sd_model_id, motion_adapter=adapter, **sd_kwargs)
        scheduler = DDIMScheduler.from_pretrained(
            sd_model_id,
            subfolder="scheduler",
            clip_sample=False,
            timestep_spacing="linspace",
            beta_schedule="linear",
            steps_offset=1,
        )
        pipeline.scheduler = scheduler
        pipeline = pipeline.to("cuda:0")
        print("\nInstantiated AnimateDiff pipeline:\n", pipeline)
    else:
        pipeline = DiffusionPipeline.from_pretrained(model_id, **kwargs).to("cuda:0")
        print("\nInstantiated pipeline via DiffusionPipeline:\n", pipeline)
    return pipeline


def build_kwargs(model_id: str) -> dict:
    """Build the kwargs to pass to the model's `from_pretrained` method."""
    kwargs = json.load(open(f"models/{model_id}/kwargs.json"))
    with suppress(KeyError):
        kwargs["torch_dtype"] = eval(kwargs["torch_dtype"])
    # Add additional args
    kwargs["safety_checker"] = None
    kwargs["revision"] = open(f"models/{model_id}/revision.txt").read().strip()
    return kwargs
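
# Illustrative example of what `models/<model_id>/kwargs.json` might contain (the exact
# keys are model-specific; `torch_dtype` is stored as a string and eval'd above):
#   {"torch_dtype": "torch.float16"}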


def load_text_prompts(
    path: str,
    batch_size: int,
    num_batches: int | None = None,
) -> tuple[int, list[list[str]]]:
    """Load the dataset to feed the model and return it as a list of batches of prompts.

    Depending on the batch size, the final batch may not be full. The final batch
    is dropped in that case. If `num_batches` is not None, only that many batches
    are returned. If `num_batches` is None, all batches are returned.

    Returns:
        Total number of prompts and a list of batches of prompts.
    """
    dataset = json.load(open(path))["caption"] * 10
    if num_batches is not None:
        if len(dataset) < num_batches * batch_size:
            raise ValueError("Dataset is too small for the given number of batches.")
        dataset = dataset[: num_batches * batch_size]
    batched = [dataset[i : i + batch_size] for i in range(0, len(dataset), batch_size)]
    if len(batched[-1]) < batch_size:
        batched.pop()
    return len(batched) * batch_size, batched
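
# For example, 103 prompts with batch_size=4 and num_batches=None yields 25 full batches;
# the trailing batch of 3 is dropped and the function returns (100, [...25 batches...]).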


def count_parameters(pipeline) -> dict[str, int]:
    """Count the number of parameters in the given pipeline."""
    num_params = {}
    for name, attr in vars(pipeline).items():
        if isinstance(attr, ModelMixin):
            num_params[name] = attr.num_parameters(only_trainable=False, exclude_embeddings=True)
        elif isinstance(attr, torch.nn.Module):
            num_params[name] = sum(p.numel() for p in attr.parameters())
    return num_params
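
# For a typical Stable-Diffusion-style video pipeline this would return something roughly
# like {"unet": ..., "vae": ..., "text_encoder": ...}; component names vary by pipeline.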


def benchmark(args: argparse.Namespace) -> None:
    if args.model.startswith("models/"):
        args.model = args.model[len("models/") :]
    if args.model.endswith("/"):
        args.model = args.model[:-1]

    set_seed(args.seed)

    results_dir = Path(args.result_root) / args.model
    results_dir.mkdir(parents=True, exist_ok=True)
    benchmark_name = str(results_dir / f"bs{args.batch_size}+pl{args.power_limit}+steps{args.num_inference_steps}")
    video_dir = results_dir / f"bs{args.batch_size}+pl{args.power_limit}+steps{args.num_inference_steps}+generated"
    video_dir.mkdir(exist_ok=True)

    arg_out_filename = f"{benchmark_name}+args.json"
    with open(arg_out_filename, "w") as f:
        f.write(json.dumps(vars(args), indent=2))
    print(args)
    print("Benchmark args written to", arg_out_filename)

    zeus_monitor = ZeusMonitor()

    pynvml.nvmlInit()
    handle = pynvml.nvmlDeviceGetHandleByIndex(0)
    gpu_model = pynvml.nvmlDeviceGetName(handle)
    pynvml.nvmlDeviceSetPersistenceMode(handle, pynvml.NVML_FEATURE_ENABLED)
    pynvml.nvmlDeviceSetPowerManagementLimit(handle, args.power_limit * 1000)
    pynvml.nvmlShutdown()

    num_prompts, batched_prompts = load_text_prompts(args.dataset_path, args.batch_size, args.num_batches)

    pipeline = get_pipeline(args.model)

    # Warmup
    print("Warming up with two batches...")
    for i in range(2):
        _ = pipeline(
            prompt=batched_prompts[i],
            num_frames=args.num_frames,
            num_inference_steps=args.num_inference_steps,
        )

    rng = torch.manual_seed(args.seed)

    intermediates: list[ResultIntermediateBatched] = [
        ResultIntermediateBatched(prompts=batch) for batch in batched_prompts
    ]

    torch.cuda.reset_peak_memory_stats(device="cuda:0")
    zeus_monitor.begin_window("benchmark", sync_execution=False)

    # Build common parameter dict for all batches
    params: dict[str, Any] = dict(
        num_frames=args.num_frames,
        num_inference_steps=args.num_inference_steps,
        generator=rng,
    )
    if args.height is not None:
        params["height"] = args.height
    if args.width is not None:
        params["width"] = args.width

    for ind, intermediate in enumerate(intermediates):
        print(f"Batch {ind + 1}/{len(intermediates)}")

        params["prompt"] = intermediate.prompts

        zeus_monitor.begin_window("batch", sync_execution=False)
        frames = pipeline(**params).frames
        batch_measurements = zeus_monitor.end_window("batch", sync_execution=False)

        intermediate.frames = frames
        intermediate.batch_latency = batch_measurements.time
        intermediate.batch_energy = batch_measurements.total_energy

    measurements = zeus_monitor.end_window("benchmark", sync_execution=False)
    peak_memory = torch.cuda.max_memory_allocated(device="cuda:0")
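    # Zeus reports window time in seconds and energy in joules;
    # torch.cuda.max_memory_allocated returns bytes.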

    results: list[Result] = []
    ind = 0
    for intermediate in intermediates:
        # Some pipelines just return a giant numpy array for all frames.
        # In that case, scale frames to uint8 [0, 255] and convert to PIL.Image
        if isinstance(intermediate.frames, np.ndarray):
            frames = []
            for batch in intermediate.frames:
                frames.append(
                    [Image.fromarray((frame * 255).astype(np.uint8)) for frame in batch]
                )
            intermediate.frames = frames

        for frames, prompt in zip(intermediate.frames, intermediate.prompts, strict=True):
            if ind % args.save_every == 0:
                video_path = str(video_dir / f"{prompt[:200]}.gif")
                export_to_gif(frames, video_path)
            else:
                video_path = None
            results.append(
                Result(
                    batch_latency=intermediate.batch_latency,
                    sample_energy=intermediate.batch_energy / len(intermediate.prompts),
                    prompt=prompt,
                    video_path=video_path,
                )
            )
            ind += 1

    final_results = Results(
        model=args.model,
        num_parameters=count_parameters(pipeline),
        gpu_model=gpu_model,
        power_limit=args.power_limit,
        batch_size=args.batch_size,
        num_inference_steps=args.num_inference_steps,
        num_frames=args.num_frames,
        num_prompts=num_prompts,
        total_runtime=measurements.time,
        total_energy=measurements.total_energy,
        average_batch_latency=measurements.time / len(batched_prompts),
        average_generations_per_second=num_prompts / measurements.time,
        average_batch_energy=measurements.total_energy / len(batched_prompts),
        average_power_consumption=measurements.total_energy / measurements.time,
        peak_memory=peak_memory,
        results=results,
    )

    with open(f"{benchmark_name}+results.json", "w") as f:
        f.write(json.dumps(asdict(final_results), indent=2))
    print("Benchmark results written to", f"{benchmark_name}+results.json")
    print("Benchmark results:")
    pprint(final_results)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", type=str, required=True, help="The model to benchmark.")
    parser.add_argument("--dataset-path", type=str, help="Path to the dataset to use.")
    parser.add_argument("--result-root", type=str, help="The root directory to save results to.")
    parser.add_argument("--batch-size", type=int, default=1, help="The size of each batch of prompts.")
    parser.add_argument("--power-limit", type=int, default=300, help="The power limit to set for the GPU in Watts.")
    parser.add_argument("--num-inference-steps", type=int, default=50, help="The number of denoising steps.")
    parser.add_argument("--num-frames", type=int, default=16, help="The number of frames to generate.")
    parser.add_argument("--height", type=int, help="Height of the generated video.")
    parser.add_argument("--width", type=int, help="Width of the generated video.")
    parser.add_argument("--num-batches", type=int, default=None, help="The number of batches to use from the dataset.")
    parser.add_argument("--save-every", type=int, default=10, help="Save images to file every N prompts.")
    parser.add_argument("--seed", type=int, default=0, help="The seed to use for the RNG.")
    parser.add_argument("--huggingface-token", type=str, help="The HuggingFace token to use.")
    args = parser.parse_args()
    benchmark(args)
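
# Example invocation (illustrative values; the script filename and dataset path are
# placeholders -- the dataset JSON is expected to contain a top-level "caption" list):
#   python <this_script>.py --model <hub-model-id> --dataset-path <prompts.json> \
#       --result-root results --batch-size 2 --power-limit 300 --num-inference-steps 25 --num-frames 16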