消息中间件ActiveMQ(3)--P2P实验

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:

博学,切问,近思--詹子知 (https://jameszhan.github.io)


点对点方式是最为传统和常见的通讯方式,它支持一对一、一对多、多对多、多对一等多种配置方式,支持树状、网状等多种拓扑结构。 

按照JMS规范,发送消息的步骤如下:

1.从连接工厂中拿出Connecion对象。

2.和服务器建立连接(Connection.start())。

3.创建会话(Session)对象。

4.通过Session,在指定的Queue创建消息生产者(MessageProducer)。

5.使用Session创建消息。

6.使用消息生产者发送消息。 import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; public class Producer { private String name; private String dest; private Connection conn; private MessageProducer producer; private Session session; public Producer(Connection conn, String dest, String name) { this.conn = conn; this.dest = dest; this.name = name; } public void start() throws JMSException { //conn 可以不连接,当发送消息是会自动建立连接。 conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(session.createQueue(dest)); } public void send(String text) throws JMSException{ TextMessage msg = session.createTextMessage(name + ": " + text); producer.send(msg); } }    

接收消息的步骤如下: 

1.从连接工厂中拿出Connecion对象。

2.和服务器建立连接(Connection.start())。 

3.创建会话(Session)对象。

4.通过Session,在指定的Queue创建消息接受者(MessageConsumer)。

5.1.调用messageConsumer.receive方法接受消息,如果队列上有消息,则receive方法返回该消息对象,如果队列上无消息,则该方法阻塞。

5.2.也可以以为Session指定MessageListener对象的方式来接受消息,该方法的好处在于,一旦有新消息到来,会自动触发该对象的onMessage方法执行。 

下类描述了以5.1的方式接受消息。 import java.util.concurrent.Executor; import java.util.concurrent.Executors; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; public class Consumer { private String name; private String dest; private Connection conn; private MessageConsumer consumer; private Session session; private Executor executor = Executors.newFixedThreadPool(10); public Consumer(Connection conn, String dest, String name){ this.conn = conn; this.dest = dest; this.name = name; } public void start() throws JMSException{ //使用Consumer之前,必须调用conn的start方法建立连接。 conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); consumer = session.createConsumer(session.createQueue(dest)); } public void receive() { executor.execute(new Runnable() { @Override public void run() { while (true) { try { Message msg = consumer.receive(); if (msg instanceof TextMessage) { System.out.println(name + " receive message {" + ((TextMessage)msg).getText() + "}"); } else { System.out.println("msg: " + msg); } } catch (JMSException e) { e.printStackTrace(); } } } }); } }

下类描述了以5.2的方式接受消息。 import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; public class Consumer2 implements MessageListener{ private String name; private String dest; private Connection conn; private MessageConsumer consumer; private Session session; public Consumer2(Connection conn, String dest, String name){ this.conn = conn; this.dest = dest; this.name = name; } public void start() throws JMSException{ //使用Consumer之前,必须调用conn的start方法建立连接。 conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); consumer = session.createConsumer(session.createQueue(dest)); consumer.setMessageListener(this); } @Override public void onMessage(Message msg) { try { System.out.println(name + " receive message {" + ((TextMessage)msg).getText() + "}"); } catch (JMSException e) { e.printStackTrace(); } } }消息队列的特点是:

1.可以多个生产者对同一个消息队列发送消息。

2.可以多个接受者监听同一个消息对列。

3.消息只能一次性被消费,一旦消息被Consumer1消费了,则Consumer2不可能再拿到这一消息,并且同时该消息被消息队列移除。

4.持久性存储,一旦消息没有被消费,消息会一直保留在消息队列中。 

利用消息队列的这一特点,我们可以实现简单的负载均衡,比如,我们可以部署几个相同的Service到不同的机器上,让他们监听同一个Queue,那么客户的请求到来后,消息中间件会动态分配其到某一个Service处理。 

上一篇文章,我们介绍了创建连接对象的不同方法,这里我们把这两种方式做一个包装:public class ConnFactory { private ConnectionFactory factory; public ConnFactory(){ try { Context context = new JndiFactory().getJndiContext(); this.factory = (ConnectionFactory) context.lookup("con1"); } catch (NamingException e) { this.factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); } } public Connection createConnection() throws JMSException{ return factory.createConnection(); } }  
创建不同的生产者对同一队列发送消息。

public static void main(String[] args) throws JMSException { ConnFactory cf = new ConnFactory(); Producer producer1 = new Producer(cf.createConnection(), "Queue1", "Product1"); Producer producer2 = new Producer(cf.createConnection(), "Queue1", "Product2"); producer1.start(); producer2.start(); for(int i = 0; i < 50; i++){ producer1.send("message " + i); producer2.send("message " + i); } }

创建不同的消费者监听同一对列。

public static void main(String[] args) throws JMSException { ConnFactory cf = new ConnFactory(); Consumer consumer1 = new Consumer(cf.createConnection(), "Queue1", "Consumer1"); Consumer consumer2 = new Consumer(cf.createConnection(), "Queue1", "Consumer2"); Consumer2 consumer3 = new Consumer2(cf.createConnection(), "Queue1", "Consumer3"); consumer1.start(); consumer2.start(); consumer3.start(); consumer1.receive(); consumer2.receive(); } 实验结果如下(事实上,不同的生产者生产的消息被那个消费者接收到是不确定的):Consumer3 receive message {Product1: message 1} Consumer3 receive message {Product2: message 2} Consumer2 receive message {Product2: message 0} Consumer1 receive message {Product1: message 0} Consumer1 receive message {Product2: message 1} Consumer2 receive message {Product1: message 2} Consumer1 receive message {Product1: message 3} Consumer2 receive message {Product2: message 3} Consumer3 receive message {Product1: message 4} Consumer1 receive message {Product2: message 4} Consumer2 receive message {Product1: message 5} Consumer3 receive message {Product2: message 5} Consumer1 receive message {Product1: message 6} Consumer2 receive message {Product2: message 6} Consumer3 receive message {Product1: message 7} Consumer1 receive message {Product2: message 7} Consumer2 receive message {Product1: message 8} Consumer3 receive message {Product2: message 8} Consumer1 receive message {Product1: message 9} Consumer2 receive message {Product2: message 9} Consumer3 receive message {Product1: message 10} Consumer1 receive message {Product2: message 10} Consumer2 receive message {Product1: message 11} Consumer1 receive message {Product1: message 12} Consumer3 receive message {Product2: message 11} Consumer2 receive message {Product2: message 12} Consumer3 receive message {Product1: message 13} Consumer3 receive message {Product2: message 14} Consumer3 receive message {Product1: message 16} Consumer3 receive message {Product2: message 17} Consumer3 receive message {Product1: message 19} Consumer3 receive message {Product2: message 20} Consumer3 receive message {Product1: message 22} Consumer2 receive message {Product1: message 14} Consumer1 receive message {Product2: message 13} Consumer1 receive message {Product1: message 15} Consumer1 receive message {Product2: message 16} Consumer2 receive message {Product2: message 15} Consumer2 receive message {Product1: message 17} Consumer1 receive message {Product1: message 18} Consumer1 receive message {Product2: message 19} Consumer1 receive message {Product1: message 21} Consumer1 receive message {Product2: message 22} Consumer2 receive message {Product2: message 18} Consumer2 receive message {Product1: message 20} Consumer2 receive message {Product2: message 21} Consumer2 receive message {Product1: message 23} Consumer3 receive message {Product2: message 23} Consumer1 receive message {Product1: message 24} Consumer3 receive message {Product1: message 25} Consumer2 receive message {Product2: message 24} Consumer2 receive message {Product1: message 26} Consumer1 receive message {Product2: message 25} Consumer3 receive message {Product2: message 26} Consumer2 receive message {Product2: message 27} Consumer1 receive message {Product1: message 27} Consumer3 receive message {Product1: message 28} Consumer1 receive message {Product2: message 28} Consumer2 receive message {Product1: message 29} Consumer3 receive message {Product2: message 29} Consumer1 receive message {Product1: message 30} Consumer3 receive message {Product1: message 31} Consumer1 receive message {Product2: message 31} Consumer2 receive message {Product2: message 30} Consumer2 receive message {Product1: message 32} Consumer3 receive message {Product2: message 32} Consumer1 receive message {Product1: message 33} Consumer3 receive message {Product1: message 34} Consumer2 receive message {Product2: message 33} Consumer2 receive message {Product1: message 35} Consumer3 receive message {Product2: message 35} Consumer1 receive message {Product2: message 34} Consumer1 receive message {Product1: message 36} Consumer2 receive message {Product2: message 36} Consumer3 receive message {Product1: message 37} Consumer1 receive message {Product2: message 37} Consumer2 receive message {Product1: message 38} Consumer1 receive message {Product1: message 39} Consumer3 receive message {Product2: message 38} Consumer2 receive message {Product2: message 39} Consumer3 receive message {Product1: message 40} Consumer1 receive message {Product2: message 40} Consumer2 receive message {Product1: message 41} Consumer3 receive message {Product2: message 41} Consumer2 receive message {Product2: message 42} Consumer3 receive message {Product1: message 43} Consumer1 receive message {Product1: message 42} Consumer2 receive message {Product1: message 44} Consumer1 receive message {Product2: message 43} Consumer3 receive message {Product2: message 44} Consumer1 receive message {Product1: message 45} Consumer2 receive message {Product2: message 45} Consumer3 receive message {Product1: message 46} Consumer1 receive message {Product2: message 46} Consumer2 receive message {Product1: message 47} Consumer3 receive message {Product2: message 47} Consumer1 receive message {Product1: message 48} Consumer3 receive message {Product1: message 49} Consumer2 receive message {Product2: message 48} Consumer1 receive message {Product2: message 49}如果你先执行发送消息的程序,在启动接受消息的程序,所有的消息都有可能被同一消费者消费,这是ActiveMQ为了提高效率,重用了同一个连接传输了所有的消息。其他的MQ产品未必会这么做,SnoicMQ它就会以一种随机的方式分发给不同的消费者。一旦你创建好消费者先监听消息队列,然后,再发送消息,由于这个时候,消费者与JMS Server之间的连接都已经建立,所以消息会随机的分发到不同的消费者。 

目录
相关文章
|
消息中间件 缓存 安全
SpringBoot与JMS集成(中间件为ActiveMQ)
Apache ActiveMQ是最受欢迎和强有力的开源消息和集成模式服务器,支持许多跨语言客户端和协议,便利使用企业集成模式还有许多先进的特性。
|
消息中间件 XML 开发框架
|
消息中间件 网络协议 Java
|
8月前
|
NoSQL Java Redis
阿里Java高级岗中间件二面:GC+IO+JVM+多线程+Redis+数据库+源码
虽然“钱多、事少、离家近”的工作可能离技术人比较远,但是找到一份合适的工作,其实并不像想象中那么难。但是,有些技术人确实是认真努力工作,但在面试时表现出的能力水平却不足以通过面试,或拿到高薪,其实不外乎以下 2 个原因:
|
8月前
|
算法 NoSQL Java
2023年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
8月前
|
算法 NoSQL Java
2021年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
9月前
|
消息中间件 数据采集 Java
开发神技!阿里消息中间件进阶手册限时开源,请接住我的下巴
相信大家在实际工作中都用过消息中间件进行系统间数据交换,解决应用解耦、异步消息、流量削峰等问题,由此消息中间件的强大功能想必也不用我多说了!目前业界上关于消息中间件的实现多达好几十种,可谓百花齐放,所用的实现语言同样也五花八门。不管使用哪一个消息中间件,我们的目的都是实现高性能、高可用、可伸缩和最终一致性架构。
|
11月前
|
缓存 NoSQL 容灾
《Java应用提速(速度与激情)》——六、阿里中间件提速
《Java应用提速(速度与激情)》——六、阿里中间件提速
|
11月前
|
消息中间件 NoSQL Dubbo
阿里Java高级岗中间件二面:GC+IO+JVM+多线程+Redis+数据库+源码
一转眼,都2023年了,你是否在满意的公司?拿着理想的薪水? 虽然“钱多、事少、离家近”的工作可能离技术人比较远,但是找到一份合适的工作,其实并不像想象中那么难。但是,有些技术人确实是认真努力工作,但在面试时表现出的能力水平却不足以通过面试,或拿到高薪,其实不外乎以下 2 个原因: 第一,“知其然不知其所以然”。做了多年技术,开发了很多业务应用,但似乎并未思考过种种技术选择背后的逻辑。所以,他无法向面试官展现出自己未来技术能力的成长潜力。面试官也不会放心把具有一定深度的任务交给他。 第二,知识碎片化,不成系统。在面试中,面试者似乎无法完整、清晰地描述自己所开发的系统,或者使用的相关技术。