Java中间件JMS(四)之ActiveMQ整合spring之类转换

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:

原文链接:http://blog.csdn.net/dwc_fly/article/details/11096071

前几章都是直接发送MapMessage类型的数据,拿前面的例子来讲,如果生产者发送的是TextMessage,消费者也是必须TextMessage;如果我们自己要发送的数据不是TextMessage类型,而消费者还是TextMessage的,那该怎么办?难道每次接受后都要增加一个转换方法么?其实spring早就考虑到这种情况了。转化器在很多组件中都是必不缺少的东西Spring的MessageConverter接口提供了对消息转换的支持。


1、转换类的相关代码POJO

新建一个类MsgPoJo,就是一个简单的Pojo类。具体代码如下:

[java] view plain copy
  1. package jms.mq.spring;  

  2. import java.io.Serializable;  

  3. publicclass MsgPoJo implements Serializable{  

  4. private String id;  

  5. private String text;  

  6. public String getId() {  

  7. return id;  

  8.    }  

  9. publicvoid setId(String id) {  

  10. this.id = id;  

  11.    }  

  12. public String getText() {  

  13. return text;  

  14.    }  

  15. publicvoid setText(String text) {  

  16. this.text = text;  

  17.    }    

  18. }  


2.转换类的实现

新建一个类MsgConverter.java,实现MessageConverter接口。生成的代码如下


[java] view plain copy
  1. package jms.mq.spring;  

  2. import javax.jms.JMSException;  

  3. import javax.jms.Message;  

  4. import javax.jms.Session;  

  5. import javax.jms.TextMessage;  

  6. import org.springframework.jms.support.converter.MessageConversionException;  

  7. import org.springframework.jms.support.converter.MessageConverter;  

  8. publicclass MsgConverter implements MessageConverter{  

  9. @Override

  10. public Object fromMessage(Message message) throws JMSException,  

  11.    MessageConversionException {  

  12. if (!(message instanceof TextMessage)) {  

  13. thrownew MessageConversionException("Message is not TextMessage");  

  14.        }  

  15.        System.out.println("--转换接收的消息--");  

  16.        TextMessage textMessage = (TextMessage) message;  

  17.        MsgPoJo msgPojo = new MsgPoJo();  

  18.        String[] texts=textMessage.getText().split(",");  

  19.        msgPojo.setId(texts[0]);  

  20.        msgPojo.setText(texts[1]);  

  21. return msgPojo;  

  22.    }  

  23. @Override

  24. public Message toMessage(Object object, Session session) throws JMSException,  

  25.    MessageConversionException {  

  26. if (!(object instanceof MsgPoJo)) {  

  27. thrownew MessageConversionException("obj is not MsgPojo");  

  28.        }  

  29.        System.out.println("--转换发送的消息--");  

  30.        MsgPoJo msgPojo = (MsgPoJo) object;  

  31.        TextMessage textMessage = session.createTextMessage();  

  32.        textMessage.setText(msgPojo.getId()+","+msgPojo.getText());  

  33. return  textMessage;  

  34.    }  

  35. }  



代码很简单就是做些转换,有fromMessage和toMessage两个方法,真好对应发送转换toMessage和接受转换fromMessage。此时,发送和接收消息要换成template.convertAndSend(message);template.receiveAndConvert()。接下来我做一些配置,让spring知道我们的转换类。修改applicationContext.xml中jms模版配置的代码,修改后的代码如下:

[html] view plain copy
  1. <!-- 类转换 -->

  2. <beanid="msgConverter"class="jms.mq.spring.MsgConverter"></bean>

  3. <!-- 配置Jms模板 -->

  4. <beanid="jmsQueueTemplate"class="org.springframework.jms.core.JmsTemplate">

  5. <propertyname="connectionFactory"ref="connectionFactory"/>

  6. <propertyname="defaultDestination"ref="queueDest"/>

  7. <!--<property name="receiveTimeout" value="10000" /> -->

  8. <!-- 类转换 -->

  9. <propertyname="messageConverter"ref="msgConverter"></property>

  10. </bean>

注意:如果你有队列监听容器配置,配置jmsQueueTemplate和jmsTopicTemplate可能与队列容器配置冲突。

3、业务相关代码和配置

在QueueProducerService.java增加convertAndSend()方法并在其实现类中实现,实现类的代码如下:


[java] view plain copy
  1. package jms.mq.spring;  

  2. import java.util.Date;  

  3. import javax.jms.Destination;  

  4. import javax.jms.JMSException;  

  5. import javax.jms.Message;  

  6. import javax.jms.Session;  

  7. import javax.jms.TextMessage;  

  8. import org.springframework.jms.core.JmsTemplate;  

  9. import org.springframework.jms.core.MessageCreator;  

  10. publicclass QueueProducerService{  

  11.    JmsTemplate jmsTemplate;  

  12.    Destination destination;  

  13. publicvoid send() {  

  14.        MessageCreator messageCreator = new MessageCreator() {  

  15. public Message createMessage(Session session) throws JMSException {  

  16.                TextMessage message = session.createTextMessage();  

  17.                message.setText("QueueProducerService发送消息"+new Date());  

  18. return message;  

  19.            }  

  20.        };  

  21.        jmsTemplate.send(this.destination,messageCreator);  

  22.    }  

  23. publicvoid convertAndSend(){  

  24.        MsgPoJo msgPojo = new MsgPoJo();  

  25.        msgPojo.setId("1");  

  26.        msgPojo.setText("first msg");  

  27.        System.out.println("--发送消息:msgPojo.id为"+msgPojo.getId()+";msgPojo.text为"+msgPojo.getText());  

  28.        jmsTemplate.convertAndSend(this.destination, msgPojo);  

  29.    }  

  30. publicvoid setJmsTemplate(JmsTemplate jmsTemplate) {  

  31. this.jmsTemplate = jmsTemplate;  

  32.    }  

  33. publicvoid setDestination(Destination destination) {  

  34. this.destination = destination;  

  35.    }  

  36. }  


同样在QueueConsumerService.java中增加receiveAndConvert()方法并在其实现类中实现,实现类的代码如下:


[java] view plain copy
  1. package jms.mq.spring;  

  2. import javax.jms.Destination;  

  3. import javax.jms.JMSException;  

  4. import javax.jms.TextMessage;  

  5. import org.springframework.jms.core.JmsTemplate;  

  6. publicclass QueueConsumerService{  

  7.    JmsTemplate jmsTemplate;  

  8.    Destination destination;  

  9. publicvoid receive() {  

  10.        TextMessage message = (TextMessage) jmsTemplate.receive();  

  11. try {  

  12.            System.out.println("QueueConsumerService收到消息:"+message.getText());  

  13.        } catch (JMSException e) {  

  14.            e.printStackTrace();  

  15.        }  

  16.    }  

  17. publicvoid receiveAndConvert() {  

  18.        MsgPoJo msgPojo = (MsgPoJo)jmsTemplate.receiveAndConvert();  

  19. if(msgPojo!=null){  

  20.            System.out.println("--收到消息:msgPojo.id为"+msgPojo.getId()+";msgPojo.text为"+msgPojo.getText());  

  21.        }  

  22.    }  

  23. publicvoid setJmsTemplate(JmsTemplate jmsTemplate) {  

  24. this.jmsTemplate = jmsTemplate;  

  25.    }  

  26. publicvoid setDestination(Destination destination) {  

  27. this.destination = destination;  

  28.    }  

  29. }  



修改我们的两个测试类,增加对转换方法的调用,不再赘述,直接上代码:

QueueConsumerTest.java测试类

[java] view plain copy
  1. package jms.mq.spring;  

  2. import org.springframework.context.ApplicationContext;  

  3. import org.springframework.context.support.ClassPathXmlApplicationContext;  

  4. publicclass QueueConsumerTest {  

  5. privatestatic ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");  

  6. privatestaticvoid receive() {  

  7.        QueueConsumerService consumerService = (QueueConsumerService) appContext.getBean("queueConsumerService");  

  8.        consumerService.receive();  

  9.    }  

  10. privatestaticvoid receiveAndConvert() {  

  11.        QueueConsumerService consumerService = (QueueConsumerService) appContext.getBean("queueConsumerService");  

  12.        consumerService.receiveAndConvert();  

  13.    }  

  14. publicstaticvoid main(String[] args) {  

  15. //receive();

  16.        receiveAndConvert();  

  17.    }  

  18. }  

QueueProducerTest.java测试类

[java] view plain copy
  1. package jms.mq.spring;  

  2. import org.springframework.context.ApplicationContext;  

  3. import org.springframework.context.support.ClassPathXmlApplicationContext;  

  4. publicclass QueueProducerTest {  

  5. privatestatic ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");  

  6. privatestaticvoid send() {  

  7.        QueueProducerService producerService = (QueueProducerService) appContext.getBean("queueProducerService");  

  8.        producerService.send();  

  9.    }  

  10. privatestaticvoid convertAndSend() {  

  11.        QueueProducerService producerService = (QueueProducerService) appContext.getBean("queueProducerService");  

  12.        producerService.convertAndSend();  

  13.    }  

  14. publicstaticvoid main(String[] args) {  

  15. //send();

  16.        convertAndSend();  

  17.    }  

  18. }  


代码编写完毕,我们看一下我们的劳动成果。首先运行生产者类和消费者控制台信息如下:

SouthEast

SouthEast


收到的内容与发的内容相同,说明转换成功了。如果这一部分的程序使用的队列跟上面的一样,那你会发现发送的时候打印出的信息不值上面的一个,还包括一个接收的信息,这是为什么呢?了解spring原理的人应该知道,spring是把所有类都加载到内容中,当然也包括我们上门写的按个实现MessageListener的一个消费者类,他们也在运行,如果监听的地址跟你送的地址正好相同的话,他也有可能收到这个信息。所以在测试的时候要注意修改配置文件。


[html] view plain copy
  1. <beanid="queueProducerService"class="jms.mq.spring.QueueProducerService">

  2. <propertyname="jmsTemplate"ref="jmsQueueTemplate"/>

  3. <propertyname="destination"ref="queueDest"/>

  4. </bean>

  5. <beanid="queueConsumerService"class="jms.mq.spring.QueueConsumerService">

  6. <propertyname="jmsTemplate"ref="jmsQueueTemplate"/>

  7. <propertyname="destination"ref="queueDest"/>

  8. </bean>


4、监听器上的使用方式

我再来学习一下跟监听器联合使用的方式,只在发布订阅者模式上演示一下。我们先来修改发布者的实现方式,在发布者中增加convertAndSend方法并在其实现类中实现,订阅者监听器没有类转换,不用修改,发布者修改后的代码如下:


[java] view plain copy
  1. package jms.mq.spring;  

  2. import java.util.Date;  

  3. import javax.jms.Destination;  

  4. import javax.jms.JMSException;  

  5. import javax.jms.MapMessage;  

  6. import javax.jms.Message;  

  7. import javax.jms.Session;  

  8. import javax.jms.TextMessage;  

  9. import org.springframework.jms.core.JmsTemplate;  

  10. import org.springframework.jms.core.MessageCreator;  

  11. import jms.spring.QueueProducerService;  

  12. publicclass TopicPublisherService{  

  13.    JmsTemplate jmsTemplate;  

  14.    Destination destination;  

  15. publicvoid send() {  

  16.        MessageCreator messageCreator = new MessageCreator() {  

  17. public Message createMessage(Session session) throws JMSException {  

  18.                TextMessage message = session.createTextMessage();  

  19.                message.setText("QueueProducerService发送消息"+new Date());  

  20. return message;  

  21.            }  

  22.        };  

  23.        jmsTemplate.send(this.destination,messageCreator);  

  24.    }  

  25. publicvoid convertAndSend(Object obj) {  

  26.        System.out.println("--发送PoJo对象...");  

  27.        jmsTemplate.convertAndSend(destination, obj);  

  28.    }  

  29. publicvoid setJmsTemplate(JmsTemplate jmsTemplate) {  

  30. this.jmsTemplate = jmsTemplate;  

  31.    }  

  32. publicvoid setDestination(Destination destination) {  

  33. this.destination = destination;  

  34.    }  

  35. }  


发布订阅者配置文件如下


[html] view plain copy
  1. <!-- 配置TopicJms模板 -->

  2. <beanid="jmsTopicTemplate"class="org.springframework.jms.core.JmsTemplate">

  3. <propertyname="connectionFactory"ref="connectionFactory"/>

  4. <propertyname="defaultDestination"ref="topicDest"/>

  5. <!-- 配置是否为发布订阅者模式,默认为false -->

  6. <propertyname="pubSubDomain"value="true"/>

  7. <!--<property name="receiveTimeout" value="10000" /> -->

  8. <propertyname="messageConverter"ref="msgConverter"></property>

  9. </bean>

[html] view plain copy
  1. <beanid="topicPublisherService"class="jms.mq.spring.TopicPublisherService">

  2. <propertyname="jmsTemplate"ref="jmsTopicTemplate"/>

  3. <!-- <property name="destination" ref="topicDest" /> -->

  4. <propertyname="destination"ref="topicSubscriberMessageListenerDest"/>

  5. </bean>

  6. <beanid="topicSubscriberService"class="jms.mq.spring.TopicSubscriberService">

  7. <propertyname="jmsTemplate"ref="jmsTopicTemplate"/>

  8. <propertyname="destination"ref="topicDest"/>

  9. </bean>


修改上面的发布测试类,修改增加对新增方法的调用,修改后的内容如下:

[java] view plain copy
  1. package jms.mq.spring;  

  2. import org.springframework.context.ApplicationContext;  

  3. import org.springframework.context.support.ClassPathXmlApplicationContext;  

  4. publicclass TopicPublisherTest {  

  5. privatestatic ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");  

  6. privatestaticvoid send() {  

  7.        TopicPublisherService topicPublisherService = (TopicPublisherService) appContext.getBean("topicPublisherService");  

  8.        topicPublisherService.send();  

  9.    }  

  10. privatestaticvoid convertAndSend() {  

  11.        TopicPublisherService topicPublisherService = (TopicPublisherService) appContext.getBean("topicPublisherService");  

  12.        MsgPoJo msgPoJo = new MsgPoJo();  

  13.        msgPoJo.setId("1");  

  14.        msgPoJo.setText("测试内容");  

  15.        topicPublisherService.convertAndSend(msgPoJo);  

  16.    }  

  17. publicstaticvoid main(String[] args) {  

  18. //send();

  19.        convertAndSend();  

  20.    }  

  21. }  

运行发布测试类,运行结果如下:

SouthEast

写在到这里,ActiveMQ与spring整合就讲完了,主要讲了ActiveMQ与spring的简单整合,监听器和类转换这些主要功能.















本文转自yunlielai51CTO博客,原文链接: http://blog.51cto.com/4925054/1288918 ,如需转载请自行联系原作者
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
28天前
|
XML 存储 Java
Spring重要类解析
Spring重要类解析
20 0
|
1月前
|
网络安全
ssh(Spring+Spring mvc+hibernate)——DeptDaoImpl.java
ssh(Spring+Spring mvc+hibernate)——DeptDaoImpl.java
12 0
|
1月前
|
网络安全
ssh(Spring+Spring mvc+hibernate)——BaseDaoImpl.java
ssh(Spring+Spring mvc+hibernate)——BaseDaoImpl.java
12 0
|
1月前
|
Shell
sh(Spring+Spring mvc+hibernate)——IEmpDao.java
sh(Spring+Spring mvc+hibernate)——IEmpDao.java
11 0
|
1月前
|
Shell
sh(Spring+Spring mvc+hibernate)——IDeptDao.java
sh(Spring+Spring mvc+hibernate)——IDeptDao.java
13 0
|
1月前
|
网络安全
ssh(Spring+Spring mvc+hibernate)——Dept.java
ssh(Spring+Spring mvc+hibernate)——Dept.java
12 0
|
1天前
|
Java Nacos 开发者
Java从入门到精通:4.2.1学习新技术与框架——以Spring Boot和Spring Cloud Alibaba为例
Java从入门到精通:4.2.1学习新技术与框架——以Spring Boot和Spring Cloud Alibaba为例
|
1天前
|
Dubbo Java 应用服务中间件
Java从入门到精通:3.2.2分布式与并发编程——了解分布式系统的基本概念,学习使用Dubbo、Spring Cloud等分布式框架
Java从入门到精通:3.2.2分布式与并发编程——了解分布式系统的基本概念,学习使用Dubbo、Spring Cloud等分布式框架
|
8天前
|
负载均衡 Java 开发者
细解微服务架构实践:如何使用Spring Cloud进行Java微服务治理
【4月更文挑战第17天】Spring Cloud是Java微服务治理的首选框架,整合了Eureka(服务发现)、Ribbon(客户端负载均衡)、Hystrix(熔断器)、Zuul(API网关)和Config Server(配置中心)。通过Eureka实现服务注册与发现,Ribbon提供负载均衡,Hystrix实现熔断保护,Zuul作为API网关,Config Server集中管理配置。理解并运用Spring Cloud进行微服务治理是现代Java开发者的关键技能。
|
9天前
|
安全 Java 数据安全/隐私保护
使用Spring Security进行Java身份验证与授权
【4月更文挑战第16天】Spring Security是Java应用的安全框架,提供认证和授权解决方案。通过添加相关依赖到`pom.xml`,然后配置`SecurityConfig`,如设置用户认证信息和URL访问规则,可以实现应用的安全保护。认证流程包括请求拦截、身份验证、响应生成和访问控制。授权则涉及访问决策管理器,如基于角色的投票。Spring Security为开发者构建安全应用提供了全面且灵活的工具,涵盖OAuth2、CSRF保护等功能。