AsymmNet
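This article reproduces AsymmNet (CVPR 2021), a lightweight and robust network that can improve accuracy on several tasks such as classification and detection, and that outperforms the MobileNet family.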
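① Project background
- AsymmNet is a joint effort of three teams: the Hasso Plattner Institute in Germany, Alibaba Cloud, and ByteDance. The paper observes that in models built on depthwise separable convolutions, most of the computation is concentrated in the (two) pointwise convolutions. In the inverted residual block design, the first pointwise convolution expands the number of channels to widen the information flow; this is the shared analysis the team starts from.
- The authors take the view that the first pointwise convolution mainly expands channels, the depthwise convolution that follows learns spatial correlations, and the final pointwise convolution learns channel correlations. This is an engineering consensus handed down from the Inception block through Xception, although it has never been proven theoretically.
- This leads to an idea: what if part of the features produced by the first pointwise convolution (the one that expands channels and information) were obtained by simply copying the input, and the computation saved were moved to the depthwise convolution and the second pointwise convolution, whose main job is feature learning? Could that improve the feature learning and expressive capacity of the block?
Based on this idea, and with simple changes and experiments, the authors found that in the 220M FLOPs regime, and in the ultra-lightweight regime below 220M FLOPs, the improvement is clear and the design can outperform the original inverted residual block.
Paper: https://arxiv.org/pdf/2104.07770.pdf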
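To make the trade-off concrete, the sketch below counts multiply-accumulate operations for a single block at stride 1. It is a back-of-the-envelope estimate, not the paper's own accounting: the function names and the example sizes are chosen for illustration, BatchNorm, activations and SE are ignored, and the asymmetric variant assumes the channel layout of the AsymmBottleneck code shown later in this article (the first pointwise convolution emits c_mid - r * c_in channels and the input is copied 2r times).

```python
def inverted_residual_madds(h, w, c_in, c_mid, c_out, k):
    """Multiply-accumulates of a plain inverted residual block (stride 1)."""
    pw1 = h * w * c_in * c_mid   # 1x1 expansion
    dw = h * w * k * k * c_mid   # depthwise k x k
    pw2 = h * w * c_mid * c_out  # 1x1 projection
    return {"pw1": pw1, "dw": dw, "pw2": pw2, "total": pw1 + dw + pw2}


def asymm_bottleneck_madds(h, w, c_in, c_mid, c_out, k, r=1):
    """Same count for the asymmetric layout assumed above (stride 1)."""
    wide = c_mid + r * c_in                  # width seen by the depthwise conv
    pw1 = h * w * c_in * (c_mid - r * c_in)  # pruned 1x1 expansion
    dw = h * w * k * k * wide
    pw2 = h * w * wide * c_out
    return {"pw1": pw1, "dw": dw, "pw2": pw2, "total": pw1 + dw + pw2}


if __name__ == "__main__":
    # Illustrative sizes only: 56x56 feature map, 24 -> 72 -> 24 channels, 3x3 kernel.
    plain = inverted_residual_madds(56, 56, 24, 72, 24, 3)
    asymm = asymm_bottleneck_madds(56, 56, 24, 72, 24, 3, r=1)
    for name in ("pw1", "dw", "pw2"):
        print(name, plain[name], asymm[name])
```

With these sizes the first pointwise convolution becomes cheaper while the depthwise and second pointwise convolutions receive more work, which is the reallocation described above.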
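② Data preparation
2.1 Unzipping the dataset
We uploaded the dataset obtained online to AI Studio as a zip archive and attached it to this project.
Before using it, we unzip the archive.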
!unzip -oq /home/aistudio/data/data69664/Images.zip -d work/dataset
import paddle
import numpy as np
from typing import Callable

# Parameter configuration
config_parameters = {
    "class_dim": 16,  # number of classes
    "target_path": "/home/aistudio/work/",
    'train_image_dir': '/home/aistudio/work/trainImages',
    'eval_image_dir': '/home/aistudio/work/evalImages',
    'epochs': 100,
    'batch_size': 32,
    'lr': 0.01
}
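2.2 Splitting the dataset
Next we split the images into a training set and a validation set, one sub-folder per class, so that the dataset class defined later can read them for training.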
import os
import shutil

src_root = 'work/dataset/Images'
train_dir = config_parameters['train_image_dir']
eval_dir = config_parameters['eval_image_dir']
paths = os.listdir(src_root)

if not os.path.exists(train_dir):
    os.mkdir(train_dir)
if not os.path.exists(eval_dir):
    os.mkdir(eval_dir)

for path in paths:
    # One sub-folder per class
    src_dir = os.path.join(src_root, path)
    imgs_dir = os.listdir(src_dir)
    target_train_dir = os.path.join(train_dir, path)
    target_eval_dir = os.path.join(eval_dir, path)
    if not os.path.exists(target_train_dir):
        os.mkdir(target_train_dir)
    if not os.path.exists(target_eval_dir):
        os.mkdir(target_eval_dir)

    for i in range(len(imgs_dir)):
        # Replace spaces in file names with underscores
        new_name = imgs_dir[i].replace(' ', '_')
        target_train_path = os.path.join(target_train_dir, new_name)
        target_eval_path = os.path.join(target_eval_dir, new_name)
        # Every fifth image goes to the validation set, the rest to training
        if i % 5 == 0:
            shutil.copyfile(os.path.join(src_dir, imgs_dir[i]), target_eval_path)
        else:
            shutil.copyfile(os.path.join(src_dir, imgs_dir[i]), target_train_path)

print('finished train val split!')
finished train val split!
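The split rule in the loop above sends every fifth file of each class to the validation folder, which gives roughly an 80/20 split. A minimal sketch of that rule on plain file names, with no file copying; the helper name `split_every_fifth` and the sample names are made up for illustration:

```python
def split_every_fifth(names):
    """Return (train, evaluation) name lists using the same i % 5 rule."""
    train, evaluation = [], []
    for i, name in enumerate(names):
        clean = name.replace(' ', '_')  # same renaming as the copy loop
        if i % 5 == 0:
            evaluation.append(clean)
        else:
            train.append(clean)
    return train, evaluation


if __name__ == "__main__":
    names = ['image %04d.jpg' % n for n in range(1, 12)]  # 11 hypothetical files
    train, evaluation = split_every_fifth(names)
    print(len(train), len(evaluation))
    print(evaluation)
```

Because os.listdir returns entries in arbitrary order, which files end up in the validation set can differ between machines; sorting the listing first would make the split reproducible.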
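2.3 Dataset definition and preview
2.3.1 Previewing the dataset
Let's first look at what the unzipped dataset looks like. The experiments compare classic models on a 16-class mini subset of Caltech101.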
import os
import random
from matplotlib import pyplot as plt
from PIL import Image

imgs = []
paths = os.listdir('work/dataset/Images')
for path in paths:
    img_path = os.path.join('work/dataset/Images', path)
    if os.path.isdir(img_path):
        img_paths = os.listdir(img_path)
        img = Image.open(os.path.join(img_path, random.choice(img_paths)))
        imgs.append((img, path))

f, ax = plt.subplots(4, 4, figsize=(12, 12))
for i, img in enumerate(imgs[:16]):
    ax[i//4, i%4].imshow(img[0])
    ax[i//4, i%4].axis('off')
    ax[i//4, i%4].set_title('label: %s' % img[1])
plt.show()
2.3.2 Implementing the dataset class
# Dataset definition
class Dataset(paddle.io.Dataset):
    """
    Step 1: inherit from paddle.io.Dataset
    """
    def __init__(self, transforms: Callable, mode: str = 'train'):
        """
        Step 2: implement the constructor and define how the data is read
        """
        super(Dataset, self).__init__()
        self.mode = mode
        self.transforms = transforms
        train_image_dir = config_parameters['train_image_dir']
        eval_image_dir = config_parameters['eval_image_dir']
        train_data_folder = paddle.vision.DatasetFolder(train_image_dir)
        eval_data_folder = paddle.vision.DatasetFolder(eval_image_dir)
        if self.mode == 'train':
            self.data = train_data_folder
        elif self.mode == 'eval':
            self.data = eval_data_folder

    def __getitem__(self, index):
        """
        Step 3: implement __getitem__, which defines how a sample is fetched
        for a given index and returns one item (image data, label)
        """
        sample = self.data[index]  # read the sample once
        data = np.array(sample[0]).astype('float32')
        data = self.transforms(data)
        label = np.array([sample[1]]).astype('int64')
        return data, label

    def __len__(self):
        """
        Step 4: implement __len__, which returns the total number of samples
        """
        return len(self.data)
from paddle.vision import transforms as T

# Data augmentation
transform_train = T.Compose([
    T.Resize((256, 256)),
    # T.RandomVerticalFlip(10),
    # T.RandomHorizontalFlip(10),
    T.RandomRotation(10),
    T.Transpose(),
    # Scale pixel values to [0, 1].
    # Alternative: transforms.ToTensor() (transpose + img / 255, and converts to a Paddle Tensor)
    T.Normalize(mean=[0, 0, 0],
                std=[255, 255, 255]),
    # Subtract the mean and divide by the standard deviation:
    # output[channel] = (input[channel] - mean[channel]) / std[channel]
    T.Normalize(mean=[0.50950350, 0.54632660, 0.57409690],
                std=[0.26059777, 0.26041326, 0.29220656]),
])
transform_eval = T.Compose([
    T.Resize((256, 256)),
    T.Transpose(),
    # Scale pixel values to [0, 1].
    # Alternative: transforms.ToTensor() (transpose + img / 255, and converts to a Paddle Tensor)
    T.Normalize(mean=[0, 0, 0],
                std=[255, 255, 255]),
    # Subtract the mean and divide by the standard deviation:
    # output[channel] = (input[channel] - mean[channel]) / std[channel]
    T.Normalize(mean=[0.50950350, 0.54632660, 0.57409690],
                std=[0.26059777, 0.26041326, 0.29220656]),
])
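The two chained Normalize steps first scale pixels to [0, 1] and then standardize each channel. Algebraically this equals a single normalization whose mean and standard deviation are multiplied by 255. The sketch below checks that identity on one pixel in plain Python; the function names and the sample pixel are illustrative, and it does not call Paddle.

```python
MEAN = [0.50950350, 0.54632660, 0.57409690]
STD = [0.26059777, 0.26041326, 0.29220656]


def two_step(pixel):
    """Scale to [0, 1], then standardize each channel (as in the pipeline)."""
    scaled = [(p - 0.0) / 255.0 for p in pixel]
    return [(s - m) / d for s, m, d in zip(scaled, MEAN, STD)]


def one_step(pixel):
    """Single normalization with mean and std expressed on the 0..255 scale."""
    return [(p - 255.0 * m) / (255.0 * d) for p, m, d in zip(pixel, MEAN, STD)]


if __name__ == "__main__":
    pixel = [128.0, 64.0, 255.0]  # one hypothetical RGB pixel
    print(two_step(pixel))
    print(one_step(pixel))
```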
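2.3.3 Instantiating the dataset class
Instantiate the dataset class for the training and validation sets, wrap each one in a DataLoader, and check how many batches each loader yields.
train_dataset = Dataset(mode='train', transforms=transform_train)
eval_dataset = Dataset(mode='eval', transforms=transform_eval)

# Asynchronous data loading
train_loader = paddle.io.DataLoader(train_dataset,
                                    places=paddle.CUDAPlace(0),
                                    batch_size=config_parameters['batch_size'],
                                    shuffle=True,
                                    # num_workers=2,
                                    # use_shared_memory=True
                                    )
eval_loader = paddle.io.DataLoader(eval_dataset,
                                   places=paddle.CUDAPlace(0),
                                   batch_size=config_parameters['batch_size'],
                                   # num_workers=2,
                                   # use_shared_memory=True
                                   )
# len() of a DataLoader is the number of batches, not the number of images
print('Training batches: {}, validation batches: {}'.format(len(train_loader), len(eval_loader)))
Training batches: 45, validation batches: 12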
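Note that len() of a DataLoader counts batches rather than images. With drop_last left at its default of False, the number of batches is ceil(num_samples / batch_size), so the two figures printed above only bound the dataset sizes. A small sketch of that arithmetic, with illustrative helper names:

```python
import math


def num_batches(num_samples, batch_size):
    """Batches produced when the last, possibly smaller, batch is kept."""
    return math.ceil(num_samples / batch_size)


def sample_range(batches, batch_size):
    """Smallest and largest dataset size that yields `batches` batches."""
    return (batches - 1) * batch_size + 1, batches * batch_size


if __name__ == "__main__":
    print(sample_range(45, 32))  # (1409, 1440)
    print(sample_range(12, 32))  # (353, 384)
```

The exact counts are available from len(train_dataset) and len(eval_dataset).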
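③ Model selection and development
3.1 Building the baseline networks
For comparison we chose the classic convolutional networks resnet50, vgg19 and mobilenet_v2. The cell below instantiates mobilenet_v2.
network = paddle.vision.models.mobilenet_v2(num_classes=16)
# Wrap the network in the high-level Model API
model = paddle.Model(network)
# Print the model summary
model.summary((-1, 3, 256, 256))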
3.2 Training the baseline network
# Callbacks and optimizer
class SaveBestModel(paddle.callbacks.Callback):
    """Save the model whenever the evaluation accuracy beats the best value so far."""
    def __init__(self, target=0.5, path='work/best_model', verbose=0):
        super(SaveBestModel, self).__init__()
        self.target = target
        self.epoch = None
        self.path = path

    def on_epoch_end(self, epoch, logs=None):
        self.epoch = epoch

    def on_eval_end(self, logs=None):
        if logs.get('acc') > self.target:
            self.target = logs.get('acc')
            self.model.save(self.path)
            print('best acc is {} at epoch {}'.format(self.target, self.epoch))

callback_visualdl = paddle.callbacks.VisualDL(log_dir='work/vgg19')
callback_savebestmodel = SaveBestModel(target=0.5, path='work/best_model')
callbacks = [callback_visualdl, callback_savebestmodel]

base_lr = config_parameters['lr']
epochs = config_parameters['epochs']

def make_optimizer(parameters=None):
    momentum = 0.9
    # Cosine annealing of the learning rate, starting from base_lr
    learning_rate = paddle.optimizer.lr.CosineAnnealingDecay(learning_rate=base_lr, T_max=epochs, verbose=False)
    weight_decay = paddle.regularizer.L2Decay(0.0001)
    optimizer = paddle.optimizer.Momentum(
        learning_rate=learning_rate,
        momentum=momentum,
        weight_decay=weight_decay,
        parameters=parameters)
    return optimizer

optimizer = make_optimizer(model.parameters())
model.prepare(optimizer,
              paddle.nn.CrossEntropyLoss(),
              paddle.metric.Accuracy())
model.fit(train_loader,
          eval_loader,
          epochs=epochs,
          batch_size=1,  # ignored here: the batch size is set by the DataLoaders
          callbacks=callbacks,
          verbose=1)  # log display format
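The learning-rate schedule used above follows half a cosine wave from the base rate down to a minimum. Below is a closed-form sketch of that curve in plain Python, assuming a minimum rate of 0 and no restarts. `cosine_lr` is an illustrative helper rather than Paddle's implementation, and whether one scheduler step corresponds to a batch or to an epoch depends on how the training loop drives the scheduler.

```python
import math


def cosine_lr(step, base_lr=0.01, t_max=100, eta_min=0.0):
    """Cosine-annealed learning rate after `step` scheduler steps."""
    cosine = math.cos(math.pi * step / t_max)
    return eta_min + 0.5 * (base_lr - eta_min) * (1.0 + cosine)


if __name__ == "__main__":
    for step in (0, 25, 50, 75, 100):
        print(step, round(cosine_lr(step), 6))
```

The rate starts at base_lr, passes through half of it at t_max / 2, and reaches the minimum at t_max.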
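3.3 Asymmetrical bottlenecks
3.3.1 Introduction to the asymmetrical bottleneck module
After reviewing previous work, the authors argue that feature reuse is an effective way to reach a more resource-efficient design.
Figure 1: Details of the asymmetrical bottleneck module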
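The feature reuse can be read off from the channel bookkeeping of the block. The sketch below mirrors the arithmetic of the AsymmBottleneck constructor given in this article, without any Paddle dependency; `asymm_widths` is an illustrative helper and the channel sizes in the usage lines are taken from the first rows of the large configuration.

```python
def asymm_widths(num_in, num_mid, asymmrate=1):
    """Return (computed, copied, dw_width) channel counts for one block.

    computed: channels produced by the pruned first pointwise convolution
    copied:   channels obtained by copying the input
    dw_width: channels entering the depthwise convolution
    """
    do_expand = num_mid > max(num_in, asymmrate * num_in)
    if not do_expand:
        # No expansion: the input goes straight to the depthwise convolution.
        return 0, num_in, num_in
    computed = num_mid - asymmrate * num_in
    copied = 2 * asymmrate * num_in  # copies placed before and after
    return computed, copied, computed + copied


if __name__ == "__main__":
    print(asymm_widths(16, 16))     # first block: no expansion
    print(asymm_widths(16, 64))     # second block of the large config
    print(asymm_widths(16, 64, 0))  # asymmrate=0: plain inverted residual
```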
__all__ = ['AsymmNet_Large', 'AsymmNet_Small', 'AsymmNet']

import paddle
import paddle.nn as nn
import math
import paddle.nn.functional as F


class HardSigmoid(nn.Layer):
    def __init__(self):
        super(HardSigmoid, self).__init__()
        self.relu = nn.ReLU6()

    def forward(self, x):
        return self.relu(x + 3) / 6


class HardSwish(nn.Layer):
    def __init__(self):
        super(HardSwish, self).__init__()
        self.sigmoid = HardSigmoid()

    def forward(self, x):
        return x * self.sigmoid(x)


class Activation(nn.Layer):
    def __init__(self, act_func):
        super(Activation, self).__init__()
        if act_func == "relu":
            self.act = nn.ReLU()
        elif act_func == "ReLU6":
            self.act = nn.ReLU6()
        elif act_func == "hard_sigmoid":
            self.act = HardSigmoid()
        elif act_func == "hard_swish":
            self.act = HardSwish()
        else:
            raise NotImplementedError

    def forward(self, x):
        return self.act(x)


def make_divisible(x, divisible_by=8):
    return int(math.ceil(x * 1. / divisible_by) * divisible_by)
class _BasicUnit(nn.Layer):
    def __init__(self, num_in, num_out, kernel_size=1, strides=1, pad=0, num_groups=1,
                 use_act=True, act_type="relu", norm_layer=nn.BatchNorm2D):
        super(_BasicUnit, self).__init__()
        self.use_act = use_act
        self.conv = nn.Conv2D(in_channels=num_in, out_channels=num_out,
                              kernel_size=kernel_size, stride=strides,
                              padding=pad, groups=num_groups, bias_attr=False,
                              )
        self.bn = norm_layer(num_out)
        if use_act is True:
            self.act = Activation(act_type)

    def forward(self, x):
        out = self.conv(x)
        out = self.bn(out)
        if self.use_act:
            out = self.act(out)
        return out


class SE_Module(nn.Layer):
    def __init__(self, channels, reduction=4):
        super(SE_Module, self).__init__()
        self.Avg = nn.AdaptiveAvgPool2D(1)
        reduction_c = make_divisible(channels // reduction)
        self.out = nn.Sequential(
            nn.Conv2D(channels, reduction_c, 1, bias_attr=True),
            nn.ReLU(),
            nn.Conv2D(reduction_c, channels, 1, bias_attr=True),
            HardSigmoid()
        )

    def forward(self, x):
        y = self.Avg(x)
        y = self.out(y)
        return x * y
class AsymmBottleneck(nn.Layer):
    def __init__(self, num_in, num_mid, num_out, kernel_size, asymmrate=1,
                 act_type="relu", use_se=False, strides=1,
                 norm_layer=nn.BatchNorm2D):
        super(AsymmBottleneck, self).__init__()
        assert isinstance(asymmrate, int)
        self.asymmrate = asymmrate
        self.use_se = use_se
        self.use_short_cut_conv = (num_in == num_out and strides == 1)
        self.do_expand = (num_mid > max(num_in, asymmrate * num_in))
        if self.do_expand:
            # Pruned first pointwise conv: asymmrate * num_in channels are not computed
            self.expand = _BasicUnit(num_in, num_mid - asymmrate * num_in,
                                     kernel_size=1,
                                     strides=1, pad=0, act_type=act_type,
                                     norm_layer=norm_layer)
            # Width after concatenating 2 * asymmrate copies of the input
            num_mid += asymmrate * num_in
        self.dw_conv = _BasicUnit(num_mid, num_mid, kernel_size, strides,
                                  pad=self._get_pad(kernel_size), act_type=act_type,
                                  num_groups=num_mid, norm_layer=norm_layer)
        if self.use_se:
            self.se = SE_Module(num_mid)
        self.pw_conv_linear = _BasicUnit(num_mid, num_out, kernel_size=1, strides=1,
                                         pad=0, act_type=act_type, use_act=False,
                                         norm_layer=norm_layer, num_groups=1)

    def forward(self, x):
        if self.do_expand:
            out = self.expand(x)
            # Feature reuse: copies of the input are placed before and after
            # the computed features
            feat = []
            for i in range(self.asymmrate):
                feat.append(x)
            feat.append(out)
            for i in range(self.asymmrate):
                feat.append(x)
            if self.asymmrate > 0:
                out = paddle.concat(feat, axis=1)
        else:
            out = x
        out = self.dw_conv(out)
        if self.use_se:
            out = self.se(out)
        out = self.pw_conv_linear(out)
        if self.use_short_cut_conv:
            return x + out
        return out

    def _get_pad(self, kernel_size):
        if kernel_size == 1:
            return 0
        elif kernel_size == 3:
            return 1
        elif kernel_size == 5:
            return 2
        elif kernel_size == 7:
            return 3
        else:
            raise NotImplementedError
def get_asymmnet_cfgs(model_name):
    if model_name == 'asymmnet_large':
        inplanes = 16
        cfg = [
            # k, exp, c, se, nl, s,
            # stage1
            [3, 16, 16, False, 'relu', 1],
            # stage2
            [3, 64, 24, False, 'relu', 2],
            [3, 72, 24, False, 'relu', 1],
            # stage3
            [5, 72, 40, True, 'relu', 2],
            [5, 120, 40, True, 'relu', 1],
            [5, 120, 40, True, 'relu', 1],
            # stage4
            [3, 240, 80, False, 'hard_swish', 2],
            [3, 200, 80, False, 'hard_swish', 1],
            [3, 184, 80, False, 'hard_swish', 1],
            [3, 184, 80, False, 'hard_swish', 1],
            [3, 480, 112, True, 'hard_swish', 1],
            [3, 672, 112, True, 'hard_swish', 1],
            # stage5
            [5, 672, 160, True, 'hard_swish', 2],
            [5, 960, 160, True, 'hard_swish', 1],
            [5, 960, 160, True, 'hard_swish', 1],
        ]
        cls_ch_squeeze = 960
        cls_ch_expand = 1280
    elif model_name == 'asymmnet_small':
        inplanes = 16
        cfg = [
            # k, exp, c, se, nl, s,
            [3, 16, 16, True, 'relu', 2],
            [3, 72, 24, False, 'relu', 2],
            [3, 88, 24, False, 'relu', 1],
            [5, 96, 40, True, 'hard_swish', 2],
            [5, 240, 40, True, 'hard_swish', 1],
            [5, 240, 40, True, 'hard_swish', 1],
            [5, 120, 48, True, 'hard_swish', 1],
            [5, 144, 48, True, 'hard_swish', 1],
            [5, 288, 96, True, 'hard_swish', 2],
            [5, 576, 96, True, 'hard_swish', 1],
            [5, 576, 96, True, 'hard_swish', 1],
        ]
        cls_ch_squeeze = 576
        cls_ch_expand = 1280
    else:
        raise ValueError('{} model_name is not supported now!'.format(model_name))
    return inplanes, cfg, cls_ch_squeeze, cls_ch_expand
class AsymmNet(nn.Layer):
    def __init__(self, cfgs_name, num_classes=config_parameters['class_dim'],
                 multiplier=1.0, asymmrate=1, dropout_rate=0.2,
                 norm_layer=nn.BatchNorm2D):
        super(AsymmNet, self).__init__()
        inplanes, cfg, cls_ch_squeeze, cls_ch_expand = get_asymmnet_cfgs(cfgs_name)
        k = multiplier
        self.inplanes = make_divisible(inplanes * k)
        self.first_block = nn.Sequential(
            nn.Conv2D(3, self.inplanes, 3, 2, 1, bias_attr=False),
            nn.BatchNorm2D(self.inplanes),
            HardSwish(),
        )
        asymm_layers = []
        for layer_cfg in cfg:
            layer = self._make_layer(kernel_size=layer_cfg[0],
                                     exp_ch=make_divisible(k * layer_cfg[1]),
                                     out_channel=make_divisible(k * layer_cfg[2]),
                                     use_se=layer_cfg[3],
                                     act_func=layer_cfg[4],
                                     asymmrate=asymmrate,
                                     stride=layer_cfg[5],
                                     norm_layer=norm_layer,
                                     )
            asymm_layers.append(layer)
        self.asymm_block = nn.Sequential(*asymm_layers)
        self.last_block = nn.Sequential(
            nn.Conv2D(self.inplanes, make_divisible(k * cls_ch_squeeze), 1, bias_attr=False),
            nn.BatchNorm2D(make_divisible(k * cls_ch_squeeze)),
            HardSwish(),
            nn.AdaptiveAvgPool2D(1),
            nn.Conv2D(make_divisible(k * cls_ch_squeeze), cls_ch_expand, 1, bias_attr=False),
            HardSwish(),
            nn.Dropout2D(p=dropout_rate),
            nn.Flatten(),
        )
        self.output = nn.Linear(cls_ch_expand, num_classes)

    def _make_layer(self, kernel_size, exp_ch, out_channel, use_se, act_func, asymmrate, stride,
                    norm_layer):
        mid_planes = exp_ch
        out_planes = out_channel
        layer = AsymmBottleneck(self.inplanes, mid_planes,
                                out_planes, kernel_size, asymmrate,
                                act_func, strides=stride, use_se=use_se, norm_layer=norm_layer)
        self.inplanes = out_planes
        return layer

    def forward(self, x):
        x = self.first_block(x)
        x = self.asymm_block(x)
        x = self.last_block(x)
        x = self.output(x)
        return x


class AsymmNet_Large(AsymmNet):
    def __init__(self, **kwargs):
        super(AsymmNet_Large, self).__init__(cfgs_name='asymmnet_large', **kwargs)


class AsymmNet_Small(AsymmNet):
    def __init__(self, **kwargs):
        super(AsymmNet_Small, self).__init__(cfgs_name='asymmnet_small', **kwargs)


if __name__ == '__main__':
    img = paddle.rand((1, 3, 256, 256))
    vit = AsymmNet_Large()
    out = vit(img)
    print(out.shape)
[1, 16]
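The `multiplier` argument of AsymmNet scales every channel count and then rounds it up to a multiple of 8 through make_divisible. A standalone copy of that helper shows the effect; the multipliers and the channel list below are arbitrary examples, not configurations evaluated in this article.

```python
import math


def make_divisible(x, divisible_by=8):
    """Round x up to the nearest multiple of divisible_by."""
    return int(math.ceil(x * 1. / divisible_by) * divisible_by)


if __name__ == "__main__":
    base_channels = [16, 24, 40, 80, 112, 160]
    for k in (0.5, 1.0, 1.5):
        print(k, [make_divisible(k * c) for c in base_channels])
```

Rounding up means small multipliers shrink the narrow layers less than proportionally: at 0.5, for instance, 24 channels become 16 rather than 12.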
④ Training the improved model and choosing the optimizer
model = AsymmNet_Large()
model = paddle.Model(model)
model.summary((1, 3, 256, 256))

# Callbacks and optimizer
class SaveBestModel(paddle.callbacks.Callback):
    """Save the model whenever the evaluation accuracy beats the best value so far."""
    def __init__(self, target=0.5, path='work/best_model', verbose=0):
        super(SaveBestModel, self).__init__()
        self.target = target
        self.epoch = None
        self.path = path

    def on_epoch_end(self, epoch, logs=None):
        self.epoch = epoch

    def on_eval_end(self, logs=None):
        if logs.get('acc') > self.target:
            self.target = logs.get('acc')
            self.model.save(self.path)
            print('best acc is {} at epoch {}'.format(self.target, self.epoch))

callback_visualdl = paddle.callbacks.VisualDL(log_dir='work/AsymmNet_Net')
callback_savebestmodel = SaveBestModel(target=0.5, path='work/best_model')
callbacks = [callback_visualdl, callback_savebestmodel]

base_lr = config_parameters['lr']
epochs = config_parameters['epochs']

def make_optimizer(parameters=None):
    momentum = 0.9
    # Cosine annealing of the learning rate, starting from base_lr
    learning_rate = paddle.optimizer.lr.CosineAnnealingDecay(learning_rate=base_lr, T_max=epochs, verbose=False)
    weight_decay = paddle.regularizer.L2Decay(0.0002)
    optimizer = paddle.optimizer.Momentum(
        learning_rate=learning_rate,
        momentum=momentum,
        weight_decay=weight_decay,
        parameters=parameters)
    return optimizer

optimizer = make_optimizer(model.parameters())
model.prepare(optimizer,
              paddle.nn.CrossEntropyLoss(),
              paddle.metric.Accuracy())
model.fit(train_loader,
          eval_loader,
          epochs=epochs,
          batch_size=1,  # ignored here: the batch size is set by the DataLoaders
          callbacks=callbacks,
          verbose=1)  # log display format
⑤ Training results
AsymmNet trains quickly and converges quickly, and its accuracy improved substantially.
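⑥ Project summary
1. AsymmNet was trained with a cosine-annealing learning-rate schedule, and the comparison model was trained with the same schedule. Compared with the lightweight MobileNet_v2, AsymmNet is more robust, converges faster, and is smaller.
2. As a next step, readers can increase the L2 regularization coefficient and add data augmentation to curb overfitting; accuracy should improve further.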