|
import torch |
|
from safetensors.torch import load_file, save_file |
|
import logging |
|
from typing import Dict, List, Optional |
|
import time |
|
from pathlib import Path |
|
import sys |
|
|
|
|
|
# Configure the root logger at import time: records at INFO and above are
# written both to stdout and to a file named "model_operations.log".
_console_handler = logging.StreamHandler(sys.stdout)
_file_handler = logging.FileHandler("model_operations.log")

logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    handlers=[_console_handler, _file_handler],
)
|
|
|
class ModelHandler: |
|
"""Class to handle model operations with improved efficiency.""" |
|
|
|
DEFAULT_CHECKPOINT = Path("Model_4_of_10.safetensors") |
|
|
|
def __init__(self, checkpoint_path: str | Path = DEFAULT_CHECKPOINT): |
|
self.checkpoint_path = Path(checkpoint_path) |
|
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
|
|
def _log_time(self, operation: str, start_time: float) -> None: |
|
"""Helper method for consistent timing logging.""" |
|
elapsed = time.time() - start_time |
|
logging.info(f"{operation} completed in {elapsed:.2f} seconds") |
|
|
|
def load_model(self) -> Dict[str, torch.Tensor]: |
|
"""Loads model with memory-efficient handling.""" |
|
start_time = time.time() |
|
try: |
|
logging.info(f"Loading model from {self.checkpoint_path}") |
|
|
|
model_data = load_file(str(self.checkpoint_path), device="cpu") |
|
for key in model_data: |
|
model_data[key] = model_data[key].to(self.device) |
|
self._log_time("Model loading", start_time) |
|
return model_data |
|
except Exception as e: |
|
logging.error(f"Model loading failed: {str(e)}") |
|
raise RuntimeError(f"Failed to load model: {str(e)}") from e |
|
|
|
def save_model(self, model_tensors: Dict[str, torch.Tensor]) -> None: |
|
"""Saves model with validation and error handling.""" |
|
start_time = time.time() |
|
try: |
|
logging.info(f"Saving model to {self.checkpoint_path}") |
|
self.checkpoint_path.parent.mkdir(parents=True, exist_ok=True) |
|
save_file(model_tensors, str(self.checkpoint_path)) |
|
self._log_time("Model saving", start_time) |
|
except Exception as e: |
|
logging.error(f"Model saving failed: {str(e)}") |
|
raise RuntimeError(f"Failed to save model: {str(e)}") from e |
|
|
|
def initialize_model( |
|
self, |
|
layers: List[int] = [8192, 16384, 32768], |
|
dtype: torch.dtype = torch.bfloat16, |
|
seed: Optional[int] = 42 |
|
) -> Dict[str, torch.Tensor]: |
|
"""Initializes model with optimized parameters.""" |
|
if seed is not None: |
|
torch.manual_seed(seed) |
|
|
|
model_tensors = {} |
|
start_time = time.time() |
|
try: |
|
for i, size in enumerate(layers, 1): |
|
layer_name = f"layer_{i}" |
|
logging.info(f"Initializing {layer_name} [{size}x{size}] on {self.device}") |
|
|
|
tensor = torch.randn(size, size, dtype=dtype, device=self.device) * (1.0 / size ** 0.5) |
|
model_tensors[layer_name] = tensor |
|
self._log_time("Model initialization", start_time) |
|
return model_tensors |
|
except Exception as e: |
|
logging.error(f"Model initialization failed: {str(e)}") |
|
raise RuntimeError(f"Failed to initialize model: {str(e)}") from e |
|
|
|
def verify_model( |
|
self, |
|
original: Dict[str, torch.Tensor], |
|
loaded: Dict[str, torch.Tensor], |
|
atol: float = 1e-5, |
|
rtol: float = 1e-3 |
|
) -> bool: |
|
"""Verifies model integrity with detailed comparison.""" |
|
all_match = True |
|
for key in original: |
|
if key not in loaded: |
|
logging.warning(f"Missing tensor: {key}") |
|
all_match = False |
|
continue |
|
|
|
orig, load = original[key], loaded[key] |
|
try: |
|
if orig.shape != load.shape: |
|
logging.warning(f"Shape mismatch in {key}: {orig.shape} vs {load.shape}") |
|
all_match = False |
|
continue |
|
|
|
if not torch.allclose(orig, load, atol=atol, rtol=rtol): |
|
diff = torch.max(torch.abs(orig - load)) |
|
logging.warning(f"Mismatch in {key}: max diff = {diff}") |
|
all_match = False |
|
else: |
|
logging.info(f"Tensor {key} verified (shape: {orig.shape})") |
|
except Exception as e: |
|
logging.error(f"Verification failed for {key}: {str(e)}") |
|
all_match = False |
|
return all_match |
|
|
|
def main() -> int:
    """Run an initialize -> save -> load -> verify round trip.

    Returns:
        ``0`` when the reloaded tensors match the originals, ``1`` when
        verification fails or any step raises an exception.
    """
    try:
        handler = ModelHandler()

        # Build the tensors and persist them to the handler's checkpoint.
        model_data = handler.initialize_model()
        handler.save_model(model_data)

        # Read the checkpoint back and compare it with what was written.
        loaded_model_data = handler.load_model()
        is_valid = handler.verify_model(model_data, loaded_model_data)

        logging.info(f"Model verification {'passed' if is_valid else 'failed'}")
        # A failed verification must be visible in the exit status, not only
        # in the log, so callers and scripts can react to it.
        return 0 if is_valid else 1

    except Exception as e:
        logging.error(f"Execution failed: {e}")
        return 1
|
|
|
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())