"""
Load prediction file and GT file to calculate TVR metrics:

- recall at top K (R@K), for a specified IoU, where K in [1, 5, 10, 100] and IoU in [0.5, 0.7]
"""

import json
from collections import OrderedDict, defaultdict

import numpy as np
from tqdm import tqdm


def load_json(filename):
    with open(filename, "r") as f:
        return json.load(f)


def load_jsonl(filename):
    with open(filename, "r") as f:
        return [json.loads(line.strip("\n")) for line in f]


def pad_sequences_1d_np(sequences, dtype=np.float32):
    """ Pad a singly-nested list or a sequence of n-d np.ndarray into an (n+1)-d array,
    only allowing the first dim to have variable lengths.
    Args:
        sequences: list(n-d np.ndarray or list)
        dtype: np.dtype
    Returns:
        padded_seqs: ((n+1)-d array) padded with zeros
        mask: (2d array) of the same shape as the first two dims of padded_seqs,
            1 indicates valid, 0 otherwise
    Examples:
        >>> test_data_list = [[1, 2, 3], [1, 2], [3, 4, 7, 9]]
        >>> pad_sequences_1d_np(test_data_list, dtype=np.float32)
        >>> test_data_3d = [np.random.randn(2, 3, 4), np.random.randn(4, 3, 4), np.random.randn(1, 3, 4)]
        >>> pad_sequences_1d_np(test_data_3d, dtype=np.float32)
    """
    if isinstance(sequences[0], list):
        sequences = [np.asarray(s, dtype=dtype) for s in sequences]

    extra_dims = sequences[0].shape[1:]  # the extra dims must agree across sequences
    lengths = [len(seq) for seq in sequences]
    assert "numpy" in str(dtype), "dtype and input type do not match"
    padded_seqs = np.zeros((len(sequences), max(lengths)) + extra_dims, dtype=dtype)
    mask = np.zeros((len(sequences), max(lengths)), dtype=np.float32)

    for idx, seq in enumerate(sequences):
        end = lengths[idx]
        padded_seqs[idx, :end] = seq
        mask[idx, :end] = 1
    return padded_seqs, mask
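

# A minimal sanity check for pad_sequences_1d_np (my own illustration, not part
# of the original evaluation flow): ragged rows are zero-padded to the longest
# row and the mask marks the valid entries.
#   >>> seqs, mask = pad_sequences_1d_np([[1, 2, 3], [1, 2]])
#   >>> seqs.shape, mask.shape   # -> (2, 3), (2, 3)
#   >>> mask[1]                  # -> array([1., 1., 0.], dtype=float32)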


def compute_temporal_iou_batch(preds, gt):
    """ Compute intersection-over-union along the temporal axis.
    This function is significantly faster than `compute_temporal_iou`;
    the results should be the same.
    Args:
        preds: np.ndarray, (N, 2), [st (float), ed (float)] * N
        gt: [st (float), ed (float)]
    Returns:
        iou: np.ndarray, (N, )

    References:
        for np.divide with zeros, see https://stackoverflow.com/a/37977222
    """
    intersection = np.maximum(0, np.minimum(preds[:, 1], gt[1]) - np.maximum(preds[:, 0], gt[0]))
    # `union` is the enclosing span; it equals the true union whenever the
    # intersection is non-zero, and when it is not, IoU is 0 anyway.
    union = np.maximum(preds[:, 1], gt[1]) - np.minimum(preds[:, 0], gt[0])
    return np.divide(intersection, union, out=np.zeros_like(intersection), where=union != 0)
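

# The docstring above refers to `compute_temporal_iou`, which is not defined in
# this file. A minimal single-pair sketch of what it is assumed to compute, kept
# here for reference only (the name is mine, to avoid shadowing the original):
def compute_temporal_iou_single(pred, gt):
    """(Assumed) scalar temporal IoU between one [st, ed] pred and one [st, ed] gt."""
    intersection = max(0, min(pred[1], gt[1]) - max(pred[0], gt[0]))
    union = max(pred[1], gt[1]) - min(pred[0], gt[0])  # enclosing span, see note above
    return 1.0 * intersection / union if union != 0 else 0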


def get_rounded_percentage(float_number, n_floats=2):
    return round(float_number * 100, n_floats)


TASK_TYPES = OrderedDict([
    ("VCMR", "Video Corpus Moment Retrieval"),
    ("SVMR", "Single Video Moment Retrieval"),
    ("VR", "regular Video Retrieval")
])


def eval_by_task_type(moment_predictions, video2idx, ground_truth,
                      iou_thds=(0.5, 0.7), recall_topks=(1, 5, 10, 100),
                      task_type="SVMR", max_pred_per_query=100, match_number=True, verbose=True, use_desc_type=True):
    """ A predicted triplet is positive only if:
    1) its vid_name matches the GT vid_name
    2) the IoU between its timestamp and the GT timestamp is higher than the given threshold

    moment_predictions w.r.t. different task_type:
        For each query, evaluated on the top max_pred_per_query [vid_name, st, ed] triplets (score entry ignored).
        VCMR: vid_name may repeat.
        SVMR: vid_name is fixed to be the GT vid_name.
        VR: vid_name does not repeat; st and ed are not used.

    Args:
        video2idx: {vid_name (str): index (int), ...}
        moment_predictions: list(dict), each dict is {
            "desc": str,
            "query_id": int,
            "predictions": [vid_name_idx (int), st (float), ed (float), score (float)] * n_pred,
                sorted predictions, n_pred can differ across dicts. For each prediction,
                only the first 3 elements [vid_name_idx (int), st (float), ed (float)] are used;
                any following elements are ignored. We keep score here for the record.
            }
        ground_truth: list(dict), each dict is {
            "desc": str,
            "query_id": int,
            "type": str, one of [v, t, vt]
            "vid_name": str
            "ts": a single [st (float), ed (float)] pair, or a list of 4 such pairs (len == 4).
            ...
            }
        iou_thds: temporal IoU thresholds
        recall_topks: recall at different top k
        task_type: str, one of ["VCMR", "SVMR", "VR"], see TASK_TYPES for definitions.
        max_pred_per_query: int, only the top max_pred_per_query predictions for each query are used.
        match_number: bool, must be set to True for evaluation; False is only used for debugging.
        verbose: bool
        use_desc_type: bool, only TVR has desc type
    Returns:
        metrics: OrderedDict, overall metrics
        metrics_by_type: OrderedDict, metrics broken down by desc type (if use_desc_type)
    """
    assert task_type in TASK_TYPES, "task_type must be one of {}".format(list(TASK_TYPES.keys()))
    if verbose:
        print("Running evaluation with task_type {}, n results {}; n gt {}"
              .format(task_type, len(moment_predictions), len(ground_truth)))

    predictions_by_query_id = {e["query_id"]: e for e in moment_predictions}
    gt_by_query_id = {e["query_id"]: e for e in ground_truth}
    desc_type2idx = {"v": 0, "t": 1, "vt": 2}
    desc_types = []  # one desc type (int) per query

    if match_number:
        assert set(gt_by_query_id.keys()) == set(predictions_by_query_id.keys()), \
            "query_ids in predictions and ground_truth must match"

    pred_info_matrix_collection = []
    for k, gt_item in tqdm(gt_by_query_id.items(), desc="Loop over moments", leave=False):
        if not match_number and k not in predictions_by_query_id:
            continue
        pred_info_matrix = np.array(
            [e[:3] for e in predictions_by_query_id[k]["predictions"]][:max_pred_per_query],
            dtype=np.float32)  # (n_pred, 3), columns are [vid_name_idx, st, ed]
        if use_desc_type:
            desc_types.append(desc_type2idx[gt_item["type"]])
        vid_name_matched_pred = pred_info_matrix[:, 0] == video2idx[gt_item["vid_name"]]  # (n_pred, )
        pred_info_matrix = np.concatenate([pred_info_matrix, vid_name_matched_pred[:, None]], axis=1)  # (n_pred, 4)

        # append one correctness column per IoU threshold
        iou_thd_corrects_columns = []
        if len(gt_item["ts"]) >= 4:  # multiple GT timestamps
            least_n_overlap = 2  # a prediction is correct only if it overlaps with at least 2 GT timestamps
            iou_corrects_dict = defaultdict(list)
            for single_gt_ts in gt_item["ts"]:
                single_gt_ts = np.array(single_gt_ts, dtype=np.float32)  # (2, )
                iou_scores = compute_temporal_iou_batch(pred_info_matrix[:, 1:3], single_gt_ts) * vid_name_matched_pred
                for iou_thd in iou_thds:
                    iou_corrects_dict[iou_thd].append(iou_scores >= iou_thd)
            for iou_thd in iou_thds:
                iou_corrects = sum(iou_corrects_dict[iou_thd]) >= least_n_overlap  # (n_pred, )
                iou_thd_corrects_columns.append(iou_corrects[:, None])
        else:  # single GT timestamp
            single_gt_ts = np.array(gt_item["ts"], dtype=np.float32)  # (2, )
            iou_scores = compute_temporal_iou_batch(pred_info_matrix[:, 1:3], single_gt_ts) * vid_name_matched_pred
            for iou_thd in iou_thds:
                iou_corrects = iou_scores >= iou_thd  # (n_pred, )
                iou_thd_corrects_columns.append(iou_corrects[:, None])

        pred_info_matrix = np.concatenate([pred_info_matrix, ] + iou_thd_corrects_columns, axis=1)  # (n_pred, 4 + n_iou_thds)
        pred_info_matrix_collection.append(pred_info_matrix)

    # pad to (n_queries, max_pred_per_query, 4 + n_iou_thds); padded rows are all-zero, i.e. incorrect
    pred_info_matrix_collection = pad_sequences_1d_np(pred_info_matrix_collection)[0]
    if use_desc_type:
        desc_types = np.array(desc_types)  # (n_queries, )

    metrics = OrderedDict()
    metrics_by_type = OrderedDict()

    iou_c_offset = 4  # first IoU-correctness column; columns are [vid_name_idx, st, ed, vid_match, iou_thd0, iou_thd1, ...]
    if task_type == "VCMR":
        for iou_idx, iou_thd in enumerate(iou_thds):
            iou_corrects = pred_info_matrix_collection[:, :, iou_c_offset + iou_idx].astype(bool)  # (n_queries, max_pred_per_query)
            for k in recall_topks:
                metrics["{}-r{}".format(iou_thd, k)] = \
                    get_rounded_percentage(np.mean(np.sum(iou_corrects[:, :k], axis=1) >= 1))
        if use_desc_type:
            for desc_type in desc_type2idx:
                type_corrects = desc_types == desc_type2idx[desc_type]  # (n_queries, )
                n_desc_in_type = np.sum(type_corrects)
                for iou_idx, iou_thd in enumerate(iou_thds):
                    iou_corrects = pred_info_matrix_collection[:, :, iou_c_offset + iou_idx].astype(bool)
                    for k in recall_topks:
                        metrics_by_type["{}-{}-r{}".format(desc_type, iou_thd, k)] = get_rounded_percentage(
                            1.0 * np.sum(np.logical_and(np.sum(iou_corrects[:, :k], axis=1) >= 1, type_corrects))
                            / n_desc_in_type
                        )
    elif task_type == "SVMR":
        vid_name_matched = pred_info_matrix_collection[:, :, 3].astype(bool)  # (n_queries, max_pred_per_query)
        n_desc = len(vid_name_matched)
        for iou_idx, iou_thd in enumerate(iou_thds):
            iou_corrects = pred_info_matrix_collection[:, :, iou_c_offset + iou_idx].astype(bool)
            # rank only the predictions inside the GT video
            for k in recall_topks:
                metrics["{}-r{}".format(iou_thd, k)] = get_rounded_percentage(np.mean(
                    [np.sum(iou_corrects[idx][vid_name_matched[idx]][:k]) >= 1 for idx in range(n_desc)]
                ))
        if use_desc_type:
            for desc_type in desc_type2idx:
                type_corrects = desc_types == desc_type2idx[desc_type]  # (n_queries, )
                n_desc_in_type = np.sum(type_corrects)
                for iou_idx, iou_thd in enumerate(iou_thds):
                    iou_corrects = pred_info_matrix_collection[:, :, iou_c_offset + iou_idx].astype(bool)
                    for k in recall_topks:
                        metrics_by_type["{}-{}-r{}".format(desc_type, iou_thd, k)] = get_rounded_percentage(
                            1.0 * np.sum([np.sum(iou_corrects[idx][vid_name_matched[idx]][:k]) >= 1 and type_corrects[idx]
                                          for idx in range(n_desc)])
                            / n_desc_in_type)
    elif task_type == "VR":
        vid_name_matched = pred_info_matrix_collection[:, :, 3].astype(bool)  # (n_queries, max_pred_per_query)
        for k in recall_topks:
            metrics["r{}".format(k)] = \
                get_rounded_percentage(np.mean(np.sum(vid_name_matched[:, :k], axis=1) >= 1))
        if use_desc_type:
            for desc_type in desc_type2idx:
                type_corrects = desc_types == desc_type2idx[desc_type]  # (n_queries, )
                n_desc_in_type = np.sum(type_corrects)
                for k in recall_topks:
                    metrics_by_type["{}-r{}".format(desc_type, k)] = get_rounded_percentage(
                        1.0 * np.sum(np.logical_and(np.sum(vid_name_matched[:, :k], axis=1) >= 1, type_corrects))
                        / n_desc_in_type)
    else:
        raise ValueError("Unknown task_type {}, must be one of {}".format(task_type, list(TASK_TYPES.keys())))
    if use_desc_type:
        metrics_by_type["desc_type_ratio"] = "v {} t {} vt {}"\
            .format(*[get_rounded_percentage(1.0 * np.sum(desc_types == desc_type2idx[k]) / len(desc_types))
                      for k in ["v", "t", "vt"]])
    return metrics, metrics_by_type
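

# A minimal, self-contained sanity check for eval_by_task_type (my own toy data,
# not from the TVR release). One query over two videos; the top-1 prediction
# hits the GT video with IoU 1.0, so every recall metric should come out 100.0:
#   >>> video2idx = {"vid_a": 0, "vid_b": 1}
#   >>> preds = [{"desc": "toy", "query_id": 0,
#   ...           "predictions": [[0, 1.0, 5.0, 0.9], [1, 0.0, 3.0, 0.5]]}]
#   >>> gt = [{"desc": "toy", "query_id": 0, "type": "v",
#   ...        "vid_name": "vid_a", "ts": [1.0, 5.0]}]
#   >>> eval_by_task_type(preds, video2idx, gt, task_type="VCMR", use_desc_type=False)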


def eval_retrieval(submission, ground_truth, iou_thds=(0.5, 0.7), verbose=True, match_number=True, use_desc_type=True):
    video2idx = submission["video2idx"]
    submitted_task_types = [k for k in TASK_TYPES if k in submission]
    if verbose:
        print("Evaluating for tasks {}".format(submitted_task_types))
    eval_metrics = OrderedDict()
    metrics_raw_dict = {}
    for task_type in submitted_task_types:
        metrics, metrics_by_type = eval_by_task_type(
            submission[task_type], video2idx, ground_truth,
            iou_thds=iou_thds, recall_topks=(1, 5, 10, 100),
            task_type=task_type, max_pred_per_query=100,
            match_number=match_number, verbose=verbose, use_desc_type=use_desc_type)
        metrics_raw_dict[task_type] = metrics
        metrics_raw_dict[task_type + "_by_type"] = metrics_by_type

    # order the output: per-task metrics first, then the by-type breakdowns
    for task_type in submitted_task_types:
        eval_metrics[task_type] = metrics_raw_dict[task_type]
    if use_desc_type:
        for task_type in submitted_task_types:
            eval_metrics[task_type + "_by_type"] = metrics_raw_dict[task_type + "_by_type"]
    return eval_metrics
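

# The `submission` layout expected above, reconstructed from the code (a sketch,
# not the official file spec): a single JSON object holding "video2idx" plus one
# key per submitted task, e.g.
#   {
#       "video2idx": {"some_vid_name": 0, ...},
#       "VCMR": [{"desc": ..., "query_id": ..., "predictions": [...]}, ...],
#       "SVMR": [...],
#       "VR": [...]
#   }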


def eval_main():
    import argparse
    parser = argparse.ArgumentParser(description="TVR Evaluation Script")
    parser.add_argument("--submission_path", type=str, help="path to generated prediction file")
    parser.add_argument("--gt_path", type=str, help="path to GT file")
    parser.add_argument("--save_path", type=str, help="path to save the results")
    parser.add_argument("--not_verbose", action="store_true")
    args = parser.parse_args()

    verbose = not args.not_verbose
    submission = load_json(args.submission_path)
    gt = load_jsonl(args.gt_path)
    results = eval_retrieval(submission, gt, iou_thds=(0.5, 0.7), verbose=verbose)
    if verbose:
        print(json.dumps(results, indent=4))

    with open(args.save_path, "w") as f:
        f.write(json.dumps(results, indent=4))


if __name__ == '__main__':
    eval_main()