# NOTE: file-viewer page residue, kept as a comment so the module parses:
# wannaphong's picture | Update app.py | 2ae4509 | raw | history blame | 18.2 kB
# import warnings
# warnings.filterwarnings("ignore")
import gradio as gr
import torch
#torch.set_num_threads(1)
from transformers import AutoModelForCausalLM, AutoTokenizer
from typing import Optional, Union, List, Dict, Any, Tuple
import random
import time
import datetime
import os
import re
import pandas as pd
from langchain.llms import HuggingFacePipeline
from transformers import pipeline
import requests
import urllib
from urllib.request import urlopen
from urllib.parse import urlencode
from urllib.error import HTTPError, URLError
from urllib.request import Request
import copy
from langchain import ConversationChain, LLMChain, PromptTemplate
from langchain.memory import ConversationBufferWindowMemory
import torch
import pickle
from abc import ABC, abstractmethod
from typing import List
import numpy as np
import tensorflow_hub as hub
import tensorflow_text
from dataclasses import dataclass
import numpy as np
import tensorflow as tf
class Encoder(ABC):
    """Abstract interface for the text encoders used by the sensitive-topic filter."""

    @abstractmethod
    def encode(self, texts: List[str]) -> np.ndarray:
        """
        Encode ``texts`` into embedding vectors.

        Each output vector is expected to be one-dimensional and normalized
        (a unit vector).
        """
        ...
class MUSEEncoder(Encoder):
    """Encoder that embeds text with a model loaded from TensorFlow Hub."""

    def __init__(self, model_url: str = "https://tfhub.dev/google/universal-sentence-encoder-multilingual/3"):
        # hub.load resolves model_url to a callable embedding model.
        self.embed = hub.load(model_url)

    def encode(self, texts: List[str]) -> np.array:
        """Embed ``texts`` and divide every row by its L2 norm."""
        raw_vectors = self.embed(texts).numpy()
        row_norms = np.linalg.norm(raw_vectors, axis=1)
        # Reshape the norms into a column so the division applies row by row.
        return raw_vectors / row_norms.reshape(raw_vectors.shape[0], -1)
@dataclass
class SensitiveTopic:
    """One topic the chatbot refuses to discuss, with its canned reply."""

    # Key under which the topic's embeddings are stored.
    name: str
    # Message returned to the user when the topic is detected.
    respond_message: str
    # Range from 0 to 1; None means "use the protector's default sensitivity".
    sensitivity: Optional[float] = None
    # Example sentences that get encoded to represent the topic.
    demonstrations: Optional[List[str]] = None
    # Pre-computed embeddings, dimension = [N_ADHOC, DIM].
    # NOTE(review): the original comment is cut off after "this suppose to";
    # presumably the rows must be unit-normalized like encoder output - confirm.
    adhoc_embeded_demonstrations: Optional[np.ndarray] = None
DEFAULT_SENSITIVITY = 0.4


class SensitiveTopicProtector:
    """
    Flags user text that is semantically close to a configured sensitive topic.

    Every topic is represented by a matrix of embeddings (encoded demonstrations
    and/or pre-computed ones). A text is flagged when its highest dot-product
    score against a topic exceeds ``1.0 - sensitivity``.
    """

    def __init__(
        self,
        sensitive_topics: List["SensitiveTopic"],
        encoder: Optional["Encoder"] = None,
        default_sensitivity: float = DEFAULT_SENSITIVITY,
    ):
        """
        Args:
            sensitive_topics: topics to guard against.
            encoder: text encoder; a ``MUSEEncoder`` is created when omitted.
            default_sensitivity: sensitivity for topics that do not set one.
        """
        self.sensitive_topics = sensitive_topics
        self.default_sensitivity = default_sensitivity
        # Build the default encoder lazily: using MUSEEncoder() as a default
        # argument value would load the model as soon as the class is defined.
        self.encoder = encoder if encoder is not None else MUSEEncoder()
        self.topic_embeddings = self._get_topic_embeddings()

    def _get_topic_embeddings(self) -> Dict[str, Optional[np.ndarray]]:
        """Return ``{topic name: stacked embeddings}``; None when a topic has none."""
        topic_embeddings = {}
        for topic in self.sensitive_topics:
            parts = []
            demonstrations = topic.demonstrations
            if demonstrations is not None and len(demonstrations) > 0:
                parts.append(self.encoder.encode(texts=demonstrations))
            if topic.adhoc_embeded_demonstrations is not None:
                parts.append(np.asarray(topic.adhoc_embeded_demonstrations))
            topic_embeddings[topic.name] = np.concatenate(parts, axis=0) if parts else None
        return topic_embeddings

    def filter(self, text: str) -> Tuple[bool, Optional[str]]:
        """
        Check ``text`` against every topic, in order.

        Returns:
            ``(True, respond_message)`` for the first topic that matches,
            otherwise ``(False, None)``.
        """
        text_embedding = self.encoder.encode([text])
        for topic in self.sensitive_topics:
            embeddings = self.topic_embeddings[topic.name]
            # A topic without any embeddings can never match.
            if embeddings is None or len(embeddings) == 0:
                continue
            risk_scores = np.einsum('ik,jk->j', text_embedding, embeddings)
            # Compare against None so an explicit sensitivity of 0.0 is honoured
            # instead of silently falling back to the default.
            sensitivity = topic.sensitivity if topic.sensitivity is not None else self.default_sensitivity
            if np.max(risk_scores) > (1.0 - sensitivity):
                return True, topic.respond_message
        return False, None

    @classmethod
    def fromRaw(cls, raw_sensitive_topics: List[Dict], encoder: Optional["Encoder"] = None, default_sensitivity: float = DEFAULT_SENSITIVITY):
        """Build a protector from dicts holding ``SensitiveTopic`` keyword arguments."""
        sensitive_topics = [SensitiveTopic(**topic) for topic in raw_sensitive_topics]
        return cls(sensitive_topics=sensitive_topics, encoder=encoder, default_sensitivity=default_sensitivity)
# Load the raw sensitive-topic definitions and build the module-wide filter.
# NOTE(security): pickle.load can execute arbitrary code while loading, so
# sensitive_topics.pkl must only ever come from a trusted source.
with open("sensitive_topics.pkl", "rb") as f:
    sensitive_topics = pickle.load(f)
guardian = SensitiveTopicProtector.fromRaw(sensitive_topics)
# Checkpoint loaded below; also the default model label sent with feedback submissions.
name_model = "pythainlp/wangchanglm-7.5B-sft-en-sharded"
model = AutoModelForCausalLM.from_pretrained(
    name_model,
    device_map="auto",
    torch_dtype=torch.bfloat16,
    offload_folder="./",
    low_cpu_mem_usage=True,
)
# The tokenizer is loaded from the facebook/xglm-7.5B repository rather than from name_model.
tokenizer = AutoTokenizer.from_pretrained("facebook/xglm-7.5B")
# When set to "Yes", generation is given begin_suppress_tokens=exclude_ids.
Thai = "Yes"
from transformers import AutoTokenizer,AutoModelForCausalLM
# Prompt layout for the LangChain chatbot: the remembered turns ({history}) followed
# by the new human message; the text ends at "<bot>:" so the model writes the reply.
template = """
{history}
<human>: {human_input}
<bot>:"""
prompt = PromptTemplate(
    input_variables=["history", "human_input"],
    template=template
)
# Matches any run of characters that fall outside the range ก-๙.
exclude_pattern = re.compile(r'[^ก-๙]+') #|[^0-9a-zA-Z]+


def is_exclude(text):
    """Return True when ``text`` contains at least one character outside ก-๙."""
    return exclude_pattern.search(text) is not None
# One row per tokenizer vocabulary entry: the token text and its id.
df = pd.DataFrame(tokenizer.vocab.items(), columns=['text', 'idx'])
df['is_exclude'] = df['text'].map(is_exclude)
# Ids of every vocabulary token flagged by is_exclude.
exclude_ids = df.loc[df['is_exclude'].eq(True), 'idx'].tolist()
# Text-generation pipeline shared by the LangChain chatbot. The options are
# the same in both modes except that Thai mode also passes exclude_ids as
# begin_suppress_tokens.
_pipeline_options = dict(
    model=model,
    tokenizer=tokenizer,
    max_new_tokens=512,
    no_repeat_ngram_size=2,
)
if Thai == "Yes":
    _pipeline_options["begin_suppress_tokens"] = exclude_ids
pipe = pipeline("text-generation", **_pipeline_options)
# Wrap the transformers pipeline so LangChain can use it as the chain's LLM.
hf_pipeline = HuggingFacePipeline(pipeline=pipe)
# Chat chain built from the prompt template above. Its memory is a
# ConversationBufferWindowMemory with k=2 (presumably the number of most
# recent exchanges retained - confirm against the LangChain docs).
chatgpt_chain = LLMChain(
    llm=hf_pipeline,
    prompt=prompt,
    verbose=True,
    memory=ConversationBufferWindowMemory(k=2),
)
api_url = "https://wangchanglm.numfa.com/apiv2.php" # Don't open this url!!!


def sumbit_data(save,prompt,vote,feedback=None,max_len=None,temp=None,top_p=None,name_model=name_model):
    """
    POST one feedback record to ``api_url`` as a form-encoded body.

    Args:
        save: record category sent as the ``save`` field.
        prompt: text the feedback refers to.
        vote: numeric vote code.
        feedback: optional free-text comment.
        max_len, temp, top_p: generation settings stored with the record.
        name_model: model label stored with the record.

    Returns:
        True once the request has completed.

    Raises:
        urllib.error.URLError: propagated from ``urlopen`` (HTTPError included).
    """
    payload = {
        'save': save,
        'prompt': prompt,
        'vote': vote,
        'feedback': feedback,
        'max_len': max_len,
        'temp': temp,
        'top_p': top_p,
        'model': name_model,
    }
    # urlencode stringifies each value, so None is transmitted as "None".
    body = urllib.parse.urlencode(payload).encode('utf-8')
    # The context manager closes the HTTP response, which was previously leaked.
    with urllib.request.urlopen(api_url, body, 300) as response:
        response.read()
    return True
def gen_instruct(text,max_new_tokens=512,top_p=0.95,temperature=0.9,top_k=50):
    """
    Generate a continuation of ``text`` and return only the newly generated part.

    Args:
        text: fully formatted prompt.
        max_new_tokens: upper bound on generated tokens.
        top_p, temperature, top_k: sampling settings. They only take effect
            when sampling is enabled, so sampling is switched on whenever
            ``temperature`` is positive; a temperature of 0 (or None) keeps
            greedy decoding.

    Returns:
        The decoded text that follows the prompt, special tokens removed.
    """
    batch = tokenizer(text, return_tensors="pt")
    prompt_length = len(batch["input_ids"][0])

    generate_kwargs = dict(
        input_ids=batch["input_ids"],
        max_new_tokens=max_new_tokens,  # 512
        no_repeat_ngram_size=2,
    )
    if temperature is not None and temperature > 0:
        # Without do_sample=True, generate() decodes greedily and ignores
        # top_k / top_p / temperature.
        generate_kwargs.update(
            do_sample=True,
            # oasst k50
            top_k=top_k,
            top_p=top_p,  # 0.95
            typical_p=1.,
            temperature=temperature,  # 0.9
        )
    if Thai == "Yes":
        # Thai mode: pass the tokens flagged by is_exclude as begin_suppress_tokens.
        generate_kwargs["begin_suppress_tokens"] = exclude_ids

    with torch.cuda.amp.autocast():  # cuda -> cpu if cpu
        output_tokens = model.generate(**generate_kwargs)
    # Drop the prompt tokens so only the new text is decoded.
    return tokenizer.decode(output_tokens[0][prompt_length:], skip_special_tokens=True)
def gen_chatbot_old(text):
    """
    Produce the bot reply for a rendered chat transcript, without LangChain.

    The transcript is first checked by ``guardian``; when it is flagged the
    canned respond message is returned and the model is not called.

    Args:
        text: transcript, e.g. as produced by ``list2prompt``.

    Returns:
        The canned message, or the text the model generated after the prompt.
    """
    is_sensitive, respond_message = guardian.filter(text)
    if is_sensitive:
        return respond_message
    batch = tokenizer(text, return_tensors="pt")
    prompt_length = len(batch["input_ids"][0])
    with torch.cpu.amp.autocast():  # cuda if gpu
        output_tokens = model.generate(
            input_ids=batch["input_ids"],
            max_new_tokens=512,
            begin_suppress_tokens=exclude_ids,
            no_repeat_ngram_size=2,
        )
    # Decode only the newly generated tokens (as gen_instruct does). Splitting
    # the full decoded text on ": " and keeping the last piece cut off every
    # reply that itself contained ": ".
    return tokenizer.decode(output_tokens[0][prompt_length:], skip_special_tokens=True)
def list2prompt(history):
    """
    Render ``(user, bot)`` pairs as a ``<human>:`` / ``<bot>:`` transcript.

    A pair whose bot entry is None is left open after ``<bot>: `` so the model
    can complete it.
    """
    pieces = []
    for user, bot in history:
        pieces.append("<human>: " + user + "\n<bot>: ")
        if bot is not None:
            pieces.append(bot + "\n")
    return "".join(pieces)
# Prompt templates for single-turn instruction generation:
#   "prompt_input"    - instruction plus a context string ({input}).
#   "prompt_no_input" - instruction only.
# Both end with "<bot>: " so the model continues with the answer.
PROMPT_DICT = {
    "prompt_input": (
        "<context>: {input}\n<human>: {instruction}\n<bot>: "
    ),
    "prompt_no_input": (
        "<human>: {instruction}\n<bot>: "
    ),
}
def instruct_generate(
    instruct: str,
    input: str = 'none',
    max_gen_len=512,
    temperature: float = 0.1,
    top_p: float = 0.75,
):
    """
    Answer a single instruction, optionally with a context string.

    The instruction is checked by ``guardian`` first; a flagged instruction
    gets the topic's canned message instead of a model call. The context is
    treated as absent when it equals 'none' or has fewer than two characters.
    """
    blocked, canned_reply = guardian.filter(instruct)
    if blocked:
        return canned_reply

    context_missing = input == 'none' or len(input) < 2
    if context_missing:
        template_key, fields = 'prompt_no_input', {'instruction': instruct, 'input': ''}
    else:
        template_key, fields = 'prompt_input', {'instruction': instruct, 'input': input}

    full_prompt = PROMPT_DICT[template_key].format_map(fields)
    return gen_instruct(full_prompt, max_gen_len, top_p, temperature)
def _vote_code(choice):
    """Map the feedback radio choice to the vote code sent to the API (Good=1, Bad=0, otherwise 3)."""
    return {"Good": 1, "Bad": 0}.get(choice, 3)


# Gradio UI with three tabs: single-shot text generation, the LangChain
# chatbot, and a chatbot that calls the model directly.
# NOTE(review): the nesting of the layout containers below was reconstructed
# from a copy of this file whose indentation was lost - confirm against the
# deployed UI.
with gr.Blocks(height=900) as demo:
    # The chatbot tab uses the module-level chatgpt_chain built earlier in
    # this file; rebuilding an identical chain here was redundant.
    gr.Markdown(
        """
# 🐘 WangChanGLM v0.2 demo
[Blog](https://medium.com/@iwishcognitivedissonance/wangchanglm-the-thai-turned-multilingual-instruction-following-model-7aa9a0f51f5f) | [Codes](https://github.com/pythainlp/wangchanglm) | [Demo](https://colab.research.google.com/github/pythainlp/WangChanGLM/blob/main/demo/WangChanGLM_v0_1_demo.ipynb)
This demo use CPU only, so It may be slow or very slow. If you want the speed, try [Google colab](https://colab.research.google.com/github/pythainlp/WangChanGLM/blob/main/demo/WangChanGLM_v0_1_demo.ipynb).
**We do not guarantee a reply message.**
"""
    )
    with gr.Tab("Text Generation"):
        with gr.Row():
            with gr.Column():
                instruction = gr.Textbox(lines=2, label="Instruction",max_lines=10)
                # Named context_box so the builtin input() is not shadowed module-wide.
                context_box = gr.Textbox(
                    lines=2, label="Context input", placeholder='none',max_lines=5)
                max_len = gr.Slider(minimum=1, maximum=1024,
                                    value=512, label="Max new tokens")
                with gr.Accordion(label='Advanced options', open=False):
                    temp = gr.Slider(minimum=0, maximum=1,
                                     value=0.9, label="Temperature")
                    top_p = gr.Slider(minimum=0, maximum=1,
                                      value=0.95, label="Top p")
                run_button = gr.Button("Run")
            with gr.Column():
                outputs = gr.Textbox(lines=10, label="Output")
                with gr.Column(visible=False) as feedback_gen_box:
                    gen_radio = gr.Radio(
                        ["Good", "Bad", "Report"], label="Do you think about the chat?")
                    feedback_gen = gr.Textbox(placeholder="Feedback chatbot",show_label=False, lines=4)
                    feedback_gen_submit = gr.Button("Submit Feedback")
                with gr.Row(visible=False) as feedback_gen_ok:
                    gr.Markdown("Thank you for feedback.")

        def save_gen_feedback(instruction, context, generated, max_len, temp, top_p, choice, feedback):
            """Submit feedback for the text-generation tab, then swap the form for the thank-you row."""
            # Rebuild the prompt exactly as instruct_generate does, then append
            # the generated output so the stored record holds the full exchange.
            if context == 'none' or len(context) < 2:
                rendered = PROMPT_DICT['prompt_no_input'].format_map(
                    {'instruction': instruction, 'input': ''})
            else:
                rendered = PROMPT_DICT['prompt_input'].format_map(
                    {'instruction': instruction, 'input': context})
            sumbit_data(save="gen", prompt=rendered + generated, vote=_vote_code(choice),
                        feedback=feedback, max_len=max_len, temp=temp, top_p=top_p)
            return {feedback_gen_box: gr.update(visible=False), feedback_gen_ok: gr.update(visible=True)}

        def gen(instruct: str,input: str = 'none',max_gen_len=512,temperature: float = 0.1,top_p: float = 0.75):
            """Run generation and reveal the feedback form next to the output."""
            # Visibility is changed through the returned updates; the bare
            # component.update(...) calls that used to sit here had no effect.
            generated = instruct_generate(instruct, input, max_gen_len, temperature, top_p)
            return {outputs: generated, feedback_gen_box: gr.update(visible=True), feedback_gen_ok: gr.update(visible=False)}

        feedback_gen_submit.click(fn=save_gen_feedback, inputs=[instruction, context_box, outputs, max_len, temp, top_p, gen_radio, feedback_gen], outputs=[feedback_gen_box, feedback_gen_ok], queue=False)
        inputs = [instruction, context_box, max_len, temp, top_p]
        run_button.click(fn=gen, inputs=inputs, outputs=[outputs, feedback_gen_box, feedback_gen_ok])
        examples = gr.Examples(examples=["แต่งกลอนวันแม่","แต่งกลอนแปดวันแม่",'อยากลดความอ้วนทำไง','จงแต่งเรียงความเรื่องความฝันของคนรุ่นใหม่ต่อประเทศไทย'],inputs=[instruction])

    with gr.Tab("ChatBot"):
        with gr.Column():
            chatbot = gr.Chatbot(label="Chat Message Box", placeholder="Chat Message Box",show_label=False).style(container=False)
        with gr.Row():
            with gr.Column(scale=0.85):
                msg = gr.Textbox(placeholder="พิมพ์คำถามของคุณที่นี่... (กด enter หรือ submit หลังพิมพ์เสร็จ)",show_label=False)
            with gr.Column(scale=0.15, min_width=0):
                submit = gr.Button("Submit")
        with gr.Column():
            with gr.Column(visible=False) as feedback_chatbot_box:
                chatbot_radio = gr.Radio(
                    ["Good", "Bad", "Report"], label="Do you think about the chat?"
                )
                feedback_chatbot = gr.Textbox(placeholder="Feedback chatbot",show_label=False, lines=4)
                feedback_chatbot_submit = gr.Button("Submit Feedback")
            with gr.Row(visible=False) as feedback_chatbot_ok:
                gr.Markdown("Thank you for feedback.")
        clear = gr.Button("Clear")

        def save_up(history,choice,feedback):
            """Submit feedback for the LangChain chatbot conversation."""
            sumbit_data(save="chat", prompt=list2prompt(history), vote=_vote_code(choice), feedback=feedback)
            return {feedback_chatbot_ok: gr.update(visible=True), feedback_chatbot_box: gr.update(visible=False)}

        def user(user_message, history):
            """Answer one user message (canned reply when flagged) and reveal the feedback form."""
            history = history or []  # tolerate a chatbot that has no value yet
            is_sensitive, respond_message = guardian.filter(user_message)
            if is_sensitive:
                bot_message = respond_message
            else:
                bot_message = chatgpt_chain.predict(human_input=user_message)
            history.append((user_message, bot_message))
            return "", history, gr.update(visible=True)

        def reset():
            """Clear the chain's memory; the None result is written to the chatbot output."""
            chatgpt_chain.memory.clear()
            print("clear!")
            return None

        feedback_chatbot_submit.click(fn=save_up, inputs=[chatbot, chatbot_radio, feedback_chatbot], outputs=[feedback_chatbot_ok, feedback_chatbot_box], queue=False)
        clear.click(reset, None, chatbot, queue=False)
        submit_event = msg.submit(fn=user, inputs=[msg, chatbot], outputs=[msg, chatbot, feedback_chatbot_box], queue=True)
        submit_click_event = submit.click(fn=user, inputs=[msg, chatbot], outputs=[msg, chatbot, feedback_chatbot_box], queue=True)

    with gr.Tab("ChatBot without LangChain"):
        chatbot2 = gr.Chatbot()
        msg2 = gr.Textbox(label="Your sentence here... (press enter to submit)")
        with gr.Column():
            with gr.Column(visible=False) as feedback_chatbot_box2:
                chatbot_radio2 = gr.Radio(
                    ["Good", "Bad", "Report"], label="Do you think about the chat?"
                )
                feedback_chatbot2 = gr.Textbox(placeholder="Feedback chatbot",show_label=False, lines=4)
                feedback_chatbot_submit2 = gr.Button("Submit Feedback")
            with gr.Row(visible=False) as feedback_chatbot_ok2:
                gr.Markdown("Thank you for feedback.")

        def user2(user_message, history):
            """Append the user's message with an open bot slot and clear the textbox."""
            return "", (history or []) + [[user_message, None]]

        def bot2(history):
            """Fill the open bot slot of the last turn and reveal the feedback form."""
            history[-1][1] = gen_chatbot_old(list2prompt(history))
            return history, gr.update(visible=True)

        def save_up2(history,choice,feedback):
            """Submit feedback for the direct-model chatbot, labelled with a "-chat_old" model suffix."""
            sumbit_data(save="chat", prompt=list2prompt(history), vote=_vote_code(choice),
                        feedback=feedback, name_model=name_model+"-chat_old")
            return {feedback_chatbot_ok2: gr.update(visible=True), feedback_chatbot_box2: gr.update(visible=False)}

        msg2.submit(user2, [msg2, chatbot2], [msg2, chatbot2]).then(bot2, chatbot2, [chatbot2, feedback_chatbot_box2], queue=True)
        feedback_chatbot_submit2.click(fn=save_up2, inputs=[chatbot2, chatbot_radio2, feedback_chatbot2], outputs=[feedback_chatbot_ok2, feedback_chatbot_box2], queue=False)
        clear2 = gr.Button("Clear")
        clear2.click(lambda: None, None, chatbot2, queue=False)

demo.queue()
demo.launch()