# import warnings
# warnings.filterwarnings("ignore")
"""Gradio demo for WangChanGLM.

At import time this script:

1. loads a pickled list of sensitive-topic definitions and builds a
   ``SensitiveTopicProtector`` ("guardian") that screens user text,
2. loads the causal language model and tokenizer,
3. builds a three-tab Gradio UI ("Text Generation", "ChatBot",
   "ChatBot without LangChain"), each with a feedback form that posts the
   conversation to a remote endpoint, and
4. starts the Gradio server.

NOTE(review): this file was reconstructed from a copy whose line breaks and
indentation had been collapsed. Statement order and string contents follow
that copy; the nesting of the UI layout and any line breaks that used to live
inside multi-line string literals could not be recovered and should be
confirmed against the upstream demo.
"""
import copy
import datetime
import os
import pickle
import random
import re
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, Union
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen

# Third-party imports are kept in their original relative order.
import gradio as gr
import torch

# Limit PyTorch to one intra-op thread; done right after importing torch and
# before the remaining imports, as in the original script.
torch.set_num_threads(1)

from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
import pandas as pd
from langchain.llms import HuggingFacePipeline
import requests
from langchain import ConversationChain, LLMChain, PromptTemplate
from langchain.memory import ConversationBufferWindowMemory
import numpy as np
import tensorflow_hub as hub
import tensorflow_text  # name is unused; presumably imported for its side effects on TF Hub loading — confirm
import tensorflow as tf


class Encoder(ABC):
    """Interface for the sentence encoders used by the sensitive-topic filter."""

    @abstractmethod
    def encode(self, texts: List[str]) -> np.ndarray:
        """Encode ``texts`` into embeddings.

        output dimension expected to be one dimension and normalized (unit vector)
        """
        ...


class MUSEEncoder(Encoder):
    """Encoder backed by a TensorFlow Hub model (multilingual USE by default)."""

    def __init__(self, model_url: str = "https://tfhub.dev/google/universal-sentence-encoder-multilingual/3"):
        # hub.load fetches/loads the model, so constructing this class is expensive.
        self.embed = hub.load(model_url)

    def encode(self, texts: List[str]) -> np.ndarray:
        """Return one L2-normalised embedding row per input text."""
        embeds = self.embed(texts).numpy()
        # keepdims=True yields an (N, 1) column of norms so the division
        # broadcasts row-wise (same result as the former reshape).
        return embeds / np.linalg.norm(embeds, axis=1, keepdims=True)


@dataclass
class SensitiveTopic:
    """Definition of one topic the guardian should intercept."""

    name: str
    respond_message: str
    sensitivity: Optional[float] = None  # range from 0 to 1
    demonstrations: Optional[List[str]] = None
    # dimension = [N_ADHOC, DIM].
    # The original note continued "Please kindly note that this suppose to"
    # and was cut off in the source. Presumably these rows are expected to be
    # unit-normalised like Encoder.encode output — TODO confirm.
    adhoc_embeded_demonstrations: Optional[np.ndarray] = None


DEFAULT_SENSITIVITY = 0.7


class SensitiveTopicProtector:
    """Flags text whose embedding is too similar to any sensitive-topic example."""

    def __init__(
        self,
        sensitive_topics: List[SensitiveTopic],
        encoder: Optional[Encoder] = None,
        default_sensitivity: float = DEFAULT_SENSITIVITY,
    ):
        """
        Args:
            sensitive_topics: topics to screen for.
            encoder: sentence encoder; a ``MUSEEncoder`` is created when omitted.
            default_sensitivity: sensitivity used for topics that do not set
                their own.
        """
        self.sensitive_topics = sensitive_topics
        self.default_sensitivity = default_sensitivity
        # Built lazily here instead of as a default argument value: a default
        # of ``MUSEEncoder()`` is evaluated once when the ``def`` executes,
        # which loaded the hub model at import time even if never used.
        self.encoder = MUSEEncoder() if encoder is None else encoder
        self.topic_embeddings = self._get_topic_embeddings()

    def _get_topic_embeddings(self) -> Dict[str, Optional[np.ndarray]]:
        """Map each topic name to its stacked example embeddings.

        Text demonstrations are encoded with ``self.encoder`` and stacked on
        top of any pre-computed ad-hoc embeddings. A topic that provides
        neither maps to ``None``.
        """
        topic_embeddings: Dict[str, Optional[np.ndarray]] = {}
        for topic in self.sensitive_topics:
            parts = []
            if topic.demonstrations is not None:
                parts.append(self.encoder.encode(texts=topic.demonstrations))
            if topic.adhoc_embeded_demonstrations is not None:
                parts.append(topic.adhoc_embeded_demonstrations)

            if not parts:
                topic_embeddings[topic.name] = None
            elif len(parts) == 1:
                topic_embeddings[topic.name] = parts[0]
            else:
                topic_embeddings[topic.name] = np.concatenate(parts, axis=0)
        return topic_embeddings

    def filter(self, text: str) -> Tuple[bool, Optional[str]]:
        """Check ``text`` against every topic, in order.

        Returns:
            ``(True, respond_message)`` for the first topic whose highest
            similarity score exceeds ``1 - sensitivity``; otherwise
            ``(False, None)``.
        """
        text_embedding = self.encoder.encode([text])
        for topic in self.sensitive_topics:
            embeddings = self.topic_embeddings[topic.name]
            # A topic without any example embeddings cannot match anything;
            # skipping it avoids einsum/max failing on None or an empty array.
            if embeddings is None or len(embeddings) == 0:
                continue

            # Dot product of the text embedding with every example row.
            risk_scores = np.einsum('ik,jk->j', text_embedding, embeddings)
            max_risk_score = np.max(risk_scores)

            # ``is not None`` (rather than truthiness) so that an explicit
            # sensitivity of 0.0 is honoured instead of silently replaced by
            # the default.
            sensitivity = (
                topic.sensitivity
                if topic.sensitivity is not None
                else self.default_sensitivity
            )
            if max_risk_score > (1.0 - sensitivity):
                return True, topic.respond_message
        return False, None

    @classmethod
    def fromRaw(
        cls,
        raw_sensitive_topics: List[Dict],
        encoder: Optional[Encoder] = None,
        default_sensitivity: float = DEFAULT_SENSITIVITY,
    ):
        """Build a protector from dicts of ``SensitiveTopic`` keyword arguments."""
        sensitive_topics = [SensitiveTopic(**topic) for topic in raw_sensitive_topics]
        return cls(
            sensitive_topics=sensitive_topics,
            encoder=encoder,
            default_sensitivity=default_sensitivity,
        )


# SECURITY NOTE: unpickling can execute arbitrary code. Only ship a
# sensitive_topics.pkl that comes from a trusted source.
with open("sensitive_topics.pkl", "rb") as f:
    sensitive_topics = pickle.load(f)

guardian = SensitiveTopicProtector.fromRaw(sensitive_topics)

name_model = "pythainlp/wangchanglm-7.5B-sft-en-sharded"
model = AutoModelForCausalLM.from_pretrained(
    name_model,
    device_map="auto",
    torch_dtype=torch.bfloat16,
    offload_folder="./",
    low_cpu_mem_usage=True,
)
tokenizer = AutoTokenizer.from_pretrained("facebook/xglm-7.5B")

# When "Yes", generation is prevented from *starting* with a token that
# contains any character outside the Thai range (see exclude_ids below).
Thai = "Yes"

# NOTE(review): string reproduced exactly as it appears in the collapsed
# source. Line breaks (and possibly speaker tags before each ":") may have
# been lost there — confirm against the prompt format the model expects.
template = """ {history} : {human_input} :"""

prompt = PromptTemplate(
    input_variables=["history", "human_input"],
    template=template
)

# Matches any run of characters outside the range ก-๙.
# (alternative noted in the original: |[^0-9a-zA-Z]+)
exclude_pattern = re.compile(r'[^ก-๙]+')


def is_exclude(text):
    """Return True when ``text`` contains a character outside the range ก-๙."""
    return bool(exclude_pattern.search(text))


# Ids of all vocabulary tokens containing at least one excluded character.
df = pd.DataFrame(tokenizer.vocab.items(), columns=['text', 'idx'])
df['is_exclude'] = df.text.map(is_exclude)
exclude_ids = df[df.is_exclude].idx.tolist()

# One pipeline call; the begin-suppression list is only added in Thai mode.
_pipeline_kwargs = {
    "model": model,
    "tokenizer": tokenizer,
    "max_new_tokens": 512,
    "no_repeat_ngram_size": 2,
}
if Thai == "Yes":
    _pipeline_kwargs["begin_suppress_tokens"] = exclude_ids
pipe = pipeline("text-generation", **_pipeline_kwargs)

hf_pipeline = HuggingFacePipeline(pipeline=pipe)

# Chain used by the "ChatBot" tab. The original built an identical chain a
# second time inside the gr.Blocks context; one instance is sufficient
# because the first was never used before being replaced.
chatgpt_chain = LLMChain(
    llm=hf_pipeline,
    prompt=prompt,
    verbose=True,
    memory=ConversationBufferWindowMemory(k=2),
)

api_url = "https://wangchanglm.numfa.com/api.php"  # Don't open this url!!!

# Upper bound on how long a feedback upload may block a UI callback.
FEEDBACK_TIMEOUT_SECONDS = 30


def sumbit_data(save, prompt, vote, feedback=None, max_len=None, temp=None, top_p=None, name_model=name_model):
    """Send one feedback record to the collection endpoint via HTTP GET.

    Args:
        save: record category; callers in this file pass "gen" or "chat".
        prompt: the prompt/conversation text being rated.
        vote: numeric vote; callers pass 1 (Good), 0 (Bad) or 3 (anything else).
        feedback: free-text feedback.
        max_len, temp, top_p: generation settings to record (may be None).
        name_model: model identifier to record.

    Returns:
        True once the request has completed.

    Raises:
        urllib.error.URLError / HTTPError (or a timeout error) if the request
        fails; errors are not swallowed here.
    """
    payload = {
        'save': save,
        'prompt': prompt,
        'vote': vote,
        'feedback': feedback,
        'max_len': max_len,
        'temp': temp,
        'top_p': top_p,
        'model': name_model
    }
    url = "https://wangchanglm.numfa.com/api.php" + "?" + urlencode(payload, doseq=True, safe="/")
    # The context manager closes the connection; the timeout keeps a stalled
    # server from hanging the callback indefinitely.
    with urlopen(url, timeout=FEEDBACK_TIMEOUT_SECONDS) as response:
        response.read()
    return True


def gen_instruct(text, max_new_tokens=512, top_p=0.95, temperature=0.9, top_k=50):
    """Generate a completion for ``text`` and return only the new part.

    NOTE(review): ``do_sample`` is not passed to ``generate``; unless the
    model's generation config enables sampling, ``top_k``/``top_p``/
    ``temperature`` presumably have no effect — confirm.
    """
    batch = tokenizer(text, return_tensors="pt")
    generate_kwargs = {
        "input_ids": batch["input_ids"],
        # Cast because the value may arrive from a UI slider as a float.
        "max_new_tokens": int(max_new_tokens),  # 512
        "no_repeat_ngram_size": 2,
        # oasst k50
        "top_k": top_k,
        "top_p": top_p,  # 0.95
        "typical_p": 1.,
        "temperature": temperature,  # 0.9
    }
    if Thai == "Yes":
        generate_kwargs["begin_suppress_tokens"] = exclude_ids

    # NOTE(review): this uses CUDA autocast while gen_chatbot_old uses CPU
    # autocast and the UI text says the demo is CPU-only. Left unchanged;
    # original hint: "cuda -> cpu if cpu".
    with torch.cuda.amp.autocast():
        output_tokens = model.generate(**generate_kwargs)

    # Drop the echoed prompt tokens so only the generated text is returned.
    prompt_length = len(batch["input_ids"][0])
    return tokenizer.decode(output_tokens[0][prompt_length:], skip_special_tokens=True)


def gen_chatbot_old(text):
    """Answer a chat prompt without LangChain.

    Returns the guardian's canned message when ``text`` is flagged; otherwise
    generates from ``text`` and returns whatever follows the last ": " in the
    decoded output.
    """
    is_sensitive, respond_message = guardian.filter(text)
    if is_sensitive:
        return respond_message

    batch = tokenizer(text, return_tensors="pt")
    # context_tokens = tokenizer(text, add_special_tokens=False)['input_ids']
    # logits_processor = FocusContextProcessor(context_tokens, model.config.vocab_size, scaling_factor = 1.5)
    with torch.cpu.amp.autocast():  # cuda if gpu
        output_tokens = model.generate(
            input_ids=batch["input_ids"],
            max_new_tokens=512,
            begin_suppress_tokens=exclude_ids,
            no_repeat_ngram_size=2,
        )
    return tokenizer.decode(output_tokens[0], skip_special_tokens=True).split(": ")[-1]


def list2prompt(history):
    """Flatten ``[(user, bot), ...]`` chat history into one prompt string.

    Each turn contributes ``": <user>\\n: "`` followed by ``"<bot>\\n"`` when
    the bot reply is present (a pending turn has ``bot`` set to None).
    """
    parts = []
    for user, bot in history:
        parts.append(": " + user + "\n: ")
        if bot is not None:
            parts.append(bot + "\n")
    return "".join(parts)


PROMPT_DICT = {
    "prompt_input": (
        ": {input}\n: {instruction}\n: "
    ),
    "prompt_no_input": (
        ": {instruction}\n: "
    ),
}


def _build_instruct_prompt(instruct, input):
    """Fill the matching PROMPT_DICT template.

    The context is treated as absent when it is None, the literal 'none', or
    shorter than two characters.
    """
    if input is None or input == 'none' or len(input) < 2:
        return PROMPT_DICT['prompt_no_input'].format_map(
            {'instruction': instruct, 'input': ''})
    return PROMPT_DICT['prompt_input'].format_map(
        {'instruction': instruct, 'input': input})


def _vote_for_choice(choice):
    """Map the feedback radio choice to its numeric vote (Good=1, Bad=0, else 3)."""
    if choice == "Good":
        return 1
    if choice == "Bad":
        return 0
    return 3


def instruct_generate(
    instruct: str,
    input: str = 'none',
    max_gen_len=512,
    temperature: float = 0.1,
    top_p: float = 0.75,
):
    """Screen ``instruct`` with the guardian, then generate a response.

    Returns the guardian's canned message if the instruction is flagged,
    otherwise the text produced by ``gen_instruct`` for the built prompt.
    """
    is_sensitive, respond_message = guardian.filter(instruct)
    if is_sensitive:
        return respond_message

    full_prompt = _build_instruct_prompt(instruct, input)
    return gen_instruct(full_prompt, max_gen_len, top_p, temperature)


# NOTE(review): widget nesting below was reconstructed from a collapsed source
# (indentation lost); the set and order of widgets is as in the source, but
# confirm the exact layout.
with gr.Blocks(height=900) as demo:
    # String kept exactly as in the collapsed source; original line breaks
    # inside it may have been lost — confirm the intended Markdown layout.
    gr.Markdown(
        """ # 🐘 WangChanGLM v0.2 demo [Blog](https://medium.com/@iwishcognitivedissonance/wangchanglm-the-thai-turned-multilingual-instruction-following-model-7aa9a0f51f5f) | [Codes](https://github.com/pythainlp/wangchanglm) | [Demo](https://colab.research.google.com/github/pythainlp/WangChanGLM/blob/main/demo/WangChanGLM_v0_1_demo.ipynb) This demo use CPU only, so It may be slow or very slow. If you want the speed, try [Google colab](https://colab.research.google.com/github/pythainlp/WangChanGLM/blob/main/demo/WangChanGLM_v0_1_demo.ipynb). **We do not guarantee a reply message.** """
    )

    with gr.Tab("Text Generation"):
        with gr.Row():
            with gr.Column():
                instruction = gr.Textbox(lines=2, label="Instruction", max_lines=10)
                # Named context_input (not ``input``) to avoid shadowing the builtin.
                context_input = gr.Textbox(
                    lines=2, label="Context input", placeholder='none', max_lines=5)
                max_len = gr.Slider(minimum=1, maximum=1024, value=512, label="Max new tokens")
                with gr.Accordion(label='Advanced options', open=False):
                    temp = gr.Slider(minimum=0, maximum=1, value=0.9, label="Temperature")
                    top_p = gr.Slider(minimum=0, maximum=1, value=0.95, label="Top p")
                run_botton = gr.Button("Run")
            with gr.Column():
                outputs = gr.Textbox(lines=10, label="Output")
                # Feedback form; hidden until a generation has been produced.
                with gr.Column(visible=False) as feedback_gen_box:
                    gen_radio = gr.Radio(
                        ["Good", "Bad", "Report"], label="Do you think about the chat?")
                    feedback_gen = gr.Textbox(placeholder="Feedback chatbot", show_label=False, lines=4)
                    feedback_gen_submit = gr.Button("Submit Feedback")
                with gr.Row(visible=False) as feedback_gen_ok:
                    gr.Markdown("Thank you for feedback.")

        # Formerly named ``save_up2``, which collided with the handler of the
        # same name defined later for the third tab.
        def save_gen_feedback(instruction, input, output_text, max_len, temp, top_p, choice, feedback):
            """Upload feedback for a text generation, then swap form for thanks."""
            full_text = _build_instruct_prompt(instruction, input) + output_text
            sumbit_data(
                save="gen",
                prompt=full_text,
                vote=_vote_for_choice(choice),
                feedback=feedback,
                max_len=max_len,
                temp=temp,
                top_p=top_p,
            )
            return {
                feedback_gen_box: gr.update(visible=False),
                feedback_gen_ok: gr.update(visible=True),
            }

        def gen(instruct: str, input: str = 'none', max_gen_len=512, temperature: float = 0.1, top_p: float = 0.75):
            """Run generation and reveal the feedback form.

            Visibility is changed only through the returned updates; the
            original also called ``.update(...)`` on the components and
            discarded the results.
            """
            generated = instruct_generate(instruct, input, max_gen_len, temperature, top_p)
            return {
                outputs: generated,
                feedback_gen_box: gr.update(visible=True),
                feedback_gen_ok: gr.update(visible=False),
            }

        feedback_gen_submit.click(
            fn=save_gen_feedback,
            inputs=[instruction, context_input, outputs, max_len, temp, top_p, gen_radio, feedback_gen],
            outputs=[feedback_gen_box, feedback_gen_ok],
            queue=False,
        )
        inputs = [instruction, context_input, max_len, temp, top_p]
        run_botton.click(fn=gen, inputs=inputs, outputs=[outputs, feedback_gen_box, feedback_gen_ok])
        examples = gr.Examples(
            examples=['อยากลดความอ้วนทำไง', 'จงแต่งเรียงความเรื่องความฝันของคนรุ่นใหม่ต่อประเทศไทย'],
            inputs=[instruction],
        )

    with gr.Tab("ChatBot"):
        with gr.Column():
            chatbot = gr.Chatbot(
                label="Chat Message Box",
                placeholder="Chat Message Box",
                show_label=False,
            ).style(container=False)
        with gr.Row():
            with gr.Column(scale=0.85):
                msg = gr.Textbox(
                    placeholder="พิมพ์คำถามของคุณที่นี่... (กด enter หรือ submit หลังพิมพ์เสร็จ)",
                    show_label=False,
                )
            with gr.Column(scale=0.15, min_width=0):
                submit = gr.Button("Submit")
        with gr.Column():
            with gr.Column(visible=False) as feedback_chatbot_box:
                chatbot_radio = gr.Radio(
                    ["Good", "Bad", "Report"], label="Do you think about the chat?"
                )
                feedback_chatbot = gr.Textbox(placeholder="Feedback chatbot", show_label=False, lines=4)
                feedback_chatbot_submit = gr.Button("Submit Feedback")
            with gr.Row(visible=False) as feedback_chatbot_ok:
                gr.Markdown("Thank you for feedback.")
            clear = gr.Button("Clear")

        def save_up(history, choice, feedback):
            """Upload feedback for the LangChain chat, then swap form for thanks."""
            sumbit_data(
                save="chat",
                prompt=list2prompt(history),
                vote=_vote_for_choice(choice),
                feedback=feedback,
            )
            return {
                feedback_chatbot_ok: gr.update(visible=True),
                feedback_chatbot_box: gr.update(visible=False),
            }

        def user(user_message, history):
            """Answer one chat message and append the turn to ``history``.

            Flagged messages get the guardian's canned reply; others go
            through the LangChain chain. Returns an empty string (clears the
            textbox), the updated history, and an update revealing the
            feedback form.
            """
            # Tolerate a chat component that has no value yet.
            history = history or []
            is_sensitive, respond_message = guardian.filter(user_message)
            if is_sensitive:
                bot_message = respond_message
            else:
                bot_message = chatgpt_chain.predict(human_input=user_message)
            history.append((user_message, bot_message))
            return "", history, gr.update(visible=True)

        def reset():
            """Clear the chain's conversation memory.

            Returns None, which is what gets written to the chatbot output
            this handler is wired to.
            """
            chatgpt_chain.memory.clear()
            print("clear!")

        feedback_chatbot_submit.click(
            fn=save_up,
            inputs=[chatbot, chatbot_radio, feedback_chatbot],
            outputs=[feedback_chatbot_ok, feedback_chatbot_box],
            queue=False,
        )
        clear.click(reset, None, chatbot, queue=False)
        submit_event = msg.submit(
            fn=user, inputs=[msg, chatbot], outputs=[msg, chatbot, feedback_chatbot_box], queue=True)
        submit_click_event = submit.click(
            fn=user, inputs=[msg, chatbot], outputs=[msg, chatbot, feedback_chatbot_box], queue=True)

    with gr.Tab("ChatBot without LangChain"):
        chatbot2 = gr.Chatbot()
        msg2 = gr.Textbox(label="Your sentence here... (press enter to submit)")
        with gr.Column():
            with gr.Column(visible=False) as feedback_chatbot_box2:
                chatbot_radio2 = gr.Radio(
                    ["Good", "Bad", "Report"], label="Do you think about the chat?"
                )
                feedback_chatbot2 = gr.Textbox(placeholder="Feedback chatbot", show_label=False, lines=4)
                feedback_chatbot_submit2 = gr.Button("Submit Feedback")
            with gr.Row(visible=False) as feedback_chatbot_ok2:
                gr.Markdown("Thank you for feedback.")

        def user2(user_message, history):
            """Append the user's message as a pending turn (bot reply = None)."""
            return "", history + [[user_message, None]]

        def bot2(history):
            """Fill in the pending turn's reply using the whole history as prompt."""
            bot_message = gen_chatbot_old(list2prompt(history))
            history[-1][1] = bot_message
            return history, gr.update(visible=True)

        def save_up2(history, choice, feedback):
            """Upload feedback for the non-LangChain chat, tagged "-chat_old"."""
            sumbit_data(
                save="chat",
                prompt=list2prompt(history),
                vote=_vote_for_choice(choice),
                feedback=feedback,
                name_model=name_model + "-chat_old",
            )
            return {
                feedback_chatbot_ok2: gr.update(visible=True),
                feedback_chatbot_box2: gr.update(visible=False),
            }

        # First record the user's message, then generate the reply.
        msg2.submit(user2, [msg2, chatbot2], [msg2, chatbot2], queue=True).then(
            bot2, chatbot2, [chatbot2, feedback_chatbot_box2])
        feedback_chatbot_submit2.click(
            fn=save_up2,
            inputs=[chatbot2, chatbot_radio2, feedback_chatbot2],
            outputs=[feedback_chatbot_ok2, feedback_chatbot_box2],
            queue=False,
        )
        clear2 = gr.Button("Clear")
        clear2.click(lambda: None, None, chatbot2, queue=False)

demo.queue()
demo.launch()