消息中间件ActiveMQ(4)--Publisher/Subscriber实验

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:

博学,切问,近思--詹子知 (https://jameszhan.github.io)


发布/订阅(Publish/Subscribe)模式:发布/订阅功能使消息的分发可以突破目的队列地理指向的限制,使消息按照特定的主题甚至内容进行分发,用户或应用程序可以根据主题或内容接收到所需要的消息。发布/订阅功能使得发送者和接收者之间的耦合关系变得更为松散,发送者不必关心接收者的目的地址,而接收者也不必关心消息的发送地址,而只是根据消息的主题进行消息的收发。在MQ家族产品中,MQ Event Broker是专门用于使用发布/订阅技术进行数据通讯的产品,它支持基于队列和直接基于TCP/IP两种方式的发布和订阅。

在开始编程之前,我们先看一下点对点和发布/订阅接口的关系:

JMS 公共 PTP 域 Pub/Sub 域
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Destination Queue Topic
Session QueueSession TopicSession
MessageProducer QueueSender TopicPublisher
MessageConsumer QueueReceiver TopicSubscriber

JMS 1.1 通过统一的域简化了消息传递,在编程中,我们实际上只需要使用JMS公共域编程即可,对于P2P模式和Pub/Sub模式在编程方式上几乎毫无区别。我们再看一下上文提到的那个JMS类关系图:
JMS

按照JMS规范,发布消息的步骤如下:

  1. 从连接工厂中拿出Connecion对象。
  2. 和服务器建立连接(Connection.start())。
  3. 创建会话(Session)对象。
  4. 通过Session,在指定的Topic创建消息发布者(MessageProducer)。
  5. 使用Session创建消息。
  6. 使用消息生产者发布消息。
import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; public class Producer { private String name; private String dest; private Connection conn; private MessageProducer producer; private Session session; public Producer(Connection conn, String dest, String name) { this.conn = conn; this.dest = dest; this.name = name; } public void start() throws JMSException { //conn 可以不连接,当发送消息是会自动建立连接。 conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(session.createTopic(dest)); } public void send(String text) throws JMSException{ TextMessage msg = session.createTextMessage(name + ": " + text); producer.send(msg); } }

按照JMS规范,订阅消息的步骤如下:

  1. 从连接工厂中拿出Connecion对象。
  2. 和服务器建立连接(Connection.start())。
  3. 创建会话(Session)对象。
  4. 通过Session,在指定的Topic创建消息订阅者(MessageConsumer)。
  5. 订阅消息:
    • 5.1.调用messageConsumer.receive方法接受消息,如果队列上有消息,则receive方法返回该消息对象,如果队列上无消息,则该方法阻塞。
    • 5.2.也可以以为Session指定MessageListener对象的方式来订阅消息,该方法的好处在于,一旦有新消息到来,会自动触发该对象的onMessage方法执行。
下类描述了以5.1的方式接受消息。 import java.util.concurrent.Executor; import java.util.concurrent.Executors; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; public class Consumer { private String name; private String dest; private Connection conn; private MessageConsumer consumer; private Session session; private Executor executor = Executors.newFixedThreadPool(10); public Consumer(Connection conn, String dest, String name){ this.conn = conn; this.dest = dest; this.name = name; } public void start() throws JMSException{ //使用Consumer之前,必须调用conn的start方法建立连接。 conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); consumer = session.createConsumer(session.createTopic(dest)); } public void receive() { executor.execute(new Runnable() { @Override public void run() { while (true) { try { Message msg = consumer.receive(); if (msg instanceof TextMessage) { System.out.println(name + " receive message {" + ((TextMessage)msg).getText() + "}"); } else { System.out.println("msg: " + msg); } } catch (JMSException e) { e.printStackTrace(); } } } }); } } 下类描述了以5.2的方式接受消息。 import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; public class Consumer2 implements MessageListener{ private String name; private String dest; private Connection conn; private MessageConsumer consumer; private Session session; public Consumer2(Connection conn, String dest, String name){ this.conn = conn; this.dest = dest; this.name = name; } public void start() throws JMSException{ //使用Consumer之前,必须调用conn的start方法建立连接。 conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); consumer = session.createConsumer(session.createTopic(dest)); consumer.setMessageListener(this); } @Override public void onMessage(Message msg) { try { System.out.println(name + " receive message {" + ((TextMessage)msg).getText() + "}"); } catch (JMSException e) { e.printStackTrace(); } } }

发布/订阅者模式的特点是:

  1. 可以多个发布者对同一个Topic发布消息。
  2. 可以多个订阅者监听同一个Topic。
  3. 消息将被所有的订阅者接收。默认情况下,消息只会发送给所有在线的订阅者,一旦消息发送给了所有在线的订阅者,消息就会从Topic中移除。
  4. 可以特别地为主题创建持久的订阅者,只要消息不被该消费者消费,消息就会一直保留在Topic中,一旦该持久订阅者上线,消息会自动发送给该订阅者。

在文章 消息中间件ActiveMQ(2)--创建连接对象 中,我们介绍了创建连接对象的不同方法,这里我们把这两种方式做一个包装: public class ConnFactory { private ConnectionFactory factory; public ConnFactory(){ try { Context context = new JndiFactory().getJndiContext(); this.factory = (ConnectionFactory) context.lookup("con1"); } catch (NamingException e) { this.factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); } } public Connection createConnection() throws JMSException{ return factory.createConnection(); } } 创建不同的发布者对同一Topic发送消息。 public static void main(String[] args) throws JMSException { ConnFactory cf = new ConnFactory(); Producer producer1 = new Producer(cf.createConnection(), "Topic1", "Product1"); Producer producer2 = new Producer(cf.createConnection(), "Topic1", "Product2"); producer1.start(); producer2.start(); for(int i = 0; i < 6; i++){ producer1.send("message " + i); producer2.send("message " + i); } } 创建不同的订阅者监听同一Topic。 public static void main(String[] args) throws JMSException { ConnFactory cf = new ConnFactory(); Consumer consumer1 = new Consumer(cf.createConnection(), "Topic1", "Consumer1"); Consumer consumer2 = new Consumer(cf.createConnection(), "Topic1", "Consumer2"); Consumer2 consumer3 = new Consumer2(cf.createConnection(), "Topic1", "Consumer3"); consumer1.start(); consumer2.start(); consumer3.start(); consumer1.receive(); consumer2.receive(); } 实验结果如下(注意事项,应先启动订阅者监听Topic,再使用发布者发布消息。): Consumer3 receive message {Product1: message 0} Consumer1 receive message {Product1: message 0} Consumer2 receive message {Product1: message 0} Consumer3 receive message {Product2: message 0} Consumer1 receive message {Product2: message 0} Consumer2 receive message {Product2: message 0} Consumer2 receive message {Product1: message 1} Consumer3 receive message {Product1: message 1} Consumer1 receive message {Product1: message 1} Consumer3 receive message {Product2: message 1} Consumer2 receive message {Product2: message 1} Consumer1 receive message {Product2: message 1} Consumer3 receive message {Product1: message 2} Consumer2 receive message {Product1: message 2} Consumer1 receive message {Product1: message 2} Consumer1 receive message {Product2: message 2} Consumer2 receive message {Product2: message 2} Consumer3 receive message {Product2: message 2} Consumer3 receive message {Product1: message 3} Consumer2 receive message {Product1: message 3} Consumer3 receive message {Product2: message 3} Consumer1 receive message {Product1: message 3} Consumer1 receive message {Product2: message 3} Consumer3 receive message {Product1: message 4} Consumer1 receive message {Product1: message 4} Consumer2 receive message {Product2: message 3} Consumer3 receive message {Product2: message 4} Consumer1 receive message {Product2: message 4} Consumer3 receive message {Product1: message 5} Consumer2 receive message {Product1: message 4} Consumer3 receive message {Product2: message 5} Consumer1 receive message {Product1: message 5} Consumer2 receive message {Product2: message 4} Consumer1 receive message {Product2: message 5} Consumer2 receive message {Product1: message 5} Consumer2 receive message {Product2: message 5}

目录
相关文章
|
SQL 开发框架 前端开发
java程序设计与j2ee中间件技术/软件开发技术(III)-实验六-采用MVC模式实现用户注册和管理员查询功能
java程序设计与j2ee中间件技术/软件开发技术(III)-实验六-采用MVC模式实现用户注册和管理员查询功能
101 1
java程序设计与j2ee中间件技术/软件开发技术(III)-实验六-采用MVC模式实现用户注册和管理员查询功能
|
开发框架 Java 中间件
java程序设计与j2ee中间件技术/软件开发技术(III)-实验五-实现一个简单的购物车功能,使用JSP内置对象实现猜数字的小游戏
java程序设计与j2ee中间件技术/软件开发技术(III)-实验五-实现一个简单的购物车功能,使用JSP内置对象实现猜数字的小游戏
200 1
java程序设计与j2ee中间件技术/软件开发技术(III)-实验五-实现一个简单的购物车功能,使用JSP内置对象实现猜数字的小游戏
|
Web App开发 开发框架 Java
java程序设计与j2ee中间件技术/软件开发技术(III)-实验四-编写简单的jsp页面
java程序设计与j2ee中间件技术/软件开发技术(III)-实验四-编写简单的jsp页面
160 0
java程序设计与j2ee中间件技术/软件开发技术(III)-实验四-编写简单的jsp页面
|
开发框架 Java 中间件
java程序设计与j2ee中间件技术/软件开发技术(I)-实验三-接口、开闭原则和异常
java程序设计与j2ee中间件技术/软件开发技术(I)-实验三-接口、开闭原则和异常
181 0
java程序设计与j2ee中间件技术/软件开发技术(I)-实验三-接口、开闭原则和异常
|
开发框架 Java 中间件
java程序设计与j2ee中间件技术/软件开发技术(I)-实验二-类与对象
java程序设计与j2ee中间件技术/软件开发技术(I)-实验二-类与对象
122 1
java程序设计与j2ee中间件技术/软件开发技术(I)-实验二-类与对象
|
开发框架 Java 中间件
java程序设计与j2ee中间件技术/软件开发技术(I)-实验一-你好世界
java程序设计与j2ee中间件技术/软件开发技术(I)-实验一-你好世界
167 0
java程序设计与j2ee中间件技术/软件开发技术(I)-实验一-你好世界
|
8月前
|
NoSQL Java Redis
阿里Java高级岗中间件二面:GC+IO+JVM+多线程+Redis+数据库+源码
虽然“钱多、事少、离家近”的工作可能离技术人比较远,但是找到一份合适的工作,其实并不像想象中那么难。但是,有些技术人确实是认真努力工作,但在面试时表现出的能力水平却不足以通过面试,或拿到高薪,其实不外乎以下 2 个原因:
|
8月前
|
算法 NoSQL Java
2023年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
8月前
|
算法 NoSQL Java
2021年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
9月前
|
消息中间件 数据采集 Java
开发神技!阿里消息中间件进阶手册限时开源,请接住我的下巴
相信大家在实际工作中都用过消息中间件进行系统间数据交换,解决应用解耦、异步消息、流量削峰等问题,由此消息中间件的强大功能想必也不用我多说了!目前业界上关于消息中间件的实现多达好几十种,可谓百花齐放,所用的实现语言同样也五花八门。不管使用哪一个消息中间件,我们的目的都是实现高性能、高可用、可伸缩和最终一致性架构。