#!/usr/bin/env python
# coding=utf-8
"""The Inferencer class simplifies the process of model inferencing."""

import os
import torch
import wandb
import deepspeed
import sys
import numpy as np
import datetime
import json

from transformers import AutoConfig
import torch.distributed as dist

from lmflow.args import DatasetArguments
from lmflow.datasets.dataset import Dataset
from lmflow.pipeline.base_pipeline import BasePipeline
from lmflow.models.hf_decoder_model import HFDecoderModel
from lmflow.utils.data_utils import set_random_seed, batchlize, answer_extraction

os.environ["TOKENIZERS_PARALLELISM"] = "false"  # To avoid warnings about parallelism in tokenizers


def rstrip_partial_utf8(string):
    """Remove the Unicode replacement character (U+FFFD) that appears when a
    byte stream is decoded in the middle of a multi-byte UTF-8 character,
    which can happen during token-by-token streaming."""
    return string.replace("\ufffd", "")
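
# Illustrative only (not executed): decoding a partial multi-byte character
# produces a U+FFFD placeholder that this helper drops.
#
#   rstrip_partial_utf8("hello\ufffd")  # -> "hello"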


class Inferencer(BasePipeline):
    """
    Initializes the `Inferencer` class with given arguments.

    Parameters
    ----------
    model_args : ModelArguments object.
        Contains the arguments required to load the model.

    data_args : DatasetArguments object.
        Contains the arguments required to load the dataset.

    inferencer_args : InferencerArguments object.
        Contains the arguments required to perform inference.
    """
    def __init__(self, model_args, data_args, inferencer_args):
        self.data_args = data_args
        self.inferencer_args = inferencer_args
        self.model_args = model_args

        set_random_seed(self.inferencer_args.random_seed)

        self.local_rank = int(os.getenv("LOCAL_RANK", "0"))
        self.world_size = int(os.getenv("WORLD_SIZE", "1"))
        if inferencer_args.device == "gpu":
            torch.cuda.set_device(self.local_rank)  # NOTE: this fails on a CPU-only machine
            deepspeed.init_distributed()
        else:
            os.environ["MASTER_ADDR"] = "localhost"
            os.environ["MASTER_PORT"] = "15000"
            dist.init_process_group(
                "gloo", rank=self.local_rank, world_size=self.world_size
            )

        self.config = AutoConfig.from_pretrained(model_args.model_name_or_path, trust_remote_code=True)
        try:
            self.model_hidden_size = self.config.hidden_size
        except AttributeError:
            print("Error in getting hidden size, using the default size 1024")
            self.model_hidden_size = 1024  # e.g. the gpt2 config does not seem to expose `hidden_size`
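
    # Illustrative only (not executed): a minimal construction sketch, assuming
    # ModelArguments and InferencerArguments are the dataclasses from
    # `lmflow.args` referenced in the class docstring above (field names below
    # are taken from how this file uses them, not verified elsewhere).
    #
    #   from lmflow.args import ModelArguments, InferencerArguments
    #
    #   model_args = ModelArguments(model_name_or_path="gpt2")
    #   data_args = DatasetArguments(dataset_path=None)
    #   inferencer_args = InferencerArguments(device="cpu")
    #   inferencer = Inferencer(model_args, data_args, inferencer_args)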

    def create_dataloader(self, dataset: Dataset):
        data_dict = dataset.to_dict()
        inputs = [instance["text"] for instance in data_dict["instances"]]
        dataset_size = len(inputs)
        dataset_buf = []
        for idx in range(dataset_size):
            dataset_buf.append({
                "input": inputs[idx],
                "input_idx": idx,
            })

        dataloader = batchlize(
            dataset_buf,
            batch_size=1,
            random_shuffle=False,
        )
        return dataloader, dataset_size
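
    # For reference: `batchlize` above groups the buffered examples into
    # batches of size 1, so the returned `dataloader` is iterated as batches
    # whose single element looks like
    #
    #   {"input": "<prompt text>", "input_idx": 0}
    #
    # and `dataset_size` is the total number of prompts.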

    def inference(
        self,
        model,
        dataset: Dataset,
        max_new_tokens: int = 100,
        temperature: float = 0.0,
        prompt_structure: str = '{input}',
    ):
        """
        Perform inference for a model.

        Parameters
        ----------
        model : TunableModel object.
            TunableModel to perform inference.

        dataset : Dataset object.
            Dataset with type "text_only" whose instances provide the prompts.

        Returns
        -------
        output_dataset : Dataset object.
            Dataset with type "text_only" containing one generated text per input.
        """
        if dataset.get_type() != "text_only":
            raise NotImplementedError(
                'input dataset should have type "text_only"'
            )
        dataloader, data_size = self.create_dataloader(dataset)

        # The output dataset
        output_dict = {
            "type": "text_only",
            "instances": [
            ],
        }

        for batch_index, batch in enumerate(dataloader):
            current_batch = batch[0]  # batch size is 1

            input = prompt_structure.format(input=current_batch['input'])

            if self.inferencer_args.device == "gpu":
                inputs = model.encode(input, return_tensors="pt").to(device=self.local_rank)
            elif self.inferencer_args.device == "cpu":
                inputs = model.encode(input, return_tensors="pt").to(device='cpu')
            else:
                raise NotImplementedError(
                    f"device \"{self.inferencer_args.device}\" is not supported"
                )

            outputs = model.inference(
                inputs,
                max_new_tokens=max_new_tokens,
                temperature=temperature,
                repetition_penalty=1.0,
            )
            text_out = model.decode(outputs[0], skip_special_tokens=True)

            # only return the generation, truncating the input
            prompt_length = len(model.decode(inputs[0], skip_special_tokens=True))
            text_out = text_out[prompt_length:]
            output_dict["instances"].append({"text": text_out})

        output_dataset = Dataset(DatasetArguments(dataset_path=None))
        output_dataset = output_dataset.from_dict(output_dict)

        return output_dataset
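
    # Illustrative only (not executed): a minimal usage sketch, assuming an
    # `Inferencer` instance named `inferencer` and an already loaded backend
    # model named `model` (how the model is constructed is outside this file).
    # The dataset format mirrors the "text_only" dict built above.
    #
    #   prompts = {"type": "text_only", "instances": [{"text": "Hello, world."}]}
    #   input_dataset = Dataset(DatasetArguments(dataset_path=None)).from_dict(prompts)
    #   result = inferencer.inference(model, input_dataset, max_new_tokens=32)
    #   print(result.to_dict()["instances"][0]["text"])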

    def stream_inference(self, context, model, max_new_tokens, token_per_step, temperature, end_string, input_dataset):
        response = ""
        history = []
        if "ChatGLMModel" in self.config.architectures:
            for response, history in model.get_backend_model().stream_chat(model.get_tokenizer(), context, history=history):
                response = rstrip_partial_utf8(response)
                yield response, False
        else:
            for _ in range(0, max_new_tokens // token_per_step):
                output_dataset = self.inference(
                    model=model,
                    dataset=input_dataset,
                    max_new_tokens=token_per_step,
                    temperature=temperature,
                )

                new_append_text = output_dataset.to_dict()["instances"][0]["text"]
                new_append_text = rstrip_partial_utf8(new_append_text)
                response += new_append_text

                input_dict = input_dataset.to_dict()
                input_dict["instances"][0]["text"] += new_append_text
                input_dataset = input_dataset.from_dict(input_dict)

                flag_break = False
                try:
                    # Stop streaming once the end string appears in the response;
                    # if it is absent, temporarily appending it makes `index` point
                    # to the end of the current response so nothing is trimmed.
                    index = response.index(end_string)
                    flag_break = True
                except ValueError:
                    response += end_string
                    index = response.index(end_string)

                response = response[:index]
                yield response, flag_break
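
    # Illustrative only (not executed): consuming the streaming generator,
    # assuming `inferencer`, `model`, and a "text_only" `input_dataset` are
    # prepared as in the `inference` example above; the `end_string` value
    # here is arbitrary.
    #
    #   for partial_response, finished in inferencer.stream_inference(
    #       context="Hello",
    #       model=model,
    #       max_new_tokens=100,
    #       token_per_step=4,
    #       temperature=0.0,
    #       end_string="#",
    #       input_dataset=input_dataset,
    #   ):
    #       print(partial_response)
    #       if finished:
    #           break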