Redis源码学习——BIO

2年前 3540

 Redis源码学习之BIO

BIO顾名思义,background IO,是redis中运行的后台IO。 网上千篇一律的说法是redis是单线程单进程。 实际上redis运行过程中并不是严格单进程单线程应用。
Redis中的多进程:
在写入备份(RDB,AOF)的时候,会fork出子进程进行备份文件的写入。
Redis中的多线程:

  1. AOF的备份模式中,如果我们设置的是AOF_FSYNC_EVERYSEC(每秒备份一次,这个设置可理解为弱同步备份),redis会create一个backgroud线程,在这个线程中执行aof备份文件的写入。
  2. 新生成的AOF文件,在覆盖旧AOF文件时。 如果在此之前AOF备份已经开启,在执行该fd的close前,我们的Redis进程与旧的AOF文件存在引用, 旧的AOF文件不会真正被删除。 所以当我们执行close(oldfd)时,旧AOF文件的被打开该文件的进程数为0,即没有进程打开过这个文件,这时这个文件在执行close时会被真正删除。 而删除旧AOF文件可能会阻塞服务,所以我们将它放到另一个线程调用。
  3. 执行DEL操作,假如碰巧这个key对应有非常多对象,那么这个删除操作会阻塞服务器几秒钟时间, 所以将删除操作放到另一个线程执行。 具体可看这篇文章: Lazy Redis is better Redis

BIO

Redis将所有多线程操作封装到BIO中,在bio.c,bio.h中可以看到。 本文我们关注的不是具体的操作,而是Redis封装的BIO行为, 这个代码简洁,维护性好。 值得学习一下。
BIO提供以下几个api:

void bioInit(void); //初始化BIO
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3);   //新建一个BIO任务
unsigned long long bioPendingJobsOfType(int type);  //获取当前BIO任务类型,队列中待执行的任务个数
unsigned long long bioWaitStepOfType(int type); //阻塞等待某个类型的BIO任务的执行,返回等待任务个数
void bioKillThreads(void); //中断所有BIO进程

BIO操作的类型:

/* Background job opcodes */
#define BIO_CLOSE_FILE    0 /* 关闭文件*/
#define BIO_AOF_FSYNC     1 /* AOF写入  */
#define BIO_LAZY_FREE     2 /* 释放对象 */
#define BIO_NUM_OPS       3 /*BIO数*/

BIO对象:

static pthread_t bio_threads[BIO_NUM_OPS];  //BIO线程
static pthread_mutex_t bio_mutex[BIO_NUM_OPS]; //BIO每个线程的mutex锁变量
static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS]; //BIO线程锁的条件变量, 监听这个条件变量唤起当前线程
static pthread_cond_t bio_step_cond[BIO_NUM_OPS]; //BIO线程阻塞锁,bioWaitStepOfType监听这个条件变量被通知该操作的执行。
static list *bio_jobs[BIO_NUM_OPS];
static unsigned long long bio_pending[BIO_NUM_OPS]; // BIO未执行的

我们先看初始化的时候执行的部分:

bioInit() {
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { // 初始化线程
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}

主要功能分为两个部分:

  1. bioCreateBackgroundJob: 创建BIO任务,插入bio_jobs,并调用pthread_cond_signal,通知进程解锁。
  2. bioProcessBackgroundJobs: 执行BIO任务线程。 线程中通过pthread管理进程锁,当bioCreateBackgroundJob执行pthread_cond_signal通知到该任务对应的线程时,从bio_jobs读出上一个任务,并执行。

bioCreateBackgroundJob

void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));
    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    pthread_mutex_lock(&bio_mutex[type]); // 加锁 保护bio_jobs和bio_pending的一致性
    listAddNodeTail(bio_jobs[type],job);  //插入到任务队列中
    bio_pending[type]++;  
    pthread_cond_signal(&bio_newjob_cond[type]);  //通知preocess线程,执行任务
    pthread_mutex_unlock(&bio_mutex[type]);  //解锁
}

bioProcessBackgroundJobs

void *bioProcessBackgroundJobs(void *arg) {
    // 使进程可以被手动kill
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); 
    // 加锁 确保不会有两个进程使用pthread_cond_wait监听同一个锁
    pthread_mutex_lock(&bio_mutex[type]);
    while(1) {
        listNode *ln;
        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]); // 等待bioCreateBackgroundJob通知解锁
            continue;
        }
        //取队列中第一个任务
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]); //解锁
        // 根据type执行任务
        // do somethings...
        pthread_cond_broadcast(&bio_step_cond[type]); // 广播解锁,用于解bioWaitStepOfType中的锁, 接触阻塞。
        pthread_mutex_lock(&bio_mutex[type]); // 为下面的操作加锁,且用于下一个循环的pthread_cond_wait阻塞。
        listDelNode(bio_jobs[type],ln);  // 操作bio_jobs 和 bio_pending  标志这个任务已完成。
        bio_pending[type]--;
    }
}

pthread

整个BIO就是通过锁进行的阻塞后台IO。 如果我们梳理一下这个锁过程:

  1. bioInit,新建线程,执行bioProcessBackgroundJobs。
  2. bioProcessBackgroundJobs 中,pthread_mutex_lock(&bio_mutex[type]),给该任务的锁变量加锁。
  3. 进入while循环, 调用pthread_cond_wait, 等待解锁。 由于mutex锁是“sleep-lock”,线程会sleep,等待唤醒。
  4. 主线程调用创建BIO任务, 调用bioCreateBackgroundJob。
  5. bioCreateBackgroundJob中 pthread_mutex_lock(&bio_mutex[type]); 又对bio_mutex[type]加锁
  6. bioCreateBackgroundJob中pthread_cond_signal(&bio_newjob_cond[type]) //发送信号,通知BIO线程继续执行。
  7. bioCreateBackgroundJob中pthread_mutex_unlock(&bio_mutex[type]); //解锁
  8. bioProcessBackgroundJobs 中被唤醒继续进行。
  9. 执行任务完毕后,pthread_mutex_unlock解锁, pthread_cond_broadcast广播解锁。
  10. 再pthread_mutex_lock加锁 。 用于下一次while循环。

在梳理的时候,我发现一个奇怪的地方,我们第2步在BIO线程中加锁,第5步调用bioCreateBackgroundJob在主线程中又对mutex进行了一次加锁。 而在他们之间并没有pthread_mutex_unlock执行。 为什么bioCreateBackgroundJob没有被mutex的锁阻塞?
一切的关键都在pthread_cond_wait这个函数中。 按照我原来的理解,pthread_cond_wait应该只是进行了一次信号等待, 等到某个信号后,将mutex[type]解锁。 为什么在信号发送前,pthread_mutex_lock没有将主线程的bioCreateBackgroundJob阻塞住。 所以我猜测, pthread_cond_wait不不仅仅是一次wait signal,而是unlock+wait。
为了验证这个猜想,我们进去看pthread_cond_wait的实现:
glibc中的pthread_cond_wait

// line 93
int __pthread_cond_wait (cond, mutex)
// line 110
  err = __pthread_mutex_unlock_usercnt (mutex, 0);  //解锁mutex
do {
// line 155    
lll_futex_wait (&cond->__data.__futex, futex_val, pshared);  // wait signal
} while (val == seq || cond->__data.__woken_seq == val);
// line 193
return __pthread_mutex_cond_lock (mutex);

可以看到, pthread_cond_wait 实际上就是一次 Unlock -> Wait -> Lock。

  1. Redis源码
  2. Redis源码注释

数据存储与数据库 阿里技术协会 云数据库Redis版

作者

萧元
TA的文章

相关文章