PP-ISTD:Asymmetric Contextual Modulation
PP-ISTD基于飞桨的红外弱小目标检测,论文复现: Asymmetric Contextual Modulation for Infrared Small Target Detection
1. Asymmetric Contextual Modulation for Infrared Small Target Detection
本文的贡献如下:
- 开源数据集 Sirst Dai 。
- 提出ACM模块,可以实现小目标浅层和深层信息的高效交互。
- 超越了当时的其他算法。
目前该文章的代码一共有两个版本:Dai 的 MXNet 版本以及 Zhang 的 PyTorch 版本,本项目将采用 Paddle 进行复现。
推荐阅读PP-ISTD:Dense Nested Attention Network
2. 论文解读
文章的核心模型ACM如下:
研究动机:
- 1)如何构建深度模型来检测缺乏内在信息的红外小目标;
- 2) 如何在不影响目标细节的情况下对高层上下文信息进行编码。
ACM 由自下而上、自上而下两个核心模块构成,如图1所示:
具体的计算流程如下:
$\mathbf{X}$、$\mathbf{Y}$ 分别代表相邻的浅层特征和深层特征
- $\mathbf{X}^{\prime}=\mathbf{G}(\mathbf{Y})\otimes\mathbf{X}=\sigma\left(B\left(\mathbf{W}_{2}\,\delta\left(B\left(\mathbf{W}_{1}\mathbf{y}\right)\right)\right)\right)\otimes\mathbf{X}$,其中 $\mathbf{y}$ 为深层特征 $\mathbf{Y}$ 经全局平均池化后得到的通道向量。
其中 $\delta$ 为激活函数,论文中为 ReLU;$\sigma$ 为 Sigmoid 函数;$\otimes$ 表示逐元素相乘;$B$ 为 Batch Normalization;$\mathbf{W}$ 则是全连接层。
- $\mathbf{L}(\mathbf{X})=\sigma\left(B\left(\mathrm{PWConv}_{2}\left(\delta\left(\mathrm{PWConv}_{1}(\mathbf{X})\right)\right)\right)\right)$
PWConv为1x1的卷积
- $\mathbf{Y}^{\prime}=\mathbf{L}(\mathbf{X})\otimes\mathbf{Y}$
- $\mathbf{Z}=\mathbf{G}(\mathbf{Y})\otimes\mathbf{X}+\mathbf{L}(\mathbf{X})\otimes\mathbf{Y}$
通过上式计算得到融合的特征Z。
3. 结果展示
4. 论文复现
4.1 环境依赖
PaddlePaddle 2.3
PaddleSeg
4.2 数据集
本项目已经下载好相关数据,无需下载
Sirst Dai
数据集部分图像
!unzip -o sirst/images.zip -d sirst/
!unzip -o sirst/masks.zip -d sirst/
4.3. 评价指标
本文采用了IOU, nIOU作为评价指标
相关指标的介绍可以参考 link , 下面给出了基于Paddle的指标计算代码
import numpy as np
import paddle.nn.functional as F
class SigmoidMetric():
    """Running pixel accuracy and IoU for single-class logit predictions.

    A pixel counts as predicted foreground when its raw output is above 0.
    Inputs only need to expose ``.shape`` and ``.numpy()``.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Set every running counter back to zero."""
        self.total_inter = 0
        self.total_union = 0
        self.total_correct = 0
        self.total_label = 0

    def update(self, pred, labels):
        """Add the statistics of one batch to the running totals."""
        n_correct, n_labeled = self.batch_pix_accuracy(pred, labels)
        batch_inter, batch_union = self.batch_intersection_union(pred, labels)
        self.total_correct += n_correct
        self.total_label += n_labeled
        self.total_inter += batch_inter
        self.total_union += batch_union

    def get(self):
        """Return ``(pixAcc, mIoU)`` computed from the accumulated counters."""
        eps = np.spacing(1)  # keeps both divisions finite when nothing was seen
        pix_acc = 1.0 * self.total_correct / (eps + self.total_label)
        iou = 1.0 * self.total_inter / (eps + self.total_union)
        return pix_acc, iou.mean()

    def batch_pix_accuracy(self, output, target):
        """Return ``(correct, labeled)`` pixel counts restricted to ``target > 0``."""
        assert output.shape == target.shape
        logits = output.numpy()
        truth = target.numpy()
        foreground = truth > 0
        binarised = (logits > 0).astype('int64')
        n_labeled = np.sum(foreground)
        n_correct = np.sum((binarised == truth) * foreground)
        assert n_correct <= n_labeled
        return n_correct, n_labeled

    def batch_intersection_union(self, output, target):
        """Return one-element ``(intersection, union)`` count arrays for the batch."""
        lo = hi = bins = 1  # a single histogram bin centred on the value 1
        binarised = (output.numpy() > 0).astype('int64')
        truth = target.numpy().astype('int64')
        hits = binarised * (binarised == truth)

        def count_ones(values):
            counts, _ = np.histogram(values, bins=bins, range=(lo, hi))
            return counts

        area_inter = count_ones(hits)
        area_union = count_ones(binarised) + count_ones(truth) - area_inter
        assert (area_inter <= area_union).all()
        return area_inter, area_union
class SamplewiseSigmoidMetric():
    """Per-sample IoU accumulator (nIoU) for sigmoid segmentation outputs.

    Every sample contributes one intersection and one union count; ``get``
    averages the per-sample IoU values collected so far.
    """

    def __init__(self, nclass, score_thresh=0.5):
        self.nclass = nclass
        self.score_thresh = score_thresh
        self.reset()

    def update(self, preds, labels):
        """Append the per-sample counts of one batch to the running arrays."""
        inter_arr, union_arr = self.batch_intersection_union(
            preds, labels, self.nclass, self.score_thresh)
        self.total_inter = np.append(self.total_inter, inter_arr)
        self.total_union = np.append(self.total_union, union_arr)

    def get(self):
        """Return ``(IoU, mIoU)``: the per-sample IoU array and its mean.

        With no samples accumulated the mean is reported as 0.0 instead of
        NaN, matching what ``SigmoidMetric.get`` returns when empty.
        """
        IoU = 1.0 * self.total_inter / (np.spacing(1) + self.total_union)
        mIoU = IoU.mean() if IoU.size else 0.0
        return IoU, mIoU

    def reset(self):
        """Reset the internal evaluation result to its initial state."""
        self.total_inter = np.array([])
        self.total_union = np.array([])
        # Not read by this class; kept so existing attribute access still works.
        self.total_correct = np.array([])
        self.total_label = np.array([])

    def batch_intersection_union(self, output, target, nclass, score_thresh):
        """Return per-sample ``(intersection, union)`` float arrays.

        ``output`` goes through ``F.sigmoid`` and is binarised at
        ``score_thresh``. ``nclass`` is accepted for interface compatibility
        and is not used.
        """
        predict = (F.sigmoid(output).numpy() > score_thresh).astype('int64')  # P
        target = target.numpy().astype('int64')  # T
        intersection = predict * (predict == target)  # TP

        def ones_per_sample(values):
            # Count the entries equal to 1 in every sample (axis 0). This
            # replaces the single-bin histogram, whose 1-element result had to
            # be squeezed into a scalar slot (deprecated since NumPy 1.25).
            sample_axes = tuple(range(1, values.ndim))
            return (values == 1).sum(axis=sample_axes).astype(np.float64)

        area_inter_arr = ones_per_sample(intersection)
        area_pred_arr = ones_per_sample(predict)
        area_lab_arr = ones_per_sample(target)
        area_union_arr = area_pred_arr + area_lab_arr - area_inter_arr
        assert (area_inter_arr <= area_union_arr).all()
        return area_inter_arr, area_union_arr
class ROCMetric():
    """Accumulate TP/FP statistics over ``bins + 1`` evenly spaced thresholds."""

    def __init__(self, nclass, bins):
        self.nclass = nclass
        self.bins = bins
        n_thresholds = self.bins + 1
        # Four independent accumulators, one slot per threshold.
        self.tp_arr, self.pos_arr, self.fp_arr, self.neg_arr = (
            np.zeros(n_thresholds) for _ in range(4))

    def update(self, preds, labels):
        """Add one batch's counts at thresholds 0, 1/bins, ..., 1."""
        accumulators = (self.tp_arr, self.pos_arr, self.fp_arr, self.neg_arr)
        for idx in range(self.bins + 1):
            thresh = (idx + 0.0) / self.bins
            counts = cal_tp_pos_fp_neg(preds, labels, self.nclass, thresh)
            for accumulator, value in zip(accumulators, counts):
                accumulator[idx] += value

    def get(self):
        """Return ``(tp_rates, fp_rates)``; 0.001 in the denominators avoids 0/0."""
        tp_rates = self.tp_arr / (self.pos_arr + 0.001)
        fp_rates = self.fp_arr / (self.neg_arr + 0.001)
        return tp_rates, fp_rates
def cal_tp_pos_fp_neg(output, target, nclass, score_thresh):
    """Return ``(tp, pos, fp, neg)`` pixel counts at one score threshold.

    ``output`` goes through ``F.sigmoid`` and is binarised at ``score_thresh``.
    ``pos`` is ``tp + fn`` and ``neg`` is ``fp + tn``. ``nclass`` is unused.
    """
    predict = (F.sigmoid(output).numpy() > score_thresh).astype('int64')
    target = target.numpy().astype('int64')

    agree = predict == target
    disagree = predict != target
    predicted_negative = 1 - predict

    tp = (predict * agree).sum()
    fp = (predict * disagree).sum()
    tn = (predicted_negative * agree).sum()
    fn = (disagree * predicted_negative).sum()
    return tp, tp + fn, fp, fp + tn
5 模型搭建
import paddle
from paddle import nn
import paddle.nn.functional as F
use_gpu = True
paddle.device.set_device('gpu:0') if use_gpu else paddle.device.set_device('cpu')
paddle.seed(1024)
<paddle.fluid.core_avx.Generator at 0x7fda54e7fb30>
class BiLocalChaFuseReduce(nn.Layer):
    """Bidirectional fusion in which both gating branches are point-wise.

    The high-level input is projected to ``out_channels``; each feature map is
    then scaled by a sigmoid gate computed from the other one, and the two
    gated maps are summed and refined by a 3x3 conv block.
    """

    def __init__(self, in_high_channels, in_low_channels, out_channels=64, r=4):
        super(BiLocalChaFuseReduce, self).__init__()
        # The low-level features are gated without any projection, so they
        # must already have ``out_channels`` channels.
        assert in_low_channels == out_channels
        self.high_channels = in_high_channels
        self.low_channels = in_low_channels
        self.out_channels = out_channels
        self.bottleneck_channels = int(out_channels // r)

        # NOTE: Paddle's nn.ReLU takes ``name`` as its only argument; the
        # PyTorch-style ``inplace=True`` flag is not part of its signature.
        self.feature_high = nn.Sequential(
            nn.Conv2D(self.high_channels, self.out_channels, 1, 1, 0),
            nn.BatchNorm2D(self.out_channels),
            nn.ReLU(),
        )
        self.topdown = nn.Sequential(
            nn.Conv2D(self.out_channels, self.bottleneck_channels, 1, 1, 0),
            nn.BatchNorm2D(self.bottleneck_channels),
            nn.ReLU(),
            nn.Conv2D(self.bottleneck_channels, self.out_channels, 1, 1, 0),
            nn.BatchNorm2D(self.out_channels),
            nn.Sigmoid(),
        )
        self.bottomup = nn.Sequential(
            nn.Conv2D(self.low_channels, self.bottleneck_channels, 1, 1, 0),
            nn.BatchNorm2D(self.bottleneck_channels),
            nn.ReLU(),
            nn.Conv2D(self.bottleneck_channels, self.out_channels, 1, 1, 0),
            nn.BatchNorm2D(self.out_channels),
            nn.Sigmoid(),
        )
        self.post = nn.Sequential(
            nn.Conv2D(self.out_channels, self.out_channels, 3, 1, 1),
            nn.BatchNorm2D(self.out_channels),
            nn.ReLU(),
        )

    def forward(self, xh, xl):
        """Fuse high-level ``xh`` with low-level ``xl``.

        The maps are multiplied element-wise, so they are assumed to share the
        same spatial size -- TODO confirm against callers.
        """
        xh = self.feature_high(xh)
        topdown_wei = self.topdown(xh)    # gate applied to the low-level map
        bottomup_wei = self.bottomup(xl)  # gate applied to the high-level map
        out = 2 * xl * topdown_wei + 2 * xh * bottomup_wei
        out = self.post(out)
        return out
class AsymBiChaFuseReduce(nn.Layer):
    """Asymmetric bidirectional fusion (global top-down, point-wise bottom-up).

    The top-down gate is computed from globally average-pooled high-level
    features, while the bottom-up gate is computed per pixel from the
    low-level features.
    """

    def __init__(self, in_high_channels, in_low_channels, out_channels=64, r=4):
        super(AsymBiChaFuseReduce, self).__init__()
        # The low-level features are gated without any projection, so they
        # must already have ``out_channels`` channels.
        assert in_low_channels == out_channels
        self.high_channels = in_high_channels
        self.low_channels = in_low_channels
        self.out_channels = out_channels
        self.bottleneck_channels = int(out_channels // r)

        # NOTE: Paddle's nn.ReLU takes ``name`` as its only argument; the
        # PyTorch-style ``inplace=True`` flag is not part of its signature.
        self.feature_high = nn.Sequential(
            nn.Conv2D(self.high_channels, self.out_channels, 1, 1, 0),
            nn.BatchNorm2D(out_channels),
            nn.ReLU(),
        )
        self.topdown = nn.Sequential(
            nn.AdaptiveAvgPool2D((1, 1)),  # one value per channel
            nn.Conv2D(self.out_channels, self.bottleneck_channels, 1, 1, 0),
            nn.BatchNorm2D(self.bottleneck_channels),
            nn.ReLU(),
            nn.Conv2D(self.bottleneck_channels, self.out_channels, 1, 1, 0),
            nn.BatchNorm2D(self.out_channels),
            nn.Sigmoid(),
        )
        self.bottomup = nn.Sequential(
            nn.Conv2D(self.low_channels, self.bottleneck_channels, 1, 1, 0),
            nn.BatchNorm2D(self.bottleneck_channels),
            nn.ReLU(),
            nn.Conv2D(self.bottleneck_channels, self.out_channels, 1, 1, 0),
            nn.BatchNorm2D(self.out_channels),
            nn.Sigmoid(),
        )
        self.post = nn.Sequential(
            nn.Conv2D(self.out_channels, self.out_channels, 3, 1, 1),
            nn.BatchNorm2D(self.out_channels),
            nn.ReLU(),
        )

    def forward(self, xh, xl):
        """Fuse high-level ``xh`` with low-level ``xl``.

        ``xh`` and ``xl`` are multiplied element-wise with each other's gates,
        so they are assumed to share the same spatial size -- TODO confirm.
        """
        xh = self.feature_high(xh)
        topdown_wei = self.topdown(xh)    # channel-wise gate for xl
        bottomup_wei = self.bottomup(xl)  # per-pixel gate for xh
        xs = 2 * xl * topdown_wei + 2 * xh * bottomup_wei
        out = self.post(xs)
        return out
class BiGlobalChaFuseReduce(nn.Layer):
    """Bidirectional fusion in which both gating branches use global pooling.

    Each gate is computed from a globally average-pooled feature map, so both
    gates hold one value per channel.
    """

    def __init__(self, in_high_channels, in_low_channels, out_channels=64, r=4):
        super(BiGlobalChaFuseReduce, self).__init__()
        # The low-level features are gated without any projection, so they
        # must already have ``out_channels`` channels.
        assert in_low_channels == out_channels
        self.high_channels = in_high_channels
        self.low_channels = in_low_channels
        self.out_channels = out_channels
        self.bottleneck_channels = int(out_channels // r)

        # NOTE: Paddle's nn.ReLU takes ``name`` as its only argument; the
        # PyTorch-style ``inplace=True`` flag is not part of its signature.
        self.feature_high = nn.Sequential(
            nn.Conv2D(self.high_channels, self.out_channels, 1, 1, 0),
            nn.BatchNorm2D(out_channels),
            nn.ReLU(),
        )
        self.topdown = nn.Sequential(
            nn.AdaptiveAvgPool2D((1, 1)),
            nn.Conv2D(self.out_channels, self.bottleneck_channels, 1, 1, 0),
            nn.BatchNorm2D(self.bottleneck_channels),
            nn.ReLU(),
            nn.Conv2D(self.bottleneck_channels, self.out_channels, 1, 1, 0),
            nn.BatchNorm2D(self.out_channels),
            nn.Sigmoid(),
        )
        self.bottomup = nn.Sequential(
            nn.AdaptiveAvgPool2D((1, 1)),
            nn.Conv2D(self.low_channels, self.bottleneck_channels, 1, 1, 0),
            nn.BatchNorm2D(self.bottleneck_channels),
            nn.ReLU(),
            nn.Conv2D(self.bottleneck_channels, self.out_channels, 1, 1, 0),
            nn.BatchNorm2D(self.out_channels),
            nn.Sigmoid(),
        )
        self.post = nn.Sequential(
            nn.Conv2D(self.out_channels, self.out_channels, 3, 1, 1),
            nn.BatchNorm2D(self.out_channels),
            nn.ReLU(),
        )

    def forward(self, xh, xl):
        """Fuse high-level ``xh`` with low-level ``xl``.

        The two gated maps are summed, so ``xh`` and ``xl`` are assumed to
        share the same spatial size -- TODO confirm against callers.
        """
        xh = self.feature_high(xh)
        topdown_wei = self.topdown(xh)    # channel-wise gate for xl
        bottomup_wei = self.bottomup(xl)  # channel-wise gate for xh
        xs = 2 * xl * topdown_wei + 2 * xh * bottomup_wei
        out = self.post(xs)
        return out
class ResidualBlock(nn.Layer):
    """Two 3x3 conv/BN layers with a skip connection.

    Args:
        in_channels: channels of the input feature map.
        out_channels: channels produced by the block.
        stride: stride of the first conv (and of the projection shortcut).
        downsample: when truthy, the shortcut is a strided 1x1 conv + BN;
            otherwise the input is added unchanged.
    """

    def __init__(self, in_channels, out_channels, stride, downsample):
        super(ResidualBlock, self).__init__()
        # NOTE: Paddle's nn.ReLU takes ``name`` as its only argument; the
        # PyTorch-style ``inplace=True`` flag is not part of its signature.
        self.body = nn.Sequential(
            nn.Conv2D(in_channels, out_channels, 3, stride, 1, bias_attr=False),
            nn.BatchNorm2D(out_channels),
            nn.ReLU(),
            nn.Conv2D(out_channels, out_channels, 3, 1, 1, bias_attr=False),
            nn.BatchNorm2D(out_channels),
        )
        if downsample:
            self.downsample = nn.Sequential(
                nn.Conv2D(in_channels, out_channels, 1, stride, 0, bias_attr=False),
                nn.BatchNorm2D(out_channels),
            )
        else:
            # Empty container marks the identity shortcut.
            self.downsample = nn.Sequential()

    def forward(self, x):
        """Return ``relu(body(x) + shortcut(x))``."""
        residual = x
        x = self.body(x)
        # presumably an empty nn.Sequential is falsy, which skips the projection -- verify
        if self.downsample:
            residual = self.downsample(residual)
        # F.relu's second positional parameter is ``name``, so no flag is passed.
        out = F.relu(x + residual)
        return out
class _FCNHead(nn.Layer):
    """Prediction head: 3x3 conv/BN/ReLU, dropout (0.1), then a 1x1 conv.

    The hidden width is ``in_channels // 4``.
    """

    def __init__(self, in_channels, out_channels):
        super(_FCNHead, self).__init__()
        inter_channels = in_channels // 4
        self.block = nn.Sequential(
            nn.Conv2D(in_channels, inter_channels, 3, 1, 1, bias_attr=False),
            nn.BatchNorm2D(inter_channels),
            # Paddle's nn.ReLU has no ``inplace`` flag; its only argument is ``name``.
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Conv2D(inter_channels, out_channels, 1, 1, 0),
        )

    def forward(self, x):
        """Return the ``out_channels``-channel output of the head for ``x``."""
        return self.block(x)
class ASKCResNetFPN(nn.Layer):
    """FPN-style segmentation network with selectable fusion modules.

    Args:
        layer_blocks: number of residual blocks in each of the three stages.
        channels: four widths. ``channels[0]`` is the stem width and the stem
            outputs ``2 * channels[0]`` channels, which feed ``layer1`` whose
            input width is ``channels[1]``.
        fuse_mode: one of ``'BiLocal'``, ``'AsymBi'``, ``'BiGlobal'``.
    """

    def __init__(self, layer_blocks, channels, fuse_mode='AsymBi'):
        super(ASKCResNetFPN, self).__init__()
        stem_width = channels[0]
        # NOTE: Paddle's nn.ReLU takes ``name`` as its only argument; the
        # PyTorch-style ``inplace=True`` flag is not part of its signature.
        self.stem = nn.Sequential(
            nn.BatchNorm2D(3),
            nn.Conv2D(3, stem_width, 3, 2, 1, bias_attr=False),
            nn.BatchNorm2D(stem_width),
            nn.ReLU(),
            nn.Conv2D(stem_width, stem_width, 3, 1, 1, bias_attr=False),
            nn.BatchNorm2D(stem_width),
            nn.ReLU(),
            nn.Conv2D(stem_width, stem_width * 2, 3, 1, 1, bias_attr=False),
            nn.BatchNorm2D(stem_width * 2),
            nn.ReLU(),
            nn.MaxPool2D(3, 2, 1),
        )
        self.layer1 = self._make_layer(block=ResidualBlock, block_num=layer_blocks[0],
                                       in_channels=channels[1], out_channels=channels[1], stride=1)
        self.layer2 = self._make_layer(block=ResidualBlock, block_num=layer_blocks[1],
                                       in_channels=channels[1], out_channels=channels[2], stride=2)
        self.layer3 = self._make_layer(block=ResidualBlock, block_num=layer_blocks[2],
                                       in_channels=channels[2], out_channels=channels[3], stride=2)
        self.fuse23 = self._fuse_layer(channels[3], channels[2], channels[2], fuse_mode)
        self.fuse12 = self._fuse_layer(channels[2], channels[1], channels[1], fuse_mode)
        self.head = _FCNHead(channels[1], 1)

    def forward(self, x):
        """Return a one-channel map resized to the spatial size of ``x``."""
        _, _, hei, wid = x.shape
        x = self.stem(x)
        c1 = self.layer1(x)
        c2 = self.layer2(c1)
        out = self.layer3(c2)
        # Resize the deepest features before fusing them with c2, then with c1.
        out = F.interpolate(out, size=[hei // 8, wid // 8], mode='bilinear')
        out = self.fuse23(out, c2)
        out = F.interpolate(out, size=[hei // 4, wid // 4], mode='bilinear')
        out = self.fuse12(out, c1)
        pred = self.head(out)
        out = F.interpolate(pred, size=[hei, wid], mode='bilinear')
        return out

    def _make_layer(self, block, block_num, in_channels, out_channels, stride):
        """Stack ``block_num`` blocks; only the first may change width or stride."""
        downsample = (in_channels != out_channels) or (stride != 1)
        layer = [block(in_channels, out_channels, stride, downsample)]
        for _ in range(block_num - 1):
            layer.append(block(out_channels, out_channels, 1, False))
        return nn.Sequential(*layer)

    def _fuse_layer(self, in_high_channels, in_low_channels, out_channels, fuse_mode='AsymBi'):
        """Build the fusion module selected by ``fuse_mode``."""
        assert fuse_mode in ['BiLocal', 'AsymBi', 'BiGlobal']
        if fuse_mode == 'BiLocal':
            fuse_layer = BiLocalChaFuseReduce(in_high_channels, in_low_channels, out_channels)
        elif fuse_mode == 'AsymBi':
            fuse_layer = AsymBiChaFuseReduce(in_high_channels, in_low_channels, out_channels)
        elif fuse_mode == 'BiGlobal':
            fuse_layer = BiGlobalChaFuseReduce(in_high_channels, in_low_channels, out_channels)
        else:
            # Only reachable when asserts are disabled; the original branch was
            # a bare ``NameError`` expression that raised nothing.
            raise ValueError('Unsupported fuse_mode: %r' % (fuse_mode,))
        return fuse_layer
class ASKCResUNet(nn.Layer):
    """U-Net-style segmentation network with selectable fusion modules.

    Args:
        layer_blocks: number of residual blocks in each of the three stages.
        channels: four widths. ``channels[0]`` is the stem width and the stem
            outputs ``2 * channels[0]`` channels, which feed ``layer1`` whose
            input width is ``channels[1]``.
        fuse_mode: one of ``'BiLocal'``, ``'AsymBi'``, ``'BiGlobal'``.
    """

    def __init__(self, layer_blocks, channels, fuse_mode='AsymBi'):
        super(ASKCResUNet, self).__init__()
        stem_width = int(channels[0])
        # NOTE: Paddle's nn.ReLU takes ``name`` as its only argument; the
        # PyTorch-style ``inplace=True`` flag is not part of its signature.
        self.stem = nn.Sequential(
            nn.BatchNorm2D(3),
            nn.Conv2D(3, stem_width, 3, 2, 1, bias_attr=False),
            nn.BatchNorm2D(stem_width),
            nn.ReLU(),
            nn.Conv2D(stem_width, stem_width, 3, 1, 1, bias_attr=False),
            nn.BatchNorm2D(stem_width),
            nn.ReLU(),
            nn.Conv2D(stem_width, 2 * stem_width, 3, 1, 1, bias_attr=False),
            nn.BatchNorm2D(2 * stem_width),
            nn.ReLU(),
            nn.MaxPool2D(3, 2, 1),
        )
        # Encoder.
        self.layer1 = self._make_layer(block=ResidualBlock, block_num=layer_blocks[0],
                                       in_channels=channels[1], out_channels=channels[1], stride=1)
        self.layer2 = self._make_layer(block=ResidualBlock, block_num=layer_blocks[1],
                                       in_channels=channels[1], out_channels=channels[2], stride=2)
        self.layer3 = self._make_layer(block=ResidualBlock, block_num=layer_blocks[2],
                                       in_channels=channels[2], out_channels=channels[3], stride=2)
        # Decoder: transposed conv, fusion with the encoder feature, residual stack.
        self.deconv2 = nn.Conv2DTranspose(channels[3], channels[2], 4, 2, 1)
        self.fuse2 = self._fuse_layer(channels[2], channels[2], channels[2], fuse_mode)
        self.uplayer2 = self._make_layer(block=ResidualBlock, block_num=layer_blocks[1],
                                         in_channels=channels[2], out_channels=channels[2], stride=1)
        self.deconv1 = nn.Conv2DTranspose(channels[2], channels[1], 4, 2, 1)
        self.fuse1 = self._fuse_layer(channels[1], channels[1], channels[1], fuse_mode)
        self.uplayer1 = self._make_layer(block=ResidualBlock, block_num=layer_blocks[0],
                                         in_channels=channels[1], out_channels=channels[1], stride=1)
        self.head = _FCNHead(channels[1], 1)

    def forward(self, x):
        """Return a one-channel map resized to the spatial size of ``x``."""
        _, _, hei, wid = x.shape
        x = self.stem(x)
        c1 = self.layer1(x)
        c2 = self.layer2(c1)
        c3 = self.layer3(c2)
        deconc2 = self.deconv2(c3)
        fusec2 = self.fuse2(deconc2, c2)
        upc2 = self.uplayer2(fusec2)
        deconc1 = self.deconv1(upc2)
        fusec1 = self.fuse1(deconc1, c1)
        upc1 = self.uplayer1(fusec1)
        pred = self.head(upc1)
        out = F.interpolate(pred, size=[hei, wid], mode='bilinear')
        return out

    def _make_layer(self, block, block_num, in_channels, out_channels, stride):
        """Stack ``block_num`` blocks; only the first may change width or stride."""
        downsample = (in_channels != out_channels) or (stride != 1)
        layer = [block(in_channels, out_channels, stride, downsample)]
        for _ in range(block_num - 1):
            layer.append(block(out_channels, out_channels, 1, False))
        return nn.Sequential(*layer)

    def _fuse_layer(self, in_high_channels, in_low_channels, out_channels, fuse_mode='AsymBi'):
        """Build the fusion module selected by ``fuse_mode``."""
        assert fuse_mode in ['BiLocal', 'AsymBi', 'BiGlobal']
        if fuse_mode == 'BiLocal':
            fuse_layer = BiLocalChaFuseReduce(in_high_channels, in_low_channels, out_channels)
        elif fuse_mode == 'AsymBi':
            fuse_layer = AsymBiChaFuseReduce(in_high_channels, in_low_channels, out_channels)
        elif fuse_mode == 'BiGlobal':
            fuse_layer = BiGlobalChaFuseReduce(in_high_channels, in_low_channels, out_channels)
        else:
            # Only reachable when asserts are disabled; the original branch was
            # a bare ``NameError`` expression that raised nothing.
            raise ValueError('Unsupported fuse_mode: %r' % (fuse_mode,))
        return fuse_layer
6. 构建数据流
from tqdm import tqdm
from paddle.io import Dataset,DataLoader
from paddle.vision.transforms import transforms as T
import matplotlib.pyplot as plt
import glob
import os
from PIL import Image, ImageOps, ImageFilter
import os.path as osp
import sys
import random
import numpy as np
from utils import *
class InfraredDataset(Dataset):
    """Image/mask pairs read from ``<dataset_dir>/images`` and ``<dataset_dir>/masks``.

    Args:
        dataset_dir: root directory containing ``images`` and ``masks``.
        image_index: sequence of sample names (e.g. lines of an index file);
            surrounding whitespace is stripped from each entry.
        crop_size: side of the square crop produced in ``'train'`` mode.
        base_size: reference size for random scaling in ``'train'`` mode and
            the fixed resize target in ``'val'`` mode.
        mode: ``'train'`` (random augmentation) or ``'val'`` (plain resize).
    """

    def __init__(self, dataset_dir, image_index, crop_size=480 , base_size=512, mode='train'):
        super(InfraredDataset, self).__init__()
        self.dataset_dir = dataset_dir
        self.image_index = image_index
        self.crop_size = crop_size
        self.base_size = base_size
        self.mode = mode
        self.transform = T.Compose([
            T.ToTensor(),
            T.Normalize([.485, .456, .406], [.229, .224, .225]),  # Default mean and std
        ])
        # Built once instead of once per sample.
        self.mask_transform = T.ToTensor()

    def __getitem__(self, index):
        """Return ``(image, mask)`` as float32 tensors for sample ``index``.

        Raises:
            ValueError: if ``self.mode`` is neither ``'train'`` nor ``'val'``.
        """
        # strip() rather than strip('\n') so that index files with CRLF line
        # endings do not leave a stray '\r' inside the file name.
        image_index = self.image_index[index].strip()
        image_path = os.path.join(self.dataset_dir, 'images', '%s.png' % image_index)
        label_path = os.path.join(self.dataset_dir, 'masks', '%s_pixels0.png' % image_index)
        img = Image.open(image_path)
        img = img.convert('RGB')
        mask = Image.open(label_path)
        if self.mode == 'train':
            img, mask = self._sync_transform(img, mask)
        elif self.mode == 'val':
            img, mask = self._testval_sync_transform(img, mask)
        else:
            raise ValueError("Unknown self.mode: %r" % (self.mode,))
        img, mask = self.transform(img), self.mask_transform(mask)
        return paddle.cast(img, 'float32'), paddle.cast(mask, 'float32')

    def __len__(self):
        return len(self.image_index)

    def _sync_transform(self, img, mask):
        """Apply the same random flip/scale/pad/crop to both; blur only the image."""
        # random mirror
        if random.random() < 0.5:
            img = img.transpose(Image.FLIP_LEFT_RIGHT)
            mask = mask.transpose(Image.FLIP_LEFT_RIGHT)
        crop_size = self.crop_size
        # random scale: the LONG edge is resized to a length in [0.5, 2.0] * base_size
        long_size = random.randint(int(self.base_size * 0.5), int(self.base_size * 2.0))
        w, h = img.size
        if h > w:
            oh = long_size
            ow = int(1.0 * w * long_size / h + 0.5)
            short_size = ow
        else:
            ow = long_size
            oh = int(1.0 * h * long_size / w + 0.5)
            short_size = oh
        img = img.resize((ow, oh), Image.BILINEAR)
        mask = mask.resize((ow, oh), Image.NEAREST)
        # pad on the right/bottom with zeros until a crop_size crop fits
        if short_size < crop_size:
            padh = crop_size - oh if oh < crop_size else 0
            padw = crop_size - ow if ow < crop_size else 0
            img = ImageOps.expand(img, border=(0, 0, padw, padh), fill=0)
            mask = ImageOps.expand(mask, border=(0, 0, padw, padh), fill=0)
        # random crop of crop_size x crop_size
        w, h = img.size
        x1 = random.randint(0, w - crop_size)
        y1 = random.randint(0, h - crop_size)
        img = img.crop((x1, y1, x1 + crop_size, y1 + crop_size))
        mask = mask.crop((x1, y1, x1 + crop_size, y1 + crop_size))
        # gaussian blur as in PSP
        if random.random() < 0.5:
            img = img.filter(ImageFilter.GaussianBlur(
                radius=random.random()))
        return img, mask

    def _val_sync_transform(self, img, mask):
        """Resize the short edge to ``crop_size`` and centre-crop a square.

        Not called by ``__getitem__``; kept for callers that use it directly.
        """
        outsize = self.crop_size
        short_size = outsize
        w, h = img.size
        if w > h:
            oh = short_size
            ow = int(1.0 * w * oh / h)
        else:
            ow = short_size
            oh = int(1.0 * h * ow / w)
        img = img.resize((ow, oh), Image.BILINEAR)
        mask = mask.resize((ow, oh), Image.NEAREST)
        # center crop
        w, h = img.size
        x1 = int(round((w - outsize) / 2.))
        y1 = int(round((h - outsize) / 2.))
        img = img.crop((x1, y1, x1 + outsize, y1 + outsize))
        mask = mask.crop((x1, y1, x1 + outsize, y1 + outsize))
        return img, mask

    def _testval_sync_transform(self, img, mask):
        """Resize both to ``base_size`` x ``base_size`` without any randomness."""
        base_size = self.base_size
        img = img.resize((base_size, base_size), Image.BILINEAR)
        mask = mask.resize((base_size, base_size), Image.NEAREST)
        return img, mask
# Read the train/val index and pull one sample for a visual sanity check.
# The context manager closes the index file, which the bare open() never did.
with open('./sirst/idx_427/trainval.txt') as index_file:
    f = index_file.readlines()
ds = InfraredDataset(dataset_dir='./sirst', image_index=f)
image, label = next(iter(ds))
image, label = image.numpy(), label.numpy()
6.1 数据流验证
# Show the first image channel (left) next to its mask (right).
for position, panel in ((121, image[0]), (122, np.uint8(label[0]))):
    plt.subplot(position)
    plt.imshow(panel, cmap='gray')
<matplotlib.image.AxesImage at 0x7fd83378cbd0>
6.2 构建DataLoader
dataset_dir = './sirst'
# Context managers close the index files after reading.
with open('./sirst/idx_427/trainval.txt') as index_file:
    train_index = index_file.readlines()
with open('./sirst/idx_427/test.txt') as index_file:
    test_index = index_file.readlines()
batch_size = 8
image_size = (480, 480)
train_ds = InfraredDataset(dataset_dir, train_index)
# mode='val' makes evaluation deterministic; the default mode='train' would
# apply random flip/scale/crop/blur to the test images as well.
test_ds = InfraredDataset(dataset_dir, test_index, mode='val')
train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=8)
test_dl = DataLoader(test_ds, batch_size=batch_size,
                     shuffle=False, num_workers=8)
7. 模型训练
def training(net, train_data_loader, epoch, criterion, optimizer, epochs, learning_rate, warm_up_epochs):
    """Run one training epoch over ``train_data_loader``.

    The progress bar shows the epoch number, the optimizer's current learning
    rate and the mean loss of the batches processed so far.
    """
    net.train()
    batch_losses = []
    progress = tqdm(train_data_loader)
    for data, labels in progress:
        loss = criterion(net(data), labels)
        optimizer.clear_grad()
        loss.backward()
        optimizer.step()
        batch_losses.append(loss.item())
        progress.set_description(
            'Epoch:%3d, lr:%f, train loss:%f'
            % (epoch, optimizer.get_lr(), np.mean(batch_losses)))
    # NOTE(review): the scraped source lost its indentation; the schedule
    # update is assumed to run once per epoch, after the batch loop -- confirm.
    adjust_learning_rate(optimizer, epoch, epochs, learning_rate,
                         warm_up_epochs, 1e-6)
def validation(net, val_data_loader, epoch, criterion, iou_metric, nIoU_metric):
    """Evaluate ``net`` on ``val_data_loader`` and return ``(IoU, nIoU)``.

    Both metric objects are reset first, then updated batch by batch. The
    progress bar shows the running mean loss and the running metric values.
    """
    iou_metric.reset()
    nIoU_metric.reset()
    eval_losses = []
    net.eval()
    tbar = tqdm(val_data_loader)
    # Gradients are never used during evaluation, so do not record them.
    with paddle.no_grad():
        for data, labels in tbar:
            output = net(data)
            loss = criterion(output, labels)
            eval_losses.append(loss.item())
            iou_metric.update(output, labels)
            nIoU_metric.update(output, labels)
            _, IoU = iou_metric.get()
            _, nIoU = nIoU_metric.get()
            tbar.set_description('  Epoch:%3d, eval loss:%f, IoU:%f, nIoU:%f'
                                 % (epoch, np.mean(eval_losses), IoU, nIoU))
    # Read the final values after the loop so an empty loader still returns.
    _, IoU = iou_metric.get()
    _, nIoU = nIoU_metric.get()
    return IoU, nIoU
!pip install paddleseg
from paddleseg.cvlibs import param_init
def weight_init(m):
    """Initialise one layer; intended to be passed to ``net.apply``.

    Conv2D weights are drawn from N(0, 0.02). BatchNorm2D weights are drawn
    from N(1, 0.02) and their biases are set to 0. Other layers are untouched.
    """
    if isinstance(m, nn.Conv2D):
        param_init.normal_init(m.weight, mean=0.0, std=0.02)
        return
    if isinstance(m, nn.BatchNorm2D):
        param_init.normal_init(m.weight, mean=1.0, std=0.02)
        param_init.constant_init(m.bias, value=0)
from paddle import optimizer
import os.path as ops
epochs = 100  # total number of training epochs
learning_rate = 0.05
criterion = SoftLoULoss()
backbone_mode = 'UNet'
blocks_per_layer = 4
warm_up_epochs = 0
fuse_mode = 'AsymBi'

## model
layer_blocks = [blocks_per_layer] * 3
channels = [8, 16, 32, 64]
if backbone_mode == 'FPN':
    net = ASKCResNetFPN(layer_blocks, channels, fuse_mode)
elif backbone_mode == 'UNet':
    net = ASKCResUNet(layer_blocks, channels, fuse_mode)
else:
    # Fail here instead of with a NameError on ``net`` further down.
    raise ValueError('Unsupported backbone_mode: %r' % (backbone_mode,))
net.apply(weight_init)

## optimizer
optimizer = paddle.optimizer.Adagrad(learning_rate=learning_rate, parameters=net.parameters())

## evaluation metrics
iou_metric = SigmoidMetric()
nIoU_metric = SamplewiseSigmoidMetric(1, score_thresh=0.5)

save_pkl = 'weights/%s_%s' % (backbone_mode, fuse_mode)  # directory where checkpoints are written
best_iou = 0.0
best_nIoU = 0.0
for epoch in range(1, epochs+1):
    training(net, train_dl, epoch, criterion, optimizer, epochs, learning_rate, warm_up_epochs)
    IoU, nIoU = validation(net, test_dl, epoch, criterion, iou_metric, nIoU_metric)
    pkl_name = 'Epoch-%3d_IoU-%.4f_nIoU-%.4f.pkl' % (epoch, IoU, nIoU)
    improved_iou = IoU > best_iou
    improved_niou = nIoU > best_nIoU
    # Both conditions used to write the same file; save it once per epoch.
    if improved_iou or improved_niou:
        paddle.save(net.state_dict(), ops.join(save_pkl, pkl_name))
    if improved_iou:
        best_iou = IoU
    if improved_niou:
        best_nIoU = nIoU
    # NOTE(review): indentation was lost in the scraped source; printing once
    # per epoch is assumed -- confirm.
    print('Best IoU: %.5f, best nIoU: %.5f' % (best_iou, best_nIoU))
8. 模型验证
# Load a saved checkpoint and visualise the prediction for a single image.
pkl_name = 'weights/UNet_AsymBi/Epoch- 93_IoU-0.4525_nIoU-0.3955.pkl'
net.set_state_dict(paddle.load(pkl_name))
net.eval()
TF = T.Compose([
    T.Resize((int(image_size[0]), int(image_size[1]))),
    T.ToTensor(),
    # Same normalisation as InfraredDataset.transform, so the network sees
    # inputs scaled the way they were during training.
    T.Normalize([.485, .456, .406], [.229, .224, .225]),
])
image = Image.open('./sirst/images/Misc_1.png').convert('RGB')
label = Image.open('./sirst/masks/Misc_1_pixels0.png')
tensor_img = TF(image)
tensor_img = paddle.unsqueeze(tensor_img, 0)  # add the batch axis
# Inference only: do not record gradients.
with paddle.no_grad():
    pred = net(tensor_img)[0]
import cv2
w, h = image.size
prediction = F.sigmoid(pred[0])
# Resize the probability map back to the original image size for display.
prediction = cv2.resize(prediction.numpy(), (w, h))
plt.figure(figsize=(30, 30))
plt.subplot(131)
plt.title('Input')
plt.imshow(np.array(image), cmap='gray')
plt.subplot(132)
plt.title('Pred')
plt.imshow(prediction, cmap='gray')
plt.subplot(133)
plt.title('Label')
plt.imshow(label, cmap='gray')
<matplotlib.image.AxesImage at 0x7fd849e8f2d0>
# Final evaluation of the loaded weights on the test loader.
# Freshly constructed metrics already start from a reset state.
iou_metric = SigmoidMetric()
nIoU_metric = SamplewiseSigmoidMetric(1, score_thresh=0.5)
net.eval()
tbar = tqdm(test_dl)
# Gradients are not needed while scoring.
with paddle.no_grad():
    for data, labels in tbar:
        output = net(data)
        iou_metric.update(output, labels)
        nIoU_metric.update(output, labels)
_, IoU = iou_metric.get()
_, nIoU = nIoU_metric.get()
print('IOU %f nIOU %f'%(IoU, nIoU))
9. 结论
这篇论文最重要的贡献就是数据集,在这之前基于深度学习的红外弱小目标论文还较少,这篇文章也一定程度上推动了深度学习在红外弱小目标检测上的发展。
PP-ISTD ?
寻求构建基于 PaddlePaddle 的红外弱小目标检测算法库,如果你也对红外弱小目标检测感兴趣,欢迎加入,一起复现更多的算法。
Asymmetric Contextual Modulation for Infrared Small Target Detection
- [x]
Dense Nested Attention Network for Infrared Small Target Detection
- [x]
此文仅为搬运,原作链接:https://aistudio.baidu.com/aistudio/projectdetail/4338034
更多推荐
所有评论(0)