[Deep Learning in Practice] Part 2: A NumPy Hand-Written Neural Network for CIFAR-10 Classification
Deep learning frameworks are complex and hard to see through, so beginners rarely get to the underlying principles. This project implements a simple neural network in plain Python and NumPy, uses it to classify CIFAR-10, and walks through the low-level mechanics of deep learning.
I. Introduction
When learning deep learning, once the theory is done we usually jump straight to a framework (Paddle/PyTorch/TensorFlow) to build models, and the low-level implementation of the various layers gets skipped. Having finished the theory, could you hand-write a simple model yourself? This article starts from the basics and implements parameter optimization and model construction step by step, consolidating the fundamentals and moving from theory to practice to explore how deep learning works under the hood.
This article does not spend much time on deep learning theory. It goes straight to code: fully connected layers, an activation function, and an optimizer, assembled into a simple fully connected model. We take 10,000 samples from CIFAR-10 and compare the prediction accuracy of our own implementation against a Paddle model with the same structure to validate the result.
II. Goals
Hand-write a neural network and a cross-entropy loss function, and use them for the CIFAR-10 image classification task.
The main contents are:
1. Implement a fully connected (Linear) layer;
2. Implement the ReLU activation function;
3. Implement the cross-entropy loss (SoftmaxWithLogits);
4. Implement an SGD optimizer with momentum;
5. Classify CIFAR-10 images (using only part of the data);
6. Compare the CIFAR-10 results against Paddle with the same network structure.
III. Implementation
In deep learning frameworks all computation is done on tensors. To keep things simple, inputs and outputs here are passed around as numpy.ndarray.
This section contains the implementation of the relevant classes.
1. Tensor and initializers
A Tensor holds data and grad, i.e. the parameter values and the corresponding gradients.
# Layer parameters need to store both values and gradients, so all trainable parameters are kept as Tensor objects.
import numpy as np

np.random.seed(10001)


class Tensor:
    def __init__(self, shape):
        self.data = np.zeros(shape=shape, dtype=np.float32)  # parameter values
        self.grad = np.zeros(shape=shape, dtype=np.float32)  # parameter gradients
    def clear_grad(self):
        self.grad = np.zeros_like(self.grad)
    def __str__(self):
        return "Tensor shape: {}, data: {}".format(self.data.shape, self.data)
# Initializers for Tensor data; only Normal and Constant initialization are provided for now.
class Initializer:
    """
    Base class for initializers.
    """
    def __init__(self, shape=None, name='initializer'):
        self.shape = shape
        self.name = name
    def __call__(self, *args, **kwargs):
        raise NotImplementedError
    def __str__(self):
        return self.name


class Constant(Initializer):
    def __init__(self, value=0., name='constant initializer', *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.value = value
    def __call__(self, shape=None, *args, **kwargs):
        if shape:
            self.shape = shape
        assert self.shape is not None, "the shape of initializer must not be None."
        return self.value + np.zeros(shape=self.shape)


class Normal(Initializer):
    def __init__(self, mean=0., std=0.01, name='normal initializer', *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.mean = mean
        self.std = std
    def __call__(self, shape=None, *args, **kwargs):
        if shape:
            self.shape = shape
        assert self.shape is not None, "the shape of initializer must not be None."
        return np.random.normal(self.mean, self.std, size=self.shape)
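To see how these pieces fit together, here is a small usage sketch (an extra illustration, not part of the original project): a Tensor holds both the values and the gradients, and an initializer fills in the starting values.
# Create a 2x3 parameter tensor and initialize its data with a normal distribution.
w = Tensor((2, 3))
w.data = Normal(mean=0., std=0.01)((2, 3))  # fill data with N(0, 0.01^2) samples
w.grad += np.ones((2, 3))                   # gradients accumulate here during the backward pass
print(w)
w.clear_grad()                              # reset gradients to zero after a parameter update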
2. Layers
This part implements the fully connected layer Linear and the ReLU activation. The main ingredient is matrix calculus for the backward pass; see any standard reference if you need a refresher.
1. Forward pass and gradient computation of the fully connected layer
2. ReLU
# To chain layers together and run forward/backward passes, we first define the layer base class Layer.
# The main methods of Layer:
#   forward: forward propagation
#   backward: backward propagation
#   parameters: returns the layer's parameters, which are handed to the optimizer
class Layer:
    def __init__(self, name='layer', *args, **kwargs):
        self.name = name
    def forward(self, *args, **kwargs):
        raise NotImplementedError
    def backward(self):
        raise NotImplementedError
    def parameters(self):
        return []
    def __call__(self, *args, **kwargs):
        return self.forward(*args, **kwargs)
    def __str__(self):
        return self.name
class Linear(Layer):
    """
    input X, shape: [N, C]
    output Y, shape: [N, O]
    weight W, shape: [C, O]
    bias b, shape: [1, O]
    grad dY, shape: [N, O]
    forward formula:
        Y = X @ W + b   # @ is matrix multiplication
    backward formula:
        dW = X.T @ dY
        db = sum(dY, axis=0)
        dX = dY @ W.T
    """
    def __init__(
        self,
        in_features,
        out_features,
        name='linear',
        weight_attr=Normal(),
        bias_attr=Constant(),
        *args,
        **kwargs
    ):
        super().__init__(name=name, *args, **kwargs)
        self.weights = Tensor((in_features, out_features))
        self.weights.data = weight_attr(self.weights.data.shape)
        self.bias = Tensor((1, out_features))
        self.bias.data = bias_attr(self.bias.data.shape)
        self.input = None
    def forward(self, x):
        self.input = x
        output = np.dot(x, self.weights.data) + self.bias.data
        return output
    def backward(self, gradient):
        self.weights.grad += np.dot(self.input.T, gradient)        # dL / dW
        self.bias.grad += np.sum(gradient, axis=0, keepdims=True)  # dL / db
        input_grad = np.dot(gradient, self.weights.data.T)         # dL / dX
        return input_grad
    def parameters(self):
        return [self.weights, self.bias]
    def __str__(self):
        string = "linear layer, weight shape: {}, bias shape: {}".format(self.weights.data.shape, self.bias.data.shape)
        return string
class ReLU(Layer):
    """
    forward formula:
        relu(x) = x if x >= 0
                = 0 if x < 0
    backward formula:
        grad = gradient * (x > 0)
    """
    def __init__(self, name='relu', *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.activated = None
    def forward(self, x):
        self.activated = np.maximum(x, 0)  # keep a copy instead of modifying the input in place
        return self.activated
    def backward(self, gradient):
        return gradient * (self.activated > 0)
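If you want to convince yourself that the backward formulas above are correct, a numerical gradient check is handy. The sketch below (an extra illustration, not part of the original project) compares the analytical input gradient of a Linear layer against a finite-difference estimate, using the sum of the outputs as a simple scalar loss.
# Numerical gradient check for Linear, assuming the Tensor/Initializer/Linear classes defined above.
layer = Linear(4, 3)
x = np.random.randn(2, 4)
y = layer(x)
dy = np.ones_like(y)              # gradient of L = y.sum() with respect to y
dx_analytic = layer.backward(dy)  # analytical dL/dX = dY @ W.T
eps = 1e-5
dx_numeric = np.zeros_like(x)
for i in range(x.shape[0]):
    for j in range(x.shape[1]):
        x_plus, x_minus = x.copy(), x.copy()
        x_plus[i, j] += eps
        x_minus[i, j] -= eps
        dx_numeric[i, j] = (layer(x_plus).sum() - layer(x_minus).sum()) / (2 * eps)
print(np.max(np.abs(dx_analytic - dx_numeric)))  # should be tiny (numerical error only)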
3. Building the model (Sequential)
Chain the layers together to run the forward and backward passes.
# Sequential chains layers together so that data flows forward and gradients flow backward.
# When a layer is added, its parameters are collected in order.
# Sequential methods:
#   add: append a layer to the network
#   forward: run the layers in the order they were added
#   backward: take the gradient from the loss function and propagate it through the layers in reverse order
class Sequential:
    def __init__(self, *args, **kwargs):
        self.graphs = []
        self._parameters = []
        for arg_layer in args:
            if isinstance(arg_layer, Layer):
                self.graphs.append(arg_layer)
                self._parameters += arg_layer.parameters()
    def add(self, layer):
        assert isinstance(layer, Layer), "The type of added layer must be Layer, but got {}.".format(type(layer))
        self.graphs.append(layer)
        self._parameters += layer.parameters()
    def forward(self, x):
        for graph in self.graphs:
            x = graph(x)
        return x
    def backward(self, grad):
        # gradients propagate through the layers in reverse order
        for graph in self.graphs[::-1]:
            grad = graph.backward(grad)
    def __call__(self, *args, **kwargs):
        return self.forward(*args, **kwargs)
    def __str__(self):
        string = 'Sequential:\n'
        for graph in self.graphs:
            string += graph.__str__() + '\n'
        return string
    def parameters(self):
        return self._parameters
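As a quick smoke test (an extra sketch, not part of the original project), you can chain a couple of layers, push a random batch through, and propagate a dummy gradient back:
# Minimal Sequential smoke test, assuming the Linear/ReLU layers defined above.
net = Sequential(
    Linear(8, 16, name='fc1'),
    ReLU(name='relu'),
    Linear(16, 2, name='fc2'),
)
x = np.random.randn(4, 8)                # a batch of 4 samples with 8 features
out = net(x)                             # forward pass, shape [4, 2]
net.backward(np.ones_like(out))          # backward pass with a dummy upstream gradient
print(out.shape, len(net.parameters()))  # (4, 2) and 4 parameter tensors (2 per Linear layer)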
4. Optimizer
SGD with momentum is implemented.
1. Momentum gradient descent
# The optimizer updates parameters from their gradients. Its main settings are the learning rate, the regularization type, and the regularization coefficient.
# Optimizer methods:
#   step: called after backpropagation; updates the parameters with the computed gradients
#   clear_grad: gradients accumulate across backward calls, so once step has consumed them they must be cleared
#   get_decay: computes the regularization penalty for the chosen regularization type
class Optimizer:
    """
    optimizer base class.
    Args:
        parameters (list of Tensor): parameters to be optimized.
        learning_rate (float): learning rate. Default: 0.001.
        weight_decay (float): The decay weight of parameters. Default: 0.0.
        decay_type (str): The type of regularizer. Default: l2.
    """
    def __init__(self, parameters, learning_rate=0.001, weight_decay=0.0, decay_type='l2'):
        assert decay_type in ['l1', 'l2'], "only support decay_type 'l1' and 'l2', but got {}.".format(decay_type)
        self.parameters = parameters
        self.learning_rate = learning_rate
        self.weight_decay = weight_decay
        self.decay_type = decay_type
    def step(self):
        raise NotImplementedError
    def clear_grad(self):
        for p in self.parameters:
            p.clear_grad()
    def get_decay(self, w):
        # regularization penalty added to the gradient: l1 -> coef * sign(w), l2 -> coef * w
        if self.decay_type == 'l1':
            return self.weight_decay * np.sign(w)
        elif self.decay_type == 'l2':
            return self.weight_decay * w


class SGD(Optimizer):
    def __init__(self, momentum=0.9, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.momentum = momentum
        self.velocity = []
        for p in self.parameters:
            self.velocity.append(np.zeros_like(p.grad))
    def step(self):
        for i, p in enumerate(self.parameters):
            decay = self.get_decay(p.data)
            v = self.momentum * self.velocity[i] + p.grad + decay  # momentum update
            self.velocity[i] = v  # store the velocity so momentum accumulates across steps
            p.data = p.data - self.learning_rate * v
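Here is a tiny example (a sketch, not part of the original project) of one optimization step: run a forward/backward pass on a single Linear layer, call step to apply the momentum update, then clear the accumulated gradients.
# One SGD step on a single Linear layer, assuming the classes defined above.
layer = Linear(4, 2)
opt = SGD(parameters=layer.parameters(), learning_rate=0.1, momentum=0.9)
x = np.random.randn(8, 4)
out = layer(x)
layer.backward(np.ones_like(out))   # accumulate gradients with a dummy upstream gradient
w_before = layer.weights.data.copy()
opt.step()                          # w <- w - lr * (momentum * v + grad + decay)
opt.clear_grad()                    # clear the used gradients before the next batch
print(np.abs(layer.weights.data - w_before).max())  # the weights have moved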
5. Loss function
The cross-entropy loss is implemented. (If this code turns out to be wrong, please do contact me.)
1. Softmax gradient computation
class SoftmaxWithLogits(Layer):
    """
    Softmax cross-entropy with logits:
        loss[i] = -logits[i, target[i]] + log(sum_j exp(logits[i, j]))
    """
    def __init__(self, reduction='mean', name='softmaxwithlogits', *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        assert reduction in ['mean', 'none', 'sum'], "reduction only support 'mean', 'none' and 'sum', but got {}.".format(reduction)
        self.reduction = reduction
        self.logits = None
        self.target = None
    def forward(self, logits, target):
        """
        :param logits (np.ndarray): predicted logits, shape [N, C]
        :param target (np.ndarray): target class indices, shape [N, 1]
        :return: loss
        """
        assert logits.shape[0] == target.shape[0], "The first dimension of logits and target is not the same, logits shape {} can't match target shape {}.".format(logits.shape, target.shape)
        self.logits = logits
        self.target = target
        loss = []
        for i in range(logits.shape[0]):
            loss_i = -logits[i, target.squeeze(-1)[i]] + np.log(np.sum(np.exp(logits[i])))
            loss.append(loss_i)
        loss = np.array(loss).reshape(target.shape)
        if self.reduction == 'mean':
            return loss.mean()
        elif self.reduction == 'sum':
            return loss.sum()
        else:
            return loss
    def backward(self):
        soft_denominator = np.sum(np.exp(self.logits), axis=1, keepdims=True)  # [N, 1]
        eq_grad = np.zeros_like(self.logits)
        for i in range(self.logits.shape[0]):
            eq_grad[i, self.target.squeeze(-1)[i]] = -1
        gradient = np.exp(self.logits) / soft_denominator + eq_grad  # softmax(logits) - one_hot(target)
        if self.reduction == 'mean':
            gradient = gradient / self.logits.shape[0]  # scale by 1/N so the gradient matches the 'mean' reduction of forward
        return gradient
# A quick sanity check against a hand-computed value:
# loss_fn = SoftmaxWithLogits()
# logits = np.array([[1., 2., 3.]])
# target = np.array([[1]])
# print(-2 + np.log(np.exp(1) + np.exp(2) + np.exp(3)))  # expected loss
# print(logits.shape, target.shape)
# print(loss_fn(logits, target))  # should match the value above
# print(loss_fn.backward(), np.exp(2) / (np.exp(1) + np.exp(2) + np.exp(3)))  # gradient vs. the softmax probability of class 1
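Beyond the hand-computed check above, the analytical gradient can also be verified with finite differences. The sketch below (an extra illustration, not part of the original project) perturbs each logit by a small epsilon and compares the numerical gradient with the one returned by backward.
# Finite-difference check of the SoftmaxWithLogits gradient, assuming the class defined above.
loss_fn = SoftmaxWithLogits()
logits = np.array([[1., 2., 3.]])
target = np.array([[1]])
loss_fn(logits, target)          # forward pass caches logits and target
analytic = loss_fn.backward()
eps = 1e-5
numeric = np.zeros_like(logits)
for j in range(logits.shape[1]):
    plus, minus = logits.copy(), logits.copy()
    plus[0, j] += eps
    minus[0, j] -= eps
    numeric[0, j] = (loss_fn(plus, target) - loss_fn(minus, target)) / (2 * eps)
print(np.max(np.abs(analytic - numeric)))  # should be tiny (numerical error only)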
6. Dataset
# Following PaddlePaddle's design, a Dataset needs to implement __getitem__ and __len__.
class Dataset:
    def __init__(self, *args, **kwargs):
        pass
    def __getitem__(self, idx):
        raise NotImplementedError("'{}' not implemented in class {}"
                                  .format('__getitem__', self.__class__.__name__))
    def __len__(self):
        raise NotImplementedError("'{}' not implemented in class {}"
                                  .format('__len__', self.__class__.__name__))
# BatchSampler generates, for each batch, the indices of its samples in the dataset.
class BatchSampler:
    def __init__(self, dataset=None, shuffle=False, batch_size=1, drop_last=False):
        self.batch_size = batch_size
        self.drop_last = drop_last
        self.shuffle = shuffle
        self.num_data = len(dataset)
        if self.drop_last or (self.num_data % batch_size == 0):
            self.num_samples = self.num_data // batch_size
        else:
            self.num_samples = self.num_data // batch_size + 1
        indices = np.arange(self.num_data)
        if shuffle:
            np.random.shuffle(indices)
        if drop_last:
            indices = indices[:self.num_samples * batch_size]
        self.indices = indices
    def __len__(self):
        return self.num_samples
    def __iter__(self):
        batch_indices = []
        for i in range(self.num_samples):
            if (i + 1) * self.batch_size <= self.num_data:
                for idx in range(i * self.batch_size, (i + 1) * self.batch_size):
                    batch_indices.append(self.indices[idx])
                yield batch_indices
                batch_indices = []
            else:
                for idx in range(i * self.batch_size, self.num_data):
                    batch_indices.append(self.indices[idx])
        if not self.drop_last and len(batch_indices) > 0:
            yield batch_indices
# DataLoader takes the indices produced by the sampler, fetches the samples from the dataset, and stacks them into a batch.
class DataLoader:
    def __init__(self, dataset, sampler=BatchSampler, shuffle=False, batch_size=1, drop_last=False):
        self.dataset = dataset
        self.batch_sampler = sampler
        self.sampler = self.batch_sampler(dataset, shuffle, batch_size, drop_last)
        self.shuffle = shuffle
        self.drop_last = drop_last
        self.batch_size = batch_size
    def __len__(self):
        return len(self.sampler)
    def __call__(self):
        return self.__iter__()
    def __iter__(self):
        for sample_indices in self.sampler:
            data_list = []
            label_list = []
            for indice in sample_indices:
                data, label = self.dataset[indice]
                data_list.append(data)
                label_list.append(label)
            yield np.stack(data_list, axis=0), np.stack(label_list, axis=0)
        # rebuild the sampler so the next epoch gets a fresh (and, if enabled, reshuffled) order
        self.sampler = self.batch_sampler(self.dataset, self.shuffle, self.batch_size, self.drop_last)
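To see the Dataset, BatchSampler, and DataLoader working together, here is a toy round trip (a sketch, not part of the original project; RandomDataset is just a throwaway example class):
# A toy Dataset/DataLoader example, assuming the classes defined above.
class RandomDataset(Dataset):
    def __init__(self, n=10):
        self.data = np.random.randn(n, 3)
        self.label = np.arange(n).reshape(n, 1)
    def __len__(self):
        return len(self.data)
    def __getitem__(self, idx):
        return self.data[idx], self.label[idx]

loader = DataLoader(RandomDataset(10), shuffle=True, batch_size=4, drop_last=False)
for batch_x, batch_y in loader:
    print(batch_x.shape, batch_y.shape)  # (4, 3) (4, 1) twice, then (2, 3) (2, 1) for the last partial batch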
IV. CIFAR-10 in practice
The goal of this section is to use the classes above to build a simple model and run the CIFAR-10 classification task. Since everything runs on CPU and is fairly slow, only 10,000 samples are used: 8,000 for training and 2,000 for validation.
# A small helper class that keeps a running average.
class AverageMeter:
    def __init__(self):
        self.val = 0.
        self.count = 0
    def update(self, value, n=1):
        self.val += value
        self.count += n
    def __call__(self):
        return self.val / self.count
    def reset(self):
        self.val = 0.
        self.count = 0
    def __str__(self):
        return str(self.__call__())
1. Loading the data
%matplotlib inline
import pickle
import matplotlib.pyplot as plt


# Read one CIFAR-10 batch file.
def read_cifar(data_path):
    with open(data_path, 'rb') as f:
        data_dict = pickle.load(f, encoding='latin1')
    X = data_dict['data']
    Y = data_dict['labels']
    X = X.reshape(10000, 3, 32, 32).transpose(0, 2, 3, 1).astype("float")
    Y = np.array(Y).reshape((10000, 1))
    return X, Y


# Only 10,000 images are used.
data_path = "/home/aistudio/data/data120154/data_batch_1"
X, Y = read_cifar(data_path)
show_number = 5
# Show a few example images to inspect the data.
for i in range(show_number):
    img = X[i]
    plt.subplot(1, show_number, i + 1)
    plt.imshow(img.astype('uint8'))
2. Inspecting the label distribution
# Look at the label distribution of the data.
plt.hist(Y[:, 0].astype('int'), edgecolor='black')
Output: each of the 10 classes contains roughly 1,000 images (1005, 974, 1032, 1016, 999, 937, 1030, 1001, 1025, 981), so the labels are close to uniformly distributed.
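The same check can be done numerically without a plot (a small aside, not in the original notebook): np.bincount counts how many samples fall into each class.
# Count samples per class; every class should hold roughly 1000 images.
print(np.bincount(Y[:, 0].astype('int'), minlength=10))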
3. Building the model and setting hyperparameters
# Build the dataset with simple preprocessing: pixel values are scaled to [0, 1].
class CifarDataset(Dataset):
    def __init__(self, X, Y):
        self.X = X
        self.Y = Y
    def __len__(self):
        return len(self.X)
    def __getitem__(self, idx):
        return self.X[idx] / 255.0, self.Y[idx]  # scale pixel values to [0, 1]


# Hyperparameters
train_number = 8000   # training set size (out of 10000 samples)
epoches = 100         # number of epochs
batch_size = 4        # batch size (note: the Paddle run below uses different settings)
learning_rate = 0.01  # learning rate (note: the Paddle run below uses different settings)
num_classes = 10
4. Training
# Split the data into a training set and a validation set.
train_X, train_Y = X[:train_number], Y[:train_number]
val_X, val_Y = X[train_number:], Y[train_number:]
# Build the dataloaders.
train_dataset = CifarDataset(train_X, train_Y)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
val_dataset = CifarDataset(val_X, val_Y)
val_dataloader = DataLoader(val_dataset, batch_size=1, shuffle=False, drop_last=False)
# Our own model definition.
model = Sequential(
    Linear(3 * 32 * 32, 64, name='linear1'),
    ReLU(name='relu1'),
    Linear(64, 128, name='linear2'),
    ReLU(name='relu2'),
    Linear(128, 64, name='linear3'),
    ReLU(name='relu3'),
    Linear(64, num_classes, name='linear4'),
)
opt = SGD(parameters=model.parameters(), learning_rate=learning_rate, weight_decay=0.0, decay_type='l2')
loss_fn = SoftmaxWithLogits()
# A simple evaluation function that computes the model's prediction accuracy.
def eval(model, val_dataloader):
    predict_labels = []
    labels = []
    for x, y in val_dataloader:
        x = x.reshape((1, -1))
        logits = model(x)
        pred = np.argmax(logits, axis=1)
        predict_labels.append(pred)
        labels.append(y.squeeze(1))
    pred = np.array(predict_labels)
    labels = np.array(labels)
    acc = np.sum(pred == labels) / len(labels)
    print("val dataset accuracy:", acc)
    return acc
# Start training.
lddl_acc = []  # don't worry about the name "lddl", it comes from the open-source repo: https://github.com/justld/LDDL
loss_avg = AverageMeter()
for epoch in range(1, epoches + 1):
    acc = eval(model, val_dataloader=val_dataloader)  # first measure the model's accuracy on the validation set
    lddl_acc.append(acc)  # keep the accuracy for the later comparison with Paddle
    for idx, (x, y) in enumerate(train_dataloader):
        x = x.reshape((batch_size, -1))  # the classifier uses fully connected layers, so reshape the input to [batch_size, channels * H * W]
        logits = model(x)
        loss = loss_fn(logits, y)
        loss_avg.update(loss)
        grad = loss_fn.backward()
        model.backward(grad)
        opt.step()
        opt.clear_grad()
    print("epoch: {}. loss: {}".format(epoch, loss_avg))
5. The same network in Paddle
# The Paddle code is not explained in detail here; plenty of examples exist on AI Studio, and it is not the focus of this project.
import paddle
from paddle import nn
from paddle.io import Dataset, DataLoader
from paddle.optimizer import Momentum

# Paddle uses different hyperparameters here; with the settings above, the Paddle model performs noticeably worse.
batch_size = 64
learning_rate = 0.01
class ToyDataset(Dataset):
    def __init__(self, X, Y):
        self.X = X
        self.Y = Y
    def __len__(self):
        return self.X.shape[0]
    def __getitem__(self, idx):
        return self.X[idx].astype('float32') / 255.0, self.Y[idx]


model = nn.Sequential(
    nn.Linear(3 * 32 * 32, 64),
    nn.ReLU(),
    nn.Linear(64, 128),
    nn.ReLU(),
    nn.Linear(128, 64),
    nn.ReLU(),
    nn.Linear(64, num_classes),
)
optimizer = Momentum(learning_rate=learning_rate, momentum=0.9, parameters=model.parameters())
loss_fn = nn.CrossEntropyLoss()
train_dataset = ToyDataset(train_X, train_Y)
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
val_dataset = ToyDataset(val_X, val_Y)
val_dataloader = DataLoader(val_dataset, batch_size=1)
def eval(model, val_dataloader):
    predict_labels = []
    labels = []
    for x, y in val_dataloader:
        x = x.reshape((1, -1))
        logits = model(x)
        pred = np.argmax(logits, axis=1)
        predict_labels.append(pred)
        labels.append(y.squeeze(1))
    pred = np.array(predict_labels)
    labels = np.stack(labels, axis=0)
    acc = np.sum(pred == labels) / len(predict_labels)
    print("val dataset accuracy:", acc)
    return acc
paddle_acc = []
loss_avg = AverageMeter()
for epoch in range(1, epoches + 1):
    acc = eval(model, val_dataloader)
    paddle_acc.append(acc)
    for idx, (x, y) in enumerate(train_dataloader):
        x = x.reshape([x.shape[0], -1])
        pred = model(x)
        loss = loss_fn(pred, y)
        loss_avg.update(loss.numpy()[0])
        loss.backward()
        optimizer.step()
        optimizer.clear_grad()
    print("epoch: {}. loss: {}".format(epoch, loss_avg))
6. Accuracy comparison: lddl vs. Paddle
# Plot the accuracy of our own framework against Paddle; as the curves show, our implementation still lags a bit behind Paddle.
plt.plot(np.arange(epoches), paddle_acc, label='paddle')
plt.plot(np.arange(epoches), lddl_acc, label='lddl')
plt.legend()
V. References
GitHub repository for this project: https://github.com/justld/LDDL
References:
1. PaddlePaddle
VI. Miscellaneous
My knowledge is limited, so mistakes are bound to slip in; corrections and suggestions are very welcome. Please leave a comment below and I will reply as soon as I see it.