谈消息总线客户端的多线程实现

简介: 最近在实现一个基于RabbitMQ的消息总线。因为它提供了Client(客户端),这里就牵扯到凡是技术组件的client都无法回避的并发问题。本文借实现消息总线的client谈谈在实现过程中的想法以及最终的处理方式,当然这些都不仅仅适用于消息总线的client,其他通用组件的client也同样适用。

最近在实现一个基于RabbitMQ的消息总线。因为它提供了Client(客户端),这里就牵扯到凡是技术组件的client都无法回避的并发问题。本文借实现消息总线的client谈谈在实现过程中的想法以及最终的处理方式,当然这些都不仅仅适用于消息总线的client,其他通用组件的client也同样适用。

并发问题的分类

其实上面所提到的并发问题,从大的层面上可以划分为两类问题:

  • 自身固有的并发问题:这个存在的前提条件是client自身内部使用了多线程技术,并且本身就存在线程安全的缺陷。
  • 被动调用的并发问题:指的是该client处于多线程调用环境下产生的线程安全问题。
如果你想单纯得看待怎样的并发问题算是自身固有的并发问题,那么你可以假设一个前提:如果你client处于单线程的被调用环境中,那么你client内部使用了多线程,并且存在线程安全问题,就可以看成是纯粹意义上的自身固有的并发问题。说得再直白一点,如果你内部没有采用多线程,那么这个client你可以认为它不存在自身固有的并发问题。但有时候外部调用的多线程环境也能触发你client内部产生并发问题,这种情况下的并发问题我们将其归类为被动调用的并发问题。
听起来可能有些抽象,我们通过一个实例让它更直观一点。我们看redis的客户端jedis。jedis内部对redis的数据结构命令的调用实现并没有采用多线程技术,因此我们可以认为它没有自身固有的并发问题,但一旦你在多线程的环境下共享其Jedis对象(主对象),那么各种各样莫名其妙的错误就出来了。我们可以认为这是被动调用的并发问题。所以要实现一个绝对线程安全的类是非常不容易的,可以说代价也非常大——因为你必须同时考虑这两种不同的并发问题。换句话说,当你的client内部外部都可能存在多线程环境,那么你必须同时考虑调用以及被调用的线程安全。
为了不让讨论的问题变得大而空,这篇文章我们将关注点放在被动调用的并发问题。

RabbitMQ 通信简介

简单介绍一下RabbitMQ跟通信相关的两个关键对象:Connection、Channel。要通信,肯定要先建立TCP连接,这个过程主要由Connection 负责。RabbitMQ支持从一个Connection创建多个Channel,Channel定义并实现了通信的各种API。因此Connection 主要负责链路的建立,而Channel主要负责通信逻辑。RabbitMQ这么设计是为了避免创建太多Connection(每个Connection都是跟RabbitMQ Server的TCP连接)。而对Channel的设计就是网络中常用的“多路复用”技术。

共享实例

这是我最初的想法:构建完全线程安全的类,无论是怎样的被调用环境,都只使用一个单独的Messagebus 对象。这是对被调环境最友好的方式,但这种情况下你就必须以被调环境是多线程环境为基准来设计client的实现模型。
因此我之前的想法是在client内部建立一个Channel 的对象池。所有的单工(单向)通信,都直接从对象池中获取一个Channel对象,通信结束后再归还Channel到对象池。因为这里的Pool构建在Apache Common Pool的基础上,所以对Channel对象的获取与归还是线程安全的,而Channel对象在RabbitMQ 官方Java client中是被明确标注为线程安全的,因此在整个通信逻辑上没有线程安全问题。但完全共享实例意味着内部关键对象也是共享的,这里涉及到client内部的两个比较关键的对象:
  • Pubsuber:用于从一个pubsuberCenter获取实时配置变更数据
  • ConfigManager:用于解析pubsuberManager push过来的数据并及时调整客户端的控制逻辑
在共享实例的实现方式下,这两个对象提供的API必须是线程安全的。与此同时,所有client主对象的API都必须是同步的(最简单也是性能最差的API同步实现方式是将整个方法直接用synchronized标记,如果整个方法不实现为同步的,就必须在方法内部小心得处理同步问题)。

由共享实例向线程独占过渡

因为多线程问题最主要的根源就是数据共享的问题,因此共享实例的实现方式算是正中下怀,而且这是非常考验实现者并发能力的,本人作为并发新手,如果有办法敬而远之,那么是再好不过的了。因此我的思路逐渐从共享变量向独占过渡。
之前谈到一个Connection可以创建多个Channel。RabbitMQ 官方client在其doc上已经直接说明了:Channel是线程安全的,它直接支持在多线程环境下通信。
因此,我的思路很快就变成了像下面这样:

这种模式下,Connection被实现为单例模式,在一个JVM进程中(不管被调环境是否处于多线程状态)只会存在一个Connection对象。一个Client主对象(就是上图中的Messagebus对象)只关联一个独立的Channel并且约定一个Client主对象只能适用于一个独立的线程,不得跨线程使用。这样,整个client的通信逻辑中就完全不必关注并发问题。但这种模式下,Pubsuber以及ConfigManager仍然是单例的(也就是说是共享实例的模式),因此他们的API还必须实现为线程安全的。
这看起来是一种不错的方法,但当我已经开始动手实现之后,才发现一个问题:如果Messagebus对象在各个线程中是独占的,谁行驶Connection的关闭动作?如果它被某个线程上的Messagebus对象关闭了,对其他线程上正在工作的Messagebus将是一个灾难——它们将以一种极其不优雅的方式抛出异常。共享实例模式下,client主对象倒不存在这个问题,因为其client主对象的控制权完全归主线程所有。

完全线程独占的实现

client主对象以及所有的关联对象由线程独占(Connection对象在不同的线程中也是不同的实例)。这也是jedis的实现方式,其实在思考上面两种方式之前,我就了解了这种实现方式。但我一直在尝试是否还有其他的方式,因为这么做毕竟比较浪费资源——客户端创建多少个通信线程,就至少有多少套对象簇(包括这么多Connection对象)。但这也是最简单、通信性能最好的方式——因为它完全不需要处理被调线程安全问题,并且也不像上面一种思路中需要为谁掌管Connection的控制权而纠结。单个线程的内存模型如下:

也就是说,当你创建一个Messagebus client对象,就会有上面虚线框内的关联对象簇被创建(它们跟client主对象有相同的生命周期)。
这种实现方式是如何解决被调多线程并发问题的?两步:
  • 基于约定:规定不得在多线程之间共享client主对象,否则后果自负
  • Client主对象以及依赖对象完全独占,并创建Client主对象的对象池
现在,在多线程环境下就不存在任何对象共享了:

你可以看一下它的 使用示例
这种模式,client的所有实现代码都无需再为线程安全问题而做任何伤害性能的加锁操作而且对象的归属上也非常清晰。但毫无疑问,它也存在一些缺点:
  1. 它浪费了不少资源:不只是客户端的JVM的内存空间,还有RabbitMQ Server的连接资源
  2. 安全隐患:这种模式,如果服务端不做任何处理,客户端甚至可以通过不断创建Messagebus 对象,而发起DDos攻击
其实,归根到底一切都是权衡——看你关注什么又愿意舍弃什么。

总结

其实很多API都无法完全做到十全十美,本文的并发问题只是一个普遍现象,其他的问题还有乱用、错用的问题。比如上面说到的第二种模型,如果我们不谈消息总线,只采用RabbitMQ原生的java client的话,多线程通信时你可以这样:在主线程上创建Connection对象,然后为每个线程分配独立的Channel对象,最终在主线程上关闭Connection对象。但Channel对象开放了获取Connection对象的API,因此也就给了每个线程对Connection的控制权。你只能说技术组件只能顾好自身,它不揣测任何使用场景以及使用意图。




原文发布时间为:2015-03-08

本文作者:vinoYang

本文来自云栖社区合作伙伴 CSDN博客,了解相关信息可以关注CSDN博客。




相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 移动开发 NoSQL
Redis 协议 事务 发布订阅 异步连接
Redis 协议 事务 发布订阅 异步连接
|
1月前
|
消息中间件 缓存 API
|
8月前
|
消息中间件 存储 微服务
RPC 和消息队列的区别
RPC 和消息队列的区别
177 0
|
9月前
|
运维 监控 Dubbo
Dubbo协议异步单一长连接原理与优势
Dubbo协议异步单一长连接原理与优势
417 0
|
消息中间件 存储 设计模式
Seata 高性能 RPC 通信的实现- 巧用 reactor 模式
reactor 模式是一种事件驱动的应用层 I/O 处理模式,基于分而治之和事件驱动的思想,致力于构建一个高性能的可伸缩的 I/O 处理模式
122 0
|
消息中间件 安全 JavaScript
小家Spring】从Spring中的(ApplicationEvent)事件驱动机制出发,聊聊【观察者模式】【监听者模式】【发布订阅模式】【消息队列MQ】【EventSourcing】...(中)
小家Spring】从Spring中的(ApplicationEvent)事件驱动机制出发,聊聊【观察者模式】【监听者模式】【发布订阅模式】【消息队列MQ】【EventSourcing】...(中)
|
Dubbo Java 应用服务中间件
dubbo协议下的单一长连接与多线程并发如何协同工作
dubbo协议下的单一长连接与多线程并发如何协同工作
180 0
dubbo协议下的单一长连接与多线程并发如何协同工作
|
安全 微服务
|
消息中间件 设计模式 安全
小家Spring】从Spring中的(ApplicationEvent)事件驱动机制出发,聊聊【观察者模式】【监听者模式】【发布订阅模式】【消息队列MQ】【EventSourcing】...(下)
小家Spring】从Spring中的(ApplicationEvent)事件驱动机制出发,聊聊【观察者模式】【监听者模式】【发布订阅模式】【消息队列MQ】【EventSourcing】...(下)
|
消息中间件 前端开发 安全
小家Spring】从Spring中的(ApplicationEvent)事件驱动机制出发,聊聊【观察者模式】【监听者模式】【发布订阅模式】【消息队列MQ】【EventSourcing】...(上)
小家Spring】从Spring中的(ApplicationEvent)事件驱动机制出发,聊聊【观察者模式】【监听者模式】【发布订阅模式】【消息队列MQ】【EventSourcing】...(上)
小家Spring】从Spring中的(ApplicationEvent)事件驱动机制出发,聊聊【观察者模式】【监听者模式】【发布订阅模式】【消息队列MQ】【EventSourcing】...(上)