RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)

简介: 初步展示了事务消息的发送流程,总的说来,RocketMQ的事务消息发送使用二阶段提交思路,首先,在消息发送时,先发送消息类型为Prepread类型的消息,然后在将该消息成功存入到消息服务器后,回调TransactionListener#executeLocalTransaction,执行本地事务状态回调函数,然后根据该方法的返回值,结束事务。

在阅读本文前,若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群

根据上文的描述,发送事务消息的入口为:

TransactionMQProducer#sendMessageInTransaction:
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
        if (null == this.transactionListener) {    // @1
            throw new MQClientException("TransactionListener is null", null);
        }

        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, transactionListener, arg);  // @2
    }

代码@1:如果transactionListener为空,则直接抛出异常。
代码@2:调用defaultMQProducerImpl的sendMessageInTransaction方法。

接下来重点分享sendMessageInTransaction方法

DefaultMQProducerImpl#sendMessageInTransaction
public TransactionSendResult sendMessageInTransaction(final Message msg,
           final TransactionListener tranExecuter, final Object arg)  throws MQClientException {

Step1:首先先阐述一下参数含义。

  • final Message msg:消息
  • TransactionListener tranExecuter:事务监听器
  • Object arg:其他附加参数
DefaultMQProducerImpl#sendMessageInTransaction
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
       sendResult = this.send(msg);
} catch (Exception e) {
       throw new MQClientException("send message Exception", e);
}

Step2:在消息属性中增加两个属性:TRAN_MSG,其值为true,表示为事务消息;PGROUP:消息所属发送者组,然后以同步方式发送消息。在消息发送之前,会先检查消息的属性TRAN_MSG,如果存在并且值为true,则通过设置消息系统标记的方式,设置消息为MessageSysFlag.TRANSACTION_PREPARED_TYPE。

DefaultMQProducerImpl#sendKernelImpl
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
       sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}

SendMessageProcessor#sendMessage
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
             response.setCode(ResponseCode.NO_PERMISSION);
             response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                        + "] sending transaction message is forbidden");
             return response;
       }
      putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
      putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}

Step3:Broker端收到客户端发送消息请求后,判断消息类型。如果是事务消息,则调用TransactionalMessageService#prepareMessage方法,否则走原先的逻辑,调用MessageStore#putMessage方法将消息存入Broker服务端。
本节重点阐述事务消息的实现原理,故接下来将重点关注prepareMessage方法,如想了解RocketMQ消息存储相关,可以关注作者源码分析RocketMQ系列

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#prepareMessage
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
        return transactionalMessageBridge.putHalfMessage(messageInner);
 }

step4:事务消息,将调用TransactionalMessageServiceImpl#prepareMessage方法,继而调用TransactionalMessageBridge#prepareMessage方法。

TransactionalMessageBridge#parseHalfMessageInner
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
        return store.putMessage(parseHalfMessageInner(messageInner));
    }

    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }

Step5:备份消息的原主题名称与原队列ID,然后取消消息的事务消息标签,重新设置消息的主题为:RMQ_SYS_TRANS_HALF_TOPIC,队列ID固定为0。然后调用MessageStore#putMessage方法将消息持久化,这里TransactionalMessageBridge桥接类,就是封装事务消息的相关流程,最终调用MessageStore完成消息的持久化。消息入库后,会继续回到DefaultMQProducerImpl#sendMessageInTransaction,上文的Step2后面,也就是通过同步将消息发送到消息服务端。

注:这是事务消息Prepare状态的处理逻辑,消息是存储在消息服务器了,但存储的并不是原主题,而是RMQ_SYS_TRANS_HALF_TOPIC,故此时消费端是无法消费shen
生产者发送的消息的。看到这里,如果对RocketMQ比较熟悉的话,肯定会有一个“定时任务”去取这个主题下的消息,然后则“合适”的时机将消息的主题恢复。

DefaultMQProducerImpl#sendMessageInTransaction
switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    localTransactionState = tranExecuter.executeLocalTransaction(msg, arg);
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

Step6:如果消息发送成功,会回调TransactionListener#executeLocalTransaction方法,执行本地事务,并且返回本地事务状态为LocalTransactionState,枚举值如下:

  • COMMIT_MESSAGE,
  • ROLLBACK_MESSAGE,
  • UNKNOW

注意:TransactionListener#executeLocalTransaction是在发送者成功发送PREPARED消息后,会执行本地事务方法,然后返回本地事务状态;如果PREPARED消息发送失败,则不会调用TransactionListener#executeLocalTransaction,并且本地事务消息,设置为LocalTransactionState.ROLLBACK_MESSAGE,表示消息需要被回滚。

DefaultMQProducerImpl#sendMessageInTransaction
try {
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}

step7:调用endTransaction方法结束事务(提交或回滚)。

DefaultMQProducerImpl#endTransaction
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
    case COMMIT_MESSAGE:
         requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
         break;
    case ROLLBACK_MESSAGE:
         requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
         break;
     case UNKNOW:
         requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
         break;
     default:
         break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());

step8:组装结束事务请求,主要参数为:事务ID、事务操作(commitOrRollback)、消费组、消息队列偏移量、消息ID,fromTransactionCheck,从这里发出的请求,默认为false。Broker端的请求处理器为:EndTransactionProcessor。

step9:EndTransactionProcessor根据事务提交类型:TRANSACTION_COMMIT_TYPE(提交事务)、TRANSACTION_ROLLBACK_TYPE(回滚事务)、TRANSACTION_NOT_TYPE(忽略该请求)。

到目前为止,已详细梳理了RocketMQ事务消息的发送流程,更加准确的说是Prepare状态的消息发送流程。具体流程如图所示:
clipboard

本文到这里,初步展示了事务消息的发送流程,总的说来,RocketMQ的事务消息发送使用二阶段提交思路,首先,在消息发送时,先发送消息类型为Prepread类型的消息,然后在将该消息成功存入到消息服务器后,会回调TransactionListener#executeLocalTransaction,执行本地事务状态回调函数,然后根据该方法的返回值,结束事务:
1、COMMIT_MESSAGE :提交事务。
2、ROLLBACK_MESSAGE:回滚事务。
3、UNKNOW:未知事务状态,此时消息服务器(Broker)收到EndTransaction命令时,将不对这种消息做处理,消息还处于Prepared类型,存储在主题为:RMQ_SYS_TRANS_HALF_TOPIC的队列中,然后消息发送流程将结束,那这些消息如何提交或回滚呢?

为了实现避免客户端需要再次发送提交、回滚命令,RocketMQ会采取定时任务将RMQ_SYS_TRANS_HALF_TOPIC中的消息取出,然后回到客户端,判断该消息是否需要提交或回滚,来完成事务消息的声明周期,该部分内容将在下节重点探讨。

若您对RocketMQ技术感兴趣,请加入作者所在的 RocketMQ技术交流群

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
2月前
|
消息中间件 存储 Apache
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
355 2
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
|
2月前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
53 0
|
3月前
|
消息中间件 数据库 RocketMQ
Springboot+RocketMQ通过事务消息优雅的实现订单支付功能
RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账, A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银 行账户扣除一万元”这个操作同时成功或者同时失败。RocketMQ采用两阶段提交的方式实现事务消息。
108 0
|
3月前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)(下)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
30 0
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)(上)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
66 0
|
3月前
|
存储 消息中间件 关系型数据库
解密分布式事务:CAP理论、BASE理论、两阶段提交(2PC)、三阶段提交(3PC)、补偿事务(TCC)、MQ事务消息、最大努力通知
解密分布式事务:CAP理论、BASE理论、两阶段提交(2PC)、三阶段提交(3PC)、补偿事务(TCC)、MQ事务消息、最大努力通知
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
604 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 Apache RocketMQ
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
万亿级数据洪峰下的消息引擎——Apache RocketMQ
310 0
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
|
消息中间件 存储 缓存
RocketMQ Schema——让消息成为流动的结构化数据
RocketMQ Schema 提供了对消息的数据结构托管服务,同时为原生客户端提供了较为丰富的序列化/反序列化 SDK ,补齐了 RocketMQ 在数据治理和业务上下游解耦方面的短板,让数据成为流动的结构化数据,那么快来了解下实现原理吧~
466 0
RocketMQ Schema——让消息成为流动的结构化数据

热门文章

最新文章