最近在做水面垃圾识别的智能船 用到了yolov8进行目标检测 修改并添加了SEAttention注意力机制
详情见其他大神
【保姆级教程|YOLOv8添加注意力机制】【1】添加SEAttention注意力机制步骤详解、训练及推理使用_yolov8添加se-CSDN博客
并且修改传统的iou方法改为添加了wise-iou的方法 ,对于小目标,传统的IoU可能不够敏感,因为即使是微小的偏移也可能导致IoU显著下降。Wise-IoU通过加权可以更公平地对待小目标,从而提高小目标检测的性能 ,这对于我们船体的摄像头 查找远处或较小漂浮物起到了一定作用。
好了,回归正题。我们写了一个脚本 用于收集识别后的标框和参数信息 将这些信息存储进一个jsonl文件中 启用两个线程 在jetson nano b01 4gb的板子上进行运行 。
目的: 通过存储这些信息我们可以用于计算 例如计算到屏幕正下方的距离 可以做些简单的计算和路径规划等问题 后续我们还在完成这份工作
话不多说,我们先上传代码。 该代码结合gpt添加了许多注释 (真的很多,组里有人看不懂代码 所以写的时候只能加很多注释并让gpt规范格式)不过这样也方便大家的阅读和使用
以下是源码环节:
import cv2
from ultralytics import YOLO
import datetime
import json
import threading
import queue
import time # 导入 time 模块# 队列用于线程间通信
data_queue = queue.Queue()# 事件用于通知其他线程停止
stop_event = threading.Event()# 将 id_counter 定义为全局变量
id_counter = 0# 修改 detection_data 的定义,去掉 timestamp 并添加 id 作为第一个元素
def process_frames(model, cap):"""对摄像头捕捉到的视频帧进行处理,使用YOLO模型进行目标检测,并将结果放入队列中。Args:model (YOLO): YOLO目标检测模型实例。cap (cv2.VideoCapture): 摄像头视频流对象。Returns:None"""global id_counter # 确保在函数内部使用的是全局变量id_counterwhile not stop_event.is_set(): # 当停止事件未设置时,循环继续ret, frame = cap.read() # 从摄像头读取一帧if not ret: # 如果无法读取帧(摄像头可能已断开)print("无法接收帧(可能是摄像头断开)")break # 跳出循环results = model(frame) # 使用模型处理帧for result in results: # 遍历模型检测结果boxes = result.boxes # 获取检测到的边界框for box in boxes: # 遍历每个边界框x1, y1, x2, y2 = map(int, box.xyxy[0]) # 提取边界框坐标confidence = round(float(box.conf[0]), 3) # 提取置信度cls = int(box.cls[0]) # 提取类别索引label = model.names[cls] # 获取类别名称detection_data = { # 构造检测数据字典"id": id_counter,"x1": x1,"y1": y1,"x2": x2,"y2": y2,"confidence": confidence,"label": label}data_queue.put(detection_data) # 将检测数据放入队列# 绘制检测框和标签cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2) # 绘制绿色矩形框cv2.putText(frame, f"{label} {confidence:.2f}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5,(0, 255, 0), 2) # 在框上方绘制标签和置信度id_counter += 1 # 在每次处理后增加 ID 计数器now = datetime.datetime.now() # 获取当前时间time_str = now.strftime("%Y-%m-%d %H:%M:%S") # 格式化时间字符串cv2.putText(frame, time_str, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA) # 在画面上显示时间cv2.imshow('Camera', frame) # 显示画面if cv2.waitKey(1) & 0xFF == ord('q'): # 如果用户按下 'q' 键stop_event.set() # 设置停止事件break # 跳出循环# 添加 sleep 以控制帧率time.sleep(1) # 每隔1秒处理一帧cap.release() # 释放摄像头资源cv2.destroyAllWindows() # 关闭所有OpenCV窗口# write_to_file 函数不需要再处理 detection_data 中的 timestamp
def write_to_file(output_file, max_lines=1000):line_count = 0 # 记录写入的行数with open(output_file, 'w') as f: # 初始以写模式打开文件,清空文件内容while True:try:detection_data = data_queue.get(timeout=1) # 尝试从队列获取数据f.write(json.dumps(detection_data) + '\n')line_count += 1if line_count >= max_lines:print(f"已达到{max_lines}行数据,清空文件并继续运行。")f.close() # 关闭当前文件句柄f = open(output_file, 'w') # 重新打开文件,清空内容line_count = 0 # 重置行数计数器except queue.Empty: # 如果队列为空,则等待下一次尝试if stop_event.is_set(): # 检查是否需要退出returncontinuedef run_yolov8_detection(model_path="./yolov8n.pt", camera_id=0, output_file="ultralytics-main/detector.jsonl"):"""运行 YOLOv8 目标检测算法。Args:model_path (str, optional): YOLOv8 模型文件路径,默认为 "./yolov8n.pt"。camera_id (int, optional): 摄像头设备 ID,默认为 0。output_file (str, optional): 输出文件路径,默认为 "ultralytics-main/detector.jsonl"。Returns:None"""# 初始化YOLO模型model = YOLO(model_path)# 打开摄像头cap = cv2.VideoCapture(camera_id)# 检查摄像头是否成功打开if not cap.isOpened():print("无法打开摄像头")return# 创建并启动两个线程# 第一个线程用于处理摄像头捕捉到的帧processing_thread = threading.Thread(target=process_frames, args=(model, cap))# 第二个线程用于将处理后的帧写入文件writing_thread = threading.Thread(target=write_to_file, args=(output_file,))# 启动两个线程processing_thread.start()writing_thread.start()# 等待两个线程完成# 等待处理帧的线程完成processing_thread.join()# 等待写入文件的线程完成writing_thread.join()# 调用函数
run_yolov8_detection()
如有问题请及时私信,欢迎大家指正!!!