★★★ 本文源自AlStudio社区精品项目,【点击此处】查看更多精品内容 >>>

[AI特训营第三期]采用前沿分类网络PVT v2的十一类天气识别

一、项目背景

首先,全球气候变化是一个重要的研究领域,而天气变化是气候变化的一个重要组成部分。因此,对天气变化的深入理解和预测对于应对气候变化和减缓其影响至关重要。

其次,随着人工智能和机器学习技术的快速发展,深度学习已经成为许多领域的重要技术手段。在天气预报领域,深度学习已经被证明是一种有效的模式识别和数据处理方法,可以帮助预测天气变化。

最后,天气预报对于人们的生产生活和环境保护具有重要意义。因此,提高天气预报的准确性和可靠性对于保障人们的生产生活和维护环境的可持续性非常重要。

基于上述背景,我们设计了基于深度学习的天气分类项目。我们将利用深度学习技术来训练一个神经网络模型,该模型将可以对未来一段时间内的天气进行预测和分类。我们将使用大量的实时数据来训练和优化模型,以达到最好的预测和分类效果。

总之,基于深度学习的天气分类项目是为了利用深度学习技术提高天气预报的准确性和可靠性而设计的。该项目的背景和意义非常重要,有助于我们更好地理解和应对气候变化,并且对于人们的生产生活和环境保护具有重要的实际意义。
(written by 文心一言)

二、项目任务和项目路线

  • 采用前沿PVT v2作为分类网络
  • 采用了数据增强的方法来提高准确度
  • 采用了迁移学习的方式提高模型精度

同时任务精度要求为:
Accuracy≥80%
Precision≥80%

三、数据集介绍

该数据集包含6862张不同类型天气的图像,可用于基于图片实现天气分类。

图片被分为十一个类分别为: dew, fog/smog, frost, glaze, hail, lightning , rain, rainbow, rime, sandstorm and snow.

#解压数据集
!unzip data/data191244/Weather.zip
#导包
import paddle
import os
import cv2
import glob
import paddle.nn as nn
from paddle.io import Dataset
import pandas as pd
import paddle.vision.transforms as T
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from PIL import Image
from sklearn import preprocessing
from paddle.vision.transforms import ToTensor
path = 'dataset'
path_imgs = list(glob.glob(path+'/**/*.jpg'))
#warning是因为libpng1.6及以上版本增强了ICC profiles检查,可以忽略
#清洗数据
for item in path_imgs:
    img = cv2.imread(item)
    if type(img) != np.ndarray:
        path_imgs.remove(item)
        print(item)
labels = list(map(lambda x:os.path.split(os.path.split(x)[0])[1], path_imgs))
file_path = pd.Series(path_imgs, name='File_Path').astype(str)
labels = pd.Series(labels, name='Labels')
data = pd.concat([file_path, labels], axis=1)
data = data.sample(frac=1).reset_index(drop=True)
data.head()
examples = pd.DataFrame([])
unique_labels = data['Labels'].unique()
for label in unique_labels:
    examples = pd.concat([examples, data.query(f"Labels == '{label}'").sample(1)])
fig = plt.figure(figsize=(16, 8))
for index, row in examples.reset_index().iterrows():
    ax = plt.subplot(4, 3, index + 1)
    image = cv2.imread(row["File_Path"])
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, (256, 256), interpolation=cv2.INTER_AREA)
    plt.imshow(image)
    ax.set_title(row["Labels"].title(),fontsize=20)
    ax.axis("off")

fig.tight_layout(pad=0.1)
plt.show()

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gIAILMGi-1681387852745)(main_files/main_11_0.png)]

counts = data.Labels.value_counts()
sns.barplot(x=counts.index, y=counts)
plt.axhline(y=counts.mean(), lw=3, color="#346c9a")
plt.xlabel('Labels')
plt.ylabel('Count')
plt.xticks(rotation=50);

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Xg1CnOg3-1681387852746)(main_files/main_12_0.png)]

enc=preprocessing.LabelEncoder()
enc=enc.fit(unique_labels)#训练LabelEncoder

class WeatherDataset(Dataset):
    def __init__(self,data,rate = 0.7,mode='train'):
        super(WeatherDataset, self).__init__()
        self.data = data.copy()
        self.data['Labels'] = enc.transform(self.data['Labels'])
        if mode == 'train':
            self.data = self.data[:int(len(self.data)*rate)]
            self.transforms = T.Compose([
            T.RandomResizedCrop((224,224)),    # 随机裁剪大小
            T.RandomHorizontalFlip(0.5),        # 随机水平翻转
            # T.ToTensor(),                       # 数据的格式转换和标准化 HWC => CHW  
            ])
            
        else:
            self.data = self.data[int(len(self.data)*rate):]
            self.transforms = T.Compose([
            ])
    
    def __getitem__(self, index):
        image_path, label = self.data.iloc[index]
        img = cv2.imread(image_path, cv2.IMREAD_COLOR)
        if type(img)==np.ndarray:
            img = cv2.resize(img,(224, 224))
            img = self.transforms(img)
            image = np.array(img,dtype = 'float32')
            image = paddle.vision.transforms.to_tensor(image, data_format='CHW').astype('float32')
            return image, label
    def __len__(self):
        return len(self.data)
# 测试定义的数据集
train_dataset = WeatherDataset(data=data,mode='train')
val_dataset = WeatherDataset(data=data,mode='test')
train_dataset.__len__(),val_dataset.__len__()

四、代码实现

4.1模型介绍

最近对vision Transformer are converging on the backbone network为图像分类,目标检测、实例和语义分割等下游视觉任务而设计。迄今为止,已经取得了可喜成果。例如,ViT证明了pure Transformer可以在图像分类任务中完成SOTA。PVT v1表明在检测和分割等密集预测中,pure Transformer主干也可以超过CNN counterparts,之后,swin Transformer,CoaT,LeViT和Twin进一步提高了Transformer主干的分类,检测和分割等性能。

PVT v2旨在建立更强大and more feasible baselines built on the PVTv1 framework。三个设计改进,即(1)线性复杂度注意力层,(2)重叠块嵌入overlapping patch embedding;(3)卷积前馈网络。当与PVTv1一起使用时,可以带来更好的性能,改进的框架成为PVT v2,具体来说PVT v2-B5【PVTv2有6中不同大小的变体,from B0-B5】在ImageNet上产生了83.8%的top-1 accuracy由于Swin-B和Swin-SVT-L,而PVTv2具有更少的参数和FLOPs。

paper:https://arxiv.org/pdf/2106.13797.pdf

code:https://github.com/whai362/PVT

总而言之, PVT v2具有以下优点

  • PVT v2 引入了卷积操作、zero-padding、avgpool的注意力层,从三个方面提升了性能
  • 相比同时期的ViT模型,具有更小的参数量和计算量
  • 在下游任务,PVT v2 展现了良好的性能

下图给出PVT v2在下游任务中的性能评价

4.2构造模型

import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from functools import partial
import math

trunc_normal_ = nn.initializer.TruncatedNormal(std=.02)
zeros_ = nn.initializer.Constant(value=0.)
ones_ = nn.initializer.Constant(value=1.)
kaiming_normal_ = nn.initializer.KaimingNormal()
#定义网络基本模型
def to_2tuple(x):
    return tuple([x] * 2)

def swapdim(x, dim1, dim2):
    a = list(range(len(x.shape)))
    a[dim1], a[dim2] = a[dim2], a[dim1]

    return x.transpose(a)

def drop_path(x, drop_prob = 0., training = False):

    if drop_prob == 0. or not training:
        return x
    keep_prob = 1 - drop_prob
    shape = (x.shape[0],) + (1,) * (x.ndim - 1)  
    random_tensor = paddle.to_tensor(keep_prob) + paddle.rand(shape)
    random_tensor = paddle.floor(random_tensor) 
    output = x.divide(keep_prob) * random_tensor
    return output

class DropPath(nn.Layer):

    def __init__(self, drop_prob=None):
        super(DropPath, self).__init__()
        self.drop_prob = drop_prob

    def forward(self, x):
        return drop_path(x, self.drop_prob, self.training)


class Identity(nn.Layer):                      

    def __init__(self, *args, **kwargs):
        super(Identity, self).__init__()
 
    def forward(self, input):
        return input
#构造网络主干部分
class Mlp(nn.Layer):
    def __init__(self, in_features, hidden_features=None, out_features=None, act_layer=nn.GELU, drop=0., linear=False):
        super().__init__()
        out_features = out_features or in_features
        hidden_features = hidden_features or in_features
        self.fc1 = nn.Linear(in_features, hidden_features)
        self.dwconv = DWConv(hidden_features)
        self.act = act_layer()
        self.fc2 = nn.Linear(hidden_features, out_features)
        self.drop = nn.Dropout(drop)
        self.linear = linear
        if self.linear:
            self.relu = nn.ReLU()


    def forward(self, x, H, W):
        x = self.fc1(x)
        if self.linear:
            x = self.relu(x)
        x = self.dwconv(x, H, W)
        x = self.act(x)
        x = self.drop(x)
        x = self.fc2(x)
        x = self.drop(x)
        return x


class Attention(nn.Layer):
    def __init__(self, dim, num_heads=8, qkv_bias=False, qk_scale=None, attn_drop=0., proj_drop=0., sr_ratio=1, linear=False):
        super().__init__()
        assert dim % num_heads == 0, f"dim {dim} should be divided by num_heads {num_heads}."

        self.dim = dim
        self.num_heads = num_heads
        head_dim = dim // num_heads
        self.scale = qk_scale or head_dim ** -0.5

        self.q = nn.Linear(dim, dim, bias_attr=qkv_bias)
        self.kv = nn.Linear(dim, dim * 2, bias_attr=qkv_bias)
        self.attn_drop = nn.Dropout(attn_drop)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_drop)

        self.linear = linear
        self.sr_ratio = sr_ratio
        if not linear:
            if sr_ratio > 1:
                self.sr = nn.Conv2D(dim, dim, kernel_size=sr_ratio, stride=sr_ratio)
                self.norm = nn.LayerNorm(dim)
        else:
            self.pool = nn.AdaptiveAvgPool2D(7)
            self.sr = nn.Conv2D(dim, dim, kernel_size=1, stride=1)
            self.norm = nn.LayerNorm(dim)
            self.act = nn.GELU()


    def forward(self, x, H, W):
        B, N, C = x.shape
        q = self.q(x).reshape([B, N, self.num_heads, C // self.num_heads]).transpose([0, 2, 1, 3])

        if not self.linear:
            if self.sr_ratio > 1:
                x_ = x.transpose([0, 2, 1]).reshape([B, C, H, W])
                x_ = self.sr(x_).reshape([B, C, -1]).transpose([0, 2, 1])
                x_ = self.norm(x_)
                kv = self.kv(x_).reshape([B, -1, 2, self.num_heads, C // self.num_heads]).transpose([2, 0, 3, 1, 4])
            else:
                kv = self.kv(x).reshape([B, -1, 2, self.num_heads, C // self.num_heads]).transpose([2, 0, 3, 1, 4])
        else:
            x_ = x.transpose([0, 2, 1]).reshape([B, C, H, W])
            x_ = self.sr(self.pool(x_)).reshape([B, C, -1]).transpose([0, 2, 1])
            x_ = self.norm(x_)
            x_ = self.act(x_)
            kv = self.kv(x_).reshape([B, -1, 2, self.num_heads, C // self.num_heads]).transpose([2, 0, 3, 1, 4])
        k, v = kv[0], kv[1]

        attn = (q @ swapdim(k, -2, -1)) * self.scale
        attn = F.softmax(attn, axis=-1)
        attn = self.attn_drop(attn)

        x = swapdim((attn @ v), 1, 2).reshape([B, N, C])
        x = self.proj(x)
        x = self.proj_drop(x)

        return x


class Block(nn.Layer):

    def __init__(self, dim, num_heads, mlp_ratio=4., qkv_bias=False, qk_scale=None, drop=0., attn_drop=0.,
                 drop_path=0., act_layer=nn.GELU, norm_layer=nn.LayerNorm, sr_ratio=1, linear=False):
        super().__init__()
        self.norm1 = norm_layer(dim)
        self.attn = Attention(
            dim,
            num_heads=num_heads, qkv_bias=qkv_bias, qk_scale=qk_scale,
            attn_drop=attn_drop, proj_drop=drop, sr_ratio=sr_ratio, linear=linear)
        # NOTE: drop path for stochastic depth, we shall see if this is better than dropout here
        self.drop_path = DropPath(drop_path) if drop_path > 0. else Identity()
        self.norm2 = norm_layer(dim)
        mlp_hidden_dim = int(dim * mlp_ratio)
        self.mlp = Mlp(in_features=dim, hidden_features=mlp_hidden_dim, act_layer=act_layer, drop=drop, linear=linear)

    def forward(self, x, H, W):
        x = x + self.drop_path(self.attn(self.norm1(x), H, W))
        x = x + self.drop_path(self.mlp(self.norm2(x), H, W))

        return x


class OverlapPatchEmbed(nn.Layer):
    """ Image to Patch Embedding
    """

    def __init__(self, img_size=224, patch_size=7, stride=4, in_chans=3, embed_dim=768):
        super().__init__()
        img_size = to_2tuple(img_size)
        patch_size = to_2tuple(patch_size)

        self.img_size = img_size
        self.patch_size = patch_size
        self.H, self.W = img_size[0] // patch_size[0], img_size[1] // patch_size[1]
        self.num_patches = self.H * self.W
        self.proj = nn.Conv2D(in_chans, embed_dim, kernel_size=patch_size, stride=stride,
                              padding=(patch_size[0] // 2, patch_size[1] // 2))
        self.norm = nn.LayerNorm(embed_dim)

    def forward(self, x):
        x = self.proj(x)
        _, _, H, W = x.shape
        x = x.flatten(2)
        x = swapdim(x, 1, 2)
        x = self.norm(x)

        return x, H, W

#定义网络
class PyramidVisionTransformerV2(nn.Layer):
    def __init__(self, img_size=224, patch_size=16, in_chans=3, num_classes=1000, embed_dims=[64, 128, 256, 512],
                 num_heads=[1, 2, 4, 8], mlp_ratios=[4, 4, 4, 4], qkv_bias=False, qk_scale=None, drop_rate=0.,
                 attn_drop_rate=0., drop_path_rate=0., norm_layer=nn.LayerNorm,
                 depths=[3, 4, 6, 3], sr_ratios=[8, 4, 2, 1], num_stages=4, linear=False):
        super().__init__()
        self.num_classes = num_classes
        self.depths = depths
        self.num_stages = num_stages

        dpr = [x for x in paddle.linspace(0, drop_path_rate, sum(depths))]  # stochastic depth decay rule
        cur = 0

        for i in range(num_stages):
            patch_embed = OverlapPatchEmbed(img_size=img_size if i == 0 else img_size // (2 ** (i + 1)),
                                            patch_size=7 if i == 0 else 3,
                                            stride=4 if i == 0 else 2,
                                            in_chans=in_chans if i == 0 else embed_dims[i - 1],
                                            embed_dim=embed_dims[i])

            block = nn.LayerList([Block(
                dim=embed_dims[i], num_heads=num_heads[i], mlp_ratio=mlp_ratios[i], qkv_bias=qkv_bias, qk_scale=qk_scale,
                drop=drop_rate, attn_drop=attn_drop_rate, drop_path=dpr[cur + j], norm_layer=norm_layer,
                sr_ratio=sr_ratios[i], linear=linear)
                for j in range(depths[i])])
            norm = norm_layer(embed_dims[i])
            cur += depths[i]

            setattr(self, f"patch_embed{i + 1}", patch_embed)
            setattr(self, f"block{i + 1}", block)
            setattr(self, f"norm{i + 1}", norm)

        # classification head
        self.head = nn.Linear(embed_dims[3], num_classes) if num_classes > 0 else Identity()

    def freeze_patch_emb(self):
        self.patch_embed1.requires_grad = False

    def reset_classifier(self, num_classes, global_pool=''):
        self.num_classes = num_classes
        self.head = nn.Linear(self.embed_dim, num_classes) if num_classes > 0 else Identity()

    def forward_features(self, x):
        B = x.shape[0]

        for i in range(self.num_stages):
            patch_embed = getattr(self, f"patch_embed{i + 1}")
            block = getattr(self, f"block{i + 1}")
            norm = getattr(self, f"norm{i + 1}")
            x, H, W = patch_embed(x)
            for blk in block:
                x = blk(x, H, W)
            x = norm(x)
            if i != self.num_stages - 1:
                x = x.reshape([B, H, W, -1]).transpose([0, 3, 1, 2])
        return x.mean(axis=1)

    def forward(self, x):
        x = self.forward_features(x)
        x = self.head(x)

        return x


class DWConv(nn.Layer):
    def __init__(self, dim=768):
        super(DWConv, self).__init__()
        self.dwconv = nn.Conv2D(dim, dim, 3, 1, 1, bias_attr=True, groups=dim)

    def forward(self, x, H, W):
        B, N, C = x.shape
        x = swapdim(x, 1, 2)
        x = x.reshape([B, C, H, W])
        x = self.dwconv(x)
        x = x.flatten(2)
        x = swapdim(x, 1, 2)

        return x

4.3 权重加载

#加载权重
network = PyramidVisionTransformerV2(
        patch_size=4, embed_dims=[64, 128, 320, 512],num_classes=11, num_heads=[1, 2, 5, 8], mlp_ratios=[8, 8, 4, 4], qkv_bias=True,
        norm_layer=partial(nn.LayerNorm, epsilon=1e-6), depths=[3, 4, 6, 3], sr_ratios=[8, 4, 2, 1], linear=True)
network.set_state_dict(paddle.load('pvt_v2_b2_li.pdparams'))
#查看网络构造
model = paddle.Model(network)
model.summary((-1, ) + tuple((3,224,224)))

4.4模型训练

#模型训练
EPOCHS = 20
BATCH_SIZE = 100

optimizer = paddle.optimizer.Adamax(learning_rate=0.0001, parameters=model.parameters())

# 模型训练配置
model.prepare(optimizer,  # 优化器
              paddle.nn.CrossEntropyLoss(),        # 损失函数
              [paddle.metric.Accuracy()] # 评估指标
)
# 训练可视化VisualDL工具的回调函数
visualdl = paddle.callbacks.VisualDL(log_dir='visualdl_log')

# 启动模型全流程训练
model.fit(train_dataset,            # 训练数据集
          val_dataset,            # 评估数据集
          epochs=EPOCHS,            # 总的训练轮次
          batch_size=BATCH_SIZE,    # 批次计算的样本量大小
          shuffle=True,             # 是否打乱样本集
          verbose=1,         # 日志展示格式
          callbacks=[visualdl])     # 回调函数使用

4.5训练结果可视化

#保存模型权重
model.save('model')

五、模型评价

#加载训练好的权重
model.load('model')
#模型评价
model.evaluate(val_dataset, batch_size=72, verbose=1)
result = model.predict(val_dataset, batch_size=72)
from sklearn.metrics import precision_score,accuracy_score
pre_label = []
for i in range(len(result[0])):
    for j in range(result[0][i].shape[0]):
        pre_label.append(np.argmax(result[0][i][j]))
real_label = data[int(len(data)*0.7):]['Labels'].copy()
real_label = enc.transform(real_label)
real_label = list(real_label)       
print('测试集上准确率和查准率分别为:',end='')
accuracy_score(pre_label,real_label),precision_score(pre_label,real_label,average='macro')
测试集上准确率和查准率分别为:




(0.9208833839656291, 0.9115646258503401)

六、效果展示

test_path ='dataset/frost/3606.jpg'

#把数据转化成tensor
test_img = cv2.imread(test_path)
plt.imshow(test_img)
test_img = cv2.resize(test_img,(224,224))
test_img = test_img.transpose(2,0,1)
test_img = test_img.reshape(1,1,3,224,224)
test = paddle.to_tensor(test_img,dtype='float32')

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zIU37x4x-1681387852749)(main_files/main_36_0.png)]

#开始预测
pred = model.predict(test)
pred = np.argmax(pred[0][0])
pred = enc.inverse_transform([pred])
pred
Predict begin...
step 1/1 [==============================] - 40ms/step
Predict samples: 1

1

array(['frost'], dtype=object)

七、总结

  • 1.本项目可以使用迁移学习的方式来提高精度
  • 2.可以通过数据增强的方式来扩充数据集来提高精度
  • 3.可以使用更加前沿的模型来提高精度
  • 4.如果对推理速度有较高需求,可以使用更加轻量的模型

八、作者介绍

作者:姓名:李灿 个人主页

指导老师:姓名:郑博培 个人主页

Logo

学大模型,用大模型上飞桨星河社区!每天8点V100G算力免费领!免费领取ERNIE 4.0 100w Token >>>

更多推荐