【RocketMQ】RocketMQ 5.0版本任意时刻延迟消息的实现原理浅析

【RocketMQ】RocketMQ 5.0版本任意时刻延迟消息的实现原理浅析

文章目录

意外发现设计方案时间轮定时消息存储

具体实现流程图流程步骤

意外发现

无意间从官方的最新的客户端代码中看到下面的Example:

感兴趣的可以看看这个介绍:https://rocketmq.apache.org/docs/featureBehavior/02delaymessage

生产者:

// Send delay messages.

MessageBuilder messageBuilder = null;

// Specify a millisecond-level Unix timestamp. In this example, the specified timestamp indicates that the message will be delivered in 10 minutes from the current time.

Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;

Message message = messageBuilder.setTopic("topic")

// Specify the message index key. The system uses the key to locate the message.

.setKeys("messageKey")

// Specify the message tag. The consumer can use the tag to filter messages.

.setTag("messageTag")

.setDeliveryTimestamp(deliverTimeStamp)

// Configure the message body.

.setBody("messageBody".getBytes())

.build();

try {

// Send the messages. Focus on the result of message sending and exceptions such as failures.

SendReceipt sendReceipt = producer.send(message);

System.out.println(sendReceipt.getMessageId());

} catch (ClientException e) {

e.printStackTrace();

}

可以看到一句:setDeliveryTimestamp(deliverTimeStamp),也就是说可以支持任意时刻的延迟消息了???

消费者:

// Consumption example 1: If a scheduled message is consumed by a push consumer, the consumer needs to process the message only in the message listener.

MessageListener messageListener = new MessageListener() {

@Override

public ConsumeResult consume(MessageView messageView) {

System.out.println(messageView.getDeliveryTimestamp());

// Return the status based on the consumption result.

return ConsumeResult.SUCCESS;

}

};

// Consumption example 2: If a scheduled message is consumed by a simple consumer, the consumer must obtain the message for consumption and submit the consumption result.

List messageViewList = null;

try {

messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));

messageViewList.forEach(messageView -> {

System.out.println(messageView);

// After consumption is complete, the consumer must invoke ACK to submit the consumption result.

try {

simpleConsumer.ack(messageView);

} catch (ClientException e) {

e.printStackTrace();

}

});

} catch (ClientException e) {

// If the pull fails due to system traffic throttling or other reasons, you must re-initiate the request to obtain the message.

e.printStackTrace();

}

RocketMQ5.X版本新增了Proxy模块,从配置中可以看到,延迟消息方案目前默认还是通过按level来指定的,也就是说,可以选择不按level来执行了!!!

下面一起看看新版本是怎么实现任意时刻延迟消息的。

设计方案

时间轮

首先,RocketMQ对任意时刻延迟消息的支持,是基于主流的方案——时间轮做的,时间轮,对时刻表的一种抽象,通常使用数组实现。时刻表上的每一秒,顺序对应到数组中的位置,然后数组循环使用。时间轮的每一格,指向了TimerLog中的对应位置,如果这一格的时间到了,则按TimerLog中的对应位置以及prev_pos位置依次读出每条消息。时间轮一格一格向前推进,配合TimerLog,依次读出到期的消息,从而达到定时消息的目的。

时间轮的每一格设计如下:

定时消息存储

定时消息的记录文件,Append Only。每条记录包含一个prev_pos,指向前一条定时到同样时刻的记录。每条记录的内容可以包含定时消息本身,也可以只包含定时消息的位置信息。每一条记录包含如下信息:

名称大小备注size4B保存记录的大小prev_pos8B前一条记录的位置current_time8B当前时间magic4Bmagic valuedelayed_time4B该条记录的定时时间offset_real8B该条消息在commitLog中的位置size_real4B该条消息在commitLog中的大小hash_topic4B该条消息topic的hash codevarbody8B存储可变的body,暂时没有为空

具体实现

流程图

流程步骤

从图中可以看出,共有五个Service分别处理定时消息的放置和存储。工作流如下:

针对放置定时消息的service,每50ms从commitLog读取指定主题(rmq_sys_wheel_timer)的定时消息。

a. TimerEnqueueGetService从commitLog读取得到定时主题的消息,并先将其放入enqueuePutQueue。

org.apache.rocketmq.store.timer.TimerMessageStore.TimerEnqueueGetService#run

org.apache.rocketmq.store.timer.TimerMessageStore#enqueue

b. 另一个线程TimerEnqueuePutService将其放入timerLog,更新时间轮的存储内容。将该任务放进时间轮的指定位置。

org.apache.rocketmq.store.timer.TimerMessageStore.TimerEnqueuePutService#run

org.apache.rocketmq.store.timer.TimerMessageStore#doEnqueue

针对取出定时消息的service,每50ms读取下一秒的slot。有三个线程将读取到的消息重新放回commitLog。

a. 首先,TimerDequeueGetService每50ms读一次下一秒的slot,从timerLog中得到指定的msgs,并放进dequeueGetQueue。

org.apache.rocketmq.store.timer.TimerMessageStore.TimerDequeueGetService#run

org.apache.rocketmq.store.timer.TimerMessageStore#dequeue

b. 而后TimerDequeueGetMessageService从dequeueGetQueue中取出msg,并将其放入队列中。该队列为待写入commitLog的队列,dequeuePutQueue。

org.apache.rocketmq.store.timer.TimerMessageStore.TimerDequeueGetMessageService#run

c. 最后TimerDequeuePutMessageService将这个queue中的消息取出,若已到期则修改topic,放回commitlog,否则继续按原topic写回CommitLog滚动。

org.apache.rocketmq.store.timer.TimerMessageStore.TimerDequeuePutMessageService#run

org.apache.rocketmq.store.timer.TimerMessageStore#convertMessage

消息转换,更改真实Topic

投递消息

org.apache.rocketmq.store.timer.TimerMessageStore#doPut

消息投递到真实Topic后,其实就变成了一条“正常的消息”了,消费者就能正常消费了,以上就是对RocketMQ 5.0中延迟消息的变更做的分析,参考了部分官方的资料,后续会使用5.0版本,实际做一些演练,目前对于这个新特性,官方并没有大肆宣扬,也不知道具体有哪些限制,所以还需要做一些实践,踩踩坑。

相关文章