# Model export command (shell, not Python):
#   python ./tools/deploy.py ./configs/mmpose/pose-detection_onnxruntime_static.py <model_config_path> <checkpoint_to_convert_path> <test_image_path> --work-dir <output_dir>





import os
import sys
import time
import traceback
import urllib
import warnings

import cv2
import numpy as np
import cv2# RKNN_MODEL = "hrnet_w32_macaque_256x192-f7e9e04f_20230208.rknn"mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
QUANTIZE_ON = Truedef bbox_xywh2cs(bbox, aspect_ratio, padding=1., pixel_std=200.):"""Transform the bbox format from (x,y,w,h) into (center, scale)Args:bbox (ndarray): Single bbox in (x, y, w, h)aspect_ratio (float): The expected bbox aspect ratio (w over h)padding (float): Bbox padding factor that will be multilied to scale.Default: 1.0pixel_std (float): The scale normalization factor. Default: 200.0Returns:tuple: A tuple containing center and scale.- np.ndarray[float32](2,): Center of the bbox (x, y).- np.ndarray[float32](2,): Scale of the bbox w & h."""x, y, w, h = bbox[:4]center = np.array([x + w * 0.5, y + h * 0.5], dtype=np.float32)if w > aspect_ratio * h:h = w * 1.0 / aspect_ratioelif w < aspect_ratio * h:w = h * aspect_ratioscale = np.array([w, h], dtype=np.float32) / pixel_stdscale = scale * paddingreturn center, scaledef rotate_point(pt, angle_rad):"""Rotate a point by an angle.Args:pt (list[float]): 2 dimensional point to be rotatedangle_rad (float): rotation angle by radianReturns:list[float]: Rotated point."""assert len(pt) == 2sn, cs = np.sin(angle_rad), np.cos(angle_rad)new_x = pt[0] * cs - pt[1] * snnew_y = pt[0] * sn + pt[1] * csrotated_pt = [new_x, new_y]return rotated_ptdef _get_3rd_point(a, b):"""To calculate the affine matrix, three pairs of points are required. 
Thisfunction is used to get the 3rd point, given 2D points a & b.The 3rd point is defined by rotating vector `a - b` by 90 degreesanticlockwise, using b as the rotation center.Args:a (np.ndarray): point(x,y)b (np.ndarray): point(x,y)Returns:np.ndarray: The 3rd point."""assert len(a) == 2assert len(b) == 2direction = a - bthird_pt = b + np.array([-direction[1], direction[0]], dtype=np.float32)return third_ptdef get_affine_transform(center,scale,rot,output_size,shift=(0., 0.),inv=False):"""Get the affine transform matrix, given the center/scale/rot/output_size.Args:center (np.ndarray[2, ]): Center of the bounding box (x, y).scale (np.ndarray[2, ]): Scale of the bounding boxwrt [width, height].rot (float): Rotation angle (degree).output_size (np.ndarray[2, ] | list(2,)): Size of thedestination heatmaps.shift (0-100%): Shift translation ratio wrt the width/height.Default (0., 0.).inv (bool): Option to inverse the affine transform direction.(inv=False: src->dst or inv=True: dst->src)Returns:np.ndarray: The transform matrix."""assert len(center) == 2assert len(scale) == 2assert len(output_size) == 2assert len(shift) == 2# pixel_std is 200.scale_tmp = scale * 200.0shift = np.array(shift)src_w = scale_tmp[0]dst_w = output_size[0]dst_h = output_size[1]rot_rad = np.pi * rot / 180src_dir = rotate_point([0., src_w * -0.5], rot_rad)dst_dir = np.array([0., dst_w * -0.5])src = np.zeros((3, 2), dtype=np.float32)src[0, :] = center + scale_tmp * shiftsrc[1, :] = center + src_dir + scale_tmp * shiftsrc[2, :] = _get_3rd_point(src[0, :], src[1, :])dst = np.zeros((3, 2), dtype=np.float32)dst[0, :] = [dst_w * 0.5, dst_h * 0.5]dst[1, :] = np.array([dst_w * 0.5, dst_h * 0.5]) + dst_dirdst[2, :] = _get_3rd_point(dst[0, :], dst[1, :])if inv:trans = cv2.getAffineTransform(np.float32(dst), np.float32(src))else:trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))return transdef bbox_xyxy2xywh(bbox_xyxy):"""Transform the bbox format from x1y1x2y2 to xywh.Args:bbox_xyxy (np.ndarray): 
Bounding boxes (with scores), shaped (n, 4) or(n, 5). (left, top, right, bottom, [score])Returns:np.ndarray: Bounding boxes (with scores),shaped (n, 4) or (n, 5). (left, top, width, height, [score])"""bbox_xywh = bbox_xyxy.copy()bbox_xywh[:, 2] = bbox_xywh[:, 2] - bbox_xywh[:, 0]bbox_xywh[:, 3] = bbox_xywh[:, 3] - bbox_xywh[:, 1]return bbox_xywhdef _get_max_preds(heatmaps):"""Get keypoint predictions from score maps.Note:batch_size: Nnum_keypoints: Kheatmap height: Hheatmap width: WArgs:heatmaps (np.ndarray[N, K, H, W]): model predicted heatmaps.Returns:tuple: A tuple containing aggregated results.- preds (np.ndarray[N, K, 2]): Predicted keypoint location.- maxvals (np.ndarray[N, K, 1]): Scores (confidence) of the keypoints."""assert isinstance(heatmaps,np.ndarray), ('heatmaps should be numpy.ndarray')assert heatmaps.ndim == 4, 'batch_images should be 4-ndim'N, K, _, W = heatmaps.shapeheatmaps_reshaped = heatmaps.reshape((N, K, -1))idx = np.argmax(heatmaps_reshaped, 2).reshape((N, K, 1))maxvals = np.amax(heatmaps_reshaped, 2).reshape((N, K, 1))preds = np.tile(idx, (1, 1, 2)).astype(np.float32)preds[:, :, 0] = preds[:, :, 0] % Wpreds[:, :, 1] = preds[:, :, 1] // Wpreds = np.where(np.tile(maxvals, (1, 1, 2)) > 0.0, preds, -1)return preds, maxvalsdef transform_preds(coords, center, scale, output_size, use_udp=False):"""Get final keypoint predictions from heatmaps and apply scaling andtranslation to map them back to the image.Note:num_keypoints: KArgs:coords (np.ndarray[K, ndims]):* If ndims=2, corrds are predicted keypoint location.* If ndims=4, corrds are composed of (x, y, scores, tags)* If ndims=5, corrds are composed of (x, y, scores, tags,flipped_tags)center (np.ndarray[2, ]): Center of the bounding box (x, y).scale (np.ndarray[2, ]): Scale of the bounding boxwrt [width, height].output_size (np.ndarray[2, ] | list(2,)): Size of thedestination heatmaps.use_udp (bool): Use unbiased data processingReturns:np.ndarray: Predicted coordinates in the images."""assert 
coords.shape[1] in (2, 4, 5)assert len(center) == 2assert len(scale) == 2assert len(output_size) == 2# Recover the scale which is normalized by a factor of 200.scale = scale * 200.0if use_udp:scale_x = scale[0] / (output_size[0] - 1.0)scale_y = scale[1] / (output_size[1] - 1.0)else:scale_x = scale[0] / output_size[0]scale_y = scale[1] / output_size[1]target_coords = np.ones_like(coords)target_coords[:, 0] = coords[:, 0] * scale_x + center[0] - scale[0] * 0.5target_coords[:, 1] = coords[:, 1] * scale_y + center[1] - scale[1] * 0.5return target_coordsdef keypoints_from_heatmaps(heatmaps,center,scale,unbiased=False,post_process='default',kernel=11,valid_radius_factor=0.0546875,use_udp=False,target_type='GaussianHeatmap'):# Avoid being affectedheatmaps = heatmaps.copy()N, K, H, W = heatmaps.shapepreds, maxvals = _get_max_preds(heatmaps)# add +/-0.25 shift to the predicted locations for higher acc.for n in range(N):for k in range(K):heatmap = heatmaps[n][k]px = int(preds[n][k][0])py = int(preds[n][k][1])if 1 < px < W - 1 and 1 < py < H - 1:diff = np.array([heatmap[py][px + 1] - heatmap[py][px - 1],heatmap[py + 1][px] - heatmap[py - 1][px]])preds[n][k] += np.sign(diff) * .25if post_process == 'megvii':preds[n][k] += 0.5# Transform back to the imagefor i in range(N):preds[i] = transform_preds(preds[i], center[i], scale[i], [W, H], use_udp=use_udp)if post_process == 'megvii':maxvals = maxvals / 255.0 + 0.5return preds, maxvalsdef decode(output, center, scale, score_, batch_size=1):c = np.zeros((batch_size, 2), dtype=np.float32)s = np.zeros((batch_size, 2), dtype=np.float32)score = np.ones(batch_size)for i in range(batch_size):c[i, :] = centers[i, :] = scale#score[i] = np.array(score_).reshape(-1)score[i] = score_preds, maxvals = keypoints_from_heatmaps(output,c,s,False,'default',11,0.0546875,False,'GaussianHeatmap')all_preds = np.zeros((batch_size, preds.shape[1], 3), dtype=np.float32)all_boxes = np.zeros((batch_size, 6), dtype=np.float32)all_preds[:, :, 0:2] = preds[:, :, 
0:2]all_preds[:, :, 2:3] = maxvalsall_boxes[:, 0:2] = c[:, 0:2]all_boxes[:, 2:4] = s[:, 0:2]all_boxes[:, 4] = np.prod(s * 200.0, axis=1)all_boxes[:, 5] = scoreresult = {}result['preds'] = all_predsresult['boxes'] = all_boxesprint(result)return resultdef draw(bgr, predict_dict, skeleton):bboxes = predict_dict["boxes"]for box in bboxes:cv2.rectangle(bgr, (int(box[0]), int(box[1])), (int(box[0]) + int(box[2]), int(box[1]) + int(box[3])),(255, 0, 0))all_preds = predict_dict["preds"]for all_pred in all_preds:for x, y, s in all_pred:cv2.circle(bgr, (int(x), int(y)), 3, (0, 255, 120), -1)for sk in skeleton:x0 = int(all_pred[sk[0]][0])y0 = int(all_pred[sk[0]][1])x1 = int(all_pred[sk[1]][0])y1 = int(all_pred[sk[1]][1])cv2.line(bgr, (x0, y0), (x1, y1), (0, 255, 0), 1)cv2.imwrite("t1.jpg", bgr)def myFunc00(rknn_lite, IMG):# bbox = [450, 150, 1100, 550, 0.99]# bbox = [0, 0, 3840, 2160, 0.99]bbox = [1428, 723, 1421, 847, 0.99]image_size = [384, 288]# img = src_imgimg = cv2.cvtColor(IMG, cv2.COLOR_BGR2RGB)  # hwc rgbaspect_ratio = image_size[0] / image_size[1]img_height = img.shape[0]img_width = img.shape[1]padding = 1.25pixel_std = 200center, scale = bbox_xywh2cs(bbox,aspect_ratio,padding,pixel_std)trans = get_affine_transform(center, scale, 0, image_size)img = cv2.warpAffine(  # 旋转后加入了黑边 最后生成的点的坐标也要对齐img,trans, (int(image_size[0]), int(image_size[1])),flags=cv2.INTER_LINEAR)print(trans)img = np.transpose(img, (2, 0, 1)).astype(np.float32)  # chw rgb# outputs = rknn.inference(inputs=[img], data_type=None, data_format="nchw")[0]# img[0, ...] = ((img[0, ...] / 255.0) - 0.485) / 0.229# img[1, ...] = ((img[1, ...] / 255.0) - 0.456) / 0.224# img[2, ...] = ((img[2, ...] 
/ 255.0) - 0.406) / 0.225img = np.transpose(img, (1, 2, 0)).astype(np.float32)  # chw rgb# img = img.reshape(1,256,192,3)# Inferenceprint("--> Running model")start = time.time()img = np.expand_dims(img, axis=0)outputs = rknn_lite.inference(inputs=[img])[0]end = time.time()# 计算运行时间runTime = end - startrunTime_ms = runTime * 1000# 输出运行时间print("运行时间:", runTime_ms, "毫秒")print(outputs)predict_dict = decode(outputs, center, scale, bbox[-1])skeleton = [[15, 13], [13, 11], [16, 14], [14, 12], [11, 12], [5, 11], [6, 12], [5, 6], [5, 7], [6, 8], [7, 9],[8, 10], [1, 2], [0, 1], [0, 2], [1, 3], [2, 4], [3, 5], [4, 6]]draw(IMG, predict_dict, skeleton)return IMG



from queue import Queue
from rknnlite.api import RKNNLite
from concurrent.futures import ThreadPoolExecutor, as_completed


def initRKNN(rknnModel="./Models/mylightpose_288.rknn", id=0):
    """Load an RKNN model and initialise its runtime on the chosen NPU core.

    Args:
        rknnModel (str): Path of the ``.rknn`` model file.
        id (int): Core selector. 0, 1 and 2 pick ``NPU_CORE_0``,
            ``NPU_CORE_1`` and ``NPU_CORE_2``; -1 picks ``NPU_CORE_0_1_2``;
            any other value calls ``init_runtime()`` without a core mask.

    Returns:
        RKNNLite: The initialised runtime object.

    Raises:
        SystemExit: With the non-zero return code if loading the model or
            initialising the runtime fails.
    """
    rknn_lite = RKNNLite()
    ret = rknn_lite.load_rknn(rknnModel)
    if ret != 0:
        print("Load RKNN rknnModel failed")
        # Same effect as exit(ret), without relying on the site-provided
        # interactive helper.
        raise SystemExit(ret)

    # Initialise the runtime; each worker thread selects a different core.
    if id == 0:
        ret = rknn_lite.init_runtime(core_mask=RKNNLite.NPU_CORE_0)
    elif id == 1:
        ret = rknn_lite.init_runtime(core_mask=RKNNLite.NPU_CORE_1)
    elif id == 2:
        ret = rknn_lite.init_runtime(core_mask=RKNNLite.NPU_CORE_2)
    elif id == -1:
        ret = rknn_lite.init_runtime(core_mask=RKNNLite.NPU_CORE_0_1_2)
    else:
        ret = rknn_lite.init_runtime()
    if ret != 0:
        print("Init runtime environment failed")
        raise SystemExit(ret)

    print(rknnModel, "\t\tdone")
    return rknn_lite


def initRKNNs(rknnModel="./Models/mylightpose_288.rknn", TPEs=1):
    """Create ``TPEs`` runtimes, cycling over core selectors 0, 1 and 2.

    Args:
        rknnModel (str): Path of the ``.rknn`` model file.
        TPEs (int): Number of runtime instances to create.

    Returns:
        list: The initialised runtime objects, in creation order.
    """
    return [initRKNN(rknnModel, i % 3) for i in range(TPEs)]


class rknnPoolExecutor():
    """Run an inference function on a pool of RKNN runtimes using threads.

    Frames are submitted with ``put`` and results are retrieved with ``get``
    in submission order.
    """

    def __init__(self, rknnModel, TPEs, func):
        """Create the runtimes and the thread pool.

        Args:
            rknnModel (str): Path of the ``.rknn`` model file.
            TPEs (int): Number of runtimes and of worker threads.
            func (callable): Called as ``func(rknn_lite, frame)`` for every
                submitted frame.
        """
        self.TPEs = TPEs
        self.queue = Queue()  # pending futures, in submission order
        self.rknnPool = initRKNNs(rknnModel, TPEs)  # one runtime per thread
        self.pool = ThreadPoolExecutor(max_workers=TPEs)
        self.func = func
        self.num = 0  # submitted-task counter, drives the round robin

    def put(self, frame):
        """Submit ``frame`` to the next runtime in round-robin order."""
        rknn_lite = self.rknnPool[self.num % self.TPEs]
        self.queue.put(self.pool.submit(self.func, rknn_lite, frame))
        self.num += 1

    def get(self):
        """Return the oldest pending result.

        Returns:
            tuple: ``(result, True)``, blocking until the oldest submitted
            task has finished, or ``(None, False)`` when nothing is pending.
        """
        if self.queue.empty():
            return None, False
        future = self.queue.get()
        # result() blocks until the task is done and re-raises its exception.
        return future.result(), True

    def release(self):
        """Shut down the thread pool and release every runtime."""
        self.pool.shutdown()
        for rknn_lite in self.rknnPool:
            rknn_lite.release()


import cv2
import time
from rknn_pool_executor import rknnPoolExecutor
# Image-processing function; replace it with your own in a real application.
from hrnet_inference import myFunc00

cap = cv2.VideoCapture('./input/out_240715151339.mp4')
# cap = cv2.VideoCapture(0)
RKNN_MODEL = './models/test.rknn'
# Number of worker threads
TPEs = 6
# Initialise the RKNN pool
pool = rknnPoolExecutor(rknnModel=RKNN_MODEL, TPEs=TPEs, func=myFunc00)
# pool = rknnPoolExecutor(rknnModel=modelPath, TPEs=TPEs, func=myFunc01)

# Pre-fill the pool so results are already in flight when the main loop
# starts consuming them.
if cap.isOpened():
    for i in range(TPEs + 1):
        ret, frame = cap.read()
        if not ret:
            cap.release()
            # Release the runtimes and the thread pool explicitly; a bare
            # `del pool` does not call release().
            pool.release()
            raise SystemExit(-1)
        pool.put(frame)

frames, loopTime, initTime = 0, time.time(), time.time()
pTime = 0
while cap.isOpened():
    frames += 1
    ret, frame = cap.read()
    if not ret:
        break
    # frame = frame[150:700, 450:1550, :]
    # NOTE(review): the decoded video frame is replaced by a fixed still
    # image here, which looks like a debugging leftover; confirm.
    frame = cv2.imread("./input/240513_00000741.jpg")
    pool.put(frame)
    frame, flag = pool.get()
    if not flag:
        break
    cTime = time.time()
    fps = 1 / (cTime - pTime)
    pTime = cTime
    cv2.putText(frame, str(int(fps)), (50, 50), cv2.FONT_HERSHEY_PLAIN, 3,
                (0, 255, 0), 3)
    cv2.imshow('test', frame)
    # NOTE(review): blocks on stdin after every frame (press Enter to
    # continue); looks like a debugging leftover; confirm.
    input()
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    if frames % 30 == 0:
        print("30帧平均帧率:\t", 30 / (time.time() - loopTime), "帧")
        loopTime = time.time()

print("总平均帧率\t", frames / (time.time() - initTime))
# Release the capture and the RKNN thread pool


