Redis笔记(七)Java实现Redis消息队列

简介: 这里我使用Redis的发布、订阅功能实现简单的消息队列,基本的命令有publish、subscribe等。 在Jedis中,有对应的java方法,但是只能发布字符串消息。为了传输对象,需要将对象进行序列化,并封装成字符串进行处理。

这里我使用Redis的发布、订阅功能实现简单的消息队列,基本的命令有publish、subscribe等。

在Jedis中,有对应的java方法,但是只能发布字符串消息。为了传输对象,需要将对象进行序列化,并封装成字符串进行处理。


使用Redis实现消息队列

1.封装一个消息对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public  class  Message  implements  Serializable{
 
private  static  final  long  serialVersionUID = 1L;
 
private  String titile;
private  String info;
 
public  Message(String titile,String info){
this .titile=titile;
this .info=info;
}
 
public  String getTitile() {
return  titile;
}
public  void  setTitile(String titile) {
this .titile = titile;
}
public  String getInfo() {
return  info;
}
public  void  setInfo(String info) {
this .info = info;
}
}

  

2.为这个消息对象提供序列化方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public  class  MessageUtil {
 
//convert To String
public  static  String convertToString(Object obj,String charset)  throws  IOException{
 
ByteArrayOutputStream bo =  new  ByteArrayOutputStream();
ObjectOutputStream oo =  new  ObjectOutputStream(bo);
oo.writeObject(obj);
String str = bo.toString(charset);
bo.close();
oo.close();
return  str;
}
 
//convert To Message
public  static  Object convertToMessage( byte [] bytes)  throws  Exception{
ByteArrayInputStream in =  new  ByteArrayInputStream(bytes);
ObjectInputStream sIn =  new  ObjectInputStream(in);
return  sIn.readObject();
 
}
}

  

3.从Jedis连接池中获取连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public  class  RedisUtil {
 
/**
* Jedis connection pool
* @Title: config
*/
public  static  JedisPool getJedisPool(){
ResourceBundle bundle=ResourceBundle.getBundle( "redis" );
String host=bundle.getString( "host" );
int  port=Integer.valueOf(bundle.getString( "port" ));
int  timeout=Integer.valueOf(bundle.getString( "timeout" ));
//  String password=bundle.getString("password");
 
JedisPoolConfig config= new  JedisPoolConfig();
config.setMaxActive(Integer.valueOf(bundle.getString( "maxActive" )));
config.setMaxWait(Integer.valueOf(bundle.getString( "maxWait" )));
config.setTestOnBorrow(Boolean.valueOf(bundle.getString( "testOnBorrow" )));
config.setTestOnReturn(Boolean.valueOf(bundle.getString( "testOnReturn" )));
 
JedisPool pool= new  JedisPool(config, host, port, timeout);
 
return  pool;
}
}

  

4.创建Provider类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public  class  Producer {
 
private  Jedis jedis;
private  JedisPool pool;
 
public  Producer(){
pool=RedisUtil.getJedisPool();
jedis = pool.getResource();
}
 
 
public  void  provide(String channel,Message message)  throws  IOException{
String str1=MessageUtil.convertToString(channel, "UTF-8" );
String str2=MessageUtil.convertToString(message, "UTF-8" );
jedis.publish(str1, str2);
}
 
//close the connection
public  void  close()  throws  IOException {
//将Jedis对象归还给连接池,关闭连接
pool.returnResource(jedis);
}
}

  

5.创建Consumer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public  class  Consumer {
 
private  Jedis jedis;
private  JedisPool pool;
 
public  Consumer(){
pool=RedisUtil.getJedisPool();
jedis = pool.getResource();
}
 
 
public  void  consum(String channel)  throws  IOException{
JedisPubSub jedisPubSub =  new  JedisPubSub() {
// 取得订阅的消息后的处理
public  void  onMessage(String channel, String message) {
System.out.println( "Channel:" +channel);
System.out.println( "Message:" +message.toString());
}
 
// 初始化订阅时候的处理
public  void  onSubscribe(String channel,  int  subscribedChannels) {
System.out.println( "onSubscribe:" +channel);
}
 
// 取消订阅时候的处理
public  void  onUnsubscribe(String channel,  int  subscribedChannels) {
System.out.println( "onUnsubscribe:" +channel);
}
 
// 初始化按表达式的方式订阅时候的处理
public  void  onPSubscribe(String pattern,  int  subscribedChannels) {
// System.out.println(pattern + "=" + subscribedChannels);
}
 
// 取消按表达式的方式订阅时候的处理
public  void  onPUnsubscribe(String pattern,  int  subscribedChannels) {
// System.out.println(pattern + "=" + subscribedChannels);
}
 
// 取得按表达式的方式订阅的消息后的处理
public  void  onPMessage(String pattern, String channel, String message) {
System.out.println(pattern +  "="  + channel +  "="  + message);
}
};
 
jedis.subscribe(jedisPubSub, channel);
}
 
//close the connection
public  void  close()  throws  IOException {
//将Jedis对象归还给连接池
pool.returnResource(jedis);
}
}

  

6.测试方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public  static  void  main(String[] args){
 
Message msg= new  Message( "hello!" "this is the first message!" );
 
Producer producer= new  Producer();
Consumer consumer= new  Consumer();
try  {
producer.provide( "chn1" ,msg);
consumer.consum( "chn1" );
catch  (IOException e) {
e.printStackTrace();
}
}

  

 


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
9天前
|
消息中间件 NoSQL Java
springboot redis 实现消息队列
springboot redis 实现消息队列
23 1
|
15天前
|
算法 搜索推荐 Java
数据结构与算法(Java篇)笔记--希尔排序
数据结构与算法(Java篇)笔记--希尔排序
|
15天前
|
算法 搜索推荐 Java
数据结构与算法(Java篇)笔记--快速排序
数据结构与算法(Java篇)笔记--快速排序
|
15天前
|
机器学习/深度学习 算法 搜索推荐
数据结构与算法(Java篇)笔记--归并排序
数据结构与算法(Java篇)笔记--归并排序
|
15天前
|
算法 搜索推荐 Java
数据结构与算法(Java篇)笔记--插入排序
数据结构与算法(Java篇)笔记--插入排序
|
15天前
|
算法 搜索推荐 Java
数据结构与算法(Java篇)笔记--选择排序
数据结构与算法(Java篇)笔记--选择排序
|
15天前
|
算法 搜索推荐 Java
数据结构与算法(Java篇)笔记--冒泡排序
数据结构与算法(Java篇)笔记--冒泡排序
|
15天前
|
算法 搜索推荐 Java
数据结构与算法(Java篇)笔记--Comparable接口
数据结构与算法(Java篇)笔记--Comparable接口
|
22天前
|
消息中间件 存储 缓存
【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶
【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶
|
25天前
|
消息中间件 存储 监控
RabbitMQ:分布式系统中的高效消息队列
RabbitMQ:分布式系统中的高效消息队列

热门文章

最新文章

相关产品

  • 云消息队列 Kafka 版
  • 消息服务
  • 云消息队列 MQ