"""Streamlit demo that scores whether a dialogue turn follows its context.

The script discovers models under ``./model/*/info.json``, loads the selected
model together with its tokenizer, and offers several input modes (plain text,
JSON datasets, a diagnostic mode and Dialogue Breakdown Challenge data).
"""
import glob
import json
import os
import re
from typing import Dict, List, Tuple, Union

import matplotlib.pyplot as plt
import numpy as np
import pandas
import scipy as sp
import scipy.special  # noqa: F401 - makes sure `sp.special` is actually loaded
import streamlit as st
import torch

from inference_tokenizer import NextSentencePredictionTokenizer
from models import get_class


def _load_meta_info(directory: str, owner: str) -> Dict:
    """Read ``meta-info.json`` from *directory*.

    Args:
        directory: Folder that is expected to contain ``meta-info.json``.
        owner: Human readable name ("Model" / "Tokenizer") used in the error.

    Raises:
        FileNotFoundError: If ``meta-info.json`` is missing.
    """
    meta_info_path = os.path.join(directory, "meta-info.json")
    if not os.path.isfile(meta_info_path):
        raise FileNotFoundError(
            f"{owner} is provided without meta-info.json. Cannot infer proper configuration!"
        )
    with open(meta_info_path, "r", encoding="utf-8") as f:
        return json.load(f)


def get_model(_model_path):
    """Load the model stored at ``_model_path`` and switch it to eval mode.

    The class to instantiate is taken from ``meta-info.json`` (keys
    ``kwargs.model_package`` / ``kwargs.model_class``) and resolved through
    the project helper ``get_class``.

    Raises:
        FileNotFoundError: If ``meta-info.json`` is missing.
    """
    print(f"Getting model at {_model_path}")
    meta_info = _load_meta_info(_model_path, "Model")
    _model_package = meta_info["kwargs"].get("model_package", "transformers")
    _model_class = meta_info["kwargs"].get("model_class", "BertForNextSentencePrediction")
    model_class = get_class(_model_package, _model_class)
    _model = model_class.from_pretrained(_model_path)
    _model.eval()
    return _model


def get_tokenizer(tokenizer_path):
    """Build the ``NextSentencePredictionTokenizer`` for ``tokenizer_path``.

    ``meta-info.json`` supplies ``tokenizer_args`` (forwarded as keyword
    arguments) and ``kwargs.special_token`` (registered as an additional
    special token unless it is a single space).

    Raises:
        FileNotFoundError: If ``meta-info.json`` is missing.
    """
    print(f"Getting tokenizer at {tokenizer_path}")
    # Imported lazily, as in the original script.
    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)
    meta_info = _load_meta_info(tokenizer_path, "Tokenizer")
    tokenizer_args = meta_info["tokenizer_args"]
    special_token = meta_info["kwargs"]["special_token"]

    if special_token != " ":
        tokenizer.add_special_tokens({"additional_special_tokens": [special_token]})

    _inference_tokenizer = NextSentencePredictionTokenizer(tokenizer, **tokenizer_args)
    return _inference_tokenizer


def get_evaluation_data_from_json(_context: List) -> List[Tuple[List, str, str]]:
    """Flatten the JSON dataset into ``(context, sentence, label)`` tuples.

    Every item of ``_context`` must provide ``"context"`` and ``"answers"``;
    ``"answers"`` maps a source to a ``{label: [sentence, ...]}`` mapping.
    """
    output_data = []
    for item in _context:
        for source in item["answers"].values():
            for label, sentences in source.items():
                output_data.extend((item["context"], sentence, label) for sentence in sentences)
    return output_data


control_sequence_regex_1 = re.compile(r"#.*? ")
control_sequence_regex_2 = re.compile(r"#.*?\n")


def _clean_conversational_line(_line: str):
    """Remove speaker prefixes and ``#...`` control sequences from ``_line``."""
    _line = _line.replace("Bot: ", "")
    _line = _line.replace("User: ", "")
    _line = control_sequence_regex_1.sub("", _line)
    _line = control_sequence_regex_2.sub("\n", _line)
    return _line.strip()


def get_evaluation_data_from_dialogue(_context: List[str], context_size: int = 5) -> List[Dict]:
    """Pair every non-empty turn with its gradually growing preceding context.

    Args:
        _context: Dialogue turns; they are stripped and empty ones are dropped.
        context_size: Maximum number of preceding turns used as context.

    Returns:
        One single-key dict per turn: ``{turn: [last 1 turn, last 2 turns, ...]}``
        with windows ordered from shortest to longest. The first turn has no
        preceding context and therefore maps to an empty list.
    """
    turns = [turn.strip() for turn in _context]
    turns = [turn for turn in turns if turn]

    output_data = []
    for idx, _line in enumerate(turns):
        actual_context = turns[max(0, idx - context_size):idx]
        # Explicit window sizes avoid the `[-0:]` slice, which returns the whole list.
        gradual_contexts = [actual_context[-size:] for size in range(1, len(actual_context) + 1)]
        output_data.append({_line: gradual_contexts})
    return output_data


def _predict_follow_probabilities(context, actual_sentence) -> Tuple[float, float]:
    """Run the globally loaded model on one (context, sentence) pair.

    Uses the module-level ``model`` and ``inference_tokenizer``.

    Returns:
        ``(prop_follow, prop_not_follow)``. For a two-logit output these are
        the softmax values at index 0 and 1; for a single-logit output
        ``prop_follow`` is ``1 - logit``.

    Raises:
        ValueError: If the model yields neither one nor two logits.
    """
    input_tensor = inference_tokenizer.get_item(context=context, actual_sentence=actual_sentence)
    with torch.no_grad():  # inference only, no autograd graph needed
        logits = model(**input_tensor.data).logits
    output_model = logits.detach().numpy()[0]

    if len(output_model) == 2:  # classification
        probabilities = sp.special.softmax(output_model, axis=-1)
        return float(probabilities[0]), float(probabilities[1])
    if len(output_model) == 1:  # regression
        prop_follow = 1 - float(output_model[0])
        return prop_follow, 1 - prop_follow
    raise ValueError(f"Unsupported number of model outputs: {len(output_model)}")


def _score_dialogue(data_for_evaluation: List[Dict], context_size: int, progress_bar) -> Tuple[List[str], np.ndarray]:
    """Score every turn of a dialogue against each of its context windows.

    Returns:
        ``(lines, scores)`` where ``scores[i][k - 1]`` is the follow score of
        ``lines[i]`` given its last ``k`` preceding turns (0 where no such
        window exists).
    """
    lines = []
    total = len(data_for_evaluation)
    scores = np.zeros(shape=(total, context_size))
    for position, datapoint in enumerate(data_for_evaluation):
        progress_bar.progress((position + 1) / total, text="Inference")
        for actual_sentence, contexts in datapoint.items():
            lines.append(actual_sentence)
            for c in contexts:
                prop_follow, _ = _predict_follow_probabilities(c, actual_sentence)
                scores[len(lines) - 1][len(c) - 1] = prop_follow
    return lines, scores


# --- Model discovery and loading -------------------------------------------
models_path = glob.glob("./model/*/info.json")
models = {}
for model_path in models_path:
    with open(model_path, "r", encoding="utf-8") as f:
        model_data = json.load(f)
    model_data["path"] = os.path.dirname(model_path)
    models[model_data["model"]] = model_data

if not models:
    # Without any model the selectbox returns None and the lookup below would fail.
    st.error("No models found (expected ./model/*/info.json).")
    st.stop()

model_name = st.selectbox('Which model do you want to use?', sorted(models), index=0)
model_path = models[model_name]["path"]

model = get_model(model_path)
inference_tokenizer = get_tokenizer(model_path)

# --- Input mode selection ---------------------------------------------------
option = st.selectbox("Choose type of input:",
                      ["01 - String (one turn per line)",
                       "02 - JSON (aggregated)",
                       "03 - JSON (example CA-OOD)",
                       "04 - JSON (example Elysai)",
                       "05 - Diagnostic mode",
                       "06 - JSON (example Elysai - large)",
                       "07 - Dialogue Breakdown Challenge"])
# Two-digit prefix of the selected entry; used to dispatch the modes below.
option_code = option.split("-")[0].strip()

progres_bar = st.progress(0.0, text="Inference")

if option_code == "01":
    with st.form("input_text"):
        context = st.text_area("Insert context here (one turn per line):")
        actual_text = st.text_input("Insert current turn:")
        submitted = st.form_submit_button("Submit")
        if submitted:
            # Run the model only after the user submitted the form.
            context = [turn for turn in context.split("\n") if turn.strip()]
            prop_follow, prop_not_follow = _predict_follow_probabilities(context, actual_text)

            fig, ax = plt.subplots()
            ax.pie([prop_follow, prop_not_follow],
                   labels=["Probability - Follow", "Probability - Not Follow"],
                   autopct='%1.1f%%')
            st.pyplot(fig)
            plt.close(fig)  # release the figure; the script is re-run on every interaction

if option_code in ("02", "03", "04", "06"):
    with st.form("input_text"):
        from data.example_data import ca_ood, elysai, elysai_large

        # Pre-fill the text area with the example that belongs to the chosen mode.
        examples = {"03": ca_ood, "04": elysai, "06": elysai_large}
        text = json.dumps(examples[option_code]) if option_code in examples else ""
        context = st.text_area("Insert JSON here:", value=str(text))

        submitted = st.form_submit_button("Submit")
        if submitted:
            data_for_evaluation = []
            if "{" in context:
                try:
                    data_for_evaluation = get_evaluation_data_from_json(_context=json.loads(context))
                except json.JSONDecodeError as err:
                    st.error(f"Invalid JSON: {err}")

            results = []
            accuracy = []
            total = len(data_for_evaluation)
            for idx, (c, s, human_label) in enumerate(data_for_evaluation):
                progres_bar.progress((idx + 1) / total, text="Inference")
                prop_follow, prop_not_follow = _predict_follow_probabilities(c, s)
                results.append((c, s, human_label, prop_follow, prop_not_follow))
                if human_label == "coherent":
                    accuracy.append(int(prop_follow > prop_not_follow))
                else:
                    accuracy.append(int(prop_not_follow > prop_follow))

            if accuracy:
                st.metric(label="Accuracy", value=f"{100 * sum(accuracy) / len(accuracy):.1f} %")
            else:
                st.warning("No datapoints to evaluate.")
            df = pandas.DataFrame(results, columns=["Context", "Query", "Human Label",
                                                    "Probability (follow)", "Probability (not-follow)"])
            st.dataframe(df)

if option_code == "05":
    with st.form("input_text"):
        context_size = 5
        context = st.text_area("Insert dialogue here (one turn per line):")
        submitted = st.form_submit_button("Submit")
        if submitted:
            data_for_evaluation = get_evaluation_data_from_dialogue(
                _clean_conversational_line(context).split("\n"), context_size=context_size)
            lines, scores = _score_dialogue(data_for_evaluation, context_size, progres_bar)

            aggregated_result = [[line] + scores[idx].tolist() for idx, line in enumerate(lines)]
            st.table(aggregated_result)

if option_code == "07":
    from data.example_data import dbc

    select_conversation = st.selectbox("Which dialogue to evaluate", list(range(len(dbc))), index=0)
    context = st.text_area("Insert dialogue here (one turn per line):",
                           value=json.dumps([dbc[int(select_conversation)]]))
    st.markdown("# Formatted form")
    try:
        context_json = json.loads(context)
    except json.JSONDecodeError as err:
        st.error(f"Invalid JSON: {err}")
        context_json = []

    output_parts = []
    for conversation in context_json:
        output_parts.extend(" * " + utterance["text"] + "\n" for utterance in conversation)
        output_parts.append("## ------------------------ ")
    st.markdown("".join(output_parts))

    with st.form("input_text"):
        context_size = 5
        submitted = st.form_submit_button("Submit")
        if submitted:
            aggregated_result = []
            for conversation in context_json:
                # Blank utterances are dropped by get_evaluation_data_from_dialogue;
                # drop them here as well so labels stay aligned with the scored lines.
                utterances = [u for u in conversation if u["text"].strip()]
                data_for_evaluation = get_evaluation_data_from_dialogue(
                    [u["text"] for u in utterances], context_size=context_size)
                lines, scores = _score_dialogue(data_for_evaluation, context_size, progres_bar)

                for line, utterance, line_scores in zip(lines, utterances, scores):
                    NB = utterance["NB"]
                    PB = utterance["PB"]
                    B = utterance["B"]
                    aggregated_result.append([line] + [f"{NB}/{PB}/{B}"] + line_scores.tolist())
                # Separator row between conversations: text + label + one cell per score.
                aggregated_result.append(["-"] * (2 + context_size))
            st.table(aggregated_result)

st.markdown("## Description of models:")
for model_info in sorted(models.values(), key=lambda info: info["model"]):
    st.write(f'{model_info["model"]} - {model_info["description"]}')