muduo提供了基于底层事件循环的定时器,通过底层提供的特殊文件描述符timerfd实现。TimerId、Timer和TimerQueue是定时器的主要实现类。
timerfd
timerfd是linux提供的基于文件描述符的定时器接口。他通过文件描述符与定时器关联,当定时器到期时,文件描述符会变为可读。因此可以与常用的IO多路复用机制(epoll、poll、select)配合使用,自实现定时器。
timerfd有三个主要方法:
// 创建一个timerfd
// @param clockid 时钟类型,可以是CLOCK_REALTIME或CLOCK_MONOTONIC
// @param flags 附加选项
// @return timerfd的文件描述符
int timerfd_create(int clockid, int flags);// 设置timerfd的超时时间
// @param fd timerfd的文件描述符
// @param flags 附加选项
// @param new_value 新的超时时间
// @param old_value 旧的超时时间
// @return 0表示成功,其他表示失败
int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value);// 获取当前timerfd设置的时间消息
// @param fd timerfd的文件描述符
// @param curr_value 当前的超时时间
int timerfd_gettime(int fd, struct itimerspec *curr_value);
itimerspec是一个存储时间信息的结构体,包含定时器到期时间和定时器重复触发的间隔时间(若为零,则表示只触发一次)。
TimerId、Timer和TimerQueue
muduo库定时器设计中,每个事件循环即EventLoop只拥有一个timerfd,注册某个事件循环上的所有定时任务都使用同一个timerfd。定时器队列TimerQueue就是注册在事件循环上的所有定时任务的管理类,同样每个事件循环只拥有一个TimerQueue。定时器类Timer则封装了每个定时任务的相关信息,包括超时时间、定时任务的回调函数等。TimerId的是每个定时任务的标识符,用户在往事件循环注册定时任务时,会返回对应任务的TimerId,以便后续管理。
TimerId
TimerId是定时器标识符,结构简单,只有两个成员变量没有其它成员函数:
Timer* timer_; // 指向Timer对象
int64_t sequence_; // 定时器序号,确保每个定时器唯一
Timer
Timer是定时器的具体实现类,封装了定时器的相关信息,包括超时时间、定时任务的回调函数等。
class Timer : noncopyable
{
public:// 构造函数// @param cb 定时任务的回调函数// @param when 定时器的到期时间// @param interval 定时器的重复触发间隔,若为零,则表示只触发一次Timer(TimerCallback cb, Timestamp when, double interval): callback_(std::move(cb)),expiration_(when),interval_(interval),repeat_(interval > 0.0), // 时间间隔大于零,表示重复触发sequence_(s_numCreated_.incrementAndGet()) // 获取定时器序号,s_numCreated_是一个原子计数器{ }// 运行定时任务void run() const{callback_();}Timestamp expiration() const { return expiration_; }bool repeat() const { return repeat_; }int64_t sequence() const { return sequence_; }// 重启定时器// @param now 当前时间void restart(Timestamp now);static int64_t numCreated() { return s_numCreated_.get(); }private:const TimerCallback callback_; // 定时任务的回调函数Timestamp expiration_; // 定时器的到期时间const double interval_; // 定时器的重复触发间隔const bool repeat_; // 是否重复触发const int64_t sequence_; // 定时器序号static AtomicInt64 s_numCreated_;
};//实现
void Timer::restart(Timestamp now)
{if (repeat_){// 计算下次到期时间expiration_ = addTime(now, interval_);}else{expiration_ = Timestamp::invalid();}
}
TimerQueue
TimerQueue是定时器队列的实现类,管理注册在事件循环上的所有定时任务。
class TimerQueue : noncopyable
{
public:// 构造函数// @param loop 队列所在的事件循环explicit TimerQueue(EventLoop* loop);~TimerQueue();//向队列中添加一个定时器// @param cb 定时任务的回调函数// @param when 定时器的到期时间// @param interval 定时器的重复触发间隔,若为零,则表示只触发一次TimerId addTimer(TimerCallback cb,Timestamp when,double interval);// 取消一个定时任务// @param timerId 要取消的定时器的标识符void cancel(TimerId timerId);private:typedef std::pair<Timestamp, Timer*> Entry;typedef std::set<Entry> TimerList; typedef std::pair<Timer*, int64_t> ActiveTimer;typedef std::set<ActiveTimer> ActiveTimerSet;// 在事件循环线程中添加一个定时器void addTimerInLoop(Timer* timer);// 在事件循环线程中取消一个定时器void cancelInLoop(TimerId timerId);// 处理timerfd可读事件void handleRead();// 获取已到期的定时器// @param now 文件描述符到期时间// @return 已到期的定时器列表std::vector<Entry> getExpired(Timestamp now);// 重置已到期的定时器void reset(const std::vector<Entry>& expired, Timestamp now);// 插入一个定时器bool insert(Timer* timer);EventLoop* loop_; // 队列所在的事件循环const int timerfd_; // timerfd的文件描述符Channel timerfdChannel_; // timerfd对应的Channel,用于监听可读事件TimerList timers_; // 定时器队列ActiveTimerSet activeTimers_; // 当前活跃的定时器bool callingExpiredTimers_; /* atomic */ // 是否正在调用过期定时器的回调函数ActiveTimerSet cancelingTimers_; // 待取消的定时器
};
std::set底层通过红黑树实现,是一个有序的集合,因此对于TimerList,若向其中插入定时器,则会自动根据std::pair的第一个参数(重载了<运算符的Timestamp)来自动排序(从小到大)。
添加定时器
外部调用addTimer()来向定时队列添加定时器。首先构造一个定时器对象,然后在事件循环线程中调用addTimerInLoop()来添加定时器。避免在多个线程中竞争资源
TimerId TimerQueue::addTimer(TimerCallback cb,Timestamp when,double interval)
{Timer* timer = new Timer(std::move(cb), when, interval);loop_->runInLoop(std::bind(&TimerQueue::addTimerInLoop, this, timer));// 返回构造的定时器的标识符return TimerId(timer, timer->sequence());
}
调用insert函数将定时器插入到定时器队列,并根据当前定时器的到期时间是否是最早到期的,来选择是否重置timerfd的到期时间。
void TimerQueue::addTimerInLoop(Timer* timer)
{loop_->assertInLoopThread();bool earliestChanged = insert(timer);if (earliestChanged){resetTimerfd(timerfd_, timer->expiration());}
}
插入定时器到timers_和activeTimers_中,并判断是否是最早到期的定时器
bool TimerQueue::insert(Timer* timer)
{loop_->assertInLoopThread();assert(timers_.size() == activeTimers_.size());bool earliestChanged = false;Timestamp when = timer->expiration();TimerList::iterator it = timers_.begin();if (it == timers_.end() || when < it->first){earliestChanged = true;}{std::pair<TimerList::iterator, bool> result= timers_.insert(Entry(when, timer));assert(result.second); (void)result;}{std::pair<ActiveTimerSet::iterator, bool> result= activeTimers_.insert(ActiveTimer(timer, timer->sequence()));assert(result.second); (void)result;}assert(timers_.size() == activeTimers_.size());return earliestChanged;
}
定时任务到期
当timerfd设置的时间到期,timerfd可读,事件循环捕获到该可读事件发生后会调用先前注册的该函数。
void TimerQueue::handleRead()
{loop_->assertInLoopThread();Timestamp now(Timestamp::now()); // 获取当前时间readTimerfd(timerfd_, now); // 读取timerfd中的数据// 获取到期的定时器std::vector<Entry> expired = getExpired(now);// 标志当前正在处理到期的定时器任务callingExpiredTimers_ = true;// 清空待取消的定时器(后续讨论)cancelingTimers_.clear();// 处理任务for (const Entry& it : expired){it.second->run();}// 标志处理完成callingExpiredTimers_ = false;// 重置已到期的定时器reset(expired, now);
}
根据传入的时间获取当前到期的定时器,该函数通过整体批量处理的方式来提高效率。
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{assert(timers_.size() == activeTimers_.size());std::vector<Entry> expired;// 构造边界元素,用于分割timers_Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));TimerList::iterator end = timers_.lower_bound(sentry);assert(end == timers_.end() || now < end->first);std::copy(timers_.begin(), end, back_inserter(expired));// 处理边界元素,从timers_中删除到期的定时器timers_.erase(timers_.begin(), end);for (const Entry& it : expired){// 从activeTimers_中删除已到期的定时器ActiveTimer timer(it.second, it.second->sequence());size_t n = activeTimers_.erase(timer);assert(n == 1); (void)n;}assert(timers_.size() == activeTimers_.size());return expired;
}
重置到期的定时任务,若定时器设置为重复触发则重新添加到timers_中,否则释放构造的Timer对象。最后重置timerfd的到期时间。
void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
{Timestamp nextExpire;for (const Entry& it : expired){ActiveTimer timer(it.second, it.second->sequence());// 如果定时器标志为重复触发且不是待取消的定时器if (it.second->repeat()&& cancelingTimers_.find(timer) == cancelingTimers_.end()){it.second->restart(now);insert(it.second);}else{delete it.second; }}if (!timers_.empty()){nextExpire = timers_.begin()->second->expiration();}if (nextExpire.valid()){resetTimerfd(timerfd_, nextExpire);}
}
取消定时器
同addTimer,提供外部调用接口,真正取消是在事件循环线程中
void TimerQueue::cancel(TimerId timerId)
{loop_->runInLoop(std::bind(&TimerQueue::cancelInLoop, this, timerId));
}
根据timerId定位到对应的定时器,并将其从activeTimers_和timers_中删除。如果删除时正在处理定时器任务,则将其放入cancelingTimers_中,在处理完毕后调用reset时处理。
void TimerQueue::cancelInLoop(TimerId timerId)
{loop_->assertInLoopThread();assert(timers_.size() == activeTimers_.size());ActiveTimer timer(timerId.timer_, timerId.sequence_);ActiveTimerSet::iterator it = activeTimers_.find(timer);if (it != activeTimers_.end()){size_t n = timers_.erase(Entry(it->first->expiration(), it->first));assert(n == 1); (void)n;delete it->first; activeTimers_.erase(it);}else if (callingExpiredTimers_){cancelingTimers_.insert(timer);}assert(timers_.size() == activeTimers_.size());
}
可能会有人不理解为什么要用两个容器来装定时器(TimerList和ActiveTimerSet)。我的理解是,TimerList装的是<Timestamp /*到期时间*/, Timer* /*定时器指针*/>,这样方便通过Timestamp来将所有定时器根据到期时间排序,更好获取到期的定时器以及设置timerfd的到期时间。而ActiveTimerSet装的是<Timer* /*定时器指针*/, int64_t /*定时器序号*/>,这和定时器标识符TimerId是一一对应的,方便在取消的时候,根据TimerId来定位到对应的定时器。
最后源码中在两次删除Timer对象时,注释有标注不要删除,而是放入一个待删除队列中,或许是考虑到直接删除容易引发错误,对指针管理不够严谨。因此我也建议大家在使用前改动一下。
定时器使用
EventLoop类提供了对应接口:
// 在time时刻运行cb
TimerId EventLoop::runAt(Timestamp time, TimerCallback cb)
{return timerQueue_->addTimer(std::move(cb), time, 0.0);
}// 在delay时长后运行cb
TimerId EventLoop::runAfter(double delay, TimerCallback cb)
{Timestamp time(addTime(Timestamp::now(), delay));return runAt(time, std::move(cb));
}// 每隔interval时长运行cb
TimerId EventLoop::runEvery(double interval, TimerCallback cb)
{Timestamp time(addTime(Timestamp::now(), interval));return timerQueue_->addTimer(std::move(cb), time, interval);
}// 取消定时器
void EventLoop::cancel(TimerId timerId)
{return timerQueue_->cancel(timerId);
}
示例:
void onTimeout() {std::cout << "Timeout!" << std::endl;
}int main() {EventLoop loop;// 添加一个 2 秒后执行的一次性定时任务TimerId timerId = loop.runAfter(2.0, onTimeout);// 取消定时任务loop.cancel(timerId);loop.loop(); // 进入事件循环return 0;
}