Table of Contents
- 一、PSPNet 介绍
- 1、原理阐述
- 2、论文解释
- 3、网络模型
- 二、部署实现
- 1、PASCAL VOC 2012
- 2、模型训练
- 3、度量指标
- 4、结果分析
- 5、图像测试
一、PSPNet 介绍
PSPNet(Pyramid Scene Parsing Network)来自于CVPR2017的一篇文章,中文翻译为金字塔场景解析网络,主要用于图像分割。此架构引入了金字塔池化(Pyramid Pooling)模块,以捕捉不同尺度下的上下文信息。Pyramid Pooling可以在不同尺度上提取全局和局部上下文信息,有助于更好地理解图像中的语义内容,从而提高分割性能。
1、原理阐述
- (a)输入图像
- (b)使用预训练的 ResNet 模型获取特征图
- (c)利用Pyramid Pooling获得不同子区域的表示,通过上采样和concat形成包含局部和全局上下文信息的特征表示
- (d)将特征送入卷积层,得到像素级预测结果
2、论文解释
金字塔池化模块融合了四种不同金字塔尺度下的特征。红色标注的是全局池化,以生成单个bin输出。下面的金字塔级别将特征图分为不同的子区域,并形成不同位置的池化表示。金字塔池化模块中不同层级的输出包含不同大小的特征图。为了保持全局特征的权重,在每个金字塔层级后使用 1×1 卷积层,将上下文表示的维度降低到原始维度的 1/N(如果金字塔层级大小为 N)。
然后对低维特征图进行上采样,通过双线性插值得到与原始特征图相同大小的特征。最后将不同级别的特征连接起来作为最终的金字塔池化输出的全局特征。
3、网络模型
class PSPNet(BaseModel):def __init__(self, num_classes, in_channels=3, backbone='resnet152', pretrained=True, use_aux=True, freeze_bn=False, freeze_backbone=False):super(PSPNet, self).__init__()norm_layer = nn.BatchNorm2d # 用于规范化的层类型# 使用getattr根据backbone参数选择合适的骨干网络模型,并可能加载预训练权重model = getattr(resnet, backbone)(pretrained, norm_layer=norm_layer)m_out_sz = model.fc.in_features # 提取骨干网络的输出特征通道数self.use_aux = use_aux # 是否使用辅助分割分支# 初始卷积层,根据in_channels来调整输入通道数self.initial = nn.Sequential(*list(model.children())[:4])if in_channels != 3:self.initial[0] = nn.Conv2d(in_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)self.initial = nn.Sequential(*self.initial)# 骨干网络的不同层self.layer1 = model.layer1self.layer2 = model.layer2self.layer3 = model.layer3self.layer4 = model.layer4# 主要分割分支,包括特征融合和分割输出self.master_branch = nn.Sequential(PSPModule(m_out_sz, bin_sizes=[1, 2, 3, 6], norm_layer=norm_layer), # 特征融合模块nn.Conv2d(m_out_sz // 4, num_classes, kernel_size=1) # 分割输出卷积层)# 辅助分割分支,可选,用于训练时帮助主分割任务self.auxiliary_branch = nn.Sequential(nn.Conv2d(m_out_sz // 2, m_out_sz // 4, kernel_size=3, padding=1, bias=False),norm_layer(m_out_sz // 4),nn.ReLU(inplace=True),nn.Dropout2d(0.1),nn.Conv2d(m_out_sz // 4, num_classes, kernel_size=1))# 初始化网络权重initialize_weights(self.master_branch, self.auxiliary_branch)def forward(self, x):input_size = (x.size()[2], x.size()[3]) # 记录输入图像的尺寸x = self.initial(x) # 初始卷积层x = self.layer1(x) # 第一层x = self.layer2(x) # 第二层x_aux = self.layer3(x) # 第三层,用于辅助分割分支x = self.layer4(x) # 第四层output = self.master_branch(x) # 主要分割分支output = F.interpolate(output, size=input_size, mode='bilinear') # 插值操作,将分割输出大小调整为输入大小output = output[:, :, :input_size[0], :input_size[1]] # 调整输出的尺寸以匹配输入# 如果在训练模式下且使用辅助分割分支,还生成辅助分割输出if self.training and self.use_aux:aux = self.auxiliary_branch(x_aux)aux = F.interpolate(aux, size=input_size, mode='bilinear') # 调整辅助分割输出大小aux = aux[:, :, :input_size[0], :input_size[1]] # 调整输出的尺寸以匹配输入return output, aux # 返回主分割输出和辅助分割输出return output # 只返回主分割输出
其中,PSPModule类的定义如下
class PSPModule(nn.Module):def __init__(self, in_channels, bin_sizes, norm_layer):super(_PSPModule, self).__init__()# 计算每个池化分支的输出通道数out_channels = in_channels // len(bin_sizes)# 创建池化分支,将它们存储在一个 ModuleList 中self.stages = nn.ModuleList([self._make_stages(in_channels, out_channels, b_s, norm_layer) for b_s in bin_sizes])# 创建特征融合模块(bottleneck)self.bottleneck = nn.Sequential(nn.Conv2d(in_channels + (out_channels * len(bin_sizes)), out_channels, kernel_size=3, padding=1, bias=False), # 卷积层norm_layer(out_channels), # 规范化层nn.ReLU(inplace=True), # ReLU激活函数nn.Dropout2d(0.1) # 2D Dropout层)def _make_stages(self, in_channels, out_channels, bin_sz, norm_layer):# 创建池化分支的内部结构prior = nn.AdaptiveAvgPool2d(output_size=bin_sz) # 自适应平均池化层conv = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False) # 卷积层bn = norm_layer(out_channels) # 规范化层relu = nn.ReLU(inplace=True) # ReLU激活函数return nn.Sequential(prior, conv, bn, relu) # 返回池化分支的Sequential模块def forward(self, features):h, w = features.size()[2], features.size()[3] # 获取输入特征的高度和宽度pyramids = [features] # 存储原始特征到金字塔中# 遍历每个池化分支,对特征进行插值操作并存储在金字塔中pyramids.extend([F.interpolate(stage(features), size=(h, w), mode='bilinear', align_corners=True) for stage in self.stages])# 将金字塔中的特征拼接在一起并通过特征融合模块output = self.bottleneck(torch.cat(pyramids, dim=1))return output # 返回特征融合后的输出
此类用于执行金字塔池化和特征融合操作,并将它们融合成一个具有更丰富语义信息的特征表示。
PSPNet的核心思想是利用4 级金字塔结构,池化核可以覆盖图像的(whole)整体、(half)一半和(small portions)一小部分,即
self.stages = nn.ModuleList([self._make_stages(in_channels, out_channels, b_s, norm_layer) for b_s in bin_sizes])
以上代码中,self.stages 包含多个池化分支,bin_sizes 为一个包含4个元素的列表,对应于4个不同的池化分支。通过遍历 bin_sizes,使用 make_stages 方法创建了4个池化分支。每个池化分支均由自适应平均池化层、卷积层、归一化层和ReLU激活层组成。由此形成金字塔结构的特征提取部分。
4个池化分支具有不同的感受野大小,以此来捕获不同尺度的图像信息。
换句话说,self.stages 中的每个元素都代表金字塔中的一个级别,体现了4级金字塔结构。forward 方法将遍历这些池化分支,并对输入特征执行插值操作,将它们调整为与原始特征相同的大小,以便进行特征融合。
二、部署实现
我的环境是
- 操作系统:win11
- 语言:python3.10
- IDE:PyCharm 2023
- GPU:RTX 4060
1、PASCAL VOC 2012
Dataset采用经典的PASCAL VOC 2012,一个用于计算机视觉研究的标准数据集,它提供了多种任务的图像数据和相关标注。其中包含20个不同的物体类别,如飞机、自行车、汽车、狗、猫、椅子等,以及一类"背景"。这些图像均是从真实世界中采集,涵盖了不同场景和角度,代表了常见的日常物体。大小方面,包含1,464张训练图像、1,449张验证图像和1,456张测试图像。其中每张图像都带有详细的标注信息,包括每个物体实例的边界框(目标检测任务)和像素级的语义分割标签(语义分割任务)。
2、模型训练
核心训练部分的代码如下:
def _train_epoch(self, epoch):self.logger.info('\n') # 打印日志信息self.model.train() # 设置模型为训练模式if self.config['arch']['args']['freeze_bn']: if isinstance(self.model, torch.nn.DataParallel):self.model.module.freeze_bn() else:self.model.freeze_bn() self.wrt_mode = 'train' # 设置写入模式为'train'tic = time.time() # 记录当前时间self._reset_metrics() # 重置度量指标tbar = tqdm(self.train_loader, ncols=130) # 创建一个进度条以迭代训练数据集for batch_idx, (data, target) in enumerate(tbar): # 遍历训练数据self.data_time.update(time.time() - tic) # 更新数据加载时间self.lr_scheduler.step(epoch=epoch - 1) # 根据当前训练的epoch调整学习率# LOSS & OPTIMIZEself.optimizer.zero_grad() # 清零梯度output = self.model(data) # 前向传播,获取模型输出if self.config['arch']['type'][:3] == 'PSP': assert output[0].size()[2:] == target.size()[1:] # 检查输出和目标的空间尺寸匹配assert output[0].size()[1] == self.num_classes # 检查输出通道数与类别数匹配loss = self.loss(output[0], target) # 计算损失loss += self.loss(output[1], target) * 0.4 # 添加辅助损失,加权为0.4output = output[0] # 将主要输出作为最终输出else:assert output.size()[2:] == target.size()[1:] assert output.size()[1] == self.num_classes loss = self.loss(output, target) if isinstance(self.loss, torch.nn.DataParallel): loss = loss.mean() # 计算损失的均值loss.backward() # 反向传播,计算梯度self.optimizer.step() # 更新模型参数self.total_loss.update(loss.item()) # 更新总损失# measure elapsed timeself.batch_time.update(time.time() - tic) # 更新批次处理时间tic = time.time()# LOGGING & TENSORBOARDif batch_idx % self.log_step == 0: # 每隔一定步数记录一次日志和TensorBoardself.wrt_step = (epoch - 1) * len(self.train_loader) + batch_idx # 当前步数self.writer.add_scalar(f'{self.wrt_mode}/loss', loss.item(), self.wrt_step) # 记录损失到TensorBoard# FOR EVALseg_metrics = eval_metrics(output, target, self.num_classes) # 计算分割度量指标self._update_seg_metrics(*seg_metrics) # 更新分割度量指标pixAcc, mIoU, _ = self._get_seg_metrics().values() # 获取分割指标值# PRINT INFOtbar.set_description('TRAIN ({}) | Loss: {:.3f} | Acc {:.2f} mIoU {:.2f} | B {:.2f} D {:.2f} |'.format(epoch, self.total_loss.average, pixAcc, mIoU,self.batch_time.average, self.data_time.average)) # 打印训练信息# METRICS TO TENSORBOARDseg_metrics = self._get_seg_metrics()for k, v in list(seg_metrics.items())[:-1]: # 遍历分割度量指标并记录self.writer.add_scalar(f'{self.wrt_mode}/{k}', v, self.wrt_step)for i, opt_group in enumerate(self.optimizer.param_groups): # 记录学习率self.writer.add_scalar(f'{self.wrt_mode}/Learning_rate_{i}', opt_group['lr'], self.wrt_step)# RETURN LOSS & METRICSlog = {'loss': self.total_loss.average, # 返回平均损失**seg_metrics} # 返回分割度量指标return log # 返回日志信息
交叉验证部分,我们进行以下的定义
def _valid_epoch(self, epoch):if self.val_loader is None:self.logger.warning('Not data loader was passed for the validation step, No validation is performed !')return {} # 如果没有提供验证数据加载器,发出警告并返回一个空字典self.logger.info('\n###### EVALUATION ######')self.model.eval() # 设置模型为评估(验证)模式self.wrt_mode = 'val' # 设置写入模式为'val'(用于TensorBoard记录)self._reset_metrics() # 重置度量指标tbar = tqdm(self.val_loader, ncols=130) # 创建一个进度条用于遍历验证数据集with torch.no_grad(): # 禁用梯度计算val_visual = [] # 用于可视化的图像列表for batch_idx, (data, target) in enumerate(tbar):#data, target = data.to(self.device), target.to(self.device) # 将数据和目标移到指定的设备上(通常是GPU)# LOSSoutput = self.model(data) # 前向传播,获取模型的输出loss = self.loss(output, target) # 计算损失if isinstance(self.loss, torch.nn.DataParallel): # 如果损失函数是DataParallel损失函数loss = loss.mean() # 计算损失的均值self.total_loss.update(loss.item()) # 更新总损失seg_metrics = eval_metrics(output, target, self.num_classes) # 计算分割度量指标self._update_seg_metrics(*seg_metrics) # 更新分割度量指标# LIST OF IMAGE TO VIZ (15 images)if len(val_visual) < 15: # 用于可视化的图像数量限制在15张以内target_np = target.data.cpu().numpy() # 将目标从GPU移到CPU并转换为NumPy数组output_np = output.data.max(1)[1].cpu().numpy() # 将模型输出的类别概率最大的类别作为预测结果val_visual.append([data[0].data.cpu(), target_np[0], output_np[0]]) # 添加可视化所需的图像和标签# PRINT INFOpixAcc, mIoU, _ = self._get_seg_metrics().values() # 获取分割度量指标的值tbar.set_description('EVAL ({}) | Loss: {:.3f}, PixelAcc: {:.2f}, Mean IoU: {:.2f} |'.format( epoch,self.total_loss.average,pixAcc, mIoU)) # 打印验证信息# WRITING & VISUALIZING THE MASKSval_img = [] # 用于可视化的图像列表palette = self.train_loader.dataset.palette # 获取调色板信息for d, t, o in val_visual: # 遍历可视化图像列表d = self.restore_transform(d) # 还原图像的转换(例如,去均值、缩放等)t, o = colorize_mask(t, palette), colorize_mask(o, palette) # 将标签和模型输出转换为彩色掩码d, t, o = d.convert('RGB'), t.convert('RGB'), o.convert('RGB') # 将图像转换为RGB格式[d, t, o] = [self.viz_transform(x) for x in [d, t, o]] # 应用可视化转换val_img.extend([d, t, o]) # 添加可视化图像到列表中val_img = torch.stack(val_img, 0) # 将可视化图像堆叠成一个张量val_img = make_grid(val_img.cpu(), nrow=3, padding=5) # 使用Grid方式排列可视化图像self.writer.add_image(f'{self.wrt_mode}/inputs_targets_predictions', val_img, self.wrt_step) # 将可视化图像写入TensorBoard# METRICS TO TENSORBOARDself.wrt_step = (epoch) * len(self.val_loader) # 计算当前步数self.writer.add_scalar(f'{self.wrt_mode}/loss', self.total_loss.average, self.wrt_step) # 记录平均损失到TensorBoardseg_metrics = self._get_seg_metrics() # 获取分割度量指标for k, v in list(seg_metrics.items())[:-1]: # 遍历分割度量指标并记录到TensorBoardself.writer.add_scalar(f'{self.wrt_mode}/{k}', v, self.wrt_step)log = {'val_loss': self.total_loss.average, # 返回平均验证损失**seg_metrics # 返回分割度量指标}return log # 返回日志信息# 以下是用于度量指标的辅助函数
def _reset_metrics(self):self.batch_time = AverageMeter() # 用于记录批次处理时间的平均值self.data_time = AverageMeter() # 用于记录数据加载时间的平均值self.total_loss = AverageMeter() # 用于记录总损失的平均值self.total_inter, self.total_union = 0, 0 # 用于记录交集和并集的总和self.total_correct, self.total_label = 0, 0 # 用于记录正确分类和标签的总和def _update_seg_metrics(self, correct, labeled, inter, union):self.total_correct += correct # 更新正确分类的数量self.total_label += labeled # 更新标签的数量self.total_inter += inter # 更新交集的总和self.total_union += union # 更新并集的总和def _get_seg_metrics(self):pixAcc = 1.0 * self.total_correct / (np.spacing(1) + self.total_label) # 计算像素准确率IoU = 1.0 * self.total_inter / (np.spacing(1) + self.total_union) # 计算各类别的IoUmIoU = IoU.mean() # 计算平均IoUreturn {"Pixel_Accuracy": np.round(pixAcc, 3), # 返回像素准确率"Mean_IoU": np.round(mIoU, 3), # 返回平均IoU"Class_IoU": dict(zip(range(self.num_classes), np.round(IoU, 3))) # 返回各类别的IoU}
3、度量指标
这里我们使用两种指标来评估模型的性能:
Pixel_Accuracy(像素准确率)是用于评估图像分割任务性能的一种度量指标,用于衡量模型在整个图像上正确分类的像素数量占总像素数量的比例。可以简单地表示为以下数学公式:
其中:
- “Number of Correctly Classified Pixels” 表示模型在分割图像中正确分类的像素数量。
- “Total Number of Pixels” 表示整个分割图像的像素总数。
Pixel Accuracy的取值范围在0到1之间,其中1表示模型在整个图像上完全正确分类了所有像素,而0表示模型未正确分类任何像素。
Mean_IoU :IoU(Intersection over Union)是一个表示两个集合重叠程度的指标,通常用于分割任务中。在分割任务中,一个集合代表模型的预测分割区域,另一个集合代表真实的分割区域。IoU 的计算公式如下:
其中:
- Area of Intersection" 是模型预测分割区域和真实分割区域的交集面积。
- Area of Union" 是模型预测分割区域和真实分割区域的并集面积。
或者
其中:
- TP(True Positives):表示模型正确预测为正类(目标类别)的像素数量
- FP(False Positives):表示模型错误地将背景像素预测为正类的像素数量
- FN(False Negatives):表示模型错误地将正类像素预测为背景的像素数量
Mean Intersection over Union (Mean IoU)是所有类别IoU的平均值
其中,N是类别的数量,IoU_i是第i个类别的IoU
4、结果分析
主要参数设置部分如下:
"epochs": 80,
"loss": "CrossEntropyLoss2d",
"batch_size": 8,
"base_size": 400, //图像大小调整为base_size,然后随机裁剪
"crop_size": 380, //重新缩放后随机裁剪的大小"optimizer": {"type": "SGD","differential_lr": true,"args":{"lr": 0.01,"weight_decay": 1e-4,"momentum": 0.9}},
由于GPU资源有限,这里只运行80个epoch,得到的日志信息如下:
Tensorboard记录的信息:
train
validation
交叉验证集的Input、Ground Truth和Output对比:
部分细节仍未完全分割,但已经可识别出图像主体。train_loss和val_loss相差不大,并未出现过拟合,增加训练周期可能会达到更好的效果。
5、图像测试
测试部分代码如下:
args = parse_arguments() # 解析命令行参数config = json.load(open(args.config)) # 从JSON文件中加载配置信息# 根据配置信息创建数据加载器loader = getattr(dataloaders, config['train_loader']['type'])(**config['train_loader']['args'])to_tensor = transforms.ToTensor() # 创建图像到张量的转换normalize = transforms.Normalize(loader.MEAN, loader.STD) # 创建归一化转换num_classes = loader.dataset.num_classes # 获取数据集中的类别数量palette = loader.dataset.palette # 获取颜色映射表# 创建模型model = getattr(models, config['arch']['type'])(num_classes, **config['arch']['args']) # 根据配置创建模型availble_gpus = list(range(torch.cuda.device_count())) # 获取可用的GPU列表device = torch.device('cuda:0' if len(availble_gpus) > 0 else 'cpu') # 选择运行设备(GPU或CPU)# 加载模型检查点checkpoint = torch.load(args.model, map_location=device)if isinstance(checkpoint, dict) and 'state_dict' in checkpoint.keys():checkpoint = checkpoint['state_dict']# 如果在训练期间使用了数据并行,需要处理模型if 'module' in list(checkpoint.keys())[0] and not isinstance(model, torch.nn.DataParallel):# 对于GPU推理,使用数据并行if "cuda" in device.type:model = torch.nn.DataParallel(model)else:# 对于CPU推理,移除模型的"module"前缀new_state_dict = OrderedDict()for k, v in checkpoint.items():name = k[7:]new_state_dict[name] = vcheckpoint = new_state_dict# 加载模型权重model.load_state_dict(checkpoint)model.to(device) # 将模型移动到所选设备model.eval() # 设置模型为评估模式# 创建输出目录if not os.path.exists('outputs'):os.makedirs('outputs')# 获取图像文件列表image_files = sorted(glob(os.path.join(args.images, f'*.{args.extension}')))with torch.no_grad():tbar = tqdm(image_files, ncols=100) # 创建进度条for img_file in tbar:image = Image.open(img_file).convert('RGB') # 打开图像并将其转换为RGB格式input = normalize(to_tensor(image)).unsqueeze(0) # 转换图像并添加批次维度#预测图像分割结果prediction = multi_scale_predict(model, input, scales, num_classes, device)prediction = F.softmax(torch.from_numpy(prediction), dim=0).argmax(0).cpu().numpy() # 计算最终的预测结果save_images(image, prediction, args.output, img_file, palette) # 保存预测结果的图像
其中对多尺度图像预测函数定义为:
def multi_scale_predict(model, image, scales, num_classes, device, flip=False):# 获取输入图像的尺寸input_size = (image.size(2), image.size(3))# 创建上采样层,用于将不同尺度的预测结果恢复到原始尺寸upsample = nn.Upsample(size=input_size, mode='bilinear', align_corners=True)# 初始化用于累计预测结果的数组total_predictions = np.zeros((num_classes, image.size(2), image.size(3)))# 将输入图像转换为NumPy数组,并移动到CPU上image = image.data.data.cpu().numpy()# 遍历不同的尺度for scale in scales:# 缩放图像scaled_img = ndimage.zoom(image, (1.0, 1.0, float(scale), float(scale)), order=1, prefilter=False)# 将缩放后的图像转换为PyTorch张量并移动到指定设备scaled_img = torch.from_numpy(scaled_img).to(device)# 使用模型进行预测并上采样到原始尺寸scaled_prediction = upsample(model(scaled_img).cpu())# 如果启用了翻转,对翻转后的图像进行预测并平均if flip:fliped_img = scaled_img.flip(-1).to(device)fliped_predictions = upsample(model(fliped_img).cpu())scaled_prediction = 0.5 * (fliped_predictions.flip(-1) + scaled_prediction)# 将当前尺度的预测结果累加到总体预测中total_predictions += scaled_prediction.data.cpu().numpy().squeeze(0)# 计算平均预测结果total_predictions /= len(scales)return total_predictions
我们任意指定一张输入图像,测试模型的分割效果
效果欠佳,如果大家有资源可以增加epoch数量训练,尝试不同的数据集
——————————————————————————————————————————————