""" Finetuning the library models for sequence classification.""" import logging import os import random import sys from dataclasses import dataclass, field from typing import Optional import datasets import numpy as np from datasets import load_metric import transformers from transformers import ( DataCollatorWithPadding, EvalPrediction, HfArgumentParser, Trainer, default_data_collator, set_seed, ) from transformers.utils import check_min_version from transformers.utils.versions import require_version from shared import CATEGORIES, load_datasets, CustomTrainingArguments, train_from_checkpoint, get_last_checkpoint from preprocess import PreprocessingDatasetArguments from model import get_model_tokenizer, ModelArguments # Will error if the minimal version of Transformers is not installed. Remove at your own risks. check_min_version("4.17.0") require_version("datasets>=1.8.0", "To fix: pip install -r requirements.txt") os.environ["WANDB_DISABLED"] = "true" logger = logging.getLogger(__name__) @dataclass class DataArguments: """ Arguments pertaining to what data we are going to input our model for training and eval. Using `HfArgumentParser` we can turn this class into argparse arguments to be able to specify them on the command line. """ max_seq_length: int = field( default=512, metadata={ "help": "The maximum total input sequence length after tokenization. Sequences longer " "than this will be truncated, sequences shorter will be padded." }, ) overwrite_cache: bool = field( default=False, metadata={"help": "Overwrite the cached preprocessed datasets or not."} ) pad_to_max_length: bool = field( default=True, metadata={ "help": "Whether to pad all samples to `max_seq_length`. " "If False, will pad the samples dynamically when batching to the maximum length in the batch." }, ) max_train_samples: Optional[int] = field( default=None, metadata={ "help": "For debugging purposes or quicker training, truncate the number of training examples to this " "value if set." }, ) max_eval_samples: Optional[int] = field( default=None, metadata={ "help": "For debugging purposes or quicker training, truncate the number of evaluation examples to this " "value if set." }, ) max_predict_samples: Optional[int] = field( default=None, metadata={ "help": "For debugging purposes or quicker training, truncate the number of prediction examples to this " "value if set." }, ) dataset_cache_dir: Optional[str] = PreprocessingDatasetArguments.__dataclass_fields__[ 'dataset_cache_dir'] data_dir: Optional[str] = PreprocessingDatasetArguments.__dataclass_fields__[ 'data_dir'] train_file: Optional[str] = PreprocessingDatasetArguments.__dataclass_fields__[ 'c_train_file'] validation_file: Optional[str] = PreprocessingDatasetArguments.__dataclass_fields__[ 'c_validation_file'] test_file: Optional[str] = PreprocessingDatasetArguments.__dataclass_fields__[ 'c_test_file'] def __post_init__(self): if self.train_file is None or self.validation_file is None: raise ValueError( "Need either a GLUE task, a training/validation file or a dataset name.") else: train_extension = self.train_file.split(".")[-1] assert train_extension in [ "csv", "json"], "`train_file` should be a csv or a json file." validation_extension = self.validation_file.split(".")[-1] assert ( validation_extension == train_extension ), "`validation_file` should have the same extension (csv or json) as `train_file`." def main(): # See all possible arguments in src/transformers/training_args.py # or by passing the --help flag to this script. 
    # We now keep distinct sets of args, for a cleaner separation of concerns.
    parser = HfArgumentParser(
        (ModelArguments, DataArguments, CustomTrainingArguments))
    model_args, data_args, training_args = parser.parse_args_into_dataclasses()

    # Setup logging
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
        datefmt="%m/%d/%Y %H:%M:%S",
        handlers=[logging.StreamHandler(sys.stdout)],
    )

    log_level = training_args.get_process_log_level()
    logger.setLevel(log_level)
    datasets.utils.logging.set_verbosity(log_level)
    transformers.utils.logging.set_verbosity(log_level)
    transformers.utils.logging.enable_default_handler()
    transformers.utils.logging.enable_explicit_format()

    # Log on each process the small summary:
    logger.warning(
        f"Process rank: {training_args.local_rank}, device: {training_args.device}, n_gpu: {training_args.n_gpu}, "
        + f"distributed training: {bool(training_args.local_rank != -1)}, 16-bits training: {training_args.fp16}"
    )
    logger.info(f"Training/evaluation parameters {training_args}")

    # Detecting last checkpoint.
    last_checkpoint = get_last_checkpoint(training_args)

    # Set seed before initializing model.
    set_seed(training_args.seed)

    # Loading a dataset from your local files.
    # CSV/JSON training and evaluation files are needed.
    raw_datasets = load_datasets(data_args)

    # See more about loading any type of standard or custom dataset at
    # https://huggingface.co/docs/datasets/loading_datasets.html.

    # Map each category to an integer label id (and back) so the classification
    # head is built with the right number of output classes.
    config_args = {
        'num_labels': len(CATEGORIES),
        'id2label': {k: str(v).upper() for k, v in enumerate(CATEGORIES)},
        'label2id': {str(v).upper(): k for k, v in enumerate(CATEGORIES)}
    }
    model, tokenizer = get_model_tokenizer(
        model_args, training_args, config_args=config_args, model_type='classifier')

    # Padding strategy
    if data_args.pad_to_max_length:
        padding = "max_length"
    else:
        # We will pad later, dynamically at batch creation, to the max sequence length in each batch
        padding = False

    if data_args.max_seq_length > tokenizer.model_max_length:
        logger.warning(
            f"The max_seq_length passed ({data_args.max_seq_length}) is larger than the maximum length for the "
            f"model ({tokenizer.model_max_length}). Using max_seq_length={tokenizer.model_max_length}."
        )
    max_seq_length = min(data_args.max_seq_length, tokenizer.model_max_length)

    def preprocess_function(examples):
        # Tokenize the texts
        result = tokenizer(
            examples['text'], padding=padding, max_length=max_seq_length, truncation=True)
        result['label'] = examples['label']
        return result

    with training_args.main_process_first(desc="dataset map pre-processing"):
        raw_datasets = raw_datasets.map(
            preprocess_function,
            batched=True,
            load_from_cache_file=not data_args.overwrite_cache,
            desc="Running tokenizer on dataset",
        )

    if training_args.do_train:
        if "train" not in raw_datasets:
            raise ValueError("--do_train requires a train dataset")
        train_dataset = raw_datasets["train"]
        if data_args.max_train_samples is not None:
            train_dataset = train_dataset.select(
                range(data_args.max_train_samples))

    if training_args.do_eval:
        if "validation" not in raw_datasets:
            raise ValueError("--do_eval requires a validation dataset")
        eval_dataset = raw_datasets["validation"]
        if data_args.max_eval_samples is not None:
            eval_dataset = eval_dataset.select(
                range(data_args.max_eval_samples))

    if training_args.do_predict or data_args.test_file is not None:
        if "test" not in raw_datasets:
            raise ValueError("--do_predict requires a test dataset")
        predict_dataset = raw_datasets["test"]
        if data_args.max_predict_samples is not None:
            predict_dataset = predict_dataset.select(
                range(data_args.max_predict_samples))

    # Log a few random samples from the training set:
    if training_args.do_train:
        for index in random.sample(range(len(train_dataset)), 3):
            logger.info(
                f"Sample {index} of the training set: {train_dataset[index]}.")

    # Get the metric function
    metric = load_metric("accuracy")

    # You can define your custom compute_metrics function. It takes an `EvalPrediction` object (a namedtuple with a
    # predictions and label_ids field) and has to return a dictionary string to float.
    def compute_metrics(p: EvalPrediction):
        preds = p.predictions[0] if isinstance(
            p.predictions, tuple) else p.predictions
        preds = np.argmax(preds, axis=1)
        result = metric.compute(predictions=preds, references=p.label_ids)
        if len(result) > 1:
            result["combined_score"] = np.mean(list(result.values())).item()
        return result

    # Data collator will default to DataCollatorWithPadding when the tokenizer is passed to Trainer, so we change it if
    # we already did the padding.
    if data_args.pad_to_max_length:
        data_collator = default_data_collator
    elif training_args.fp16:
        data_collator = DataCollatorWithPadding(
            tokenizer, pad_to_multiple_of=8)
    else:
        data_collator = None

    # Initialize our Trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset if training_args.do_train else None,
        eval_dataset=eval_dataset if training_args.do_eval else None,
        compute_metrics=compute_metrics,
        tokenizer=tokenizer,
        data_collator=data_collator,
    )

    # Training
    train_result = train_from_checkpoint(
        trainer, last_checkpoint, training_args)

    metrics = train_result.metrics
    max_train_samples = (
        data_args.max_train_samples if data_args.max_train_samples is not None else len(
            train_dataset)
    )
    metrics["train_samples"] = min(max_train_samples, len(train_dataset))

    trainer.save_model()  # Saves the tokenizer too for easy upload

    trainer.log_metrics("train", metrics)
    trainer.save_metrics("train", metrics)
    trainer.save_state()

    kwargs = {"finetuned_from": model_args.model_name_or_path,
              "tasks": "text-classification"}

    if training_args.push_to_hub:
        trainer.push_to_hub(**kwargs)
    else:
        trainer.create_model_card(**kwargs)


if __name__ == "__main__":
    main()
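
# Example invocation (a sketch only: the script filename and checkpoint below are
# placeholders, and the full set of flags depends on ModelArguments in model.py and
# CustomTrainingArguments in shared.py):
#
#   python train_classifier.py \
#       --model_name_or_path <pretrained-model-name> \
#       --data_dir data \
#       --do_train --do_eval \
#       --max_seq_length 512 \
#       --output_dir output/classifier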