基于共享内存的数据分发DDS——C语言实现
一、软件功能介绍
在linux环境下用C语言开发的基于共享内存的数据分发DDS软件。采用了共享内存、多线程、读写锁以及互斥锁实现。
软件支持功能如下:
- 内部采用共享内存进行数据传输,支持多进程、多线程的数据发布、订阅功能。
- 支持在数据发布端或订阅端使用队列功能,以适应不同的应用场景
- 集成类似rostopic、roabag命令行工具
- 支持采用cjson、protobuf-c、结构体格式数据发布订阅数据。
二、软件接口介绍
/*******************************************************************
* 函 数 名:init
* 功能描述:数据分发功能初始化
* 输入参数:无
* 输出参数:无
* 返 回 值:无
******************************************************************/
void init(void);/*******************************************************************
* 函 数 名:deinit
* 功能描述:数据分发功能销毁
* 输入参数:无
* 输出参数:无
* 返 回 值:无
******************************************************************/
void deinit(void);/*******************************************************************
* 函 数 名:create_node
* 功能描述:创建一个节点
* 输入参数:node_name 节点名称
* 输出参数:无
* 返 回 值:节点结构体指针
******************************************************************/
node_struct* create_node(const uint8_t node_name[NODE_MAX_LENGTH]);/*******************************************************************
* 函 数 名:destroy_node
* 功能描述:销毁节点
* 输入参数:node 节点结构体指针
* 输出参数:无
* 返 回 值:无
******************************************************************/
void destroy_node(node_struct* node);/*******************************************************************
* 函 数 名:show_topic_list
* 功能描述:获取所有话题
* 输入参数:node 节点结构体指针
* 输出参数:topic_list 话题名数组;topic_count 话题个数
* 返 回 值:无
******************************************************************/
void show_topic_list(node_struct* node,uint8_t topic_list[TOPIC_COUNT_MAX_SUPPORT][TOPIC_MAX_LENGTH],uint32_t* topic_count);/*******************************************************************
* 函 数 名:create_pub_with_topic
* 功能描述:创建一个发布者,并绑定到一个话题中
* 输入参数:node 节点结构体指针;topic_name 话题名;max_data_size 话题数据的最大长度单位字节;pub_rate 数据发布频率单位hz;
* 输入参数:pub_model 发布模式;queue_max_count 队列元素最大个数
* 输出参数:无
* 返 回 值:发布结构体指针
******************************************************************/
topic_pub_struct* create_pub_with_topic(node_struct* node,const uint8_t topic_name[TOPIC_MAX_LENGTH],const int max_data_size,float pub_rate,PUB_DATA_MODEL pub_model,uint32_t queue_max_count);/*******************************************************************
* 函 数 名:pub_data
* 功能描述:发布数据
* 输入参数:pub_dev 发布结构体指针;data 发布数据;data_size 发布数据大小
* 输出参数:无
* 返 回 值:成功返回0,失败返回-1
******************************************************************/
int pub_data(topic_pub_struct* pub_dev,const uint8_t* data,const size_t data_size);/*******************************************************************
* 函 数 名:destroy_pub
* 功能描述:销毁发布者
* 输入参数:pub_ptr 发布结构体指针
* 输出参数:无
* 返 回 值:无
******************************************************************/
void destroy_pub(topic_pub_struct* pub_ptr);/*******************************************************************
* 函 数 名:create_sub_with_topic
* 功能描述:创建一个订阅者,并绑定到一个话题中
* 输入参数:node 节点结构体指针;topic_name 话题名;max_data_size 话题数据的最大长度单位字节;data_process 数据处理回调函数;
* 输入参数:sub_rate 数据订阅频率单位hz;sub_model 订阅模式;queue_max_count 队列元素最大个数
* 输出参数:无
* 返 回 值:订阅结构体指针
******************************************************************/
topic_sub_struct* create_sub_with_topic(node_struct* node,const uint8_t topic_name[TOPIC_MAX_LENGTH],const int max_data_size,sub_callback_func data_process,float sub_rate,SUB_DATA_HANDLE_MODEL sub_model,uint32_t queue_max_count);/*******************************************************************
* 函 数 名:sub_data_run
* 功能描述:开始启动子线程订阅数据
* 输入参数:sub_ptr 订阅结构体指针
* 输出参数:无
* 返 回 值:成功返回0;失败返回-1
******************************************************************/
int sub_data_run(topic_sub_struct* sub_ptr);/*******************************************************************
* 函 数 名:destroy_sub
* 功能描述:销毁订阅者
* 输入参数:sub_ptr 订阅结构体指针
* 输出参数:无
* 返 回 值:无
******************************************************************/
void destroy_sub(topic_sub_struct* sub_ptr);
三、软件实现原理
首先在共享内存上申请绑定固定key的话题管理内存块,用于存储所有正在使用的话题信息。里面将话题名与共享内存块的ID一一对应。
typedef struct
{int32_t reference; //话题管理引用计数,当计数为0后表示可以释放该共享内存mutex_struct lock; //同步锁uint32_t topic_count; //已经存储的topic个数topic_struct topic[TOPIC_COUNT_MAX_SUPPORT];
}topic_manage_struct; //话题管理,所有进程都可以访问
应用程序首先从话题管理共享内存块获取话题名对应的共享内存ID(如果没有则创建,并将id与话题名填入话题管理共享内存块中)。然后开始向话题名对应的共享内存块发布或者订阅数据。
每块共享内存分为数据头块和数据块,数据头块保存当前数据的信息,包括用于同步的读写锁、当前数据的大小以及数据更新标识符。
typedef struct
{int32_t reference; //话题引用计数,当计数为0后表示可以释放该共享内存mutex_struct lock; //话题信息同步锁size_t max_data_size; //话题最大数据大小 pub与sub创建时指定的大小必须一致rwlock_struct rwlock; //数据同步锁size_t data_size; //话题当前数据大小 uint8_t data_has_flag; //数据标识,0 表示没有数据;每更新一次数据增1uint32_t data_pub_cycle; //数据发布周期
}topic_header_struct; //话题数据结构体
四、数据分发命令行工具
cdds-topic
tcdds-topic list #列出当前运行的话题
cdds-topic hz topicname #计算当前话题数据的发布的平均频率
cdds-bag
cdds-bag record topicname1 topicname2 topicname3 ... #对执行的话题进行数据录制
cdds-bag player bag_file #播放录制的文件
五、测试DEMO
基于结构体数据发布订阅
/****************************************************************
文件名称: main.c
功能描述: 基于共享内存的数据分发测试demo
创建日期: 2024-02-15
作者 : skynet
版本 : V1.0
修订记录:
***************************************************************/
#include "cshmdds.h"#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/time.h>typedef struct
{uint64_t timestamp;uint64_t sequence_num;uint8_t age;uint8_t name[9];uint8_t phone_num[12];uint8_t email[64];
}person_info_struct;static uint64_t get_time()
{struct timeval tv;gettimeofday(&tv, NULL);return tv.tv_sec * 1000000 + tv.tv_usec;
}void help_usage(void)
{printf("Usage:\n");printf("\tcdds-test pub topicname pubfreq\n");printf("\tcdds-test sub topicname subfreq\n");
}void test_sub_callback(const uint8_t* data,const size_t data_size)
{person_info_struct* person_data=(person_info_struct*)data; LOG_RECORD(LOGINFO,"sequence_num=%ld,timestamp=%ld,age=%d,name=%s,phone_num=%s,email=%s\n",person_data->sequence_num,person_data->timestamp,person_data->age,person_data->name,person_data->phone_num,person_data->email);
}int main(int argc, char* argv[])
{uint8_t pub_sub_type[4]={'\0'};uint8_t topic_name[TOPIC_MAX_LENGTH]={'\0'};float pub_or_sub_freq=0;LOG_RECORD(LOGDEBUG,"start...argc=%d\n",argc);if(argc!=4){help_usage();return -1;}else{strcpy(pub_sub_type,argv[1]);strcpy(topic_name,argv[2]);pub_or_sub_freq=atof(argv[3]);LOG_RECORD(LOGINFO,"pub_sub_type=%s\ntopic_name=%s\ndata_size=%ld\npub_or_sub_freq=%f\n",pub_sub_type,topic_name,sizeof(person_info_struct),pub_or_sub_freq);if(pub_or_sub_freq<=0){help_usage();return -1;}}init();if(strcmp("pub", pub_sub_type)==0){LOG_RECORD(LOGDEBUG,"create node\n");node_struct* tnode=create_node("pub_data");LOG_RECORD(LOGDEBUG,"create pub\n");topic_pub_struct* tpub_dev=create_pub_with_topic(tnode, topic_name, sizeof(person_info_struct),pub_or_sub_freq,DIRECT_PUB,5);if(tpub_dev==NULL){LOG_RECORD(LOGDEBUG,"no topicname create\n");deinit();return -1;}LOG_RECORD(LOGDEBUG,"begin pub data\n");int i=0;while(1){i++;person_info_struct person_data;person_data.timestamp=get_time();person_data.sequence_num=i;person_data.age=i%62+1;memset(person_data.name,'\0',sizeof(person_data.name));sprintf(person_data.name,"Bob%d",i%10);memset(person_data.phone_num,'\0',sizeof(person_data.phone_num));sprintf(person_data.phone_num,"1326874651%d",i%10);memset(person_data.email,'\0',sizeof(person_data.email));sprintf(person_data.email,"Bob%d_workspace@163.com",i%10);pub_data(tpub_dev, (uint8_t*)&person_data, sizeof(person_data));LOG_RECORD(LOGINFO,"sequence_num=%ld,timestamp=%ld,age=%d,name=%s,phone_num=%s,email=%s\n",person_data.sequence_num,person_data.timestamp,person_data.age,person_data.name,person_data.phone_num,person_data.email);}}else if (strcmp("sub", pub_sub_type)==0) {LOG_RECORD(LOGDEBUG,"create node\n");node_struct* tnode=create_node("pub_data");LOG_RECORD(LOGDEBUG,"create sub\n");topic_sub_struct* tsub_dev=create_sub_with_topic(tnode, topic_name, sizeof(person_info_struct), test_sub_callback,pub_or_sub_freq,DIRECT_SUB_HANDLE,5);if(tsub_dev==NULL){LOG_RECORD(LOGDEBUG,"no topicname create\n");deinit();return -1;}if(sub_data_run(tsub_dev)!=0){LOG_RECORD(LOGDEBUG,"cannot create sub thread\n");deinit();return -1;}LOG_RECORD(LOGDEBUG,"begin sub data\n");while(1){usleep(5000*1000);}}return 0;
}
基于cjson数据发布订阅
/****************************************************************
文件名称: main.c
功能描述: 基于共享内存的数据分发测试demo
创建日期: 2024-02-15
作者 : skynet
版本 : V1.0
修订记录:
***************************************************************/
#include "cshmdds.h"
#include "cJSON.h"#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/time.h>/* 数据格式
{
"timestamp":123123123,
"sequence_num":1,
"name":"skynet",
"age":20,
"interests":[ "read", "movie", "sports" ],
"phone_num":"13278765213",
"email":"skynet1_workspace@163.com"
}
*/static uint64_t get_time()
{struct timeval tv;gettimeofday(&tv, NULL);return tv.tv_sec * 1000000 + tv.tv_usec;
}void help_usage(void)
{printf("Usage:\n");printf("\tcdds-test pub topicname pubfreq\n");printf("\tcdds-test sub topicname subfreq\n");
}void test_sub_callback(const uint8_t* data,const size_t data_size)
{cJSON* cjson_test=cJSON_Parse(data);char* t_data=cJSON_Print(cjson_test);LOG_RECORD(LOGINFO,"\n%s\n",t_data);cJSON_Delete(cjson_test);cJSON_free(t_data);
}int main(int argc, char* argv[])
{uint8_t pub_sub_type[4]={'\0'};uint8_t topic_name[TOPIC_MAX_LENGTH]={'\0'};float pub_or_sub_freq=0;LOG_RECORD(LOGDEBUG,"start...argc=%d\n",argc);if(argc!=4){help_usage();return -1;}else{strcpy(pub_sub_type,argv[1]);strcpy(topic_name,argv[2]);pub_or_sub_freq=atof(argv[3]);LOG_RECORD(LOGINFO,"pub_sub_type=%s\ntopic_name=%s\ndata_size=%ld\npub_or_sub_freq=%f\n",pub_sub_type,topic_name,256,pub_or_sub_freq);if(pub_or_sub_freq<=0){help_usage();return -1;}}init();if(strcmp("pub", pub_sub_type)==0){LOG_RECORD(LOGDEBUG,"create node\n");node_struct* tnode=create_node("pub_data");LOG_RECORD(LOGDEBUG,"create pub\n");topic_pub_struct* tpub_dev=create_pub_with_topic(tnode, topic_name, 256,pub_or_sub_freq,DIRECT_PUB,5);if(tpub_dev==NULL){LOG_RECORD(LOGDEBUG,"no topicname create\n");deinit();return -1;}LOG_RECORD(LOGDEBUG,"begin pub data\n");int i=0;while(1){i++;cJSON* cjson_test = cJSON_CreateObject();cJSON_AddNumberToObject(cjson_test, "timestamp", get_time());cJSON_AddNumberToObject(cjson_test, "sequence_num", i);cJSON_AddStringToObject(cjson_test, "name", "skynet");cJSON_AddNumberToObject(cjson_test, "age", i%80+1);cJSON* cjson_interests=cJSON_CreateArray();cJSON_AddItemToArray(cjson_interests, cJSON_CreateString( "read" ));cJSON_AddItemToArray(cjson_interests, cJSON_CreateString( "movie" ));cJSON_AddItemToArray(cjson_interests, cJSON_CreateString( "sports" ));cJSON_AddItemToObject(cjson_test, "interests", cjson_interests);cJSON_AddStringToObject(cjson_test, "phone_num", "13278765213");cJSON_AddStringToObject(cjson_test, "email", "skynet1_workspace@163.com");char* t_pub_data=cJSON_Print(cjson_test);size_t pub_data_len=strlen(t_pub_data)+5;pub_data(tpub_dev, (uint8_t*)t_pub_data, pub_data_len);LOG_RECORD(LOGINFO,"\n%s\n",t_pub_data);cJSON_Delete(cjson_test);cJSON_free(t_pub_data);}}else if (strcmp("sub", pub_sub_type)==0) {LOG_RECORD(LOGDEBUG,"create node\n");node_struct* tnode=create_node("pub_data");LOG_RECORD(LOGDEBUG,"create sub\n");topic_sub_struct* tsub_dev=create_sub_with_topic(tnode, topic_name, 256, test_sub_callback,pub_or_sub_freq,DIRECT_SUB_HANDLE,5);if(tsub_dev==NULL){LOG_RECORD(LOGDEBUG,"no topicname create\n");deinit();return -1;}if(sub_data_run(tsub_dev)!=0){LOG_RECORD(LOGDEBUG,"cannot create sub thread\n");deinit();return -1;}LOG_RECORD(LOGDEBUG,"begin sub data\n");while(1){usleep(5000*1000);}}return 0;
}
基于protobuf-c数据发布订阅
/****************************************************************
文件名称: main.c
功能描述: 基于共享内存的数据分发测试demo
创建日期: 2024-02-15
作者 : skynet
版本 : V1.0
修订记录:
***************************************************************/
#include "cshmdds.h"#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/time.h>#include "message.pb-c.h"TestMessage test_msg = TEST_MESSAGE__INIT;static uint64_t get_time()
{struct timeval tv;gettimeofday(&tv, NULL);return tv.tv_sec * 1000000 + tv.tv_usec;
}void help_usage(void)
{printf("Usage:\n");printf("\tcdds-test pub topicname pubfreq\n");printf("\tcdds-test sub topicname subfreq\n");
}void test_sub_callback(const uint8_t* data,const size_t data_size)
{TestMessage *test_unpack = test_message__unpack(NULL, data_size,data);LOG_RECORD(LOGINFO,"\nsub size=%d;\ntimestamp=%ld\nsequence_num=%ld\nage=%d\nname=%s\nphone=%s\nemail=%s\n", data_size,test_unpack->timestamp,test_unpack->sequence_num,test_unpack->age,test_unpack->name,test_unpack->phone_num,test_unpack->email);test_message__free_unpacked(test_unpack, NULL);
}uint32_t init_message()
{uint32_t total_size=0;test_msg.timestamp=get_time();total_size+=sizeof(test_msg.timestamp);test_msg.sequence_num=0;total_size+=sizeof(test_msg.sequence_num);test_msg.age=0;total_size+=sizeof(test_msg.age);test_msg.name = (char*)calloc(1, 32);sprintf(test_msg.name,"helloworld");total_size+=32;test_msg.phone_num = (char*)calloc(1, 12);sprintf(test_msg.phone_num,"11133327651");total_size+=12;test_msg.email = (char*)calloc(1, 18);sprintf(test_msg.email,"skynet@111.com.cn");total_size+=18;return total_size;
}int main(int argc, char* argv[])
{uint8_t pub_sub_type[4]={'\0'};uint8_t topic_name[TOPIC_MAX_LENGTH]={'\0'};float pub_or_sub_freq=0;LOG_RECORD(LOGDEBUG,"start...argc=%d\n",argc);if(argc!=4){help_usage();return -1;}else{strcpy(pub_sub_type,argv[1]);strcpy(topic_name,argv[2]);pub_or_sub_freq=atof(argv[3]);LOG_RECORD(LOGINFO,"pub_sub_type=%s\ntopic_name=%s\npub_or_sub_freq=%f\n",pub_sub_type,topic_name,pub_or_sub_freq);if(pub_or_sub_freq<=0){help_usage();return -1;}}int pack_len = init_message()*3/2;LOG_RECORD(LOGINFO,"max_pack_len=%d\n",pack_len);init();if(strcmp("pub", pub_sub_type)==0){LOG_RECORD(LOGDEBUG,"create node\n");node_struct* tnode=create_node("pub_data");LOG_RECORD(LOGDEBUG,"create pub\n");topic_pub_struct* tpub_dev=create_pub_with_topic(tnode, topic_name, pack_len,pub_or_sub_freq,DIRECT_PUB,5);if(tpub_dev==NULL){LOG_RECORD(LOGDEBUG,"no topicname create\n");deinit();return -1;}LOG_RECORD(LOGDEBUG,"begin pub data\n");char *pack_buf = NULL;pack_buf = (char*)calloc(1, pack_len);int token=1;while(1){test_msg.timestamp=get_time();test_msg.sequence_num=token;test_msg.age=1+(token*23)%100;memset(test_msg.name,'\0',32);sprintf(test_msg.name,"helloworld%d",token);memset(test_msg.phone_num,'\0',12);sprintf(test_msg.phone_num,"%011d",token);memset(test_msg.email,'\0',18);sprintf(test_msg.email,"skynet@%d.com.cn",100+token%100);memset(pack_buf,'\0',pack_len);int unpack_len=test_message__pack(&test_msg, pack_buf);pub_data(tpub_dev, (uint8_t*)pack_buf, unpack_len);LOG_RECORD(LOGINFO,"\nsize=%d:\ntimestamp=%ld\nsequence_num=%ld\nage=%d\nname=%s\nphone=%s\nemail=%s\n", unpack_len,test_msg.timestamp,test_msg.sequence_num,test_msg.age,test_msg.name,test_msg.phone_num,test_msg.email);token++;}}else if (strcmp("sub", pub_sub_type)==0) {LOG_RECORD(LOGDEBUG,"create node\n");node_struct* tnode=create_node("pub_data");LOG_RECORD(LOGDEBUG,"create sub\n");topic_sub_struct* tsub_dev=create_sub_with_topic(tnode, topic_name, pack_len, test_sub_callback,pub_or_sub_freq,DIRECT_SUB_HANDLE,5);if(tsub_dev==NULL){LOG_RECORD(LOGDEBUG,"no topicname create\n");deinit();return -1;}if(sub_data_run(tsub_dev)!=0){LOG_RECORD(LOGDEBUG,"cannot create sub thread\n");deinit();return -1;}LOG_RECORD(LOGDEBUG,"begin sub data\n");while(1){usleep(5000*1000);}}return 0;
}
六、源码
目录结构
c-shm-dds
├── build #cmake 编译目录
├── build.sh #编译脚本
├── cdds #cdds源码目录
│ ├── CMakeLists.txt
│ ├── cshmdds.c
│ └── cshmdds.h
├── CMakeLists.txt
├── install #cdds编译结果目录
│ ├── bin
│ ├── include
│ └── lib
└── tools #工具目录├── cdds-bag├── cdds-demo1├── cdds-demo2├── cdds-demo3├── cdds-test└── cdds-topic
源码地址
基于C语言和linux共享内存的数据分发软件