This article builds on two earlier posts in this series, 《无人机实战系列(一)在局域网内传输数据》 and 《无人机实战系列(二)本地摄像头 + Depth-Anything V2》, and implements the following:
- Local laptop camera publishes frames + remote GPU processes them in real time (no return stream);
- [Async] local laptop camera publishes frames + remote GPU processes them in real time (results returned to the laptop and displayed);
- [Sync] local laptop camera publishes frames + remote GPU processes them in real time (results returned to the laptop and displayed);
Before running these demos, it is recommended to read the two earlier posts first to get familiar with the zmq library and the Depth-Anything V2 model.
Two variants (with and without a return stream) are provided because you need to choose based on your own situation. Returning the depth map and displaying it makes the results much easier to inspect, but you still have to balance two costs (the sketch after this list gives a feel for the payload sizes involved):

- The depth image itself is fairly large, so sending it back consumes communication bandwidth;
- Decoding and displaying the returned image consumes local compute.
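As a rough way to quantify the bandwidth cost, the sketch below JPEG-encodes a synthetic frame and an approximated combined return image, then prints the payload sizes. The 640×480 resolution and the random-noise content are assumptions for illustration only; real camera frames compress much better than noise, so treat the numbers as an upper bound.

```python
import cv2
import numpy as np

# Hypothetical 640x480 camera frame (random noise compresses poorly,
# so these figures are a rough worst case).
frame = np.random.randint(0, 256, (480, 640, 3), dtype=np.uint8)

_, jpg = cv2.imencode('.jpg', frame)            # what the publisher sends
print(f"raw frame:   {frame.nbytes / 1024:.0f} KiB")
print(f"jpeg frame:  {len(jpg) / 1024:.0f} KiB")

# The combined image returned by the GPU side (raw frame + separator +
# depth map) is roughly twice as wide, so the return payload roughly doubles.
combined = np.hstack([frame, frame])
_, jpg_back = cv2.imencode('.jpg', combined)
print(f"return jpeg: {len(jpg_back) / 1024:.0f} KiB")

send_fps = 30
print(f"round-trip bandwidth @ {send_fps} fps: "
      f"{(len(jpg) + len(jpg_back)) * send_fps / 1024 / 1024:.1f} MiB/s")
```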
[Note]: The code in this article must be run inside the project folder from 《无人机实战系列(二)本地摄像头 + Depth-Anything V2》; otherwise it will fail with errors about missing files and model weights.
Local laptop camera publishing + remote GPU real-time processing (no return stream)
This demo opens the laptop camera and publishes each frame; the remote GPU server receives the frames, runs Depth-Anything V2 on them, and displays the result.
Publishing camera frames from the local laptop
A few points to note in the code below:
- Set the publishing rate `send_fps`; a lower rate reduces the processing pressure on the GPU side.
- Set the send queue size with `socket.setsockopt(zmq.SNDHWM, 1)`, so the queue only ever holds the current frame, reducing bandwidth pressure.
- Keep only the latest message with `socket.setsockopt(zmq.CONFLATE, 1)`, so the queue stores only the newest frame, reducing display latency on the receiving side.

Note that in ZMQ these queue options only take effect for subsequent bind/connect calls, so the code below sets them before calling `bind`.
```python
import zmq
import cv2
import time

context = zmq.Context()
socket = context.socket(zmq.PUB)
# Queue options must be set before bind to take effect.
socket.setsockopt(zmq.SNDHWM, 1)    # send queue holds a single frame
socket.setsockopt(zmq.CONFLATE, 1)  # keep only the latest message
socket.bind("tcp://*:5555")         # bind the local port

cap = cv2.VideoCapture(0)           # open the camera
send_fps = 30                       # cap the publish rate to reduce the receiver's load

while True:
    start_time = time.time()
    ret, frame = cap.read()
    if not ret:
        continue
    _, buffer = cv2.imencode('.jpg', frame)  # encode as JPEG
    socket.send(buffer.tobytes())            # publish the frame data
    cv2.imshow("Origin image", frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    time.sleep(max(1 / send_fps - (time.time() - start_time), 0))

cap.release()
cv2.destroyAllWindows()
```
Run:

```bash
$ python camera_pub.py
```
GPU server receiver
A few points to note in the code below:
- Connect to the publisher's address with `socket.connect("tcp://192.168.75.201:5555")`; change this to your laptop's IP.
- Accept only the latest message: `socket.setsockopt(zmq.CONFLATE, 1)`.
- Drop stale data frames: `socket.setsockopt(zmq.RCVHWM, 1)` combined with `socket.poll(1)` (isolated in the sketch after this list).
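To make the last point concrete, here is the stale-frame-dropping pattern in isolation (a minimal sketch with all processing stripped out; the address matches the demo): poll with a 1 ms timeout and keep reading while data is pending, so that only the newest frame survives an iteration.

```python
import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, b"")
socket.setsockopt(zmq.CONFLATE, 1)  # queue keeps only the newest message
socket.setsockopt(zmq.RCVHWM, 1)    # receive queue holds a single frame
socket.connect("tcp://192.168.75.201:5555")  # your laptop's address

while True:
    msg = None
    while socket.poll(1):                # data pending? (1 ms timeout)
        msg = socket.recv(zmq.NOBLOCK)   # read it; earlier reads are overwritten
    if msg is None:
        continue                         # nothing arrived this iteration
    # ... decode and process the newest frame in `msg` ...
```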
```python
import argparse
import cv2
import numpy as np
import torch
import time
import zmq

from depth_anything_v2.dpt import DepthAnythingV2

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, b"")
socket.setsockopt(zmq.CONFLATE, 1)  # accept only the latest message
socket.setsockopt(zmq.RCVHWM, 1)    # receive queue holds a single frame
socket.connect("tcp://192.168.75.201:5555")  # publisher's address: change to your laptop's IP

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Depth Anything V2')
    parser.add_argument('--input-size', type=int, default=518)
    parser.add_argument('--encoder', type=str, default='vits', choices=['vits', 'vitb', 'vitl', 'vitg'])
    parser.add_argument('--pred-only', action='store_true', help='only display the prediction')
    parser.add_argument('--grayscale', action='store_true', help='do not apply colorful palette')
    args = parser.parse_args()

    DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

    model_configs = {
        'vits': {'encoder': 'vits', 'features': 64, 'out_channels': [48, 96, 192, 384]},
        'vitb': {'encoder': 'vitb', 'features': 128, 'out_channels': [96, 192, 384, 768]},
        'vitl': {'encoder': 'vitl', 'features': 256, 'out_channels': [256, 512, 1024, 1024]},
        'vitg': {'encoder': 'vitg', 'features': 384, 'out_channels': [1536, 1536, 1536, 1536]}
    }

    depth_anything = DepthAnythingV2(**model_configs[args.encoder])
    depth_anything.load_state_dict(torch.load(f'./models/depth_anything_v2_{args.encoder}.pth', map_location='cpu'))
    depth_anything = depth_anything.to(DEVICE).eval()

    margin_width = 50

    while True:
        start_time = time.time()
        # Optimization 1: ZMQ receive, draining stale frames so only the newest is kept
        try:
            msg = None
            while socket.poll(1):
                msg = socket.recv(zmq.NOBLOCK)
            if msg is None:
                continue
            zmq_time = time.time()

            # Optimization 2: OpenCV decode
            raw_frame = cv2.imdecode(np.frombuffer(msg, dtype=np.uint8), 1)
            decode_time = time.time()

            # Optimization 3: model inference
            with torch.no_grad():
                depth = depth_anything.infer_image(raw_frame, args.input_size)
            infer_time = time.time()

            # Optimization 4: normalization + OpenCV pseudo-color mapping
            depth = ((depth - depth.min()) / (depth.max() - depth.min()) * 255).astype(np.uint8)
            if args.grayscale:
                depth = np.repeat(depth[..., np.newaxis], 3, axis=-1)
            else:
                depth = cv2.applyColorMap(depth, cv2.COLORMAP_JET)
            process_time = time.time()

            # Optimization 5: concatenate the raw frame and the depth map
            split_region = np.ones((raw_frame.shape[0], margin_width, 3), dtype=np.uint8) * 255
            combined_frame = cv2.hconcat([raw_frame, split_region, depth])

            cv2.imshow('Raw Frame and Depth Prediction', combined_frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            print(f"[{args.encoder}] Frame cost time: {time.time() - start_time:.4f} s")
            print(f"  ZMQ receive: {zmq_time - start_time:.4f} s")
            print(f"  Decode: {decode_time - zmq_time:.4f} s")
            print(f"  Inference: {infer_time - decode_time:.4f} s")
            print(f"  Processing: {process_time - infer_time:.4f} s")
        except zmq.Again:
            print("No msg received, skip...")
            continue  # no message, skip this iteration

    cv2.destroyAllWindows()
```
Run:

```bash
$ python camera_recv.py
```
[Async] Local laptop camera publishing + remote GPU real-time processing (results returned to the laptop and displayed)
This is essentially the same as the code above, except that both the sending and receiving ends gain one extra socket for the return channel. Send and receive are normally handled asynchronously here, because this prevents one end from sitting blocked and waiting whenever the other end cannot keep up. The sketch below isolates the non-blocking receive pattern that makes this possible.
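The piece that makes the publisher asynchronous is the non-blocking receive on the PULL socket: `recv(zmq.NOBLOCK)` returns immediately and raises `zmq.Again` when no result has arrived yet, so the camera loop never stalls waiting for the GPU. A minimal sketch (port 5556 matches the demo; the rest is illustrative):

```python
import zmq

context = zmq.Context()
pull_socket = context.socket(zmq.PULL)
pull_socket.bind("tcp://*:5556")  # the GPU side PUSHes its results here

while True:
    try:
        # Raises zmq.Again immediately if nothing is ready, instead of blocking.
        data = pull_socket.recv(zmq.NOBLOCK)
    except zmq.Again:
        continue  # keep capturing/publishing; check again next iteration
    print(f"received {len(data)} bytes of processed result")
```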
Publishing camera frames from the local laptop
```python
import zmq
import cv2
import numpy as np
import time

context = zmq.Context()

# Publish raw frames
pub_socket = context.socket(zmq.PUB)
pub_socket.bind("tcp://*:5555")

# Receive processed results
pull_socket = context.socket(zmq.PULL)
pull_socket.bind("tcp://*:5556")  # listen for data returned by the processing side

send_fps = 30
cap = cv2.VideoCapture(0)

while True:
    start_time = time.time()
    ret, frame = cap.read()
    if not ret:
        continue

    # [Optional] downsample the frame
    frame = cv2.pyrDown(frame)
    frame = cv2.pyrDown(frame)

    _, buffer = cv2.imencode('.jpg', frame)  # compress the frame
    pub_socket.send(buffer.tobytes())        # publish it

    # Non-blocking receive of the processed result
    try:
        processed_data = pull_socket.recv(zmq.NOBLOCK)
        processed_frame = cv2.imdecode(np.frombuffer(processed_data, dtype=np.uint8), 1)
    except zmq.Again:
        print("No image received, continue...")
        continue

    cv2.imshow("Processed Frame", processed_frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    time.sleep(max(1 / send_fps - (time.time() - start_time), 0))

cap.release()
cv2.destroyAllWindows()
```
Run:

```bash
$ python camera_pub_async.py
```
GPU server receiver (async)
```python
import argparse
import cv2
import numpy as np
import torch
import time
import zmq

from depth_anything_v2.dpt import DepthAnythingV2

context = zmq.Context()

# Subscribe to raw frames
sub_socket = context.socket(zmq.SUB)
sub_socket.setsockopt(zmq.SUBSCRIBE, b"")
sub_socket.setsockopt(zmq.CONFLATE, 1)  # accept only the latest message
sub_socket.setsockopt(zmq.RCVHWM, 1)    # receive queue holds a single frame
sub_socket.connect("tcp://192.168.75.201:5555")

# Send processed results back
push_socket = context.socket(zmq.PUSH)
push_socket.connect("tcp://192.168.75.201:5556")

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Depth Anything V2')
    parser.add_argument('--input-size', type=int, default=518)
    parser.add_argument('--encoder', type=str, default='vits', choices=['vits', 'vitb', 'vitl', 'vitg'])
    parser.add_argument('--pred-only', action='store_true', help='only display the prediction')
    parser.add_argument('--grayscale', action='store_true', help='do not apply colorful palette')
    args = parser.parse_args()

    DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

    model_configs = {
        'vits': {'encoder': 'vits', 'features': 64, 'out_channels': [48, 96, 192, 384]},
        'vitb': {'encoder': 'vitb', 'features': 128, 'out_channels': [96, 192, 384, 768]},
        'vitl': {'encoder': 'vitl', 'features': 256, 'out_channels': [256, 512, 1024, 1024]},
        'vitg': {'encoder': 'vitg', 'features': 384, 'out_channels': [1536, 1536, 1536, 1536]}
    }

    depth_anything = DepthAnythingV2(**model_configs[args.encoder])
    depth_anything.load_state_dict(torch.load(f'./models/depth_anything_v2_{args.encoder}.pth', map_location='cpu'))
    depth_anything = depth_anything.to(DEVICE).eval()

    margin_width = 50

    while True:
        start_time = time.time()
        # Optimization 1: ZMQ receive, draining stale frames so only the newest is kept
        try:
            msg = None
            while sub_socket.poll(1):
                msg = sub_socket.recv(zmq.NOBLOCK)
            if msg is None:
                continue
            zmq_time = time.time()

            # Optimization 2: OpenCV decode
            raw_frame = cv2.imdecode(np.frombuffer(msg, dtype=np.uint8), 1)
            decode_time = time.time()

            # Optimization 3: model inference
            with torch.no_grad():
                depth = depth_anything.infer_image(raw_frame, args.input_size)
            infer_time = time.time()

            # Optimization 4: normalization + OpenCV pseudo-color mapping
            depth = ((depth - depth.min()) / (depth.max() - depth.min()) * 255).astype(np.uint8)
            if args.grayscale:
                depth = np.repeat(depth[..., np.newaxis], 3, axis=-1)
            else:
                depth = cv2.applyColorMap(depth, cv2.COLORMAP_JET)
            process_time = time.time()

            # Optimization 5: concatenate the raw frame and the depth map
            split_region = np.ones((raw_frame.shape[0], margin_width, 3), dtype=np.uint8) * 255
            combined_frame = cv2.hconcat([raw_frame, split_region, depth])

            cv2.imshow('Raw Frame and Depth Prediction', combined_frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            print(f"[{args.encoder}] Frame cost time: {time.time() - start_time:.4f} s")
            print(f"  ZMQ receive: {zmq_time - start_time:.4f} s")
            print(f"  Decode: {decode_time - zmq_time:.4f} s")
            print(f"  Inference: {infer_time - decode_time:.4f} s")
            print(f"  Processing: {process_time - infer_time:.4f} s")

            # Send the processed result back
            _, buffer = cv2.imencode('.jpg', combined_frame)
            push_socket.send(buffer.tobytes())
        except zmq.Again:
            print("No msg received, skip...")
            continue  # no message, skip this iteration

    cv2.destroyAllWindows()
```
Run:

```bash
$ python camera_recv_async.py
```
[Sync] Local laptop camera publishing + remote GPU real-time processing (results returned to the laptop and displayed)
Video streams are normally not delivered synchronously: a synchronous design forces the publisher and the receiver into lockstep, which places high demands on network stability. The minimal sketch below shows the strict alternation that the request-reply pattern imposes.
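ZMQ's REQ/REP pair enforces a strict send-recv alternation on each side; sending twice in a row raises an error. A minimal ping-pong sketch with both ends in one process over the `inproc` transport (purely for illustration; the demos below use TCP):

```python
import zmq

context = zmq.Context()
rep = context.socket(zmq.REP)
rep.bind("inproc://pingpong")  # in-process transport, illustration only
req = context.socket(zmq.REQ)
req.connect("inproc://pingpong")

for i in range(3):
    req.send(b"frame")        # REQ must send first,
    request = rep.recv()      # REP must receive that request,
    rep.send(b"depth")        # then REP must reply,
    print(req.recv())         # before REQ may receive (and send again).

# A second req.send() without an intervening recv() would raise zmq.ZMQError
# ("Operation cannot be accomplished in current state").
```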
Publishing camera frames from the local laptop
A few points to note in this demo:

- Set the sending side to request-reply mode: `context.socket(zmq.REQ)`;
- Block until the server sends the processed data back: `pub_socket.recv()`.
```python
import zmq
import cv2
import numpy as np
import time

context = zmq.Context()

# Publish raw frames in request-reply mode
pub_socket = context.socket(zmq.REQ)
pub_socket.bind("tcp://*:5555")

send_fps = 30
cap = cv2.VideoCapture(0)

while True:
    start_time = time.time()
    ret, frame = cap.read()
    if not ret:
        continue

    # [Optional] downsample the frame
    frame = cv2.pyrDown(frame)
    frame = cv2.pyrDown(frame)

    _, buffer = cv2.imencode('.jpg', frame)  # compress the frame
    pub_socket.send(buffer.tobytes())        # send the request
    print("Waiting for server to process.")
    processed_data = pub_socket.recv()       # block until the server replies
    processed_frame = cv2.imdecode(np.frombuffer(processed_data, dtype=np.uint8), 1)

    cv2.imshow("Processed Frame", processed_frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    time.sleep(max(1 / send_fps - (time.time() - start_time), 0))

cap.release()
cv2.destroyAllWindows()
```
Run:

```bash
$ python camera_pub_sync.py
```
GPU server receiver
A few points to note in this demo:

- Set the receiving side to request-reply mode: `context.socket(zmq.REP)`;
- Block on receiving data from the publisher: `sub_socket.recv()`;
- Send the processed data back synchronously: `sub_socket.send(buffer.tobytes())`.
```python
import argparse
import cv2
import numpy as np
import torch
import time
import zmq

from depth_anything_v2.dpt import DepthAnythingV2

context = zmq.Context()
sub_socket = context.socket(zmq.REP)             # request-reply mode
sub_socket.connect("tcp://192.168.75.201:5555")  # the REQ side binds; use your laptop's IP

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Depth Anything V2')
    parser.add_argument('--input-size', type=int, default=518)
    parser.add_argument('--encoder', type=str, default='vits', choices=['vits', 'vitb', 'vitl', 'vitg'])
    parser.add_argument('--pred-only', action='store_true', help='only display the prediction')
    parser.add_argument('--grayscale', action='store_true', help='do not apply colorful palette')
    args = parser.parse_args()

    DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

    model_configs = {
        'vits': {'encoder': 'vits', 'features': 64, 'out_channels': [48, 96, 192, 384]},
        'vitb': {'encoder': 'vitb', 'features': 128, 'out_channels': [96, 192, 384, 768]},
        'vitl': {'encoder': 'vitl', 'features': 256, 'out_channels': [256, 512, 1024, 1024]},
        'vitg': {'encoder': 'vitg', 'features': 384, 'out_channels': [1536, 1536, 1536, 1536]}
    }

    depth_anything = DepthAnythingV2(**model_configs[args.encoder])
    depth_anything.load_state_dict(torch.load(f'./models/depth_anything_v2_{args.encoder}.pth', map_location='cpu'))
    depth_anything = depth_anything.to(DEVICE).eval()

    margin_width = 50

    while True:
        start_time = time.time()

        # Optimization 1: ZMQ receive (blocking; REP must receive a request first)
        msg = sub_socket.recv()
        zmq_time = time.time()

        # Optimization 2: OpenCV decode
        raw_frame = cv2.imdecode(np.frombuffer(msg, dtype=np.uint8), 1)
        decode_time = time.time()

        # Optimization 3: model inference
        with torch.no_grad():
            depth = depth_anything.infer_image(raw_frame, args.input_size)
        infer_time = time.time()

        # Optimization 4: normalization + OpenCV pseudo-color mapping
        depth = ((depth - depth.min()) / (depth.max() - depth.min()) * 255).astype(np.uint8)
        if args.grayscale:
            depth = np.repeat(depth[..., np.newaxis], 3, axis=-1)
        else:
            depth = cv2.applyColorMap(depth, cv2.COLORMAP_JET)
        process_time = time.time()

        # Optimization 5: concatenate the raw frame and the depth map
        split_region = np.ones((raw_frame.shape[0], margin_width, 3), dtype=np.uint8) * 255
        combined_frame = cv2.hconcat([raw_frame, split_region, depth])

        # Send the result back before any possible break: REP must reply to
        # every request, otherwise the REQ side blocks forever on recv().
        _, buffer = cv2.imencode('.jpg', combined_frame)
        sub_socket.send(buffer.tobytes())

        cv2.imshow('Raw Frame and Depth Prediction', combined_frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

        print(f"[{args.encoder}] Frame cost time: {time.time() - start_time:.4f} s")
        print(f"  ZMQ receive: {zmq_time - start_time:.4f} s")
        print(f"  Decode: {decode_time - zmq_time:.4f} s")
        print(f"  Inference: {infer_time - decode_time:.4f} s")
        print(f"  Processing: {process_time - infer_time:.4f} s")

    cv2.destroyAllWindows()
```
Run:

```bash
$ python camera_recv_sync.py
```