bin-log-distributor消费数据丢失问题解决记录

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: bin-log-distributor项目简介 bin-log-distributor是凯京科技开源的Mysql数据库数据变动实时监听分发中间件,详情见码云开源地址, github开源地址 背景 线上反馈有bin-log-distributor客户端偶尔有丢失数据的情况。

bin-log-distributor项目简介

bin-log-distributor是凯京科技开源的Mysql数据库数据变动实时监听分发中间件,详情见码云开源地址, github开源地址

背景

线上反馈有bin-log-distributor客户端偶尔有丢失数据的情况。

验证方法

在装mysql的服务器上,使用多个线程并发循环对bin-log-distributor监控的数据表进行增、删、改。

预期

bin-log-distributor客户端出现数据丢失。

定位

分析客户端收到的数据,发现不仅有数据丢失,还有数据重复消费。
将客户端日志打印更仔细,发现以下两点问题:

  1. 从redis队列peek数据后,remove抛出如下异常:
    a
  2. Redis队列数据没消费完其他线程异常进行了消费:
    b

分析如下消费redis数据方法源码:

private void doRunWithLock() {
    RLock rLock = redissonClient.getLock(dataKeyLock);
    EventBaseDTO dto;
    try {
        RQueue<EventBaseDTO> queue = redissonClient.getQueue(dataKey);
        // 尝试加锁,最多等待20秒,上锁以后30秒自动解锁
        boolean lockRes = rLock.tryLock(20 * 1000, 3 * retryInterval,TimeUnit.MILLISECONDS);
        //拿到锁且 队列不为空 进入
        if (lockRes && !queue.isEmpty()) {
            //让这个线程把队列里的全部处理完吧
            rLock.lock();
            //DATA_KEY_IN_PROCESS.add(dataKey);
            while ((dto = queue.peek()) != null) {
                //处理完毕,把数据从队列摘除
                boolean handleRes = doHandleWithLock(dto, 0);
                if (handleRes) {
                    queue.remove();
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
        log.severe("接收处理数据失败:" + e.toString());
    } finally {
        rLock.forceUnlock();
        rLock.delete();
        //DATA_KEY_IN_PROCESS.remove(dataKey);
    }
}

猜测是redis RLock并没有效,于是在rLock.tryLock() 和 rLock.delete() 方法后面分别打印出 acquireLock 和 releaseLock的日志,再次执行测试,得到如下日志:
c

发现 pool-2-thread-1线程成功获取锁后,pool-2-thread-2获取锁失败,并立刻将pool-2-thread-1 持有的锁release了。结合源码发现pool-2-thread-2获取锁失败后,执行了finally代码块中的rLock.forceUnlock()方法,查阅资料发现forceUnlock()方法可以直接释放相同key的其他线程持有的锁。

至此,定位到问题为:每次收到数据起一个线程执行doRunWithLock()方法,当数据库操作较频繁时,一个线程获取到的redis锁会被另一个线程release掉,导致redis锁内部本来应该是单例执行的消费redis队列数据变成了多个线程同时执行,从而出现多个线程消费队列数据冲突,引发数据丢失和重复消费的问题。

解决

确保每个线程获取到的Redis锁只能由当前线程release(),更改后的代码如下:

private void doRunWithLock() {
    RLock rLock = redissonClient.getLock(dataKeyLock);
    EventBaseDTO dto;
    boolean lockRes = false;
    try {
        // 尝试加锁,最多等待50ms(防止过多线程等待)
        // 上锁以后6个小时自动解锁(防止redis队列太长,当前拿到锁的线程处理时间过长)
        lockRes = rLock.tryLock(50, 6 * 3600 * 1000, TimeUnit.MILLISECONDS);
        if (!lockRes) {
            return;
        }
        DATA_KEY_IN_PROCESS.add(dataKey);
        //拿到锁之后再获取队列
        RQueue<EventBaseDTO> queue = redissonClient.getQueue(dataKey);
        if (!queue.isExists() || queue.isEmpty()) {
            return;
        }
        //拿到锁且 队列不为空 进入
        while ((dto = queue.peek()) != null) {
            //处理完毕,把数据从队列摘除
            boolean handleRes = doHandleWithLock(dto, 0);
            if (handleRes) {
                queue.remove();
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
        log.severe("接收处理数据失败:" + e.toString());
 
    } finally {
        //forceUnlock是可以释放别的线程拿到的锁的,需要判断是否是当前线程持有的锁
        if (lockRes) {
            rLock.forceUnlock();
            rLock.delete();
            DATA_KEY_IN_PROCESS.remove(dataKey);
        }
    }
}

回归

更改后执行继续执行相同的测试用例,得到如下结果日志:
d

pool-2-thread-2在获取到redis锁之后一直持续消费redis队列中的数据,其他线程尝试获取redis锁失败后,不再强制release该锁,客户端消费条数、类型结果正确。

完成

修改测试用例,使用更多线程、执行更多数据库操作,分析消费结果,结果数据条数正确,未出现数据丢失、重复消费等问题。
确认结果回归结果正确,提交代码。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
28天前
|
SQL 关系型数据库 MySQL
MySQL数据库,可以使用二进制日志(binary log)进行时间点恢复
对于MySQL数据库,可以使用二进制日志(binary log)进行时间点恢复。二进制日志是MySQL中记录所有数据库更改操作的日志文件。要进行时间点恢复,您需要执行以下步骤: 1. 确保MySQL配置文件中启用了二进制日志功能。在配置文件(通常是my.cnf或my.ini)中找到以下行,并确保没有被注释掉: Copy code log_bin = /path/to/binary/log/file 2. 在需要进行恢复的时间点之前创建一个数据库备份。这将作为恢复的基准。 3. 找到您要恢复到的时间点的二进制日志文件和位置。可以通过执行以下命令来查看当前的二进制日志文件和位
|
6月前
|
C++
C++-实现日志log功能
C++-实现日志log功能
|
25天前
|
Java
log4j2定期删除日志文件的配置
确保将以上配置嵌入到你的Log4j 2配置文件中,并根据项目的需求进行适当的调整。
26 1
|
4月前
|
消息中间件 Java RocketMQ
修改rocketmq的日志文件位置
修改rocketmq的日志文件位置
63 0
修改rocketmq的日志文件位置
|
9月前
|
存储 SQL 机器学习/深度学习
MySQL 日志体系解析:保障数据一致性与恢复的三位英雄:Redo Log、Undo Log、Bin Log
MySQL 日志体系解析:保障数据一致性与恢复的三位英雄:Redo Log、Undo Log、Bin Log
214 0
|
消息中间件 Java Kafka
kafka log4j日志级别修改,一天生成一个日志文件
kafka log4j日志级别修改,一天生成一个日志文件
|
存储 SQL 缓存
MySQL日志系统redo log(两阶段提交)和binlog
MySQL日志系统redo log(两阶段提交)和binlog
164 0
MySQL日志系统redo log(两阶段提交)和binlog
|
Java
加入log4j日志功能
加入log4j日志功能
129 0
|
SQL 存储 缓存
分析MySQL执行的流程(连接、缓存、分析、优化、执行、Undo Log、Binlog、Redo Log)
熟悉MySQL的都知道MySQL服务端实现主要分为Server层和存储引擎层。Server层负责接收和管理客户端连接、管理缓存、解析SQL、优化SQL、调用存储引擎执行SQL;存储引擎层主要负责存储、查询数据。
分析MySQL执行的流程(连接、缓存、分析、优化、执行、Undo Log、Binlog、Redo Log)