关于ActiveMQ的几种集群配置

简介:

ActiveMQ的几种集群配置。

Queue consumer clusters

此集群让多个消费者同时消费一个队列,若某个消费者出问题无法消费信息,则未消费掉的消息将被发给其他正常的消费者,结构图如下:

Broker clusters

此种配置是一个消费者连接到多个broker集群的中的一个broker,当该broker出问题时,消费者自动连接到其他一个正常的broker。消费者使用 failover:// 协议来连接broker。

failover:(tcp://localhost:61616,tcp://localhost:61617)

failover官网介绍 http://activemq.apache.org/failover-transport-reference.html

broker之间的通过静态发现(static discovery)和动态发现(dynamic discovery)来维持彼此发现,下面来介绍静态发现和动态发现的机制:

静态发现:

静态发现通过配置固定的broker uri来发现彼此,配置语法如下:

static:(uri1,uri2,uri3,...)?options

例如:

1
static :(tcp: //localhost:61616,tcp://remotehost:61617?trace=false,vm://localbroker)?initialReconnectDelay=100

  

更多静态发现介绍,见ActiveMQ官网 http://activemq.apache.org/static-transport-reference.html

动态发现:

动态发现机制是在各个broker启动时通过Fanout transport来发现彼此,配置举例如下:

1
2
3
4
5
6
<broker name= "foo" >
   <transportConnectors>
     <transportConnector uri= "tcp://localhost:0"  discoveryUri= "multicast://default" />
   </transportConnectors>
   ...
</broker>

 更多动态发现机制介绍,见官网 http://activemq.apache.org/discovery-transport-reference.html

 

Networks of brokers

多个broker组成集群,当其中一个broker的消费者出问题导致消息堆积无法消费掉时,通过ActiveMQ支持的Network of Broker方案可将该broker堆积的消息转发到其他有消费者的broker。

该方案主要有以下两种配置方式:

1、为broker配置文件配置networkConnector元素

2、使用发现机制互相探测broker

Here is an example of using the fixed list of URIs:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<? xml  version="1.0" encoding="UTF-8"?>
  
< beans  xmlns="http://activemq.org/config/1.0">
  
   < broker  brokerName="receiver" persistent="false" useJmx="false">
     < networkConnectors >
       <!-- Static discovery -->
       < networkConnector  uri="static:(tcp://localhost:62001)"/>
       <!-- MasterSlave Discovery -->
       <!--<networkConnector uri="masterslave:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/> -->
     </ networkConnectors >
  
     < persistenceAdapter >
       < memoryPersistenceAdapter />
     </ persistenceAdapter >
  
    < transportConnectors >
       < transportConnector  uri="tcp://localhost:62002"/>
     </ transportConnectors >
   </ broker >
  
</ beans >

  

This example uses multicast discovery:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<? xml  version="1.0" encoding="UTF-8"?>
  
< beans  xmlns="http://activemq.org/config/1.0">
  
   < broker  name="sender" persistent="false" useJmx="false">
     < networkConnectors >
       < networkConnector  uri="multicast://default"/>
     </ networkConnectors >
  
     < persistenceAdapter >
       < memoryPersistenceAdapter />
     </ persistenceAdapter >
  
   < transportConnectors >
       < transportConnector  uri="tcp://localhost:0" discoveryUri="multicast://default"/>
     </ transportConnectors >
   </ broker >
  
</ beans >

  

Master Slave

通过部署多个broker实例,一个master和多个slave关系的broker来达到高可用性,有三种方案:

1、Master-Slave
2、SharedFile System Master Slave
3、JDBCMaster Slave

第一种方案由于只可以由两个AMQ实例组件,实际应用场景并不广泛;
第三种方案支持N个AMQ实例组网,但他的性能会受限于数据库;
第二种方案同样支持N个AMQ实例组网,基于kahadb存储策略,亦可以部署在分布式文件系统上,应用灵活、高效且安全。

Master Slave方案当其中一个broker启动并拿到独占锁时自动成为master,其他后续的broker则一直等待锁,当master宕机释放锁时其他slave拿到独占锁则自动成为master,部署结构如下:

第二种方案的配置只需修改config文件夹下activemq.xml文件,修改消息持久化使用的方案:

1
2
3
4
5
6
7
< broker  xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="D:/Platform/mq_share_file">
   ...
     < persistenceAdapter >
             < kahaDB  directory="D:/Platform/mq_share_file/kahadb" enableIndexWriteAsync="true" enableJournalDiskSyncs="false"/>
     </ persistenceAdapter >
     ...
</ broker >

 

消息生产者代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public  class  P2PSender {
     private  static  final  String QUEUE =  "client1-to-client2" ;
 
     public  static  void  main(String[] args) {
         // ConnectionFactory :连接工厂,JMS用它创建连接
         ConnectionFactory connectionFactory;
         // Connection :JMS客户端到JMS Provider的连接
         Connection connection =  null ;
         // Session:一个发送或接收消息的线程
         Session session;
         // Destination :消息的目的地;消息发送给谁.
         Destination destination;
         // MessageProducer:消息发送者
         MessageProducer producer;
         // TextMessage message;
         // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现
         connectionFactory =  new  ActiveMQConnectionFactory(
                 "failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)" );
         try  {
             // 构造从工厂得到连接对象
             connection = connectionFactory.createConnection();
             // 启动
             connection.start();
             // 获取操作连接
             session = connection.createSession( false , Session.AUTO_ACKNOWLEDGE);
             destination = session.createQueue(QUEUE);
             // 获取session,FirstQueue是一个服务器的queue destination = session.createQueue("FirstQueue");
             // 得到消息生成者【发送者】
             producer = session.createProducer(destination);
             // 设置不持久化
             producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
             // 构造消息
             sendMessage(session, producer);
             // session.commit();
             connection.close();
         catch  (Exception e) {
             e.printStackTrace();
         finally  {
             if  ( null  != connection) {
                 try  {
                     connection.close();
                 catch  (JMSException e) {
                     e.printStackTrace();
                 }
             }
         }
     }
 
     public  static  void  sendMessage(Session session, MessageProducer producer)  throws  Exception {
         for  ( int  i =  1 ; i <=  1 ; i++) {
             Date d =  new  Date();
             TextMessage message = session.createTextMessage( "ActiveMQ发送消息"  + i +  "  "  new  Date());
             System.out.println( "发送消息:ActiveMQ发送的消息"  + i +  "  "  new  Date());
             producer.send(message);
         }
     }
}

消息消费者代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public  class  P2PReceiver {
     private  static  final  String QUEUE =  "client1-to-client2" ;
     
     public  static  void  main(String[] args) {
         // ConnectionFactory :连接工厂,JMS用它创建连接
         ConnectionFactory connectionFactory;
         // Connection :JMS客户端到JMS Provider的连接
         Connection connection =  null ;
         // Session:一个发送或接收消息的线程
         Session session;
         // Destination :消息的目的地;消息发送给谁.
         Destination destination;
         // 消费者,消息接收者
         MessageConsumer consumer;
         connectionFactory =  new  ActiveMQConnectionFactory( "failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)" );
         try  {
             // 得到连接对象
             connection = connectionFactory.createConnection();
             // 启动
             connection.start();
             // 获取操作连接
             session = connection.createSession( false , Session.AUTO_ACKNOWLEDGE);
             // 创建Queue
             destination = session.createQueue(QUEUE);
             consumer = session.createConsumer(destination);
             while  ( true ) {
                 TextMessage message = (TextMessage) consumer.receive();
                 if  ( null  != message) {
                     System.out.println( "收到消息"  + message.getText());
                 }
             }
         catch  (Exception e) {
             e.printStackTrace();
         finally  {
             try  {
                 if  ( null  != connection)
                     connection.close();
             catch  (Throwable ignore) {
             }
         }
     }
}

  



本文转自邴越博客园博客,原文链接:http://www.cnblogs.com/binyue/p/5325945.html,如需转载请自行联系原作者

相关文章
|
4天前
|
消息中间件 Kafka
Kafka【部署 03】Zookeeper与Kafka自动部署脚本
【4月更文挑战第11天】Kafka【部署 03】Zookeeper与Kafka自动部署脚本
24 8
|
10月前
|
消息中间件 存储 缓存
RabbitMQ 部署及配置详解(集群部署)
RabbitMQ 部署及配置详解(集群部署)
906 0
|
10月前
|
消息中间件 Ubuntu NoSQL
zookeeper+activeMQ 高可用
zookeeper+activeMQ 高可用
|
消息中间件 分布式计算 Kafka
Rocketmq、Rabbitmq、Kafka、Mongo、Elasticsearch、Logstash、Kibana、Nacos、Skywalking、Seata、Spark、Zookeeper安装
Rocketmq、Rabbitmq、Kafka、Mongo、Elasticsearch、Logstash、Kibana、Nacos、Skywalking、Seata、Spark、Zookeeper安装
292 0
Rocketmq、Rabbitmq、Kafka、Mongo、Elasticsearch、Logstash、Kibana、Nacos、Skywalking、Seata、Spark、Zookeeper安装
|
消息中间件 Java Kafka
集群环境下zookeeper和kafka详细安装教程
集群环境下zookeeper和kafka详细安装教程
171 0
集群环境下zookeeper和kafka详细安装教程
|
消息中间件 数据可视化 Kafka
kafka安装启动集群搭建
kafka安装启动集群搭建
kafka安装启动集群搭建
|
消息中间件 存储 开发工具
ActiveMQ - 集群
ActiveMQ - 集群
145 0
ActiveMQ - 集群
|
消息中间件 网络安全
activemq Broker-Cluster集群部署
activemq Broker-Cluster集群部署
215 0
|
存储 消息中间件 负载均衡
zookeeper集群 +kafka 集群部署(上)
Zookeeper 定义 zookeeper是一个开源的分布式的,为分布式框架提供协调服务的Apache项目 Zookeeper 工作机制 Zookeeper是–个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册, 一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些客户端做出相应的反应。 也就是说Zookeeper =文件系统+通知机制
zookeeper集群 +kafka 集群部署(上)
|
消息中间件 存储 分布式计算
zookeeper集群 +kafka 集群部署(下)
为什么需要消息列队(MQ)   主要原因是由于在高并发环境下,同步请求来不及处理,请求往往会发生阻塞。比如大量的请求并发访问数据库,导致行锁表锁,最后请求线程会堆积过多,从而触发 too many connection 错误,引发雪崩效应。 我们使用消息队列,通过异步处理请求,从而缓解系统的压力。消息队列常应用于异步处理,流量削峰,应用解耦,消息通讯等场景。 当前比较常见的 MQ 中间件有 ActiveMQ、RabbitMQ、RocketMQ、Kafka 等
zookeeper集群 +kafka 集群部署(下)

热门文章

最新文章