"""Tokenize the FineWeb-Edu sample with tiktoken and save the result.

The script loads ``HuggingFaceFW/fineweb-edu`` (config ``sample-10BT``),
tokenizes every document's ``text`` field with the GPT-2 encoding in a small
process pool, and writes the per-document token lists to
``./data/processed_fineweb_edu.npy``.
"""

import multiprocessing as mp
import os
from pathlib import Path

import backoff
import numpy as np
import tiktoken
from datasets import DownloadConfig, load_dataset

# Number of dataset items handed to a worker per task. Keeps inter-process
# messages small instead of splitting the dataset into a few huge chunks.
IMAP_CHUNKSIZE = 256


def process_data(item):
    """Tokenize a single dataset item.

    Replace this with your actual processing logic if tokenization is not
    what you need.

    Args:
        item: Mapping for one dataset row; its ``'text'`` entry is tokenized.
            A missing or ``None`` text is treated as the empty string.

    Returns:
        The list of GPT-2 token ids for the item's text.
    """
    encoder = tiktoken.get_encoding('gpt2')
    # `.get('text', '')` would still return None for an explicit null value.
    text = item.get('text') or ''
    # encode_ordinary() treats special-token look-alikes such as
    # "<|endoftext|>" as plain text. encode() raises ValueError on them,
    # which would abort the whole run on arbitrary web text.
    return encoder.encode_ordinary(text)


@backoff.on_exception(backoff.expo, Exception, max_tries=5)
def fetch_data(item):
    """Run ``process_data`` with exponential-backoff retries (up to 5 tries)."""
    return process_data(item)


def _as_object_array(sequences):
    """Pack variable-length sequences into a 1-D NumPy object array.

    Passing a ragged list straight to ``np.save``/``np.array`` raises
    ``ValueError`` on NumPy >= 1.24, so each element is assigned explicitly.
    """
    packed = np.empty(len(sequences), dtype=object)
    for index, sequence in enumerate(sequences):
        packed[index] = sequence
    return packed


def main():
    """Load the FineWeb-Edu sample, tokenize it in parallel and save it.

    Output is written to ``./data/processed_fineweb_edu.npy`` as a 1-D object
    array with one token list per document; read it back with
    ``np.load(path, allow_pickle=True)``.

    Raises:
        Exception: Any error from loading, processing or saving is printed
            and then re-raised unchanged.
    """
    # Configuration
    remote_name = "sample-10BT"  # Dataset configuration name
    output_dir = "./data"  # Directory to save processed data
    os.makedirs(output_dir, exist_ok=True)

    # Set up download config to handle rate limits and caching
    download_config = DownloadConfig(
        max_retries=5,
        num_proc=4,  # Limit to 4 processes to avoid HTTP 429
        cache_dir=Path.home() / ".cache" / "huggingface" / "datasets",
    )

    try:
        # Load dataset with caching
        print("Loading dataset...")
        dataset = load_dataset(
            'HuggingFaceFW/fineweb-edu',
            name=remote_name,
            split='train',
            download_mode="reuse_dataset_if_exists",
            download_config=download_config,
        )
        print(f"Dataset loaded with {len(dataset)} items.")

        # Limit number of processes to avoid overwhelming Hugging Face Hub
        nprocs = min(mp.cpu_count(), 4)
        print(f"Using {nprocs} processes for multiprocessing.")

        # imap() yields results in input order, like map(), but feeds the
        # workers in bounded chunks. The list must be built inside the
        # `with` block because leaving it terminates the pool.
        # NOTE: all token lists are still held in memory before saving.
        with mp.Pool(nprocs) as pool:
            results = list(
                pool.imap(fetch_data, dataset, chunksize=IMAP_CHUNKSIZE)
            )

        # Documents have different lengths, so save an explicit object array.
        output_path = os.path.join(output_dir, "processed_fineweb_edu.npy")
        np.save(output_path, _as_object_array(results))
        print(f"Processed dataset saved to {output_path}")

    except Exception as e:
        print(f"Error loading or processing dataset: {e}")
        raise


if __name__ == '__main__':
    mp.freeze_support()  # Required for Windows compatibility with executables
    main()