Requirement: through a private API I continuously receive video data as byte[] chunks. This data needs to be processed and ultimately pushed out as an RTMP stream.
Approach: a piped stream turns the continuously arriving byte[] video data into an InputStream, which is handed to JavaCV's FFmpegFrameGrabber; an FFmpegFrameRecorder then pushes the video to the target RTMP server (served here by mediamtx).
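The core of this approach is the pipe that decouples the SDK callbacks from FFmpeg. Below is a minimal sketch of just that wiring; fetchNextChunk() is a hypothetical stand-in for "the next byte[] delivered by the private API" (the full demo further down wires the real SDK callback instead):

import org.bytedeco.javacv.FFmpegFrameGrabber;

import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipeSketch {

    public static void main(String[] args) throws Exception {
        PipedInputStream in = new PipedInputStream(1024 * 1024);  // buffered read end
        PipedOutputStream out = new PipedOutputStream(in);        // write end, connected to the read end

        // Writer: a single thread forwards every received chunk into the pipe.
        new Thread(() -> {
            try {
                while (true) {
                    out.write(fetchNextChunk());
                    out.flush();
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }).start();

        // Reader: FFmpegFrameGrabber consumes the pipe like any other InputStream.
        FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(in, 0);
        grabber.setFormat("dhav"); // same container format as in the full demo
        grabber.start();
        // ... grab() frames here and hand them to an FFmpegFrameRecorder, as the full demo does.
    }

    // hypothetical placeholder; the real demo receives data via the SDK callback
    private static byte[] fetchNextChunk() {
        return new byte[0];
    }
}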
Result screenshots
Playback in VLC
Key dependency
<dependency>
    <groupId>org.bytedeco</groupId>
    <artifactId>javacv-platform</artifactId>
    <version>1.5.9</version>
</dependency>
Complete demo code
import lombok.extern.slf4j.Slf4j;
import org.bytedeco.ffmpeg.avcodec.AVCodecParameters;
import org.bytedeco.ffmpeg.avformat.AVFormatContext;
import org.bytedeco.ffmpeg.avformat.AVStream;
import org.bytedeco.ffmpeg.global.avcodec;
import org.bytedeco.ffmpeg.global.avutil;
import org.bytedeco.javacv.FFmpegFrameGrabber;
import org.bytedeco.javacv.FFmpegFrameRecorder;
import org.bytedeco.javacv.FFmpegLogCallback;
import org.bytedeco.javacv.Frame;
import org.jfjy.ch2ji.ecctv.dh.api.ApiService;
import org.jfjy.ch2ji.ecctv.dh.callback.RealPlayCallback;

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.bytedeco.ffmpeg.global.avutil.AV_LOG_DEBUG;
import static org.bytedeco.ffmpeg.global.avutil.AV_LOG_INFO;

@Slf4j
public class GetBytes2PipedStreamAndPushRTMP {

    private static final String SRS_PUSH_ADDRESS = "rtmp://127.0.0.1:1935/live/livestream";

    static int BUFFER_CAPACITY = 1024 * 1024;

    public static void main(String[] args) throws Exception {
        FFmpegLogCallback.set();
        FFmpegLogCallback.setLevel(AV_LOG_DEBUG);

        ApiService apiService = new ApiService();
        Long login = apiService.login("10.3.0.54", 8801, "admin", "xxxx");

        PipedInputStream inputStream = new PipedInputStream(BUFFER_CAPACITY);
        PipedOutputStream outputStream = new PipedOutputStream(inputStream);

        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // Thread 1: collect byte[] chunks from the SDK callback and write them into the pipe.
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                List<byte[]> bytesArray = new ArrayList<>();
                apiService.startRealPlay(new RealPlayCallback<Long, Integer, byte[]>() {
                    @Override
                    public void apply(Long aLong, Integer integer, byte[] bytes) {
                        log.info("received video data, type: {}, bytes: {}", integer, bytes.length);
                        // The data is not written to the piped stream here, because each callback
                        // arrives on a newly created thread, while a piped stream may only be written
                        // from a single thread, otherwise it fails. So the data is handed off to a list
                        // and written by the loop below instead.
                        synchronized (bytesArray) {
                            bytesArray.add(bytes);
                        }
                    }
                }, 0, 0);
                try {
                    while (true) {
                        synchronized (bytesArray) {
                            // write the buffered video data into the piped stream
                            for (byte[] bytes : bytesArray) {
                                outputStream.write(bytes);
                            }
                            outputStream.flush();
                            bytesArray.clear();
                        }
                        Thread.sleep(100);
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        // Thread 2: once the pipe has data, start grabbing from it and pushing to RTMP.
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                boolean isStartPush = false;
                log.info("push thread started");
                while (true) {
                    try {
                        // start pushing once the piped input stream has data; this only needs to run once
                        if (!isStartPush && inputStream.available() > 0) {
                            log.info("push task starting | available size : {}", inputStream.available());
                            isStartPush = true;
                            grabAndPush(inputStream, SRS_PUSH_ADDRESS);
                        }
                        Thread.sleep(500);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        });

        // keep the main thread alive
        while (true) {
        }
    }

    private static synchronized void grabAndPush(InputStream inputStream, String pushAddress) throws Exception {
        avutil.av_log_set_level(AV_LOG_INFO);
        FFmpegLogCallback.set();

        // read the raw stream from the pipe; format "dhav" matches the bytes delivered by the private API
        FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(inputStream, 0);
        grabber.setFormat("dhav");
        grabber.startUnsafe();

        AVFormatContext avFormatContext = grabber.getFormatContext();
        int streamNum = avFormatContext.nb_streams();
        if (streamNum < 1) {
            log.error("no media!");
            return;
        }

        int frameRate = (int) grabber.getVideoFrameRate();
        if (0 == frameRate) {
            frameRate = 15;
        }
        log.info("frameRate[{}], duration[{}]s, nb_streams[{}]",
                frameRate,
                avFormatContext.duration() / 1000000,
                avFormatContext.nb_streams());
        for (int i = 0; i < streamNum; i++) {
            AVStream avStream = avFormatContext.streams(i);
            AVCodecParameters avCodecParameters = avStream.codecpar();
            log.info("stream index[{}], codec type[{}], codec ID[{}]", i, avCodecParameters.codec_type(), avCodecParameters.codec_id());
        }

        int frameWidth = grabber.getImageWidth();
        int frameHeight = grabber.getImageHeight();
        int audioChannels = grabber.getAudioChannels();
        log.info("frameWidth[{}], frameHeight[{}], audioChannels[{}]",
                frameWidth,
                frameHeight,
                audioChannels);

        // push to the RTMP server as FLV/H.264
        FFmpegFrameRecorder recorder = new FFmpegFrameRecorder(pushAddress, frameWidth, frameHeight, audioChannels);
        recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264);
        recorder.setInterleaved(true);
        recorder.setFormat("flv");
        recorder.setFrameRate(frameRate);
        recorder.setGopSize(frameRate);
        recorder.setAudioChannels(grabber.getAudioChannels());
        recorder.start();

        Frame frame;
        log.info("start push");
        int videoFrameNum = 0;
        int audioFrameNum = 0;
        int dataFrameNum = 0;
        // pause between frames so the push roughly follows the frame rate
        int interVal = 1000 / frameRate;
        interVal /= 8;
        while (null != (frame = grabber.grab())) {
            if (null != frame.image) {
                videoFrameNum++;
            }
            if (null != frame.samples) {
                audioFrameNum++;
            }
            if (null != frame.data) {
                dataFrameNum++;
            }
            recorder.record(frame);
            Thread.sleep(interVal);
        }
        log.info("push complete, videoFrameNum[{}], audioFrameNum[{}], dataFrameNum[{}]", videoFrameNum, audioFrameNum, dataFrameNum);

        recorder.close();
        grabber.close();
    }
}
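The screenshots above use VLC to play the resulting stream. If you prefer to check it from Java as well, a small JavaCV viewer is enough; the sketch below is an assumption-level example (not part of the original demo) that reads the same address the recorder pushes to and shows the video frames in a CanvasFrame window:

import org.bytedeco.javacv.CanvasFrame;
import org.bytedeco.javacv.FFmpegFrameGrabber;
import org.bytedeco.javacv.Frame;

public class PlayRtmpSketch {

    public static void main(String[] args) throws Exception {
        // pull the stream published by the demo above
        FFmpegFrameGrabber grabber = new FFmpegFrameGrabber("rtmp://127.0.0.1:1935/live/livestream");
        grabber.start();

        CanvasFrame canvas = new CanvasFrame("RTMP preview");
        Frame frame;
        while (canvas.isVisible() && (frame = grabber.grab()) != null) {
            if (frame.image != null) {
                canvas.showImage(frame); // render video frames; audio frames are simply skipped here
            }
        }

        canvas.dispose();
        grabber.stop();
    }
}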