九. RocketMQ消息过滤原理

1. 认识消息过滤

①消息过滤的场景

  • 在消息系统中,同一主题下的消息往往需要根据消费者类型进行选择性投递。典型场景包括:

  • 多环境消息隔离​​:测试环境1的服务实例不应接收测试环境2的消息,需通过过滤实现环境隔离;

  • 业务维度筛选​​:B2C业务系统需过滤掉B2B订单消息,避免无关消息干扰核心业务流程。

  • 通过消息过滤(如Tag、SQL表达式等),可实现消息的精细化路由,确保消费者仅接收与其相关的消息子集,提升系统效率并降低处理冗余数据的开销。

②消息过滤的方式

  • 客户端过滤​​:消息中间件(Broker)将主题下的所有消息通过网络全部推送给消费者客户端,由客户端根据自身需求进行筛选和处理。例如,Broker 发送10条消息,客户端可能只处理其中的2条。这种方式可能造成不必要的网络带宽消耗,尤其当过滤掉的消息占比较大时。

  • 服务端过滤​​:过滤逻辑在 Broker 端执行。Broker 在投递前就根据消费者预设的规则(如标签Tag或SQL属性条件)进行筛选,仅将符合条件的目标消息发送给消费者。例如,从10条消息中筛选出2条进行投递。此机制有效减少了冗余的网络传输,提升了整体效率。

③客户端过滤VS服务端过滤

  • 消息过滤可分为服务端过滤与客户端过滤,其本质差异在于过滤发生的位置及对资源的影响。服务端过滤(如RocketMQ支持的Tag或SQL过滤)类似于数据库查询中的WHERE条件筛选,仅在Broker端返回符合条件的目标消息,极大减少了网络传输数据量。而客户端过滤则需要消费者接收全部消息后自行筛选,会造成大量冗余数据传输和资源浪费。

  • 在高性能消息中间件(如RocketMQ)的海量消息场景中,网卡资源常成为系统瓶颈。服务端过滤通过牺牲少量Broker端CPU资源,显著节约了宝贵的网络带宽,避免无用消息对消费者端网络和计算资源的占用。这一特性虽不影响业务功能实现,但对提升系统整体吞吐、降低运营成本具有关键意义。

④服务端过滤的必要性

  • RocketMQ 的服务端消息过滤功能解决了单纯依赖 Topic 分类的局限性。虽然 Topic 本身可作为消息过滤手段(如不同消息投递至不同 Topic),但完全依赖 Topic 会导致主题数量爆炸式增长(如为每个产品线和操作类型创建独立 Topic),引发运维复杂性和性能问题。更关键的是,这种硬编码方式缺乏灵活性:新增主题需修改发送代码,且无法支持多维度过滤(如同时按产品线和操作类型筛选)

  • 通过 RocketMQ 的服务端过滤(如 Tag 或 SQL 表达式),只需一个统一主题(如 topic_order_event),消费者即可基于消息属性订阅特定内容(如 B2C 产品线的支付消息)。这不仅减少了主题数量,降低了运维负担,还实现了消息的精细化路由,提升系统效率和可维护性。服务端过滤通过牺牲少量 Broker CPU 资源,显著节约网络带宽,避免冗余数据传输,是高并发场景下的最优选择。

⑤RocketMQ 的消息过滤模式

/**
 * RocketMQ 消息过滤机制示例类
 * 
 * 本类演示了 RocketMQ 的消息过滤原理,重点强调其服务端过滤特性。
 * 尽管客户端代码可能包含基于 Tag 的 switch-case 逻辑,但这并非过滤本身,
 * 而是不同 Tag 消息所需的差异化业务处理。RocketMQ 的过滤实际发生在服务端(Broker),
 * 仅将符合条件(如 Tag 或 SQL 属性)的消息投递给消费者,减少网络传输开销。
 * 
 * 发展历程:
 * - 最初支持标签过滤(Tag)和类过滤(Class Filter),后者允许自定义类实现复杂过滤。
 * - 贡献给 Apache 后,增加了 SQL92 语法属性过滤,基于消息 Property 进行多维度筛选。
 * - 类过滤模式因维护复杂已被移除,当前官方支持标签过滤和属性过滤两种模式。
 * 
 * 理解这一背景有助于阅读源码时识别兼容性设计,例如某些“怪异”代码可能是历史遗留。
 */
public class MessageFilterExample implements MessageListenerConcurrently {

    /**
     * 消息监听器实现方法
     * 此方法常被误解为客户端过滤,但实际上 RocketMQ 已通过服务端过滤减少了消息量。
     * 客户端收到的消息均是 Broker 筛选后的结果,这里的 switch-case 仅用于业务逻辑分发。
     * 
     * @param msgs 消息列表(已由服务端过滤)
     * @param context 消费上下文
     * @return 消费状态
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            // 基于 Tag 的业务逻辑处理:服务端过滤后,客户端仅需处理收到的消息
            switch (msg.getTags()) {
                case "TAG1":
                    // 消费 Tag1 消息的逻辑
                    processTag1(msg);
                    break;
                case "TAG2":
                    // 消费 Tag2 消息的逻辑
                    processTag2(msg);
                    break;
                default:
                    // 其他 Tag 处理(如有)
                    break;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private void processTag1(MessageExt msg) {
        // 实现 Tag1 消息的具体业务逻辑
        System.out.println("Processing TAG1 message: " + new String(msg.getBody()));
    }

    private void processTag2(MessageExt msg) {
        // 实现 Tag2 消息的具体业务逻辑
        System.out.println("Processing TAG2 message: " + new String(msg.getBody()));
    }
}

2. 标签过滤模式的使用及原理

①标签过滤模式的使用

/**
 * RocketMQ 标签过滤模式示例
 * 
 * 标签过滤是最早的消息过滤模式,也是截至目前使用最多的模式之一。
 * 以下代码演示了生产者和消费者如何利用标签过滤进行消息处理。
 */
public class TagFilterExample {

    /**
     * 生产者发送消息示例
     * 发送消息时需要指定标签,如下代码将标签设置为"TAGA"
     * 注意:RocketMQ不支持给一个消息打上多个标签
     */
    public void producerExample() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
        producer.start();
        
        // 创建消息并指定标签
        Message msg = new Message(
            "TopicTest", 
            "TAGA",  // 单标签设置
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)
        );
        
        producer.send(msg);
        producer.shutdown();
    }

    /**
     * 消费者订阅和处理消息示例
     * 消费者可以订阅多个标签的消息(使用||分隔符)
     * 虽然过滤实际发生在服务端,但客户端仍需区分不同标签的业务逻辑
     */
    public void consumerExample() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
        
        // 订阅多个标签的消息
        consumer.subscribe("TopicTest", "TAGA || TAGB || TAGC");
        
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                           ConsumeConcurrentlyContext context) {
                /**
                 * 重要说明:虽然此处使用switch-case区分标签处理逻辑,
                 * 但实际的消息过滤(筛选哪些消息应该投递给消费者)是在Broker服务端完成的。
                 * 客户端收到的消息已经是服务端过滤后的结果,这里的switch-case仅用于
                 * 区分不同标签消息的业务处理逻辑,并非执行过滤操作。
                 */
                for (MessageExt msg : msgs) {
                    switch (msg.getTags()) {
                        case "TAGA":
                            // 处理TAGA标签的消息
                            processTagA(msg);
                            break;
                        case "TAGB":
                            // 处理TAGB标签的消息
                            processTagB(msg);
                            break;
                        case "TAGC":
                            // 处理TAGC标签的消息
                            processTagC(msg);
                            break;
                        default:
                            // 处理其他标签(如有)
                            processDefault(msg);
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        
        consumer.start();
    }

    // 以下为各标签处理方法的伪实现
    private void processTagA(MessageExt msg) {
        System.out.println("Processing TAGA: " + new String(msg.getBody()));
    }

    private void processTagB(MessageExt msg) {
        System.out.println("Processing TAGB: " + new String(msg.getBody()));
    }

    private void processTagC(MessageExt msg) {
        System.out.println("Processing TAGC: " + new String(msg.getBody()));
    }

    private void processDefault(MessageExt msg) {
        System.out.println("Processing default tag: " + new String(msg.getBody()));
    }
}

②标签服务端过滤原理

  • 尽管 RocketMQ 支持服务端消息过滤(基于 Tag 的哈希值比较,存储在 consumequeue 文件的 tagCode 字段中),但依然建议客户端进行消息筛选。这是因为服务端过滤依赖哈希匹配,虽极大提升性能(避免读取 commitlog 文件),但存在极低概率的哈希冲突风险:不同 Tag 可能哈希值相同,导致错误消息被投递。因此,客户端兜底过滤(如检查 Tag 实际值)可作为额外保障,确保消息准确性。

  • RocketMQ 选择存储哈希值而非原始 Tag 是为平衡性能与存储效率,但理论上可通过二次过滤(服务端或客户端层)避免开发者误解。实际应用中,哈希冲突概率极低,但谨慎设计仍推荐客户端筛选,尤其在高可靠性要求的场景。

3. 类过滤模式原理

  • 早期 RocketMQ 除标签过滤(Tag)外,曾支持​​类过滤模式​​(Class Filter)。该模式允许客户端将自定义过滤逻辑(Java Class 文件)上传至与 Broker 同机部署的 Filter Server 执行,实现在服务端完成复杂过滤(如按时间、业务规则),以 CPU 资源换取网络带宽(避免冗余消息传输)。此设计体现了“逻辑靠近数据”的架构思想,但因需独立部署组件、存在安全风险(如恶意代码导致 Broker 死循环或内存溢出)且运维复杂,最终被弃用。

  • 随着 SQL92 属性过滤模式的引入(支持基于消息属性的表达式过滤),类过滤模式因冗余性和风险性被彻底取代。SQL92 模式在保障表达灵活性的同时,通过沙箱机制规避安全风险,成为复杂过滤场景的标准解决方案。这一演进反映了 RocketMQ 在功能丰富性、安全性和易用性间的持续权衡与优化。

4. SQL92 属性过滤的使用及原理

  • 标签过滤模式存在固有局限:单条消息仅支持绑定一个标签,无法满足多标签组合过滤的复杂场景需求。

  • 为解决此问题,RocketMQ 借鉴 ActiveMQ 设计,引入基于 SQL92 标准的属性过滤模式。该模式以消息的 Property 字段为过滤依据,支持完整 SQL92 语法(包括比较运算符 >、<、=、<> 及逻辑运算符 AND/OR/NOT),实现多维度灵活过滤,显著提升复杂业务场景下的消息筛选能力。

①属性过滤的使用

  • 例如, 一条消息定义有 3 个 Property: a、 b、 c, 消费的时候可以写这样的表达式 “ a> 5 AND b = abc ” , 就能做消息过滤了, 而且是服务端的过滤。

  • 里面支持很多功能:

  • 1)数字比较,支持>、>=、<、<=、between、=。

  • 2)字符比较,支持=、<>、IN。

  • 3)IS NULL或者IS NOT NULL。

  • 4)逻辑运算,支持AND、OR、NOT。

②开启属性过滤

  • 启用属性过滤(SQL92)功能需在 Broker 端额外配置以下参数(与标签过滤原理冲突,故默认关闭):

  • enablePropertyFilter=true​​:启用属性过滤能力

  • enableCalcFilterBitMap=true​​:启用过滤位图计算优化

  • enableConsumeQueueExt=true​​:启用消费队列扩展存储

  • 此三项配置共同保障属性过滤的完整功能与性能,缺失任一将导致过滤失效或异常。

③属性过滤的原理

  • RocketMQ 的属性过滤(SQL92)机制面临一个核心性能挑战:由于 consumequeue 文件并未存储消息属性(property)信息,因此基于属性的过滤必须查询 commitlog 文件来获取完整的消息属性。这个过程涉及将消息数据从堆外内存复制到堆内再进行筛选,导致其性能显著低于直接基于 consumequeue 中 tagCode 的标签过滤。

  • 为了优化此性能瓶颈,RocketMQ 引入了​​布隆过滤器(Bloom Filter)​​ 进行预处理,其核心思想是​​以空间换时间​​。

  • -

  • 布隆过滤器原理与优势​

  • 布隆过滤器是一种概率型数据结构,用于高效判断一个元素是否可能存在于一个集合中。其特点如下:

  • 它使用一个较长的二进制位数组(bit array)和多个(k个)独立的哈希函数。

  • 添加元素时,会使用所有 k 个哈希函数计算出多个位位置,并将这些位置置为 1。

  • 检查元素时,同样使用所有哈希函数计算位位置,​​只有当所有位置均为 1 时,它才认为元素“可能存在”于集合中​​;如果任何一个位为 0,则元素“肯定不存在”于集合中(无假阴性)。

  • 其优势在于极高的空间效率和查询效率,但代价是存在一定的​​误判率(假阳性)​​:即它可能判断某个元素存在,但实际上并不存在。

  • -

  • 在RocketMQ中的具体应用与配置​

  • RocketMQ 利用布隆过滤器预先判断一条消息是否​​可能​​被某个消费者组消费。

  • 存储​​:Broker 在生成 consumequeue 文件的同时,会生成一个对应的 ​​consumequeueext​​ 文件。此文件可以理解为存储了与(每个)消息关联的布隆过滤器位图信息。

  • 匹配​​:当消息发送时,Broker 会拿该消息的属性(property)与订阅了该 Topic 的所有消费者组的 SQL 表达式进行匹配。

  • 置位​​:对于匹配成功的消费者组,会将其在布隆过滤器位图中对应的位(由 consumerFilter.json 文件中的 bloomFilterData.bitPos 字段确定)置为 1,标记该消息应投递给此消费者组。

  • ​查询​​:当需要检索消息时,Broker 首先会 consult consumequeueext 文件中的布隆过滤器数据,快速筛选出​​可能​​需要此消息的消费者组,从而极大减少了需要读取完整 commitlog 进行属性匹配的次数。

  • 二次校验​​:由于布隆过滤器存在误判可能,当它判断一个消费者组可能需要某消息时(返回 true),Broker 会基于 commitlog 中的实际属性数据进行​​二次过滤​​,以确保投递的准确性。

  • 关键配置参数​​:布隆过滤器的行为可通过 Broker 配置调整,主要参数包括:

  • expectConsumerNumUseFilter:预期使用过滤的消费者组数量(默认 32)。

  • maxErrorRateOfBloomFilter:布隆过滤器允许的最大误判率(默认 20%)。

  • 在默认配置下,经过计算,所需的位数组长度(m)约为 112 bits,哈希函数次数(k)为 3。

  • -

  • 总结而言​​,RocketMQ 通过引入布隆过滤器预处理机制,将属性过滤的性能提升至接近标签过滤的水平。其工作流程是:​​快速预筛选(布隆过滤器) -> 精准确认(二次查询commitlog)​​。这套机制在保证消息投递准确性的同时,通过牺牲少量存储空间和接受可控的误判率,换取了巨大的性能提升,非常适合处理大规模消息场景。

④属性过滤可替换类过滤模式

  • SQL92 过滤模式的出现,从实践层面已能完全取代传统的类过滤模式。尽管有观点认为上传 class 文件可实现更复杂的功能,而 SQL92 仅为其功能子集,但需回归其设计初衷:该机制专为高效实现服务端消息过滤而生,核心是对标签、属性等元数据进行简单匹配筛选。生产环境中,为消息过滤而让 Broker 执行数据库查询无异于“杀鸡用牛刀”,而 SQL92 以足够的易用性精准满足了匹配需求。正因如此,原本复杂且存在较高风险的 Filter Server 组件已无存在必要,自然退出历史舞台。

十. RocketMQ顺序消息原理

  • 顺序消息是 RocketMQ 的重要特性,专为解决消息间存在逻辑依赖的场景设计。虽然多数情况下消息可独立消费,但在以下两类场景中,消息顺序性直接影响业务正确性:

  • 电商订单状态流转​​:订单生命周期包含创建、支付、发货等关键状态,若消息乱序处理(如先消费发货再处理支付),将导致业务流程错乱与数据不一致。

  • MySQL binlog 订阅与异构数据构建​​:消费 binlog 时需严格遵循数据操作顺序(插入→更新→删除)。若消息乱序(如先处理删除再消费插入),异构存储可能保留本应删除的数据,造成数据冗余与逻辑错误。

1. 消息中间件的时序性挑战

  • 在分布式架构的消息中间件与应用系统中,确保消息严格顺序是一项复杂挑战,需同时满足四个关键条件:

  • 消息发送顺序性​:生产者必须按业务顺序同步发送消息,若采用异步发送则无法保证后发消息后到达,这要求中间件提供可靠的同步发送接口。

  • 消息存储顺序性​:顺序存储要求串行化写入,这与高并发存储设计存在根本冲突,可能显著降低吞吐性能,需在顺序与性能间权衡。

  • 消息投递顺序性​:分布式存储环境下,即使单节点能顺序存储,也需协调多个节点按序向消费者投递,跨节点顺序协调难度较大。

  • 消息消费顺序性​:面临双重挑战:一是需保证顺序投递的消息由同一消费者实例处理,避免分散至不同实例;二是即便单实例接收,多线程消费时仍无法保证任务执行顺序与投递顺序一致,因线程调度具有不确定性。

2. Kafka 顺序消息的解决方案

①Kafka 消息存储和消息投递的顺序性

  • Kafka 的整体消息模型与 RocketMQ 类似,均采用主题(Topic)作为核心的消息组织和管理单元。为提升消息处理的并发能力,每个主题下会划分多个分区(Partition),其功能类似于 RocketMQ 中的队列(Queue),用于实现消息的并行消费。

  • 在存储机制上,Kafka 与 RocketMQ 存在显著差异:Kafka 未采用集中式、全局有序的 CommitLog 存储方案,而是让每个分区独立地、按顺序(FIFO)存储其接收到的消息。因此,Kafka 的整体消息结构可视为由多个有序的分区队列构成。

  • 由于每个分区本身是一个 FIFO 队列,Kafka 在​​单个分区维度​​天然保证了消息存储与投递的顺序性。需注意的是,该顺序性仅限于同一分区内部。若有多个消息需严格按序消费,则必须确保它们被发送至同一分区,否则无法保证全局顺序。

②Kafka 消息生产的顺序性

  • Kafka 通过分区(Partition)机制实现消息的顺序性保障。其核心设计在于:同一分区内的消息严格按 FIFO(先进先出)顺序存储和投递。若需保证特定业务场景(如同一订单)的消息顺序,开发者只需利用 Kafka 生产者提供的​​哈希路由策略​​,将同一业务键(如订单号 order1)的所有消息路由至同一分区即可。

  • 例如,订单 order1 和 order2 分别包含创建、支付、发货三条消息。通过哈希策略,order1 固定投递至分区 0,order2 固定投递至分区 1。只要生产者按业务顺序发送消息,每个订单的所有操作在各自分区内必然保持顺序存储与投递,天然满足业务对顺序性的要求。此设计以分区粒度平衡了顺序性与并发性,无需额外复杂逻辑。

③Kafka 消息消费的顺序性

  • Kafka 通过其分区与消费者线程模型天然保障了消息的顺序性。其核心机制在于:一个分区在同一时刻只会被分配给一个消费者实例,且该实例通常以单线程模式运行,通过主动拉取的方式消费消息。这意味着,对于单个分区而言,消息的拉取和处理过程是串行化的,从而确保了分区内消息的消费顺序与存储顺序完全一致。

3. RocketMQ 顺序消息的解决方案

  • Apache RocketMQ 的开源版本在其源代码中并未明确定义“顺序消息”这一概念,但该平台确实提供了一套完整的顺序消息实现解决方案。鉴于“顺序消息”一词在业界具有高度的通用性和概括性,并且在主流搜索引擎中搜索此关键词,排名靠前的文章大多与 RocketMQ 相关,因此在技术讨论中继续沿用这一术语是恰当且便于理解的。

  • 值得注意的是,RocketMQ 所提供的顺序消息解决方案,其设计思想是在借鉴并优化了 Kafka 的原有方案之上形成的。要深入理解其运作机制,一个有效的方式便是将其与 Kafka 的方案进行对比分析。

①RocketMQ 的消息存储顺序性实现

  • RocketMQ 中对应于 Kafka 分区的概念称为队列(queue),它是一个逻辑队列,具备 FIFO 特性,但并非实际存储消息的实体。所有消息均统一存储于共用的 commitlog 文件中,因此即便在逻辑队列中相邻的消息,其在 commitlog 中的物理存储位置可能并不相邻。

  • 然而,在顺序消息的场景下,这一存储层面的差异并不产生影响。从需要保证顺序的消息批次角度来看,RocketMQ 依然以队列为维度进行消息投递,能够确保先存储的消息优先被投递,从而满足消息顺序性的要求。

②RocketMQ 的消息生产顺序性实现

RocketMQ 的生产者负载均衡策略默认仅提供轮询方式,但通过 MessageQueueSelector接口支持用户自定义队列选择逻辑。该接口定义如下:
public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

生产者发送方法提供了重载版本,支持传入选择器实例:
public SendResult send(Message msg, MessageQueueSelector selector, Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

在顺序消息场景中,选择器的核心目标是将需要顺序处理的消息路由至同一队列。常见实现方式是基于业务键(如订单号、用户ID或数据库主键)进行取模、哈希或一致性哈希计算。以下是一个基于取模策略的示例实现:
选择器的参数 arg可由用户根据业务需求灵活决定,确保相关消息通过相同计算规则散列到目标队列,从而保障消息顺序性。
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    Integer orderId = (Integer) arg;
    int index = orderId % mqs.size();
    return mqs.get(index);
}

③RocketMQ 的消息消费顺序性实现

  • RocketMQ 与 Kafka 的消费者模型在核心机制上并无本质区别,均遵循“一个队列在同一时刻只能分配给一个消费者实例”的原则。当消息被顺序投递到某个消费者实例后,消息消费的顺序性保障即转换为该单实例内部的顺序处理问题。


  • Kafka 消息顺序性的弊端

  • Kafka 提供了一套顺序消息的实现方案,但其核心缺陷在于系统的整体消费并行度受限于分区数量与消费者实例数量中的较小值(最大消费并行度 = Min(分区数, 消费者数))。例如,一个拥有 2 个分区和 2 个消费者实例的 Topic,其并行度上限仅为 2。这意味着,即使单机性能充足,也无法通过单纯增加消费线程来提升处理能力。

  • 若需将并行度提升至 100,则必须同时创建 100 个分区并部署 100 个消费者实例,操作复杂且资源消耗大。开发者也可尝试在单个消费者进程内创建多线程来减少实例数量,但这将显著增加开发复杂度与资源管理难度,同时难以可靠地保证消息至少被消费一次。


  • RocketMQ 相比 Kafka 消费顺序性的优化

  • RocketMQ 在消费并行度上实现了显著优化。其并行消费模型采用明确的分工架构:一个线程专职负责消息拉取,另有一个独立线程池处理已拉取的消息。这种设计使得整体消费并行度由消费者实例数量与消费线程池大小的乘积决定,大幅提升了消息处理能力。

  • 在顺序消费场景下,RocketMQ 通过特殊线程模型保障同一队列内的消息串行消费,同时允许不同队列并行处理。具体表现为:当多个队列分配给同一消费者实例时,各队列的消息消费互不干扰。其顺序消息并行度计算公式为各实例的线程池大小与分配队列数的最小值之和(∑Min(单实例线程池大小, 单实例分配队列数))。例如:20个队列由2个消费者实例处理(各消费10个队列),若每个实例线程池大小为10,则总并行度可达20。这一设计相比Kafka实现了显著优化——同等并行度需求下,Kafka需启动20个消费者实例,而RocketMQ仅需2个实例即可满足。


  • RocekeMQ 的线程池在顺序消费的实现

  • 启动方式与回调接口​:启动 RocketMQ 的顺序消费功能,需在编写消息消费回调逻辑时使用专用的 MessageListenerOrderly接口,而非普通的并发监听器。这是实现顺序消费的基础前提。

  • 线程模型与队列加锁机制​:线程池中实现顺序消费。RocketMQ 通过独特的队列加锁机制保障消息顺序。其底层仍使用 JDK 标准线程池,但在此基础上,对每个队列(Queue)施加锁进行同步控制。当多个消息(如 msg1, msg2, msg3 属于 queue0;msg4, msg5, msg6 属于 queue2)被并发拉取到同一消费者实例并提交至线程池后,线程在执行消费回调前,必须首先成功获取其目标队列的锁(注意, 这里的锁是针对本地内存 queue 的 对象进行加锁

  • 同一队列串行消费​​:对于属于同一个队列的消息,只有成功获得该队列锁的线程才能执行消费,其余线程则等待锁释放。这确保了单个队列内部消息的严格串行处理。

  • 不同队列并行消费​​:不同队列的锁相互独立,因此不同队列的消息消费可以并行进行,从而提升了系统的整体吞吐量。

  • 基于队列的任务提交与顺序保证​:RocketMQ 并非按单条消息提交任务,而是以队列为单位。抢到锁的线程会从该队列中按顺序提取消息(默认1条,可配置批量大小)进行消费。此设计确保了即便线程调度顺序不定,消费顺序也始终与消息在队列中的存储顺序一致。

  • 全局顺序性与进程级锁​:由于 RocketMQ 的负载均衡由客户端计算,在特定时刻可能出现同一队列被分配给多个消费者实例的情况,从而导致消息被重复投递给不同实例,从全局视角看破坏了顺序性。

  • 为解决此问题,RocketMQ 在顺序消费场景下引入了 ​​Broker 端的进程级锁​​:

  • 消费者实例必须向 Broker 成功申请到某个队列的锁,才有资格拉取和消费该队列的消息。

  • 此进程锁设有超时时间(默认60秒),可通过系统属性 rocketmq.broker.rebalance.lockMaxLiveTime调整,以防止因消费者实例异常崩溃而导致锁被永久占用。

  • 例如两个消费者 c1、 c2 实例都在负载均衡的时候拿到了 queue0。 假设 queue0 里面有 msg1、 msg2、 msg3 需要顺序消费, c1 拉取消息和消费消息的时间更早, 那么从全局的角度来 看, 消息的消费顺序就是 msg1、 msg2、 msg3、 msg1、 msg2、 msg3。从每个消费者的角度看, 消息的确是顺序消费的, 但是从局部去看就会发现中间一段时间, 出现了 msg3 先消费才到 msg1 的情况。

  • 这套机制共同作用,既在单个实例内保证了队列级别的消费顺序,又通过分布式锁在多个实例间协调,致力于实现更高层面的全局顺序性。

  • -

  • 例如 queue0 的 msg1、 msg2、 msg3 需要顺序消费, queue2 的 msg4、 msg5、 msg6 也需要顺 序消费。 这时候 6 条消息都被并发地拉取到了同一个客户端实例, 然后这 6 条消息被提交到 了线程池中准备执行。 在执行指定的回调之前, RocketMQ 的客户端 API 会做很多的事, 其中就包括对 queue0、 queue2 进行上锁的操作。

  • 假设现在线程池有 6 条线程都被激活去消费 6 条消息。 t1 拿到的是 msg1, t2 拿到的是 msg2, 以此类推。

  • 刚开始 t1 消费 msg1 之前, 会对 queue0 加锁 (注意, 这里的锁是针对本地内存 queue0 的 对象进行加锁) , 由于这时候没有别的线程拿到这个队列的锁, 所以 t1 可以继续进行后面的 逻辑。

  • 而 t2 和 t3 进行消费之前, 也会尝试对 queue0 进行加锁, 这时候它们都拿不到锁, 就会在 锁外等待。 当 t4 进行消费 msg4 的时候, 它会对 queue1 进行加锁, 由于加锁成功, t4 也会进 行消息的消费。 同样, t5 和 t6 在对 msg5 和 msg6 消费之前, 由于拿不到锁, 所以也需要原地 等待锁的释放。

  • 这就是为什么不同队列之间的消息消费可以并行的同时, 同一个队列的消费却可以做到 串行的逻辑。

  • 还是同样的例子, 当消费者准备进行消费 msg1、 msg2、 msg3 的时候, 会将这三个任务提 交到线程池, 这时候 t1、 t2、 t3 都会尝试去抢锁, 最后只有 t1 抢到。 t1 便从队列 queue0 中拿 出 1 条消息, 也就是 msg1 (拉取的数量默认是一条, 可配置成获取一批) , 然后消费, 消费结 束后释放锁。 后面即便抢到锁的是 t3, 它再从队列 queue0 拉取一条消息, 拉取到的也是 msg2, 所以能保证最后消费的顺序和队列的顺序是一致的。

/**
 * 注册顺序消息监听
 * 使用 MessageListenerOrderly 接口实现顺序消息消费
 * 
 * 示例代码结构:
 * consumer.registerMessageListener(new MessageListenerOrderly() {
 *     @Override
 *     public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
 *         for (MessageExt msg : msgs) {
 *             // 消息逻辑省略 - 在此处实现具体的顺序消息处理逻辑
 *         }
 *         return ConsumeOrderlyStatus.SUCCESS; // 返回消费成功状态
 *     }
 * });
 */
***********
/**
 * 顺序消费场景下,进程级的锁超时时间,默认是60s
 * 通过系统属性 rocketmq.broker.rebalance.lockMaxLiveTime 进行配置
 */
private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(
    System.getProperty("rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));

4. 分区顺序VS全局顺序

  • RocketMQ 的顺序消息分为两种类型:

  • 分区顺序消息​​:针对指定主题,所有消息按 Sharding Key(如用户ID、订单ID)进行分区,确保同一分区内的消息严格遵循 FIFO(先进先出)顺序发布和消费。此模式适用于绝大多数互联网场景,如用户注册验证码发送(以用户ID为Key)或电商订单全链路(创建、支付、退款、物流以订单ID为Key),在保障业务顺序性的同时兼顾并发性能。

  • 全局顺序消息​​:要求主题下所有消息严格按 FIFO 顺序处理,是分区顺序的特例(通过单队列实现)。仅适用于极少数强顺序需求场景,如公平购票(先到先得)或消息乱序零容忍系统(宁可不可用也不乱序)。

  • 分区顺序消息通过分治策略平衡顺序性与吞吐量,是推荐的主流方案;全局顺序因严格限制易成性能瓶颈,需谨慎评估。实际应用中应优先通过合理设计 Sharding Key 满足业务顺序需求,避免过度依赖全局顺序。

5. RocketMQ 顺序消息的缺陷

①分区热点问题

  • 消息队列系统中存在一个关键的性能约束机制:单个分区(或队列)在同一时间点只能被分配给一个消费者实例中的单条线程进行消费。这种设计导致当出现消息积压或消息在不同分区间分配不均匀的情况时,开发人员无法通过常规的横向扩展手段——例如增加消费者实例数量或扩大单个消费者的线程池规模——来直接提升积压消息的整体消费速率。该机制清晰地揭示了某些分布式系统在特定场景下可能面临的扩展性瓶颈。

②扩容与顺序性的冲突

  • RocketMQ 顺序消息的最大并行度受限于队列总数,其具体数值为各消费者实例的线程池大小与所分配队列数中较小者的总和。若需提升该并行度,必须增加主题的队列数量。

  • 需要注意的是,在系统运行过程中进行队列扩容,可能会改变消息生产的路由策略。例如,扩容前属于同一批顺序消息的消息A被路由至 queue1,而扩容后该批消息中的消息B可能被路由至新的 queue2,这种路由变更将破坏消息的顺序性。

  • 因此,对于明确需要支持顺序消费的主题,建议在创建时就预留充足的队列数量,避免运行时扩容。针对此问题,社区正在探索通过“逻辑队列”(一个逻辑分区对应多个物理分区)的方案来解决,但该功能在 4.9.4 版本中尚未发布。

③顺序性与可用性的冲突

  • 严格而言,RocketMQ 无法在所有异常场景下保障严格的消费顺序,因其顺序性与系统可用性存在本质冲突。例如,若需保证同一订单(如订单号 123456789)下消息 msg1msg2msg3的顺序性,当 msg1msg2成功发送至 Broker-0-queue0后,若 Broker-0突发故障,则 msg3的路由面临两难:

  • 若按原路由规则发往已不可用的 Broker-0-queue0,消息无法投递,牺牲可用性;

  • 若改发至其他可用队列(如 Broker-0-queue10),则顺序性被破坏。

  • 此冲突揭示了分区顺序消息在极端场景下的局限性。若业务完全无法容忍乱序,需启用全局顺序消息(通过单队列实现),但此举会将主题退化为单点,显著牺牲可用性与扩展性。因此,实际应用中需根据业务需求权衡顺序性与可用性,通常推荐通过合理设计 Sharding Key 和容错机制降低乱序影响。

十一. RocketMQ事务性消息原理

1. 分布式场景下的ACID

①认识ACID

  • 在传统的单体架构中,开发人员通常不会遇到分布式事务的挑战,这得益于关系型数据库对事务的原生支持。例如,在经典的Bob向Smith转账场景中,关系型数据库通过满足ACID四大特性来确保事务的正确执行和数据的一致性。

  • ACID是指原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。原子性要求事务是一个不可分割的操作单元,其中的操作要么全部成功,要么全部失败,MySQL通过undo log机制保证这一点。一致性确保事务执行前后数据库的完整性约束不被破坏,而这需要原子性、隔离性和持久性的共同保障。隔离性指多个并发事务之间相互隔离,互不干扰,MySQL通过MVCC和多版本并发控制实现。持久性则保证一旦事务提交,其所做的修改就会永久保存在数据库中,即使系统发生故障也不会丢失,MySQL利用redo log来实现持久性。

  • 这些特性共同构成了关系型数据库可靠事务处理的基础,使其能够有效应对诸如转账等需要严格数据一致性的业务场景。

②分布式场景下的事务挑战

  • 传统单体架构依赖单一关系型数据库实例处理事务,可满足基础需求。然而,互联网场景面临高并发、海量用户的挑战,单一实例难以支撑业务发展。为应对压力,架构师普遍采用“分拆”策略,遵循“大而化小,小而化了”的思路进行架构升级,通过分布式方案提升系统扩展性与承载力。


  • 服务拆分对于事务的挑战

  • 在分布式架构演进中,服务拆分是首要步骤,但会引入新的复杂性。以电商系统为例:订单处理与库存扣减本可通过关系型数据库的 ACID 特性(在同一事务中完成订单插入与库存更新)保证一致性。然而,当这两类逻辑被拆分至独立服务(如订单模块、库存模块)后,数据库事务的边界被打破。

  • 拆分后,库存与订单模块各自建立独立的数据库连接操作资源,不同连接无法共享事务上下文,导致传统数据库的 ACID 特性难以直接应用。这一变化使得跨服务的数据一致性保障成为分布式架构的核心挑战。


  • 业务拆分对于事务的挑战

  • 在微服务架构中,"一服务一库"是常见的设计原则,即每个独立服务配备专属数据库。服务拆分后,数据库随之分离(如图11-2所示),形成分布式数据存储模式。

  • 这种拆分导致传统关系型数据库事务机制失效。例如订单表与库存表分属不同服务及数据库后,无法通过单一事务保证跨表操作(如创建订单并扣减库存)的原子性。这可能引发超卖(库存扣减失败)或数据不一致(订单创建失败)等问题,需依赖分布式事务、消息队列或补偿事务等技术保障最终一致性,但会增加系统复杂性与性能开销。


  • 数据库拆分对于事务的挑战

  • 即便所有订单逻辑集中于订单模块,在高并发与大数据的双重压力下仍面临严峻挑战。以拼单业务为例:两用户拼单成功后,系统需为其同步创建订单记录。在单数据库架构下,可通过本地事务轻松实现两行记录的原子插入。

  • 然而,面对亿级订单量的系统,单一数据库难以满足存储与性能需求,必须采用分库分表方案。假设将订单表拆分为 2 个库(各含 100 张表,共 200 张表),并以用户 ID 作为分片键,则可能出现拼单用户 ID 分属不同库表的情况(如 userId1 路由至 0 库 0 表,userId2 路由至 1 库 99 表)。

  • 分库分表后,数据库相互独立,订单模块无法再利用关系型数据库的本地事务机制保障跨库操作的原子性(如图 11-3 所示)。此问题揭示了分布式环境下数据一致性的核心矛盾:存储扩展性与事务完整性难以兼得,需引入分布式事务或最终一致性方案弥补传统事务机制的失效。

2. CAP VS BASE

①认识 CAP 三角

  • CAP定理是分布式计算领域的公认定理,由加州大学伯克利分校Eric Brewer教授于1998年提出,1999年正式发表,后由麻省理工学院学者从理论上证明。该定理指出,在分布式系统中,一致性(Consistency)、可用性(Availability)和分区容错性(Partition Tolerance)三者不可兼得,系统设计时最多只能同时满足其中两个特性(CA、CP或AP)。

  • 一致性(C)​​:指分布式系统中所有数据副本在同一时刻对客户端读取呈现相同数据。

  • 可用性(A)​​:指系统部分节点故障后,集群仍能响应客户端读写请求,且在有限时间内返回正确结果。

  • 分区容错性(P)​​:指系统遇到网络分区故障时(非全网故障)仍能正常提供服务。

  • 传统单机数据库优先保证CA(放弃P),而分布式系统必须保障分区容错性(P),因此需在CP(强一致性)与AP(高可用性)之间抉择。这一权衡体现了分布式场景下一致性问题的本质矛盾。

②认识BASE理论

  • 根据CAP定理,在分布式系统中完整实现类似ACID的事务特性需放弃可用性(即采用CP模型)。然而,互联网应用普遍将可用性置于优先地位,因长时间无法响应的产品难以满足用户需求。

  • eBay架构师基于CAP定理提出BASE作为ACID的替代方案,旨在协调可用性与一致性。BASE包含三个核心原则:

  • 基本可用(BA)​​:系统故障时允许部分功能降级,确保核心服务可用;

  • ​软状态(S)​​:接受系统存在中间状态(如“转账中”),不影响整体可用性;

  • 最终一致性(E)​​:不要求实时强一致,但承诺数据最终达到一致状态。

  • BASE理论源于大型互联网分布式实践,是对CAP中一致性与可用性权衡的结果,其核心是在强一致性无法保障时通过业务适配实现最终一致性。从ACID到BASE的演进被戏称为“酸碱理论”(Base与Acid的英文含义对应碱与酸)。

3. 分布式一致性方案

  • 接下来将系统介绍包括 RocketMQ 事务消息在内的多种分布式事务解决方案。这些方案本质上都是对 BASE 理论的具体工程实践,旨在平衡分布式环境下的数据一致性与系统可用性。

  • 不同方案在一致性和可用性的实现程度上存在差异,但遵循一个基本规律:方案的强一致性保障程度越高,往往需要以牺牲部分可用性为代价。这种权衡关系是分布式系统设计的核心挑战,也是选择具体方案时需要重点考量的因素。

①一致性概念解析

  • 在深入探讨 RocketMQ 的事务消息实现原理之前,有必要先了解业界为解决“分布式事务”这一经典难题所提出的各类现有方案。为便于理解和对比,本书将这些解决方案归纳为三大类型:“强一致性方案”、“弱一致性方案”以及“最终一致性方案”。


  • 强一致性方案

  • “强一致”一词在不同技术语境中存在多种解释,其含义具有多样性。常见的理解至少包括四种:1)CAP理论定义的完全一致性(任何时刻任何情况一致);2)副本复制场景中同步复制与异步复制的区别(同步即强一致);3)模糊的“相对强一致”概念(大部分情况一致即视为强一致);4)排除极小概率的网络分区等突发故障,确保事务完整执行且数据始终满足业务约束。

  • 第1种定义(完全一致)在当前技术方案中均无法实现,因系统难以完全牺牲可用性(A)来保障一致性(C)。第2种定义在特定场景成立,但本书聚焦分布式事务,不以其为区分标准。第3种定义缺乏科学性,常见于方案对比时的非严谨表述。本书采纳第4种定义作为分类依据:假设事务执行过程顺利(无突发网络分区),数据保持一致,且在可预知故障(如超时、重启)下仍能保证事务一致,则视为强一致。此类方案均体现“舍A求C”的设计思路。


  • 弱一致性方案

  • 在分布式系统中,当某个写操作成功后,用户读取数据时可能短暂处于不一致状态,这段时间被称为“不一致性窗口”。其典型表现包括两类场景:

  • 数据同步延迟​​:如在主备数据库同步过程中,从备用库读取到未更新的旧数据;

  • 多数据源状态分歧​​:如电商场景中库存已扣减但订单尚未生成,导致数据暂时矛盾。

  • 需注意弱一致性并非如其名称般“脆弱”。许多互联网架构采用的方案虽严格符合弱一致性定义,但其实际效果往往可接受,后续将通过实践案例具体说明其合理性。


  • 最终一致性方案

  • 最终一致性是分布式系统中的重要概念,对应 BASE 理论中的 "E"(Eventual Consistency)。它本质上是弱一致性的一种特殊形式,其核心在于明确承认数据更新后可能存在一个 ​​"不一致性窗口"​​ —— 在此期间,系统的不同副本可能呈现短暂的数据不一致状态。

  • 与一般的弱一致性相比,最终一致性的关键特征在于其​​承诺性​​:它保证该不一致性窗口必然会在有限的时间内关闭,所有数据副本最终将收敛至完全一致的状态。这一承诺使其成为许多互联网分布式系统实现高可用性与数据可靠性平衡的实践基础。

②强一致性方案

  • 两阶段提交

  • 两阶段提交(Two-phase Commit)是分布式系统中确保事务一致性的核心协议,通过协调者统一调度参与者节点,分为准备与提交两个阶段:

  • 准备阶段​​:协调者询问各参与者能否提交事务,参与者执行操作(记录undo/redo日志但不提交)并反馈结果(yes/no);

  • 提交阶段​​:协调者根据反馈(准备阶段)决定全局提交回滚,参与者执行指令后释放资源并反馈ack,协调者收齐ack后完成事务。

  • 在正常提交的情况下:协调者向所有参与者发出正式提交 (commit) 事务的指令。参与者执行 commit, 并释放整个事务期间占用的资源。各参与者向协调者反馈ack (应答) 完成的消息。协调者收到所有参与者反馈的ack 消息后, 即完成事务提交。

  • 而在异常的情况下:协调者向所有参与者发出回滚 (rollback) 指令。参与者使用阶段1 中的undo 信息执行回滚操作, 并释放整个事务期间占用的资源。各参与者向协调者反馈ack 完成的消息。协调者收到所有参与者反馈的ack 消息后, 即完成事务中断。

  • -

  • 该协议虽逻辑清晰,但存在严重可用性问题:

  • ​同步阻塞​​:提交阶段所有参与者阻塞等待指令,第三方访问相应资源时亦被阻塞;

  • 单点故障​​:协调者故障将导致参与者永久锁定;

  • 网络分区风险​​:极端情况下部分节点收不到指令可能引发数据不一致(但概率极低)。在commit phase中如果 发生网络分区问题, 导致事务参与者一部分收到了commit指令, 而另一部分事务参与者却没 收到, 那么这种情况下节点之间就会出现数据不一致。 这也是为什么前文说绝对的强一致方 案理论上是不存在的。

  • 因其强一致性设计(牺牲可用性换一致性)及实际缺陷,该协议在互联网高并发场景中极少采用。


  • 三阶段提交

  • 上文提到二阶段提交存在单点故障、 同步阻塞和数据一致性的问题, 进而 “三阶段提交” (3 Phase Commit, 3PC) 协议出现了, 三阶段提交能有效地解决这些问题。 三阶段提交协议把 过程分为三个阶段: CanCommit、 PreCommit、 DoCommit 。

  • 第一阶段: CanCommit 阶段。 三阶段提交的 CanCommit 阶段其实和两阶段提交的准备 阶段很像, 协调者向参与者发送一个commit请求, 如果参与者可以提交, 就返回yes响应, 否则返回no响应。

  • -

  • 第二阶段: PreCommit 阶段。 协调者在PreCommit 阶段会根据参与者在 CanCommit 阶段 的结果采取相应操作, 主要有以下两种。

  • 情况1 (正常场景): 假如所有参与者的反馈都是yes响应, 那么协调者就会向所有参与 者发出PreCommit 请求执行事务的预执行, 参与者会将 undo 和 redo 信息记入事务日志中 (但 不提交事务)。

  • 情况2 (异常场景): 假如有任何一个参与者向协调者发送了no响应, 或者协调者等待参 与者响应超时, 协调者进行事务中断。 引入这样的超时机制后, 就解决了永久阻塞问题。

  • -

  • 第三阶段: DoCommit阶段。 该阶段进行真正的事务提交, 也就是在协调者发出了pre Commit 请求之后的阶段。 这时候也可以分为以下两种情况。

  • 情况1 (正常提交): 即所有参与者均反馈ack, 执行真正的事务提交。首先协调者向各个参与者发起事务DoCommit 请求。 参与者收到DoCommit 请求后, 会正 式执行事务提交, 并释放整个事务期间占用的资源, 然后向协调者反馈ack。 最后协调者收到所有参与者ack后, 即完成事务提交。

  • 情况2 (中断事务): 即PreCommit阶段任何一个参与者反馈 no, 或者协调者等待所有参 与者的响应超时, 即中断事务。首先参与者使用CanCommit中的 undo 信息执行回滚操作, 并释放整个事务期间占用的资 源。 然后各参与者向协调者反馈ack完成的消息。 最后协调者收到所有参与者反馈的ack消息 后, 即完成事务中断。

  • 注: 在DoCommit阶段, 无论协调者出现问题, 还是协调者与参与者出现网络问题, 都可能导致参与 者无法接收到DoCommit或abort 请求。 那么参与者都会在等待超时后执行事务提交。 这个超时机制也是 为了解决永久阻塞的问题。

  • -

  • 3PC 通过超时机制提升系统可用性,减少协调者单点故障的影响。然而,这种设计也带来更高的数据不一致风险(如协调者发送中断指令但参与者因超时已提交)。根据 CAP 理论,这实质是以​​一致性妥协换取可用性提升​​。加之协议实现复杂且性能难以满足高并发场景,实践中较少采用。


  • XA 协议

  • 上文提到二提交阶段协议, 在传统方案里都是面向数据库层面实现的, 如Oracle、 MySQL 都支持二提交阶段协议。为统一标准并降低行业对接成本,国际开放标准组织 Open Group 于 1994 年定义了分布式事务处理参考模型(DTP),该模型包含三个核心角色:

  • AP(应用程序)​​:事务发起者,定义具体事务操作(如数据库更新);

  • RM(资源管理器)​​:事务参与者,管理共享资源(如数据库、消息中间件)并提供事务提交/回滚能力;

  • TM(事务管理器)​​:全局协调者,与各 RM 通信以协调事务提交或回滚。

  • -

  • XA 协议是 DTP 模型的核心实现规范,目前主要被关系型数据库(如 MySQL、PostgreSQL)和消息中间件(如 ActiveMQ)支持。其本质是基于资源层的两阶段提交(2PC)方案,虽能保障强一致性,但存在显著性能瓶颈:全程持有资源锁延长事务执行时间,增加死锁风险。实测表明,基于 XA 的 MySQL 集群操作延迟可达单机的 10 倍,故难以适用于高并发互联网场景,目前多用于传统单体应用中的跨库一致性保障。

③弱一致性方案

  • 弱一致性方案常因名称中的“弱”字而被轻视,但实际在绝大多数场景下其表现并非不堪。它既非完全不一致,也非大概率不一致,而是相较于强一致性方案,在超时、网络分区或机器重启等异常场景下更容易出现数据不一致;同时对比最终一致性,它又缺乏自我恢复的保证。然而在互联网领域,当前大部分正在采用或计划采用的技术方案均属于弱一致性范畴。本书列举的三种最常见方案即被归为此类,原因在于它们无法自动恢复一致性,但实际其一致性表现并不弱,且完全适用于互联网绝大部分应用场景。


  • 基于业务妥协的状态补偿

  • 在分布式业务场景中,应根据数据重要性实施分级管理。当强一致性与系统性能冲突时,可对非核心数据适当放宽一致性要求,以保障整体性能。这种策略通常不会对主营业务造成重大影响。

  • 以超高并发电商秒杀为例,创建订单与库存扣减作为关键操作:

  • 性能优先设计​​:采用先扣库存(快速操作)、后异步处理订单(相对缓慢)的执行顺序,有效规避超卖风险;

  • 补偿机制保障​​:通过事后清理脚本定期扫描库存流水,对超时未支付订单自动回滚库存(类似"抢购未付款自动释放库存"机制);

  • 故障容忍设计​​:接受极低概率的补偿失败(可通过人工核查修复),以较低实现成本换取系统高性能与高可用性。


  • 重试+告警+人工修复

  • 在应对分布式系统数据一致性挑战时,当自动重试和回滚机制均告失败后(例如出现“少卖”等不一致场景),采用​​人工修复​​作为一种低成本补救方案是可行且实用的策略。该方案的核心思想并非被动等待用户投诉,而是要求系统具备​​主动的不一致性检测能力​​,能通过监控日志、数据库记录等手段及时发现问题并触发告警。

  • 实施时,研发人员可依据业务流程记录的操作日志和上下文信息定位问题,并进行数据修复。此方案实质上接受了分布式场景下难以完全避免的短期不一致性,但通过事后的人工干预确保数据的最终正确性。其优势在于实现成本低,适用于大多数对强一致性要求不高的互联网业务场景,是对技术方案难以完美解决问题的一种务实补充。

  • 订单和库存的例子进行说明, 正常的业务逻辑是先扣库存, 然后创建订 单, 如果订单创建失败, 这时候程序可以重试。 但重试也无法保证100%成功。 如果重试还是 失败的话, 正常情况下应该需要回滚库存。 回滚的操作也是可能失败的, 回滚失败后也可以持续重试回滚。 但回滚重试依旧可能会失败。 这时候其实就是一个不一致的场景 (少卖)。 此 时系统应该触发告警, 然后通过人工介入的方式, 根据日志、 数据库记录、 监控视图等手段 辅助研发人员进行人工的数据修复, 以解决其中的不一致。


  • 异步队列处理

  • 为提升系统性能,库存扣减常采用 Redis 计数后异步同步至数据库的方案。异步实现分两种路径:若通过内存队列定时同步,服务重启可能导致任务丢失,属弱一致性方案;若依赖消息中间件(利用其持久化与至少一次投递机制),虽更可靠但消息成功发送本身存在挑战(如网络故障、Broker不可用),故本质上仍属弱一致。

  • 此类异步处理在一致性要求不极致的场景中广泛应用。尽管存在弱一致性风险,但结合重试机制、实时告警与人工修复等辅助手段,可显著提升数据可靠性,成为平衡性能与一致性的有效实践方案。


  • 对账

  • 对账实际上是告警+人工修复的一种落地实践, 也是业界十分常见的应对分布式事务挑战 的方案。架构师在设计方案时, 对于可能导致不一致的数据点, 根据对账的思路来设计一个 自动对账流程以发现不一致的数据, 从而简化整个系统的设计。

  • 对账是一种典型的事后处理机制,其核心在于​​只关注事务操作的最终结果​​,而非执行过程。在分布式系统中,研发人员通过定时扫描订单状态、对比本方与第三方系统(如支付流水)的数据一致性,主动发现数据不平问题并触发告警,进而通过人工干预实现数据修复。此方案以结果为导向,有效简化了系统设计的复杂性,成为应对分布式事务挑战的常见实践。

  • 然而,对账机制存在明显局限:其一,它通常仅能发现问题而无法自动解决问题;其二,对账程序自身的稳定性受技术挑战影响(如异常场景下的执行可靠性存疑)。正因这些不确定性,本书将对账归类为​​弱一致性解决方案​​,其可靠性需结合人工修复等辅助手段共同保障。

④最终一致性方案

  • 最终一致性是弱一致性的一个特例,其核心特征在于系统能够承诺:任何暂时的不一致状态都将在有限时间内自动修复并达到一致。除 RocketMQ 事务消息外,业界还存在多种成熟的最终一致性实现方案。了解这些方案的特点,有助于更深入地理解 RocketMQ 事务消息的设计理念与适用场景。下文将介绍几种典型场景下的最终一致性解决方案。


  • TCC

  • TCC 是基于 BASE 理论的类二阶段提交方案, 但是前文说过, 二阶段提交方案实际上在性能上 有极大的短板, 而 TCC 则根据业务的特性对流程 进行了优化。 以下还是以库存和订单的例子来讲 解, 如图 11-9 所示。

  • 图 11-9 的流程中, 用户在下订单调用订单服务后, 还需要调用库存服务扣库存, 最后通过积 分服务给用户增加积分。

  • 从事务的角度来看, 整个过程应该具有原子性, 即所有步骤要么都成功, 要么都失败。 而这里订单很可能成功了, 但是库存可能扣除失败, 这时候对于用户来说就会导致超卖问题。 前面内容提到可以用二阶段提交去解决类似的问题, 但是二阶段提交方案会使交易在事务成 功或者失败回滚前, 其他用户的操作都会阻塞, 极大地影响系统稳定性和用户体验。而TCC的流程虽然和二阶段提交类似, 但是却能极大地提高性能。

  • 在具体实现上, TCC其实是一种业务侵入性较强的事务方案, 它要求业务处理过程必须 拆分为 “预留业务资源” 和 “确认 / 释放消费资源” 两个子过程。 而TCC实际上就是这两个 过程的三个阶段的缩写: Try、 Confirm、 Cancel。

  • Try: 尝试执行阶段, 完成所有业务可执行性的检查 (保障一致性), 并且预留好事务 需要用到的所有业务资源 (保障隔离性)。 在库存这个场景, 实际上就是冻结库存 (事实上没 有扣)。

  • Confirm: 确认执行阶段, 不进行任何业务检查, 直接使用 Try 阶段准备的资源来完成 业务处理。 注意, Confirm 阶段可能会重复执行, 因此需要满足幂等性。 在库存这个场景就是 确认扣除掉Try阶段的库存。

  • Cancel: 取消执行阶段, 释放 Try 阶段预留的业务资源。 注意, Cancel 阶段也可能会重复执行, 因此也需要满足幂等性。 在库存的场景就是把预扣库存的操作进行回滚。

  • -

  • TCC 协议相比二阶段提交/XA最明显的区别如下:

  • 性能提升: 由具体业务来操作资源的锁定, 可以实现更小的锁粒度, 不会锁定整个 资源。

  • 数据最终一致性: 基于 Confirm 和 Cancel 的幂等性, 保证事务最终完成确认或者取消, 保证数据的最终一致性。

  • 可靠性: 解决了二阶段提交/XA里的协调者单点故障问题, 因为事务由主业务方发起并 控制, 使得事务的管理器也可变为多点。

  • 但TCC最大的缺点是: TCC 的 Try、 Confirm 和 Cancel 操作功能要按具体业务来实现, 整体 改造、 开发成本高, 实现难度较大。 特别是老系统的改造, 因为深入到业务流程、 表设计、 接口 设计等阶段, 使得老系统用TCC改造的阻力极大。


  • 查询+补偿

  • 补偿是实现最终一致性的常见手段,以订单系统依赖支付系统为例,通常采用查询加自动查询补偿方式。这要求被动方提供查询、重试和取消接口,主动方定期查询未完成或异常操作,进行重新执行或回滚,以实现最终一致性,此过程如图11-14所示。订单系统不能完全依赖支付系统回调,因此需定时任务主动回查支付状态,若处于中间态,则通过重试下单或退款等自动补偿操作确保一致。在消息中间件消费重试中也常见类似机制。极端情况下(如系统bug或数据库持续问题),当重试到一定次数或时间仍失败时,会触发告警并结合人工修复,人工补偿成为保障一致性的最后屏障。


  • 最大努力通知

  • 最大努力通知方案(Best-Effort Delivery)是一种适用于回调、通知类柔性事务的设计模式,常见于银行、商户通知等需异步告知事务状态的场景。其核心特点包括:

  • 努力通知​​:业务主动方(如支付系统)在完成处理后向被动方重复发送消息(N次后停止),允许消息丢失,确保基本可达性;

  • 定期校对​​:被动方(如订单系统)通过定时查询主动方接口(如图11-15所示),恢复丢失消息,弥补通知不确定性。

  • 以支付系统为例:微信支付需在回调失败时启动补偿机制,持续通知直至收到成功响应或达重试上限。定期校对本质是查询+补偿的组合策略,通过主动拉取保障最终一致性,成为分布式场景下可靠性设计的常见补充方案。


  • Saga 事务模式

  • Saga事务模式的历史十分悠久,甚至早于分布式事务概念的提出。其名称“Saga”意为“长篇故事”,核心思路是将一个大型分布式事务分解为一系列可交错运行的子事务(如图11-16和图11-17所示)。

  • Saga事务由幂等有序的子事务(T1, T2, ..., Tn)及其对应的补偿动作(C1, C2, ..., Cn)组成。若所有子事务成功提交,则事务完成;否则需执行恢复策略:向前恢复(持续重试)或向后恢复(回滚补偿)。以电商场景为例,T1扣减库存、T2创建订单、T3支付,若T2异常,两种恢复流程差异显著。

  • Saga适用于两类场景:一是业务流程长且多(如金融网络跨机构对接);二是需整合第三方或遗留系统,无法采用TCC或消息队列改造。其优势包括无锁设计(一阶段直接提交本地事务,避免资源锁定)和异步化能力(支持高吞吐处理)。

  • -

  • 但Saga模式存在与其他异步方案相同的缺点:因一阶段已提交本地事务且未预留资源,无法保证事务隔离性(如图11-16和图11-17所示)。

  • ​一阶段直接提交​​:Saga 的每个本地事务(Tn)在完成后会​​立即提交​​并释放资源,而不是先“预留”或“冻结”资源。后续事务是否执行,不影响已提交事务的可见性。

  • ​无全局锁​​:Saga 模式​​不依赖全局锁​​机制来隔离并发事务对共享资源的访问。已提交的数据可能立即被其他事务感知和操作。

  • 由于上述机制,在多个Saga事务并发执行,或Saga事务与系统内其他操作同时访问共享资源时,可能会遇到以下经典问题:

  • ​脏读(Dirty Read)​​:一个事务读取了另一个尚未完成(可能后续会失败并补偿)的Saga事务已提交的中间结果。如果后者最终失败,那么读到的就是无效的“脏”数据。

  • 丢失更新(Lost Update)​​:两个Saga事务同时读取同一数据并进行更新,其中一个事务的提交覆盖了另一个事务的更新结果。

  • 不可重复读(Non-Repeatable Read)​​:一个Saga事务内的两个连续操作读取同一数据,但在两次读取之间,该数据被另一个并发事务修改并提交,导致两次读取结果不一致。

  • -

  • 应对隔离性问题的常见策略

  • 语义锁(Semantic Lock):在业务数据中增加一个状态字段(如 status),用于标记数据正处于某个Saga事务处理中(如设为 PENDING)。其他事务发现此标记时,可以采取等待、跳过或拒绝策略。这在订单、库存等场景中很常见。

  • 版本控制(Versioning):为数据增加版本号(或时间戳)。更新数据时,需校验当前版本号与读取时是否一致(类似乐观锁)。若不一致,则说明数据已被其他事务修改,当前操作需中止或重试。

  • 重读值(Reread Value)​:在执行补偿操作前,重新读取数据的当前值,确保基于最新状态进行回滚,避免在错误的基础上进行补偿。


  • 本地事务状态表

  • 本地事务状态表是一种通用性较强的分布式事务处理方案。其核心机制在于:在执行分布式事务前,​​凭借数据库本地事务的原子性​​,先将整个事务的流程与状态信息原子性地持久化到专用状态表中。此后,调用方基于该状态表推进后续调用:每一步成功则更新状态,失败则中止。​​后台定时任务​​会持续扫描此表,对未完成的事务进行重试调用或回滚操作,若重试达阈值仍失败则触发告警,转入人工修复流程。

  • 异步确保模式是本地事务状态表方案的一种特定实现形式。其区别在于:在事务状态信息持久化完成后,​​并不立即执行实际的事务调用​​,而是完全依赖后台的定时任务扫描来触发和执行所有后续调用。此模式通过异步化调度进一步解耦了事务流程,提升了系统的灵活性与容错能力。


  • 本地消息表

  • 本地消息表与本地事务状态表设计思路类似,核心目标是通过补偿机制确保未完成事务的最终完成。消息中间件的 AT-LEAST-ONCE 投递机制本可用于实现补偿,但为何将事务异步化为任务持久化到消息中间件的方案被归类为弱一致方案?根本原因在于无法保证本地事务提交与消息发送的原子性:若 A 事务完成但消息未成功发送(如服务重启),则导致 A 成功而 B 未执行,数据不一致。

  • -

  • 开发者常尝试用 try-catch 将消息发送与本地事务捆绑,利用异常回滚保障一致性。但存在两个隐秘问题:

  • 消息发送超时​​:超时可能导致事务回滚,但消息实际发送成功,引发 B 事务执行而 A 回滚的不一致;

  • 事务提交失败​​:若消息发送后事务提交失败,消息已投递但 A 未执行,同样导致数据错误。

  • -

  • 本地消息表模式通过三重机制解决上述问题:

  • 原子性写入​​:将业务操作与消息记录置于同一数据库事务,保证两者原子性;

  • 异步可靠投递​​:执行后系统A不直接给消息中间件发消息, 而是通过后台定时任务扫描消息表中未发 送的消息进行发送。 对于发送过程遇到异常的消息则不断重试, 直到消息中间件返回 “发送 成功” 的确认, 并更新消息表中的投递状态。 这样一来, 就能保证进入到消息表中的消息最 终一定能被投递出去一次。

  • 幂等消费保障​​:依赖消息中间件 AT-LEAST-ONCE 投递,要求消费方实现幂等处理(可能出现重复发送)。


  • 可靠消息模式

  • 可靠消息模式是本地消息表方案的演进版本,通过引入独立的第三方消息管理模块实现业务解耦(本地消息表里面维护消息的发送 状态是由业务模块设计的一张本地消息表负责的, 使消息表的插入和本地事务的执行在一个 事务中, 这对于业务是有所侵入的, 无法做到很通用。。其关键流程分为四步:

  • 业务执行本地事务前,通知消息管理模块预发送消息(消息暂存数据库而非立即投递);

  • 业务完成本地事务后确认消息,触发管理模块正式投递;

  • 若未及时确认,管理模块主动回查消息状态;

  • 对已确认消息,管理模块保障投递至消息队列后标记发送成功。

  • 分布式事务存在一致性与可用性的本质矛盾,需根据场景选择不同实践方案。RocketMQ 提供的异步队列方案,正是基于可靠消息模式的思想,通过异步化与解耦设计平衡两者关系,为分布式系统提供了一种可靠的消息事务处理思路。

4. 异步队列方案的挑战

  • 分布式事务常采用异步化方案提升性能:当前系统处理完一个事务后,将后续事务委托给本地队列或消息中间件(如RocketMQ)异步执行,并辅以重试机制确保最终完成。这种设计通过解耦事务链提高了系统吞吐量,但因其可靠性局限被归类为“弱一致”。

  • 弱一致性原因:一是本地队列易丢失任务(如机器重启导致数据丢失);二是即使使用可靠消息中间件(如订单系统发送消息至积分系统),仍存在异常场景破坏一致性(本地事务执行和消息发送的原子性问题)。例如,RocketMQ虽具备消息持久化、重试等优秀特性,表面可实现最终一致,但实际上在特定异常场景(如消息发送与事务提交的原子性破裂)下会出现数据不一致,需进一步探讨具体案例。

①场景一:服务重启导致消息发送失败

/**
 * 分布式事务处理示例:订单与积分系统
 * <p>
 * 业务流程说明:
 * 1. 首先执行本地订单事务处理
 * 2. 成功后发送消息到积分系统处理用户积分
 * <p>
 * 注意:此方案存在潜在的一致性风险
 * 如果在本地事务成功后,消息发送前发生服务重启(常见于系统发布场景),
 * RocketMQ 实际上未收到该消息,导致积分系统事务无法执行。
 * 在此场景下,RocketMQ 的重试机制和可靠性保障都将无法发挥作用。
 */

public class DistributedTransactionExample
{

    public void processOrderWithPoints()
    {
        // 第一步:处理本地订单事务
        processOrderTransaction();

        // 第二步:发送消息给积分系统处理用户积分
        // 消费失败会利用 RocketMQ 的重试机制一直重试
        sendMsg();
    }

    private void processOrderTransaction()
    {
        // 本地订单事务处理逻辑实现
    }

    private void sendMsg()
    {
        // 消息发送逻辑实现
    }
}

②场景二:服务重启导致消息发送失败

  • 这时候有些读者可能会想到利用本地事务的能力, 把发送消息和本地事务打包在一个事务里, 如果事务没执行提交就重启是否就能解决问题呢? 伪代码如下所示。

  • 三种情况:

  • 本地事务成功, 消息发送成功, 事务提交, 事务完整。

  • 本地事务成功, 发送消息失败, 回滚事务, 事务完整。

  • 本地事务失败, 直接回滚事务, 事务完整。

  • -

  • 风险

  • 本地事务成功, 消息发送超时。 这时候按照处理流程是需要回滚本地事务的。 但是消 息发送超时实际上是一个不确定的状态, 可能消息实际上是发送成功的, 也就是说积分系统 会收到这个消息, 最后可能的状态就是订单没成功, 但是用户积分却算多了。

  • 本地事务成功, 消息发送成功, 事务提交之前机器重启。 虽然开发者已经把消息的发 送打包到了一个事务中, 但实际上消息的发送是不受事务控制的。 在程序执行完sendMsg这一 步之后, 消息实际上已经被发出去了。 消息发送出去后, 机器如果重启, 订单是没处理成功 的, 但这时候用户的积分却无故增长了。

③异步队列方案小结

  • 即使采用高可靠性消息中间件,若缺乏特殊处理,仍无法保证分布式事务的最终一致性。问题的核心在于消息生产方:需确保本地事务操作与消息发送的原子性(要么全成功,要么全失败)。

  • 为解决此问题,业界提出本地消息表等方案(将消息发送记录为状态并通过补偿机制保障成功),或引入独立消息管理模块(可靠消息模式)。但这些方案实现成本较高,需记录状态并应对中间件不可用风险。

  • 而RocketMQ事务消息创新性地将类似方案集成至消息中间件内部,形成一体化解决方案,显著降低了实现复杂度,使系统能更从容应对分布式事务的原子性挑战。

5. RocketMQ 事务消息的实现

①RocketMQ 事务消息实现的关键流程

  • RocketMQ 通过三阶段流程保障事务消息的可靠性:

  • 半消息发送​​:生产者先发送状态为“未确认”的半消息至 Broker,此类消息暂不投递给消费者;

  • 本地事务执行​​:半消息发送成功后,生产者执行本地事务(需与消息发送保持原子性);

  • 最终状态确认​​:依据本地事务结果,生产者通知 Broker 将半消息标记为“确认”(投递消费者)或“回滚”(永久不可见)。

  • 针对事务执行中的异常(如服务重启),RocketMQ 通过​​回查机制​​容错:Broker 定时扫描未确认消息,主动查询生产者以确认本地事务状态,据此更新消息最终状态。此设计确保在各种异常场景下,消息发送与本地事务的原子性得以保障,为后续基于 ACK 和重试机制实现分布式事务最终一致性奠定基础。

②RocketMQ 事务消息 VS 可靠消息模式

  • RocketMQ 事务消息的设计思想,本质上是将业界实践多年的可靠消息模式深度集成到消息中间件内部,使其成为一个开箱即用的通用特性。从流程上看,其核心机制与可靠消息模式高度相似(具体对比参见表 11-2),但通过内置化实现显著降低了使用复杂度。

  • 这一集成方案具有重大意义:在整个消息中间件领域中,RocketMQ 是目前提供分布式事务解决方案中最优雅、业务接入最简单的一个选择。其通过将复杂的一致性保障机制封装于中间件底层,为开发者提供了兼顾可靠性与易用性的分布式事务处理能力。

③RocketMQ 事务消息的存储

  • RocketMQ 的事务消息在发送时需使用特定的 API,但其消息体构造与普通消息完全一致,并无特殊之处。然而,在这看似简单的实现背后,RocketMQ 通过诸多精巧的底层设计,实现了分布式事务的高可靠性保障。


  • 标记事务消息

  • RocketMQ 对事务消息进行特殊标识处理,核心包含两个关键属性:

  • 事务标识属性 (TRAN_MSG)​​:标记为 true,使 Broker 能够识别该消息为事务消息,从而进入特殊处理流程;

  • 生产者组标识属性 (PGROUP)​​:值为消息所属的 ProducerGroup 名称,为 Broker 提供回查入口,确保在需要时能定位到正确的生产者进行状态确认。

  • 通过这两项属性标记,RocketMQ 在消息底层无缝集成了事务状态管理与回查能力,既实现了事务消息的可靠识别,又为异常场景下的协调恢复提供了必要信息支撑,保障了分布式事务的完整性与可靠性。


  • 暂存事务半消息

  • 在 RocketMQ 的事务消息机制中,客户端发送的消息会携带特殊标记(TRAN_MSG=true)。Broker 接收后识别为事务消息,并执行“偷天换日”操作:将消息暂存至专用中转主题 RMQ_SYS_TRANS_HALF_TOPIC(该主题仅含一个队列 queueId=0),此时消息状态为“半消息”,避免被消费者提前消费。同时,Broker 将原始主题和队列信息存入消息属性(REAL_TOPIC 和 REAL_QID),为后续事务确认后的恢复投递提供依据。这一设计确保了事务消息在本地事务完成前的隔离性与安全性。

/**
 * 事务消息桥接器核心实现类
 * 此部分核心源码位于 TransactionalMessageBridge.java 中
 */
public class TransactionalMessageBridge
{

    /**
     * 存储事务半消息
     *
     * @param messageInner 原始消息内部对象
     * @return 消息存储结果
     */
    public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner)
    {
        // 核心存储操作:将预处理后的半消息持久化存储
        return store.putMessage(parseHalfMessageInner(messageInner));
    }

    /**
     * 从原始消息构造半消息体
     * 完成关键预处理步骤:备份原始信息 → 重置事务标志 → 重定向存储位置
     *
     * @param msgInner 原始消息内部对象
     * @return 处理后的半消息对象
     */
    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner)
    {
        // 备份消息的原主题名称与原队列ID到属性中
        // 确保事务完成后能正确恢复投递到原始目标
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId()));

        // 重置消息系统标志位,标记为事务半消息(非最终状态)
        msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));

        // 半消息统一存储到 RMQ_SYS_TRANS_HALF_TOPIC 的第0个队列里
        // 实现所有事务消息的集中管理,避免被消费者提前消费
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        msgInner.setQueueId(0);

        // 序列化消息属性到字符串格式,确保持久化时属性信息完整保存
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));

        return msgInner;
    }
}

  • 从半消息中恢复原消息

  • RMQ_SYS_TRANS_HALF_TOPIC 作为事务消息的中转存储主题,在本地事务确认为提交状态(COMMIT_MESSAGE)后,Broker 会启动半消息恢复流程。恢复时,Broker 从消息属性中提取原始主题(REAL_TOPIC)和队列ID(REAL_QID),将消息重新定向至真实目标。由于恢复过程实质是创建新消息,半消息的物理属性(如 offset)将全新生成,不与原消息共用。

  • 半消息与最终投递的真实消息在物理存储上完全独立,但逻辑上通过属性字段紧密关联。这种设计既保障了事务未决期间消息的隔离性,又确保了事务提交后消息能准确路由至原始目标队列,实现了物理隔离与逻辑统一的平衡。

/**
 * 事务消息恢复处理器
 * 核心代码位于 EndTransactionProcessor.java
 * 负责处理事务的最终提交或回滚请求,并恢复原始消息
 */
public class EndTransactionProcessor implements NettyRequestProcessor
{

    /**
     * 处理事务提交或回滚请求
     *
     * @param ctx     网络通道上下文
     * @param request 远程命令请求
     * @return 处理结果
     * @throws RemotingCommandException 处理异常
     */
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException
    {

        // ... 前面逻辑略

        OperationResult result = new OperationResult();

        // 如果是提交事务的请求则处理
        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback())
        {
            // 提交事务消息
            result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);

            if (result.getResponseCode() == ResponseCode.SUCCESS)
            {
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);

                if (res.getCode() == ResponseCode.SUCCESS)
                {
                    // 首先从半消息里恢复原事务消息的真实主题、队列,并设置事务ID
                    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());

                    // 设置事务 sysFlag - 重置事务标志位
                    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(
                            msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));

                    // 设置这条事务消息的物理属性
                    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());

                    // 去除事务消息的标记 - 清除事务准备属性
                    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);

                    // 发送最终消息到目标队列
                    RemotingCommand sendResult = sendFinalMessage(msgInner);

                    if (sendResult.getCode() == ResponseCode.SUCCESS)
                    {
                        // 成功发送后删除准备状态的消息
                        this.brokerController.getTransactionalMessageService()
                                .deletePrepareMessage(result.getPrepareMessage());
                    }
                    return sendResult;
                }
                return res;
            }
        }
        // 恢复原事务消息处理完成
        // ... 其他逻辑
    }

    /**
     * 从事务半消息中恢复原始消息内容
     * 从半消息的用户属性中提取真实主题和队列信息,重建原始消息
     *
     * @param msgExt 事务半消息
     * @return 恢复后的原始消息内部对象
     */
    private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt)
    {
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();

        // 恢复主题和队列 - 从用户属性中获取真实主题和队列ID
        msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        msgInner.setQueueId(Integer.parseInt(
                msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));

        // 其他属性从原消息中复制恢复
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
        msgInner.setWaitStoreMsgOK(false);

        // 设置事务ID
        msgInner.setTransactionId(
                msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGEID_KEYIDX));
        msgInner.setSysFlag(msgExt.getSysFlag());

        // 确定标签过滤类型(多标签或单标签)
        TopicFilterType topicFilterType =
                (msgInner.getSysFlag() & MessageSysFlag.MULTITAGS_FLAG) == MessageSysFlag.MULTITAGS_FLAG
                        ? TopicFilterType.MULTITAG
                        : TopicFilterType.SINGLE_TAG;

        // 转换标签字符串为标签代码
        long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(
                topicFilterType, msgInner.getTags());
        msgInner.setTagsCode(tagsCodeValue);

        // 设置消息属性
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(
                MessageDecoder.messageProperties2String(msgExt.getProperties()));

        // 清理恢复过程中使用的临时属性
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);

        return msgInner;
    }
}

  • 事务消息回滚

  • 当本地事务执行失败时,生产者需向 Broker 发送 rollback 确认(而非 commit)。Broker 收到后,会将对应的半消息从 RMQ_SYS_TRANS_HALF_TOPIC​“删除”​​——实质是将其转移至专用主题 RMQ_SYS_TRANS_OP_HALF_TOPIC中归档。此操作确保该消息不再参与后续的恢复流程与事务回查,彻底终止其生命周期。

  • 通过归档式删除,RocketMQ 实现了事务回滚消息的​​不可逆终止​​,避免因残留半消息引发错误恢复。此机制保障了事务失败场景的最终一致性,同时通过逻辑隔离降低了系统复杂度。

④RocketMQ 事务回查及状态

  • RocketMQ 的事务消息常规流程要求客户端顺利执行本地事务并正常向 Broker 提交状态,以便 Broker 更新半消息状态(投递或删除)。然而,在分布式环境下,这一过程存在显著不确定性,主要体现为两种典型场景:

  • 半消息发送成功但本地事务执行超时(可能因数据库慢查询、RPC 或服务不可用导致,但超时未必意味失败);

  • 本地事务成功但向 Broker 发送确认指令失败(因 Broker 作为分布式节点可能面临网络问题、服务超时或宕机)。

  • 上述场景凸显了保障本地事务与消息状态最终一致的挑战。为此,RocketMQ 设计了回查机制:Broker 定期扫描未确认的半消息,并向原生产者查询其本地事务的最终状态,据此更新半消息状态。该机制通过主动协调解决了分布式场景下的状态同步难题,虽实现复杂,但为事务消息的可靠性提供了关键保障。


  • 事务回查需要解决的难点

  • RocketMQ 在实现事务消息回查机制时面临三个关键挑战:

  • 执行者故障后的状态确认​​:若事务执行方完成本地事务后服务不可用且长期未重启,需解决如何确认事务状态以保障订单最终一致的问题;

  • 独立组件的状态自治​​:作为不依赖外部数据库的第三方组件,需实现半消息的自主存储及提交/回滚状态跟踪;

  • 乱序提交下的回查协调​​:事务消息的提交顺序和发送顺序可能是不一样的, 如何维持哪些事务需要回查哪些事 务不需要回查呢? 例如事务消息发送可能是1、 2、 3,但是提交的顺序可能是2、 3、 1。 当第2 条消息提交的时候, 回查的消息是1、 3。 这听起来很自然, 但是在缺乏MySQL这种存储引擎 的帮助下, 要实现这个待回查表实际上是很难的。


  • 回查对象高可用设计

  • 为解决执行本地事务的实例可能因发布、重启等操作而宕机的难题,RocketMQ 引入了​​生产者组(ProducerGroup)​​ 概念(专用于事务场景)。同一生产者组的实例被视为对等实体,共享事务状态查询能力。

  • 事务消息携带特殊的 PGROUP属性(存储生产者组名称),使 Broker 可精准定位消息来源。回查时,Broker 通过其维护的生产者组连接记录(代码见 ProducerManager.java),向组内任一可用实例发起查询,理论上均可获取事务真实状态。此设计通过组内实例的等价性与连接冗余性,保障了回查操作的高可用性与可靠性。


  • 半消息的高可用存储设计

  • image-Hoqz.png

  • 半消息实际存储于专用主题 RMQ_SYS_TRANS_HALF_TOPIC,其物理存储结构与普通消息完全一致,因此天然继承 RocketMQ 消息存储的高可用特性(包括主备复制、持久化落地等),无需额外设计即可解决半消息的可用性挑战(见图11-25)。

  • 针对回查需避免全量扫描的问题,RocketMQ 利用半消息​​单一队列顺序存储​​的特性,通过虚拟消费者组 CID_RMQ_SYS_TRANS的消费位点记录上次回查位置。每次回查仅需从该位点向后扫描新消息,复用消费位点的持久化与高可用设计,既确保回查进度可靠性,又大幅提升查询效率(见图11-26)。


  • 记录半消息的处理状态

  • RocketMQ 在处理半消息时具有顺序灵活性:无论是 Broker 主动发起的回查,还是客户端触发的提交或回滚操作,其执行顺序均可根据实际场景动态调整,无需严格遵循特定时序。

  • 事务消息的存储顺序与最终提交/回滚状态的实际确定顺序可能存在差异。例如:生产者顺序发送 5 条消息(msg1 至 msg5),但因各消息处理耗时不同,可能出现 msg2 比 msg1 更早提交,或 msg4 因快速异常回滚而比 msg1 更早确定状态的情况。这种乱序特性由系统异步处理机制自然形成,并不影响事务的最终一致性保障。

  • 存储顺序和状态确认的顺序不一致的主要原因如下:

  • 不同的本地事务处理时间可能不一样, 半消息msg1可能比msg2先存储, 但是事务T2 可能比T1更早提交确认, 故msg2比msg1会更早确认半消息状态。

  • Broker 发起回查是按照消息存储顺序发起的, 但是单次消息的回查是异步的, 而且是 不可靠的。 试想一下, 如果消息的回查是同步的, 那么有一个事务生产者的回查结果卡死了, 就会阻塞整条队列里半消息的回查任务, 故而RocketMQ的回查都是异步不等待结果的, 需要 等待客户端异步确认事务状态, 所以回查的顺序可能是msg1、 msg2、 msg3, 但是收到确认状 态可能是乱序的 (如msg3、 msg2、 msg1)。

  • -

  • 存储机制与回查进度管理的局限:半消息基于 CommitLog 顺序写入,无法随机修改已写入内容,且 RMQ_SYS_TRANS_HALF_TOPIC 主题无法记录消息确认状态。回查进度依赖虚拟消费者的消费位点,但因确认顺序可能乱序(如 msg4 和 msg2 已确认但 msg0 未确认),无法直接更新回查位点到已确认位置,否则会跳过未处理消息,导致回查机制失效。

  • 前文介绍过哪些消息需要回查哪些不回查 (实际上是已处理的半消息的进度) 是 依赖半消息队列上一个消费进度的, 但是现在确认顺序可能完全是打乱的, 这会有什么困难 呢? 如图11-28 所示, msg4和msg2 的位点是3 和1, 虽然其都是已确认状态, 但是RocketMQ 的虚拟消费者CID_RMQ_SYS_TRANS 肯定不能把回查位点更新到3 或者1的位置上, 因为0 这个位置的消息还没确认.

  • -

  • RocketMQ 为保障事务消息的完整生命周期管理,设计了三个协同工作的主题:

  • 原消息真实主题​​:存储业务原始消息;

  • RMQ_SYS_TRANS_HALF_TOPIC​​:专用于暂存半消息(状态未决);

  • RMQ_SYS_TRANS_OP_HALF_TOPIC(OP主题)​​:记录半消息的最终处理状态(commit/rollback),标记为删除状态以避免重复回查。

  • OP主题采用单队列结构,其核心价值在于通过状态持久化实现消息处理状态的可靠跟踪。三主题协作完成状态流转:当半消息被提交或回滚时,状态变更被记录至OP主题,形成闭环管理,确保每条事务消息的生命周期可追溯、不重复处理。

  • -

  • 状态流转

  • 半消息提交状态处理

  • 当 Broker 接收到半消息的 commit 指令后,执行两个核心操作:首先写入一条 OP 消息,其消息体存储该 commit 消息在队列中的偏移量(queueOffset),作为指向原半消息的指针地址,同时标记原半消息为“已删除”状态(此标记使后续回查机制忽略该消息);随后读取该半消息,还原其原始主题等元数据,并重新写入目标主题,转化为可被消费者正常消费的普通消息。

  • 半消息回滚状态处理

  • 当Broker 接收到一个半消息是rollback 状态时, Broker 同样写入 OP 消息, 流程和 commit 一样, 也会标记这条半消息是被删除的。 但后续不会读取和还原该半消息。 这样消费者就不 会消费到该消息, 如图11-30所示。

  • 半消息unknow 状态处理

  • 当 Broker 接收到状态为 "unknown" 的半消息时,不会立即处理,而是依赖事务回查机制确认其最终状态(commit 或 rollback)。这一设计避免了因本地事务未完成或状态未确认而导致的误操作。

  • 即使 commit 或 rollback 指令发送失败(如 Broker 故障或网络闪断),处理流程仍会退回至事务回查阶段,通过重试机制确保消息状态最终被正确确认。这种设计保障了事务消息在各种异常场景下的可靠性与最终一致性。

6. RocketMQ 事务消息与Kafka 事务消息的对比

  • 尽管 Kafka 与 RocketMQ 均提供事务消息功能,但两者设计目标与解决场景存在根本区别,并非简单的优化关系。RocketMQ 事务消息核心解决​​本地事务与消息发送的最终一致性​​问题,而 Kafka 事务消息则专注于实现 ​​Exactly-Once(精确一次)​​ 语义,旨在避免消息重复投递或丢失,确保流处理场景下的端到端数据准确性。

  • 理解 Kafka 事务消息需首先掌握 Exactly-Once 概念,其设计初衷与 RocketMQ 的分布式事务保障路径截然不同。

①Kafka 想要解决的Exactly-Once 是什么

  • Kafka 提供三种消息投递语义,其核心区别在于对消息持久化行为的保证程度:

  • At-most-once(至多一次)​​:消息最多被持久化一次,可能丢失但绝不重发,对应 Producer 配置 acks=0

  • At-least-once(至少一次)​​:消息最少持久化一次,绝不丢失但可能重复,如 acks=all下超时重发致消息写入两次;

  • Exactly-once(精确一次)​​:消息肯定且仅持久化一次,兼顾防丢与防重。

  • 需注意 Exactly-once 语义主要解决​​消息重复问题​​(如生产者重试导致的重复写入),其实现依赖于 Kafka 事务机制而非业务侧的事务处理。该特性通过服务端去重保障流处理场景的端到端准确性,与传统数据库事务的 ACID 特性有本质区别。

②Kafka 事务消息的表现

  • Kafka 的幂等 Producer 通过设置 enable.idempotence=true实现,Broker 为每条消息生成唯一 ID 进行去重,支持 Exactly-once 语义。但该机制有两个局限:仅保证单分区内的消息不重复,无法跨多分区;且仅支持单会话幂等,Producer 重启后无法维持跨会话幂等性。

  • 为解决幂等 Producer 的不足,Kafka 引入事务 Producer,支持多分区数据的原子性写入和跨会话的 Exactly-once 处理语义(即使 Producer 重启也能保证数据只处理一次)。需配合事务 Consumer 使用,方可实现端到端的 Exactly-once 保证。

  • Kafka 事务机制专注于消息的 Exactly-Once 语义,适用于流处理场景;而 RocketMQ 的事务设计旨在解决本地事务与消息发送的最终一致性问题,两者应用场景有本质区别。

十二. RocketMQ延迟消息原理​

1. 互联网场景下延迟任务的常见场景

  • 延迟任务指需在未来特定时间点触发的业务操作,常见于三类场景:

  • 支付关单​​:订单下单后需在30分钟后关闭未支付订单;

  • 账单提醒​​:信贷场景中凌晨跑批生成的还款提醒,需延迟至早10:00发送以避免骚扰用户;

  • 外部系统超时处理​​:如审核系统回调超时(如30分钟未返回),需启动超时兜底策略(如自动发布文章)。

  • -

  • 传统实现方案:定时任务轮询

  • 传统方案采用定时任务(如每分钟执行)扫描业务数据:

  • 扫描条件​​:筛选未支付订单且下单时间在31分钟内(30分钟时效+1分钟间隔);

  • ​执行操作​​:对符合条件的订单执行关单等操作。

  • 此方案通过高频轮询平衡时效性与实现复杂度,是延迟任务的典型基础实现。

2. 使用定时任务处理延迟任务的一些问题

①时效性

  • 定时任务方案(如每分钟执行)在处理延迟操作时存在固有缺陷:订单若在周期临界点生成(如9:30:01),可能因存活时间不足整周期而被下一轮扫描遗漏(如10:00:00任务忽略29分59秒订单),导致实际执行延迟近1分钟(延至10:01:00)。虽可通过缩短执行间隔(如X秒)降低延迟,但更高频的扫描会加剧系统性能消耗,形成时效性与资源开销的矛盾。

②扫描性能

  • 当订单数据量达到千万级别时,定时任务扫描方案面临严峻的性能挑战:每次处理超时订单均需对全量数据进行扫描(如1000万条记录),即使通过索引优化查询,其开销依然巨大。若订单表进一步采用分库分表策略(如拆分为128张表),则需对所有分表执行全表扫描,导致资源消耗成倍增长,严重制约系统扩展性与响应效率。

③单一周期处理量问题

  • 在双11等大促活动凌晨时段,系统可能瞬时产生海量订单,其中包含大量需超时关单的订单。例如在0:30需集中处理关单操作时,若单个定时任务周期内无法完成所有订单处理(如千万级数据量),会导致任务堆积,进而引发后续定时任务连锁拥塞,形成处理能力与任务积压的恶性循环。

3. 认识RocketMQ 延迟消息

①RocketMQ 延迟消息的优点

  • RocketMQ 通过定时消息机制优化延迟任务处理:业务方(如订单系统)可在下单时发送携带业务数据(如订单号)的定时消息,并指定投递时间(如30分钟后)。消息中间件在约定时间精准投递,消费者接收后执行对应业务逻辑(如查询订单状态并关单)。

  • 时效性保障​​:依赖消息中间件的高性能投递机制,规避数据库扫描延迟;

  • 性能与扩展性​​:无数据库全表扫描压力,通过增加消费者即可提升并发处理能力;

  • 架构简洁性​​:替代繁琐的定时任务调度与扫描逻辑,实现更优雅的业务解耦。

  • 需注意 RocketMQ 暂不支持任意精度定时(如秒级),仅提供固定延迟级别(如1s/5s/10s/30s/1m等),但其设计已覆盖典型业务场景,在性能与易用性上显著优于传统方案。

②RocketMQ 延迟消息的代码示意

/**
 * RocketMQ 延迟消息示例类
 * 演示如何发送和消费延迟消息,用于处理如订单关单等延迟任务
 */
public class RocketMQDelayMessageExample {

    /**
     * 发送延迟消息的示例代码
     * 相关的代码也非常简单,发送延迟消息的示意代码如下
     */
    public void sendDelayMessage() throws Exception {
        // 正常启动一个生产者
        DefaultMQProducer producer = new DefaultMQProducer("DelayProducerGroup");
        producer.start();

        // 和普通消息一样,构建对应的消息体
        String orderNo = "LO123456789"; // 订单号
        Message delayMsg = new Message();
        delayMsg.setTopic("TOPIC-CLOSE-ORDER");
        delayMsg.setBody("LO123456789".getBytes()); // 延迟消息的消息体,这里只需要存放订单号即可

        // 唯一不同:对消息设置延迟level为16,默认配置下,16对应为延迟30min
        delayMsg.setDelayTimeLevel(16);

        // 和普通消息一样调用相同的方法进行发送
        producer.send(delayMsg); // 发送消息,API和普通发送消息一致
        producer.shutdown();
    }

    /**
     * 消费延迟消息的示例代码
     * 消费者端注册并发消息监听器处理延迟消息
     */
    public void consumeDelayMessage() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DelayCloseOrderConsumer");
        consumer.subscribe("TOPIC-CLOSE-ORDER", "*"); // 订阅延迟消息的主题

        // 注册回调,里面有处理关单逻辑
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,
                                                           ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // 打印日志,通过时间差确认是否按预期的延迟投递
                    System.out.printf("接收到延迟消息[ msgId=%s ms later ] \n", message.getMsgId(),
                            System.currentTimeMillis() - message.getStoreTimestamp());
                    String orderNo = new String(message.getBody()); // 从消息体里解析出订单号
                    // 检查订单,如果超时就执行关单操作。注意这里的代码,假设程序有一个orderDao的对象封装好了此逻辑
                    // orderDao.closeOrderIfUnpaid(orderNo);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }

    /**
     * 延迟级别配置说明:
     * 在Broker里面有一个配置项叫作messageDelayLevel,在不修改设置的情况下总共有18个级别,其值如下:
     * 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     * 其中后面的字母表示单位,分别是秒(s)、分钟(m)、小时(h)和天(d)。
     * 而这里每个位置就是delay level的值(从1开始计算),所以30分钟在默认的配置下就是16。
     * 一般情况下,默认的等级已经够用。若不够用通常只需要往后追加级别即可,例如需要一天后生效的则改为1d。
     */
}

4. 延迟消息实现原理

①RocketMQ 延迟消息存储实现

  • RocketMQ 的延迟消息在存储层面与普通消息及事务消息采用相同的机制,均基于 commitlog 加 consumequeue 的架构实现。因此,其存储可靠性与普通消息完全一致,不会因采用延迟特性而降低可靠性。该设计通过多副本机制保障数据安全,确保即使单个 Broker 节点发生故障,消息也不会丢失。

②RocketMQ 延迟消息转存

  • RocketMQ 的延迟消息通过“转存”机制实现时间控制:Broker 在存储消息时,若检测到 delayLevel 设置,会将消息转存至无消费者订阅的 SCHEDULE_TOPIC_XXX 主题,避免提前投递。转存时,真实主题和队列 ID 分别存入 REAL_TOPIC 和 REAL_QID 属性(转存逻辑位于 Commitlog.java)。

  • 投递时间由存储时间加延迟级别计算得出,其绝对值存储在 consumequeue 的 TagCode 字段中(设计为 8 字节以容纳时间戳)。当到达投递时间时,调度模块将消息恢复至原主题队列供消费。tagCodes 字段在此过程中起关键作用,用于判断消息是否到达调度时间。

// 如果消息是延迟消息
if (msg.getDelayTimeLevel() > 0) {
    // 检查延迟级别是否超过最大允许值,如果超过则设置为最大延迟级别
    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
    }
}

// 主题改为延迟消息的主题暂存:使用 RocketMQ 系统预定义的调度主题
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
// 队列则依据其延迟级别计算得出:通过延迟级别映射到对应的队列ID
int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// 在消息中备份原来真实的主题和真实的队列:将原始主题和队列信息存储为消息属性,以便后续恢复
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
// 将消息属性序列化为字符串格式,确保持久化时属性信息完整保存
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 设置消息的新主题和队列ID,完成延迟消息的转存操作
msg.setTopic(topic);
msg.setQueueId(queueId);
}

③延迟主题的队列

  • 延迟消息的转存逻辑与事务消息类似,但关键区别在于其专用主题 SCHEDULE_TOPIC_XXXX的队列数量并非单一。队列总数直接由 messageDelayLevel配置项决定:以默认设置为例,18 个延迟级别对应 18 个独立队列,队列 ID 与延迟级别一一映射(如队列 0 存储级别 1 的消息,队列 1 存储级别 2 的消息,依此类推)。此设计通过队列隔离实现不同延迟级别消息的并行管理,提升处理效率。

④延迟消息的调度

  • RocketMQ 作为由程序员开发的系统,在实现调度功能时不可避免地需要编写定时任务。其底层调度机制是基于 Java 标准库中的 ScheduledExecutorService类来实现的,该类提供了强大的定时任务执行能力,用于管理延迟执行和周期性任务。

⑤调度的性能问题

  • RocketMQ 的 ConsumeQueue 在设计上不支持基于延迟级别的搜索或删除操作,这导致在处理延迟消息时必须进行全量扫描。例如,当队列中存在 10,000 条消息(其中 5,000 条已投递)且每秒需筛选 100 条当前投递的消息时,系统每次仍需扫描全部消息,造成显著的性能负担。此问题与传统定时任务的全表扫描瓶颈类似。

  • 消息的存储顺序与投递顺序可能不一致(如存储为 MSG1、MSG2、MSG3,投递顺序可能为 MSG3、MSG1、MSG2),因此无法通过记录投递进度来优化扫描范围。投递完成后,下次扫描不能直接从上次位点开始,因为前方可能存在投递时间未来的消息。

  • 尽管可借鉴事务消息的 OP 主题机制(通过额外主题记录处理状态),但延迟消息调度频率高,引入 OP 主题会导致大量重复存储,增加性能开销。因此 RocketMQ 未采用此方案。值得注意的是,同一延迟级别下的消息(如均为 10min 延迟),其存储与投递顺序一致;但不同级别消息按级别顺序投递,这一特性为部分场景提供了有限优化。

⑥分队列调度解决投递顺序问题

  • RocketMQ 的 SCHEDULE_TOPIC_XXXX主题采用多队列设计(非单队列),核心目的是实现队列 ID 与延迟级别的一一对应。此设计确保每个队列仅处理特定延迟级别的消息,且消息在队列内严格按投递时间排序,从而保障存储顺序与投递顺序的一致性,避免乱序投递。

  • 由于每个队列的延迟级别固定且消息按时间有序,系统可基于延迟级别维度管理投递进度:已投递消息无需重复扫描,进度持续向前推进。投递进度以 JSON 格式持久化至 $HOME/store/config/delayOffset.json文件(如 {"offsetTable": {"1": 14, "2": 1}}),记录各队列 ID 的当前偏移量。该机制确保 Broker 重启后能快速恢复进度,维持延迟消息处理的可靠性与效率。

⑦调度的性能优化

  • 消息按预期投递时间排序(相同延迟级别下与存储时间顺序一致)。扫描时无需遍历全队列,仅需检查下一条消息:若未到投递时间,则后续消息皆可跳过;若已到时间,则投递后继续扫描下一条,极大提升处理效率。

⑧调度的延迟

  • RocketMQ 的调度器作为普通任务运行,固有存在一定延迟,主要源于三种场景:

  • ​队列空置延迟​​:当扫描完成所有延迟消息后,调度器会进入100ms睡眠再重启检查,导致最大100ms延迟;

  • 时间未到延迟​​:若首条消息投递时间与当前时间差极小(如1ms),调度器仍会睡眠100ms后重试,同样引入最多100ms延迟;

  • IO堆积延迟​​:大量消息同时投递时,因需IO操作将消息恢复至原主题,可能因处理瓶颈造成部分消息投递延后。

5. 实现任意精度的定时消息

  • RocketMQ 的延时消息功能仅支持预设的时间精度(如固定延迟级别),无法满足所有场景下的精确定时需求。例如,在用户注册后次日凌晨6点发送通知的用例中,由于注册时间点的细微差异(如23:00:00与23:00:01),所需延迟时间可能不同(6小时与5小时59分59秒),导致消息投递时间无法精准对齐目标时刻。这种局限性凸显了在需要高精度时间控制的业务场景中,系统需依赖更灵活的定时消息机制,以确保任务在特定时间点准确触发。

①简易逼近法

  • RocketMQ 的定时消息功能通过一种简单的重复延时机制实现:即使目标时间点较远(如500米外的学校可通过1000步到达),也可通过多次短延时(如1秒级)消息串联达成。具体实现时,消息体携带投递时间戳(如deliveryTime字段),消费者检查当前时间若未到目标,则重新发送回原Topic继续等待;若到达则触发业务逻辑。此方案支持任意时刻的定时需求,精度误差控制在1秒内,得益于RocketMQ最小延迟级别为1秒的设计。

/**
 * 自定义延迟消息消费者示例
 * 通过消息重发机制实现精确定时投递
 */
public class DelayMessageConsumer
{

    /**
     * 注册并发消息监听器
     * 核心逻辑:检查消息投递时间,未到目标时间则重新发送延迟消息
     */
    public void setupConsumer()
    {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YourConsumerGroup");
        // ... 初始化消费者配置

        consumer.registerMessageListener(new MessageListenerConcurrently()
        {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context)
            {
                for (MessageExt msg : msgs)
                {
                    // 解析这条消息成为一个自定义的结构,通常情况下就是 JSON 反序列化
                    MyMsg myMsg = parse(msg);

                    // 拿到这个消息目标投递的时间戳
                    long deliveryTime = myMsg.getDeliveryTime();

                    // 如果时间已经到了,就立刻执行消费逻辑
                    if (System.currentTimeMillis() >= deliveryTime)
                    {
                        handle(myMsg); // 做对应的消费逻辑
                    } else
                    {
                        // 时间没到,就继续重复发送这条消息,等到时间到了才执行消费逻辑
                        // 复制这个消息
                        MessageExt newMsg = cloneMsg(msg);
                        // 新消息固定延迟 1s
                        newMsg.setDelayTimeLevel(1);
                        // 把这个新消息发送回原主题,新消费的时候会重复一样的逻辑
                        sendMsg(newMsg);
                    }
                }
                // 已经发送新消息回主题了,这条消息就可以认为消费成功了
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
    }

    // 以下为需要实现的辅助方法示例
    private MyMsg parse(MessageExt msg)
    {
        // 实现消息解析逻辑(如JSON反序列化)
        return null;
    }

    private MessageExt cloneMsg(MessageExt msg)
    {
        // 实现消息复制逻辑
        return null;
    }

    private void sendMsg(MessageExt msg)
    {
        // 实现消息发送逻辑
    }

    private void handle(MyMsg myMsg)
    {
        // 实现具体的消息处理逻辑
    }

    // 自定义消息结构示例
    static class MyMsg
    {
        private long deliveryTime;
        // 其他字段...

        public long getDeliveryTime()
        {
            return deliveryTime;
        }
    }
}

②最优逼近法模拟任意时刻的定时消息

  • 简易逼近法虽可模拟任意时刻定时消息,但效率低下:例如延迟10小时1秒需重复发送消息36,001次,代价过高。为此,可基于RocketMQ固有18个延迟级别(如1s、2h等)采用组合逼近策略:以延迟10小时1秒为例,通过5次2小时延迟(级别18)逐步逼近剩余时间,最后叠加1秒延迟(级别1),仅需发送6条消息即可实现目标。该方案通过计算最接近的delayLevel动态选择延迟级别,显著降低消息重复次数,平衡了精度与性能。

/**
 * RocketMQ 定时消息消费者示例
 * 通过消息重发机制实现精确定时投递,支持任意目标时间点的消息处理
 */
public class DelayMessageConsumer
{

    /**
     * 设置消费者并注册消息监听器
     * 核心逻辑:检查消息投递时间,未到目标时间则重新发送延迟消息
     */
    public void setupConsumer()
    {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YourConsumerGroup");
        // 消费者初始化配置(如NameServer地址、订阅主题等)
        // consumer.setNamesrvAddr("localhost:9876");
        // consumer.subscribe("YourTopic", "*");

        // 注册并发消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently()
        {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context)
            {
                for (MessageExt msg : msgs)
                {
                    // 解析这条消息成为一个自定义的结构,通常情况下就是 JSON 反序列化
                    MyMsg myMsg = parse(msg);

                    // 拿到这个消息目标投递的时间戳
                    long deliveryTime = myMsg.getDeliveryTime();

                    // 如果时间已经到了,就立刻执行消费逻辑
                    if (System.currentTimeMillis() >= deliveryTime)
                    {
                        handle(myMsg); // 做对应的消费逻辑
                    } else
                    {
                        // 时间没到,就继续重复发送这条消息,等到时间到了才执行消费逻辑
                        // 复制这个消息
                        MessageExt newMsg = cloneMsg(msg);
                        // 计算最接近投递时间的一个延迟级别
                        int nextDelayLevel = countNextDelayLevel(deliveryTime);
                        newMsg.setDelayTimeLevel(nextDelayLevel);
                        // 把这个新消息发送回原主题,新消费的时候会重复一样的逻辑,如果时间还没到会继续重复发送
                        sendMsg(newMsg);
                    }
                }
                // 打印接收消息日志(修复原图格式错误)
                System.out.printf("Receive New Messages:%s %n", Thread.currentThread().getName(), msgs);
                // 已经发送新消息回主题了,这条消息就可以认为消费成功了
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
    }

    /**
     * 计算下一个延迟级别
     * 根据目标投递时间动态选择最接近的RocketMQ延迟级别
     *
     * @param deliveryTime 目标投递时间戳
     * @return 延迟级别(1-18)
     */
    public static int countNextDelayLevel(long deliveryTime)
    {
        long now = System.currentTimeMillis();
        // 定义延迟时间数组,对应RocketMQ的18个延迟级别
        long[] delayTimes = new long[]{
                now + TimeUnit.SECONDS.toMillis(1),  // 1s
                now + TimeUnit.SECONDS.toMillis(5),  // 5s
                now + TimeUnit.SECONDS.toMillis(10), // 10s
                now + TimeUnit.SECONDS.toMillis(30), // 30s
                now + TimeUnit.MINUTES.toMillis(1),  // 1m
                now + TimeUnit.MINUTES.toMillis(2),  // 2m
                now + TimeUnit.MINUTES.toMillis(3),  // 3m
                now + TimeUnit.MINUTES.toMillis(4),  // 4m
                now + TimeUnit.MINUTES.toMillis(5),  // 5m
                now + TimeUnit.MINUTES.toMillis(6),  // 6m
                now + TimeUnit.MINUTES.toMillis(7),  // 7m
                now + TimeUnit.MINUTES.toMillis(8),  // 8m
                now + TimeUnit.MINUTES.toMillis(9),  // 9m
                now + TimeUnit.MINUTES.toMillis(10), // 10m
                now + TimeUnit.MINUTES.toMillis(20), // 20m
                now + TimeUnit.MINUTES.toMillis(30), // 30m
                now + TimeUnit.HOURS.toMillis(1),    // 1h
                now + TimeUnit.HOURS.toMillis(2)     // 2h
        };

        int returnIndex = 0;
        // 倒序遍历延迟时间数组,找到第一个小于或等于目标投递时间的索引
        for (int i = delayTimes.length - 1; i >= 0; i--)
        {
            long time = delayTimes[i];
            if (time > deliveryTime)
            {
                continue;
            } else
            {
                returnIndex = i;
                break;
            }
        }
        // 返回索引加1(因为RocketMQ延迟级别从1开始)
        return returnIndex + 1;
    }

    // 以下为需要实现的辅助方法示例
    private MyMsg parse(MessageExt msg)
    {
        // 实现消息解析逻辑(如JSON反序列化)
        return null;
    }

    private MessageExt cloneMsg(MessageExt msg)
    {
        // 实现消息复制逻辑
        return null;
    }

    private void sendMsg(MessageExt msg)
    {
        // 实现消息发送逻辑
    }

    private void handle(MyMsg myMsg)
    {
        // 实现具体的消息处理逻辑
    }

    // 自定义消息结构示例
    static class MyMsg
    {
        private long deliveryTime;
        // 其他字段...

        public long getDeliveryTime()
        {
            return deliveryTime;
        }
    }
}

③代理定时消息方案

  • 为实现精确定时消息(误差控制在1秒内)同时避免消费者感知定时逻辑,可采用代理服务模式:消息发送者将定时消息发送至预留系统主题(如 SYSTEM_SCHEDULE_MSG),并在消息属性中设置目标投递时间和真实主题(如短信主题 topic_sms_push)。代理服务作为专属消费者,监听该系统主题,通过逼近算法计算 RocketMQ 内置延迟等级(如1s、5s等),动态重发消息至系统主题暂存。到达投递时间后,代理服务将消息转发至真实目标主题,此时业务消费者(如短信服务)接收后立即处理,无需关注定时机制(重发逻辑)。此方案通过代理层封装复杂性,兼顾了定时精度、系统解耦与开发便利性,依托 RocketMQ 原生延迟功能降低实现成本。

④5.x 版本定时消息展望

/**
 * RocketMQ 5.x 定时消息发送示例
 * 展示新版本中精确定时消息的使用方式,使用 deliveryTimestamp 参数替代旧版的 delay level
 */
public class RocketMQ5DelayMessageExample
{

    public void sendDelayMessage(Producer producer)
    {
        // 定时/延时消息发送
        MessageBuilder messageBuilder = null; // 需初始化为具体的 MessageBuilder 实例

        // 以下示例表示:延迟时间为10min之后的UNIX时间戳
        Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;

        // 构建消息对象
        Message message = messageBuilder.setTopic("topic")
                // 设置消息索引键,可根据关键字精确查找某条消息
                .setKeys("messageKey")
                // 设置消息Tag,用于消费端根据指定Tag过滤消息  
                .setTag("messageTag")
                // 设置精确投递时间戳(5.x版本新特性)
                .setDeliveryTimestamp(deliverTimeStamp)
                // 消息体
                .setBody("messageBody".getBytes())
                .build();

        try
        {
            // 发送消息,需要关注发送结果,并捕获失败等异常
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e)
        {
            e.printStackTrace();
        }
    }
}

十三. 大型系统中实现消息幂等

1. 为什么消息中间件的消息会重复

  • 消息重复是使用任何消息中间件时都无法避免的现象,其根本原因在于消息中间件必须优先保障高可靠性。为实现这一目标,系统需确保三个核心环节的可靠性:消息发送的高可靠、消息存储的高可靠以及消息投递的高可靠。这种对可靠性的极致追求,自然导致了消息可能在某些环节被重复传递,成为高可靠性设计下的必然产物。

①发送的高可靠引起的消息重复

  • 为确保消息发送成功,生产者常采用重试机制:如某存储节点响应超时,生产者会自动选择其他节点重新发送消息(见图13-1)

  • 由于首次发送可能实际已存储成功(仅响应超时),重试会导致消息被重复存储至不同节点,从而使消费者接收到两条相同消息。该场景在 RocketMQ 中的示意参见图13-2。

②存储的高可靠引起的消息重复

  • 为实现存储高可靠,消息中间件除持久化外需依赖多副本同步(如 RabbitMQ 镜像队列、Kafka 的 ISR)。副本在主节点故障时保障投递连续性,但存在特殊场景:若主副本投递消息后宕机,备用副本接管后可能因状态未同步(如消费确认信息延迟),误判消息未消费而重新投递,导致消费者接收重复消息(图13-3)。

  • 该问题在 RocketMQ 中常见于主备切换:消费者从主副本消费消息后,若主节点不可用且从节点(Slave)未及时同步消费进度,新主副本会再次投递已处理消息(图13-4)。此机制虽保障了可用性,但需业务层通过幂等设计或时间戳去重解决重复消费问题。

③消息投递的高可靠引起的消息重复

  • 消息中间件为保障投递可靠性,需在收到消费者确认后才会删除消息;否则将持续重投直至至少成功一次。若消费者处理成功但确认失败(如重启前未确认),中间件不知情会导致消息被重新投递,造成消费者接收重复消息(见图13-5)。

  • 在 RocketMQ 中,消费者重启或扩容触发重平衡时,因消费进度定期异步同步,若同步前发生重平衡,已消费的消息可能被重新拉取并再次投递(见图13-6)。此机制虽保障了可用性,但需业务层通过幂等设计规避重复处理。

2. 消息幂等

①at least once

  • 消息中间件的可靠性核心在于保障消息投递成功且不丢失,确保能被消费者至少成功消费一次(即“at least once”特性)。例如,消息M投递至消费者A后,若A处理中途重启未标记成功,中间件将持续重投直至消费完成。

  • 此机制虽防止消息丢失,但可能引发重复投递:同一消息(如相同messageId)可能因消费者未及时确认(如重启、超时)而被多次投递,需业务层通过幂等设计处理重复问题。

②exactly once

  • 在分布式系统中,消息投递的可靠性(确保消息不丢失)相较于消息不重复具有更高优先级,且两者存在内在矛盾。因此,消息中间件(如 RocketMQ)优先保障消息必达(at least once),而将去重任务交由应用程序通过幂等设计实现。消息幂等意味着即使同一消息被多次投递,其最终效果与仅消费一次一致(例如订单创建消息消费多次仅生成一个订单)。

  • 消息中间件中的“exactly once”投递语义指消息肯定被成功消费且仅一次,其本质与幂等处理目标一致:均要求业务逻辑在重复消息场景下仅执行一次。实现此特性需依赖消费端幂等机制,而非仅靠中间件保证,因分布式场景下完全避免重复投递难以实现。

3. 基于业务逻辑实现消息幂等

①基于数据库的去重实现

/**
 * 消息消费逻辑示例:处理订单和库存更新
 * 假设业务消息的消费逻辑是:插入某张订单表的数据,然后更新库存
 * 注:为了简易地阐述逻辑,以下代码属于伪代码
 */
public class OrderMessageConsumer {

    public void consumeMessage(String orderNo, String goodId) {
        // 以下是图片中的 SQL 操作示例,转换为 Java 中的伪代码实现
        // insert into t_order values ...;
        // update t_inv set count = count-1 where good_id = 'good123';
        
        // 要实现消息的幂等,开发人员可能会采取如下的方案
        // 查询订单表检查是否已处理
        boolean orderExists = checkOrderExists(orderNo); // 对应: select * from t_order where order_no = 'order123'
        
        if (orderExists) {
            // if (order != null) { return; } // 消息重复,直接返回
            return; // 消息重复,直接返回
        }
        
        // 如果订单不存在,执行插入和更新操作
        insertOrder(orderNo);
        updateInventory(goodId);
    }

    // 辅助方法:检查订单是否存在
    private boolean checkOrderExists(String orderNo) {
        // 模拟数据库查询,返回订单是否存在
        // 实际实现中会执行 SQL: select * from t_order where order_no = ?
        return false; // 假设不存在
    }

    // 辅助方法:插入订单
    private void insertOrder(String orderNo) {
        // 模拟插入订单 SQL: insert into t_order values ...
        System.out.println("插入订单: " + orderNo);
    }

    // 辅助方法:更新库存
    private void updateInventory(String goodId) {
        // 模拟更新库存 SQL: update t_inv set count = count-1 where good_id = ?
        System.out.println("更新库存,商品ID: " + goodId);
    }
}

// 注释说明:上述方案在很多情况下能起到不错的效果,但是在并发场景下还是会有问题(例如多个线程同时检查订单是否存在,可能导致重复插入)。

②基于数据库去重的并发问题

  • 当消息消费逻辑整体耗时较长(如1秒)时,若重复消息在消费完成前快速到达(如生产者重发或Broker重启导致100毫秒内投递),由于前一条消息尚未处理完成(订单状态未更新),去重检查可能因数据状态未变而失效(如查询订单仍为空)。这将导致重复消息穿透防护,进入非幂等安全的业务逻辑,引发主键冲突、库存重复扣减等数据不一致问题。

  • 该问题源于​​消费耗时与重发时机的竞态条件​​:去重依赖的数据状态更新滞后于消息重发速度。在生产者快速重试、Broker故障恢复等场景下尤为常见,暴露了单纯依赖前置查询的去重机制在并发场景下的局限性,需结合更严格的并发控制或事务机制解决。

③加锁解决基于数据库去重的并发问题

/**
 * 处理消息消费,确保幂等性
 * 使用悲观锁方案:开启事务,通过 select for update 锁定记录
 * 要解决并发场景下的消息幂等问题,一个可取的方案是开启事务,把select改成select for update语句,并把记录锁定
 */
public void consumeMessageWithPessimisticLock(String orderNo) {
    // 开启事务(具体事务管理依赖框架如Spring的@Transactional)
    // 执行查询并锁定记录:select * from t_order where order_no = ? for update
    Order order = orderDao.selectForUpdate(orderNo); // 模拟SELECT FOR UPDATE查询

    if (order != null && order.getStatus() == OrderStatus.SUCCESS) {
        // 消息重复,直接返回
        return;
    }

    // 否则,继续处理消息...
    // 但这样消费的逻辑会因为引入了事务包裹而导致整个消息消费时间可能变长,并发度下降。
    // 这时候开发者也可以基于乐观锁解决。
}

④乐观锁解决基于数据库去重的并发问题

/**
 * 处理订单消息,使用乐观锁机制解决并发重复消费问题
 * 通过查询订单状态和带状态条件的更新操作实现幂等性
 */
public void processOrderWithOptimisticLock(String orderNo) {
    // 选择订单:select * from t_order where order_no = 'THIS_ORDER_NO'
    Order order = doSelect(orderNo); // 正常判断订单是否存在

    if (order != null && order.status == OrderStatus.SUCCESS) {
        return; // 消息重复,直接返回
    }

    // 来到这里,证明在 select 的瞬间订单的状态是待处理的
    // 对订单状态进行修改,更新条件中除了订单号外,还增加状态筛选以处理并发消息
    int count = doUpdate(orderNo); // update t_order set status = 'success' where order_no = ? AND status = 'processing'

    // 如果更新行数为0,证明状态已变更,可认为是重复消费
    if (count == 0) {
        return; // 直接幂等返回
    }

    // 真实场景需更复杂开发设计,细节不在本文讨论范围
}

// 辅助方法示例(需根据实际实现)
private Order doSelect(String orderNo) {
    // 执行数据库查询逻辑
    return null;
}

private int doUpdate(String orderNo) {
    // 执行数据库更新逻辑
    return 0;
}

⑤基于业务逻辑实现消息幂等的局限性

  • 当前基于业务表本身的去重方案(如 select for update或乐观锁)虽能解决部分场景的幂等问题,但显著增加了业务开发的复杂度。尤其在现代系统普遍依赖消息队列(MQ)进行请求处理的架构下,若每个消息消费逻辑均需单独实现去重机制,将导致开发工作繁琐且重复。

  • 这一现状引发了对通用去重方案的思考:是否存在一种可覆盖多场景、无需业务层频繁介入的去重方案,既能保障消息处理的幂等性,又能降低开发与维护成本?此类方案需突破传统基于业务表设计的局限,提供系统级的统一解决路径。

4. 基于数据库事务的消息表实现消息幂等

①基于数据库事务的消息表的实现

  • 通过数据库事务能力可构建通用消息幂等方案:在消费消息时,开启事务并先插入消息记录表(需处理主键冲突),再执行业务逻辑(如更新订单状态),最后提交事务。此机制利用事务原子性保障一致性——若事务提交成功,即使消息中间件未及时更新消费位点,重复投递也会因消息表记录而被视为已消费(插入后,再插入会因为主键重复而失败);若事务失败(如服务重启),消息中间件重新投递确保消息不丢失。

  • 该方案依赖消息表而非具体业务逻辑,具备高度可扩展性。核心设计要点:消息表需以业务主键(如订单号)而非消息ID作为唯一主键,以应对生产者重发时消息ID变化导致的去重失效问题。

②基于数据库事务的消息表的局限性

  • 该方案虽在特定场景下有效,但存在显著局限性:

  • 数据源依赖限制​​:消费逻辑必须完全基于关系型数据库(如MySQL),无法兼容Redis、ES等不支持事务的非关系型数据源操作,破坏了原子性保障;

  • 外部调用约束​​:流程中禁止依赖RPC调用或消息发送等外部操作,因无法与本地事务保持原子性;

  • 库级别隔离​​:仅支持单库操作,跨库场景(如同时操作订单库和商品库)无法实现事务统一;

  • 性能瓶颈风险​​:依赖数据库锁机制可能导致锁表时间延长,制约消费并发能力与系统吞吐量。

5. 基于无锁的消息表的通用幂等方案

①复杂业务消费场景

  • 基于数据库事务加消息表的幂等方案存在显著局限性,不具备广泛应用价值。以订单申请消息为例,消费流程通常涉及多步骤:检查库存(RPC)、锁库存(RPC)、开启事务插入订单表(MySQL)、调用下游服务(RPC)、更新订单状态及提交事务。这些步骤中,RPC调用等子过程不支持回滚,导致即使使用本地事务也无法保证整体原子性。

  • 在异常场景中(如订单服务重启),RPC操作(如锁库存)可能已执行但事务回滚,消息重投时会重复调用RPC接口,因此这些接口本身必须设计为幂等。基于数据库事务的方案无法应对此类问题,复杂场景下仍需依赖业务逻辑实现幂等,如使用 select for update 或乐观锁机制。


  • 为兼顾去重、通用性与高性能,可参考SEGA事务思想将操作拆解为多个子消息:例如,消息A发送至库存系统检查并锁库存,随后发送消息B至订单服务;订单系统消费消息B插入订单表后发消息C给下游系统;下游系统处理消息C后发消息D回订单系统更新状态。此方案利用RocketMQ事务消息特性保证本地事务与消息的最终一致性,使每一步操作原子化。

  • 然而,基于数据库事务的消息表方案存在显著局限:它强依赖关系型数据库的事务机制,必须将事务贯穿整个消费环节,导致系统复杂度增加——原本连续的代码逻辑被割裂成多次消息交互,其实现复杂度可能超过业务层直接实现幂等(如使用乐观锁)。因此,该方案在实际落地场景中应用较少。

②基于消息幂等表的非事务方案

  • 设想通过非事务性方案实现消息去重,可突破数据库事务依赖,扩展至 RPC、跨库等复杂场景。该方案保留消息表结构,但引入​​消费状态​​概念,通过在消费过程中动态维护状态(如“处理中/已完成”),实现无锁的通用幂等保障。其核心在于通过状态机机制替代传统事务锁,提升系统灵活性与扩展性。整体流程如图13-7所示,为分布式环境下消息幂等处理提供新思路。

  • 1) 在消费开始的时候, 会先针对这条消息插入到消息表, 并且记录这条消息是消费中。

  • 2) 如果插入成功, 那么证明这是一条新消息, 则执行后面的业务代码逻辑。

  • 3) 如果遇到主键冲突导致插入不成功, 说明这是一条重复消息。 但是重复消息也分这条 消息是已经消费成功过的, 还是没消费完成的, 所以需要判断其消费状态。 如果是前者, 因 为已经消费成功过了, 所以直接告诉 RocketMQ 消费成功即可; 如果是后者则利用 RocketMQ 的延迟消费的逻辑, 让消息重复消费。 因为延迟之后消息很可能已经消费结束 (成功或者失 败) 了, 消息表的消费状态会发生变更, 所以延迟消费的逻辑也会有所不同。 如果已经消费 成功了, 那就直接返回成功。 如果消费是失败的, 那就正常消费一次, 如果之前的消息还是 消费中, 那就继续延迟消费。

  • 4) 执行完业务代码之后, 按照业务代码的执行结果去更新消息表的状态, 如果执行成 功, 那么就更新为成功状态, 这样以后有重复消息进来就会直接发现这是一个已经消费过的 消息, 从而直接幂等返回; 如果执行业务逻辑失败, 那么这次的消息消费不能记录为成功, 否则后面有消息重发也会幂等返回, 故而需要把消息表的消费记录直接删除。

  • 该方案通过状态机管理替代数据库事务,实现消息幂等性处理。其核心设计包含两个关键状态:​​消费中​​与​​消费完成​​。系统仅对标记为“消费完成”的消息执行幂等处理;对于“消费中”的消息,则通过延迟消费机制(如 RocketMQ 的 Retry Topic)暂缓投递。此设计专为解决高并发场景下的重复消息投递问题:当首条消息尚未处理完成时,若重复消息到达,延迟机制可避免并发冲突,同时确保消息绝不丢失。通过状态分离与异步重试的协同,实现了无事务依赖的高效幂等控制。

  • -

  • 消息去重需解决三个核心问题:实现业务无关的通用去重、保障并发场景下的幂等性、处理生产者重发消息。通过消息表设计(以业务主键如订单号作为唯一键)可实现通用性;利用数据库主键冲突机制(如MySQL)使并发重复消息延迟消费,确保幂等。

  • 然而,并发场景下若首条消息因异常(如服务重启)未能完成消费且状态持续“消费中”,会导致后续消息不断延迟并最终进入死信队列,违反消息至少消费一次的语义。为此,引入消息消费过期时间(如10分钟),超时自动删除消息状态,避免消费僵局,保证消息最终被正常处理。

③消息表的设计

-- 消息表数据库选择说明:
-- 只要存储组件能存储特定字段(如业务键、状态等),即可作为消息表数据库。
-- 以下以MySQL为例的建表语句,包含基本字段注释。

CREATE TABLE message_table (
    id BIGINT AUTO_INCREMENT PRIMARY COMMENT '自增主键',
    business_key VARCHAR(64) NOT NULL COMMENT '业务唯一键(如订单号),用于去重',
    status TINYINT NOT NULL DEFAULT 0 COMMENT '消息状态:0-处理中,1-成功,2-失败',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    UNIQUE INDEX uk_business_key (business_key) COMMENT '唯一索引,防止重复消息'
) ENGINE=InnoDB COMMENT='消息表,用于实现消息幂等性';

-- 除了MySQL外,架构师还可以选择Redis作为消息表数据库。
-- 开发人员只需要用Redis的哈希结构存储即可,甚至不需要创建表。
-- Redis示例命令(使用HSET存储消息状态):
--   HSET message:{business_key} status 1
--   HSET message:{business_key} timestamp ${current_time}

-- 选择Redis还有两个额外的好处:
-- 1) 性能上损耗更低。这样操作消息表所带来的时长会降低。
-- 2) 消息表的最大消费时间可以利用Redis本身的ttl机制去实现(例如设置TTL为10分钟)。
-- 当然Redis存储在数据的可靠性、一致性等方面是不如MySQL的,需要用户自己取舍。

④无锁消息表的代码实现

  • 该方案的 Java 实现已开源(GitHub 地址:https://github.com/Jaskey/RocketMQDedupListener),其核心基于​​通用消息表​​设计,通过独立于业务逻辑的表结构实现去重功能。该设计使业务接入极为简便,仅需两步即可完成代码集成,后续将提供具体使用示例演示集成流程。

/**
 * SampleListener 类示例
 * 继承自 DedupConcurrentListener,实现消息去重和处理逻辑。
 * 该类演示了如何通过重写关键方法来实现基于主题的消息去重和自定义消费行为。
 */
public class SampleListener extends DedupConcurrentListener {
    private static final Logger log = LoggerFactory.getLogger(SampleListener.class);

    /**
     * 构造函数,初始化去重配置
     * @param dedupConfig 去重配置对象
     */
    public SampleListener(DedupConfig dedupConfig) {
        super(dedupConfig);
    }

    /**
     * 重写消息去重键生成方法
     * 根据消息主题决定去重策略:对于"TEST-TOPIC"主题,直接使用消息体作为去重键;
     * 其他主题使用默认策略(消息ID)。
     * @param messageExt RocketMQ 消息扩展对象
     * @return 去重键字符串
     */
    @Override
    protected String dedupMessageKey(MessageExt messageExt) {
        // 针对特定主题使用消息体作为去重键(简化示例,生产环境需谨慎)
        if ("TEST-TOPIC".equals(messageExt.getTopic())) {
            return new String(messageExt.getBody());
        } else {
            // 其他主题使用父类默认策略(如消息ID)
            return super.dedupMessageKey(messageExt);
        }
    }

    /**
     * 重写消息处理方法
     * 执行实际消费逻辑:对"TEST-TOPIC"主题消息模拟长时间处理(睡眠3秒),
     * 其他主题消息可扩展处理。返回true表示消费成功。
     * @param messageExt RocketMQ 消息扩展对象
     * @return 处理结果(true-成功)
     */
    @Override
    protected boolean doHandleMsg(MessageExt messageExt) {
        switch (messageExt.getTopic()) {
            case "TEST-TOPIC":
                // 模拟长时间消费操作(生产环境应避免睡眠,此处仅为演示)
                log.info("假装消费很久....{} {}", new String(messageExt.getBody()), messageExt);
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                break;
            // 可添加其他主题的处理逻辑
            default:
                // 默认处理或抛出异常
                break;
        }
        return true;
    }
}
/**
 * RocketMQ 消费者配置示例
 * 演示如何集成消息去重功能
 */
public class RocketMQConsumerWithDedup {

    public void configureAndStartConsumer() {
        // 第一步:创建 RocketMQ 消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TEST-APP1");
        consumer.subscribe("TEST-TOPIC", "*");

        // START: 区别于普通 RocketMQ 使用的代码 - 去重配置部分
        String appName = consumer.getConsumerGroup(); // 针对什么应用做去重,相同的消息在不同应用的去重是隔离处理的
        StringRedisTemplate stringRedisTemplate = null; // 这里省略获取 StringRedisTemplate 的过程,具体的消息幂等表会保存到 Redis中

        // 启用去重配置
        DedupConfig dedupConfig = DedupConfig.enableDedupConsumeConfig(appName, stringRedisTemplate);
        
        // 第二步:以第一步的回调器实例去启动 RocketMQ 消费者
        // 创建去重监听器实例(SampleListener 为自定义类,需实现消息处理逻辑)
        DedupConcurrentListener messageListener = new SampleListener(dedupConfig);
        // END: 区别于普通 RocketMQ 使用的代码

        // 注册消息监听器并启动消费者
        consumer.registerMessageListener(messageListener);
        consumer.start();
    }
}

// 注释说明:
// 以上代码大部分是原始 RocketMQ 必需的代码,唯一需要修改的仅仅是创建一个 DedupConcurrentListener 实例。
// 在这个实例中指明消费逻辑和去重的业务键(默认是 messageId)即可。
//
// 注:上述示例的去重能力依赖于 Redis,实现上依赖于 Spring 的 StringRedisTemplate,
// 需要业务人员自己想办法获取这个实例,才能往对应的 Redis 实例中插入消息表的记录。
// 但通常情况下用 Spring 的时候都可以轻松获取到。

6. 通用消息幂等的局限性

①消费过程的局限性

  • 无锁消息表幂等方案虽在表面上实现了快速接入和业务解耦,看似完美,但并未彻底解决所有幂等挑战。其核心问题在于:为确保消息至少被成功消费一次,消息重试在消费失败时不可避免且必要。以订单服务为例,典型流程包括检查库存(RPC)、锁库存(RPC)、开启事务插入订单表(MySQL)及调用下游服务(RPC)。若消费至第三步(MySQL操作)时因异常失败,触发重试后会删除幂等表记录,导致消息重新消费,从而使步骤1和步骤2重复执行。若这些RPC步骤本身缺乏幂等性,整体消息消费仍无法保证完整性。因此,该方案仅能保障消息投递环节的幂等(投递一次后不再重投),但消费子过程(如RPC调用)的幂等性仍需业务层自行实现,凸显了全链路幂等处理的复杂性。

②通用幂等解决方案的价值

  • 尽管该方案无法彻底解决所有消息幂等问题(软件工程领域本不存在“银弹”),但其价值显著:它以低成本、高便捷性的方式,有效应对三类典型异常场景——

  • 中间件层重复​​:解决因Broker、负载均衡等机制导致的消息重复投递;

  • 业务层重复​​:处理上游生产者产生的业务级别消息重复;

  • 并发控制​​:限制重复消息的并发消费窗口,避免多线程场景下的逻辑错误。

  • 在无异常中断的正常消费流程中,此方案可覆盖绝大多数幂等需求(包括业务主动重发与RocketMQ特性引发的重复),为分布式系统提供了一层轻量且可靠的防护屏障。

③关于通用幂等方案的实践建议

  • 该方案能有效解决大部分消息重复问题,尤其在常见场景下表现可靠,极端异常情况较为罕见。为应对异常场景下的幂等挑战,建议采取以下措施:首先,确保消息消费失败时具备回滚机制,使重试过程无副作用;其次,实现消费者优雅退出,避免程序中途退出引发不必要的消息重试;最后,对于无法实现幂等的操作(如锁库存),应至少做到终止消费并触发告警,防止重复执行带来业务风险。

  • 当消费因主键冲突等异常失败时,系统需触发告警并依赖人工介入。RocketMQ 的消息重试机制最多尝试16次,这为人工修复数据提供了时间窗口——只要在重试周期内解决问题,后续重试即可成功。同时,必须加强消息重试消费的监控,一旦发现持续失败,应快速定位根因并力争在下次重试前修复,避免消息因最大重试次数耗尽而进入死信队列或丢失。

十四. 复杂场景下的消息丢失问题

  • 随着后台系统复杂度提升,逐渐演进出多个由不同团队维护的子系统。这些子系统通过 RPC 接口、内部网关或消息中间件进行交互,但各自保持明确的系统边界(如 A 系统的数据库和 RocketMQ 集群不直接开放给 B 系统管理)。此架构下,单个微服务可能需对接多个 RocketMQ 集群,在此过程中易出现消息丢失的“坑”。

  • 当系统用户量增长迫使采用灰度发布时,RocketMQ 在灰度过程中也可能出现消息丢失现象,这是不可接受的。本章将深入剖析多集群对接和灰度发布两种场景下消息丢失的根本原因,并提供相应的解决方案。

1. 跨集群场景下消息的丢失问题

①多集群使用的场景

  • 作为消费者从不同的集群消费消息

  • 订单服务需同时处理来自两个独立 RocketMQ 集群的消息:订单消息集群​​:由订单团队维护,负责核心订单业务消息;​​支付回调集群​​:由支付团队维护,处理支付相关回调消息。

  • 因两集群完全独立(含各自 Name Server),订单服务需启动​​两个消费者组​​,分别配置对应集群的 Name Server 地址,实现与不同集群的连接与消息消费。此设计确保业务隔离性,避免单点依赖,同时保障各团队对自身集群的自主运维权。


  • 作为生产者往不同的集群发送消息

  • 订单服务在生成或变更订单时,需向两个独立的 RocketMQ 集群发送消息:其一为订单团队维护的集群,用于处理核心订单变更业务;其二为数据团队维护的集群,用于发送用户行为埋点数据,支持后续数据分析与监控。

  • 由于两集群物理隔离且分别配置专属 Name Server 节点,订单服务必须创建两组生产者实例——分别连接不同 Name Server 地址,确保订单消息与埋点消息准确路由至对应集群的主题队列。此设计通过物理隔离实现业务流量与数据流量的分离,保障核心交易与数据分析的独立性与可靠性。


  • 同时作为消费者和生产者从不同的集群收发消息

  • 订单服务需从订单团队维护的RocketMQ集群订阅下单消息,消费完成后,再向仓库团队独立的RocketMQ集群发送发货消息。依据部署架构,下单消息主题属于订单系统集群,发货消息主题属于库存系统集群,因此订单服务需同时启动消费者和生产者的实例:消费者连接订单集群的Name Server地址以接收消息,生产者连接仓库集群的Name Server地址以转发消息,实现跨集群的消息流转与处理。

②多集群场景下消息窜乱揭秘

/**
 * RocketMQ 多集群消息处理示例
 * 演示从A集群消费消息并转发到B集群的实现
 * 以上三种情况都是非常合理且常见的使用场景,但需注意潜在的"坑"
 */
public class MultiClusterMessageExample {

    public static void main(String[] args) throws Exception {
        // 1) 初始化一个生产者,配置地址为B集群的Name Server
        // 新建一个生产者实例
        DefaultMQProducer producerToClusterB = new DefaultMQProducer("MyProducer-B");
        // 假设集群B的Name Server地址是30.179.195.35:9876
        producerToClusterB.setNamesrvAddr("30.179.195.35:9876");
        producerToClusterB.start();

        // 2) 初始化一个消费者,配置地址为A集群的Name Server
        // 消费回调中利用前面建立好的生产者实例发送消息到Topic-B
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyOrderConsumer");
        // A集群的IP地址假设是30.179.195.34
        consumer.setNamesrvAddr("30.179.195.34:9876");
        // 设置从队列的消费位置
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 订阅A集群的Topic-A主题
        consumer.subscribe("Topic-A", "*");

        // 设置消费回调
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                           ConsumeConcurrentlyContext context) {
                // 这里省略代码逻辑,通常情况下会涉及数据库的写入,这里只是简单的打印日志
                System.out.printf("Receive New Messages: %s %n", msgs);
                
                // 调用生产者的发送代码逻辑,这里简单发送一个固定的消息到Topic-B
                Message msgToClusterB = new Message("Topic-B", 
                    "Hello RocketMQ".getBytes(StandardCharsets.UTF_8));
                try {
                    SendResult sendResult = producerToClusterB.send(msgToClusterB);
                    System.out.println("Message sent to Topic-B: " + sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer and Producer started successfully.");
    }
}
  • 从A集群消费发往B集群消费的常见错误

  • 在多集群环境中,RocketMQ 可能报出“No route info of this topic”或“No topic route info in name server”错误,即使主题已在各自集群正确创建且权限正常。此问题的根本原因并非主题未建立,而是​​客户端连接配置错误​​:生产者和消费者可能意外连接到同一集群(如都连至A集群),导致发送消息时无法找到对方集群的主题(如向A集群请求Topic-B)。

  • 遇到此类错误时,建议通过控制台检查生产者与消费者的实例列表:若发现生产者IP出现在消费者列表或反之,则证实连接集群错误。修正方案是确保每个客户端(生产者/消费者)仅连接其目标集群的Name Server地址,避免交叉连接,即可恢复跨集群操作正常


  • 从A集群消费发往B集群消费的错误初探

  • 在RocketMQ 4.8.1之前的版本中,存在一个集群窜乱连接的Bug:当微服务实例同时作为生产者和消费者(如向Topic-A、Topic-B发送消息并从Topic-C、Topic-D消费消息)时,默认会与Broker建立单一长连接以复用资源、避免连接数过载。此设计在单集群场景下高效,但在多集群环境中,由于生产者和消费者共用同一连接,可能导致实例意外连接到错误集群(如生产者误连至消费者集群),从而引发“No route info of this topic”错误。该Bug根源在于资源管理的默认策略未严格隔离集群边界,需升级版本或调整配置以修复。

  • RocketMQ 客户端通过 MQClientInstance 对象统一管理网络资源,包括 Broker 与 Name Server 的连接维护、地址获取及长连接复用。该对象在初始化时即确定 Name Server 地址,后续所有生产者、消费者实例默认共享同一 MQClientInstance 以实现连接复用,优化资源效率。然而,此设计在多集群场景下存在局限:当服务需跨不同集群收发消息(如生产者连集群A、消费者连集群B)时,由于 MQClientInstance 固定绑定单一 Name Server 地址,可能导致所有连接意外指向同一集群,引发路由错误(如“No route info of this topic”)。此问题在 RocketMQ 旧版本中尤为突出,需通过配置隔离或升级版本来规避连接窜乱风险。

  • 为实现生产者和消费者分别连接独立 RocketMQ 集群,需确保两者使用不同的 MQClientInstance实例。该实例由 clientId(由客户端 IP 和 InstanceName共同决定)唯一标识,且同一 clientId的客户端共享同一 MQClientInstance,而该实例仅能关联一个 Name Server 地址。

  • 在 RocketMQ 4.8.1 之前,InstanceName默认取进程号,导致同服务下生产者和消费者因 clientId相同而共享实例,无法分连多集群。解决方案是通过 setInstanceName显式设置不同前缀(如 producer-clusterAconsumer-clusterB),强制隔离 clientId,从而创建独立的 MQClientInstance实例,实现各客户端精准连接目标集群。

  • 自4.8.1版本起,RocketMQ将InstanceName的默认生成规则由进程号升级为​​进程号+启动时间的纳秒值​​(1纳秒为十亿分之一秒)。此调整确保即使同一实例多次启动,其InstanceName也因纳秒级时间戳差异而唯一,从而避免复用MQClientInstance实例,从根源上解决了多集群场景下的连接冲突问题。

  • 新版本中,若未显式设置 InstanceName,不同客户端实例的连接将无法复用(即使目标集群相同)。此变更虽解决了多集群场景下的连接串扰问题,但可能导致临时生产者实例(如频繁创建销毁的实例)产生大量独立连接,造成资源浪费。设计权衡在于:以连接隔离保障集群边界清晰性,代价是潜在连接数增长,需开发者根据实际场景评估连接管理策略。


  • 客户端资源管理的源码讲解

/**
 * RocketMQ 客户端核心管理类示例
 * 演示 MQClientInstance 的复用机制、生产者启动流程及 InstanceName 处理
 * 整合自七张图片的源码内容
 */

// 模拟 MQProducerInner 接口
interface MQProducerInner {}

// 模拟 TopicPublishInfo 类
class TopicPublishInfo {}

// 模拟 FAQUrl 类
class FAQUrl {
    public static String suggestTodo(String url) { return url; }
    public static final String GROUP_NAME_DUPLICATE_URL = "";
    public static final String CLIENT_SERVICE_NOT_OK = "";
}

// 模拟 UtilAll 类
class UtilAll {
    public static int getPid() { return 0; }
}

/**
 * DefaultMQProducerImpl 类实现
 * 生产者实现类,包含 MQClientInstance 字段和启动逻辑
 * 图片1说明:在客户端的实现中,无论是消费者还是生产者,在底层都会有一个 MQClientInstance 的字段
 */
public class DefaultMQProducerImpl implements MQProducerInner {
    private MQClientInstance mQClientFactory; // 命名为 mQClientFactory 的 MQClientInstance 类字段
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private DefaultMQProducer defaultMQProducer;

    /**
     * 生产者启动方法
     * 图片2和图片3内容:展示 start 方法的完整逻辑,包括状态检查、配置验证、InstanceName 处理和 MQClientInstance 获取
     * MQClientInstance 本身不会在生产者对象创建的时候初始化,而是在 start() 调用的时候初始化,但是其具体的实例不是简单创建,而是尽可能复用
     */
    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST: // 如果状态是未启动的
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                
                // 默认情况下,InstanceName 是 DEFAULT,会在启动的时候替换成进程号相关的字符串
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENTInner_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID(); // 调用 ClientConfig 的方法修改 InstanceName
                }
                
                // 赋值 MQClientInstance 对象,这里会尽可能复用已有的对象,复用的标准就是看是否有相同的 clientId
                // 图片4说明:生产者启动的时候会依赖 MQClientManager 一个单例对象的 getOrCreateMQClientInstance 方法
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, null);
                
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[ " + this.defaultMQProducer.getProducerGroup()
                                   + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
                }
                
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                
                // 启动,底层主要是一些心跳、网络连接、Name Server 地址更新等相关的逻辑
                // 注:在正常的使用场景下,这里的 startFactory 参数都是 true
                if (startFactory) {
                    mQClientFactory.start();
                }
                
                log.info("the producer [{}] start OK. sendMessageWithVIPChannel = {}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
                
            // 其他已启动状态都抛出异常
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
            default:
                break;
        }
    }
    
    private void checkConfig() {
        // 配置检查逻辑
    }
    
    private ConcurrentHashMap<String, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<>();
}

/**
 * MQClientManager 类
 * 单例模式管理 MQClientInstance 实例,实现实例复用
 * 图片4说明:getOrCreateMQClientInstance 就是低版本出现多集群地址窜乱的关键
 */
class MQClientManager {
    private static MQClientManager instance = new MQClientManager();
    private ConcurrentHashMap<String, MQClientInstance> factoryTable = new ConcurrentHashMap<>();
    private AtomicInteger factoryIndexGenerator = new AtomicInteger(0);
    
    public static MQClientManager getInstance() {
        return instance;
    }
    
    /**
     * 获取或创建 MQClientInstance 实例
     * 图片5内容:展示方法的完整实现,包括 clientId 计算和实例复用逻辑
     * 这里对象的获取是以 clientId 维度做一定的复用的,所以之前 clientId 创建过 MQClientInstance 实例的话,就不会重新创建一个新的实例
     */
    public MQClientInstance getOrCreateMQClientInstance(ClientConfig clientConfig, RPCHook rpcHook) {
        // 从 ClientConfig 对象中获取其 clientId,后续会作为 map 中的 key 值。clientId 主要由 ip 和 instanceName 构成
        String clientId = clientConfig.buildMQClientId();
        
        // 尝试在已有的 map 中看有没有已经存在的 MQClientInstance 对象
        MQClientInstance instance = this.factoryTable.get(clientId);
        
        // 若没有,就创建一个,并且记录在 factoryTable 这个 map 中
        if (null == instance) {
            instance = new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            
            // 用 putIfAbsent 做并发控制
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }
        return instance;
    }
}

/**
 * ClientConfig 类
 * 包含 InstanceName 处理逻辑,影响 clientId 生成
 * 图片6和图片7内容:展示 changeInstanceNameToPID 方法,说明 InstanceName 的默认逻辑
 */
class ClientConfig {
    private String instanceName = "DEFAULT";
    
    /**
     * 修改 InstanceName 为进程ID
     * 图片7说明:在老版本里,不动 InstanceName 的情况下,生产者实例和消费者实例都是一个 InstanceName(当前的进程号)
     * 注:以上源码是4.8.2版本之后的实现,在之前的版本,其实现仅仅只是用进程号替换
     */
    public void changeInstanceNameToPID() {
        // 在 4.8.2 之前的版本,只是用进程号做替换
        if (this.instanceName.equals("DEFAULT")) {
            // 4.8.2+版本使用进程号 + 纳秒时间戳,但图片中代码仅展示老版本逻辑
            this.instanceName = String.valueOf(UtilAll.getPid()); // 老版本仅用进程号
        }
    }
    
    public String buildMQClientId() {
        // 构建 clientId 的逻辑,通常基于 IP 和 instanceName
        return "127.0.0.1@" + instanceName;
    }
    
    public ClientConfig cloneClientConfig() {
        ClientConfig clone = new ClientConfig();
        clone.instanceName = this.instanceName;
        return clone;
    }
}

// 模拟 MQClientInstance 类
class MQClientInstance {
    public MQClientInstance(ClientConfig clientConfig, int index, String clientId, RPCHook rpcHook) {}
    public void start() {}
    public boolean registerProducer(String group, DefaultMQProducerImpl producer) { return true; }
}

// 模拟 DefaultMQProducer 类
class DefaultMQProducer {
    public String getProducerGroup() { return ""; }
    public void changeInstanceNameToPID() {}
    public boolean isSendMessageWithVIPChannel() { return false; }
    public String getCreateTopicKey() { return ""; }
}

// 模拟日志记录器
class log {
    public static void info(String format, Object... args) {}
    public static void warn(String format, Object... args) {}
}

③多集群使用下的最佳实践

  • 使用新的客户端版本

  • 要彻底解决RocketMQ连接窜乱问题,建议使用4.8.1及以上版本的客户端。从此版本起,Instance-Name默认值不同(如包含进程ID和启动时间戳),确保每个客户端实例拥有独立的底层网络资源管理,从而有效避免多集群场景下的连接交叉与路由错误。


  • 设置合理的InstanceName

  • 合理设置 InstanceName(优于默认值)有助于连接管理优化与控制台观测清晰度。建议生产者与消费者采用不同后缀(如 -producer / -consumer),同集群使用统一前缀(如 ClusterA-)。

  • 需注意:若 InstanceName 包含动态标识(如时间戳、进程号),在广播模式下会导致消费进度丢失(因进度存储路径与 clientId 关联,而 clientId 依赖 InstanceName)。因此,广播模式的消费者 InstanceName 必须固定,且建议与其他实例区别设置,以确保进度持久化与系统稳定性。


  • 不同的集群共用一个NameServer地址

  • RocketMQ Broker 的 brokerClusterName参数用于标识 Broker 集群,默认值为 DefaultCluster。在多集群场景下,开发者可通过此参数将不同业务集群(如订单集群、数据上报集群)整合为统一逻辑集群:例如订单集群配置 brokerClusterName=OrderCluster,数据上报集群配置 brokerClusterName=DataCluster,两者共享同一 Name Server 地址(如 192.168.0.10:9876)。此设计使客户端仅需连接单一 Name Server 即可获取所有集群路由信息,避免低版本客户端因多集群连接导致的窜乱问题,提升部署灵活性与兼容性。

2. 队列分配错误导致的消息丢失

①队列分配的奇怪现象

  • 在 RocketMQ 中,当消费者组(如 string_consumer)同时监听多个主题(如 Topic-A 和 Topic-B,各配置两条队列)且组内有两个实例时,默认负载均衡策略旨在将队列均匀分配。然而,异常场景下可能出现分配不均:例如 Topic-B 的一条队列未被任何实例订阅,导致消息堆积且消费者感知为“丢失”。重启消费者仅带来短暂缓解,消息很快再次堆积,根本原因需从客户端负载均衡机制入手解析。

②消费者负载均衡的过程

  • 在 RocketMQ 中,消费者客户端分配队列的过程称为 ​​Rebalance​​(重平衡或重排)。笔者更倾向于将其译为 ​​“负载均衡”​​,因其更贴合分布式系统中任务分配的实际场景。后续讨论中,“负载均衡”将统一指代此流程。该机制的核心是通过动态调整队列与消费者的映射关系,确保消息处理的高效与公平性。下文将通过具体问题解析其运作原理与实现细节。


  • 负载均衡解决什么问题

  • RocketMQ 的消费者负载均衡机制并非直接分配消息,而是通过管理队列的分配来间接控制消息负载。这是因为消息通常在各队列中均匀分布,因此分配更多队列给消费者实例意味着其拥有更多拉取消息的渠道,从而在大多数场景下等同于获得更高消息处理量。然而,这种关系并非绝对:如果某些队列不可写或生产者通过策略限制向特定队列发送消息(如集中发送到少数队列),那么消费者即使被分配更多队列,也可能无法实际处理更多消息,导致负载均衡效果与预期不符。总之,Rebalance 过程通过队列分配旨在优化消费者负载,但其有效性依赖于消息在队列中的均匀分布前提。


  • 负载均衡是如何决定队列分配数量的

  • RocketMQ 采用​​无中心化负载均衡​​设计,与 Kafka 的​​中心化分配模式​​存在本质区别。Kafka 通过选举 Leader 消费者统一计算全局分区分配结果并广播至所有消费者,类似卫生委员指派任务;而 RocketMQ 无统一协调节点,各消费者依赖相同的​​默契算法​​(如默认的平均分配策略)独立计算自身应处理的队列,最终实现全局均衡,如同学生默契分工打扫教室,无需显式沟通却覆盖所有角落且工作量近似。

  • RocketMQ 通过抽象负载均衡策略接口(如 AllocateMessageQueueStrategy)支持多类分配规则(如平均、轮询、一致性哈希等),用户可自定义策略。其无中心设计降低了协调复杂度,但依赖算法一致性保障秩序,需避免因计算规则分歧导致的分配冲突。

/**
 * 无论是哪个负载均衡的策略,归根结底就是一个接口,相关代码如下。
 * Strategy Algorithm for message allocating between consumers
 */
public interface AllocateMessageQueueStrategy {

    /**
     * 关键在于allocate方法。这个方法的出参就是当前客户端实例的负载均衡结果——本实例应该被分配到的队列列表。
     * Allocating by consumer id
     *
     * @param consumerGroup current consumer group(消费者组名)
     * @param currentCID current consumer id(当前消费者实例的clientId,实际上就是client的ip@instanceName再加上时间戳)
     * @param mqAll message queue set in current topic(当前准备负载均衡的主题下的所有队列)
     * @param cidAll consumer set in current consumer group(当前这个消费者组的在线clientId列表)
     * @return The allocate result of given strategy(负载均衡后的队列分配列表)
     * 实际上靠这4个参数,就能完成这个“默契算法”。
     */
    List<MessageQueue> allocate(final String consumerGroup, final String currentCID,
                                final List<MessageQueue> mqAll, final List<String> cidAll);

    /**
     * getName方法只是一个唯一标识,用以标识该消费者实例是用什么负载均衡策略去分配队列的,这个会显示在日志或者控制台里。
     * Algorithm name
     *
     * @return The strategy name(策略名称)
     */
    String getName();
}
  • RocketMQ 采用无中心化负载均衡策略,消费者实例通过相同的默契算法独立计算队列分配。例如,当4个消费者(C1-C4)在线且主题有8个队列(q0-q7)时,每个消费者基于算法得知应获取2个队列(8÷4=2),并按顺序分配:C1获q0、q1,C2获q2、q3,以此类推。该机制依赖三个输入:消费者列表、队列列表及当前消费者ID(clientId),确保各实例推导出的队列无冲突,无需中央协调即实现全局均衡。

  • 此策略的核心在于算法一致性:所有消费者实例使用相同逻辑(如默认的平均分配),通过感知其他消费者存在和自身ID,计算出专属队列范围。clientId参数在allocate方法中标识当前实例,使算法能定位其分配起点,避免争抢。这种设计提升了系统的扩展性与可靠性,但要求算法必须严格一致以防分配分歧。

/**
 * RocketMQ 平均分配策略实现类
 * 实现 AllocateMessageQueueStrategy 接口,采用无中心化负载均衡算法
 * 通过默契计算确保各消费者实例分配到的队列无冲突
 */
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    private static final Logger log = LoggerFactory.getLogger(AllocateMessageQueueAveragely.class);

    /**
     * 消息队列分配方法
     * 根据当前消费者ID、消费者列表和队列列表,计算该实例应分配的队列
     * @param consumerGroup 消费者组名称
     * @param currentCID 当前消费者ID(格式:ip@instanceName)
     * @param mqAll 主题下的所有消息队列列表
     * @param cidAll 消费者组中所有在线消费者ID列表
     * @return 分配给当前消费者的队列列表
     */
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
        // START: 前置参数校验,确保输入有效
        if (currentCID == null || currentCID.length() < 1) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        // 检查当前消费者是否在有效列表中,若不在则返回空列表(可能为BUG场景)
        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", consumerGroup, currentCID, cidAll);
            return result;
        }
        // END: 前置判断

        // 核心分配逻辑开始:基于平均分配策略计算队列分配
        int index = cidAll.indexOf(currentCID); // 当前消费者在列表中的索引位置
        int mod = mqAll.size() % cidAll.size(); // 队列数对消费者数取余,计算余数
        // 计算每个消费者平均应分配的队列数量,考虑余数影响
        int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
        // 计算当前消费者分配的起始索引,考虑余数调整
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // 计算实际分配数量,防止越界
        int range = Math.min(averageSize, mqAll.size() - startIndex);

        // 遍历分配队列,将对应队列添加到结果列表
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    /**
     * 获取策略名称
     * @return 策略标识字符串
     */
    @Override
    public String getName() {
        return "AVG";
    }
}
  • 如何进行多主题的负载均衡

/**
 * 实际上上面提到的策略分配接口里,并没有订阅关系的信息,
 * 如果一个消费者组订阅了Topic1,也订阅了Topic2,且不同主题下的队列数量是不一样的,
 * 那么最后分配的结果肯定也是不同的,这应该怎样实现?
 * 
 * 其实很简单,每次分配是针对主题级别进行的,所以一个主题的分配就会单独调用一次allocate接口,
 * 即每次负载均衡实际上是多次分配组合而成的。
 * 这部分源码在RebalanceImpl.java中,具体源码也很简单,如下所示。
 */
public void doRebalance(final boolean isOrder) {
    // 获取当前消费者组的订阅关系表
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();

    if (subTable != null) {
        // 遍历所有订阅的主题
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                // 对每个主题单独执行负载均衡
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                // 非重试主题的异常需要记录日志
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
        // 清理不属于当前消费者订阅主题的队列
        this.truncateMessageQueueNotMyTopic();
    }
}

③订阅关系不同引发的问题

  • 订阅了不同的主题引发的问题

  • 在 RocketMQ 中,消费者组内实例订阅关系不一致会导致队列分配问题:当组内不同实例订阅主题不同(如实例1仅订阅 Topic-A,实例2订阅 Topic-A 和 Topic-B)时,违反官方要求(同一消费者组需完全一致订阅),引发负载均衡计算错误。实例2对 Topic-B 进行分配时,虽检测到2个队列和2个在线实例,但因实例1未订阅 Topic-B,实际仅实例2参与分配,最终仅分配1条队列,另一半队列无人消费,造成消息堆积。

  • 此现象根源在于客户端负载均衡策略:各实例独立计算队列分配,无中心协调。实例2基于在线列表误判所有实例均参与 Topic-B 消费,而实例1因未订阅直接跳过分配流程。从控制台观察,Topic-B 可能长期无实例绑定,导致消息持续积压。解决方案是严格保持组内订阅一致,确保负载均衡计算准确。


  • 订阅了不同主题引发的问题汇总

  • 各种诡异现象的根本原因在于主题订阅关系不一致,具体分为两种场景:场景一因客户端实例间缺乏互通导致信息不对齐;场景二因服务端处理请求时基于订阅关系差异产生误解。场景三:两个消费者仅加载自身订阅主题,但因看到彼此在线,各分配一半队列,导致消息处理不完整。图中还解释“闪烁”现象源于心跳机制不断触发负载均衡重计算,引发分配结果动态变化。


  • 灰度发布的过程可能导致主题订阅关系的不同

  • 虽然官网强调使用该方式不被支持,但在灰度发布场景中,研发人员仍可能遇到此类问题。例如,现网消费者已订阅Topic-A,因新功能需额外订阅Topic-B,当仅有部分机器(如2台中的1台)完成发布时,会出现场景一,导致大量消息无消费者处理而堆积,直至发布完成,如图14-14所示。

  • 该问题在实际中较常见,但灰度发布结束后堆积消息仍可被消费。是否需特殊处理取决于灰度发布过程的时长及消息及时性的重要程度。


  • 灰度发布的过程可能导致主题订阅升级的解决思路

  • 为规避消费者组订阅关系不一致引发的消息处理问题,推荐采用两种方案:其一,在设计初期约束每个消费者组仅订阅单一主题。新增(更换、更换后删除旧的主题)消费主题时,创建新消费者组进行监听;减少主题时则下线对应消费者组。其二,可通过变更消费者组名称实现订阅关系调整,但需确保消息处理逻辑具备幂等性,以应对发布过程中可能出现的短暂重复消费现象,保障数据一致性。

3. 标签订阅不同引发的消息丢失

  • 当多个消费者实例订阅同一主题但不同标签时(如实例1订阅Tag-a,实例2订阅Tag-b),负载均衡机制会正常分配队列(如实例1获q0,实例2获q1)。然而,若某标签消息(如b标签)被散列至未监听该标签的实例队列(如b标签消息进入实例1的q0),则这些消息因无实例消费而“丢失”。此问题隐蔽性强,且直接导致消息可靠性漏洞(见图14-17)。

  • 灰度发布的过程可能导致的标签不同在服务灰度发布过程中,容易遇到“踩坑”问题。例如,线上稳定版本原本监听a标签,而新需求要求服务更新后监听b标签。如果在发布切换过程中,监听机制未能正确处理新旧版本的过渡,可能导致数据丢失、通知延迟或错误等风险。

  • 灰度发布的过程可能导致标签更新的解决思路:和主题更换的问题类似, 其中一个解决思路是不订阅多标签, 而是每个标签的订阅都使 用一个独立的消费者组解决。另外的解决思路则是订阅关系的变化都需要变换消费者组的名称。 同样道理, 注意做好 消息的幂等。

十五. 应对消息大量堆积

1. 消息堆积发生的原因及影响

  • 消息中间件的核心设计目的之一,正是为应对​​消费速度可能滞后于生产速度​​的场景。因此,消息队列中存在一定量的消息积压属于正常现象,通常为几条至几十条的量级,属系统可承受范围。

  • 然而,当生产流量激增且与消费能力形成显著差距时,未及时处理的消息将持续累积。一旦堆积量达到数万甚至数十万条,即构成“大量堆积”,此时需介入处理,以避免系统性能下降或可靠性风险。

①消息堆积的场景

  • 消费与生产速率不匹配​​:当消费速度长期低于生产速度时,消息持续积压。典型场景如日志处理系统,消费端因需批量存储而耗时较长,生产端在流量峰值时易出现积压。

  • 消费端处理能力异常​​:生产速率稳定但消费能力突降,例如异常消息触发慢查询或线程死锁,导致消费线程阻塞,无法及时处理消息。

  • 消费者状态异常​​:消费者实例因启动故障、网络问题或负载均衡异常(如某队列无消费者绑定)而停止拉取消息,造成特定队列消息堆积且无人消费。

②消息堆积的危害

  • 消息堆积是消息中间件中的常见现象,虽最终能消费完,但会导致消费延迟,因队列遵循先进先出原则,新消息必须等待旧消息处理完毕。这引发两个核心问题:其一,重要消息被迫延迟,如付费用户消息可能因免费用户消息堆积而阻塞,影响用户体验;其二,资源被历史消息独占,新消息无法及时处理,导致服务看似失效,例如短信通知延迟数小时,即使后续加速处理,历史消息的时效性已无法弥补。RocketMQ 在当前设计下不支持优先处理新消息,从业务优先级角度考虑,存在优化空间以提升服务响应性。

2. 消息堆积了,哪些扩容手段是有用的?

  • 面对消息堆积问题时,扩容是常见的应对思路,但需注意其并非万能解决方案,特定场景下可能失效。通常考虑的扩容方向包括以下五个层面:

  • 横向扩展消费者服务实例数量;

  • 增加单消费者节点的处理线程数;

  • 提升Broker端的队列数量;

  • 扩展Broker集群节点规模;

  • 强化下游依赖服务的处理能力。

①消费者服务扩容对堆积的影响

  • 消费者扩容(增加实例数)通常能提升消费能力,但因负载均衡机制中每个队列仅分配给一个消费者,其效果受队列数量制约。若扩容前队列数≤消费者数,则新增消费者将空载,无法改善堆积;若队列数>消费者数(如4队列2消费者),扩容可让消费者独占队列(如4消费者各获1队列),消费速度随队列分配优化而提升(案例中从20条/秒增至40条/秒)。扩容前需优先评估队列数量,避免资源浪费。

②消费者线程扩容对堆积的影响

  • RocketMQ 消费者模型相比 Kafka 进行了显著优化,允许单个消费者通过多线程方式并行处理队列中的数据,从而提升消费效率。一种直接有效的扩容方法是增加消费者线程数而非仅扩展实例数量。例如,在拥有 2 条队列和 2 个消费者的场景中,若每个消费者线程数为 5,消费速度为 10 条/秒,则集群总消费速度为 20 条/秒。通过将线程数提升至 10,单个消费者速度增至 20 条/秒,集群总速度达到 40 条/秒,实现消费能力翻倍,且无需增加消费者实例数,避免了队列分配限制带来的资源浪费。

  • 消费者线程扩容在应对消息堆积时通常更有效,主要基于两点原因:其一,对于存量堆积数据,由于队列无法同时分配给多个消费者,增加消费者实例数最终会达到实例数等于队列数的状态,此后继续扩容实例无效;但线程扩容可直接提升单个消费者的处理能力,加速消费速度。其二,数据无法迁移,只能通过提升单个消费者的消费速度来处理堆积,扩容线程数即增加并发数,在不考虑资源消耗的情况下,消费速度可线性提升。

  • 许多开发者对线程池参数设置保守或错误,如默认线程数为20,但常被误设为最小1、最大64,这在线程池队列无限长的情况下实际等效于单线程消费。鉴于服务器资源通常充足,适当提升线程数(如合理设置consumeThreadMin和consumeThreadMax)是低成本扩展消费能力的有效手段,比单纯增加消费者实例更优。

③Broker 队列数扩容

  • 直接扩容队列数量通常无法解决现有消息堆积问题,原因在于 RocketMQ 不会在队列增删时自动迁移已有数据——原有队列的堆积消息仍保留,新增队列为空,且消费能力受限于消费者实例的线程处理能力,而非队列数量。盲目增队列可能加重系统负载,如同要求已满负荷的服务员接管更多餐桌,反而降低效率。

  • 然而,队列扩容在特定场景下具有价值:当消费者实例数需扩容且当前队列数≤实例数时,必须同步增加队列数,否则新增消费者因无法分配到队列而空载。此举旨在优化资源分配,而非直接提升消费吞吐量。

  • 当消息队列出现严重堆积时(如4个队列各积压10万条消息,消费速度100条/秒),新消息需等待约16分钟才能被处理,导致类似会议提醒等实时性要求高的业务受到显著影响。此时扩容队列数量(如从4队列增至8队列)可有效缓解新消息的延迟问题:因消息生产默认采用轮询机制,新消息会部分落入新增的空队列中,使这些消息无需等待历史堆积即可被快速消费。

  • 然而,此方案仍存在局限性:消费者本地的线程池采用串行处理模式,即便新消息快速到达消费者实例内存,仍需与其它队列的消息共同竞争线程资源,因此无法完全达到无堆积时的处理时效。但总体而言,队列扩容仍可在一定程度上优先保障新消息的及时处理,实现新老消息的隔离消费。

④Broker 扩容

  • 研发人员常误以为消息堆积与 Broker 投递性能相关,从而考虑扩容 Broker 以提升吞吐量。然而,此做法基本无效:RocketMQ 因采用零拷贝、Page Cache、批处理及长轮询等优化技术,其消息拉取与投递性能极高,罕见瓶颈。若真存在瓶颈,多集中于网卡层面,但生产环境中消费端处理速度极少导致网卡饱和。即便网卡受限,也应优先升级现有 Broker 的网卡而非扩容新 Broker。

  • 实际上,Broker 扩容在消息堆积场景中的作用与队列数扩容效果相似——二者均旨在优化新消息的及时处理。因此,若核心目标仅为加速新消息消费,直接在原 Broker 上增加队列数即可达成相同目的,无需复杂扩容。

⑤下游服务扩容

  • 消息堆积通常源于消费性能不足,可分为两类原因:一是消费者自身资源不足(如CPU、内存或线程数限制),此时扩容消费者实例或增加线程数能有效提升处理能力;二是下游依赖服务性能瓶颈(如数据库处理速度已达上限),若盲目扩容消费者反而会加重下游压力,导致消费速度下降甚至系统雪崩。

  • 在生产环境中快速判断瓶颈的方法:先少量扩容消费者实例或调整线程数,观察整体消费速度是否提升。若速度上升,表明瓶颈在消费者并发能力;若无效,则需立即分析链路中的强依赖服务(如数据库、外部API),针对性地扩容下游资源,避免误操作加剧问题。

3. 消息堆积的一些定位手段

①定位消费瓶颈

  • 消息堆积通常是由于消费速度无法匹配生产速度所致。定位问题时,可通过执行 jstack命令获取消费者服务的线程堆栈,并搜索以 "ConsumeMessageThread_" 开头的线程名(新版本可能包含消费者组名),以确认消费线程池的并发线程数量。若线程数异常(如过少或过多),需检查并调整线程池配置;若线程数正常,则进一步分析线程堆栈是否卡在相同代码段,从而识别下游服务或代码层面的性能瓶颈。

/**
 * DelayTest 类示例
 * 演示消息消费方法中的睡眠调用导致吞吐量下降的问题
 * 根据线程堆栈跟踪信息,问题出现在 consume 方法的第51行
 */
public class DelayTest {

    /**
     * 消息消费方法
     * 根据堆栈跟踪,此方法第51行调用了 Thread.sleep,导致线程进入 TIMED WAITING 状态
     * 堆栈跟踪详情:
     * STATE: TIMED WAITING
     * java.lang.Thread.sleep (Native Method)
     * mqtest.DelayTest $1.consume (DelayTest.java:51)
     * com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl $MessageListenerImpl.consumeMessage(ConsumerImpl.java:101)
     * com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService $ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:415)
     * java.util.concurrent.Executors $RunnableAdapter.call(Executors.java:511)
     * java.util.concurrent.FutureTask.run (FutureTask.java:266)
     * java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     * java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     * java.lang.Thread.run(Thread.java:748)
     * 
     * 问题分析:消息线程处于睡眠等待状态,导致程序吞吐量下降。
     * 根源在于第51行的 sleep 调用,需优化以避免不必要的延迟。
     */
    public void consume() {
        // 模拟消息处理逻辑
        System.out.println("Processing message...");
        
        // 第51行:问题代码 - Thread.sleep 调用导致线程等待
        try {
            Thread.sleep(1000); // 此处睡眠导致吞吐量下降
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        // 后续处理逻辑
        System.out.println("Message processed.");
    }
}
/**
 * DelayTest 类示例
 * 演示消息消费方法中因调用 HTTP 请求导致的性能瓶颈问题
 * 根据线程堆栈跟踪信息,问题出现在 consume 方法的第58行
 */
public class DelayTest {

    /**
     * 消息消费方法
     * 从堆栈跟踪分析,线程状态为 RUNNABLE,但执行缓慢
     * 堆栈跟踪详情:
     * STATE: RUNNABLE
     * java.lang.ClassLoader.loadClass(ClassLoader.java:404)
     * sun.misc.Launcher $AppClassLoader.loadClass(Launcher.java:349)
     * java.lang.ClassLoader.loadClass(ClassLoader.java:357)
     * org.apache.http.impl.conn.PoolingHttpClientConnectionManager.<init>(PoolingHttpClientConnectionManager.java:174)
     * org.apache.http.impl.conn.PoolingHttpClientConnectionManager.<init>(PoolingHttpClientConnectionManager.java:158)
     * org.apache.http.impl.conn.PoolingHttpClientConnectionManager.<init>(PoolingHttpClientConnectionManager.java:149)
     * org.apache.http.impl.conn.PoolingHttpClientConnectionManager.<init>(PoolingHttpClientConnectionManager.java:125)
     * refactor.base.Tools.getHttpsClient(Tools.java:138)
     * refactor.base.Tools.httpsPost(Tools.java:257)
     * mqtest.DelayTest $1.consume(DelayTest.java:58)  // 问题点:第58行调用 httpsPost
     * com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl $MessageListenerImpl.consumeMessage(ConsumerImpl.java:101)
     * com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService $ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:415)
     * java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
     * java.util.concurrent.FutureTask.run(FutureTask.java:266)
     * java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     * java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     * java.lang.Thread.run(Thread.java:748)
     * 
     * 问题分析:从堆栈上看,线程是正常的运行状态。如果很多线程都在这个状态,证明里面肯定有一些代码片段很慢,导致消费太久了。
     * 而上面这个例子,出问题的代码片段是 mqtest DelayTest 里面的 consume 函数,其代码行数是在第58行
     * [从 mqtest DelayTest$1 consume(DelayTest.java:58) 这一行可看出],里面调用了 httpsPost 的方法导致了瓶颈。
     * 那么具体 http 的瓶颈可能是下游的服务太慢了,也可能是某些参数设置不合理,但无论如何,定位的方向算是清楚了。
     */
    public void consume() {
        // 模拟消息处理逻辑
        System.out.println("Processing message...");

        // 第58行:问题代码 - 调用 httpsPost 方法,可能导致性能瓶颈
        // 由于 HTTP 请求依赖下游服务响应,或参数配置不当,此处可能执行缓慢
        Tools.httpsPost("https://example.com/api", "message data"); // 假设 Tools.httpsPost 方法存在

        // 后续处理逻辑
        System.out.println("Message processed.");
    }
}

// 假设的 Tools 类(仅用于演示,实际需根据项目实现)
package refactor.base;

import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
// 其他必要的 HTTP 客户端导入

class Tools {
    public static void httpsPost(String url, String data) {
        // 实际实现可能涉及 HTTP 客户端调用,如下示例:
        // PoolingHttpClientConnectionManager 的初始化可能在此或其他方法中
        // 具体代码可能类似 PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
        // 然后执行 HTTP POST 请求
        System.out.println("HTTP POST to " + url + " with data: " + data);
        // 模拟可能由于下游服务慢或配置问题导致的延迟
        try {
            Thread.sleep(1000); // 模拟网络延迟或处理时间
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * 消费者示例类
 * 演示消息消费中可能出现的线程空闲与队列堆积问题
 * 以下代码模拟了一种特殊情况:大量消费线程处于空闲状态,但消息队列中仍有消息堆积
 */
public class ConsumerExample {

    /**
     * 消息消费方法
     * 该方法可能在线程池中执行,但当线程处于等待状态时,会导致消费进度停滞
     */
    public void consumeMessages() {
        // 模拟消息处理逻辑
        // 在实际场景中,这里可能会从阻塞队列中获取任务并处理
        // 但如果线程处于等待状态,则无法处理新消息
        
        // 以下堆栈跟踪信息展示了线程空闲时的典型状态:
        // STATE: WAITING
        // sun.misc.Unsafe.park(Native Method)
        // java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        // java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        // java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        // java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        // java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        // java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        // java.lang.Thread.run(Thread.java:748)
        
        // 问题分析:
        // 如果从堆栈上看到以上这样的情况,证明该消费线程是空闲的。
        // 而如果大量消费线程是空闲的话,理论上证明没有什么消息需要消费。
        // 但是如果确实看到队列有堆积,那么可能出现以下两个问题:
        
        // 1) 进度同步出现了问题,导致消费了,但是没同步到Broker。
        //    这块需要关注客户端里进度同步相关的日志。
        
        // 2) 中间某条消息把进度卡住了。前面的章节提到过,进度的提交是一批一批的,
        //    如果中间有消息一直处于消费中很久(如死锁、死循环),那么消费进度也会一直卡住,
        //    看起来就是进度一直停滞不前,但是实际上其他消息都是可能正常消费的。
        //    这时候应该可以发现其中某些线程是正常消费的,但是一直卡在某个状态。
        //    借助 jstack 也可以定位到具体的代码片段。只不过这种问题会比较难定位,
        //    因为很可能是在很特殊的条件下才会触发。这类问题需要特别关注的是有没有一些
        //    特殊的消费逻辑的分支或者在某些特殊的数据状态下可能导致的高耗时逻辑。
    }

    /**
     * 模拟进度同步方法
     * 在实际实现中,消费完成后需要将进度同步到Broker
     * 如果同步失败,可能导致消费进度无法更新
     */
    private void syncOffsetToBroker() {
        // 这里实现进度同步逻辑
        // 如果同步出现异常或失败,需要记录日志以便排查
    }

    /**
     * 模拟消息处理逻辑
     * 某些特殊消息可能导致处理时间过长或死锁
     */
    private void processMessage(Message message) {
        // 实际消息处理代码
        // 需要特别注意某些特殊分支逻辑可能导致的性能问题
        // 例如:死循环、死锁、长时间外部调用等
    }
}

// 辅助类定义
class Message {
    // 消息内容
}

②定位数据倾斜

  • 在消息队列使用中,有时会出现同一主题下大多数队列无堆积,仅个别队列延迟严重且堆积量大的情况。这通常源于两种原因:其一,该队列分配的消费者处理能力不足,可能是消费逻辑存在性能瓶颈(可通过jstack工具定位),或部署问题如消费者位于不同机房导致网络延迟;其二,消息分配策略设计不当,例如在平均负载下消费者处理速度与生产速度匹配,但因热点数据导致某个队列消息激增(如直播业务中以房间ID分片时,热门直播间消息集中),造成队列堆积。定位时需分析堆积消息的特征,并检查生产者端的selector逻辑是否需要优化以平衡负载。

③定位队列分配数量

  • 消息堆积在消息队列系统中常由消费队列分配问题引起。其一,队列数量分配不均可能导致负载失衡:例如3个队列与2个消费者实例时,默认分配规则可能使一个实例处理2个队列而另一实例仅处理1个,导致高负载实例更易出现堆积。优化方案是调整队列数量,使其成为消费者数量的整数倍,以提升分配均衡性。其二,异常场景下队列可能无消费者组处理,虽RocketMQ会触发重平衡机制,但若队列持续未分配(如因自定义分配策略算法缺陷或相同消费者组内实例订阅关系不一致),消息将只进不出造成堆积。例如,同组实例订阅不同主题(如一个仅消费Topic-A,另一个消费Topic-A和Topic-B)时,Topic-B的部分队列可能无实例消费,此问题常见于灰度发布过程,需提前规避以确保订阅一致性。

4.处理消息堆积的实践建议

①快速扩容

  • 遇到消息堆积, 通常情况下需要第一时间快速扩容。

②快速恢复新消息的消费

  • 系统扩容虽提升整体消费能力,但无法实现新消息优先处理(即“插队”机制)。为此提供三种参考方案:

  • ​更换消息主题​​:生产者将新消息发送至新主题,消费者组同步切换订阅主题,实现新消息的独立资源分配;

  • 重置消费进度​​:通过控制台或命令行重置消费者组进度,跳过历史堆积消息,直接消费新到达消息;

  • 变更消费者组与偏移策略​​:新建消费者组并配置 CONSUME_FROM_LAST_OFFSET策略,使其从队列末端开始消费,规避历史消息积压。

③恢复历史消息的消费

  • 通过调整消费策略可实现新消息的即时处理,但此操作可能导致堆积期间的消息被"跳过"(非真正丢失)。若这些消息时效性强或可忽略(如过期通知),此方案即可满足需求。

  • 若需保证消息零丢失,可利用 RocketMQ 的回放机制:新建消费者组并设置合理消费位点(如从堆积起始位置),即可重新处理积压消息,实现业务补救与新消息处理的并行进行。

5. 预防消息堆积的实践建议

①合理地规划消费并行度

  • 首先, 消费的并行能力是最容易导致消费瓶颈的, 而这其中特别需要关注以下三个指标

  • 消费者的实例数量。

  • 每个消费者能分配到的队列数。

  • 消费者线程池的线程数。

②避免热点数据

  • 在程序设计中进行消息分区时,选择合适的分片键至关重要,以避免热点问题。例如,在直播间功能中,若按房间ID分区,大房间可能导致数据堆积;而按观众账号ID分区则可均衡负载。类似地,在商品浏览记录场景中,按商品ID分区可能因某些商品浏览量激增而造成数据倾斜,导致队列堆积。

③适当丢弃过老的消息

  • 丢弃策略能有效优先处理新消息,避免系统被旧消息阻塞,特别适合消息通知等高时效性场景。若担忧消息丢失导致业务损失,可借助 RocketMQ 进行恢复:通过日志定位丢弃时间点(如 5 小时前),随后启动新消费者组并将消费进度回溯至该时间,重新消费消息。此过程需注意消息幂等处理,以防重复消费引发数据不一致。

/**
 * 消息消费者示例类
 * 优化策略:为避免消息堆积影响新消息消费,程序在处理消费逻辑时适度丢弃老消息,
 * 确保新消息能更快被消费。判断消息是否过老的手段之一是基于消息的生产时间。
 */
public class OptimizedMessageListener implements MessageListenerConcurrently {

    /**
     * 消息消费方法
     * 遍历消息列表,丢弃过期消息(如生产时间超过一分钟的消息),仅处理新消息
     * @param msgs 消息列表
     * @param context 消费上下文
     * @return 消费状态(成功)
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // 遍历所有消息
        for (MessageExt msg : msgs) {
            // 检查消息生产时间:如果当前时间减去消息出生时间大于60秒(一分钟),则认为消息过期
            if (System.currentTimeMillis() - msg.getBornTimestamp() > 60 * 1000) {
                continue; // 过期消息跳过,不处理
            }
            
            // 这里是实际消费逻辑(示例中省略)
            // 例如:处理消息内容,如解析、存储或转发
            // System.out.println("Processing message: " + new String(msg.getBody()));
        }
        
        // 返回消费成功状态
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
/**
 * 消息消费者优化类
 * 采用堆积量检查策略:当检测到消息堆积超过阈值时,自动丢弃当前消息批次,
 * 以减轻系统负载并优先处理新消息。适用于高吞吐场景下的消息积压处理。
 */
public class OptimizedMessageListener implements MessageListenerConcurrently {

    /**
     * 消息消费方法
     * 遍历消息列表,检查队列堆积情况。若堆积量超过10万条,则跳过当前批次处理,
     * 直接返回消费成功状态,避免进一步加剧堆积。
     * 
     * @param msgs 消息列表,通常包含同一队列的多个消息
     * @param context 消费上下文信息
     * @return 消费状态:成功(跳过或正常处理)
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // 获取当前消息的队列偏移量
        long offset = msgs.get(0).getQueueOffset();
        // 从消息属性中获取队列最大偏移量
        String maxOffset = msgs.get(0).getProperty(MessageConst.PROPERTY_MAX_OFFSET);
        // 计算堆积消息数量(最大偏移量与当前偏移量的差值)
        long diff = Long.parseLong(maxOffset) - offset;

        // 检查堆积是否超过10万条:如果堆积太多,则舍弃当前批次
        if (diff > 100000) {
            // 消息堆积了10万条情况的特殊处理:直接返回成功,丢弃消息
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

        // 这里是正常消费逻辑(示例中省略具体实现)
        // 例如:遍历msgs处理每条消息,如解析、存储或业务操作
        // for (MessageExt msg : msgs) {
        //     // 处理消息内容
        // }

        // 正常处理完成后返回成功状态
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

十六. 大型互联网系统双活RocketMQ集群架构

1. 常见的同城双活后台架构

  • 大型互联网系统常采用同城双活架构保障高可用性,其核心设计如下:在异地机房(如北京汇天与昌平机房)独立部署应用服务与数据库。数据库采用跨机房主从部署,主库集中于汇天机房,从库同步至昌平机房。写请求统一路由至主库处理,读请求则通过读写分离机制就近访问本地从库,兼顾数据一致性与访问效率。消息中间件(如RocketMQ)在此架构中非必需组件,但可根据业务需求灵活引入。该模式通过流量分流与数据冗余,实现故障隔离与业务连续性保障。

  • 实现双活架构通常有两种手段。其一为双域名策略:两个机房分别配置不同的服务域名,终端设备通过路由策略选择特定域名,从而将请求定向至相应机房。

  • 网关路由是一种实现双活架构的手段,通过在应用前部署统一网关,由网关执行路由策略,将请求定向到合适的集群中,具体如图16-3所示。

  • 双活架构旨在实现跨机房级容灾能力:当任一机房(如服务、中间件或存储层)发生故障时,可通过流量切换由另一机房无缝接管服务,保障业务连续性(例如汇天机房网络故障时,将流量全量导向昌平机房)。

  • 本章聚焦 RocketMQ 在双机房架构中的技术实现,故暂不展开路由策略与域名解析等通用方案,但需强调这些要素在实际架构落地中至关重要,是确保流量精准调度与故障隔离的基础支撑。

2. 同城双机房的RocketMQ部署架构

①简单的同城双机房RocketMQ部署架构

  • 在同城双活架构中引入 RocketMQ 时,最简单的方式是在两个机房(如汇天与昌平)独立部署完整集群(含 Broker 与 Name Server),形成无交集的隔离环境。此方案虽适配双活架构,但存在显著缺陷:

  • 当单机房 RocketMQ 故障时(如汇天机房磁盘损坏),需人工流量调度才能恢复业务,切换期间服务必然中断;

  • 故障机房的消息数据因无跨机房同步机制将永久丢失,无法恢复,违背双活架构的高可用目标。

  • RocketMQ 原生不支持跨机房双活,需通过架构优化(如数据同步方案)弥补局限性。

②同城双机房高可靠的RocketMQ部署架构

  • 为了提升Broker集群的可靠性,建议调整部署架构:在汇天机房的Broker集群中增加一个昌平机房的slave节点,同时在昌平机房的Broker集群中增加一个汇天机房的slave节点。这样,即使某个机房发生Broker磁盘故障,也可利用另一个机房的slave节点恢复服务。整体部署架构如图16-5所示。

③同城双机房的RocketMQ部署架构(写高可靠)

  • 基于 master/slave 的架构不支持自动主备切换,若汇天机房 master 宕机,slave 虽可提供历史消息消费,但该机房消息写入服务完全中断,导致写入功能不可靠。

  • 为解决此问题,不推荐生产环境贸然采用 DLedger 模式(实践较少且 5.0 版本后有新 controller 模式)。建议在主备模式基础上,对等部署另一组 Broker,形成双组冗余:当一组 Broker 的 master 故障时,另一组可继续提供写服务,保障写入高可用性(架构如图 16-6 所示)。

3. 同城双活的RocketMQ部署架构

  • RocketMQ 同城双活部署架构(写高可靠)已能解决 90% 以上的容灾需求,国内大型互联网业务的消息中间件架构大多基于此方案。该架构通过在双机房独立部署集群保障写入高可用性,但其核心局限在于集群独立性——当单机房集群故障(如汇天机房集群不可用)时,该机房所有依赖消息服务的业务将中断。

  • 常规解决方案是通过全局流量调度将故障机房流量导入备用机房(如昌平机房),但此操作牵一发而动全身:不仅实施复杂度高,还会显著增加备用机房的整体负载压力,可能引发数据库等其他服务的连锁风险。后续优化方向需突破集群独立性限制,实现双机房服务的相互备份与自动流量接管,从而在灾难场景下实现无缝切换。

①简单的同城双活部署架构

  • 基于同城双机房策略,可通过将汇天和昌平机房的 RocketMQ Broker 统一注册至同一 Name Server 集群,形成逻辑上的大集群,实现双活架构。此方案使生产者和消费者能透明访问任一集群,确保单机房故障时另一机房可无缝接管服务,保障业务连续性与高可用性。整体部署依托集群逻辑整合,简化运维并提升容灾能力。

②同城双活的技术挑战

  • 同城双活部署虽提升了系统可靠性,但引入了流量穿越现象:消息可能在生产者和消费者之间跨机房流动(如汇天机房生产的消息被昌平机房消费),甚至绕行多机房后返回本机房,导致链路复杂化。此现象带来三大影响:

  • 问题定位难度增加​​:请求流转跨机房,追踪完整链路需多机房协作,排查效率降低;

  • 架构复杂度提升​​:需保障跨机房数据状态互通(如共享数据库或数据同步),否则可能引发一致性问题;

  • 延迟加剧​​:同机房内部通信延迟较低,但跨机房物理距离不可避免会增加网络延迟,影响服务稳定性。

③基于全局路由的同城双活部署架构

  • 为解决同城双活架构中的流量穿越问题,一个直接思路是引入全局路由服务(可作为独立模块或配置中心)。该服务为各机房的生产者和消费者提供本机房的 Name Server 地址,确保消息流本地化:例如,汇天机房的实例通过路由服务获取汇天 Name Server 地址,随后创建本地生产者/消费者,实现消息的“自产自销”,避免跨机房通信,降低延迟与复杂度。此机制依托路由服务的智能调度,保障各机房内部消息闭环处理,提升系统稳定性和可维护性。

  • 当汇天机房MQ集群发生故障时,运维人员可通过路由服务将集群地址切换为昌平机房地址,使原汇天机房生产的消息转发至昌平机房MQ集群,实现消息层的跨机房容灾。对于消费者处理,运维可选择将汇天机房消费者切换到昌平机房消费;若不切换,则消息由汇天机房实例生产至昌平机房集群,最终由昌平机房消费者处理,确保业务连续性。这一机制保障了单机房故障时的系统冗余与消息流无缝迁移。

  • 在如图16-10所示的架构切换场景中,若将消费压力全量导向昌平机房,需确保其容量足以承载两机房流量,否则需同步切换消费者。切换时,全局路由服务将下发昌平机房Name Server地址,汇天机房消费者据此新建连接至新集群,实现消息流重构:汇天生产者投递至昌平MQ集群,双机房消费者共同分摊消费(如图16-11)。

  • 全局路由服务可通过两种机制感知集群故障:手动变更​​:依赖人工监控与配置更新;自动探测​​:通过心跳机制监控Broker存活状态,故障时自动触发地址切换。此设计保障了故障场景下的业务连续性,但需提前评估容量与自动化程度。

④基于自定义负载均衡策略的同城双活部署架构

  • 基于全局路由的同城双活部署策略核心优势在于灵活性高,支持手动与自动切换模式,但需承担较高架构改造成本:需独立开发全局路由服务,并确保所有 RocketMQ 服务接入该路由;当路由下发新集群地址时,需动态创建对应消费者与生产者实例。

  • 基于自定义负载均衡策略的同城双活基于RocketMQ 原生支持自定义负载均衡策略,生产者可通过 MessageQueueSelector、消费者通过 AllocateMessageQueueStrategy实现就近接入与“自产自销”。实施时,先将双机房集群混合为逻辑大集群,确保生产者和消费者可感知所有节点,再通过自定义策略优化流量路由,避免跨机房开销。


  • 生产者

  • RocketMQ 生产者在发送消息前,会在客户端执行负载均衡,从指定主题的所有队列中选择目标队列。例如,若主题 TOPIC-A 在双机房各部署 4 个队列,则需从 8 个队列中择一发送。

  • 为优化跨机房场景,可采用分组策略:

  • 按机房分组队列​​(如昌平组、汇天组);

  • 优先选择生产者同机房队列组​​,若同机房无可用队列则降级至全队列选择;

  • 最终随机选定单一队列​​发送消息。

  • 此机制确保双机房均正常时,各机房生产者仅就近发送至本地队列,减少跨机房流量,提升效率与可靠性。

/**
 * MachineRoomSelector 类实现 MessageQueueSelector 接口
 * 用于在RocketMQ生产者中根据机房分组选择消息队列,优先选择本机房队列以优化跨机房流量
 */
public class MachineRoomSelector implements MessageQueueSelector {

    // 随机数生成器,用于随机选择队列
    private Random random = new Random(System.currentTimeMillis());

    // 这里假设已知本实例是在汇天机房
    // 实际上这里可以从配置文件取或者从IP中解析
    private String myMachineRoom = "HT"; // 默认设置为汇天机房

    /**
     * 选择消息队列的方法
     * 核心逻辑:先按机房分组队列,优先选择本机房队列,若无则随机选择
     * @param mqs 可用的消息队列列表
     * @param msg 消息对象
     * @param arg 附加参数
     * @return 选择的消息队列
     * @throws IllegalArgumentException 当Broker名称无法解析时机房时抛出异常
     */
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        // 先按照机房分组:创建映射表,key为机房名称,value为该机房的队列列表
        Map<String/* machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
        for (MessageQueue mq : mqs) {
            // 假设Broker的名字都是以IDC名称开头命名的,例如CP-Broker,HT-Broker
            // 通过拆分Broker名称获取机房前缀
            String brokerMachineRoom = mq.getBrokerName().split("-")[0];

            // 修正逻辑:仅当机房名称不为空时才进行分组,否则抛出异常
            if (!StringUtils.isEmpty(brokerMachineRoom)) {
                if (mr2Mq.get(brokerMachineRoom) == null) {
                    mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
                }
                mr2Mq.get(brokerMachineRoom).add(mq);
            } else {
                throw new IllegalArgumentException("Machine room is null for mq " + mq);
            }
        }

        // 获取本机房的队列列表
        List<MessageQueue> nearMQs = mr2Mq.get(myMachineRoom);

        // 没有本机房的队列就随机挑一条
        if (nearMQs == null) {
            int value = random.nextInt(mqs.size());
            return mqs.get(value);
        } else { 
            // 有本机房的队列,就只在本机房随机挑一条
            int value = random.nextInt(nearMQs.size());
            return nearMQs.get(value);
        }
    }
}

  • 消费者

  • RocketMQ 内置的 AllocateMachineRoomNearby策略专为双机房场景设计,旨在实现消费者就近接入。其核心流程分为四步:

  • 队列分组​​:按机房维度将队列分组(如昌平机房组、汇天机房组);

  • 同机房判断​​:优先选择与消费者实例同机房的队列组,若同机房无队列则降级至全队列分配;

  • ​平均分配​​:对目标队列组执行平均分配算法,计算消费者应处理的队列;

  • 剩余处理​​:若有机房队列未分配(如该机房消费者全宕),则将剩余队列分配给其他机房消费者。

  • 该策略已封装于 RocketMQ 客户端,使用时仅需创建 AllocateMachineRoomNearby实例并注入 MachineRoomResolver(用于标识消费者与队列的机房归属),无需关注底层实现,即可自动优化跨机房流量与延迟。

/**
 * RocketMQ 消费者配置示例:实现双机房就近接入负载均衡策略
 * 本代码基于图片内容编写,展示如何配置消费者使用 AllocateMachineRoomNearby 策略,
 * 以确保消费者优先处理同机房消息,减少跨机房流量。
 */
public class RocketMQConsumerConfig {

    public static void main(String[] args) {
        // 创建名为 "MyConsumer" 的 DefaultMQPushConsumer 实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyConsumer");

        // 创建 MachineRoomResolver 匿名实例,用于解析 Broker 和消费者所在的机房
        // 注释说明:clientId 和队列命名以 IDC 开头,例如消费者 ID 为 "HT-192.168.0.1@12345",
        // Broker 名称为 "HT-Broker-a",其中 "HT" 表示汇天机房,"CP" 表示昌平机房。
        MachineRoomResolver machineRoomResolver = new MachineRoomResolver() {
            /**
             * 重写 brokerDeployIn 方法:从 Broker 名称中提取机房信息
             * @param brokerName Broker 名称,如 "HT-Broker-a"
             * @return 机房标识字符串,如 "HT"
             */
            @Override
            public String brokerDeployIn(String brokerName) {
                // 假设 Broker 名称以机房前缀开头,后跟 "-" 分隔符
                return brokerName.split("-")[0];
            }

            /**
             * 重写 consumerDeployIn 方法:从 clientID 中提取机房信息
             * @param clientID 消费者客户端 ID,如 "HT-192.168.0.1@12345"
             * @return 机房标识字符串,如 "HT"
             */
            @Override
            public String consumerDeployIn(String clientID) {
                // 通过分割 clientID 字符串返回机房部分
                return clientID.split("-")[0];
            }
        };

        // 基于 MachineRoomResolver 实例,创建 AllocateMachineRoomNearby 实例,
        // 并使用其作为消费者负载均衡策略。AllocateMessageQueueAveragely 作为基础分配策略。
        AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMachineRoomNearby(
            new AllocateMessageQueueAveragely(), machineRoomResolver
        );

        // 将策略设置到消费者实例
        consumer.setAllocateMessageQueueStrategy(allocateMessageQueueStrategy);

        // 基于这样的策略,架构师就可以实现消费者的单边就近接入了。
        // 也就是说如果汇天机房的 Broker 集群和消费者在线,无论汇天机房的消费者数量和队列的数量是多少,
        // 都会在同一个机房内部分配完,如图16-12所示。
        // 此举优化了网络延迟和系统性能,确保流量本地化。
    }
}

  • 基于这样的策略, 架构师就可以实现消费者的单边就近接入了。 也就是说如果汇天机房 的Broker 集群和消费者在线, 无论汇天机房的消费者数量和队列的数量是多少, 都会在同一 个机房内部分配完, 如图16-12所示

  • 如果汇天机房的MQ集群出现故障, 那么就会出现下面这样的分配情况, 如图16-13所示。

  • 如果汇天机房的全部消费者不在线, 那么汇天机房的队列也会分给昌平机房的消费者, 如图16-14 所示