1. Asymmetric Contextual Modulation for Infrared Small Target Detection

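The paper makes the following contributions:

  • An open-source dataset, Sirst Dai.
  • The ACM module, which enables efficient interaction between the shallow and deep features of small targets.
  • Results that surpassed the other algorithms available at the time.

Two implementations of the paper currently exist, Dai's MXNet version and Zhang's PyTorch version. This project reproduces it in Paddle.

Recommended reading: PP-ISTD: Dense Nested Attention Network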

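2. Paper walkthrough

The paper's core model, ACM, is described below.

Research motivation:

  • 1) How to build a deep model that can detect infrared small targets, which lack intrinsic information;
  • 2) How to encode high-level contextual information without losing target detail.

ACM consists of two core modules, a bottom-up one and a top-down one, as shown in Figure 1.

[Figure 1: structure of the ACM module]

The computation proceeds as follows: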

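X and Y denote adjacent shallow (low-level) and deep (high-level) features, respectively.

  • $\mathbf{X}^{\prime}=\mathbf{G}(\mathbf{Y})\otimes\mathbf{X}=\sigma\left(B\left(\mathbf{W}_{2}\,\delta\left(B\left(\mathbf{W}_{1}\mathbf{y}\right)\right)\right)\right)\otimes\mathbf{X}$

Here $\mathbf{y}$ is the globally average-pooled $\mathbf{Y}$ (the AdaptiveAvgPool2D layer in the top-down branch of the code), $\sigma$ is the Sigmoid function, $\delta$ is the ReLU activation, $\otimes$ denotes element-wise multiplication, $B$ is Batch Normalization, and $\mathbf{W}_{1}$, $\mathbf{W}_{2}$ are fully connected layers.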

  • $\mathbf{L}(\mathbf{X})=\sigma\left(B\left(\mathrm{PWConv}_{2}\left(\delta\left(B\left(\mathrm{PWConv}_{1}(\mathbf{X})\right)\right)\right)\right)\right)$

PWConv denotes a 1×1 (point-wise) convolution.

  • $\mathbf{Y}^{\prime}=\mathbf{L}(\mathbf{X})\otimes\mathbf{Y}$

  • $\mathbf{Z}=\mathbf{G}(\mathbf{Y})\otimes\mathbf{X}+\mathbf{L}(\mathbf{X})\otimes\mathbf{Y}$

The fused feature Z is the sum of the two modulated features, $\mathbf{X}^{\prime}+\mathbf{Y}^{\prime}$.
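The fusion equations can be sketched in a few lines of NumPy. This is only an illustration of the data flow under simplifying assumptions: Batch Normalization is left out, the weights are random, and the names `acm_fuse`, `w1_g`, `w2_g`, `w1_l`, `w2_l` are hypothetical rather than taken from the paper or its code.

```python
import numpy as np

def sigmoid(v):
    return 1.0 / (1.0 + np.exp(-v))

def acm_fuse(x, y, w1_g, w2_g, w1_l, w2_l):
    """Fuse shallow x and deep y, both shaped (C, H, W). BN is omitted."""
    # Top-down global channel attention G(Y): pool y to one value per channel.
    pooled = y.mean(axis=(1, 2))                                   # (C,)
    g = sigmoid(w2_g @ np.maximum(w1_g @ pooled, 0.0))             # (C,)
    # Bottom-up point-wise attention L(X): a 1x1 conv is a matmul over channels.
    hidden = np.maximum(np.einsum('kc,chw->khw', w1_l, x), 0.0)    # (C/r, H, W)
    l = sigmoid(np.einsum('ck,khw->chw', w2_l, hidden))            # (C, H, W)
    # Z = G(Y) * X + L(X) * Y
    return g[:, None, None] * x + l * y

rng = np.random.default_rng(0)
C, H, W, r = 8, 4, 4, 4                  # r is the channel reduction ratio
x = rng.standard_normal((C, H, W))
y = rng.standard_normal((C, H, W))
w1_g = rng.standard_normal((C // r, C))
w2_g = rng.standard_normal((C, C // r))
w1_l = rng.standard_normal((C // r, C))
w2_l = rng.standard_normal((C, C // r))
z = acm_fuse(x, y, w1_g, w2_g, w1_l, w2_l)
print(z.shape)
```

The asymmetry is visible in the shapes: G(Y) yields one weight per channel, while L(X) yields one weight per channel and pixel. With all-zero weights both gates equal 0.5, which is one way to read the factor of 2 that the Paddle implementation in section 5 applies to each term.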

3. Results

[Figure: results]

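4. Reproducing the paper

4.1 Environment dependencies

PaddlePaddle 2.3, PaddleSeg

4.2 Dataset

The data is already bundled with this project, so no download is needed.

Sirst Dai

[Figure: sample images from the dataset]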

!unzip -o sirst/images.zip -d sirst/
!unzip -o sirst/masks.zip -d  sirst/

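4.3 Evaluation metrics

The paper uses IoU and nIoU as its evaluation metrics.

For an introduction to these metrics, see link. The Paddle-based metric code is given below.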

import numpy as np
import paddle.nn.functional as F

class SigmoidMetric():
    def __init__(self):
        self.reset()

    def update(self, pred, labels):
        correct, labeled = self.batch_pix_accuracy(pred, labels)
        inter, union = self.batch_intersection_union(pred, labels)

        self.total_correct += correct
        self.total_label += labeled
        self.total_inter += inter
        self.total_union += union

    def get(self):
        """Gets the current evaluation result."""
        pixAcc = 1.0 * self.total_correct / (np.spacing(1) + self.total_label)
        IoU = 1.0 * self.total_inter / (np.spacing(1) + self.total_union)
        mIoU = IoU.mean()
        return pixAcc, mIoU

    def reset(self):
        """Resets the internal evaluation result to initial state."""
        self.total_inter = 0
        self.total_union = 0
        self.total_correct = 0
        self.total_label = 0

    def batch_pix_accuracy(self, output, target):
        assert output.shape == target.shape
        output = output.numpy()
        target = target.numpy()

        predict = (output > 0).astype('int64') # P
        pixel_labeled = np.sum(target > 0) # T
        pixel_correct = np.sum((predict == target)*(target > 0)) # TP
        assert pixel_correct <= pixel_labeled
        return pixel_correct, pixel_labeled

    def batch_intersection_union(self, output, target):
        mini = 1
        maxi = 1 # nclass
        nbins = 1 # nclass
        predict = (output.numpy() > 0).astype('int64') # P
        target = target.numpy().astype('int64') # T
        intersection = predict * (predict == target) # TP

        # areas of intersection and union
        area_inter, _ = np.histogram(intersection, bins=nbins, range=(mini, maxi))
        area_pred, _ = np.histogram(predict, bins=nbins, range=(mini, maxi))
        area_lab, _ = np.histogram(target, bins=nbins, range=(mini, maxi))
        area_union = area_pred + area_lab - area_inter
        assert (area_inter <= area_union).all()
        return area_inter, area_union


class SamplewiseSigmoidMetric():
    def __init__(self, nclass, score_thresh=0.5):
        self.nclass = nclass
        self.score_thresh = score_thresh
        self.reset()

    def update(self, preds, labels):
        """Updates the internal evaluation result."""
        inter_arr, union_arr = self.batch_intersection_union(preds, labels,
                                                             self.nclass, self.score_thresh)
        self.total_inter = np.append(self.total_inter, inter_arr)
        self.total_union = np.append(self.total_union, union_arr)

    def get(self):
        """Gets the current evaluation result."""
        IoU = 1.0 * self.total_inter / (np.spacing(1) + self.total_union)
        mIoU = IoU.mean()
        return IoU, mIoU

    def reset(self):
        """Resets the internal evaluation result to initial state."""
        self.total_inter = np.array([])
        self.total_union = np.array([])
        self.total_correct = np.array([])
        self.total_label = np.array([])

    def batch_intersection_union(self, output, target, nclass, score_thresh):
        """mIoU"""
        # inputs are tensor
        # the category 0 is ignored class, typically for background / boundary
        mini = 1
        maxi = 1  # nclass
        nbins = 1  # nclass

        predict = (F.sigmoid(output).numpy() > score_thresh).astype('int64') # P
        target = target.numpy().astype('int64') # T
        intersection = predict * (predict == target) # TP

        num_sample = intersection.shape[0]
        area_inter_arr = np.zeros(num_sample)
        area_pred_arr = np.zeros(num_sample)
        area_lab_arr = np.zeros(num_sample)
        area_union_arr = np.zeros(num_sample)

        for b in range(num_sample):
            # areas of intersection and union
            area_inter, _ = np.histogram(intersection[b], bins=nbins, range=(mini, maxi))
            area_inter_arr[b] = area_inter

            area_pred, _ = np.histogram(predict[b], bins=nbins, range=(mini, maxi))
            area_pred_arr[b] = area_pred

            area_lab, _ = np.histogram(target[b], bins=nbins, range=(mini, maxi))
            area_lab_arr[b] = area_lab

            area_union = area_pred + area_lab - area_inter
            area_union_arr[b] = area_union

            assert (area_inter <= area_union).all()

        return area_inter_arr, area_union_arr


class ROCMetric():
    def __init__(self, nclass, bins):
        self.nclass = nclass
        self.bins = bins
        self.tp_arr = np.zeros(self.bins+1)
        self.pos_arr = np.zeros(self.bins+1)
        self.fp_arr = np.zeros(self.bins+1)
        self.neg_arr = np.zeros(self.bins+1)

    def update(self, preds, labels):
        for iBin in range(self.bins+1):
            score_thresh = (iBin + 0.0) / self.bins
            i_tp, i_pos, i_fp, i_neg = cal_tp_pos_fp_neg(preds, labels, self.nclass, score_thresh)

            self.tp_arr[iBin] += i_tp
            self.pos_arr[iBin] += i_pos
            self.fp_arr[iBin] += i_fp
            self.neg_arr[iBin] += i_neg

    def get(self):
        tp_rates = self.tp_arr / (self.pos_arr + 0.001)
        fp_rates = self.fp_arr / (self.neg_arr + 0.001)

        return tp_rates, fp_rates

def cal_tp_pos_fp_neg(output, target, nclass, score_thresh):
    mini = 1
    maxi = 1 # nclass
    nbins = 1 # nclass

    predict = (F.sigmoid(output).numpy() > score_thresh).astype('int64') # P
    target = target.numpy().astype('int64')  # T
    intersection = predict * (predict == target) # TP
    tp = intersection.sum()
    fp = (predict * (predict != target)).sum()  # FP
    tn = ((1 - predict) * (predict == target)).sum()  # TN
    fn = ((predict != target) * (1 - predict)).sum()   # FN
    pos = tp + fn
    neg = fp + tn
    return tp, pos, fp, neg
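The difference between the two metrics shows up on a toy batch. The sketch below is a simplified NumPy version, not the classes above: it assumes already-binarized masks, and the function name `iou_and_niou` is hypothetical. IoU pools intersections and unions over all samples, as `SigmoidMetric` does, while nIoU averages the per-sample ratios, as `SamplewiseSigmoidMetric` does.

```python
import numpy as np

def iou_and_niou(preds, targets):
    """preds, targets: boolean arrays shaped (N, H, W)."""
    inter = (preds & targets).reshape(len(preds), -1).sum(axis=1)
    union = (preds | targets).reshape(len(preds), -1).sum(axis=1)
    eps = np.spacing(1)
    iou = inter.sum() / (union.sum() + eps)     # pooled over all samples
    niou = np.mean(inter / (union + eps))       # averaged per sample
    return iou, niou

# Sample 0: a 16-pixel target, fully detected.
# Sample 1: a 1-pixel target, completely missed.
targets = np.zeros((2, 8, 8), dtype=bool)
targets[0, :4, :4] = True
targets[1, 0, 0] = True
preds = np.zeros_like(targets)
preds[0, :4, :4] = True

iou, niou = iou_and_niou(preds, targets)
print(iou, niou)
```

Here IoU is 16/17 (about 0.94) because the large target dominates the pooled counts, while nIoU is 0.5 because the missed one-pixel target counts as much as the large one. That sensitivity to small targets is why nIoU is reported alongside IoU.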

5. Model construction

import paddle
from paddle import nn
import paddle.nn.functional as F
use_gpu = True
paddle.device.set_device('gpu:0') if use_gpu else paddle.device.set_device('cpu')
paddle.seed(1024)
class BiLocalChaFuseReduce(nn.Layer):
    def __init__(self, in_high_channels, in_low_channels, out_channels=64, r=4):
        super(BiLocalChaFuseReduce, self).__init__()

        assert in_low_channels == out_channels
        self.high_channels = in_high_channels
        self.low_channels = in_low_channels
        self.out_channels = out_channels
        self.bottleneck_channels = int(out_channels // r)

        self.feature_high = nn.Sequential(
            nn.Conv2D(self.high_channels, self.out_channels, 1, 1, 0),
            nn.BatchNorm2D(self.out_channels),
            nn.ReLU(True),
        )

        self.topdown = nn.Sequential(
            nn.Conv2D(self.out_channels, self.bottleneck_channels, 1, 1, 0),
            nn.BatchNorm2D(self.bottleneck_channels),
            nn.ReLU(True),

            nn.Conv2D(self.bottleneck_channels, self.out_channels, 1, 1, 0),
            nn.BatchNorm2D(self.out_channels),
            nn.Sigmoid()
        )

        self.bottomup = nn.Sequential(
            nn.Conv2D(self.low_channels, self.bottleneck_channels, 1, 1, 0),
            nn.BatchNorm2D(self.bottleneck_channels),
            nn.ReLU(True),

            nn.Conv2D(self.bottleneck_channels, self.out_channels, 1, 1, 0),
            nn.BatchNorm2D(self.out_channels),
            nn.Sigmoid(),
        )

        self.post = nn.Sequential(
            nn.Conv2D(self.out_channels, self.out_channels, 3, 1, 1),
            nn.BatchNorm2D(self.out_channels),
            nn.ReLU(True),
        )

    def forward(self, xh, xl):
        xh = self.feature_high(xh)
        topdown_wei = self.topdown(xh)
        bottomup_wei = self.bottomup(xl)

        out = 2 * xl * topdown_wei + 2* xh * bottomup_wei
        out = self.post(out)
        return out


class AsymBiChaFuseReduce(nn.Layer):
    def __init__(self, in_high_channels, in_low_channels, out_channels=64, r=4):
        super(AsymBiChaFuseReduce, self).__init__()

        assert in_low_channels == out_channels
        self.high_channels = in_high_channels
        self.low_channels = in_low_channels
        self.out_channels = out_channels
        self.bottleneck_channels = int(out_channels // r)

        self.feature_high = nn.Sequential(
            nn.Conv2D(self.high_channels, self.out_channels, 1, 1, 0),
            nn.BatchNorm2D(out_channels),
            nn.ReLU(True),
        )

        self.topdown = nn.Sequential(
            nn.AdaptiveAvgPool2D((1, 1)),
            nn.Conv2D(self.out_channels, self.bottleneck_channels, 1, 1, 0),
            nn.BatchNorm2D(self.bottleneck_channels),
            nn.ReLU(True),

            nn.Conv2D(self.bottleneck_channels, self.out_channels, 1, 1, 0),
            nn.BatchNorm2D(self.out_channels),
            nn.Sigmoid(),
        )

        self.bottomup = nn.Sequential(
            nn.Conv2D(self.low_channels, self.bottleneck_channels, 1, 1, 0),
            nn.BatchNorm2D(self.bottleneck_channels),
            nn.ReLU(True),

            nn.Conv2D(self.bottleneck_channels, self.out_channels, 1, 1, 0),
            nn.BatchNorm2D(self.out_channels),
            nn.Sigmoid(),
        )

        self.post = nn.Sequential(
            nn.Conv2D(self.out_channels, self.out_channels, 3, 1, 1),
            nn.BatchNorm2D(self.out_channels),
            nn.ReLU(True),
        )

    def forward(self, xh, xl):
        xh = self.feature_high(xh)

        topdown_wei = self.topdown(xh)
        bottomup_wei = self.bottomup(xl)
        xs = 2 * xl * topdown_wei + 2 * xh * bottomup_wei
        out = self.post(xs)
        return out


class BiGlobalChaFuseReduce(nn.Layer):
    def __init__(self, in_high_channels, in_low_channels, out_channels=64, r=4):
        super(BiGlobalChaFuseReduce, self).__init__()

        assert in_low_channels == out_channels
        self.high_channels = in_high_channels
        self.low_channels = in_low_channels
        self.out_channels = out_channels
        self.bottleneck_channels = int(out_channels // r)

        self.feature_high = nn.Sequential(
            nn.Conv2D(self.high_channels, self.out_channels, 1, 1, 0),
            nn.BatchNorm2D(out_channels),
            nn.ReLU(True),
        )

        self.topdown = nn.Sequential(
            nn.AdaptiveAvgPool2D((1, 1)),
            nn.Conv2D(self.out_channels, self.bottleneck_channels, 1, 1, 0),
            nn.BatchNorm2D(self.bottleneck_channels),
            nn.ReLU(True),

            nn.Conv2D(self.bottleneck_channels, self.out_channels, 1, 1, 0),
            nn.BatchNorm2D(self.out_channels),
            nn.Sigmoid(),
        )

        self.bottomup = nn.Sequential(
            nn.AdaptiveAvgPool2D((1, 1)),
            nn.Conv2D(self.low_channels, self.bottleneck_channels, 1, 1, 0),
            nn.BatchNorm2D(self.bottleneck_channels),
            nn.ReLU(True),

            nn.Conv2D(self.bottleneck_channels, self.out_channels, 1, 1, 0),
            nn.BatchNorm2D(self.out_channels),
            nn.Sigmoid(),
        )

        self.post = nn.Sequential(
            nn.Conv2D(self.out_channels, self.out_channels, 3, 1, 1),
            nn.BatchNorm2D(self.out_channels),
            nn.ReLU(True),
        )

    def forward(self, xh, xl):
        xh = self.feature_high(xh)

        topdown_wei = self.topdown(xh)
        bottomup_wei = self.bottomup(xl)
        xs = 2 * xl * topdown_wei + 2 * xh * bottomup_wei
        out = self.post(xs)
        return out      
class ResidualBlock(nn.Layer):
    def __init__(self, in_channels, out_channels, stride, downsample):
        super(ResidualBlock, self).__init__()
        self.body = nn.Sequential(
            nn.Conv2D(in_channels, out_channels, 3, stride, 1, bias_attr=False),
            nn.BatchNorm2D(out_channels),
            nn.ReLU(True),

            nn.Conv2D(out_channels, out_channels, 3, 1, 1, bias_attr=False),
            nn.BatchNorm2D(out_channels),
        )
        if downsample:
            self.downsample = nn.Sequential(
                nn.Conv2D(in_channels, out_channels, 1, stride, 0, bias_attr=False),
                nn.BatchNorm2D(out_channels),
            )
        else:
            self.downsample = nn.Sequential()

    def forward(self, x):
        residual = x
        x = self.body(x)

        if self.downsample:
            residual = self.downsample(residual)

        out = F.relu(x+residual, True)
        return out


class _FCNHead(nn.Layer):
    def __init__(self, in_channels, out_channels):
        super(_FCNHead, self).__init__()
        inter_channels = in_channels // 4
        self.block = nn.Sequential(
            nn.Conv2D(in_channels, inter_channels, 3, 1, 1, bias_attr=False),
            nn.BatchNorm2D(inter_channels),
            nn.ReLU(True),
            nn.Dropout(0.1),
            nn.Conv2D(inter_channels, out_channels, 1, 1, 0)
        )

    def forward(self, x):
        return self.block(x)


class ASKCResNetFPN(nn.Layer):
    def __init__(self, layer_blocks, channels, fuse_mode='AsymBi'):
        super(ASKCResNetFPN, self).__init__()

        stem_width = channels[0]
        self.stem = nn.Sequential(
            nn.BatchNorm2D(3),
            nn.Conv2D(3, stem_width, 3, 2, 1, bias_attr=False),
            nn.BatchNorm2D(stem_width),
            nn.ReLU(True),

            nn.Conv2D(stem_width, stem_width, 3, 1, 1, bias_attr=False),
            nn.BatchNorm2D(stem_width),
            nn.ReLU(True),

            nn.Conv2D(stem_width, stem_width*2, 3, 1, 1, bias_attr=False),
            nn.BatchNorm2D(stem_width*2),
            nn.ReLU(True),
            nn.MaxPool2D(3, 2, 1)
        )

        self.layer1 = self._make_layer(block=ResidualBlock, block_num=layer_blocks[0],
                                       in_channels=channels[1], out_channels=channels[1], stride=1)
        self.layer2 = self._make_layer(block=ResidualBlock, block_num=layer_blocks[1],
                                       in_channels=channels[1], out_channels=channels[2], stride=2)
        self.layer3 = self._make_layer(block=ResidualBlock, block_num=layer_blocks[2],
                                       in_channels=channels[2], out_channels=channels[3], stride=2)

        self.fuse23 = self._fuse_layer(channels[3], channels[2], channels[2], fuse_mode)
        self.fuse12 = self._fuse_layer(channels[2], channels[1], channels[1], fuse_mode)

        self.head = _FCNHead(channels[1], 1)

    def forward(self, x):
        _, _, hei, wid = x.shape

        x = self.stem(x)
        c1 = self.layer1(x)
        c2 = self.layer2(c1)
        out = self.layer3(c2)

        out = F.interpolate(out, size=[hei//8, wid//8], mode='bilinear')
        out = self.fuse23(out, c2)

        out = F.interpolate(out, size=[hei//4, wid//4], mode='bilinear')
        out = self.fuse12(out, c1)

        pred = self.head(out)
        out = F.interpolate(pred, size=[hei, wid], mode='bilinear')

        return out

    def _make_layer(self, block, block_num, in_channels, out_channels, stride):
        downsample = (in_channels != out_channels) or (stride != 1)
        layer = []
        layer.append(block(in_channels, out_channels, stride, downsample))
        for _ in range(block_num-1):
            layer.append(block(out_channels, out_channels, 1, False))
        return nn.Sequential(*layer)

    def _fuse_layer(self, in_high_channels, in_low_channels, out_channels, fuse_mode='AsymBi'):
        assert fuse_mode in ['BiLocal', 'AsymBi', 'BiGlobal']
        if fuse_mode == 'BiLocal':
            fuse_layer = BiLocalChaFuseReduce(in_high_channels, in_low_channels, out_channels)
        elif fuse_mode == 'AsymBi':
            fuse_layer = AsymBiChaFuseReduce(in_high_channels, in_low_channels, out_channels)
        elif fuse_mode == 'BiGlobal':
            fuse_layer = BiGlobalChaFuseReduce(in_high_channels, in_low_channels, out_channels)
        else:
            raise NameError('Unknown fuse_mode: %s' % fuse_mode)
        return fuse_layer


class ASKCResUNet(nn.Layer):
    def __init__(self, layer_blocks, channels, fuse_mode='AsymBi'):
        super(ASKCResUNet, self).__init__()

        stem_width = int(channels[0])
        self.stem = nn.Sequential(
            nn.BatchNorm2D(3),
            nn.Conv2D(3, stem_width, 3, 2, 1, bias_attr=False),
            nn.BatchNorm2D(stem_width),
            nn.ReLU(True),

            nn.Conv2D(stem_width, stem_width, 3, 1, 1, bias_attr=False),
            nn.BatchNorm2D(stem_width),
            nn.ReLU(True),

            nn.Conv2D(stem_width, 2*stem_width, 3, 1, 1, bias_attr=False),
            nn.BatchNorm2D(2*stem_width),
            nn.ReLU(True),

            nn.MaxPool2D(3, 2, 1),
        )

        self.layer1 = self._make_layer(block=ResidualBlock, block_num=layer_blocks[0],
                                       in_channels=channels[1], out_channels=channels[1], stride=1)
        self.layer2 = self._make_layer(block=ResidualBlock, block_num=layer_blocks[1],
                                       in_channels=channels[1], out_channels=channels[2], stride=2)
        self.layer3 = self._make_layer(block=ResidualBlock, block_num=layer_blocks[2],
                                       in_channels=channels[2], out_channels=channels[3], stride=2)

        self.deconv2 = nn.Conv2DTranspose(channels[3], channels[2], 4, 2, 1)
        self.fuse2 = self._fuse_layer(channels[2], channels[2], channels[2], fuse_mode)
        self.uplayer2 = self._make_layer(block=ResidualBlock, block_num=layer_blocks[1],
                                         in_channels=channels[2], out_channels=channels[2], stride=1)

        self.deconv1 = nn.Conv2DTranspose(channels[2], channels[1], 4, 2, 1)
        self.fuse1 = self._fuse_layer(channels[1], channels[1], channels[1], fuse_mode)
        self.uplayer1 = self._make_layer(block=ResidualBlock, block_num=layer_blocks[0],
                                         in_channels=channels[1], out_channels=channels[1], stride=1)

        self.head = _FCNHead(channels[1], 1)

    def forward(self, x):
        _, _, hei, wid = x.shape

        x = self.stem(x)
        c1 = self.layer1(x)
        c2 = self.layer2(c1)
        c3 = self.layer3(c2)

        deconc2 = self.deconv2(c3)
        fusec2 = self.fuse2(deconc2, c2)
        upc2 = self.uplayer2(fusec2)

        deconc1 = self.deconv1(upc2)
        fusec1 = self.fuse1(deconc1, c1)
        upc1 = self.uplayer1(fusec1)

        pred = self.head(upc1)
        out = F.interpolate(pred, size=[hei, wid], mode='bilinear')
        return out

    def _make_layer(self, block, block_num, in_channels, out_channels, stride):
        layer = []
        downsample = (in_channels != out_channels) or (stride != 1)
        layer.append(block(in_channels, out_channels, stride, downsample))
        for _ in range(block_num-1):
            layer.append(block(out_channels, out_channels, 1, False))
        return nn.Sequential(*layer)

    def _fuse_layer(self, in_high_channels, in_low_channels, out_channels, fuse_mode='AsymBi'):
        assert fuse_mode in ['BiLocal', 'AsymBi', 'BiGlobal']
        if fuse_mode == 'BiLocal':
            fuse_layer = BiLocalChaFuseReduce(in_high_channels, in_low_channels, out_channels)
        elif fuse_mode == 'AsymBi':
            fuse_layer = AsymBiChaFuseReduce(in_high_channels, in_low_channels, out_channels)
        elif fuse_mode == 'BiGlobal':
            fuse_layer = BiGlobalChaFuseReduce(in_high_channels, in_low_channels, out_channels)
        else:
            raise NameError('Unknown fuse_mode: %s' % fuse_mode)
        return fuse_layer
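As a sanity check on the architecture, the spatial sizes implied by the strides above can be worked out with the standard convolution output-size formulas. This is a sketch in plain Python; the helper names `conv_out`, `deconv_out` and `backbone_sizes` are hypothetical and not part of the model code.

```python
def conv_out(size, kernel, stride, padding):
    """Output length of a convolution or max-pool along one axis."""
    return (size + 2 * padding - kernel) // stride + 1

def deconv_out(size, kernel, stride, padding):
    """Output length of a transposed convolution along one axis."""
    return (size - 1) * stride - 2 * padding + kernel

def backbone_sizes(size):
    s = conv_out(size, 3, 2, 1)   # first stem conv, stride 2
    s = conv_out(s, 3, 2, 1)      # stem max-pool, stride 2 (the stride-1 convs keep the size)
    c1 = s                        # layer1, stride 1
    c2 = conv_out(c1, 3, 2, 1)    # layer2, stride 2
    c3 = conv_out(c2, 3, 2, 1)    # layer3, stride 2
    return c1, c2, c3

c1, c2, c3 = backbone_sizes(480)
print(c1, c2, c3)                 # 120 60 30
print(deconv_out(c3, 4, 2, 1))    # 60, the same size as c2
```

For a 480×480 input, c1, c2 and c3 are 1/4, 1/8 and 1/16 of the input size. This matches the `hei//8` and `hei//4` interpolation targets in ASKCResNetFPN, and the 4×4, stride-2 transposed convolutions in ASKCResUNet bring each deep feature back to the size of the shallower feature it is fused with.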

6. Building the data pipeline

from tqdm import tqdm
from paddle.io import Dataset,DataLoader
from paddle.vision.transforms import transforms as T
import matplotlib.pyplot as plt
import glob 
import os 
from PIL import Image, ImageOps, ImageFilter
import os.path as osp
import sys
import random
import numpy as np 
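from utils import *  # presumably provides SoftLoULoss and adjust_learning_rate, which are used below but not defined in this post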
class InfraredDataset(Dataset):
    def __init__(self, dataset_dir, image_index, crop_size=480 , base_size=512, mode='train'):
        super(InfraredDataset, self).__init__()
        self.dataset_dir = dataset_dir
        self.image_index = image_index
        self.crop_size = crop_size
        self.base_size = base_size
        self.mode = mode
        self.transform = T.Compose([
            T.ToTensor(),
            T.Normalize([.485, .456, .406], [.229, .224, .225]),  # Default mean and std
        ])
    def __getitem__(self, index):
        image_index = self.image_index[index].strip('\n')
        image_path = os.path.join(self.dataset_dir, 'images', '%s.png' % image_index)
        label_path = os.path.join(self.dataset_dir, 'masks', '%s_pixels0.png' % image_index)
        img = Image.open(image_path)
        img = img.convert('RGB')
        mask = Image.open(label_path)

        if self.mode == 'train':
            img, mask = self._sync_transform(img, mask)
        elif self.mode == 'val':
            img, mask = self._testval_sync_transform(img, mask)
        else:
            raise ValueError("Unknown self.mode: %s" % self.mode)
        img, mask =  self.transform(img), T.ToTensor()(mask)
        return paddle.cast(img, 'float32'), paddle.cast(mask, 'float32')

    def __len__(self):
        return len(self.image_index)

    def _sync_transform(self, img, mask):
            # random mirror
            if random.random() < 0.5:
                img = img.transpose(Image.FLIP_LEFT_RIGHT)
                mask = mask.transpose(Image.FLIP_LEFT_RIGHT)
            crop_size = self.crop_size
            # random scale (short edge)
            long_size = random.randint(int(self.base_size * 0.5), int(self.base_size * 2.0))
            w, h = img.size
            if h > w:
                oh = long_size
                ow = int(1.0 * w * long_size / h + 0.5)
                short_size = ow
            else:
                ow = long_size
                oh = int(1.0 * h * long_size / w + 0.5)
                short_size = oh
            img = img.resize((ow, oh), Image.BILINEAR)
            mask = mask.resize((ow, oh), Image.NEAREST)
            # pad crop
            if short_size < crop_size:
                padh = crop_size - oh if oh < crop_size else 0
                padw = crop_size - ow if ow < crop_size else 0
                img = ImageOps.expand(img, border=(0, 0, padw, padh), fill=0)
                mask = ImageOps.expand(mask, border=(0, 0, padw, padh), fill=0)
            # random crop crop_size
            w, h = img.size
            x1 = random.randint(0, w - crop_size)
            y1 = random.randint(0, h - crop_size)
            img = img.crop((x1, y1, x1 + crop_size, y1 + crop_size))
            mask = mask.crop((x1, y1, x1 + crop_size, y1 + crop_size))
            # gaussian blur as in PSP
            if random.random() < 0.5:
                img = img.filter(ImageFilter.GaussianBlur(
                    radius=random.random()))
            return img, mask

    def _val_sync_transform(self, img, mask):
        outsize = self.crop_size
        short_size = outsize
        w, h = img.size
        if w > h:
            oh = short_size
            ow = int(1.0 * w * oh / h)
        else:
            ow = short_size
            oh = int(1.0 * h * ow / w)
        img = img.resize((ow, oh), Image.BILINEAR)
        mask = mask.resize((ow, oh), Image.NEAREST)
        # center crop
        w, h = img.size
        x1 = int(round((w - outsize) / 2.))
        y1 = int(round((h - outsize) / 2.))
        img = img.crop((x1, y1, x1 + outsize, y1 + outsize))
        mask = mask.crop((x1, y1, x1 + outsize, y1 + outsize))

        return img, mask

    def _testval_sync_transform(self, img, mask):
        base_size = self.base_size
        img = img.resize((base_size, base_size), Image.BILINEAR)
        mask = mask.resize((base_size, base_size), Image.NEAREST)

        return img, mask
f = open('./sirst/idx_427/trainval.txt').readlines()
ds = InfraredDataset(dataset_dir='./sirst', image_index=f)
image , label = next(iter(ds))
image, label = image.numpy(), label.numpy()
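The resize and pad steps in `_sync_transform` are what guarantee that the random crop always fits. The sketch below reproduces only that size arithmetic in plain Python; the helper name `scaled_padded_size` is hypothetical, and the 300×200 input size is an arbitrary example rather than a value from the dataset.

```python
def scaled_padded_size(w, h, long_size, crop_size=480):
    """Image size after the resize and pad steps of _sync_transform."""
    if h > w:
        oh, ow = long_size, int(1.0 * w * long_size / h + 0.5)
    else:
        ow, oh = long_size, int(1.0 * h * long_size / w + 0.5)
    # Any side shorter than the crop window is padded up to crop_size.
    return max(ow, crop_size), max(oh, crop_size)

# With base_size=512 the long edge is drawn from [256, 1024].
print(scaled_padded_size(300, 200, 256))    # (480, 480)
print(scaled_padded_size(300, 200, 1024))   # (1024, 683)
```

Both sides are at least `crop_size` after padding, so `random.randint(0, w - crop_size)` never receives a negative upper bound.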

6.1 Verifying the data pipeline

plt.subplot(121)
plt.imshow(image[0], cmap='gray')
plt.subplot(122)
plt.imshow(np.uint8(label[0]), cmap='gray')

6.2 Building the DataLoader

dataset_dir = './sirst'
train_index = open('./sirst/idx_427/trainval.txt').readlines()
test_index = open('./sirst/idx_427/test.txt').readlines()
batch_size = 8
image_size = (480, 480)
train_ds = InfraredDataset(dataset_dir, train_index)
test_ds = InfraredDataset(dataset_dir, test_index, mode='val')  # evaluate without random augmentation
train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=8)
test_dl = DataLoader(test_ds, batch_size=8,
                        shuffle=False,  num_workers=8)

7. Model training

def training(net, train_data_loader, epoch, criterion, optimizer, epochs, learning_rate, warm_up_epochs):
    # training step
    losses = []
    net.train()
    tbar = tqdm(train_data_loader)
    for i, (data, labels) in enumerate(tbar):
        output = net(data)
        loss = criterion(output, labels)

        optimizer.clear_grad()
        loss.backward()
        optimizer.step()

        losses.append(loss.item())
        tbar.set_description('Epoch:%3d, lr:%f, train loss:%f'
                                % (epoch, optimizer.get_lr(), np.mean(losses)))

    adjust_learning_rate(optimizer, epoch, epochs, learning_rate,
                            warm_up_epochs, 1e-6)

def validation(net, val_data_loader, epoch, criterion, iou_metric, nIoU_metric):
    iou_metric.reset()
    nIoU_metric.reset()

    eval_losses = []
    net.eval()
    tbar = tqdm(val_data_loader)
    for i, (data, labels) in enumerate(tbar):
        output = net(data)
        loss = criterion(output, labels)
        eval_losses.append(loss.item())
        iou_metric.update(output, labels)
        nIoU_metric.update(output, labels)
        _, IoU = iou_metric.get()
        _, nIoU = nIoU_metric.get()
        tbar.set_description('  Epoch:%3d, eval loss:%f, IoU:%f, nIoU:%f'
                                %(epoch, np.mean(eval_losses), IoU, nIoU))
    _, IoU = iou_metric.get()
    _, nIoU = nIoU_metric.get()
    return IoU, nIoU
!pip install paddleseg
from paddleseg.cvlibs import param_init
def weight_init(m):
    if isinstance(m, nn.Conv2D):
        param_init.normal_init(m.weight,mean=0.0, std=0.02)
    elif isinstance(m, nn.BatchNorm2D):
        param_init.normal_init(m.weight, mean=1.0, std=0.02)
        param_init.constant_init(m.bias,value=0)
from paddle import  optimizer 
import os.path as ops 
epochs = 100  # total number of training epochs
learning_rate = 0.05
criterion = SoftLoULoss()
backbone_mode = 'UNet'
blocks_per_layer = 4 
warm_up_epochs = 0
fuse_mode = 'AsymBi'
## model
layer_blocks = [blocks_per_layer] * 3
channels = [8, 16, 32, 64]
if backbone_mode == 'FPN':
    net = ASKCResNetFPN(layer_blocks, channels, fuse_mode)
elif backbone_mode == 'UNet':
    net = ASKCResUNet(layer_blocks, channels, fuse_mode)
net.apply(weight_init)
## optimizer
optimizer = paddle.optimizer.Adagrad(learning_rate=learning_rate, parameters=net.parameters())
## evaluation metrics
iou_metric = SigmoidMetric()
nIoU_metric = SamplewiseSigmoidMetric(1, score_thresh=0.5)
save_pkl = 'weights/%s_%s' %(backbone_mode, fuse_mode)  # directory where model weights are saved
best_iou = 0.0
best_nIoU = 0.0 
for epoch in range(1, epochs+1):
    training(net, train_dl, epoch, criterion, optimizer, epochs, learning_rate, warm_up_epochs)
    IoU, nIoU = validation(net, test_dl, epoch, criterion, iou_metric, nIoU_metric)
    pkl_name = 'Epoch-%3d_IoU-%.4f_nIoU-%.4f.pkl' % (epoch, IoU, nIoU)
    if IoU > best_iou:
        paddle.save(net.state_dict(), ops.join(save_pkl, pkl_name))
        best_iou = IoU
    if nIoU > best_nIoU:
        paddle.save(net.state_dict(), ops.join(save_pkl, pkl_name))
        best_nIoU = nIoU

print('Best IoU: %.5f, best nIoU: %.5f' % (best_iou, best_nIoU))
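The training loop uses `SoftLoULoss`, whose definition is not shown in this post. As a rough idea of what a soft-IoU loss of that kind computes, here is a NumPy sketch. It is an assumption rather than the actual implementation: the function name `soft_iou_loss` and the `smooth` value are hypothetical.

```python
import numpy as np

def soft_iou_loss(logits, target, smooth=1e-6):
    """1 - mean soft IoU; logits and target are shaped (N, 1, H, W)."""
    prob = 1.0 / (1.0 + np.exp(-logits))        # sigmoid turns logits into probabilities
    axes = (1, 2, 3)
    inter = (prob * target).sum(axis=axes)      # soft intersection per sample
    union = prob.sum(axis=axes) + target.sum(axis=axes) - inter
    return 1.0 - np.mean((inter + smooth) / (union + smooth))

target = np.zeros((1, 1, 4, 4))
target[0, 0, 1, 1] = 1.0
good = np.where(target > 0, 10.0, -10.0)    # confident and correct logits
bad = -good                                 # confident and wrong logits
loss_good = soft_iou_loss(good, target)
loss_bad = soft_iou_loss(bad, target)
print(loss_good, loss_bad)
```

The loss is close to 0 for the correct prediction and close to 1 for the wrong one. Because it optimizes an overlap ratio rather than a per-pixel error, it is less dominated by the background pixels that make up almost all of an infrared small-target image.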

8. Model validation

pkl_name = 'weights/UNet_AsymBi/Epoch- 93_IoU-0.4525_nIoU-0.3955.pkl'
net.set_state_dict(paddle.load(pkl_name))
net.eval()
TF = T.Compose([
    T.Resize((int(image_size[0]), int(image_size[1]))),
    T.ToTensor(),
    T.Normalize([.485, .456, .406], [.229, .224, .225]),  # same normalization as InfraredDataset
])
image = Image.open('./sirst/images/Misc_1.png').convert('RGB')
label = Image.open('./sirst/masks/Misc_1_pixels0.png')
tensor_img = TF(image)
tensor_img = paddle.unsqueeze(tensor_img, 0)
pred = net(tensor_img)[0]
import cv2 
w, h = image.size
prediction = F.sigmoid(pred[0])
prediction = cv2.resize(prediction.numpy(), (w, h))
plt.figure(figsize=(30, 30))
plt.subplot(131)
plt.title('Input')
plt.imshow(np.array(image), cmap='gray')
plt.subplot(132)
plt.title('Pred')
plt.imshow(prediction, cmap='gray')
plt.subplot(133)
plt.title('Label')
plt.imshow(label, cmap='gray')

iou_metric = SigmoidMetric()
nIoU_metric = SamplewiseSigmoidMetric(1, score_thresh=0.5)
iou_metric.reset()
nIoU_metric.reset()

net.eval()
tbar = tqdm(test_dl)
for i, (data, labels) in enumerate(tbar):
    output = net(data)
    iou_metric.update(output, labels)
    nIoU_metric.update(output, labels)
_, IoU = iou_metric.get()
_, nIoU = nIoU_metric.get()
print('IOU %f nIOU %f'%(IoU, nIoU))

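9. Conclusion

The most important contribution of this paper is its dataset. Before it, there were few deep-learning papers on infrared small target detection, and this work helped to advance deep learning in that area.

PP-ISTD ?

PP-ISTD aims to build a set of PaddlePaddle-based infrared small target detection algorithms. If you are also interested in infrared small target detection, you are welcome to join and reproduce more algorithms together.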

Asymmetric Contextual Modulation for Infrared Small Target Detection

  • [x]

Dense Nested Attention Network for Infrared Small Target Detection

  • [x]

This post is a repost. Original: https://aistudio.baidu.com/aistudio/projectdetail/4338034
