① Project Background

  • The AsymmNet idea comes from teams at Germany's Hasso Plattner Institute together with Alibaba Cloud and ByteDance. The paper observes that in model families built on depthwise separable convolutions, most of the computation is concentrated in the (two) pointwise convolutions; the inverted residual block design, in which the first pointwise convolution expands the channel count to widen the information flow, rests on exactly this shared analysis.
  • The author team holds that the first pointwise convolution mainly expands the channels, the depthwise convolution learns the spatial correlations of the features, and the following pointwise convolution learns the channel correlations. This is an engineering consensus handed down along the Inception block → Xception line of work, even though it has never been theoretically proven.
  • This suggests a hypothesis: what if part of the features of the first pointwise convolution (the one responsible for channel and information expansion) were produced by simply copying the input rather than computing them, and the saved computation were shifted to the depthwise convolution, whose main job is feature learning, and to the second pointwise convolution? Could that improve the block's feature-learning and representational capacity?
    Starting from this idea, with simple modifications and experiments the authors found that in the roughly 220M-FLOPs regime, and especially for ultralight CNNs below 220M FLOPs, the performance gains are clear and consistent, outperforming the original inverted residual design. A back-of-the-envelope sketch of the computation trade-off follows below.
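To make the trade-off concrete, here is a rough MAC count for one block; the sizes are our own illustration, not numbers from the paper:

# Back-of-the-envelope MAC counts for an inverted residual block
# (pw-expand -> dw -> pw-project) on an H x W feature map.
# Illustrative sizes only, not taken from the paper.
H = W = 28
c_in, c_mid, c_out, k = 24, 144, 24, 3

pw1 = H * W * c_in * c_mid    # first pointwise (expansion):   2,709,504 MACs
dw  = H * W * c_mid * k * k   # depthwise:                     1,016,064 MACs
pw2 = H * W * c_mid * c_out   # second pointwise (projection): 2,709,504 MACs

# Asymmetric variant: copy the c_in input channels for free and compute
# only the remaining c_mid - c_in channels in the first pointwise conv.
pw1_asymm = H * W * c_in * (c_mid - c_in)   # 2,257,920 MACs
print(pw1 - pw1_asymm)                      # 451,584 MACs freed for dw / pw2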

Paper: https://arxiv.org/pdf/2104.07770.pdf

② Data Preparation

2.1 Unzipping the Dataset

The dataset, obtained online, is uploaded to the AI Studio dataset hub as a zip archive and mounted into this project.

Before using it, we unzip the dataset archive.

!unzip -oq /home/aistudio/data/data69664/Images.zip -d work/dataset
import paddle
import numpy as np
from typing import Callable
# Parameter configuration
config_parameters = {
    "class_dim": 16,  # number of classes
    "target_path":"/home/aistudio/work/",                     
    'train_image_dir': '/home/aistudio/work/trainImages',
    'eval_image_dir': '/home/aistudio/work/evalImages',
    'epochs':100,
    'batch_size': 32,
    'lr': 0.01
}

2.2 Splitting the Dataset

Next we split the labeled images into per-class training and validation folders for the dataset class defined later: every fifth image goes to validation (an 80/20 split), and spaces in file names are replaced with underscores.

import os
import shutil

train_dir = config_parameters['train_image_dir']
eval_dir = config_parameters['eval_image_dir']
paths = os.listdir('work/dataset/Images')

if not os.path.exists(train_dir):
    os.mkdir(train_dir)
if not os.path.exists(eval_dir):
    os.mkdir(eval_dir)

for path in paths:
    imgs_dir = os.listdir(os.path.join('work/dataset/Images', path))
    target_train_dir = os.path.join(train_dir,path)
    target_eval_dir = os.path.join(eval_dir,path)
    if not os.path.exists(target_train_dir):
        os.mkdir(target_train_dir)
    if not os.path.exists(target_eval_dir):
        os.mkdir(target_eval_dir)
    for i in range(len(imgs_dir)):
        if ' ' in imgs_dir[i]:
            new_name = imgs_dir[i].replace(' ', '_')
        else:
            new_name = imgs_dir[i]
        target_train_path = os.path.join(target_train_dir, new_name)
        target_eval_path = os.path.join(target_eval_dir, new_name)     
        if i % 5 == 0:
            shutil.copyfile(os.path.join(os.path.join('work/dataset/Images', path), imgs_dir[i]), target_eval_path)
        else:
            shutil.copyfile(os.path.join(os.path.join('work/dataset/Images', path), imgs_dir[i]), target_train_path)

print('finished train val split!')
finished train val split!

2.3 Dataset Definition and Preview

2.3.1 Dataset Preview

Let's first see what the unpacked dataset looks like; we will then compare classic models on this 16-class mini dataset extracted from Caltech101.


import os
import random
from matplotlib import pyplot as plt
from PIL import Image

imgs = []
paths = os.listdir('work/dataset/Images')
for path in paths:   
    img_path = os.path.join('work/dataset/Images', path)
    if os.path.isdir(img_path):
        img_paths = os.listdir(img_path)
        img = Image.open(os.path.join(img_path, random.choice(img_paths)))
        imgs.append((img, path))

f, ax = plt.subplots(4, 4, figsize=(12,12))
for i, img in enumerate(imgs[:16]):
    ax[i//4, i%4].imshow(img[0])
    ax[i//4, i%4].axis('off')
    ax[i//4, i%4].set_title('label: %s' % img[1])
plt.show()

(Output: a 4x4 grid of random sample images, one per class, with their labels.)

2.3.2 Defining the Dataset Class

# Dataset definition
class Dataset(paddle.io.Dataset):
    """
    Step 1: inherit from the paddle.io.Dataset class
    """
    def __init__(self, transforms: Callable, mode: str ='train'):
        """
        Step 2: implement the constructor and define how the data is read
        """
        super(Dataset, self).__init__()
        
        self.mode = mode
        self.transforms = transforms

        train_image_dir = config_parameters['train_image_dir']
        eval_image_dir = config_parameters['eval_image_dir']

        train_data_folder = paddle.vision.DatasetFolder(train_image_dir)
        eval_data_folder = paddle.vision.DatasetFolder(eval_image_dir)
        
        if self.mode  == 'train':
            self.data = train_data_folder
        elif self.mode  == 'eval':
            self.data = eval_data_folder

    def __getitem__(self, index):
        """
        Step 3: implement __getitem__ so that, given an index, it returns one sample (data, label)
        """
        data = np.array(self.data[index][0]).astype('float32')

        data = self.transforms(data)

        label = np.array([self.data[index][1]]).astype('int64')
        
        return data, label
        
    def __len__(self):
        """
        Step 4: implement __len__ and return the total number of samples
        """
        return len(self.data)
from paddle.vision import transforms as T

# Data augmentation for training
transform_train = T.Compose([T.Resize((256, 256)),
                             #T.RandomVerticalFlip(10),
                             #T.RandomHorizontalFlip(10),
                             T.RandomRotation(10),
                             T.Transpose(),                                         # HWC -> CHW
                             T.Normalize(mean=[0, 0, 0],                            # scale pixel values to [0, 1];
                                         std=[255, 255, 255]),                      # with Transpose this mimics ToTensor()
                             T.Normalize(mean=[0.50950350, 0.54632660, 0.57409690], # subtract mean, divide by std:
                                         std=[0.26059777, 0.26041326, 0.29220656])  # output[c] = (input[c] - mean[c]) / std[c]
                             ])
# Evaluation uses the same normalization but no random augmentation
transform_eval = T.Compose([T.Resize((256, 256)),
                            T.Transpose(),
                            T.Normalize(mean=[0, 0, 0],
                                        std=[255, 255, 255]),
                            T.Normalize(mean=[0.50950350, 0.54632660, 0.57409690],
                                        std=[0.26059777, 0.26041326, 0.29220656])
                            ])
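For reference, per-channel statistics like the mean/std above can be estimated from the training images. A minimal sketch, assuming the folder layout created in 2.2 (averaging per-image statistics is a common approximation):

import os
import numpy as np
from PIL import Image

# Accumulate per-channel mean/std over the training images (values scaled to [0, 1]).
means, stds = [], []
root = config_parameters['train_image_dir']
for cls in os.listdir(root):
    for name in os.listdir(os.path.join(root, cls)):
        img = np.asarray(Image.open(os.path.join(root, cls, name)).convert('RGB')) / 255.
        means.append(img.mean(axis=(0, 1)))
        stds.append(img.std(axis=(0, 1)))
print(np.mean(means, axis=0), np.mean(stds, axis=0))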

2.3.3 Instantiating the Dataset Class

Instantiate the dataset class for the splits we need and check how much data each loader yields.


train_dataset =Dataset(mode='train',transforms=transform_train)
eval_dataset  =Dataset(mode='eval', transforms=transform_eval )

# Asynchronous data loading
train_loader = paddle.io.DataLoader(train_dataset, 
                                    places=paddle.CUDAPlace(0), 
                                    batch_size=32, 
                                    shuffle=True,
                                    #num_workers=2,
                                    #use_shared_memory=True
                                    )
eval_loader = paddle.io.DataLoader (eval_dataset, 
                                    places=paddle.CUDAPlace(0), 
                                    batch_size=32,
                                    #num_workers=2,
                                    #use_shared_memory=True
                                    )

print('Training batches: {}, validation batches: {}'.format(len(train_loader), len(eval_loader)))
Training batches: 45, validation batches: 12
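Note that len() of a paddle.io.DataLoader is the number of batches, not samples; to report sample counts, query the datasets themselves:

print('Training samples: {}, validation samples: {}'.format(len(train_dataset), len(eval_dataset)))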

③ Model Selection and Development

3.1 Building the Comparison Networks

For comparison we pick the classic convolutional networks resnet50, vgg19 and mobilenet_v2; the run below uses mobilenet_v2.

network = paddle.vision.models.mobilenet_v2(num_classes=16)
# Wrap the network in the high-level Model API
model = paddle.Model(network)
# Summarize the model
model.summary((-1, 3,256 , 256))

3.2 Training the Comparison Network

# Callback: save the model with the best validation accuracy
class SaveBestModel(paddle.callbacks.Callback):
    def __init__(self, target=0.5, path='work/best_model', verbose=0):
        super().__init__()
        self.target = target
        self.epoch = None
        self.path = path

    def on_epoch_end(self, epoch, logs=None):
        self.epoch = epoch

    def on_eval_end(self, logs=None):
        if logs.get('acc') > self.target:
            self.target = logs.get('acc')
            self.model.save(self.path)
            print('best acc is {} at epoch {}'.format(self.target, self.epoch))

callback_visualdl = paddle.callbacks.VisualDL(log_dir='work/mobilenet_v2')
callback_savebestmodel = SaveBestModel(target=0.5, path='work/best_model')
callbacks = [callback_visualdl, callback_savebestmodel]

base_lr = config_parameters['lr']
epochs = config_parameters['epochs']

def make_optimizer(parameters=None):
    momentum = 0.9

    learning_rate= paddle.optimizer.lr.CosineAnnealingDecay(learning_rate=base_lr, T_max=epochs, verbose=False)
    weight_decay=paddle.regularizer.L2Decay(0.0001)
    optimizer = paddle.optimizer.Momentum(
        learning_rate=learning_rate,
        momentum=momentum,
        weight_decay=weight_decay,
        parameters=parameters)
    return optimizer

optimizer = make_optimizer(model.parameters())

model.prepare(optimizer,
              paddle.nn.CrossEntropyLoss(),
              paddle.metric.Accuracy())

model.fit(train_loader,
          eval_loader,
          epochs=100,           # batching and shuffling are handled by the DataLoader above
          callbacks=callbacks,
          verbose=1)            # logging verbosity
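The VisualDL callback above writes scalar logs under work/. As an optional step, you can start the VisualDL server to browse the loss and accuracy curves (the port is arbitrary):

!visualdl --logdir work --port 8040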

3.3 Asymmetrical bottlenecks

3.3.1 Introduction to the Asymmetrical Bottleneck Module

After summarizing previous work, the authors conclude that feature reuse is an effective operation for achieving a more resource-efficient design.

Figure 1: Detailed schematic of the asymmetrical bottleneck module
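To make the channel bookkeeping concrete (our own illustration, matching the AsymmBottleneck implementation below): with num_in = 16, num_mid = 64 and asymmrate = 1, the expansion convolution only has to compute 64 - 16 = 48 new channels; the block then concatenates 16 copied + 48 computed + 16 copied = 80 channels as input to the depthwise stage, so part of the "expansion" is obtained for free by reuse.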

__all__ = ['AsymmNet_Large', 'AsymmNet_Small', 'AsymmNet']

import paddle
import paddle.nn as nn
import math
import paddle.nn.functional as F


class HardSigmoid(nn.Layer):
    def __init__(self):
        super(HardSigmoid, self).__init__()
        self.relu = nn.ReLU6()

    def forward(self, x):
        return self.relu(x + 3) / 6

class HardSwish(nn.Layer):
    def __init__(self):
        super(HardSwish, self).__init__()
        self.sigmoid = HardSigmoid()

    def forward(self, x):
        return x * self.sigmoid(x)

class Activation(nn.Layer):
    def __init__(self, act_func):
        super(Activation, self).__init__()
        if act_func == "relu":
            self.act = nn.ReLU()
        elif act_func == "ReLU6":
            self.act = nn.ReLU6()
        elif act_func == "hard_sigmoid":
            self.act = HardSigmoid()
        elif act_func == "hard_swish":
            self.act = HardSwish()
        else:
            raise NotImplementedError

    def forward(self, x):
        return self.act(x)


def make_divisible(x, divisible_by=8):
    return int(math.ceil(x * 1. / divisible_by) * divisible_by)
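# e.g. make_divisible(37) -> 40 and make_divisible(16) -> 16: channel widths are
# rounded up to the nearest multiple of 8, which keeps tensor shapes hardware-friendly.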


class _BasicUnit(nn.Layer):
    def __init__(self, num_in, num_out, kernel_size=1, strides=1, pad=0, num_groups=1,
                 use_act=True, act_type="relu", norm_layer=nn.BatchNorm2D):
        super(_BasicUnit, self).__init__()
        self.use_act = use_act
        self.conv = nn.Conv2D(in_channels=num_in, out_channels=num_out,
                              kernel_size=kernel_size, stride=strides,
                              padding=pad, groups=num_groups, bias_attr=False,
                              )
        self.bn = norm_layer(num_out)
        if use_act is True:
            self.act = Activation(act_type)

    def forward(self, x):
        out = self.conv(x)
        out = self.bn(out)
        if self.use_act:
            out = self.act(out)
        return out


class SE_Module(nn.Layer):
    def __init__(self, channels, reduction=4):
        super(SE_Module, self).__init__()
        self.Avg = nn.AdaptiveAvgPool2D(1)
        reduction_c = make_divisible(channels // reduction)
        self.out = nn.Sequential(
            nn.Conv2D(channels, reduction_c, 1, bias_attr=True),
            nn.ReLU(),
            nn.Conv2D(reduction_c, channels, 1, bias_attr=True),
            HardSigmoid()
        )

    def forward(self, x):
        y = self.Avg(x)
        y = self.out(y)
        return x * y


class AsymmBottleneck(nn.Layer):
    def __init__(self, num_in, num_mid, num_out, kernel_size, asymmrate=1,
                 act_type="relu", use_se=False, strides=1,
                 norm_layer=nn.BatchNorm2D):
        super(AsymmBottleneck, self).__init__()
        assert isinstance(asymmrate, int)
        self.asymmrate = asymmrate
        self.use_se = use_se
        self.use_short_cut_conv = (num_in == num_out and strides == 1)
        self.do_expand = (num_mid > max(num_in, asymmrate * num_in))
        if self.do_expand:
            self.expand = _BasicUnit(num_in, num_mid - asymmrate * num_in,
                                     kernel_size=1,
                                     strides=1, pad=0, act_type=act_type,
                                     norm_layer=norm_layer)
            num_mid += asymmrate * num_in
        self.dw_conv = _BasicUnit(num_mid, num_mid, kernel_size, strides,
                                  pad=self._get_pad(kernel_size), act_type=act_type,
                                  num_groups=num_mid, norm_layer=norm_layer)
        if self.use_se:
            self.se = SE_Module(num_mid)
        self.pw_conv_linear = _BasicUnit(num_mid, num_out, kernel_size=1, strides=1,
                                         pad=0, act_type=act_type, use_act=False,
                                         norm_layer=norm_layer, num_groups=1)

    def forward(self, x):
        if self.do_expand:
            out = self.expand(x)
            feat = []
            for i in range(self.asymmrate):
                feat.append(x)
            feat.append(out)
            for i in range(self.asymmrate):
                feat.append(x)
            if self.asymmrate > 0:
                out = paddle.concat(feat, axis=1)
        else:
            out = x
        out = self.dw_conv(out)
        if self.use_se:
            out = self.se(out)
        out = self.pw_conv_linear(out)
        if self.use_short_cut_conv:
            return x + out
        return out

    def _get_pad(self, kernel_size):
        if kernel_size == 1:
            return 0
        elif kernel_size == 3:
            return 1
        elif kernel_size == 5:
            return 2
        elif kernel_size == 7:
            return 3
        else:
            raise NotImplementedError
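# Quick shape check for AsymmBottleneck (illustrative values, not from the paper):
# an expanding block with stride 2 halves the spatial size and has no shortcut.
#   block = AsymmBottleneck(num_in=16, num_mid=64, num_out=24, kernel_size=3,
#                           asymmrate=1, strides=2)
#   print(block(paddle.rand((1, 16, 56, 56))).shape)   # -> [1, 24, 28, 28]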


def get_asymmnet_cfgs(model_name):
    if model_name == 'asymmnet_large':
        inplanes = 16
        cfg = [
            # kernel, expansion channels, out channels, use SE, activation, stride
            # stage1
            [3, 16, 16, False, 'relu', 1],
            # stage2
            [3, 64, 24, False, 'relu', 2],
            [3, 72, 24, False, 'relu', 1],
            # stage3
            [5, 72, 40, True, 'relu', 2],
            [5, 120, 40, True, 'relu', 1],
            [5, 120, 40, True, 'relu', 1],
            # stage4
            [3, 240, 80, False, 'hard_swish', 2],
            [3, 200, 80, False, 'hard_swish', 1],
            [3, 184, 80, False, 'hard_swish', 1],
            [3, 184, 80, False, 'hard_swish', 1],
            [3, 480, 112, True, 'hard_swish', 1],
            [3, 672, 112, True, 'hard_swish', 1],
            # stage5
            [5, 672, 160, True, 'hard_swish', 2],
            [5, 960, 160, True, 'hard_swish', 1],
            [5, 960, 160, True, 'hard_swish', 1],
        ]
        cls_ch_squeeze = 960
        cls_ch_expand = 1280
    elif model_name == 'asymmnet_small':
        inplanes = 16
        cfg = [
            # kernel, expansion channels, out channels, use SE, activation, stride
            [3, 16, 16, True, 'relu', 2],
            [3, 72, 24, False, 'relu', 2],
            [3, 88, 24, False, 'relu', 1],
            [5, 96, 40, True, 'hard_swish', 2],
            [5, 240, 40, True, 'hard_swish', 1],
            [5, 240, 40, True, 'hard_swish', 1],
            [5, 120, 48, True, 'hard_swish', 1],
            [5, 144, 48, True, 'hard_swish', 1],
            [5, 288, 96, True, 'hard_swish', 2],
            [5, 576, 96, True, 'hard_swish', 1],
            [5, 576, 96, True, 'hard_swish', 1],
        ]
        cls_ch_squeeze = 576
        cls_ch_expand = 1280
    else:
        raise ValueError('{} model_name is not supported now!'.format(model_name))

    return inplanes, cfg, cls_ch_squeeze, cls_ch_expand


class AsymmNet(nn.Layer):
    def __init__(self, cfgs_name, num_classes=config_parameters['class_dim'], multiplier=1.0, asymmrate=1, dropout_rate=0.2,
                 norm_layer=nn.BatchNorm2D):
        super(AsymmNet, self).__init__()
        inplanes, cfg, cls_ch_squeeze, cls_ch_expand = get_asymmnet_cfgs(cfgs_name)
        k = multiplier
        self.inplanes = make_divisible(inplanes * k)
        self.first_block = nn.Sequential(
            nn.Conv2D(3, self.inplanes, 3, 2, 1, bias_attr=False),
            nn.BatchNorm2D(self.inplanes),
            HardSwish(),
        )

        asymm_layers = []
        for layer_cfg in cfg:
            layer = self._make_layer(kernel_size=layer_cfg[0],
                                     exp_ch=make_divisible(k * layer_cfg[1]),
                                     out_channel=make_divisible(k * layer_cfg[2]),
                                     use_se=layer_cfg[3],
                                     act_func=layer_cfg[4],
                                     asymmrate=asymmrate,
                                     stride=layer_cfg[5],
                                     norm_layer=norm_layer,
                                     )
            asymm_layers.append(layer)
        self.asymm_block = nn.Sequential(*asymm_layers)
        self.last_block = nn.Sequential(
            nn.Conv2D(self.inplanes, make_divisible(k * cls_ch_squeeze), 1, bias_attr=False),
            nn.BatchNorm2D(make_divisible(k * cls_ch_squeeze)),
            HardSwish(),
            nn.AdaptiveAvgPool2D(1),
            nn.Conv2D(make_divisible(k * cls_ch_squeeze), cls_ch_expand, 1, bias_attr=False),
            HardSwish(),
            nn.Dropout2D(p=dropout_rate),
            nn.Flatten(),
        )
        self.output = nn.Linear(cls_ch_expand, num_classes)

    def _make_layer(self, kernel_size, exp_ch, out_channel, use_se, act_func, asymmrate, stride,
                    norm_layer):
        mid_planes = exp_ch
        out_planes = out_channel
        layer = AsymmBottleneck(self.inplanes, mid_planes,
                                out_planes, kernel_size, asymmrate,
                                act_func, strides=stride, use_se=use_se, norm_layer=norm_layer)
        self.inplanes = out_planes
        return layer

    def forward(self, x):
        x = self.first_block(x)
        x = self.asymm_block(x)
        x = self.last_block(x)
        x = self.output(x)
        return x


class AsymmNet_Large(AsymmNet):
    def __init__(self, **kwargs):
        super(AsymmNet_Large, self).__init__(cfgs_name='asymmnet_large', **kwargs)


class AsymmNet_Small(AsymmNet):
    def __init__(self, **kwargs):
        super(AsymmNet_Small, self).__init__(cfgs_name='asymmnet_small', **kwargs)


if __name__ == '__main__':
    # Smoke test: push a random image through AsymmNet_Large
    img = paddle.rand((1, 3, 256, 256))
    net = AsymmNet_Large()
    out = net(img)
    print(out.shape)
[1, 16]



④ Training the Improved Model and Choosing the Optimizer


network = AsymmNet_Large()
model = paddle.Model(network)
model.summary((1, 3, 256, 256))
# Reuse the SaveBestModel callback defined in Section 3.2

callback_visualdl = paddle.callbacks.VisualDL(log_dir='work/AsymmNet_Net')
callback_savebestmodel = SaveBestModel(target=0.5, path='work/best_model')
callbacks = [callback_visualdl, callback_savebestmodel]

base_lr = config_parameters['lr']
epochs = config_parameters['epochs']

def make_optimizer(parameters=None):
    momentum = 0.9

    learning_rate= paddle.optimizer.lr.CosineAnnealingDecay(learning_rate=base_lr, T_max=epochs, verbose=False)
    weight_decay=paddle.regularizer.L2Decay(0.0002)
    optimizer = paddle.optimizer.Momentum(
        learning_rate=learning_rate,
        momentum=momentum,
        weight_decay=weight_decay,
        parameters=parameters)
    return optimizer

optimizer = make_optimizer(model.parameters())
model.prepare(optimizer,
              paddle.nn.CrossEntropyLoss(),
              paddle.metric.Accuracy())
model.fit(train_loader,
          eval_loader,
          epochs=100,           # batching and shuffling are handled by the DataLoader above
          callbacks=callbacks,
          verbose=1)            # logging verbosity
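After training, the best checkpoint written by SaveBestModel can be restored and re-evaluated. A minimal sketch, assuming the run above completed and saved work/best_model:

# Restore the best weights and evaluate on the validation loader.
model.load('work/best_model')
print(model.evaluate(eval_loader, verbose=1))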

⑤ Training Results

The AsymmNet model trains quickly and converges very fast, and its performance improves substantially over the baseline.

⑥ Project Summary

  • 1. In this project the AsymmNet model (which includes SE attention modules) was trained with a cosine-annealing learning-rate schedule, and the comparison models were trained the same way. Compared with the lightweight MobileNet_v2, AsymmNet is more robust, converges faster, and the resulting model is smaller.

  • 2. As a follow-up, you can increase the L2 regularization coefficient and use stronger data augmentation to suppress overfitting; accuracy should improve further. A sketch of such tweaks follows below.
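A minimal sketch of those tweaks, with illustrative, untuned values:

# Illustrative, untuned values for stronger regularization and augmentation.
weight_decay = paddle.regularizer.L2Decay(0.0005)          # larger L2 coefficient
transform_train = T.Compose([T.Resize((256, 256)),
                             T.RandomHorizontalFlip(0.5),  # add horizontal flips
                             T.RandomRotation(15),         # slightly stronger rotation
                             T.Transpose(),
                             T.Normalize(mean=[0, 0, 0], std=[255, 255, 255]),
                             T.Normalize(mean=[0.50950350, 0.54632660, 0.57409690],
                                         std=[0.26059777, 0.26041326, 0.29220656])])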
