|
import math |
|
|
|
import pandas as pd |
|
import numpy as np |
|
from itertools import product |
|
import shapely |
|
from bokeh.models import Span, Label, ColumnDataSource, Whisker |
|
from bokeh.plotting import figure, show |
|
from shapely.geometry import Polygon |
|
import matplotlib as mpl |
|
import matplotlib.pyplot as plt |
|
import seaborn |
|
|
|
# Prompt-pattern indices available per task; get_data and pattern_graph use
# them to build the "PVP <index>" column names.
task_patterns = {
    "CB": [0, 3],
    "RTE": [0, 3],
    "BoolQ": [0, 3, 5],
    "MNLI": [0, 3],
    "COPA": [0, 1],
    "WSC": [0, 1, 2],
    "WiC": [0, 1],
    "MultiRC": [0, 1, 2],
}

# Number of seeds (repetitions) per task; every task uses four.
task_reps = dict.fromkeys(task_patterns, 4)

# Pattern index singled out per task (presumably the best-performing one;
# nothing in this part of the file reads it).
task_best_pattern = {
    "CB": 0,
    "RTE": 0,
    "BoolQ": 0,
    "MNLI": 0,
    "COPA": 1,
    "WSC": 0,
    "WiC": 0,
    "MultiRC": 1,
}

# Metric token embedded in the exported column names (see naming_convention).
task_metric_short = {
    "CB": "f1-macro",
    "RTE": "acc",
    "BoolQ": "acc",
    "MNLI": "acc",
    "COPA": "acc",
    "WSC": "acc",
    "WiC": "acc",
    "MultiRC": "f1",
}

# Human-readable metric name, used as the y-axis label of the figures.
task_metrics = {
    "CB": "F1-macro",
    "RTE": "accuracy",
    "BoolQ": "accuracy",
    "MNLI": "accuracy",
    "COPA": "accuracy",
    "WSC": "accuracy",
    "WiC": "accuracy",
    "MultiRC": "F1",
}

# Whether get_data should also load the "neutral verbalizer" columns.
# The lowercase "multirc" entry is kept exactly as it was.
task_neutral = {
    "CB": True,
    "RTE": True,
    "BoolQ": True,
    "MNLI": True,
    "COPA": False,
    "WSC": False,
    "multirc": True,
    "WiC": True,
    "MultiRC": True,
}

neutral_tasks = ["BoolQ", "CB", "MNLI", "MultiRC", "RTE", "WiC"]

# Alphabetically sorted task names.
tasks = sorted(task_patterns)
|
|
|
# Three-shade palettes per method. The plotting code uses index 0 for curves
# and markers and index 1 for the filled advantage areas.
pvp_colors = [
    "goldenrod",
    "blanchedalmond",
    "floralwhite",
]
ctl_colors = [
    "crimson",
    "salmon",
    "mistyrose",
]
clf_colors = [
    "indigo",
    "plum",
    "thistle",
]
|
|
|
|
|
def prompt_boolq(passage, question, pattern):
    """Render a colour-coded HTML version of a BoolQ prompt.

    Args:
        passage: Passage text inserted into the prompt.
        question: Question text inserted into the prompt.
        pattern: Which layout to use (0, 1 or 2).

    Returns:
        The HTML string for the requested layout, or ``None`` when
        ``pattern`` is not one of the known layouts.
    """
    # (pattern id, template) pairs, compared with ``==`` in declaration order.
    layouts = (
        (
            0,
            """<span style="color: #0c593d">{passage}</span> <span style="color: #910713"><b>Based on the previous passage,</b></span> <span style="color: #031154">{question}</span> <span style="color: #ba9004"><b>[YES/NO]</b></span>""",
        ),
        (
            1,
            """<span style="color: #0c593d">{passage}</span><span style="color: #910713"><b> Question:</b></span> <span style="color: #031154">{question}</span><span style="color: #910713"><b> Answer: </b></span><span style="color: #ba9004"><b>[YES/NO]</b></span>""",
        ),
        (
            2,
            """<span style="color: #910713"><b>Based on the following passage,</b></span> <span style="color: #031154">{question}</span><span style="color: #ba9004"><b> [YES/NO]</b></span> <span style="color: #0c593d">{passage}</span>""",
        ),
    )
    for layout_id, template in layouts:
        if pattern == layout_id:
            return template.format(passage=passage, question=question)
    return None
|
|
|
|
|
def advantage_text(advantage):
    """Format an advantage value as an HTML snippet.

    A negative ``advantage`` is attributed to the classifier-head method,
    anything else to the prompting method; the magnitude is printed with two
    decimals.
    """
    if advantage < 0:
        model_type = """<span style="color: #4B0082">分类头法</span>"""
    else:
        model_type = """<span style="color: #daa520">提示法</span>"""
    magnitude = abs(advantage)
    return f"""<b>{model_type}</b> 优势: <b>{magnitude:.2f}</b> 条样本"""
|
|
|
|
|
def average_advantage_text(advantage):
    """Format an average advantage value as an HTML snippet.

    Same layout as the per-point advantage text, with an ``Average`` prefix
    in front of the winning method. A negative ``advantage`` is attributed to
    the classifier-head method, anything else to the prompting method.
    """
    if advantage < 0:
        model_type = """<span style="color: #4B0082">分类头法</span>"""
    else:
        model_type = """<span style="color: #daa520">提示法</span>"""
    magnitude = abs(advantage)
    return f"""<b>Average {model_type}</b> 优势: <b>{magnitude:.2f}</b> 条样本"""
|
|
|
|
|
def naming_convention(task, seed, pvp_index=None, neutral=False):
    """Build the name of the results column for one run.

    Args:
        task: Task name; selects the metric token via ``task_metric_short``.
        seed: Seed number of the run.
        pvp_index: Prompt pattern index, or ``None`` for the classifier-head
            ("CLF") run.
        neutral: When truthy, the name refers to the "neutral verbalizer"
            variant.

    Returns:
        A string such as ``"PVP 0 roberta neutral verbalizer seed 1 - test-acc-all-p"``.
    """
    method = "CLF" if pvp_index is None else f"PVP {pvp_index}"
    pieces = [method, "roberta"]
    if neutral:
        pieces.append("neutral verbalizer")
    pieces.append(f"seed {seed} - test-{task_metric_short[task]}-all-p")
    return " ".join(pieces)
|
|
|
|
|
def get_data(task):
    """Download the exported results of ``task`` and arrange them per method.

    Returns:
        A ``(training_points, head_performances, pattern_performances)``
        tuple. ``training_points`` is the ``"training_points"`` column,
        ``head_performances`` has one column per seed of the classifier-head
        runs, and ``pattern_performances`` maps each pattern index to a dict
        with a ``"normal"`` array and, when ``task_neutral[task]`` is set, a
        ``"neutral"`` array (both with one column per seed).
    """
    url = f"https://raw.githubusercontent.com/TevenLeScao/pet/master/exported_results/{task.lower()}/wandb_export.csv"
    df = pd.read_csv(url)
    training_points = df["training_points"]
    seeds = range(task_reps[task])

    def stack_runs(pvp_index=None, neutral=False):
        # One row per seed, transposed so that seeds become columns.
        per_seed = [df[naming_convention(task, seed, pvp_index, neutral)] for seed in seeds]
        return np.array(per_seed).T

    head_performances = stack_runs()

    pattern_performances = {}
    for pattern in task_patterns[task]:
        runs = {"normal": stack_runs(pattern)}
        if task_neutral[task]:
            runs["neutral"] = stack_runs(pattern, True)
        pattern_performances[pattern] = runs

    return training_points, head_performances, pattern_performances
|
|
|
|
|
def reduct(performances, reduction="accmax", final_pattern=0, verbalizer="normal", exclude=None):
    """Collapse the per-seed columns of a results array into one curve.

    Args:
        performances: 2-D array with one column per run, or a nested dict
            indexed as ``performances[final_pattern][verbalizer]`` holding
            such an array.
        reduction: ``"avg"``, ``"std"`` or ``"max"`` (NaN-aware, taken along
            axis 1), or ``"accmax"`` for the running maximum of the row-wise
            max.
        final_pattern: First key used when ``performances`` is a dict.
        verbalizer: Second key used when ``performances`` is a dict.
        exclude: Column index or indices removed before reducing.

    Returns:
        A 1-D array with one value per row.

    Raises:
        AssertionError: If ``reduction`` is not a recognised method.
    """
    if isinstance(performances, dict):
        performances = performances[final_pattern][verbalizer]
    if exclude is not None:
        performances = np.delete(performances, exclude, axis=1)

    reducted = None
    if reduction == "avg":
        reducted = np.nanmean(performances, axis=1)
    elif reduction == "std":
        reducted = np.nanstd(performances, axis=1)
    elif reduction == "max":
        reducted = np.nanmax(performances, axis=1)
    elif reduction == "accmax":
        # Best run at each row, then the best value seen so far.
        best_per_row = np.nanmax(performances, axis=1)
        reducted = np.maximum.accumulate(best_per_row)

    assert reducted is not None, "unrecognized reduction method"
    return reducted
|
|
|
|
|
def _index_before_crossing(perf, results):
    """Return the index just before the first entry of ``results`` above ``perf``.

    When no entry exceeds ``perf`` the second-to-last index is returned, so
    that the last segment of the curve is the one used for interpolation.

    Raises:
        ValueError: If ``results`` is empty, or if its very first entry
            already exceeds ``perf``.
    """
    if len(results) == 0:
        raise ValueError("cannot locate a value in an empty result sequence")
    for idx, result in enumerate(results):
        if result > perf:
            if idx == 0:
                raise ValueError(f"value {perf} too small")
            return idx - 1
    return len(results) - 2


def find_surrounding_points(perf, clf_results, pvp_results):
    """Locate, on each curve, the segment in which ``perf`` is first reached.

    Args:
        perf: Performance level to look up.
        clf_results: Classifier-head results, one per training-set size.
        pvp_results: Prompting results, one per training-set size.

    Returns:
        ``(clf_idx, pvp_idx)``: for each curve, the index of the point just
        before the first point that exceeds ``perf`` (or the second-to-last
        index when no point exceeds it).

    Raises:
        ValueError: If either curve is empty or already starts above ``perf``.
    """
    # The earlier duplicate-skipping branch never changed the outcome: an
    # entry equal to its predecessor cannot exceed ``perf`` when the
    # predecessor did not, so a plain scan is equivalent.
    clf_idx = _index_before_crossing(perf, clf_results)
    pvp_idx = _index_before_crossing(perf, pvp_results)
    return clf_idx, pvp_idx
|
|
|
|
|
def interpolate(perf, x1, x2, y1, y2):
    """Linearly interpolate the x value at which the line reaches ``perf``.

    The line goes through ``(x1, y1)`` and ``(x2, y2)``; ``y1`` and ``y2``
    must differ, otherwise the division below has a zero denominator.
    """
    rise = perf - y1
    run = x2 - x1
    height = y2 - y1
    return x1 + rise * run / height
|
|
|
|
|
def interpolate_from_idx(perf, idx, results, training_points):
    """Interpolate the training-set size at which ``results`` reaches ``perf``.

    The segment between positions ``idx`` and ``idx + 1`` of the curve is
    used for the linear interpolation.
    """
    nxt = idx + 1
    x_low, x_high = training_points[idx], training_points[nxt]
    y_low, y_high = results[idx], results[nxt]
    return interpolate(perf, x_low, x_high, y_low, y_high)
|
|
|
|
|
def interpolate_from_perf(perf, overlapping_range, training_points, clf_results, pvp_results):
    """Find the training-set size each method needs to reach ``perf``.

    Args:
        perf: Performance level to look up.
        overlapping_range: ``(low, high)`` bounds within which ``perf`` must
            lie (inclusive).
        training_points: Training-set sizes shared by both curves.
        clf_results: Classifier-head results.
        pvp_results: Prompting results.

    Returns:
        ``(clf_points, pvp_points)``, the interpolated training-set sizes.

    Raises:
        ValueError: If ``perf`` is outside ``overlapping_range``.
    """
    within_bounds = overlapping_range[0] <= perf <= overlapping_range[1]
    if not within_bounds:
        raise ValueError(f"perf {perf} not in acceptable bounds {overlapping_range}")

    clf_idx, pvp_idx = find_surrounding_points(perf, clf_results, pvp_results)
    clf_points = interpolate_from_idx(perf, clf_idx, clf_results, training_points)
    pvp_points = interpolate_from_idx(perf, pvp_idx, pvp_results, training_points)
    return clf_points, pvp_points
|
|
|
|
|
def data_difference(perf, overlapping_range, training_points, clf_results, pvp_results):
    """Return how many more training points the head needs than prompting.

    Both methods' training-set sizes at performance ``perf`` are
    interpolated; the result is the classifier-head size minus the prompting
    size, so it is negative when the head needs fewer points.
    """
    clf_points, pvp_points = interpolate_from_perf(
        perf, overlapping_range, training_points, clf_results, pvp_results
    )
    return clf_points - pvp_points
|
|
|
|
|
def calculate_overlap(clf_results, pvp_results, full_range=False):
    """Return the performance range covered by the two curves.

    With ``full_range`` false (the default) this is the range both curves
    share, ``(highest minimum, lowest maximum)``. With ``full_range`` true it
    is the range covered by either curve, ``(lowest minimum, highest maximum)``.
    """
    clf_low, pvp_low = min(clf_results), min(pvp_results)
    clf_high, pvp_high = max(clf_results), max(pvp_results)
    if full_range:
        return (min(clf_low, pvp_low), max(clf_high, pvp_high))
    return (max(clf_low, pvp_low), min(clf_high, pvp_high))
|
|
|
|
|
def calculate_range(overlapping_range, number_of_points):
    """Yield ``number_of_points`` evenly spaced levels strictly inside a range.

    The interval ``overlapping_range`` is cut into ``number_of_points + 1``
    equal parts and the interior cut points are produced, so neither bound is
    ever yielded. The result is a one-shot generator.
    """
    return (
        overlapping_range[0] + step / (number_of_points + 1) * (overlapping_range[1] - overlapping_range[0])
        for step in range(1, number_of_points + 1)
    )
|
|
|
|
|
def calculate_differences(integral_range, overlapping_range, training_points, clf_results, pvp_results):
    """Evaluate the data difference at every level of ``integral_range``.

    Returns:
        A list with one ``data_difference`` value per level, in iteration
        order.
    """
    return [
        data_difference(level, overlapping_range, training_points, clf_results, pvp_results)
        for level in integral_range
    ]
|
|
|
|
|
def calculate_offset(training_points, clf_results, pvp_results, number_of_points=1000):
    """Average the data difference over the range both curves share.

    ``number_of_points`` levels are sampled evenly inside the overlapping
    performance range, the data difference is evaluated at each of them, and
    the mean of those differences is returned.
    """
    shared_range = calculate_overlap(clf_results, pvp_results)
    levels = calculate_range(shared_range, number_of_points)
    gaps = calculate_differences(levels, shared_range, training_points, clf_results, pvp_results)
    return sum(gaps) / number_of_points
|
|
|
|
|
def intersection_with_range(training_points, results, band):
    """Clip the area under a results curve to ``band``.

    The area is the polygon traced along the curve and closed along ``y = 0``
    between the last and first training points; its intersection with
    ``band`` is returned.
    """
    curve = [(training_points[k], results[k]) for k in range(len(training_points))]
    baseline = [(training_points[-1], 0), (training_points[0], 0)]
    area_under_curve = Polygon(curve + baseline)
    return area_under_curve.intersection(band)
|
|
|
|
|
def fill_polygon(fig, polygon, color, label=None, alpha=1.0):
    """Fill every polygonal part of a shapely geometry on a Bokeh figure.

    Args:
        fig: Figure whose ``patch`` method is used for drawing.
        polygon: Geometry to draw. Polygons are drawn via their exterior
            ring (interior rings are not drawn). Multi-part geometries are
            walked through ``geoms``; empty or lower-dimensional parts
            (points, lines) are skipped.
        color: Fill colour passed to ``fig.patch``.
        label: Accepted so existing callers keep working; it is not used for
            drawing.
        alpha: Opacity passed to ``fig.patch``.
    """
    if polygon.is_empty:
        return
    if isinstance(polygon, Polygon):
        xs, ys = polygon.exterior.xy
        fig.patch(xs, ys, color=color, alpha=alpha)
        return
    # Set operations between polygons may return points, lines or mixed
    # collections. Single lower-dimensional geometries may not expose
    # ``geoms`` at all, so fall back to "no parts" for them.
    for part in getattr(polygon, "geoms", ()):
        fill_polygon(fig, part, color, label=label, alpha=alpha)
|
|
|
|
|
# Rank of each legend label, numbered from 0 in the order listed here.
label_order = {
    name: rank
    for rank, name in enumerate(
        (
            "head run",
            "head advantage",
            "control run",
            "optimization advantage",
            "prompting run",
            "semantics advantage",
            "region of comparison",
        )
    )
}
|
|
|
|
|
def metric_tap(
    event, overlapping_range, training_points, clf_results, pvp_results, advantage_box, advantage_plot
):
    """React to a tap on the plot by showing the advantage at that metric value.

    The y coordinate of ``event`` is taken as the performance level. The
    advantage text is written to ``advantage_box.text`` and a dashed
    horizontal line is placed at that level on ``advantage_plot``: the last
    renderer is moved and recoloured if it is already a ``Span``, otherwise a
    new ``Span`` is appended. Any ``ValueError`` raised along the way (for
    example a tap outside the overlapping range) leaves things as they are.
    """
    metric_value = event.y
    try:
        advantage_value = data_difference(metric_value, overlapping_range, training_points, clf_results, pvp_results)
        advantage_box.text = advantage_text(advantage_value)

        # Head colour when the head has the advantage, prompting colour otherwise.
        line_color = clf_colors[0] if advantage_value < 0 else pvp_colors[0]
        last_renderer = advantage_plot.renderers[-1]
        if isinstance(last_renderer, Span):
            last_renderer.location = metric_value
            last_renderer.line_color = line_color
        else:
            metric_line = Span(
                location=metric_value,
                line_alpha=0.7,
                dimension="width",
                line_color=line_color,
                line_dash="dashed",
                line_width=1,
            )
            advantage_plot.renderers.extend([metric_line])
    except ValueError:
        pass
|
|
|
|
|
def plot_polygons_bokeh(task, training_points, clf_results, pvp_results, clf_colors, pvp_colors, x_log_scale=False):
    """Build the two-curve figure comparing the head and prompting methods.

    The figure shows both curves, a hatched rectangle for the performance
    range they share, shaded areas where one curve lies above the other
    (stronger shading inside the shared range, lighter outside), and a dashed
    vertical line at the last training point.

    Args:
        task: Task name; selects the y-axis label from ``task_metrics``.
        training_points: X values shared by both curves.
        clf_results: Classifier-head results.
        pvp_results: Prompting results.
        clf_colors: Palette for the head method (index 0 curve, 1 fill).
        pvp_colors: Palette for the prompting method (index 0 curve, 1 fill).
        x_log_scale: Use a logarithmic x axis when true.

    Returns:
        The populated Bokeh figure.
    """
    first_x, last_x = training_points[0], training_points[-1]
    overlapping_range = calculate_overlap(clf_results, pvp_results, False)
    full_range = calculate_overlap(clf_results, pvp_results, True)
    overlap_low, overlap_high = overlapping_range[0], overlapping_range[1]
    middle_y = (full_range[0] + full_range[1]) / 2

    def rectangle(low, high):
        # Axis-aligned rectangle spanning the whole x extent of the curves.
        return Polygon([(first_x, low), (first_x, high), (last_x, high), (last_x, low)])

    fig = figure(
        plot_height=400,
        plot_width=800,
        max_height=400,
        max_width=800,
        x_axis_type="log" if x_log_scale else "linear",
        title="分类头法及提示法在各规模的训练子集上的性能",
    )

    curves = (
        (clf_results, clf_colors[0], "分类头法"),
        (pvp_results, pvp_colors[0], "提示法"),
    )
    # Markers first, then lines, so the renderer order stays as before.
    for results, curve_color, legend_name in curves:
        fig.circle(training_points, results, color=curve_color, legend=legend_name)
    for results, curve_color, _ in curves:
        fig.line(training_points, results, color=curve_color, alpha=1)
    fig.xaxis.axis_label = "训练子集规模"
    fig.yaxis.axis_label = task_metrics[task]

    # Hatched, unfilled rectangle marking the range of comparison.
    fig.patch(
        [first_x, first_x, last_x, last_x],
        [overlap_low, overlap_high, overlap_high, overlap_low],
        color="black",
        fill_alpha=0,
        line_width=0,
        legend="比较区域",
        hatch_alpha=0.14,
        hatch_scale=40,
        hatch_pattern="/",
    )

    band = rectangle(overlap_low, overlap_high)
    full_band = rectangle(full_range[0], full_range[1])

    clf_polygon = intersection_with_range(training_points, clf_results, band)
    pvp_polygon = intersection_with_range(training_points, pvp_results, band)
    full_clf_polygon = intersection_with_range(training_points, clf_results, full_band)
    full_pvp_polygon = intersection_with_range(training_points, pvp_results, full_band)

    # "Inside" areas lie within the shared band; "outside" areas are the rest
    # of each method's lead over the other.
    clf_inside_area = clf_polygon.difference(pvp_polygon)
    pvp_inside_area = pvp_polygon.difference(clf_polygon)
    clf_outside_area = full_clf_polygon.difference(full_pvp_polygon).difference(clf_inside_area)
    pvp_outside_area = full_pvp_polygon.difference(full_clf_polygon).difference(pvp_inside_area)

    fill_polygon(fig, clf_outside_area, clf_colors[1], alpha=0.13)
    fill_polygon(fig, pvp_outside_area, pvp_colors[1], alpha=0.18)
    fill_polygon(
        fig, clf_inside_area, clf_colors[1], alpha=0.4, label="head advantage" if task == "WiC" else None
    )
    fill_polygon(fig, pvp_inside_area, pvp_colors[1], alpha=0.4, label="prompting advantage")

    # Horizontal borders of the shared band.
    for level in (overlap_low, overlap_high):
        fig.line([first_x, last_x], [level, level], color="dimgrey")

    vline = Span(location=last_x, dimension="height", line_color="black", line_width=2.5, line_dash="dashed")
    end_label = Label(
        x=last_x, y=middle_y, text="数据集总大小", angle=90, angle_units="deg", text_align="center"
    )
    fig.renderers.extend([vline, end_label])

    fig.legend.location = "bottom_right"

    return fig
|
|
|
|
|
def plot_three_polygons_bokeh(
    task, training_points, clf_results, pvp_results, ctl_results, clf_colors, pvp_colors, ctl_colors,
    x_log_scale=False
):
    """Build the three-curve figure: head, prompting and null-verbalizer control.

    The figure shows the three curves, a hatched rectangle for the
    performance range shared by the head and prompting curves, shaded
    advantage areas (stronger shading inside that range, lighter outside),
    and a dashed vertical line at the last training point.

    Args:
        task: Task name; selects the y-axis label from ``task_metrics``.
        training_points: X values shared by all curves.
        clf_results: Classifier-head results.
        pvp_results: Prompting results.
        ctl_results: Control (null verbalizer) results.
        clf_colors: Palette for the head method (index 0 curve, 1 fill).
        pvp_colors: Palette for the prompting method.
        ctl_colors: Palette for the control method.
        x_log_scale: Use a logarithmic x axis when true.

    Returns:
        The populated Bokeh figure.
    """
    first_x, last_x = training_points[0], training_points[-1]
    # Both ranges are derived from the head and prompting curves only.
    overlapping_range = calculate_overlap(clf_results, pvp_results, False)
    full_range = calculate_overlap(clf_results, pvp_results, True)
    overlap_low, overlap_high = overlapping_range[0], overlapping_range[1]
    middle_y = (full_range[0] + full_range[1]) / 2

    def rectangle(low, high):
        # Axis-aligned rectangle spanning the whole x extent of the curves.
        return Polygon([(first_x, low), (first_x, high), (last_x, high), (last_x, low)])

    fig = figure(
        plot_height=400,
        plot_width=800,
        max_height=400,
        max_width=800,
        x_axis_type="log" if x_log_scale else "linear",
        title="分类头法、提示法以及空言语器提示法在各规模的训练子集上的性能",
    )
    fig.xaxis.axis_label = "训练子集规模"
    fig.yaxis.axis_label = task_metrics[task]

    curves = (
        (clf_results, clf_colors[0], "分类头法"),
        (pvp_results, pvp_colors[0], "提示法"),
        (ctl_results, ctl_colors[0], "空言语器提示法"),
    )
    # Markers first, then lines, so the renderer order stays as before.
    for results, curve_color, legend_name in curves:
        fig.circle(training_points, results, color=curve_color, legend=legend_name)
    for results, curve_color, _ in curves:
        fig.line(training_points, results, color=curve_color, alpha=1)

    # Hatched, unfilled rectangle marking the range of comparison.
    fig.patch(
        [first_x, first_x, last_x, last_x],
        [overlap_low, overlap_high, overlap_high, overlap_low],
        color="black",
        fill_alpha=0,
        line_width=0,
        legend="比较区域",
        hatch_alpha=0.14,
        hatch_scale=40,
        hatch_pattern="/",
    )

    band = rectangle(overlap_low, overlap_high)
    full_band = rectangle(full_range[0], full_range[1])

    clf_polygon = intersection_with_range(training_points, clf_results, band)
    pvp_polygon = intersection_with_range(training_points, pvp_results, band)
    ctl_polygon = intersection_with_range(training_points, ctl_results, band)

    full_clf_polygon = intersection_with_range(training_points, clf_results, full_band)
    full_pvp_polygon = intersection_with_range(training_points, pvp_results, full_band)
    full_ctl_polygon = intersection_with_range(training_points, ctl_results, full_band)

    clf_inside_area = clf_polygon.difference(ctl_polygon)
    pvp_inside_area = pvp_polygon.difference(clf_polygon).difference(ctl_polygon)
    ctl_inside_area = ctl_polygon.difference(clf_polygon)

    clf_outside_area = full_clf_polygon.difference(full_ctl_polygon).difference(clf_inside_area)
    # NOTE(review): this subtracts the band-limited ``ctl_polygon`` rather than
    # ``full_ctl_polygon``, so outside the band the prompting area may overlap
    # the control area. Left as is; confirm whether that is intended.
    pvp_outside_area = (
        full_pvp_polygon.difference(full_clf_polygon).difference(ctl_polygon).difference(pvp_inside_area)
    )
    # Remove the control method's own inside area, mirroring the head area
    # above. Subtracting ``pvp_inside_area`` here was a no-op (that area never
    # overlaps the control area) and left the inside region painted twice.
    ctl_outside_area = full_ctl_polygon.difference(full_clf_polygon).difference(ctl_inside_area)

    fill_polygon(
        fig, clf_inside_area, clf_colors[1], alpha=0.4, label="head advantage" if task == "WiC" else None
    )
    fill_polygon(fig, pvp_inside_area, pvp_colors[1], alpha=0.4, label="prompting advantage")
    fill_polygon(fig, ctl_inside_area, ctl_colors[1], alpha=0.4, label="null verbalizer advantage")
    fill_polygon(fig, clf_outside_area, clf_colors[1], alpha=0.13)
    fill_polygon(fig, pvp_outside_area, pvp_colors[1], alpha=0.18)
    fill_polygon(fig, ctl_outside_area, ctl_colors[1], alpha=0.13)

    # Horizontal borders of the shared band.
    for level in (overlap_low, overlap_high):
        fig.line([first_x, last_x], [level, level], color="dimgrey")

    vline = Span(location=last_x, dimension="height", line_color="black", line_width=2.5, line_dash="dashed")
    end_label = Label(
        x=last_x, y=middle_y, text="数据集总大小", angle=90, angle_units="deg", text_align="center"
    )
    fig.renderers.extend([vline, end_label])

    fig.legend.location = "bottom_right"

    return fig
|
|
|
|
|
def pattern_graph(task):
    """Build the figure comparing the prompt patterns of ``task``.

    The exported results are downloaded, missing runs are replaced by the
    mean over the seeds of the same (training point, pattern) cell, a small
    Gaussian jitter (standard deviation 0.01) is added, and for every pattern
    the median over seeds is drawn as a curve with a band from the minimum to
    the maximum.

    Args:
        task: Task name; selects patterns, seed count and metric label.

    Returns:
        The populated Bokeh figure.
    """
    metric_name = task_metrics[task]
    patterns = task_patterns[task]
    n_seeds = task_reps[task]

    fig = figure(plot_height=400, plot_width=800, max_height=400, max_width=800, x_axis_type="log", title="Performance over training subset sizes of different prompt patterns")
    fig.xaxis.axis_label = "训练子集规模"
    fig.yaxis.axis_label = metric_name

    url = f"https://raw.githubusercontent.com/TevenLeScao/pet/master/exported_results/{task.lower()}/wandb_export.csv"
    df = pd.read_csv(url)
    training_points = np.array(df["training_points"])
    n_points = len(training_points)

    run_columns = [naming_convention(task, seed, pattern) for pattern in patterns for seed in range(n_seeds)]
    data = np.array(df[run_columns])

    # One row per (training point, pattern) cell with the seeds as columns,
    # so that missing runs can be filled with the mean of their cell.
    data = data.reshape(-1, n_seeds)
    seed_mean = np.nanmean(data, axis=1)
    missing = np.where(np.isnan(data))
    data[missing] = np.take(seed_mean, missing[0])

    # Flatten with the pattern varying slowest, then the seed, then the
    # training point.
    data = data.reshape(n_points, -1).transpose().reshape(-1)
    data = data + np.random.normal(0, 0.01, len(data))

    # Index columns that follow the same (pattern, seed, training point)
    # layout. The seed ids come from the task's seed count instead of a
    # hard-coded [0, 1, 2, 3].
    expanded_training_points = np.tile(training_points, n_seeds * len(patterns))
    pattern_ids = np.repeat(np.arange(len(patterns)), n_seeds * n_points)
    seed_ids = np.tile(np.repeat(np.arange(n_seeds), n_points), len(patterns))

    long_df = pd.DataFrame(
        np.stack((expanded_training_points, pattern_ids, seed_ids, data), axis=1),
        columns=["training_points", "pattern", "seed", metric_name],
    )
    long_df["pattern"] = long_df["pattern"].astype(int).astype(str)
    pattern_colors = ["royalblue", "darkturquoise", "darkviolet"]

    for i, (_, pattern_df) in enumerate(long_df.groupby("pattern")):
        x, y, y_min, y_max = [], [], [], []
        # Single pass over the training points of this pattern.
        for training_point, point_df in pattern_df.groupby("training_points"):
            scores = point_df[metric_name]
            x.append(training_point)
            y.append(np.median(scores))
            y_min.append(np.min(scores))
            y_max.append(np.max(scores))
        fig.circle(x, y, color=pattern_colors[i], alpha=1, legend=f"模式 {i}")
        fig.line(x, y, color=pattern_colors[i], alpha=1)
        fig.varea(x=x, y1=y_max, y2=y_min, color=pattern_colors[i], alpha=0.11)

    return fig
|
|
|
|
|
|
|
def cubic_easing(t):
    """Cubic ease-in-out curve.

    Returns ``4 * t**3`` for ``t`` below 0.5 and ``0.5 * (2t - 2)**3 + 1``
    otherwise, so 0 maps to 0, 0.5 to 0.5 and 1 to 1.
    """
    in_first_half = t < 0.5
    if in_first_half:
        return 4 * t * t * t
    shifted = 2 * t - 2
    return 0.5 * shifted * shifted * shifted + 1
|
|
|
|
|
def circ_easing(t):
    """Circular ease-in-out curve.

    Uses ``0.5 * (1 - sqrt(1 - 4 t**2))`` for ``t`` below 0.5 and
    ``0.5 * (sqrt(-(2t - 3) * (2t - 1)) + 1)`` otherwise, so 0 maps to 0,
    0.5 to 0.5 and 1 to 1.
    """
    if t < 0.5:
        root = math.sqrt(1 - 4 * (t * t))
        return 0.5 * (1 - root)
    root = math.sqrt(-((2 * t) - 3) * ((2 * t) - 1))
    return 0.5 * (root + 1)
|
|