基于PelicanDT验证RocketMQ消息收发

简介: 具体介绍RocketMQ-example,是基于PelicanDT实现RocketMQ环境准备,收发消息验证示例前期准备本示例程序是基于阿里云ECS或远程Linux服务器完成,只需购买阿里云机器,或者选定已准备好的远程服务器即可下载RocketMQ-example代码注意事项:如果购...

具体介绍

RocketMQ-example,是基于PelicanDT实现RocketMQ环境准备,收发消息验证示例

前期准备

  1. 本示例程序是基于阿里云ECS或远程Linux服务器完成,只需购买阿里云机器,或者选定已准备好的远程服务器即可
  2. 下载RocketMQ-example代码

注意事项:如果购买的是阿里云ECS,配置:8C16G,且安全组配置访问端口:9876

快速入门

修改配置

  1. 打开rocketmq.properties配置文件,具体路径:RocketMQ-example/src/test/resources/env/func/rocketmq.properties
  2. 填写ip,userName,password

运行示例

本地代码控制远程服务器执行Dubbo验证:

  1. 打开TestRocketMQ.java,具体路径:RocketMQ-example/src/test/java/com/alibaba/pelican/rocketmq/TestRocketMQ.java
  2. 运行单元测试

预期结果

日志输出内容如下


2019-02-28 19:46:46 [INFO] [main] c.a.p.deployment.junit.rule.LogRule - --------- TO NEXT CASE ---------
2019-02-28 19:46:46 [INFO] [main] c.a.p.deployment.junit.rule.LogRule - Run TC[test(com.alibaba.pelican.rocketmq.TestRocketMQ)]
SendResult [sendStatus=SEND_OK, msgId=1E057C08A74518B4AAC28F4A3D090000, offsetMsgId=781B1FC600002A9F0000000000008BC4, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZm5e0pe3xy3tjh9sw1kgpZ, queueId=3], queueOffset=50]
SendResult [sendStatus=SEND_OK, msgId=1E057C08A74518B4AAC28F4A3D920001, offsetMsgId=781B1FC600002A9F0000000000008C76, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZm5e0pe3xy3tjh9sw1kgpZ, queueId=0], queueOffset=50]
SendResult [sendStatus=SEND_OK, msgId=1E057C08A74518B4AAC28F4A3DB10002, offsetMsgId=781B1FC600002A9F0000000000008D28, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZm5e0pe3xy3tjh9sw1kgpZ, queueId=1], queueOffset=50]
SendResult [sendStatus=SEND_OK, msgId=1E057C08A74518B4AAC28F4A3DD60003, offsetMsgId=781B1FC600002A9F0000000000008DDA, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZm5e0pe3xy3tjh9sw1kgpZ, queueId=2], queueOffset=50]
SendResult [sendStatus=SEND_OK, msgId=1E057C08A74518B4AAC28F4A3DFE0004, offsetMsgId=781B1FC600002A9F0000000000008E8C, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZm5e0pe3xy3tjh9sw1kgpZ, queueId=3], queueOffset=51]
2019-02-28 19:46:47 [INFO] [NettyClientSelector_1] RocketmqRemoting - closeChannel: close the connection to remote address[120.27.31.198:10911] result: true
2019-02-28 19:46:47 [INFO] [NettyClientSelector_1] RocketmqRemoting - closeChannel: close the connection to remote address[120.27.31.198:9876] result: true
2019-02-28 19:46:47 [INFO] [NettyClientSelector_1] RocketmqRemoting - closeChannel: close the connection to remote address[120.27.31.198:10909] result: true
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=3, storeSize=178, queueOffset=51, sysFlag=0, bornTimestamp=1551354407422, bornHost=/42.120.74.97:44264, storeTimestamp=1551354407473, storeHost=/120.27.31.198:10911, msgId=781B1FC600002A9F0000000000008E8C, commitLogOffset=36492, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=52, CONSUME_START_TIME=1551354408017, UNIQ_KEY=1E057C08A74518B4AAC28F4A3DFE0004, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='null'}]] 
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=3, storeSize=178, queueOffset=50, sysFlag=0, bornTimestamp=1551354407177, bornHost=/42.120.74.97:44264, storeTimestamp=1551354407322, storeHost=/120.27.31.198:10911, msgId=781B1FC600002A9F0000000000008BC4, commitLogOffset=35780, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=52, CONSUME_START_TIME=1551354408017, UNIQ_KEY=1E057C08A74518B4AAC28F4A3D090000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]] 
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=2, storeSize=178, queueOffset=50, sysFlag=0, bornTimestamp=1551354407382, bornHost=/42.120.74.97:44264, storeTimestamp=1551354407440, storeHost=/120.27.31.198:10911, msgId=781B1FC600002A9F0000000000008DDA, commitLogOffset=36314, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=51, CONSUME_START_TIME=1551354411038, UNIQ_KEY=1E057C08A74518B4AAC28F4A3DD60003, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51], transactionId='null'}]] 
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=1, storeSize=178, queueOffset=50, sysFlag=0, bornTimestamp=1551354407345, bornHost=/42.120.74.97:44264, storeTimestamp=1551354407400, storeHost=/120.27.31.198:10911, msgId=781B1FC600002A9F0000000000008D28, commitLogOffset=36136, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=51, CONSUME_START_TIME=1551354411040, UNIQ_KEY=1E057C08A74518B4AAC28F4A3DB10002, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}]] 
ConsumeMessageThread_5 Receive New Messages: [MessageExt [queueId=0, storeSize=178, queueOffset=50, sysFlag=0, bornTimestamp=1551354407314, bornHost=/42.120.74.97:44264, storeTimestamp=1551354407361, storeHost=/120.27.31.198:10911, msgId=781B1FC600002A9F0000000000008C76, commitLogOffset=35958, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=51, CONSUME_START_TIME=1551354411042, UNIQ_KEY=1E057C08A74518B4AAC28F4A3D920001, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]] 
  • SendResult 开头的日志代表发送消息
  • Receive New 开头的日志代表消费消息
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
605 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
2月前
|
消息中间件 存储 弹性计算
消息队列RocketMQ版:基础消息收发功能体验
【2月更文挑战第1天】假期闲着无聊,随便体验一下。本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
|
7月前
|
消息中间件 弹性计算 网络安全
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
392 0
|
存储 消息中间件 Cloud Native
RocketMQ 消息收发弹性--生产集群如何解决大促场景消息收发的弹性&降本诉求|学习笔记
快速学习 RocketMQ 消息收发弹性--生产集群如何解决大促场景消息收发的弹性&降本诉求
210 0
RocketMQ 消息收发弹性--生产集群如何解决大促场景消息收发的弹性&降本诉求|学习笔记
|
消息中间件 Apache RocketMQ
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
万亿级数据洪峰下的消息引擎——Apache RocketMQ
311 0
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
|
消息中间件 存储 缓存
RocketMQ Schema——让消息成为流动的结构化数据
RocketMQ Schema 提供了对消息的数据结构托管服务,同时为原生客户端提供了较为丰富的序列化/反序列化 SDK ,补齐了 RocketMQ 在数据治理和业务上下游解耦方面的短板,让数据成为流动的结构化数据,那么快来了解下实现原理吧~
468 0
RocketMQ Schema——让消息成为流动的结构化数据
|
消息中间件 存储 缓存
简述RocketMQ消息拉取过程【二】
简述RocketMQ消息拉取过程【二】
902 1
|
消息中间件 RocketMQ
简述RocketMQ消息拉取过程【一】
简述RocketMQ消息拉取过程【一】
634 0
|
消息中间件 缓存 负载均衡
RocketMQ消息生产者是如何选择Broker的
RocketMQ消息生产者是如何选择Broker的
474 1
|
消息中间件 存储 前端开发
同步异步调用,并谈谈消息队列mq;RocketMQ发送消息和消费消息测试类
同步调用优点: 时效性强,打电话、直播,很快可以得到结果 同步调用的问题:
483 1