from __future__ import annotations

import os
import time
import json
import argparse
import multiprocessing as mp
from pprint import pprint
from pathlib import Path
from contextlib import suppress
from dataclasses import dataclass, field, asdict

import torch
import pynvml
import numpy as np
import pandas as pd
from PIL import Image
from datasets import load_dataset, Dataset
from transformers.trainer_utils import set_seed
from transformers import CLIPModel, CLIPProcessor
from diffusers import (
    ModelMixin,  # type: ignore
    AutoPipelineForText2Image,  # type: ignore
    DiffusionPipeline,  # type: ignore
    StableCascadeCombinedPipeline,  # type: ignore
)
from zeus.monitor import ZeusMonitor

# Disable torch gradients globally; this script only runs inference.
torch.set_grad_enabled(False)

# CLIP model used to score how well each generated image matches its prompt.
CLIP = "openai/clip-vit-large-patch14"

@dataclass
class Results:
    """Aggregated benchmark results for one (model, power limit, batch size) run."""

    model: str
    num_parameters: dict[str, int]
    gpu_model: str
    power_limit: int
    batch_size: int
    num_inference_steps: int
    num_prompts: int
    average_clip_score: float = 0.0
    total_runtime: float = 0.0
    total_energy: float = 0.0
    average_batch_latency: float = 0.0
    average_images_per_second: float = 0.0
    average_batch_energy: float = 0.0
    average_power_consumption: float = 0.0
    peak_memory: float = 0.0
    results: list[Result] = field(default_factory=list, repr=False)

@dataclass
class ResultIntermediateBatched:
    """Per-batch measurements collected while the benchmark runs."""

    batch_latency: float = 0.0
    batch_energy: float = 0.0
    prompts: list[str] = field(default_factory=list)
    # `np.ndarray` is unhashable and cannot be a plain dataclass default.
    images: np.ndarray = field(default_factory=lambda: np.empty(0))

@dataclass
class Result:
    """Final per-prompt result, including the CLIP score of the generated image."""

    batch_latency: float
    sample_energy: float
    prompt: str
    image_path: str | None
    clip_score: float

def get_pipeline(model_id: str):
    """Instantiate a Diffusers pipeline from a model's HuggingFace Hub ID."""
    # Load args to pass to `from_pretrained` from the model's kwargs.json file.
    kwargs = json.load(open(f"models/{model_id}/kwargs.json"))
    with suppress(KeyError):
        # kwargs.json stores the dtype as a string, e.g. "torch.float16".
        kwargs["torch_dtype"] = eval(kwargs["torch_dtype"])

    # Add additional args.
    kwargs["safety_checker"] = None
    kwargs["revision"] = open(f"models/{model_id}/revision.txt").read().strip()

    # Hack for stable-cascade, which by default loads only a part of the model.
    if model_id == "stabilityai/stable-cascade":
        pipeline = StableCascadeCombinedPipeline.from_pretrained(model_id, **kwargs).to("cuda:0")
        print("\nInstantiated pipeline via StableCascadeCombinedPipeline:\n", pipeline)
    else:
        try:
            pipeline = AutoPipelineForText2Image.from_pretrained(model_id, **kwargs).to("cuda:0")
            print("\nInstantiated pipeline via AutoPipelineForText2Image:\n", pipeline)
        except ValueError:
            pipeline = DiffusionPipeline.from_pretrained(model_id, **kwargs).to("cuda:0")
            print("\nInstantiated pipeline via DiffusionPipeline:\n", pipeline)
    return pipeline
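
# A minimal usage sketch (hypothetical model ID; assumes `models/<id>/kwargs.json`
# and `models/<id>/revision.txt` exist as described above):
#
#     pipeline = get_pipeline("stabilityai/stable-diffusion-2-1")
#     out = pipeline(["a photo of an astronaut"], num_inference_steps=50, output_type="np")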

def load_partiprompts(
    batch_size: int,
    seed: int,
    num_batches: int | None = None,
) -> tuple[int, list[list[str]]]:
    """Load the parti-prompts dataset and return it as a list of batches of prompts.

    Depending on the batch size, the final batch may not be full. The final batch
    is dropped in that case. If `num_batches` is not None, only that many batches
    are returned. If `num_batches` is None, all batches are returned.

    Returns:
        Total number of prompts and a list of batches of prompts.
    """
    dataset = load_dataset("nateraw/parti-prompts", split="train").shuffle(seed=seed)
    assert isinstance(dataset, Dataset)
    if num_batches is not None:
        dataset = dataset.select(range(min(num_batches * batch_size, len(dataset))))
    prompts: list[str] = dataset["Prompt"]
    batched = [prompts[i : i + batch_size] for i in range(0, len(prompts), batch_size)]
    if len(batched[-1]) < batch_size:
        batched.pop()
    return len(batched) * batch_size, batched
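
# For example (hypothetical prompt values), `load_partiprompts(batch_size=4, seed=0, num_batches=2)`
# would return `(8, [[p1, p2, p3, p4], [p5, p6, p7, p8]])`, where each `pN` is a
# prompt string from the shuffled parti-prompts dataset.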

def power_monitor(csv_path: str, gpu_indices: list[int], chan: mp.SimpleQueue) -> None:
    """Sample GPU and VRAM power draw over time and write the samples to a CSV file.

    Sampling continues until anything is put on `chan`.
    """
    pynvml.nvmlInit()
    handles = [pynvml.nvmlDeviceGetHandleByIndex(i) for i in gpu_indices]
    fields = [
        (pynvml.NVML_FI_DEV_POWER_AVERAGE, pynvml.NVML_POWER_SCOPE_GPU),
        (pynvml.NVML_FI_DEV_POWER_AVERAGE, pynvml.NVML_POWER_SCOPE_MEMORY),
    ]
    columns = ["timestamp"] + sum([[f"gpu{i}", f"vram{i}"] for i in gpu_indices], [])
    power: list[list] = []
    while chan.empty():
        row = [time.monotonic()]
        values = [pynvml.nvmlDeviceGetFieldValues(h, fields) for h in handles]
        for value in values:
            row.extend((value[0].value.uiVal, value[1].value.uiVal))
        power.append(row)
        # Aim for roughly one sample every 100 ms.
        time.sleep(max(0.0, 0.1 - (time.monotonic() - row[0])))
    pd.DataFrame(power, columns=columns).to_csv(csv_path, index=False)
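
# A minimal usage sketch, mirroring how `benchmark` below drives this function:
#
#     chan = mp.SimpleQueue()
#     proc = mp.get_context("spawn").Process(target=power_monitor, args=("power.csv", [0], chan))
#     proc.start()
#     ...  # run the workload
#     chan.put("stop")  # any object works; the loop only checks for non-emptiness
#     proc.join()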

def calculate_clip_score(
    model: CLIPModel,
    processor: CLIPProcessor,
    images_np: np.ndarray,
    text: list[str],
) -> torch.Tensor:
    """Calculate the CLIP score for each image and prompt pair.

    `images_np` is assumed to be already scaled to [0, 255] and in uint8 format.

    Returns:
        The CLIP score of each image and prompt pair as a tensor of floats.
        Tensor shape is (batch size,).
    """
    model = model.to("cuda:0")
    # NHWC uint8 array -> list of CHW tensors, as expected by the CLIP processor.
    images = list(torch.from_numpy(images_np).permute(0, 3, 1, 2))
    assert len(images) == len(text)

    processed_input = processor(text=text, images=images, return_tensors="pt", padding=True)

    img_features = model.get_image_features(processed_input["pixel_values"].to("cuda:0"))
    img_features = img_features / img_features.norm(p=2, dim=-1, keepdim=True)

    max_position_embeddings = model.config.text_config.max_position_embeddings
    if processed_input["attention_mask"].shape[-1] > max_position_embeddings:
        print(
            f"Input attention mask is larger than max_position_embeddings. "
            f"Truncating the attention mask to {max_position_embeddings}."
        )
        processed_input["attention_mask"] = processed_input["attention_mask"][..., :max_position_embeddings]
        processed_input["input_ids"] = processed_input["input_ids"][..., :max_position_embeddings]

    txt_features = model.get_text_features(
        processed_input["input_ids"].to("cuda:0"), processed_input["attention_mask"].to("cuda:0")
    )
    txt_features = txt_features / txt_features.norm(p=2, dim=-1, keepdim=True)

    # Cosine similarity between feature vectors, scaled by 100 and clamped at 0.
    scores = 100 * (img_features * txt_features).sum(dim=-1)
    scores = torch.max(scores, torch.zeros_like(scores))
    return scores
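
# In other words, for L2-normalized features f_img and f_txt, each score is
# `max(100 * cos(f_img, f_txt), 0)`, following the standard CLIPScore
# definition (Hessel et al., 2021).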

def count_parameters(pipeline) -> dict[str, int]:
    """Count the number of parameters in the given pipeline."""
    num_params = {}
    for name, attr in vars(pipeline).items():
        if isinstance(attr, ModelMixin):
            num_params[name] = attr.num_parameters(only_trainable=False, exclude_embeddings=True)
        elif isinstance(attr, torch.nn.Module):
            num_params[name] = sum(p.numel() for p in attr.parameters())
    return num_params
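
# The result maps pipeline component names to parameter counts, e.g.
# (hypothetical numbers) `{"unet": 865_000_000, "text_encoder": 340_000_000}`.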

def benchmark(args: argparse.Namespace) -> None:
    if args.huggingface_token is not None:
        os.environ["HF_TOKEN"] = args.huggingface_token

    # Normalize the model ID, which may have been given as a local path.
    if args.model.startswith("models/"):
        args.model = args.model[len("models/") :]
    if args.model.endswith("/"):
        args.model = args.model[:-1]

    set_seed(args.seed)

    results_dir = Path(args.result_root) / args.model
    results_dir.mkdir(parents=True, exist_ok=True)
    benchmark_name = str(results_dir / f"bs{args.batch_size}+pl{args.power_limit}+steps{args.num_inference_steps}")
    image_dir = results_dir / f"bs{args.batch_size}+pl{args.power_limit}+steps{args.num_inference_steps}+generated"
    image_dir.mkdir(exist_ok=True)

    arg_out_filename = f"{benchmark_name}+args.json"
    with open(arg_out_filename, "w") as f:
        f.write(json.dumps(vars(args), indent=2))
    print(args)
    print("Benchmark args written to", arg_out_filename)

    zeus_monitor = ZeusMonitor()

    pynvml.nvmlInit()
    handle = pynvml.nvmlDeviceGetHandleByIndex(0)
    gpu_model = pynvml.nvmlDeviceGetName(handle)
    pynvml.nvmlDeviceSetPersistenceMode(handle, pynvml.NVML_FEATURE_ENABLED)
    # NVML expects the power limit in milliwatts.
    pynvml.nvmlDeviceSetPowerManagementLimit(handle, args.power_limit * 1000)
    pynvml.nvmlShutdown()

    num_prompts, batched_prompts = load_partiprompts(args.batch_size, args.seed, args.num_batches)
    pipeline = get_pipeline(args.model)

    # Warmup
    num_warmup_batches = min(5, len(batched_prompts))
    print(f"Warming up with {num_warmup_batches} batches...")
    for i in range(num_warmup_batches):
        _ = pipeline(
            batched_prompts[i],
            num_inference_steps=args.num_inference_steps,
            output_type="np",
        )

    rng = torch.manual_seed(args.seed)
    intermediates: list[ResultIntermediateBatched] = [
        ResultIntermediateBatched(prompts=batch) for batch in batched_prompts
    ]

    pmon = None
    pmon_chan = None
    if args.monitor_power:
        pmon_chan = mp.SimpleQueue()
        pmon = mp.get_context("spawn").Process(
            target=power_monitor,
            args=(f"{benchmark_name}+power.csv", [g.gpu_index for g in zeus_monitor.gpus.gpus], pmon_chan),
        )
        pmon.start()

    torch.cuda.reset_peak_memory_stats(device="cuda:0")
    zeus_monitor.begin_window("benchmark", sync_execution=False)

    for ind, intermediate in enumerate(intermediates):
        print(f"Batch {ind + 1}/{len(intermediates)}")

        zeus_monitor.begin_window("batch", sync_execution=False)
        images = pipeline(
            intermediate.prompts,
            generator=rng,
            num_inference_steps=args.num_inference_steps,
            output_type="np",
        ).images
        batch_measurements = zeus_monitor.end_window("batch", sync_execution=False)

        intermediate.images = images
        intermediate.batch_latency = batch_measurements.time
        intermediate.batch_energy = batch_measurements.total_energy

    measurements = zeus_monitor.end_window("benchmark", sync_execution=False)
    peak_memory = torch.cuda.max_memory_allocated(device="cuda:0")

    # Stop the power monitor, if it was running.
    if pmon is not None and pmon_chan is not None:
        pmon_chan.put("stop")
        pmon.join(timeout=5.0)
        pmon.terminate()

    # Scale images to [0, 255] and convert to uint8.
    for intermediate in intermediates:
        intermediate.images = (intermediate.images * 255).astype("uint8")

    # Compute the CLIP score for each image and prompt pair.
    # Code was mostly inspired by torchmetrics.multimodal.clip_score, but
    # adapted here to calculate the CLIP score for each image and prompt pair.
    clip_model: CLIPModel = CLIPModel.from_pretrained(CLIP).cuda()  # type: ignore
    clip_processor: CLIPProcessor = CLIPProcessor.from_pretrained(CLIP)  # type: ignore
    batch_clip_scores = []
    for intermediate in intermediates:
        clip_score = calculate_clip_score(
            clip_model,
            clip_processor,
            intermediate.images,
            intermediate.prompts,
        )
        batch_clip_scores.append(clip_score.tolist())

    results: list[Result] = []
    ind = 0
    for intermediate, batch_clip_score in zip(intermediates, batch_clip_scores, strict=True):
        for image, prompt, clip_score in zip(
            intermediate.images,
            intermediate.prompts,
            batch_clip_score,
            strict=True,
        ):
            # Save every N-th image to disk; the rest are discarded.
            if ind % args.image_save_every == 0:
                image_path = str(image_dir / f"{prompt}.png")
                Image.fromarray(image).save(image_path)
            else:
                image_path = None
            results.append(
                Result(
                    batch_latency=intermediate.batch_latency,
                    sample_energy=intermediate.batch_energy / len(intermediate.prompts),
                    prompt=prompt,
                    image_path=image_path,
                    clip_score=clip_score,
                )
            )
            ind += 1

    final_results = Results(
        model=args.model,
        num_parameters=count_parameters(pipeline),
        gpu_model=gpu_model,
        power_limit=args.power_limit,
        batch_size=args.batch_size,
        num_inference_steps=args.num_inference_steps,
        num_prompts=num_prompts,
        average_clip_score=sum(r.clip_score for r in results) / len(results),
        total_runtime=measurements.time,
        total_energy=measurements.total_energy,
        average_batch_latency=measurements.time / len(batched_prompts),
        average_images_per_second=num_prompts / measurements.time,
        average_batch_energy=measurements.total_energy / len(batched_prompts),
        average_power_consumption=measurements.total_energy / measurements.time,
        peak_memory=peak_memory,
        results=results,
    )

    with open(f"{benchmark_name}+results.json", "w") as f:
        f.write(json.dumps(asdict(final_results), indent=2))
    print("Benchmark results written to", f"{benchmark_name}+results.json")
    print("Benchmark results:")
    pprint(final_results)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", type=str, required=True, help="The model to benchmark.")
    parser.add_argument("--result-root", type=str, required=True, help="The root directory to save results to.")
    parser.add_argument("--batch-size", type=int, default=1, help="The size of each batch of prompts.")
    parser.add_argument("--power-limit", type=int, default=300, help="The power limit to set for the GPU, in Watts.")
    parser.add_argument("--num-inference-steps", type=int, default=50, help="The number of denoising steps.")
    parser.add_argument("--num-batches", type=int, default=None, help="The number of batches to use from the dataset.")
    parser.add_argument("--image-save-every", type=int, default=10, help="Save one generated image to file every N prompts.")
    parser.add_argument("--seed", type=int, default=0, help="The seed to use for the RNG.")
    parser.add_argument("--huggingface-token", type=str, help="The HuggingFace token to use.")
    parser.add_argument("--monitor-power", default=False, action="store_true", help="Whether to monitor power draw over time.")
    args = parser.parse_args()
    benchmark(args)
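
# Example invocation (script name, model ID, and power limit are illustrative;
# adjust to your setup and GPU):
#
#     python benchmark.py --model stabilityai/stable-diffusion-2-1 \
#         --result-root results --batch-size 4 --power-limit 300 --monitor-power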