★★★ 本文源自AlStudio社区精品项目,【点击此处】查看更多精品内容 >>>

1. 关于LSTM

1.1 简介

长短期记忆网络(LSTM,Long Short-Term Memory)是一种时间循环神经网络,是为了解决一般的RNN(循环神经网络)存在的长期依赖问题而专门设计出来的,所有的RNN都具有一种重复神经网络模块的链式形式。在标准RNN中,这个重复的结构模块只有一个非常简单的结构,例如一个tanh层。

1.2 不足

1.2.1 并行处理能力差:由于LSTM的内部结构相对复杂,导致其在处理较长的序列时需要逐层处理,而无法进行并行处理,因此在处理长序列时效率较低。

1.2.2 RNN的梯度问题:LSTM及其变种虽然解决了一些梯度问题,但仍存在梯度消失或梯度爆炸等问题,这会导致模型难以训练或训练不稳定。

1.2.3 计算量大:由于LSTM的内部结构复杂,需要进行较多的计算,因此在训练时需要消耗较多的计算资源,特别是在处理长序列时,计算量会更大,训练效率也会相应降低。

1.3 LSTF

1.3.1 LSTF(Long Short-Term Memory Network)是一种基于LSTM(Long Short-Term Memory)模型的时序预测方法,用于处理长时间序列数据。

1.3.2 LSTM模型在时序预测任务中表现良好,但也存在一些缺点,主要包括以下几点:

    • 预测误差(MSE)过高:在处理长时间序列数据时,LSTM模型的预测误差较高,这使得它在一些应用场景下表现不尽如人意。
    • 预测速度较慢:LSTM模型的预测速度相对较慢,这主要是因为其内部复杂的计算过程所致。
    • 模型参数较多:LSTM模型需要训练的参数较多,这使得它在处理大规模数据时需要更多的计算资源和时间。
    • 容易出现模式震荡(mode switching):在处理非平稳时间序列数据时,LSTM模型容易出现模式震荡现象,这会导致模型预测结果的不稳定性。

2. Informer简介(2021 AAAI Best Paper)

2.1 Reference

@inproceedings{haoyietal-informer-2021,
author = {Haoyi Zhou and
Shanghang Zhang and
Jieqi Peng and
Shuai Zhang and
Jianxin Li and
Hui Xiong and
Wancai Zhang},
title = {Informer: Beyond Efficient Transformer for Long Sequence Time-Series Forecasting},
booktitle = {The Thirty-Fifth {AAAI} Conference on Artificial Intelligence, {AAAI} 2021, Virtual Conference},
volume = {35},
number = {12},
pages = {11106–11115},
publisher = {{AAAI} Press},
year = {2021},
}

一个基于Transformer的效率优化的长时间序列预测模型

2.2 基本介绍

2.2.1 许多实际应用需要长序列时间序列的预测,如电力消耗规划。长序列时间序列预测(LSTF)对模型的预测能力提出了很高的要求,即能够高效地精确捕捉输出与输入之间的长范围依赖耦合。最近的研究显示了Transformer在提高预测能力方面的潜力。

2.2.2 然而,Transformer存在几个严重的问题,使其无法直接应用于LSTF,包括二次时间复杂度、高内存使用以及编码器-解码器体系结构的固有限制。为了解决这些问题,我们设计了一个高效的基于Transformer的LSTF模型,命名为Informer

2.2.3 具有的显著的特征与创新点:

  1. ProbSparse自注意机制,在时间复杂度和内存使用方面达到O(Llog L),并且在序列依赖项对齐方面具有相当的性能。Informer核心创新点,解决二次计算复杂度,高耗内存,预测长输出受限问题。

  2. 自注意蒸馏通过将级联层输入减半来突出支配注意,并有效地处理极长输入序列。通过训练一个较小的模型来指导一个较大的模型的学习,从而提高模型的效率和准确性。

  3. 生成式解码器虽然概念简单,但在一次向前操作中预测长时间序列,而不是一步一步地预测,这大大提高了长序列预测的推理速度。在4个大规模数据集上的大量实验表明,Informer算法显著优于现有方法,为LSTF问题提供了一种新的解决方案

  4. 将输入表示为统一的矩阵,可以更好地处理各种输入类型。

2.3 自注意蒸馏机制

自注意蒸馏机制是一种高效的方法,通过训练一个较小的模型来指导一个较大的模型的学习,以提高模型的效率和准确性。自注意蒸馏机制的核心思想是,在训练小模型时,利用大模型的注意力分布来指导小模型的训练,以便小模型可以更好地模仿大模型。

自注意蒸馏机制的特点是,它能够捕捉输入数据的全局特征,并将这些特征传递给小模型,从而使小模型能够更好地模拟大模型。此外,自注意蒸馏机制还可以通过最小化目标函数来优化模型的参数,从而提高模型的准确性。

自注意蒸馏机制的原理是,它通过对输入数据的注意力分布进行计算,从而捕捉输入数据的全局特征。然后,将这些特征传递给小模型,以便小模型可以更好地模仿大模型。同时,自注意蒸馏机制还通过最小化目标函数来优化模型的参数,从而提高模型的准确性。

3. 搭建模型(使用PaddlePaddle)

3.1 导入所需的库

import paddle
import paddle.nn as nn
import paddle.nn.functional as F
import numpy as np
import math

3.2 概率掩码

  • 下面这段代码定义了两个类:TriangularCausalMask 和 ProbMask。这两个类都是用来实现概率掩码(Probability Mask)的。概率掩码是一种在生成文本时常用的技术,它有助于减少生成的文本中重复或不相关的单词。

  • TriangularCausalMask 类实现了一个对角线因果掩码(Diagonal Causal Mask),其中对角线因果掩码用于引导生成过程。在这个例子中,类接收三个参数:batch size (B),序列长度(L),以及设备(device)。它使用 PaddlePaddle 的 extract_loc 函数生成一个对角线因果掩码,并将其放在设备上。然后,将此掩码暴露给外界的属性为 mask。

  • 另一方面,ProbMask 类实现了一个概率掩码,它基于一个指示器(indicator)张量。这个指示器张量用于确定哪个单词应该被保留,哪个单词应该被掩码掉。在这个例子中,类接收五个参数:batch size (B),每个时间步长的嵌入维度(H),序列长度(L),掩码的索引(index),以及嵌入的得分(scores)。这个类通过使用 PaddlePaddle 的 triu 函数创建一个上三角形张量,然后将其扩展并转换为概率掩码。这个概率掩码被暴露给外界的属性为 mask。

class TriangularCausalMask():
    def __init__(self, B, L, device="cpu"):
        mask_shape = [B, 1, L, L]
        with paddle.disable_grad():
            self._mask = paddle.nn.functional.extract_loc(paddle.ones(mask_shape, dtype=paddle.bool), diagonal=1).to(device)

    @property
    def mask(self):
        return self._mask

class ProbMask():
    def __init__(self, B, H, L, index, scores, device="cpu"):
        _mask = paddle.ones(L, scores.shape[-1], dtype=paddle.bool).to(device).triu(1)
        _mask_ex = _mask[None, None, :].expand(B, H, L, scores.shape[-1])
        indicator = _mask_ex[paddle.arange(B)[:, None, None],
                             paddle.arange(H)[None, :, None],
                             index, :].to(device)
        self._mask = indicator.view(scores.shape).to(device)
    
    @property
    def mask(self):
        return self._mask

3.3 Self-Attention(自注意力机制)

  • 自注意力机制(Self-Attention)使得模型能够自动捕捉输入序列中各个元素之间的关系,从而更好地理解输入序列。这种机制使得模型能够并行处理输入序列中的所有元素,避免了RNN/LSTM等序列模型中存在的计算顺序依赖问题,从而更高效地进行处理。自注意力机制在自然语言处理领域的应用非常广泛,例如机器翻译、语音识别、文本分类等任务中都有很好的效果。
#mask_flag:是否对输入的 queries 进行掩码处理,如果为 True,则使用 TriangularCausalMask 对 queries 进行掩码处理。
# factor:计算注意力权重的缩放因子。
# scale:缩放因子,可以替代 factor。
# attention_dropout:注意力机制中的dropout概率。
# output_attention:是否输出注意力权重。
# 在 forward 函数中,输入的 queries、keys 和 values 分别具有形状 (B, L, H, E)、(B, S, H, E) 和 (B, L, D)。
# 其中,B 表示批次大小,L 表示 queries 的长度,H 表示 queries 的通道数,E 表示 queries 的维度。paddle.einsum函数用于执行多维张量的线性操作,这里用于计算注意力分数。
# 在计算分数之后,根据 self.mask_flag 是否为 True,以及是否传递了 attn_mask 参数来判断是否进行掩码处理。然后通过 self.dropout 函数对注意力权重进行 dropout 处理,并使用 paddle.softmax 函数计算注意力权重。
# 最后,将注意力权重和 values 相乘得到输出结果。
# 如果 self.output_attention 为 True,则返回输出结果和注意力权重;否则只返回输出结果。
class FullAttention(nn.Layer):
    def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):
        super(FullAttention, self).__init__()
        self.scale = scale
        self.mask_flag = mask_flag
        self.output_attention = output_attention
        self.dropout = nn.Dropout(attention_dropout)
        
    def forward(self, queries, keys, values, attn_mask):
        B, L, H, E = queries.shape
        _, S, _, D = values.shape
        scale = self.scale or 1./sqrt(E)

        scores = paddle.einsum("blhe,bshe->bhls", queries, keys)
        if self.mask_flag:
            if attn_mask is None:
                attn_mask = TriangularCausalMask(B, L, device=queries.device)

            scores.masked_fill_(attn_mask.mask, -np.inf)

        A = self.dropout(paddle.softmax(scale * scores, dim=-1))
        V = paddle.einsum("bhls,bshd->blhd", A, values)
    
        if self.output_attention:
            return (V.contiguous(), A)
        else:
            return (V.contiguous(), None)

class ProbAttention(nn.Layer):
    def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):
        super(ProbAttention, self).__init__()
        self.factor = factor
        self.scale = scale
        self.mask_flag = mask_flag
        self.output_attention = output_attention
        self.dropout = nn.Dropout(attention_dropout)

    def _prob_QK(self, Q, K, sample_k, n_top): # n_top: c*ln(L_q)
        # Q [B, H, L, D]
        B, H, L_K, E = K.shape
        _, _, L_Q, _ = Q.shape

        # 计算采样后的Q_K
        K_expand = K.unsqueeze(-3).expand(B, H, L_Q, L_K, E)
        index_sample = paddle.randint(L_K, (L_Q, sample_k)) # real U = U_part(factor*ln(L_k))*L_q
        K_sample = K_expand[:, :, paddle.arange(L_Q).unsqueeze(1), index_sample, :]
        Q_K_sample = paddle.matmul(Q.unsqueeze(-2), K_sample.transpose(-2, -1)).squeeze(-2)
        # 使用稀疏度量查找Top_k
        M = Q_K_sample.max(-1)[0] - paddle.sum(Q_K_sample, -1) / L_K 
        M_top = M.topk(n_top, sorted=False)[1]

        # 使用简化后的Q来计算Q_K
        Q_reduce = Q[paddle.arange(B)[:, None, None],
                     paddle.arange(H)[None, :, None],
                     M_top, :] # factor*ln(L_q)
        Q_K = paddle.matmul(Q_reduce, K.transpose(-2, -1)) # factor*ln(L_q)*L_k

        return Q_K, M_top

    def _get_initial_context(self, V, L_Q):
        B, H, L_V, D = V.shape
        if not self.mask_flag:
            # V_sum = V.sum(dim=-2)
            V_sum = V.mean(dim=-2)
            contex = V_sum.unsqueeze(-2).expand(B, H, L_Q, V_sum.shape[-1]).clone()
        else: # use mask
            assert(L_Q == L_V) # 要求L_Q == L_V,即仅用于自注意力机制
            contex = V.cumsum(dim=-2)
        return contex

    def _update_context(self, context_in, V, scores, index, L_Q, attn_mask):
        B, H, L_V, D = V.shape

        if self.mask_flag:
            attn_mask = ProbMask(B, H, L_Q, index, scores, device=V.device)
            scores.masked_fill_(attn_mask.mask, -np.inf)

        attn = paddle.softmax(scores, dim=-1) # nn.Softmax(dim=-1)(scores)

        context_in[paddle.arange(B)[:, None, None],
                   paddle.arange(H)[None, :, None],
                   index, :] = paddle.matmul(attn, V).type_as(context_in)
        if self.output_attention:
            attns = (paddle.ones([B, H, L_V, L_V])/L_V).type_as(attn).to(attn.device)
            attns[paddle.arange(B)[:, None, None], paddle.arange(H)[None, :, None], index, :] = attn
            return (context_in, attns)
        else:
            return (context_in, None)

    def forward(self, queries, keys, values, attn_mask):
        B, L_Q, H, D = queries.shape
        _, L_K, _, _ = keys.shape

        queries = queries.transpose(2,1)
        keys = keys.transpose(2,1)
        values = values.transpose(2,1)

        U_part = self.factor * np.ceil(np.log(L_K)).astype('int').item() # c*ln(L_k)
        u = self.factor * np.ceil(np.log(L_Q)).astype('int').item() # c*ln(L_q) 

        U_part = U_part if U_part<L_K else L_K
        u = u if u<L_Q else L_Q
        
        scores_top, index = self._prob_QK(queries, keys, sample_k=U_part, n_top=u) 

        # 加比例因子
        scale = self.scale or 1./sqrt(D)
        if scale is not None:
            scores_top = scores_top * scale
        # 获取上下文
        context = self._get_initial_context(values, L_Q)
        # 用选定的top_k个查询更新上下文
        context, attn = self._update_context(context, values, scores_top, index, L_Q, attn_mask)
        
        return context.transpose(2,1).contiguous(), attn


class AttentionLayer(nn.Layer):
    def __init__(self, attention, d_model, n_heads, 
                 d_keys=None, d_values=None, mix=False):
        super(AttentionLayer, self).__init__()

        d_keys = d_keys or (d_model//n_heads)
        d_values = d_values or (d_model//n_heads)

        self.inner_attention = attention
        self.query_projection = nn.Linear(d_model, d_keys * n_heads)
        self.key_projection = nn.Linear(d_model, d_keys * n_heads)
        self.value_projection = nn.Linear(d_model, d_values * n_heads)
        self.out_projection = nn.Linear(d_values * n_heads, d_model)
        self.n_heads = n_heads
        self.mix = mix

    def forward(self, queries, keys, values, attn_mask):
        B, L, _ = queries.shape
        _, S, _ = keys.shape
        H = self.n_heads

        queries = self.query_projection(queries).view(B, L, H, -1)
        keys = self.key_projection(keys).view(B, S, H, -1)
        values = self.value_projection(values).view(B, S, H, -1)

        out, attn = self.inner_attention(
            queries,
            keys,
            values,
            attn_mask
        )
        if self.mix:
            out = out.transpose(2,1).contiguous()
        out = out.view(B, L, -1)

        return self.out_projection(out), attn

3.4 Embedding(词嵌入)

  • 词嵌入是一种将自然语言转换为向量空间模型的技术,它可以将一个单词表示为一个实数向量。这种技术有着广泛的应用,包括但不限于:
  • 词向量表示:将单词表示为低维向量,可以用于文本分类、聚类、信息检索等任务。
  • 语言模型:通过训练一个神经网络来预测一个单词是否为一个给定上下文中的下一个单词,可以用于文本生成、机器翻译等任务。
  • 文本分类:使用 Transformer 模型作为特征提取器,将文本表示为向量,可以用于情感分析、关键词提取等任务。
  • 命名实体识别:使用 Transformer 模型作为识别器,可以用于识别文本中的人名、地名、组织名等实体。
  • 总之,Transformer 词嵌入技术在自然语言处理领域中有着非常广泛的应用,它可以帮助我们更好地理解和处理自然语言文本数据。
# 创建一个长度为 max_len 的零张量 pe,其维度为 d_model。
# 将 pe.require_grad 设置为 False,以确保反向传播时不会计算 pe 的梯度。
# 创建一个与 pe 相同维度的张量 position,其中包含从 0 到 max_len-1 的数字。
# 创建一个与 pe 相同维度的张量 div_term,其中包含 -(math.log(10000.0) / d_model) 的幂次。
# 将 pe 中每两个相邻的元素设置为 position * div_term 的正弦和余弦值。
# 将 pe 张量展开为一个大小为 (1, max_len, d_model) 的张量,并将其传递到前向传递中。在前向传递中,该类将输入张量 x 的长度与位置编码矩阵的长度进行比较,并返回相应长度的位置编码张量。
# 这个 PositionalEmbedding 类可以用于为输入序列中的每个位置提供位置编码,这些位置编码可用于各种任务。
class PositionalEmbedding(nn.Layer):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEmbedding, self).__init__()
        # Compute the positional encodings once in log space.
        pe = paddle.zeros(max_len, d_model).float()
        pe.require_grad = False

        position = paddle.arange(0, max_len).float().unsqueeze(1)
        div_term = (paddle.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()

        pe[:, 0::2] = paddle.sin(position * div_term)
        pe[:, 1::2] = paddle.cos(position * div_term)

        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return self.pe[:, :x.size(1)]

# 这是一个PaddlePaddle中的TokenEmbedding类,继承自nn.Layer。它包括一个卷积层,用于从输入张量x中提取嵌入特征。
# 在初始化方法中,它定义了一个名为tokenConv的卷积层,输入通道数为c_in,输出通道数为d_model,卷积核大小为3,填充大小为1(如果使用的是PaddlePaddle 1.5.0及以上的版本)或2,填充方式为循环模式。
#然后,它使用nn.init.kaiming_normal_函数初始化所有模块的权重,包括卷积层,使用fan_in模式和leaky_relu非线性激活函数。
# 在前向传递方法中,它首先将输入张量x的维度转置为(0, 2, 1),然后通过卷积层tokenConv进行处理。处理后,它将输出张量的维度转置回(0, 1, 2),并返回结果。
# 这个TokenEmbedding类可以用于为输入序列中的每个token提供嵌入特征,这些嵌入特征可以用于各种NLP任务,例如文本分类、命名实体识别等。
class TokenEmbedding(nn.Layer):
    def __init__(self, c_in, d_model):
        super(TokenEmbedding, self).__init__()
        padding = 1 if paddle.__version__>='1.5.0' else 2
        self.tokenConv = nn.Conv1d(in_channels=c_in, out_channels=d_model, 
                                    kernel_size=3, padding=padding, padding_mode='circular')
        for m in self.modules():
            if isinstance(m, nn.Conv1d):
                nn.init.kaiming_normal_(m.weight,mode='fan_in',nonlinearity='leaky_relu')

    def forward(self, x):
        x = self.tokenConv(x.permute(0, 2, 1)).transpose(1,2)
        return x
# 这是一个名为FixedEmbedding的PaddlePaddle层类,它继承自nn.Layer。
# 在初始化方法中,它定义了一个名为w的张量,其大小为c_in x d_model,并将其梯度设置为False。
# 接下来,它创建了一个位置张量position,其大小为c_in x 1,并将其梯度设置为False。然后,它计算了div_term,即(d_model / 2) * math.log(10000)的幂次方。
# 最后,它将w张量设置为正弦和余弦函数的组合,这些函数是位置张量乘以div_term的结果。
# 在forward方法中,它创建了一个名为emb的nn.Embedding对象,并将权重设置为初始化方法中计算出的w张量。
# 这个FixedEmbedding类可以在固定的位置编码上使用标准的nn.Embedding。因此,它的实现不需要在运行时进行昂贵的计算。
# 通常,它适用于需要高效计算的任务,例如大规模文本分类。
class FixedEmbedding(nn.Layer):
    def __init__(self, c_in, d_model):
        super(FixedEmbedding, self).__init__()

        w = paddle.zeros(c_in, d_model).float()
        w.require_grad = False

        position = paddle.arange(0, c_in).float().unsqueeze(1)
        div_term = (paddle.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()

        w[:, 0::2] = paddle.sin(position * div_term)
        w[:, 1::2] = paddle.cos(position * div_term)

        self.emb = nn.Embedding(c_in, d_model)
        self.emb.weight = nn.Parameter(w, requires_grad=False)

    def forward(self, x):
        return self.emb(x).detach()
# 这是一个名为TemporalEmbedding的PaddlePaddle层类,它继承自nn.Layer。
# 在初始化方法中,它定义了一些大小,具体取决于要嵌入的时间特征的类型和频率。
# 然后,根据embed_type参数,它创建了一个FixedEmbedding对象(如果embed_type=='fixed')或nn.Embedding对象。
# 最后,它根据freq参数创建了一个时间特征的嵌入对象。
# 在forward方法中,如果freq='t',则根据分钟大小创建一个嵌入对象minute_embed,并根据小时大小创建一个嵌入对象hour_embed。
# 然后,根据星期大小创建一个嵌入对象weekday_embed,根据天大小创建一个嵌入对象day_embed,根据月大小创建一个嵌入对象month_embed。
# 最后,它将所有嵌入对象的输出连接在一起,并返回结果。
# 这个TemporalEmbedding类可以用于为时间序列数据中的不同时间特征(例如分钟、小时、星期、天和月)添加位置编码。
class TemporalEmbedding(nn.Layer):
    def __init__(self, d_model, embed_type='fixed', freq='h'):
        super(TemporalEmbedding, self).__init__()

        minute_size = 4; hour_size = 24
        weekday_size = 7; day_size = 32; month_size = 13

        Embed = FixedEmbedding if embed_type=='fixed' else nn.Embedding
        if freq=='t':
            self.minute_embed = Embed(minute_size, d_model)
        self.hour_embed = Embed(hour_size, d_model)
        self.weekday_embed = Embed(weekday_size, d_model)
        self.day_embed = Embed(day_size, d_model)
        self.month_embed = Embed(month_size, d_model)
    
    def forward(self, x):
        x = x.long()
        
        minute_x = self.minute_embed(x[:,:,4]) if hasattr(self, 'minute_embed') else 0.
        hour_x = self.hour_embed(x[:,:,3])
        weekday_x = self.weekday_embed(x[:,:,2])
        day_x = self.day_embed(x[:,:,1])
        month_x = self.month_embed(x[:,:,0])
        
        return hour_x + weekday_x + day_x + month_x + minute_x
# 这是一个名为TimeFeatureEmbedding的PaddlePaddle层类,它继承自nn.Layer。
# 在初始化方法中,它定义了一个名为embed的nn.Linear对象,输入维度为freq_map[freq],输出维度为d_model。
# 其中,freq_map是一个字典,用于将频率('h'、't'、's'、'm'、'a'、'w'、'd'和'b'分别代表小时、分钟、秒、分钟内的刻钟、天、周、月和周内的小时)映射到相应的输入维度。
# 在前向传递方法中,它通过调用nn.Linear对象的forward方法来计算嵌入特征。
# 这个TimeFeatureEmbedding类可以用于将时间序列数据中的不同时间特征(例如小时、分钟、秒、分钟内的刻钟、天、周、月和周内的小时)映射到一个低维向量空间中。
class TimeFeatureEmbedding(nn.Layer):
    def __init__(self, d_model, embed_type='timeF', freq='h'):
        super(TimeFeatureEmbedding, self).__init__()

        freq_map = {'h':4, 't':5, 's':6, 'm':1, 'a':1, 'w':2, 'd':3, 'b':3}
        d_inp = freq_map[freq]
        self.embed = nn.Linear(d_inp, d_model)
    
    def forward(self, x):
        return self.embed(x)
# 这是一个名为TimeFeatureEmbedding的PaddlePaddle层类,它继承自nn.Layer。
# 在初始化方法中,它定义了三个嵌入对象:value_embedding(用于嵌入输入序列中的值)、position_embedding(用于嵌入输入序列中的位置)和temporal_embedding(用于嵌入时间序列数据中的时间特征)。
# 其中,如果embed_type不为'timeF',则使用TemporalEmbedding对象;否则,使用TimeFeatureEmbedding对象。
# 在前向传递方法中,它首先将输入序列x传递给value_embedding对象,并将位置编码添加到结果中。
# 然后,如果embed_type不为'timeF',则将时间序列数据中的时间特征x_mark传递给temporal_embedding对象,并将结果添加到嵌入特征中。
# 最后,它通过调用nn.Dropout对象的forward方法来应用dropout正则化,并返回结果。
class DataEmbedding(nn.Layer):
    def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
        super(DataEmbedding, self).__init__()

        self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
        self.position_embedding = PositionalEmbedding(d_model=d_model)
        self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type, freq=freq) if embed_type!='timeF' else TimeFeatureEmbedding(d_model=d_model, embed_type=embed_type, freq=freq)

        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, x_mark):
        x = self.value_embedding(x) + self.position_embedding(x) + self.temporal_embedding(x_mark)
        
        return self.dropout(x)

3.5 Encoder(编码器)

# 这个ConvLayer类的初始化方法中,
# 创建了一个nn.Conv1d对象downConv,一个nn.BatchNorm1d对象norm,一个nn.ELU对象activation和一个nn.MaxPool1d对象maxPool。
# 在forward方法中,先将输入x进行转置,然后经过卷积层downConv、批归一化层norm、激活函数层activation和最大池化层maxPool的处理,最后将结果转置回去并返回。
class ConvLayer(nn.Layer):
    def __init__(self, c_in):
        super(ConvLayer, self).__init__()
        padding = 1 if paddle.__version__>='1.5.0' else 2
        self.downConv = nn.Conv1d(in_channels=c_in,
                                  out_channels=c_in,
                                  kernel_size=3,
                                  padding=padding,
                                  padding_mode='circular')
        self.norm = nn.BatchNorm1d(c_in)
        self.activation = nn.ELU()
        self.maxPool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)

    def forward(self, x):
        x = self.downConv(x.permute(0, 2, 1))
        x = self.norm(x)
        x = self.activation(x)
        x = self.maxPool(x)
        x = x.transpose(1,2)
        return x
# 这是一个PaddlePaddle的EncoderLayer类,继承自nn.Layer。
# 在初始化方法中,它定义了几个成员变量,包括self.attention(一个自注意力机制的实例),self.conv1(一个1D卷积层实例),self.conv2(另一个1D卷积层实例),self.norm1(一个规范化层实例),self.norm2(另一个规范化层实例)和self.dropout(一个dropout层实例)。
# 其中,d_ff是一个可选参数,默认为4*d_model。
# 在forward方法中,它接收一个输入张量x和一个可选参数attn_mask(一个注意力掩码张量)。
# 首先,它使用self.attention实例处理x、自身和自身,将结果加到x上,得到新的x。
# 这个处理包括使用自注意力机制计算x、自身和自身之间的相似度,然后应用softmax函数得到注意力权重,最后将结果加到x上。
# 接着,它将新的x通过规范化层self.norm1、激活函数层self.activation、卷积层self.conv1、转置张量、规范化层self.norm2、dropout层self.dropout和激活函数层self.activation处理,得到最终的输出张量。
# 其中,self.attention的返回值包括新的张量new_x和注意力权重attn。
# 最后,它通过将x作为输入并将注意力权重设置为None,在不需要的情况下禁用自注意力机制来创建具有attention_mask参数的forward方法。
class EncoderLayer(nn.Layer):
    def __init__(self, attention, d_model, d_ff=None, dropout=0.1, activation="relu"):
        super(EncoderLayer, self).__init__()
        d_ff = d_ff or 4*d_model
        self.attention = attention
        self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
        self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        self.activation = F.relu if activation == "relu" else F.gelu

    def forward(self, x, attn_mask=None):
        # x [B, L, D]
        # x = x + self.dropout(self.attention(
        #     x, x, x,
        #     attn_mask = attn_mask
        # ))
        new_x, attn = self.attention(
            x, x, x,
            attn_mask = attn_mask
        )
        x = x + self.dropout(new_x)

        y = x = self.norm1(x)
        y = self.dropout(self.activation(self.conv1(y.transpose(-1,1))))
        y = self.dropout(self.conv2(y).transpose(-1,1))

        return self.norm2(x+y), attn
# 这是一个PaddlePaddle的Encoder类,继承自nn.Layer。在初始化方法中,它定义了一个nn.ModuleList类型的attn_layers成员变量,一个可选的nn.ModuleList类型的conv_layers成员变量和一个可选的norm_layer成员变量。
# 在forward方法中,它接收一个输入张量x和一个可选参数attn_mask(一个注意力掩码张量)。
# 首先,它遍历self.attn_layers中的每个自注意力机制实例和可选的卷积层实例,并使用zip函数将它们一一配对。对于每个配对,它使用attn_layer处理x,并将结果与attn_mask一起传递给attn_layer。
# 然后,它使用conv_layer处理x,并将结果与attn一起添加到attns列表中。
# 接着,它将x和最后一个自注意力机制的注意力张量添加到attns列表中。
# 如果conv_layers为None,则跳过卷积层处理。否则,在遍历完所有自注意力机制后,它将x和最后一个自注意力机制的注意力张量添加到attns列表中。
# 最后,如果self.norm不为None,则使用self.norm处理x。最终输出是处理后的x和每个自注意力机制的注意力张量列表attns。
class Encoder(nn.Layer):
    def __init__(self, attn_layers, conv_layers=None, norm_layer=None):
        super(Encoder, self).__init__()
        self.attn_layers = nn.ModuleList(attn_layers)
        self.conv_layers = nn.ModuleList(conv_layers) if conv_layers is not None else None
        self.norm = norm_layer

    def forward(self, x, attn_mask=None):
        # x [B, L, D]
        attns = []
        if self.conv_layers is not None:
            for attn_layer, conv_layer in zip(self.attn_layers, self.conv_layers):
                x, attn = attn_layer(x, attn_mask=attn_mask)
                x = conv_layer(x)
                attns.append(attn)
            x, attn = self.attn_layers[-1](x, attn_mask=attn_mask)
            attns.append(attn)
        else:
            for attn_layer in self.attn_layers:
                x, attn = attn_layer(x, attn_mask=attn_mask)
                attns.append(attn)

        if self.norm is not None:
            x = self.norm(x)

        return x, attns
# 这是一个PaddlePaddle的EncoderStack类,继承自nn.Layer。
# 在初始化方法中,它接受一个encoders列表和一个inp_lens列表作为参数。encoders列表包含了多个Encoder实例,inp_lens列表包含了每个Encoder实例对应的输入序列长度。
# 在forward方法中,它接收一个输入张量x和一个可选参数attn_mask(一个注意力掩码张量)。
# 首先,它遍历self.encoders中的每个Encoder实例,并使用zip函数将它们与self.inp_lens中的对应输入序列长度配对。
# 对于每个配对,它使用Encoder实例处理x[:, -inp_len:, :],并将结果添加到x_stack列表中。同时,它也将每个Encoder实例的注意力张量添加到attns列表中。
# 最后,它将x_stack列表中的所有张量沿着第2维度(即时间维度)连接起来,并返回处理后的x_stack和每个Encoder实例的注意力张量列表attns。
class EncoderStack(nn.Layer):
    def __init__(self, encoders, inp_lens):
        super(EncoderStack, self).__init__()
        self.encoders = nn.ModuleList(encoders)
        self.inp_lens = inp_lens

    def forward(self, x, attn_mask=None):
        # x [B, L, D]
        x_stack = []; attns = []
        for i_len, encoder in zip(self.inp_lens, self.encoders):
            inp_len = x.shape[1]//(2**i_len)
            x_s, attn = encoder(x[:, -inp_len:, :])
            x_stack.append(x_s); attns.append(attn)
        x_stack = paddle.cat(x_stack, -2)
        
        return x_stack, attns

3.6 Decoder(解码器)

# 这是一个PaddlePaddle的DecoderLayer类,继承自nn.Layer。
# 在初始化方法中,它定义了self_attention、cross_attention、d_model、d_ff(默认为4*d_model)、dropout(默认为0.1)和activation(默认为"relu")等参数。
# self_attention和cross_attention分别是自注意力机制和交叉注意力机制的实例。
# d_ff是FFN(前馈神经网络)的输入和输出通道数,默认为4*d_model。conv1和conv2是1D卷积层实例,用于将输入张量转换为d_model通道数。
# norm1、norm2和norm3是规范化层实例,用于对输入张量进行归一化。dropout是dropout层实例,用于防止过拟合。activation是激活函数实例,默认为ReLU激活函数,但也可以设置为GELU激活函数。
# 在forward方法中,它接收一个输入张量x、一个可选的注意力掩码attn_mask和多个可选参数,并返回输出张量x、self_attn_output、cross_attn_output和Norm后处理结果。
# 注意,这里有两个self_attn_output和cross_attn_output输出,分别表示自注意力机制和交叉注意力机制的输出。
# 这些输出还需要通过Norm层进行归一化处理,并使用dropout层进行dropout处理,最后通过激活函数进行激活。
class DecoderLayer(nn.Layer):
    def __init__(self, self_attention, cross_attention, d_model, d_ff=None,
                 dropout=0.1, activation="relu"):
        super(DecoderLayer, self).__init__()
        d_ff = d_ff or 4*d_model
        self.self_attention = self_attention
        self.cross_attention = cross_attention
        self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
        self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        self.activation = F.relu if activation == "relu" else F.gelu
# 这是一个PaddlePaddle的DecoderLayer类的forward方法,用于执行Decoder的每一层计算。
# 该方法接受两个输入张量x和cross,以及可选参数x_mask和cross_mask,表示输入张量的注意力掩码。
# 首先,它使用self.self_attention计算自注意力机制的输出,并使用dropout、规范化层self.norm1和dropout对输出进行处理。
# 然后,它使用self.cross_attention计算交叉注意力机制的输出,并再次使用dropout、规范化层self.norm2和dropout对输出进行处理。
# 接下来,它使用1D卷积层self.conv1和self.conv2对输出进行处理,并使用dropout进行dropout处理。
# 最后,它将所有输出张量相加,并使用规范化层self.norm3对结果进行归一化处理。
# 需要注意的是,该方法返回的是经过所有处理后的最终输出张量。
    def forward(self, x, cross, x_mask=None, cross_mask=None):
        x = x + self.dropout(self.self_attention(
            x, x, x,
            attn_mask=x_mask
        )[0])
        x = self.norm1(x)

        x = x + self.dropout(self.cross_attention(
            x, cross, cross,
            attn_mask=cross_mask
        )[0])

        y = x = self.norm2(x)
        y = self.dropout(self.activation(self.conv1(y.transpose(-1,1))))
        y = self.dropout(self.conv2(y).transpose(-1,1))

        return self.norm3(x+y)
# 这是一个PaddlePaddle的Decoder类,继承自nn.Layer。
# 在初始化方法中,它接受一个layers参数,这是一个包含多个DecoderLayer实例的列表,以及一个可选的norm_layer参数,表示使用的规范化层类型。
# 在forward方法中,它遍历self.layers中的每个DecoderLayer实例,并传递输入张量x、交叉注意力机制的输出cross、注意力掩码x_mask和cross_mask。
# 最后,它使用规范化层self.norm对输出进行处理,并返回处理后的输出张量x。
class Decoder(nn.Layer):
    def __init__(self, layers, norm_layer=None):
        super(Decoder, self).__init__()
        self.layers = nn.ModuleList(layers)
        self.norm = norm_layer

    def forward(self, x, cross, x_mask=None, cross_mask=None):
        for layer in self.layers:
            x = layer(x, cross, x_mask=x_mask, cross_mask=cross_mask)

        if self.norm is not None:
            x = self.norm(x)

        return x

3.7 Informer模型

class Informer(nn.Layer):  
    def __init__(self, enc_in, dec_in, c_out, seq_len, label_len, out_len,   
                factor=5, d_model=512, n_heads=8, e_layers=3, d_layers=2, d_ff=512,   
                dropout=0.0, attn='prob', embed='fixed', freq='h', activation='gelu',   
                output_attention = False, distil=True, mix=True,  
                device=paddle.CUDAPlace(0)):  
        super(Informer, self).__init__()   
        self.pred_len = out_len  
        self.attn = attn  
        self.output_attention = output_attention
        # Encoding
        self.enc_embedding = DataEmbedding(enc_in, d_model, embed, freq, dropout)
        self.dec_embedding = DataEmbedding(dec_in, d_model, embed, freq, dropout)
        # Attention
        Attn = ProbAttention if attn=='prob' else FullAttention
        # Encoder
        self.encoder = Encoder(
            [
                EncoderLayer(
                    AttentionLayer(Attn(False, factor, attention_dropout=dropout, output_attention=output_attention), 
                                d_model, n_heads, mix=False),
                    d_model,
                    d_ff,
                    dropout=dropout,
                    activation=activation
                ) for l in range(e_layers)
            ],
            [
                ConvLayer(
                    d_model
                ) for l in range(e_layers-1)
            ] if distil else None,
            norm_layer=paddle.nn.LayerNorm(d_model)
        )
        # Decoder
        self.decoder = Decoder(
            [
                DecoderLayer(
                    AttentionLayer(Attn(True, factor, attention_dropout=dropout, output_attention=False), 
                                d_model, n_heads, mix=mix),
                    AttentionLayer(FullAttention(False, factor, attention_dropout=dropout, output_attention=False), 
                                d_model, n_heads, mix=False),
                    d_model,
                    d_ff,
                    dropout=dropout,
                    activation=activation,
                )
                for l in range(d_layers)
            ],
            norm_layer=paddle.nn.LayerNorm(d_model)
        )
        # self.end_conv1 = nn.Conv1d(in_channels=label_len+out_len, out_channels=out_len, kernel_size=1, bias=True)
        # self.end_conv2 = nn.Conv1d(in_channels=d_model, out_channels=c_out, kernel_size=1, bias=True)
        self.projection = nn.Linear(d_model, c_out, bias=True)
        
    def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec, 
                enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None):
        enc_out = self.enc_embedding(x_enc, x_mark_enc)
        enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask)

        dec_out = self.dec_embedding(x_dec, x_mark_dec)
        dec_out = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask)
        dec_out = self.projection(dec_out)
        
        # dec_out = self.end_conv1(dec_out)
        # dec_out = self.end_conv2(dec_out.transpose(2,1)).transpose(1,2)
        if self.output_attention:
            return dec_out[:,-self.pred_len:,:], attns
        else:
            return dec_out[:,-self.pred_len:,:] # [B, L, D]


class InformerStack(nn.Layer):
    def __init__(self, enc_in, dec_in, c_out, seq_len, label_len, out_len, 
                factor=5, d_model=512, n_heads=8, e_layers=[3,2,1], d_layers=2, d_ff=512, 
                dropout=0.0, attn='prob', embed='fixed', freq='h', activation='gelu',
                output_attention = False, distil=True, mix=True,
                device=paddle.CUDAPlace(0)):  
        super(InformerStack, self).__init__()
        self.pred_len = out_len
        self.attn = attn
        self.output_attention = output_attention

        # Encoding
        self.enc_embedding = DataEmbedding(enc_in, d_model, embed, freq, dropout)
        self.dec_embedding = DataEmbedding(dec_in, d_model, embed, freq, dropout)
        # Attention
        Attn = ProbAttention if attn=='prob' else FullAttention
        # Encoder

        inp_lens = list(range(len(e_layers))) # [0,1,2,...] you can customize here
        encoders = [
            Encoder(
                [
                    EncoderLayer(
                        AttentionLayer(Attn(False, factor, attention_dropout=dropout, output_attention=output_attention), 
                                    d_model, n_heads, mix=False),
                        d_model,
                        d_ff,
                        dropout=dropout,
                        activation=activation
                    ) for l in range(el)
                ],
                [
                    ConvLayer(
                        d_model
                    ) for l in range(el-1)
                ] if distil else None,
                norm_layer=paddle.nn.LayerNorm(d_model)
            ) for el in e_layers]
        self.encoder = EncoderStack(encoders, inp_lens)
        # Decoder
        self.decoder = Decoder(
            [
                DecoderLayer(
                    AttentionLayer(Attn(True, factor, attention_dropout=dropout, output_attention=False), 
                                d_model, n_heads, mix=mix),
                    AttentionLayer(FullAttention(False, factor, attention_dropout=dropout, output_attention=False), 
                                d_model, n_heads, mix=False),
                    d_model,
                    d_ff,
                    dropout=dropout,
                    activation=activation,
                )
                for l in range(d_layers)
            ],
            norm_layer=paddle.nn.LayerNorm(d_model)
        )
        # self.end_conv1 = nn.Conv1d(in_channels=label_len+out_len, out_channels=out_len, kernel_size=1, bias=True)
        # self.end_conv2 = nn.Conv1d(in_channels=d_model, out_channels=c_out, kernel_size=1, bias=True)
        self.projection = nn.Linear(d_model, c_out, bias=True)
        
    def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec, 
                enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None):
        enc_out = self.enc_embedding(x_enc, x_mark_enc)
        enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask)

        dec_out = self.dec_embedding(x_dec, x_mark_dec)
        dec_out = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask)
        dec_out = self.projection(dec_out)
        
        # dec_out = self.end_conv1(dec_out)
        # dec_out = self.end_conv2(dec_out.transpose(2,1)).transpose(1,2)
        if self.output_attention:
            return dec_out[:,-self.pred_len:,:], attns
        else:
            return dec_out[:,-self.pred_len:,:] # [B, L, D]
e, dec_enc_mask=None):
        enc_out = self.enc_embedding(x_enc, x_mark_enc)
        enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask)

        dec_out = self.dec_embedding(x_dec, x_mark_dec)
        dec_out = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask)
        dec_out = self.projection(dec_out)
        
        # dec_out = self.end_conv1(dec_out)
        # dec_out = self.end_conv2(dec_out.transpose(2,1)).transpose(1,2)
        if self.output_attention:
            return dec_out[:,-self.pred_len:,:], attns
        else:
            return dec_out[:,-self.pred_len:,:] # [B, L, D]

本篇文章在模型训练和预测部分就不作赘述了,后续我会推出相关的应用实战哦!

4. 关于作者 & 总结

4.2 复现作者:徐嘉祁 成都锦城学院 飞桨领航团团长

4.3 Informer可以解决长时间序列预测的问题,总体来说,该算法设计了ProbSparse自注意机制和蒸馏操作来处理vanillaTransformer中二次时间复杂度和二次内存使用的挑战。此外,精心设计的生成式解码器减轻了传统Encoder-Decoder架构的局限性。在实际数据上的实验证明了Informer在提高LSTF问题预测能力方面的有效性。

4.4 这个算法可以用于2023软件杯风电预测赛道哦!

4.5 关于Informer的相关应用复现敬请期待!

点赞加关注 找我不迷路❤

此文章为搬运
原项目链接

Logo

学大模型,用大模型上飞桨星河社区!每天8点V100G算力免费领!免费领取ERNIE 4.0 100w Token >>>

更多推荐