利用YOLO模型进行高速公路交通流量分析
识别效果:
免责声明
本文所提供的信息和内容仅供参考。尽管我尽力确保所提供信息的准确性和可靠性,但我们不对其完整性、准确性或及时性作出任何保证。使用本文信息所造成的任何直接或间接损失,本人概不负责。请在做出任何决策之前自行进行必要的研究和咨询专业人士。
无偿提供代码,感谢点赞
、收藏
与关注
!
随着交通管理的日益重要,自动化监测系统的需求也随之增加。利用计算机视觉技术,我们可以高效地分析视频中的车辆流量,提升交通管理的科学性和有效性。本文将详细介绍如何使用YOLO(You Only Look Once)模型分析高速公路的交通流,结合视频数据提取车辆信息、速度计算和流量分析。整个过程将结合代码,逐步解析每个步骤的细节及其背后的原理。
1. 问题背景
在交通流量分析中,实时监测和处理车辆数据是一个具有挑战性的任务。我们希望能够快速且准确地识别视频中的车辆,并计算它们的速度、流量和密度。为此,我们将使用YOLO模型来进行目标检测,结合一些图像处理技术,分析交通状况。
2. 准备工作
在开始之前,我们需要确保安装必要的库,包括OpenCV、Pandas、NumPy、YOLO等。以下是我们将用到的库:
import cv2
import pandas as pd
import numpy as np
from ultralytics import YOLO
from datetime import datetime
from scipy.spatial import distance
3. 计算视频时长
首先,我们需要计算视频的时长,以便在后续处理中使用。以下是计算视频时长的函数:
def calculate_duration(start_time, end_time):fmt = '%H:%M:%S'start = datetime.strptime(start_time, fmt)end = datetime.strptime(end_time, fmt)duration = end - startreturn duration
在这个函数中,我们使用datetime
模块将输入的开始时间和结束时间转换为datetime
对象,并计算它们之间的差值,这就是视频的总时长。
4. 获取视频信息
我们需要从视频中提取一些关键信息,例如总帧数、帧率和每帧的时间间隔。可以使用以下函数来实现:
def get_video_info(video_path, duration):cap = cv2.VideoCapture(video_path)if not cap.isOpened():raise ValueError("无法打开视频文件")total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))fps = cap.get(cv2.CAP_PROP_FPS)frame_time = 1 / fps if fps > 0 else 0interval_time = duration.total_seconds() / total_frames if total_frames > 0 else 0cap.release()return total_frames, fps, frame_time, interval_time
这个函数会返回视频的总帧数、帧率、每帧时间以及每帧之间的时间间隔,供后续处理使用。
初始化路径和视频时长
接下来,我们需要初始化视频路径和时长:
video_path = './高速公路交通流数据/32.31.250.103/20240501_20240501125647_20240501140806_125649.mp4'
duration = calculate_duration('12:56:47', '14:08:05')
total_frames, fps, frame_time, interval_time = get_video_info(video_path, duration)
5. 加载YOLO模型
我们将加载预训练的YOLOv8模型以进行目标检测:
model = YOLO('yolov8n.pt')
这一步确保我们有一个强大的目标检测模型,可以处理视频中的车辆检测。
6. 处理视频帧
我们将处理视频的前10%帧,以减少计算量并专注于关键时刻。以下是相关代码:
num_frames_to_process = total_frames // 1
frame_count = 0# 存储车辆信息
vehicle_data = {}
unique_vehicle_counter = 0
data = []
6.1 定义参数
我们还需要定义一些判断参数,以帮助后续判断车辆是否为同一辆车:
IOU_THRESHOLD = 0.3
MAX_MOVEMENT_DISTANCE = 50
MAX_SIZE_DIFFERENCE = 0.2
COLOR_THRESHOLD = 30
- IOU_THRESHOLD:用于判断车辆边界框重叠的阈值。
- MAX_MOVEMENT_DISTANCE:判断车辆在相邻帧之间的最大移动距离。
- MAX_SIZE_DIFFERENCE:判断车辆大小差异的最大阈值。
- COLOR_THRESHOLD:用于判断颜色差异的阈值。
7. 定义辅助函数
7.1 计算IoU(交并比)
def calculate_iou(box1, box2):x1, y1, x2, y2 = box1x1b, y1b, x2b, y2b = box2inter_x1 = max(x1, x1b)inter_y1 = max(y1, y1b)inter_x2 = min(x2, x2b)inter_y2 = min(y2, y2b)inter_area = max(0, inter_x2 - inter_x1) * max(0, inter_y2 - inter_y1)box1_area = (x2 - x1) * (y2 - y1)box2_area = (x2b - x1b) * (y2b - y1b)union_area = box1_area + box2_area - inter_areareturn inter_area / union_area if union_area > 0 else 0
此函数计算两个边界框的交并比(IoU),用于判断两个检测框的重叠程度。
7.2 计算中心距离
def calculate_center_distance(box1, box2):center1 = ((box1[0] + box1[2]) / 2, (box1[1] + box1[3]) / 2)center2 = ((box2[0] + box2[2]) / 2, (box2[1] + box2[3]) / 2)return distance.euclidean(center1, center2)
计算两个边界框中心点之间的距离,以判断车辆是否为同一辆车。
7.3 提取车辆颜色
def extract_vehicle_color(frame, box):x1, y1, x2, y2 = map(int, box)vehicle_roi = frame[y1:y2, x1:x2]hsv_roi = cv2.cvtColor(vehicle_roi, cv2.COLOR_BGR2HSV)return cv2.mean(hsv_roi)[:3]
提取车辆在当前帧中的颜色,便于后续判断车辆是否相同。
7.4 计算颜色差异
def calculate_color_difference(color1, color2):return np.linalg.norm(np.array(color1) - np.array(color2))
计算两个颜色之间的差异,为车辆匹配提供依据。
7.5 判断是否为同一辆车
def is_same_vehicle(vehicle_info, new_box, new_color):prev_box = vehicle_info["box"]prev_color = vehicle_info["color"]iou = calculate_iou(prev_box, new_box)center_distance = calculate_center_distance(prev_box, new_box)prev_size = (prev_box[2] - prev_box[0]) * (prev_box[3] - prev_box[1])new_size = (new_box[2] - new_box[0]) * (new_box[3] - new_box[1])size_difference = abs(new_size - prev_size) / prev_size if prev_size > 0 else 1color_difference = calculate_color_difference(prev_color, new_color)return (iou > IOU_THRESHOLDand center_distance < MAX_MOVEMENT_DISTANCEand size_difference < MAX_SIZE_DIFFERENCEand color_difference < COLOR_THRESHOLD)
这个函数综合考虑了IoU、中心距离、大小差异和颜色差异,来判断检测到的车辆是否与之前记录的车辆相同。
7.6 车型类别映射
class_map = {2: 'car',7: 'truck',5: 'bus',3: 'motorcycle',1: 'bicycle',
}
我们定义了一个字典,用于将YOLO模型输出的类别编号映射到相应的车辆类型。
8. 处理视频帧
我们开始逐帧处理视频,检测车辆并记录相关信息:
cap = cv2.VideoCapture(video_path)start_time_seconds = 12 * 3600 + 56 * 60 + 47 # 起始时间转换为秒
frame_count = 0
LANE_WIDTH = 3 # 车道宽度(米)
LANE_LENGTH = 30 # 识别范围的长度(米)# 用于存储流量计算的车辆ID
unique_vehicles_in_interval = set()while cap.isOpened() and frame_count < num_frames_to_process:ret, frame = cap.read()if not ret:breakresults = model(frame)current_frame_vehicle_ids = []vehicle_speeds = []truck_count = 0other_count = 0# 处理检测结果for result in results:for box in result.boxes:coords = box.xyxy[0]x1, y1, x2, y2 = coords[0], coords[1], coords[2], coords[3]cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)vehicle_class = int(box.cls.item())vehicle_type = class_map.get(vehicle_class, 'unknown')if vehicle_type == 'truck':truck_count += 1else:other_count += 1vehicle_color = extract_vehicle_color(frame, (x1, y1, x2, y2))matched_vehicle_id = Nonefor vehicle_id, vehicle_info in vehicle_data.items():if is_same_vehicle(vehicle_info, (x1, y1, x2, y2), vehicle_color):matched_vehicle_id = vehicle_idbreakif matched_vehicle_id is None:unique_vehicle_counter += 1matched_vehicle_id = unique_vehicle_countervehicle_data[matched_vehicle_id] = {"box": (x1, y1, x2, y2),"color": vehicle_color,"first_frame": frame_count,"last_frame": frame_count}else:vehicle_data[matched_vehicle_id]["last_frame"] = frame_countvehicle_data[matched_vehicle_id]["box"] = (x1, y1, x2, y2)vehicle_data[matched_vehicle_id]["color"] = vehicle_colorcurrent_frame_vehicle_ids.append(matched_vehicle_id)
在这段代码中,我们使用YOLO模型检测当前帧中的车辆,并提取每辆车的边界框、类型和颜色。通过判断条件,我们可以将车辆与已知车辆进行匹配,确保我们能够跟踪同一辆车。
9. 计算车辆速度
根据检测到的车辆信息,我们可以计算车辆的速度。以下是速度计算的代码:
# 计算车辆速度
for vehicle_id in current_frame_vehicle_ids:vehicle_info = vehicle_data[vehicle_id]first_frame = vehicle_info["first_frame"]last_frame = vehicle_info["last_frame"]frame_diff = last_frame - first_frame + 1if frame_diff > 0:vehicle_speed = 30 / (frame_diff * 0.4) # 计算速度的公式vehicle_speeds.append(vehicle_speed)
根据车辆在视频中的帧差和已知车道长度,计算每辆车的速度。
10. 过滤速度异常值
为了确保计算结果的可靠性,我们对速度进行过滤,去除异常值:
filtered_speeds = []
if vehicle_speeds:lower_bound = 0upper_bound = 40filtered_speeds = [speed for speed in vehicle_speeds if lower_bound <= speed <= upper_bound]
在这里,我们设定了速度的上下限,仅保留在该范围内的速度数据。
11. 计算流量和密度
在每20帧中,我们将计算流量和车辆密度:
unique_vehicles_in_interval.update(current_frame_vehicle_ids)
if (frame_count + 1) % 20 == 0:flow = len(unique_vehicles_in_interval) # 每20帧的唯一车辆数unique_vehicles_in_interval.clear() # 清空当前周期的车辆ID
else:flow = None
通过维护一个唯一车辆ID的集合,我们能够计算特定时间间隔内的流量。
12. 输出信息
最后,我们将输出当前帧的分析结果,并将所有数据存储在一个列表中,便于后续分析:
current_time_seconds = start_time_seconds + frame_count * interval_time
current_time = datetime.utcfromtimestamp(current_time_seconds).strftime('%H:%M:%S')# 输出信息
total_vehicles = truck_count + other_count
print(f"当前帧: {frame_count}, 时间: {current_time}, 卡车数: {truck_count}, 其他车辆数: {other_count}, 总车辆数: {total_vehicles}")
print(f"车辆编号集合: {current_frame_vehicle_ids}")
print(f"原始车辆速度: {vehicle_speeds}")
print(f"过滤后的车辆速度: {filtered_speeds}")
print(f"每帧平均速度: {average_speed:.2f}")
print(f"流量: {flow}辆 " if flow is not None and density is not None else "")
print(f"密度: {density:.2f}辆/米")
# 存储帧相关数据
data.append({'timestamp': current_time, 'frame': frame_count,'total_vehicles': total_vehicles,'trucks': truck_count,'others': other_count,'vehicle_ids': current_frame_vehicle_ids, 'original_speeds': vehicle_speeds, 'filtered_speeds': filtered_speeds, 'average_speed': average_speed, 'flow': flow, 'density': density
})
13. 显示结果
最后,我们将每帧的结果显示出来,并在按下’q’键时退出视频:
cv2.imshow('frame', frame)
frame_count += 1
if cv2.waitKey(1) & 0xFF == ord('q'):break
14. 保存结果
完成视频处理后,我们将分析结果转换为DataFrame,并保存到Excel文件中,以便后续分析:
df = pd.DataFrame(data)
df.to_excel('./video_analysis_with_color.xlsx', index=False)
输出表格展示:
15. 总结
通过上述步骤,我们实现了一个基于YOLO模型的高速公路交通流量分析系统。该系统不仅可以检测和跟踪车辆,还能计算它们的速度、流量和密度,为交通管理提供重要的数据支持。随着技术的不断进步,我们期待在交通流量监测方面能够实现更高的自动化和智能化。
完整代码
import cv2
import pandas as pd
import numpy as np
from ultralytics import YOLO
from datetime import datetime
from scipy.spatial import distance# 计算视频时长
def calculate_duration(start_time, end_time):fmt = '%H:%M:%S'start = datetime.strptime(start_time, fmt)end = datetime.strptime(end_time, fmt)duration = end - startreturn duration# 获取视频信息
def get_video_info(video_path, duration):cap = cv2.VideoCapture(video_path)if not cap.isOpened():raise ValueError("无法打开视频文件")total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))fps = cap.get(cv2.CAP_PROP_FPS)frame_time = 1 / fps if fps > 0 else 0interval_time = duration.total_seconds() / total_frames if total_frames > 0 else 0cap.release()return total_frames, fps, frame_time, interval_time# 初始化路径和视频时长
video_path = './高速公路交通流数据/32.31.250.103/20240501_20240501125647_20240501140806_125649.mp4'
duration = calculate_duration('12:56:47', '14:08:05')
total_frames, fps, frame_time, interval_time = get_video_info(video_path, duration)# 加载预训练的YOLOv8模型
model = YOLO('yolov8n.pt')# 处理前10%的帧
num_frames_to_process = total_frames // 100
frame_count = 0# 存储车辆信息
vehicle_data = {}
unique_vehicle_counter = 0
data = []# 定义IoU阈值以及其他判断参数
IOU_THRESHOLD = 0.3
MAX_MOVEMENT_DISTANCE = 50
MAX_SIZE_DIFFERENCE = 0.2
COLOR_THRESHOLD = 30# 计算IoU(交并比)
def calculate_iou(box1, box2):x1, y1, x2, y2 = box1x1b, y1b, x2b, y2b = box2inter_x1 = max(x1, x1b)inter_y1 = max(y1, y1b)inter_x2 = min(x2, x2b)inter_y2 = min(y2, y2b)inter_area = max(0, inter_x2 - inter_x1) * max(0, inter_y2 - inter_y1)box1_area = (x2 - x1) * (y2 - y1)box2_area = (x2b - x1b) * (y2b - y1b)union_area = box1_area + box2_area - inter_areareturn inter_area / union_area if union_area > 0 else 0# 计算中心距离
def calculate_center_distance(box1, box2):center1 = ((box1[0] + box1[2]) / 2, (box1[1] + box1[3]) / 2)center2 = ((box2[0] + box2[2]) / 2, (box2[1] + box2[3]) / 2)return distance.euclidean(center1, center2)# 提取车辆颜色
def extract_vehicle_color(frame, box):x1, y1, x2, y2 = map(int, box)vehicle_roi = frame[y1:y2, x1:x2]hsv_roi = cv2.cvtColor(vehicle_roi, cv2.COLOR_BGR2HSV)return cv2.mean(hsv_roi)[:3]# 计算颜色差异
def calculate_color_difference(color1, color2):return np.linalg.norm(np.array(color1) - np.array(color2))# 判断车辆是否为同一辆车
def is_same_vehicle(vehicle_info, new_box, new_color):prev_box = vehicle_info["box"]prev_color = vehicle_info["color"]iou = calculate_iou(prev_box, new_box)center_distance = calculate_center_distance(prev_box, new_box)prev_size = (prev_box[2] - prev_box[0]) * (prev_box[3] - prev_box[1])new_size = (new_box[2] - new_box[0]) * (new_box[3] - new_box[1])size_difference = abs(new_size - prev_size) / prev_size if prev_size > 0 else 1color_difference = calculate_color_difference(prev_color, new_color)return (iou > IOU_THRESHOLDand center_distance < MAX_MOVEMENT_DISTANCEand size_difference < MAX_SIZE_DIFFERENCEand color_difference < COLOR_THRESHOLD)# 车型类别映射
class_map = {2: 'car',7: 'truck',5: 'bus',3: 'motorcycle',1: 'bicycle',
}# 打开视频文件
cap = cv2.VideoCapture(video_path)start_time_seconds = 12 * 3600 + 56 * 60 + 47 # 起始时间转换为秒
frame_count = 0
LANE_WIDTH = 3 # 车道宽度(米)
LANE_LENGTH = 30 # 识别范围的长度(米)# 用于存储流量计算的车辆ID
unique_vehicles_in_interval = set()while cap.isOpened() and frame_count < num_frames_to_process:ret, frame = cap.read()if not ret:breakresults = model(frame)current_frame_vehicle_ids = []vehicle_speeds = []truck_count = 0other_count = 0# 处理检测结果for result in results:for box in result.boxes:coords = box.xyxy[0]x1, y1, x2, y2 = coords[0], coords[1], coords[2], coords[3]cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)vehicle_class = int(box.cls.item())vehicle_type = class_map.get(vehicle_class, 'unknown')if vehicle_type == 'truck':truck_count += 1else:other_count += 1vehicle_color = extract_vehicle_color(frame, (x1, y1, x2, y2))matched_vehicle_id = Nonefor vehicle_id, vehicle_info in vehicle_data.items():if is_same_vehicle(vehicle_info, (x1, y1, x2, y2), vehicle_color):matched_vehicle_id = vehicle_idbreakif matched_vehicle_id is None:unique_vehicle_counter += 1matched_vehicle_id = unique_vehicle_countervehicle_data[matched_vehicle_id] = {"box": (x1, y1, x2, y2),"color": vehicle_color,"first_frame": frame_count,"last_frame": frame_count}else:vehicle_data[matched_vehicle_id]["last_frame"] = frame_countvehicle_data[matched_vehicle_id]["box"] = (x1, y1, x2, y2)vehicle_data[matched_vehicle_id]["color"] = vehicle_colorcurrent_frame_vehicle_ids.append(matched_vehicle_id)# 计算车辆速度for vehicle_id in current_frame_vehicle_ids:vehicle_info = vehicle_data[vehicle_id]first_frame = vehicle_info["first_frame"]last_frame = vehicle_info["last_frame"]frame_diff = last_frame - first_frame + 1if frame_diff > 0:vehicle_speed = 30 / (frame_diff * 0.4) # 计算速度的公式vehicle_speeds.append(vehicle_speed)# 过滤速度异常值filtered_speeds = []if vehicle_speeds:lower_bound = 0upper_bound = 40filtered_speeds = [speed for speed in vehicle_speeds if lower_bound <= speed <= upper_bound]# 计算每一帧的平均速度average_speed = np.mean(filtered_speeds) if filtered_speeds else 0# 计算流量和密度unique_vehicles_in_interval.update(current_frame_vehicle_ids)if (frame_count + 1) % 20 == 0:flow = len(unique_vehicles_in_interval) # 每20帧的唯一车辆数unique_vehicles_in_interval.clear() # 清空当前周期的车辆IDelse:flow = None# 计算实际时间current_time_seconds = start_time_seconds + frame_count * interval_timecurrent_time = datetime.utcfromtimestamp(current_time_seconds).strftime('%H:%M:%S')# 输出信息total_vehicles = truck_count + other_countdensity = total_vehicles / (LANE_WIDTH * LANE_LENGTH) # 单位长度内的车辆数print(f"当前帧: {frame_count}, 时间: {current_time}, 卡车数: {truck_count}, 其他车辆数: {other_count}, 总车辆数: {total_vehicles}")print(f"车辆编号集合: {current_frame_vehicle_ids}")print(f"原始车辆速度: {vehicle_speeds}")print(f"过滤后的车辆速度: {filtered_speeds}")print(f"每帧平均速度: {average_speed:.2f}")print(f"流量: {flow}辆 " if flow is not None and density is not None else "")print(f"密度: {density:.2f}辆/米")# 存储帧相关数据data.append({'timestamp': current_time, # 当前帧的时间戳'frame': frame_count,'total_vehicles': total_vehicles,'trucks': truck_count,'others': other_count,'vehicle_ids': current_frame_vehicle_ids, # 记录当前帧车辆编号'original_speeds': vehicle_speeds, # 原始速度'filtered_speeds': filtered_speeds, # 过滤后的速度'average_speed': average_speed, # 每帧平均速度'flow': flow, # 流量'density': density # 密度})# 显示结果cv2.imshow('frame', frame)# 更新帧计数器frame_count += 1if cv2.waitKey(1) & 0xFF == ord('q'):break
cap.release()
cv2.destroyAllWindows()# 转换为DataFrame并保存到Excel
df = pd.DataFrame(data)
df.to_excel('./video_analysis_with_color.xlsx', index=False)
再次提醒:本文内容仅供参考,作者对此不负任何责任。希望读者能够从中获得启发,继续探索交通流量分析的更深入领域。