【Paddle版本】阐述深度学习对抗训练方法合集
对抗训练是竞赛与项目必不可少的工具箱,本项目集合FGSM、FGM、PGD等常用对抗训练方法,可直接用于各项飞桨深度学习竞赛与项目中提升效果指标!
1、项目背景
**经常在使用paddle的时候发现常用的对抗训练基础工具有待开源补充,本项目基于常见的对抗训练论文进行代码复现,应该是目前飞桨社区最全的对抗训练合集,如果觉得本项目对您有用,请动动小手Fork、Star鼓励一下!
2、项目介绍
对抗训练是一种引入噪声的训练方式,可以对参数进行正则化,提升模型鲁棒性和泛化能力。
有监督数据下使用交叉熵作为损失:
半监督数据下使用KL散度作为损失:
扰动如何得来呢?这需要对抗的思想,即往增大损失的方向增加扰动
有监督数据下:
半监督数据下:
θ上面一个尖儿代表的是常数,r代表的当前输入计算出来的梯度,r_adv是对抗训练计算出来的梯度。目的是说在计算对抗扰动时虽然计算了梯度,但不对参数进行更新,因为当前得到的对抗扰动是对旧参数最优的。
用一句话形容对抗训练的思路,就是在输入上进行梯度上升(增大loss),在参数上进行梯度下降(减小loss)。由于输入会进行embedding lookup,所以实际的做法是在embedding table上进行梯度上升。
接下来介绍不同的方法,后续方法优化的主要方向有两点:得到更优的扰动 & 提升训练速度。
3、对抗论文
FGSM (Fast Gradient Sign Method):ICLR2015
FGM (Fast Gradient Method):ICLR2017
PGD (Projected Gradient Descent):ICLR2018
FreeAT (Free Adversarial Training):NIPS2019
YOPO (You Only Propagate Once):NIPS2019
AWP(Adversarial Weight Perturbation Helps Robust Generalization):NIPS 2020
导入环境
import paddle
from paddle import nn
FGSM (Fast Gradient Sign Method):ICLR2015
FGSM是Goodfellow提出对抗训练时的方法,假设对于输入的梯度为:
那扰动肯定是沿着梯度的方向往损失函数的极大值走:
用老太太打架形容:跟着你的方向前进,步步逼人,你来我往
class FGSM:
def __init__(self, model: nn.Module, eps=0.1):
self.model = (model.module if hasattr(model, "module") else model)
self.eps = eps
self.backup = {}
# only attack word embedding
def attack(self, emb_name='embedding'):
for name, param in self.model.named_parameters():
if param.requires_grad and emb_name in name:
self.backup[name] = param.data.clone()
r_at = self.eps * param.grad.sign()
param.data.add_(r_at)
def restore(self, emb_name='embedding'):
for name, para in self.model.named_parameters():
if para.requires_grad and emb_name in name:
assert name in self.backup
para.data = self.backup[name]
self.backup = {}
# 对应第一步
optimizer = paddle.optimizer.AdamW(learning_rate=5e-5, parameters=model.parameters())
fgsm = FGSM(model=model)
for batch in dataloader:
# 对应第二步
loss = model(batch)
loss.backward()
# 对应第三步
fgsm.asttack()
# 对应第四步
loss_adv = model(batch)
loss_adv.backward()
# 对应第五步
fgsm.restore()
# 对应第六步
optimizer.step()
optimizer.clear_grad()
FGM (Fast Gradient Method):ICLR2017
Goodfellow后续提出的FGM则是根据具体的梯度进行scale,得到更好的对抗样本:
用老太太打架形容:在限定规矩范围内,见招学招,你来我往
class FGM:
def __init__(self, model: nn.Module, eps=1.):
self.model = (model.module if hasattr(model, "module") else model)
self.eps = eps
self.backup = {}
# only attack embedding
def attack(self, emb_name='embedding'):
for name, param in self.model.named_parameters():
if param.requires_grad and emb_name in name:
self.backup[name] = param.data.clone()
norm = paddle.norm(param.grad)
if norm and not paddle.isnan(norm):
r_at = self.eps * param.grad / norm
param.data.add_(r_at)
def restore(self, emb_name='embedding'):
for name, para in self.model.named_parameters():
if para.requires_grad and emb_name in name:
assert name in self.backup
para.data = self.backup[name]
self.backup = {}
# 对应第一步
optimizer = paddle.optimizer.AdamW(learning_rate=5e-5, parameters=model.parameters())
fgm = FGM(model=model)
for batch in dataloader:
# 对应第二步
loss = model(batch)
loss.backward()
# 对应第三步
fgm.attack()
# 对应第四步
loss_adv = model(batch)
loss_adv.backward()
# 对应第五步
fgm.restore()
# 对应第六步
optimizer.step()
optimizer.clear_grad()
PGD (Projected Gradient Descent):ICLR2018
FGM简单粗暴的“一步到位”,可能走不到约束内的最优点。PGD则是“小步走,多走几步”,如果走出了扰动半径为epsilon的空间,就映射回“球面”上,以保证扰动不要过大
用老太太打架形容:让你没有机会出手,一来多回,小步多走
class PGD:
def __init__(self, model, eps=1., alpha=0.3):
self.model = (model.module if hasattr(model, "module") else model)
self.eps = eps
self.alpha = alpha
self.emb_backup = {}
self.grad_backup = {}
def attack(self, emb_name='embedding', is_first_attack=False):
for name, param in self.model.named_parameters():
if param.requires_grad and emb_name in name:
if is_first_attack:
self.emb_backup[name] = param.data.clone()
norm = paddle.norm(param.grad)
if norm != 0 and not paddle.isnan(norm):
r_at = self.alpha * param.grad / norm
param.data.add_(r_at)
param.data = self.project(name, param.data)
def restore(self, emb_name='embedding'):
for name, param in self.model.named_parameters():
if param.requires_grad and emb_name in name:
assert name in self.emb_backup
param.data = self.emb_backup[name]
self.emb_backup = {}
def project(self, param_name, param_data):
r = param_data - self.emb_backup[param_name]
if paddle.norm(r) > self.eps:
r = self.eps * r / paddle.norm(r)
return self.emb_backup[param_name] + r
def backup_grad(self):
for name, param in self.model.named_parameters():
if param.requires_grad and param.grad is not None:
self.grad_backup[name] = param.grad.clone()
def restore_grad(self):
for name, param in self.model.named_parameters():
if param.requires_grad and param.grad is not None:
param.grad = self.grad_backup[name]
# 对应第一步
optimizer = paddle.optimizer.AdamW(learning_rate=5e-5, parameters=model.parameters())
pgd = PGD(model=model)
for batch in dataloader:
# 对应第二步
loss = model(batch)
loss.backward()
# 对应第三步
pgd.backup_grad()
# 对应第四步
for k in range(pgd_k):
pgd.attack(is_first_attack=(k == 0))
if k != pgd_k - 1:
model.zero_grad()
else:
pgd.restore_grad()
loss_adv = model(batch)
loss_adv.backward()
# 对应第五步
pgd.restore()
# 对应第六步
optimizer.step()
optimizer.clear_grad()
FreeAT (Free Adversarial Training):NIPS2019
FreeAT的思想是在对每个样本x连续重复m次训练,计算r时复用上一步的梯度,为了保证速度,整体epoch会除以m。r的更新公式为:
class FreeAT:
def __init__(self, model, eps=0.1):
self.model = (model.module if hasattr(model, "module") else model)
self.eps = eps
self.emb_backup = {}
self.grad_backup = {}
self.last_r_at = 0
def attack(self, emb_name='embedding', is_first_attack=False):
for name, param in self.model.named_parameters():
if param.requires_grad and emb_name in name:
if is_first_attack:
self.emb_backup[name] = param.data.clone()
param.data.add_(self.last_r_at)
param.data = self.project(name, param.data)
self.last_r_at = self.last_r_at + self.eps * param.grad.sign()
def restore(self, emb_name='embedding'):
for name, param in self.model.named_parameters():
if param.requires_grad and emb_name in name:
assert name in self.emb_backup
param.data = self.emb_backup[name]
self.emb_backup = {}
def project(self, param_name, param_data):
r = param_data - self.emb_backup[param_name]
if paddle.norm(r) > self.eps:
r = self.eps * r / paddle.norm(r)
return self.emb_backup[param_name] + r
def backup_grad(self):
for name, param in self.model.named_parameters():
if param.requires_grad and param.grad is not None:
self.grad_backup[name] = param.grad.clone()
def restore_grad(self):
for name, param in self.model.named_parameters():
if param.requires_grad and param.grad is not None:
param.grad = self.grad_backup[name]
YOPO (You Only Propagate Once):NIPS2019
极大值原理PMP(Pontryagin’s maximum principle)是optimizer的一种,它将神经网络看作动力学系统。这个方法的优点是在优化网络参数时,层之间是解藕的。通过这个思想,我们可以想到,既然扰动是加在embedding层的,为什么每次还要计算完整的前后向传播呢?基于这个想法,作者想复用后几层的梯度。
假设p为定值:
则对r的更新就可以变为
class YOPO:
def __init__(self, eps= 6/255.0, sigma=3/255.0, nb_iter=20,
norm=np.inf, DEVICE=paddle.device('cpu'),
mean=paddle.tensor(np.array([0]).astype(np.float32)[np.newaxis, :, np.newaxis, np.newaxis]),
std=paddle.tensor(np.array([1.0]).astype(np.float32)[np.newaxis, :, np.newaxis, np.newaxis]), random_start=True):
'''
:param eps: maximum distortion of adversarial examples
:param sigma: single step size
:param nb_iter: number of attack iterations
:param norm: which norm to bound the perturbations
'''
self.eps = eps
self.sigma = sigma
self.nb_iter = nb_iter
self.norm = norm
self.criterion = nn.CrossEntropyLoss().to(DEVICE)
self.DEVICE = DEVICE
self._mean = mean.to(DEVICE)
self._std = std.to(DEVICE)
self.random_start = random_start
def single_attack(self, net, inp, label, eta, target = None):
'''
Given the original image and the perturbation computed so far, computes
a new perturbation.
:param net:
:param inp: original image
:param label:
:param eta: perturbation computed so far
:return: a new perturbation
'''
adv_inp = inp + eta
#net.zero_grad()
pred = net(adv_inp)
if target is not None:
targets = paddle.sum(pred[:, target])
grad_sign = paddle.autograd.grad(targets, adv_in, only_inputs=True, retain_graph = False)[0].sign()
else:
loss = self.criterion(pred, label)
grad_sign = paddle.autograd.grad(loss, adv_inp,
only_inputs=True, retain_graph = False)[0].sign()
adv_inp = adv_inp + grad_sign * (self.sigma / self._std)
tmp_adv_inp = adv_inp * self._std + self._mean
tmp_inp = inp * self._std + self._mean
tmp_adv_inp = paddle.clamp(tmp_adv_inp, 0, 1) ## clip into 0-1
#tmp_adv_inp = (tmp_adv_inp - self._mean) / self._std
tmp_eta = tmp_adv_inp - tmp_inp
tmp_eta = clip_eta(tmp_eta, norm=self.norm, eps=self.eps, DEVICE=self.DEVICE)
eta = tmp_eta/ self._std
return eta
def attack(self, net, inp, label, target = None):
if self.random_start:
eta = paddle.FloatTensor(*inp.shape).uniform_(-self.eps, self.eps)
else:
eta = paddle.zeros_like(inp)
eta = eta.to(self.DEVICE)
eta = (eta - self._mean) / self._std
net.eval()
inp.requires_grad = True
eta.requires_grad = True
for i in range(self.nb_iter):
eta = self.single_attack(net, inp, label, eta, target)
#print(i)
#print(eta.max())
adv_inp = inp + eta
tmp_adv_inp = adv_inp * self._std + self._mean
tmp_adv_inp = paddle.clamp(tmp_adv_inp, 0, 1)
adv_inp = (tmp_adv_inp - self._mean) / self._std
return adv_inp
def to(self, device):
self.DEVICE = device
self._mean = self._mean.to(device)
self._std = self._std.to(device)
self.criterion = self.criterion.to(device)
AWP(Adversarial Weight Perturbation Helps Robust Generalization):NIPS 2020
class AWP:
def __init__(
self,
model,
optimizer,
adv_param=['weight'],
adv_lr=1,
adv_eps=0.001,
adv_step=1,
scaler=None
):
self.model = model
self.optimizer = optimizer
self.adv_param = adv_param
self.adv_lr = adv_lr
self.adv_eps = adv_eps
self.adv_step = adv_step
self.backup = {}
self.backup_eps = {}
self.scaler = scaler
def attack_backward(self, batch):
if (self.adv_lr == 0):
return None
self._save()
for i in range(self.adv_step):
self._attack_step()
loss = self.model(batch)
self.optimizer.zero_grad()
self.scaler.scale(loss).backward()
self._restore()
def _attack_step(self):
e = 1e-6
for name, param in self.model.named_parameters():
if (param.requires_grad) and (param.grad is not None) and (self.adv_param[0] in name):
norm1 = paddle.norm(param.grad)
norm2 = paddle.norm(param.data.detach())
if norm1 != 0 and not paddle.isnan(norm1):
r_at = self.adv_lr * param.grad / (norm1 + e) * (norm2 + e)
param.data.add_(r_at)
param.data = paddle.min(
paddle.max(param.data, self.backup_eps[name][0]), self.backup_eps[name][1]
)
# param.data.clamp_(*self.backup_eps[name])
def _save(self):
for name, param in self.model.named_parameters():
if (param.requires_grad) and (param.grad is not None) and (self.adv_param[0] in name):
if name not in self.backup:
self.backup[name] = param.data.clone()
grad_eps = self.adv_eps * param.abs().detach()
self.backup_eps[name] = (
self.backup[name] - grad_eps,
self.backup[name] + grad_eps,
)
def _restore(self,):
for name, param in self.model.named_parameters():
if name in self.backup:
param.data = self.backup[name]
self.backup = {}
self.backup_eps = {}
4、项目总结
1、在深度学习项目、竞赛中对抗训练是必不可少的增强工具,现在完善了飞桨此处的分支,可以更加愉快的使用了。
2、fgsm、fgm、pgd是最常使用的增强工具,所以还附加了使用逻辑,其余的增强工具只是完成了核心代码的复现。
5、引用资料
1、Explaining and Harnessing Adversarial Examples
2、Adversarial Training for Free!
3、Adversarial Training Methods for Semi-Supervised Text Classification
4、You Only Propagate Once: Accelerating Adversarial Training via Maximal Principle
5、综述】NLP 对抗训练
6、一文搞懂NLP中的对抗训练
7、加速对抗训练——YOPO算法浅析
8、炼丹技巧 功守道:NLP中的对抗训练
9、<EYD与机器学习>:对抗攻击基础知识
文章仅为搬运,原作链接:https://aistudio.baidu.com/aistudio/projectdetail/4327353
更多推荐
所有评论(0)