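"""Streamlit demo for next-sentence-prediction / dialogue-coherence models.

The app discovers fine-tuned models under ./model/<name>/, loads the one the
user selects together with its tokenizer, and offers several input modes for
scoring how likely a turn is to follow its dialogue context.
"""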
import os
import re
import glob
import json
from typing import Dict, List, Tuple

import numpy as np
import scipy as sp
import scipy.special  # imported explicitly; `import scipy` alone does not guarantee sp.special is loaded
import torch
import pandas
import streamlit as st
import matplotlib.pyplot as plt

from inference_tokenizer import NextSentencePredictionTokenizer
from models import get_class
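

# Every model directory ships a meta-info.json naming the Python package and
# class to instantiate (defaulting to transformers.BertForNextSentencePrediction).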
def get_model(_model_path):
    print(f"Getting model at {_model_path}")
    meta_info_path = os.path.join(_model_path, "meta-info.json")
    if os.path.isfile(meta_info_path):
        with open(meta_info_path, "r") as f:
            meta_info = json.load(f)
        _model_package = meta_info["kwargs"].get("model_package", "transformers")
        _model_class = meta_info["kwargs"].get("model_class", "BertForNextSentencePrediction")
    else:
        raise FileNotFoundError("Model is provided without meta-info.json. Cannot infer the proper configuration!")
    model_class = get_class(_model_package, _model_class)
    _model = model_class.from_pretrained(_model_path)
    _model.eval()
    return _model
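

# Tokenizer configuration comes from the same meta-info.json, including an
# optional special separator token that is registered before wrapping the
# tokenizer for inference.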
def get_tokenizer(tokenizer_path):
    print(f"Getting tokenizer at {tokenizer_path}")
    from transformers import AutoTokenizer
    tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)
    meta_info_path = os.path.join(tokenizer_path, "meta-info.json")
    if os.path.isfile(meta_info_path):
        with open(meta_info_path, "r") as f:
            meta_info = json.load(f)
        tokenizer_args = meta_info["tokenizer_args"]
        special_token = meta_info["kwargs"]["special_token"]
    else:
        raise FileNotFoundError("Tokenizer is provided without meta-info.json. Cannot infer the proper configuration!")
    if special_token != " ":
        tokenizer.add_special_tokens({"additional_special_tokens": [special_token]})
    _inference_tokenizer = NextSentencePredictionTokenizer(tokenizer, **tokenizer_args)
    return _inference_tokenizer
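

# Discover the bundled models: each ./model/<name>/info.json carries the
# display name and description shown in the selector below.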
models_path = glob.glob("./model/*/info.json")
models = {}
for model_path in models_path:
    with open(model_path, "r") as f:
        model_data = json.load(f)
    model_data["path"] = model_path.replace("info.json", "")
    models[model_data["model"]] = model_data

model_name = st.selectbox("Which model do you want to use?",
                          sorted(models.keys()),
                          index=0)
model_path = models[model_name]["path"]
model = get_model(model_path)
inference_tokenizer = get_tokenizer(model_path)
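

# Flatten the aggregated-JSON format into (context, sentence, label) triples;
# every answer source lists its sentences under a human label such as "coherent".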
def get_evaluation_data_from_json(_context: List) -> List[Tuple[List, str, str]]:
    output_data = []
    for _dict in _context:
        _dict: Dict
        for source in _dict["answers"].values():
            for _t, sentences in source.items():
                for sentence in sentences:
                    output_data.append((_dict["context"], sentence, _t))
    return output_data
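

# Strip speaker prefixes ("Bot: ", "User: ") and "#..." control sequences from
# pasted dialogue transcripts.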
control_sequence_regex_1 = re.compile(r"#.*? ")
control_sequence_regex_2 = re.compile(r"#.*?\n")


def _clean_conversational_line(_line: str):
    _line = _line.replace("Bot: ", "")
    _line = _line.replace("User: ", "")
    _line = control_sequence_regex_1.sub("", _line)
    _line = control_sequence_regex_2.sub("\n", _line)
    return _line.strip()
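

# For each turn, build every context window of up to five preceding turns,
# e.g. for turn D of [A, B, C, D] the windows are [C], [B, C] and [A, B, C].
# Downstream code indexes scores by window length, so window order is irrelevant.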
def get_evaluation_data_from_dialogue(_context: List[str]) -> List[Dict]:
    output_data = []
    _context = [line.strip() for line in _context]
    _context = [line for line in _context if len(line)]
    for idx, _line in enumerate(_context):
        actual_context = _context[max(0, idx - 5):idx]
        gradual_context_dict = {_line: []}
        # Windows of length 1, 2, ..., len(actual_context), each ending at the
        # turn directly before _line.
        for context_length in range(1, len(actual_context) + 1):
            gradual_context_dict[_line].append(actual_context[-context_length:])
        output_data.append(gradual_context_dict)
    return output_data
option = st.selectbox("Choose type of input:",
                      ["01 - String (one turn per line)",
                       "02 - JSON (aggregated)",
                       "03 - JSON (example CA-OOD)",
                       "04 - JSON (example Elysai)",
                       "05 - Diagnostic mode",
                       "06 - JSON (example Elysai - large)",
                       "07 - Dialogue Breakdown Challenge"])

progress_bar = st.progress(0.0, text="Inference")
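
# --- Mode 01: score a single turn against a hand-typed context. ---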
if "01" in option: | |
with st.form("input_text"): | |
context = st.text_area("Insert context here (one turn per line):") | |
actual_text = st.text_input("Insert current turn:") | |
context = list(filter(lambda x: len(x.strip()) >= 1, context.split("\n"))) | |
input_tensor = inference_tokenizer.get_item(context=context, actual_sentence=actual_text) | |
output_model = model(**input_tensor.data).logits | |
output_model = output_model.detach().numpy()[0] | |
if len(output_model) == 2: # classification | |
output_model = sp.special.softmax(output_model, axis=-1) | |
prop_follow = output_model[0] | |
prop_not_follow = output_model[1] | |
elif len(output_model) == 1: # regression | |
prop_follow = 1 - output_model[0] | |
prop_not_follow = 1 - prop_follow | |
submitted = st.form_submit_button("Submit") | |
if submitted: | |
fig, ax = plt.subplots() | |
ax.pie([prop_follow, prop_not_follow], labels=["Probability - Follow", "Probability - Not Follow"], | |
autopct='%1.1f%%') | |
st.pyplot(fig) | |
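
# --- Modes 02/03/04/06: batch-evaluate a labelled JSON dataset (optionally
# pre-filled with one of the bundled example sets) and report accuracy against
# the human labels. ---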
if "02" in option or "03" in option or "04" in option or "06" in option: | |
with st.form("input_text"): | |
from data.example_data import ca_ood, elysai, elysai_large | |
option: str | |
# > Python 3.10 | |
# match option.split("-")[0].strip(): | |
# case "03": | |
# text = json.dumps(choices[0]) | |
# case "04": | |
# text = json.dumps(choices[1]) | |
# case _: | |
# text = "" | |
option = option.split("-")[0].strip() | |
text = "" | |
if option == "03": | |
text = json.dumps(ca_ood) | |
elif option == "04": | |
text = json.dumps(elysai) | |
elif option == "06": | |
text = json.dumps(elysai_large) | |
context = st.text_area("Insert JSON here:", value=str(text)) | |
if "{" in context: | |
data_for_evaluation = get_evaluation_data_from_json(_context=json.loads(context)) | |
results = [] | |
accuracy = [] | |
submitted = st.form_submit_button("Submit") | |
if submitted: | |
for idx, datapoint in enumerate(data_for_evaluation): | |
progres_bar.progress(idx / len(data_for_evaluation), text="Inference") | |
c, s, human_label = datapoint | |
input_tensor = inference_tokenizer.get_item(context=c, actual_sentence=s) | |
output_model = model(**input_tensor.data).logits | |
output_model = output_model.detach().numpy()[0] | |
if len(output_model) == 2: # classification | |
output_model = sp.special.softmax(output_model, axis=-1) | |
prop_follow = output_model[0] | |
prop_not_follow = output_model[1] | |
elif len(output_model) == 1: # regression | |
prop_follow = 1 - output_model[0] | |
prop_not_follow = 1 - prop_follow | |
results.append((c, s, human_label, prop_follow, prop_not_follow)) | |
if human_label == "coherent": | |
accuracy.append(int(prop_follow > prop_not_follow)) | |
else: | |
accuracy.append(int(prop_not_follow > prop_follow)) | |
st.metric(label="Accuracy", value=f"{sum(accuracy) / len(accuracy)} %") | |
df = pandas.DataFrame(results, columns=["Context", "Query", "Human Label", "Probability (follow)", | |
"Probability (not-follow)"]) | |
st.dataframe(df) | |
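
# --- Mode 05 (diagnostic): score every turn of a pasted dialogue against
# progressively longer context windows (1-5 preceding turns). ---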
if "05" in option: | |
with st.form("input_text"): | |
context_size = 5 | |
context = st.text_area("Insert dialogue here (one turn per line):") | |
submitted = st.form_submit_button("Submit") | |
if submitted: | |
data_for_evaluation = get_evaluation_data_from_dialogue(_clean_conversational_line(context).split("\n")) | |
lines = [] | |
scores = np.zeros(shape=(len(data_for_evaluation), context_size)) | |
for idx, datapoint in enumerate(data_for_evaluation): | |
progres_bar.progress(idx / len(data_for_evaluation), text="Inference") | |
for actual_sentence, contexts in datapoint.items(): | |
lines.append(actual_sentence) | |
for c in contexts: | |
input_tensor = inference_tokenizer.get_item(context=c, actual_sentence=actual_sentence) | |
output_model = model(**input_tensor.data).logits | |
output_model = output_model.detach().numpy()[0] | |
if len(output_model) == 2: # classification | |
output_model = sp.special.softmax(output_model, axis=-1) | |
prop_follow = output_model[0] | |
prop_not_follow = output_model[1] | |
elif len(output_model) == 1: # regression | |
prop_follow = 1 - output_model[0] | |
prop_not_follow = 1 - prop_follow | |
scores[len(lines) - 1][len(c) - 1] = prop_follow | |
aggregated_result = [] | |
for idx, line in enumerate(lines): | |
aggregated_result.append([line] + scores[idx].tolist()) | |
st.table(aggregated_result) | |
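
# --- Mode 07: Dialogue Breakdown Detection Challenge data; each utterance
# carries NB/PB/B annotations (not a breakdown / possible breakdown /
# breakdown) that are shown next to the model's follow probabilities. ---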
if "07" in option: | |
from data.example_data import dbc | |
select_conversation = st.selectbox("Which dialogue to evaluate", list(range(len(dbc))), index=0) | |
context = st.text_area("Insert dialogue here (one turn per line):", | |
value=json.dumps([dbc[int(select_conversation)]])) | |
st.markdown("# Formatted form") | |
context_json = json.loads(context) | |
output = "" | |
for conversation in context_json: | |
for utterance in conversation: | |
output += " * " + utterance["text"] + "\n" | |
output += "## ------------------------ " | |
st.markdown(output) | |
with st.form("input_text"): | |
context_size = 5 | |
submitted = st.form_submit_button("Submit") | |
if submitted: | |
aggregated_result = [] | |
for idx, conversation in enumerate(context_json): | |
data_for_evaluation = get_evaluation_data_from_dialogue([x["text"] for x in conversation]) | |
lines = [] | |
scores = np.zeros(shape=(len(data_for_evaluation), context_size)) | |
for datapoint in data_for_evaluation: | |
progres_bar.progress(idx / len(data_for_evaluation), text="Inference") | |
for actual_sentence, contexts in datapoint.items(): | |
lines.append(actual_sentence) | |
for c in contexts: | |
input_tensor = inference_tokenizer.get_item(context=c, actual_sentence=actual_sentence) | |
output_model = model(**input_tensor.data).logits | |
output_model = output_model.detach().numpy()[0] | |
if len(output_model) == 2: # classification | |
output_model = sp.special.softmax(output_model, axis=-1) | |
prop_follow = output_model[0] | |
prop_not_follow = output_model[1] | |
elif len(output_model) == 1: # regression | |
prop_follow = 1 - output_model[0] | |
prop_not_follow = 1 - prop_follow | |
scores[len(lines) - 1][len(c) - 1] = prop_follow | |
for idx, line in enumerate(lines): | |
NB = conversation[idx]["NB"] | |
PB = conversation[idx]["PB"] | |
B = conversation[idx]["B"] | |
aggregated_result.append([line] + [f"{NB}/{PB}/{B}"] + scores[idx].tolist()) | |
aggregated_result.append([["-"] * len(aggregated_result[-1])]) | |
st.table(aggregated_result) | |
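
# Footer: list every discovered model together with its description.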
st.markdown("## Description of models:")
for x in sorted(models.values(), key=lambda m: m["model"]):
    st.write(str(x["model"] + " - " + x["description"]))