# Spaces: Sleeping (Hugging Face Spaces status banner captured during extraction)
#!/usr/bin/env python3 | |
# -*- coding:utf-8 -*- | |
import requests.exceptions | |
import zipfile | |
import streamlit as st | |
from streamlit.components.v1 import html | |
from n4a_analytics_lib.analytics import (GlobalStatistics, IaaStatistics) | |
from n4a_analytics_lib.constants import (DESCRIPTION) | |
# Set application: wide canvas, a sidebar for inputs, two columns for results.
st.set_page_config(layout="wide")
# sidebar: meta, inputs etc.
sidebar = st.sidebar
# cols: display results
col1, col2 = st.columns(2)
# description
sidebar.markdown(DESCRIPTION)
# to st components
#def clear_cache():
#    st.session_state = {}
def check_login(username, password):
    """Validate that both credential fields were filled in.

    Returns True only when neither *username* nor *password* is empty.
    """
    return min(len(username), len(password)) > 0
def logout():
    """Placeholder for a future logout action; intentionally a no-op."""
    return None
# Level to analyze: choose which dashboard to render.
option = sidebar.selectbox('Which statistics level?', ('Inter-Annotator Agreement results',
                                                      'Global project statistics'))
# IAA results view: agreement metrics computed from an exported annotation
# project (ZIP) and the reference text it annotates.
if option == "Inter-Annotator Agreement results":
    annotations = sidebar.file_uploader("Upload IAA annotations (.zip format only): ")
    baseline_text = sidebar.file_uploader("Upload baseline text (.txt format only): ")
    if baseline_text is not None and annotations is not None:
        # Build the IAA statistics object from the uploaded ZIP + raw text bytes.
        project_analyzed = IaaStatistics(zip_project=annotations, baseline_text=baseline_text.getvalue())
        # analyze_text() is indexed below as [0] sentences, [1] words, [2] characters.
        baseline_analyzer = project_analyzed.analyze_text()
        col2.markdown(f"""
### BASELINE TEXT: {baseline_text.name}
- sentences: {baseline_analyzer[0]}
- words: {baseline_analyzer[1]}
- characters: {baseline_analyzer[2]}
""")
        #print(project_analyzed.annotations_per_coders)
        # All mentions seen across coders, deduplicated while preserving order.
        commune_mentions = [l for i,j in project_analyzed.mentions_per_coder.items() for l in j]
        commune_mentions = list(dict.fromkeys(commune_mentions))
        #print(commune_mentions)
        #print(project_analyzed.annotations)
        #print(project_analyzed.labels_per_coder)
        # Heavy dependencies imported lazily, only when this view is active.
        import pandas as pd
        from collections import defaultdict, Counter
        from itertools import combinations
        import seaborn as sn
        import matplotlib as plt  # NOTE(review): alias unused below; pylt is the pyplot module
        import matplotlib.pyplot as pylt
        # Per-coder label frequency dictionaries.
        dicts_coders = []
        # NOTE(review): this loop rebinds 'annotations' (the uploaded file above);
        # harmless since the upload is no longer read, but worth renaming.
        for coder, annotations in project_analyzed.annotations_per_coders.items():
            nombre_annotations = []
            # print(f'* {coder}')
            for annotation, label in annotations.items():
                nombre_annotations.append(label)
            # print(f"Nombre total d'annotations : {len(nombre_annotations)}")
            dict_coder = dict(Counter(nombre_annotations))
            dicts_coders.append(dict_coder)
            # print(f'==========================')
        # Label inventory, taken from the first coder's frequency dict.
        labels = [label for label in dicts_coders[0]]
        from n4a_analytics_lib.metrics_utils import interpret_kappa, fleiss_kappa_function, cohen_kappa_function
        # Mention x annotator table; 'None' marks mentions a coder did not label.
        df = pd.DataFrame(project_analyzed.annotations_per_coders, index=commune_mentions)
        for ann in project_analyzed.annotators:
            df[ann] = 'None'
            for mention, value in project_analyzed.annotations_per_coders[ann].items():
                df.loc[mention, ann] = value
        total_annotations = len(df)
        # Per-row category counts, the input shape Fleiss' kappa expects.
        # NOTE(review): positional axis in apply(..., 1) is deprecated in pandas;
        # prefer axis=1 when upgrading.
        df_n = df.apply(pd.Series.value_counts, 1).fillna(0).astype(int)
        matrix = df_n.values
        # Every unordered pair of coders, for pairwise Cohen's kappa.
        pairs = list(combinations(project_analyzed.annotations_per_coders, 2))
        # Display in app
        #cont_kappa = st.container()
        st.title("Inter-Annotator Agreement (IAA) results")
        #tab1, tab2, tab3, tab4, tab5 = st.tabs(
        #    ["π IAA metrics", "π IAA Metrics Legend", "βοΈ Agree annotations", "β Disagree annotations",
        #     "π·οΈ Global Labels Statistics"])
        st.markdown("## π IAA metrics")
        col1_kappa, col2_kappa = st.columns(2)
        col1_kappa.subheader("Fleiss Kappa (global score for group):")
        col1_kappa.markdown(interpret_kappa(round(fleiss_kappa_function(matrix), 2)), unsafe_allow_html=True)
        col1_kappa.subheader("Cohen Kappa Annotators Matrix (score between annotators):")
        # tab1.dataframe(df)
        # Pairwise Cohen's kappa per coder pair, rendered as a colored bullet list.
        data = []
        for coder_1, coder_2 in pairs:
            cohen_function = cohen_kappa_function(project_analyzed.labels_per_coder[coder_1], project_analyzed.labels_per_coder[coder_2])
            data.append(((coder_1, coder_2), cohen_function))
            col1_kappa.markdown(f"* {coder_1} <> {coder_2} : {interpret_kappa(cohen_function)}", unsafe_allow_html=True)
            # print(f"* {coder_1} <> {coder_2} : {cohen_function}")
        # Fold pairwise scores into a coder x coder matrix for the heatmap.
        intermediary = defaultdict(Counter)
        for (src, tgt), count in data:
            intermediary[src][tgt] = count
        letters = sorted({key for inner in intermediary.values() for key in inner} | set(intermediary.keys()))
        confusion_matrix = [[intermediary[src][tgt] for tgt in letters] for src in letters]
        import numpy as np
        df_cm = pd.DataFrame(confusion_matrix, letters, letters)
        # Hide cells without a pairwise score (Counter defaults them to 0).
        mask = df_cm.values == 0
        sn.set(font_scale=0.7)  # for label size
        colors = ["#e74c3c", "#f39c12", "#f4d03f", "#5dade2", "#58d68d", "#28b463"]
        # NOTE(review): default value 14 exceeds max 10 -- Streamlit rejects a
        # slider whose value is outside [min, max]; confirm intended bounds.
        width = st.slider("matrix width", 1, 10, 14)
        height = st.slider("matrix height", 1, 10, 4)
        fig, ax = pylt.subplots(figsize=(width, height))
        sn.heatmap(df_cm, cmap=colors, annot=True, mask=mask, annot_kws={"size": 7}, vmin=0, vmax=1, ax=ax)  # font size
        # plt.show()
        st.pyplot(ax.figure)
        # Static HTML legend mapping kappa ranges to the heatmap colors above.
        col2_kappa.markdown("""
<div>
<div id="legend" style="right: 70em;">
<h3>π IAA Metrics Legend</h3>
<table>
<thead>
<tr>
<th
colspan="2"> Kappa
interpretation
legend </th>
</tr>
</thead>
<tbody>
<tr>
<td> Kappa
score(k) </td>
<td>Agreement</td>
</tr>
<tr
style = "background-color: #e74c3c;">
<td> k < 0 </td>
<td> Less
chance
agreement </td>
</tr>
<tr
style = "background-color: #f39c12;">
<td> 0.01 < k < 0.20 </td>
<td> Slight
agreement </td>
</tr>
<tr
style = "background-color: #f4d03f;">
<td> 0.21 < k < 0.40 </td>
<td> Fair
agreement </td>
</tr>
<tr
style = "background-color: #5dade2;">
<td> 0.41 < k < 0.60 </td>
<td> Moderate
agreement </td>
</tr>
<tr
style = "background-color: #58d68d;">
<td> 0.61 < k < 0.80 </td>
<td> Substantial
agreement </td>
</tr>
<tr
style = "background-color: #28b463;">
<td> 0.81 < k < 0.99 </td>
<td> Almost
perfect
agreement </td>
</tr>
</tbody>
</table></div></div>"""
        , unsafe_allow_html = True)
## Shared CSV export helper
def convert_df(df_ex):
    """Serialize *df_ex* to CSV and return it as UTF-8 bytes for a download button."""
    csv_text = df_ex.to_csv(encoding="utf-8")
    return csv_text.encode('utf-8')
## Agree part
# One DataFrame column per annotator; these are the columns compared below.
columns_to_compare = project_analyzed.annotators
def check_all_equal(iterator):
    """Return True when every value in *iterator* is identical (or it is empty)."""
    distinct = set(iterator)
    return not distinct or len(distinct) == 1
# Rows where every annotator assigned the same label.
df_agree = df[df[columns_to_compare].apply(lambda row: check_all_equal(row), axis=1)]
total_unanime = len(df_agree)
csv_agree = convert_df(df_agree)
st.subheader("βοΈ Agree annotations")
# Show unanimous count and its share of all mentions.
st.markdown(f"{total_unanime} / {len(df)} annotations ({round((total_unanime / len(df)) * 100, 2)} %)")
st.download_button(
    "Press to Download CSV",
    csv_agree,
    "csv_annotators_agree.csv",
    "text/csv",
    key='download-csv-1'
)
st.dataframe(df_agree)
## Disagree part
def check_all_not_equal(iterator):
    """Return True when *iterator* holds at least two distinct values."""
    seen = set()
    for value in iterator:
        seen.add(value)
        if len(seen) > 1:
            return True
    return False
# Rows where annotators disagree on at least one label.
df_disagree = df[df[columns_to_compare].apply(lambda row: check_all_not_equal(row), axis=1)]
total_desaccord = len(df_disagree)
csv_disagree = convert_df(df_disagree)
st.subheader("β Disagree annotations")
# Show disagreement count and its share of all mentions.
st.markdown(
    f"{total_desaccord} / {len(df)} annotations ({round((total_desaccord / len(df)) * 100, 2)} %)")
st.download_button(
    "Press to Download CSV",
    csv_disagree,
    "csv_annotators_disagree.csv",
    "text/csv",
    key='download-csv-2'
)
st.dataframe(df_disagree)
## alignement chart labels
def count_total_annotations_label(dataframe, labels):
    """Count, for each label, the rows where at least one annotator used it.

    :param dataframe: mention x annotator table of label strings.
    :param labels: iterable of label names to count.
    :return: list of (label, row_count) pairs, in *labels* order.
    """
    pairs = []
    for label in labels:
        # astype(object) so .eq compares uniformly across column dtypes.
        # Fix: pass axis as a keyword -- positional axis in DataFrame.any was
        # deprecated in pandas 1.5 and removed in 2.0.
        total = dataframe.astype(object).eq(label).any(axis=1).sum()
        pairs.append((label, total))
    return pairs
# Total rows mentioning each label at least once.
totals_annotations_per_labels = count_total_annotations_label(df, labels)
# Get the number of mentions carrying the same class across annotators.
def total_agree_disagree_per_label(dataframe, pairs_totals_labels):
    """Compute agree/disagree percentages per label.

    :param dataframe: mention x annotator table of label strings.
    :param pairs_totals_labels: (label, total_rows_with_label) pairs as
        produced by count_total_annotations_label.
    :return: list of (label, total, agree_percent, disagree_percent) tuples.
    """
    new_pairs = []
    for label, total in pairs_totals_labels:
        # Rows where all annotators agree (one unique value) and that value
        # matches this label.
        # Fix: use the 'dataframe' parameter (the original read the global 'df')
        # and keyword axis (positional axis removed in pandas 2.0).
        agree_res = dataframe[dataframe.nunique(axis=1).eq(1)].eq(label).any(axis=1).sum()
        disagree_res = total - agree_res
        # Guard against labels that never occur (total == 0).
        agree_percent = (agree_res / total) * 100 if total else 0.0
        disagree_percent = (disagree_res / total) * 100 if total else 0.0
        new_pairs.append((label, total, agree_percent, disagree_percent))
    return new_pairs
# (label, total, agree %, disagree %) tuples feeding the pie charts below.
to_pie = total_agree_disagree_per_label(df, totals_annotations_per_labels)
def plot_pies(tasks_to_pie):
    """Draw one agree/disagree pie chart per label and return the figure.

    *tasks_to_pie* holds (label, total, agree_percent, disagree_percent)
    tuples; only the two percentages and the label are plotted.
    """
    legend_labels = 'agree', 'disagree'
    slice_colors = ['#47DBCD', '#F5B14C']
    slice_explode = (0, 0.1)
    fig, axes = pylt.subplots(1, len(tasks_to_pie), figsize=(20, 3))
    for idx, task in enumerate(tasks_to_pie):
        shares = [task[2], task[3]]
        axes[idx].pie(shares, autopct='%1.1f%%', startangle=15, shadow=True,
                      colors=slice_colors, explode=slice_explode)
        axes[idx].set_title(task[0])
        axes[idx].axis('equal')
    fig.set_facecolor("white")
    fig.legend(labels=legend_labels, loc="center right", borderaxespad=0.1,
               title="Labels alignement")
    return fig
# Render the per-label agreement pies in the main area.
f = plot_pies(to_pie)
st.subheader("π·οΈ Global Labels Statistics")
st.pyplot(f.figure)
# global project results view
# st_session = {"gs_local":True, "gs_remote":False, "gs_obj":<object>}
def display_data():
    """Render the GlobalStatistics object stored in st.session_state['gs_obj'].

    Left column: total annotation metric, summary table, document selector.
    Right column: bar plot for the selected document.
    """
    col1.metric("Total curated annotations",
                f"{st.session_state['gs_obj'].total_annotations_project} Named entities")
    col1.dataframe(st.session_state['gs_obj'].df_i)
    selected_data = col1.selectbox('Select specific data to display bar plot:',
                                   st.session_state['gs_obj'].documents, key="selector_data")
    col2.pyplot(st.session_state['gs_obj'].create_plot(selected_data))
def init_session_statistics(remote: bool, local: bool, data: tuple) -> None:
    """Reset the Streamlit session and load a new GlobalStatistics object.

    :param remote: fetch curated data from the INCEpTION API host.
    :param local: build statistics from a locally uploaded ZIP.
    :param data: (username, password) tuple when remote, uploaded file when local.
    """
    # Fix: assigning a plain dict to st.session_state rebinds the module
    # attribute and detaches it from Streamlit's managed session state;
    # clear the existing state object in place instead.
    st.session_state.clear()
    # Remember which retrieval mode produced the current statistics object.
    st.session_state["gs_local"] = local
    st.session_state["gs_remote"] = remote
    # If remote, fetch data from the API host first (it stores 'gs_obj' itself).
    if remote and not local:
        st.success('Fetch curated documents from host INCEpTION API in progress...')
        fetch_curated_data_from_remote(
            username=data[0],
            password=data[1]
        )
    if local and not remote:
        st.session_state["gs_obj"] = GlobalStatistics(zip_project=data, remote=False)
from pycaprio import Pycaprio, mappings | |
from zipfile import ZipFile | |
import io | |
import requests | |
def fetch_curated_data_from_remote(username: str,
                                   password: str,
                                   endpoint: str = "https://inception.dhlab.epfl.ch/prod",
                                   project_title: str = "ner4archives-template"):
    """Fetch curated documents from an INCEpTION host, merge them into one ZIP,
    and store a GlobalStatistics object in st.session_state['gs_obj'].

    :param username: INCEpTION account name.
    :param password: INCEpTION account password.
    :param endpoint: base URL of the INCEpTION instance.
    :param project_title: project whose curated documents are fetched.
    """
    # open a client
    try:
        client = Pycaprio(inception_host=endpoint, authentication=(str(username), str(password)))
    except requests.exceptions.JSONDecodeError:
        # username / password incorrect
        st.error('Username or Password is incorrect please retry.')
        # NOTE(review): execution continues after the error, so 'client' below
        # raises NameError when authentication failed -- consider returning here.
    # get project object matching the configured title
    project_name = [p for p in client.api.projects() if p.project_name == project_title]
    # get all documents from project
    documents = client.api.documents(project_name[0].project_id)
    curations = []
    zipfiles = []
    count = 0
    # ZipFile open mode: "a" (writable) for the first archive, "r" afterwards,
    # so only zipfiles[0] accepts the merged entries below.
    flag = "a"
    # iterate over all documents and retrieve only curated into ZIP container
    for document in documents:
        if count > 0:
            flag = "r"
        if document.document_state == mappings.DocumentState.CURATION_COMPLETE:
            curated_content = client.api.curation(project_name[0].project_id, document,
                                                  curation_format=mappings.InceptionFormat.UIMA_CAS_XMI_XML_1_1)
            curations.append(curated_content)
        # NOTE(review): this re-opens every accumulated curation on each document
        # iteration, so 'zipfiles' holds duplicates -- presumably only the first
        # (mode "a") and the distinct later ones matter; verify intent.
        for curation in curations:
            z = ZipFile(io.BytesIO(curation), mode=flag)
            zipfiles.append(z)
        count += 1
    # Merge all zip in one: copy any entry missing from the first archive.
    with zipfiles[0] as z1:
        for fname in zipfiles[1:]:
            zf = fname
            # print(zf.namelist())
            for n in zf.namelist():
                if n not in z1.namelist():
                    z1.writestr(n, zf.open(n).read())
        # Create a new object from the merged archive.
        st.session_state["gs_obj"] = GlobalStatistics(zip_project=z1, remote=True)
# Global project statistics view: load curated data from a local ZIP export or
# directly from an INCEpTION host, then render it via display_data().
if option == "Global project statistics":
    # User input controllers
    mode = sidebar.radio("Choose mode to retrieve curated data: ", (
        "Local directory", "INCEpTION API Host remote"
    ))
    data = None
    if mode == "Local directory":
        project = sidebar.file_uploader("Folder that contains curated annotations in XMI 1.1 (.zip format only): ",
                                        type="zip")
        data = project
    if mode == "INCEpTION API Host remote":
        username = sidebar.text_input("Username: ")
        password = sidebar.text_input("Password: ", type="password")
        data = (username, password)
    # Validate inputs
    btn_process = sidebar.button('Process', key='process')
    # Access data with local ressources
    if btn_process and mode == "Local directory":
        if data is not None:
            # create a new session
            init_session_statistics(remote=False, local=True, data=data)
    # Access data with remote ressources
    # Fix: the original compared mode against "API Host remote", which can never
    # equal the radio option "INCEpTION API Host remote", so the remote branch
    # was unreachable.
    if btn_process and mode == "INCEpTION API Host remote":
        if data is not None:
            if check_login(username=data[0], password=data[1]):
                # create a new session
                init_session_statistics(remote=True, local=False, data=data)
            else:
                st.error("Sorry! Username or Password is empty.")
    # Change data values and visualize new plot
    if "gs_obj" in st.session_state:
        if st.session_state["gs_local"] or st.session_state["gs_remote"]:
            display_data()