ResNet50-image-classifier-app / resnet_execute.py
Ubuntu
Added checkpoint and early stopping
41b8141
raw
history blame
4.56 kB
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim
from resnet_model import ResNet50
from tqdm import tqdm
from torchvision import datasets
from checkpoint import save_checkpoint, load_checkpoint
# Define transformations
transform = transforms.Compose([
transforms.Resize(256), # Resize the smaller side to 256 pixels while keeping aspect ratio
transforms.CenterCrop(224), # Then crop to 224x224 pixels from the center
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) # ImageNet normalization
])
# Train dataset and loader
trainset = datasets.ImageFolder(root='/mnt/imagenet/ILSVRC/Data/CLS-LOC/train', transform=transform)
trainloader = DataLoader(trainset, batch_size=128, shuffle=True, num_workers=16, pin_memory=True)
testset = datasets.ImageFolder(root='/mnt/imagenet/ILSVRC/Data/CLS-LOC/val', transform=transform )
testloader = DataLoader(testset, batch_size=1000, shuffle=False, num_workers=16, pin_memory=True)
# Initialize model, loss function, and optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ResNet50()
model = torch.nn.DataParallel(model)
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)
# Training function
from torch.amp import autocast
from tqdm import tqdm
def train(model, device, train_loader, optimizer, criterion, epoch, accumulation_steps=4):
model.train()
running_loss = 0.0
correct = 0
total = 0
pbar = tqdm(train_loader)
for batch_idx, (inputs, targets) in enumerate(pbar):
inputs, targets = inputs.to(device), targets.to(device)
with autocast(device_type='cuda'):
outputs = model(inputs)
loss = criterion(outputs, targets) / accumulation_steps
loss.backward()
if (batch_idx + 1) % accumulation_steps == 0 or (batch_idx + 1) == len(train_loader):
optimizer.step()
optimizer.zero_grad()
running_loss += loss.item() * accumulation_steps
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
pbar.set_description(desc=f'Epoch {epoch} | Loss: {running_loss / (batch_idx + 1):.4f} | Accuracy: {100. * correct / total:.2f}%')
if (batch_idx + 1) % 50 == 0:
torch.cuda.empty_cache()
return 100. * correct / total
# Testing function
def test(model, device, test_loader, criterion):
model.eval()
test_loss = 0
correct = 0
total = 0
with torch.no_grad():
for inputs, targets in test_loader:
inputs, targets = inputs.to(device), targets.to(device)
outputs = model(inputs)
loss = criterion(outputs, targets)
test_loss += loss.item()
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
test_accuracy = 100.*correct/total
print(f'Test Loss: {test_loss/len(test_loader):.4f}, Accuracy: {test_accuracy:.2f}%')
return test_accuracy, test_loss/len(test_loader)
# Main execution
if __name__ == '__main__':
# Early stopping parameters and checkpoint path
checkpoint_path = "checkpoint.pth"
best_loss = float('inf')
patience = 5
patience_counter = 0
# Load checkpoint if it exists to resume training
try:
model, optimizer, best_test_accuracy = load_checkpoint(model, optimizer, checkpoint_path)
except FileNotFoundError:
print("No checkpoint found, starting from scratch.")
for epoch in range(1, 6): # 20 epochs
train_accuracy = train(model, device, trainloader, optimizer, criterion, epoch)
test_accuracy, test_loss = test(model, device, testloader, criterion)
print(f'Epoch {epoch} | Train Accuracy: {train_accuracy:.2f}% | Test Accuracy: {test_accuracy:.2f}%')
if test_loss < best_loss:
best_loss = test_loss
patience_counter = 0
save_checkpoint(model, optimizer, epoch, test_loss, checkpoint_path)
else:
patience_counter += 1
if patience_counter >= patience:
print("Early stopping triggered. Training terminated.")
break