RocketMQ实战(四)

简介:

前言

这将是RocketMQ实战系列的最后一篇文章,该系列的文章列表如下:

《RocketMQ实战(一)》

《RocketMQ实战(二)》

《RocketMQ实战(三):分布式事务》

RocketMQ 3.2.6的事务机制

在上一篇博客中,已经知道RocketMQ 3.0.8是支持事务回查机制,但是在RocketMQ 3.2.6中取消了这个功能,下面我们继续以转账功能分析我们自己如何解决这个问题。

wKiom1kJ1cPRn6RcAABWBYYjcKA754.jpg


在正常情况下,当然没有问题,如果第五步(向MQ发送确认消息)出现失败,加上RocketMQ 3.2.6版本没有事务回查机制,就会导致这条转账消息,在A银行完成了操作,但是迟迟对B银行系统不可见!

wKiom1kJ1gSAXzelAAB6Q70jL3M558.png

用户U1从A银行系统转账给B银行系统的用户U2的处理过程如下:

第一步:A银行系统生成一条转账消息,以事务消息的方式写入RocketMQ,此时B银行系统不可见这条消息

第二步:写入MQ成功后,回调A银行系统,对T1,T2表进行操作(很显然需要是一个事务)

我们重点关注下T2表,这个表是用来干嘛的呢?每条转账消息都会在T2表中,该表有2个特殊的字段:status,updatetime。(用途会在后文详述)

第三步:完成第二步,接下来发送确认消息给MQ,如果这个确认消息发送成功,那么这条转账消息,将对B银行系统可见。然后B银行系统,会在一个事务中完成对t3,t5的操作。

如果发送确认消息给MQ失败的处理思路:

首先,B银行系统,有一个定时任务(比如说每隔1MIN执行一次),扫描表t5,取得一段时间内的数据,发送给A银行系统。要知道t5中的数据,必然是A银行系统成功处理并发送确认消息成功的转账数据。为什么要发送给A银行系统呢,其实就是为了找到那些发送确认消息失败的转账数据。那么怎么发给A银行系统呢,这个方式比较多,可以考虑在来一个Topic,也可以考虑Netty等。发送给A银行系统,其实就是为了更新t2表的status,updatetime。

这里有一个关键,如何“扫描表t5,取得一段时间内的数据”?这就是t4的作用,在t4中记录一个time字段,每次定时任务启动,先更新time(比如设定为当前系统时间,设置前的的时间为old),然后扫描出t5中大于这个old时间的转账数据,如此循环往复。

其次,A银行系统,也有一个定时任务(可以根据业务消费能力定,可以大一些),扫描t2表(指定status及updatetime条件),将那些确认消息发送失败的转账消息找出来,更新updatetime并发送给MQ。

这样,我们并没有改动RocketMQ 3.2.6的源码,而是在外围解决了事务回查!


其实到这里,你可以发现RocketMQ的一个特点,就是将生产者和MQ绑定,而不需要特别处理消费者,这是为什么呢?因为消息只要发往RocketMQ成功,那么就意味着成功,为什么这么说?

前面,我们说过,消费者端消费消息只会产生2种错误,第一:timeout,第二:exception。要知道RocketMQ对于超时,会不断重试;对于消费异常,会根据消费端的返回码,会有重试机制保证。也就是,RocketMQ一定会让消息得到消费,如果消费有问题,只能是消费者的问题,而不会是RocketMQ的问题!



Pull Or Push

在前面的博客已经提到,在RocketMQ中Consumer分为2类:Push Consumer、Pull Consumer。以前的例子都是Push Consumer,接下来,为大家介绍下Pull Consumer。

wKioL1kJ1l-Q8Be4AAAhA9Onb10883.png


wKiom1kJ1nuz0T0dAADIcid4cW4589.png


从表面意思上来看,好像Push是MQ推送给消费者,而Pull是消费者从MQ中拉取;其实本质上都是拉取模式PULL,即消费者从MQ中轮询取得消息。

在Push模式下,Consumer把轮询过程封装了,并注册了MessageListener监听器,取到消息后,唤醒MessageListener监听器中的consumeMessage()进行消费,所以给我们造成了感觉上好像是“推消息”。

在Pull模式下,需要特别注意的是,本质上是从一个Topic下的所有Queue进行拉取,而且每个Queue都必须记录拉取位置,否则会导致重复消费。还有拉取的时间间隔,拉取的大小等等。不过所有的这一切,MQPullConsumerScheduleService都替我们考虑清楚了,提供updateConsumeOffset去更新消费的队列的位置(默认5S同步一次),提供setPullNextDelayTimeMillis设置下次拉取的时间间隔(应该设置的大一些,至少大于5S)。

仔细回想下,对于Push方式的回调   和  Pull方式的回调,还有什么关键区别么?

对于Push而言,不论是基于MessageListenerConcurrently的,还是基于MessageListenerOrderly的,都有返回值的;而Pull的doPullTask的返回值却是void?

这意味,我们需要在pull方式中,注意自己处理每条消息消费的异常情况!


wKioL1kJ1qjCh3XgAABfya_2KQ0041.png


通过运行结果,可以印证上面的观点:为什么每次消费都是4条开始,4条结束呢?因为一个Topic下有4个Queue,而且上面的代码实际上会针对每个Queue开启一个线程去消费!


RocketMQ Filter组件介绍

对于ActiveMQ而言,我们可以通过JMS Selectors机制(就是类似于SQL的语法)来实现过滤,很easy。那么和RocketMQ Filter组件有什么区别呢?

虽然,2者都能实现过滤,但是RocketMQ Filter的性能要更高效些,因为RocketMQ是在broker上将过滤后的数据发往filter,然后消费者直接从filter上取得数据;而ActiveMQ是消费者直接在broker上进行过滤消费!(当然,对于RocketMQ而言,Tag机制已经足够应付日常绝大数的过滤功能,除非你的业务对性能有特别高的要求)


wKioL1kJ1uaBn0l2AABatuiY9e8836.png


具体怎么做呢?这里我就不演示了,网上有很多例子,这里只说下大致的过程:

第一:broker-xxx.properties中指定filter个数 

第二:上传一段JAVA代码,其实就是一个类


到这里,整个RocketMQ实战系列就结束呢,你学到了么,体会到RocketMQ的强大了么?

See u next blog!

本文转自zfz_linux_boy 51CTO博客,原文链接:http://blog.51cto.com/zhangfengzhe/1921757,如需转载请自行联系原作者
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 RocketMQ 微服务
RocketMQ 分布式事务消息实战指南
RocketMQ 分布式事务消息实战指南
271 1
|
2月前
|
消息中间件 存储 监控
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
32 1
|
6月前
|
消息中间件 存储 网络协议
企业实战(11)消息队列之Docker安装部署RabbitMQ实战
企业实战(11)消息队列之Docker安装部署RabbitMQ实战
125 0
|
7月前
|
物联网 Java Linux
一文读懂物联网 MQTT 协议之实战篇
一文读懂物联网 MQTT 协议之实战篇
220 1
|
2月前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
64 0
|
28天前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
26 0
|
7月前
|
消息中间件 存储 网络协议
从原理到实战,手把手教你在项目中使用RabbitMQ
RabbitMQ 的文章之前写过,但是当时给的示例是 Demo 版的,这篇文章主要是结合之前写的理论知识,将 RabbitMQ 集成到技术派项目中。 话不多说,上文章目录: 下面我们先回顾一下理论知识,如果对这块知识已经清楚的同学,可以直接跳到实战部分。 1. 消息队列 1.1 消息队列模式 消息队列目前主要 2 种模式,分别为“点对点模式”和“发布/订阅模式”。 点对点模式 一个具体的消息只能由一个消费者消费,多个生产者可以向同一个消息队列发送消息,但是一个消息在被一个消息者处理的时候,这个消息在队列上会被锁住或者被移除并且其他消费者无法处理该消息。 需要额外注意的是,如果消费者
446 5
|
28天前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
48 1
|
1月前
|
消息中间件 前端开发 算法
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
42 1
|
1月前
|
消息中间件
动力RabbitMQ实战视频教程
动力RabbitMQ实战视频教程
22 3
动力RabbitMQ实战视频教程