Alignment-Lab-AI's picture
Update app.py
052bd8b verified
raw
history blame
5.68 kB
import os
from threading import Thread
from typing import Iterator
import gradio as gr
import spaces
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
# Generation limits: upper bound and initial value of the "Max new tokens"
# slider, and the number of prompt tokens kept before older context is trimmed.
MAX_MAX_NEW_TOKENS = 1024
DEFAULT_MAX_NEW_TOKENS = 256
MAX_INPUT_TOKEN_LENGTH = 512

# Markdown shown at the top of the page.
DESCRIPTION = (
    "# Buzz-3B-Small\n"
    "This Space demonstrates Buzz-3b-small-v0.6.3.\n"
)

# Markdown shown at the bottom of the page.
LICENSE = (
    "\n"
    "<p/>\n"
    "---\n"
    "This demo uses Buzz-3b-small-v0.6.3. Please check the model card for details.\n"
)
# Probe for CUDA once; the result drives both the UI notice and model placement.
_cuda_available = torch.cuda.is_available()
if not _cuda_available:
    DESCRIPTION += "\n<p>Running on CPU 🥶 This demo works better on GPU.</p>"

model_id = "H-D-T/Buzz-3b-small-v0.6.3"

# NOTE: trust_remote_code=True lets transformers execute Python code shipped in
# the model repository; keep model_id pointed at a repository you trust.
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    device_map="auto" if _cuda_available else "cpu",
    trust_remote_code=True,
    low_cpu_mem_usage=True,
)
tokenizer = AutoTokenizer.from_pretrained(model_id)

# If the tokenizer defines no padding token, reuse EOS for padding and keep the
# model config in agreement with the tokenizer.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.pad_token_id = tokenizer.eos_token_id
    model.config.pad_token_id = tokenizer.eos_token_id
# Special tokens used to delimit messages in the prompt text.
bos_token = "<|begin_of_text|>"
eos_token = "<|eot_id|>"
start_header_id = "<|start_header_id|>"
end_header_id = "<|end_header_id|>"


def _render_message(role: str, content: str) -> str:
    """Render one message as role header, blank line, stripped content, EOS."""
    return f"{start_header_id}{role}{end_header_id}\n\n{content.strip()}{eos_token}"


def format_chat_history(chat_history: list[tuple[str, str]], add_generation_prompt=False) -> str:
    """Render completed chat turns as a single prompt string.

    Args:
        chat_history: Sequence of ``(user_text, assistant_text)`` pairs, oldest
            first. Both texts are stripped of surrounding whitespace.
        add_generation_prompt: When true, the result ends with an open
            assistant header so the model continues with a reply. Otherwise a
            single closing ``eos_token`` is appended.

    Returns:
        The rendered conversation. ``bos_token`` is prepended only when
        ``chat_history`` contains at least one turn.
    """
    parts = []
    for user, assistant in chat_history:
        parts.append(_render_message("user", user))
        parts.append(_render_message("assistant", assistant))

    if parts:
        # The conversation opens with BOS, placed before the first user message.
        parts.insert(0, bos_token)

    if add_generation_prompt:
        parts.append(f"{start_header_id}assistant{end_header_id}\n\n")
    else:
        parts.append(eos_token)
    return "".join(parts)
@spaces.GPU
def generate(
    message: str,
    chat_history: list[tuple[str, str]],
    max_new_tokens: int = 1024,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.4,
) -> Iterator[str]:
    """Stream the model's reply to ``message`` given the prior conversation.

    Args:
        message: The new user message.
        chat_history: Completed ``(user_text, assistant_text)`` turns, oldest
            first. The list is not modified.
        max_new_tokens: Forwarded to ``model.generate``.
        temperature: Forwarded to ``model.generate``.
        top_p: Forwarded to ``model.generate``.
        top_k: Forwarded to ``model.generate``.
        repetition_penalty: Forwarded to ``model.generate``.

    Yields:
        The reply text accumulated so far, once per chunk the streamer emits.
    """
    # format_chat_history renders only complete (user, assistant) pairs, so the
    # new message is added as a turn with an empty reply. A new list is built
    # so the caller's history is left untouched.
    turns = [*chat_history, (message, "")]
    rendered = format_chat_history(turns, add_generation_prompt=False)
    # `rendered` ends with: assistant header + EOS (the empty reply) + closing
    # EOS. Dropping both EOS tokens leaves the prompt on an open assistant
    # header, which is where the model should start writing.
    chat_context = rendered.removesuffix(eos_token * 2)

    input_ids = tokenizer([chat_context], return_tensors="pt").input_ids
    if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
        # Keep the most recent tokens; the oldest context is dropped.
        input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
        gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
    input_ids = input_ids.to(model.device)

    streamer = TextIteratorStreamer(tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = dict(
        input_ids=input_ids,
        streamer=streamer,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        top_p=top_p,
        top_k=top_k,
        temperature=temperature,
        num_beams=1,
        pad_token_id=tokenizer.eos_token_id,
        repetition_penalty=repetition_penalty,
        no_repeat_ngram_size=5,
        early_stopping=False,
    )
    # Generation runs in a background thread while this generator drains the
    # streamer and yields partial text.
    t = Thread(target=model.generate, kwargs=generate_kwargs)
    t.start()

    outputs = []
    for text in streamer:
        outputs.append(text)
        yield "".join(outputs)
# One row per generation control: (label, minimum, maximum, step, initial value).
# The rows follow the order of generate()'s parameters after
# (message, chat_history).
_SLIDER_SPECS = [
    ("Max new tokens", 1, MAX_MAX_NEW_TOKENS, 1, DEFAULT_MAX_NEW_TOKENS),
    ("Temperature", 0.1, 4.0, 0.1, 0.6),
    ("Top-p (nucleus sampling)", 0.05, 1.0, 0.05, 0.9),
    ("Top-k", 1, 1000, 1, 50),
    ("Repetition penalty", 1.0, 2.0, 0.05, 1.4),
]

# Example prompts offered in the UI.
_EXAMPLE_PROMPTS = [
    "A recipe for a chocolate cake:",
    "Can you explain briefly to me what is the Python programming language?",
    "Explain the plot of Cinderella in a sentence.",
    "Question: What is the capital of France?\nAnswer:",
    "Question: I am very tired, what should I do?\nAnswer:",
]

# Chat UI wired to generate(); the sliders supply its sampling parameters.
chat_interface = gr.ChatInterface(
    fn=generate,
    additional_inputs=[
        gr.Slider(label=label, minimum=low, maximum=high, step=step, value=initial)
        for label, low, high, step, initial in _SLIDER_SPECS
    ],
    stop_btn=None,
    examples=[[prompt] for prompt in _EXAMPLE_PROMPTS],
)
# Page layout, top to bottom: description, duplicate button, chat UI, footer.
demo = gr.Blocks(css="style.css")
with demo:
    gr.Markdown(DESCRIPTION)
    gr.DuplicateButton(value="Duplicate Space for private use", elem_id="duplicate-button")
    chat_interface.render()
    gr.Markdown(LICENSE)

if __name__ == "__main__":
    # Enable the request queue (at most 20 waiting requests), then start the app.
    queued_demo = demo.queue(max_size=20)
    queued_demo.launch()