Spaces:
Runtime error
Runtime error
# Copyright (c) Facebook, Inc. and its affiliates. | |
# | |
# This source code is licensed under the MIT license found in the | |
# LICENSE file in the root directory of this source tree. | |
import shutil | |
import struct | |
from functools import lru_cache | |
import numpy as np | |
import torch | |
from fairseq.dataclass.constants import DATASET_IMPL_CHOICES | |
from fairseq.data.fasta_dataset import FastaDataset | |
from fairseq.file_io import PathManager | |
from fairseq.data.huffman import HuffmanMMapIndexedDataset, HuffmanMMapIndex | |
from . import FairseqDataset | |
from typing import Union | |
def best_fitting_int_dtype( | |
max_int_to_represent, | |
) -> Union[np.uint16, np.uint32, np.int64]: | |
if max_int_to_represent is None: | |
return np.uint32 # Safe guess | |
elif max_int_to_represent < 65500: | |
return np.uint16 | |
elif max_int_to_represent < 4294967295: | |
return np.uint32 | |
else: | |
return np.int64 | |
# we avoid np.uint64 because it doesn't save space and its type promotion behaves unexpectedly | |
# https://github.com/numpy/numpy/issues/5745 | |
def get_available_dataset_impl(): | |
return list(map(str, DATASET_IMPL_CHOICES)) | |
def infer_dataset_impl(path): | |
if IndexedRawTextDataset.exists(path): | |
return "raw" | |
elif IndexedDataset.exists(path): | |
with open(index_file_path(path), "rb") as f: | |
magic = f.read(8) | |
if magic == IndexedDataset._HDR_MAGIC: | |
return "cached" | |
elif magic == MMapIndexedDataset.Index._HDR_MAGIC[:8]: | |
return "mmap" | |
elif magic == HuffmanMMapIndex._HDR_MAGIC[:8]: | |
return "huffman" | |
else: | |
return None | |
elif FastaDataset.exists(path): | |
return "fasta" | |
else: | |
return None | |
def make_builder(out_file, impl, vocab_size=None): | |
if impl == "mmap": | |
return MMapIndexedDatasetBuilder( | |
out_file, dtype=best_fitting_int_dtype(vocab_size) | |
) | |
elif impl == "fasta": | |
raise NotImplementedError | |
elif impl == "huffman": | |
raise ValueError( | |
"Use HuffmanCodeBuilder directly as it has a different interface." | |
) | |
else: | |
return IndexedDatasetBuilder(out_file) | |
def make_dataset(path, impl, fix_lua_indexing=False, dictionary=None): | |
if impl == "raw" and IndexedRawTextDataset.exists(path): | |
assert dictionary is not None | |
return IndexedRawTextDataset(path, dictionary) | |
elif impl == "lazy" and IndexedDataset.exists(path): | |
return IndexedDataset(path, fix_lua_indexing=fix_lua_indexing) | |
elif impl == "cached" and IndexedDataset.exists(path): | |
return IndexedCachedDataset(path, fix_lua_indexing=fix_lua_indexing) | |
elif impl == "mmap" and MMapIndexedDataset.exists(path): | |
return MMapIndexedDataset(path) | |
elif impl == "fasta" and FastaDataset.exists(path): | |
from fairseq.data.fasta_dataset import EncodedFastaDataset | |
return EncodedFastaDataset(path, dictionary) | |
elif impl == "huffman" and HuffmanMMapIndexedDataset.exists(path): | |
return HuffmanMMapIndexedDataset(path) | |
return None | |
def dataset_exists(path, impl): | |
if impl == "raw": | |
return IndexedRawTextDataset.exists(path) | |
elif impl == "mmap": | |
return MMapIndexedDataset.exists(path) | |
elif impl == "huffman": | |
return HuffmanMMapIndexedDataset.exists(path) | |
else: | |
return IndexedDataset.exists(path) | |
def read_longs(f, n): | |
a = np.empty(n, dtype=np.int64) | |
f.readinto(a) | |
return a | |
def write_longs(f, a): | |
f.write(np.array(a, dtype=np.int64)) | |
_code_to_dtype = { | |
1: np.uint8, | |
2: np.int8, | |
3: np.int16, | |
4: np.int32, | |
5: np.int64, | |
6: np.float64, | |
7: np.double, | |
8: np.uint16, | |
9: np.uint32, | |
10: np.uint64, | |
} | |
def _dtype_header_code(dtype) -> int: | |
for k in _code_to_dtype.keys(): | |
if _code_to_dtype[k] == dtype: | |
return k | |
raise ValueError(dtype) | |
def index_file_path(prefix_path): | |
return prefix_path + ".idx" | |
def data_file_path(prefix_path): | |
return prefix_path + ".bin" | |
class IndexedDataset(FairseqDataset): | |
"""Loader for TorchNet IndexedDataset""" | |
_HDR_MAGIC = b"TNTIDX\x00\x00" | |
def __init__(self, path, fix_lua_indexing=False): | |
super().__init__() | |
self.path = path | |
self.fix_lua_indexing = fix_lua_indexing | |
self.data_file = None | |
self.read_index(path) | |
def read_index(self, path): | |
with open(index_file_path(path), "rb") as f: | |
magic = f.read(8) | |
assert magic == self._HDR_MAGIC, ( | |
"Index file doesn't match expected format. " | |
"Make sure that --dataset-impl is configured properly." | |
) | |
version = f.read(8) | |
assert struct.unpack("<Q", version) == (1,) | |
code, self.element_size = struct.unpack("<QQ", f.read(16)) | |
self.dtype = _code_to_dtype[code] | |
self._len, self.s = struct.unpack("<QQ", f.read(16)) | |
self.dim_offsets = read_longs(f, self._len + 1) | |
self.data_offsets = read_longs(f, self._len + 1) | |
self.sizes = read_longs(f, self.s) | |
def read_data(self, path): | |
self.data_file = open(data_file_path(path), "rb", buffering=0) | |
def check_index(self, i): | |
if i < 0 or i >= self._len: | |
raise IndexError("index out of range") | |
def __del__(self): | |
if self.data_file: | |
self.data_file.close() | |
def __getitem__(self, i) -> torch.Tensor: | |
if not self.data_file: | |
self.read_data(self.path) | |
self.check_index(i) | |
tensor_size = self.sizes[self.dim_offsets[i] : self.dim_offsets[i + 1]] | |
a = np.empty(tensor_size, dtype=self.dtype) | |
self.data_file.seek(self.data_offsets[i] * self.element_size) | |
self.data_file.readinto(a) | |
item = torch.from_numpy(a).long() | |
if self.fix_lua_indexing: | |
item -= 1 # subtract 1 for 0-based indexing | |
return item | |
def __len__(self): | |
return self._len | |
def num_tokens(self, index): | |
return self.sizes[index] | |
def size(self, index): | |
return self.sizes[index] | |
def exists(path): | |
return PathManager.exists(index_file_path(path)) and PathManager.exists( | |
data_file_path(path) | |
) | |
def supports_prefetch(self): | |
return False # avoid prefetching to save memory | |
class IndexedCachedDataset(IndexedDataset): | |
def __init__(self, path, fix_lua_indexing=False): | |
super().__init__(path, fix_lua_indexing=fix_lua_indexing) | |
self.cache = None | |
self.cache_index = {} | |
def supports_prefetch(self): | |
return True | |
def prefetch(self, indices): | |
if all(i in self.cache_index for i in indices): | |
return | |
if not self.data_file: | |
self.read_data(self.path) | |
indices = sorted(set(indices)) | |
total_size = 0 | |
for i in indices: | |
total_size += self.data_offsets[i + 1] - self.data_offsets[i] | |
self.cache = np.empty(total_size, dtype=self.dtype) | |
ptx = 0 | |
self.cache_index.clear() | |
for i in indices: | |
self.cache_index[i] = ptx | |
size = self.data_offsets[i + 1] - self.data_offsets[i] | |
a = self.cache[ptx : ptx + size] | |
self.data_file.seek(self.data_offsets[i] * self.element_size) | |
self.data_file.readinto(a) | |
ptx += size | |
if self.data_file: | |
# close and delete data file after prefetch so we can pickle | |
self.data_file.close() | |
self.data_file = None | |
def __getitem__(self, i): | |
self.check_index(i) | |
tensor_size = self.sizes[self.dim_offsets[i] : self.dim_offsets[i + 1]] | |
a = np.empty(tensor_size, dtype=self.dtype) | |
ptx = self.cache_index[i] | |
np.copyto(a, self.cache[ptx : ptx + a.size]) | |
item = torch.from_numpy(a).long() | |
if self.fix_lua_indexing: | |
item -= 1 # subtract 1 for 0-based indexing | |
return item | |
class IndexedRawTextDataset(FairseqDataset): | |
"""Takes a text file as input and binarizes it in memory at instantiation. | |
Original lines are also kept in memory""" | |
def __init__(self, path, dictionary, append_eos=True, reverse_order=False): | |
self.tokens_list = [] | |
self.lines = [] | |
self.sizes = [] | |
self.append_eos = append_eos | |
self.reverse_order = reverse_order | |
self.read_data(path, dictionary) | |
self.size = len(self.tokens_list) | |
def read_data(self, path, dictionary): | |
with open(path, "r", encoding="utf-8") as f: | |
for line in f: | |
self.lines.append(line.strip("\n")) | |
tokens = dictionary.encode_line( | |
line, | |
add_if_not_exist=False, | |
append_eos=self.append_eos, | |
reverse_order=self.reverse_order, | |
).long() | |
self.tokens_list.append(tokens) | |
self.sizes.append(len(tokens)) | |
self.sizes = np.array(self.sizes) | |
def check_index(self, i): | |
if i < 0 or i >= self.size: | |
raise IndexError("index out of range") | |
def __getitem__(self, i): | |
self.check_index(i) | |
return self.tokens_list[i] | |
def get_original_text(self, i): | |
self.check_index(i) | |
return self.lines[i] | |
def __del__(self): | |
pass | |
def __len__(self): | |
return self.size | |
def num_tokens(self, index): | |
return self.sizes[index] | |
def size(self, index): | |
return self.sizes[index] | |
def exists(path): | |
return PathManager.exists(path) | |
class IndexedDatasetBuilder: | |
element_sizes = { | |
np.uint8: 1, | |
np.int8: 1, | |
np.int16: 2, | |
np.int32: 4, | |
np.int64: 8, | |
np.float64: 4, | |
np.double: 8, | |
} | |
def __init__(self, out_file, dtype=np.int32): | |
self.out_file = open(out_file, "wb") | |
self.dtype = dtype | |
self.data_offsets = [0] | |
self.dim_offsets = [0] | |
self.sizes = [] | |
self.element_size = self.element_sizes[self.dtype] | |
def add_item(self, tensor): | |
# +1 for Lua compatibility | |
bytes = self.out_file.write(np.array(tensor.numpy() + 1, dtype=self.dtype)) | |
self.data_offsets.append(self.data_offsets[-1] + bytes / self.element_size) | |
for s in tensor.size(): | |
self.sizes.append(s) | |
self.dim_offsets.append(self.dim_offsets[-1] + len(tensor.size())) | |
def merge_file_(self, another_file): | |
index = IndexedDataset(another_file) | |
assert index.dtype == self.dtype | |
begin = self.data_offsets[-1] | |
for offset in index.data_offsets[1:]: | |
self.data_offsets.append(begin + offset) | |
self.sizes.extend(index.sizes) | |
begin = self.dim_offsets[-1] | |
for dim_offset in index.dim_offsets[1:]: | |
self.dim_offsets.append(begin + dim_offset) | |
with open(data_file_path(another_file), "rb") as f: | |
while True: | |
data = f.read(1024) | |
if data: | |
self.out_file.write(data) | |
else: | |
break | |
def finalize(self, index_file): | |
self.out_file.close() | |
index = open(index_file, "wb") | |
index.write(b"TNTIDX\x00\x00") | |
index.write(struct.pack("<Q", 1)) | |
index.write( | |
struct.pack("<QQ", _dtype_header_code(self.dtype), self.element_size) | |
) | |
index.write(struct.pack("<QQ", len(self.data_offsets) - 1, len(self.sizes))) | |
write_longs(index, self.dim_offsets) | |
write_longs(index, self.data_offsets) | |
write_longs(index, self.sizes) | |
index.close() | |
def _warmup_mmap_file(path): | |
with open(path, "rb") as stream: | |
while stream.read(100 * 1024 * 1024): | |
pass | |
class MMapIndexedDataset(torch.utils.data.Dataset): | |
class Index: | |
_HDR_MAGIC = b"MMIDIDX\x00\x00" | |
def writer(cls, path, dtype): | |
class _Writer: | |
def __enter__(self): | |
self._file = open(path, "wb") | |
self._file.write(cls._HDR_MAGIC) | |
self._file.write(struct.pack("<Q", 1)) | |
self._file.write(struct.pack("<B", _dtype_header_code(dtype))) | |
return self | |
def _get_pointers(sizes): | |
dtype_size = dtype().itemsize | |
address = 0 | |
pointers = [] | |
for size in sizes: | |
pointers.append(address) | |
address += size * dtype_size | |
return pointers | |
def write(self, sizes): | |
pointers = self._get_pointers(sizes) | |
self._file.write(struct.pack("<Q", len(sizes))) | |
sizes = np.array(sizes, dtype=np.int32) | |
self._file.write(sizes.tobytes(order="C")) | |
del sizes | |
pointers = np.array(pointers, dtype=np.int64) | |
self._file.write(pointers.tobytes(order="C")) | |
del pointers | |
def __exit__(self, exc_type, exc_val, exc_tb): | |
self._file.close() | |
return _Writer() | |
def __init__(self, path): | |
with open(path, "rb") as stream: | |
magic_test = stream.read(9) | |
assert self._HDR_MAGIC == magic_test, ( | |
"Index file doesn't match expected format. " | |
"Make sure that --dataset-impl is configured properly." | |
) | |
version = struct.unpack("<Q", stream.read(8)) | |
assert (1,) == version | |
(dtype_code,) = struct.unpack("<B", stream.read(1)) | |
self._dtype = _code_to_dtype[dtype_code] | |
self._dtype_size = self._dtype().itemsize | |
self._len = struct.unpack("<Q", stream.read(8))[0] | |
offset = stream.tell() | |
_warmup_mmap_file(path) | |
self._bin_buffer_mmap = np.memmap(path, mode="r", order="C") | |
self._bin_buffer = memoryview(self._bin_buffer_mmap) | |
self._sizes = np.frombuffer( | |
self._bin_buffer, dtype=np.int32, count=self._len, offset=offset | |
) | |
self._pointers = np.frombuffer( | |
self._bin_buffer, | |
dtype=np.int64, | |
count=self._len, | |
offset=offset + self._sizes.nbytes, | |
) | |
def __del__(self): | |
self._bin_buffer_mmap._mmap.close() | |
del self._bin_buffer_mmap | |
def dtype(self): | |
return self._dtype | |
def sizes(self): | |
return self._sizes | |
def __getitem__(self, i): | |
return self._pointers[i], self._sizes[i] | |
def __len__(self): | |
return self._len | |
def __init__(self, path): | |
super().__init__() | |
self._path = None | |
self._index = None | |
self._bin_buffer = None | |
self._do_init(path) | |
def __getstate__(self): | |
return self._path | |
def __setstate__(self, state): | |
self._do_init(state) | |
def _do_init(self, path): | |
self._path = path | |
self._index = self.Index(index_file_path(self._path)) | |
_warmup_mmap_file(data_file_path(self._path)) | |
self._bin_buffer_mmap = np.memmap( | |
data_file_path(self._path), mode="r", order="C" | |
) | |
self._bin_buffer = memoryview(self._bin_buffer_mmap) | |
def __del__(self): | |
self._bin_buffer_mmap._mmap.close() | |
del self._bin_buffer_mmap | |
del self._index | |
def __len__(self): | |
return len(self._index) | |
def __getitem__(self, i): | |
ptr, size = self._index[i] | |
np_array = np.frombuffer( | |
self._bin_buffer, dtype=self._index.dtype, count=size, offset=ptr | |
) | |
if self._index.dtype != np.int64: | |
np_array = np_array.astype(np.int64) | |
return torch.from_numpy(np_array) | |
def sizes(self): | |
return self._index.sizes | |
def supports_prefetch(self): | |
return False | |
def exists(path): | |
return PathManager.exists(index_file_path(path)) and PathManager.exists( | |
data_file_path(path) | |
) | |
def get_indexed_dataset_to_local(path) -> str: | |
local_index_path = PathManager.get_local_path(index_file_path(path)) | |
local_data_path = PathManager.get_local_path(data_file_path(path)) | |
assert local_index_path.endswith(".idx") and local_data_path.endswith(".bin"), ( | |
"PathManager.get_local_path does not return files with expected patterns: " | |
f"{local_index_path} and {local_data_path}" | |
) | |
local_path = local_data_path[:-4] # stripping surfix ".bin" | |
assert local_path == local_index_path[:-4] # stripping surfix ".idx" | |
return local_path | |
class MMapIndexedDatasetBuilder: | |
def __init__(self, out_file, dtype=np.int64): | |
self._data_file = open(out_file, "wb") | |
self._dtype = dtype | |
self._sizes = [] | |
def add_item(self, tensor): | |
np_array = np.array(tensor.numpy(), dtype=self._dtype) | |
self._data_file.write(np_array.tobytes(order="C")) | |
self._sizes.append(np_array.size) | |
def merge_file_(self, another_file): | |
# Concatenate index | |
index = MMapIndexedDataset.Index(index_file_path(another_file)) | |
assert index.dtype == self._dtype | |
for size in index.sizes: | |
self._sizes.append(size) | |
# Concatenate data | |
with open(data_file_path(another_file), "rb") as f: | |
shutil.copyfileobj(f, self._data_file) | |
def finalize(self, index_file): | |
self._data_file.close() | |
with MMapIndexedDataset.Index.writer(index_file, self._dtype) as index: | |
index.write(self._sizes) | |