#!/usr/bin/env python
# coding=utf-8
"""The Finetuner class simplifies the process of finetuning a language model
for a TunableModel instance with a given dataset.
"""

import logging
import os
import sys

import datasets
import transformers

from itertools import chain
from transformers import (
    Trainer,
    default_data_collator,
    set_seed,
)
from transformers.trainer_utils import get_last_checkpoint
from transformers.utils import send_example_telemetry

from lmflow.datasets.dataset import Dataset
from lmflow.pipeline.base_tuner import BaseTuner


logger = logging.getLogger(__name__)


class Finetuner(BaseTuner):
    """
    Initializes the `Finetuner` class with given arguments.

    Parameters
    ------------
    model_args : ModelArguments object.
        Contains the arguments required to load the model.

    data_args : DatasetArguments object.
        Contains the arguments required to load the dataset.

    finetuner_args : FinetunerArguments object.
        Contains the arguments required to perform finetuning.

    args : Optional.
        Positional arguments.

    kwargs : Optional.
        Keyword arguments.
    """

    def __init__(self, model_args, data_args, finetuner_args, *args, **kwargs):

        self.model_args = model_args
        self.data_args = data_args
        self.finetuner_args = finetuner_args

        # Sending telemetry. Tracking the example usage helps us better
        # allocate resources to maintain them. The information sent is the one
        # passed as arguments along with your Python/PyTorch versions.
        send_example_telemetry("run_clm", model_args, data_args)

        # Setup logging
        logging.basicConfig(
            format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
            datefmt="%m/%d/%Y %H:%M:%S",
            handlers=[logging.StreamHandler(sys.stdout)],
        )

        log_level = finetuner_args.get_process_log_level()
        logger.setLevel(log_level)
        datasets.utils.logging.set_verbosity(log_level)
        transformers.utils.logging.set_verbosity(log_level)
        transformers.utils.logging.enable_default_handler()
        transformers.utils.logging.enable_explicit_format()

        # Log a small summary on each process:
        logger.warning(
            f"Process rank: {finetuner_args.local_rank},"
            f" device: {finetuner_args.device},"
            f" n_gpu: {finetuner_args.n_gpu},"
            f" distributed training: {bool(finetuner_args.local_rank != -1)},"
            f" 16-bits training: {finetuner_args.fp16}"
        )
        logger.info(f"Training/evaluation parameters {finetuner_args}")

        # Detecting last checkpoint.
        last_checkpoint = None
        if os.path.isdir(finetuner_args.output_dir) and finetuner_args.do_train and not finetuner_args.overwrite_output_dir:
            last_checkpoint = get_last_checkpoint(finetuner_args.output_dir)
            if last_checkpoint is None and len(os.listdir(finetuner_args.output_dir)) > 0:
                raise ValueError(
                    f"Output directory ({finetuner_args.output_dir}) already"
                    " exists and is not empty. "
                    "Use --overwrite_output_dir to overcome."
                )
            elif last_checkpoint is not None and finetuner_args.resume_from_checkpoint is None:
                logger.info(
                    f"Checkpoint detected, resuming training at"
                    f" {last_checkpoint}. To avoid this behavior, change"
                    " the `--output_dir` or add `--overwrite_output_dir` to"
                    " train from scratch."
                )
        self.last_checkpoint = last_checkpoint

        # Set seed before initializing model.
        set_seed(finetuner_args.seed)

    def group_text(self, tokenized_datasets, model_max_length):
        """
        Groups texts together to form blocks of maximum length
        `model_max_length` and returns the processed data as a dictionary.
        """
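        # Worked example (illustrative only): with block_size = 3, a tokenized
        # batch such as {"input_ids": [[1, 2], [3, 4, 5, 6, 7]]} is first
        # concatenated into [1, 2, 3, 4, 5, 6, 7] and then split into
        # [[1, 2, 3], [4, 5, 6]]; the trailing remainder [7] is dropped.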
""" data_args = self.data_args finetuner_args = self.finetuner_args if data_args.block_size is None: block_size = model_max_length if block_size > 1024: logger.warning( "The chosen tokenizer supports a `model_max_length` that is" " longer than the default `block_size` value" " of 1024. If you would like to use a longer `block_size`" " up to `tokenizer.model_max_length` you can override this " " default with `--block_size xxx`." ) block_size = 1024 else: if data_args.block_size > model_max_length: logger.warning( f"The block_size passed ({data_args.block_size}) is larger" f" than the maximum length for the model" f"({model_max_length})." f" Using block_size={model_max_length}." ) block_size = min(data_args.block_size, model_max_length) # Main data processing function that will concatenate all texts from # our dataset and generate chunks of block_size. def group_texts(examples): # Concatenate all texts. concatenated_examples = {k: list(chain(*examples[k])) for k in examples.keys()} total_length = len(concatenated_examples[list(examples.keys())[0]]) # We drop the small remainder, we could add padding if the model # supported it instead of this drop, you can customize this part to # your needs. total_length = (total_length // block_size) * block_size # Split by chunks of max_len. result = { k: [t[i : i + block_size] for i in range(0, total_length, block_size)] for k, t in concatenated_examples.items() } return result # Note that with `batched=True`, this map processes 1,000 texts # together, so group_texts throws away a remainder for each of those # groups of 1,000 texts. You can adjust that batch_size here but a # higher value might be slower to preprocess. # # To speed up this part, we use multiprocessing. See the documentation # of the map method for more information: # https://huggingface.co/docs/datasets/package_reference/main_classes.html#datasets.Dataset.map with finetuner_args.main_process_first(desc="grouping texts together"): group_batch_size = 1000 if data_args.disable_group_texts: group_batch_size = 1 if not data_args.streaming: lm_datasets = tokenized_datasets.map( group_texts, batched=True, batch_size=group_batch_size, num_proc=data_args.preprocessing_num_workers, load_from_cache_file=not data_args.overwrite_cache, desc=f"Grouping texts in chunks of {block_size}", ) else: lm_datasets = tokenized_datasets.map( group_texts, batched=True, batch_size=group_batch_size, ) return lm_datasets def tune(self, model, dataset): """ Perform tuning for a model Parameters ------------ model : TunableModel object. TunableModel to perform tuning. dataset: dataset to train model. 
""" model_args = self.model_args data_args = self.data_args finetuner_args = self.finetuner_args # Tokenization and text grouping must be done in the main process with finetuner_args.main_process_first(desc="dataset map tokenization"): tokenized_dataset = model.tokenize(dataset) lm_dataset = self.group_text( tokenized_dataset, model_max_length=model.get_max_length(), ) train_dataset = lm_dataset.get_backend_dataset() if finetuner_args.do_train: if data_args.max_train_samples is not None: max_train_samples = min(len(train_dataset), data_args.max_train_samples) train_dataset = train_dataset.select(range(max_train_samples)) # Initialize our Trainer training_args = finetuner_args trainer = Trainer( model=model.get_backend_model(), args=training_args, train_dataset=train_dataset if training_args.do_train else None, eval_dataset=None, tokenizer=model.get_tokenizer(), # Data collator will default to DataCollatorWithPadding, so we change it. data_collator=default_data_collator, compute_metrics=None, preprocess_logits_for_metrics=None, ) # Training if training_args.do_train: checkpoint = None last_checkpoint = self.last_checkpoint if training_args.resume_from_checkpoint is not None: checkpoint = training_args.resume_from_checkpoint elif last_checkpoint is not None: checkpoint = last_checkpoint train_result = trainer.train(resume_from_checkpoint=checkpoint) if not model_args.use_lora: trainer.save_model() # Saves the tokenizer too for easy upload else: if model_args.save_aggregated_lora: model.merge_lora_weights() model.save(finetuner_args.output_dir,model_args.save_aggregated_lora) metrics = train_result.metrics max_train_samples = ( data_args.max_train_samples if data_args.max_train_samples is not None else len(train_dataset) ) metrics["train_samples"] = min(max_train_samples, len(train_dataset)) trainer.log_metrics("train", metrics) trainer.save_metrics("train", metrics) trainer.save_state() kwargs = {"finetuned_from": model_args.model_name_or_path, "tasks": "text-generation"} if data_args.dataset_name is not None: kwargs["dataset_tags"] = data_args.dataset_name if data_args.dataset_config_name is not None: kwargs["dataset_args"] = data_args.dataset_config_name kwargs["dataset"] = f"{data_args.dataset_name} {data_args.dataset_config_name}" else: kwargs["dataset"] = data_args.dataset_name if training_args.push_to_hub: trainer.push_to_hub(**kwargs) else: trainer.create_model_card(**kwargs) return model