import os

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
import torchvision.transforms as transforms
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import DataLoader, Subset, random_split
from torchvision.datasets import ImageFolder


def _build_model(num_classes: int) -> nn.Module:
    """Return an ImageNet-pretrained MobileNetV2 with a new classifier head.

    All feature layers are frozen except the last three blocks, which stay
    trainable so they can be fine-tuned together with the new head.
    """
    model = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.IMAGENET1K_V1)

    # Freeze the whole feature extractor first ...
    for param in model.features.parameters():
        param.requires_grad = False

    # ... swap in a head sized for our classes (input width is read from the
    # existing layer instead of being hard-coded) ...
    in_features = model.classifier[1].in_features
    model.classifier[1] = nn.Linear(in_features, num_classes)

    # ... then unfreeze the last 3 feature blocks for fine-tuning.
    for param in model.features[-3:].parameters():
        param.requires_grad = True

    return model


def _train_one_epoch(model, loader, criterion, optimizer, device) -> float:
    """Run one optimisation pass over *loader* and return the mean sample loss."""
    # NOTE: train() also puts the frozen BatchNorm layers in training mode, so
    # their running statistics keep adapting even though their weights do not.
    model.train()
    loss_sum, seen = 0.0, 0
    for images, labels in loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        loss = criterion(model(images), labels)
        loss.backward()
        optimizer.step()

        # Weight by batch size so a short final batch is not over-counted.
        batch = labels.size(0)
        loss_sum += loss.item() * batch
        seen += batch
    return loss_sum / seen


def _evaluate(model, loader, criterion, device) -> tuple:
    """Return ``(mean sample loss, accuracy in percent)`` of *model* on *loader*."""
    model.eval()
    loss_sum, correct, seen = 0.0, 0, 0
    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            batch = labels.size(0)
            loss_sum += criterion(outputs, labels).item() * batch
            correct += (outputs.argmax(dim=1) == labels).sum().item()
            seen += batch
    return loss_sum / seen, 100 * correct / seen


def main(
    dataset_path: str = "categorized_images",
    num_epochs: int = 30,
    batch_size: int = 16,
    learning_rate: float = 0.0001,
    checkpoint_path: str = "custom_image_model.pth",
    num_workers: int = 4,
) -> None:
    """Fine-tune MobileNetV2 on an image-folder dataset and keep the best weights.

    The dataset is split 80/20 into training and validation subsets. After
    every epoch the validation loss drives both the learning-rate scheduler
    and the decision to overwrite the checkpoint.

    Args:
        dataset_path: Root folder containing one sub-folder per class.
        num_epochs: Number of passes over the training subset.
        batch_size: Mini-batch size for both loaders.
        learning_rate: Initial Adam learning rate.
        checkpoint_path: Where the best ``state_dict`` is written.
        num_workers: Worker processes per ``DataLoader``.

    Raises:
        FileNotFoundError: If ``dataset_path`` is not an existing directory.
        ValueError: If the dataset is too small to yield two non-empty splits.
    """
    if not os.path.isdir(dataset_path):
        raise FileNotFoundError(f"❌ Dataset folder '{dataset_path}' not found!")

    # Data Augmentation & Normalization
    normalize = transforms.Normalize(
        mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
    )
    train_transform = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
        transforms.ToTensor(),
        normalize,
    ])
    val_transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        normalize,
    ])

    # Two views of the same files: one augmented for training, one
    # deterministic for validation. A single ImageFolder cannot carry both
    # transforms, which previously left val_transform unused.
    train_view = ImageFolder(root=dataset_path, transform=train_transform)
    val_view = ImageFolder(root=dataset_path, transform=val_transform)

    # Take the classes from ImageFolder itself; os.listdir would also count
    # stray files sitting next to the class folders.
    num_classes = len(train_view.classes)

    train_size = int(0.8 * len(train_view))
    val_size = len(train_view) - train_size
    if train_size == 0 or val_size == 0:
        raise ValueError(
            f"Dataset '{dataset_path}' has {len(train_view)} image(s); "
            "need enough for non-empty training and validation splits."
        )

    # Split once, then point the validation indices at the un-augmented view
    # so both subsets stay disjoint.
    train_dataset, val_split = random_split(train_view, [train_size, val_size])
    val_dataset = Subset(val_view, val_split.indices)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    pin_memory = device.type == "cuda"  # pinning only helps host-to-GPU copies

    train_loader = DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True,
        num_workers=num_workers, pin_memory=pin_memory,
    )
    val_loader = DataLoader(
        val_dataset, batch_size=batch_size, shuffle=False,
        num_workers=num_workers, pin_memory=pin_memory,
    )

    model = _build_model(num_classes)
    model.to(device)

    criterion = nn.CrossEntropyLoss()
    # Only hand the optimizer the parameters that can actually receive grads.
    trainable = [p for p in model.parameters() if p.requires_grad]
    optimizer = optim.Adam(trainable, lr=learning_rate)
    scheduler = ReduceLROnPlateau(optimizer, 'min', patience=3, factor=0.1)

    best_val_loss = float('inf')

    for epoch in range(num_epochs):
        avg_train_loss = _train_one_epoch(
            model, train_loader, criterion, optimizer, device
        )
        avg_val_loss, val_accuracy = _evaluate(model, val_loader, criterion, device)

        print(f"📢 Epoch [{epoch+1}/{num_epochs}] → Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f} | Val Accuracy: {val_accuracy:.2f}%")

        scheduler.step(avg_val_loss)

        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), checkpoint_path)
            print("✅ Best model saved!")

    print("🎉 Training Complete!")


if __name__ == '__main__':
    main()