[Deep Learning in Practice] Part 2: A NumPy Neural Network from Scratch for CIFAR-10 Classification

I. Introduction

When studying deep learning, once the theory is done we usually reach straight for a framework (Paddle/PyTorch/TensorFlow) to build our models, and the low-level implementation of the various layers is easily overlooked. Having finished the theory, could you build a simple model by hand? This article starts from the basics and walks through parameter optimization and model construction step by step, consolidating the fundamentals and exploring deep learning from theory to practice.

Rather than repeating the theory, this article goes straight to code: we implement fully connected layers, an activation function, and an optimizer, assemble a simple fully connected model, pick 10,000 samples from CIFAR-10, and compare the prediction accuracy of our hand-written model against a Paddle model with the same structure to validate the implementation.

II. Goals

Build a neural network and a cross-entropy loss by hand and use them for the CIFAR-10 image classification task.
The main contents are:
1. Implement the fully connected layer;
2. Implement the ReLU activation function;
3. Implement the cross-entropy loss (SoftmaxWithLogits);
4. Implement the momentum gradient descent optimizer;
5. Classify CIFAR-10 images (using only a subset of the data);
6. Compare the CIFAR-10 classification results against Paddle with the same network structure.

III. Implementation

In deep learning frameworks, data flows through the computation as tensors. To keep things simple, here both inputs and outputs are passed around as numpy.ndarray.
This section implements the relevant classes.

1. Tensor and initialization

A Tensor holds data and grad: the values and their corresponding gradients.

# Layer parameters need to keep both their values and their gradients, so every trainable parameter is stored as a Tensor

import numpy as np
np.random.seed(10001)

class Tensor:
    def __init__(self, shape):
        self.data = np.zeros(shape=shape, dtype=np.float32) # the values
        self.grad = np.zeros(shape=shape, dtype=np.float32) # the accumulated gradients

    def clear_grad(self):
        self.grad = np.zeros_like(self.grad)

    def __str__(self):
        return "Tensor shape: {}, data: {}".format(self.data.shape, self.data)


# Initializer classes for Tensor; only Normal and Constant initialization are provided for now
class Initializer:
    """
    Base class.
    """
    def __init__(self, shape=None, name='initializer'):
        self.shape = shape
        self.name = name

    def __call__(self, *args, **kwargs):
        raise NotImplementedError

    def __str__(self):
        return self.name


class Constant(Initializer):
    def __init__(self, value=0., name='constant initializer', *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.value = value

    def __call__(self, shape=None, *args, **kwargs):
        if shape:
            self.shape = shape
        assert self.shape is not None, "the shape of initializer must not be None."
        return self.value + np.zeros(shape=self.shape)


class Normal(Initializer):
    def __init__(self, mean=0., std=0.01, name='normal initializer', *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.mean = mean
        self.std = std

    def __call__(self, shape=None, *args, **kwargs):
        if shape:
            self.shape = shape
        assert self.shape is not None, "the shape of initializer must not be None."
        return np.random.normal(self.mean, self.std, size=self.shape)
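As a quick standalone sanity check, the two initializers above boil down to the following plain NumPy calls (a minimal sketch, not using the classes themselves):

```python
import numpy as np

np.random.seed(10001)

# what Normal(mean=0., std=0.01)((2, 3)) produces: a mean-0, std-0.01 Gaussian array
w = np.random.normal(0., 0.01, size=(2, 3))
# what Constant(value=0.)((1, 3)) produces: a constant-filled array
b = 0. + np.zeros(shape=(1, 3))

assert w.shape == (2, 3) and b.shape == (1, 3)
assert np.all(b == 0.)
```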

2. Layers

This part implements the fully connected layer Linear and the ReLU activation function; the derivations mostly involve matrix calculus, for which plenty of references are available.
1. Forward pass and gradients of the fully connected layer
2. ReLU

# To let layers be composed into a network with forward and backward passes, first define the base class Layer
# The main methods of Layer:
#   forward: the forward pass
#   backward: the backward pass
#   parameters: returns the layer's parameters, which are passed to the optimizer

class Layer:
    def __init__(self, name='layer', *args, **kwargs):
        self.name = name

    def forward(self, *args, **kwargs):
        raise NotImplementedError

    def backward(self):
        raise NotImplementedError

    def parameters(self):
        return []

    def __call__(self, *args, **kwargs):
        return self.forward(*args, **kwargs)

    def __str__(self):
        return self.name


class Linear(Layer):
    """
    input X, shape: [N, C]
    output Y, shape: [N, O]
    weight W, shape: [C, O]
    bias b, shape: [1, O]
    grad dY, shape: [N, O]
    forward formula:
        Y = X @ W + b   # @ denotes matrix multiplication
    backward formula:
        dW = X.T @ dY
        db = sum(dY, axis=0)
        dX = dY @ W.T
    """
    def __init__(
        self,
        in_features,
        out_features,
        name='linear',
        weight_attr=Normal(),
        bias_attr=Constant(),
        *args,
        **kwargs
        ):
        super().__init__(name=name, *args, **kwargs)
        self.weights = Tensor((in_features, out_features))
        self.weights.data = weight_attr(self.weights.data.shape)
        self.bias = Tensor((1, out_features))
        self.bias.data = bias_attr(self.bias.data.shape)
        self.input = None

    def forward(self, x):
        self.input = x
        output = np.dot(x, self.weights.data) + self.bias.data
        return output

    def backward(self, gradient):
        self.weights.grad += np.dot(self.input.T, gradient)  # dL/dW = X.T @ dY
        self.bias.grad += np.sum(gradient, axis=0, keepdims=True)  # dL/db = sum(dY, axis=0)
        input_grad = np.dot(gradient, self.weights.data.T)  # dL/dX = dY @ W.T
        return input_grad

    def parameters(self):
        return [self.weights, self.bias]

    def __str__(self):
        string = "linear layer, weight shape: {}, bias shape: {}".format(self.weights.data.shape, self.bias.data.shape)
        return string


class ReLU(Layer):
    """
    forward formula:
        relu = x if x >= 0
             = 0 if x < 0
    backward formula:
        grad = gradient * (x > 0)
    """
    def __init__(self, name='relu', *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.activated = None

    def forward(self, x):
        # use np.maximum instead of in-place masking so the caller's array is not mutated
        self.activated = np.maximum(0, x)
        return self.activated

    def backward(self, gradient):
        return gradient * (self.activated > 0)
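The backward formulas in the Linear docstring can be verified numerically. A standalone NumPy sketch that checks dW = X.T @ dY against central finite differences (using sum-of-outputs as a scalar loss, so dY is all ones):

```python
import numpy as np

np.random.seed(0)
N, C, O = 4, 5, 3
X = np.random.randn(N, C)
W = np.random.randn(C, O)
b = np.random.randn(1, O)

def loss(W_):
    # scalar loss: the sum of all outputs, so dL/dY is all ones
    return np.sum(X @ W_ + b)

dY = np.ones((N, O))
dW = X.T @ dY  # the analytic formula from the docstring

# central finite differences over every entry of W
eps = 1e-6
dW_num = np.zeros_like(W)
for i in range(C):
    for j in range(O):
        Wp, Wm = W.copy(), W.copy()
        Wp[i, j] += eps
        Wm[i, j] -= eps
        dW_num[i, j] = (loss(Wp) - loss(Wm)) / (2 * eps)

assert np.allclose(dW, dW_num, atol=1e-4)
```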

3. Assembling the model

Chain the layers together to run the forward and backward passes.

# The model container strings layers together so that data flows forward and gradients flow backward
# Layer parameters are collected in the order the layers are added
# Sequential methods:
#   add: append a layer to the network
#   forward: run the layers in order
#   backward: take the gradient from the loss function and propagate it through the layers in reverse order

class Sequential:
    def __init__(self, *args, **kwargs):
        self.graphs = []
        self._parameters = []
        for arg_layer in args:
            if isinstance(arg_layer, Layer):
                self.graphs.append(arg_layer)
                self._parameters += arg_layer.parameters()

    def add(self, layer):
        assert isinstance(layer, Layer), "The type of added layer must be Layer, but got {}.".format(type(layer))
        self.graphs.append(layer)
        self._parameters += layer.parameters()

    def forward(self, x):
        for graph in self.graphs:
            x = graph(x)
        return x

    def backward(self, grad):
        # grad backward in inverse order of graph
        for graph in self.graphs[::-1]:
            grad = graph.backward(grad)

    def __call__(self, *args, **kwargs):
        return self.forward(*args, **kwargs)

    def __str__(self):
        string = 'Sequential:\n'
        for graph in self.graphs:
            string += graph.__str__() + '\n'
        return string

    def parameters(self):
        return self._parameters
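Backpropagating in reverse layer order is just the chain rule. A minimal standalone sketch with two stacked linear maps (biases omitted for brevity), checked against finite differences:

```python
import numpy as np

np.random.seed(0)
x = np.random.randn(2, 3)
W1 = np.random.randn(3, 4)
W2 = np.random.randn(4, 2)

# forward: x -> h -> y, with scalar loss = sum(y)
y = (x @ W1) @ W2

# backward: start from dL/dy and walk the layers in reverse, as Sequential.backward does
grad = np.ones_like(y)
grad = grad @ W2.T   # through the second linear map
grad = grad @ W1.T   # through the first linear map

# finite-difference check of dL/dx
eps = 1e-6
grad_num = np.zeros_like(x)
for i in range(x.shape[0]):
    for j in range(x.shape[1]):
        xp, xm = x.copy(), x.copy()
        xp[i, j] += eps
        xm[i, j] -= eps
        grad_num[i, j] = (np.sum((xp @ W1) @ W2) - np.sum((xm @ W1) @ W2)) / (2 * eps)

assert np.allclose(grad, grad_num, atol=1e-4)
```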


4. Optimizer

An SGD optimizer (with momentum).
1. Momentum gradient descent

# The optimizer updates the parameters from their gradients; its main settings are the learning rate, the regularization type, and the regularization coefficient
# Main Optimizer methods:
#   step: called after backpropagation; updates the parameters using the computed gradients
#   clear_grad: backward() accumulates gradients, so once step() has consumed them they must be cleared
#   get_decay: computes the regularization term for the chosen regularization type

class Optimizer:
    """
    optimizer base class.
    Args:
        parameters (Tensor): parameters to be optimized.
        learning_rate (float): learning rate. Default: 0.001.
        weight_decay (float): The decay weight of parameters. Default: 0.0.
        decay_type (str): The type of regularizer. Default: l2.
    """
    def __init__(self, parameters, learning_rate=0.001, weight_decay=0.0, decay_type='l2'):
        assert decay_type in ['l1', 'l2'], "only support decay_type 'l1' and 'l2', but got {}.".format(decay_type)
        self.parameters = parameters
        self.learning_rate = learning_rate
        self.weight_decay = weight_decay
        self.decay_type = decay_type

    def step(self):
        raise NotImplementedError

    def clear_grad(self):
        for p in self.parameters:
            p.clear_grad()

    def get_decay(self, p_data):
        # gradient of the regularization term w.r.t. the parameter: l1 -> wd * sign(p), l2 -> wd * p
        if self.decay_type == 'l1':
            return self.weight_decay * np.sign(p_data)
        elif self.decay_type == 'l2':
            return self.weight_decay * p_data


class SGD(Optimizer):
    def __init__(self, momentum=0.9, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.momentum = momentum
        self.velocity = []
        for p in self.parameters:
            self.velocity.append(np.zeros_like(p.grad))

    def step(self):
        for i, p in enumerate(self.parameters):
            decay = self.get_decay(p.data)
            # update the stored velocity in place; rebinding a loop variable would lose it
            self.velocity[i] = self.momentum * self.velocity[i] + p.grad + decay  # momentum update
            p.data = p.data - self.learning_rate * self.velocity[i]
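To see the momentum update at work, here is a standalone run on the toy objective f(p) = p²/2 (so the gradient is simply p), with no weight decay:

```python
# minimize f(p) = 0.5 * p**2 with momentum SGD; the gradient of f is p itself
p, v = 1.0, 0.0
learning_rate, momentum = 0.1, 0.9
for _ in range(300):
    grad = p
    v = momentum * v + grad      # the velocity update from step() above
    p = p - learning_rate * v
assert abs(p) < 1e-3             # the iterate has converged close to the minimum at 0
```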

5. Loss function

The cross-entropy loss. [If the code here turns out to be wrong, please do contact me.]
1. Softmax gradient computation


class SoftmaxWithLogits(Layer):
    """
    Softmax with logits error:
        loss[j] = -input[class] + log(sum(exp(input)))
    """
    def __init__(self, reduction='mean', name='softmaxwithlogits', *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        assert reduction in ['mean', 'none', 'sum'], "reduction only support 'mean', 'none' and 'sum', but got {}.".format(reduction)
        self.reduction = reduction
        self.logits = None
        self.target = None

    def forward(self, logits, target):
        """
        :param logits (np.ndarray): predicted logits, shape [N, C]
        :param target (np.ndarray): target class indices, shape [N, 1]
        :return: loss
        """
        assert logits.shape[0] == target.shape[0], "The first dimension of logits and target differ, logits shape {} can't match target shape {}.".format(logits.shape, target.shape)
        self.logits = logits
        self.target = target
        loss = []
        for i in range(logits.shape[0]):
            loss_i = -logits[i, target.squeeze(-1)[i]] + np.log(np.sum(np.exp(logits[i])))
            loss.append(loss_i)
        loss = np.array(loss).reshape(target.shape)
        if self.reduction == 'mean':
            return loss.mean()
        elif self.reduction == 'sum':
            return loss.sum()
        else:
            return loss

    def backward(self):
        soft_denominator = np.sum(np.exp(self.logits), axis=1, keepdims=True)  # [N, 1]
        eq_grad = np.zeros_like(self.logits)
        for i in range(self.logits.shape[0]):
            eq_grad[i, self.target.squeeze(-1)[i]] = -1
        gradient = np.exp(self.logits) / soft_denominator + eq_grad  # softmax(x) - onehot(target)
        if self.reduction == 'mean':
            gradient = gradient / self.logits.shape[0]  # match the averaged loss
        return gradient

# loss_fn = SoftmaxWithLogits()
# logits = np.array([[1., 2., 3.]])
# target = np.array([[1]])
# print(-2 + np.log(np.exp(1)+np.exp(2)+np.exp(3)))
# print(logits.shape, target.shape)
# print(loss_fn(logits, target))
# print(loss_fn.backward(), np.exp(2) / (np.exp(1)+np.exp(2)+np.exp(3)))
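The gradient used in backward() is softmax(x) minus a one-hot of the target. A standalone finite-difference check of that identity for a single sample:

```python
import numpy as np

np.random.seed(0)
x = np.random.randn(5)
t = 2  # target class

def loss(x_):
    # per-sample loss: -x[t] + log(sum(exp(x)))
    return -x_[t] + np.log(np.sum(np.exp(x_)))

softmax = np.exp(x) / np.sum(np.exp(x))
onehot = np.zeros(5)
onehot[t] = 1.0
grad = softmax - onehot  # the analytic gradient

# central finite differences, one logit at a time
eps = 1e-6
grad_num = np.array([
    (loss(x + eps * np.eye(5)[i]) - loss(x - eps * np.eye(5)[i])) / (2 * eps)
    for i in range(5)
])
assert np.allclose(grad, grad_num, atol=1e-4)
```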

6. Dataset

# Following PaddlePaddle's convention, a Dataset must implement __getitem__ and __len__
class Dataset:
    def __init__(self, *args, **kwargs):
        pass

    def __getitem__(self, idx):
        raise NotImplementedError("'{}' not implement in class {}"
                                  .format('__getitem__', self.__class__.__name__))

    def __len__(self):
        raise NotImplementedError("'{}' not implement in class {}"
                                  .format('__len__', self.__class__.__name__))


# Given a dataset and some settings, generate the indices of each batch within the dataset
class BatchSampler:
    def __init__(self, dataset=None, shuffle=False, batch_size=1, drop_last=False):
        self.batch_size = batch_size
        self.drop_last = drop_last
        self.shuffle = shuffle

        self.num_data = len(dataset)
        if self.drop_last or (self.num_data % batch_size == 0):
            self.num_samples = self.num_data // batch_size
        else:
            self.num_samples = self.num_data // batch_size + 1
        indices = np.arange(self.num_data)
        if shuffle:
            np.random.shuffle(indices)
        if drop_last:
            indices = indices[:self.num_samples * batch_size]
        self.indices = indices

    def __len__(self):
        return self.num_samples

    def __iter__(self):
        batch_indices = []
        for i in range(self.num_samples):
            if (i + 1) * self.batch_size <= self.num_data:
                for idx in range(i * self.batch_size, (i + 1) * self.batch_size):
                    batch_indices.append(self.indices[idx])
                yield batch_indices
                batch_indices = []
            else:
                for idx in range(i * self.batch_size, self.num_data):
                    batch_indices.append(self.indices[idx])
        if not self.drop_last and len(batch_indices) > 0:
            yield batch_indices


# Using the indices produced by the sampler, fetch data from the dataset and assemble it into a batch
class DataLoader:
    def __init__(self, dataset, sampler=BatchSampler, shuffle=False, batch_size=1, drop_last=False):
        self.dataset = dataset
        self.batch_sampler = sampler
        self.sampler = self.batch_sampler(dataset, shuffle, batch_size, drop_last)
        self.shuffle = shuffle
        self.drop_last = drop_last
        self.batch_size = batch_size

    def __len__(self):
        return len(self.sampler)

    def __call__(self):
        return self.__iter__()

    def __iter__(self):
        for sample_indices in self.sampler:
            data_list = []
            label_list = []
            for indice in sample_indices:
                data, label = self.dataset[indice]
                data_list.append(data)
                label_list.append(label)
            yield np.stack(data_list, axis=0), np.stack(label_list, axis=0)
        self.sampler = self.batch_sampler(self.dataset, self.shuffle, self.batch_size, self.drop_last)
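The index bookkeeping in BatchSampler boils down to slicing a (possibly shuffled) index array. A standalone sketch with 10 samples and batch_size=3:

```python
import numpy as np

num_data, batch_size = 10, 3
indices = np.arange(num_data)  # would be shuffled when shuffle=True

batches = [indices[i:i + batch_size] for i in range(0, num_data, batch_size)]
assert [len(b) for b in batches] == [3, 3, 3, 1]  # the last batch is short

# with drop_last=True the short batch is discarded
full_batches = [b for b in batches if len(b) == batch_size]
assert len(full_batches) == 3
```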

IV. CIFAR-10 in practice

The goal of this section is to build a simple model with the classes above and complete the CIFAR-10 classification task. Since everything runs on CPU and is fairly slow, only 10,000 samples are used: 8,000 for training and 2,000 for validation.

# a small helper class that tracks a running average

class AverageMeter:
    def __init__(self):
        self.val = 0.
        self.count = 0

    def update(self, value, n=1):
        self.val += value
        self.count += n

    def __call__(self):
        return self.val / self.count

    def reset(self):
        self.val = 0.
        self.count = 0

    def __str__(self):
        return str(self.__call__())

1. Loading the data

%matplotlib inline
import pickle
import matplotlib.pyplot as plt

# read the cifar data
def read_cifar(data_path):
    with open(data_path, 'rb') as f:
        data_dict = pickle.load(f, encoding='latin1')
        X = data_dict['data']
        Y = data_dict['labels']
        X = X.reshape(10000, 3, 32, 32).transpose(0, 2, 3, 1).astype("float")
        Y = np.array(Y).reshape((10000, 1))
    return X, Y


# only 10,000 images are used for training
data_path = "/home/aistudio/data/data120154/data_batch_1"
X, Y = read_cifar(data_path)

show_number = 5
# display a few sample images to inspect the data
for i in range(show_number):
    img = X[i]
    plt.subplot(1, show_number, i + 1)
    plt.imshow(img.astype('uint8'))

[Figure: the five sample images displayed above (output_16_0.png)]

2. Inspecting the data distribution

# look at the distribution of the labels
plt.hist(Y[:, 0].astype('int'), edgecolor='black')
(array([1005.,  974., 1032., 1016.,  999.,  937., 1030., 1001., 1025.,
         981.]),
 array([0. , 0.9, 1.8, 2.7, 3.6, 4.5, 5.4, 6.3, 7.2, 8.1, 9. ]),
 <a list of 10 Patch objects>)

[Figure: histogram of the label distribution (output_18_1.png)]

3. Building the model and setting hyperparameters

# build the dataset with simple preprocessing: pixel values are scaled to [0, 1]

class CifarDataset(Dataset):
    def __init__(self, X, Y):
        self.X = X
        self.Y = Y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx] / 255.0, self.Y[idx]  # scale pixel values to [0, 1]

# hyperparameters
train_number = 8000  # training set size (out of 10000)
epoches = 100   # number of epochs
batch_size = 4  # batch size; note that the Paddle run below uses different settings
learning_rate = 0.01  # learning rate; note that the Paddle run below uses different settings
num_classes = 10


4. Training

# split the data into training and validation sets
train_X, train_Y = X[:train_number], Y[:train_number]
val_X, val_Y = X[train_number:], Y[train_number:]

# build the dataloaders
train_dataset = CifarDataset(train_X, train_Y)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)

val_dataset = CifarDataset(val_X, val_Y)
val_dataloader = DataLoader(val_dataset, batch_size=1, shuffle=False, drop_last=False)

# the network assembled from our own implementation
model = Sequential(
    Linear(3 * 32 * 32, 64, name='linear1'),
    ReLU(name='relu1'),
    Linear(64, 128, name='linear2'),
    ReLU(name='relu2'),
    Linear(128, 64, name='linear3'),
    ReLU(name='relu3'),
    Linear(64, num_classes, name='linear4'),
)
opt = SGD(parameters=model.parameters(), learning_rate=learning_rate, weight_decay=0.0, decay_type='l2')
loss_fn = SoftmaxWithLogits()

# a simple evaluation function that computes the model's prediction accuracy
def eval(model, val_dataloader):
    predict_labels = []
    labels = []
    for x, y in val_dataloader:
        x = x.reshape((1, -1))
        logits = model(x)
        pred = np.argmax(logits, axis=1)
        predict_labels.append(pred)
        labels.append(y.squeeze(1))
    pred = np.array(predict_labels)
    labels = np.array(labels)
    acc = np.sum(pred == labels) / len(labels)
    print("val dataset accuracy:", acc)
    return acc
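The accuracy computation in eval() is an argmax over logits compared against the labels. A small standalone illustration:

```python
import numpy as np

# three samples, two classes; the second row is correct, the third is not
logits = np.array([[2.0, 1.0],
                   [0.1, 0.9],
                   [3.0, 0.0]])
labels = np.array([0, 1, 1])

pred = np.argmax(logits, axis=1)         # predicted class per row
acc = np.sum(pred == labels) / len(labels)
assert abs(acc - 2 / 3) < 1e-9           # two of three predictions match
```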

# start training
lddl_acc = []   # don't read too much into the name 'lddl'; it comes from the repo: https://github.com/justld/LDDL
loss_avg = AverageMeter()
for epoch in range(1, epoches + 1):
    acc = eval(model, val_dataloader=val_dataloader)  # first measure accuracy on the validation set
    lddl_acc.append(acc)  # keep the accuracy for the later comparison with Paddle
    for idx, (x, y) in enumerate(train_dataloader):
        x = x.reshape((batch_size, -1))  # the classifier is fully connected, so reshape the input to [batch_size, channels * H * W]
        logits = model(x)
        loss = loss_fn(logits, y)
        loss_avg.update(loss)

        grad = loss_fn.backward()
        model.backward(grad)

        opt.step()
        opt.clear_grad()
    print("epoch: {}. loss: {}".format(epoch, loss_avg))

5. The same network in Paddle

# The Paddle code is not explained in detail here; many examples can be found on AI Studio, and it is not the focus of this project
import paddle
from paddle import nn
from paddle.io import Dataset, DataLoader
from paddle.optimizer import Momentum


# Paddle's hyperparameters differ from the ones above; otherwise the Paddle model performs rather poorly
batch_size = 64
learning_rate = 0.01

class ToyDataset(Dataset):
    def __init__(self, X, Y):
        self.X = X
        self.Y = Y
    
    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, idx):
        return self.X[idx].astype('float32') / 255.0, self.Y[idx]


model = nn.Sequential(
    nn.Linear(3*32*32, 64),
    nn.ReLU(),
    nn.Linear(64, 128),
    nn.ReLU(),
    nn.Linear(128, 64),
    nn.ReLU(),
    nn.Linear(64, num_classes),
)
optimizer = Momentum(learning_rate=learning_rate, momentum=0.9, parameters=model.parameters())
loss_fn = nn.CrossEntropyLoss()
train_dataset = ToyDataset(train_X, train_Y)
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
val_dataset = ToyDataset(val_X, val_Y)
val_dataloader = DataLoader(val_dataset, batch_size=1)


def eval(model, val_dataloader):
    predict_labels = []
    labels = []
    for x, y in val_dataloader:
        x = x.reshape((1, -1))
        logits = model(x)
        pred = np.argmax(logits, axis=1)
        predict_labels.append(pred)
        labels.append(y.squeeze(1))
    pred = np.array(predict_labels)
    labels = np.stack(labels, axis=0)
    acc = np.sum(pred == labels) / len(predict_labels)
    print("val dataset accuracy:", acc)
    return acc

paddle_acc = []
loss_avg = AverageMeter()
for epoch in range(1, epoches + 1):
    acc = eval(model, val_dataloader)
    paddle_acc.append(acc)
    for idx, (x, y) in enumerate(train_dataloader):
        x = x.reshape([x.shape[0], -1])
        pred = model(x)
        loss = loss_fn(pred, y)
        loss_avg.update(loss.numpy()[0])

        loss.backward()
        optimizer.step()
        optimizer.clear_grad()

    print("epoch: {}. loss: {}".format(epoch, loss_avg))


6. Accuracy comparison: lddl vs Paddle

# plot the accuracy of our own model against Paddle; as the curves show, our implementation still lags slightly behind Paddle
plt.plot(np.arange(epoches), paddle_acc, label='paddle')
plt.plot(np.arange(epoches), lddl_acc, label='lddl')
plt.legend()


[Figure: validation accuracy curves for paddle and lddl (output_26_1.png)]

V. References

GitHub repository for this project: https://github.com/justld/LDDL

References:
1. PaddlePaddle

VI. Closing remarks

My abilities are limited, so mistakes are bound to slip in; corrections are very welcome. If you have any suggestions, please leave a comment below and I will reply as soon as I see it.
