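1. Project Background

When using Paddle, I often found that open-source implementations of the common adversarial-training utilities were missing. This project reproduces the code from the most widely used adversarial-training papers and is probably the most complete adversarial-training collection in the PaddlePaddle community so far. If you find it useful, please Fork and Star it!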

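2. Project Overview

Adversarial training is a training scheme that injects noise into the input. It regularizes the parameters and improves the model's robustness and generalization.
With labeled (supervised) data, the loss is the cross-entropy on the perturbed input: $-\log p(y \mid x + r_{adv}; \theta)$

With unlabeled (semi-supervised) data, the loss is the KL divergence between the predictions on the clean and the perturbed input: $\mathrm{KL}\left[p(\cdot \mid x; \hat{\theta}) \,\|\, p(\cdot \mid x + r_{adv}; \theta)\right]$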
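The two losses above can be sketched in plain Python. This is only an illustration: the logits are made-up numbers, and the helper names `softmax`, `cross_entropy` and `kl_divergence` are assumptions for this sketch, not part of the project.

```python
import math

def softmax(logits):
    """Numerically stable softmax over a list of floats."""
    m = max(logits)
    exps = [math.exp(v - m) for v in logits]
    total = sum(exps)
    return [e / total for e in exps]

def cross_entropy(probs, label):
    """Supervised loss: -log p(y | x + r_adv)."""
    return -math.log(probs[label])

def kl_divergence(p, q):
    """Semi-supervised loss: KL[p || q], where p is the prediction on the
    clean input (treated as a constant) and q the prediction on x + r_adv."""
    return sum(pi * math.log(pi / qi) for pi, qi in zip(p, q) if pi > 0)

clean = softmax([2.0, 0.5, -1.0])      # hypothetical logits for x
perturbed = softmax([1.2, 0.9, -0.7])  # hypothetical logits for x + r_adv

print(cross_entropy(perturbed, label=0))  # needs the gold label
print(kl_divergence(clean, perturbed))    # needs no label at all
```

The KL form only compares two predictions, which is why it can be used on unlabeled data.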

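Where does the perturbation come from? This is where the adversarial idea enters: the perturbation is added in the direction that increases the loss.
With supervised data: $r_{adv} = \arg\max_{r, \|r\| \le \epsilon} -\log p(y \mid x + r; \hat{\theta})$

With semi-supervised data: $r_{adv} = \arg\max_{r, \|r\| \le \epsilon} \mathrm{KL}\left[p(\cdot \mid x; \hat{\theta}) \,\|\, p(\cdot \mid x + r; \hat{\theta})\right]$

Here $\hat{\theta}$ (θ with a hat) means the current parameters are treated as a constant, r is a perturbation applied to the current input, and r_adv is the adversarial perturbation obtained from the gradient. In other words, a gradient is computed to find the adversarial perturbation, but the parameters are not updated in that step, because the perturbation found is the optimal one for the old parameters.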

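In one sentence, adversarial training performs gradient ascent on the input (increasing the loss) and gradient descent on the parameters (decreasing the loss). Because the input goes through an embedding lookup, in practice the gradient ascent is applied to the embedding table.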
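A minimal sketch of this "ascent on the input, descent on the parameters" loop, using a one-parameter linear model with a squared loss so that both gradients can be written by hand. The model, the step sizes `eps` and `lr`, and all function names are assumptions chosen for illustration.

```python
def loss(w, x, y):
    """Squared error of the toy model y_hat = w * x."""
    return 0.5 * (w * x - y) ** 2

def grad_x(w, x, y):
    """Gradient of the loss with respect to the input x."""
    return (w * x - y) * w

def grad_w(w, x, y):
    """Gradient of the loss with respect to the parameter w."""
    return (w * x - y) * x

def adversarial_step(w, x, y, eps=0.1, lr=0.05):
    # Gradient ascent on the input: move x by eps in the direction that
    # increases the loss, keeping w fixed.
    g = grad_x(w, x, y)
    direction = (g > 0) - (g < 0)
    x_adv = x + eps * direction
    # Gradient descent on the parameter: accumulate the clean and the
    # adversarial gradient, then take one step.
    g_w = grad_w(w, x, y) + grad_w(w, x_adv, y)
    return w - lr * g_w, x_adv

new_w, x_adv = adversarial_step(w=1.0, x=2.0, y=1.0)
print(loss(1.0, x_adv, 1.0), loss(new_w, 2.0, 1.0))
```

The perturbed input raises the loss for the old parameter, while the parameter update lowers the loss on the clean input.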

The individual methods are introduced next. Later methods improve on two fronts: finding a better perturbation and speeding up training.

3. Adversarial Training Papers

FGSM (Fast Gradient Sign Method): ICLR 2015
FGM (Fast Gradient Method): ICLR 2017
PGD (Projected Gradient Descent): ICLR 2018
FreeAT (Free Adversarial Training): NeurIPS 2019
YOPO (You Only Propagate Once): NeurIPS 2019
AWP (Adversarial Weight Perturbation Helps Robust Generalization): NeurIPS 2020

Import the environment

import numpy as np  # used by the YOPO code below
import paddle
from paddle import nn

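FGSM (Fast Gradient Sign Method): ICLR 2015

FGSM is the method Goodfellow proposed when introducing adversarial training. Let the gradient with respect to the input be: $g = \nabla_x L(\theta, x, y)$

The perturbation then follows the gradient toward a maximum of the loss function, taking a step of fixed size in the direction of its sign: $r_{adv} = \epsilon \cdot \mathrm{sign}(g)$

In the "old ladies fighting" analogy: follow your opponent's direction and press forward step by step, trading blows.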
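The sign-based step can be sketched without any framework. The function name `fgsm_perturbation` and the sample gradient are assumptions for illustration only.

```python
def sign(v):
    """Return -1, 0 or 1 depending on the sign of v."""
    return (v > 0) - (v < 0)

def fgsm_perturbation(grad, eps=0.1):
    """r_adv = eps * sign(g): every coordinate moves by exactly eps
    (or stays put when its gradient is zero), whatever the magnitude."""
    return [eps * sign(g) for g in grad]

print(fgsm_perturbation([0.3, -2.0, 0.0], eps=0.1))
```

Because only the sign is used, a coordinate with a tiny gradient is perturbed as strongly as one with a large gradient.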

class FGSM:
    def __init__(self, model: nn.Layer, eps=0.1):
        # Unwrap paddle.DataParallel, which keeps the wrapped layer in `_layers`.
        self.model = model._layers if isinstance(model, paddle.DataParallel) else model
        self.eps = eps
        self.backup = {}

    # Only attack the word embedding.
    def attack(self, emb_name='embedding'):
        for name, param in self.model.named_parameters():
            # Paddle marks trainable parameters with stop_gradient=False.
            if not param.stop_gradient and emb_name in name:
                self.backup[name] = param.detach().clone()
                # r_adv = eps * sign(g)
                r_at = self.eps * paddle.sign(param.grad)
                with paddle.no_grad():
                    param.set_value(param + r_at)

    def restore(self, emb_name='embedding'):
        for name, param in self.model.named_parameters():
            if not param.stop_gradient and emb_name in name:
                assert name in self.backup
                param.set_value(self.backup[name])

        self.backup = {}
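
# Step 1: create the optimizer and the attack helper.
# `model` and `dataloader` are assumed to be defined; model(batch) returns the loss.
optimizer = paddle.optimizer.AdamW(learning_rate=5e-5, parameters=model.parameters())
fgsm = FGSM(model=model)

for batch in dataloader:
    # Step 2: forward and backward on the clean batch to get the gradients.
    loss = model(batch)
    loss.backward()
    # Step 3: perturb the embedding along the gradient sign.
    fgsm.attack()
    # Step 4: forward and backward on the perturbed embedding; gradients accumulate.
    loss_adv = model(batch)
    loss_adv.backward()
    # Step 5: restore the original embedding.
    fgsm.restore()
    # Step 6: update the parameters and clear the gradients.
    optimizer.step()
    optimizer.clear_grad()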

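FGM (Fast Gradient Method): ICLR 2017

FGM, proposed later by Goodfellow, scales the perturbation by the actual gradient (normalized by its L2 norm) to obtain better adversarial examples: $r_{adv} = \epsilon \cdot g / \|g\|_2$

In the "old ladies fighting" analogy: within the agreed rules, learn each move as you see it, trading blows.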
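A framework-free sketch of the L2-normalized step. The function name `fgm_perturbation` and the sample gradient are assumptions for illustration.

```python
import math

def fgm_perturbation(grad, eps=1.0):
    """r_adv = eps * g / ||g||_2: keeps the direction of the gradient and
    rescales it so the perturbation has L2 norm eps."""
    norm = math.sqrt(sum(g * g for g in grad))
    if norm == 0 or math.isnan(norm):
        # No usable direction: leave the input unperturbed.
        return [0.0] * len(grad)
    return [eps * g / norm for g in grad]

print(fgm_perturbation([3.0, 4.0], eps=1.0))
```

Unlike the sign step of FGSM, coordinates with larger gradients receive a larger share of the perturbation.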

class FGM:
    def __init__(self, model: nn.Layer, eps=1.):
        # Unwrap paddle.DataParallel, which keeps the wrapped layer in `_layers`.
        self.model = model._layers if isinstance(model, paddle.DataParallel) else model
        self.eps = eps
        self.backup = {}

    # Only attack the embedding.
    def attack(self, emb_name='embedding'):
        for name, param in self.model.named_parameters():
            # Paddle marks trainable parameters with stop_gradient=False.
            if not param.stop_gradient and emb_name in name:
                self.backup[name] = param.detach().clone()
                norm = paddle.norm(param.grad)
                if norm != 0 and not paddle.isnan(norm):
                    # r_adv = eps * g / ||g||_2
                    r_at = self.eps * param.grad / norm
                    with paddle.no_grad():
                        param.set_value(param + r_at)

    def restore(self, emb_name='embedding'):
        for name, param in self.model.named_parameters():
            if not param.stop_gradient and emb_name in name:
                assert name in self.backup
                param.set_value(self.backup[name])

        self.backup = {}
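
# Step 1: create the optimizer and the attack helper.
# `model` and `dataloader` are assumed to be defined; model(batch) returns the loss.
optimizer = paddle.optimizer.AdamW(learning_rate=5e-5, parameters=model.parameters())
fgm = FGM(model=model)

for batch in dataloader:
    # Step 2: forward and backward on the clean batch to get the gradients.
    loss = model(batch)
    loss.backward()
    # Step 3: perturb the embedding along the normalized gradient.
    fgm.attack()
    # Step 4: forward and backward on the perturbed embedding; gradients accumulate.
    loss_adv = model(batch)
    loss_adv.backward()
    # Step 5: restore the original embedding.
    fgm.restore()
    # Step 6: update the parameters and clear the gradients.
    optimizer.step()
    optimizer.clear_grad()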

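PGD (Projected Gradient Descent): ICLR 2018

FGM takes a single crude step and may not reach the optimum inside the constraint. PGD instead takes several small steps, and whenever the perturbation leaves the ball of radius epsilon it is projected back onto the sphere, so the perturbation never grows too large: $x_{t+1} = \Pi_{x+S}\left(x_t + \alpha \, g(x_t) / \|g(x_t)\|_2\right)$, where $g(x_t) = \nabla_x L(\theta, x_t, y)$ and $S = \{r : \|r\|_2 \le \epsilon\}$.

In the "old ladies fighting" analogy: never give your opponent a chance to strike; go many rounds, taking many small steps.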
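The "small steps plus projection" idea can be sketched on plain lists. The toy gradient function and the names `project` and `pgd_attack` are assumptions for illustration; only the update rule itself follows the description above.

```python
import math

def l2_norm(v):
    return math.sqrt(sum(a * a for a in v))

def project(x_adv, x_orig, eps):
    """Map x_adv back onto the L2 ball of radius eps around x_orig."""
    r = [a - o for a, o in zip(x_adv, x_orig)]
    n = l2_norm(r)
    if n > eps:
        r = [eps * a / n for a in r]
    return [o + d for o, d in zip(x_orig, r)]

def pgd_attack(x_orig, grad_fn, eps=1.0, alpha=0.3, steps=3):
    """Take `steps` normalized gradient-ascent steps of size alpha,
    projecting back into the eps-ball after each one."""
    x_adv = list(x_orig)
    for _ in range(steps):
        g = grad_fn(x_adv)
        n = l2_norm(g)
        if n == 0 or math.isnan(n):
            break
        x_adv = [a + alpha * gi / n for a, gi in zip(x_adv, g)]
        x_adv = project(x_adv, x_orig, eps)
    return x_adv

# Toy loss 0.5 * ||x||^2, whose gradient with respect to x is x itself.
x0 = [1.0, 0.0]
x_adv = pgd_attack(x0, grad_fn=lambda x: list(x), eps=1.0, alpha=0.3, steps=5)
print(x_adv)
```

With five steps of size 0.3 the raw displacement would be 1.5, so the projection is what keeps the final perturbation at radius 1.0.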

class PGD:
    def __init__(self, model, eps=1., alpha=0.3):
        # Unwrap paddle.DataParallel, which keeps the wrapped layer in `_layers`.
        self.model = model._layers if isinstance(model, paddle.DataParallel) else model
        self.eps = eps      # radius of the perturbation ball
        self.alpha = alpha  # size of each attack step
        self.emb_backup = {}
        self.grad_backup = {}

    def attack(self, emb_name='embedding', is_first_attack=False):
        for name, param in self.model.named_parameters():
            # Paddle marks trainable parameters with stop_gradient=False.
            if not param.stop_gradient and emb_name in name:
                if is_first_attack:
                    self.emb_backup[name] = param.detach().clone()
                norm = paddle.norm(param.grad)
                if norm != 0 and not paddle.isnan(norm):
                    r_at = self.alpha * param.grad / norm
                    with paddle.no_grad():
                        param.set_value(self.project(name, param + r_at))

    def restore(self, emb_name='embedding'):
        for name, param in self.model.named_parameters():
            if not param.stop_gradient and emb_name in name:
                assert name in self.emb_backup
                param.set_value(self.emb_backup[name])
        self.emb_backup = {}

    def project(self, param_name, param_data):
        # Pull the accumulated perturbation back onto the eps-ball if it left it.
        r = param_data - self.emb_backup[param_name]
        if paddle.norm(r) > self.eps:
            r = self.eps * r / paddle.norm(r)
        return self.emb_backup[param_name] + r

    def backup_grad(self):
        for name, param in self.model.named_parameters():
            if not param.stop_gradient and param.grad is not None:
                self.grad_backup[name] = param.grad.clone()

    def restore_grad(self):
        for name, param in self.model.named_parameters():
            if not param.stop_gradient and param.grad is not None:
                param.grad = self.grad_backup[name]
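
# Step 1: create the optimizer and the attack helper.
# `model` and `dataloader` are assumed to be defined; model(batch) returns the loss.
optimizer = paddle.optimizer.AdamW(learning_rate=5e-5, parameters=model.parameters())
pgd = PGD(model=model)
pgd_k = 3  # number of attack steps; an example value, not given in the original

for batch in dataloader:
    # Step 2: forward and backward on the clean batch to get the gradients.
    loss = model(batch)
    loss.backward()
    # Step 3: back up the clean gradients.
    pgd.backup_grad()
    # Step 4: attack pgd_k times; only the last adversarial gradient is
    # accumulated on top of the restored clean gradient.
    for k in range(pgd_k):
        pgd.attack(is_first_attack=(k == 0))
        if k != pgd_k - 1:
            model.clear_gradients()
        else:
            pgd.restore_grad()
        loss_adv = model(batch)
        loss_adv.backward()
    # Step 5: restore the original embedding.
    pgd.restore()
    # Step 6: update the parameters and clear the gradients.
    optimizer.step()
    optimizer.clear_grad()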

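FreeAT (Free Adversarial Training): NeurIPS 2019

FreeAT trains on each sample x m times in a row and reuses the gradient from the previous step when computing r. To keep the overall training cost unchanged, the total number of epochs is divided by m. The update rule for r is: $r_{t+1} = r_t + \epsilon \cdot \mathrm{sign}(g)$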
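A toy sketch of the replay loop, using a one-parameter linear model so that a single "backward pass" yields both the parameter gradient and the input gradient. The model, the hyperparameters and the clipping of r to [-eps, eps] are assumptions of this sketch (the class in this post projects onto an L2 ball instead).

```python
def sign(v):
    return (v > 0) - (v < 0)

def free_at_train(w, batches, m=3, eps=0.1, lr=0.05):
    """Replay each batch m times. Every replay uses one error term to
    update both the weight w and the perturbation r, so the attack costs
    no extra backward pass."""
    r = 0.0  # the perturbation is carried over between replays and batches
    for x, y in batches:
        for _ in range(m):
            err = w * (x + r) - y   # shared by both gradients
            g_w = err * (x + r)     # d loss / d w
            g_r = err * w           # d loss / d r
            w -= lr * g_w           # descent on the parameter
            # r_{t+1} = r_t + eps * sign(g), kept inside [-eps, eps]
            r = max(-eps, min(eps, r + eps * sign(g_r)))
    return w, r

w, r = free_at_train(1.0, [(2.0, 1.0)] * 4)
print(w, r)
```

Since every batch is seen m times, running 1/m of the usual epochs gives roughly the same number of parameter updates as ordinary training.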

class FreeAT:
    def __init__(self, model, eps=0.1):
        # Unwrap paddle.DataParallel, which keeps the wrapped layer in `_layers`.
        self.model = model._layers if isinstance(model, paddle.DataParallel) else model
        self.eps = eps
        self.emb_backup = {}
        self.grad_backup = {}
        # Perturbation carried over from the previous step, kept per parameter
        # so that several embedding tables of different shapes can coexist.
        self.last_r_at = {}

    def attack(self, emb_name='embedding', is_first_attack=False):
        for name, param in self.model.named_parameters():
            # Paddle marks trainable parameters with stop_gradient=False.
            if not param.stop_gradient and emb_name in name:
                if is_first_attack:
                    self.emb_backup[name] = param.detach().clone()
                r_prev = self.last_r_at.get(name, 0.0)
                with paddle.no_grad():
                    param.set_value(self.project(name, param + r_prev))
                # r_{t+1} = r_t + eps * sign(g), reusing the gradient already computed.
                self.last_r_at[name] = r_prev + self.eps * paddle.sign(param.grad)

    def restore(self, emb_name='embedding'):
        for name, param in self.model.named_parameters():
            if not param.stop_gradient and emb_name in name:
                assert name in self.emb_backup
                param.set_value(self.emb_backup[name])
        self.emb_backup = {}

    def project(self, param_name, param_data):
        # Pull the accumulated perturbation back onto the eps-ball if it left it.
        r = param_data - self.emb_backup[param_name]
        if paddle.norm(r) > self.eps:
            r = self.eps * r / paddle.norm(r)
        return self.emb_backup[param_name] + r

    def backup_grad(self):
        for name, param in self.model.named_parameters():
            if not param.stop_gradient and param.grad is not None:
                self.grad_backup[name] = param.grad.clone()

    def restore_grad(self):
        for name, param in self.model.named_parameters():
            if not param.stop_gradient and param.grad is not None:
                param.grad = self.grad_backup[name]

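YOPO (You Only Propagate Once): NeurIPS 2019

Pontryagin's maximum principle (PMP) is an optimality condition from optimal control theory; applying it means viewing the neural network as a dynamical system. Its advantage is that the layers are decoupled when the network parameters are optimized. This suggests a question: since the perturbation is only added at the embedding layer, why run a full forward and backward pass every time? Based on this idea, the authors reuse the gradients of the later layers.
Write the network as a first layer $f_0$ followed by the remaining layers $g$, and hold p fixed:
$p = \nabla_{f_0} \, \ell\left(g(f_0(x + r, \theta_0)), y\right)$

The update of r then becomes $r \leftarrow r + \alpha \, p \cdot \nabla_r f_0(x + r, \theta_0)$, which only needs the first layer.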
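A scalar sketch of this reuse, assuming a tanh first layer and a linear layer with squared loss on top. The network, the hyperparameters and every function name here are hypothetical and only meant to show where the full propagation happens once and where only the first layer is touched.

```python
import math

def first_layer(x, w0):
    """f0: the only layer the perturbation passes through directly."""
    return math.tanh(w0 * x)

def first_layer_grad(x, w0):
    """d f0 / d x, which is cheap because it involves one layer only."""
    return w0 * (1.0 - math.tanh(w0 * x) ** 2)

def full_loss(x, y, w0, w1):
    return 0.5 * (w1 * first_layer(x, w0) - y) ** 2

def later_layers_grad(h, y, w1):
    """p = d loss / d h: needs a pass through the layers after f0."""
    return (w1 * h - y) * w1

def yopo_inner(x, y, w0, w1, r=0.0, alpha=0.05, eps=0.3, n=3):
    # One full propagation to obtain p, which is then held fixed.
    p = later_layers_grad(first_layer(x + r, w0), y, w1)
    for _ in range(n):
        # Each inner update only re-evaluates the first layer.
        r = r + alpha * p * first_layer_grad(x + r, w0)
        r = max(-eps, min(eps, r))
    return r

r = yopo_inner(x=0.5, y=1.0, w0=1.0, w1=1.0)
print(r, full_loss(0.5 + r, 1.0, 1.0, 1.0))
```

The n inner updates cost roughly n first-layer evaluations instead of n full forward and backward passes.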

def clip_eta(eta, norm, eps):
    # Assumed helper: the original post calls clip_eta without defining it.
    # This stand-in keeps eta inside the eps-ball of the given norm.
    if norm == np.inf:
        return paddle.clip(eta, -eps, eps)
    flat_norm = paddle.norm(eta.reshape([eta.shape[0], -1]), p=2, axis=-1)
    factor = paddle.clip(eps / paddle.clip(flat_norm, min=1e-12), max=1.0)
    return eta * factor.reshape([-1] + [1] * (eta.ndim - 1))


class YOPO:
    # Iterative sign-gradient attack on (optionally normalized) image inputs.
    # Device placement follows paddle.set_device(...), so the PyTorch-style
    # DEVICE argument and to() method of the original are dropped.
    def __init__(self, eps=6 / 255.0, sigma=3 / 255.0, nb_iter=20,
                 norm=np.inf, mean=None, std=None, random_start=True):
        '''
        :param eps: maximum distortion of adversarial examples
        :param sigma: single step size
        :param nb_iter: number of attack iterations
        :param norm: which norm to bound the perturbations
        :param mean: normalization mean, shape [1, C, 1, 1] (default 0)
        :param std: normalization std, shape [1, C, 1, 1] (default 1)
        '''
        self.eps = eps
        self.sigma = sigma
        self.nb_iter = nb_iter
        self.norm = norm
        self.criterion = nn.CrossEntropyLoss()
        self._mean = mean if mean is not None else paddle.zeros([1, 1, 1, 1], dtype='float32')
        self._std = std if std is not None else paddle.ones([1, 1, 1, 1], dtype='float32')
        self.random_start = random_start

    def single_attack(self, net, inp, label, eta, target=None):
        '''
        Given the original image and the perturbation computed so far, computes
        a new perturbation.
        :param net: model under attack
        :param inp: original image
        :param label: ground-truth labels
        :param eta: perturbation computed so far
        :param target: optional class index whose logit is maximized instead of the loss
        :return: a new perturbation
        '''
        adv_inp = inp + eta
        pred = net(adv_inp)
        if target is not None:
            objective = paddle.sum(pred[:, target])
        else:
            objective = self.criterion(pred, label)
        grad_sign = paddle.sign(paddle.grad(objective, adv_inp, retain_graph=False)[0])

        adv_inp = adv_inp + grad_sign * (self.sigma / self._std)
        # Undo the normalization, clip to the valid pixel range [0, 1],
        # then bound the perturbation by eps.
        tmp_adv_inp = paddle.clip(adv_inp * self._std + self._mean, 0, 1)
        tmp_inp = inp * self._std + self._mean
        tmp_eta = clip_eta(tmp_adv_inp - tmp_inp, norm=self.norm, eps=self.eps)

        # Detach so the graph does not grow across iterations.
        return (tmp_eta / self._std).detach()

    def attack(self, net, inp, label, target=None):
        if self.random_start:
            eta = paddle.uniform(inp.shape, dtype='float32', min=-self.eps, max=self.eps)
        else:
            eta = paddle.zeros_like(inp)
        eta = (eta - self._mean) / self._std
        net.eval()

        inp.stop_gradient = False  # gradients with respect to the input are needed
        for i in range(self.nb_iter):
            eta = self.single_attack(net, inp, label, eta, target)

        adv_inp = inp + eta
        tmp_adv_inp = paddle.clip(adv_inp * self._std + self._mean, 0, 1)
        adv_inp = (tmp_adv_inp - self._mean) / self._std

        return adv_inp

AWP (Adversarial Weight Perturbation Helps Robust Generalization): NeurIPS 2020

class AWP:
    def __init__(
        self,
        model,
        optimizer,
        adv_param=['weight'],
        adv_lr=1,
        adv_eps=0.001,
        adv_step=1,
        scaler=None
    ):
        self.model = model
        self.optimizer = optimizer
        self.adv_param = adv_param  # only parameters whose name contains adv_param[0] are attacked
        self.adv_lr = adv_lr
        self.adv_eps = adv_eps
        self.adv_step = adv_step
        self.backup = {}
        self.backup_eps = {}
        self.scaler = scaler  # optional paddle.amp.GradScaler

    def attack_backward(self, batch):
        if self.adv_lr == 0:
            return None

        self._save()
        for i in range(self.adv_step):
            self._attack_step()
            loss = self.model(batch)
            self.optimizer.clear_grad()
            # Use the AMP scaler when one is given; otherwise a plain backward.
            if self.scaler is not None:
                self.scaler.scale(loss).backward()
            else:
                loss.backward()
        self._restore()

    def _attack_step(self):
        e = 1e-6
        for name, param in self.model.named_parameters():
            if (not param.stop_gradient) and (param.grad is not None) and (self.adv_param[0] in name):
                norm1 = paddle.norm(param.grad)
                norm2 = paddle.norm(param.detach())
                if norm1 != 0 and not paddle.isnan(norm1):
                    # Step along the gradient, scaled by the weight norm.
                    r_at = self.adv_lr * param.grad / (norm1 + e) * (norm2 + e)
                    with paddle.no_grad():
                        # Element-wise clamp into [w - eps*|w|, w + eps*|w|].
                        perturbed = paddle.minimum(
                            paddle.maximum(param + r_at, self.backup_eps[name][0]),
                            self.backup_eps[name][1]
                        )
                        param.set_value(perturbed)

    def _save(self):
        for name, param in self.model.named_parameters():
            if (not param.stop_gradient) and (param.grad is not None) and (self.adv_param[0] in name):
                if name not in self.backup:
                    self.backup[name] = param.detach().clone()
                    grad_eps = self.adv_eps * param.abs().detach()
                    self.backup_eps[name] = (
                        self.backup[name] - grad_eps,
                        self.backup[name] + grad_eps,
                    )

    def _restore(self):
        for name, param in self.model.named_parameters():
            if name in self.backup:
                param.set_value(self.backup[name])
        self.backup = {}
        self.backup_eps = {}
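The weight update inside `_attack_step` can be traced by hand on plain lists. This sketch mirrors the formula used in the class above; the function name `awp_perturb` and the sample numbers are assumptions for illustration.

```python
import math

def l2_norm(values):
    return math.sqrt(sum(v * v for v in values))

def awp_perturb(weights, grads, adv_lr=1.0, adv_eps=0.001, tiny=1e-6):
    """Move each weight along the normalized gradient, scaled by the weight
    norm, then clamp it into [w - adv_eps*|w|, w + adv_eps*|w|]."""
    g_norm = l2_norm(grads)
    w_norm = l2_norm(weights)
    if g_norm == 0 or math.isnan(g_norm):
        return list(weights)
    perturbed = []
    for w, g in zip(weights, grads):
        step = adv_lr * g / (g_norm + tiny) * (w_norm + tiny)
        bound = adv_eps * abs(w)
        perturbed.append(min(max(w + step, w - bound), w + bound))
    return perturbed

print(awp_perturb([1.0, -2.0], [3.0, 4.0]))
```

With these defaults the raw step is far larger than the bound, so in this example the clamp decides the final perturbation: each weight moves by at most 0.1% of its own magnitude.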

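4. Summary

1. Adversarial training is a staple augmentation technique in deep-learning projects and competitions. With this gap in the PaddlePaddle ecosystem now filled, it is much more pleasant to use.
2. FGSM, FGM and PGD are the most frequently used of these tools, so their training-loop usage is included as well. For the other methods only the core code has been reproduced.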

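5. References

1. Explaining and Harnessing Adversarial Examples
2. Adversarial Training for Free!
3. Adversarial Training Methods for Semi-Supervised Text Classification
4. You Only Propagate Once: Accelerating Adversarial Training via Maximal Principle
5. [Survey] Adversarial Training for NLP
6. Understanding Adversarial Training in NLP in One Article
7. Accelerating Adversarial Training: A Brief Analysis of the YOPO Algorithm
8. Training Tricks, The Way of Attack and Defense: Adversarial Training in NLP
9. <EYD and Machine Learning>: Fundamentals of Adversarial Attacks

This article is a repost. Original: https://aistudio.baidu.com/aistudio/projectdetail/4327353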
