九. RocketMQ消息过滤原理
1. 认识消息过滤
①消息过滤的场景
在消息系统中,同一主题下的消息往往需要根据消费者类型进行选择性投递。典型场景包括:
多环境消息隔离:测试环境1的服务实例不应接收测试环境2的消息,需通过过滤实现环境隔离;
业务维度筛选:B2C业务系统需过滤掉B2B订单消息,避免无关消息干扰核心业务流程。
通过消息过滤(如Tag、SQL表达式等),可实现消息的精细化路由,确保消费者仅接收与其相关的消息子集,提升系统效率并降低处理冗余数据的开销。
②消息过滤的方式
客户端过滤:消息中间件(Broker)将主题下的所有消息通过网络全部推送给消费者客户端,由客户端根据自身需求进行筛选和处理。例如,Broker 发送10条消息,客户端可能只处理其中的2条。这种方式可能造成不必要的网络带宽消耗,尤其当过滤掉的消息占比较大时。
服务端过滤:过滤逻辑在 Broker 端执行。Broker 在投递前就根据消费者预设的规则(如标签Tag或SQL属性条件)进行筛选,仅将符合条件的目标消息发送给消费者。例如,从10条消息中筛选出2条进行投递。此机制有效减少了冗余的网络传输,提升了整体效率。
③客户端过滤VS服务端过滤
消息过滤可分为服务端过滤与客户端过滤,其本质差异在于过滤发生的位置及对资源的影响。服务端过滤(如RocketMQ支持的Tag或SQL过滤)类似于数据库查询中的
WHERE
条件筛选,仅在Broker端返回符合条件的目标消息,极大减少了网络传输数据量。而客户端过滤则需要消费者接收全部消息后自行筛选,会造成大量冗余数据传输和资源浪费。在高性能消息中间件(如RocketMQ)的海量消息场景中,网卡资源常成为系统瓶颈。服务端过滤通过牺牲少量Broker端CPU资源,显著节约了宝贵的网络带宽,避免无用消息对消费者端网络和计算资源的占用。这一特性虽不影响业务功能实现,但对提升系统整体吞吐、降低运营成本具有关键意义。
④服务端过滤的必要性
RocketMQ 的服务端消息过滤功能解决了单纯依赖 Topic 分类的局限性。虽然 Topic 本身可作为消息过滤手段(如不同消息投递至不同 Topic),但完全依赖 Topic 会导致主题数量爆炸式增长(如为每个产品线和操作类型创建独立 Topic),引发运维复杂性和性能问题。更关键的是,这种硬编码方式缺乏灵活性:新增主题需修改发送代码,且无法支持多维度过滤(如同时按产品线和操作类型筛选)。
通过 RocketMQ 的服务端过滤(如 Tag 或 SQL 表达式),只需一个统一主题(如
topic_order_event
),消费者即可基于消息属性订阅特定内容(如 B2C 产品线的支付消息)。这不仅减少了主题数量,降低了运维负担,还实现了消息的精细化路由,提升系统效率和可维护性。服务端过滤通过牺牲少量 Broker CPU 资源,显著节约网络带宽,避免冗余数据传输,是高并发场景下的最优选择。
⑤RocketMQ 的消息过滤模式
/**
* RocketMQ 消息过滤机制示例类
*
* 本类演示了 RocketMQ 的消息过滤原理,重点强调其服务端过滤特性。
* 尽管客户端代码可能包含基于 Tag 的 switch-case 逻辑,但这并非过滤本身,
* 而是不同 Tag 消息所需的差异化业务处理。RocketMQ 的过滤实际发生在服务端(Broker),
* 仅将符合条件(如 Tag 或 SQL 属性)的消息投递给消费者,减少网络传输开销。
*
* 发展历程:
* - 最初支持标签过滤(Tag)和类过滤(Class Filter),后者允许自定义类实现复杂过滤。
* - 贡献给 Apache 后,增加了 SQL92 语法属性过滤,基于消息 Property 进行多维度筛选。
* - 类过滤模式因维护复杂已被移除,当前官方支持标签过滤和属性过滤两种模式。
*
* 理解这一背景有助于阅读源码时识别兼容性设计,例如某些“怪异”代码可能是历史遗留。
*/
public class MessageFilterExample implements MessageListenerConcurrently {
/**
* 消息监听器实现方法
* 此方法常被误解为客户端过滤,但实际上 RocketMQ 已通过服务端过滤减少了消息量。
* 客户端收到的消息均是 Broker 筛选后的结果,这里的 switch-case 仅用于业务逻辑分发。
*
* @param msgs 消息列表(已由服务端过滤)
* @param context 消费上下文
* @return 消费状态
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 基于 Tag 的业务逻辑处理:服务端过滤后,客户端仅需处理收到的消息
switch (msg.getTags()) {
case "TAG1":
// 消费 Tag1 消息的逻辑
processTag1(msg);
break;
case "TAG2":
// 消费 Tag2 消息的逻辑
processTag2(msg);
break;
default:
// 其他 Tag 处理(如有)
break;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
private void processTag1(MessageExt msg) {
// 实现 Tag1 消息的具体业务逻辑
System.out.println("Processing TAG1 message: " + new String(msg.getBody()));
}
private void processTag2(MessageExt msg) {
// 实现 Tag2 消息的具体业务逻辑
System.out.println("Processing TAG2 message: " + new String(msg.getBody()));
}
}
2. 标签过滤模式的使用及原理
①标签过滤模式的使用
/**
* RocketMQ 标签过滤模式示例
*
* 标签过滤是最早的消息过滤模式,也是截至目前使用最多的模式之一。
* 以下代码演示了生产者和消费者如何利用标签过滤进行消息处理。
*/
public class TagFilterExample {
/**
* 生产者发送消息示例
* 发送消息时需要指定标签,如下代码将标签设置为"TAGA"
* 注意:RocketMQ不支持给一个消息打上多个标签
*/
public void producerExample() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.start();
// 创建消息并指定标签
Message msg = new Message(
"TopicTest",
"TAGA", // 单标签设置
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)
);
producer.send(msg);
producer.shutdown();
}
/**
* 消费者订阅和处理消息示例
* 消费者可以订阅多个标签的消息(使用||分隔符)
* 虽然过滤实际发生在服务端,但客户端仍需区分不同标签的业务逻辑
*/
public void consumerExample() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
// 订阅多个标签的消息
consumer.subscribe("TopicTest", "TAGA || TAGB || TAGC");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
/**
* 重要说明:虽然此处使用switch-case区分标签处理逻辑,
* 但实际的消息过滤(筛选哪些消息应该投递给消费者)是在Broker服务端完成的。
* 客户端收到的消息已经是服务端过滤后的结果,这里的switch-case仅用于
* 区分不同标签消息的业务处理逻辑,并非执行过滤操作。
*/
for (MessageExt msg : msgs) {
switch (msg.getTags()) {
case "TAGA":
// 处理TAGA标签的消息
processTagA(msg);
break;
case "TAGB":
// 处理TAGB标签的消息
processTagB(msg);
break;
case "TAGC":
// 处理TAGC标签的消息
processTagC(msg);
break;
default:
// 处理其他标签(如有)
processDefault(msg);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
// 以下为各标签处理方法的伪实现
private void processTagA(MessageExt msg) {
System.out.println("Processing TAGA: " + new String(msg.getBody()));
}
private void processTagB(MessageExt msg) {
System.out.println("Processing TAGB: " + new String(msg.getBody()));
}
private void processTagC(MessageExt msg) {
System.out.println("Processing TAGC: " + new String(msg.getBody()));
}
private void processDefault(MessageExt msg) {
System.out.println("Processing default tag: " + new String(msg.getBody()));
}
}
②标签服务端过滤原理
尽管 RocketMQ 支持服务端消息过滤(基于 Tag 的哈希值比较,存储在 consumequeue 文件的 tagCode 字段中),但依然建议客户端进行消息筛选。这是因为服务端过滤依赖哈希匹配,虽极大提升性能(避免读取 commitlog 文件),但存在极低概率的哈希冲突风险:不同 Tag 可能哈希值相同,导致错误消息被投递。因此,客户端兜底过滤(如检查 Tag 实际值)可作为额外保障,确保消息准确性。
RocketMQ 选择存储哈希值而非原始 Tag 是为平衡性能与存储效率,但理论上可通过二次过滤(服务端或客户端层)避免开发者误解。实际应用中,哈希冲突概率极低,但谨慎设计仍推荐客户端筛选,尤其在高可靠性要求的场景。
3. 类过滤模式原理
早期 RocketMQ 除标签过滤(Tag)外,曾支持类过滤模式(Class Filter)。该模式允许客户端将自定义过滤逻辑(Java Class 文件)上传至与 Broker 同机部署的 Filter Server 执行,实现在服务端完成复杂过滤(如按时间、业务规则),以 CPU 资源换取网络带宽(避免冗余消息传输)。此设计体现了“逻辑靠近数据”的架构思想,但因需独立部署组件、存在安全风险(如恶意代码导致 Broker 死循环或内存溢出)且运维复杂,最终被弃用。
随着 SQL92 属性过滤模式的引入(支持基于消息属性的表达式过滤),类过滤模式因冗余性和风险性被彻底取代。SQL92 模式在保障表达灵活性的同时,通过沙箱机制规避安全风险,成为复杂过滤场景的标准解决方案。这一演进反映了 RocketMQ 在功能丰富性、安全性和易用性间的持续权衡与优化。
4. SQL92 属性过滤的使用及原理
标签过滤模式存在固有局限:单条消息仅支持绑定一个标签,无法满足多标签组合过滤的复杂场景需求。
为解决此问题,RocketMQ 借鉴 ActiveMQ 设计,引入基于 SQL92 标准的属性过滤模式。该模式以消息的 Property 字段为过滤依据,支持完整 SQL92 语法(包括比较运算符 >、<、=、<> 及逻辑运算符 AND/OR/NOT),实现多维度灵活过滤,显著提升复杂业务场景下的消息筛选能力。
①属性过滤的使用
例如, 一条消息定义有 3 个 Property: a、 b、 c, 消费的时候可以写这样的表达式 “ a> 5 AND b = abc ” , 就能做消息过滤了, 而且是服务端的过滤。
里面支持很多功能:
1)数字比较,支持>、>=、<、<=、between、=。
2)字符比较,支持=、<>、IN。
3)IS NULL或者IS NOT NULL。
4)逻辑运算,支持AND、OR、NOT。
②开启属性过滤
启用属性过滤(SQL92)功能需在 Broker 端额外配置以下参数(与标签过滤原理冲突,故默认关闭):
enablePropertyFilter=true:启用属性过滤能力
enableCalcFilterBitMap=true:启用过滤位图计算优化
enableConsumeQueueExt=true:启用消费队列扩展存储
此三项配置共同保障属性过滤的完整功能与性能,缺失任一将导致过滤失效或异常。
③属性过滤的原理
RocketMQ 的属性过滤(SQL92)机制面临一个核心性能挑战:由于 consumequeue 文件并未存储消息属性(property)信息,因此基于属性的过滤必须查询 commitlog 文件来获取完整的消息属性。这个过程涉及将消息数据从堆外内存复制到堆内再进行筛选,导致其性能显著低于直接基于 consumequeue 中 tagCode 的标签过滤。
为了优化此性能瓶颈,RocketMQ 引入了布隆过滤器(Bloom Filter) 进行预处理,其核心思想是以空间换时间。
-
布隆过滤器原理与优势
布隆过滤器是一种概率型数据结构,用于高效判断一个元素是否可能存在于一个集合中。其特点如下:
它使用一个较长的二进制位数组(bit array)和多个(k个)独立的哈希函数。
添加元素时,会使用所有 k 个哈希函数计算出多个位位置,并将这些位置置为 1。
检查元素时,同样使用所有哈希函数计算位位置,只有当所有位置均为 1 时,它才认为元素“可能存在”于集合中;如果任何一个位为 0,则元素“肯定不存在”于集合中(无假阴性)。
其优势在于极高的空间效率和查询效率,但代价是存在一定的误判率(假阳性):即它可能判断某个元素存在,但实际上并不存在。
-
在RocketMQ中的具体应用与配置
RocketMQ 利用布隆过滤器预先判断一条消息是否可能被某个消费者组消费。
存储:Broker 在生成 consumequeue 文件的同时,会生成一个对应的 consumequeueext 文件。此文件可以理解为存储了与(每个)消息关联的布隆过滤器位图信息。
匹配:当消息发送时,Broker 会拿该消息的属性(property)与订阅了该 Topic 的所有消费者组的 SQL 表达式进行匹配。
置位:对于匹配成功的消费者组,会将其在布隆过滤器位图中对应的位(由
consumerFilter.json
文件中的bloomFilterData.bitPos
字段确定)置为 1,标记该消息应投递给此消费者组。查询:当需要检索消息时,Broker 首先会 consult consumequeueext 文件中的布隆过滤器数据,快速筛选出可能需要此消息的消费者组,从而极大减少了需要读取完整 commitlog 进行属性匹配的次数。
二次校验:由于布隆过滤器存在误判可能,当它判断一个消费者组可能需要某消息时(返回 true),Broker 会基于 commitlog 中的实际属性数据进行二次过滤,以确保投递的准确性。
关键配置参数:布隆过滤器的行为可通过 Broker 配置调整,主要参数包括:
expectConsumerNumUseFilter
:预期使用过滤的消费者组数量(默认32
)。maxErrorRateOfBloomFilter
:布隆过滤器允许的最大误判率(默认20%
)。在默认配置下,经过计算,所需的位数组长度(m)约为 112 bits,哈希函数次数(k)为 3。
-
总结而言,RocketMQ 通过引入布隆过滤器预处理机制,将属性过滤的性能提升至接近标签过滤的水平。其工作流程是:快速预筛选(布隆过滤器) -> 精准确认(二次查询commitlog)。这套机制在保证消息投递准确性的同时,通过牺牲少量存储空间和接受可控的误判率,换取了巨大的性能提升,非常适合处理大规模消息场景。
④属性过滤可替换类过滤模式
SQL92 过滤模式的出现,从实践层面已能完全取代传统的类过滤模式。尽管有观点认为上传 class 文件可实现更复杂的功能,而 SQL92 仅为其功能子集,但需回归其设计初衷:该机制专为高效实现服务端消息过滤而生,核心是对标签、属性等元数据进行简单匹配筛选。生产环境中,为消息过滤而让 Broker 执行数据库查询无异于“杀鸡用牛刀”,而 SQL92 以足够的易用性精准满足了匹配需求。正因如此,原本复杂且存在较高风险的 Filter Server 组件已无存在必要,自然退出历史舞台。
十. RocketMQ顺序消息原理
顺序消息是 RocketMQ 的重要特性,专为解决消息间存在逻辑依赖的场景设计。虽然多数情况下消息可独立消费,但在以下两类场景中,消息顺序性直接影响业务正确性:
电商订单状态流转:订单生命周期包含创建、支付、发货等关键状态,若消息乱序处理(如先消费发货再处理支付),将导致业务流程错乱与数据不一致。
MySQL binlog 订阅与异构数据构建:消费 binlog 时需严格遵循数据操作顺序(插入→更新→删除)。若消息乱序(如先处理删除再消费插入),异构存储可能保留本应删除的数据,造成数据冗余与逻辑错误。
1. 消息中间件的时序性挑战
在分布式架构的消息中间件与应用系统中,确保消息严格顺序是一项复杂挑战,需同时满足四个关键条件:
消息发送顺序性:生产者必须按业务顺序同步发送消息,若采用异步发送则无法保证后发消息后到达,这要求中间件提供可靠的同步发送接口。
消息存储顺序性:顺序存储要求串行化写入,这与高并发存储设计存在根本冲突,可能显著降低吞吐性能,需在顺序与性能间权衡。
消息投递顺序性:分布式存储环境下,即使单节点能顺序存储,也需协调多个节点按序向消费者投递,跨节点顺序协调难度较大。
消息消费顺序性:面临双重挑战:一是需保证顺序投递的消息由同一消费者实例处理,避免分散至不同实例;二是即便单实例接收,多线程消费时仍无法保证任务执行顺序与投递顺序一致,因线程调度具有不确定性。
2. Kafka 顺序消息的解决方案
①Kafka 消息存储和消息投递的顺序性
Kafka 的整体消息模型与 RocketMQ 类似,均采用主题(Topic)作为核心的消息组织和管理单元。为提升消息处理的并发能力,每个主题下会划分多个分区(Partition),其功能类似于 RocketMQ 中的队列(Queue),用于实现消息的并行消费。
在存储机制上,Kafka 与 RocketMQ 存在显著差异:Kafka 未采用集中式、全局有序的 CommitLog 存储方案,而是让每个分区独立地、按顺序(FIFO)存储其接收到的消息。因此,Kafka 的整体消息结构可视为由多个有序的分区队列构成。
由于每个分区本身是一个 FIFO 队列,Kafka 在单个分区维度天然保证了消息存储与投递的顺序性。需注意的是,该顺序性仅限于同一分区内部。若有多个消息需严格按序消费,则必须确保它们被发送至同一分区,否则无法保证全局顺序。
②Kafka 消息生产的顺序性
Kafka 通过分区(Partition)机制实现消息的顺序性保障。其核心设计在于:同一分区内的消息严格按 FIFO(先进先出)顺序存储和投递。若需保证特定业务场景(如同一订单)的消息顺序,开发者只需利用 Kafka 生产者提供的哈希路由策略,将同一业务键(如订单号 order1)的所有消息路由至同一分区即可。
例如,订单 order1 和 order2 分别包含创建、支付、发货三条消息。通过哈希策略,order1 固定投递至分区 0,order2 固定投递至分区 1。只要生产者按业务顺序发送消息,每个订单的所有操作在各自分区内必然保持顺序存储与投递,天然满足业务对顺序性的要求。此设计以分区粒度平衡了顺序性与并发性,无需额外复杂逻辑。
③Kafka 消息消费的顺序性
Kafka 通过其分区与消费者线程模型天然保障了消息的顺序性。其核心机制在于:一个分区在同一时刻只会被分配给一个消费者实例,且该实例通常以单线程模式运行,通过主动拉取的方式消费消息。这意味着,对于单个分区而言,消息的拉取和处理过程是串行化的,从而确保了分区内消息的消费顺序与存储顺序完全一致。
3. RocketMQ 顺序消息的解决方案
Apache RocketMQ 的开源版本在其源代码中并未明确定义“顺序消息”这一概念,但该平台确实提供了一套完整的顺序消息实现解决方案。鉴于“顺序消息”一词在业界具有高度的通用性和概括性,并且在主流搜索引擎中搜索此关键词,排名靠前的文章大多与 RocketMQ 相关,因此在技术讨论中继续沿用这一术语是恰当且便于理解的。
值得注意的是,RocketMQ 所提供的顺序消息解决方案,其设计思想是在借鉴并优化了 Kafka 的原有方案之上形成的。要深入理解其运作机制,一个有效的方式便是将其与 Kafka 的方案进行对比分析。
①RocketMQ 的消息存储顺序性实现
RocketMQ 中对应于 Kafka 分区的概念称为队列(queue),它是一个逻辑队列,具备 FIFO 特性,但并非实际存储消息的实体。所有消息均统一存储于共用的 commitlog 文件中,因此即便在逻辑队列中相邻的消息,其在 commitlog 中的物理存储位置可能并不相邻。
然而,在顺序消息的场景下,这一存储层面的差异并不产生影响。从需要保证顺序的消息批次角度来看,RocketMQ 依然以队列为维度进行消息投递,能够确保先存储的消息优先被投递,从而满足消息顺序性的要求。
②RocketMQ 的消息生产顺序性实现
RocketMQ 的生产者负载均衡策略默认仅提供轮询方式,但通过 MessageQueueSelector接口支持用户自定义队列选择逻辑。该接口定义如下:
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
生产者发送方法提供了重载版本,支持传入选择器实例:
public SendResult send(Message msg, MessageQueueSelector selector, Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
在顺序消息场景中,选择器的核心目标是将需要顺序处理的消息路由至同一队列。常见实现方式是基于业务键(如订单号、用户ID或数据库主键)进行取模、哈希或一致性哈希计算。以下是一个基于取模策略的示例实现:
选择器的参数 arg可由用户根据业务需求灵活决定,确保相关消息通过相同计算规则散列到目标队列,从而保障消息顺序性。
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer orderId = (Integer) arg;
int index = orderId % mqs.size();
return mqs.get(index);
}
③RocketMQ 的消息消费顺序性实现
RocketMQ 与 Kafka 的消费者模型在核心机制上并无本质区别,均遵循“一个队列在同一时刻只能分配给一个消费者实例”的原则。当消息被顺序投递到某个消费者实例后,消息消费的顺序性保障即转换为该单实例内部的顺序处理问题。
Kafka 消息顺序性的弊端
Kafka 提供了一套顺序消息的实现方案,但其核心缺陷在于系统的整体消费并行度受限于分区数量与消费者实例数量中的较小值(最大消费并行度 = Min(分区数, 消费者数))。例如,一个拥有 2 个分区和 2 个消费者实例的 Topic,其并行度上限仅为 2。这意味着,即使单机性能充足,也无法通过单纯增加消费线程来提升处理能力。
若需将并行度提升至 100,则必须同时创建 100 个分区并部署 100 个消费者实例,操作复杂且资源消耗大。开发者也可尝试在单个消费者进程内创建多线程来减少实例数量,但这将显著增加开发复杂度与资源管理难度,同时难以可靠地保证消息至少被消费一次。
RocketMQ 相比 Kafka 消费顺序性的优化
RocketMQ 在消费并行度上实现了显著优化。其并行消费模型采用明确的分工架构:一个线程专职负责消息拉取,另有一个独立线程池处理已拉取的消息。这种设计使得整体消费并行度由消费者实例数量与消费线程池大小的乘积决定,大幅提升了消息处理能力。
在顺序消费场景下,RocketMQ 通过特殊线程模型保障同一队列内的消息串行消费,同时允许不同队列并行处理。具体表现为:当多个队列分配给同一消费者实例时,各队列的消息消费互不干扰。其顺序消息并行度计算公式为各实例的线程池大小与分配队列数的最小值之和(∑Min(单实例线程池大小, 单实例分配队列数))。例如:20个队列由2个消费者实例处理(各消费10个队列),若每个实例线程池大小为10,则总并行度可达20。这一设计相比Kafka实现了显著优化——同等并行度需求下,Kafka需启动20个消费者实例,而RocketMQ仅需2个实例即可满足。
RocekeMQ 的线程池在顺序消费的实现
启动方式与回调接口:启动 RocketMQ 的顺序消费功能,需在编写消息消费回调逻辑时使用专用的
MessageListenerOrderly
接口,而非普通的并发监听器。这是实现顺序消费的基础前提。线程模型与队列加锁机制:线程池中实现顺序消费。RocketMQ 通过独特的队列加锁机制保障消息顺序。其底层仍使用 JDK 标准线程池,但在此基础上,对每个队列(Queue)施加锁进行同步控制。当多个消息(如 msg1, msg2, msg3 属于 queue0;msg4, msg5, msg6 属于 queue2)被并发拉取到同一消费者实例并提交至线程池后,线程在执行消费回调前,必须首先成功获取其目标队列的锁(注意, 这里的锁是针对本地内存 queue 的 对象进行加锁)。
同一队列串行消费:对于属于同一个队列的消息,只有成功获得该队列锁的线程才能执行消费,其余线程则等待锁释放。这确保了单个队列内部消息的严格串行处理。
不同队列并行消费:不同队列的锁相互独立,因此不同队列的消息消费可以并行进行,从而提升了系统的整体吞吐量。
基于队列的任务提交与顺序保证:RocketMQ 并非按单条消息提交任务,而是以队列为单位。抢到锁的线程会从该队列中按顺序提取消息(默认1条,可配置批量大小)进行消费。此设计确保了即便线程调度顺序不定,消费顺序也始终与消息在队列中的存储顺序一致。
全局顺序性与进程级锁:由于 RocketMQ 的负载均衡由客户端计算,在特定时刻可能出现同一队列被分配给多个消费者实例的情况,从而导致消息被重复投递给不同实例,从全局视角看破坏了顺序性。
为解决此问题,RocketMQ 在顺序消费场景下引入了 Broker 端的进程级锁:
消费者实例必须向 Broker 成功申请到某个队列的锁,才有资格拉取和消费该队列的消息。
此进程锁设有超时时间(默认60秒),可通过系统属性
rocketmq.broker.rebalance.lockMaxLiveTime
调整,以防止因消费者实例异常崩溃而导致锁被永久占用。例如两个消费者 c1、 c2 实例都在负载均衡的时候拿到了 queue0。 假设 queue0 里面有 msg1、 msg2、 msg3 需要顺序消费, c1 拉取消息和消费消息的时间更早, 那么从全局的角度来 看, 消息的消费顺序就是 msg1、 msg2、 msg3、 msg1、 msg2、 msg3。从每个消费者的角度看, 消息的确是顺序消费的, 但是从局部去看就会发现中间一段时间, 出现了 msg3 先消费才到 msg1 的情况。
这套机制共同作用,既在单个实例内保证了队列级别的消费顺序,又通过分布式锁在多个实例间协调,致力于实现更高层面的全局顺序性。
-
例如 queue0 的 msg1、 msg2、 msg3 需要顺序消费, queue2 的 msg4、 msg5、 msg6 也需要顺 序消费。 这时候 6 条消息都被并发地拉取到了同一个客户端实例, 然后这 6 条消息被提交到 了线程池中准备执行。 在执行指定的回调之前, RocketMQ 的客户端 API 会做很多的事, 其中就包括对 queue0、 queue2 进行上锁的操作。
假设现在线程池有 6 条线程都被激活去消费 6 条消息。 t1 拿到的是 msg1, t2 拿到的是 msg2, 以此类推。
刚开始 t1 消费 msg1 之前, 会对 queue0 加锁 (注意, 这里的锁是针对本地内存 queue0 的 对象进行加锁) , 由于这时候没有别的线程拿到这个队列的锁, 所以 t1 可以继续进行后面的 逻辑。
而 t2 和 t3 进行消费之前, 也会尝试对 queue0 进行加锁, 这时候它们都拿不到锁, 就会在 锁外等待。 当 t4 进行消费 msg4 的时候, 它会对 queue1 进行加锁, 由于加锁成功, t4 也会进 行消息的消费。 同样, t5 和 t6 在对 msg5 和 msg6 消费之前, 由于拿不到锁, 所以也需要原地 等待锁的释放。
这就是为什么不同队列之间的消息消费可以并行的同时, 同一个队列的消费却可以做到 串行的逻辑。
还是同样的例子, 当消费者准备进行消费 msg1、 msg2、 msg3 的时候, 会将这三个任务提 交到线程池, 这时候 t1、 t2、 t3 都会尝试去抢锁, 最后只有 t1 抢到。 t1 便从队列 queue0 中拿 出 1 条消息, 也就是 msg1 (拉取的数量默认是一条, 可配置成获取一批) , 然后消费, 消费结 束后释放锁。 后面即便抢到锁的是 t3, 它再从队列 queue0 拉取一条消息, 拉取到的也是 msg2, 所以能保证最后消费的顺序和队列的顺序是一致的。
/**
* 注册顺序消息监听
* 使用 MessageListenerOrderly 接口实现顺序消息消费
*
* 示例代码结构:
* consumer.registerMessageListener(new MessageListenerOrderly() {
* @Override
* public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
* for (MessageExt msg : msgs) {
* // 消息逻辑省略 - 在此处实现具体的顺序消息处理逻辑
* }
* return ConsumeOrderlyStatus.SUCCESS; // 返回消费成功状态
* }
* });
*/
***********
/**
* 顺序消费场景下,进程级的锁超时时间,默认是60s
* 通过系统属性 rocketmq.broker.rebalance.lockMaxLiveTime 进行配置
*/
private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(
System.getProperty("rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
4. 分区顺序VS全局顺序
RocketMQ 的顺序消息分为两种类型:
分区顺序消息:针对指定主题,所有消息按 Sharding Key(如用户ID、订单ID)进行分区,确保同一分区内的消息严格遵循 FIFO(先进先出)顺序发布和消费。此模式适用于绝大多数互联网场景,如用户注册验证码发送(以用户ID为Key)或电商订单全链路(创建、支付、退款、物流以订单ID为Key),在保障业务顺序性的同时兼顾并发性能。
全局顺序消息:要求主题下所有消息严格按 FIFO 顺序处理,是分区顺序的特例(通过单队列实现)。仅适用于极少数强顺序需求场景,如公平购票(先到先得)或消息乱序零容忍系统(宁可不可用也不乱序)。
分区顺序消息通过分治策略平衡顺序性与吞吐量,是推荐的主流方案;全局顺序因严格限制易成性能瓶颈,需谨慎评估。实际应用中应优先通过合理设计 Sharding Key 满足业务顺序需求,避免过度依赖全局顺序。
5. RocketMQ 顺序消息的缺陷
①分区热点问题
消息队列系统中存在一个关键的性能约束机制:单个分区(或队列)在同一时间点只能被分配给一个消费者实例中的单条线程进行消费。这种设计导致当出现消息积压或消息在不同分区间分配不均匀的情况时,开发人员无法通过常规的横向扩展手段——例如增加消费者实例数量或扩大单个消费者的线程池规模——来直接提升积压消息的整体消费速率。该机制清晰地揭示了某些分布式系统在特定场景下可能面临的扩展性瓶颈。
②扩容与顺序性的冲突
RocketMQ 顺序消息的最大并行度受限于队列总数,其具体数值为各消费者实例的线程池大小与所分配队列数中较小者的总和。若需提升该并行度,必须增加主题的队列数量。
需要注意的是,在系统运行过程中进行队列扩容,可能会改变消息生产的路由策略。例如,扩容前属于同一批顺序消息的消息A被路由至 queue1,而扩容后该批消息中的消息B可能被路由至新的 queue2,这种路由变更将破坏消息的顺序性。
因此,对于明确需要支持顺序消费的主题,建议在创建时就预留充足的队列数量,避免运行时扩容。针对此问题,社区正在探索通过“逻辑队列”(一个逻辑分区对应多个物理分区)的方案来解决,但该功能在 4.9.4 版本中尚未发布。
③顺序性与可用性的冲突
严格而言,RocketMQ 无法在所有异常场景下保障严格的消费顺序,因其顺序性与系统可用性存在本质冲突。例如,若需保证同一订单(如订单号
123456789
)下消息msg1
、msg2
、msg3
的顺序性,当msg1
和msg2
成功发送至Broker-0-queue0
后,若Broker-0
突发故障,则msg3
的路由面临两难:若按原路由规则发往已不可用的
Broker-0-queue0
,消息无法投递,牺牲可用性;若改发至其他可用队列(如
Broker-0-queue10
),则顺序性被破坏。此冲突揭示了分区顺序消息在极端场景下的局限性。若业务完全无法容忍乱序,需启用全局顺序消息(通过单队列实现),但此举会将主题退化为单点,显著牺牲可用性与扩展性。因此,实际应用中需根据业务需求权衡顺序性与可用性,通常推荐通过合理设计 Sharding Key 和容错机制降低乱序影响。
十一. RocketMQ事务性消息原理
1. 分布式场景下的ACID
①认识ACID
在传统的单体架构中,开发人员通常不会遇到分布式事务的挑战,这得益于关系型数据库对事务的原生支持。例如,在经典的Bob向Smith转账场景中,关系型数据库通过满足ACID四大特性来确保事务的正确执行和数据的一致性。
ACID是指原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。原子性要求事务是一个不可分割的操作单元,其中的操作要么全部成功,要么全部失败,MySQL通过undo log机制保证这一点。一致性确保事务执行前后数据库的完整性约束不被破坏,而这需要原子性、隔离性和持久性的共同保障。隔离性指多个并发事务之间相互隔离,互不干扰,MySQL通过MVCC和多版本并发控制实现。持久性则保证一旦事务提交,其所做的修改就会永久保存在数据库中,即使系统发生故障也不会丢失,MySQL利用redo log来实现持久性。
这些特性共同构成了关系型数据库可靠事务处理的基础,使其能够有效应对诸如转账等需要严格数据一致性的业务场景。
②分布式场景下的事务挑战
传统单体架构依赖单一关系型数据库实例处理事务,可满足基础需求。然而,互联网场景面临高并发、海量用户的挑战,单一实例难以支撑业务发展。为应对压力,架构师普遍采用“分拆”策略,遵循“大而化小,小而化了”的思路进行架构升级,通过分布式方案提升系统扩展性与承载力。
服务拆分对于事务的挑战
在分布式架构演进中,服务拆分是首要步骤,但会引入新的复杂性。以电商系统为例:订单处理与库存扣减本可通过关系型数据库的 ACID 特性(在同一事务中完成订单插入与库存更新)保证一致性。然而,当这两类逻辑被拆分至独立服务(如订单模块、库存模块)后,数据库事务的边界被打破。
拆分后,库存与订单模块各自建立独立的数据库连接操作资源,不同连接无法共享事务上下文,导致传统数据库的 ACID 特性难以直接应用。这一变化使得跨服务的数据一致性保障成为分布式架构的核心挑战。
业务拆分对于事务的挑战
在微服务架构中,"一服务一库"是常见的设计原则,即每个独立服务配备专属数据库。服务拆分后,数据库随之分离(如图11-2所示),形成分布式数据存储模式。
这种拆分导致传统关系型数据库事务机制失效。例如订单表与库存表分属不同服务及数据库后,无法通过单一事务保证跨表操作(如创建订单并扣减库存)的原子性。这可能引发超卖(库存扣减失败)或数据不一致(订单创建失败)等问题,需依赖分布式事务、消息队列或补偿事务等技术保障最终一致性,但会增加系统复杂性与性能开销。
数据库拆分对于事务的挑战
即便所有订单逻辑集中于订单模块,在高并发与大数据的双重压力下仍面临严峻挑战。以拼单业务为例:两用户拼单成功后,系统需为其同步创建订单记录。在单数据库架构下,可通过本地事务轻松实现两行记录的原子插入。
然而,面对亿级订单量的系统,单一数据库难以满足存储与性能需求,必须采用分库分表方案。假设将订单表拆分为 2 个库(各含 100 张表,共 200 张表),并以用户 ID 作为分片键,则可能出现拼单用户 ID 分属不同库表的情况(如 userId1 路由至 0 库 0 表,userId2 路由至 1 库 99 表)。
分库分表后,数据库相互独立,订单模块无法再利用关系型数据库的本地事务机制保障跨库操作的原子性(如图 11-3 所示)。此问题揭示了分布式环境下数据一致性的核心矛盾:存储扩展性与事务完整性难以兼得,需引入分布式事务或最终一致性方案弥补传统事务机制的失效。
2. CAP VS BASE
①认识 CAP 三角
CAP定理是分布式计算领域的公认定理,由加州大学伯克利分校Eric Brewer教授于1998年提出,1999年正式发表,后由麻省理工学院学者从理论上证明。该定理指出,在分布式系统中,一致性(Consistency)、可用性(Availability)和分区容错性(Partition Tolerance)三者不可兼得,系统设计时最多只能同时满足其中两个特性(CA、CP或AP)。
一致性(C):指分布式系统中所有数据副本在同一时刻对客户端读取呈现相同数据。
可用性(A):指系统部分节点故障后,集群仍能响应客户端读写请求,且在有限时间内返回正确结果。
分区容错性(P):指系统遇到网络分区故障时(非全网故障)仍能正常提供服务。
传统单机数据库优先保证CA(放弃P),而分布式系统必须保障分区容错性(P),因此需在CP(强一致性)与AP(高可用性)之间抉择。这一权衡体现了分布式场景下一致性问题的本质矛盾。
②认识BASE理论
根据CAP定理,在分布式系统中完整实现类似ACID的事务特性需放弃可用性(即采用CP模型)。然而,互联网应用普遍将可用性置于优先地位,因长时间无法响应的产品难以满足用户需求。
eBay架构师基于CAP定理提出BASE作为ACID的替代方案,旨在协调可用性与一致性。BASE包含三个核心原则:
基本可用(BA):系统故障时允许部分功能降级,确保核心服务可用;
软状态(S):接受系统存在中间状态(如“转账中”),不影响整体可用性;
最终一致性(E):不要求实时强一致,但承诺数据最终达到一致状态。
BASE理论源于大型互联网分布式实践,是对CAP中一致性与可用性权衡的结果,其核心是在强一致性无法保障时通过业务适配实现最终一致性。从ACID到BASE的演进被戏称为“酸碱理论”(Base与Acid的英文含义对应碱与酸)。
3. 分布式一致性方案
接下来将系统介绍包括 RocketMQ 事务消息在内的多种分布式事务解决方案。这些方案本质上都是对 BASE 理论的具体工程实践,旨在平衡分布式环境下的数据一致性与系统可用性。
不同方案在一致性和可用性的实现程度上存在差异,但遵循一个基本规律:方案的强一致性保障程度越高,往往需要以牺牲部分可用性为代价。这种权衡关系是分布式系统设计的核心挑战,也是选择具体方案时需要重点考量的因素。
①一致性概念解析
在深入探讨 RocketMQ 的事务消息实现原理之前,有必要先了解业界为解决“分布式事务”这一经典难题所提出的各类现有方案。为便于理解和对比,本书将这些解决方案归纳为三大类型:“强一致性方案”、“弱一致性方案”以及“最终一致性方案”。
强一致性方案
“强一致”一词在不同技术语境中存在多种解释,其含义具有多样性。常见的理解至少包括四种:1)CAP理论定义的完全一致性(任何时刻任何情况一致);2)副本复制场景中同步复制与异步复制的区别(同步即强一致);3)模糊的“相对强一致”概念(大部分情况一致即视为强一致);4)排除极小概率的网络分区等突发故障,确保事务完整执行且数据始终满足业务约束。
第1种定义(完全一致)在当前技术方案中均无法实现,因系统难以完全牺牲可用性(A)来保障一致性(C)。第2种定义在特定场景成立,但本书聚焦分布式事务,不以其为区分标准。第3种定义缺乏科学性,常见于方案对比时的非严谨表述。本书采纳第4种定义作为分类依据:假设事务执行过程顺利(无突发网络分区),数据保持一致,且在可预知故障(如超时、重启)下仍能保证事务一致,则视为强一致。此类方案均体现“舍A求C”的设计思路。
弱一致性方案
在分布式系统中,当某个写操作成功后,用户读取数据时可能短暂处于不一致状态,这段时间被称为“不一致性窗口”。其典型表现包括两类场景:
数据同步延迟:如在主备数据库同步过程中,从备用库读取到未更新的旧数据;
多数据源状态分歧:如电商场景中库存已扣减但订单尚未生成,导致数据暂时矛盾。
需注意弱一致性并非如其名称般“脆弱”。许多互联网架构采用的方案虽严格符合弱一致性定义,但其实际效果往往可接受,后续将通过实践案例具体说明其合理性。
最终一致性方案
最终一致性是分布式系统中的重要概念,对应 BASE 理论中的 "E"(Eventual Consistency)。它本质上是弱一致性的一种特殊形式,其核心在于明确承认数据更新后可能存在一个 "不一致性窗口" —— 在此期间,系统的不同副本可能呈现短暂的数据不一致状态。
与一般的弱一致性相比,最终一致性的关键特征在于其承诺性:它保证该不一致性窗口必然会在有限的时间内关闭,所有数据副本最终将收敛至完全一致的状态。这一承诺使其成为许多互联网分布式系统实现高可用性与数据可靠性平衡的实践基础。
②强一致性方案
两阶段提交
两阶段提交(Two-phase Commit)是分布式系统中确保事务一致性的核心协议,通过协调者统一调度参与者节点,分为准备与提交两个阶段:
准备阶段:协调者询问各参与者能否提交事务,参与者执行操作(记录undo/redo日志但不提交)并反馈结果(yes/no);
提交阶段:协调者根据反馈(准备阶段)决定全局提交或回滚,参与者执行指令后释放资源并反馈ack,协调者收齐ack后完成事务。
在正常提交的情况下:协调者向所有参与者发出正式提交 (commit) 事务的指令。参与者执行 commit, 并释放整个事务期间占用的资源。各参与者向协调者反馈ack (应答) 完成的消息。协调者收到所有参与者反馈的ack 消息后, 即完成事务提交。
而在异常的情况下:协调者向所有参与者发出回滚 (rollback) 指令。参与者使用阶段1 中的undo 信息执行回滚操作, 并释放整个事务期间占用的资源。各参与者向协调者反馈ack 完成的消息。协调者收到所有参与者反馈的ack 消息后, 即完成事务中断。
-
该协议虽逻辑清晰,但存在严重可用性问题:
同步阻塞:提交阶段所有参与者阻塞等待指令,第三方访问相应资源时亦被阻塞;
单点故障:协调者故障将导致参与者永久锁定;
网络分区风险:极端情况下部分节点收不到指令可能引发数据不一致(但概率极低)。在commit phase中如果 发生网络分区问题, 导致事务参与者一部分收到了commit指令, 而另一部分事务参与者却没 收到, 那么这种情况下节点之间就会出现数据不一致。 这也是为什么前文说绝对的强一致方 案理论上是不存在的。
因其强一致性设计(牺牲可用性换一致性)及实际缺陷,该协议在互联网高并发场景中极少采用。
三阶段提交
上文提到二阶段提交存在单点故障、 同步阻塞和数据一致性的问题, 进而 “三阶段提交” (3 Phase Commit, 3PC) 协议出现了, 三阶段提交能有效地解决这些问题。 三阶段提交协议把 过程分为三个阶段: CanCommit、 PreCommit、 DoCommit 。
第一阶段: CanCommit 阶段。 三阶段提交的 CanCommit 阶段其实和两阶段提交的准备 阶段很像, 协调者向参与者发送一个commit请求, 如果参与者可以提交, 就返回yes响应, 否则返回no响应。
-
第二阶段: PreCommit 阶段。 协调者在PreCommit 阶段会根据参与者在 CanCommit 阶段 的结果采取相应操作, 主要有以下两种。
情况1 (正常场景): 假如所有参与者的反馈都是yes响应, 那么协调者就会向所有参与 者发出PreCommit 请求执行事务的预执行, 参与者会将 undo 和 redo 信息记入事务日志中 (但 不提交事务)。
情况2 (异常场景): 假如有任何一个参与者向协调者发送了no响应, 或者协调者等待参 与者响应超时, 协调者进行事务中断。 引入这样的超时机制后, 就解决了永久阻塞问题。
-
第三阶段: DoCommit阶段。 该阶段进行真正的事务提交, 也就是在协调者发出了pre Commit 请求之后的阶段。 这时候也可以分为以下两种情况。
情况1 (正常提交): 即所有参与者均反馈ack, 执行真正的事务提交。首先协调者向各个参与者发起事务DoCommit 请求。 参与者收到DoCommit 请求后, 会正 式执行事务提交, 并释放整个事务期间占用的资源, 然后向协调者反馈ack。 最后协调者收到所有参与者ack后, 即完成事务提交。
情况2 (中断事务): 即PreCommit阶段任何一个参与者反馈 no, 或者协调者等待所有参 与者的响应超时, 即中断事务。首先参与者使用CanCommit中的 undo 信息执行回滚操作, 并释放整个事务期间占用的资 源。 然后各参与者向协调者反馈ack完成的消息。 最后协调者收到所有参与者反馈的ack消息 后, 即完成事务中断。
注: 在DoCommit阶段, 无论协调者出现问题, 还是协调者与参与者出现网络问题, 都可能导致参与 者无法接收到DoCommit或abort 请求。 那么参与者都会在等待超时后执行事务提交。 这个超时机制也是 为了解决永久阻塞的问题。
-
3PC 通过超时机制提升系统可用性,减少协调者单点故障的影响。然而,这种设计也带来更高的数据不一致风险(如协调者发送中断指令但参与者因超时已提交)。根据 CAP 理论,这实质是以一致性妥协换取可用性提升。加之协议实现复杂且性能难以满足高并发场景,实践中较少采用。
XA 协议
上文提到二提交阶段协议, 在传统方案里都是面向数据库层面实现的, 如Oracle、 MySQL 都支持二提交阶段协议。为统一标准并降低行业对接成本,国际开放标准组织 Open Group 于 1994 年定义了分布式事务处理参考模型(DTP),该模型包含三个核心角色:
AP(应用程序):事务发起者,定义具体事务操作(如数据库更新);
RM(资源管理器):事务参与者,管理共享资源(如数据库、消息中间件)并提供事务提交/回滚能力;
TM(事务管理器):全局协调者,与各 RM 通信以协调事务提交或回滚。
-
XA 协议是 DTP 模型的核心实现规范,目前主要被关系型数据库(如 MySQL、PostgreSQL)和消息中间件(如 ActiveMQ)支持。其本质是基于资源层的两阶段提交(2PC)方案,虽能保障强一致性,但存在显著性能瓶颈:全程持有资源锁延长事务执行时间,增加死锁风险。实测表明,基于 XA 的 MySQL 集群操作延迟可达单机的 10 倍,故难以适用于高并发互联网场景,目前多用于传统单体应用中的跨库一致性保障。
③弱一致性方案
弱一致性方案常因名称中的“弱”字而被轻视,但实际在绝大多数场景下其表现并非不堪。它既非完全不一致,也非大概率不一致,而是相较于强一致性方案,在超时、网络分区或机器重启等异常场景下更容易出现数据不一致;同时对比最终一致性,它又缺乏自我恢复的保证。然而在互联网领域,当前大部分正在采用或计划采用的技术方案均属于弱一致性范畴。本书列举的三种最常见方案即被归为此类,原因在于它们无法自动恢复一致性,但实际其一致性表现并不弱,且完全适用于互联网绝大部分应用场景。
基于业务妥协的状态补偿
在分布式业务场景中,应根据数据重要性实施分级管理。当强一致性与系统性能冲突时,可对非核心数据适当放宽一致性要求,以保障整体性能。这种策略通常不会对主营业务造成重大影响。
以超高并发电商秒杀为例,创建订单与库存扣减作为关键操作:
性能优先设计:采用先扣库存(快速操作)、后异步处理订单(相对缓慢)的执行顺序,有效规避超卖风险;
补偿机制保障:通过事后清理脚本定期扫描库存流水,对超时未支付订单自动回滚库存(类似"抢购未付款自动释放库存"机制);
故障容忍设计:接受极低概率的补偿失败(可通过人工核查修复),以较低实现成本换取系统高性能与高可用性。
重试+告警+人工修复
在应对分布式系统数据一致性挑战时,当自动重试和回滚机制均告失败后(例如出现“少卖”等不一致场景),采用人工修复作为一种低成本补救方案是可行且实用的策略。该方案的核心思想并非被动等待用户投诉,而是要求系统具备主动的不一致性检测能力,能通过监控日志、数据库记录等手段及时发现问题并触发告警。
实施时,研发人员可依据业务流程记录的操作日志和上下文信息定位问题,并进行数据修复。此方案实质上接受了分布式场景下难以完全避免的短期不一致性,但通过事后的人工干预确保数据的最终正确性。其优势在于实现成本低,适用于大多数对强一致性要求不高的互联网业务场景,是对技术方案难以完美解决问题的一种务实补充。
订单和库存的例子进行说明, 正常的业务逻辑是先扣库存, 然后创建订 单, 如果订单创建失败, 这时候程序可以重试。 但重试也无法保证100%成功。 如果重试还是 失败的话, 正常情况下应该需要回滚库存。 回滚的操作也是可能失败的, 回滚失败后也可以持续重试回滚。 但回滚重试依旧可能会失败。 这时候其实就是一个不一致的场景 (少卖)。 此 时系统应该触发告警, 然后通过人工介入的方式, 根据日志、 数据库记录、 监控视图等手段 辅助研发人员进行人工的数据修复, 以解决其中的不一致。
异步队列处理
为提升系统性能,库存扣减常采用 Redis 计数后异步同步至数据库的方案。异步实现分两种路径:若通过内存队列定时同步,服务重启可能导致任务丢失,属弱一致性方案;若依赖消息中间件(利用其持久化与至少一次投递机制),虽更可靠但消息成功发送本身存在挑战(如网络故障、Broker不可用),故本质上仍属弱一致。
此类异步处理在一致性要求不极致的场景中广泛应用。尽管存在弱一致性风险,但结合重试机制、实时告警与人工修复等辅助手段,可显著提升数据可靠性,成为平衡性能与一致性的有效实践方案。
对账
对账实际上是告警+人工修复的一种落地实践, 也是业界十分常见的应对分布式事务挑战 的方案。架构师在设计方案时, 对于可能导致不一致的数据点, 根据对账的思路来设计一个 自动对账流程以发现不一致的数据, 从而简化整个系统的设计。
对账是一种典型的事后处理机制,其核心在于只关注事务操作的最终结果,而非执行过程。在分布式系统中,研发人员通过定时扫描订单状态、对比本方与第三方系统(如支付流水)的数据一致性,主动发现数据不平问题并触发告警,进而通过人工干预实现数据修复。此方案以结果为导向,有效简化了系统设计的复杂性,成为应对分布式事务挑战的常见实践。
然而,对账机制存在明显局限:其一,它通常仅能发现问题而无法自动解决问题;其二,对账程序自身的稳定性受技术挑战影响(如异常场景下的执行可靠性存疑)。正因这些不确定性,本书将对账归类为弱一致性解决方案,其可靠性需结合人工修复等辅助手段共同保障。
④最终一致性方案
最终一致性是弱一致性的一个特例,其核心特征在于系统能够承诺:任何暂时的不一致状态都将在有限时间内自动修复并达到一致。除 RocketMQ 事务消息外,业界还存在多种成熟的最终一致性实现方案。了解这些方案的特点,有助于更深入地理解 RocketMQ 事务消息的设计理念与适用场景。下文将介绍几种典型场景下的最终一致性解决方案。
TCC
TCC 是基于 BASE 理论的类二阶段提交方案, 但是前文说过, 二阶段提交方案实际上在性能上 有极大的短板, 而 TCC 则根据业务的特性对流程 进行了优化。 以下还是以库存和订单的例子来讲 解, 如图 11-9 所示。
图 11-9 的流程中, 用户在下订单调用订单服务后, 还需要调用库存服务扣库存, 最后通过积 分服务给用户增加积分。
从事务的角度来看, 整个过程应该具有原子性, 即所有步骤要么都成功, 要么都失败。 而这里订单很可能成功了, 但是库存可能扣除失败, 这时候对于用户来说就会导致超卖问题。 前面内容提到可以用二阶段提交去解决类似的问题, 但是二阶段提交方案会使交易在事务成 功或者失败回滚前, 其他用户的操作都会阻塞, 极大地影响系统稳定性和用户体验。而TCC的流程虽然和二阶段提交类似, 但是却能极大地提高性能。
在具体实现上, TCC其实是一种业务侵入性较强的事务方案, 它要求业务处理过程必须 拆分为 “预留业务资源” 和 “确认 / 释放消费资源” 两个子过程。 而TCC实际上就是这两个 过程的三个阶段的缩写: Try、 Confirm、 Cancel。
Try: 尝试执行阶段, 完成所有业务可执行性的检查 (保障一致性), 并且预留好事务 需要用到的所有业务资源 (保障隔离性)。 在库存这个场景, 实际上就是冻结库存 (事实上没 有扣)。
Confirm: 确认执行阶段, 不进行任何业务检查, 直接使用 Try 阶段准备的资源来完成 业务处理。 注意, Confirm 阶段可能会重复执行, 因此需要满足幂等性。 在库存这个场景就是 确认扣除掉Try阶段的库存。
Cancel: 取消执行阶段, 释放 Try 阶段预留的业务资源。 注意, Cancel 阶段也可能会重复执行, 因此也需要满足幂等性。 在库存的场景就是把预扣库存的操作进行回滚。
-
TCC 协议相比二阶段提交/XA最明显的区别如下:
性能提升: 由具体业务来操作资源的锁定, 可以实现更小的锁粒度, 不会锁定整个 资源。
数据最终一致性: 基于 Confirm 和 Cancel 的幂等性, 保证事务最终完成确认或者取消, 保证数据的最终一致性。
可靠性: 解决了二阶段提交/XA里的协调者单点故障问题, 因为事务由主业务方发起并 控制, 使得事务的管理器也可变为多点。
但TCC最大的缺点是: TCC 的 Try、 Confirm 和 Cancel 操作功能要按具体业务来实现, 整体 改造、 开发成本高, 实现难度较大。 特别是老系统的改造, 因为深入到业务流程、 表设计、 接口 设计等阶段, 使得老系统用TCC改造的阻力极大。
查询+补偿
补偿是实现最终一致性的常见手段,以订单系统依赖支付系统为例,通常采用查询加自动查询补偿方式。这要求被动方提供查询、重试和取消接口,主动方定期查询未完成或异常操作,进行重新执行或回滚,以实现最终一致性,此过程如图11-14所示。订单系统不能完全依赖支付系统回调,因此需定时任务主动回查支付状态,若处于中间态,则通过重试下单或退款等自动补偿操作确保一致。在消息中间件消费重试中也常见类似机制。极端情况下(如系统bug或数据库持续问题),当重试到一定次数或时间仍失败时,会触发告警并结合人工修复,人工补偿成为保障一致性的最后屏障。
最大努力通知
最大努力通知方案(Best-Effort Delivery)是一种适用于回调、通知类柔性事务的设计模式,常见于银行、商户通知等需异步告知事务状态的场景。其核心特点包括:
努力通知:业务主动方(如支付系统)在完成处理后向被动方重复发送消息(N次后停止),允许消息丢失,确保基本可达性;
定期校对:被动方(如订单系统)通过定时查询主动方接口(如图11-15所示),恢复丢失消息,弥补通知不确定性。
以支付系统为例:微信支付需在回调失败时启动补偿机制,持续通知直至收到成功响应或达重试上限。定期校对本质是查询+补偿的组合策略,通过主动拉取保障最终一致性,成为分布式场景下可靠性设计的常见补充方案。
Saga 事务模式
Saga事务模式的历史十分悠久,甚至早于分布式事务概念的提出。其名称“Saga”意为“长篇故事”,核心思路是将一个大型分布式事务分解为一系列可交错运行的子事务(如图11-16和图11-17所示)。
Saga事务由幂等有序的子事务(T1, T2, ..., Tn)及其对应的补偿动作(C1, C2, ..., Cn)组成。若所有子事务成功提交,则事务完成;否则需执行恢复策略:向前恢复(持续重试)或向后恢复(回滚补偿)。以电商场景为例,T1扣减库存、T2创建订单、T3支付,若T2异常,两种恢复流程差异显著。
Saga适用于两类场景:一是业务流程长且多(如金融网络跨机构对接);二是需整合第三方或遗留系统,无法采用TCC或消息队列改造。其优势包括无锁设计(一阶段直接提交本地事务,避免资源锁定)和异步化能力(支持高吞吐处理)。
-
但Saga模式存在与其他异步方案相同的缺点:因一阶段已提交本地事务且未预留资源,无法保证事务隔离性(如图11-16和图11-17所示)。
一阶段直接提交:Saga 的每个本地事务(Tn)在完成后会立即提交并释放资源,而不是先“预留”或“冻结”资源。后续事务是否执行,不影响已提交事务的可见性。
无全局锁:Saga 模式不依赖全局锁机制来隔离并发事务对共享资源的访问。已提交的数据可能立即被其他事务感知和操作。
由于上述机制,在多个Saga事务并发执行,或Saga事务与系统内其他操作同时访问共享资源时,可能会遇到以下经典问题:
脏读(Dirty Read):一个事务读取了另一个尚未完成(可能后续会失败并补偿)的Saga事务已提交的中间结果。如果后者最终失败,那么读到的就是无效的“脏”数据。
丢失更新(Lost Update):两个Saga事务同时读取同一数据并进行更新,其中一个事务的提交覆盖了另一个事务的更新结果。
不可重复读(Non-Repeatable Read):一个Saga事务内的两个连续操作读取同一数据,但在两次读取之间,该数据被另一个并发事务修改并提交,导致两次读取结果不一致。
-
应对隔离性问题的常见策略
语义锁(Semantic Lock):在业务数据中增加一个状态字段(如
status
),用于标记数据正处于某个Saga事务处理中(如设为PENDING
)。其他事务发现此标记时,可以采取等待、跳过或拒绝策略。这在订单、库存等场景中很常见。版本控制(Versioning):为数据增加版本号(或时间戳)。更新数据时,需校验当前版本号与读取时是否一致(类似乐观锁)。若不一致,则说明数据已被其他事务修改,当前操作需中止或重试。
重读值(Reread Value):在执行补偿操作前,重新读取数据的当前值,确保基于最新状态进行回滚,避免在错误的基础上进行补偿。
本地事务状态表
本地事务状态表是一种通用性较强的分布式事务处理方案。其核心机制在于:在执行分布式事务前,凭借数据库本地事务的原子性,先将整个事务的流程与状态信息原子性地持久化到专用状态表中。此后,调用方基于该状态表推进后续调用:每一步成功则更新状态,失败则中止。后台定时任务会持续扫描此表,对未完成的事务进行重试调用或回滚操作,若重试达阈值仍失败则触发告警,转入人工修复流程。
异步确保模式是本地事务状态表方案的一种特定实现形式。其区别在于:在事务状态信息持久化完成后,并不立即执行实际的事务调用,而是完全依赖后台的定时任务扫描来触发和执行所有后续调用。此模式通过异步化调度进一步解耦了事务流程,提升了系统的灵活性与容错能力。
本地消息表
本地消息表与本地事务状态表设计思路类似,核心目标是通过补偿机制确保未完成事务的最终完成。消息中间件的 AT-LEAST-ONCE 投递机制本可用于实现补偿,但为何将事务异步化为任务持久化到消息中间件的方案被归类为弱一致方案?根本原因在于无法保证本地事务提交与消息发送的原子性:若 A 事务完成但消息未成功发送(如服务重启),则导致 A 成功而 B 未执行,数据不一致。
-
开发者常尝试用 try-catch 将消息发送与本地事务捆绑,利用异常回滚保障一致性。但存在两个隐秘问题:
消息发送超时:超时可能导致事务回滚,但消息实际发送成功,引发 B 事务执行而 A 回滚的不一致;
事务提交失败:若消息发送后事务提交失败,消息已投递但 A 未执行,同样导致数据错误。
-
本地消息表模式通过三重机制解决上述问题:
原子性写入:将业务操作与消息记录置于同一数据库事务,保证两者原子性;
异步可靠投递:执行后系统A不直接给消息中间件发消息, 而是通过后台定时任务扫描消息表中未发 送的消息进行发送。 对于发送过程遇到异常的消息则不断重试, 直到消息中间件返回 “发送 成功” 的确认, 并更新消息表中的投递状态。 这样一来, 就能保证进入到消息表中的消息最 终一定能被投递出去一次。
幂等消费保障:依赖消息中间件 AT-LEAST-ONCE 投递,要求消费方实现幂等处理(可能出现重复发送)。
可靠消息模式
可靠消息模式是本地消息表方案的演进版本,通过引入独立的第三方消息管理模块实现业务解耦(本地消息表里面维护消息的发送 状态是由业务模块设计的一张本地消息表负责的, 使消息表的插入和本地事务的执行在一个 事务中, 这对于业务是有所侵入的, 无法做到很通用。)。其关键流程分为四步:
业务执行本地事务前,通知消息管理模块预发送消息(消息暂存数据库而非立即投递);
业务完成本地事务后确认消息,触发管理模块正式投递;
若未及时确认,管理模块主动回查消息状态;
对已确认消息,管理模块保障投递至消息队列后标记发送成功。
分布式事务存在一致性与可用性的本质矛盾,需根据场景选择不同实践方案。RocketMQ 提供的异步队列方案,正是基于可靠消息模式的思想,通过异步化与解耦设计平衡两者关系,为分布式系统提供了一种可靠的消息事务处理思路。
4. 异步队列方案的挑战
分布式事务常采用异步化方案提升性能:当前系统处理完一个事务后,将后续事务委托给本地队列或消息中间件(如RocketMQ)异步执行,并辅以重试机制确保最终完成。这种设计通过解耦事务链提高了系统吞吐量,但因其可靠性局限被归类为“弱一致”。
弱一致性原因:一是本地队列易丢失任务(如机器重启导致数据丢失);二是即使使用可靠消息中间件(如订单系统发送消息至积分系统),仍存在异常场景破坏一致性(本地事务执行和消息发送的原子性问题)。例如,RocketMQ虽具备消息持久化、重试等优秀特性,表面可实现最终一致,但实际上在特定异常场景(如消息发送与事务提交的原子性破裂)下会出现数据不一致,需进一步探讨具体案例。
①场景一:服务重启导致消息发送失败
/**
* 分布式事务处理示例:订单与积分系统
* <p>
* 业务流程说明:
* 1. 首先执行本地订单事务处理
* 2. 成功后发送消息到积分系统处理用户积分
* <p>
* 注意:此方案存在潜在的一致性风险
* 如果在本地事务成功后,消息发送前发生服务重启(常见于系统发布场景),
* RocketMQ 实际上未收到该消息,导致积分系统事务无法执行。
* 在此场景下,RocketMQ 的重试机制和可靠性保障都将无法发挥作用。
*/
public class DistributedTransactionExample
{
public void processOrderWithPoints()
{
// 第一步:处理本地订单事务
processOrderTransaction();
// 第二步:发送消息给积分系统处理用户积分
// 消费失败会利用 RocketMQ 的重试机制一直重试
sendMsg();
}
private void processOrderTransaction()
{
// 本地订单事务处理逻辑实现
}
private void sendMsg()
{
// 消息发送逻辑实现
}
}
②场景二:服务重启导致消息发送失败
这时候有些读者可能会想到利用本地事务的能力, 把发送消息和本地事务打包在一个事务里, 如果事务没执行提交就重启是否就能解决问题呢? 伪代码如下所示。
三种情况:
本地事务成功, 消息发送成功, 事务提交, 事务完整。
本地事务成功, 发送消息失败, 回滚事务, 事务完整。
本地事务失败, 直接回滚事务, 事务完整。
-
风险:
本地事务成功, 消息发送超时。 这时候按照处理流程是需要回滚本地事务的。 但是消 息发送超时实际上是一个不确定的状态, 可能消息实际上是发送成功的, 也就是说积分系统 会收到这个消息, 最后可能的状态就是订单没成功, 但是用户积分却算多了。
本地事务成功, 消息发送成功, 事务提交之前机器重启。 虽然开发者已经把消息的发 送打包到了一个事务中, 但实际上消息的发送是不受事务控制的。 在程序执行完sendMsg这一 步之后, 消息实际上已经被发出去了。 消息发送出去后, 机器如果重启, 订单是没处理成功 的, 但这时候用户的积分却无故增长了。
③异步队列方案小结
即使采用高可靠性消息中间件,若缺乏特殊处理,仍无法保证分布式事务的最终一致性。问题的核心在于消息生产方:需确保本地事务操作与消息发送的原子性(要么全成功,要么全失败)。
为解决此问题,业界提出本地消息表等方案(将消息发送记录为状态并通过补偿机制保障成功),或引入独立消息管理模块(可靠消息模式)。但这些方案实现成本较高,需记录状态并应对中间件不可用风险。
而RocketMQ事务消息创新性地将类似方案集成至消息中间件内部,形成一体化解决方案,显著降低了实现复杂度,使系统能更从容应对分布式事务的原子性挑战。
5. RocketMQ 事务消息的实现
①RocketMQ 事务消息实现的关键流程
RocketMQ 通过三阶段流程保障事务消息的可靠性:
半消息发送:生产者先发送状态为“未确认”的半消息至 Broker,此类消息暂不投递给消费者;
本地事务执行:半消息发送成功后,生产者执行本地事务(需与消息发送保持原子性);
最终状态确认:依据本地事务结果,生产者通知 Broker 将半消息标记为“确认”(投递消费者)或“回滚”(永久不可见)。
针对事务执行中的异常(如服务重启),RocketMQ 通过回查机制容错:Broker 定时扫描未确认消息,主动查询生产者以确认本地事务状态,据此更新消息最终状态。此设计确保在各种异常场景下,消息发送与本地事务的原子性得以保障,为后续基于 ACK 和重试机制实现分布式事务最终一致性奠定基础。
②RocketMQ 事务消息 VS 可靠消息模式
RocketMQ 事务消息的设计思想,本质上是将业界实践多年的可靠消息模式深度集成到消息中间件内部,使其成为一个开箱即用的通用特性。从流程上看,其核心机制与可靠消息模式高度相似(具体对比参见表 11-2),但通过内置化实现显著降低了使用复杂度。
这一集成方案具有重大意义:在整个消息中间件领域中,RocketMQ 是目前提供分布式事务解决方案中最优雅、业务接入最简单的一个选择。其通过将复杂的一致性保障机制封装于中间件底层,为开发者提供了兼顾可靠性与易用性的分布式事务处理能力。
③RocketMQ 事务消息的存储
RocketMQ 的事务消息在发送时需使用特定的 API,但其消息体构造与普通消息完全一致,并无特殊之处。然而,在这看似简单的实现背后,RocketMQ 通过诸多精巧的底层设计,实现了分布式事务的高可靠性保障。
标记事务消息
RocketMQ 对事务消息进行特殊标识处理,核心包含两个关键属性:
事务标识属性 (TRAN_MSG):标记为
true
,使 Broker 能够识别该消息为事务消息,从而进入特殊处理流程;生产者组标识属性 (PGROUP):值为消息所属的 ProducerGroup 名称,为 Broker 提供回查入口,确保在需要时能定位到正确的生产者进行状态确认。
通过这两项属性标记,RocketMQ 在消息底层无缝集成了事务状态管理与回查能力,既实现了事务消息的可靠识别,又为异常场景下的协调恢复提供了必要信息支撑,保障了分布式事务的完整性与可靠性。
暂存事务半消息
在 RocketMQ 的事务消息机制中,客户端发送的消息会携带特殊标记(TRAN_MSG=true)。Broker 接收后识别为事务消息,并执行“偷天换日”操作:将消息暂存至专用中转主题 RMQ_SYS_TRANS_HALF_TOPIC(该主题仅含一个队列 queueId=0),此时消息状态为“半消息”,避免被消费者提前消费。同时,Broker 将原始主题和队列信息存入消息属性(REAL_TOPIC 和 REAL_QID),为后续事务确认后的恢复投递提供依据。这一设计确保了事务消息在本地事务完成前的隔离性与安全性。
/**
* 事务消息桥接器核心实现类
* 此部分核心源码位于 TransactionalMessageBridge.java 中
*/
public class TransactionalMessageBridge
{
/**
* 存储事务半消息
*
* @param messageInner 原始消息内部对象
* @return 消息存储结果
*/
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner)
{
// 核心存储操作:将预处理后的半消息持久化存储
return store.putMessage(parseHalfMessageInner(messageInner));
}
/**
* 从原始消息构造半消息体
* 完成关键预处理步骤:备份原始信息 → 重置事务标志 → 重定向存储位置
*
* @param msgInner 原始消息内部对象
* @return 处理后的半消息对象
*/
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner)
{
// 备份消息的原主题名称与原队列ID到属性中
// 确保事务完成后能正确恢复投递到原始目标
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId()));
// 重置消息系统标志位,标记为事务半消息(非最终状态)
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// 半消息统一存储到 RMQ_SYS_TRANS_HALF_TOPIC 的第0个队列里
// 实现所有事务消息的集中管理,避免被消费者提前消费
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
// 序列化消息属性到字符串格式,确保持久化时属性信息完整保存
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
}
从半消息中恢复原消息
RMQ_SYS_TRANS_HALF_TOPIC 作为事务消息的中转存储主题,在本地事务确认为提交状态(COMMIT_MESSAGE)后,Broker 会启动半消息恢复流程。恢复时,Broker 从消息属性中提取原始主题(REAL_TOPIC)和队列ID(REAL_QID),将消息重新定向至真实目标。由于恢复过程实质是创建新消息,半消息的物理属性(如 offset)将全新生成,不与原消息共用。
半消息与最终投递的真实消息在物理存储上完全独立,但逻辑上通过属性字段紧密关联。这种设计既保障了事务未决期间消息的隔离性,又确保了事务提交后消息能准确路由至原始目标队列,实现了物理隔离与逻辑统一的平衡。
/**
* 事务消息恢复处理器
* 核心代码位于 EndTransactionProcessor.java
* 负责处理事务的最终提交或回滚请求,并恢复原始消息
*/
public class EndTransactionProcessor implements NettyRequestProcessor
{
/**
* 处理事务提交或回滚请求
*
* @param ctx 网络通道上下文
* @param request 远程命令请求
* @return 处理结果
* @throws RemotingCommandException 处理异常
*/
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException
{
// ... 前面逻辑略
OperationResult result = new OperationResult();
// 如果是提交事务的请求则处理
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback())
{
// 提交事务消息
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS)
{
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS)
{
// 首先从半消息里恢复原事务消息的真实主题、队列,并设置事务ID
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
// 设置事务 sysFlag - 重置事务标志位
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(
msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
// 设置这条事务消息的物理属性
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
// 去除事务消息的标记 - 清除事务准备属性
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
// 发送最终消息到目标队列
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS)
{
// 成功发送后删除准备状态的消息
this.brokerController.getTransactionalMessageService()
.deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
}
// 恢复原事务消息处理完成
// ... 其他逻辑
}
/**
* 从事务半消息中恢复原始消息内容
* 从半消息的用户属性中提取真实主题和队列信息,重建原始消息
*
* @param msgExt 事务半消息
* @return 恢复后的原始消息内部对象
*/
private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt)
{
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
// 恢复主题和队列 - 从用户属性中获取真实主题和队列ID
msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgInner.setQueueId(Integer.parseInt(
msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
// 其他属性从原消息中复制恢复
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(msgExt.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
msgInner.setWaitStoreMsgOK(false);
// 设置事务ID
msgInner.setTransactionId(
msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGEID_KEYIDX));
msgInner.setSysFlag(msgExt.getSysFlag());
// 确定标签过滤类型(多标签或单标签)
TopicFilterType topicFilterType =
(msgInner.getSysFlag() & MessageSysFlag.MULTITAGS_FLAG) == MessageSysFlag.MULTITAGS_FLAG
? TopicFilterType.MULTITAG
: TopicFilterType.SINGLE_TAG;
// 转换标签字符串为标签代码
long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(
topicFilterType, msgInner.getTags());
msgInner.setTagsCode(tagsCodeValue);
// 设置消息属性
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(
MessageDecoder.messageProperties2String(msgExt.getProperties()));
// 清理恢复过程中使用的临时属性
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);
return msgInner;
}
}
事务消息回滚
当本地事务执行失败时,生产者需向 Broker 发送 rollback 确认(而非 commit)。Broker 收到后,会将对应的半消息从
RMQ_SYS_TRANS_HALF_TOPIC
“删除”——实质是将其转移至专用主题RMQ_SYS_TRANS_OP_HALF_TOPIC
中归档。此操作确保该消息不再参与后续的恢复流程与事务回查,彻底终止其生命周期。通过归档式删除,RocketMQ 实现了事务回滚消息的不可逆终止,避免因残留半消息引发错误恢复。此机制保障了事务失败场景的最终一致性,同时通过逻辑隔离降低了系统复杂度。
④RocketMQ 事务回查及状态
RocketMQ 的事务消息常规流程要求客户端顺利执行本地事务并正常向 Broker 提交状态,以便 Broker 更新半消息状态(投递或删除)。然而,在分布式环境下,这一过程存在显著不确定性,主要体现为两种典型场景:
半消息发送成功但本地事务执行超时(可能因数据库慢查询、RPC 或服务不可用导致,但超时未必意味失败);
本地事务成功但向 Broker 发送确认指令失败(因 Broker 作为分布式节点可能面临网络问题、服务超时或宕机)。
上述场景凸显了保障本地事务与消息状态最终一致的挑战。为此,RocketMQ 设计了回查机制:Broker 定期扫描未确认的半消息,并向原生产者查询其本地事务的最终状态,据此更新半消息状态。该机制通过主动协调解决了分布式场景下的状态同步难题,虽实现复杂,但为事务消息的可靠性提供了关键保障。
事务回查需要解决的难点
RocketMQ 在实现事务消息回查机制时面临三个关键挑战:
执行者故障后的状态确认:若事务执行方完成本地事务后服务不可用且长期未重启,需解决如何确认事务状态以保障订单最终一致的问题;
独立组件的状态自治:作为不依赖外部数据库的第三方组件,需实现半消息的自主存储及提交/回滚状态跟踪;
乱序提交下的回查协调:事务消息的提交顺序和发送顺序可能是不一样的, 如何维持哪些事务需要回查哪些事 务不需要回查呢? 例如事务消息发送可能是1、 2、 3,但是提交的顺序可能是2、 3、 1。 当第2 条消息提交的时候, 回查的消息是1、 3。 这听起来很自然, 但是在缺乏MySQL这种存储引擎 的帮助下, 要实现这个待回查表实际上是很难的。
回查对象高可用设计
为解决执行本地事务的实例可能因发布、重启等操作而宕机的难题,RocketMQ 引入了生产者组(ProducerGroup) 概念(专用于事务场景)。同一生产者组的实例被视为对等实体,共享事务状态查询能力。
事务消息携带特殊的
PGROUP
属性(存储生产者组名称),使 Broker 可精准定位消息来源。回查时,Broker 通过其维护的生产者组连接记录(代码见ProducerManager.java
),向组内任一可用实例发起查询,理论上均可获取事务真实状态。此设计通过组内实例的等价性与连接冗余性,保障了回查操作的高可用性与可靠性。
半消息的高可用存储设计
半消息实际存储于专用主题
RMQ_SYS_TRANS_HALF_TOPIC
,其物理存储结构与普通消息完全一致,因此天然继承 RocketMQ 消息存储的高可用特性(包括主备复制、持久化落地等),无需额外设计即可解决半消息的可用性挑战(见图11-25)。针对回查需避免全量扫描的问题,RocketMQ 利用半消息单一队列顺序存储的特性,通过虚拟消费者组
CID_RMQ_SYS_TRANS
的消费位点记录上次回查位置。每次回查仅需从该位点向后扫描新消息,复用消费位点的持久化与高可用设计,既确保回查进度可靠性,又大幅提升查询效率(见图11-26)。
记录半消息的处理状态
RocketMQ 在处理半消息时具有顺序灵活性:无论是 Broker 主动发起的回查,还是客户端触发的提交或回滚操作,其执行顺序均可根据实际场景动态调整,无需严格遵循特定时序。
事务消息的存储顺序与最终提交/回滚状态的实际确定顺序可能存在差异。例如:生产者顺序发送 5 条消息(msg1 至 msg5),但因各消息处理耗时不同,可能出现 msg2 比 msg1 更早提交,或 msg4 因快速异常回滚而比 msg1 更早确定状态的情况。这种乱序特性由系统异步处理机制自然形成,并不影响事务的最终一致性保障。
存储顺序和状态确认的顺序不一致的主要原因如下:
不同的本地事务处理时间可能不一样, 半消息msg1可能比msg2先存储, 但是事务T2 可能比T1更早提交确认, 故msg2比msg1会更早确认半消息状态。
Broker 发起回查是按照消息存储顺序发起的, 但是单次消息的回查是异步的, 而且是 不可靠的。 试想一下, 如果消息的回查是同步的, 那么有一个事务生产者的回查结果卡死了, 就会阻塞整条队列里半消息的回查任务, 故而RocketMQ的回查都是异步不等待结果的, 需要 等待客户端异步确认事务状态, 所以回查的顺序可能是msg1、 msg2、 msg3, 但是收到确认状 态可能是乱序的 (如msg3、 msg2、 msg1)。
-
存储机制与回查进度管理的局限:半消息基于 CommitLog 顺序写入,无法随机修改已写入内容,且 RMQ_SYS_TRANS_HALF_TOPIC 主题无法记录消息确认状态。回查进度依赖虚拟消费者的消费位点,但因确认顺序可能乱序(如 msg4 和 msg2 已确认但 msg0 未确认),无法直接更新回查位点到已确认位置,否则会跳过未处理消息,导致回查机制失效。
前文介绍过哪些消息需要回查哪些不回查 (实际上是已处理的半消息的进度) 是 依赖半消息队列上一个消费进度的, 但是现在确认顺序可能完全是打乱的, 这会有什么困难 呢? 如图11-28 所示, msg4和msg2 的位点是3 和1, 虽然其都是已确认状态, 但是RocketMQ 的虚拟消费者CID_RMQ_SYS_TRANS 肯定不能把回查位点更新到3 或者1的位置上, 因为0 这个位置的消息还没确认.
-
RocketMQ 为保障事务消息的完整生命周期管理,设计了三个协同工作的主题:
原消息真实主题:存储业务原始消息;
RMQ_SYS_TRANS_HALF_TOPIC:专用于暂存半消息(状态未决);
RMQ_SYS_TRANS_OP_HALF_TOPIC(OP主题):记录半消息的最终处理状态(commit/rollback),标记为删除状态以避免重复回查。
OP主题采用单队列结构,其核心价值在于通过状态持久化实现消息处理状态的可靠跟踪。三主题协作完成状态流转:当半消息被提交或回滚时,状态变更被记录至OP主题,形成闭环管理,确保每条事务消息的生命周期可追溯、不重复处理。
-
状态流转:
半消息提交状态处理
当 Broker 接收到半消息的 commit 指令后,执行两个核心操作:首先写入一条 OP 消息,其消息体存储该 commit 消息在队列中的偏移量(queueOffset),作为指向原半消息的指针地址,同时标记原半消息为“已删除”状态(此标记使后续回查机制忽略该消息);随后读取该半消息,还原其原始主题等元数据,并重新写入目标主题,转化为可被消费者正常消费的普通消息。
半消息回滚状态处理
当Broker 接收到一个半消息是rollback 状态时, Broker 同样写入 OP 消息, 流程和 commit 一样, 也会标记这条半消息是被删除的。 但后续不会读取和还原该半消息。 这样消费者就不 会消费到该消息, 如图11-30所示。
半消息unknow 状态处理
当 Broker 接收到状态为 "unknown" 的半消息时,不会立即处理,而是依赖事务回查机制确认其最终状态(commit 或 rollback)。这一设计避免了因本地事务未完成或状态未确认而导致的误操作。
即使 commit 或 rollback 指令发送失败(如 Broker 故障或网络闪断),处理流程仍会退回至事务回查阶段,通过重试机制确保消息状态最终被正确确认。这种设计保障了事务消息在各种异常场景下的可靠性与最终一致性。
6. RocketMQ 事务消息与Kafka 事务消息的对比
尽管 Kafka 与 RocketMQ 均提供事务消息功能,但两者设计目标与解决场景存在根本区别,并非简单的优化关系。RocketMQ 事务消息核心解决本地事务与消息发送的最终一致性问题,而 Kafka 事务消息则专注于实现 Exactly-Once(精确一次) 语义,旨在避免消息重复投递或丢失,确保流处理场景下的端到端数据准确性。
理解 Kafka 事务消息需首先掌握 Exactly-Once 概念,其设计初衷与 RocketMQ 的分布式事务保障路径截然不同。
①Kafka 想要解决的Exactly-Once 是什么
Kafka 提供三种消息投递语义,其核心区别在于对消息持久化行为的保证程度:
At-most-once(至多一次):消息最多被持久化一次,可能丢失但绝不重发,对应 Producer 配置
acks=0
;At-least-once(至少一次):消息最少持久化一次,绝不丢失但可能重复,如
acks=all
下超时重发致消息写入两次;Exactly-once(精确一次):消息肯定且仅持久化一次,兼顾防丢与防重。
需注意 Exactly-once 语义主要解决消息重复问题(如生产者重试导致的重复写入),其实现依赖于 Kafka 事务机制而非业务侧的事务处理。该特性通过服务端去重保障流处理场景的端到端准确性,与传统数据库事务的 ACID 特性有本质区别。
②Kafka 事务消息的表现
Kafka 的幂等 Producer 通过设置
enable.idempotence=true
实现,Broker 为每条消息生成唯一 ID 进行去重,支持 Exactly-once 语义。但该机制有两个局限:仅保证单分区内的消息不重复,无法跨多分区;且仅支持单会话幂等,Producer 重启后无法维持跨会话幂等性。为解决幂等 Producer 的不足,Kafka 引入事务 Producer,支持多分区数据的原子性写入和跨会话的 Exactly-once 处理语义(即使 Producer 重启也能保证数据只处理一次)。需配合事务 Consumer 使用,方可实现端到端的 Exactly-once 保证。
Kafka 事务机制专注于消息的 Exactly-Once 语义,适用于流处理场景;而 RocketMQ 的事务设计旨在解决本地事务与消息发送的最终一致性问题,两者应用场景有本质区别。
十二. RocketMQ延迟消息原理
1. 互联网场景下延迟任务的常见场景
延迟任务指需在未来特定时间点触发的业务操作,常见于三类场景:
支付关单:订单下单后需在30分钟后关闭未支付订单;
账单提醒:信贷场景中凌晨跑批生成的还款提醒,需延迟至早10:00发送以避免骚扰用户;
外部系统超时处理:如审核系统回调超时(如30分钟未返回),需启动超时兜底策略(如自动发布文章)。
-
传统实现方案:定时任务轮询
传统方案采用定时任务(如每分钟执行)扫描业务数据:
扫描条件:筛选未支付订单且下单时间在31分钟内(30分钟时效+1分钟间隔);
执行操作:对符合条件的订单执行关单等操作。
此方案通过高频轮询平衡时效性与实现复杂度,是延迟任务的典型基础实现。
2. 使用定时任务处理延迟任务的一些问题
①时效性
定时任务方案(如每分钟执行)在处理延迟操作时存在固有缺陷:订单若在周期临界点生成(如9:30:01),可能因存活时间不足整周期而被下一轮扫描遗漏(如10:00:00任务忽略29分59秒订单),导致实际执行延迟近1分钟(延至10:01:00)。虽可通过缩短执行间隔(如X秒)降低延迟,但更高频的扫描会加剧系统性能消耗,形成时效性与资源开销的矛盾。
②扫描性能
当订单数据量达到千万级别时,定时任务扫描方案面临严峻的性能挑战:每次处理超时订单均需对全量数据进行扫描(如1000万条记录),即使通过索引优化查询,其开销依然巨大。若订单表进一步采用分库分表策略(如拆分为128张表),则需对所有分表执行全表扫描,导致资源消耗成倍增长,严重制约系统扩展性与响应效率。
③单一周期处理量问题
在双11等大促活动凌晨时段,系统可能瞬时产生海量订单,其中包含大量需超时关单的订单。例如在0:30需集中处理关单操作时,若单个定时任务周期内无法完成所有订单处理(如千万级数据量),会导致任务堆积,进而引发后续定时任务连锁拥塞,形成处理能力与任务积压的恶性循环。
3. 认识RocketMQ 延迟消息
①RocketMQ 延迟消息的优点
RocketMQ 通过定时消息机制优化延迟任务处理:业务方(如订单系统)可在下单时发送携带业务数据(如订单号)的定时消息,并指定投递时间(如30分钟后)。消息中间件在约定时间精准投递,消费者接收后执行对应业务逻辑(如查询订单状态并关单)。
时效性保障:依赖消息中间件的高性能投递机制,规避数据库扫描延迟;
性能与扩展性:无数据库全表扫描压力,通过增加消费者即可提升并发处理能力;
架构简洁性:替代繁琐的定时任务调度与扫描逻辑,实现更优雅的业务解耦。
需注意 RocketMQ 暂不支持任意精度定时(如秒级),仅提供固定延迟级别(如1s/5s/10s/30s/1m等),但其设计已覆盖典型业务场景,在性能与易用性上显著优于传统方案。
②RocketMQ 延迟消息的代码示意
/**
* RocketMQ 延迟消息示例类
* 演示如何发送和消费延迟消息,用于处理如订单关单等延迟任务
*/
public class RocketMQDelayMessageExample {
/**
* 发送延迟消息的示例代码
* 相关的代码也非常简单,发送延迟消息的示意代码如下
*/
public void sendDelayMessage() throws Exception {
// 正常启动一个生产者
DefaultMQProducer producer = new DefaultMQProducer("DelayProducerGroup");
producer.start();
// 和普通消息一样,构建对应的消息体
String orderNo = "LO123456789"; // 订单号
Message delayMsg = new Message();
delayMsg.setTopic("TOPIC-CLOSE-ORDER");
delayMsg.setBody("LO123456789".getBytes()); // 延迟消息的消息体,这里只需要存放订单号即可
// 唯一不同:对消息设置延迟level为16,默认配置下,16对应为延迟30min
delayMsg.setDelayTimeLevel(16);
// 和普通消息一样调用相同的方法进行发送
producer.send(delayMsg); // 发送消息,API和普通发送消息一致
producer.shutdown();
}
/**
* 消费延迟消息的示例代码
* 消费者端注册并发消息监听器处理延迟消息
*/
public void consumeDelayMessage() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DelayCloseOrderConsumer");
consumer.subscribe("TOPIC-CLOSE-ORDER", "*"); // 订阅延迟消息的主题
// 注册回调,里面有处理关单逻辑
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,
ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// 打印日志,通过时间差确认是否按预期的延迟投递
System.out.printf("接收到延迟消息[ msgId=%s ms later ] \n", message.getMsgId(),
System.currentTimeMillis() - message.getStoreTimestamp());
String orderNo = new String(message.getBody()); // 从消息体里解析出订单号
// 检查订单,如果超时就执行关单操作。注意这里的代码,假设程序有一个orderDao的对象封装好了此逻辑
// orderDao.closeOrderIfUnpaid(orderNo);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
/**
* 延迟级别配置说明:
* 在Broker里面有一个配置项叫作messageDelayLevel,在不修改设置的情况下总共有18个级别,其值如下:
* 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* 其中后面的字母表示单位,分别是秒(s)、分钟(m)、小时(h)和天(d)。
* 而这里每个位置就是delay level的值(从1开始计算),所以30分钟在默认的配置下就是16。
* 一般情况下,默认的等级已经够用。若不够用通常只需要往后追加级别即可,例如需要一天后生效的则改为1d。
*/
}
4. 延迟消息实现原理
①RocketMQ 延迟消息存储实现
RocketMQ 的延迟消息在存储层面与普通消息及事务消息采用相同的机制,均基于 commitlog 加 consumequeue 的架构实现。因此,其存储可靠性与普通消息完全一致,不会因采用延迟特性而降低可靠性。该设计通过多副本机制保障数据安全,确保即使单个 Broker 节点发生故障,消息也不会丢失。
②RocketMQ 延迟消息转存
RocketMQ 的延迟消息通过“转存”机制实现时间控制:Broker 在存储消息时,若检测到 delayLevel 设置,会将消息转存至无消费者订阅的 SCHEDULE_TOPIC_XXX 主题,避免提前投递。转存时,真实主题和队列 ID 分别存入 REAL_TOPIC 和 REAL_QID 属性(转存逻辑位于 Commitlog.java)。
投递时间由存储时间加延迟级别计算得出,其绝对值存储在 consumequeue 的 TagCode 字段中(设计为 8 字节以容纳时间戳)。当到达投递时间时,调度模块将消息恢复至原主题队列供消费。tagCodes 字段在此过程中起关键作用,用于判断消息是否到达调度时间。
// 如果消息是延迟消息
if (msg.getDelayTimeLevel() > 0) {
// 检查延迟级别是否超过最大允许值,如果超过则设置为最大延迟级别
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
}
// 主题改为延迟消息的主题暂存:使用 RocketMQ 系统预定义的调度主题
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
// 队列则依据其延迟级别计算得出:通过延迟级别映射到对应的队列ID
int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// 在消息中备份原来真实的主题和真实的队列:将原始主题和队列信息存储为消息属性,以便后续恢复
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
// 将消息属性序列化为字符串格式,确保持久化时属性信息完整保存
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 设置消息的新主题和队列ID,完成延迟消息的转存操作
msg.setTopic(topic);
msg.setQueueId(queueId);
}
③延迟主题的队列
延迟消息的转存逻辑与事务消息类似,但关键区别在于其专用主题
SCHEDULE_TOPIC_XXXX
的队列数量并非单一。队列总数直接由messageDelayLevel
配置项决定:以默认设置为例,18 个延迟级别对应 18 个独立队列,队列 ID 与延迟级别一一映射(如队列 0 存储级别 1 的消息,队列 1 存储级别 2 的消息,依此类推)。此设计通过队列隔离实现不同延迟级别消息的并行管理,提升处理效率。
④延迟消息的调度
RocketMQ 作为由程序员开发的系统,在实现调度功能时不可避免地需要编写定时任务。其底层调度机制是基于 Java 标准库中的
ScheduledExecutorService
类来实现的,该类提供了强大的定时任务执行能力,用于管理延迟执行和周期性任务。
⑤调度的性能问题
RocketMQ 的 ConsumeQueue 在设计上不支持基于延迟级别的搜索或删除操作,这导致在处理延迟消息时必须进行全量扫描。例如,当队列中存在 10,000 条消息(其中 5,000 条已投递)且每秒需筛选 100 条当前投递的消息时,系统每次仍需扫描全部消息,造成显著的性能负担。此问题与传统定时任务的全表扫描瓶颈类似。
消息的存储顺序与投递顺序可能不一致(如存储为 MSG1、MSG2、MSG3,投递顺序可能为 MSG3、MSG1、MSG2),因此无法通过记录投递进度来优化扫描范围。投递完成后,下次扫描不能直接从上次位点开始,因为前方可能存在投递时间未来的消息。
尽管可借鉴事务消息的 OP 主题机制(通过额外主题记录处理状态),但延迟消息调度频率高,引入 OP 主题会导致大量重复存储,增加性能开销。因此 RocketMQ 未采用此方案。值得注意的是,同一延迟级别下的消息(如均为 10min 延迟),其存储与投递顺序一致;但不同级别消息按级别顺序投递,这一特性为部分场景提供了有限优化。
⑥分队列调度解决投递顺序问题
RocketMQ 的
SCHEDULE_TOPIC_XXXX
主题采用多队列设计(非单队列),核心目的是实现队列 ID 与延迟级别的一一对应。此设计确保每个队列仅处理特定延迟级别的消息,且消息在队列内严格按投递时间排序,从而保障存储顺序与投递顺序的一致性,避免乱序投递。由于每个队列的延迟级别固定且消息按时间有序,系统可基于延迟级别维度管理投递进度:已投递消息无需重复扫描,进度持续向前推进。投递进度以 JSON 格式持久化至
$HOME/store/config/delayOffset.json
文件(如{"offsetTable": {"1": 14, "2": 1}}
),记录各队列 ID 的当前偏移量。该机制确保 Broker 重启后能快速恢复进度,维持延迟消息处理的可靠性与效率。
⑦调度的性能优化
消息按预期投递时间排序(相同延迟级别下与存储时间顺序一致)。扫描时无需遍历全队列,仅需检查下一条消息:若未到投递时间,则后续消息皆可跳过;若已到时间,则投递后继续扫描下一条,极大提升处理效率。
⑧调度的延迟
RocketMQ 的调度器作为普通任务运行,固有存在一定延迟,主要源于三种场景:
队列空置延迟:当扫描完成所有延迟消息后,调度器会进入100ms睡眠再重启检查,导致最大100ms延迟;
时间未到延迟:若首条消息投递时间与当前时间差极小(如1ms),调度器仍会睡眠100ms后重试,同样引入最多100ms延迟;
IO堆积延迟:大量消息同时投递时,因需IO操作将消息恢复至原主题,可能因处理瓶颈造成部分消息投递延后。
5. 实现任意精度的定时消息
RocketMQ 的延时消息功能仅支持预设的时间精度(如固定延迟级别),无法满足所有场景下的精确定时需求。例如,在用户注册后次日凌晨6点发送通知的用例中,由于注册时间点的细微差异(如23:00:00与23:00:01),所需延迟时间可能不同(6小时与5小时59分59秒),导致消息投递时间无法精准对齐目标时刻。这种局限性凸显了在需要高精度时间控制的业务场景中,系统需依赖更灵活的定时消息机制,以确保任务在特定时间点准确触发。
①简易逼近法
RocketMQ 的定时消息功能通过一种简单的重复延时机制实现:即使目标时间点较远(如500米外的学校可通过1000步到达),也可通过多次短延时(如1秒级)消息串联达成。具体实现时,消息体携带投递时间戳(如deliveryTime字段),消费者检查当前时间若未到目标,则重新发送回原Topic继续等待;若到达则触发业务逻辑。此方案支持任意时刻的定时需求,精度误差控制在1秒内,得益于RocketMQ最小延迟级别为1秒的设计。
/**
* 自定义延迟消息消费者示例
* 通过消息重发机制实现精确定时投递
*/
public class DelayMessageConsumer
{
/**
* 注册并发消息监听器
* 核心逻辑:检查消息投递时间,未到目标时间则重新发送延迟消息
*/
public void setupConsumer()
{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YourConsumerGroup");
// ... 初始化消费者配置
consumer.registerMessageListener(new MessageListenerConcurrently()
{
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context)
{
for (MessageExt msg : msgs)
{
// 解析这条消息成为一个自定义的结构,通常情况下就是 JSON 反序列化
MyMsg myMsg = parse(msg);
// 拿到这个消息目标投递的时间戳
long deliveryTime = myMsg.getDeliveryTime();
// 如果时间已经到了,就立刻执行消费逻辑
if (System.currentTimeMillis() >= deliveryTime)
{
handle(myMsg); // 做对应的消费逻辑
} else
{
// 时间没到,就继续重复发送这条消息,等到时间到了才执行消费逻辑
// 复制这个消息
MessageExt newMsg = cloneMsg(msg);
// 新消息固定延迟 1s
newMsg.setDelayTimeLevel(1);
// 把这个新消息发送回原主题,新消费的时候会重复一样的逻辑
sendMsg(newMsg);
}
}
// 已经发送新消息回主题了,这条消息就可以认为消费成功了
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
}
// 以下为需要实现的辅助方法示例
private MyMsg parse(MessageExt msg)
{
// 实现消息解析逻辑(如JSON反序列化)
return null;
}
private MessageExt cloneMsg(MessageExt msg)
{
// 实现消息复制逻辑
return null;
}
private void sendMsg(MessageExt msg)
{
// 实现消息发送逻辑
}
private void handle(MyMsg myMsg)
{
// 实现具体的消息处理逻辑
}
// 自定义消息结构示例
static class MyMsg
{
private long deliveryTime;
// 其他字段...
public long getDeliveryTime()
{
return deliveryTime;
}
}
}
②最优逼近法模拟任意时刻的定时消息
简易逼近法虽可模拟任意时刻定时消息,但效率低下:例如延迟10小时1秒需重复发送消息36,001次,代价过高。为此,可基于RocketMQ固有18个延迟级别(如1s、2h等)采用组合逼近策略:以延迟10小时1秒为例,通过5次2小时延迟(级别18)逐步逼近剩余时间,最后叠加1秒延迟(级别1),仅需发送6条消息即可实现目标。该方案通过计算最接近的delayLevel动态选择延迟级别,显著降低消息重复次数,平衡了精度与性能。
/**
* RocketMQ 定时消息消费者示例
* 通过消息重发机制实现精确定时投递,支持任意目标时间点的消息处理
*/
public class DelayMessageConsumer
{
/**
* 设置消费者并注册消息监听器
* 核心逻辑:检查消息投递时间,未到目标时间则重新发送延迟消息
*/
public void setupConsumer()
{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YourConsumerGroup");
// 消费者初始化配置(如NameServer地址、订阅主题等)
// consumer.setNamesrvAddr("localhost:9876");
// consumer.subscribe("YourTopic", "*");
// 注册并发消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently()
{
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context)
{
for (MessageExt msg : msgs)
{
// 解析这条消息成为一个自定义的结构,通常情况下就是 JSON 反序列化
MyMsg myMsg = parse(msg);
// 拿到这个消息目标投递的时间戳
long deliveryTime = myMsg.getDeliveryTime();
// 如果时间已经到了,就立刻执行消费逻辑
if (System.currentTimeMillis() >= deliveryTime)
{
handle(myMsg); // 做对应的消费逻辑
} else
{
// 时间没到,就继续重复发送这条消息,等到时间到了才执行消费逻辑
// 复制这个消息
MessageExt newMsg = cloneMsg(msg);
// 计算最接近投递时间的一个延迟级别
int nextDelayLevel = countNextDelayLevel(deliveryTime);
newMsg.setDelayTimeLevel(nextDelayLevel);
// 把这个新消息发送回原主题,新消费的时候会重复一样的逻辑,如果时间还没到会继续重复发送
sendMsg(newMsg);
}
}
// 打印接收消息日志(修复原图格式错误)
System.out.printf("Receive New Messages:%s %n", Thread.currentThread().getName(), msgs);
// 已经发送新消息回主题了,这条消息就可以认为消费成功了
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
}
/**
* 计算下一个延迟级别
* 根据目标投递时间动态选择最接近的RocketMQ延迟级别
*
* @param deliveryTime 目标投递时间戳
* @return 延迟级别(1-18)
*/
public static int countNextDelayLevel(long deliveryTime)
{
long now = System.currentTimeMillis();
// 定义延迟时间数组,对应RocketMQ的18个延迟级别
long[] delayTimes = new long[]{
now + TimeUnit.SECONDS.toMillis(1), // 1s
now + TimeUnit.SECONDS.toMillis(5), // 5s
now + TimeUnit.SECONDS.toMillis(10), // 10s
now + TimeUnit.SECONDS.toMillis(30), // 30s
now + TimeUnit.MINUTES.toMillis(1), // 1m
now + TimeUnit.MINUTES.toMillis(2), // 2m
now + TimeUnit.MINUTES.toMillis(3), // 3m
now + TimeUnit.MINUTES.toMillis(4), // 4m
now + TimeUnit.MINUTES.toMillis(5), // 5m
now + TimeUnit.MINUTES.toMillis(6), // 6m
now + TimeUnit.MINUTES.toMillis(7), // 7m
now + TimeUnit.MINUTES.toMillis(8), // 8m
now + TimeUnit.MINUTES.toMillis(9), // 9m
now + TimeUnit.MINUTES.toMillis(10), // 10m
now + TimeUnit.MINUTES.toMillis(20), // 20m
now + TimeUnit.MINUTES.toMillis(30), // 30m
now + TimeUnit.HOURS.toMillis(1), // 1h
now + TimeUnit.HOURS.toMillis(2) // 2h
};
int returnIndex = 0;
// 倒序遍历延迟时间数组,找到第一个小于或等于目标投递时间的索引
for (int i = delayTimes.length - 1; i >= 0; i--)
{
long time = delayTimes[i];
if (time > deliveryTime)
{
continue;
} else
{
returnIndex = i;
break;
}
}
// 返回索引加1(因为RocketMQ延迟级别从1开始)
return returnIndex + 1;
}
// 以下为需要实现的辅助方法示例
private MyMsg parse(MessageExt msg)
{
// 实现消息解析逻辑(如JSON反序列化)
return null;
}
private MessageExt cloneMsg(MessageExt msg)
{
// 实现消息复制逻辑
return null;
}
private void sendMsg(MessageExt msg)
{
// 实现消息发送逻辑
}
private void handle(MyMsg myMsg)
{
// 实现具体的消息处理逻辑
}
// 自定义消息结构示例
static class MyMsg
{
private long deliveryTime;
// 其他字段...
public long getDeliveryTime()
{
return deliveryTime;
}
}
}
③代理定时消息方案
为实现精确定时消息(误差控制在1秒内)同时避免消费者感知定时逻辑,可采用代理服务模式:消息发送者将定时消息发送至预留系统主题(如
SYSTEM_SCHEDULE_MSG
),并在消息属性中设置目标投递时间和真实主题(如短信主题topic_sms_push
)。代理服务作为专属消费者,监听该系统主题,通过逼近算法计算 RocketMQ 内置延迟等级(如1s、5s等),动态重发消息至系统主题暂存。到达投递时间后,代理服务将消息转发至真实目标主题,此时业务消费者(如短信服务)接收后立即处理,无需关注定时机制(重发逻辑)。此方案通过代理层封装复杂性,兼顾了定时精度、系统解耦与开发便利性,依托 RocketMQ 原生延迟功能降低实现成本。
④5.x 版本定时消息展望
/**
* RocketMQ 5.x 定时消息发送示例
* 展示新版本中精确定时消息的使用方式,使用 deliveryTimestamp 参数替代旧版的 delay level
*/
public class RocketMQ5DelayMessageExample
{
public void sendDelayMessage(Producer producer)
{
// 定时/延时消息发送
MessageBuilder messageBuilder = null; // 需初始化为具体的 MessageBuilder 实例
// 以下示例表示:延迟时间为10min之后的UNIX时间戳
Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
// 构建消息对象
Message message = messageBuilder.setTopic("topic")
// 设置消息索引键,可根据关键字精确查找某条消息
.setKeys("messageKey")
// 设置消息Tag,用于消费端根据指定Tag过滤消息
.setTag("messageTag")
// 设置精确投递时间戳(5.x版本新特性)
.setDeliveryTimestamp(deliverTimeStamp)
// 消息体
.setBody("messageBody".getBytes())
.build();
try
{
// 发送消息,需要关注发送结果,并捕获失败等异常
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e)
{
e.printStackTrace();
}
}
}