# NOTE(review): the lines "Spaces:" / "Sleeping" / "Sleeping" were page-scrape
# residue (hosting-status banner), not Python; commented out so the module parses.
# -*- coding:utf-8 -*-
# stdlib
from collections import defaultdict, Counter
from itertools import combinations

# third-party
import pandas as pd
import seaborn as sns
import matplotlib as plt
plt.use('Agg')  # select the non-interactive backend before pyplot is loaded
import matplotlib.pyplot  # bind the pyplot submodule so `plt.pyplot.*` below resolves
import nltk
nltk.download('punkt')
from nltk.tokenize import sent_tokenize, word_tokenize

# local
from n4a_analytics_lib.project import Project
from n4a_analytics_lib.metrics_utils import (fleiss_kappa_function, cohen_kappa_function, percentage_agreement_pov)
class GlobalStatistics(Project):
    """Aggregated label statistics for a 'global' project archive.

    At construction time, flattens every (source file, label) occurrence into
    tidy dataframes holding per-label totals for the whole project and per
    source file.
    """

    def __init__(self, zip_project, remote=False):
        """Load the project archive and precompute label-count dataframes.

        :param zip_project: project archive, forwarded to Project
        :param remote: whether the archive is fetched remotely (see Project)
        """
        super().__init__(zip_project=zip_project, remote=remote, type="global")
        # one (source_file, label) row per annotated label occurrence
        self.data = [(src_file, ne_label)
                     for src_file, ann in self.annotations.items()
                     for ne_label in ann['labels']]
        self.df_base = pd.DataFrame(self.data, columns=["SOURCE_FILE", "LABEL"])
        # project-wide totals per label
        self.df_i = self.df_base.groupby(["LABEL"])["LABEL"].count().reset_index(name="TOTAL")
        # per-source-file totals per label
        self.df_details = self.df_base.groupby(["SOURCE_FILE", "LABEL"])["LABEL"].count().reset_index(name="TOTAL")
        self.total_annotations_project = self.df_i['TOTAL'].sum()

    def create_plot(self, type_data: str):
        """Return a bar chart of label totals for one source file.

        BUG FIX (annotation only): the return annotation was `-> sns.barplot`,
        which is a function object, not the returned type; the method returns
        the matplotlib Figure that holds the seaborn barplot.

        :param type_data: SOURCE_FILE value used to filter `df_details`
        :return: the matplotlib Figure containing the labelled barplot
        """
        # keep only the rows of the requested source file
        data_tab_filtered = self.df_details.loc[self.df_details['SOURCE_FILE'] == type_data]
        # NOTE(review): seaborn draws on the current pyplot axes here, so
        # successive calls reuse the same figure — confirm this is intended.
        ax = sns.barplot(x='LABEL', y='TOTAL', data=data_tab_filtered)
        ax.figure.suptitle(type_data)
        # annotate each bar with its count
        for container in ax.containers:
            ax.bar_label(container)
        return ax.figure
class IaaStatistics(Project):
    """Inter-annotator agreement (IAA) statistics for an 'iaa' project.

    At construction time, builds per-coder views of the annotations, the
    agreement/disagreement dataframes, and the kappa-based agreement metrics.

    BUG FIXES in this revision:
    * ``extract_refs``, ``check_all_equal`` and ``check_all_not_equal`` were
      defined without ``self`` yet invoked through ``self.…(…)``; every such
      call raised ``TypeError`` (the bound ``self`` was passed as an extra
      positional argument). They are now ``@staticmethod``s, which preserves
      every call site.
    * positional ``axis`` arguments (``.any(1)``, ``.nunique(1)``,
      ``.apply(..., 1)``) were deprecated and removed in modern pandas; they
      are now passed as ``axis=1`` keywords with identical behavior.
    """

    def __init__(self, zip_project, baseline_text, remote=False):
        """Load the IAA project and precompute all agreement artefacts.

        :param zip_project: project archive, forwarded to Project
        :param baseline_text: UTF-8 encoded bytes of the annotated base text
        :param remote: whether the archive is fetched remotely (see Project)
        """
        super().__init__(zip_project=zip_project, remote=remote, type="iaa")
        self.baseline_text = baseline_text.decode('utf-8')
        # per-coder views of the raw annotations
        self.mentions_per_coder = self.extract_refs(self.annotations, self.annotators, type="mentions")
        self.labels_per_coder = self.extract_refs(self.annotations, self.annotators, type="labels")
        # {coder: {mention: label}}
        self.annotations_per_coders = {coder: dict(zip(ann[1]['mentions'], ann[1]['labels']))
                                       for coder, ann in zip(self.annotators, self.annotations.items())}
        self.coders_pairs = list(combinations(self.annotations_per_coders, 2))
        # order-preserving de-duplication of mentions / labels across coders
        self.similar_mention = list(dict.fromkeys(
            [mention for _, mentions in self.mentions_per_coder.items() for mention in mentions]))
        self.labels_schema = list(dict.fromkeys(
            [label for _, labels in self.labels_per_coder.items() for label in labels]))
        # dataframes and matrix analysis
        self.base_df = self.build_base_df()
        self.df_agree = self.base_df[self.base_df[self.annotators].apply(
            lambda row: self.check_all_equal(row), axis=1)]
        self.df_disagree = self.base_df[self.base_df[self.annotators].apply(
            lambda row: self.check_all_not_equal(row), axis=1)]
        # per-mention vote counts, in the shape expected by fleiss_kappa_function
        self.coders_matrix = self.base_df.apply(pd.Series.value_counts, axis=1).fillna(0).astype(int).values
        # totals
        self.total_annotations = len(self.base_df)
        self.total_agree = len(self.df_agree)
        self.total_disagree = len(self.df_disagree)
        # access to metrics
        self.fleiss_kappa = round(fleiss_kappa_function(self.coders_matrix), 2)
        self.cohen_kappa_pairs = self.compute_pairs_cohen_kappa()
        self.percent_agree = percentage_agreement_pov(self.total_agree, self.total_annotations)
        self.percent_disagree = percentage_agreement_pov(self.total_disagree, self.total_annotations)

    @staticmethod
    def extract_refs(annotations: dict, annotators: list, type: str) -> dict:
        """Map each annotator to the ``type`` entry ("mentions" or "labels") of its annotations.

        :param annotations: per-source annotation dicts (each value has 'mentions'/'labels' keys)
        :param annotators: coder names, zipped positionally with `annotations`
        :param type: which entry to extract ('mentions' or 'labels')
        :return: {coder: extracted data}
        """
        return {
            coder: data for coder, ann in zip(
                annotators,
                annotations.items()
            ) for ref, data in ann[1].items() if ref == type
        }

    @staticmethod
    def check_all_equal(iterator: list) -> bool:
        """Return True when every value in `iterator` is identical (full agreement)."""
        return len(set(iterator)) <= 1

    @staticmethod
    def check_all_not_equal(iterator: list) -> bool:
        """Return True when at least two values in `iterator` differ (disagreement)."""
        return len(set(iterator)) > 1

    def plot_confusion_matrix(self, width: int, height: int):
        """Heatmap of pairwise Cohen's kappa scores between coders.

        :param width: figure width in inches
        :param height: figure height in inches
        :return: the matplotlib Axes holding the heatmap
        """
        intermediary = defaultdict(Counter)
        for (src, tgt), score in self.cohen_kappa_pairs.items():
            intermediary[src][tgt] = score
        # all coder names, as a sorted axis shared by rows and columns
        letters = sorted({key for inner in intermediary.values() for key in inner} | set(intermediary.keys()))
        confusion_matrix = [[intermediary[src][tgt] for tgt in letters] for src in letters]
        df_cm = pd.DataFrame(confusion_matrix, letters, letters)
        # mask zero cells (pairs with no score) so they are left blank
        mask = df_cm.values == 0
        sns.set(font_scale=0.7)  # label size
        colors = ["#e74c3c", "#f39c12", "#f4d03f", "#5dade2", "#58d68d", "#28b463"]
        fig, ax = plt.pyplot.subplots(figsize=(width, height))
        # kappa scores are bounded by 1, hence the fixed color range
        sns.heatmap(df_cm, cmap=colors, annot=True, mask=mask, annot_kws={"size": 7}, vmin=0, vmax=1, ax=ax)
        return ax

    def build_base_df(self) -> pd.DataFrame:
        """Build the mention x annotator label matrix.

        One row per known mention, one column per annotator; the *string*
        'None' (not the None object) marks mentions a coder did not annotate.
        """
        df = pd.DataFrame(self.annotations_per_coders, index=self.similar_mention)
        for coder in self.annotators:
            df[coder] = 'None'
            for mention, label in self.annotations_per_coders[coder].items():
                df.loc[mention, coder] = label
        return df

    def compute_pairs_cohen_kappa(self) -> dict:
        """Return {(coder_a, coder_b): Cohen's kappa} for every coder pair."""
        return {
            (c1, c2): cohen_kappa_function(self.labels_per_coder[c1],
                                           self.labels_per_coder[c2]) for c1, c2 in self.coders_pairs
        }

    def count_total_annotations_label(self) -> list:
        """Return [(label, number of base_df rows where any coder used that label)]."""
        return [
            (label, self.base_df.astype(object).eq(label).any(axis=1).sum()) for label in self.labels_schema
        ]

    def total_agree_disagree_per_label(self) -> list:
        """Per-label agreement breakdown.

        :return: list of (label, total_rows_with_label, % of those rows with
                 full agreement, % with disagreement)
        """
        # rows where every coder gave the same answer (computed once, not per label)
        unanimous = self.base_df[self.base_df.nunique(axis=1).eq(1)]
        results = []
        for label, total in self.count_total_annotations_label():
            agree = unanimous.eq(label).any(axis=1).sum()
            results.append((
                label,
                total,
                (agree / total) * 100,
                ((total - agree) / total) * 100
            ))
        return results

    def plot_agreement_pies(self):
        """One pie chart per label showing agree vs disagree percentages.

        :return: the matplotlib Figure holding the row of pies
        """
        my_labels = 'agree', 'disagree'
        my_colors = ['#47DBCD', '#F5B14C']
        my_explode = (0, 0.1)
        tasks_to_pie = self.total_agree_disagree_per_label()
        fig, axes = plt.pyplot.subplots(1, len(tasks_to_pie), figsize=(20, 3))
        for counter, (label, _total, pct_agree, pct_disagree) in enumerate(tasks_to_pie):
            axes[counter].pie([pct_agree, pct_disagree], autopct='%1.1f%%', startangle=15, shadow=True,
                              colors=my_colors, explode=my_explode)
            axes[counter].set_title(label)
            axes[counter].axis('equal')
        fig.set_facecolor("white")
        fig.legend(labels=my_labels, loc="center right", borderaxespad=0.1, title="Labels alignement")
        # plt.savefig(f'./out/pie_alignement_labels_{filename_no_extension}.png', dpi=400)
        return fig

    def analyze_text(self) -> list:
        """Return [total sentences, total words, total characters] of the
        baseline text, tokenized with NLTK's French models."""
        return [
            len(sent_tokenize(self.baseline_text, language="french")),
            len(word_tokenize(self.baseline_text, language="french")),
            len(self.baseline_text)
        ]