C++流媒体服务器 ZLMediaKit框架ZLToolKit源码解读

ZLMediaKit是国人开发的开源C++流媒体服务器,同SRS一样是主流的流媒体服务器。
ZLToolKit是基于C++11的高性能服务器框架,和ZLMediaKit是同一个作者,ZLMediaKit正是使用该框架开发的。

ZLMediaKit开源地址:https://github.com/ZLMediaKit/ZLMediaKit
ZLToolKit开源地址:https://github.com/ZLMediaKit/ZLToolKit

推荐ZLToolKit的理由
1、基于C++11,大量使用C++11新特性,如智能指针、lambda表达式等,安全性高,是高度运用C++特性的框架。
2、ZLMediaKit是应用ZLToolKit开发的,可以看到框架的使用实例,且ZLMediaKit流媒体服务器被全世界开发者使用,相当于是在测试ZLToolKit框架,因此框架的实用性和稳定性很高。
3、看过一些github上star数量很高的C++服务器框架,功能模块大同小异,但ZLT是把C++发挥到极致的框架。

目录

  • ZLToolKit源码框架
    • Thread
    • Poller
    • Network
    • Util
  • ZLToolKit源码测试
    • EventPollerPool事件循环线程池测试
    • WorkThreadPool工作线程池
    • Timer定时器测试
    • TcpClient测试
    • ThreadPool任务线程池测试
    • ResourcePool内存池测试
    • stampthread时间戳线程测试
    • NoticeCenter广播中心测试
    • onceToken测试
    • Any数据结构测试
    • function_traits测试
    • ObjectStatistic类统计测试

ZLToolKit源码框架

主要分为Thread、Poller、Network、Util四大部分。

Thread

semaphore.h(自定义信号量,封装类,由条件变量实现)

class semaphore,接口:post、wait。

TaskExecutor.h(cpu负载计算,Task函数指针模板,任务执行器管理,管理任务执行线程池)

class ThreadLoadCounter,cpu负载计算器,基类,统计线程每一次的睡眠时长和工作时长,并记录样本,调用load计算cpu负载=工作时长/总时长。
class TaskCancelable : public noncopyable,抽象类,可取消任务基类。
class TaskCancelableImp<R(ArgTypes…)> : public TaskCancelable,函数指针模板,event poller async 任务、DelayTask 任务等均使用该类型,任务可取消,重载()运算符执行任务,根据返回值类型,返回默认返回值。
class TaskExecutorInterface,抽象类,提供任务执行接口:async、async_first、sync、sync_first。
class TaskExecutor : public ThreadLoadCounter, public TaskExecutorInterface,任务执行器,抽象类,无新增接口。
class TaskExecutorGetter,获得任务执行器,抽象类,接口:getExecutor、getExecutorSize。
class TaskExecutorGetterImp : public TaskExecutorGetter,实现抽象类接口,提供接口:getExecutorLoad(cpu负载)、for_each(遍历所有线程)、addPoller(创建 EventPoller 线程池)。

TaskQueue.h(由信号量控制的任务队列,加了线程锁,线程安全)

class TaskQueue,接口:push_task、push_exit、get_task、size。

threadgroup.h(线程组管理,创建线程,移除线程)

class thread_group,成员:_threads(umap存储线程组),接口:create_thread、remove_thread、is_thread_in、join_all、size。

ThreadPool.h(线程池任务管理,管理线程组执行任务队列)

class ThreadPool : public TaskExecutor,成员:thread_group、TaskQueueTask::Ptr,接口:start(启动线程池)、async(异步加入任务到队列)。

WorkThreadPool.h(创建一个工作线程池,可以加入线程负载均衡分配算法,类似EventPollerPool)

class WorkThreadPool : public TaskExecutorGetterImp,接口:getPoller、getFirstPoller、setPoolSize。

Poller

Pipe.h(管道对象封装)

class Pipe,成员:std::shared_ptr、EventPoller::Ptr _poller。

PipeWrap.h(管道的封装,windows下由socket模拟)

class PipeWrap,成员:int _pipe_fd[2],接口:write、read。

SelectWrap.h(select 模型的简单封装)

class FdSet

Timer.h(定时器对象)

class Timer,成员:EventPoller::Ptr(引用),构造函数传参超时时长和超时回调函数,EventPoller 选传或自动获取,超时回调在 EventPoller 线程执行。

EventPoller.h(基于epoll事件轮询模块)

class EventPoller : public TaskExecutor, public AnyStorage,基于epoll,可监听fd网络事件,async管道触发执行异步任务,doDelayTask定时器回调任务,runLoop执行事件循环体,添加/删除/修改监听事件,_event_map<网络fd和管道fd,回调>,_delay_task_map<延迟触发时间,回调>,_list_task<异步任务列表Task>。
class EventPollerPool :public TaskExecutorGetterImp,管理 EventPoller 线程池,可创建多个 EventPoller 线程,使用cpu负载均衡算法均匀分配线程,getPoller-》getExecutor 获得线程池内cpu负载最低的 EventPoller 线程。

Network

Buffer.h

class Buffer : public noncopyable,缓存抽象类,纯虚函数:data、size、toString、getCapacity,成员:ObjectStatistic对象个数统计。
class BufferOffset : public Buffer,成员:typename _data,构造函数传参offset,data获取+offset偏移的buffer。
class BufferRaw : public Buffer,成员:char *_data,接口:setCapacity分配,assign赋值,指针式缓存,根据分配内存大小自动扩减容。
class BufferLikeString : public Buffer,成员:std::string _str,接口:erase、append、push_back、insert、assign、clear、capacity、reserve、resize、empty、substr等,字符串操作缓存。

BufferSock.h

class BufferSock : public Buffer,成员:Buffer::Ptr _buffer、sockaddr_storage,管理_buffer指向的缓存。
class BufferList : public noncopyable,抽象类,接口:create、empty、count、send。
内部类
class BufferCallBack,成员:BufferList::SendResult回调函数,List<std::pair<Buffer::Ptr, bool> > 缓存列表,接口:sendFrontSuccess、sendCompleted,发送结果回调。
class BufferSendMsg final : public BufferList, public BufferCallBack,成员:_remain_size(剩余字节数)、_iovec(data和len组成的vector)、_iovec_off(_iovec当前发送下标),接口:send、send_l(执行系统调用sendmsg),socket发送数据时的buffer封装,用于tcp发送。
class BufferSendTo final: public BufferList, public BufferCallBack,接口:send(执行系统调用::sendto和::send)。
class BufferSendMMsg : public BufferList, public BufferCallBack,和 BufferSendMsg 类似,用于udp发送。

Server.h

class SessionMap,成员:std::unordered_map<std::string, std::weak_ptr >,管理Session,add、del、get。
class SessionHelper,成员:Session::Ptr、SessionMap::Ptr、Server,记录session至全局的map,方便后面管理。
class Server : public mINI,成员:EventPoller::Ptr,初始化设置EventPoller线程。

Session.h

class TcpSession : public Session
class UdpSession : public Session
class Session : public SocketHelper,成员:std::unique_ptr<toolkit::ObjectStatistictoolkit::TcpSession >、std::unique_ptr<toolkit::ObjectStatistictoolkit::UdpSession >,用于存储一对客户端与服务端间的关系。

Socket.h

typedef enum ErrCode,自定义socket错误枚举。
class SockException : public std::exception,成员:ErrCode,错误信息类,用于抛出系统和自定义异常,接口:what、getErrCode、getCustomCode、reset。
typedef enum SockType,socket类型,udp、tcp、tcpserver。
class SockNum,成员:int _fd、SockType _type,析构时关闭socket。
class SockFD : public noncopyable,成员:SockNum、EventPoller,文件描述符fd的封装,析构时停止事件监听,关闭socket。
class MutexWrapper,接口:lock、unlock,线程锁的封装,默认使用递归锁recursive_mutex。
class SockInfo,抽象类,接口:get_local_ip、get_local_port、get_peer_ip、get_peer_port、getIdentifier。
class Socket : public noncopyable, public SockInfo,成员:SockFD、EventPoller(网络事件触发和异步执行在此线程),异步IO Socket对象,包括tcp客户端、服务器和udp套接字,包含:错误回调、接收数据回调、tcp服务监听进入回调,connect、listen、send等接口的封装。
class SockSender,抽象类,接口:send、shutdown,重载运算符<<发送数据,定义socket发送接口。
class SocketHelper : public SockSender, public SockInfo, public TaskExecutorInterface,抽象类,成员:Socket、EventPoller,主要是对Socket类的二次封装,自定义类继承该类,实现纯虚接口即可创建一个完整的socket类,比如tcpclient。
class SockUtil,套接字工具类,封装了socket、网络的一些基本操作,提供静态全局接口,比如connect、listen等。
class TcpClient : public SocketHelper,抽象类,Tcp客户端,自定义类继承与该类,实现onConnect、onManager回调即可创建一个可运行的tcp客户端。
class TcpServer : public Server,可配置的TCP服务器。
class UdpServer : public Server,可配置的UDP服务器。

Util

NoticeCenter.h(通知中心)

class EventDispatcher,成员:std::unordered_multimap<void *, Any>(first指针,多个对象监听相同事件传的指针必须不同,second是监听该事件的回调),recursive_mutex,事件分发器,监听同一个事件的回调。
class NoticeCenter,成员:std::unordered_map<std::string, EventDispatcher::Ptr>(first事件名,second分发器),recursive_mutex,接口:emitEvent,addListener,delListener,广播中心,全局单例。

ResourcePool.h(资源池)

class shared_ptr_imp : public std::shared_ptr,对智能指针封装,增加接口:quit,放弃或回收到资源池。
class ResourcePool_l,成员:std::vector<C *> _objs(C对象指针内存数组),std::atomic_flag _busy(原子锁,线程安全),接口:obtain、obtain2,内存池功能实现。
class ResourcePool,成员:std::shared_ptr<ResourcePool_l> pool,接口:obtain、obtain2,封装内存池对外接口。

mini.h(读写配置文件)

class mINI_basic : public std::map<key, variant>,接口:parseFile(解析配置文件)、dumpFile(保存配置文件),实际上是个map,保存的是配置文件键值对。
struct variant : public std::string,把任何配置项按 std::string 字符串处理。
using mINI = mINI_basic<std::string, variant>,mINI::Instance() 是全局单例对象,管理<key,value>配置项。

CMD.h(命令行参数解析)

class Option,选项类,成员:_short_opt(短选项名)、_long_opt(长选项名)、_des(描述)、_default_value(默认值)、_cb(回调)、_type(参数类型)。
class OptionParser,选项解析类,成员:Option _helper(初始化帮助选项)、std::map<char, int> _map_char_index(短选项名映射)、std::map<int, Option> _map_options(选项映射),接口:重载 operator<< 增加选项,delOption 删除选项。
class CMD : public mINI,成员:std::shared_ptr _parser,接口:重载 operator() 解析命令行参数,hasKey(是否存在key),splitedVal(按分隔符分隔字符串)。
class CMDRegister,全局单例对象,成员:std::map<std::string, std::shared_ptr > _cmd_map,接口:registCMD,宏:GET_CMD、CMD_DO、REGIST_CMD。

ZLToolKit源码测试

在tests/文件夹中有作者写的测试程序,这里记录我对框架关键模块的测试。

EventPollerPool事件循环线程池测试

框架的核心是 EventPoller 事件循环线程,由 EventPollerPool 管理多个 EventPoller 组成的线程池。
在这里插入图片描述

EventPollerPool,获取一个可用的线程池。
EventPollerPool 是全局单例对象,用来管理多个 EventPoller 组成的线程池,后者是基于 epoll 实现的线程。
EventPoller 对象 只能在 EventPollerPool 中构造,EventPollerPool 管理 EventPoller 线程池。
EventPollerPool 负责创建和管理 EventPoller 对象, 可获取当前EventPoller线程,最低负荷EventPoller线程,第一个EventPoller线程等,也可以自定义规则获取EventPoller线程对象。

EventPoller 线程主要处理:定时器(Timer)、异步任务(async)、网络事件(socket),epoll 监听管道fd和 socket fd 事件,加入 _event_map<fd,CB>。

1、定时器(Timer):可由任意线程调用,线程安全,异步加入延时任务队列 _delay_task_map<触发时间(ms),Task> ,距离最近触发定时器时间传入 epoll_wait 的超时时间,每次循环检测定时器队列,触发回调。
2、异步任务(async):可由任意线程调用,任务加入 _list_task 队列,有锁线程安全,并通过写管道 _pipe 唤醒epoll线程执行任务。
3、网络事件(socket):EventPoller 智能指针可以和 socket 绑定,监听处理fd接收/断开/错误等网络事件;socket 数据发送可以在单线程或任意线程进行(enable_mutex),线程锁 _mtx_sock_fd 。

#ifndef TEST_EVENTPOLLERPOOL_H
#define TEST_EVENTPOLLERPOOL_H#include <csignal>
#include <iostream>
#include "Util/logger.h"
#include "Network/TcpClient.h"
using namespace std;
using namespace toolkit;
void test_EventPollerPool() {//全局单例,获取实例即执行构造,addPoller 创建线程池,线程保存在其基类成员: std::vector<TaskExecutor::Ptr> _threads;//默认创建线程个数=CPU个数,也可以 setPoolSize 设置线程个数EventPollerPool::Instance();//从线程池返回一个 EventPoller 线程的智能指针,增加其引用计数//可以选择优先返回当前线程,或返回最低负荷线程std::shared_ptr<EventPoller> poller1 = EventPollerPool::Instance().getPoller();std::shared_ptr<EventPoller> poller2 = EventPollerPool::Instance().getPoller();printf("use_count=%ld\n",poller1.use_count());//use_count=3printf("main threadid=%ld\n",pthread_self());//异步执行,可以在任意线程调用,lambda 表达式在 EventPoller 线程异步执行//通过 lambda 表达式传参,把 lambda 这个匿名函数加入 EventPoller 线程的 List<Task::Ptr> _list_task 任务队列,Task 无参无返回值。int num = 15;poller1->async([num](){printf("poller1 threadid=%ld,num=%d\n",pthread_self(),num);});//定时器(Timer)参考: test_timer.h//网络事件(socket)参考: TestClient.h/*** 打印:* use_count=3* main threadid=139907182602176* poller1 threadid=139907174205184,num=15*///退出程序事件处理static semaphore sem;signal(SIGINT, [](int) { sem.post(); });// 设置退出信号sem.wait();
}
#endif // TEST_EVENTPOLLERPOOL_H

WorkThreadPool工作线程池

WorkThreadPool,获取一个可用的线程池。
WorkThreadPool 是全局单例对象,和 EventPollerPool 功能几乎一致,都是管理 EventPoller 所组成的线程池。
EventPollerPool 为了线程安全,支持优先返回当前线程,也可以选择返回最低负荷线程; WorkThreadPool 只返回最低负荷线程。

EventPollerPool 通常用于实时性较高的业务,比如定时器、fd网络事件等,该线程不应该被耗时业务阻塞。
ZLM 使用 WorkThreadPool 主要用于文件读写、DNS解析、mp4关闭等耗时的工作,完成后再通过 EventPollerPool::async 切回自己的线程。

Timer定时器测试

测试定时器
Ticker类:可以统计代码执行时间,一般的计时统计。
Timer类:定时器类,构造函数传参超时时长、回调函数等,根据回调函数判断是否重复下次任务,回调函数在event poller线程异步执行。
程序运行5个线程:stamp thread、QT_ZLToolKit、async log、event poller 0、event poller 1。

调用栈

1、添加定时器(任意线程):Timer::Timer-》EventPoller::doDelayTask-》async_first(异步执行,把定时器任务添加到 _delay_task_map)-》EventPoller::async_l(_list_task.emplace_front)。
2、定时器超时:EventPoller::runLoop-》EventPoller::getMinDelay-》EventPoller::flushDelayTask-》(*(it->second))(),在这里执行 Timer 构造函数第二个参数传的回调函数。

实现原理
创建定时器时把延迟触发时间和回调函数传参加入 _delay_task_map,时间会加上当前时间(now+delay_ms),event poller 线程轮询所有定时器,比较当前时间now与上述 _delay_task_map 里的时间,超时则执行回调函数。

实现技巧
1、先看 event poller 线程的唤醒,如果是网络事件(比如接收tcp数据)可以直接唤醒 epoll_wait ,新加入异步任务是通过管道唤醒;
2、Timer 定时器任务则是依赖 event poller 线程轮询检测是否超时,那多久检测一次(也就是 epoll_wait 超时时间)?这里在每次执行 getMinDelay 时会把最近定时器超时时长作为返回值,传参给 epoll_wait,下次线程唤醒时正好最近的定时器超时,执行任务;
3、这样既保证线程不会过度轮询浪费cpu资源,也可以保证定时器任务能尽快执行。
4、EventPoller::doDelayTask 加入 _delay_task_map 是在 event poller 线程异步执行,通过写管道唤醒 event poller 线程,可以刷新 minDelay=getMinDelay 时间,也就是下次唤醒 epoll_wait 的时间。

#ifndef TEST_TIMER_H
#define TEST_TIMER_H#include <csignal>
#include <iostream>
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Poller/Timer.h"using namespace std;
using namespace toolkit;
void test_timer() {//设置日志Logger::Instance().add(std::make_shared<ConsoleChannel>());Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());Ticker ticker0;Timer::Ptr timer0 = std::make_shared<Timer>(0.5f,[&](){TraceL << "timer0重复:" << ticker0.elapsedTime();ticker0.resetTime();return true;}, nullptr);Timer::Ptr timer1 = std::make_shared<Timer>(1.0f,[](){DebugL << "timer1不再重复";return false;},nullptr);Ticker ticker2;Timer::Ptr timer2 = std::make_shared<Timer>(2.0f,[&]() -> bool {InfoL << "timer2,测试任务中抛异常" << ticker2.elapsedTime();ticker2.resetTime();throw std::runtime_error("timer2,测试任务中抛异常");},nullptr);//退出程序事件处理static semaphore sem;signal(SIGINT, [](int) { sem.post(); });// 设置退出信号sem.wait();
}
#endif // TEST_TIMER_H

TcpClient测试

自定义类,继承于TcpClient,并重写onConnect、onRecv等虚函数,根据需求实现相应功能
程序运行5个线程:stamp thread、QT_ZLToolKit、async log、event poller 0、event poller 1

调用栈:
发起连接(主线程或其他线程):test_TcpClient->TcpClient::startConnect->Socket::connect,_poller->async加入 event poller 线程异步执行连接任务。
异步执行连接(event poller 线程):EventPoller::runLoop->EventPoller::onPipeEvent->Socket::connect->Socket::connect_l->async_con_cb(SockUtil::connect)->::connect,开始连接。
这里使用的是非阻塞连接,connect返回EINPROGRESS则表示正在连接,async_con_cb->_poller->addEvent给fd添加可写事件,添加成功则说明连接成功。

连接结果回调(eventpoller线程):async_con_cb-》Socket::onConnected(Socket::attachEvent 添加epoll监听读事件,监听接收数据)-》con_cb-》con_cb_in-》TcpClient::onSockConnect-》TestClient::onConnect。

数据接收回调:TcpClient::onSockConnect-》Socket::setOnRead-》_on_read=TestClient::onRecv
数据接收(eventpoller线程):EventPoller::runLoop-》epoll_wait监听到事件-》Socket::onRead-》_on_read-》TestClient::onRecv。

数据发送:demo里 TcpClient::startConnect 会创建定时器:_timer,每隔2秒回调一次 TestClient::onManager ,执行数据发送,定时器超时回调是在 event poller 线程,socket跨线程安全,线程锁:Socket::_mtx_sock_fd。
EventPoller::runLoop->EventPoller::getMinDelay->EventPoller::flushDelayTask(_timer 定时器超时)->TestClient::onManager->SockSender::<< ->SockSender::send->SocketHelper::send->Socket::send->Socket::send_l->Socket::flushAll->Socket::flushData->BufferSendMsg::send->BufferSendMsg::send_l->sendmsg(系统调用)。

小结
发起tcp连接可以在任意线程,非阻塞连接任务是在 event poller 线程异步执行,连接成功会添加 epoll 事件监听数据接收,发送数据可以在任意线程,使用线程锁保证线程安全。

#ifndef TESTCLIENT_H
#define TESTCLIENT_H#include <csignal>
#include <iostream>
#include "Util/logger.h"
#include "Network/TcpClient.h"
using namespace std;
using namespace toolkit;
class TestClient: public TcpClient {
public:using Ptr = std::shared_ptr<TestClient>;TestClient():TcpClient() {DebugL;}~TestClient(){DebugL;}
protected:virtual void onConnect(const SockException &ex) override{//连接结果事件InfoL << (ex ?  ex.what() : "success");}virtual void onRecv(const Buffer::Ptr &pBuf) override{//接收数据事件DebugL << pBuf->data() << " from port:" << get_peer_port();}virtual void onFlush() override{//发送阻塞后,缓存清空事件DebugL;}virtual void onError(const SockException &ex) override{//断开连接事件,一般是EOFWarnL << ex.what();}virtual void onManager() override{//定时发送数据到服务器auto buf = BufferRaw::create();if(buf){buf->assign("[BufferRaw]\0");(*this) << _nTick++ << " "<< 3.14 << " "<< string("string") << " "<<(Buffer::Ptr &)buf;}}
private:int _nTick = 0;
};int test_TcpClient() {// 设置日志系统Logger::Instance().add(std::make_shared<ConsoleChannel>());Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());TestClient::Ptr client(new TestClient());//必须使用智能指针client->startConnect("192.168.3.64",9090);//连接服务器//    TcpClientWithSSL<TestClient>::Ptr clientSSL(new TcpClientWithSSL<TestClient>());//必须使用智能指针
//    clientSSL->startConnect("192.168.3.64",9090);//连接服务器//退出程序事件处理static semaphore sem;///SIGINT:Ctrl+C发送信号,结束程序signal(SIGINT, [](int) { sem.post(); });// 设置退出信号sem.wait();return 0;
}
#endif // TESTCLIENT_H

ThreadPool任务线程池测试

线程池,可以输入functional任务至后台线程执行。

构造函数中创建线程组 thread_group _thread_group,从任务队列 TaskQueueTask::Ptr _queue 获取任务在线程池执行。

线程池中所有线程共用一个任务队列 _queue,ThreadPool 无锁,但 _queue 中有锁,目前ZLM中没有 ThreadPool 应用。

#ifndef TEST_THREADPOOL_H
#define TEST_THREADPOOL_H#include <chrono>
#include "Util/logger.h"
#include "Util/onceToken.h"
#include "Util/TimeTicker.h"
#include "Thread/ThreadPool.h"using namespace std;
using namespace toolkit;/*** @brief thread_group :线程组,移植自boost。* create_thread 快速创建一组线程,并指定参数和线程处理函数。*/
int test_ThreadPool() {//初始化日志系统Logger::Instance().add(std::make_shared<ConsoleChannel>());Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());ThreadPool pool(thread::hardware_concurrency(), ThreadPool::PRIORITY_HIGHEST, true);//每个任务耗时3秒auto task_second = 3;//每个线程平均执行4次任务,总耗时应该为12秒auto task_count = thread::hardware_concurrency() * 4;semaphore sem;vector<int> vec;vec.resize(task_count);Ticker ticker;{//放在作用域中确保token引用次数减1auto token = std::make_shared<onceToken>(nullptr, [&]() {sem.post();});for (auto i = 0; i < task_count; ++i) {pool.async([token, i, task_second, &vec]() {setThreadName(("thread pool " + to_string(i)).data());std::this_thread::sleep_for(std::chrono::seconds(task_second)); //休眠三秒InfoL << "task " << i << " done!";vec[i] = i;});}}sem.wait();InfoL << "all task done, used milliseconds:" << ticker.elapsedTime();//打印执行结果for (auto i = 0; i < task_count; ++i) {InfoL << vec[i];}return 0;
}
#endif // TEST_THREADPOOL_H

ResourcePool内存池测试

ResourcePool :基于智能指针实现的一个循环池,不需要手动回收对象。

ResourcePool 是个类模板,可传入自定义数据类型C,内存池中存放的是该数据类型C的指针数组 std::vector<C > _objs。
setSize(_pool_size) 可设置内存池最大容量,当超出最大容量,recycle 不再回收该资源,而是直接释放。
obtain 获取内存,返回的是指向内存对象C的自定义智能指针 shared_ptr_imp,特点是提供接口quit,当内存对象使用完后,可以选择不再回收到内存池,此时可以自定义回收或直接释放。
obtain2 获取内存,返回指向内存对象C的智能指针 std::shared_ptr,当智能指针离开作用域时自动回收内存对象C到内存池。
私有接口:
getPtr 获取C原始指针,当 _objs 为空时分配一个新的C
返回,当 _objs 不为空则从 _objs 尾部取已一个C*,并从 _objs 中删除。
recycle 回收C*,如果 _objs 已满(>=_pool_size),直接释放C*,否则回收到 _objs 尾部。

总结
1、内存池中存放的是任意类型数据指针C*,C大小固定或可动态扩容,刚开始内存池是空的,使用时分配内存,用完后回收到内存池,下次再使用时就不用重新分配了,直接用上次分配并回收的C*;
2、不用担心高并发内存池不够用,因为当内存池为空时总会立即分配内存,如果分配的太多,回收时超出内存池大小后会直接释放,合理的内存池大小在高并发时会减少分配和释放的次数。

#ifndef TEST_RESOURCEPOOL_H
#define TEST_RESOURCEPOOL_H#include <csignal>
#include <iostream>
#include <random>
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/ResourcePool.h"
#include "Thread/threadgroup.h"
#include <list>using namespace std;
using namespace toolkit;//程序退出标志
bool g_bExitFlag = false;
class string_imp : public string{
public:template<typename ...ArgTypes>string_imp(ArgTypes &&...args) : string(std::forward<ArgTypes>(args)...){DebugL << "创建string对象:" << this << " " << *this;};~string_imp(){WarnL << "销毁string对象:" << this << " " << *this;}
};//后台线程任务
void onRun(ResourcePool<string_imp> &pool,int threadNum){std::random_device rd;while(!g_bExitFlag){//从循环池获取一个可用的对象auto obj_ptr = pool.obtain();if(obj_ptr->empty()){//这个对象是全新未使用的InfoL << "后台线程 " << threadNum << ":" << "obtain a emptry object!";}else{//这个对象是循环使用的InfoL << "后台线程 " << threadNum << ":" << *obj_ptr;}//标记该对象被本线程使用obj_ptr->assign(StrPrinter << "keeped by thread:" << threadNum );//随机休眠,打乱循环使用顺序usleep( 1000 * (rd()% 10));obj_ptr.reset();//手动释放,也可以注释这句代码。根据RAII的原理,该对象会被自动释放并重新进入循环列队usleep( 1000 * (rd()% 1000));}
}int test_ResourcePool() {//初始化日志Logger::Instance().add(std::make_shared<ConsoleChannel>());Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());//大小为50的循环池ResourcePool<string_imp> pool;pool.setSize(50);//获取一个对象,该对象将被主线程持有,并且不会被后台线程获取并赋值auto reservedObj = pool.obtain();//在主线程赋值该对象reservedObj->assign("This is a reserved object , and will never be used!");thread_group group;//创建4个后台线程,该4个线程模拟循环池的使用场景,//理论上4个线程在同一时间最多同时总共占用4个对象WarnL << "主线程打印:" << "开始测试,主线程已经获取到的对象应该不会被后台线程获取到:" << *reservedObj;for(int i = 0 ;i < 4 ; ++i){group.create_thread([i,&pool](){onRun(pool,i);});}//等待3秒钟,此时循环池里面可用的对象基本上最少都被使用过一遍了sleep(3);//但是由于reservedObj早已被主线程持有,后台线程是获取不到该对象的//所以其值应该尚未被覆盖WarnL << "主线程打印: 该对象还在被主线程持有,其值应该保持不变:" << *reservedObj;//获取该对象的引用auto &objref = *reservedObj;//显式释放对象,让对象重新进入循环列队,这时该对象应该会被后台线程持有并赋值reservedObj.reset();WarnL << "主线程打印: 已经释放该对象,它应该会被后台线程获取到并被覆盖值";//再休眠3秒,让reservedObj被后台线程循环使用sleep(3);//这时,reservedObj还在循环池内,引用应该还是有效的,但是值应该被覆盖了WarnL << "主线程打印:对象已被后台线程赋值为:" << objref << endl;{WarnL << "主线程打印:开始测试主动放弃循环使用功能";List<decltype(pool)::ValuePtr> objlist;for (int i = 0; i < 8; ++i) {reservedObj = pool.obtain();string str = StrPrinter << i << " " << (i % 2 == 0 ? "此对象将脱离循环池管理" : "此对象将回到循环池");reservedObj->assign(str);reservedObj.quit(i % 2 == 0);objlist.emplace_back(reservedObj);}}sleep(3);//通知后台线程退出g_bExitFlag = true;//等待后台线程退出group.join_all();return 0;
}
#endif // TEST_RESOURCEPOOL_H

stampthread时间戳线程测试

测试时间戳线程
只要执行了静态方法initMillisecondThread,就会创建时间戳线程,最多创建1个,线程名称:stamp thread。
提供getCurrentMillisecond和getCurrentMicrosecond接口,获取程序启动到当前时间的毫秒数和微秒数,或从1970年开始到当前时间的毫秒数和微秒数。
程序启动后有两个线程:QT_ZLToolKit(主线程)和stamp thread。

#ifndef TEST_STAMPTHREAD_H
#define TEST_STAMPTHREAD_H#include "Util/util.h"
#include <sys/time.h>
using namespace std;
using namespace toolkit;void test_stampthread() {uint64_t cur_ms = getCurrentMillisecond(true);printf("cur_ms = %lu\n",cur_ms);usleep(100*1000);cur_ms = getCurrentMillisecond(true);printf("cur_ms = %lu\n",cur_ms);
}#endif // TEST_STAMPTHREAD_H

NoticeCenter广播中心测试

广播中心,可以在程序的任意线程添加监听事件并定义回调;可以在任意线程发出一个事件,通知所有监听了该事件的地方执行回调。

每个事件创建一个分发器 EventDispatcher ,分发器存放监听该事件的key和回调,加线程锁,多线程安全。

NoticeCenter::Instance() 定义对外接口,是全局单例对象,加线程锁,添加/删除事件、发出事件均是多线程安全。

addListener(指针key,事件名,回调) 可以在任意线程添加监听,emitEvent(事件名,参数列表) 可以在任意线程发出一个事件,注意:监听回调是在 emitEvent 所在线程执行的

class EventDispatcher,成员:std::unordered_multimap<void *, Any>(first指针,多个对象监听相同事件传的指针必须不同,second是监听该事件的回调),recursive_mutex,事件分发器,监听同一个事件的回调。
class NoticeCenter,成员:std::unordered_map<std::string, EventDispatcher::Ptr>(first事件名,second分发器),recursive_mutex,接口:emitEvent,addListener,delListener,全局单例。

下面实验:多线程监听相同事件,线程安全。

#ifndef TEST_NOTICECENTER_H
#define TEST_NOTICECENTER_H#include <csignal>
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/NoticeCenter.h"
using namespace std;
using namespace toolkit;//定义两个事件,事件是字符串类型
//广播名称1
#define NOTICE_NAME1 "NOTICE_NAME1"
//广播名称2
#define NOTICE_NAME2 "NOTICE_NAME2"//程序退出标记
bool g_bExitFlag = false;static void *tag1;
static void *tag2;void* func0(void*) {//addListener方法第一个参数是标签,用来删除监听时使用//需要注意的是监听回调的参数列表个数类型需要与emitEvent广播时的完全一致,否则会有无法预知的错误NoticeCenter::Instance().addListener(tag1,NOTICE_NAME1,[](int &a,const char * &b,double &c,string &d){printf("func0=%d\n",a);});return nullptr;
}void* func1(void*) {NoticeCenter::Instance().addListener(tag2,NOTICE_NAME1,[](int &a,const char * &b,double &c,string &d){printf("func1=%d\n",a);});return nullptr;
}void* func2(void*) {//监听NOTICE_NAME2事件NoticeCenter::Instance().addListener(0,NOTICE_NAME2,[](string &d,double &c,const char *&b,int &a){printf("func2=%d\n",a);});return nullptr;
}int test_NoticeCenter() {//设置程序退出信号处理函数signal(SIGINT, [](int){g_bExitFlag = true;});//设置日志Logger::Instance().add(std::make_shared<ConsoleChannel>());pthread_t tid[5];pthread_create(&tid[0],nullptr,func0,nullptr);pthread_create(&tid[1],nullptr,func1,nullptr);pthread_create(&tid[2],nullptr,func2,nullptr);int a = 0;while(!g_bExitFlag){const char *b = "b";double c = 3.14;string d("d");//每隔1秒广播一次事件,如果无法确定参数类型,可加强制转换NoticeCenter::Instance().emitEvent(NOTICE_NAME1,++a,(const char *)"b",c,d);NoticeCenter::Instance().emitEvent(NOTICE_NAME2,d,c,b,a);sleep(1); // sleep 1 second}return 0;
}
#endif // TEST_NOTICECENTER_H

onceToken测试

RAII [1] (Resource Acquisition Is Initialization)
也称为“资源获取就是初始化”,是C++语言的一种管理资源、避免泄漏的惯用法。
RAII的思想:构造时获取资源,在对象生命周期内保持资源有效,最后对象析构时释放资源。

onceToken
使用RAII模式实现,可以在对象构造和析构时执行一段代码。
也就是在构造时执行一段代码(传nullptr则什么都不执行),在离开作用域时执行一段代码。

在ZLM中,onceToken 主要用于防止在程序抛出异常时提前返回,没有执行接下来的代码。
把一定要执行的代码放在 onceToken 析构时执行,防止程序抛出异常提前返回。
如果要等待异步执行后再析构,在执行 async 时把 onceToken 智能指针作为行参传递给Lambda表达式。

#ifndef TEST_ONCETOKEN_H
#define TEST_ONCETOKEN_H#include <csignal>
#include "Util/onceToken.h"
#include "Poller/EventPoller.h"
using namespace std;
using namespace toolkit;
void token_start() {printf("token start\n");
}void test_onceToken() {EventPoller::Ptr poller = EventPollerPool::Instance().getPoller();//异步执行时传递 onceToken 智能指针行参,引用计数加1,等所有的异步执行结束后引用计数变为0,执行析构/*** 打印:* token start* async=0* async=1* async=2* token destruct*/{auto token = std::make_shared<onceToken>(token_start, []() {printf("token destruct\n");});for (auto i = 0; i < 3; ++i) {poller->async([token, i]() {//EventPoller 线程异步执行printf("async=%d\n",i);std::this_thread::sleep_for(std::chrono::seconds(1)); //休眠1秒});}}//退出程序事件处理static semaphore sem;signal(SIGINT, [](int) { sem.post(); });// 设置退出信号sem.wait();
}
#endif // TEST_ONCETOKEN_H

Any数据结构测试

Any可以保存任意类型的数据。

#ifndef TEST_ANY_H
#define TEST_ANY_H#include <csignal>
#include <iostream>
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Poller/Timer.h"using namespace std;
using namespace toolkit;class Student{
public:Student(int age):age(age) {printf("Student\n");}~Student() {printf("~Student\n");}Student(Student &s) {printf("Student copy\n");age = s.age;}int age;
};template <typename FUNC>
void test_func(FUNC &&func) {}/*** @brief Any :可以保存任意的对象*/
void test_Any() {//1.Any保存类的对象{Any aa;//创建 Student 的智能指针,(17)是构造函数的参数aa.set<Student>(17);//拷贝构造,get<Student>(bool可选参数)返回智能指针所管理的原始指针的对象引用Student S1 = aa.get<Student>();
//        aa.reset();//如果天提前释放aa,则捕获异常,打印: ex=Any is emptytry{printf("aa age=%d\n",aa.get<Student>().age);}catch(std::exception& e) {printf("ex=%s\n",e.what());}printf("s1 age=%d\n",S1.age);//离开作用域打印:// Student// Student copy// aa age=17// s1 age=17// ~Student// ~Student}//2.Any保存 function 函数指针模板Any bb;//set<function函数指针模板的数据类型>(function 的实例化,lambda表达式)bb.set<std::function<void(int)>>([](int a){printf("a=%d\n",a);});//获取bb所管理的对象并调用方法,(bool可选参数)(10:调用对象所传的参数)bb.get<std::function<void(int)>>()(10);//调用lambda表达式,打印:a=10//退出程序事件处理static semaphore sem;signal(SIGINT, [](int) { sem.post(); });// 设置退出信号sem.wait();
}
#endif // TEST_ANY_H

function_traits测试

源码类似于: 《深入应用C++11 代码优化与工程级应用》3.3.6 function_traits

function_traits 用来获取所有函数语义类型的信息(函数类型、返回类型、参数个数和参数的具体类型),通过 stl_function_type 把任意函数转换成 std::function。
函数类型包括:普通函数、函数指针、function/lambda、成员函数、函数对象。

实现function_traits的关键技术:
要通过模板特化和可变参数模板来获取函数类型和返回类型。
先定义一个基本的function_traits的模板类:
template
struct function_traits;
再通过特化,将返回类型和可变参数模板作为模板参数,就可以获取函数类型、函数返回值和参数的个数了。

如:
int func(int a, string b);
1## 获取函数类型
function_traits<decltype(func)>::function_type; // int __cdecl(int, string)
2# 获取函数返回值
function_traits<decltype(func)>::return_type; // int
3# 获取函数的参数个数
function_traits<decltype(func)>::arity; // 2
4# 获取函数第一个入参类型
function_traits<decltype(func)>::args<0>::type; // int
5# 获取函数第二个入参类型
function_traits<decltype(func)>::args<1>::type; // string
6# 将函数转换为 std::function
stl_function_type

#ifndef TEST_FUNCTION_TRAITS_H
#define TEST_FUNCTION_TRAITS_H#include <csignal>
#include <iostream>
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Poller/Timer.h"
#include "Util/function_traits.h"using namespace std;
using namespace toolkit;
//打印数据类型
template<typename T>
void printType()
{printf("%s\n",demangle(typeid(T).name()).c_str());
}//自定义类
class Student2{};//函数指针
float(*cast_func)(int, int, int, int);//普通函数
int func(int a, Student2 b)
{printf("a=%d\n",a);return 0;
}struct AA
{int f(int a, int b)volatile { return a + b; }//成员函数int operator()(int)const { return 0; }//函数对象
};//function 函数包装模板,指向 lambda 表达式
std::function<int(int)> func_lam = [](int a) {return a; };template <typename FUNC>
void to_function(FUNC &&func) {using stl_func = typename function_traits<typename std::remove_reference<FUNC>::type>::stl_function_type;stl_func f = func;f(10,Student2());//调用func,打印: a=10
}void test_function_traits() {//1.获取函数信息printf("func:%s\n",demangle(typeid(function_traits<decltype(func)>::function_type).name()).c_str());//打印函数类型printf("func ret:%s\n",demangle(typeid(function_traits<decltype(func)>::return_type).name()).c_str());//打印返回值类型printf("fucn arg num:%d\n",function_traits<decltype(func)>::arity);//打印参数个数printf("fucn arg[0]:%s\n",demangle(typeid(function_traits<decltype(func)>::args<0>::type).name()).c_str());//打印第一个参数的类型printType<function_traits<std::function<int(int)>>::function_type>();printType<function_traits<std::function<int(int)>>::args<0>::type>();printType<function_traits<decltype(func_lam)>::function_type>();printType<function_traits<decltype(cast_func)>::function_type>();printType<function_traits<AA>::function_type>();using T = decltype(&AA::f);printType<T>();printType<function_traits<decltype(&AA::f)>::function_type>();static_assert(std::is_same<function_traits<decltype(func_lam)>::return_type, int>::value, "");//2.使用 stl_function_type 把任意函数转换为 std::functionto_function(func);/*** 打印:* func:int (int, Student2)* func ret:int* fucn arg num:2* fucn arg[0]:int* int (int)* int* int (int)* float (int, int, int, int)* int (int)* int (AA::*)(int, int) volatile* int (int, int)* a=10*///退出程序事件处理static semaphore sem;signal(SIGINT, [](int) { sem.post(); });// 设置退出信号sem.wait();
}
#endif // TEST_FUNCTION_TRAITS_H

ObjectStatistic类统计测试

ObjectStatistic :统计类所实例化对象的个数。

创建私有成员: ObjectStatistic<Test_Obj> _statistic;并使用宏声明: StatisticImp(Test_Obj)。

在任意实例化的对象里均可调用接口 _statistic.count 查询实例化对象的总数。

#ifndef TEST_OBJECTSTATISTIC_H
#define TEST_OBJECTSTATISTIC_H#include <csignal>
#include <iostream>
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Poller/Timer.h"using namespace std;
using namespace toolkit;class Test_CLS {public:int get_count() {return _statistic.count();}
private:ObjectStatistic<Test_CLS> _statistic;
};StatisticImp(Test_CLS)void test_ObjectStatistic() {Test_CLS tTest_CLS1;Test_CLS tTest_CLS2;printf("count=%d\n",tTest_CLS1.get_count());//count=2{Test_CLS tTest_CLS3;printf("count=%d\n",tTest_CLS1.get_count());//count=3}printf("count=%d\n",tTest_CLS1.get_count());//count=2//退出程序事件处理static semaphore sem;signal(SIGINT, [](int) { sem.post(); });// 设置退出信号sem.wait();
}
#endif // TEST_OBJECTSTATISTIC_H

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/329710.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RT-DETR Gradio 前端展示页面

效果展示 使用方法 Gradio 是一个开源库,旨在为机器学习模型提供快速且易于使用的网页界面。它允许开发者和研究人员轻松地为他们的模型创建交互式的演示,使得无论技术背景如何的人都可以方便地试用和理解这些模型。使用Gradio,你只需几行代码就可以生成一个网页应用程序,…

route 命令

格式&#xff1a; route [-f] [-p] [Command] [Destination] [mask Netmask] [Gateway] [metric Metric] [if Interface] 功能&#xff1a; route 命令是用于操作基于内核ip路由表&#xff0c;它的主要作用是创建一个静态路由让指定一个主机或者一个网络通过一个网络接口。 …

CSS效果(工作中常用)

1、css文字溢出省略号 overflow: hidden; // 溢出隐藏 text-overflow: ellipsis; // 溢出用省略号显示 white-space: nowrap; // 规定段落中的文本不进行换行 overflow: hidden; // 溢出隐藏 text-overflow: ellipsis; // 溢出用省略…

【Docker基础二】Docker安装Mysql8

下载镜像 安装mysql&#xff08;版本&#xff1a;8.0.35&#xff09; # 拉取镜像 docker pull mysql:8.0.35 # 查看镜像是否已经下载 docker images 创建挂载目录 # 宿主机上创建挂载目录 (可以不创建&#xff0c;docker run -v配置了挂载目录&#xff0c;docker会自动…

PolarDB Serverless能力测评:秒级弹升、无感伸缩与强一致性,助您实现高效云数据库管理!

前言 PolarDB MySQL 传统的关系型数据库有着悠久的历史&#xff0c;从上世纪60年代开始就已经在航空领域发挥作用。因为其严谨的一致性保证以及通用的关系型数据模型接口&#xff0c;获得了越来越多的应用。2000年以后&#xff0c;随着互联网应用的出现&#xff0c;很多场景…

JS新手入门笔记整理:条件判断

判断语句&#xff1a;IF 单向判断&#xff1a;if... 语法 if&#xff08;条件&#xff09; {…… } 如果“条件”返回结果为true&#xff0c;则会执行大括号{}内部的程序&#xff1b;如果“条件”返回结果为false&#xff0c;则会直接跳过大括号{}内部的程序&#xff0c;然后…

【JAVA】Iterator 和 ListIterator 有什么区别?

&#x1f34e;个人博客&#xff1a;个人主页 &#x1f3c6;个人专栏&#xff1a; JAVA ⛳️ 功不唐捐&#xff0c;玉汝于成 目录 前言 在Java中&#xff0c;遍历集合是日常编程中常见的任务&#xff0c;而Iterator和ListIterator作为遍历集合的两个主要接口&#xff0…

【基础工具篇使用】VScode 远程 Ubuntu 系统 进行使用

文章目录 准备条件使用步骤step1&#xff1a; 打开远程窗口step2&#xff1a;选择中的红色框“Connect to Host”功能Step3: 根据图中的红色框提示信息输入远程电脑的用户名和 IP 地址&#xff0c;输入如下命令即可连接&#xff1a; 显示效果 准备条件 我们可以使用 vscode 的…

【嵌入式移植】2、使用Crosstool-NG制作交叉编译工具链

【嵌入式移植】2、使用Crosstool-NG制作交叉编译工具链 1 准备工作1.1 下载Crosstool-NG1.2 尝试配置crosstool-ng&#xff0c;安装依赖项1.2.1 Crosstool-NG所需软件包 1.3 编译及安装 2 制作交叉编译工具链2.1 选择配置文件2.2 使用用户界面菜单进行配置2.2.1 Paths and misc…

计算机原理 (2) CPU的诞生 输入 输出 PC指针

文章目录 计算机的前世今生计算机的三个根本性基础1. 计算机是执行输入、运算、输出的机器&#xff1b;2.程序是指令和数据的集合&#xff1b;3.计算机的处理方式有时与人们的思维习惯不同 二、结论三、参考资料交个朋友 计算机的前世今生 上一篇文章最终结束的时候谈到希望给…

Robot Operating System 2: Design, Architecture, and Uses In The Wild

Robot Operating System 2: Design, Architecture, and Uses In The Wild (机器人操作系统 2&#xff1a;设计、架构和实际应用) 摘要&#xff1a;随着机器人在广泛的商业用例中的部署&#xff0c;机器人革命的下一章正在顺利进行。即使在无数的应用程序和环境中&#xff0c;也…

vue下载阿里OSS上的图片与视频,纯前端实现,以及纯JS下载图片案例

vue下载阿里OSS上的图片与视频&#xff0c;以及纯JS下载图片案例 1. 简介与日常使用2. Vue下载阿里OSS上的图片与视频3. 调用&#xff08;单个与批量下载均可使用&#xff09;4. 纯JS下载图片案例&#xff0c;Vue里面也可以用 1. 简介与日常使用 参考这篇文章即可&#xff1a;…