import os
import sys
from typing import TYPE_CHECKING, Dict, Literal, Optional, Sequence, Union

import numpy as np
from datasets import DatasetDict, load_dataset, load_from_disk

from ..extras import logging
from ..extras.constants import FILEEXT2TYPE
from ..extras.misc import check_version, has_tokenized_data
from .converter import align_dataset
from .data_utils import merge_dataset, split_dataset
from .parser import get_dataset_list
from .processor import (
    FeedbackDatasetProcessor,
    PackedSupervisedDatasetProcessor,
    PairwiseDatasetProcessor,
    PretrainDatasetProcessor,
    SupervisedDatasetProcessor,
    UnsupervisedDatasetProcessor,
)


if TYPE_CHECKING:
    from datasets import Dataset, IterableDataset
    from transformers import PreTrainedTokenizer, ProcessorMixin, Seq2SeqTrainingArguments

    from ..hparams import DataArguments, ModelArguments
    from .data_utils import DatasetModule
    from .parser import DatasetAttr
    from .processor import DatasetProcessor
    from .template import Template


logger = logging.get_logger(__name__)


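# Pipeline overview: each dataset name in `data_args` is resolved via `get_dataset_list`,
# loaded from its source (`_load_single_dataset`), aligned to a common schema (`align_dataset`),
# merged or kept per-name (`_get_merged_dataset`), tokenized by a stage-specific processor
# (`_get_preprocessed_dataset`), and packaged into the final train/eval module (`get_dataset`).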
def _load_single_dataset(
    dataset_attr: "DatasetAttr",
    model_args: "ModelArguments",
    data_args: "DataArguments",
    training_args: "Seq2SeqTrainingArguments",
) -> Union["Dataset", "IterableDataset"]:
    r"""
    Loads a single dataset and aligns it to the standard format.
    """
    logger.info_rank0(f"Loading dataset {dataset_attr}...")
    data_path, data_name, data_dir, data_files = None, None, None, None
    if dataset_attr.load_from in ["hf_hub", "ms_hub", "om_hub"]:
        data_path = dataset_attr.dataset_name
        data_name = dataset_attr.subset
        data_dir = dataset_attr.folder

    elif dataset_attr.load_from == "script":
        data_path = os.path.join(data_args.dataset_dir, dataset_attr.dataset_name)
        data_name = dataset_attr.subset
        data_dir = dataset_attr.folder

    elif dataset_attr.load_from == "file":
        data_files = []
        local_path = os.path.join(data_args.dataset_dir, dataset_attr.dataset_name)
        if os.path.isdir(local_path):
            for file_name in os.listdir(local_path):
                data_files.append(os.path.join(local_path, file_name))
        elif os.path.isfile(local_path):
            data_files.append(local_path)
        else:
            raise ValueError(f"File {local_path} not found.")

        data_path = FILEEXT2TYPE.get(os.path.splitext(data_files[0])[-1][1:], None)
        if data_path is None:
            raise ValueError("Allowed file types: {}.".format(",".join(FILEEXT2TYPE.keys())))

        if any(data_path != FILEEXT2TYPE.get(os.path.splitext(data_file)[-1][1:], None) for data_file in data_files):
            raise ValueError("File types should be identical.")
    else:
        raise NotImplementedError(f"Unknown load type: {dataset_attr.load_from}.")

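    # Dispatch to the loader backend named by `load_from`: the ModelScope and OpenMind backends
    # are imported lazily after a version check; everything else goes through `datasets.load_dataset`.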
    if dataset_attr.load_from == "ms_hub":
        check_version("modelscope>=1.11.0", mandatory=True)
        from modelscope import MsDataset
        from modelscope.utils.config_ds import MS_DATASETS_CACHE

        cache_dir = model_args.cache_dir or MS_DATASETS_CACHE
        dataset = MsDataset.load(
            dataset_name=data_path,
            subset_name=data_name,
            data_dir=data_dir,
            data_files=data_files,
            split=dataset_attr.split,
            cache_dir=cache_dir,
            token=model_args.ms_hub_token,
            use_streaming=data_args.streaming,
        )
        if isinstance(dataset, MsDataset):
            dataset = dataset.to_hf_dataset()

    elif dataset_attr.load_from == "om_hub":
        check_version("openmind>=0.8.0", mandatory=True)
        from openmind import OmDataset
        from openmind.utils.hub import OM_DATASETS_CACHE

        cache_dir = model_args.cache_dir or OM_DATASETS_CACHE
        dataset = OmDataset.load_dataset(
            path=data_path,
            name=data_name,
            data_dir=data_dir,
            data_files=data_files,
            split=dataset_attr.split,
            cache_dir=cache_dir,
            token=model_args.om_hub_token,
            streaming=data_args.streaming,
        )
    else:
        dataset = load_dataset(
            path=data_path,
            name=data_name,
            data_dir=data_dir,
            data_files=data_files,
            split=dataset_attr.split,
            cache_dir=model_args.cache_dir,
            token=model_args.hf_hub_token,
            streaming=data_args.streaming,
            num_proc=data_args.preprocessing_num_workers,
            trust_remote_code=model_args.trust_remote_code,
        )

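    # If `num_samples` exceeds the dataset size, the missing indices are drawn with replacement,
    # so the requested count is always met (non-streaming datasets only).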
    if dataset_attr.num_samples is not None and not data_args.streaming:
        target_num = dataset_attr.num_samples
        indexes = np.random.permutation(len(dataset))[:target_num]
        target_num -= len(indexes)
        if target_num > 0:
            expand_indexes = np.random.choice(len(dataset), target_num)
            indexes = np.concatenate((indexes, expand_indexes), axis=0)

        assert len(indexes) == dataset_attr.num_samples, "Sample num mismatched."
        dataset = dataset.select(indexes)
        logger.info_rank0(f"Sampled {dataset_attr.num_samples} examples from dataset {dataset_attr}.")

    if data_args.max_samples is not None:
        max_samples = min(data_args.max_samples, len(dataset))
        dataset = dataset.select(range(max_samples))

    return align_dataset(dataset, dataset_attr, data_args, training_args)


def _get_merged_dataset(
    dataset_names: Optional[Sequence[str]],
    model_args: "ModelArguments",
    data_args: "DataArguments",
    training_args: "Seq2SeqTrainingArguments",
    stage: Literal["pt", "sft", "rm", "ppo", "kto"],
    merge: bool = True,
) -> Optional[Union["Dataset", "IterableDataset", Dict[str, "Dataset"]]]:
    r"""
    Returns the merged datasets in the standard format.
    """
    if dataset_names is None:
        return None

    datasets = {}
    for dataset_name, dataset_attr in zip(dataset_names, get_dataset_list(dataset_names, data_args.dataset_dir)):
        if (stage == "rm" and dataset_attr.ranking is False) or (stage != "rm" and dataset_attr.ranking is True):
            raise ValueError("The dataset is not applicable in the current training stage.")

        datasets[dataset_name] = _load_single_dataset(dataset_attr, model_args, data_args, training_args)

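    # With merge=True the loaded datasets are combined into a single dataset by `merge_dataset`;
    # with merge=False the name -> dataset mapping is returned as-is, which is how `get_dataset`
    # keeps multiple evaluation sets separate.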
    if merge:
        return merge_dataset(list(datasets.values()), data_args, seed=training_args.seed)
    else:
        return datasets


def _get_dataset_processor(
    data_args: "DataArguments",
    stage: Literal["pt", "sft", "rm", "ppo", "kto"],
    template: "Template",
    tokenizer: "PreTrainedTokenizer",
    processor: Optional["ProcessorMixin"],
    do_generate: bool = False,
) -> "DatasetProcessor":
    r"""
    Returns the corresponding dataset processor.
    """
    if stage == "pt":
        dataset_processor_class = PretrainDatasetProcessor
    elif stage == "sft" and not do_generate:
        if data_args.packing:
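            # neat_packing patches datasets' OptimizedTypedSequence so that columns skip the
            # per-column integer type optimization (presumably to keep the packed attention-mask
            # values intact when they are written to Arrow).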
            if data_args.neat_packing:
                from datasets.arrow_writer import OptimizedTypedSequence, TypedSequence

                def __init__(self, data, **kwargs):
                    return TypedSequence.__init__(
                        self,
                        data,
                        type=kwargs.pop("type", None),
                        try_type=kwargs.pop("try_type", None),
                        optimized_int_type=kwargs.pop("optimized_int_type", None),
                    )

                OptimizedTypedSequence.__init__ = __init__

            dataset_processor_class = PackedSupervisedDatasetProcessor
        else:
            dataset_processor_class = SupervisedDatasetProcessor

    elif stage == "rm":
        dataset_processor_class = PairwiseDatasetProcessor
    elif stage == "kto":
        dataset_processor_class = FeedbackDatasetProcessor
    else:
        dataset_processor_class = UnsupervisedDatasetProcessor

    return dataset_processor_class(template=template, tokenizer=tokenizer, processor=processor, data_args=data_args)


def _get_preprocessed_dataset(
    dataset: Optional[Union["Dataset", "IterableDataset"]],
    data_args: "DataArguments",
    training_args: "Seq2SeqTrainingArguments",
    stage: Literal["pt", "sft", "rm", "ppo", "kto"],
    template: "Template",
    tokenizer: "PreTrainedTokenizer",
    processor: Optional["ProcessorMixin"] = None,
    is_eval: bool = False,
) -> Optional[Union["Dataset", "IterableDataset"]]:
    r"""
    Preprocesses the dataset, including format checking and tokenization.
    """
    if dataset is None:
        return None

    dataset_processor = _get_dataset_processor(
        data_args, stage, template, tokenizer, processor, do_generate=(training_args.predict_with_generate and is_eval)
    )
    column_names = list(next(iter(dataset)).keys())
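    # Non-streaming datasets are mapped with multiprocessing; non-main local ranks always reuse
    # the cache written by the main process (see `main_process_first` in the caller), even when
    # `overwrite_cache` is set.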
    kwargs = {}
    if not data_args.streaming:
        kwargs = dict(
            num_proc=data_args.preprocessing_num_workers,
            load_from_cache_file=(not data_args.overwrite_cache) or (training_args.local_process_index != 0),
            desc="Running tokenizer on dataset",
        )

    dataset = dataset.map(
        dataset_processor.preprocess_dataset,
        batched=True,
        batch_size=data_args.preprocessing_batch_size,
        remove_columns=column_names,
        **kwargs,
    )

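    # Print one processed example on the logging process; an empty dataset at this point is
    # treated as a fatal data error.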
    if training_args.should_log:
        try:
            print("eval example:" if is_eval else "training example:")
            dataset_processor.print_data_example(next(iter(dataset)))
        except StopIteration:
            if stage == "pt":
                raise RuntimeError("Cannot find sufficient samples, consider increasing dataset size.")
            else:
                raise RuntimeError("Cannot find valid samples, check `data/README.md` for the data format.")

    return dataset


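# Illustrative usage sketch (not executed here): the hparams objects, tokenizer, and template are
# normally constructed elsewhere in the package, e.g.
#
#     dataset_module = get_dataset(
#         template, model_args, data_args, training_args, stage="sft", tokenizer=tokenizer
#     )
#     # dataset_module supplies `train_dataset` / `eval_dataset` to the trainer.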
def get_dataset(
    template: "Template",
    model_args: "ModelArguments",
    data_args: "DataArguments",
    training_args: "Seq2SeqTrainingArguments",
    stage: Literal["pt", "sft", "rm", "ppo", "kto"],
    tokenizer: "PreTrainedTokenizer",
    processor: Optional["ProcessorMixin"] = None,
) -> "DatasetModule":
    r"""
    Gets the train dataset and optionally gets the evaluation dataset.
    """
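    # A pre-tokenized dataset on disk short-circuits loading and preprocessing entirely;
    # all other data arguments are ignored in that case.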
    if data_args.tokenized_path is not None:
        if has_tokenized_data(data_args.tokenized_path):
            logger.warning_rank0("Loading dataset from disk will ignore other data arguments.")
            tokenized_data: Union["Dataset", "DatasetDict"] = load_from_disk(data_args.tokenized_path)
            logger.info_rank0(f"Loaded tokenized dataset from {data_args.tokenized_path}.")

            dataset_module: Dict[str, "Dataset"] = {}
            if isinstance(tokenized_data, DatasetDict):
                if "train" in tokenized_data:
                    dataset_module["train_dataset"] = tokenized_data["train"]

                if "validation" in tokenized_data:
                    dataset_module["eval_dataset"] = tokenized_data["validation"]

            else:
                dataset_module["train_dataset"] = tokenized_data

            if data_args.streaming:
                dataset_module = {k: v.to_iterable_dataset() for k, v in dataset_module.items()}

            return dataset_module

        if data_args.streaming:
            raise ValueError("Turn off `streaming` when saving dataset to disk.")

    with training_args.main_process_first(desc="load dataset"):
        dataset = _get_merged_dataset(data_args.dataset, model_args, data_args, training_args, stage)
        eval_dataset = _get_merged_dataset(
            data_args.eval_dataset, model_args, data_args, training_args, stage, merge=training_args.do_predict
        )

    with training_args.main_process_first(desc="pre-process dataset"):
        dataset = _get_preprocessed_dataset(
            dataset, data_args, training_args, stage, template, tokenizer, processor, is_eval=False
        )
        if isinstance(eval_dataset, dict):
            for eval_name, eval_data in eval_dataset.items():
                eval_dataset[eval_name] = _get_preprocessed_dataset(
                    eval_data, data_args, training_args, stage, template, tokenizer, processor, is_eval=True
                )
        else:
            eval_dataset = _get_preprocessed_dataset(
                eval_dataset, data_args, training_args, stage, template, tokenizer, processor, is_eval=True
            )

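        # With a positive `val_size`, the training set is split into train/validation splits;
        # otherwise the splits are assembled by hand (streaming datasets get a buffered shuffle).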
        if data_args.val_size > 1e-6:
            dataset_dict = split_dataset(dataset, data_args, seed=training_args.seed)
        else:
            dataset_dict = {}
            if dataset is not None:
                if data_args.streaming:
                    dataset = dataset.shuffle(buffer_size=data_args.buffer_size, seed=training_args.seed)

                dataset_dict["train"] = dataset

            if eval_dataset is not None:
                if isinstance(eval_dataset, dict):
                    dataset_dict.update({f"validation_{name}": data for name, data in eval_dataset.items()})
                else:
                    if data_args.streaming:
                        eval_dataset = eval_dataset.shuffle(buffer_size=data_args.buffer_size, seed=training_args.seed)

                    dataset_dict["validation"] = eval_dataset

            dataset_dict = DatasetDict(dataset_dict)

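        # When `tokenized_path` is set but no cached data exists yet, the freshly tokenized
        # DatasetDict is written to disk and the process exits, so training can be restarted
        # with the cached copy.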
        if data_args.tokenized_path is not None:
            if training_args.should_save:
                dataset_dict.save_to_disk(data_args.tokenized_path)
                logger.info_rank0(f"Tokenized dataset is saved at {data_args.tokenized_path}.")
                logger.info_rank0(f"Please restart the training with `tokenized_path: {data_args.tokenized_path}`.")

            sys.exit(0)

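        # Map the DatasetDict splits onto the keyword names used downstream
        # (`train_dataset`, `eval_dataset`); multiple evaluation sets are returned as a dict
        # keyed by the original dataset name.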
        dataset_module = {}
        if "train" in dataset_dict:
            dataset_module["train_dataset"] = dataset_dict["train"]

        if "validation" in dataset_dict:
            dataset_module["eval_dataset"] = dataset_dict["validation"]
        else:
            eval_dataset = {}
            for key in dataset_dict.keys():
                if key.startswith("validation_"):
                    eval_dataset[key[len("validation_") :]] = dataset_dict[key]

            if len(eval_dataset):
                dataset_module["eval_dataset"] = eval_dataset

        return dataset_module