enhanced_transfer_learning / experiment /transfer_learning_train&test.py
Andrzej Daniel Dobrzycki
First version
89b1f42
import json
import gc
import psutil
import yaml
import wget
import json
import os, sys
from glob import glob
from yolo_cam.eigen_cam import EigenCAM
from yolo_cam.utils.image import show_cam_on_image
import torch
from torchvision.utils import make_grid
import wandb
import cv2
import numpy as np
import argparse
def parse_args():
parser = argparse.ArgumentParser(description="Transfer learning script.")
parser.add_argument("--dataset", type=str, required=True, help='Dataset name to be used')
parser.add_argument("--epochs", type=int, default=1000, help="Number of epochs")
parser.add_argument("--batch", type=int, default=16, help="Batch size")
parser.add_argument("--imgsz", type=int, default=640, help="Image size")
parser.add_argument("--patience", type=int, default=30, help="Patience for early stopping")
parser.add_argument("--cache", type=str, default="ram", help="Cache option")
parser.add_argument("--pretrained", type=bool, default=False, help="Use pretrained weights")
parser.add_argument("--cos_lr", type=bool, default=False, help="Use cosine learning rate")
parser.add_argument("--profile", type=bool, default=False, help="Profile the training")
parser.add_argument("--plots", type=bool, default=True, help="Generate plots")
parser.add_argument("--resume", type=bool, default=False, help="Resume run")
parser.add_argument("--augment", type=bool, default=False, help="Apply augmentation techniques during training")
parser.add_argument("--model", type=str, required=True, help="Model name")
parser.add_argument("--run", type=str, required=True, help="Run mode")
return parser.parse_args()
args = parse_args()
dict_to_freeze = {"Finetuning": 0,
"freeze_[P1-P3]": 4,
"freeze_Backbone": 9,
"freeze_[P1-23]": 23
}
layers_to_freeze = dict_to_freeze[args.run]
##--------------------------------------------/USER DEFINE
mem_torch = 0
RAM_used = 0
RAM_available = 0
grad_norm = []
cam_images = {}
first_batch_paths = None
args_model = {
"epochs": args.epochs, "batch": args.batch, "imgsz": args.imgsz,
"patience": args.patience, "cache": args.cache, "pretrained": args.pretrained,
"cos_lr": args.cos_lr, "profile": args.profile, "plots": args.plots}
def freeze_layer(trainer):
model = trainer.model
num_freeze = layers_to_freeze
print(f"Freezing {num_freeze} layers")
if num_freeze:
freeze = [f'model.{x}.' for x in range(num_freeze)] # layers to freeze
for k, v in model.named_parameters():
v.requires_grad = True # train all layers
if any(x in k for x in freeze):
print(f'freezing {k}')
v.requires_grad = False # Non trainable layer
print(f"{num_freeze} layers are freezed.")
model.info(detailed=True)
def get_gpu_usage(param):
global mem_torch
global RAM_used
global RAM_available
mem_torch = float(torch.cuda.memory_reserved() / 1E6 if torch.cuda.is_available() else 0) #(MB)
RAM_used = float(psutil.virtual_memory().used / 1e9)
RAM_available = float(psutil.virtual_memory().available / 1e9)
def compute_gradients_L2_norm(trainer):
model = trainer.model
global grad_norm
temp_grad = 0.0
for _, params in model.named_parameters():
if params.grad is not None:
temp_grad += params.grad.data.norm(2).item() ** 2
grad_norm.append(float(temp_grad ** 0.5))
def save_val_images_paths(trainer):
global first_batch_paths
first_batch_paths = next(iter(trainer.validator.dataloader))['im_file'][:args.batch]
def compute_grad_CAM(trainer):
global global_step
global cam_images
global first_batch_paths
if ((global_step in [1, 2, 3, 4, 5] or global_step %10 == 0) and global_step <= args.epochs):
try:
model_copy = YOLO(f"./experiment/runs/{args.dataset}/{args.model}/train_{run_name}/weights/last.pt")
except FileNotFoundError:
if(global_step!=0):
print(f"No such file or directory: /experiment/runs/{args.dataset}/{args.model}/train_{run_name}/weights/last.pt")
global_step += 1
return
iterator = [("C2fCIB [-2]", -2),("Conv [-4]", -4), ("SPPF", 9), ("PSA", 10)] if "v10" in args.model else [("Conv [-2]", -2),("Conv [-4]", -4), ("SPPF", 9)]
json_preds_gradCAM = {layer[0] : None for layer in iterator}
for layer_name, layer_index in iterator:
cam_images[layer_name] = []
target_layers = [model_copy.model.model[layer_index]]
for path_to_image in first_batch_paths:
image_name = path_to_image.split("/")[-1].split(".")[0]
os.makedirs(f"./experiment/runs/{args.dataset}/{args.model}/train_{run_name}/outs/gradCAMs_inferences/images/{image_name}/{layer_name}", exist_ok=True)
img = cv2.cvtColor(cv2.imread(path_to_image), cv2.COLOR_BGR2RGB)
#img = cv2.resize(img, (640, 640))
rgb_img = img.copy()
img = np.float32(img) / 255
cam = EigenCAM(model_copy, target_layers, task='od')
grayscale_cam = cam(rgb_img)[0, :, :]
temp_cam_image = show_cam_on_image(img, grayscale_cam, use_rgb=True)
cam = EigenCAM(model_copy, target_layers, task='od')
grayscale_cam = cam(rgb_img)[0, :, :]
temp_cam_image = show_cam_on_image(img, grayscale_cam, use_rgb=True)
if (not json_preds_gradCAM[layer_name]):
json_preds_gradCAM[layer_name] = []
boxes = model_copy.predictor.results[0].boxes.xyxy.detach().cpu().numpy() # Get boxes as numpy array
confs = model_copy.predictor.results[0].boxes.conf.detach().cpu().numpy() # Get boxes as numpy array
cam_image_annotated = temp_cam_image.copy()
cv2.imwrite(f"./experiment/runs/{args.dataset}/{args.model}/train_{run_name}/outs/gradCAMs_inferences/images/{image_name}/{layer_name}/{image_name}_gs_{global_step}.jpg", cv2.cvtColor(temp_cam_image, cv2.COLOR_BGR2RGB))
for i, box in enumerate(boxes):
cam_image_annotated = cv2.rectangle(cam_image_annotated,
(int(box[0]), int(box[1])),
(int(box[2]), int(box[3])),
(0, 255, 0), 2)
# Prepare confidence text
conf_text = f"{confs[i]:.2f}"
# Choose the position for the text (top-left corner of the rectangle)
text_position = (int(box[0]), int(box[1]) - 5) # Slightly above the top-left corner
# Get text size
(text_width, text_height), baseline = cv2.getTextSize(conf_text,
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
1)
# Draw a filled rectangle as background for the text
background_tl = (text_position[0], text_position[1] - text_height - baseline)
background_br = (text_position[0] + text_width, text_position[1] + baseline)
cam_image_annotated = cv2.rectangle(cam_image_annotated,
background_tl,
background_br,
(0, 255, 0),
cv2.FILLED)
# Add text to the image
cam_image_annotated = cv2.putText(cam_image_annotated,
conf_text,
text_position,
cv2.FONT_HERSHEY_SIMPLEX,
0.5, # Font scale
(0, 0, 0), # White color
1, # Thickness
cv2.LINE_AA) # Anti-aliased line
json_preds_gradCAM[layer_name].append({"image_name": image_name,
"cls" : model_copy.predictor.results[0].boxes.cls.detach().clone().tolist(),
"conf" : model_copy.predictor.results[0].boxes.conf.detach().clone().tolist(),
"boxes(xywhn)" : model_copy.predictor.results[0].boxes.xywhn.detach().clone().tolist(),
"orig_shape": model_copy.predictor.results[0].boxes.orig_shape})
cam_images[layer_name].append(torch.from_numpy(cv2.resize(temp_cam_image, (640,640))).permute(2, 0, 1))
cv2.imwrite(f"./experiment/runs/{args.dataset}/{args.model}/train_{run_name}/outs/gradCAMs_inferences/images/{image_name}/{layer_name}/{image_name}_annotated_gs_{global_step}.jpg",
cv2.cvtColor(cam_image_annotated, cv2.COLOR_BGR2RGB))
model_copy = None
cam = None
with open(f'./experiment/runs/{args.dataset}/{args.model}/train_{run_name}/outs/gradCAMs_inferences/gradCAMS_inference_step_{global_step}.json', 'w') as f:
# Serializing json
json_object = json.dumps(json_preds_gradCAM, indent=4)
f.write(json_object)
gc.collect()
torch.cuda.empty_cache()
global_step += 1
def log_data(trainer):
global grad_norm
global mem_torch
global RAM_used
global RAM_available
global global_step
global cam_images
grad_norm = np.array(grad_norm)
grad_norm[grad_norm == 0] = np.nan
grad_norm = np.nanmean(grad_norm, axis=0)
if(cam_images and global_step <= args.epochs):
if "v10" in args.model:
cam_images_C2fCIB = torch.stack(cam_images["C2fCIB [-2]"])
cam_images_PSA = torch.stack(cam_images["PSA"])
cam_images_SPPF = torch.stack(cam_images["SPPF"])
cam_images_Conv = torch.stack(cam_images["Conv [-4]"])
grid_C2fCIB = wandb.Image(make_grid(cam_images_C2fCIB, nrow=int(args.batch/4)).float(), caption="C2fCIB layer")
grid_PSA = wandb.Image(make_grid(cam_images_PSA, nrow=int(args.batch/4)).float(), caption="PSA layer")
grid_SPPF = wandb.Image(make_grid(cam_images_SPPF, nrow=int(args.batch/4)).float(), caption="SPPF layer")
grid_Conv = wandb.Image(make_grid(cam_images_Conv, nrow=int(args.batch/4)).float(), caption="Conv [-2] layer")
wandb.log({"Gradients/L2 Gradients Norm": grad_norm,
"GPU/GPU usage Ultralytics (MB)": mem_torch,
"Memory/Memory used (GB):": RAM_used,
"Memory/Memory available (GB):": RAM_available,
"GradCAM/C2fCIB": grid_C2fCIB,
"GradCAM/SPPF": grid_SPPF,
"GradCAM/PSA": grid_PSA,
"GradCAM/Conv": grid_Conv,
}
)
elif "v8" in args.model:
cam_images_Conv2 = torch.stack(cam_images["Conv [-2]"])
cam_images_Conv4 = torch.stack(cam_images["Conv [-4]"])
cam_images_SPPF = torch.stack(cam_images["SPPF"])
grid_Conv2 = wandb.Image(make_grid(cam_images_Conv2, nrow=int(args.batch/4)).float(), caption="Conv [-2] layer")
grid_Conv4 = wandb.Image(make_grid(cam_images_Conv4, nrow=int(args.batch/4)).float(), caption="Conv [-4] layer")
grid_SPPF = wandb.Image(make_grid(cam_images_SPPF, nrow=int(args.batch/4)).float(), caption="SPPF layer")
wandb.log({"Gradients/L2 Gradients Norm": grad_norm,
"GPU/GPU usage Ultralytics (MB)": mem_torch,
"Memory/Memory used (GB):": RAM_used,
"Memory/Memory available (GB):": RAM_available,
"GradCAM/Conv2": grid_Conv2,
"GradCAM/Conv4": grid_Conv4,
"GradCAM/SPPF": grid_SPPF,
}
)
elif(global_step <= args.epochs):
wandb.log({"Gradients/L2 Gradients Norm": grad_norm,
"GPU/GPU usage Ultralytics (MB)": mem_torch,
"Memory/Memory used (GB):": RAM_used,
"Memory/Memory available (GB):": RAM_available
}
)
cam_images = {}
grad_norm = []
mem_torch = None
RAM_used = None
RAM_available = None
ultralytics_augmentation_args_disabled = {
"hsv_h": 0.0,
"hsv_s": 0.0,
"hsv_v": 0.0,
"degrees": 0.0,
"translate": 0.0,
"scale": 0.0, # Setting scale to 0.0 keeps the original size
"shear": 0.0,
"perspective": 0.0,
"flipud": 0.0,
"fliplr": 0.0,
"mosaic": 0.0,
"mixup": 0.0,
"copy_paste": 0.0,
"augment": False, # Setting to 'none' disables auto augmentation
}
os.makedirs("./experiment/pretrained_weights", exist_ok=True)
pretrained_weights_list = [weights_path.split("/")[-1][:-3]for weights_path in glob("./experiment/pretrained_weights/*.pt")]
if args.model not in pretrained_weights_list:
if "v10" in args.model:
wget.download(f"https://github.com/THU-MIG/yolov10/releases/download/v1.1/{args.model}.pt", out="./experiment/pretrained_weights")
elif "v8" in args.model:
wget.download(f"https://github.com/ultralytics/assets/releases/download/v8.2.0/{args.model}.pt", out="./experiment/pretrained_weights")
global global_step
global_step = 0
gc.collect()
torch.cuda.empty_cache()
wandb.login(key="c5c2f8d387804338825114c4133a31016c9ebf87")
api = wandb.Api()
# Step 1: Initialize a Weights & Biases run
run_name = args.model.split('yolo')[-1]+"_"+args.run
wandb.init(project=f"transfer_learning_{args.dataset}",
dir="./experiment",
name=run_name, # run name
job_type="training",
notes=f"Finetuning of {args.model} model on {args.dataset} dataset",
tags=["object detection", "FaRADAI", "AI4TES", "Finetuning", args.dataset],
resume="allow")
username = wandb.run.entity
project = wandb.run.project
run_id = wandb.run.id
if("From_Scratch" in run_name):
flag_pretrained = False
args.pretrained = flag_pretrained
else:
flag_pretrained = True
args.pretrained = flag_pretrained
# Step 2: Define the YOLO Model
if "v10" in args.model:
from ultralytics import YOLOv10 as YOLO
elif "v8" in args.model:
from ultralytics import YOLO
if(not args.pretrained):
model = YOLO(f"{args.model}.yaml") #Necessary to doesn't initialize pretrained weights
elif args.resume:
model = YOLO(f"./experiment/runs/{args.dataset}/{args.model}/train_{run_name}/weights/last.pt")
else:
model = YOLO(f'./experiment/pretrained_weights/{args.model}.pt')
# Step 3: Add trainer & validator callbacks
model.add_callback("on_train_start", freeze_layer)
model.add_callback("on_train_start", save_val_images_paths)
model.add_callback("on_batch_end", compute_gradients_L2_norm) #After scheduler step
model.add_callback("on_train_epoch_end", get_gpu_usage)
model.add_callback("on_train_epoch_end", compute_grad_CAM)
model.add_callback("on_train_epoch_end", log_data)
if(not args.augment):
args_model.update(ultralytics_augmentation_args_disabled)
if args.resume:
try:
model.train(model=f"./experiment/runs/{args.dataset}/{args.model}/train_{run_name}/weights/last.pt", resume=True)
except:
print("Exception catched, proceeding with validation:")
print(model.info(detailed=True))
pass
else:
model.train(data=f"./datasets/{args.dataset}/dataset.yaml", project=f"./experiment/runs/{args.dataset}/{args.model}", name=f"train_{run_name}", **args_model)
model.data = None
model = None # Necessary to Free RAM
gc.collect()
torch.cuda.empty_cache()
with open(f'./experiment/runs/{args.dataset}/{args.model}/train_{run_name}/args.yaml', 'r') as config_file:
config = yaml.safe_load(config_file)
config['model'] = args.model
if (layers_to_freeze!=0):
config["freeze"] = {"number_of_layers": layers_to_freeze,
"layers": run_name.split("_")[1]}
else:
config["freeze"] = {"number_of_layers": layers_to_freeze,
"layers": "None"}
wandb.init(entity=username,dir="./experiment", project=project, id=run_id, resume="must")
wandb.config.update(config)
model = YOLO(f"./experiment/runs/{args.dataset}/{args.model}/train_{run_name}/weights/best.pt")
metrics = model.val(data=f"../datasets/{args.dataset}/dataset.yaml",
imgsz=args.imgsz,
batch=args.batch,
save_json=True,
save_txt=True,
split='test',
plots=True,
conf=0.5,
iou=0.7,
project=f"./experiment/runs/{args.dataset}/{args.model}",
name=f"test_{run_name}"
)
with open(f"./experiment/runs/{args.dataset}/{args.model}/test_{run_name}/metrics.json", "w") as f:
f.write(json.dumps(metrics.results_dict, indent=4))
wandb.log( {"test/precision(B)": metrics.results_dict["metrics/precision(B)"],
"test/recall(B)": metrics.results_dict["metrics/recall(B)"],
"test/mAP50(B)": metrics.results_dict["metrics/mAP50(B)"],
"test/mAP50-95(B)": metrics.results_dict["metrics/mAP50-95(B)"],
"test/fitness": metrics.results_dict["fitness"]
}
)
# Step 7: Finalize the W&B Run
wandb.finish()
model.data = None
model = None # Necessary to Free RAM
gc.collect()
torch.cuda.empty_cache()