import argparse
import csv
import json
import os

import cv2
import numpy as np
import torch
from tqdm import tqdm

import metric
from metric import *

device = 'cuda' if torch.cuda.is_available() else 'cpu'

eval_metrics = [
    "abs_relative_difference",
    "rmse_linear",
    "delta1_acc",
    # "squared_relative_difference",
    # "rmse_log",
    # "log10",
    # "delta2_acc",
    # "delta3_acc",
    # "i_rmse",
    # "silog_rmse",
]


def depth2disparity(depth, return_mask=False):
    """Convert depth to disparity (element-wise reciprocal over positive entries)."""
    if isinstance(depth, torch.Tensor):
        disparity = torch.zeros_like(depth)
    elif isinstance(depth, np.ndarray):
        disparity = np.zeros_like(depth)
    non_negative_mask = depth > 0
    disparity[non_negative_mask] = 1.0 / depth[non_negative_mask]
    if return_mask:
        return disparity, non_negative_mask
    return disparity


def resize_images(images, new_size):
    """Resize a batch of (t, h, w, c) images to (t, new_h, new_w, c)."""
    resized_images = np.empty(
        (images.shape[0], new_size[0], new_size[1], images.shape[3])
    )
    for i, image in enumerate(images):
        if image.shape[2] == 1:
            resized_images[i] = cv2.resize(image, (new_size[1], new_size[0]))[..., None]
        else:
            resized_images[i] = cv2.resize(image, (new_size[1], new_size[0]))
    return resized_images


def eval_single(
    pred_disp_path,
    gt_disp_path,
    seq_len=98,
    domain='depth',
    method_type="ours",
    dataset_max_depth=70,
):
    # load data
    gt_npz = np.load(gt_disp_path)
    gt_disp = gt_npz['disparity'] if 'disparity' in gt_npz.files else gt_npz['arr_0']  # (t, 1, h, w)
    if method_type == "ours":
        pred_disp = np.load(pred_disp_path)['depth']  # (t, h, w)
    if method_type == "depth_anything":
        pred_disp = np.load(pred_disp_path)['disparity']  # (t, h, w)

    # clamp seq_len to the number of available frames
    if pred_disp.shape[0] < seq_len:
        seq_len = pred_disp.shape[0]

    # preprocess: resize predictions to GT resolution and truncate both to seq_len
    pred_disp = resize_images(pred_disp[..., None], (gt_disp.shape[-2], gt_disp.shape[-1]))  # (t, h, w, 1)
    pred_disp = pred_disp[..., 0]  # (t, h, w)
    pred_disp = pred_disp[:seq_len]
    gt_disp = gt_disp[:seq_len, 0]  # (t, h, w)

    # valid mask
    valid_mask = np.logical_and(
        (gt_disp > 1e-3),
        (gt_disp < dataset_max_depth)
    )
    pred_disp = np.clip(pred_disp, a_min=1e-3, a_max=None)
    pred_disp_masked = pred_disp[valid_mask].reshape((-1, 1))

    # choose evaluation domain
    if domain == 'disp':
        # align in real disparity, compute metrics in disparity
        gt_disp_masked = gt_disp[valid_mask].reshape((-1, 1)).astype(np.float64)
    elif domain == 'depth':
        # align in disparity = 1 / depth, compute metrics in depth
        gt_disp_masked = 1.0 / (gt_disp[valid_mask].reshape((-1, 1)).astype(np.float64) + 1e-8)
    else:
        raise ValueError(f"Invalid domain: {domain}")

    # calc scale and shift by least squares: gt = scale * pred + shift
    _ones = np.ones_like(pred_disp_masked)
    A = np.concatenate([pred_disp_masked, _ones], axis=-1)
    X = np.linalg.lstsq(A, gt_disp_masked, rcond=None)[0]
    scale, shift = X

    # align
    aligned_pred = scale * pred_disp + shift
    aligned_pred = np.clip(aligned_pred, a_min=1e-3, a_max=None)

    if domain == 'disp':
        # aligned in real disparity, compute metrics in disparity
        pred_depth = aligned_pred
        gt_depth = gt_disp
    elif domain == 'depth':
        # aligned in disparity = 1 / depth, compute metrics in depth
        pred_depth = depth2disparity(aligned_pred)
        gt_depth = gt_disp

    # clip predictions to the dataset depth range before computing metrics
    pred_depth = np.clip(pred_depth, a_min=1e-3, a_max=dataset_max_depth)

    # evaluate metrics
    metric_funcs = [getattr(metric, _met) for _met in eval_metrics]
    sample_metric = []
    pred_depth_ts = torch.from_numpy(pred_depth).to(device)
    gt_depth_ts = torch.from_numpy(gt_depth).to(device)
    valid_mask_ts = torch.from_numpy(valid_mask).to(device)

    # keep only frames that contain at least one valid pixel
    n = valid_mask.sum((-1, -2))
    valid_frame = (n > 0)
    pred_depth_ts = pred_depth_ts[valid_frame]
    gt_depth_ts = gt_depth_ts[valid_frame]
    valid_mask_ts = valid_mask_ts[valid_frame]

    for met_func in metric_funcs:
        _metric = met_func(pred_depth_ts, gt_depth_ts, valid_mask_ts).item()
        sample_metric.append(_metric)

    return sample_metric


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--seq_len", type=int, default=50,
        help="Max video frame length for evaluation."
    )
    parser.add_argument(
        "--domain", type=str, default="depth", choices=["depth", "disp"],
        help="Domain of metric calculation."
    )
    parser.add_argument(
        "--method_type", type=str, default="ours", choices=["ours", "depth_anything"],
        help="Choose the method."
    )
    parser.add_argument(
        "--dataset_max_depth", type=int, default=70,
        help="Dataset max depth clip."
    )
    parser.add_argument(
        "--pred_disp_root", type=str, default="./demo_output",
        help="Predicted output directory."
    )
    parser.add_argument(
        "--gt_disp_root", type=str, required=True,
        help="GT depth directory."
    )
    parser.add_argument(
        "--dataset", type=str, required=True,
        help="Choose the dataset."
    )
    parser.add_argument(
        "--meta_path", type=str, required=True,
        help="Path of test dataset csv file."
    )
    args = parser.parse_args()

    SEQ_LEN = args.seq_len
    method_type = args.method_type
    if method_type == "ours":
        pred_disp_root = os.path.join(args.pred_disp_root, f'results_{args.dataset}')
    else:
        # pred_disp_root = args.pred_disp_root
        pred_disp_root = os.path.join(args.pred_disp_root, f'results_{args.dataset}')
    domain = args.domain
    dataset_max_depth = args.dataset_max_depth
    saved_json_path = os.path.join(args.pred_disp_root, f"results_{args.dataset}.json")
    meta_path = args.meta_path

    assert method_type in ["depth_anything", "ours"], "Invalid method type, must be in ['depth_anything', 'ours']"
    assert domain in ["depth", "disp"], "Invalid domain type, must be in ['depth', 'disp']"

    # read the test split metadata
    with open(meta_path, mode="r", encoding="utf-8") as csvfile:
        csv_reader = csv.DictReader(csvfile)
        samples = list(csv_reader)

    # iterate over all cases
    results_all = []
    for sample in tqdm(samples):
        gt_disp_path = os.path.join(args.gt_disp_root, sample['filepath_disparity'])
        if method_type == "ours":
            pred_disp_path = os.path.join(pred_disp_root, sample['filepath_disparity'])
            pred_disp_path = pred_disp_path.replace("disparity", "rgb_left")
        if method_type == "depth_anything":
            pred_disp_path = os.path.join(pred_disp_root, sample['filepath_disparity'])
            pred_disp_path = pred_disp_path.replace("disparity", "rgb_left_depth")
        results_single = eval_single(
            pred_disp_path,
            gt_disp_path,
            seq_len=SEQ_LEN,
            domain=domain,
            method_type=method_type,
            dataset_max_depth=dataset_max_depth
        )
        results_all.append(results_single)

    # average over all cases
    final_results = np.array(results_all)
    final_results_mean = np.mean(final_results, axis=0)
    print("")

    # save mean metrics to json
    result_dict = {
        'name': method_type
    }
    for i, metric_name in enumerate(eval_metrics):
        result_dict[metric_name] = float(final_results_mean[i])
        print(f"{metric_name}: {final_results_mean[i]:.4f}")

    # save per-case metrics to json
    for sample, results in zip(samples, results_all):
        result_dict[sample['filepath_disparity']] = results

    # write json
    with open(saved_json_path, 'w') as f:
        json.dump(result_dict, f, indent=4)
    print("")
    print(f"Evaluation results JSON saved to {saved_json_path}")
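
# Example invocation (illustrative only; the script name, dataset name, CSV path,
# and GT directory below are placeholders that depend on your local data layout):
#
#   python <path/to/this_script.py> \
#       --method_type ours \
#       --dataset <dataset_name> \
#       --pred_disp_root ./demo_output \
#       --gt_disp_root <path/to/gt_root> \
#       --meta_path <path/to/test_split.csv> \
#       --seq_len 50 --domain depth --dataset_max_depth 70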