数据通信三个层次
1.Component 是封装好的数据处理流程 2.Node Reader/Writer 或 Service/Client3.Transport 创建 Transmitter 或 Receiver
Component
Component 是封装好的数据处理流程 Dag 文件是模块拓扑关系的配置文件Launch 文件提供了一种启动模块的简单方法
Node
通过Node创建Reader或Writer进行通信
Node 是整个数据拓扑网络中的基本单元。通过在节点中定义 Reader/Writer 或 Service/Client,
cyber/node/node.h #include "cyber/node/node_channel_impl.h"#include "cyber/node/node_service_impl.h"01.class Node {public:template <typename M0, typename M1, typename M2, typename M3>friend class Component;friend class TimerComponent;auto CreateService(const std::string& service_name,const typename Service<Request, Response>::ServiceCallback&service_callback)-> std::shared_ptr<Service<Request, Response>>;auto CreateClient(const std::string& service_name)-> std::shared_ptr<Client<Request, Response>>; CreateService DeleteReader Node::CreateReader02.cyber/node/node_channel_impl.hauto CreateWriter(const std::string& channel_name)-> std::shared_ptr<Writer<MessageT>>; auto CreateReader(const ReaderConfig& config, const CallbackFunc<MessageT>& reader_func)-> std::shared_ptr<Reader<MessageT>>;
cyber/node/node_service_impl.h03.cyber/node/writer.hclass Writer : public WriterBase
cyber/node/reader.h class Reader : public ReaderBase04.cyber/node/reader_base.hclass ReaderBase { class ReceiverManager
cyber/node/writer_base.hclass WriterBase {cyber/service/client.hclass Client : public ClientBase {
cyber/service/client_base.h class ClientBase {cyber/service/service.hclass Service : public ServiceBase {
cyber/service/service_base.h class ServiceBase {
数据通信-数据传输层
transport 工厂模式 -通过 Transport 创建 Transmitter 或 Receiver 进行通信cyber/transport/transport.hauto Transport::CreateTransmittertransmitter = std::make_shared<IntraTransmitter<M>>(modified_attr);transmitter = std::make_shared<ShmTransmitter<M>>(modified_attr);std::make_shared<RtpsTransmitter<M>>(modified_attr, participant());std::make_shared<HybridTransmitter<M>>(modified_attr, participant());auto Transport::CreateReceiverstd::make_shared<IntraReceiver<M>>(modified_attr, msg_listener);std::make_shared<ShmReceiver<M>>(modified_attr, msg_listener);std::make_shared<RtpsReceiver<M>>(modified_attr, msg_listener);std::make_shared<HybridReceiver<M>>( modified_attr, msg_listener, participant());IntraDispatcher::Instance(); ShmDispatcher::Instance(); RtpsDispatcher::Instance();
Transmitter和Receiver各有三个派生类,对应Cyber的三种数据传输方式 ,分别是进程内(Intra) 进程间(Shm))以及网络(RTPS) cyber/transport/transmitter/transmitter.hclass IntraTransmitter : public Transmitter<M> class ShmTransmitter : public Transmitter<M> class RtpsTransmitter : public Transmitter<M> class HybridTransmitter : public Transmitter<M>cyber/transport/receiver/receiver.h class Receiver : public Endpoint class IntraReceiver : public Receiver<M>class ShmReceiver : public Receiver<M>class RtpsReceiver : public Receiver<M>class HybridReceiver : public Receiver<M>cyber/transport/dispatcher/dispatcher.h class IntraDispatcher : public Dispatcherclass ShmDispatcher : public Dispatcherclass RtpsDispatcher : public Dispatcherclass SubscriberListener : public eprosima::fastdds::dds::SubscriberListener
01.进程内 Intra 传输使用的是函数直接调用(回调)的方式 1.上层 Writer ---> IntraTransmitter ---> IntraDispatcher--->(回调)IntraReceiver ---> (回调)上层Reader。
02.进程间 Shm (本机)传输是通过共享内存辅助实现。链用链条是: 1、上层Writer---> Segment(共享内存)和Notifier(发送通知)2、ShmDispatcher(有独立线程)---> (主动读取)Segment---> (回调)上层Reader。03.主机间-Rtps(路网络)传输是通过RTPS(DDS)实现。链用链条是:1、上层Writer--->RtpsTransmitter 打包成protobuf---> fastrtps发送到网络。2、fastrtps接收到网络报文---> (回调)RtpsDispatcher ---> (回调)RtpsReceiver---> (回调)上层Reader。通知--工程模式--NotifierFactorynotifier_ = NotifierFactory::CreateNotifier();notifier_type = g_conf.transport_conf().shm_conf().notifier_type();NotifierFactory::CreateConditionNotifier()NotifierFactory::CreateMulticastNotifier()数据分发器 DataDispather 和数据访问器 DataVistor DataNotifier 用于唤醒协程任务DataDispatcher 进行消息分发, DataVisitor 允许用户自定义通道和缓冲区大小
ChannelBuffer 实现缓冲区管理,
数据通信-service_discovery
cyber/service_discovery/topology_manager.h Participant 类封装fastrtps接口成员 participant_ 是 Participant实例Manager 基于 Participant 管理 topology消息的收发NodeManager ChannelManager ServiceManager 并有共同的基类Manager TopologyManager 初始化三个子管理器 - NodeManager 用于管理网络拓扑中的节点。- ChannelManager用于管理channel,即网络拓扑中的边。- ServiceManager用于管理Service和Client。
cyber/service_discovery/topology_manager.cc node_manager_ = std::make_shared<NodeManager>();channel_manager_ = std::make_shared<ChannelManager>();service_manager_ = std::make_shared<ServiceManager>();class NodeManager : public Manager {class ChannelManager : public Manager { friend class TopologyManager;class ServiceManager : public Manager {
参数-parameter
cyber/parameter/parameter.hParameter::Name()Parameter::Type() TypeName() FromProtoParam ToProtoParamParameter::value() 通道 发送或接收消息。服务 是节点之间通信的另一种方式。 服务实现双向通信,例如节点通过发送请求获得响应。参数 都是apollo::cyber::Parameter 对象 它负责管理系统配置和参数。cyber/parameter/parameter_client.h #include "cyber/node/node.h" class ParameterClient{ GetParameter SetParameter ListParameterscyber/parameter/parameter_server.h#include "cyber/service/service.h" class ParameterServer { GetParameter SetParameter ListParameterscyber/parameter/parameter_service_names.h "get_parameter"; "set_parameter"; list_parameters";
Cyber RT
目前对整理的框架有了粗浅的了解,更进一步处理对源码进行处理基本概念--数据通信-任务调度-变化通知两部分proto ,一部分放在了 cyber/proto/,另外一部分放在了各个模块modules/common_msgs/sensor_msgs/pointcloud.protomodules/common_msgs/sensor_msgs/sensor_image.protomodules/data/proto/frame.proto
proto数据
cyber/proto/cyber_conf.proto
message CyberConfig {optional SchedulerConf scheduler_conf = 1;optional TransportConf transport_conf = 2;optional RunModeConf run_mode_conf = 3;optional PerfConf perf_conf = 4;
}cyber/proto/component_conf.protoComponentConfigTimerComponentConfig {
cyber/proto/dag_conf.protoComponentInfo TimerComponentInfoModuleConfig DagConfig
cyber/proto/qos_profile.proto cyber/proto/transport_conf.proto cyber/proto/role_attributes.proto
cyber/proto/topology_change.protocyber/proto/scheduler_conf.proto
cyber/proto/choreography_conf.proto
cyber/proto/classic_conf.protocyber/proto/run_mode_conf.proto
cyber/proto/record.protocyber/proto/proto_desc.proto
cyber/proto/parameter.proto
cyber/proto/perf_conf.proto
自动驾驶
Apollo的bridge模块UDP数据的格式。Apollo的bridge模块通过UDP协议发送的基本数据被称为帧,其由首部和数据组成。在UDPBridgeSenderComponent<T>::Proc函数当中,protobuf数据首先进行序列化,然后将其根据参数FRAME_SIZE分割为一个个大小相同的数据块,为每个数据块添加首部后,就组成了一个个的帧,然后将所有的帧发送发送cyber_launch start /apollo/modules/bridge/launch/bridge_receiver.launch
参考
https://apollo.baidu.com/docs/apollo/latest/rtps__transmitter_8h.html
https://blog.csdn.net/deyili/article/details/120327777
自动驾驶开发入门(三)---浅谈Apollo Cyber RT中的Transport
Apollo 应用与源码分析:CyberRT-服务通信与参数服务器