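"""Thai FAQ chatbot demo (Gradio).

Retrieves the stored question most similar to the user's question with FAISS
over multilingual-e5 sentence embeddings, then extracts an answer span from
the matching context with a WangchanBERTa question-answering model. The
"Before" tab answers from the full retrieved context; the "After" tab
re-ranks crfcut sentence segments and highlights the extracted span.
"""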
# @title web interface demo
# import random
# import gradio as gr
# import time
# import numpy as np
# import pandas as pd
# import torch
# import faiss
# from sklearn.preprocessing import normalize
# from transformers import AutoTokenizer, AutoModelForQuestionAnswering
# from sentence_transformers import SentenceTransformer, util
# from pythainlp import Tokenizer
# import pickle
# import re
# from pythainlp.tokenize import sent_tokenize
# DEFAULT_MODEL = 'wangchanberta-hyp'
# DEFAULT_SENTENCE_EMBEDDING_MODEL = 'intfloat/multilingual-e5-base'
# MODEL_DICT = {
#     'wangchanberta': 'Chananchida/wangchanberta-xet_ref-params',
#     'wangchanberta-hyp': 'Chananchida/wangchanberta-xet_hyp-params',
# }
# EMBEDDINGS_PATH = 'data/embeddings.pkl'
# DATA_PATH = 'data/dataset.xlsx'
# def load_data(path=DATA_PATH):
#     df = pd.read_excel(path, sheet_name='Default')
#     df['Context'] = pd.read_excel(path, sheet_name='mdeberta')['Context']
#     print(len(df))
#     print('Load data done')
#     return df
# def load_model(model_name=DEFAULT_MODEL):
#     model = AutoModelForQuestionAnswering.from_pretrained(MODEL_DICT[model_name])
#     tokenizer = AutoTokenizer.from_pretrained(MODEL_DICT[model_name])
#     print('Load model done')
#     return model, tokenizer
# def load_embedding_model(model_name=DEFAULT_SENTENCE_EMBEDDING_MODEL):
#     # if torch.cuda.is_available():
#     #     embedding_model = SentenceTransformer(model_name, device='cuda')
#     # else:
#     embedding_model = SentenceTransformer(model_name)
#     print('Load sentence embedding model done')
#     return embedding_model
# def set_index(vector):
#     if torch.cuda.is_available():
#         res = faiss.StandardGpuResources()
#         index = faiss.IndexFlatL2(vector.shape[1])
#         gpu_index_flat = faiss.index_cpu_to_gpu(res, 0, index)
#         gpu_index_flat.add(vector)
#         index = gpu_index_flat
#     else:
#         index = faiss.IndexFlatL2(vector.shape[1])
#         index.add(vector)
#     return index
# def get_embeddings(embedding_model, text_list):
#     return embedding_model.encode(text_list)
# def prepare_sentences_vector(encoded_list):
#     encoded_list = [i.reshape(1, -1) for i in encoded_list]
#     encoded_list = np.vstack(encoded_list).astype('float32')
#     encoded_list = normalize(encoded_list)
#     return encoded_list
# def store_embeddings(df, embeddings):
#     with open('embeddings.pkl', "wb") as fOut:
#         pickle.dump({'sentences': df['Question'], 'embeddings': embeddings}, fOut, protocol=pickle.HIGHEST_PROTOCOL)
#     print('Store embeddings done')
# def load_embeddings(file_path=EMBEDDINGS_PATH):
#     with open(file_path, "rb") as fIn:
#         stored_data = pickle.load(fIn)
#         stored_sentences = stored_data['sentences']
#         stored_embeddings = stored_data['embeddings']
#     print('Load (questions) embeddings done')
#     return stored_embeddings
# def model_pipeline(model, tokenizer, question, similar_context):
#     inputs = tokenizer(question, similar_context, return_tensors="pt")
#     with torch.no_grad():
#         outputs = model(**inputs)
#     answer_start_index = outputs.start_logits.argmax()
#     answer_end_index = outputs.end_logits.argmax()
#     predict_answer_tokens = inputs.input_ids[0, answer_start_index: answer_end_index + 1]
#     Answer = tokenizer.decode(predict_answer_tokens)
#     return Answer.replace('<unk>','@')
# def faiss_search(index, question_vector, k=1):
#     distances, indices = index.search(question_vector, k)
#     return distances, indices
# def create_segment_index(vector):
#     segment_index = faiss.IndexFlatL2(vector.shape[1])
#     segment_index.add(vector)
#     return segment_index
# def predict_faiss(model, tokenizer, embedding_model, df, question, index):
#     t = time.time()
#     question = question.strip()
#     question_vector = get_embeddings(embedding_model, question)
#     question_vector = prepare_sentences_vector([question_vector])
#     distances, indices = faiss_search(index, question_vector)
#     Answers = [df['Answer'][i] for i in indices[0]]
#     _time = time.time() - t
#     output = {
#         "user_question": question,
#         "answer": Answers[0],
#         "totaltime": round(_time, 3),
#         "score": round(distances[0][0], 4)
#     }
#     return output
# def predict(model, tokenizer, embedding_model, df, question, index):
#     t = time.time()
#     question = question.strip()
#     question_vector = get_embeddings(embedding_model, question)
#     question_vector = prepare_sentences_vector([question_vector])
#     distances, indices = faiss_search(index, question_vector)
#     # Answer = model_pipeline(model, tokenizer, df['Question'][indices[0][0]], df['Context'][indices[0][0]])
#     Answer = model_pipeline(model, tokenizer, question, df['Context'][indices[0][0]])
#     _time = time.time() - t
#     output = {
#         "user_question": question,
#         "answer": Answer,
#         "totaltime": round(_time, 3),
#         "distance": round(distances[0][0], 4)
#     }
#     return Answer
# def predict_test(model, tokenizer, embedding_model, df, question, index):  # sent_tokenize pythainlp
#     t = time.time()
#     question = question.strip()
#     question_vector = get_embeddings(embedding_model, question)
#     question_vector = prepare_sentences_vector([question_vector])
#     distances, indices = faiss_search(index, question_vector)
#     mostSimContext = df['Context'][indices[0][0]]
#     pattern = r'(?<=\s{10}).*'
#     matches = re.search(pattern, mostSimContext, flags=re.DOTALL)
#     if matches:
#         mostSimContext = matches.group(0)
#     mostSimContext = mostSimContext.strip()
#     mostSimContext = re.sub(r'\s+', ' ', mostSimContext)
#     segments = sent_tokenize(mostSimContext, engine="crfcut")
#     segment_embeddings = get_embeddings(embedding_model, segments)
#     segment_embeddings = prepare_sentences_vector(segment_embeddings)
#     segment_index = create_segment_index(segment_embeddings)
#     _distances, _indices = faiss_search(segment_index, question_vector)
#     mostSimSegment = segments[_indices[0][0]]
#     Answer = model_pipeline(model, tokenizer, question, mostSimSegment)
#     if len(Answer) <= 2:
#         Answer = mostSimSegment
#     # Find the start and end indices of mostSimSegment within mostSimContext
#     start_index = mostSimContext.find(Answer)
#     end_index = start_index + len(Answer)
#     print(f"answer {len(Answer)} => {Answer} || startIndex =>{start_index} || endIndex =>{end_index}")
#     print(f"mostSimContext{len(mostSimContext)}=>{mostSimContext}\nsegments{len(segments)}=>{segments}\nmostSimSegment{len(mostSimSegment)}=>{mostSimSegment}")
#     _time = time.time() - t
#     output = {
#         "user_question": question,
#         "answer": df['Answer'][indices[0][0]],
#         "totaltime": round(_time, 3),
#         "distance": round(distances[0][0], 4),
#         "highlight_start": start_index,
#         "highlight_end": end_index
#     }
#     return output
# def highlight_text(text, start_index, end_index):
#     if start_index < 0:
#         start_index = 0
#     if end_index > len(text):
#         end_index = len(text)
#     highlighted_text = ""
#     for i, char in enumerate(text):
#         if i == start_index:
#             highlighted_text += "<mark>"
#         highlighted_text += char
#         if i == end_index - 1:
#             highlighted_text += "</mark>"
#     return highlighted_text
# def chat_interface_before(question, history):
#     response = predict(model, tokenizer, embedding_model, df, question, index)
#     return response
# def chat_interface_after(question, history):
#     response = predict_test(model, tokenizer, embedding_model, df, question, index)
#     highlighted_answer = highlight_text(response["answer"], response["highlight_start"], response["highlight_end"])
#     return highlighted_answer
# examples = [
#     'ขอเลขที่บัญชีของบริษัทหน่อย',
#     'บริษัทตั้งอยู่ที่ถนนอะไร',
#     'ขอช่องทางติดตามข่าวสารทาง Line หน่อย',
#     'อยากทราบความถี่ในการดึงข้อมูลของ DXT360 ในแต่ละแพลตฟอร์ม',
#     'อยากทราบความถี่ในการดึงข้อมูลของ DXT360 บน Twitter',
#     # 'ช่องทางติดตามข่าวสารของเรา',
# ]
# demo_before = gr.ChatInterface(fn=chat_interface_before,
#                                examples=examples)
# demo_after = gr.ChatInterface(fn=chat_interface_after,
#                               examples=examples)
# interface = gr.TabbedInterface([demo_before, demo_after], ["Before", "After"])
# if __name__ == "__main__":
#     # Load your model, tokenizer, data, and index here...
#     df = load_data()
#     model, tokenizer = load_model('wangchanberta-hyp')
#     embedding_model = load_embedding_model()
#     index = set_index(prepare_sentences_vector(load_embeddings(EMBEDDINGS_PATH)))
#     interface.launch()
import pickle
import random
import re

import faiss
import gradio as gr
import numpy as np
import pandas as pd
import torch
from pythainlp.tokenize import sent_tokenize
from sentence_transformers import SentenceTransformer
from sklearn.preprocessing import normalize
from transformers import AutoTokenizer, AutoModelForQuestionAnswering
# A single reader model is used now, so a plain model id replaces the old MODEL_DICT.
MODEL_NAME = 'Chananchida/wangchanberta-xet_hyp-params'
DEFAULT_SENTENCE_EMBEDDING_MODEL = 'intfloat/multilingual-e5-base'
EMBEDDINGS_PATH = 'data/embeddings.pkl'
DATA_PATH = 'data/dataset.xlsx'
class ChatBot:
    SHEET_NAME_MDEBERTA = 'mdeberta'
    SHEET_NAME_DEFAULT = 'Default'
    UNKNOWN_ANSWERS = ["กรุณาลงรายละเอียดมากกว่านี้ได้มั้ยคะ", "ขอโทษค่ะลูกค้า ดิฉันไม่ทราบจริง ๆ"]

    def __init__(self, df_path=None, model_path=None, tokenizer_path=None, embedding_model_name=None, embeddingsPath=None):
        self.df = None
        self.model = None
        self.tokenizer = None
        self.embedding_model = None
        self.index = None
        self.k = 5  # number of nearest questions/contexts to retrieve
        if all(arg is not None for arg in (df_path, model_path, tokenizer_path, embedding_model_name, embeddingsPath)):
            self.set_df(df_path)
            self.set_model(model_path)
            self.set_tokenizer(tokenizer_path)
            self.set_embedding_model(embedding_model_name)
            sentences_vector = self.load_embeddings(embeddingsPath)
            prepared_vector = self.prepare_sentences_vector(sentences_vector)
            self.set_index(prepared_vector)
    def set_index(self, vector):
        if torch.cuda.is_available():  # use a GPU index when CUDA is available
            res = faiss.StandardGpuResources()
            index = faiss.IndexFlatL2(vector.shape[1])
            gpu_index_flat = faiss.index_cpu_to_gpu(res, 0, index)
            gpu_index_flat.add(vector)
            self.index = gpu_index_flat
        else:  # otherwise fall back to a CPU-based FAISS index
            self.index = faiss.IndexFlatL2(vector.shape[1])
            self.index.add(vector)
        return self.index
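    # Note: StandardGpuResources and index_cpu_to_gpu exist only in the
    # faiss-gpu build; with the plain faiss-cpu package only the CPU branch
    # above is available.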
    def set_df(self, path):
        self.df = pd.read_excel(path, sheet_name=self.SHEET_NAME_DEFAULT)
        self.df.rename(columns={'Response': 'Answer'}, inplace=True)
        self.df['Context'] = pd.read_excel(path, sheet_name=self.SHEET_NAME_MDEBERTA)['Context']

    def set_model(self, model):
        self.model = AutoModelForQuestionAnswering.from_pretrained(model)

    def set_tokenizer(self, tokenizer):
        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer)

    def set_embedding_model(self, model):
        self.embedding_model = SentenceTransformer(model)

    def set_k(self, k_value):
        self.k = k_value
    def get_df(self):
        return self.df

    def get_model(self):
        return self.model

    def get_tokenizer(self):
        return self.tokenizer

    def get_embedding_model(self):
        return self.embedding_model

    def get_index(self):
        return self.index

    def get_k(self):
        return self.k

    def get_embeddings(self, text_list):
        return self.embedding_model.encode(text_list)
    def prepare_sentences_vector(self, encoded_list):
        # Stack the encoded sentences into a float32 matrix and L2-normalize each row.
        encoded_list = [i.reshape(1, -1) for i in encoded_list]
        encoded_list = np.vstack(encoded_list).astype('float32')
        encoded_list = normalize(encoded_list)
        return encoded_list
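    # Because the rows are unit-normalized, squared L2 distance (which
    # IndexFlatL2 returns) and cosine similarity are directly related:
    # ||a - b||^2 = 2 - 2 * cos(a, b). The `1 - distance` scores reported by
    # predict_test are therefore a monotonic proxy for cosine similarity, not
    # cosine similarity itself.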
    def load_embeddings(self, file_path):
        with open(file_path, "rb") as fIn:
            stored_data = pickle.load(fIn)
        return stored_data['embeddings']
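    # The pickle file is expected to hold {'sentences': ..., 'embeddings': ...},
    # the layout written by the store_embeddings helper in the commented-out
    # version above; only the embeddings are needed here.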
    def model_pipeline(self, question, similar_context):
        # Extractive QA: feed (question, context) to the reader and decode the
        # span between the most likely start and end token positions.
        inputs = self.tokenizer(question, similar_context, return_tensors="pt")
        with torch.no_grad():
            outputs = self.model(**inputs)
        answer_start_index = outputs.start_logits.argmax()
        answer_end_index = outputs.end_logits.argmax()
        predict_answer_tokens = inputs.input_ids[0, answer_start_index: answer_end_index + 1]
        answer = self.tokenizer.decode(predict_answer_tokens)
        return answer.replace('<unk>', '@')
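    # The start and end positions come from independent argmaxes over the
    # start/end logits, so if the end lands before the start the decoded span
    # is empty; predict_test treats such degenerate answers (length <= 2) as a
    # miss and falls back to the whole segment.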
    def faiss_search(self, index, question_vector):
        if index is None:
            raise ValueError("Index has not been initialized.")
        distances, indices = index.search(question_vector, self.k)
        similar_questions = [self.df['Question'][indices[0][i]] for i in range(self.k)]
        similar_contexts = [self.df['Context'][indices[0][i]] for i in range(self.k)]
        return similar_questions, similar_contexts, distances, indices
    def faiss_segment_search(self, index, question_vector, k=1):
        if index is None:
            raise ValueError("Index has not been initialized.")
        distances, indices = index.search(question_vector, k)
        return distances, indices
    def create_segment_index(self, vector):
        segment_index = faiss.IndexFlatL2(vector.shape[1])
        segment_index.add(vector)
        return segment_index
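    # Adapted from the commented-out procedural `predict` above so the "Before"
    # tab has a working backend (the class previously defined no `predict`):
    # retrieve the most similar stored question with FAISS and run the reader
    # over its full context, with no segmentation, highlighting, or
    # unknown-answer fallback.
    def predict(self, question):
        question = question.strip()
        question_vector = self.get_embeddings([question])
        question_vector = self.prepare_sentences_vector([question_vector])
        _questions, similar_contexts, _distances, _indices = self.faiss_search(self.index, question_vector)
        return self.model_pipeline(question, similar_contexts[0])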
    def predict_test(self, question):
        list_context_for_show = []
        list_distance_for_show = []
        list_similar_question = []
        question = question.strip()
        question_vector = self.get_embeddings([question])
        question_vector = self.prepare_sentences_vector([question_vector])
        similar_questions, similar_contexts, distances, indices = self.faiss_search(self.index, question_vector)
        mostSimContext = similar_contexts[0]
        # Drop any long run of leading whitespace (header residue in the sheet)
        # and collapse the remaining whitespace before segmenting.
        pattern = r'(?<=\s{10}).*'
        matches = re.search(pattern, mostSimContext, flags=re.DOTALL)
        if matches:
            mostSimContext = matches.group(0)
        mostSimContext = mostSimContext.strip()
        mostSimContext = re.sub(r'\s+', ' ', mostSimContext)
        # Second stage: split the best context into sentences, index them, and
        # retrieve the sentence closest to the question.
        segments = sent_tokenize(mostSimContext, engine="crfcut")
        segment_embeddings = self.get_embeddings(segments)
        segment_embeddings = self.prepare_sentences_vector(segment_embeddings)
        segment_index = self.create_segment_index(segment_embeddings)
        _distances, _indices = self.faiss_segment_search(segment_index, question_vector)
        mostSimSegment = segments[_indices[0][0]]
        answer = self.model_pipeline(question, mostSimSegment)
        if len(answer) <= 2:
            # Degenerate span from the reader; fall back to the whole segment.
            answer = mostSimSegment
        # Locate the extracted span inside the cleaned context for highlighting.
        start_index = mostSimContext.find(answer)
        end_index = start_index + len(answer)
        for i in range(min(5, self.k)):
            list_similar_question.append(similar_questions[i])
            list_context_for_show.append(similar_contexts[i])
            list_distance_for_show.append(str(1 - distances[0][i]))
        distance = list_distance_for_show[0]
        final_answer = self.df['Answer'][indices[0][0]]
        if float(distance) < 0.5:
            # Similarity too low: reply with a polite unknown-answer message
            # instead of the retrieved answer, and suppress the highlight.
            # (The original computed this fallback but never returned it.)
            final_answer = random.choice(self.UNKNOWN_ANSWERS)
            start_index = end_index = 0
        output = {
            "user_question": question,
            "answer": final_answer,
            "distance": distance,
            "highlight_start": start_index,
            "highlight_end": end_index,
            "list_context": list_context_for_show,
            "list_distance": list_distance_for_show
        }
        return output
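    # Shape of the dict returned by predict_test (values are illustrative only):
    # {
    #     "user_question": "...",
    #     "answer": "...",
    #     "distance": "0.83",
    #     "highlight_start": 12,
    #     "highlight_end": 40,
    #     "list_context": ["...", ...],
    #     "list_distance": ["0.83", ...],
    # }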
    def highlight_text(self, text, start_index, end_index):
        # Clamp the span to the bounds of the text, then wrap it in <mark> tags.
        # Returns the text unchanged for an empty or inverted span (the original
        # character-by-character loop could emit an unclosed <mark> in that case).
        start_index = max(start_index, 0)
        end_index = min(end_index, len(text))
        if end_index <= start_index:
            return text
        return text[:start_index] + "<mark>" + text[start_index:end_index] + "</mark>" + text[end_index:]
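    # e.g. highlight_text("hello world", 0, 5) -> "<mark>hello</mark> world"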
    def chat_interface_before(self, question, history):
        response = self.predict(question)
        return response

    def chat_interface_after(self, question, history):
        response = self.predict_test(question)
        highlighted_answer = self.highlight_text(response["answer"], response["highlight_start"], response["highlight_end"])
        return highlighted_answer
if __name__ == "__main__":
    bot = ChatBot(df_path=DATA_PATH, model_path=MODEL_NAME, tokenizer_path=MODEL_NAME,
                  embedding_model_name=DEFAULT_SENTENCE_EMBEDDING_MODEL, embeddingsPath=EMBEDDINGS_PATH)
    examples = [
        'ขอเลขที่บัญชีของบริษัทหน่อย',
        'บริษัทตั้งอยู่ที่ถนนอะไร',
        'ขอช่องทางติดตามข่าวสารทาง Line หน่อย',
        'อยากทราบความถี่ในการดึงข้อมูลของ DXT360 ในแต่ละแพลตฟอร์ม',
        'อยากทราบความถี่ในการดึงข้อมูลของ DXT360 บน Twitter',
        # 'ช่องทางติดตามข่าวสารของเรา',
    ]
    demo_before = gr.ChatInterface(fn=bot.chat_interface_before, examples=examples)
    demo_after = gr.ChatInterface(fn=bot.chat_interface_after, examples=examples)
    interface = gr.TabbedInterface([demo_before, demo_after], ["Before", "After"])
    interface.launch()
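    # To expose a temporary public URL (useful on Colab or behind a firewall),
    # launch with Gradio's share flag instead: interface.launch(share=True)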