1. Introduction

  • Wireframe parsing is the task of detecting salient line segments and junctions in an image.
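As a loose illustration (the names here are illustrative, not from the paper), a parsed wireframe can be stored as junction coordinates plus index pairs that say which junctions each line segment connects:

```python
# Hypothetical minimal container for a parsed wireframe:
# junctions as (x, y) points, segments as index pairs into them.
junctions = [(10.0, 20.0), (100.0, 25.0), (55.0, 90.0)]
segments = [(0, 1), (1, 2)]  # each segment connects two junctions

def segment_endpoints(seg, junctions):
    i, j = seg
    return junctions[i], junctions[j]
```

This is only a data-layout sketch; the model below predicts both the junctions and the segments from pixels.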

2. Demo

  • Example image:

(image omitted)

  • Detection result:

(image omitted)

3. References

4. Model

4.1 Background

  • Before this paper, the best-performing wireframe parsing method was L-CNN. L-CNN approaches the problem by learning a junction detector with a deep network, rather than directly learning a line-segment detector.

  • For line segments, L-CNN uses a sampling scheme that generates line-segment proposals from the predicted junctions; a verification module then classifies the proposals.

  • The overall architecture of L-CNN is shown below:

(figure omitted)
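The junction-pair sampling idea can be sketched as follows. This is an illustrative simplification, not L-CNN's actual sampler: every pair of predicted junctions becomes one candidate segment, which the verification module would then score.

```python
from itertools import combinations

# Illustrative sketch of L-CNN-style proposal generation: every pair of
# predicted junctions is a candidate line segment.
def line_proposals(junctions):
    return [(a, b) for a, b in combinations(junctions, 2)]

cands = line_proposals([(0, 0), (4, 0), (0, 3)])
# 3 junctions yield 3 candidate segments
```

The real sampler also mixes in negative and random pairs during training, but the pair-enumeration core is the same.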

  • Attraction Field Map (AFM), another method proposed by the authors of the HAWP paper, instead detects line segments directly, without using junction information during learning.

4.2 Overall Architecture

  • The model architecture is shown in the figure below:

(figure omitted)

  • HAWP consists of three main parts:

    • Proposal Initialization: predict junctions and line segments from the feature map that a network (the paper uses a Stacked Hourglass Network) computes from the input image

    • Proposal Refinement: match line segments with junctions, and discard any segments and junctions that cannot be matched

    • Proposal Verification: classify the line segments kept in the previous step, using the same method as in L-CNN
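The three stages above can be summarized as hedged pseudocode; the helper names are placeholders, not the paper's API:

```python
# Hedged pseudocode of the three HAWP stages (helper names are
# placeholders for the components implemented later in this article).
def hawp_pipeline(image, backbone, predict, match, classify):
    feats = backbone(image)            # e.g. a stacked hourglass network
    juncs, lines = predict(feats)      # Proposal Initialization
    proposals = match(juncs, lines)    # Proposal Refinement
    return classify(proposals, feats)  # Proposal Verification (as in L-CNN)
```

Each stage corresponds to one part of the `WireframeDetector` built in the implementation section.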

4.3 Holistic Attraction Field Representation

  • This part reparameterizes the pixels of the original image into a line-segment representation that a network can learn.

(figure omitted)

  • As shown in the figure above, the paper represents a line segment in three steps:

    • Translation: take p as the new coordinate origin

    • Rotation: rotate the segment until it is parallel to the Y axis of the new coordinate system, with the origin (p) to the left of the segment; the rotation angle is θ

    • Scaling: normalize the coordinates using the distance d from p to the segment as the unit length; letting θ1 and θ2 denote the two angles in the figure, the endpoints of the segment can then be rewritten as shown

  • In this way, a point p in the support region of a line segment l̈ can be reparameterized as p(l̈) = (d, θ, θ1, θ2).

  • In addition, points that do not belong to the support region of any line segment (their distance d to every segment exceeds a preset threshold) are called background points, and are reparameterized as (-1, 0, 0, 0).
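The translate-rotate-scale steps above reduce to plane geometry, sketched here in NumPy. This is a rough geometric sketch under simplified sign conventions, which may differ from the paper's exact definitions; `dmax` is an assumed background threshold, not the paper's value:

```python
import numpy as np

def reparameterize(p, e1, e2, dmax=5.0):
    # Geometric sketch of the (d, theta, theta1, theta2) reparameterization
    # for pixel p and segment (e1, e2). Simplified conventions, not the
    # paper's exact ones; dmax is an assumed background threshold.
    p, e1, e2 = (np.asarray(a, float) for a in (p, e1, e2))
    v = e2 - e1                                     # segment direction
    f = e1 + np.dot(p - e1, v) / np.dot(v, v) * v   # foot of perpendicular from p
    d = np.linalg.norm(p - f)                       # distance from p to the line
    if d > dmax:
        return (-1.0, 0.0, 0.0, 0.0)                # background point
    theta = np.arctan2(v[1], v[0])                  # rotation aligning the segment with the Y axis
    # endpoint offsets along the segment, measured in units of d
    y1 = np.dot(e1 - f, v) / np.linalg.norm(v) / d
    y2 = np.dot(e2 - f, v) / np.linalg.norm(v) / d
    return (d, theta, np.arctan(y1), np.arctan(y2))
```

For example, for p = (0, 0) and the vertical segment from (1, -1) to (1, 2), the foot of the perpendicular is (1, 0), so d = 1 and the endpoint angles are arctan(-1) and arctan(2).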

5. Implementation

5.1 Imports

import cv2
import numpy as np

import paddle
import paddle.nn as nn
import paddle.nn.functional as F

from PIL import Image

5.2 Backbone

  • HAWP adopts the Stacked Hourglass Network, a classic architecture from keypoint detection, as its backbone.

  • This network captures and consolidates information at every scale of the image. It is called a Stacked Hourglass Network because its structure resembles hourglasses stacked together; its rough structure is as follows:

  • The stacked Hourglass modules are symmetric: a bottom-up pass takes the image from high resolution down to low resolution, and a top-down pass brings it back up. The network therefore contains many pooling and upsampling steps; pooling reduces the image to a very low resolution, while upsampling makes it possible to combine features from multiple resolutions.

  • The most important component is the Hourglass module. Inside it, convolutions and max pooling bring the features down to a very low resolution; at each max-pooling step the network branches off and applies additional convolutions at the pre-pooling resolution. When the lowest resolution is reached, the network starts to upsample and merge features across scales. Upsampling uses nearest-neighbor interpolation, after which the two feature sets are added elementwise.
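Before the full implementation, the nearest-neighbor upsample-and-add merge described above can be illustrated in NumPy (the real model uses `F.interpolate` on Paddle tensors):

```python
import numpy as np

# Nearest-neighbour 2x upsampling followed by an elementwise add with a
# same-resolution skip branch -- the hourglass merge step, sketched in
# NumPy for illustration.
def upsample2x(feat):
    return feat.repeat(2, axis=-2).repeat(2, axis=-1)

low = np.array([[1.0, 2.0],
                [3.0, 4.0]])     # low-resolution branch
skip = np.zeros((4, 4))          # same-resolution skip branch
merged = upsample2x(low) + skip  # shape (4, 4)
```

Each low-resolution pixel is simply copied into a 2x2 block, then the skip features are added position by position.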

class Bottleneck2D(nn.Layer):
    expansion = 2

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super(Bottleneck2D, self).__init__()

        self.bn1 = nn.BatchNorm2D(inplanes)
        self.conv1 = nn.Conv2D(inplanes, planes, kernel_size=1)
        self.bn2 = nn.BatchNorm2D(planes)
        self.conv2 = nn.Conv2D(
            planes, planes, kernel_size=3, stride=stride, padding=1)
        self.bn3 = nn.BatchNorm2D(planes)
        self.conv3 = nn.Conv2D(planes, planes * 2, kernel_size=1)
        self.relu = nn.ReLU()
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        residual = x

        out = self.bn1(x)
        out = self.relu(out)
        out = self.conv1(out)

        out = self.bn2(out)
        out = self.relu(out)
        out = self.conv2(out)

        out = self.bn3(out)
        out = self.relu(out)
        out = self.conv3(out)

        if self.downsample is not None:
            residual = self.downsample(x)

        out += residual

        return out


class Hourglass(nn.Layer):
    def __init__(self, block, num_blocks, planes, depth):
        super(Hourglass, self).__init__()
        self.depth = depth
        self.block = block
        self.hg = self._make_hour_glass(block, num_blocks, planes, depth)

    def _make_residual(self, block, num_blocks, planes):
        layers = []
        for i in range(0, num_blocks):
            layers.append(block(planes * block.expansion, planes))
        return nn.Sequential(*layers)

    def _make_hour_glass(self, block, num_blocks, planes, depth):
        hg = []
        for i in range(depth):
            res = []
            for j in range(3):
                res.append(self._make_residual(block, num_blocks, planes))
            if i == 0:
                res.append(self._make_residual(block, num_blocks, planes))
            hg.append(nn.LayerList(res))
        return nn.LayerList(hg)

    def _hour_glass_forward(self, n, x):
        up1 = self.hg[n - 1][0](x)
        low1 = F.max_pool2d(x, 2, stride=2)
        low1 = self.hg[n - 1][1](low1)

        if n > 1:
            low2 = self._hour_glass_forward(n - 1, low1)
        else:
            low2 = self.hg[n - 1][3](low1)
        low3 = self.hg[n - 1][2](low2)
        up2 = F.interpolate(low3, scale_factor=2)
        out = up1 + up2
        return out

    def forward(self, x):
        return self._hour_glass_forward(self.depth, x)


class HourglassNet(nn.Layer):
    def __init__(self, inplanes, num_feats, block, head, depth, num_stacks, num_blocks, num_classes):
        super(HourglassNet, self).__init__()

        self.inplanes = inplanes
        self.num_feats = num_feats
        self.num_stacks = num_stacks
        self.conv1 = nn.Conv2D(
            3, self.inplanes, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2D(self.inplanes)
        self.relu = nn.ReLU()
        self.layer1 = self._make_residual(block, self.inplanes, 1)
        self.layer2 = self._make_residual(block, self.inplanes, 1)
        self.layer3 = self._make_residual(block, self.num_feats, 1)
        self.maxpool = nn.MaxPool2D(2, stride=2)

        ch = self.num_feats * block.expansion

        hg, res, fc, score, fc_, score_ = [], [], [], [], [], []
        for i in range(num_stacks):
            hg.append(Hourglass(block, num_blocks, self.num_feats, depth))
            res.append(self._make_residual(block, self.num_feats, num_blocks))
            fc.append(self._make_fc(ch, ch))
            score.append(head(ch, num_classes))
            if i < num_stacks - 1:
                fc_.append(nn.Conv2D(ch, ch, kernel_size=1))
                score_.append(nn.Conv2D(num_classes, ch, kernel_size=1))
        self.hg = nn.LayerList(hg)
        self.res = nn.LayerList(res)
        self.fc = nn.LayerList(fc)
        self.score = nn.LayerList(score)
        self.fc_ = nn.LayerList(fc_)
        self.score_ = nn.LayerList(score_)

    def _make_residual(self, block, planes, blocks, stride=1):
        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2D(
                    self.inplanes,
                    planes * block.expansion,
                    kernel_size=1,
                    stride=stride,
                )
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample))
        self.inplanes = planes * block.expansion
        for i in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)

    def _make_fc(self, inplanes, outplanes):
        bn = nn.BatchNorm2D(inplanes)
        conv = nn.Conv2D(inplanes, outplanes, kernel_size=1)
        return nn.Sequential(conv, bn, self.relu)

    def forward(self, x):
        out = []
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.layer1(x)
        x = self.maxpool(x)
        x = self.layer2(x)
        x = self.layer3(x)

        for i in range(self.num_stacks):
            y = self.hg[i](x)
            y = self.res[i](y)
            y = self.fc[i](y)
            score = self.score[i](y)
            out.append(score)

            if i < self.num_stacks - 1:
                fc_ = self.fc_[i](y)
                score_ = self.score_[i](score)
                x = x + fc_ + score_

        return out[::-1], y


class MultitaskHead(nn.Layer):
    def __init__(self, input_channels, num_class, head_size):
        super(MultitaskHead, self).__init__()

        m = int(input_channels / 4)
        heads = []
        for output_channels in sum(head_size, []):
            heads.append(
                nn.Sequential(
                    nn.Conv2D(input_channels, m, kernel_size=3, padding=1),
                    nn.ReLU(),
                    nn.Conv2D(m, output_channels, kernel_size=1),
                )
            )
        self.heads = nn.LayerList(heads)
        assert num_class == sum(sum(head_size, []))

    def forward(self, x):
        return paddle.concat([head(x) for head in self.heads], axis=1)

5.3 Decoder

  • After the CNN backbone has extracted image features, a decoder is needed to decode the line-segment representation. Its structure is shown in the figure below:

(figure omitted)

  • The decoder, including post-processing, roughly performs the following:

    • Split the output feature map by channel into features with different roles

    • Remove redundant lines with an NMS operation

    • Convert the model's line-segment representation into the conventional (x1, y1, x2, y2) format

def non_maximum_suppression(a):
    # keep only the pixels that are local maxima in a 3x3 neighborhood
    ap = F.max_pool2d(a, 3, stride=1, padding=1)[0]
    mask = (a == ap).cast('float32')
    return a * mask


def get_junctions(jloc, joff, topk=300, th=0):
    # decode junctions from the location heatmap (jloc) and the sub-pixel
    # offset map (joff), keeping the top-k scoring points
    # (note: assumes square feature maps)
    _, width = jloc.shape[1], jloc.shape[2]
    jloc = jloc.reshape((-1,))
    joff = joff.reshape((2, -1))

    scores, index = paddle.topk(jloc, k=topk)
    y = (index // width).cast('float32') + \
        paddle.gather(joff[1], index, 0) + 0.5
    x = (index % width).cast('float32') + \
        paddle.gather(joff[0], index, 0) + 0.5

    junctions = paddle.stack((x, y)).t()

    return junctions[scores > th], scores[scores > th]


class WireframeDetector(nn.Layer):
    def __init__(self,
                 backbone,
                 n_dyn_junc,
                 n_dyn_posl,
                 n_dyn_negl,
                 n_dyn_othr,
                 n_dyn_othr2,
                 n_pts0,
                 n_pts1,
                 dim_loi,
                 dim_fc,
                 n_out_junc,
                 n_out_line,
                 use_residual):
        super(WireframeDetector, self).__init__()
        self.backbone = backbone

        self.n_dyn_junc = n_dyn_junc
        self.n_dyn_posl = n_dyn_posl
        self.n_dyn_negl = n_dyn_negl
        self.n_dyn_othr = n_dyn_othr
        self.n_dyn_othr2 = n_dyn_othr2
        self.n_pts0 = n_pts0
        self.n_pts1 = n_pts1
        self.dim_loi = dim_loi
        self.dim_fc = dim_fc
        self.n_out_junc = n_out_junc
        self.n_out_line = n_out_line
        self.use_residual = use_residual

        self.register_buffer('tspan', paddle.linspace(
            0, 1, self.n_pts0)[None, None, :])
        self.loss = nn.BCEWithLogitsLoss(reduction='none')

        self.fc1 = nn.Conv2D(256, self.dim_loi, 1)
        self.pool1d = nn.MaxPool1D(
            self.n_pts0//self.n_pts1, self.n_pts0//self.n_pts1)
        self.fc2 = nn.Sequential(
            nn.Linear(self.dim_loi * self.n_pts1, self.dim_fc),
            nn.ReLU(),
            nn.Linear(self.dim_fc, self.dim_fc),
            nn.ReLU(),
            nn.Linear(self.dim_fc, 1),
        )
        self.train_step = 0

    def pooling(self, features_per_image, lines_per_im):
        h, w = features_per_image.shape[1], features_per_image.shape[2]
        U, V = lines_per_im[:, :2], lines_per_im[:, 2:]
        sampled_points = U[:, :, None]*self.tspan + \
            V[:, :, None]*(1-self.tspan) - 0.5
        sampled_points = sampled_points.transpose((0, 2, 1)).reshape((-1, 2))
        px, py = sampled_points[:, 0], sampled_points[:, 1]
        px0 = px.floor().clip(min=0, max=w-1)
        py0 = py.floor().clip(min=0, max=h-1)
        px1 = (px0 + 1).clip(min=0, max=w-1)
        py1 = (py0 + 1).clip(min=0, max=h-1)
        px0l, py0l, px1l, py1l = px0.cast('int64'), py0.cast(
            'int64'), px1.cast('int64'), py1.cast('int64')

        flatten_features = features_per_image.flatten(1)
        xp = (
            paddle.index_select(flatten_features, py0l * w + px0l, 1) * (py1 - py) * (px1 - px) +
            paddle.index_select(flatten_features, py1l * w + px0l, 1) * (py - py0) * (px1 - px) +
            paddle.index_select(flatten_features, py0l * w + px1l, 1) * (py1 - py) * (px - px0) +
            paddle.index_select(flatten_features, py1l * w + px1l, 1) * (py - py0) * (px - px0)
        ).reshape((128, -1, 32)).transpose((1, 0, 2))

        xp = self.pool1d(xp)
        features_per_line = xp.reshape((-1, self.n_pts1*self.dim_loi))
        logits = self.fc2(features_per_line).flatten()
        return logits

    def forward(self, images):
        outputs, features = self.backbone(images)

        loi_features = self.fc1(features)
        output = outputs[0]
        md_pred = F.sigmoid(output[:, :3])
        dis_pred = F.sigmoid(output[:, 3:4])
        res_pred = F.sigmoid(output[:, 4:5])
        jloc_pred = F.softmax(output[:, 5:7], 1)[:, 1:]
        joff_pred = F.sigmoid(output[:, 7:9]) - 0.5

        batch_size = md_pred.shape[0]
        assert batch_size == 1

        if self.use_residual:
            lines_pred = self.proposal_lines(
                md_pred[0], dis_pred[0], res_pred[0]).reshape((-1, 4))
        else:
            lines_pred = self.proposal_lines(
                md_pred[0], dis_pred[0], None).reshape((-1, 4))

        jloc_pred_nms = non_maximum_suppression(jloc_pred)
        topK = min(300, int((jloc_pred_nms > 0.008).cast('float32').sum().item()))

        juncs_pred, _ = get_junctions(non_maximum_suppression(
            jloc_pred), joff_pred[0], topk=topK)

        idx_junc_to_end1 = paddle.sum(
            (lines_pred[:, :2]-juncs_pred[:, None])**2, axis=-1).argmin(0)
        idx_junc_to_end2 = paddle.sum(
            (lines_pred[:, 2:] - juncs_pred[:, None]) ** 2, axis=-1).argmin(0)

        idx_junc_to_end_min = paddle.minimum(
            idx_junc_to_end1, idx_junc_to_end2)
        idx_junc_to_end_max = paddle.maximum(
            idx_junc_to_end1, idx_junc_to_end2)

        iskeep = (idx_junc_to_end_min < idx_junc_to_end_max)

        idx_lines_for_junctions = paddle.unique(
            paddle.concat(
                (idx_junc_to_end_min[iskeep][..., None], idx_junc_to_end_max[iskeep][..., None]), axis=1),
            axis=0)
        lines_adjusted = paddle.concat(
            (juncs_pred[idx_lines_for_junctions[:, 0]], juncs_pred[idx_lines_for_junctions[:, 1]]), axis=1)

        scores = F.sigmoid(self.pooling(loi_features[0], lines_adjusted))

        return lines_adjusted, scores

    def proposal_lines(self, md_maps, dis_maps, residual_maps, scale=5.0):
        """
        :param md_maps: 3xhxw, the range should be (0,1) for every element
        :param dis_maps: 1xhxw
        :return:
        """
        sign_pad = paddle.to_tensor(
            [-1, 0, 1], dtype=paddle.float32).reshape((3, 1, 1))

        if residual_maps is None:
            dis_maps_new = dis_maps.tile((1, 1, 1))
        else:
            dis_maps_new = dis_maps.tile(
                (3, 1, 1))+sign_pad*residual_maps.tile((3, 1, 1))
        height, width = md_maps.shape[1], md_maps.shape[2]
        _y = paddle.arange(0, height).cast('float32')
        _x = paddle.arange(0, width).cast('float32')

        y0, x0 = paddle.meshgrid(_y, _x)
        md_ = (md_maps[0]-0.5)*np.pi*2
        st_ = md_maps[1]*np.pi/2
        ed_ = -md_maps[2]*np.pi/2

        cs_md = paddle.cos(md_)
        ss_md = paddle.sin(md_)

        cs_st = paddle.cos(st_).clip(min=1e-3)
        ss_st = paddle.sin(st_).clip(min=1e-3)

        cs_ed = paddle.cos(ed_).clip(min=1e-3)
        ss_ed = paddle.sin(ed_).clip(max=-1e-3)

        y_st = ss_st/cs_st
        y_ed = ss_ed/cs_ed

        x_st_rotated = (cs_md-ss_md*y_st)[None]*dis_maps_new*scale
        y_st_rotated = (ss_md + cs_md*y_st)[None]*dis_maps_new*scale

        x_ed_rotated = (cs_md - ss_md*y_ed)[None]*dis_maps_new*scale
        y_ed_rotated = (ss_md + cs_md*y_ed)[None]*dis_maps_new*scale

        x_st_final = (x_st_rotated + x0[None]).clip(min=0, max=width-1)
        y_st_final = (y_st_rotated + y0[None]).clip(min=0, max=height-1)

        x_ed_final = (x_ed_rotated + x0[None]).clip(min=0, max=width-1)
        y_ed_final = (y_ed_rotated + y0[None]).clip(min=0, max=height-1)

        lines = paddle.stack(
            (x_st_final, y_st_final, x_ed_final, y_ed_final)).transpose((1, 2, 3, 0))

        return lines

5.4 Testing

  • Create an instance of the model built above to check that the network works correctly.
# output head channels: md (3), dis (1), res (1), jloc (2), joff (2)
head_size = [[3], [1], [1], [2], [2]]
backbone = HourglassNet(
    block=Bottleneck2D,
    inplanes=64,
    num_feats=128,
    depth=4,
    head=lambda c_in, c_out: MultitaskHead(
        c_in, c_out, head_size=head_size),
    num_stacks=2,
    num_blocks=1,
    num_classes=sum(sum(head_size, []))
)

detector = WireframeDetector(
    backbone=backbone,
    n_dyn_junc=300,
    n_dyn_posl=300,
    n_dyn_negl=0,
    n_dyn_othr=0,
    n_dyn_othr2=300,
    n_pts0=32,
    n_pts1=8,
    dim_loi=128,
    dim_fc=1024,
    n_out_junc=250,
    n_out_line=2500,
    use_residual=True
)

params = paddle.load('hawp.pdparams')
detector.set_state_dict(params)
detector.eval()

6. Inference

6.1 Building the Preprocessing Function

  • Resize the image

  • Convert the color space

  • Normalize

  • Transpose the dimensions

  • Add a batch dimension

  • Convert the data type

  • Convert to a tensor

def preprocess(img):
    img = cv2.resize(img, dsize=(512, 512))
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]
    img = (img / 255.0 - mean) / std
    img = img.transpose(2, 0, 1)[None, ...].astype('float32')
    img_tensor = paddle.to_tensor(img)
    return img_tensor

6.2 Model Inference

  • Read the image

  • Preprocess the image

  • Run the model forward pass

  • Filter line segments by score

  • Map the line coordinates back to the original image size

  • Draw the line segments on the image

img = cv2.imread('sample.png')
h, w = img.shape[:2]
input_tensor = preprocess(img)
with paddle.no_grad():
    lines, scores = detector(input_tensor)
    lines = lines[scores>0.9]
    lines = (lines / 128 * paddle.to_tensor([w, h, w, h], dtype=paddle.float32)).cast(paddle.int64).numpy()

for line in lines:
    x1, y1, x2, y2 = line
    cv2.line(img, (x1, y1), (x2, y2), (255, 0, 255), 1)

cv2.imwrite('lines.jpg', img)
Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))

(image omitted)

7. Summary

  • We briefly introduced and implemented HAWP, a wireframe parsing model, and ran inference with the official pretrained weights.

This article is a repost.
Original project link
