GB28181学习(十七)——基于jrtplib实现tcp被动和主动发流

前言

GB/T28181-2022实时流的传输方式介绍:https://blog.csdn.net/www_dong/article/details/134255185

基于jrtplib实现tcp被动和主动收流介绍:https://blog.csdn.net/www_dong/article/details/134451387

本文主要介绍下级平台或设备发流功能,用于对接特定的SIP服务器或上级平台。

UDP发流

流程图

在这里插入图片描述

发送端流程

  • 初始化rtp参数;
  • 裸流数据做PS复用;
  • 组RTP包发送;

设计

  1. 初始化rtp参数
int CUdp::InitRtp_()
{RTPSessionParams sessionParams;sessionParams.SetMinimumRTCPTransmissionInterval(10);sessionParams.SetOwnTimestampUnit(1.0 / 90000.0);sessionParams.SetAcceptOwnPackets(true);sessionParams.SetMaximumPacketSize(1450);RTPUDPv4TransmissionParams transParams;transParams.SetRTPSendBuffer(2*1024*1024);transParams.SetBindIP(m_ip);transParams.SetPortbase((uint16_t)m_port);if (0 != Create(sessionParams, &transParams)){return -1;}SetDefaultPayloadType((uint8_t)m_payload);SetDefaultTimestampIncrement(3600);SetDefaultMark(true);RTPIPv4Address addr(ntohl(inet_addr(m_ip), (uint16_t)m_port);if(0 != AddDestination(addr)){return -1;}return 0;
}
  1. 流数据复用为PS
// 使用ireader开源库进行ps复用
// 初始化
CData2PS::CData2PS()
{struct ps_muxer_func_t func;func.alloc = Alloc;func.free = Free;func.write = Packet;m_ps = ps_muxer_create(&func, this);// TODO codecid待补充m_ps_stream = ps_muxer_add_stream(m_ps, PSI_STREAM_H264, nullptr, 0);
}// 塞数据
int CData2PS::InputData(void* data, int len)
{if (!m_ps)return -1;uint64_t clock = time64_now();if (0 == m_ps_clock)m_ps_clock = clock;return ps_muxer_input(m_ps, m_ps_stream, 0, (clock - m_ps_clock) * 90, (clock - m_ps_clock) * 90, data, len);
}
  1. 发送rtp包
// 调用jrtplib中SendPacket(data, len);接口发送数据// 以下为SendPacket部分源码
// 主要流程:
// 1. 构建packet
// 2. 发送rtp数据
int RTPSession::SendPacket(const void *data,size_t len,uint8_t pt,bool mark,uint32_t timestampinc)
{int status;if (!created)return ERR_RTP_SESSION_NOTCREATED;BUILDER_LOCKif ((status = packetbuilder.BuildPacket(data,len,pt,mark,timestampinc)) < 0){BUILDER_UNLOCKreturn status;}if ((status = SendRTPData(packetbuilder.GetPacket(),packetbuilder.GetPacketLength())) < 0){BUILDER_UNLOCKreturn status;}BUILDER_UNLOCKSOURCES_LOCKsources.SentRTPPacket();SOURCES_UNLOCKPACKSENT_LOCKsentpackets = true;PACKSENT_UNLOCKreturn 0;
}// 构建包
int RTPPacketBuilder::PrivateBuildPacket(const void *data,size_t len,uint8_t pt,bool mark,uint32_t timestampinc,bool gotextension,uint16_t hdrextID,const void *hdrextdata,size_t numhdrextwords)
{RTPPacket p(pt,data,len,seqnr,timestamp,ssrc,mark,numcsrcs,csrcs,gotextension,hdrextID,(uint16_t)numhdrextwords,hdrextdata,buffer,maxpacksize,GetMemoryManager());int status = p.GetCreationError();if (status < 0)return status;packetlength = p.GetPacketLength();if (numpackets == 0) // first packet{lastwallclocktime = RTPTime::CurrentTime();lastrtptimestamp = timestamp;prevrtptimestamp = timestamp;}else if (timestamp != prevrtptimestamp){lastwallclocktime = RTPTime::CurrentTime();lastrtptimestamp = timestamp;prevrtptimestamp = timestamp;}numpayloadbytes += (uint32_t)p.GetPayloadLength();numpackets++;timestamp += timestampinc;seqnr++;return 0;
}// 发送包
int RTPSession::SendRTPData(const void *data, size_t len)
{if (!m_changeOutgoingData)return rtptrans->SendRTPData(data, len);void *pSendData = 0;size_t sendLen = 0;int status = 0;status = OnChangeRTPOrRTCPData(data, len, true, &pSendData, &sendLen);if (status < 0)return status;if (pSendData){status = rtptrans->SendRTPData(pSendData, sendLen);OnSentRTPOrRTCPData(pSendData, sendLen, true);}return status;
}// 底层实现
int RTPUDPv4Transmitter::SendRTPData(const void *data,size_t len)	
{if (!init)return ERR_RTP_UDPV4TRANS_NOTINIT;MAINMUTEX_LOCKif (!created){MAINMUTEX_UNLOCKreturn ERR_RTP_UDPV4TRANS_NOTCREATED;}if (len > maxpacksize){MAINMUTEX_UNLOCKreturn ERR_RTP_UDPV4TRANS_SPECIFIEDSIZETOOBIG;}destinations.GotoFirstElement();while (destinations.HasCurrentElement()){// 调用sendto函数实现udp包的发送sendto(rtpsock,(const char *)data,len,0,(const struct sockaddr *)destinations.GetCurrentElement().GetRTPSockAddr(),sizeof(struct sockaddr_in));destinations.GotoNextElement();}MAINMUTEX_UNLOCKreturn 0;
}

tcp passive发流

流程图

在这里插入图片描述

发送端流程:

  • 上级平台或sip服务器以主动方式连接,对于下级平台或者设备(数据发送端)为被动方式;
  • 下级平台或者设备(数据发送端)启动端口监听;
  • 接收上级平台或sip服务器tcp连接请求;
  • 向上级平台或sip服务器发送流数据;

设计

  1. 创建socket、bind、listen,启动数据接收线程;
// TcpServer为封装的socket类int CGBTcpServer::Start()
{if (0 != m_localPort || m_tcpServer.get())return 0;int ret = -1;do {m_tcpServer = std::make_shared<TcpServer>(nullptr, this);if (!m_tcpServer.get())break;ret = m_tcpServer->TcpCreate();if (0 != ret)break;ret = m_tcpServer->TcpBind(m_localPort);if (0 != ret)break;ret = m_tcpServer->TcpListen(5);if (0 != ret)break;m_thread = std::thread(TCPData2PSThread, this);return 0;} while (0);Stop();return ret;
}
  1. 在线程内等待连接,连接成功后接收数据并回调至应用层处理
void CGBTcpServer::TCPData2PSWorker()
{if (!m_pspacker)m_pspacker = new(std::nothrow) CData2PS(PSTCPDataCB, this);bool bAccept = false;while (m_running){if (!bAccept){if (0 == m_tcpServer->TcpAccept()){bAccept = true;if (0 != InitRtp_()){break;}}continue;}std::this_thread::sleep_for(std::chrono::seconds(1));}
}
  1. 初始化rtp参数
int CGBTcpServer::InitRtp_()
{const int packetSize = 45678;RTPSessionParams sessionparams;sessionparams.SetProbationType(RTPSources::NoProbation);sessionparams.SetOwnTimestampUnit(1.0 / packetSize);sessionparams.SetMaximumPacketSize(packetSize + 64);m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);m_rtpTcpTransmitter->Init(true);m_rtpTcpTransmitter->Create(65535, 0);int status = Create(sessionparams, m_rtpTcpTransmitter);if (status < 0){return status;}status = AddDestination(RTPTCPAddress(m_tcpServer->GetClientSocket()));if (0 != status)return status;SetDefaultPayloadType(96);SetDefaultMark(false);SetDefaultTimestampIncrement(160);return 0;
}
  1. 将数据复用为PS;
  2. tcp方式发包
// 调用jrtplib中SendPacket(data, len);接口发送数据// 以下为tcp方式SendPacket部分源码
int RTPTCPTransmitter::SendRTPData(const void *data,size_t len)	
{return SendRTPRTCPData(data, len);
}int RTPTCPTransmitter::SendRTPRTCPData(const void *data, size_t len)
{if (!m_init)return ERR_RTP_TCPTRANS_NOTINIT;MAINMUTEX_LOCKif (!m_created){MAINMUTEX_UNLOCKreturn ERR_RTP_TCPTRANS_NOTCREATED;}// #define RTPTCPTRANS_MAXPACKSIZE							65535if (len > RTPTCPTRANS_MAXPACKSIZE){MAINMUTEX_UNLOCKreturn ERR_RTP_TCPTRANS_SPECIFIEDSIZETOOBIG;}std::map<SocketType, SocketData>::iterator it = m_destSockets.begin();std::map<SocketType, SocketData>::iterator end = m_destSockets.end();vector<SocketType> errSockets;int flags = 0;
#ifdef RTP_HAVE_MSG_NOSIGNALflags = MSG_NOSIGNAL;
#endif // RTP_HAVE_MSG_NOSIGNALwhile (it != end){uint8_t lengthBytes[2] = { (uint8_t)((len >> 8)&0xff), (uint8_t)(len&0xff) };SocketType sock = it->first;// 调用send接口发送数据// 1. 先发送2字节头(固定格式)// 2. 再发送数据if (send(sock,(const char *)lengthBytes,2,flags) < 0 ||send(sock,(const char *)data,len,flags) < 0)errSockets.push_back(sock);++it;}MAINMUTEX_UNLOCKif (errSockets.size() != 0){for (size_t i = 0 ; i < errSockets.size() ; i++)OnSendError(errSockets[i]);}// Don't return an error code to avoid the poll thread exiting// due to one closed connection for examplereturn 0;
}

tcp active发流

流程图

在这里插入图片描述

发送端流程:

  • 上级平台或sip服务器启动tcp监听连接,对于下级平台或者设备(数据发送端)为主动方式;
  • 下级平台或者设备(数据发送端)发起tcp连接;
  • 接收上级平台或sip服务器tcp响应;
  • 向上级平台或sip服务器发送流数据;

设计

  1. 创建socket、connect、初始化rtp,启动数据接收线程
// TcpClient为封装的客户端socket类int CGBTcpClient::Start()
{if (0 != m_localPort || m_tcpClient.get())return 0;int ret = -1;do{m_tcpClient = std::make_shared<TcpClient>(nullptr, this);if (!m_tcpClient.get() || 0 != m_tcpClient->TcpCreate())break;ret = m_tcpClient->TcpConnectByTime(m_localIP.c_str(), m_localPort, 5);if (0 != ret)break;ret = InitRtp_();if (0 != ret)break;m_thread = std::thread(RTPPackerThread, this);return 0;} while (0);Stop();return ret;
}
  1. 初始化rtp参数
int CGBTcpClient::InitRtp_()
{const int packSize = 45678;RTPSessionParams sessionParams;sessionParams.SetProbationType(RTPSources::NoProbation);sessionParams.SetOwnTimestampUnit(90000.0 / 25.0);sessionParams.SetMaximumPacketSize(packSize + 64);m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);m_rtpTcpTransmitter->Init(true);m_rtpTcpTransmitter->Create(65535, 0);if (0 != Create(sessionParams, m_rtpTcpTransmitter))return -1;if (0 != AddDestination(RTPTCPAddress(m_tcpClient->GetClientSocket())))return -1;return 0;
}
  1. 视音频数据复用为PS
  2. 发送数据,同tcp passive发流

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/207202.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【计算机网络学习之路】TCP socket编程

文章目录 前言一. 服务器1. 初始化服务器2. 启动服务器 二. 客户端三. 多进程服务器结束语 前言 本系列文章是计算机网络学习的笔记&#xff0c;欢迎大佬们阅读&#xff0c;纠错&#xff0c;分享相关知识。希望可以与你共同进步。 本篇博客基于UDP socket基础&#xff0c;介绍…

C++ vector 使用类作为模板参数/C++多态展示

C vector 使用类作为模板参数 #include<iostream> #include<string> #include<vector>class vector3D {public:float x,y,z; vector3D(int x_,int y_,int z_):x(x_),y(y_),z(z_){}public:float add(){return (xyz);}};int main(){int *a new int[10];a[0] …

基于原子轨道搜索算法优化概率神经网络PNN的分类预测 - 附代码

基于原子轨道搜索算法优化概率神经网络PNN的分类预测 - 附代码 文章目录 基于原子轨道搜索算法优化概率神经网络PNN的分类预测 - 附代码1.PNN网络概述2.变压器故障诊街系统相关背景2.1 模型建立 3.基于原子轨道搜索优化的PNN网络5.测试结果6.参考文献7.Matlab代码 摘要&#xf…

五大资源之Service(可以固定IP)

Service可以看作是一组同类Pod对外访问接口,借助Service应用可以方便的实现服务发现与负载均衡 创建集群内部可以访问Service #暴露Service(也创建在了namespace dev下) [root@master ~]# kubectl expose deployment(pod控制器) nginx --name=svc-nginx1 --type=Cluste…

基于天鹰算法优化概率神经网络PNN的分类预测 - 附代码

基于天鹰算法优化概率神经网络PNN的分类预测 - 附代码 文章目录 基于天鹰算法优化概率神经网络PNN的分类预测 - 附代码1.PNN网络概述2.变压器故障诊街系统相关背景2.1 模型建立 3.基于天鹰优化的PNN网络5.测试结果6.参考文献7.Matlab代码 摘要&#xff1a;针对PNN神经网络的光滑…

Java架构师软件架构风格

目录 1 数据流风格1.1 管道过滤器1.2 数据流风格的优点2 调用返回风格2.1 面向对象风格2.2 调用返回风格总结3 独立构件风格3.1 事件驱动系统风格的主要特点3.2 独立构件风格总结4 虚拟机风格4.1 虚拟机风格总结5 仓库风格5.1 仓库风格总结想学习架构师构建流程请跳转:Java架构…

第一百七十六回 如何创建渐变色边角

文章目录 1. 概念介绍2. 实现方法3. 代码与细节3.1 示例代码3.2 代码细节 4. 内容总结 我们在上一章回中介绍了"如何创建放射形状渐变背景"相关的内容&#xff0c;本章回中将介绍"如何创建渐变色边角".闲话休提&#xff0c;让我们一起Talk Flutter吧。 1.…

LeetCode(32)串联所有单词的子串【滑动窗口】【困难】(含图解)

目录 1.题目2.答案3.提交结果截图4.图解 链接&#xff1a; 串联所有单词的子串 1.题目 给定一个字符串 s 和一个字符串数组 words。 words 中所有字符串 长度相同。 s 中的 串联子串 是指一个包含 words 中所有字符串以任意顺序排列连接起来的子串。 例如&#xff0c;如果 w…

sonar对webgoat进行静态扫描

安装sonar并配置 docker安装sonarqube&#xff0c;sonarQube静态代码扫描 - Joson6350 - 博客园 (cnblogs.com) 对webgoat进行sonar扫描 扫描结果 bugs Change this condition so that it does not always evaluate to "false" 意思是这里的else if语句不会执行…

计算机网络之物理层(数据通信有关)

一、概述 1.1物理层引入的目的 屏蔽掉传输介质的多样性&#xff0c;导致数据传输方式的不同&#xff1b;物理层的引入使得高层看到的数据都是统一的0,1构成的比特流 1.2.物理层如何实现屏蔽 物理层靠定义的不同的通信协议&#xff08;一般称通信规程&#xff09; 这些协议…

电动汽车充放电V2G模型MATLAB代码

微❤关注“电气仔推送”获得资料&#xff08;专享优惠&#xff09; 主要内容&#xff1a; 本程序主要建立电动汽车充放电V2G模型&#xff0c;采用粒子群算法&#xff0c;在保证电动汽车用户出行需求的前提下&#xff0c;为了使工作区域电动汽车尽可能多的消纳供给商场基础负荷…

使用paddleocr进行OCR文字识别

1 OCR介绍 OCR&#xff08;Optical Character Recognition&#xff09;即光学字符识别&#xff0c;是一种将不同类型的文档&#xff08;如扫描的纸质文件、PDF文件或图像文件中的文本&#xff09;转换成可编辑和可搜索的数据的技术。OCR技术能够识别和转换印刷或手写文字&…