Load prediction file and GT file to calculate TVR metrics:
- recall at top K (R@K), for a specified IoU, where K in [1, 5, 10, 100], IoU in [0.5, 0.7]
import json
import numpy as np
from tqdm import tqdm
from collections import OrderedDict, defaultdict
def load_json(filename):
with open(filename, "r") as f:
return json.load(f)
def load_jsonl(filename):
with open(filename, "r") as f:
return [json.loads(l.strip("\n")) for l in f.readlines()]
def pad_sequences_1d_np(sequences, dtype=np.float32):
""" Pad a single-nested list or a sequence of n-d array (torch.tensor or np.ndarray)
into a (n+1)-d array, only allow the first dim has variable lengths.
sequences: list(n-d tensor or list)
dtype: np.dtype or torch.dtype
padded_seqs: ((n+1)-d tensor) padded with zeros
mask: (2d tensor) of the same shape as the first two dims of padded_seqs,
1 indicate valid, 0 otherwise
>>> test_data_list = [[1,2,3], [1,2], [3,4,7,9]]
>>> pad_sequences_1d(test_data_list, dtype=np.float32)
>>> test_data_3d = [np.random.randn(2,3,4), np.random.randn(4,3,4), np.random.randn(1,3,4)]
>>> pad_sequences_1d(test_data_3d, dtype=np.float32)
if isinstance(sequences[0], list):
sequences = [np.asarray(s, dtype=dtype) for s in sequences]
extra_dims = sequences[0].shape[1:] # the extra dims should be the same for all elements
lengths = [len(seq) for seq in sequences]
assert "numpy" in str(dtype), "dtype and input type does not match"
padded_seqs = np.zeros((len(sequences), max(lengths)) + extra_dims, dtype=dtype)
mask = np.zeros((len(sequences), max(lengths)), dtype=np.float32)
for idx, seq in enumerate(sequences):
end = lengths[idx]
padded_seqs[idx, :end] = seq
mask[idx, :end] = 1
return padded_seqs, mask
def compute_temporal_iou_batch(preds, gt):
""" compute intersection-over-union along temporal axis
This function is significantly faster than `compute_temporal_iou`,
the result should be the same.
preds: np.ndarray, (N, 2), [st (float), ed (float)] * N
gt: [st (float), ed (float)]
iou (float): np.ndarray, (N, )
for np.divide with zeros, see
intersection = np.maximum(0, np.minimum(preds[:, 1], gt[1]) - np.maximum(preds[:, 0], gt[0]))
union = np.maximum(preds[:, 1], gt[1]) - np.minimum(preds[:, 0], gt[0]) # not the correct union though
return np.divide(intersection, union, out=np.zeros_like(intersection), where=union != 0)
def get_rounded_percentage(float_number, n_floats=2):
return round(float_number * 100, n_floats)
TASK_TYPES = OrderedDict([
("VCMR", "Video Corpus Moment Retrieval"),
("SVMR", "Single Video Moment Retrieval"),
("VR", "regular Video Retrieval")
def eval_by_task_type(moment_predictions, video2idx, ground_truth,
iou_thds=(0.5, 0.7), recall_topks=(1, 5, 10, 100),
task_type="SVMR", max_pred_per_query=100, match_number=True, verbose=True, use_desc_type=True):
""" a predicted triplet is positive only if:
1) its vid_name matches the GT vid_name
2) IoU between its timestamp and GT timestamp is higher than the given threshold
moment_predictions w.r.t. different task_type:
For each query, evaluated on top max_pred_per_query [vid_name, st, ed] triplets. (score entry ignored)
VCMR: vid_name might be repeating.
SVMR: vid_name is fixed to be the GT vid_name.
VR: vid_name is not repeating, st and ed will not be used.
video2idx: {vid_name (str): index (int), ...}
moment_predictions: list(dict), each dict is {
"desc": str,
"query_id": int,
"predictions": [vid_name_idx (int), st (float), ed (float), score (float)] * n_pred,
sorted predictions, n_pred could be different for all dicts. For each prediction,
only the first 3 elements [vid_name (str), st (float), ed (float),] are used,
any other following elements are ignored. We leave score here for record.
ground_truth: list(dict), each dict is {
"desc": str,
"query_id": int,
"type": str, one of [v, t, vt]
"vid_name": str
"ts": [st (float), ed (float)], or list([st (float), ed (float)]), len == 4.
iou_thds: temporal IoU thresholds
recall_topks: recall at different top k
task_type: str, could be: ["VCMR", "SVMR", "VR"], see TASK_TYPES for definition.
max_pred_per_query: int, only top max_pred_per_query predictions for each query are used.
match_number: bool, must set to True if when do evaluation, False is only used for debug.
use_desc_type: only TVR has desc type
assert task_type in TASK_TYPES, "task_type must be one of {}".format(list(TASK_TYPES.keys()))
if verbose:
print("Running evaluation with task_type {}, n results {}; n gt {}"
.format(task_type, len(moment_predictions), len(ground_truth)))
predictions_by_query_id = {e["query_id"]: e for e in moment_predictions}
gt_by_query_id = {e["query_id"]: e for e in ground_truth}
desc_type2idx = {"v": 0, "t": 1, "vt": 2}
desc_types = [] # n_desc
if match_number:
assert set(gt_by_query_id.keys()) == set(predictions_by_query_id.keys()), \
"query_ids in predictions and ground_truth must match"
# assert len(set([len(e["predictions"]) for e in predictions_by_query_id.values()])) == 1, \
# "all queries must have the same number of predictions"
pred_info_matrix_collection = []
for k, gt_item in tqdm(gt_by_query_id.items(), desc="Loop over moments", leave=False):
if not match_number and k not in predictions_by_query_id:
pred_info_matrix = np.array(
[e[:3] for e in predictions_by_query_id[k]["predictions"]][:max_pred_per_query],
dtype=np.float32) # (n_pred, 3)
if use_desc_type:
vid_name_matched_pred = pred_info_matrix[:, 0] == video2idx[gt_item["vid_name"]] # bool, (n_pred, )
pred_info_matrix = np.concatenate([pred_info_matrix, vid_name_matched_pred[:, None]], axis=1) # (n_pred, 4)
# add 1 + len(iou_thds) columns, iou_scores, iou_corrects for each iou_thd.
iou_thd_corrects_columns = []
if len(gt_item["ts"]) >= 4: # didemo, fro all 3 splits, at least 4 ts for each, < 0.5% has more than 4.
least_n_overlap = 2 # True if overlapped with at least least_n_overlap GT ts.
iou_corrects_dict = defaultdict(list)
for single_gt_ts in gt_item["ts"]:
single_gt_ts = np.array(single_gt_ts, dtype=np.float32) # (2, )
# iou scores of the predictions that have wrong vid_name are set to 0.
iou_scores = compute_temporal_iou_batch(pred_info_matrix[:, 1:3], single_gt_ts) * vid_name_matched_pred
for iou_thd in iou_thds:
iou_corrects_dict[iou_thd].append(iou_scores >= iou_thd)
for iou_thd in iou_thds:
iou_corrects = sum(iou_corrects_dict[iou_thd]) >= least_n_overlap # bool, (n_pred, )
iou_thd_corrects_columns.append(iou_corrects[:, None])
else: # should be 2, len([st, ed]) == 2
single_gt_ts = np.array(gt_item["ts"], dtype=np.float32) # (2, )
# iou scores of the predictions that have wrong vid_name are set to 0.
iou_scores = compute_temporal_iou_batch(pred_info_matrix[:, 1:3], single_gt_ts) * vid_name_matched_pred
for iou_thd in iou_thds:
iou_corrects = iou_scores >= iou_thd # bool, (n_pred, )
iou_thd_corrects_columns.append(iou_corrects[:, None])
pred_info_matrix = np.concatenate([pred_info_matrix, ] + iou_thd_corrects_columns, axis=1) # (n_pred, 6)
# column header [vid_name_idx (int), st (float), ed (float), is_vid_name_match (bool),
# iou_scores>=iou_thd0 (bool), iou_scores>=iou_thd1 (bool)]
pred_info_matrix_collection = pad_sequences_1d_np(pred_info_matrix_collection)[0] # (n_desc, n_pred, 6)
if use_desc_type:
desc_types = np.array(desc_types) # (n_desc)
# results wrapper
metrics = OrderedDict()
metrics_by_type = OrderedDict()
iou_c_offset = 4 # iou_corrects column index starts here
if task_type == "VCMR":
for iou_idx, iou_thd in enumerate(iou_thds):
iou_corrects = pred_info_matrix_collection[:, :, iou_c_offset + iou_idx].astype(bool) # (n_desc, n_pred)
# 1) there might be more than one positive clip, so use `>= 1`
for k in recall_topks:
metrics["{}-r{}".format(iou_thd, k)] = \
get_rounded_percentage(np.mean(np.sum(iou_corrects[:, :k], axis=1) >= 1))
if use_desc_type:
for desc_type in desc_type2idx:
type_corrects = desc_types == desc_type2idx[desc_type] # (n_desc)
n_desc_in_type = np.sum(type_corrects) # (n_desc)
for iou_idx, iou_thd in enumerate(iou_thds):
# (n_desc, n_pred)
iou_corrects = pred_info_matrix_collection[:, :, iou_c_offset + iou_idx].astype(bool)
for k in recall_topks:
metrics_by_type["{}-{}-r{}".format(desc_type, iou_thd, k)] = get_rounded_percentage(
1.0 * np.sum(np.logical_and(np.sum(iou_corrects[:, :k], axis=1) >= 1, type_corrects))
/ n_desc_in_type
elif task_type == "SVMR":
vid_name_matched = pred_info_matrix_collection[:, :, 3].astype(bool) # (n_desc, n_pred)
n_desc = len(vid_name_matched)
for iou_idx, iou_thd in enumerate(iou_thds):
iou_corrects = pred_info_matrix_collection[:, :, iou_c_offset + iou_idx].astype(bool) # (n_desc, n_pred)
# 1) there might be more than one positive clip, so use `>= 1`
for k in recall_topks:
metrics["{}-r{}".format(iou_thd, k)] = get_rounded_percentage(np.mean(
[np.sum(iou_corrects[idx][vid_name_matched[idx]][:k]) >= 1 for idx in range(n_desc)]
if use_desc_type:
for desc_type in desc_type2idx:
type_corrects = desc_types == desc_type2idx[desc_type] # (n_desc)
n_desc_in_type = np.sum(type_corrects) # (n_desc)
for iou_idx, iou_thd in enumerate(iou_thds):
# (n_desc, n_pred)
iou_corrects = pred_info_matrix_collection[:, :, iou_c_offset + iou_idx].astype(bool)
# 1) there might be more than one positive clip, so use `>= 1`
for k in recall_topks:
metrics_by_type["{}-{}-r{}".format(desc_type, iou_thd, k)] = get_rounded_percentage(
1.0 * np.sum([np.sum(iou_corrects[idx][vid_name_matched[idx]][:k]) >= 1 and type_corrects[idx]
for idx in range(n_desc)])
/ n_desc_in_type)
elif task_type == "VR":
vid_name_matched = pred_info_matrix_collection[:, :, 3].astype(bool) # (n_desc, n_pred)
for k in recall_topks:
metrics["r{}".format(k)] = \
get_rounded_percentage(np.mean(np.sum(vid_name_matched[:, :k], axis=1) >= 1))
if use_desc_type:
for desc_type in desc_type2idx:
type_corrects = desc_types == desc_type2idx[desc_type] # (n_desc)
n_desc_in_type = np.sum(type_corrects) # (n_desc)
for k in recall_topks:
metrics_by_type["{}-r{}".format(desc_type, k)] = get_rounded_percentage(
1.0 * np.sum(np.logical_and(np.sum(vid_name_matched[:, :k], axis=1) >= 1, type_corrects))
/ n_desc_in_type)
raise ValueError("task_type wrong.")
if use_desc_type:
metrics_by_type["desc_type_ratio"] = "v {} t {} vt {}"\
.format(*[get_rounded_percentage(1.0 * np.sum(desc_types == desc_type2idx[k]) / len(desc_types))
for k in ["v", "t", "vt"]])
return metrics, metrics_by_type
def eval_retrieval(submission, ground_truth, iou_thds=(0.5, 0.7), verbose=True, match_number=True, use_desc_type=True):
video2idx = submission["video2idx"]
submitted_task_types = [k for k in TASK_TYPES if k in submission]
if verbose:
print("Evaluating for task {}".format(submitted_task_types))
eval_metrics = OrderedDict()
metrics_raw_dict = {}
for task_type in submitted_task_types:
metrics, metrics_by_type = eval_by_task_type(
submission[task_type], video2idx, ground_truth,
iou_thds=iou_thds, recall_topks=(1, 5, 10, 100),
task_type=task_type, max_pred_per_query=100,
match_number=match_number, verbose=verbose, use_desc_type=use_desc_type)
metrics_raw_dict[task_type] = metrics
metrics_raw_dict[task_type+"_by_type"] = metrics_by_type
for task_type in submitted_task_types:
eval_metrics[task_type] = metrics_raw_dict[task_type]
if use_desc_type:
for task_type in submitted_task_types:
eval_metrics[task_type+"_by_type"] = metrics_raw_dict[task_type+"_by_type"]
return eval_metrics
def eval_main():
import argparse
parser = argparse.ArgumentParser(description="TVR Evaluation Script")
parser.add_argument("--submission_path", type=str, help="path to generated prediction file")
parser.add_argument("--gt_path", type=str, help="path to GT file")
parser.add_argument("--save_path", type=str, help="path to save the results")
parser.add_argument("--not_verbose", action="store_true")
args = parser.parse_args()
verbose = not args.not_verbose
submission = load_json(args.submission_path)
gt = load_jsonl(args.gt_path)
results = eval_retrieval(submission, gt, iou_thds=(0.5, 0.7), verbose=verbose)
if verbose:
print(json.dumps(results, indent=4))
with open(args.save_path, "w") as f:
f.write(json.dumps(results, indent=4))
if __name__ == '__main__':