<h2>导语</h2>
随着分布式系统架构的普及,消息队列已成为支撑大规模、高并发在线业务的核心组件之一。腾讯云消息队列 RocketMQ 版作为一款高性能、高可靠的消息中间件,通过提供稳定、低延迟的消息服务,帮助企业轻松应对业务洪峰、实现系统解耦。
最初的消息队列只支持简单的在线消息收发,但随着业务场景的丰富,越来越多的需求涌现,例如订单超时处理、轻量级延时任务调度、定时通知推送等场景,这些场景都要求消息能够延迟消费。为此,消息队列技术不断演进,最初 Apache RocketMQ 实现了多级别延时消息,但是无法满足更灵活的延时场景,因此演进为文件版定时消息时间轮,到腾讯云消息队列 RocketMQ 版也推出 RocksDB 版本定时消息多级时间轮,最终实现了高吞吐场景下定时消息场景的精准调度。
本文将带您深入探索 TDMQ RocketMQ 版秒级定时消息的实现原理:
-
首先从典型业务场景切入,看看定时消息在分布式定时调度、电商等场景的应用;
-
接着回顾定时消息的技术演进历程,了解定时消息如何从基础延时功能发展为高精度调度系统;
-
最后,深入核心架构设计,解析定时消息技术原理,并介绍腾讯云基于 RocksDB 版本定时消息多级时间轮的优化创新,揭秘秒级定时投递的底层逻辑。
概念与应用场景
定时消息,顾名思义就是当用户将消息封装定时属性后发送到 MQ,MQ 会在指定时间后将消息暴露给消费者消费,而在未达到指定时间期间消息对消费者是不可见的。这种特性在分布式系统中具有广泛的应用价值,以下是两个典型场景:
1、大量分布式定时任务调度
在分布式定时调度场景下,需要实现各类精度的定时任务,例如每天5点执行文件清理、每隔2分钟触发一次消息推送等需求。传统的基于数据库的定时调度方案实现复杂,且在高并发场景下性能较差。基于 Apache RocketMQ 的定时消息可以封装灵活的定时消息,同时使用方法比较简单。
2、电商场景的订单超时取消/延迟支付订单
以电商交易场景为例,用户下单后若未及时支付,系统通常需要在一定时间后自动关闭订单。使用 Apache RocketMQ 定时消息可以实现超时任务的检查触发,一方面可以定时去关闭订单,另一方面支持定时消息的撤回,如果订单已经支付,可以发送一条删除属性的消息进行 Abort。
基于定时消息的超时任务处理具备如下优势:
-
精度高、开发门槛低:基于消息通知方式不存在定时阶梯间隔。可以轻松实现任意精度事件触发,无需业务去重。
-
高性能可扩展:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。 Apache RocketMQ 的定时消息具有高并发和水平扩展的能力。
技术演进路线
术语说明
- 消息存储文件 CommitLog**: 存储原始消息的文件。
- 消息索引文件 ConsumeQueue: 存储消息索引数据,定位到 CommitLog。
- Real Topic:用户指定消息投递到的 Topic。
- 延时主题(Delay Topic)/定时主题(Timer Topic) :所有的延时/定时消息都不会直接发到 Real Topic,而是先发到定时/延时 Topic。
多级别延时消息
最开始,Apache RocketMQ 复用 RetryTopic 的多级重试投递逻辑,采用延时队列来实现延时消息。但是由于每一条队列对应一个延迟时间,导致队列数量成为瓶颈,因此最终支持多级别延时消息。以下是目前所支持的18个延时级别分别对应的延时时间。
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
技术实现原理
多级别延时消息底层基于一个延时 Topic,多条 Queue,每一条 Queue 对应一个延时级别。
// 延时消息服务启动
public void start() {
if (started.compareAndSet(false, true)) {
this.load();
// 创建与延时级别一样数量的线程
this.deliverExecutorService = ThreadUtils.newScheduledThreadPool(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
// 初始化线程池等操作...
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
// 为每个延时等级都创建定时任务
if (timeDelay != null) {
if (this.enableAsyncDeliver) {
this.handleExecutorService.schedule(new HandlePutResultTask(level),
FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
}
// 位点以及数据持久化定时任务... 、
}
}
-
当延时消息进入时会判断延时等级属性,放进相应的队列尾部。
-
通过线程池对每个队列进行轮询扫描,判断头部消息是否达到延时时间,如果达到则将消息投递到 Real Topic,否则继续轮询。
注意:多级别延时消息受到消息存储时间限制;
// 执行循环扫描
public void executeOnTimeUp() {
ConsumeQueueInterface cq;
// 获取cq,如果cq为null则新建定时任务并校准下次扫描开始位点
....
// 设置本次扫描开始位点
long nextOffset = this.offset;
try {
while (bufferCQ.hasNext() && isStarted()) {
CqUnit cqUnit = bufferCQ.next();
long offsetPy = cqUnit.getPos();
int sizePy = cqUnit.getSize();
long tagsCode = cqUnit.getTagsCode();
// 判断是否达到延时时间
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
// 计算下次拉取offset
long currOffset = cqUnit.getQueueOffset();
nextOffset = currOffset + cqUnit.getBatchNum();
long countdown = deliverTimestamp - now;
if (countdown > 0) {
// 未到延时时间,则创建下次定时任务并return
this.scheduleNextTimerTask(currOffset, DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, currOffset);
return;
}
MessageExt msgExt = ScheduleMessageService.this.brokerController.
getMessageStore().lookMessageByOffset(offsetPy, sizePy);
// 获取消息内容并重新投递
if (ScheduleMessageService.this.enableAsyncDeliver) {
this.asyncDeliver(msgInner, msgExt.getMsgId(), currOffset, offsetPy, sizePy);
} else {
this.syncDeliver(msgInner, msgExt.getMsgId(), currOffset, offsetPy, sizePy);
}
//如果投递失败,立即创建下次任务,并设置下次任务开始扫描位点为currOffset,防止消息丢失
}
} catch (Exception e) {
......
} finally {
bufferCQ.release();
}
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}
使用示例
// 在Producer端设置消息为延时消息
Message msg = new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
// 设置延迟level为5,对应延迟1分钟
msg.setDelayTimeLevel(5);
producer.send(msg);
存在哪些痛点
-
系统仅支持预定义的有限延迟级别,限制了延迟配置的灵活性与适应性。
-
最大延迟时间存在硬性上限,无法满足超长周期延迟任务的需求。
-
延迟时间的精度控制不足,难以实现细粒度的延迟调度。
由于多级别延时受限于队列数量,因此我们需要额外探索一种支持高精度、超长延时,自定义延时的实现,这时便出现了超长秒级定时消息。
超长秒级定时消息
超长秒级定时消息支持用户设置任意时间(社区版默认最长7天), Apache RocketMQ 社区引入文件版时间轮来实现,同时支持定时消息的撤回,在设计上通过消息的重新投递使超长延时消息不受消息存储时间限制。
技术实现原理
设计上的考虑:
-
定时消息实现不侵入原本存储逻辑,防止互相影响,通过将定时消息写入定时消息主题,对该主题的索引文件扫描从而拿到原始消息;
-
实现任意时间定时的要点在于知道在某一时刻需要投递哪些消息,因此需要额外设计存储格式,同时尽可能复用 Commitlog 消息文件存储,通过引入定时消息索引文件,原始消息存储在 Commitlog,同时为了能扫描出此刻的所有定时消息并且兼顾消息写入性能,采用链表结构进行索引单元的链接,定时消息索引文件写入直接 Append-only Log(顺序写入),保证了消息写入的性能。
-
为了定位第一条定时消息索引,引出时间轮结构,需要作为中间层去精准访问定时消息索引文件。
最终社区为定时消息(在 rip-43)引入两个存储文件:Timelog + Timewheel。
- TimerWheel 是时间轮的文件,表示投递时间,它保存了2天(默认,同时保证超长定时消息不受消息存储时间限制)内的所有时间窗。每个槽位表示一个对应的投递时间窗,并且可以调整槽位对应的时间窗长度来控制定时的精确度。采用时间轮的好处是它可以复用,在2天之后无需新建时间轮文件,而是只要将当前的时间轮直接覆盖即可。
/**
* Represents a slot of timing wheel. Format:
* ┌────────────┬───────────┬───────────┬───────────┬───────────┐
* │delayed time│ first pos │ last pos │ num │ magic │
* ├────────────┼───────────┼───────────┼───────────┼───────────┤
* │ 8bytes │ 8bytes │ 8bytes │ 4bytes │ 4bytes │
* └────────────┴───────────┴───────────┴───────────┴───────────┘
*/
- TimerLog 是定时消息索引文件,保存定时消息的索引(在消息文件中存储的位置),内部通过反向链表进行链接,它的写入为 Append-only Log,保证了消息写入的性能。
public final static int UNIT_SIZE = 4 //size
+ 8 //prev pos
+ 4 //magic value
+ 8 //curr write time, for trace
+ 4 //delayed time, for check
+ 8 //offsetPy
+ 4 //sizePy
+ 4 //hash code of real topic
+ 8; //reserved value, just in case of
TimerWheel 中的每个槽位都可以保存一个指向 TimerLog 中某个元素的索引,TimerLog 中的元素又保存它前一个元素的索引。TimerLog 呈链表结构,存储着 TimerWheel 对应槽位时间窗所要投递的所有定时消息。
从图中可以看出,共有五个 Service 分别处理定时消息的放置和存储。工作流如下:
- 针对放置定时消息的 Service,每50ms从消息文件读取指定主题(TIMER_TOPIC)的定时消息。
a. TimerEnqueueGetService 从消息文件读取得到定时主题的消息,并先将其放入 EnqueuePutQueue。
public boolean enqueue(int queueId) {
// 判断服务是否正常执行,获取cq文件准备遍历
ConsumeQueueInterface cq = this.messageStore.getConsumeQueue(TIMER_TOPIC, queueId);
// 校准位点...
long offset = currQueueOffset;
ReferredIterator<CqUnit> iterator = null;
try {
iterator = cq.iterateFrom(offset);
if (null == iterator) {
return false;
}
// 开始遍历索引文件并从CommitLog读取出消息体
int i = 0;
while (iterator.hasNext()) {
i++;
try {
CqUnit cqUnit = iterator.next();
// 计算消息读取位点与消息大小
MessageExt msgExt = getMessageByCommitOffset(offsetPy, sizePy);
if (null == msgExt) {
perfCounterTicks.getCounter("enqueue_get_miss");
} else {
// 读取消息的相关信息进行timerRequest构造并投递到enqueuePutQueue
TimerRequest timerRequest = new TimerRequest(offsetPy, sizePy,
delayedTime, System.currentTimeMillis(), MAGIC_DEFAULT, msgExt);
while (!enqueuePutQueue.offer(timerRequest, 3, TimeUnit.SECONDS)) {
if (!isRunningEnqueue()) {
return false;
}
}
}
// 指标计算等逻辑... }
} catch (Exception e) {...} finally {...}
// if broker role changes, ignore last enqueue
if (!isRunningEnqueue()) {
return false;
}
currQueueOffset = offset + i;
}
currQueueOffset = offset + i;
return i > 0;
} catch (Exception e) {...} finally {
if (iterator != null) {
iterator.release();
}
}
return false;
}
b. 另一个线程 TimerEnqueuePutService 将其执行 Timerlog-unit 构建逻辑并放入 TimerLog,更新时间轮(Timewheel)的存储内容。
protected void putMessageToTimerWheel(TimerRequest req) {
try {
// 指标计算...+如果消息已经到期直接投递到dequeuePutQueue等待投递到realTopic
if (shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs) {
req.setEnqueueTime(Long.MAX_VALUE);
dequeuePutQueue.put(req);
} else {
boolean doEnqueueRes = doEnqueue(
req.getOffsetPy(), req.getSizePy(), req.getDelayTime(), req.getMsg());
req.idempotentRelease(doEnqueueRes || storeConfig.isTimerSkipUnknownError());
}
} catch (Throwable t) {...}
}
public boolean doEnqueue(long offsetPy, int sizePy, long delayedTime, MessageExt messageExt) {
// 调整deleayTime等操作...
String realTopic = messageExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
Slot slot = timerWheel.getSlot(delayedTime);
// 构造timeLog unit
ByteBuffer tmpBuffer = timerLogBuffer;
tmpBuffer.clear();
tmpBuffer.putInt(TimerLog.UNIT_SIZE); //size
//...
long ret = timerLog.append(tmpBuffer.array(), 0, TimerLog.UNIT_SIZE);
if (-1 != ret) {
// 如果timelog更新成功则更新timewheel
timerWheel.putSlot(delayedTime, slot.firstPos == -1 ? ret : slot.firstPos, ret,
isDelete ? slot.num - 1 : slot.num + 1, slot.magic);
}
return -1 != ret;
}
- 针对取出定时消息的 Service,每50ms读取下一秒的 Slot。有三个线程将读取到的消息重新放回 CommitLog。
a. 首先,TimerDequeueGetService 每50ms读一次下一秒的 Slot,从 TimerLog 中得到指定的数据,并放进 dequeueGetQueue。
public int dequeue() throws Exception {
// 判断消息是否正常出队
Slot slot = timerWheel.getSlot(currReadTimeMs);
try {
long currOffsetPy = slot.lastPos;
Set<String> deleteUniqKeys = new ConcurrentSkipListSet<>();
LinkedList<TimerRequest> normalMsgStack = new LinkedList<>();
LinkedList<TimerRequest> deleteMsgStack = new LinkedList<>();
LinkedList<SelectMappedBufferResult> sbrs = new LinkedList<>();
SelectMappedBufferResult timeSbr = null;
//read the timer log one by one, 开始遍历当前时间的所有消息
while (currOffsetPy != -1) {
// 读取出timelog对应的unit
if (null == timeSbr || timeSbr.getStartOffset() > currOffsetPy) {
timeSbr = timerLog.getWholeBuffer(currOffsetPy);
if (null != timeSbr) {
sbrs.add(timeSbr);
}
}
if (null == timeSbr) {
break;
}
long prevPos = -1;
try {
// 读取buffer获取相关数据并构造timerRequest
TimerRequest timerRequest = new TimerRequest(offsetPy, sizePy,
delayedTime, enqueueTime, magic);
timerRequest.setDeleteList(deleteUniqKeys);
// 判断消息是否不用滚动并是删除类型,是就加进deleteMsgStack表示删除类型消息
if (needDelete(magic) && !needRoll(magic)) {
deleteMsgStack.add(timerRequest);
} else {
normalMsgStack.addFirst(timerRequest);
}
} catch (Exception e) {...} finally {//计算下次读取timelog位点,以及指标计算}
}
// 下面先投递delete类型msg,确保可以精准删除对应消息
CountDownLatch deleteLatch = new CountDownLatch(deleteMsgStack.size());
//read the delete msg: the msg used to mark another msg is deleted
for (List<TimerRequest> deleteList : splitIntoLists(deleteMsgStack)) {
for (TimerRequest tr : deleteList) {
tr.setLatch(deleteLatch);
}
dequeueGetQueue.put(deleteList);
}
//do we need to use loop with tryAcquire
checkDequeueLatch(deleteLatch, currReadTimeMs);
CountDownLatch normalLatch = new CountDownLatch(normalMsgStack.size());
//read the normal msg
for (List<TimerRequest> normalList : splitIntoLists(normalMsgStack)) {
for (TimerRequest tr : normalList) {
tr.setLatch(normalLatch);
}
dequeueGetQueue.put(normalList);
}
checkDequeueLatch(normalLatch, currReadTimeMs);
} catch (Throwable t) {...}
return 1;
}
b. 而后 TimerDequeueGetMessageService 从 dequeueGetQueue 中取出数据并根据索引信息,从消息文件中查出对应的 msgs,并将其放入待写入消息文件的队列中(dequeuePutQueue)。
c. 最后 TimerDequeuePutMessageService 将这个 Putqueue 中的消息取出,若已到期则修改 Topic,放回 Commitlog(投递到真正的 Topic),否则继续按指定主题(TIMER_TOPIC)写回 CommitLog 滚动(避免消息过期)。
使用示例
Message message = new Message(TOPIC, ("Hello" + i).getBytes(StandardCharsets.UTF_8));
// 延迟 10s 后投递
message.setDelayTimeSec(10);
// 延迟 10000ms 后投递,投递到服务端后计算定时时间,即投递到服务端的时间+delayTime
message.setDelayTimeMs(10_000L);
// 定时投递,定时时间为当前时间 + 10000ms
message.setDeliverTimeMs(System.currentTimeMillis() + 10_000L);
// 发送消息
SendResult result = producer.send(message);
腾讯云定时消息的技术优化
技术实现原理
至此我们已经支持了秒级精度,超长时间定时消息,看似已经是完美的实现,那这文件版时间轮定时消息还有什么问题吗?
RocketMQ 5.0 社区开发的基于文件版,利用反向链表索引的方案,大大降低了存储成本,但是反向链表的扫描效率不高,SSD 盘下基本1000 TPS 就会成为瓶颈,造成调度误差增大。
腾讯云选取 RocksDB 支持定时消息多级时间轮,利用 KV 结构可以快速范围扫描某一时刻的定时消息,保证更精准的定时调度。
我们为小时/分钟/秒钟都设置为一个 Wheel 管理,类似钟表,通过秒针转动一圈驱动分针转动,分针转动一圈驱动时针转动;当定时超过一天时,仍放进小时级别的时间轮,后续会重新投递此消息,避免消息过期;
压测实验结果
压测场景
- 定时业务场景: 发送一亿条消息,延迟时间随机30秒~10分钟,一个下游实时消费。
- 普通对比场景: 发型普通消息一亿条消息,一个下游实时消费,对比资源利用率。
- Broker节点规格: 8C16G 1T SSD 云盘。
- 延迟误差定义: 定时消息指定的预期触发时间和实际触发消息的差值,预期1s以内,超过1s时,对下游表现为消费延迟增大。
压测结论
-
定时消息在写入 14000 TPS 时,发送耗时平稳,延迟误差可以控制在1s内。
-
对比普通消息,定时消息对资源的消耗约为普通消息的一倍,符合设计预期。
压测详细数据
延迟时间随机30秒~10分钟,延迟误差比较低,P999 稳定在1s以内。
生产线程数 | 生产TPS | 发送平均耗时 (ms) | 延迟误差P50 (ms) | 延迟误差P90 (ms) | 延迟误差P99 (ms) | 延迟误差P999 (ms) | 机器负载 |
---|---|---|---|---|---|---|---|
16 | 14000 | 1.2 | 301.0 | 690.0 | 904.0 | 979.0 | cpu30% |
普通消息对比
作为对比,普通消息14000 TPS,发送耗时稳定,并且 CPU 利用率是定时消息的一半,符合设计预期,发送耗时和消费耗时也符合预期。
生产线程数 | 生产TPS | 发送平均耗时 (ms) | 端到端平均耗时 (ms) | 机器负载 |
---|---|---|---|---|
16 | 14000 | 1.1 | 2 | cpu15% |
方案优势总结
-
高精度调度:在较高吞吐下,依然保持1s的精准调度。
-
无限延时支持:理论上支持无限的延时时长。
-
精细化监控:提供更精准的指标统计,便于问题排查。
-
低成本高可靠:嵌入式 KV 不需要外挂其他存储,降低运维复杂度与成本。
消息查询优化
社区版对于未触发的定时消息,根据 MessageID 无法查询,腾讯云版优化后支持根据 MessageID 查询未触发的定时消息。
使用示例
发送消息:延时一分钟
SendResult [sendStatus=SEND_OK, msgId=155BEAE2BB5218B4AAC2318E41AF0000, offsetMsgId=0B8D67E5000022CF0000000001C66D54, messageQueue=MessageQueue [topic=test-timer, brokerName=vbroker-rmq-16x4gww4j5-0, queueId=1], queueOffset=1794, recallHandle=null]
根据 MessageID 查询并查看消息轨迹:
总结
腾讯云 TDMQ RocketMQ 版定时消息完全兼容开源,极其容易上手使用。
该方案在保证高性能的同时,兼顾了存储效率与调度准确性,是腾讯云在消息中间件领域的又一技术创新,欢迎大家来使用腾讯云 TDMQ RocketMQ 版!
</div>
维权提醒:如果你或身边的朋友近五年内因投顾公司虚假宣传、诱导交费导致亏损,别放弃!立即联系小羊维权(158 2783 9931,微信同号),专业团队帮你讨回公道! 📞立即免费咨询退费