Redis学习笔记~分布式的Pub/Sub模式

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介:

redis的客户端有很多,这次用它的pub/sub发布与订阅我选择了StackExchange.Redis,发布与订阅大家应该很清楚了,首先一个订阅者,订阅一个服务,服务执行一些处理程序(可能是写个日志,插入个数据,发个email)然后当另一个项目的某个业务发布这个服务后,被订阅的程序将会被执行,这个听起来很有意思,redis有好的实现了这个pub/sub功能。

看一下结构图

 

Sub订阅(消息消费者)

对于订阅方,这里类似于一个服务,它会长期运行着,被启动后动态订阅一些服务进来,以便以后再被其它服务调用

        //sub a function in A-project
            PubSubManager.Instance.Subscribe("UserLog", (msg) =>
            {
                //订阅者处理自己的业务逻辑,这相关于队列服务要干的事
                Console.WriteLine(msg);
            });

Pub发布(消息生产者)

而对于其它项目,如A网站,它可能在被在POST动作后发布这个UserLog的服务,这时上面的订阅方将会消费它(消费者模式),或者服务代码被执行!

     [HttpPost]
        public ActionResult Index(string user)
        {
            PubSubManager.Instance.Publish("UserLog", user + "这个用户提交表单了");
            return View();
        }

对于一直运行的服务,将会收到来自不同项目的消息,而它只负责消费它!

Lind.DDD.PublishSubscribe

封装了一些标准的pub/sub方法,它可以有多种实现方法,本例使用redis这个中间件

   /// <summary>
    /// 发布订阅的接口规则
    /// </summary>
    public interface IPubSub
    {
        /// <summary>
        /// 发布,有顺序,对象源是字符串
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="value"></param>
        void Publish(string channel, string value);
        /// <summary>
        /// 订阅,对象源是字符串
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="action"></param>
        void Subscribe(string channel, Action<string> action);
        /// <summary>
        /// 异步发布,无顺序,对象源是字符串
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="value"></param>
        void PublishAsync(string channel, string value);
        /// <summary>
        /// 异步订阅,无顺序,对象源是字符串
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="action"></param>
        void SubscribeAsync(string channel, Action<string> action);

        /// <summary>
        /// 发布,有顺序,对象源是Byte[]
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="value"></param>
        void PublishByte(string channel, byte[] value);
        /// <summary>
        /// 订阅,对象源是Byte[]
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="action"></param>
        void SubscribeByte(string channel, Action<byte[]> action);
        /// <summary>
        /// 异步发布,有顺序,对象源是Byte[]
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="value"></param>
        void PublishByteAsync(string channel, byte[] value);
        /// <summary>
        /// 异步订阅,对象源是Byte[]
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="action"></param>
        void SubscribeByteAsync(string channel, Action<byte[]> action);

        /// <summary>
        /// 发布,有顺序,对象源是泛型对象
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="channel"></param>
        /// <param name="value"></param>
        void Publish<T>(string channel, T value);
        /// <summary>
        /// 订阅,对象源是泛型对象
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="channel"></param>
        /// <param name="action"></param>
        void Subscribe<T>(string channel, Action<T> action);
        /// <summary>
        /// 异步发布,有顺序,对象源是泛型对象
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="channel"></param>
        /// <param name="value"></param>
        void PublishAsync<T>(string channel, T value);
        /// <summary>
        /// 异步订阅,对象源是泛型对象
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="channel"></param>
        /// <param name="action"></param>
        void SubscribeAsync<T>(string channel, Action<T> action);
        /// <summary>
        /// 取消指定订阅
        /// </summary>
        /// <param name="channel"></param>
        void UnSubscribe(string channel);
        /// <summary>
        /// 取消所有订阅
        /// </summary>
        /// <param name="channel"></param>
        void UnSubscribeAll();
    }

 本文转自博客园张占岭(仓储大叔)的博客,原文链接:Redis学习笔记~分布式的Pub/Sub模式,如需转载请自行联系原博主。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
124 0
分布式爬虫框架Scrapy-Redis实战指南
Redis 逻辑数据库与集群模式详解
Redis 是高性能内存键值数据库,广泛用于缓存与实时数据处理。本文深入解析 Redis 逻辑数据库与集群模式:逻辑数据库提供16个独立存储空间,适合小规模隔离;集群模式通过分布式架构支持高并发和大数据量,但仅支持 database 0。文章对比两者特性,讲解配置与实践注意事项,并探讨持久化及性能优化策略,助你根据需求选择最佳方案。
50 5
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
本文介绍了从单机锁到分布式锁的演变,重点探讨了使用Redis实现分布式锁的方法。分布式锁用于控制分布式系统中多个实例对共享资源的同步访问,需满足互斥性、可重入性、锁超时防死锁和锁释放正确防误删等特性。文章通过具体示例展示了如何利用Redis的`setnx`命令实现加锁,并分析了简化版分布式锁存在的问题,如锁超时和误删。为了解决这些问题,文中提出了设置锁过期时间和在解锁前验证持有锁的线程身份的优化方案。最后指出,尽管当前设计已解决部分问题,但仍存在进一步优化的空间,将在后续章节继续探讨。
543 131
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
Redis分布式锁如何实现 ?
Redis分布式锁主要依靠一个SETNX指令实现的 , 这条命令的含义就是“SET if Not Exists”,即不存在的时候才会设置值。 只有在key不存在的情况下,将键key的值设置为value。如果key已经存在,则SETNX命令不做任何操作。 这个命令的返回值如下。 ● 命令在设置成功时返回1。 ● 命令在设置失败时返回0。 假设此时有线程A和线程B同时访问临界区代码,假设线程A首先执行了SETNX命令,并返回结果1,继续向下执行。而此时线程B再次执行SETNX命令时,返回的结果为0,则线程B不能继续向下执行。只有当线程A执行DELETE命令将设置的锁状态删除时,线程B才会成功执行S
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
本文介绍了如何通过Lua脚本在Redis中实现分布式锁的原子性操作,避免并发问题。首先讲解了Lua脚本的基本概念及其在Redis中的使用方法,包括通过`eval`指令执行Lua脚本和通过`script load`指令缓存脚本。接着详细展示了如何用Lua脚本实现加锁、解锁及可重入锁的功能,确保同一线程可以多次获取锁而不发生死锁。最后,通过代码示例演示了如何在实际业务中调用这些Lua脚本,确保锁操作的原子性和安全性。
152 6
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
|
3月前
|
【📕分布式锁通关指南 04】redis分布式锁的细节问题以及RedLock算法原理
本文深入探讨了基于Redis实现分布式锁时遇到的细节问题及解决方案。首先,针对锁续期问题,提出了通过独立服务、获取锁进程自己续期和异步线程三种方式,并详细介绍了如何利用Lua脚本和守护线程实现自动续期。接着,解决了锁阻塞问题,引入了带超时时间的`tryLock`机制,确保在高并发场景下不会无限等待锁。最后,作为知识扩展,讲解了RedLock算法原理及其在实际业务中的局限性。文章强调,在并发量不高的场景中手写分布式锁可行,但推荐使用更成熟的Redisson框架来实现分布式锁,以保证系统的稳定性和可靠性。
95 0
【📕分布式锁通关指南 04】redis分布式锁的细节问题以及RedLock算法原理
【YashanDB 知识库】用 yasldr 配置 Bulkload 模式作单线程迁移 300G 的业务数据到分布式数据库,迁移任务频繁出错
问题描述 详细版本:YashanDB Server Enterprise Edition Release 23.2.4.100 x86_64 6db1237 影响范围: 离线数据迁移场景,影响业务数据入库。 外场将部分 NewCIS 的报表业务放到分布式数据库,验证 SQL 性能水平。 操作系统环境配置: 125G 内存 32C CPU 2T 的 HDD 磁盘 问题出现的步骤/操作: 1、部署崖山分布式数据库 1mm 1cn 3dn 单线启动 yasldr 数据迁移任务,设置 32 线程的 bulk load 模式 2、观察 yasldr.log 是否出现如下错
|
3月前
|
Springboot使用Redis实现分布式锁
通过这些步骤和示例,您可以系统地了解如何在Spring Boot中使用Redis实现分布式锁,并在实际项目中应用。希望这些内容对您的学习和工作有所帮助。
230 83
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等