Spaces:
Running
Running
from __future__ import division | |
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
from torch.autograd import Variable | |
import numpy as np | |
import cv2 | |
import matplotlib.pyplot as plt | |
try: | |
from util import count_parameters as count | |
from util import convert2cpu as cpu | |
from util import predict_transform | |
except ImportError: | |
from yolo.util import count_parameters as count | |
from yolo.util import convert2cpu as cpu | |
from yolo.util import predict_transform | |
class test_net(nn.Module):
    """Small fully connected network: input -> 5 -> (5 -> 5) * num_layers -> 2.

    The input tensor is flattened to one dimension before the first layer,
    so ``input_size`` must equal the total number of elements passed to
    ``forward``.
    """

    def __init__(self, num_layers, input_size):
        super().__init__()
        self.num_layers = num_layers
        self.linear_1 = nn.Linear(input_size, 5)
        hidden = [nn.Linear(5, 5) for _ in range(num_layers)]
        self.middle = nn.ModuleList(hidden)
        self.output = nn.Linear(5, 2)

    def forward(self, x):
        """Flatten ``x`` and run it through every linear layer in order."""
        out = self.linear_1(x.view(-1))
        for layer in self.middle:
            out = layer(out)
        return self.output(out)
def get_test_input(img_path="dog-cycle-car.png", inp_dim=416):
    """Load an image from disk and convert it into a network input tensor.

    Args:
        img_path: Path of the image to read. Defaults to the sample image
            name that was previously hard-coded.
        inp_dim: Side length the image is resized to (square, no aspect
            ratio preservation). Defaults to 416.

    Returns:
        A float tensor of shape (1, 3, inp_dim, inp_dim) with values scaled
        to [0, 1], wrapped in ``Variable`` as before.

    Raises:
        FileNotFoundError: If the image cannot be read. ``cv2.imread``
            signals failure by returning ``None`` rather than raising, which
            previously surfaced as an opaque error inside ``cv2.resize``.
    """
    img = cv2.imread(img_path)
    if img is None:
        raise FileNotFoundError(
            "Could not read test image: {0}".format(img_path))
    img = cv2.resize(img, (inp_dim, inp_dim))
    # Reverse the channel axis (OpenCV loads BGR) and move it first: HWC -> CHW.
    img_ = img[:, :, ::-1].transpose((2, 0, 1))
    # Add the batch axis and scale pixel values to [0, 1].
    img_ = img_[np.newaxis, :, :, :] / 255.0
    img_ = torch.from_numpy(img_).float()
    return Variable(img_)
def parse_cfg(cfgfile):
    """Parse a Darknet-style configuration file into a list of blocks.

    Each ``[section]`` header starts a new block. A block is a dictionary
    holding the section name under the key ``"type"`` plus one string entry
    per ``key = value`` line that follows the header.

    Blank lines, whitespace-only lines and lines whose first non-blank
    character is ``#`` are ignored. Only the first ``=`` on a line separates
    key from value, so values may themselves contain ``=``.

    Args:
        cfgfile: Path of the configuration file.

    Returns:
        The list of block dictionaries in file order. As before, the block
        being built when the file ends is always appended, so a file without
        any content yields ``[{}]``.

    Raises:
        ValueError: If a non-header line contains no ``=``.
    """
    # The context manager guarantees the handle is closed even on error.
    with open(cfgfile, 'r') as cfg:
        raw_lines = cfg.read().split('\n')

    block = {}
    blocks = []
    for raw in raw_lines:
        # Strip first so indented comments and blank-looking lines are
        # recognised correctly.
        line = raw.strip()
        if not line or line.startswith('#'):
            continue
        if line.startswith('['):
            # A header closes the block collected so far (if any).
            if block:
                blocks.append(block)
                block = {}
            block["type"] = line[1:-1].rstrip()
        else:
            key, value = line.split("=", 1)
            block[key.rstrip()] = value.lstrip()
    blocks.append(block)
    return blocks
# print('\n\n'.join([repr(x) for x in blocks])) | |
import pickle as pkl | |
class MaxPoolStride1(nn.Module):
    """Max pooling with stride 1 that keeps the spatial size of its input.

    The input is padded by ``kernel_size - 1`` cells in total along each
    spatial axis before pooling. For odd kernels the padding is split evenly;
    for even kernels the extra cell goes to the right/bottom side, so the
    output always has the same height and width as the input.

    Padding cells are filled with ``-inf`` so they can never win the max.
    Zero padding would replace border maxima with 0 whenever every real
    value in the window is negative.
    """

    def __init__(self, kernel_size):
        super(MaxPoolStride1, self).__init__()
        self.kernel_size = kernel_size
        # Total number of padding cells needed per spatial axis.
        self.pad = kernel_size - 1

    def forward(self, x):
        """Return the stride-1 max pooling of ``x`` (N, C, H, W) at size (H, W)."""
        lead = self.pad // 2           # left / top
        trail = self.pad - lead        # right / bottom (gets the odd cell)
        padded_x = F.pad(x, (lead, trail, lead, trail),
                         mode="constant", value=float("-inf"))
        return F.max_pool2d(padded_x, self.kernel_size, stride=1)
class EmptyLayer(nn.Module):
    """Module with no parameters and no forward logic of its own.

    It is registered as a placeholder for route and shortcut blocks, whose
    actual computation is carried out by the network's forward pass.
    """

    def __init__(self):
        super().__init__()
class DetectionLayer(nn.Module):
    """Holds the anchors of a detection ("yolo") block.

    ``anchors`` is stored unchanged and exposed as ``self.anchors``.
    """

    def __init__(self, anchors):
        super().__init__()
        self.anchors = anchors

    def forward(self, x, inp_dim, num_classes, confidence):
        """Pass ``x.data`` and the stored anchors to ``predict_transform``.

        NOTE(review): this reads a module-level ``CUDA`` name that is not
        defined anywhere in the visible part of this file, and it passes six
        arguments to ``predict_transform`` whereas ``Darknet.forward`` passes
        five -- confirm against ``util.predict_transform`` before calling.
        """
        global CUDA
        raw = x.data
        return predict_transform(raw, inp_dim, self.anchors, num_classes, confidence, CUDA)
class Upsample(nn.Module):
    """Nearest-neighbour upsampling of a 4-D tensor by an integer factor.

    Every element is repeated ``stride`` times along both of the last two
    dimensions, so (B, C, H, W) becomes (B, C, H * stride, W * stride).
    """

    def __init__(self, stride=2):
        super().__init__()
        self.stride = stride

    def forward(self, x):
        """Return ``x`` enlarged by ``self.stride`` in height and width."""
        assert x.data.dim() == 4
        batch, channels, height, width = x.data.size()
        factor = self.stride
        # Insert singleton axes after H and W, then broadcast them to `factor`.
        tiled = x.view(batch, channels, height, 1, width, 1)
        tiled = tiled.expand(batch, channels, height, factor, width, factor)
        # Merge each repeated axis back into its spatial dimension.
        return tiled.contiguous().view(batch, channels, height * factor, width * factor)
# | |
class ReOrgLayer(nn.Module):
    """Rearrange spatial blocks of a 4-D tensor into channels.

    With stride ``s`` an input of shape (B, C, H, W) becomes
    (B, C * s * s, H // s, W // s). Both H and W must be divisible by the
    stride.
    """

    def __init__(self, stride = 2):
        super(ReOrgLayer, self).__init__()
        self.stride = stride

    def forward(self, x):
        """Return the reorganised tensor.

        Raises:
            AssertionError: If ``x`` is not 4-D, or if its height or width is
                not a multiple of the stride.
        """
        assert(x.data.dim() == 4)
        B, C, H, W = x.data.shape
        hs = self.stride
        ws = self.stride
        assert(H % hs == 0), "The stride " + str(self.stride) + " is not a proper divisor of height " + str(H)
        # The message previously said "height" for this width check as well.
        assert(W % ws == 0), "The stride " + str(self.stride) + " is not a proper divisor of width " + str(W)
        out_h = H // hs
        out_w = W // ws
        # Split H and W into (block index, offset inside block), then put the
        # two block indices next to each other.
        x = x.view(B, C, out_h, hs, out_w, ws).transpose(-2, -3).contiguous()
        x = x.view(B, C, out_h * out_w, hs, ws)
        # Flatten the in-block offsets and move them in front of the block index.
        x = x.view(B, C, out_h * out_w, hs * ws).transpose(-1, -2).contiguous()
        # Move the offset axis next to the channel axis and merge the two.
        x = x.view(B, C, ws * hs, out_h, out_w).transpose(1, 2).contiguous()
        x = x.view(B, C * ws * hs, out_h, out_w)
        return x
def create_modules(blocks):
    """Build one ``nn.Sequential`` per layer block of a parsed configuration.

    Args:
        blocks: List of block dictionaries as returned by ``parse_cfg``. The
            first entry is taken as the network information block; every
            block whose type is ``"net"`` is skipped when building modules.

    Returns:
        A tuple ``(net_info, module_list)`` where ``net_info`` is
        ``blocks[0]`` and ``module_list`` is an ``nn.ModuleList`` holding one
        ``nn.Sequential`` per non-"net" block, in order.

    Side effects:
        For every route block the ``"layers"`` entry is replaced in place by
        the list of its comma-separated items (still strings). The forward
        pass of ``Darknet`` reads that list.

    Raises:
        AssertionError: For an unknown block type, or for a route block that
            does not reference exactly 1, 2 or 4 layers.
        KeyError / ValueError: If a required option is missing or not an
            integer.
    """
    net_info = blocks[0]  # information about the input and pre-processing
    module_list = nn.ModuleList()
    index = 0             # position of the current layer; used by route blocks
    prev_filters = 3      # the first layer is assumed to see 3 input channels
    output_filters = []   # output channel count of every layer built so far

    for block in blocks:
        block_type = block["type"]
        if block_type == "net":
            continue

        module = nn.Sequential()
        # Layers that do not change the channel count keep the previous value.
        filters = prev_filters

        if block_type == "convolutional":
            activation = block["activation"]
            batch_normalize = int(block.get("batch_normalize", 0))
            # The convolution carries its own bias only when no batch norm
            # follows it (this also covers an explicit batch_normalize=0).
            bias = not batch_normalize
            filters = int(block["filters"])
            padding = int(block["pad"])
            kernel_size = int(block["size"])
            stride = int(block["stride"])
            pad = (kernel_size - 1) // 2 if padding else 0

            conv = nn.Conv2d(prev_filters, filters, kernel_size, stride, pad, bias=bias)
            module.add_module("conv_{0}".format(index), conv)

            if batch_normalize:
                bn = nn.BatchNorm2d(filters)
                module.add_module("batch_norm_{0}".format(index), bn)

            # Only "leaky" adds a module; any other activation is left linear.
            if activation == "leaky":
                activn = nn.LeakyReLU(0.1, inplace=True)
                module.add_module("leaky_{0}".format(index), activn)

        elif block_type == "upsample":
            stride = int(block["stride"])
            # Honour the configured stride instead of a fixed factor of 2.
            upsample = nn.Upsample(scale_factor=stride, mode="nearest")
            module.add_module("upsample_{}".format(index), upsample)

        elif block_type == "route":
            if isinstance(block["layers"], str):
                block["layers"] = block["layers"].split(',')
            refs = [int(ref) for ref in block["layers"]]
            # The forward pass only implements these layer counts.
            if len(refs) not in (1, 2, 4):
                raise AssertionError(
                    "route block {0} must reference 1, 2 or 4 layers, got {1}".format(
                        index, len(refs)))
            # Non-negative references are absolute layer numbers, negative
            # ones are relative to the current layer.
            sources = [ref if ref >= 0 else index + ref for ref in refs]

            route = EmptyLayer()
            module.add_module("route_{0}".format(index), route)
            # Routed feature maps are concatenated along the channel axis.
            filters = sum(output_filters[src] for src in sources)

        elif block_type == "shortcut":
            # Parsed only so that a missing or malformed "from" fails here.
            int(block["from"])
            shortcut = EmptyLayer()
            module.add_module("shortcut_{}".format(index), shortcut)

        elif block_type == "maxpool":
            stride = int(block["stride"])
            size = int(block["size"])
            if stride != 1:
                maxpool = nn.MaxPool2d(size, stride)
            else:
                maxpool = MaxPoolStride1(size)
            module.add_module("maxpool_{}".format(index), maxpool)

        elif block_type == "yolo":
            mask = [int(m) for m in block["mask"].split(",")]
            values = [int(a) for a in block["anchors"].split(",")]
            # Consecutive values form one anchor pair; keep the masked pairs.
            pairs = [(values[i], values[i + 1]) for i in range(0, len(values), 2)]
            anchors = [pairs[m] for m in mask]

            detection = DetectionLayer(anchors)
            module.add_module("Detection_{}".format(index), detection)

        else:
            print("Something I dunno")
            # Raised explicitly so the check survives ``python -O``.
            raise AssertionError("Unknown block type: {0!r}".format(block_type))

        module_list.append(module)
        prev_filters = filters
        output_filters.append(filters)
        index += 1

    return (net_info, module_list)
class Darknet(nn.Module):
    """Network assembled from a Darknet-style configuration file.

    Attributes:
        blocks: Parsed configuration blocks; ``blocks[0]`` is the network
            information block and ``blocks[i + 1]`` describes
            ``module_list[i]``.
        net_info: The network information block.
        module_list: ``nn.ModuleList`` with one ``nn.Sequential`` per layer.
        header: Integer tensor holding the weight-file header.
        seen: Value stored at index 3 of the header.
    """

    def __init__(self, cfgfile):
        super(Darknet, self).__init__()
        self.blocks = parse_cfg(cfgfile)
        self.net_info, self.module_list = create_modules(self.blocks)
        self.header = torch.IntTensor([0, 0, 0, 0])
        self.seen = 0

    def get_blocks(self):
        """Return the parsed configuration blocks."""
        return self.blocks

    def get_module_list(self):
        """Return the ``nn.ModuleList`` built from the configuration."""
        return self.module_list

    def forward(self, x, CUDA):
        """Run the network on ``x`` and collect the detection-layer outputs.

        Args:
            x: Input tensor for the first layer.
            CUDA: Forwarded unchanged to ``predict_transform``.

        Returns:
            The outputs of all "yolo" blocks concatenated along dimension 1,
            or an empty list if no detection output was produced.
        """
        detections = []
        modules = self.blocks[1:]
        outputs = {}   # output of every layer, cached for route/shortcut
        write = False  # True once `detections` holds a tensor

        for i, block in enumerate(modules):
            module_type = block["type"]

            if module_type in ("convolutional", "upsample", "maxpool"):
                x = self.module_list[i](x)
                outputs[i] = x

            elif module_type == "route":
                refs = [int(a) for a in block["layers"]]
                # Non-negative references are absolute layer numbers,
                # negative ones are relative to the current layer.
                sources = [ref if ref >= 0 else i + ref for ref in refs]
                maps = [outputs[src] for src in sources]
                # Several routed maps are joined along the channel axis.
                x = maps[0] if len(maps) == 1 else torch.cat(maps, 1)
                outputs[i] = x

            elif module_type == "shortcut":
                from_ = int(block["from"])
                x = outputs[i - 1] + outputs[i + from_]
                outputs[i] = x

            elif module_type == 'yolo':
                anchors = self.module_list[i][0].anchors
                inp_dim = int(self.net_info["height"])
                num_classes = int(block["classes"])

                x = x.data
                x = predict_transform(x, inp_dim, anchors, num_classes, CUDA)
                # NOTE(review): an int result is skipped entirely --
                # presumably it signals "no detections"; confirm in util.
                if isinstance(x, int):
                    continue

                if not write:
                    detections = x
                    write = True
                else:
                    detections = torch.cat((detections, x), 1)

                outputs[i] = outputs[i - 1]

        return detections

    @staticmethod
    def _load_into(target, weights, ptr):
        """Fill ``target`` from ``weights[ptr:]`` and return the next offset."""
        count = target.numel()
        chunk = torch.from_numpy(weights[ptr:ptr + count])
        target.copy_(chunk.view_as(target))
        return ptr + count

    def load_weights(self, weightfile):
        """Load convolution and batch-norm values from a binary weight file.

        The file starts with five int32 header values followed by float32
        data. For every convolutional block the data is read in this order:
        batch-norm bias, weight, running mean, running variance (or just the
        convolution bias when the block has no batch norm), then the
        convolution weights.

        NOTE(review): the original comment listed four header fields (major,
        minor, subversion, images seen) while five int32 values are read --
        presumably the last field spans two int32 slots; confirm against the
        file format in use.

        Args:
            weightfile: Path of the weight file.
        """
        with open(weightfile, "rb") as fp:
            header = np.fromfile(fp, dtype=np.int32, count=5)
            # Everything after the header is weight data.
            weights = np.fromfile(fp, dtype=np.float32)

        self.header = torch.from_numpy(header)
        self.seen = self.header[3]

        ptr = 0
        for i, model in enumerate(self.module_list):
            block = self.blocks[i + 1]
            if block["type"] != "convolutional":
                continue

            batch_normalize = int(block.get("batch_normalize", 0))
            conv = model[0]

            if batch_normalize:
                bn = model[1]
                ptr = self._load_into(bn.bias.data, weights, ptr)
                ptr = self._load_into(bn.weight.data, weights, ptr)
                ptr = self._load_into(bn.running_mean, weights, ptr)
                ptr = self._load_into(bn.running_var, weights, ptr)
            else:
                ptr = self._load_into(conv.bias.data, weights, ptr)

            ptr = self._load_into(conv.weight.data, weights, ptr)

    def save_weights(self, savedfile, cutoff = 0):
        """Write the header and all convolutional weights to ``savedfile``.

        Values are written in the same order ``load_weights`` reads them.

        Args:
            savedfile: Path of the file to create or overwrite.
            cutoff: Accepted for backward compatibility; it was never used to
                limit the layers written and is still ignored.
        """
        with open(savedfile, 'wb') as fp:
            # The header goes first, with the current `seen` count stored in it.
            self.header[3] = self.seen
            self.header.numpy().tofile(fp)

            for i, model in enumerate(self.module_list):
                block = self.blocks[i + 1]
                if block["type"] != "convolutional":
                    continue

                batch_normalize = int(block.get("batch_normalize", 0))
                conv = model[0]

                # `cpu` (util.convert2cpu) is applied to each tensor before
                # the numpy conversion so the values can be written to disk.
                if batch_normalize:
                    bn = model[1]
                    cpu(bn.bias.data).numpy().tofile(fp)
                    cpu(bn.weight.data).numpy().tofile(fp)
                    cpu(bn.running_mean).numpy().tofile(fp)
                    cpu(bn.running_var).numpy().tofile(fp)
                else:
                    cpu(conv.bias.data).numpy().tofile(fp)

                cpu(conv.weight.data).numpy().tofile(fp)
# | |
#dn = Darknet('cfg/yolov3.cfg') | |
#dn.load_weights("yolov3.weights") | |
#inp = get_test_input() | |
#a, interms = dn(inp) | |
#dn.eval() | |
#a_i, interms_i = dn(inp) | |