import pickle
import time

import numpy as np
import pandas as pd
import torch
import faiss
import evaluate
import gradio as gr

from sklearn.preprocessing import normalize
from sklearn.metrics.pairwise import cosine_similarity, euclidean_distances
from transformers import AutoTokenizer, AutoModelForQuestionAnswering
from sentence_transformers import SentenceTransformer, util
from pythainlp import Tokenizer
|
# Report whether a CUDA GPU is visible to torch.
print(torch.cuda.is_available())
|
# QA model names selectable in this script (keys of MODEL_DICT below).
SUPPORTED_MODELS = [
    "mdeberta",
    "wangchanberta-hyp",
]
|
# Answer-prediction strategies implemented below.
PREDICT_METHODS = [
    "faiss",
    "faissWithModel",
    "cosineWithModel",
    "semanticSearchWithModel",
]
|
DEFAULT_MODEL = 'wangchanberta-hyp'
DEFAULT_SENTENCE_EMBEDDING_MODEL = 'intfloat/multilingual-e5-base'
|
MODEL_DICT = {
    'wangchanberta': 'Chananchida/wangchanberta-th-wiki-qa_ref-params',
    'wangchanberta-hyp': 'Chananchida/wangchanberta-th-wiki-qa_hyp-params',
    'mdeberta': 'Chananchida/mdeberta-v3-th-wiki-qa_ref-params',
    'mdeberta-hyp': 'Chananchida/mdeberta-v3-th-wiki-qa_hyp-params',
}
|
DATA_PATH = 'models/dataset.xlsx'
EMBEDDINGS_PATH = 'models/embeddings.pkl'
|
class ChatbotModel:
    def __init__(self, model=DEFAULT_MODEL):
        self._chatbot = Chatbot()
        self._chatbot.load_data()
        self._chatbot.load_model(model)
        self._chatbot.load_embedding_model(DEFAULT_SENTENCE_EMBEDDING_MODEL)
        self._chatbot.set_vectors()
        self._chatbot.set_index()

    def chat(self, question):
        # Note: Chatbot.answer_question is not defined in this section.
        return self._chatbot.answer_question(question)

    def eval(self, model, predict_method):
        # Note: Chatbot.eval is not defined in this section.
        return self._chatbot.eval(model_name=model, predict_method=predict_method)
|
class Chatbot:
    def __init__(self):
        self.df = None
        self.test_df = None
        self.model = None
        self.model_name = None
        self.tokenizer = None
        self.embedding_model = None
        self.vectors = None
        self.index = None
        self.k = 1  # number of nearest neighbours returned by faiss_search
|
    def load_data(self, path: str = DATA_PATH):
        self.df = pd.read_excel(path, sheet_name='Default')
        # Replace the default contexts with those from the 'mdeberta' sheet.
        self.df['Context'] = pd.read_excel(path, sheet_name='mdeberta')['Context']
|
    def load_model(self, model_name: str = DEFAULT_MODEL):
        self.model = AutoModelForQuestionAnswering.from_pretrained(MODEL_DICT[model_name])
        self.tokenizer = AutoTokenizer.from_pretrained(MODEL_DICT[model_name])
        self.model_name = model_name
|
    def load_embedding_model(self, model_name: str = DEFAULT_SENTENCE_EMBEDDING_MODEL):
        # Place the sentence encoder on the GPU when one is available.
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.embedding_model = SentenceTransformer(model_name, device=device)
|
    def set_vectors(self):
        self.vectors = self.prepare_sentences_vector(self.load_embeddings(EMBEDDINGS_PATH))
|
    def set_index(self):
        # Exact L2 search over the question vectors, moved to GPU when available.
        self.index = faiss.IndexFlatL2(self.vectors.shape[1])
        if torch.cuda.is_available():
            res = faiss.StandardGpuResources()
            self.index = faiss.index_cpu_to_gpu(res, 0, self.index)
        self.index.add(self.vectors)
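
    # Because the stored vectors are L2-normalised, squared L2 distance is
    # monotonic in cosine similarity (||a - b||^2 = 2 - 2*a.b), so this flat
    # L2 index effectively ranks by cosine similarity as well.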
|
|
|
    def get_embeddings(self, text_list):
        return self.embedding_model.encode(text_list)
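
    # Note: intfloat/multilingual-e5-* models are trained with "query: " and
    # "passage: " input prefixes; this script encodes raw text. A prefixed
    # variant (a sketch, not what the script does) would be:
    #   self.embedding_model.encode([f"query: {t}" for t in text_list])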
|
|
|
    def prepare_sentences_vector(self, encoded_list):
        # Stack into an (n, dim) float32 matrix and L2-normalise each row.
        encoded_list = [i.reshape(1, -1) for i in encoded_list]
        encoded_list = np.vstack(encoded_list).astype('float32')
        encoded_list = normalize(encoded_list)
        return encoded_list
|
    def store_embeddings(self, embeddings):
        # Cache the question embeddings alongside the questions themselves.
        with open(EMBEDDINGS_PATH, "wb") as fOut:
            pickle.dump({'sentences': self.df['Question'], 'embeddings': embeddings},
                        fOut, protocol=pickle.HIGHEST_PROTOCOL)
        print('Store embeddings done')
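
    # Rebuilding the cache could look like this (a sketch; assumes load_data()
    # and load_embedding_model() have already been called):
    #   chatbot.store_embeddings(chatbot.get_embeddings(chatbot.df['Question'].tolist()))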
|
|
|
    def load_embeddings(self, file_path):
        with open(file_path, "rb") as fIn:
            stored_data = pickle.load(fIn)
        print('Load (questions) embeddings done')
        return stored_data['embeddings']
|
    def model_pipeline(self, question, similar_context):
        # Extractive QA: pick the most likely answer span within the context.
        inputs = self.tokenizer(question, similar_context, return_tensors="pt")
        with torch.no_grad():
            outputs = self.model(**inputs)
        answer_start_index = outputs.start_logits.argmax()
        answer_end_index = outputs.end_logits.argmax()
        predict_answer_tokens = inputs.input_ids[0, answer_start_index: answer_end_index + 1]
        Answer = self.tokenizer.decode(predict_answer_tokens)
        return Answer
|
    def faiss_search(self, question_vector):
        distances, indices = self.index.search(question_vector, self.k)
        similar_questions = [self.df['Question'][indices[0][i]] for i in range(self.k)]
        similar_contexts = [self.df['Context'][indices[0][i]] for i in range(self.k)]
        return similar_questions, similar_contexts, distances, indices
|
    def predict_faiss(self, message):
        # Retrieval only: return the stored answer of the nearest question.
        message = message.strip()
        question_vector = self.get_embeddings([message])
        question_vector = self.prepare_sentences_vector(question_vector)
        similar_questions, similar_contexts, distances, indices = self.faiss_search(question_vector)
        Answer = self.df['Answer'][indices[0][0]]
        return Answer
|
    def predict_bert_embedding(self, message):
        # Retrieve the nearest context with Faiss, then extract the answer
        # span from it with the QA model.
        message = message.strip()
        question_vector = self.get_embeddings([message])
        question_vector = self.prepare_sentences_vector(question_vector)
        similar_questions, similar_contexts, distances, indices = self.faiss_search(question_vector)
        Answer = self.model_pipeline(message, similar_contexts[0])
        return Answer
|
    def predict_semantic_search(self, message):
        # Rank the contexts against the query with sentence-transformers
        # semantic search. The corpus embeddings are recomputed on every call.
        corpus_embeddings = self.prepare_sentences_vector(self.get_embeddings(self.df['Context'].tolist()))
        corpus_embeddings = torch.from_numpy(corpus_embeddings)
        message = message.strip()
        query_embedding = self.embedding_model.encode(message, convert_to_tensor=True)
        if torch.cuda.is_available():
            query_embedding = query_embedding.to('cuda')
            corpus_embeddings = corpus_embeddings.to('cuda')
        hits = util.semantic_search(query_embedding, corpus_embeddings, top_k=1)
        hit = hits[0][0]
        context = self.df['Context'][hit['corpus_id']]
        Answer = self.model_pipeline(message, context)
        return Answer
|
    def predict_without_faiss(self, message):
        # Brute-force nearest neighbour over the question vectors using
        # Euclidean distance, without the Faiss index.
        MostSimilarContext = ""
        min_distance = float('inf')
        message = message.strip(' \t\n')
        question_vector = self.get_embeddings([message])
        question_vector = self.prepare_sentences_vector(question_vector)
        for j, _question_vector in enumerate(self.vectors):
            distance = euclidean_distances(question_vector, _question_vector.reshape(1, -1))[0][0]
            if distance < min_distance:
                min_distance = distance
                MostSimilarContext = self.df['Context'][j]
                # Stop early on a near-exact match (empirically chosen threshold).
                if distance <= 0.02469331026:
                    break
        predict_answer = self.model_pipeline(message, MostSimilarContext)
        Answer = predict_answer.strip().replace("<unk>", "@")
        return Answer
|
|
|
# Build the chatbot once at import time so the Gradio callbacks below can use it.
bot = ChatbotModel()
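
# Optional smoke test (the question is the first entry of EXAMPLE_QUESTIONS below):
#   print(bot._chatbot.predict_faiss("หลิน ไห่เฟิง มีชื่อเรียกอีกชื่อว่าอะไร"))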
|
|
|
"""#Gradio""" |
|
|
|
|
|
# Sample questions for the Gradio demos.
EXAMPLE_QUESTIONS = [
    "หลิน ไห่เฟิง มีชื่อเรียกอีกชื่อว่าอะไร",
    "ใครเป็นผู้ตั้งสภาเศรษฐกิจโลกขึ้นในปี พ.ศ. 2514 โดยทุกปีจะมีการประชุมที่ประเทศสวิตเซอร์แลนด์",
    "โปรดิวเซอร์ของอัลบั้มตลอดกาล ของวงคีรีบูนคือใคร",
    "สกุลเดิมของหม่อมครูนุ่ม นวรัตน ณ อยุธยา คืออะไร",
]
|
demoFaiss = gr.Interface(fn=bot._chatbot.predict_faiss, inputs="text", outputs="text",
                         title="TH wiki (just Faiss)", examples=EXAMPLE_QUESTIONS)
demoBert = gr.Interface(fn=bot._chatbot.predict_bert_embedding, inputs="text", outputs="text",
                        title="TH wiki (Faiss & Model)", examples=EXAMPLE_QUESTIONS)
demoSemantic = gr.Interface(fn=bot._chatbot.predict_semantic_search, inputs="text", outputs="text",
                            title="TH wiki (Semantic Search & Model)", examples=EXAMPLE_QUESTIONS)
demoWithoutFaiss = gr.Interface(fn=bot._chatbot.predict_without_faiss, inputs="text", outputs="text",
                                title="TH wiki (just Model)", examples=EXAMPLE_QUESTIONS)

demo = gr.TabbedInterface([demoFaiss, demoWithoutFaiss, demoBert, demoSemantic],
                          ["Faiss", "Model", "Faiss & Model", "Semantic Search & Model"])
demo.launch()
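
# Gradio can also expose a temporary public URL:
#   demo.launch(share=True)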
|
|