前言
RocketMQ 相较于其它消息队列产品的一个特性是支持延时消息,也就是说消息发送到 Broker 不会立马投递给消费者,要等待一个指定的延迟时间再投递,适用场景例如:下单后多长时间没付款系统自动关闭订单。
RocketMQ 4.x 版本的延时消息存在一定的局限性,实现原理是:Broker 内置了名称为SCHEDULE_TOPIC_XXXX
的Topic,包含 18 个对应延时级别的队列,每个延时级别的时间是固定的。Broker 收到消息后,如果发现是延时消息就会改写消息的 Topic 和 queueID,再通过专门的线程定时扫描这些队列,把延迟时间已到的消息重新投递到真正的 Topic 里面。
这种实现方案相对简单,但也存在局限性。消息的延迟时间不能随意设置,只能按照给定的延迟级别来设置;最长的延迟级别是两小时,如果用户需要更久的延时,就只能自行改造方案。
RocketMQ 5.0 意识到了这个问题,终于延时消息迎来升级,废弃了之前延迟级别的设计,消费者可以设置 24 小时内的任意时间,这个限制其实还可以增加,不过堆积太多的消息会带来其它问题。
设计难点
延时消息的设计是存在一些难点的,下面逐个分析。
1、任意延迟时间如何扫描?
延迟消息肯定需要一个数据结构来存储,为了不影响消息发送的吞吐量,还必须保证写入的高性能。然后还需要有线程定时的扫描这个数据结构,把延迟时间已到的消息投递给消费者。
RocketMQ 4.x 为了保证高性能写,还是把延时消息正常写 CommitLog,只不过换个 Topic。不同延迟时长的消息写入不同队列,这样就能保证每个队列前面的消息肯定比后面的消息先投递,线程扫描的时候顺序扫,只要扫到第一个延迟时间还没到的消息,后面的消息就可以跳过了,避免全局扫描。
2、消息清理问题
RocketMQ 本身可以看作是一个缓冲区,是用来做削峰填谷的,消息不可能一直保留,所以要有定时清理机制。CommitLog 默认会保存三天,如果支持太久的延迟时间,万一 CommitLog 被清理了,延迟时间投递的时候就无法读取出原始消息内容了。
3、大量消息同时到期
大量消息同时到期也是延迟消息比较头疼的问题,毕竟是单线程扫描投递,如果大量消息同时到期,短时间内投递的消息太多,就会导致消息延迟,不过这个问题可以业务上加个随机时间解决。
时间轮算法
RocketMQ 5.0 引入了时间轮算法 (TimingWheel) 来支持任意时间的延时消息。
先看一下算法的思想,如图所示,圆环就是一个时间轮,它共有 8 个刻度,假设每个刻度代表一秒钟。延时任务会根据延迟时间添加到时间轮对应的刻度上。Data1、Data2 延迟时间都是一秒,所以被添加到刻度1上;Data4 延迟时间 14 秒,饶了一圈后被添加到刻度6上。同时,会有一个指向当前时间刻度的指针,沿着时间轮顺时针旋转,指针每秒前进一个刻度,并把当前刻度上所有的延迟时间已到的延时任务全部执行一遍。
基于时间轮算法,就可以实现任意时间的延时任务的调度执行,如果你觉得“秒”的精度不够,可以把刻度再拆分的更精细化一些,定时任务跑的频率更高一些即可。
设计实现
RocketMQ 是怎么实现时间轮算法的呢?
RocketMQ 用 TimerWheel 类来封装时间轮,它实际对应磁盘上一个固定大小的文件,默认文件路径是${StoreHome}/timerwheel
,默认大小约 37 MB。
TimerWheel 由若干个刻度组成,一个刻度就代表一个时间单位,RocketMQ 用Slot
类来描述刻度,默认一个 Slot 代表一秒。TimerWheel 默认有 14 天内以秒为单位的所有 Slot,即 1209600 个 Slot。
每个 Slot 占用固定的 32 字节,格式如下:
字段 | 长度(字节) | 说明 |
---|---|---|
timeMs | 8 | 延迟时间 |
firstPos | 8 | 第一个消息的位置 |
lastPos | 8 | 最后一个消息的位置 |
num | 4 | 消息数量 |
magic | 4 | 魔数(废弃) |
每个 Slot 都有对应的延迟时间,相同延迟时间的消息才会被添加进来,多个延时消息会串联成链表,Slot 用两个指针分别指向了链表的首尾地址。现在的问题是,延时消息存到哪里呢?
为了保证消息写入的性能,延时消息会顺序写入到timerlog
文件里,它有点类似 CommitLog,区别是 timerlog 不会存储完整的消息,因为太浪费空间了。延时消息会被转换成固定的 52 字节的 TimerLog 写入。
写入的 TimerLog 格式如下:
字段 | 长度(字节) | 说明 |
---|---|---|
size | 4 | log 长度,固定值 52 |
prev pos | 8 | 前一条 log 的位置 |
magic | 4 | 魔数 |
curr write time | 8 | 写入时间 |
delayed time | 4 | 延迟时间 |
offsetPy | 8 | 实际消息在 CommitLog 偏移量 |
sizePy | 4 | 实际消息大小 |
hash code of real topic | 4 | 真实 Topic 哈希码 |
reserved value | 8 | 保留位 未使用 |
当有延时消息要被添加到 TimerWheel,首先要根据消息的延迟时间定位到 Slot,然后转换成 52 字节的 TimerLog,顺序写入 timerlog 文件,同时更新 Slot。
如图所示,现在要在 1号 Slot 上再添加一条延时消息1-4
,需要先把1-4
写入 timerlog,1-4
的 prevPos 指针会指向1-3
串联成链表,再把 Slot -> lastPos 指向1-4
。
延时消息存储起来了,接下就是线程定时扫描时间轮上的 Slot,判断消息如果到期就投递到原始 Topic 里面让消费者开始消费,如果消息没到期就重新投递进延时队列,进入下一期的时间轮。
不管是延时消息写入 timerlog、还是从 timerlog 取出消息重新投递,这些工作都是通过起单独的线程定时执行的,这里列举下相关的线程 Service:
- TimerEnqueueGetService:从rmq_sys_wheel_timer队列取出消息,构建TimerRequest放入enqueuePutQueue
- TimerEnqueuePutService:从enqueuePutQueue取出TimerRequest,写入timerlog
- TimerDequeueGetService:定时扫描时间轮,取出到期的TimerRequest,放入dequeueGetQueue
- TimerDequeueGutMessageService:从dequeueGetQueue取出TimerRequest,从CommitLog查询完整消息,放入dequeuePutQueue
- TimerDequeuePutMessageService:从dequeuePutQueue取出TimerRequest,判断消息是否到期,到期直接投递到目标Topic,没到期进入下一期时间轮
结合流程图再回顾一下整体流程,首先是延时消息发到 Broker 被写入 CommitLog:
然后是 TimerMessageStore 启动线程把延时消息写入 timerlog、再定时扫描到期消息重新投递的过程。
源码
客户端 SDK 发送延时消息前,会把延迟时间放在 Message -> system_properties 属性里面,Broker 收到消息准备 putMessage 前,会触发PutMessageHook
预留的钩子函数,其中一个叫handleScheduleMessage
的钩子就是专门处理延时消息的。
最终会调用HookUtils#handleScheduleMessage
方法,如果判断 Broker 启用了时间轮算法,且接收到的是延时消息,就会对消息进行转换。
public static PutMessageResult handleScheduleMessage(BrokerController brokerController,final MessageExtBrokerInner msg) {final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {if (!isRolledTimerMessage(msg)) {if (checkIfTimerMessage(msg)) {if (!brokerController.getMessageStoreConfig().isTimerWheelEnable()) {return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_NOT_ENABLE, null);}// 是延时消息 且启用时间轮算法 消息转换PutMessageResult tranformRes = transformTimerMessage(brokerController, msg);if (null != tranformRes) {return tranformRes;}}}// Delay Deliveryif (msg.getDelayTimeLevel() > 0) {transformDelayLevelMessage(brokerController, msg);}}return null;
}
转换的过程就是解析出延迟时间,然后把延迟时间和真实 Topic、QueueID 写入 properties,最后改写 Topic:
private static PutMessageResult transformTimerMessage(BrokerController brokerController, MessageExtBrokerInner msg) {//do transformint delayLevel = msg.getDelayTimeLevel();long deliverMs;try {// 从properties取出延迟时间if (msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) {deliverMs = System.currentTimeMillis() + Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC)) * 1000;} else if (msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS) != null) {deliverMs = System.currentTimeMillis() + Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS));} else {deliverMs = Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS));}} catch (Exception e) {return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);}if (deliverMs > System.currentTimeMillis()) {if (delayLevel <= 0 && deliverMs - System.currentTimeMillis() > brokerController.getMessageStoreConfig().getTimerMaxDelaySec() * 1000) {return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);}// 处理精度int timerPrecisionMs = brokerController.getMessageStoreConfig().getTimerPrecisionMs();if (deliverMs % timerPrecisionMs == 0) {deliverMs -= timerPrecisionMs;} else {deliverMs = deliverMs / timerPrecisionMs * timerPrecisionMs;}if (brokerController.getTimerMessageStore().isReject(deliverMs)) {return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_FLOW_CONTROL, null);}// 改写Topic,把真实Topic写入propertiesMessageAccessor.putProperty(msg, MessageConst.PROPERTY_TIMER_OUT_MS, deliverMs + "");MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(TimerMessageStore.TIMER_TOPIC);msg.setQueueId(0);} else if (null != msg.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY)) {return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);}return null;
}
后续就是把消息写入 CommitLog,因为改写了 Topic,所以消费者现在是没办法消费延时消息的。
接着就是 TimerEnqueueGetService 线程消费rmq_sys_wheel_timer
队列,取出延时消息,构建 TimerRequest 放到 enqueuePutQueue。
public boolean enqueue(int queueId) {......ConsumeQueue cq = (ConsumeQueue) this.messageStore.getConsumeQueue(TIMER_TOPIC, queueId);SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(offset);for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {// 取出原始消息MessageExt msgExt = getMessageByCommitOffset(offsetPy, sizePy);// 延迟时间long delayedTime = Long.parseLong(msgExt.getProperty(TIMER_OUT_MS));// use CQ offset, not offset in MessagemsgExt.setQueueOffset(offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE));TimerRequest timerRequest = new TimerRequest(offsetPy, sizePy, delayedTime, System.currentTimeMillis(), MAGIC_DEFAULT, msgExt);while (true) {// 延时消息封装成TimerRequest入队if (enqueuePutQueue.offer(timerRequest, 3, TimeUnit.SECONDS)) {break;}if (!isRunningEnqueue()) {return false;}}}......
}
再是 TimerEnqueuePutService 线程从 enqueuePutQueue 取出 先前构建好的TimerRequest,写入 timerlog。
public void run() {......while (!this.isStopped() || enqueuePutQueue.size() != 0) {TimerRequest firstReq = enqueuePutQueue.poll(10, TimeUnit.MILLISECONDS);// 延迟时间小于当前时间,入队dequeuePutQueueif (shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs) {dequeuePutQueue.put(req);} else {// 写入timerlogboolean doEnqueueRes = doEnqueue(req.getOffsetPy(), req.getSizePy(), req.getDelayTime(), req.getMsg());req.idempotentRelease(doEnqueueRes || storeConfig.isTimerSkipUnknownError());}}......
}
首先需要定位到 Slot,再顺序写 timerlog,最后更新 Slot 信息。
public boolean doEnqueue(long offsetPy, int sizePy, long delayedTime, MessageExt messageExt) {LOGGER.debug("Do enqueue [{}] [{}]", new Timestamp(delayedTime), messageExt);long tmpWriteTimeMs = currWriteTimeMs;// 超过一轮时间周期,到期后需要重新进入下一期时间轮等待boolean needRoll = delayedTime - tmpWriteTimeMs >= timerRollWindowSlots * precisionMs;int magic = MAGIC_DEFAULT;if (needRoll) {magic = magic | MAGIC_ROLL;if (delayedTime - tmpWriteTimeMs - timerRollWindowSlots * precisionMs < timerRollWindowSlots / 3 * precisionMs) {//give enough time to next rolldelayedTime = tmpWriteTimeMs + (timerRollWindowSlots / 2) * precisionMs;} else {delayedTime = tmpWriteTimeMs + timerRollWindowSlots * precisionMs;}}boolean isDelete = messageExt.getProperty(TIMER_DELETE_UNIQKEY) != null;if (isDelete) {magic = magic | MAGIC_DELETE;}String realTopic = messageExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC);// 定位Slot,顺序写timerLogSlot slot = timerWheel.getSlot(delayedTime);ByteBuffer tmpBuffer = timerLogBuffer;tmpBuffer.clear();tmpBuffer.putInt(TimerLog.UNIT_SIZE); //sizetmpBuffer.putLong(slot.lastPos); //prev postmpBuffer.putInt(magic); //magictmpBuffer.putLong(tmpWriteTimeMs); //currWriteTimetmpBuffer.putInt((int) (delayedTime - tmpWriteTimeMs)); //delayTimetmpBuffer.putLong(offsetPy); //offsettmpBuffer.putInt(sizePy); //sizetmpBuffer.putInt(hashTopicForMetrics(realTopic)); //hashcode of real topictmpBuffer.putLong(0); //reserved value, just set to 0 nowlong ret = timerLog.append(tmpBuffer.array(), 0, TimerLog.UNIT_SIZE);if (-1 != ret) {// 更新timerWheel对应的SlottimerWheel.putSlot(delayedTime, slot.firstPos == -1 ? ret : slot.firstPos, ret,isDelete ? slot.num - 1 : slot.num + 1, slot.magic);addMetric(messageExt, isDelete ? -1 : 1);}return -1 != ret;
}
不管是 timerlog 还是 timerWheel 文件,都是需要频繁写的,为了提高性能,RocketMQ 均使用 mmap 技术写,然后定时 flush 到磁盘。
然后是 TimerDequeueGetService 线程,定时扫描时间轮,取出到期的TimerRequest,放入dequeueGetQueue
public int dequeue() throws Exception {// 定位到SlotSlot slot = timerWheel.getSlot(currReadTimeMs);if (-1 == slot.timeMs) {// Slot是空的moveReadTime();return 0;}long currOffsetPy = slot.lastPos;while (currOffsetPy != -1) {// 定位timerlogtimeSbr = timerLog.getWholeBuffer(currOffsetPy);int position = (int) (currOffsetPy % timerLogFileSize);timeSbr.getByteBuffer().position(position);timeSbr.getByteBuffer().getInt(); //sizeprevPos = timeSbr.getByteBuffer().getLong();int magic = timeSbr.getByteBuffer().getInt();long enqueueTime = timeSbr.getByteBuffer().getLong();long delayedTime = timeSbr.getByteBuffer().getInt() + enqueueTime;long offsetPy = timeSbr.getByteBuffer().getLong();int sizePy = timeSbr.getByteBuffer().getInt();// timerlog再转换成TimerRequestTimerRequest timerRequest = new TimerRequest(offsetPy, sizePy, delayedTime, enqueueTime, magic);timerRequest.setDeleteList(deleteUniqKeys);for (List<TimerRequest> normalList : splitIntoLists(normalMsgStack)) {for (TimerRequest tr : normalList) {tr.setLatch(normalLatch);}// TimerRequest入队dequeueGetQueuedequeueGetQueue.put(normalList);}}......
}
再是 TimerDequeueGetMessageService 线程,从 dequeueGetQueue 取出 TimerRequest,从 CommitLog 查询完整消息,放入 dequeuePutQueue。
public void run() {while (!this.isStopped()) {List<TimerRequest> trs = dequeueGetQueue.poll(100 * precisionMs / 1000, TimeUnit.MILLISECONDS);for (int i = 0; i < trs.size(); ) {// CommitLog查询完整消息MessageExt msgExt = getMessageByCommitOffset(tr.getOffsetPy(), tr.getSizePy());String uniqkey = MessageClientIDSetter.getUniqID(msgExt);if (null != uniqkey && tr.getDeleteList() != null && tr.getDeleteList().size() > 0 && tr.getDeleteList().contains(uniqkey)) {doRes = true;tr.idempotentRelease();perfs.getCounter("dequeue_delete").flow(1);} else {tr.setMsg(msgExt);while (!isStopped() && !doRes) {// 入队doRes = dequeuePutQueue.offer(tr, 3, TimeUnit.SECONDS);}}}}
}
最后是 TimerDequeuePutMessageService 线程,从 dequeuePutQueue 取出 TimerRequest,判断消息是否到期,到期直接投递到真实的 Topic,没到期进入下一期时间轮。
public void run() {while (!this.isStopped() || dequeuePutQueue.size() != 0) {TimerRequest tr = dequeuePutQueue.poll(10, TimeUnit.MILLISECONDS);// 消息转换 如果到期则复原Topic queueIDMessageExtBrokerInner msg = convert(tr.getMsg(), tr.getEnqueueTime(), needRoll(tr.getMagic()));doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));while (!doRes && !isStopped()) {if (!isRunningDequeue()) {dequeueStatusChangeFlag = true;tmpDequeueChangeFlag = true;break;}// 重写写入CommitLogdoRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));Thread.sleep(500 * precisionMs / 1000);}}.......
}
时间轮对应的类是 TimerWheel,它对应磁盘上的一个文件,由若干个 Slot 组成,因为要随机读写,所以 RocketMQ 使用 RandomAccessFile 来读写文件。
public class TimerWheel {// 总的Slot数量public final int slotsTotal;// 时间精度public final int precisionMs;// 文件名private String fileName;private final RandomAccessFile randomAccessFile;private final FileChannel fileChannel;// mmapprivate final MappedByteBuffer mappedByteBuffer;private final ByteBuffer byteBuffer;private final ThreadLocal<ByteBuffer> localBuffer = new ThreadLocal<ByteBuffer>() {@Overrideprotected ByteBuffer initialValue() {return byteBuffer.duplicate();}};// 时间轮文件大小private final int wheelLength;
}
Slot 类的属性:
public class Slot {public static final short SIZE = 32;public final long timeMs; // 延迟时间public final long firstPos; // 第一个消息的位置public final long lastPos; // 最后一个消息的位置public final int num; // 消息数量public final int magic; //no use now, just keep it
}
最后是 TimerLog,它底层对应一组文件,单个文件限制在 100MB 大小,如果写满了就创建新的文件继续写。因为是顺序写的,所以效率很高。
public class TimerLog {private static InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);public final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;private final static int MIN_BLANK_LEN = 4 + 8 + 4;public final static int UNIT_SIZE = 4 //size+ 8 //prev pos+ 4 //magic value+ 8 //curr write time, for trace+ 4 //delayed time, for check+ 8 //offsetPy+ 4 //sizePy+ 4 //hash code of real topic+ 8; //reserved value, just in case ofpublic final static int UNIT_PRE_SIZE_FOR_MSG = 28;public final static int UNIT_PRE_SIZE_FOR_METRIC = 40;private final MappedFileQueue mappedFileQueue;private final int fileSize;
}
尾巴
RocketMQ 5.0 解除了 4.x 版本延时消息延迟级别的时间限制,现在生产者可以设置任意延迟时间了,功能上更加强大,实现上也更加复杂。RocketMQ 引入了新的时间轮算法,简单理解就是把时间按照精度划分成 N 个Slot,消息会按照延迟时间加入到对应的 Slot,然后线程定时扫描时间轮,把 Slot 对应的到期消息重新投递即可。新的算法不仅实现更复杂,RocketMQ 还需要额外写 timerwheel 和 timerlog 文件,这两个文件也是要持久化定期刷盘的。