借助Redis完成延时任务

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介:

借助Redis完成延时任务
背景
相信我们或多或少的会遇到类似下面这样的需求:

第三方给了一批数据给我们处理,我们处理好之后就通知他们处理结果。

大概就是下面这个图说的。

本来在处理完数据之后,我们就会马上把处理结果返回给对方,但是对方要求我们处理速度不能过快,要有一种人为处理的效果。

换句话就是说,就算是处理好了,也要晚一点再执行通知操作。

这就是一个典型的延时任务。

延时,那还不简单,执行完之后,让它Sleep一下就好了,这样就达到目标了。

Sleep一下确定是最容易实现的一种方案,但是试想一下,数据的数量不断的增加,这样Sleep真的好吗?答案是否定的。

延时队列,是处理这个场景最为妥当的方案。

RabbitMQ,RocketMQ,Cmq等都可以直接或间接的达到相应的效果。

如果不具备队列条件,又要怎么处理呢?还可以借助Redis来完成这项工作。

MQ不一定每个公司都会用,但Redis应该80%以上的都会用吧。

处理方案
Redis这边,可用的方案有两种,下面分别来介绍一下。

1 键的过期时间

在设置缓存的时候,我们比较多情况下都会设置一个缓存的过期时间,这个时间过期后,会重新去数据源拿数据回来。

可以基于这个过期时间结合Redis的keyspace notifications共同完成。

keyspace notifications里面包含了非常多的事件,这里只关注EXPIRE,这个是和过期有关的。

只要订阅了__keyevent@0__:expired这个主题,当有key过期的时候,就会收到对应的信息。

注:主题@后面的0,指的是db 0.

要想使用这个特性,必不可少的一步是修改Redis默认的配置,把notify-keyspace-events设置成Ex。

Event notification

Redis can notify Pub/Sub clients about events happening in the key space.

This feature is documented at http://redis.io/topics/notifications

.........

By default all notifications are disabled because most users don't need

this feature and the feature has some overhead. Note that if you don't

specify at least one of K or E, no events will be delivered.

notify-keyspace-events "Ex"
其中 E 指的是键事件通知,x 指的是过期事件。

根据这个特性,重新调整一下流程图:

应该也比较好懂,下面通过简单的代码来实现一下这种方案。

首先是处理完数据及往Redis写数据。

public async Task DoTaskAsync()
{

// 数据处理
// ...

// 后续操作要延时,把Id记录下来
var taskId = new Random().Next(1, 10000);
// 要延迟的时间
int sec = new Random().Next(1, 5);

// 可以加个重试机制,预防单次执行失败。
await RedisHelper.SetAsync($"task:{taskId}", "1", sec);

}
还需要回传结果的后台任务,这个任务就是去订阅上面说的键过期事件,然后回传结果。

这里可以借助BackgroundService来订阅处理。

public class SubscribeTaskBgTask : BackgroundService
{

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
    stoppingToken.ThrowIfCancellationRequested();
    var keyPrefix = "task:";
    RedisHelper.Subscribe(
        ("__keyevent@0__:expired", arg =>
            {
                var msg = arg.Body;
                Console.WriteLine($"recive {msg}");
                if (msg.StartsWith(keyPrefix))
                {
                    // 取到任务Id
                    var val = msg.Substring(keyPrefix.Length);
                    Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}");
                    
                    // 回传处理结果给第三方,这里可以考虑这个并发锁,避免多实例都处理了这个任务。
                    // ....
                }
            }
        ));

    return Task.CompletedTask;
}

}
这里有一个要注意的地方,要在key里面包含任务的Id,因为订阅处理的时候,只能拿到一个key,后续能做的操作也只是基于这个key。

上面的例子,是用了task:任务Id的形式,所以在订阅处理的时候,只处理以task:开头的那些key。

效果如下:

这种方案,直观上是非常简单的,不过这种方案会遇到一个小问题。

当一个key过期后,并不一定会马上收到通知,这个也是会有一定的延时的,取决于Redis的内部机制。

Redis Keyspace Notifications文档的最后一段也提到了这个问题。

所以用这种方案的时候,要考虑一下,你的延时是不是要及时~~

2 有序集合

有序集合是Redis中一种十分有用的数据结构,它的本质其实就是集合加了一个排序的功能,每个集合里面的元素还会有一个分值的属性。

它提供了一个可以获取指定分值范围内的元素,这个也就是我们的出发点。

在这个场景下,什么东西可能作为这个分值呢?现在只有一个处理任务的Id还有一个延迟的时间,Id肯定不行,那么也只能是延迟时间来作这个分值了。

延迟1秒,5秒,1分钟,这个都是比较大粒度的时间,这里要转化一下,用时间戳来代替这些延迟的时间。

假设现在的时间戳是 1584171520, 要延迟5秒执行,那么执行任务的时间就是 1584171525,在当前时间戳的基础上加个5秒,就是最终要执行的了。

到时有序集合中存的元素就会是这样的

任务Id-1 1584171525
任务Id-2 1584171528
任务Id-3 1584171530
接下来就是要怎么取出这些任务的问题了!

把当前时间戳当成是取数的最大分值,0作为最小分值,这个时候取出的元素就是应该要执行回传的任务了。

根据这个方案,重新调整一下流程图:

交代清楚了思路,再来点代码,加深一下理解。

首先还是处理完数据后往Redis写数据。

public async Task DoTaskAsync()
{

// 数据处理
// ...

// 后续操作要延时,把Id记录下来
var taskId = new Random().Next(1, 10000);

var cacheKey = "task:delay";
int sec = new Random().Next(1, 5);

// 要执行这个任务的时间戳
var time = DateTimeOffset.Now.AddSeconds(sec).ToUnixTimeSeconds();

await RedisHelper.ZAddAsync(cacheKey, (time, taskId));
Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} done {taskId} here - {sec}");

}
后面就是轮训有序集合里面的元素了,这里同样是借助BackgroundService来处理。

public class SubscribeTaskBgTask : BackgroundService
{

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    stoppingToken.ThrowIfCancellationRequested();
    var cacheKey = "task:delay";
    while (true)
    {
        // 先取,后删,不具备原子性,可考虑用lua脚本来保证原子性。
        var vals = await RedisHelper.ZRangeByScoreAsync(cacheKey, -1, DateTimeOffset.Now.ToUnixTimeSeconds(), 1, 0);

        if (vals != null && vals.Length > 0)
        {
            var val = vals[0];

            var rmCount = await RedisHelper.ZRemAsync(cacheKey, vals);

            if (rmCount > 0)
            {
                // 要把这个元素先删除成功了,再执行任务,不然会重复
                Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}");
                
                // 回传处理结果给第三方,这里可以考虑这个并发锁,避免多实例都处理了这个任务。
                // ....
            }
        }
        else
        {
            // 没有数据,休眠500ms,避免CPU空转
            await Task.Delay(500);
        }
    }
}

}
效果如下:

参考文章
https://redis.io/topics/notifications

https://zhuanlan.zhihu.com/p/87113913

如果您认为这篇文章还不错或者有所收获,可以点击右下角的【推荐】按钮,因为你的支持是我继续写作,分享的最大动力!
作者:Catcher ( 黄文清 )
来源:http://catcher1994.cnblogs.com/

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
10天前
|
缓存 NoSQL Java
面试官:Redis如何实现延迟任务?
延迟任务是计划任务,用于在未来特定时间执行。常见应用场景包括定时通知、异步处理、缓存管理、计划任务、订单处理、重试机制、提醒和数据采集。Redis虽无内置延迟任务功能,但可通过过期键通知、ZSet或Redisson实现。然而,这种方法精度有限,稳定性较差,适合轻量级需求。Redisson的RDelayedQueue提供更简单的延迟队列实现。
47 9
|
2月前
|
存储 NoSQL API
【小小思考】Redis实现去重任务队列
【2月更文挑战第1天】思考一下如何用Redis实现去重的任务队列,主要有List 、List + Set/Hash/Bloom Filter、ZSet、Lua和开源库等方式。
75 1
|
2月前
|
缓存 NoSQL Java
一次访问Redis延时高问题排查与总结
作者抽丝剥茧的记录了一次访问Redis延时高问题的排查和总结。
397 1
|
7月前
|
Arthas NoSQL Java
一次访问Redis延时高问题排查与总结(2)
本文是一次访问Redis延时高问题排查与总结的续篇,主要讲述了当时没有发现的一些问题和解决方案。
46950 22
|
7月前
|
NoSQL 安全 容灾
1分钟实现Redis数据迁移任务
NineData 基于全量复制、增量日志复制技术,提供了高效、安全可靠的 Redis 不停机迁移方案。当然,除了 Redis,NineData 已经支持数十种常见数据库的迁移复制,实现数据库迁移、数据容灾、数据双活、数据仓库实时集成等业务场景。同时,除了 SAAS 模式外,还提供了企业专属集群模式,满足企业最高的数据安全合规要求。
157 0
|
7月前
|
监控 NoSQL Java
面试官:Redis分布式锁超时了,任务还没执行完怎么办?
今天主要分享的是面试中常见的redis的一些面试内容。如果你正好需要刚好可以帮你回顾一下,如果不需要可以收藏起来后面用到的时候翻出来回顾。
|
10月前
|
NoSQL Redis
【Redis原理机制 四】基于Redis实现延时任务
【Redis原理机制 四】基于Redis实现延时任务
34 0
|
消息中间件 缓存 NoSQL
SpringBoot应用篇基于Redis实现延时队列
延时队列,相信各位小伙伴并不会陌生,jdk原生提供了延时队列的使用,当然我们这里介绍的不是这种;在实际的项目中,如果我们有延时队列的场景,可以怎样去实现呢 举一个简单的例子,如下单15分钟内,若没有支付,则自动取消订单 本文将介绍一种非常非常简单的实现方式
444 0
SpringBoot应用篇基于Redis实现延时队列
|
机器学习/深度学习 Prometheus 运维
使用Redis实现延时任务(二)
前一篇文章通过Redis的有序集合Sorted Set和调度框架Quartz实例一版简单的延时任务,但是有两个相对重要的问题没有解决:分片、监控。这篇文章的内容就是要完善这两个方面的功能。前置文章:使用Redis实现延时任务(一)。
306 0
使用Redis实现延时任务(二)
|
机器学习/深度学习 JSON 监控
使用Redis实现延时任务(一)(下)
最近在生产环境刚好遇到了延时任务的场景,调研了一下目前主流的方案,分析了一下优劣并且敲定了最终的方案。这篇文章记录了调研的过程,以及初步方案的实现。
126 0
使用Redis实现延时任务(一)(下)

热门文章

最新文章