#!/usr/bin/env python

# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Evaluate a policy on an environment by running rollouts and computing metrics. | |
Usage examples: | |
You want to evaluate a model from the hub (eg: https://huggingface.co/lerobot/diffusion_pusht) | |
for 10 episodes. | |
``` | |
python lerobot/scripts/eval.py \ | |
--policy.path=lerobot/diffusion_pusht \ | |
--env.type=pusht \ | |
--eval.batch_size=10 \ | |
--eval.n_episodes=10 \ | |
--use_amp=false \ | |
--device=cuda | |
``` | |
OR, you want to evaluate a model checkpoint from the LeRobot training script for 10 episodes. | |
``` | |
python lerobot/scripts/eval.py \ | |
--policy.path=outputs/train/diffusion_pusht/checkpoints/005000/pretrained_model \ | |
--env.type=pusht \ | |
--eval.batch_size=10 \ | |
--eval.n_episodes=10 \ | |
--use_amp=false \ | |
--device=cuda | |
``` | |
Note that in both examples, the repo/folder should contain at least `config.json` and `model.safetensors` files. | |
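
Assuming the same dotted-flag convention as the options above, you can also ask for the rollouts to
run in asynchronous vector environments (this maps to the `use_async_envs` option consumed by
`make_env` below):

```
python lerobot/scripts/eval.py \
    --policy.path=lerobot/diffusion_pusht \
    --env.type=pusht \
    --eval.batch_size=10 \
    --eval.n_episodes=10 \
    --eval.use_async_envs=true \
    --policy.device=cuda
```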

You can learn about the CLI options for this script in `EvalPipelineConfig` in lerobot/configs/eval.py.
"""

import json
import logging
import threading
import time
from contextlib import nullcontext
from copy import deepcopy
from dataclasses import asdict
from pathlib import Path
from pprint import pformat
from typing import Callable

import einops
import gymnasium as gym
import numpy as np
import torch
from termcolor import colored
from torch import Tensor, nn
from tqdm import trange

from lerobot.common.envs.factory import make_env
from lerobot.common.envs.utils import add_envs_task, check_env_attributes_and_types, preprocess_observation
from lerobot.common.policies.factory import make_policy
from lerobot.common.policies.pretrained import PreTrainedPolicy
from lerobot.common.policies.utils import get_device_from_parameters
from lerobot.common.utils.io_utils import write_video
from lerobot.common.utils.random_utils import set_seed
from lerobot.common.utils.utils import (
    get_safe_torch_device,
    init_logging,
    inside_slurm,
)
from lerobot.configs import parser
from lerobot.configs.eval import EvalPipelineConfig


def rollout(
    env: gym.vector.VectorEnv,
    policy: PreTrainedPolicy,
    seeds: list[int] | None = None,
    return_observations: bool = False,
    render_callback: Callable[[gym.vector.VectorEnv], None] | None = None,
) -> dict:
"""Run a batched policy rollout once through a batch of environments. | |
Note that all environments in the batch are run until the last environment is done. This means some | |
data will probably need to be discarded (for environments that aren't the first one to be done). | |
The return dictionary contains: | |
(optional) "observation": A a dictionary of (batch, sequence + 1, *) tensors mapped to observation | |
keys. NOTE the that this has an extra sequence element relative to the other keys in the | |
dictionary. This is because an extra observation is included for after the environment is | |
terminated or truncated. | |
"action": A (batch, sequence, action_dim) tensor of actions applied based on the observations (not | |
including the last observations). | |
"reward": A (batch, sequence) tensor of rewards received for applying the actions. | |
"success": A (batch, sequence) tensor of success conditions (the only time this can be True is upon | |
environment termination/truncation). | |
"done": A (batch, sequence) tensor of **cumulative** done conditions. For any given batch element, | |
the first True is followed by True's all the way till the end. This can be used for masking | |
extraneous elements from the sequences above. | |
Args: | |
env: The batch of environments. | |
policy: The policy. Must be a PyTorch nn module. | |
seeds: The environments are seeded once at the start of the rollout. If provided, this argument | |
specifies the seeds for each of the environments. | |
return_observations: Whether to include all observations in the returned rollout data. Observations | |
are returned optionally because they typically take more memory to cache. Defaults to False. | |
render_callback: Optional rendering callback to be used after the environments are reset, and after | |
every step. | |
Returns: | |
The dictionary described above. | |
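
    Example:
        A minimal sketch (assumes `env` and `policy` were created with `make_env` and `make_policy`,
        as in `eval_main` below):

            data = rollout(env, policy, seeds=list(range(env.num_envs)))
            # "done" is cumulative, so argmax returns each episode's first done index.
            episode_ends = data["done"].int().argmax(dim=1)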
""" | |
    assert isinstance(policy, nn.Module), "Policy must be a PyTorch nn module."
    device = get_device_from_parameters(policy)

    # Reset the policy and environments.
    policy.reset()
    observation, info = env.reset(seed=seeds)
    if render_callback is not None:
        render_callback(env)

    all_observations = []
    all_actions = []
    all_rewards = []
    all_successes = []
    all_dones = []

    step = 0
    # Keep track of which environments are done.
    done = np.array([False] * env.num_envs)
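    # All sub-environments share the same time limit, so query it once through the vector env's
    # `call` API (which returns one value per sub-environment) and take the first entry.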
    max_steps = env.call("_max_episode_steps")[0]
    progbar = trange(
        max_steps,
        desc=f"Running rollout with at most {max_steps} steps",
        disable=inside_slurm(),  # we don't want a progress bar when we use slurm, since it clutters the logs
        leave=False,
    )
    check_env_attributes_and_types(env)
    while not np.all(done):
        # Numpy array to tensor and changing dictionary keys to LeRobot policy format.
        observation = preprocess_observation(observation)
        if return_observations:
            all_observations.append(deepcopy(observation))

        observation = {
            key: observation[key].to(device, non_blocking=device.type == "cuda") for key in observation
        }

        # Infer "task" from attributes of environments.
        # TODO: works with SyncVectorEnv but not AsyncVectorEnv
        observation = add_envs_task(env, observation)

        with torch.inference_mode():
            action = policy.select_action(observation)

        # Convert to CPU / numpy.
        action = action.to("cpu").numpy()
        assert action.ndim == 2, "Action dimensions should be (batch, action_dim)"

        # Apply the next action.
        observation, reward, terminated, truncated, info = env.step(action)
        if render_callback is not None:
            render_callback(env)
        # VectorEnv stores is_success in `info["final_info"][env_index]["is_success"]`. "final_info"
        # isn't available if none of the envs finished.
        if "final_info" in info:
            successes = [
                final_info["is_success"] if final_info is not None else False
                for final_info in info["final_info"]
            ]
        else:
            successes = [False] * env.num_envs
        # Keep track of which environments are done so far.
        done = terminated | truncated | done

        all_actions.append(torch.from_numpy(action))
        all_rewards.append(torch.from_numpy(reward))
        all_dones.append(torch.from_numpy(done))
        all_successes.append(torch.tensor(successes))

        step += 1
        running_success_rate = (
            einops.reduce(torch.stack(all_successes, dim=1), "b n -> b", "any").numpy().mean()
        )
        progbar.set_postfix({"running_success_rate": f"{running_success_rate.item() * 100:.1f}%"})
        progbar.update()

    # Track the final observation.
    if return_observations:
        observation = preprocess_observation(observation)
        all_observations.append(deepcopy(observation))

    # Stack the sequence along the first dimension so that we have (batch, sequence, *) tensors.
    ret = {
        "action": torch.stack(all_actions, dim=1),
        "reward": torch.stack(all_rewards, dim=1),
        "success": torch.stack(all_successes, dim=1),
        "done": torch.stack(all_dones, dim=1),
    }
    if return_observations:
        stacked_observations = {}
        for key in all_observations[0]:
            stacked_observations[key] = torch.stack([obs[key] for obs in all_observations], dim=1)
        ret["observation"] = stacked_observations

    if hasattr(policy, "use_original_modules"):
        policy.use_original_modules()

    return ret


def eval_policy(
    env: gym.vector.VectorEnv,
    policy: PreTrainedPolicy,
    n_episodes: int,
    max_episodes_rendered: int = 0,
    videos_dir: Path | None = None,
    return_episode_data: bool = False,
    start_seed: int | None = None,
) -> dict:
""" | |
Args: | |
env: The batch of environments. | |
policy: The policy. | |
n_episodes: The number of episodes to evaluate. | |
max_episodes_rendered: Maximum number of episodes to render into videos. | |
videos_dir: Where to save rendered videos. | |
return_episode_data: Whether to return episode data for online training. Incorporates the data into | |
the "episodes" key of the returned dictionary. | |
start_seed: The first seed to use for the first individual rollout. For all subsequent rollouts the | |
seed is incremented by 1. If not provided, the environments are not manually seeded. | |
Returns: | |
Dictionary with metrics and data regarding the rollouts. | |
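
    Example:
        A minimal sketch (assumes `env` and `policy` were created with `make_env` and `make_policy`,
        as in `eval_main` below):

            info = eval_policy(env, policy, n_episodes=10, start_seed=1000)
            print(info["aggregated"]["pc_success"])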
""" | |
    if max_episodes_rendered > 0 and not videos_dir:
        raise ValueError("If max_episodes_rendered > 0, videos_dir must be provided.")

    if not isinstance(policy, PreTrainedPolicy):
        raise ValueError(
            f"Policy of type 'PreTrainedPolicy' is expected, but type '{type(policy)}' was provided."
        )

    start = time.time()
    policy.eval()

    # Determine how many batched rollouts we need to get n_episodes. Note that if n_episodes is not evenly
    # divisible by env.num_envs we end up discarding some data in the last batch.
    n_batches = n_episodes // env.num_envs + int((n_episodes % env.num_envs) != 0)
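    # For example, n_episodes=10 with env.num_envs=4 gives n_batches=3: the batches produce 12 episodes
    # in total, and the 2 extra ones are dropped when the metrics are truncated to n_episodes below.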
    # Keep track of some metrics.
    sum_rewards = []
    max_rewards = []
    all_successes = []
    all_seeds = []
    threads = []  # for video saving threads
    n_episodes_rendered = 0  # for saving the correct number of videos

    # Callback for visualization.
    def render_frame(env: gym.vector.VectorEnv):
        # noqa: B023
        if n_episodes_rendered >= max_episodes_rendered:
            return
        n_to_render_now = min(max_episodes_rendered - n_episodes_rendered, env.num_envs)
        if isinstance(env, gym.vector.SyncVectorEnv):
            ep_frames.append(np.stack([env.envs[i].render() for i in range(n_to_render_now)]))  # noqa: B023
        elif isinstance(env, gym.vector.AsyncVectorEnv):
            # Here we must render all frames and discard any we don't need.
            ep_frames.append(np.stack(env.call("render")[:n_to_render_now]))
    if max_episodes_rendered > 0:
        video_paths: list[str] = []

    # Initialize unconditionally so that the `episode_data is None` checks below are always valid.
    episode_data: dict | None = None

    # we don't want a progress bar when we use slurm, since it clutters the logs
    progbar = trange(n_batches, desc="Stepping through eval batches", disable=inside_slurm())
    for batch_ix in progbar:
        # Cache frames for rendering videos. Each item will be (b, h, w, c), and the list indexes the
        # rollout step.
        if max_episodes_rendered > 0:
            ep_frames: list[np.ndarray] = []

        if start_seed is None:
            seeds = None
        else:
            seeds = range(
                start_seed + (batch_ix * env.num_envs), start_seed + ((batch_ix + 1) * env.num_envs)
            )
        rollout_data = rollout(
            env,
            policy,
            seeds=list(seeds) if seeds else None,
            return_observations=return_episode_data,
            render_callback=render_frame if max_episodes_rendered > 0 else None,
        )

        # Figure out where in each rollout sequence the first done condition was encountered (results after
        # this won't be included).
        n_steps = rollout_data["done"].shape[1]
        # Note: this relies on a property of argmax: that it returns the first occurrence as a tiebreaker.
        done_indices = torch.argmax(rollout_data["done"].to(int), dim=1)

        # Make a mask with shape (batch, n_steps) to mask out rollout data after the first done
        # (batch-element-wise). Note the `done_indices + 1` to make sure to keep the data from the done step.
        mask = (torch.arange(n_steps) <= einops.repeat(done_indices + 1, "b -> b s", s=n_steps)).int()
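        # Worked example (illustrative): with n_steps=5 and done_indices=[2], the comparison
        # arange(5) <= 3 yields the mask [1, 1, 1, 1, 0], keeping the done step plus one extra frame.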
        # Extend metrics.
        batch_sum_rewards = einops.reduce((rollout_data["reward"] * mask), "b n -> b", "sum")
        sum_rewards.extend(batch_sum_rewards.tolist())
        batch_max_rewards = einops.reduce((rollout_data["reward"] * mask), "b n -> b", "max")
        max_rewards.extend(batch_max_rewards.tolist())
        batch_successes = einops.reduce((rollout_data["success"] * mask), "b n -> b", "any")
        all_successes.extend(batch_successes.tolist())
        if seeds:
            all_seeds.extend(seeds)
        else:
            all_seeds.append(None)
        if return_episode_data:
            this_episode_data = _compile_episode_data(
                rollout_data,
                done_indices,
                start_episode_index=batch_ix * env.num_envs,
                start_data_index=(0 if episode_data is None else (episode_data["index"][-1].item() + 1)),
                fps=env.unwrapped.metadata["render_fps"],
            )
            if episode_data is None:
                episode_data = this_episode_data
            else:
                # Some sanity checks to make sure we are correctly compiling the data.
                assert episode_data["episode_index"][-1] + 1 == this_episode_data["episode_index"][0]
                assert episode_data["index"][-1] + 1 == this_episode_data["index"][0]
                # Concatenate the episode data.
                episode_data = {k: torch.cat([episode_data[k], this_episode_data[k]]) for k in episode_data}
        # Maybe render video for visualization.
        if max_episodes_rendered > 0 and len(ep_frames) > 0:
            batch_stacked_frames = np.stack(ep_frames, axis=1)  # (b, t, *)
            for stacked_frames, done_index in zip(
                batch_stacked_frames, done_indices.flatten().tolist(), strict=False
            ):
                if n_episodes_rendered >= max_episodes_rendered:
                    break

                videos_dir.mkdir(parents=True, exist_ok=True)
                video_path = videos_dir / f"eval_episode_{n_episodes_rendered}.mp4"
                video_paths.append(str(video_path))
                thread = threading.Thread(
                    target=write_video,
                    args=(
                        str(video_path),
                        stacked_frames[: done_index + 1],  # + 1 to capture the last observation
                        env.unwrapped.metadata["render_fps"],
                    ),
                )
                thread.start()
                threads.append(thread)
                n_episodes_rendered += 1

        progbar.set_postfix(
            {"running_success_rate": f"{np.mean(all_successes[:n_episodes]).item() * 100:.1f}%"}
        )

    # Wait till all video rendering threads are done.
    for thread in threads:
        thread.join()
    # Compile eval info.
    info = {
        "per_episode": [
            {
                "episode_ix": i,
                "sum_reward": sum_reward,
                "max_reward": max_reward,
                "success": success,
                "seed": seed,
            }
            for i, (sum_reward, max_reward, success, seed) in enumerate(
                zip(
                    sum_rewards[:n_episodes],
                    max_rewards[:n_episodes],
                    all_successes[:n_episodes],
                    all_seeds[:n_episodes],
                    strict=True,
                )
            )
        ],
        "aggregated": {
            "avg_sum_reward": float(np.nanmean(sum_rewards[:n_episodes])),
            "avg_max_reward": float(np.nanmean(max_rewards[:n_episodes])),
            "pc_success": float(np.nanmean(all_successes[:n_episodes]) * 100),
            "eval_s": time.time() - start,
            "eval_ep_s": (time.time() - start) / n_episodes,
        },
    }

    if return_episode_data:
        info["episodes"] = episode_data

    if max_episodes_rendered > 0:
        info["video_paths"] = video_paths

    return info


def _compile_episode_data(
    rollout_data: dict, done_indices: Tensor, start_episode_index: int, start_data_index: int, fps: float
) -> dict:
    """Convenience function for `eval_policy(return_episode_data=True)`.

    Compiles all the rollout data into a flat, dataset-style dictionary of tensors, with keys such as
    "action", "episode_index", "frame_index", "timestamp", "next.reward", the observation keys, and
    "index". Similar logic is implemented when datasets are pushed to the hub (see: `push_to_hub`).
    """
    ep_dicts = []
    total_frames = 0
    for ep_ix in range(rollout_data["action"].shape[0]):
        # + 2 to include the first done frame and the last observation frame.
        num_frames = done_indices[ep_ix].item() + 2
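        # Worked example (illustrative): done at step index 2 gives num_frames = 4, i.e. the three
        # frames with actions (indices 0-2) plus one trailing frame holding the final observation.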
        total_frames += num_frames

        # Here we do `num_frames - 1` as we don't want to include the last observation frame just yet.
        ep_dict = {
            "action": rollout_data["action"][ep_ix, : num_frames - 1],
            "episode_index": torch.tensor([start_episode_index + ep_ix] * (num_frames - 1)),
            "frame_index": torch.arange(0, num_frames - 1, 1),
            "timestamp": torch.arange(0, num_frames - 1, 1) / fps,
            "next.done": rollout_data["done"][ep_ix, : num_frames - 1],
            "next.success": rollout_data["success"][ep_ix, : num_frames - 1],
            "next.reward": rollout_data["reward"][ep_ix, : num_frames - 1].type(torch.float32),
        }

        # For the last observation frame, all other keys will just be copy padded.
        for k in ep_dict:
            ep_dict[k] = torch.cat([ep_dict[k], ep_dict[k][-1:]])

        for key in rollout_data["observation"]:
            ep_dict[key] = rollout_data["observation"][key][ep_ix, :num_frames]

        ep_dicts.append(ep_dict)

    data_dict = {}
    for key in ep_dicts[0]:
        data_dict[key] = torch.cat([x[key] for x in ep_dicts])
    data_dict["index"] = torch.arange(start_data_index, start_data_index + total_frames, 1)

    return data_dict


@parser.wrap()
def eval_main(cfg: EvalPipelineConfig):
    logging.info(pformat(asdict(cfg)))

    # Check device is available
    device = get_safe_torch_device(cfg.policy.device, log=True)

    torch.backends.cudnn.benchmark = True
    torch.backends.cuda.matmul.allow_tf32 = True
    set_seed(cfg.seed)

    logging.info(colored("Output dir:", "yellow", attrs=["bold"]) + f" {cfg.output_dir}")

    logging.info("Making environment.")
    env = make_env(cfg.env, n_envs=cfg.eval.batch_size, use_async_envs=cfg.eval.use_async_envs)

    logging.info("Making policy.")
    policy = make_policy(
        cfg=cfg.policy,
        env_cfg=cfg.env,
    )
    policy.eval()

    with torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext():
        info = eval_policy(
            env,
            policy,
            cfg.eval.n_episodes,
            max_episodes_rendered=10,
            videos_dir=Path(cfg.output_dir) / "videos",
            start_seed=cfg.seed,
        )
    print(info["aggregated"])

    # Save info
    with open(Path(cfg.output_dir) / "eval_info.json", "w") as f:
        json.dump(info, f, indent=2)

    env.close()

    logging.info("End of eval")


if __name__ == "__main__":
    init_logging()
    eval_main()