作者:SkyXZ
CSDN:SkyXZ~-CSDN博客
博客园:SkyXZ - 博客园
宿主机环境:WSL2-Ubuntu22.04+Cuda12.6、D-Robotics-OE 1.2.8、Ubuntu20.04 GPU Docker
端侧设备环境:RDK X5-Server-3.1.0
2025年随着RDK X5的发布,地瓜官方随之也开放了RDK系列DNN推理部署的Python版的API进一步降低了使用X5的门槛,上一篇文章我介绍了C++版的API,这一篇文章我将介绍一下如何使用Python在我们的RDK X5上部署推理代码,地瓜的算法工具链的安装及模型的量化教程在上一篇已经写了具体请参照:学弟一看就会的RDKX5模型转换及部署,你确定不学? - SkyXZ - 博客园
地瓜这次释放的Python推理API可以分为两种版本,一种版本为BSP,使用hobot_dnn
包,这是地瓜BSP(Board Support Package)提供的底层API,可以给用户提供更底层的控制,虽然需要手动实现更多的预处理和后处理步骤,但是可以更精细地控制内存和计算资源,适合需要深度优化或自定义处理的场景;另外一种版本为ModelZoo,使用bpu_infer_lib
,这是地瓜ModelZoo提供的高级封装API适合用来进行快速验证和部署,其API更加简洁,使用也更方便,这个包提供更完整的预处理和后处理功能,包含了很多内置的辅助函数且模型加载和推理过程更简化,我们将在此结合手册手把手介绍这两种推理方式的使用
一、ModelZoo——bpu_infer_lib
- 参考资料:4.3.2 ModelZoo快速上手 | RDK DOC
我们先从最简单最易用的包开始讲起叭,根据这个包中的.cpython-310-aarch64-linux-gnu.so
文件可以知道bpu_infer_lib
模块是为Python 3.10编译的,因此我们首先要确保我们的Python环境是3.10的版本,在安装完环境之后我们便可以从地瓜官方仓库下载bpu_infer_lib
包啦(其他包如:OpenCV默认大家已经自行安装),具体命令如下大家可以根据自己的设备(X3 or X5)来选择下载:
# RDK X5 Python3.10
pip install bpu_infer_lib_x5 -i http://archive.d-robotics.cc/simple/ --trusted-host archive.d-robotics.cc
# RDK X3
pip install bpu_infer_lib_x3 -i http://archive.d-robotics.cc/simple/ --trusted-host archive.d-robotics.cc
安装完之后我们先来查看地瓜官方的API手册,根据手册的介绍ModelZoo的推理API为一个Infer
类,而这个类里面一共只有四个用法,分别是load_model
、read_input
、forward
和get_output
用于加载模型、读取输入、推理以及获取输出,整个推理的步骤使用起来非常的简单,接下来我将手把手带着大家结合API以及官方在ModelZoo中给出的jupyter_ModelZoo_YOLOv5.ipynb
示例代码来从零部署一遍推理部分,首先我们先导入一些必要的包:
import cv2
import numpy as np
from scipy.special import softmax
from scipy.special import expit as sigmoid
from time import time
import bpu_infer_lib # Model Zoo Python API
from typing import Tuple
import os
(1)封装BPU_Detect类及初始化
我们还是使用类的形式来实现我们的检测,因此我们还是来创建我们的BPU_Detect
类,我们在这个类里首先将常见参数进行初始化,由于初始化特征图网络只需要加载一次因此我们创建了一个私有的_init_grids
用于加载特征图,同时我们依旧按照前处理、推理、后处理的推理流程来对应创建了三个函数PreProcess
、detect
、PostPrecess
以及一系列的辅助函数:bgr2nv12_opencv
、draw_detection
和结果处理函数detect_result
class BPU_Detect:def __init__(self, labelnames, model_path, conf, iou, anchors, strides, mode, is_save):# 初始化类def _init_grids(self) : # 初始化特征图网格函数def bgr2nv12_opencv(self, image): # OpenCV-BGR图像转推理NV12数据def PreProcess(self, img): # 预处理函数用于图像预处理def PostPrecess(self, method): # 后处理函数def draw_detection(self, img, box, score, class_id, labelname): # 结果可视化绘制函数def detect_result(self, img): # 推理结果处理函数def detect(self, img_path, method_pre, method_post): # 推理函数
在BPU_Detect
类中我们首先初始化我们需要的所有参数比如我们最基本的模型路径、标签列表以及标签数量和置信度等参数,由于我们的特征图只需要初始化一次,因此我们便将特征图初始化函数添加至类的初始化中,在每次继承类的时候便完成特征图的初始化,最后我们继承bpu_infer_lib.Infer
类,并直接完成模型的加载load_model
class BPU_Detect:def __init__(self, model_path:str,labelnames:list,num_classes:int = None,conf:float = 0.45,iou:float = 0.45,anchors:np.array = np.array([[10,13, 16,30, 33,23], # P3/8[30,61, 62,45, 59,119], # P4/16[116,90, 156,198, 373,326], # P5/32]),strides = np.array([8, 16, 32]),mode:bool = False,is_save:bool = False):self.model = model_pathself.labelname = labelnamesself.conf = confself.iou = iouself.anchors = anchorsself.strides = stridesself.input_w = 640self.input_h = 640self.nc = num_classes if num_classes is not None else len(self.labelname)self.mode = modeself.is_save = is_saveself._init_grids()self.inf = bpu_infer_lib.Infer(self.mode)self.inf.load_model(self.model)
(2)完成_init_grid()特征图及锚框生成函数
接着便要来完成我们的_init_grids
函数了,特征图大家应该不会陌生啦,在Yolov5中,特征图的生成与不同尺度的网格和锚框紧密相关,每个尺度的特征图通过步幅(strides)来计算出不同大小的网格,这些网格点会映射到输入图像的对应位置,通过这种方式,模型便能够对输入图像进行多尺度的目标检测,具体而言,我们需要根据输入图像的宽度和高度,结合每个尺度的步幅,生成相应的网格坐标,同时,为每个网格点分配合适的锚框,这样之后网络便可以更好地预测图像上物体的位置和大小。而在这里,我们使用np.tile
和np.linspace
来生成每个尺度的网格坐标,通过步幅对输入图像进行划分,计算出每个网格中心的位置,比如在生成小尺度网格时,我们使用以下代码:
self.s_grid = np.stack([np.tile(np.linspace(0.5, self.input_w//self.strides[0] - 0.5, self.input_w//self.strides[0]), reps=self.input_h//self.strides[0]),np.repeat(np.arange(0.5, self.input_h//self.strides[0] + 0.5,1),self.input_w//self.strides[0])], axis=0).transpose(1,0)
这将计算出一个大小适应当前尺度的网格,其中np.linspace
会生成从0.5到输入宽度除以步幅的均匀间隔值,np.repeat
则生成每行的纵坐标,从而确保生成的网格覆盖整个特征图。这些网格的坐标随后会被平铺并重新组织成形状为(-1, 2)
的矩阵,以便每个网格点对应一个特定的空间位置,为后续的锚框匹配和目标定位做好准备,接着我们使用以下代码将先前计算出的网格坐标(self.s_grid
)进行扩展,使用np.hstack
来将原本的网格坐标数组沿水平方向(即列方向)拼接三次用于给每个网格点重复分配多个坐标值:
self.s_grid = np.hstack([self.s_grid, self.s_grid, self.s_grid]).reshape(-1, 2)
在完成了网格的生成之后我们接着来完成锚框的分配,还是以小尺度网格为例,self.anchors[0]
是第一尺度的锚框尺寸,我们通过 np.tile
将其按网格点数量进行重复,确保每个网格点都有一个对应的锚框之后我们使用 .reshape(-1, 2)
将重复后的锚框数组重塑成每行包含两个值(宽度和高度)的形状,这样之后我们最终得到的 self.s_anchors
便是为第一尺度的特征图每个网格点分配的锚框集合:
self.s_anchors = np.tile(self.anchors[0], self.input_w//self.strides[0] * self.input_h//self.strides[0]).reshape(-1, 2)
不同的尺度的grid网格和anchors锚框完整计算代码如下:
# strides的grid网格
self.s_grid = np.stack([np.tile(np.linspace(0.5, self.input_w//self.strides[0] - 0.5, self.input_w//self.strides[0]), reps=self.input_h//self.strides[0]),np.repeat(np.arange(0.5, self.input_h//self.strides[0] + 0.5,1),self.input_w//self.strides[0])], axis=0).transpose(1,0)
self.s_grid = np.hstack([self.s_grid, self.s_grid, self.s_grid]).reshape(-1, 2)self.m_grid = np.stack([np.tile(np.linspace(0.5, self.input_w//self.strides[1] - 0.5, self.input_w//self.strides[1]), reps=self.input_h//self.strides[1]),np.repeat(np.arange(0.5, self.input_h//self.strides[1] + 0.5,1),self.input_w//self.strides[1])], axis=0).transpose(1,0)
self.m_grid = np.hstack([self.m_grid, self.m_grid, self.m_grid]).reshape(-1, 2)self.l_grid = np.stack([np.tile(np.linspace(0.5, self.input_w//self.strides[2] - 0.5, self.input_w//self.strides[2]), reps=self.input_h//self.strides[2]),np.repeat(np.arange(0.5, self.input_h//self.strides[2] + 0.5,1),self.input_w//self.strides[2])], axis=0).transpose(1,0)
self.l_grid = np.hstack([self.l_grid, self.l_grid, self.l_grid]).reshape(-1, 2)
# 用于广播的anchors
self.s_anchors = np.tile(self.anchors[0], self.input_w//self.strides[0] * self.input_h//self.strides[0]).reshape(-1, 2)
self.m_anchors = np.tile(self.anchors[1], self.input_w//self.strides[1] * self.input_h//self.strides[1]).reshape(-1, 2)
self.l_anchors = np.tile(self.anchors[2], self.input_w//self.strides[2] * self.input_h//self.strides[2]).reshape(-1, 2)
但是如果我们按照这样的写法的话代码的就有些许丑陋,我们优化一下通过将重复的网格生成和锚框分配逻辑提取成一个单独的函数 _create_grid
,这样可以避免重复代码的冗余,同时使得每个尺度的网格和锚框的生成更加清晰易懂,我们只需要传入步幅(stride)参数,_create_grid
函数便可以灵活地为不同尺度的特征图生成对应的网格和锚框,因此我们的_init_grids
函数完整代码如下:
def _init_grids(self) :"""初始化特征图网格"""def _create_grid(stride: int) :"""创建单个stride的网格和anchors"""grid = np.stack([np.tile(np.linspace(0.5, self.input_w//stride - 0.5, self.input_w//stride), reps=self.input_h//stride),np.repeat(np.arange(0.5, self.input_h//stride + 0.5, 1), self.input_w//stride)], axis=0).transpose(1,0)grid = np.hstack([grid] * 3).reshape(-1, 2)anchors = np.tile(self.anchors[int(np.log2(stride/8))], self.input_w//stride * self.input_h//stride).reshape(-1, 2)return grid, anchors# 创建不同尺度的网格self.s_grid, self.s_anchors = _create_grid(self.strides[0])self.m_grid, self.m_anchors = _create_grid(self.strides[1]) self.l_grid, self.l_anchors = _create_grid(self.strides[2])print(f"网格尺寸: {self.s_grid.shape = } {self.m_grid.shape = } {self.l_grid.shape = }")print(f"Anchors尺寸: {self.s_anchors.shape = } {self.m_anchors.shape = } {self.l_anchors.shape = }")
(3)完成bgr2nv12_opencv()函数
我们现在开始完成我们的工具函数bgr2nv12_opencv()
,由于我们的BPU需要输入NV12格式的图像才能进行推理,因此我们这个函数主要用于将OpenCV读取的BGR格式的图像转换为BPU可以使用的NV12格式,NV12格式大家应该也不陌生,他是一种YUV格式,其中Y分量平面紧接在一起而UV分量则是交替排列,因此我们首先使用OpenCV将BGR图像转换为YUV420P格式,接着我们提取Y和UV分量并转换为符合NV12格式的结构即可,因此完整代码如下:
def bgr2nv12_opencv(self, image):height, width = image.shape[0], image.shape[1]area = height * widthyuv420p = cv2.cvtColor(image, cv2.COLOR_BGR2YUV_I420).reshape((area * 3 // 2,))y = yuv420p[:area] # Y分量:前area个元素uv_planar = yuv420p[area:].reshape((2, area // 4)) # UV分量:后面的元素,每2个元素分别为U和V分量uv_packed = uv_planar.transpose((1, 0)).reshape((area // 2,)) # 将UV分量交替排列为交错的UV格式nv12 = np.zeros_like(yuv420p) # 创建与原YUV数据形状相同的空数组用于存放NV12格式数据nv12[:height * width] = y # 将Y分量直接赋值到NV12数组的前部nv12[height * width:] = uv_packed # 将交错的UV分量赋值到NV12数组的后部return nv12
(4)完成PreProcess()预处理函数
我们现在开始实现我们的预处理函数部分啦,预处理其实无非就两步,只需要修改输入图像的尺寸以及将输入图像的数据格式转换为NV12即可,接着我们查看用户手册中传入图像数据的read_input
函数的介绍可以知道这个函数会自动校验我们输入的输入数据的尺寸和类型,因此,我们只需要将图像的尺寸调整符合目标大小,再将其转换为BPU所需要的NV12格式后,便可以通过 read_input
函数将数据传入模型啦
因此我们读取图片后首先将输入图片resize到我们所需要的640上,接着使用我们先前定义的bgr2nv12_opencv()
函数将输入图片的格式转化为NV12,最后调用API将准备好的数据输入进模型即可,具体代码如下:
orig_img = cv2.imread(img) # 读取图片
if orig_img is None:raise ValueError(f"无法读取图片: {img}")
input_tensor = cv2.resize(orig_img, (self.input_h, self.input_w)) # 将图片缩放到需求尺寸
input_tensor = self.bgr2nv12_opencv(input_tensor) # 转换图片格式
self.inf.read_input(input_tensor, 0) # 输入图像
虽然按照上述操作我们便可以完成预处理部分,但是!!!由于这个库是使用的 pybind11 封装的 C++ 库,我们在终端中打开Python环境导入bpu_infer_lib后我们使用help函数来打印出这个库的用法会发现关于输入这块,除了官方手册中提到的read_input
,还暴露出了其他可用的接口函数:
(xq) root@ubuntu:~# python
Python 3.10.16 (main, Dec 11 2024, 16:18:56) [GCC 11.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import bpu_infer_lib
>>> help(bpu_infer_lib)
我们在这里以其中一个read_img_to_nv12
API为例子继续介绍,其他的函数大家可以自行探索或者是等地瓜官方完善手册后再使用。read_img_to_nv12
函数的使用比read_input
还简单,因为这个函数输入图像后会自动帮我们将图像转换为NV12格式,因此我们的代码可以如下所示更加简单,只需要用imread
读取图像后直接传入这个函数即可:
orig_img = cv2.imread(img) # 读取图片
if orig_img is None:raise ValueError(f"无法读取图片: {img}")
img_h, img_w = orig_img.shape[0:2]
self.inf.read_img_to_nv12(img, 0)# 使用API的方法直接读取
至此,我们便完成了PreProcess预处理函数,我们加上一些细节,以及图像读取API的选择后完整代码如下:
def PreProcess(self, img, method=0):"""预处理函数Args:img: 输入图片路径method: 选择使用的方法- 0: 使用read_input- 1: 使用read_img_to_nv12"""# 获取原始图片和尺寸if isinstance(img, str):# 输入是图片路径if method == 1:# 先获取原始图片尺寸orig_img = cv2.imread(img)if orig_img is None:raise ValueError(f"无法读取图片: {img}")img_h, img_w = orig_img.shape[0:2]self.inf.read_img_to_nv12(img, 0)# 使用API的方法直接读取elif method == 0:# method == 0,读取图片后处理orig_img = cv2.imread(img)if orig_img is None:raise ValueError(f"无法读取图片: {img}")input_tensor = cv2.resize(orig_img, (self.input_h, self.input_w))input_tensor = self.bgr2nv12_opencv(input_tensor)self.inf.read_input(input_tensor, 0)img_h, img_w = orig_img.shape[0:2]else:print("输入格式有误")return False# 计算缩放比例self.y_scale = img_h / self.input_hself.x_scale = img_w / self.input_wprint(f"原始尺寸: {img_w}x{img_h}, 输入尺寸: {self.input_w}x{self.input_h}")print(f"缩放比例: x_scale={self.x_scale}, y_scale={self.y_scale}")return True
(5)完成PostProcess()后处理函数
完成了预处理后我们接着来完成后处理部分,后处理部分也无非就是几步,首先我们获取到模型的输出之后经过阈值处理将输出转换回我们可以理解的格式,接着根据输出的类别、位置等信息,进行非极大值抑制(NMS),去除冗余的框即完成整个后处理的流程。我们依旧翻开官方的API手册可以看到手册上给了我们get_output
函数用来获取模型的输出,我们可以看到我们只需要使用这个函数便可以从outputs
中获取classes_scores
和bboxes
我们首先使用get_output
函数来获取输出,接着使用classes_scores
和bboxes
来获取结果,我们通过print
这两个结果的维度可以发现classes_scores
的形状为(1, 80, 80, 18)
,这意味着网络的输出是一个大小为80x80的网格,而每个网格点(即每个像素)有18个输出值,分别与每个锚框的6个参数(4个框坐标、1个物体得分、1个类别得分)相关,而bboxes
的形状为(1, 40, 40, 18)
,这意味着bboxes
的形状为(1, 40, 40, 18)
,这意味着网络的另一个输出是一个大小为40x40的网格,每个网格点同样有18个输出值,这些值也与3个锚框的6个参数相关
if not self.inf.get_output():raise RuntimeError("获取输出失败")
classes_scores = self.inf.outputs[0].data # (1, 80, 80, 18)
bboxes = self.inf.outputs[1].data # (1, 40, 40, 18)
接着我们便可以从classes_scores
中提取每个锚框的预测结果,我们设定num_anchors = 3
和pred_per_anchor = 6
分别对应3个锚框和6个参数,初始化3个列表,scores_list
、boxes_list
和ids_list
,用来保存每个网格点经过筛选后的预测框、分数和类别ID
batch, height, width, channels = classes_scores.shape
num_anchors = 3
pred_per_anchor = 6
# 提取每个anchor的预测
scores_list = []
boxes_list = []
ids_list = []
之后我们便开始遍历每个网格点,并针对每个锚框提取其相应的预测值(框坐标、物体得分、类别得分)通过sigmoid激活函数计算得分,如果得分超过设定的阈值(score >= self.conf
),便将框坐标进行解码,并转换为xyxy
格式
# 处理每个网格点
for h in range(height):for w in range(width):for a in range(num_anchors):# 获取当前anchor的预测值start_idx = int(a * pred_per_anchor) # 确保索引是整数box = classes_scores[0, h, w, start_idx:start_idx+4].copy() # 框坐标obj_score = float(classes_scores[0, h, w, start_idx+4]) # objectnesscls_score = float(classes_scores[0, h, w, start_idx+5]) # 类别分数# sigmoid激活obj_score = 1 / (1 + np.exp(-obj_score))cls_score = 1 / (1 + np.exp(-cls_score))score = obj_score * cls_score# 如果分数超过阈值,保存这个预测if score >= self.conf:# 解码框坐标box = 1 / (1 + np.exp(-box)) # sigmoidcx = float((box[0] * 2.0 + w - 0.5) * self.strides[0])cy = float((box[1] * 2.0 + h - 0.5) * self.strides[0])w_pred = float((box[2] * 2.0) ** 2 * self.anchors[0][a*2])h_pred = float((box[3] * 2.0) ** 2 * self.anchors[0][a*2+1])# 转换为xyxy格式x1 = cx - w_pred/2y1 = cy - h_pred/2x2 = cx + w_pred/2y2 = cy + h_pred/2boxes_list.append([x1, y1, x2, y2])scores_list.append(float(score)) # 确保是标量ids_list.append(0) # 假设只有一个类别
如果有检测结果,说明在遍历完所有网格点并进行筛选后,我们有符合条件的框,此时,我们将保存的框坐标列表boxes_list
、分数列表scores_list
和类别ID列表ids_list
转化为NumPy数组,并确保它们的数据类型分别为np.float32
和np.int32
。转换后的数组xyxy
包含框的坐标信息,scores
保存每个框的得分,ids
保存类别标签。而如果没有检测结果,则说明所有框的得分都低于阈值,此时我们创建空的NumPy数组,xyxy
的形状为(0, 4)
表示没有框,scores
和ids
也是空数组,分别表示没有得分和类别ID。这样,我们可以保证后续的代码在处理检测结果时不会出错,避免空列表带来的问题
# 如果有检测结果
if boxes_list:xyxy = np.array(boxes_list, dtype=np.float32)scores = np.array(scores_list, dtype=np.float32)ids = np.array(ids_list, dtype=np.int32)
else:xyxy = np.array([], dtype=np.float32).reshape(0, 4)scores = np.array([], dtype=np.float32)ids = np.array([], dtype=np.int32)
最后在完成框筛选和得分计算后,我们使用OpenCV的cv2.dnn.NMSBoxes
函数进行非极大值抑制(NMS),这个函数的介绍如下:
cv2.dnn.NMSBoxes(boxes, scores, score_threshold, nms_threshold, eta=1.0, top_k=0)
# [boxes] float 类型的数组,形状为 [num_boxes, 4],表示边界框的坐标。每个边界框由其左上角和右下角的坐标 [x, y, w, h] 表示。(x,y)为左上角坐标
# [scores] float 类型的数组,形状为 [num_boxes],表示每个边界框的置信度分数
# [score_threshold] float 类型,用于过滤低于这个分数阈值的边界框
# [nms_threshold] float 类型,用于确定哪些重叠的边界框应该被保留。如果两个边界框的重叠面积大于 nms_threshold,则其中一个边界框将被丢弃
# [eta] float 类型,默认值为 1,用于自适应调整 NMS 阈值。如果设置为小于 1 的值,则 NMS 阈值会随着迭代的进行而减小,这有助于在迭代过程中保留更多的边界框
# [top_k] int 类型,表示保留的最大边界框数量。默认值为 0,表示保留所有边界框
这个NMSBoxes
函数将会返回一个indices
,它是一个包含保留下来的框的索引的列表,我们通过检查indices
的长度,如果大于零,表示有框通过了NMS,我们根据这些索引从原始的xyxy
、scores
和ids
中提取对应的框坐标、得分和类别ID,然后,将框坐标进行缩放(根据输入图像与原始图像的比例),并将它们转换为np.int32
类型。而如果indices
为空,表示没有框通过NMS,这时我们便将bboxes
、scores
和ids
设置为空数组,表示没有有效的检测结果
# NMS处理
indices = cv2.dnn.NMSBoxes(xyxy.tolist(), scores.tolist(), self.conf, self.iou)
if len(indices) > 0:indices = np.array(indices).flatten()self.bboxes = (xyxy[indices] * np.array([self.x_scale, self.y_scale, self.x_scale, self.y_scale])).astype(np.int32)self.scores = scores[indices]self.ids = ids[indices]
else:print("No detections after NMS")self.bboxes = np.array([], dtype=np.int32).reshape(0, 4)self.scores = np.array([], dtype=np.float32)self.ids = np.array([], dtype=np.int32)
和输入图像类似,bpu_infer_lib
库中也提供了其他可用的输出获取接口,而ModelZoo中的官方示例代码jupyter_ModelZoo_YOLOv5.ipynb
中使用的也不是get_output
,而是get_infer_res_np_float32
接口,因此我们接下来以官方的代码为例子介绍这个API
之前介绍的get_output
函数返回的是已经经过处理之后的classes_scores
和bboxes
,但是get_infer_res_np_float32
输出的是未经处理的原始推理结果这些原始输出包含更基础的信息,例如每个网格点的所有锚框的回归值、置信度得分以及类别得分等,因此我们首先需要从get_infer_res_np_float32
函数获取分别对应不同尺度的特征图的预测输出:s_pred
、m_pred
和 l_pred
,接着我们将每个尺度的输出通过reshape
操作转化为一维数组,其中每个元素包含5个回归值(4个边界框坐标和1个置信度)以及类别得分部分,这样我们就得到了每个锚框的预测信息
# 获取不同尺度的特征图的预测输出
s_pred = self.inf.get_infer_res_np_float32(0)
m_pred = self.inf.get_infer_res_np_float32(1)
l_pred = self.inf.get_infer_res_np_float32(2)
# reshape
s_pred = s_pred.reshape([-1, (5 + self.nc)])
m_pred = m_pred.reshape([-1, (5 + self.nc)])
l_pred = l_pred.reshape([-1, (5 + self.nc)])
接下来,我们使用numpy
向量化操作进行阈值筛选,首先计算每个锚框的最大类别得分,并将物体置信度与最大类别得分进行组合,得到每个锚框的综合得分,我们以小尺寸s为例子进行介绍:
首先,我们通过np.max
函数便可以计算出每个锚框的最大类别得分s_raw_max_scores
,这个里面保存的是每个锚框在所有类别中的最大值,然后,我们对物体置信度(s_pred[:, 4]
)和最大类别得分进行sigmoid激活,便可以进一步得到更直观的综合得分s_max_scores
:
s_raw_max_scores = np.max(s_pred[:, 5:], axis=1)
s_max_scores = 1 / ((1 + np.exp(-s_pred[:, 4])) * (1 + np.exp(-s_raw_max_scores)))
接着,我们使用np.flatnonzero
方法筛选出得分大于等于设定阈值(self.conf)
的锚框索引,保证只有那些置信度较高的预测会被保留,最后我们利用np.argmax
找到每个筛选后的锚框对应的最大类别ID,确保我们能够确定每个有效锚框所代表的物体类别
s_valid_indices = np.flatnonzero(s_max_scores >= self.conf)
s_ids = np.argmax(s_pred[s_valid_indices, 5:], axis=1)
s_scores = s_max_scores[s_valid_indices]
而对于其他尺寸的完整代码如下:
# classify: 利用numpy向量化操作完成阈值筛选
s_raw_max_scores = np.max(s_pred[:, 5:], axis=1)
s_max_scores = 1 / ((1 + np.exp(-s_pred[:, 4]))*(1 + np.exp(-s_raw_max_scores)))
s_valid_indices = np.flatnonzero(s_max_scores >= self.conf)
s_ids = np.argmax(s_pred[s_valid_indices, 5:], axis=1)
s_scores = s_max_scores[s_valid_indices]m_raw_max_scores = np.max(m_pred[:, 5:], axis=1)
m_max_scores = 1 / ((1 + np.exp(-m_pred[:, 4]))*(1 + np.exp(-m_raw_max_scores)))
m_valid_indices = np.flatnonzero(m_max_scores >= self.conf)
m_ids = np.argmax(m_pred[m_valid_indices, 5:], axis=1)
m_scores = m_max_scores[m_valid_indices]l_raw_max_scores = np.max(l_pred[:, 5:], axis=1)
l_max_scores = 1 / ((1 + np.exp(-l_pred[:, 4]))*(1 + np.exp(-l_raw_max_scores)))
l_valid_indices = np.flatnonzero(l_max_scores >= self.conf)
l_ids = np.argmax(l_pred[l_valid_indices, 5:], axis=1)
l_scores = l_max_scores[l_valid_indices]
在获得了经过阈值筛选的锚框之后,我们便需要进行特征解码,将网络输出的回归值(如中心坐标、宽高等)转化为实际的边界框坐标,我们还是以小尺度的s为例子,我们先通过sigmoid
函数对每个有效锚框的回归值进行处理,得到位置和尺寸的偏移量也就是锚框回归值s_dxyhw
,其包含包括锚框的中心坐标(dx
, dy
)和宽高(dw
, dh
),接着我们使用网格位置self.s_grid
和步幅self.strides[0]
便可以来还原边界框的真实位置
s_dxyhw = 1 / (1 + np.exp(-s_pred[s_valid_indices, :4]))
s_xy = (s_dxyhw[:, 0:2] * 2.0 + self.s_grid[s_valid_indices,:] - 1.0) * self.strides[0]
s_wh = (s_dxyhw[:, 2:4] * 2.0) ** 2 * self.s_anchors[s_valid_indices, :]
s_xyxy = np.concatenate([s_xy - s_wh * 0.5, s_xy + s_wh * 0.5], axis=-1)
而对于其他尺寸的完整代码如下:
# 特征解码
s_dxyhw = 1 / (1 + np.exp(-s_pred[s_valid_indices, :4]))
s_xy = (s_dxyhw[:, 0:2] * 2.0 + self.s_grid[s_valid_indices,:] - 1.0) * self.strides[0]
s_wh = (s_dxyhw[:, 2:4] * 2.0) ** 2 * self.s_anchors[s_valid_indices, :]
s_xyxy = np.concatenate([s_xy - s_wh * 0.5, s_xy + s_wh * 0.5], axis=-1)m_dxyhw = 1 / (1 + np.exp(-m_pred[m_valid_indices, :4]))
m_xy = (m_dxyhw[:, 0:2] * 2.0 + self.m_grid[m_valid_indices,:] - 1.0) * self.strides[1]
m_wh = (m_dxyhw[:, 2:4] * 2.0) ** 2 * self.m_anchors[m_valid_indices, :]
m_xyxy = np.concatenate([m_xy - m_wh * 0.5, m_xy + m_wh * 0.5], axis=-1)l_dxyhw = 1 / (1 + np.exp(-l_pred[l_valid_indices, :4]))
l_xy = (l_dxyhw[:, 0:2] * 2.0 + self.l_grid[l_valid_indices,:] - 1.0) * self.strides[2]
l_wh = (l_dxyhw[:, 2:4] * 2.0) ** 2 * self.l_anchors[l_valid_indices, :]
l_xyxy = np.concatenate([l_xy - l_wh * 0.5, l_xy + l_wh * 0.5], axis=-1)
最后我们将大中小特征层阈值筛选结果拼接即可得到最终的边界框、得分和类别信息,然后我们将这些信息通过get_output
函数一样的NMS部分即可得到我们需要的检测结果
xyxy = np.concatenate((s_xyxy, m_xyxy, l_xyxy), axis=0)
scores = np.concatenate((s_scores, m_scores, l_scores), axis=0)
ids = np.concatenate((s_ids, m_ids, l_ids), axis=0)
至此,我们便完成了PostProcess预处理函数,我们加上一些细节以及输出获取方式后的完整代码如下:
def PostProcess(self, method=1):"""后处理函数Args:method: 选择使用的方法- '0': 使用get_output- '1': 使用get_infer_res_np_float32"""if method == 1 :# 方法1:使用get_infer_res_np_float32获取原始输出并处理print("\n=== 方法1: 使用get_infer_res_np_float32 ===")s_pred = self.inf.get_infer_res_np_float32(0)m_pred = self.inf.get_infer_res_np_float32(1)l_pred = self.inf.get_infer_res_np_float32(2)print(f"原始输出: {s_pred.shape = } {m_pred.shape = } {l_pred.shape = }")# reshapes_pred = s_pred.reshape([-1, (5 + self.nc)])m_pred = m_pred.reshape([-1, (5 + self.nc)])l_pred = l_pred.reshape([-1, (5 + self.nc)])print(f"Reshape后: {s_pred.shape = } {m_pred.shape = } {l_pred.shape = }")# classify: 利用numpy向量化操作完成阈值筛选s_raw_max_scores = np.max(s_pred[:, 5:], axis=1)s_max_scores = 1 / ((1 + np.exp(-s_pred[:, 4]))*(1 + np.exp(-s_raw_max_scores)))s_valid_indices = np.flatnonzero(s_max_scores >= self.conf)s_ids = np.argmax(s_pred[s_valid_indices, 5:], axis=1)s_scores = s_max_scores[s_valid_indices]m_raw_max_scores = np.max(m_pred[:, 5:], axis=1)m_max_scores = 1 / ((1 + np.exp(-m_pred[:, 4]))*(1 + np.exp(-m_raw_max_scores)))m_valid_indices = np.flatnonzero(m_max_scores >= self.conf)m_ids = np.argmax(m_pred[m_valid_indices, 5:], axis=1)m_scores = m_max_scores[m_valid_indices]l_raw_max_scores = np.max(l_pred[:, 5:], axis=1)l_max_scores = 1 / ((1 + np.exp(-l_pred[:, 4]))*(1 + np.exp(-l_raw_max_scores)))l_valid_indices = np.flatnonzero(l_max_scores >= self.conf)l_ids = np.argmax(l_pred[l_valid_indices, 5:], axis=1)l_scores = l_max_scores[l_valid_indices]# 特征解码s_dxyhw = 1 / (1 + np.exp(-s_pred[s_valid_indices, :4]))s_xy = (s_dxyhw[:, 0:2] * 2.0 + self.s_grid[s_valid_indices,:] - 1.0) * self.strides[0]s_wh = (s_dxyhw[:, 2:4] * 2.0) ** 2 * self.s_anchors[s_valid_indices, :]s_xyxy = np.concatenate([s_xy - s_wh * 0.5, s_xy + s_wh * 0.5], axis=-1)m_dxyhw = 1 / (1 + np.exp(-m_pred[m_valid_indices, :4]))m_xy = (m_dxyhw[:, 0:2] * 2.0 + self.m_grid[m_valid_indices,:] - 1.0) * self.strides[1]m_wh = (m_dxyhw[:, 2:4] * 2.0) ** 2 * self.m_anchors[m_valid_indices, :]m_xyxy = np.concatenate([m_xy - m_wh * 0.5, m_xy + m_wh * 0.5], axis=-1)l_dxyhw = 1 / (1 + np.exp(-l_pred[l_valid_indices, :4]))l_xy = (l_dxyhw[:, 0:2] * 2.0 + self.l_grid[l_valid_indices,:] - 1.0) * self.strides[2]l_wh = (l_dxyhw[:, 2:4] * 2.0) ** 2 * self.l_anchors[l_valid_indices, :]l_xyxy = np.concatenate([l_xy - l_wh * 0.5, l_xy + l_wh * 0.5], axis=-1)# 大中小特征层阈值筛选结果拼接xyxy = np.concatenate((s_xyxy, m_xyxy, l_xyxy), axis=0)scores = np.concatenate((s_scores, m_scores, l_scores), axis=0)ids = np.concatenate((s_ids, m_ids, l_ids), axis=0)elif method == 0:# 方法2:使用get_output获取输出print("\n=== 方法2: 使用get_output ===")if not self.inf.get_output():raise RuntimeError("获取输出失败")classes_scores = self.inf.outputs[0].data # (1, 80, 80, 18)bboxes = self.inf.outputs[1].data # (1, 40, 40, 18)print(f"classes_scores: shape={classes_scores.shape}")print(f"bboxes: shape={bboxes.shape}")# 直接使用4D数据# 每个网格有3个anchor,每个anchor预测6个值(4个框坐标+1个objectness+1个类别)batch, height, width, channels = classes_scores.shapenum_anchors = 3pred_per_anchor = 6scores_list = []boxes_list = []ids_list = []# 处理每个网格点for h in range(height):for w in range(width):for a in range(num_anchors):# 获取当前anchor的预测值start_idx = int(a * pred_per_anchor)box = classes_scores[0, h, w, start_idx:start_idx+4].copy() # 框坐标obj_score = float(classes_scores[0, h, w, start_idx+4]) # objectnesscls_score = float(classes_scores[0, h, w, start_idx+5]) # 类别分数# sigmoid激活obj_score = 1 / (1 + np.exp(-obj_score))cls_score = 1 / (1 + np.exp(-cls_score))score = obj_score * cls_score# 如果分数超过阈值,保存这个预测if score >= self.conf:# 解码框坐标box = 1 / (1 + np.exp(-box)) # sigmoidcx = float((box[0] * 2.0 + w - 0.5) * self.strides[0])cy = float((box[1] * 2.0 + h - 0.5) * self.strides[0])w_pred = float((box[2] * 2.0) ** 2 * self.anchors[0][a*2])h_pred = float((box[3] * 2.0) ** 2 * self.anchors[0][a*2+1])# 转换为xyxy格式x1 = cx - w_pred/2y1 = cy - h_pred/2x2 = cx + w_pred/2y2 = cy + h_pred/2boxes_list.append([x1, y1, x2, y2])scores_list.append(float(score)) # 确保是标量ids_list.append(0) # 假设只有一个类别if boxes_list:xyxy = np.array(boxes_list, dtype=np.float32)scores = np.array(scores_list, dtype=np.float32)ids = np.array(ids_list, dtype=np.int32)else:xyxy = np.array([], dtype=np.float32).reshape(0, 4)scores = np.array([], dtype=np.float32)ids = np.array([], dtype=np.int32)else:raise ValueError("method must be 0 or 1")# NMS处理indices = cv2.dnn.NMSBoxes(xyxy.tolist(), scores.tolist(), self.conf, self.iou)if len(indices) > 0:indices = np.array(indices).flatten()self.bboxes = (xyxy[indices] * np.array([self.x_scale, self.y_scale, self.x_scale, self.y_scale])).astype(np.int32)self.scores = scores[indices]self.ids = ids[indices]else:print("No detections after NMS")self.bboxes = np.array([], dtype=np.int32).reshape(0, 4)self.scores = np.array([], dtype=np.float32)self.ids = np.array([], dtype=np.int32)
(6)完成draw_detection()结果绘制函数
结果绘制函数比较简单,无非就是使用OpenCV将输入进来的检测结果绘制成检测框和类别信息,因此便不再详细叙述,直接贴出我的完整代码:
def draw_detection(self,img: np.array, box,score: float, class_id: int,labelname: list):x1, y1, x2, y2 = boxrdk_colors = [(255, 0, 0), # 红色(0, 255, 0), # 绿色(0, 0, 255), # 蓝色(255, 255, 0), # 黄色(255, 0, 255), # 紫色(0, 255, 255), # 青色]color = rdk_colors[class_id % len(rdk_colors)]cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)label = f"{labelname[class_id]}: {score:.2f}"(label_width, label_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)label_x = x1label_y = y1 - 10 if y1 - 10 > label_height else y1 + 10cv2.rectangle(img, (label_x, label_y - label_height), (label_x + label_width, label_y + label_height), color, cv2.FILLED)cv2.putText(img, label, (label_x, label_y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
(7)完成detect_result()结果处理函数
接着我们完成detect_result()
函数,这个函数我设计是用来实现对推理结果的处理,但是示例中我们只用来保存检测结果和输出检测信息,具体代码如下:
def detect_result(self, img):if isinstance(img, str):draw_img = cv2.imread(img)else:draw_img = img.copy()for class_id, score, bbox in zip(self.ids, self.scores, self.bboxes):x1, y1, x2, y2 = bboxprint("(%d, %d, %d, %d) -> %s: %.2f"%(x1,y1,x2,y2, self.labelname[class_id], score))if self.is_save:self.draw_detection(draw_img, (x1, y1, x2, y2), score, class_id, self.labelname)if self.is_save:cv2.imwrite("result.jpg", draw_img)
(8)完成detect()检测函数
最后我们便可以来完成detect()
函数啦,经过我们上面的各种封装,我们只需要掉包就好啦,具体代码及推理API介绍如下:
def detect(self, img_path, method_pre=0, method_post=1):"""检测函数Args:img_path: 图片路径或图片数组method_pre: 预处理方法- 0: 使用read_input- 1: 使用read_img_to_nv12method_post: 后处理方法- '0': 使用get_output- '1': 使用get_infer_res_np_float32"""self.PreProcess(img_path, method=method_pre) # 预处理self.inf.forward(self.mode) # 推理self.PostProcess(method_post) # 后处理self.detect_result(img) #获得结果
(9)完成主函数main
最后我们来完成主函数部分,这部分也非常简单,大家直接看叭:
if __name__ == "__main__":labelname = ["tennis"]test_img = "/path/to/your/img"model_path = "/path/to/your/model"infer = BPU_Detect( model_path, labelname)infer.detect( test_img, method_pre=1, method_post=0)
那如果要实现实时推理我们该怎么办呢?很简单我们只需要在继承BPU_Detect
类的时候传入mode=True
参数就好啦,这样的化每次推理结束后便不会立即释放推理句柄啦!剩下的就只需要在循环中不断传入图像便可以得到结果啦!
infer = BPU_Detect( model_path, coconame, mode = True)
(10)完整代码示例
仅供参考!代码写的比较丑陋大家最好根据自己的需求自行修改
import cv2
import numpy as np
from scipy.special import softmax
from scipy.special import expit as sigmoid
from time import time
import bpu_infer_lib # Model Zoo Python API
from typing import Tuple
import osclass BPU_Detect:def __init__(self, model_path:str,labelnames:list,num_classes:int = None,conf:float = 0.45,iou:float = 0.45,anchors:np.array = np.array([[10,13, 16,30, 33,23], # P3/8[30,61, 62,45, 59,119], # P4/16[116,90, 156,198, 373,326], # P5/32]),strides = np.array([8, 16, 32]),mode:bool = False,is_save:bool = False):self.model = model_pathself.labelname = labelnamesself.inf = bpu_infer_lib.Infer(False)self.inf.load_model(self.model)self.conf = confself.iou = iouself.anchors = anchorsself.strides = stridesself.input_w = 640self.input_h = 640self.nc = num_classes if num_classes is not None else len(self.labelname)self.mode = modeself.is_save = is_saveself._init_grids()def _init_grids(self) :"""初始化特征图网格"""def _create_grid(stride: int) :"""创建单个stride的网格和anchors"""grid = np.stack([np.tile(np.linspace(0.5, self.input_w//stride - 0.5, self.input_w//stride), reps=self.input_h//stride),np.repeat(np.arange(0.5, self.input_h//stride + 0.5, 1), self.input_w//stride)], axis=0).transpose(1,0)grid = np.hstack([grid] * 3).reshape(-1, 2)anchors = np.tile(self.anchors[int(np.log2(stride/8))], self.input_w//stride * self.input_h//stride).reshape(-1, 2)return grid, anchors# 创建不同尺度的网格self.s_grid, self.s_anchors = _create_grid(self.strides[0])self.m_grid, self.m_anchors = _create_grid(self.strides[1]) self.l_grid, self.l_anchors = _create_grid(self.strides[2])print(f"网格尺寸: {self.s_grid.shape = } {self.m_grid.shape = } {self.l_grid.shape = }")print(f"Anchors尺寸: {self.s_anchors.shape = } {self.m_anchors.shape = } {self.l_anchors.shape = }")def bgr2nv12_opencv(self, image):height, width = image.shape[0], image.shape[1]area = height * widthyuv420p = cv2.cvtColor(image, cv2.COLOR_BGR2YUV_I420).reshape((area * 3 // 2,))y = yuv420p[:area]uv_planar = yuv420p[area:].reshape((2, area // 4))uv_packed = uv_planar.transpose((1, 0)).reshape((area // 2,))nv12 = np.zeros_like(yuv420p)nv12[:height * width] = ynv12[height * width:] = uv_packedreturn nv12def PreProcess(self, img, method=0):"""预处理函数Args:img: 输入图像或图片路径method: 选择使用的方法- 0: 使用read_input- 1: 使用read_img_to_nv12"""# 获取原始图片和尺寸if isinstance(img, str):# 输入是图片路径if method == 1:# 先获取原始图片尺寸orig_img = cv2.imread(img)if orig_img is None:raise ValueError(f"无法读取图片: {img}")img_h, img_w = orig_img.shape[0:2]self.inf.read_img_to_nv12(img, 0)# 使用API的方法直接读取elif method == 0:# method == 0,读取图片后处理orig_img = cv2.imread(img)if orig_img is None:raise ValueError(f"无法读取图片: {img}")input_tensor = cv2.resize(orig_img, (self.input_h, self.input_w))input_tensor = self.bgr2nv12_opencv(input_tensor)self.inf.read_input(input_tensor, 0)img_h, img_w = orig_img.shape[0:2]else:print("输入格式有误")return False# 计算缩放比例self.y_scale = img_h / self.input_hself.x_scale = img_w / self.input_wprint(f"原始尺寸: {img_w}x{img_h}, 输入尺寸: {self.input_w}x{self.input_h}")print(f"缩放比例: x_scale={self.x_scale}, y_scale={self.y_scale}")return Truedef PostPrecess(self, method=1):"""后处理函数Args:method: 选择使用的方法- '0': 使用get_output- '1': 使用get_infer_res_np_float32"""if method == 1 :# 方法1:使用get_infer_res_np_float32获取原始输出并处理print("\n=== 方法1: 使用get_infer_res_np_float32 ===")s_pred = self.inf.get_infer_res_np_float32(0)m_pred = self.inf.get_infer_res_np_float32(1)l_pred = self.inf.get_infer_res_np_float32(2)print(f"原始输出: {s_pred.shape = } {m_pred.shape = } {l_pred.shape = }")# reshapes_pred = s_pred.reshape([-1, (5 + self.nc)])m_pred = m_pred.reshape([-1, (5 + self.nc)])l_pred = l_pred.reshape([-1, (5 + self.nc)])print(f"Reshape后: {s_pred.shape = } {m_pred.shape = } {l_pred.shape = }")# classify: 利用numpy向量化操作完成阈值筛选s_raw_max_scores = np.max(s_pred[:, 5:], axis=1)s_max_scores = 1 / ((1 + np.exp(-s_pred[:, 4]))*(1 + np.exp(-s_raw_max_scores)))s_valid_indices = np.flatnonzero(s_max_scores >= self.conf)s_ids = np.argmax(s_pred[s_valid_indices, 5:], axis=1)s_scores = s_max_scores[s_valid_indices]m_raw_max_scores = np.max(m_pred[:, 5:], axis=1)m_max_scores = 1 / ((1 + np.exp(-m_pred[:, 4]))*(1 + np.exp(-m_raw_max_scores)))m_valid_indices = np.flatnonzero(m_max_scores >= self.conf)m_ids = np.argmax(m_pred[m_valid_indices, 5:], axis=1)m_scores = m_max_scores[m_valid_indices]l_raw_max_scores = np.max(l_pred[:, 5:], axis=1)l_max_scores = 1 / ((1 + np.exp(-l_pred[:, 4]))*(1 + np.exp(-l_raw_max_scores)))l_valid_indices = np.flatnonzero(l_max_scores >= self.conf)l_ids = np.argmax(l_pred[l_valid_indices, 5:], axis=1)l_scores = l_max_scores[l_valid_indices]# 特征解码s_dxyhw = 1 / (1 + np.exp(-s_pred[s_valid_indices, :4]))s_xy = (s_dxyhw[:, 0:2] * 2.0 + self.s_grid[s_valid_indices,:] - 1.0) * self.strides[0]s_wh = (s_dxyhw[:, 2:4] * 2.0) ** 2 * self.s_anchors[s_valid_indices, :]s_xyxy = np.concatenate([s_xy - s_wh * 0.5, s_xy + s_wh * 0.5], axis=-1)m_dxyhw = 1 / (1 + np.exp(-m_pred[m_valid_indices, :4]))m_xy = (m_dxyhw[:, 0:2] * 2.0 + self.m_grid[m_valid_indices,:] - 1.0) * self.strides[1]m_wh = (m_dxyhw[:, 2:4] * 2.0) ** 2 * self.m_anchors[m_valid_indices, :]m_xyxy = np.concatenate([m_xy - m_wh * 0.5, m_xy + m_wh * 0.5], axis=-1)l_dxyhw = 1 / (1 + np.exp(-l_pred[l_valid_indices, :4]))l_xy = (l_dxyhw[:, 0:2] * 2.0 + self.l_grid[l_valid_indices,:] - 1.0) * self.strides[2]l_wh = (l_dxyhw[:, 2:4] * 2.0) ** 2 * self.l_anchors[l_valid_indices, :]l_xyxy = np.concatenate([l_xy - l_wh * 0.5, l_xy + l_wh * 0.5], axis=-1)# 大中小特征层阈值筛选结果拼接xyxy = np.concatenate((s_xyxy, m_xyxy, l_xyxy), axis=0)scores = np.concatenate((s_scores, m_scores, l_scores), axis=0)ids = np.concatenate((s_ids, m_ids, l_ids), axis=0)elif method == 0:# 方法2:使用get_output获取输出print("\n=== 方法2: 使用get_output ===")if not self.inf.get_output():raise RuntimeError("获取输出失败")classes_scores = self.inf.outputs[0].data # (1, 80, 80, 18)bboxes = self.inf.outputs[1].data # (1, 40, 40, 18)print(f"classes_scores: shape={classes_scores.shape}")print(f"bboxes: shape={bboxes.shape}")# 直接使用4D数据# 每个网格有3个anchor,每个anchor预测6个值(4个框坐标+1个objectness+1个类别)batch, height, width, channels = classes_scores.shapenum_anchors = 3pred_per_anchor = 6scores_list = []boxes_list = []ids_list = []# 处理每个网格点for h in range(height):for w in range(width):for a in range(num_anchors):# 获取当前anchor的预测值start_idx = int(a * pred_per_anchor)box = classes_scores[0, h, w, start_idx:start_idx+4].copy() # 框坐标obj_score = float(classes_scores[0, h, w, start_idx+4]) # objectnesscls_score = float(classes_scores[0, h, w, start_idx+5]) # 类别分数# sigmoid激活obj_score = 1 / (1 + np.exp(-obj_score))cls_score = 1 / (1 + np.exp(-cls_score))score = obj_score * cls_score# 如果分数超过阈值,保存这个预测if score >= self.conf:# 解码框坐标box = 1 / (1 + np.exp(-box)) # sigmoidcx = float((box[0] * 2.0 + w - 0.5) * self.strides[0])cy = float((box[1] * 2.0 + h - 0.5) * self.strides[0])w_pred = float((box[2] * 2.0) ** 2 * self.anchors[0][a*2])h_pred = float((box[3] * 2.0) ** 2 * self.anchors[0][a*2+1])# 转换为xyxy格式x1 = cx - w_pred/2y1 = cy - h_pred/2x2 = cx + w_pred/2y2 = cy + h_pred/2boxes_list.append([x1, y1, x2, y2])scores_list.append(float(score)) # 确保是标量ids_list.append(0) # 假设只有一个类别if boxes_list:xyxy = np.array(boxes_list, dtype=np.float32)scores = np.array(scores_list, dtype=np.float32)ids = np.array(ids_list, dtype=np.int32)else:xyxy = np.array([], dtype=np.float32).reshape(0, 4)scores = np.array([], dtype=np.float32)ids = np.array([], dtype=np.int32)else:raise ValueError("method must be 0 or 1")# NMS处理indices = cv2.dnn.NMSBoxes(xyxy.tolist(), scores.tolist(), self.conf, self.iou)if len(indices) > 0:indices = np.array(indices).flatten()self.bboxes = (xyxy[indices] * np.array([self.x_scale, self.y_scale, self.x_scale, self.y_scale])).astype(np.int32)self.scores = scores[indices]self.ids = ids[indices]else:print("No detections after NMS")self.bboxes = np.array([], dtype=np.int32).reshape(0, 4)self.scores = np.array([], dtype=np.float32)self.ids = np.array([], dtype=np.int32)def draw_detection(self,img: np.array, box,score: float, class_id: int,labelname: list):x1, y1, x2, y2 = boxrdk_colors = [(255, 0, 0), # 红色(0, 255, 0), # 绿色(0, 0, 255), # 蓝色(255, 255, 0), # 黄色(255, 0, 255), # 紫色(0, 255, 255), # 青色]color = rdk_colors[class_id % len(rdk_colors)]cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)label = f"{labelname[class_id]}: {score:.2f}"(label_width, label_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)label_x = x1label_y = y1 - 10 if y1 - 10 > label_height else y1 + 10cv2.rectangle(img, (label_x, label_y - label_height), (label_x + label_width, label_y + label_height), color, cv2.FILLED)cv2.putText(img, label, (label_x, label_y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)def detect_result(self, img):if isinstance(img, str):draw_img = cv2.imread(img)else:draw_img = img.copy()for class_id, score, bbox in zip(self.ids, self.scores, self.bboxes):x1, y1, x2, y2 = bboxprint("(%d, %d, %d, %d) -> %s: %.2f"%(x1,y1,x2,y2, self.labelname[class_id], score))if self.is_save:self.draw_detection(draw_img, (x1, y1, x2, y2), score, class_id, self.labelname)if self.is_save:cv2.imwrite("result.jpg", draw_img)def detect(self, img, method_pre=0, method_post=1):"""检测函数Args:img_path: 图片路径或图片数组method_pre: 预处理方法- 0: 使用read_input(默认,读取图片后处理)- 1: 使用read_img_to_nv12(直接读取路径)method_post: 后处理方法"""# 预处理self.PreProcess(img, method=method_pre)# 推理和后处理self.inf.forward(self.mode)self.PostPrecess(method_post)self.detect_result(img)if __name__ == "__main__":coconame = ["tennis"]test_img = "/root/Deep_Learning/YOLOv5/imgs/tennis_1_frame_0001.jpg"model_path = "/root/Deep_Learning/YOLOv5/models/tennis_detect_640x640_bayese_.bin"infer = BPU_Detect(model_path,coconame,conf=0.1)infer.detect(test_img,method_pre=1,method_post=0)
二、BSP——hobot_dnn
在介绍了ModelZoo的推理方法后我们便能很快的完成BSP版本的使用啦,我们接下来将基于上述完成的ModelZoo代码主要介绍两版有变化的部分,个人感觉BSP的版本比较贴近C++版本的使用感觉
- 参考资料:4.1.6 模型推理接口说明 | RDK DOC
首先我们先查看一下官方手册,我们可以发现BSP版本主要是使用的Model
对象,它包含了inputs
、outputs
和forward
等成员和方法,因此首先我们导入pyeasy_dnn
包:
from hobot_dnn import pyeasy_dnn as dnn # BSP Python API
(1)修改BPU_Detect类及初始化部分
接着我们在类中使用pyeasy_dnn
的接口加载模型以及获取输入的张量格式
self.models = dnn.load(self.model_path)
self.model = self.models[0] # 获取第一个模型self.input_shape = self.model.inputs[0].properties.shape
self.input_w = self.input_shape[2] # NCHW格式
self.input_h = self.input_shape[3]
(2)修改预处理PreProcess部分
然后便是修改预处理部分,之前ModelZoo的版本需要使用read_input
函数接口来传入图像数据,但是BSP版的将图像数据作为了推理接口的参数,因此预处理部分只需要处理图像尺寸和图像格式即可,修改后的代码如下:
def PreProcess(self, img):"""预处理函数"""# 获取原始图片和尺寸if isinstance(img, str):# 输入是图片路径orig_img = cv2.imread(img)if orig_img is None:raise ValueError(f"无法读取图片: {img}")else:orig_img = imgimg_h, img_w = orig_img.shape[0:2]# 调整图像大小并转换为NV12格式input_tensor = cv2.resize(orig_img, (self.input_w, self.input_h))input_tensor = self.bgr2nv12_opencv(input_tensor)# 计算缩放比例self.y_scale = img_h / self.input_hself.x_scale = img_w / self.input_wprint(f"原始尺寸: {img_w}x{img_h}, 输入尺寸: {self.input_w}x{self.input_h}")print(f"缩放比例: x_scale={self.x_scale}, y_scale={self.y_scale}")return input_tensor
(3)修改后处理PostProcess部分
我们继续看到后处理部分,BSP版的后处理与ModelZoo版的get_infer_res_np_float32
接口类似,我们首先需要获取模型的输出:
outputs = self.model_outputs
接着我们从outputs
中获取三个尺度的输出即可,剩下的代码和ModelZoo版中get_infer_res_np_float32
的处理方式一样
# 处理三个输出层
s_pred = outputs[0].buffer.reshape([-1, (5 + self.nc)])
m_pred = outputs[1].buffer.reshape([-1, (5 + self.nc)])
l_pred = outputs[2].buffer.reshape([-1, (5 + self.nc)]
(4)修改推理detect()部分
最后我们只需要修改推理部分的推理接口就好啦,我们查看一下官方的API手册,推理部分的介绍如下:
据此我们可以知道我们只需要从预处理函数获取处理之后的输入数据input_tensor
后即可调用forward
进行推理啦,forward
的三个参数分别是输入数据、推理模型的BPU_ID和推理任务的优先级,具体代码如下:
def detect(self, img):"""检测主函数"""input_tensor = self.PreProcess(img)# 预处理self.model_outputs = self.model.forward(input_tensor)# 模型推理self.PostProcess()# 后处理self.detect_result(img)# 显示结果
(5)完整代码示例
import cv2
import numpy as np
from scipy.special import softmax
from scipy.special import expit as sigmoid
from time import time
from hobot_dnn import pyeasy_dnn as dnn # BSP Python API
from typing import Tuple
import osclass BPU_Detect:def __init__(self, model_path:str,labelnames:list,num_classes:int = None,conf:float = 0.45,iou:float = 0.45,anchors:np.array = np.array([[10,13, 16,30, 33,23], # P3/8[30,61, 62,45, 59,119], # P4/16[116,90, 156,198, 373,326], # P5/32]),strides = np.array([8, 16, 32]),mode:bool = False,is_save:bool = False):self.model_path = model_pathself.labelname = labelnames# 加载模型self.models = dnn.load(self.model_path)self.model = self.models[0] # 获取第一个模型self.conf = confself.iou = iouself.anchors = anchorsself.strides = strides# 从模型输入获取输入尺寸self.input_shape = self.model.inputs[0].properties.shapeself.input_w = self.input_shape[2] # NCHW格式self.input_h = self.input_shape[3]self.nc = num_classes if num_classes is not None else len(self.labelname)self.mode = modeself.is_save = is_saveself._init_grids()def _init_grids(self) :"""初始化特征图网格"""def _create_grid(stride: int) :"""创建单个stride的网格和anchors"""grid = np.stack([np.tile(np.linspace(0.5, self.input_w//stride - 0.5, self.input_w//stride), reps=self.input_h//stride),np.repeat(np.arange(0.5, self.input_h//stride + 0.5, 1), self.input_w//stride)], axis=0).transpose(1,0)grid = np.hstack([grid] * 3).reshape(-1, 2)anchors = np.tile(self.anchors[int(np.log2(stride/8))], self.input_w//stride * self.input_h//stride).reshape(-1, 2)return grid, anchors# 创建不同尺度的网格self.s_grid, self.s_anchors = _create_grid(self.strides[0])self.m_grid, self.m_anchors = _create_grid(self.strides[1]) self.l_grid, self.l_anchors = _create_grid(self.strides[2])print(f"网格尺寸: {self.s_grid.shape = } {self.m_grid.shape = } {self.l_grid.shape = }")print(f"Anchors尺寸: {self.s_anchors.shape = } {self.m_anchors.shape = } {self.l_anchors.shape = }")def bgr2nv12_opencv(self, image):"""将BGR图像转换为NV12格式"""height, width = image.shape[0], image.shape[1]area = height * widthyuv420p = cv2.cvtColor(image, cv2.COLOR_BGR2YUV_I420).reshape((area * 3 // 2,))y = yuv420p[:area]uv_planar = yuv420p[area:].reshape((2, area // 4))uv_packed = uv_planar.transpose((1, 0)).reshape((area // 2,))nv12 = np.zeros_like(yuv420p)nv12[:height * width] = ynv12[height * width:] = uv_packedreturn nv12def PreProcess(self, img):"""预处理函数"""# 获取原始图片和尺寸if isinstance(img, str):# 输入是图片路径orig_img = cv2.imread(img)if orig_img is None:raise ValueError(f"无法读取图片: {img}")else:orig_img = imgimg_h, img_w = orig_img.shape[0:2]# 调整图像大小并转换为NV12格式input_tensor = cv2.resize(orig_img, (self.input_w, self.input_h))input_tensor = self.bgr2nv12_opencv(input_tensor)# 计算缩放比例self.y_scale = img_h / self.input_hself.x_scale = img_w / self.input_wprint(f"原始尺寸: {img_w}x{img_h}, 输入尺寸: {self.input_w}x{self.input_h}")print(f"缩放比例: x_scale={self.x_scale}, y_scale={self.y_scale}")return input_tensordef PostProcess(self):"""后处理函数"""# 获取模型输出outputs = self.model_outputs# 处理三个输出层s_pred = outputs[0].buffer.reshape([-1, (5 + self.nc)])m_pred = outputs[1].buffer.reshape([-1, (5 + self.nc)])l_pred = outputs[2].buffer.reshape([-1, (5 + self.nc)])print(f"输出形状: {s_pred.shape = } {m_pred.shape = } {l_pred.shape = }")# 处理小特征图输出s_raw_max_scores = np.max(s_pred[:, 5:], axis=1)s_max_scores = 1 / ((1 + np.exp(-s_pred[:, 4]))*(1 + np.exp(-s_raw_max_scores)))s_valid_indices = np.flatnonzero(s_max_scores >= self.conf)s_ids = np.argmax(s_pred[s_valid_indices, 5:], axis=1)s_scores = s_max_scores[s_valid_indices]# 处理中特征图输出m_raw_max_scores = np.max(m_pred[:, 5:], axis=1)m_max_scores = 1 / ((1 + np.exp(-m_pred[:, 4]))*(1 + np.exp(-m_raw_max_scores)))m_valid_indices = np.flatnonzero(m_max_scores >= self.conf)m_ids = np.argmax(m_pred[m_valid_indices, 5:], axis=1)m_scores = m_max_scores[m_valid_indices]# 处理大特征图输出l_raw_max_scores = np.max(l_pred[:, 5:], axis=1)l_max_scores = 1 / ((1 + np.exp(-l_pred[:, 4]))*(1 + np.exp(-l_raw_max_scores)))l_valid_indices = np.flatnonzero(l_max_scores >= self.conf)l_ids = np.argmax(l_pred[l_valid_indices, 5:], axis=1)l_scores = l_max_scores[l_valid_indices]# 特征解码s_dxyhw = 1 / (1 + np.exp(-s_pred[s_valid_indices, :4]))s_xy = (s_dxyhw[:, 0:2] * 2.0 + self.s_grid[s_valid_indices,:] - 1.0) * self.strides[0]s_wh = (s_dxyhw[:, 2:4] * 2.0) ** 2 * self.s_anchors[s_valid_indices, :]s_xyxy = np.concatenate([s_xy - s_wh * 0.5, s_xy + s_wh * 0.5], axis=-1)m_dxyhw = 1 / (1 + np.exp(-m_pred[m_valid_indices, :4]))m_xy = (m_dxyhw[:, 0:2] * 2.0 + self.m_grid[m_valid_indices,:] - 1.0) * self.strides[1]m_wh = (m_dxyhw[:, 2:4] * 2.0) ** 2 * self.m_anchors[m_valid_indices, :]m_xyxy = np.concatenate([m_xy - m_wh * 0.5, m_xy + m_wh * 0.5], axis=-1)l_dxyhw = 1 / (1 + np.exp(-l_pred[l_valid_indices, :4]))l_xy = (l_dxyhw[:, 0:2] * 2.0 + self.l_grid[l_valid_indices,:] - 1.0) * self.strides[2]l_wh = (l_dxyhw[:, 2:4] * 2.0) ** 2 * self.l_anchors[l_valid_indices, :]l_xyxy = np.concatenate([l_xy - l_wh * 0.5, l_xy + l_wh * 0.5], axis=-1)# 合并所有预测结果xyxy = np.concatenate((s_xyxy, m_xyxy, l_xyxy), axis=0)scores = np.concatenate((s_scores, m_scores, l_scores), axis=0)ids = np.concatenate((s_ids, m_ids, l_ids), axis=0)# NMS处理indices = cv2.dnn.NMSBoxes(xyxy.tolist(), scores.tolist(), self.conf, self.iou)if len(indices) > 0:indices = np.array(indices).flatten()self.bboxes = (xyxy[indices] * np.array([self.x_scale, self.y_scale, self.x_scale, self.y_scale])).astype(np.int32)self.scores = scores[indices]self.ids = ids[indices]else:print("未检测到目标")self.bboxes = np.array([], dtype=np.int32).reshape(0, 4)self.scores = np.array([], dtype=np.float32)self.ids = np.array([], dtype=np.int32)def draw_detection(self, img: np.array, box,score: float, class_id: int,labelname: list):"""绘制检测结果"""x1, y1, x2, y2 = boxrdk_colors = [(255, 0, 0), # 红色(0, 255, 0), # 绿色(0, 0, 255), # 蓝色(255, 255, 0), # 黄色(255, 0, 255), # 紫色(0, 255, 255), # 青色]color = rdk_colors[class_id % len(rdk_colors)]cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)label = f"{labelname[class_id]}: {score:.2f}"(label_width, label_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)label_x = x1label_y = y1 - 10 if y1 - 10 > label_height else y1 + 10cv2.rectangle(img, (label_x, label_y - label_height), (label_x + label_width, label_y + label_height), color, cv2.FILLED)cv2.putText(img, label, (label_x, label_y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)def detect_result(self, img):"""显示检测结果"""if isinstance(img, str):draw_img = cv2.imread(img)else:draw_img = img.copy()for class_id, score, bbox in zip(self.ids, self.scores, self.bboxes):x1, y1, x2, y2 = bboxprint("(%d, %d, %d, %d) -> %s: %.2f"%(x1,y1,x2,y2, self.labelname[class_id], score))if self.is_save:self.draw_detection(draw_img, (x1, y1, x2, y2), score, class_id, self.labelname)if self.is_save:cv2.imwrite("result.jpg", draw_img)def detect(self, img):"""检测主函数"""input_tensor = self.PreProcess(img)# 预处理self.model_outputs = self.model.forward(input_tensor)# 模型推理self.PostProcess()# 后处理self.detect_result(img)# 显示结果if __name__ == "__main__":coconame = ["tennis"]test_img = "/root/Deep_Learning/YOLOv5/imgs/tennis_1_frame_0001.jpg"model_path = "/root/Deep_Learning/YOLOv5/models/tennis_detect_640x640_bayese_.bin"infer = BPU_Detect(model_path, coconame, conf=0.1, is_save=True)infer.detect(test_img)
至此,我们便完成了RDK系列ModelZoo和BSP两种版本的推理介绍啦!!!