目 录CONTENT

文章目录

RocketMQ—生产者发送消息

路口、下车
2026-01-19 / 0 评论 / 0 点赞 / 6 阅读 / 0 字
温馨提示:
本文最后更新于2026-01-19,若内容或图片失效,请留言反馈。 部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

RocketMQ五种消息类型

  • 普通消息:普通消息也称为并发消息,和传统的队列相比,并发消息没有顺序,但是生产消费都是并行进行的,单机性能可达十万级别的TPS。
  • 分区有序消息:与Kafka中的分区类似,把一个Topic消息分为多个分区“保存”和消费,在一个分区内的消息就是传统的队列,遵循FIFO(先进先出)原则。
  • 全局有序消息:如果把一个 Topic 的分区数设置为 1,那么该 Topic 中的消息就是单分区,所有消息都遵循FIFO(先进先出)的原则。
  • 延迟消息:消息发送后,消费者要在一定时间后,或者指定某个时间点才可以消费。在没有延迟消息时,基本的做法是基于定时计划任务调度,定时发送消息。在RocketMQ中只需要在发送消息时设置延迟级别即可实现。
  • 事务消息:主要涉及分布式事务,即需要保证在多个操作同时成功或者同时失败时,消费者才能消费消息。RocketMQ通过发送Half消息、处理本地事务、提交(Commit)消息或者回滚(Rollback)消息优雅地实现分布式事务。

image-ecjb.png

Broker、分区、队列的关系

image-e8nx.png

普通消息

架构拓扑

image-f76s.png

执行流程

  1. Master与Slave启动向NameServer注册
  2. 生产者Producer发送数据前从NameServer获取Master的IP、端口等通信参数
  3. 生产者Producer向Master发送消息
  4. Master向Slave进行消息同步

代码解析

@Slf4j
public class MessageType1 {
    public static void main(String[] args) {
        //DefaultMQProducer用于发送非事务消息
        DefaultMQProducer producer = new
        DefaultMQProducer("ProducerGroupName");
        //注册NameServer地址
        producer.setNamesrvAddr("192.168.31.103:9876");
        //异步发送失败后Producer自动重试2次
        producer.setRetryTimesWhenSendAsyncFailed(2);
        try {
            //启动生产者实例
            producer.start();
            //消息数据
            String data = "{\"title\":\"X市2021年度第四季度税务汇总数据\"}";
            //消息主题
            Message message = new Message("tax-data", "2021S4", data.getBytes());
            //发送结果
            SendResult result = producer.send(message);
            log.info("Broker响应:" + result);
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            try {
                //关闭连接
                producer.shutdown();
                log.info("连接已关闭");
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

运行结果

23:00:48.741 [main] INFO com.itlaoqi.rocketmq.mtype.MessageType1 - Broker响
应:
SendResult [
    sendStatus=SEND_OK,
    msgId=7F000001347018B4AAC20A1687DC0000,
    offsetMsgId=C0A81F6900002A9F00000000000A21CB,
    messageQueue=MessageQueue [topic=tax-data, brokerName=broker-a,
    queueId=3],
    queueOffset=2
]

  • sendStatus:发送状态,SEND_OK代表成功
  • msgId:消息由RocketMQ分配的全局唯一Id,由 producer客户端生成,调用方法MessageClientIDSetter.createUniqID()生成全局唯一的Id
  • offsetMsgId:Broker 服务端将消息追加到内存后会返回其物理偏移量,即在
  • commitlog 文件中的偏移量,然后会生成一个Id
  • messageQueue:消息队列内容
    • topic:主题名称
    • brokerName:broker服务器名字,在RocketMQ
    • xxx.propertites配置文件中brokerName项定义
    • queueId:queueId队列Id,默认会初始化4个(0-3)
  • queueOffset:queueId对应队列逻辑上的位置(偏移量)

有序消息

假设没有分区时有什么问题?
如果某一笔业务产分为多条普通消息同时发送,消费者无法保证按按生产者预期的顺序进行消费,进而导致代码逻辑错误

image-crrg.png

分区有序消息

分区有序消息:与Kafka中的分区类似,把一个Topic消息分为多个分区“保存”和消费,在一个分区内的消息就是传统的队列,遵循FIFO(先进先出)原则。
全局有序消息:如果把一个 Topic 的分区数设置为 1,那么该 Topic 中的消息就是单分区,所有消息都遵循FIFO(先进先出)的原则。

image-j9mu.png

Consumer消费时通过一个分区只能有一个线程消费的方式来保证消息顺序。

部署拓扑

image-hly2.png

源码

pom.xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.2</version>
</dependency>
package com.itlaoqi.rocketmq.mtype;
...
import java.util.List;
@Slf4j
//发送分区顺序消息
public class MessageType2 {
    public static void main(String[] args) {
        DefaultMQProducer producer = new     DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("192.168.31.103:9876");
        producer.setRetryTimesWhenSendAsyncFailed(2);
        try {
            producer.start();
            Integer id = 4465;
            String data = "{\"id\":" + id+" , + \"title\":\"X市2021年度第四季度税务汇总数据\"}";
            Message message = new Message("tax-data", "2021S4",id.toString(),
            data.getBytes(RemotingHelper.DEFAULT_CHARSET));
            //分区有序消息最大的区别便是调用send方法是,需要实现MessageQueueSelector接
            口,确定使用哪个队列投递消息
        SendResult result = producer.send(message, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                log.info("当前队列数量:" + mqs.size() + ",明细:" + mqs.toString());
                log.info("Message对象:" + msg.toString());
                int dataId = Integer.parseInt(msg.getKeys());
                int index = dataId % mqs.size();
                MessageQueue messageQueue = mqs.get(index);
                log.info("分区队列:" + messageQueue);
                return messageQueue;
            }
        },null);
            log.info("消息已发送:MsgId:" + result.getMsgId() + ",发送状态:" +
            result.getSendStatus());
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            try {
                producer.shutdown();
                System.out.println("连接已关闭");
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

输出结果

15:04:49.407 [main] INFO com.itlaoqi.rocketmq.mtype.MessageType2 - 当前队列数量:4,明细:[MessageQueue [topic=tax-data, brokerName=broker-a, queueId=0], MessageQueue[topic=tax-data, brokerName=broker-a, queueId=1], MessageQueue [topic=tax-data,brokerName=broker-a, queueId=2], MessageQueue [topic=tax-data, brokerName=brokera, queueId=3]]
15:04:49.409 [main] INFO com.itlaoqi.rocketmq.mtype.MessageType2 - Message对象:Message{topic='tax-data', flag=0, properties={KEYS=4465, WAIT=true, TAGS=2021S4},body=[123, 34, 105, 100, 34, 58, 52, 52, 54, 53, 32, 44, 32, 43, 32, 34, 116, 105, 116, 108,101, 34, 58, 34, 88, -27, -72, -126, 50, 48, 50, 49, -27, -71, -76, -27, -70, -90, -25, -84, -84,-27, -101, -101, -27, -83, -93, -27, -70, -90, -25, -88, -114, -27, -118, -95, -26, -79, -121, -26,-128, -69, -26, -107, -80, -26, -115, -82, 34, 125], transactionId='null'}
15:04:49.409 [main] INFO com.itlaoqi.rocketmq.mtype.MessageType2 - 分区队
列:MessageQueue [topic=tax-data, brokerName=broker-a, queueId=1]
15:04:49.712 [main] INFO com.itlaoqi.rocketmq.mtype.MessageType2 - 消息已发送:MsgId:7F0000013C4018B4AAC20D891D290000,发送状态:SEND_OK

全局有序消息

代码解析

在实现MessageQueueSelector接口时,固定选择某个队列就代表全局有序。注意:这里的全局有序代表broker中全局有序。如果消息被分发到不同的broker中,不保证有序,当然这种使用方法是错误的。

SendResult result = producer.send(message, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        MessageQueue messageQueue = mqs.get(0);
        return messageQueue;
    }
},id);

输出结果

15:08:18.295 [main] INFO com.itlaoqi.rocketmq.mtype.MessageType2 - 当前队列数量:4,明细:[MessageQueue [topic=tax-data, brokerName=broker-a, queueId=0], MessageQueue[topic=tax-data, brokerName=broker-a, queueId=1], MessageQueue [topic=tax-data,brokerName=broker-a, queueId=2], MessageQueue [topic=tax-data, brokerName=brokera, queueId=3]]
15:08:18.297 [main] INFO com.itlaoqi.rocketmq.mtype.MessageType2 - Message对象:Message{topic='tax-data', flag=0, properties={KEYS=4465, WAIT=true, TAGS=2021S4},body=[123, 34, 105, 100, 34, 58, 52, 52, 54, 53, 32, 44, 32, 43, 32, 34, 116, 105, 116, 108,101, 34, 58, 34, 88, -27, -72, -126, 50, 48, 50, 49, -27, -71, -76, -27, -70, -90, -25, -84, -84,-27, -101, -101, -27, -83, -93, -27, -70, -90, -25, -88, -114, -27, -118, -95, -26, -79, -121, -26,-128, -69, -26, -107, -80, -26, -115, -82, 34, 125], transactionId='null'}
15:08:18.297 [main] INFO com.itlaoqi.rocketmq.mtype.MessageType2 - 分区队
列:MessageQueue [topic=tax-data, brokerName=broker-a, queueId=0]
15:08:18.602 [main] INFO com.itlaoqi.rocketmq.mtype.MessageType2 - 消息已发送:MsgId:7F00000134C418B4AAC20D8C4D220000,发送状态:SEND_OK

延迟消息

延迟消息是指消息发送后,消费者要在一定时间后,或者指定某个时间点才可以消费。
在没有延迟消息时,基本的做法是基于定时计划任务调度,定时发送消息。在 RocketMQ中
只需要在发送消息时设置延迟级别即可实现。
image-lf8o.png
Broker在启动时,内部会创建一个内部主题:SCHEDULE_TOPIC_XXXX,根据延迟level的个
数,创建对应数量的队列,也就是说18个level对应了18个队列。注意,这并不是说这个内
部主题只会有18个队列,因为Broker通常是集群模式部署的,因此每个节点都有18个队列。
image-ld7s.png

1. 修改消息Topic名称和队列信息
RocketMQ Broker端在存储生产者写入的消息时,首先都会将其写入到CommitLog中。之
后将Topic的名称修改为SCHEDULE_TOPIC_XXXX,并根据延迟级别确定要投递到哪个队列下。
2. 转发消息到延迟主题的CosumeQueue中
CommitLog中的消息转发到CosumeQueue中是异步进行的。在转发过程中,会对延迟消息
进行特殊处理,主要是计算这条延迟消息需要在什么时候进行投递。
3. 延迟服务消费SCHEDULE_TOPIC_XXXX消息
Broker内部有一个ScheduleMessageService类,其充当延迟服务,消费
SCHEDULE_TOPIC_XXXX中的消息,并投递到目标Topic中。ScheduleMessageService在启动时,其会创建一个定时器Timer,并根据延迟级别的个数,启动对应数量的TimerTask,每个TimerTask负责一个延迟级别的消费与投递。
4. 将信息重新存储到CommitLog中
在将消息到期后,需要投递到目标Topic。由于在第一步已经记录了原来的Topic和队列
信息,因此这里重新设置,再存储到CommitLog即可。此外,由于之前Message Tag
HashCode字段存储的是消息的投递时间,这里需要重新计算tag的哈希值后再存储
5. 将消息投递到目标Topic中
这一步与第二步类似,不过由于消息的Topic名称已经改为了目标Topic。因此消息会直
接投递到目标Topic的ConsumeQueue中,之后消费者即消费到这条消息。
6. 消费者消费目标topic中的数据

public class MessageType3 {
    public static void main(String[] args) {
        DefaultMQProducer producer = new
        DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("192.168.31.103:9876");
        producer.setRetryTimesWhenSendAsyncFailed(2);
        try {
            producer.start();
            long id = 4466l;
            String data = "{\"id\":" + id+" , + \"title\":\"X市2021年度第四季度税务汇总数据
            \"}";
            Message message = new Message("tax-data", "2021S4",
            data.getBytes(RemotingHelper.DEFAULT_CHARSET));
            //1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
            message.setDelayTimeLevel(5);
            SendResult result = producer.send(message);
            System.out.println("消息已发送:MsgId:" + result.getMsgId() + ",发送状态:"
            + result.getSendStatus());
        }catch (Exception e){
            e.printStackTrace();
        }finally {
        try {
            producer.shutdown();
                System.out.println("连接已关闭");
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

修改延时配置

single-master.properties
#集群名称,同一个集群下的broker要求统一
brokerClusterName=DefaultCluster
#broker名称
brokerName=broker-a
messageDelayLevel=90s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

事务消息

先写库还是先发消息?
首先,咱们来看一下工作场景,订单ID1030被创建后要保存到数据库,同时该1030订单通过MQ投递给其他系统进行消费。如果要保证订单数据入库与消息投递状态要保证最终一致,要怎么做?
这里有两种常见做法:
第一种,先写库,再发送数据

//伪代码
//插入1030号订单
orderDao.insert(1030,order);
//向1030号订单新增3条订单明细,10081-10083,
orderDetailDao.insert(10081,1030,orderDetail1);
orderDetailDao.insert(10082,1030,orderDetail2);
orderDetailDao.insert(10083,1030,orderDetail3);
//向MQ发送数据,如果数据发送失败
SendResult result = producer.send(orderMessage)
if(result.getState().equals("SEND_OK"))){
    connection.commit();
}else{
    connection.rollback();
}

如果生产者发送消息时,因为网络原因导致10秒消息才返回SendResult结果,这就意味这10秒内数据库事务无法提交,大量并发下,数据库连接资源会在这10秒内迅速耗尽,后续请求进入连接池等待状态,最终导致系统停止响应。

第二种,先发消息,再写库

//伪代码
//向MQ发送数据,如果数据发送失败
SendResult result = producer.send(orderMessage)
if(result.getState().equals("SEND_OK"))){
//插入1030号订单
orderDao.insert(1030,order);
//向1030号订单新增3条订单明细,10081-10083,
orderDetailDao.insert(10081,1030,orderDetail1);
orderDetailDao.insert(10082,1030,orderDetail2);
orderDetailDao.insert(10083,1030,orderDetail3);
connection.commit;
}

问题更严重,因为消息已经被发送了,消费者可以立即消费,比如下游消费者为1030订单自动设置了“快递信息”,可是如果后续orderDao向数据库插入数据产生异常导致业务失败。我们还需要再次发送“取消1030订单”的消息把下游1030订单分配的“快递信息”给撤销,这些都是在业务层面上的额外处理,这无疑提高了对程序员的要求与处理的难度。

那有没有什么方式可以既不阻塞数据库事务,也能保证最终一致性呢?有,RocketMQ提供了事务消息可以保障应用本地事务与MQ最终一致性。

实践案例

架构拓扑

image-scvy.png

代码分析

MessageType4-发出事务消息代码

public class MessageType4 {
    public static void main(String[] args) throws MQClientException, InterruptedException,
        UnsupportedEncodingException {
        //事务消息一定要使用TransactionMQProducer事务生产者创建
        TransactionMQProducer producer = new
        TransactionMQProducer("transaction_producer_group");
        //从NameServer获取配置数据
        producer.setNamesrvAddr("192.168.31.103:9876");
        //CachedThreadPool线程池用于回查本地事务状态
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool(new
        ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("check-transaction-thread");
                return thread;
                }
            });
        //将生产者与线程池绑定
        producer.setExecutorService(cachedThreadPool);
        //绑定事务监听器,用于执行代码
        TransactionListener transactionListener = new OrderTransactionListenerImpl();
        producer.setTransactionListener(transactionListener);
        //启动生产者
        producer.start();
        //创建消息对象
        Message msg =
        new Message("order","order-1030" ,
        "1030", "1030订单与明细的完整JSON数据(略)".getBytes());
        //一定要调用sendMessageInTransaction发送事务消息
        //参数1:消息对象
        //参数2:其他参数,目前用不到
        producer.sendMessageInTransaction(msg, null);
    }
}

TransactionListenerImpl-处理本地事务业务代码

public class OrderTransactionListenerImpl implements TransactionListener {
    @Override
    //执行本地事务代码
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info("正在执行本地事务,订单编号:" + msg.getKeys());
        /* 伪代码
        try{
            //插入1030号订单
            orderDao.insert(1030,order);
            //向1030号订单新增3条订单明细,10081-10083,
            orderDetailDao.insert(10081,1030,orderDetail1);
            orderDetailDao.insert(10082,1030,orderDetail2);
            orderDetailDao.insert(10083,1030,orderDetail3);
            connection.commit();
            //返回Commit,消费者可以消费1030订单消息
            return LocalTransactionState.COMMIT_MESSAGE;
        }catch(Exception e){
            //返回Rollback,Broker直接将数据删除,消费者不能收到1030订单消息
            connection.rollback();
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        */
        log.info("模拟网络中断,Broker并未收到生产者本地事务状态回执,返回UNKNOW");
        return LocalTransactionState.UNKNOW;
        }
        @Override
        //会查本地事务处理状态
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            String keys = msg.getKeys();
            log.info("触发回查,正在检查" + keys + "订单状态");
            /* 伪代码
            Order order = orderDao.selectById(1030);
            if(order != null){
                //查询到记录,代表数据库已处理成功,回查返回Commit,消费者可以消费1030订单消
                息
                return LocalTransactionState.COMMIT_MESSAGE;
            }else{
                //未查询到记录,代表数据库处理失败,回查返回Rollback,Broker直接将数据删除,消
                费者不能收到1030订单消息
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            */
            log.info("回查结果," + keys + "订单已入库,发送Commit指令");
            return LocalTransactionState.COMMIT_MESSAGE;
    }
}

事务执行过程

标准流程

1. producer.sendMessageInTransaction(msg, null); 执行成功
此时1030订单消息已被发送到MQ服务器(Broker),不过该消息在Broker此时状态为“halfmessage”,相当于存储在MQ中的“临时消息”,此状态下消息无法被投递给消费者。

image-yjvt.png

2. 生产者发送消息成功后自动触发
OrderTransactionListenerImpl.executeLocalTransaction()执行本地事务
当消息发送成功,紧接着生产者向本地数据库写数据,数据库写入后提交commit,同时
executeLocalTransaction方法返回COMMIT_MESSAGE,生产者会再次向MQ服务器发送一个commit提交消息,此前在Broker中保存1030订单消息状态就从“half-message”变为"已提交",broker将消息发给下游的消费者处理。

image-qnei.png

public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    /* 伪代码
    try{
        orderDao.insert(1030,order);
        //向1030号订单新增3条订单明细,10081-10083,
        orderDetailDao.insert(10081,1030,orderDetail1);
        orderDetailDao.insert(10082,1030,orderDetail2);
        orderDetailDao.insert(10083,1030,orderDetail3);
        connection.commit();
        //返回Commit,消费者可以消费1030订单消息
        return LocalTransactionState.COMMIT_MESSAGE;
    }catch(Exception e){...}
}

异常流程1

producer.sendMessageInTransaction(msg, null); 执行失败,抛出异常
此时没有任何消息被发出,本地事务也不会执行,除了报错外不会产生任何不一致。

image-l9ji.png

异常流程2

producer.sendMessageInTransaction(msg, null); 执行成功,本地事务执行失败
OrderTransactionListenerImpl:

public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    /* 伪代码
    try{
        //插入1030号订单
        orderDao.insert(1030,order);
        //插入失败
        orderDetailDao.insert(10081,1030,orderDetail1);
        ...
    }catch(Exception e){
        //返回Rollback,Broker直接将数据删除,消费者不能收到1030订单消息
        connection.rollback();
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}

此时本地事务执行rollback回滚,数据库数据被撤销,同时executeLocalTransaction方法返回ROLLBACK_MESSAGE代表回滚,生产者会再次向MQ服务器发送一个rollback回滚消息,此前在Broker中保存1030订单消息就会被直接删除,不会发送给消费者,本地事务也可以保证与MQ消息一致。

image-pilz.png

异常流程3

producer.sendMessageInTransaction(msg, null); 执行成功,本地事务执行成功,但给Broker返回Commit消息时断网了,导致broker无法收到提交指令。
orderTransactionListenerImpl:

public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    /* 伪代码
    try{
        orderDao.insert(1030,order);
        //向1030号订单新增3条订单明细,10081-10083,
        orderDetailDao.insert(10081,1030,orderDetail1);
        orderDetailDao.insert(10082,1030,orderDetail2);
        orderDetailDao.insert(10083,1030,orderDetail3);
        connection.commit();
        //返回Commit时网络中断
        return LocalTransactionState.COMMIT_MESSAGE;
    }catch(Exception e){...}
}

此时本地数据库订单数据已入库,但MQ因为断网无法收到生产者的发来的“commit”消息,
1030订单数据一直处于“half message”的状态,消息无法被投递到消费者,本地事务与MQ消息的一致性被破坏。

image-p6ip.png

RocketMQ为了解决这个问题,设计了回查机制,对于broker中的half message,每过一小段时间就自动尝试与生产者通信,试图调用通OrderTransactionListenerImpl.checkLocalTransaction()方法确认之前的本地事务是否成功。

//会查本地事务处理状态
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    String keys = msg.getKeys();
    log.info("触发回查,正在检查" + keys + "订单状态");
    /* 伪代码
    Order order = orderDao.selectById(1030);
    if(order != null){
        //查询到记录,代表数据库已处理成功,回查返回Commit,消费者可以消费1030订单消息
        return LocalTransactionState.COMMIT_MESSAGE;
    }else{
        //未查询到记录,代表数据库处理失败,回查返回Rollback,Broker直接将数据删除,消费
        者不能收到1030订单消息
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    */
    log.info("回查结果," + keys + "订单已入库,发送Commit指令");
    return LocalTransactionState.COMMIT_MESSAGE;
    }

checkLocalTransaction()查询到订单数据,说明之前的数据库事务已经完成,返回
COMMIT_MESSAGE,这样Broker中的1030订单消息就可以被发送给消费者进行处理。

image-0i8h.png

运行结果:

22:31:35.670 [main] INFO com.itlaoqi.rocketmq.mtype.OrderTransactionListenerImpl - 正在执行本地事务,订单编号:1030
22:31:35.672 [main] INFOcom.itlaoqi.rocketmq.mtype.OrderTransactionListenerImpl - 模拟网络中断,Broker并未收到生产者本地事务状态回执,返回UNKNOW
22:31:45.995 [check-transaction-thread] INFO com.itlaoqi.rocketmq.mtype.OrderTransactionListenerImpl - 触发回查,正在检查1030订单状态
22:31:45.996 [check-transaction-thread] INFO
com.itlaoqi.rocketmq.mtype.OrderTransactionListenerImpl - 回查结果,1030订单已入库,
发送Commit指令

checkLocalTransaction()未查询到订单数据,说明之前的数据库事务没有处理成功,返回
ROLLBACK_MESSAGE,这样Broker中的1030订单消息就会被删除。

image-awh5.png

事务消息执行执行流程

image-4fqq.png

0

评论区