目录
1、基于JavaCV
1.1、pom依赖
1.2、读取帧Frame
1.3、转换BufferedImage
1.4、完整代码
2、基于Ffmpeg命令
2.1、ffmpeg命令
2.2、读取帧
2.3、转换BufferedImage
2.4、完整代码
1、基于JavaCV
1.1、pom依赖
<dependency><groupId>org.bytedeco</groupId><artifactId>javacv</artifactId><version>1.5.8</version></dependency><dependency><groupId>org.bytedeco</groupId><artifactId>javacpp</artifactId><version>1.5.8</version></dependency><!-- 此版本中主要兼容linux和windows系统,如需兼容其他系统平台,请引入对应依赖即可 --><dependency><groupId>org.bytedeco</groupId><artifactId>opencv</artifactId><version>4.6.0-1.5.8</version><classifier>linux-x86_64</classifier></dependency><dependency><groupId>org.bytedeco</groupId><artifactId>opencv</artifactId><version>4.6.0-1.5.8</version><classifier>windows-x86_64</classifier></dependency><dependency><groupId>org.bytedeco</groupId><artifactId>openblas</artifactId><version>0.3.21-1.5.8</version><classifier>linux-x86_64</classifier></dependency><dependency><groupId>org.bytedeco</groupId><artifactId>openblas</artifactId><version>0.3.21-1.5.8</version><classifier>windows-x86_64</classifier></dependency><dependency><groupId>org.bytedeco</groupId><artifactId>ffmpeg</artifactId><version>5.1.2-1.5.8</version><classifier>linux-x86_64</classifier></dependency><dependency><groupId>org.bytedeco</groupId><artifactId>ffmpeg</artifactId><version>5.1.2-1.5.8</version><classifier>windows-x86_64</classifier></dependency>
1.2、读取帧Frame
FFmpegFrameGrabber frameGrabber = null;
try {
frameGrabber = new FFmpegFrameGrabber(rtsp);
frameGrabber.setOption("rtsp_transport", "tcp");
frameGrabber.startUnsafe();
int retry = 0;
while (this.captureDevice.getStatus()) {
Frame frame = frameGrabber.grabImage();
// long frameNumber = frameGrabber.getFrameNumber();
}
}
1.3、转换BufferedImage
Java2DFrameConverter jfc = new Java2DFrameConverter(); BufferedImage bufferedImage = jfc.getBufferedImage(frame);
1.4、完整代码
由于获取frame 经常 null帧,所以添加了循环。
长时间读取不到,或者rtsp连接断开等,添加断开重连机制。
import lombok.extern.slf4j.Slf4j;
import org.bytedeco.javacv.FFmpegFrameGrabber;
import org.bytedeco.javacv.Frame;
import org.bytedeco.javacv.Java2DFrameConverter;import java.awt.image.BufferedImage;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;@Slf4j
public class CaptureThread extends Thread {private String final rtsp ;public CaptureThread(String rtsp) {this.rtsp = rtsp;this.setName("CaptureId-1"); }@Overridepublic void run() { // 开启接收流 log.info("组装后的地址为=" + rtsp);FFmpegFrameGrabber frameGrabber = null;long count = 0;int error = 0;try {frameGrabber = new FFmpegFrameGrabber(rtsp);frameGrabber.setOption("rtsp_transport", "tcp");frameGrabber.startUnsafe();int retry = 0;while (true) {Frame frame = frameGrabber.grabImage();// long frameNumber = frameGrabber.getFrameNumber();if (null == frame) {if (error > 3) {break;}retry++;if (retry >= 10) {log.info("取帧失败,即将重启rtsp=" + rtsp);error++;Thread.sleep(200);//frameGrabber.restart();frameGrabber.releaseUnsafe();frameGrabber.startUnsafe();}continue;}retry = 0;error = 0;doSoftDecode(frame);}} catch (org.bytedeco.javacv.FrameGrabber.Exception ex) {log.error("视频获取失败", (Throwable) ex);} catch (Error error) {log.error("是否能捕获堆栈异常");} catch (Throwable ex) {log.error("帧处理失败", ex);} finally {if (frameGrabber != null) {try {frameGrabber.stop();frameGrabber.close();} catch (org.bytedeco.javacv.FrameGrabber.Exception e) {log.error(e.getMessage(), e);}}}}private void doSoftDecode(Frame frame) throws Exception {long start = System.currentTimeMillis();Java2DFrameConverter jfc = new Java2DFrameConverter();BufferedImage bufferedImage = jfc.getBufferedImage(frame);log.debug("解码耗时=" + (System.currentTimeMillis() - start));// TODO ...}
}
2、基于Ffmpeg命令
2.1、ffmpeg命令
ffmpeg -rtsp_transport tcp -i %s -f image2pipe -vcodec mjpeg -rtbufsize 20M -q:v 2 -
-rtsp_transport tcp
:使用 TCP 协议来传输 RTSP 流,以提高稳定性。-i rtsp://username:password@camera_ip:port/path
:指定 RTSP 流的 URL。-f image2pipe
:指定输出格式为图片流。-vcodec mjpeg
:使用 MJPEG 编解码器处理视频流,保持图像质量。-r 25
:设置帧率为 25 帧每秒。-rtbufsize 20M
:设置解码器缓冲区大小为 20MB,减少图像丢失和马赛克问题。-q:v 2
:设置图像质量,2 是较高的质量值(范围是 1-31,1 是最高质量)。
2.2、读取帧
使用独立线程读取数据流,process所在的ffmpeg 命令将流推送到 inputStream 中。内部线程循环读取。
Thread pt = new Thread(() -> {
try (InputStream errorStream = process.getErrorStream()) {
byte[] buffer = new byte[BUFFER_SIZE];
int bytesRead;
while ((bytesRead = errorStream.read(buffer)) != -1) {
log.error("异常" + bytesRead);
// System.err.write(buffer, 0, bytesRead);
}
} catch (IOException e) {
e.printStackTrace();
}
});
pt.start();
2.3、转换BufferedImage
读取的BufferedImage 也需要异步的处理,否则会造成 pt 线程阻塞。
使用CompletableFuture.supplyAsync 异步的返回结果,结合join()等待任务处理完成。
byte[] buffer = new byte[BUFFER_SIZE];
int bytesRead;
while (running && (bytesRead = inputStream.read(buffer)) != -1) {
ByteArrayInputStream bais = new ByteArrayInputStream(buffer, 0, bytesRead);
BufferedImage image = ImageIO.read(bais);
if (image != null) {
// 将图像保存操作移到后台线程中处理
Boolean flag = CompletableFuture.supplyAsync( () -> saveImg(image)).join();
}
}
2.4、完整代码
import lombok.extern.slf4j.Slf4j;import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;/**** 实际的监控采集帧* @date 2024/7/19*/
@Slf4j
public class CaptureFfmpegThread extends Thread {/**重连延迟时间*/private static final int RECONNECT_DELAY_MS = 5000;/**缓冲区大小*/private static final int BUFFER_SIZE = 5 * 1024 * 1024;private static final String FFMPEG_CMD_TEMPLATE = "ffmpeg -rtsp_transport tcp -i %s -f image2pipe -vcodec mjpeg -rtbufsize 20M -q:v 2 -";/**通过外部控制线程中断,停止采集*/private boolean running;private final String rtspUrl;public CaptureFfmpegThread(String rtsp) {this.setName("CaptureId-1" );this.rtspUrl = rtsp;this.running = true;}@Overridepublic void run() { log.info("组装后的地址为=" + rtspUrl);while (true) {try {String ffmpegCmd = String.format(FFMPEG_CMD_TEMPLATE, rtspUrl);Process process = Runtime.getRuntime().exec(ffmpegCmd);InputStream inputStream = process.getInputStream();// 处理错误流以避免阻塞Thread pt = new Thread(() -> {try (InputStream errorStream = process.getErrorStream()) {byte[] buffer = new byte[BUFFER_SIZE];int bytesRead;while ((bytesRead = errorStream.read(buffer)) != -1) {log.error("异常" + bytesRead);// System.err.write(buffer, 0, bytesRead);}} catch (IOException e) {e.printStackTrace();}});pt.start();byte[] buffer = new byte[BUFFER_SIZE];int bytesRead;while (running && (bytesRead = inputStream.read(buffer)) != -1) {ByteArrayInputStream bais = new ByteArrayInputStream(buffer, 0, bytesRead);BufferedImage image = ImageIO.read(bais);if (image != null) {// 将图像保存操作移到后台线程中处理Boolean flag = CompletableFuture.supplyAsync( () -> saveImg(image)).join();}}process.waitFor();log.info("FFmpeg process ended for stream: " + rtspUrl + ", attempting to restart...");} catch (IOException | InterruptedException e) {log.info("Error occurred for stream: " + rtspUrl + " - " + e.getMessage());}try {log.info("Waiting before attempting to reconnect stream: " + rtspUrl);TimeUnit.MILLISECONDS.sleep(RECONNECT_DELAY_MS);} catch (InterruptedException e) {log.info("Reconnect delay interrupted for stream: " + rtspUrl + " - " + e.getMessage());} finally {if(!running){break;}}}}private boolean saveImg(BufferedImage image){// TODO ...return true;}}