#!/usr/bin/env python
# coding=utf-8
"""The Finetuner class simplifies the process of finetuning a language model
(a TunableModel instance) on a given dataset.
"""

import logging
import os
import sys

import datasets
import transformers

from itertools import chain
from transformers import (
    Trainer,
    default_data_collator,
    set_seed,
)
from transformers.trainer_utils import get_last_checkpoint
from transformers.utils import send_example_telemetry

from lmflow.datasets.dataset import Dataset
from lmflow.pipeline.base_tuner import BaseTuner


logger = logging.getLogger(__name__)


class Finetuner(BaseTuner):
    """
    Initializes the `Finetuner` class with given arguments.

    Parameters
    ------------
    model_args : ModelArguments object.
        Contains the arguments required to load the model.

    data_args : DatasetArguments object.
        Contains the arguments required to load the dataset.

    finetuner_args : FinetunerArguments object.
        Contains the arguments required to perform finetuning.

    args : Optional.
        Positional arguments.

    kwargs : Optional.
        Keyword arguments.
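
    Examples
    ------------
    A minimal usage sketch; the argument objects, `model` and `dataset` are
    assumed to be constructed by the calling script:

        finetuner = Finetuner(model_args, data_args, finetuner_args)
        tuned_model = finetuner.tune(model=model, dataset=dataset)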
""" | |

    def __init__(self, model_args, data_args, finetuner_args, *args, **kwargs):
        self.model_args = model_args
        self.data_args = data_args
        self.finetuner_args = finetuner_args

        # Sending telemetry. Tracking the example usage helps us better
        # allocate resources to maintain them. The information sent is the one
        # passed as arguments along with your Python/PyTorch versions.
        send_example_telemetry("run_clm", model_args, data_args)

        # Setup logging
        logging.basicConfig(
            format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
            datefmt="%m/%d/%Y %H:%M:%S",
            handlers=[logging.StreamHandler(sys.stdout)],
        )

        log_level = finetuner_args.get_process_log_level()
        logger.setLevel(log_level)
        datasets.utils.logging.set_verbosity(log_level)
        transformers.utils.logging.set_verbosity(log_level)
        transformers.utils.logging.enable_default_handler()
        transformers.utils.logging.enable_explicit_format()

        # Log on each process the small summary:
        logger.warning(
            f"Process rank: {finetuner_args.local_rank},"
            f" device: {finetuner_args.device},"
            f" n_gpu: {finetuner_args.n_gpu},"
            f" distributed training: {bool(finetuner_args.local_rank != -1)},"
            f" 16-bits training: {finetuner_args.fp16}"
        )
logger.info(f"Training/evaluation parameters {finetuner_args}") | |
# Detecting last checkpoint. | |
last_checkpoint = None | |
if os.path.isdir(finetuner_args.output_dir) and finetuner_args.do_train and not finetuner_args.overwrite_output_dir: | |
last_checkpoint = get_last_checkpoint(finetuner_args.output_dir) | |
if last_checkpoint is None and len(os.listdir(finetuner_args.output_dir)) > 0: | |
raise ValueError( | |
f"Output directory ({finetuner_args.output_dir}) already" | |
" exists and is not empty. " | |
"Use --overwrite_output_dir to overcome." | |
) | |
elif last_checkpoint is not None and finetuner_args.resume_from_checkpoint is None: | |
logger.info( | |
f"Checkpoint detected, resuming training at" | |
f" {last_checkpoint}. To avoid this behavior, change" | |
" the `--output_dir` or add `--overwrite_output_dir` to" | |
" train from scratch." | |
) | |
self.last_checkpoint = last_checkpoint | |
# Set seed before initializing model. | |
set_seed(finetuner_args.seed) | |

    def group_text(self, tokenized_datasets, model_max_length):
        """
        Groups texts together to form blocks of maximum length
        `model_max_length` and returns the resulting grouped dataset.
        """
        data_args = self.data_args
        finetuner_args = self.finetuner_args

        if data_args.block_size is None:
            block_size = model_max_length
            if block_size > 1024:
                logger.warning(
                    "The chosen tokenizer supports a `model_max_length` that is"
                    " longer than the default `block_size` value"
                    " of 1024. If you would like to use a longer `block_size`"
                    " up to `tokenizer.model_max_length` you can override this"
                    " default with `--block_size xxx`."
                )
                block_size = 1024
        else:
            if data_args.block_size > model_max_length:
                logger.warning(
                    f"The block_size passed ({data_args.block_size}) is larger"
                    f" than the maximum length for the model"
                    f" ({model_max_length})."
                    f" Using block_size={model_max_length}."
                )
            block_size = min(data_args.block_size, model_max_length)

        # Main data processing function that will concatenate all texts from
        # our dataset and generate chunks of block_size.
        def group_texts(examples):
            # Concatenate all texts.
            concatenated_examples = {k: list(chain(*examples[k])) for k in examples.keys()}
            total_length = len(concatenated_examples[list(examples.keys())[0]])
            # We drop the small remainder; we could add padding instead if the
            # model supported it. You can customize this part to your needs.
            total_length = (total_length // block_size) * block_size
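            # For example (illustrative numbers): 2600 concatenated tokens with
            # block_size=1024 give total_length=2048, so the last 552 tokens of
            # the batch are discarded.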
            # Split by chunks of max_len.
            result = {
                k: [t[i : i + block_size] for i in range(0, total_length, block_size)]
                for k, t in concatenated_examples.items()
            }
            return result

        # Note that with `batched=True`, this map processes 1,000 texts
        # together, so group_texts throws away a remainder for each of those
        # groups of 1,000 texts. You can adjust that batch_size here but a
        # higher value might be slower to preprocess.
        #
        # To speed up this part, we use multiprocessing. See the documentation
        # of the map method for more information:
        # https://huggingface.co/docs/datasets/package_reference/main_classes.html#datasets.Dataset.map
        with finetuner_args.main_process_first(desc="grouping texts together"):
            group_batch_size = 1000
            if data_args.disable_group_texts:
                group_batch_size = 1
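                # With a batch size of 1, `group_texts` sees one sample at a
                # time, so tokens from different examples are never packed
                # into the same block.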
            if not data_args.streaming:
                lm_datasets = tokenized_datasets.map(
                    group_texts,
                    batched=True,
                    batch_size=group_batch_size,
                    num_proc=data_args.preprocessing_num_workers,
                    load_from_cache_file=not data_args.overwrite_cache,
                    desc=f"Grouping texts in chunks of {block_size}",
                )
            else:
                lm_datasets = tokenized_datasets.map(
                    group_texts,
                    batched=True,
                    batch_size=group_batch_size,
                )

        return lm_datasets

    def tune(self, model, dataset):
        """
        Perform tuning for a model.

        Parameters
        ------------
        model : TunableModel object.
            TunableModel to perform tuning.

        dataset :
            Dataset to train the model on.
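
        Returns
        ------------
        model : TunableModel object.
            The tuned model.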
""" | |
        model_args = self.model_args
        data_args = self.data_args
        finetuner_args = self.finetuner_args

        # Tokenization and text grouping must be done in the main process
        with finetuner_args.main_process_first(desc="dataset map tokenization"):
            tokenized_dataset = model.tokenize(dataset)
            lm_dataset = self.group_text(
                tokenized_dataset,
                model_max_length=model.get_max_length(),
            )
        train_dataset = lm_dataset.get_backend_dataset()

        if finetuner_args.do_train:
            if data_args.max_train_samples is not None:
                max_train_samples = min(len(train_dataset), data_args.max_train_samples)
                train_dataset = train_dataset.select(range(max_train_samples))

        # Initialize our Trainer
        training_args = finetuner_args
        trainer = Trainer(
            model=model.get_backend_model(),
            args=training_args,
            train_dataset=train_dataset if training_args.do_train else None,
            eval_dataset=None,
            tokenizer=model.get_tokenizer(),
            # Data collator will default to DataCollatorWithPadding, so we change it.
            data_collator=default_data_collator,
            compute_metrics=None,
            preprocess_logits_for_metrics=None,
        )

        # Training
        if training_args.do_train:
            checkpoint = None
            last_checkpoint = self.last_checkpoint
            if training_args.resume_from_checkpoint is not None:
                checkpoint = training_args.resume_from_checkpoint
            elif last_checkpoint is not None:
                checkpoint = last_checkpoint
            train_result = trainer.train(resume_from_checkpoint=checkpoint)

            if not model_args.use_lora:
                trainer.save_model()  # Saves the tokenizer too for easy upload
            else:
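                # For LoRA training: if `save_aggregated_lora` is set, the
                # adapter weights are merged into the base model before the
                # LMFlow model wrapper saves the result.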
                if model_args.save_aggregated_lora:
                    model.merge_lora_weights()
                model.save(finetuner_args.output_dir, model_args.save_aggregated_lora)

            metrics = train_result.metrics
            max_train_samples = (
                data_args.max_train_samples if data_args.max_train_samples is not None else len(train_dataset)
            )
            metrics["train_samples"] = min(max_train_samples, len(train_dataset))

            trainer.log_metrics("train", metrics)
            trainer.save_metrics("train", metrics)
            trainer.save_state()

        kwargs = {"finetuned_from": model_args.model_name_or_path, "tasks": "text-generation"}
        if data_args.dataset_name is not None:
            kwargs["dataset_tags"] = data_args.dataset_name
            if data_args.dataset_config_name is not None:
                kwargs["dataset_args"] = data_args.dataset_config_name
                kwargs["dataset"] = f"{data_args.dataset_name} {data_args.dataset_config_name}"
            else:
                kwargs["dataset"] = data_args.dataset_name

        if training_args.push_to_hub:
            trainer.push_to_hub(**kwargs)
        else:
            trainer.create_model_card(**kwargs)

        return model