# NDIS Project - PBSP Scoring - Page 3

In [None]:
import os
from ipywidgets import interact
import ipywidgets as widgets
from IPython.display import display, clear_output, Javascript, HTML, Markdown
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams, Batch, Filter, FieldCondition, Range, MatchValue
import json
import spacy
from spacy import displacy
import nltk
from nltk import sent_tokenize
from sklearn.feature_extraction import text
from pprint import pprint
import re
from flair.embeddings import TransformerDocumentEmbeddings
from flair.data import Sentence
from sentence_transformers import SentenceTransformer, util
import pandas as pd
import argilla as rg
from argilla.metrics.text_classification import f1
from typing import Dict
from setfit import SetFitModel
from tqdm import tqdm
import time
# Block for 60 one-second sleeps before the rest of the notebook runs; tqdm is
# created with disable=True, so no progress bar is drawn.
# NOTE(review): presumably this gives a backing service time to start — confirm it is still required.
for i in tqdm(range(60), disable=True):
    time.sleep(1)
import warnings
warnings.filterwarnings('ignore')
%matplotlib inline
# Raise pandas' display limits so long phrases and wide result tables are not truncated.
for _option_name, _option_value in (
    ('display.max_rows', 500),
    ('display.max_colwidth', 10000),
    ('display.width', 10000),
):
    pd.set_option(_option_name, _option_value)

In [None]:
#initializations
# Flair document embedder (distilbert); only used by embeddings()/get_query_vector().
embedding = TransformerDocumentEmbeddings('distilbert-base-uncased')
# Qdrant connection; host and API key are read from the environment (KeyError if unset).
client = QdrantClient(
    host=os.environ["QDRANT_API_URL"], 
    api_key=os.environ["QDRANT_API_KEY"],
    timeout=60,
    port=443
)
collection_name = "my_collection"
# Sentence-transformers model loaded from a local directory; used by sentence_embeddings().
model = SentenceTransformer('./sentence-transformers_multi-qa-MiniLM-L6-cos-v1')
# Must match the output size of whichever embedder feeds the collection.
vector_dim = 384 #{distilbert-base-uncased: 768, multi-qa-MiniLM-L6-cos-v1:384}
# SetFit classifiers for behaviour and severity sentences, loaded by name via from_pretrained.
sf_bhvr_model_name = "setfit-zero-shot-classification-pbsp-p3-bhvr"
sf_bhvr_model = SetFitModel.from_pretrained(f"aammari/{sf_bhvr_model_name}")
sf_sev_model_name = "setfit-zero-shot-classification-pbsp-p3-sev"
sf_sev_model = SetFitModel.from_pretrained(f"aammari/{sf_sev_model_name}")

# Make sure the NLTK tokenizer and POS-tagger data exist locally; each one is
# downloaded only when nltk.data.find() cannot locate it.
for _resource_path, _package_name in (
    ('tokenizers/punkt', 'punkt'),
    ('taggers/averaged_perceptron_tagger', 'averaged_perceptron_tagger'),
):
    try:
        nltk.data.find(_resource_path)
    except LookupError:
        nltk.download(_package_name)
    
#argilla
# Initialise the Argilla client with the URL and API key taken from the environment (KeyError if unset).
rg.init(
    api_url=os.environ["ARGILLA_API_URL"],
    api_key=os.environ["ARGILLA_API_KEY"]
)

### <font color='red'>Domain Expert Section</font>
#### Enter the Topic Glossary

In [None]:
# Default behaviour glossary; the domain expert can edit it in the text area below.
bhvr_onto_lst = ['hit employees', 'push people', 'throw objects', 'beat students']

# Editable text area pre-filled with one glossary phrase per line.
bhvr_onto_text_input = widgets.Textarea(
    layout=dict(height='100%', width='90%'),
    disabled=False,
    description='',
    placeholder='Type your answer',
    value='\n'.join(bhvr_onto_lst),
)

# Heading shown above the text area.
bhvr_onto_label = widgets.Label(value='Behaviours')

# Fixed-size column: heading on top, text area underneath.
bhvr_onto_box = widgets.VBox(
    [bhvr_onto_label, bhvr_onto_text_input],
    layout=dict(width='400px', height='150px'),
)

In [None]:
# Default functional-hypothesis glossary.
fh_onto_lst = ['Gain the teacher attention', 'Complete work in class', 'Avoid difficult work']

# Editable text area pre-filled with one glossary phrase per line.
fh_onto_text_input = widgets.Textarea(
    layout=dict(height='100%', width='90%'),
    disabled=False,
    description='',
    placeholder='Type your answer',
    value='\n'.join(fh_onto_lst),
)

# Heading shown above the text area.
fh_onto_label = widgets.Label(value='Functional Hypothesis')

# Fixed-size column: heading on top, text area underneath.
fh_onto_box = widgets.VBox(
    [fh_onto_label, fh_onto_text_input],
    layout=dict(width='400px', height='150px'),
)

In [None]:
# Default replacement-behaviour glossary.
rep_onto_lst = ['Ask teacher for help', 'Replace full body slam', 'Use a next sign']

# Editable text area pre-filled with one glossary phrase per line.
rep_onto_text_input = widgets.Textarea(
    layout=dict(height='100%', width='90%'),
    disabled=False,
    description='',
    placeholder='Type your answer',
    value='\n'.join(rep_onto_lst),
)

# Heading shown above the text area.
rep_onto_label = widgets.Label(value='Replacement Behaviour')

# Fixed-size column: heading on top, text area underneath.
rep_onto_box = widgets.VBox(
    [rep_onto_label, rep_onto_text_input],
    layout=dict(width='400px', height='150px'),
)

# Three-column layout showing all glossaries, currently disabled:
#onto_boxes = widgets.HBox([bhvr_onto_box, fh_onto_box, rep_onto_box], 
#                   layout={'width': '90%', 'height': '150px'})

# Only the Behaviours glossary editor is placed in the row that gets displayed.
onto_boxes = widgets.HBox([bhvr_onto_box], 
                   layout={'width': '90%', 'height': '150px'})

display(onto_boxes)

In [None]:
#Text Preprocessing
# Load spaCy's small English pipeline; if loading raises OSError, download the
# model once and load it again.
try:
    nlp = spacy.load('en_core_web_sm')
except OSError:
    spacy.cli.download('en_core_web_sm')
    nlp = spacy.load('en_core_web_sm')
# scikit-learn's English stop-word list; preprocess() applies it in addition to spaCy's is_stop flag.
sw_lst = text.ENGLISH_STOP_WORDS
def preprocess(onto_lst):
    """Reduce every phrase in *onto_lst* to a space-joined string of lowercase lemmas.

    For each phrase, tokens are dropped when spaCy flags them as stop words,
    punctuation or number-like, when they are blank, or when their lemma
    belongs to a token tagged as a PERSON entity. The surviving lowercase
    lemmas are then kept only if they are longer than one character, consist
    solely of ``a-z``/space, and are not in the scikit-learn stop-word list.

    Returns a list with one cleaned string per input phrase (possibly empty
    strings), in the same order as the input.
    """
    alpha_only = re.compile(r'^[a-z ]*$')
    cleaned = []
    for raw_phrase in onto_lst:
        parsed = nlp(raw_phrase)
        # Lemmas of tokens spaCy tagged as part of a PERSON entity.
        person_lemmas = [tok.lemma_ for tok in parsed if tok.ent_type_ == 'PERSON']
        lemmas = [
            tok.lemma_.lower()
            for tok in parsed
            if not (
                tok.is_stop
                or tok.is_punct
                or tok.like_num
                or len(tok.text.strip()) == 0
                or tok.lemma_ in person_lemmas
            )
        ]
        kept = [
            lemma
            for lemma in lemmas
            if len(lemma) > 1 and alpha_only.search(lemma) is not None and lemma not in sw_lst
        ]
        cleaned.append(" ".join(kept))
    return cleaned

# Cleaned versions of the three default glossaries; these module-level lists are
# read later by add_to_collection().
cl_bhvr_onto_lst = preprocess(bhvr_onto_lst)
cl_fh_onto_lst = preprocess(fh_onto_lst)
cl_rep_onto_lst = preprocess(rep_onto_lst)

# Debug output, disabled:
#pprint(cl_bhvr_onto_lst)
#pprint(cl_fh_onto_lst)
#pprint(cl_rep_onto_lst)

In [None]:
#compute document embeddings

# distilbert-base-uncased from Flair
def embeddings(cl_onto_lst):
    """Embed each phrase with the module-level Flair embedder.

    Returns one vector per phrase, in input order, each converted with
    ``.tolist()``.
    """
    def _embed_one(phrase):
        flair_sentence = Sentence(phrase)
        embedding.embed(flair_sentence)
        return flair_sentence.embedding.tolist()

    return [_embed_one(phrase) for phrase in cl_onto_lst]

# multi-qa-MiniLM-L6-cos-v1 from sentence_transformers
def sentence_embeddings(cl_onto_lst):
    """Encode all phrases with the module-level sentence-transformers model.

    Returns one vector per phrase, each converted with ``.tolist()``.
    """
    return [vector.tolist() for vector in model.encode(cl_onto_lst)]

# Disabled alternative that would embed the glossaries with the Flair model.
# It is kept as a bare string literal, so it has no effect when the cell runs.
'''
emb_bhvr_onto_lst = embeddings(cl_bhvr_onto_lst)
emb_fh_onto_lst = embeddings(cl_fh_onto_lst)
emb_rep_onto_lst = embeddings(cl_rep_onto_lst)
'''

# Active path: embed the cleaned glossaries with the sentence-transformers model.
# These module-level lists are read later by add_to_collection().
emb_bhvr_onto_lst = sentence_embeddings(cl_bhvr_onto_lst)
emb_fh_onto_lst = sentence_embeddings(cl_fh_onto_lst)
emb_rep_onto_lst = sentence_embeddings(cl_rep_onto_lst)

In [None]:
#add to qdrant collection
def add_to_collection():
    """Recreate the Qdrant collection and upload every glossary phrase.

    Reads the module-level cleaned phrase lists and their embedding lists for
    the three ontologies. Each point gets a 1-based id, a payload holding the
    ontology name and the cleaned phrase, and the phrase's vector. Points are
    ordered behaviours, then functional hypothesis, then replacement behaviour.
    """
    # Start from an empty collection every time.
    client.recreate_collection(
        collection_name=collection_name,
        vectors_config=VectorParams(size=vector_dim, distance=Distance.COSINE),
    )

    ontology_groups = (
        ("behaviours", cl_bhvr_onto_lst, emb_bhvr_onto_lst),
        ("functional_hypothesis", cl_fh_onto_lst, emb_fh_onto_lst),
        ("replacement_behaviour", cl_rep_onto_lst, emb_rep_onto_lst),
    )
    payloads = []
    vectors = []
    for ontology, phrases, phrase_vectors in ontology_groups:
        payloads.extend({"ontology": ontology, "phrase": phrase} for phrase in phrases)
        vectors.extend(phrase_vectors)

    # Ids are assigned per vector, starting at 1.
    ids = list(range(1, len(vectors) + 1))
    client.upsert(
        collection_name=f"{collection_name}",
        points=Batch(ids=ids, payloads=payloads, vectors=vectors),
    )

def count_collection():
    """Return the exact number of points stored in the Qdrant collection.

    The count is requested from the server with ``client.count``. Measuring
    ``len(client.scroll(...)[0])`` only counts one page of records, which is
    bounded by scroll's ``limit`` argument, so it under-reports as soon as the
    collection holds more points than one page.
    """
    return client.count(collection_name=collection_name, exact=True).count

# Upload the default glossaries and remember how many points the collection
# holds; search_collection() uses point_count as its result limit.
add_to_collection()
point_count = count_collection()
#print(point_count)

In [None]:
# Module-level filter that keeps only points whose `ontology` payload field
# equals "functional_hypothesis".
# NOTE(review): search_collection() builds its own filter; this one does not
# appear to be used in the visible part of the notebook — confirm before removing.
query_filter=Filter(
        must=[ 
            FieldCondition(
                key='ontology',
                match=MatchValue(value="functional_hypothesis")# exact match on the `ontology` payload field
            )
        ]
    )

In [None]:
#verb phrase extraction
def extract_vbs(data_chunked):
    """Yield the space-joined text of every element longer than two items.

    For each yielded element, the first item of each of its members is
    converted to ``str`` and the results are joined with single spaces.
    """
    for node in data_chunked:
        # NOTE(review): presumably plain (word, tag) leaves have length 2 and
        # are skipped by this test; any two-token chunk is skipped as well.
        if len(node) <= 2:
            continue
        words = [str(member[0]) for member in node]
        yield " ".join(words)

def get_verb_phrases(nltk_query):
    """Extract verb-led phrases from *nltk_query* with NLTK regexp chunking.

    The text is tokenised and POS-tagged, then chunked once per grammar. Every
    grammar has the shape "verb tag, up to three arbitrary tokens, noun tail"
    and covers one combination of the verb tags and noun tails listed below.
    Chunks accepted by ``extract_vbs`` are collected in grammar order, so the
    same phrase can appear more than once.
    """
    tagged_tokens = nltk.pos_tag(nltk.word_tokenize(nltk_query))

    # Grammars are generated verb tag by verb tag, with the tails in this exact order.
    verb_tags = ("VB", "VBN", "VBG", "VBP", "VBZ")
    noun_tails = (
        "<NN>",
        "<NNP>",
        "<PRP><NN>",
        "<PRP><NNS>",
        "<NNPS>",
        "<NNS>",
        "<PRP><NNP>",
        "<PRP><NNPS>",
    )
    grammars = [
        "CUSTOMCHUNK: {<" + verb_tag + "><.*>{0,3}" + noun_tail + "}"
        for verb_tag in verb_tags
        for noun_tail in noun_tails
    ]

    phrases = []
    for grammar in grammars:
        chunk_tree = nltk.RegexpParser(grammar).parse(tagged_tokens)
        phrases.extend(extract_vbs(chunk_tree))
    return phrases

In [None]:
#query and get score

# distilbert-base-uncased from Flair
def get_query_vector(query):
    """Embed *query* with the module-level Flair embedder and return the vector via ``.tolist()``."""
    flair_sentence = Sentence(query)
    embedding.embed(flair_sentence)
    return flair_sentence.embedding.tolist()

# multi-qa-MiniLM-L6-cos-v1 from sentence_transformers
def sentence_get_query_vector(query):
    """Return ``model.encode(query)`` unchanged (no ``.tolist()`` conversion is applied)."""
    return model.encode(query)

def search_collection(ontology, query_vector):
    """Search the Qdrant collection for points of one ontology.

    Only points whose ``ontology`` payload equals *ontology* are considered.
    Payloads are included in the hits, and the module-level ``point_count`` is
    used as the result limit.
    """
    ontology_condition = FieldCondition(
        key='ontology',
        match=MatchValue(value=ontology),
    )
    return client.search(
        collection_name=f"{collection_name}",
        query_vector=query_vector,
        query_filter=Filter(must=[ontology_condition]),
        append_payload=True,
        limit=point_count,
    )

# Minimum search score a glossary hit needs to be kept by the scoring handler.
semantic_passing_score = 0.50


# Manual smoke test, disabled:
#ontology = 'behaviours'
#query = 'punch father face'
#query_vector = sentence_get_query_vector(query)
#hist = search_collection(ontology, query_vector)

In [None]:
# format output
def color(df):
    return df.style.format({'Score': '{:,.2%}'.format}).bar(subset=['Score'], color='#ADD8E6')

def annotate_query(highlights, query):
    """Locate each highlight phrase in *query* and return displaCy-style entity dicts.

    Each phrase is searched for as literal text, so characters such as ``(``,
    ``?`` or ``.`` in a phrase are matched verbatim rather than being read as
    regular-expression syntax. Only the first occurrence of a phrase is
    reported. Empty phrases and phrases not present in *query* are skipped.

    Returns a list of ``{"start", "end", "label"}`` dicts labelled
    ``'GLOSSARY'``, in the order of *highlights*.
    """
    ents = []
    for phrase in highlights:
        if not phrase:
            # An empty phrase would otherwise yield a zero-width span at offset 0.
            continue
        start = query.find(phrase)
        if start != -1:
            ents.append({"start": start, "end": start + len(phrase), "label": 'GLOSSARY'})
    return ents

In [None]:
#setfit bhvr sentence extraction
def extract_sentences(nltk_query):
    """Split *nltk_query* into sentences with NLTK's ``sent_tokenize``."""
    return sent_tokenize(nltk_query)

In [None]:
def convert_df(result_df):
    """Reshape a Phrase/Topic/Score frame into ``text``/``prediction`` columns.

    ``text`` is the ``Phrase`` column; ``prediction`` holds, per row, a
    one-element list containing the ``[Topic, Score]`` pair.
    """
    def _as_prediction(record):
        return [[record['Topic'], record['Score']]]

    converted = pd.DataFrame(columns=['text', 'prediction'])
    converted['text'] = result_df['Phrase']
    converted['prediction'] = result_df.apply(_as_prediction, axis=1)
    return converted

In [None]:
def custom_f1(data: Dict[str, float], title: str):
    """Build a two-row Plotly bar figure from a flat metrics mapping.

    Row 1 plots the values whose key contains ``"macro"`` against the fixed
    x labels precision/recall/f1. Row 2 plots every entry whose key contains
    none of ``"macro"``, ``"micro"`` or ``"support"``; each run of three
    consecutive entries shares one randomly chosen palette colour.

    Args:
        data: metric name -> value mapping.
        title: figure title.

    Returns:
        The Plotly figure (legend hidden).
    """
    from plotly.subplots import make_subplots
    import plotly.colors
    import random

    fig = make_subplots(
        rows=2,
        cols=1,
        subplot_titles=["Overall Model Score", "Model Score By Category"],
    )

    # NOTE(review): assumes the "macro" entries appear in precision, recall, f1
    # order in `data` — confirm against whatever produces the metrics.
    x = ['precision', 'recall', 'f1']
    macro_data = [v for k, v in data.items() if "macro" in k]
    fig.add_bar(x=x, y=macro_data, row=1, col=1)

    per_label = {
        k: v
        for k, v in data.items()
        if all(key not in k for key in ["macro", "micro", "support"])
    }

    # One colour per group of three entries; round up so a trailing partial
    # group still gets a colour instead of causing an IndexError.
    num_labels = -(-len(per_label) // 3)
    palette = [str(shade) for shade in plotly.colors.qualitative.Plotly]
    if num_labels <= len(palette):
        colors = random.sample(palette, num_labels)
    else:
        # random.sample raises ValueError when asked for more items than the
        # palette holds, so shuffle the palette once and cycle through it.
        shuffled = random.sample(palette, len(palette))
        colors = [shuffled[i % len(shuffled)] for i in range(num_labels)]

    fig.add_bar(
        x=list(per_label.keys()),
        y=list(per_label.values()),
        row=2,
        col=1,
        marker_color=[colors[i // 3] for i in range(len(per_label))],
    )
    fig.update_layout(showlegend=False, title_text=title)

    return fig

In [None]:
def get_null_class_df(sentences, result_df):
    """Build rows for the sentences that do not appear in ``result_df['Phrase']``.

    Each such sentence becomes a row with topic ``'NONE'`` and score ``0.90``.
    Returns a frame with columns ``Phrase``, ``Topic`` and ``Score``.
    """
    already_tagged = result_df['Phrase'].tolist()
    untagged = [sentence for sentence in sentences if sentence not in already_tagged]
    row_count = len(untagged)
    return pd.DataFrame({
        'Phrase': untagged,
        'Topic': ['NONE'] * row_count,
        'Score': [0.90] * row_count,
    })

In [None]:
#setfit bhvr query and get predicted topic

def get_sf_bhvr_topic(sentences):
    """Run the SetFit behaviour model on *sentences* and return its predictions as a list."""
    return list(sf_bhvr_model(sentences))
def get_sf_bhvr_topic_scores(sentences):
    """Return, per sentence, the largest value in the behaviour model's ``predict_proba`` row."""
    probability_rows = sf_bhvr_model.predict_proba(sentences)
    return [max(list(row)) for row in probability_rows]

In [None]:
# setfit bhvr format output
# Translates the behaviour model's predicted class index into its display label.
ind_bhvr_topic_dict = {
        0: 'NO BEHAVIOUR',
        1: 'BEHAVIOUR',
    }

# NOTE(review): highlight_threshold is not referenced in the visible part of the notebook — confirm it is still needed.
highlight_threshold = 0.25
# In the scoring handler, scores at or above this value get the topic's own
# highlight colour; lower scores get the fallback colour.
passing_score = 0.50

def sf_bhvr_color(df):
    return df.style.format({'Score': '{:,.2%}'.format}).bar(subset=['Score'], color='#CCFFCC')

def sf_annotate_query(highlights, query, topics):
    """Locate each highlight in *query* and label it with its paired topic.

    *highlights* and *topics* are walked in lockstep. Each highlight is
    searched for as literal text, so punctuation such as ``(``, ``?`` or ``.``
    in a sentence is matched verbatim rather than being read as
    regular-expression syntax. Only the first occurrence is reported. Empty
    highlights and highlights not present in *query* are skipped.

    Returns a list of ``{"start", "end", "label"}`` dicts in input order.
    """
    ents = []
    for phrase, topic in zip(highlights, topics):
        if not phrase:
            # An empty phrase would otherwise yield a zero-width span at offset 0.
            continue
        start = query.find(phrase)
        if start != -1:
            ents.append({"start": start, "end": start + len(phrase), "label": topic})
    return ents

In [None]:
#regex freq query and get predicted topic

def detect_frequency(sentences):
    """Find the frequency expression in each sentence using regular expressions.

    For every sentence, all frequency patterns are tried (case-insensitively)
    and the longest match is kept, cut at its first comma. A sentence with a
    match yields a ``'FREQUENCY'`` row; one without yields a
    ``'NO FREQUENCY'`` row with an empty phrase. All rows have score 0.75.

    A small set of duration patterns is also run on every sentence. Frequency
    rows whose phrase equals any detected duration phrase are then removed, so
    a bare "10 minutes" is not reported as a frequency. Sentences without a
    duration contribute the empty string to that list, so ``'NO FREQUENCY'``
    rows are removed too whenever any sentence lacks a duration.

    Rows are collected in a list and the frame is built once, because
    ``DataFrame.append`` is no longer available in pandas 2.x.

    Args:
        sentences: iterable of sentence strings.

    Returns:
        DataFrame with columns ``Phrase``, ``Topic`` and ``Score``. Surviving
        rows keep the positional index of their sentence.
    """
    frequency_patterns = [
        r"(\d+|(once|twice|thrice))\s*(time(s)?)?\s*(per)?\s*(a|an)?\s*((minute|hour|day|week|month|year)s?)\b",
        r"(\b\d+\b)(\s*\btime(s)?\b)?\s*\b(a|an)?\s*\b(minute(s)?|hour(s)?|day(s)?|week(s)?|month(s)?|year(s)?|month(s)?\b)",
        r"\b(hourly|daily|weekly|fortnightly|monthly|yearly)\b",
        r"(\d+(\.\d+)?(\s*\w+)?(\s+\w+)?\s*per\s*(hr|hour|day|fortnight|month|year))",
        r"\b\d+(\s+or\s+\d+)?\s+\w+\s+(every|each|per)\s+(a\s+single\s+|single\s+|couple\s+of\s+|\d+\s+)?(minute(s)?|min(s)?|hour(s)?|hr(s)?|day(s)?|week(s)?|month(s)?|year(s)?|yr(s)?)\b",
        r"\b(one|two|three|four|five|six|seven|eight|nine|ten)\s+(or\s+(one|two|three|four|five|six|seven|eight|nine|ten))?\s+\w+\s+(every|each|per)\s+(a\s+single\s+|single\s+|couple\s+of\s+|(one|two|three|four|five|six|seven|eight|nine|ten)\s+)?(minute(s)?|min(s)?|hour(s)?|hr(s)?|day(s)?|week(s)?|month(s)?|year(s)?|yr(s)?)\b",
        r"((once|twice|thrice)\s*(every|each|per)?\s*(\d+)\s*((minute|hour|day|week|month|year)s?)\b)"
    ]
    duration_patterns = [
        r"\b\d+\s*(minute(s)?|hour(s)?|day(s)?|week(s)?|month(s)?|year(s)?)\b",
        r"\bhalf an hour\b|\ban hour\b|\btwo hours\b|\ba day\b|\btwo days\b|\bthree days\b|\ba week\b|\btwo weeks\b|\bthree weeks\b|\ba month\b|\btwo months\b|\bthree months\b|\ba year\b|\btwo years\b|\bthree years\b"
    ]
    columns = ['Phrase', 'Topic', 'Score']

    def _first_hits(patterns, sentence):
        """Return the first match text of each pattern that matches *sentence*."""
        found = (re.search(pattern, sentence, flags=re.IGNORECASE) for pattern in patterns)
        return [match.group(0) for match in found if match]

    freq_rows = []
    dur_phrases = []
    for sentence in sentences:
        freq_hits = _first_hits(frequency_patterns, sentence)
        if freq_hits:
            # Longest match wins (the first one on ties); keep the text before any comma.
            phrase = max(freq_hits, key=len).split(',')[0]
            freq_rows.append({'Phrase': phrase, 'Topic': 'FREQUENCY', 'Score': 0.75})
        else:
            freq_rows.append({'Phrase': '', 'Topic': 'NO FREQUENCY', 'Score': 0.75})

        # Duration phrase for this sentence: the first matching pattern's text,
        # or '' when no duration pattern matches.
        dur_hits = _first_hits(duration_patterns, sentence)
        dur_phrases.append(", ".join(dur_hits).split(',')[0])

    sf_freq_result_df = pd.DataFrame(freq_rows, columns=columns)
    if dur_phrases:
        # Drop frequency rows that are really plain durations.
        sf_freq_result_df = sf_freq_result_df[~sf_freq_result_df['Phrase'].isin(dur_phrases)]

    return sf_freq_result_df

In [None]:
# setfit freq format output
# Index -> label mapping for the frequency topic.
# NOTE(review): not referenced in the visible part of the notebook (frequency is detected by regex) — confirm it is still needed.
ind_freq_topic_dict = {
        0: 'NO FREQUENCY',
        1: 'FREQUENCY',
    }

def sf_freq_color(df):
    return df.style.format({'Score': '{:,.2%}'.format}).bar(subset=['Score'], color='#FFFF00')

In [None]:
#regex dur query and get predicted topic

def detect_duration(sentences, sf_freq_result_df):
    """Find the duration expression in each sentence using regular expressions.

    Every phrase in ``sf_freq_result_df['Phrase']`` is first removed from the
    sentence and whitespace is collapsed, so text already reported as a
    frequency is not reported again as a duration. All duration patterns are
    then tried (case-insensitively) and the longest match is kept, cut at its
    first comma. A sentence with a match yields a ``'DURATION'`` row; one
    without yields a ``'NO DURATION'`` row with an empty phrase. All rows have
    score 0.75.

    Rows are collected in a list and the frame is built once, because
    ``DataFrame.append`` is no longer available in pandas 2.x.

    Args:
        sentences: iterable of sentence strings.
        sf_freq_result_df: frame with a ``Phrase`` column holding the detected
            frequency phrases; may be empty.

    Returns:
        DataFrame with columns ``Phrase``, ``Topic`` and ``Score``, one row per
        sentence.
    """
    duration_patterns = [
        r"\b\d+\s*(minute(s)?|hour(s)?|day(s)?|week(s)?|month(s)?|year(s)?)\b",
        r"\bhalf an hour\b|\ban hour\b|\btwo hours\b|\bthree hours\b|\bfour hours\b|\bfive hours\b|\bsix hours\b|\bseven hours\b|\beight hours\b|\bnine hours\b|\bten hours\b|\ba minute\b|\btwo minutes\b|\bthree minutes\b|\bfour minutes\b|\bfive minutes\b|\bsix minutes\b|\bseven minutes\b|\beight minutes\b|\bnine minutes\b|\bten minutes\b|\ba day\b|\btwo days\b|\bthree days\b|\bfour days\b|\bfive days\b|\bsix days\b|\bseven days\b|\beight days\b|\bnine days\b|\bten days\b|\ba week\b|\btwo weeks\b|\bthree weeks\b|\bfour weeks\b|\bfive weeks\b|\bsix weeks\b|\bseven weeks\b|\beight weeks\b|\bnine weeks\b|\bten weeks\b|\ba month\b|\btwo months\b|\bthree months\b|\bfour months\b|\bfive months\b|\bsix months\b|\bseven months\b|\beight months\b|\bnine months\b|\bten months\b|\ba year\b|\btwo years\b|\bthree years\b|\bfour years\b|\bfive years\b|\bsix years\b|\bseven years\b|\beight years\b|\bnine years\b|\bten years\b",
        r"\b\d+\s*(min|mins)\b",  # e.g., "5 mins"
        r"\b\d+\s*(hr|hrs|hour|hours)\b",  # e.g., "2 hrs"
        r"\b\d+\s*(d|day|days)\b",  # e.g., "3 days"
        r"\b\d+\s*(w|week|weeks)\b",  # e.g., "4 weeks"
        r"\b\d+\s*(m|month|months)\b",  # e.g., "6 months"
        r"\b\d+\s*(y|yr|year|years)\b",  # e.g., "1 yr"
        r"\b(\d+\s*(minute(s)?|hour(s)?|day(s)?|week(s)?|month(s)?|year(s)?)\s*,\s*){2,}\d+\s*(minute(s)?|hour(s)?|day(s)?|week(s)?|month(s)?|year(s)?)\b",  # e.g., "2 hours, 30 minutes"
        r"\b(half|quarter)\s+an?\s+(hour|hr)\b",  # e.g., "half an hour"
        r"\b(\d+(?:\.\d+)?|\d+(?:/\d+))\s*(hour|hr)s?\s*(and|&)\s*(\d+(?:\.\d+)?|\d+(?:/\d+))\s*(minute|min)s?\b",  # e.g., "1.5 hours & 30 mins"
        r"\b(\d+)\s*-\s*(\d+)\s*(minute|min|hour|hr|day|week|month|year)s?\b",  # e.g., "5 - 10 mins"
        r"\b(more than|less than)\s*\d+\s*(minute(s)?|hour(s)?|day(s)?|week(s)?|month(s)?|year(s)?)\b"  # e.g., "more than 3 hours"
    ]
    columns = ['Phrase', 'Topic', 'Score']

    if len(sf_freq_result_df) > 0:
        sf_freq_lst = sf_freq_result_df['Phrase'].tolist()
    else:
        sf_freq_lst = []

    rows = []
    for sentence in sentences:
        # Remove text already classified as a frequency, then collapse whitespace.
        for freq_phrase in sf_freq_lst:
            sentence = sentence.replace(freq_phrase, "")
        sentence = ' '.join(sentence.split())

        found = (re.search(pattern, sentence, flags=re.IGNORECASE) for pattern in duration_patterns)
        hits = [match.group(0) for match in found if match]
        if hits:
            # Longest match wins (the first one on ties); keep the text before any comma.
            phrase = max(hits, key=len).split(',')[0]
            rows.append({'Phrase': phrase, 'Topic': 'DURATION', 'Score': 0.75})
        else:
            rows.append({'Phrase': '', 'Topic': 'NO DURATION', 'Score': 0.75})

    return pd.DataFrame(rows, columns=columns)

In [None]:
# setfit dur format output
# Index -> label mapping for the duration topic.
# NOTE(review): not referenced in the visible part of the notebook (duration is detected by regex) — confirm it is still needed.
ind_dur_topic_dict = {
        0: 'NO DURATION',
        1: 'DURATION',
    }

def sf_dur_color(df):
    return df.style.format({'Score': '{:,.2%}'.format}).bar(subset=['Score'], color='#DDA0DD')

In [None]:
#setfit sev query and get predicted topic

def get_sf_sev_topic(sentences):
    """Run the SetFit severity model on *sentences* and return its predictions as a list."""
    return list(sf_sev_model(sentences))
def get_sf_sev_topic_scores(sentences):
    """Return, per sentence, the largest value in the severity model's ``predict_proba`` row."""
    probability_rows = sf_sev_model.predict_proba(sentences)
    return [max(list(row)) for row in probability_rows]

In [None]:
# setfit sev format output
# Translates the severity model's predicted class index into its display label.
ind_sev_topic_dict = {
        0: 'NO SEVERITY',
        1: 'SEVERITY',
    }

def sf_sev_color(df):
    return df.style.format({'Score': '{:,.2%}'.format}).bar(subset=['Score'], color='#FFCCCB')

In [None]:
def path_to_image_html(path):
    """Wrap *path* in a fixed-size (30x15) HTML ``<img>`` tag."""
    pieces = ('<img src="', path, '" width="30" height="15" />')
    return ''.join(pieces)

def display_final_df(tags):
    """Display the four-criteria summary table with a thumbs-up/down image per row.

    Args:
        tags: four truthy/falsy values, in the order Behaviour, Frequency,
            Duration, Severity. A truthy value shows ``./thumbs_up.png``, a
            falsy one ``./thumbs_down.png``.

    Side effects:
        Sets pandas' ``display.max_colwidth`` option to ``None`` and renders
        the table as centred HTML in the current output area.
    """
    crits = [
        'Behaviour',
        'Frequency',
        'Duration',
        'Severity'
    ]
    descs = [
        'Are all behaviours described in a way that would allow another person to act them out?',
        'Has information been provided about how often the behaviours occur?',
        'Has information been provided about how long the behaviours last for?',
        'Has information been provided about how damaging or destructive the behaviours are?'
    ]
    paths = ['./thumbs_up.png' if x else './thumbs_down.png' for x in tags]
    # The column header is shown to the user, so it is spelled "Description".
    df = pd.DataFrame({'Criteria': crits, 'Description': descs, 'Score': paths})
    df = df.set_index('Criteria')
    # Lift the column-width limit so the long descriptions are not truncated.
    pd.set_option('display.max_colwidth', None)
    # escape=False lets the <img> tags produced by path_to_image_html render as images.
    display(HTML('<div style="text-align: center;">' + df.to_html(classes=["align-center"], index=True, escape=False ,formatters=dict(Score=path_to_image_html)) + '</div>'))

### <font color='red'>Practitioner Section</font>
#### Enter a description of the <font color='blue'>behaviours</font> that align with this function. Include the <font color='blue'>frequency</font>, <font color='blue'>duration</font>, and <font color='blue'>severity</font>.

In [None]:
#demo with Voila

def _make_action_button(caption):
    """Create one large green action button; *caption* is used as both label and tooltip."""
    return widgets.Button(
        description=caption,
        disabled=False,
        button_style='success',  # 'success', 'info', 'warning', 'danger' or ''
        tooltip=caption,
        icon='check',
        layout={'height': '70px', 'width': '250px'},
    )


# Prompt label and free-text answer area for the practitioner.
bhvr_label = widgets.Label(value='Please type your answer:')
bhvr_text_input = widgets.Textarea(
    layout={'height': '300px', 'width': '90%'},
    disabled=False,
    description='',
    placeholder='Type your answer',
    value='',
)

# The three action buttons, laid out side by side.
bhvr_nlp_btn = _make_action_button('Score Behaviours')
bhvr_agr_btn = _make_action_button('Validate Data')
bhvr_eval_btn = _make_action_button('Evaluate Model')
btn_box = widgets.HBox(
    [bhvr_nlp_btn, bhvr_agr_btn, bhvr_eval_btn],
    layout={'width': '100%', 'height': '160%'},
)

# Output area that receives everything the button handlers display.
bhvr_outt = widgets.Output()
bhvr_outt.layout.height = '100%'
bhvr_outt.layout.width = '100%'

# Full demo column: answer box, buttons, then results.
bhvr_box = widgets.VBox(
    [bhvr_text_input, btn_box, bhvr_outt],
    layout={'width': '100%', 'height': '160%'},
)
# Module-level state for the button handlers.
# NOTE(review): dataset_rg_name is presumably the Argilla dataset name — confirm against the code that uses it.
dataset_rg_name = 'pbsp-page3-bhvr-argilla-ds'
# Declared global inside on_bhvr_button_next.
agrilla_df = None
annotated = False
# on_bhvr_button_next appends per-topic result frames here on every click;
# the visible code never clears the list.
sub_2_result_dfs = []
def on_bhvr_button_next(b):
    global bhvr_onto_lst, cl_bhvr_onto_lst, emb_bhvr_onto_lst, agrilla_df
    with bhvr_outt:
        clear_output()
        bhvr_onto_lst = bhvr_onto_text_input.value.split("\n")
        cl_bhvr_onto_lst = preprocess(bhvr_onto_lst)
        orig_cl_dict = {x:y for x,y in zip(cl_bhvr_onto_lst, bhvr_onto_lst)}
        emb_bhvr_onto_lst = sentence_embeddings(cl_bhvr_onto_lst)
        add_to_collection()
        query = bhvr_text_input.value
        vbs = get_verb_phrases(query)
        cl_vbs = preprocess(vbs)
        emb_vbs = sentence_embeddings(cl_vbs)
        vb_ind = -1
        highlights = []
        highlight_scores = []
        result_dfs = []
        for query_vector in emb_vbs:
            vb_ind += 1
            hist = search_collection('behaviours', query_vector)
            hist_dict = [dict(x) for x in hist]
            scores = [x['score'] for x in hist_dict]
            payloads = [orig_cl_dict[x['payload']['phrase']] for x in hist_dict]
            result_df = pd.DataFrame({'Score': scores, 'Glossary': payloads})
            result_df = result_df[result_df['Score'] >= semantic_passing_score]
            if len(result_df) > 0:
                highlights.append(vbs[vb_ind])
                highlight_scores.append(result_df.Score.max())
                result_df['Phrase'] = [vbs[vb_ind]] * len(result_df)
                result_df = result_df.sort_values(by='Score', ascending=False).reset_index(drop=True)
                result_dfs.append(result_df)
            else:
                continue
        ents = []
        colors = {}
        if len(highlights) > 0:
            ents = annotate_query(highlights, query)
            for ent in ents:
                colors[ent['label']] = '#ADD8E6'
        
        #setfit behaviour
        sentences = extract_sentences(query)
        cl_sentences = preprocess(sentences)
        topic_inds = get_sf_bhvr_topic(cl_sentences)
        topics = [ind_bhvr_topic_dict[i] for i in topic_inds]
        scores = get_sf_bhvr_topic_scores(cl_sentences)
        sf_bhvr_result_df = pd.DataFrame({'Phrase': sentences, 'Topic': topics, 'Score': scores})
        sf_bhvr_sub_result_df = sf_bhvr_result_df[sf_bhvr_result_df['Topic'] == 'BEHAVIOUR']
        sub_2_result_df = sf_bhvr_sub_result_df.copy()
        if len(sub_2_result_df) > 0:
            sub_2_result_dfs.append(sub_2_result_df)
        sf_bhvr_highlights = []
        sf_bhvr_ents = []
        if len(sf_bhvr_sub_result_df) > 0:
            sf_bhvr_highlights = sf_bhvr_sub_result_df['Phrase'].tolist()
            sf_bhvr_highlight_topics = sf_bhvr_sub_result_df['Topic'].tolist()
            sf_bhvr_highlight_scores = sf_bhvr_sub_result_df['Score'].tolist()    
            sf_bhvr_ents = sf_annotate_query(sf_bhvr_highlights, query, sf_bhvr_highlight_topics)
            for ent, hs in zip(sf_bhvr_ents, sf_bhvr_highlight_scores):
                if hs >= passing_score:
                    colors[ent['label']] = '#CCFFCC'
                else:
                    colors[ent['label']] = '#FFCC66'
        options = {"ents": list(colors), "colors": colors}
        if len(sf_bhvr_ents) > 0:
            ents = ents + sf_bhvr_ents
            
        #regex frequency
        sf_freq_result_df = detect_frequency(sentences)
        sf_freq_sub_result_df = sf_freq_result_df[sf_freq_result_df['Topic'] == 'FREQUENCY']
        sub_2_result_df = sf_freq_sub_result_df.copy()
        if len(sub_2_result_df) > 0:
            sub_2_result_dfs.append(sub_2_result_df)
        sf_freq_highlights = []
        sf_freq_ents = []
        if len(sf_freq_sub_result_df) > 0:
            sf_freq_highlights = sf_freq_sub_result_df['Phrase'].tolist()
            sf_freq_highlight_topics = sf_freq_sub_result_df['Topic'].tolist()
            sf_freq_highlight_scores = sf_freq_sub_result_df['Score'].tolist()    
            sf_freq_ents = sf_annotate_query(sf_freq_highlights, query, sf_freq_highlight_topics)
            for ent, hs in zip(sf_freq_ents, sf_freq_highlight_scores):
                if hs >= passing_score:
                    colors[ent['label']] = '#FFFF00'
                else:
                    colors[ent['label']] = '#FFCC66'
        options = {"ents": list(colors), "colors": colors}
        if len(sf_freq_ents) > 0:
            ents = ents + sf_freq_ents
            
        #regex duration
        sf_dur_result_df = detect_duration(sentences, sf_freq_result_df)
        sf_dur_sub_result_df = sf_dur_result_df[sf_dur_result_df['Topic'] == 'DURATION']
        sub_2_result_df = sf_dur_sub_result_df.copy()
        if len(sub_2_result_df) > 0:
            sub_2_result_dfs.append(sub_2_result_df)
        sf_dur_highlights = []
        sf_dur_ents = []
        if len(sf_dur_sub_result_df) > 0:
            sf_dur_highlights = sf_dur_sub_result_df['Phrase'].tolist()
            sf_dur_highlight_topics = sf_dur_sub_result_df['Topic'].tolist()
            sf_dur_highlight_scores = sf_dur_sub_result_df['Score'].tolist()    
            sf_dur_ents = sf_annotate_query(sf_dur_highlights, query, sf_dur_highlight_topics)
            for ent, hs in zip(sf_dur_ents, sf_dur_highlight_scores):
                if hs >= passing_score:
                    colors[ent['label']] = '#DDA0DD'
                else:
                    colors[ent['label']] = '#FFCC66'
        options = {"ents": list(colors), "colors": colors}
        if len(sf_dur_ents) > 0:
            ents = ents + sf_dur_ents
            
        #setfit severity
        topic_inds = get_sf_sev_topic(sentences)
        topics = [ind_sev_topic_dict[i] for i in topic_inds]
        scores = get_sf_sev_topic_scores(sentences)
        sf_sev_result_df = pd.DataFrame({'Phrase': sentences, 'Topic': topics, 'Score': scores})
        sf_sev_sub_result_df = sf_sev_result_df[sf_sev_result_df['Topic'] == 'SEVERITY']
        sub_2_result_df = sf_sev_sub_result_df.copy()
        if len(sub_2_result_df) > 0:
            sub_2_result_dfs.append(sub_2_result_df)
        sf_sev_highlights = []
        sf_sev_ents = []
        if len(sf_sev_sub_result_df) > 0:
            sf_sev_highlights = sf_sev_sub_result_df['Phrase'].tolist()
            sf_sev_highlight_topics = sf_sev_sub_result_df['Topic'].tolist()
            sf_sev_highlight_scores = sf_sev_sub_result_df['Score'].tolist()    
            sf_sev_ents = sf_annotate_query(sf_sev_highlights, query, sf_sev_highlight_topics)
            for ent, hs in zip(sf_sev_ents, sf_sev_highlight_scores):
                if hs >= passing_score:
                    colors[ent['label']] = '#FFCCCB'
                else:
                    colors[ent['label']] = '#FFCC66'
        options = {"ents": list(colors), "colors": colors}
        if len(sf_sev_ents) > 0:
            ents = ents + sf_sev_ents
            
        ex = [{"text": query,
               "ents": ents,
               "title": None}]
        if len(ents) > 0:
            title = "Answer Highlights"
            display(HTML(f'<center><h1>{title}</h1></center>'))
            html = displacy.render(ex, style="ent", manual=True, options=options)
            display(HTML(html))
        if len(result_dfs) > 0:
            title = "Subtopics"
            display(HTML(f'<center><h1 style="background-color: #ADD8E6; padding: 5px 10px;">{title}</h1></center>'))
            result_df = pd.concat(result_dfs).reset_index(drop = True)
            result_df = result_df.sort_values(by='Score', ascending=False).reset_index(drop=True)
            sub_2_result_df = result_df.copy()
            sub_2_result_df['Topic'] = ['BEHAVIOUR'] * len(result_df)
            sub_2_result_df = sub_2_result_df[['Phrase', 'Topic', 'Score']].drop_duplicates().reset_index(drop=True)
            sub_2_result_dfs.append(sub_2_result_df)
            agg_df = result_df.groupby(result_df.Phrase).max()
            agg_df['Phrase'] = agg_df.index
            agg_df = agg_df.reset_index(drop=True)
            agg_df = agg_df.drop(columns=['Glossary'])
            result_df = pd.merge(result_df, agg_df, 'inner', ['Phrase', 'Score'])
            result_df = result_df[['Phrase', 'Glossary', 'Score']]
            result_df = result_df.set_index('Phrase')
            display(color(result_df))
        bhvr_tag = False
        freq_tag = False
        dur_tag = False
        sev_tag = False
        if len(sf_bhvr_sub_result_df) > 0:
            bhvr_tag = True
            title = "Relevant Behaviours"
            display(HTML(f'<center><h1 style="background-color: #CCFFCC; padding: 5px 10px;">{title}</h1></center>'))
            result_df = sf_bhvr_sub_result_df.sort_values(by='Score', ascending=False).reset_index(drop=True)
            result_df = result_df.set_index('Phrase')
            display(sf_bhvr_color(result_df))
        if len(sf_freq_sub_result_df) > 0:
            freq_tag = True
            title = "Relevant Frequencies"
            display(HTML(f'<center><h1 style="background-color: #FFFF00; padding: 5px 10px;">{title}</h1></center>'))
            result_df = sf_freq_sub_result_df.sort_values(by='Score', ascending=False).reset_index(drop=True)
            result_df = result_df.set_index('Phrase')
            display(sf_freq_color(result_df))
        if len(sf_dur_sub_result_df) > 0:
            dur_tag = True
            title = "Relevant Durations"
            display(HTML(f'<center><h1 style="background-color: #DDA0DD; padding: 5px 10px;">{title}</h1></center>'))
            result_df = sf_dur_sub_result_df.sort_values(by='Score', ascending=False).reset_index(drop=True)
            result_df = result_df.set_index('Phrase')
            display(sf_dur_color(result_df))
        if len(sf_sev_sub_result_df) > 0:
            sev_tag = True
            title = "Relevant Severities"
            display(HTML(f'<center><h1 style="background-color: #FFCCCB; padding: 5px 10px;">{title}</h1></center>'))
            result_df = sf_sev_sub_result_df.sort_values(by='Score', ascending=False).reset_index(drop=True)
            result_df = result_df.set_index('Phrase')
            display(sf_sev_color(result_df))
        title = "Final Scores"
        display(HTML(f'<left><h1>{title}</h1></left>'))
        display_final_df([bhvr_tag, freq_tag, dur_tag, sev_tag])
        if len(sub_2_result_dfs) > 0:
            sub_2_result_df = pd.concat(sub_2_result_dfs).reset_index(drop=True)
            null_df = get_null_class_df(sentences, sub_2_result_df)
            if len(null_df) > 0:
                sub_2_result_df = pd.concat([sub_2_result_df, null_df]).reset_index(drop=True)
        agrilla_df = sub_2_result_df.copy()

def on_agr_button_next(b):
    """Publish the last scored phrases to Argilla so they can be annotated.

    Click callback; ``b`` is whatever the widget passes to its handler and is
    not used here. All output goes to ``bhvr_outt``, which is cleared first.

    When the module-level ``agrilla_df`` is ``None`` a red prompt asking the
    user to score the answer is shown and nothing else happens. Otherwise the
    frame is run through ``convert_df``, turned into an Argilla
    ``DatasetForTextClassification``, and logged under ``dataset_rg_name`` in
    the ``admin`` workspace (after a delete call for that same name). The
    label schema is then configured and the module-level ``annotated`` flag is
    set to ``True``.
    """
    global agrilla_df, annotated
    with bhvr_outt:
        clear_output()

        # Guard: nothing has been scored yet, so there is nothing to upload.
        if agrilla_df is None:
            display(Markdown("<h2 style='color:red; text-align:center;'>Please score the answer first!</h2>"))
            return

        workspace = "admin"

        # Reshape the scored frame into the layout Argilla accepts, then wrap
        # it as a text-classification dataset.
        records = rg.DatasetForTextClassification.from_pandas(convert_df(agrilla_df))

        # Replace whatever was logged under this name with the fresh records.
        rg.delete(dataset_rg_name, workspace=workspace)
        rg.log(records, name=dataset_rg_name, workspace=workspace)

        # Declare every class explicitly so all of them are offered in the
        # annotation UI, not only the ones present in this batch.
        label_settings = rg.TextClassificationSettings(
            label_schema=['BEHAVIOUR', 'FREQUENCY', 'DURATION', 'SEVERITY', 'NONE']
        )
        rg.configure_dataset(name=dataset_rg_name, workspace=workspace, settings=label_settings)

        annotated = True
            
def on_eval_button_next(b):
    """Display the F1 metric visualisation for the Argilla dataset.

    Click callback; ``b`` is whatever the widget passes to its handler and is
    not used here. All output goes to ``bhvr_outt``, which is cleared first.

    If the module-level ``annotated`` flag is falsy, a red prompt is shown
    asking the user to score and validate first. Otherwise the result of
    ``f1(dataset_rg_name).visualize()`` is displayed.
    """
    global annotated
    with bhvr_outt:
        clear_output()

        # Guard: the metric is only requested once data has been pushed.
        if not annotated:
            display(Markdown("<h2 style='color:red; text-align:center;'>Please score the answer and validate the data first!</h2>"))
            return

        metric_summary = f1(dataset_rg_name)
        display(metric_summary.visualize())

# Attach the click callbacks to the three buttons.
# NOTE(review): on_bhvr_button_next is defined earlier in the file; presumably
# it is the scoring handler that populates `agrilla_df` -- confirm.
bhvr_nlp_btn.on_click(on_bhvr_button_next)
# Uploads the scored data to Argilla and sets the `annotated` flag.
bhvr_agr_btn.on_click(on_agr_button_next)
# Shows the F1 metric visualisation once `annotated` is set.
bhvr_eval_btn.on_click(on_eval_button_next)

# Render the label and the box in the notebook output.
# NOTE(review): both are built elsewhere in the file; presumably the box holds
# the buttons wired above -- confirm.
display(bhvr_label, bhvr_box)