from typing import Optional
import os
import csv
import json

import numpy as np
from argdantic import ArgParser
from pydantic import BaseModel
from tqdm import tqdm
from huggingface_hub import hf_hub_download

from common import PuzzleDatasetMetadata


cli = ArgParser()


class DataProcessConfig(BaseModel):
    source_repo: str = "sapientinc/sudoku-extreme"  # Hugging Face dataset repo to download the CSVs from
    output_dir: str = "data/sudoku-extreme-full"    # where the converted dataset is written

    subsample_size: Optional[int] = None  # if set, randomly keep only this many training puzzles
    min_difficulty: Optional[int] = None  # if set, drop puzzles whose rating is below this value
    num_aug: int = 0                      # number of shuffled copies generated per training puzzle


def shuffle_sudoku(board: np.ndarray, solution: np.ndarray):
    # Create a random digit mapping: a permutation of 1..9, with zero (blank) unchanged
    digit_map = np.pad(np.random.permutation(np.arange(1, 10)), (1, 0))

    # Randomly decide whether to transpose.
    transpose_flag = np.random.rand() < 0.5

    # Generate a valid row permutation:
    # - Shuffle the 3 bands (each band = 3 rows) and for each band, shuffle its 3 rows.
    bands = np.random.permutation(3)
    row_perm = np.concatenate([b * 3 + np.random.permutation(3) for b in bands])

    # Similarly for columns (stacks).
    stacks = np.random.permutation(3)
    col_perm = np.concatenate([s * 3 + np.random.permutation(3) for s in stacks])
    # Build an 81 -> 81 position mapping: the value of new flat cell i
    # (new row = i // 9, new col = i % 9) comes from
    # old row = row_perm[i // 9] and old col = col_perm[i % 9].
    mapping = np.array([row_perm[i // 9] * 9 + col_perm[i % 9] for i in range(81)])

    def apply_transformation(x: np.ndarray) -> np.ndarray:
        # Apply transpose flag
        if transpose_flag:
            x = x.T
        # Apply the position mapping.
        new_board = x.flatten()[mapping].reshape(9, 9).copy()
        # Apply digit mapping
        return digit_map[new_board]

    return apply_transformation(board), apply_transformation(solution)
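

# Optional sanity check (illustrative sketch only; not part of the pipeline and never
# called by it): a board produced by shuffle_sudoku should still be a valid Sudoku
# solution whenever the input was one. The helper name is hypothetical.
def _is_valid_solution(grid: np.ndarray) -> bool:
    full = set(range(1, 10))
    # Every row and every column must contain the digits 1..9 exactly once.
    for i in range(9):
        if set(grid[i, :].tolist()) != full or set(grid[:, i].tolist()) != full:
            return False
    # Every 3x3 box must also contain 1..9 exactly once.
    for r in range(0, 9, 3):
        for c in range(0, 9, 3):
            if set(grid[r:r + 3, c:c + 3].flatten().tolist()) != full:
                return False
    return True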


def convert_subset(set_name: str, config: DataProcessConfig):
    # Read CSV
    inputs = []
    labels = []

    with open(hf_hub_download(config.source_repo, f"{set_name}.csv", repo_type="dataset"), newline="") as csvfile:
        reader = csv.reader(csvfile)
        next(reader)  # Skip header

        # Each row holds the puzzle source, the 81-character puzzle string, the
        # 81-character solution string, and a difficulty rating.
        for source, q, a, rating in reader:
            if (config.min_difficulty is None) or (int(rating) >= config.min_difficulty):
                assert len(q) == 81 and len(a) == 81

                # '.' marks a blank cell; the ASCII digits are converted to integers 0..9.
                inputs.append(np.frombuffer(q.replace('.', '0').encode(), dtype=np.uint8).reshape(9, 9) - ord('0'))
                labels.append(np.frombuffer(a.encode(), dtype=np.uint8).reshape(9, 9) - ord('0'))

    # If subsample_size is specified for the training set,
    # randomly sample the desired number of examples.
    if set_name == "train" and config.subsample_size is not None:
        total_samples = len(inputs)
        if config.subsample_size < total_samples:
            indices = np.random.choice(total_samples, size=config.subsample_size, replace=False)
            inputs = [inputs[i] for i in indices]
            labels = [labels[i] for i in indices]

    # Generate dataset
    num_augments = config.num_aug if set_name == "train" else 0

    # Flat storage with offset arrays: puzzle_indices[p] is the index of the first
    # example of puzzle p, and group_indices[g] is the index of the first puzzle of
    # group g (a group is one original puzzle plus its augmentations).
    results = {k: [] for k in ["inputs", "labels", "puzzle_identifiers", "puzzle_indices", "group_indices"]}
    puzzle_id = 0
    example_id = 0

    results["puzzle_indices"].append(0)
    results["group_indices"].append(0)
    for orig_inp, orig_out in zip(tqdm(inputs), labels):
        for aug_idx in range(1 + num_augments):
            # First index is not augmented
            if aug_idx == 0:
                inp, out = orig_inp, orig_out
            else:
                inp, out = shuffle_sudoku(orig_inp, orig_out)

            # Push puzzle (only single example)
            results["inputs"].append(inp)
            results["labels"].append(out)
            example_id += 1
            puzzle_id += 1
            results["puzzle_indices"].append(example_id)
            results["puzzle_identifiers"].append(0)

        # Push group
        results["group_indices"].append(puzzle_id)

    # To Numpy
    def _seq_to_numpy(seq):
        arr = np.concatenate(seq).reshape(len(seq), -1)
        assert np.all((arr >= 0) & (arr <= 9))
        # Shift by +1 so that 0 is left free as the PAD token (digits 0..9 become 1..10).
        return arr + 1

    results = {
        "inputs": _seq_to_numpy(results["inputs"]),
        "labels": _seq_to_numpy(results["labels"]),

        "group_indices": np.array(results["group_indices"], dtype=np.int32),
        "puzzle_indices": np.array(results["puzzle_indices"], dtype=np.int32),
        "puzzle_identifiers": np.array(results["puzzle_identifiers"], dtype=np.int32),
    }

    # Metadata
    metadata = PuzzleDatasetMetadata(
        seq_len=81,
        vocab_size=10 + 1,  # PAD + "0" ... "9"
        pad_id=0,
        ignore_label_id=0,
        blank_identifier_id=0,
        num_puzzle_identifiers=1,
        total_groups=len(results["group_indices"]) - 1,
        mean_puzzle_examples=1,
        sets=["all"]
    )

    # Save metadata as JSON.
    save_dir = os.path.join(config.output_dir, set_name)
    os.makedirs(save_dir, exist_ok=True)

    with open(os.path.join(save_dir, "dataset.json"), "w") as f:
        json.dump(metadata.model_dump(), f)

    # Save data
    for k, v in results.items():
        np.save(os.path.join(save_dir, f"all__{k}.npy"), v)

    # Save IDs mapping (for visualization only)
    with open(os.path.join(config.output_dir, "identifiers.json"), "w") as f:
        json.dump(["<blank>"], f)


@cli.command(singleton=True)
def preprocess_data(config: DataProcessConfig):
    convert_subset("train", config)
    convert_subset("test", config)


if __name__ == "__main__":
    cli()
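
# Example invocation (hypothetical script name; argdantic derives the CLI options from
# the DataProcessConfig fields, so the exact flag spelling may differ):
#   python build_sudoku_dataset.py --subsample-size 1000 --num-aug 1000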