MySQL · RocksDB · Memtable flush分析

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云原生数据库 PolarDB 分布式版,标准版 2核8GB
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 概述首先我们知道在RocksDB中,最终数据的持久化都是保存在SST中,而SST则是由Memtable刷新到磁盘生成的,因此这次我们就主要来分析在RocksDB中何时以及如何来Flush内存数据(memtable)到SST.

概述

首先我们知道在RocksDB中,最终数据的持久化都是保存在SST中,而SST则是由Memtable刷新到磁盘生成的,因此这次我们就主要来分析在RocksDB中何时以及如何来Flush内存数据(memtable)到SST.

简单来说在RocksDB中,每一个ColumnFamily都有自己的Memtable,当Memtable超过固定大小之后(或者WAL文件超过限制),它将会被设置为immutable,然后会有后台的线程启动来刷新这个immutable memtable到磁盘(SST).

相关设置

  1. write_buffer_size 表示每个columnfamily的memtable的大小限制
  2. db_write_buffer_size 总的memtable的大小限制(所有的ColumnFamily).
  3. max_write_buffer_number 最大的memtable的个数
  4. min_write_buffer_number_to_merge 表示最小的可以被flush的memtable的个数

Flush Memtable的触发条件

在下面这几种条件下RocksDB会flush memtable到磁盘.

  1. 当某一个memtable的大小超过write_buffer_size.
  2. 当总的memtable的大小超过db_write_buffer_size.
  3. 当WAL文件的大小超过max_total_wal_size之后 最后一个条件的原因是,当WAL文件大小太大之后,我们需要清理WAL,因此此时我们需要将此WAL对应的数据都刷新到磁盘,也是刷新Memtable.

源码

首先在全局的DBImpl中包含了一个flush_queue_的队列,这个队列将会保存所有的将要被flush到磁盘的ColumnFamily.只有当当前的ColumnFamily满足flush条件(cfd->imm()->IsFlushPending())才会将此CF加入到flush队列.

class DBImpl {
................................
  std::deque<ColumnFamilyData*> flush_queue_;
...................
};

然后我们来看IsFlushPending的实现.这个函数的意思就是至少有一个memtable需要被flush.而MemTableList这个类则是保存了所有的immutable memtables.

bool MemTableList::IsFlushPending() const {
  if ((flush_requested_ && num_flush_not_started_ >= 1) ||
      (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
    assert(imm_flush_needed.load(std::memory_order_relaxed));
    return true;
  }
  return false;
}

上面这几个变量的含义在注释中比较清楚, 而min_write_buffer_number_to_merge_就是min_write_buffer_number_to_merge.

  // the number of elements that still need flushing
  int num_flush_not_started_;

  // committing in progress
  bool commit_in_progress_;

  // Requested a flush of all memtables to storage
  bool flush_requested_;

可以看到在SchedulePendingFlush函数中,最终会将对应的ColumnFamily加入到flush queue中.

void DBImpl::SchedulePendingFlush(ColumnFamilyData* cfd,
                                  FlushReason flush_reason) {
  if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) {
    AddToFlushQueue(cfd, flush_reason);
    ++unscheduled_flushes_;
  }
}

而刷新MemTable到磁盘是一个后台线程来做的,这个后台线程叫做BGWorkFlush,最终这个函数会调用BackgroundFlush函数,而BackgroundFlush主要功能是在flush_queue_中找到一个ColumnFamily然后刷新它的memtable到磁盘.

Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
                               LogBuffer* log_buffer) {
................................
  while (!flush_queue_.empty()) {
    // This cfd is already referenced
    auto first_cfd = PopFirstFromFlushQueue();

    if (first_cfd->IsDropped() || !first_cfd->imm()->IsFlushPending()) {
      // can't flush this CF, try next one
      if (first_cfd->Unref()) {
        delete first_cfd;
      }
      continue;
    }

    // found a flush!
    cfd = first_cfd;
    break;
  }

  if (cfd != nullptr) {
....................................
    status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
                                       job_context, log_buffer);
    if (cfd->Unref()) {
      delete cfd;
    }
  }
  return status;
}

通过上面可以看到最终会调用FlushMemTableToOutputFile来刷新Memtable到磁盘,等到最后我们来分析这个函数.

而这个刷新线程的调用是在MaybeScheduleFlushOrCompaction函数中进行的。这里可以看到刷新县城的限制是在max_flushes中设置的.

void DBImpl::MaybeScheduleFlushOrCompaction() {
..........................................
  auto bg_job_limits = GetBGJobLimits();
  bool is_flush_pool_empty =
      env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
  while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
         bg_flush_scheduled_ < bg_job_limits.max_flushes) {
    unscheduled_flushes_--;
    bg_flush_scheduled_++;
    env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
  }
...........................................
}

在RocksDB中,有一个SwitchMemtable函数,这个函数用来将现在的memtable改变为immutable,然后再新建一个memtable,也就是说理论上来说每一次内存的memtable被刷新到磁盘之前肯定会调用这个函数.而在实现中,每一次调用SwitchMemtable之后,都会调用对应immutable memtable的FlushRequested函数来设置对应memtable的flush_requeseted_, 并且会调用上面的SchedulePendingFlush来将对应的ColumnFamily加入到flush_queue_队列中.因此这里我们就通过这几个函数的调用栈来分析RocksDB中何时会触发flush操作.

在RocksDB中会有四个地方会调用SwitchMemtable,分别是:

  1. DbImpl::HandleWriteBufferFull
  2. DBImpl::SwitchWAL
  3. DBImpl::FlushMemTable
  4. DBImpl::ScheduleFlushes

接下来我们就来一个个分析这几个函数.

先来看HandleWriteBufferFull.这个函数主要是处理所有ColumnFamily的memtable内存超过限制的情况.可以看到它会调用SwitchMemtable然后再将对应的cfd加入到flush_queue_,最后再来调用后台刷新线程.

Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
...................................
  for (auto cfd : *versions_->GetColumnFamilySet()) {
...............................
  if (cfd_picked != nullptr) {
    status = SwitchMemtable(cfd_picked, write_context,
                            FlushReason::kWriteBufferFull);
    if (status.ok()) {
      cfd_picked->imm()->FlushRequested();
      SchedulePendingFlush(cfd_picked, FlushReason::kWriteBufferFull);
      MaybeScheduleFlushOrCompaction();
    }
  }
  return status;
}

这个函数的调用是在是在写WAL之前,也就是每次写WAL都会进行这个判断.

Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
                               bool* need_log_sync,
                               WriteContext* write_context) {
..........................................
  if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
    // Before a new memtable is added in SwitchMemtable(),
    // write_buffer_manager_->ShouldFlush() will keep returning true. If another
    // thread is writing to another DB with the same write buffer, they may also
    // be flushed. We may end up with flushing much more DBs than needed. It's
    // suboptimal but still correct.
    status = HandleWriteBufferFull(write_context);
  }
........................................
}

可以看到会调用write_buffer的shouldflush来判断是否处理bufferfull.而这个函数很简单,就是判断memtable所使用的内存是否已经超过限制.

  // Should only be called from write thread
  bool ShouldFlush() const {
    if (enabled()) {
      if (mutable_memtable_memory_usage() > mutable_limit_) {
        return true;
      }
      if (memory_usage() >= buffer_size_ &&
          mutable_memtable_memory_usage() >= buffer_size_ / 2) {
        // If the memory exceeds the buffer size, we trigger more aggressive
        // flush. But if already more than half memory is being flushed,
        // triggering more flush may not help. We will hold it instead.
        return true;
      }
    }
    return false;
  }

而mutable_limit_和buffer_size_的初始化在这里,这里buffer_size_就是db_write_buffer_size这个可配置的选项.

WriteBufferManager::WriteBufferManager(size_t _buffer_size,
                                       std::shared_ptr<Cache> cache)
    : buffer_size_(_buffer_size),
      mutable_limit_(buffer_size_ * 7 / 8),

然后我们来看mutable_memtable_memory_usage和memory_usage,这两个函数用来返回整体的write_buffer所使用的内存(memory_used_)以及将要被释放的内存(memory_active_),比如一个memory table被标记为immutable,则表示这块内存将要被释放.

  // Only valid if enabled()
  size_t memory_usage() const {
    return memory_used_.load(std::memory_order_relaxed);
  }
  size_t mutable_memtable_memory_usage() const {
    return memory_active_.load(std::memory_order_relaxed);
  }

然后我们来看SwitchWAL,流程和上面的HandleWriteBufferFull基本一致.

Status DBImpl::SwitchWAL(WriteContext* write_context) {
...............................................
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    if (cfd->IsDropped()) {
      continue;
    }
    if (cfd->OldestLogToKeep() <= oldest_alive_log) {
      status = SwitchMemtable(cfd, write_context);
      if (!status.ok()) {
        break;
      }
      cfd->imm()->FlushRequested();
      SchedulePendingFlush(cfd, FlushReason::kWriteBufferManager);
    }
  }
  MaybeScheduleFlushOrCompaction();
  return status;
}

这个函数被调用比较简单,就是判断是否WAL的大小是否已经超过了设置的wal大小(max_total_wal_size).可以看到它的调用也是在每次写WAL之前.

Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
                               bool* need_log_sync,
                               WriteContext* write_context) {
.................................................
  if (UNLIKELY(status.ok() && !single_column_family_mode_ &&
               total_log_size_ > GetMaxTotalWalSize())) {
    status = SwitchWAL(write_context);
  }

然后是FlushMemTable,这个函数用来强制刷新刷新memtable到磁盘,比如用户直接调用Flush接口.可以看到和上面的集中情况基本一致,switchmemtable->flushrequested->maybescheduleflushorcompaction.

Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                             const FlushOptions& flush_options,
                             FlushReason flush_reason, bool writes_stopped) {
  Status s;
  uint64_t flush_memtable_id = 0;
  {
.........................................

    // SwitchMemtable() will release and reacquire mutex during execution
    s = SwitchMemtable(cfd, &context);
    flush_memtable_id = cfd->imm()->GetLatestMemTableID();

    if (!writes_stopped) {
      write_thread_.ExitUnbatched(&w);
    }

    cfd->imm()->FlushRequested();

    // schedule flush
    SchedulePendingFlush(cfd, flush_reason);
    MaybeScheduleFlushOrCompaction();
  }
...........................
  return s;
}

最后我们来看最后一种情况,这种情况和前面三种有一个最大的区别就是前面三种情况的出现都是需要立即调用flush线程来刷新memtable到磁盘,而还有一种情况则是没那么紧急的情况,也就是说可以等到后面某个时间段再调用flush线程来刷新内容到磁盘.

在这种情况下,每一个memtable都会有一个状态叫做flush_state_,而每个memtable都有可能有三种状态.而状态的更新是通过UpdateFlushState来进行的.这里可以推测的到这些都是对于单个memtable的限制.

  enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };

void MemTable::UpdateFlushState() {
  auto state = flush_state_.load(std::memory_order_relaxed);
  if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) {
    // ignore CAS failure, because that means somebody else requested
    // a flush
    flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED,
                                         std::memory_order_relaxed,
                                         std::memory_order_relaxed);
  }
}

而UpdateFlushState什么时候会被调用呢,很简单,就是当你每次操作memtable的时候,比如update/add这些操作.

可以看到当shoudflushnow之后,将会设置flush_state_状态为FLUSH_REQUESTED,也就是此memtable将会被flush.

然后来看shouldflushnow函数,这个函数主要的判断就是判断是否当前MemTable的内存使用是否超过了write_buffer_size,如果超过了,那么就返回true.

bool MemTable::ShouldFlushNow() const {
  size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed);
  const double kAllowOverAllocationRatio = 0.6;

  // If arena still have room for new block allocation, we can safely say it
  // shouldn't flush.
  auto allocated_memory = table_->ApproximateMemoryUsage() +
                          range_del_table_->ApproximateMemoryUsage() +
                          arena_.MemoryAllocatedBytes();

  // if we can still allocate one more block without exceeding the
  // over-allocation ratio, then we should not flush.
  if (allocated_memory + kArenaBlockSize <
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
    return false;
  }

  // if user keeps adding entries that exceeds write_buffer_size, we need to
  // flush earlier even though we still have much available memory left.
  if (allocated_memory >
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
    return true;
  }

  return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
}

然后我们来看当设置了flush_state_状态之后,会做什么操作.对应的MEmtable有一个ShouldScheduleFlush函数,这个函数用来返回当前的memtable是否已经被设置flush_requested状态位。

 bool ShouldScheduleFlush() const {
    return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED;
  }

而这个函数会在checkmemtablefull中被调用,这个函数主要用来将已经设置flush_state_为flush_requested的memtable的状态改变为flush_schedule(意思就是已经进入flush的调度队列),然后将这个columnfamily加入到对应的调度队列.

  void CheckMemtableFull() {
    if (flush_scheduler_ != nullptr) {
      auto* cfd = cf_mems_->current();
      assert(cfd != nullptr);
      if (cfd->mem()->ShouldScheduleFlush() &&
          cfd->mem()->MarkFlushScheduled()) {
        // MarkFlushScheduled only returns true if we are the one that
        // should take action, so no need to dedup further
        flush_scheduler_->ScheduleFlush(cfd);
      }
    }
  }

其中MarkFlushScheduled就是用来改变状态.

  bool MarkFlushScheduled() {
    auto before = FLUSH_REQUESTED;
    return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED,
                                                std::memory_order_relaxed,
                                                std::memory_order_relaxed);
  }

而ScheduleFlush则是比较重要的一个函数,就是用来将对应的CF加入到flush调度队列(FlushScheduler).

void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) {
#ifndef NDEBUG
  std::lock_guard<std::mutex> lock(checking_mutex_);
  assert(checking_set_.count(cfd) == 0);
  checking_set_.insert(cfd);
#endif  // NDEBUG
  cfd->Ref();
// Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__
  Node* node = new Node{cfd, head_.load(std::memory_order_relaxed)};
  while (!head_.compare_exchange_strong(
      node->next, node, std::memory_order_relaxed, std::memory_order_relaxed)) {
    // failing CAS updates the first param, so we are already set for
    // retry.  TakeNextColumnFamily won't happen until after another
    // inter-thread synchronization, so we don't even need release
    // semantics for this CAS
  }
#endif  // __clang_analyzer__
}

而checkmemtablefull会在下面三种条件下被调用

  1. delete操作
  2. put操作
  3. merge操作.

然后我们来看flushscheduler如何来调度flush线程.首先在每次写WAL之前都会调用PreprocessWrite,然后这个函数会判断flush_scheduler是否为空(也就是是否有已经满掉的memtable需要刷新到磁盘).

Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
                               bool* need_log_sync,
                               WriteContext* write_context) {
..................................................................
  if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
    status = ScheduleFlushes(write_context);
  }

而在SscheduleFlushes中,则会遍历之前所有的需要被flush的memtable,然后调用switchMemtable来进行后续操作.这里要注意在SwitchMemtable也会触发调用flush线程.

Status DBImpl::ScheduleFlushes(WriteContext* context) {
  ColumnFamilyData* cfd;
  while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
    auto status = SwitchMemtable(cfd, context, FlushReason::kWriteBufferFull);
    if (cfd->Unref()) {
      delete cfd;
    }
    if (!status.ok()) {
      return status;
    }
  }
  return Status::OK();
}

刷新memtable到sst

在RocksDB中刷新是通过FlushJob这个类来实现的,整个实现还是比较简单.最终这里是调用WriteLevel0Table来刷新内容到磁盘。这里就不分析sst的格式了,需要了解具体格式的可以看RocksDB的wiki.

Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
                     FileMetaData* file_meta) {
...........................................
  // This will release and re-acquire the mutex.
  Status s = WriteLevel0Table();

  if (s.ok() &&
      (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
    s = Status::ShutdownInProgress(
        "Database shutdown or Column family drop during flush");
  }

  if (!s.ok()) {
    cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
  } else {
    TEST_SYNC_POINT("FlushJob::InstallResults");
    // Replace immutable memtable with the generated Table
    s = cfd_->imm()->InstallMemtableFlushResults(
        cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
        meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
        log_buffer_);
  }
........................................................
}
相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
8天前
|
关系型数据库 MySQL 索引
mysql 分析5语句的优化--索引添加删除
mysql 分析5语句的优化--索引添加删除
11 0
|
3月前
|
关系型数据库 MySQL Serverless
高顿教育:大数据抽数分析业务引入polardb mysql serverless
高顿教育通过使用polardb serverless形态进行数据汇总,然后统一进行数据同步到数仓,业务有明显高低峰期,灵活的弹性伸缩能力,大大降低了客户使用成本。
|
3月前
|
关系型数据库 MySQL 索引
【MySQL 解析】Hash索引和B+树索引对比分析
【1月更文挑战第11天】【MySQL 解析】Hash索引和B+树索引对比分析
|
3月前
|
SQL 关系型数据库 MySQL
mysql事务原理分析
mysql事务原理分析
26 0
|
3月前
|
缓存 固态存储 关系型数据库
MySQL性能优化指南:深入分析重做日志刷新到磁盘的机制
MySQL性能优化指南:深入分析重做日志刷新到磁盘的机制
|
3月前
|
SQL 监控 关系型数据库
MySQL Metadata Locking(MDL)机制的实现与获取机制分析
MySQL Metadata Locking(MDL)机制的实现与获取机制分析 为了满足数据库在并发请求下的事务隔离性和一致性要求,同时针对MySQL插件式多种存储引擎都能发挥作用,MySQL在Server层实现了 Metadata Locking(MDL)机制。这种机制可以灵活自定义锁的对象、锁的类型以及不同锁类型的优先级,甚至可以做到在系统不同状态时动态调整不同锁类型的兼容性。本篇文章将详细介绍MDL系统中的常用数据结构及含义,从实现角度讨论MDL的获取机制与死锁检测,以及在实践中如何监控MDL状态。
35 2
|
19天前
|
SQL 关系型数据库 MySQL
【MySQL技术专题】「问题实战系列」深入探索和分析MySQL数据库的数据备份和恢复实战开发指南(8.0版本升级篇)
【MySQL技术专题】「问题实战系列」深入探索和分析MySQL数据库的数据备份和恢复实战开发指南(8.0版本升级篇)
92 0
|
8天前
|
SQL 缓存 关系型数据库
mysql性能优化-慢查询分析、优化索引和配置
mysql性能优化-慢查询分析、优化索引和配置
75 0
|
14天前
|
缓存 关系型数据库 MySQL
MySQL 查询优化:提速查询效率的13大秘籍(索引设计、查询优化、缓存策略、子查询优化以及定期表分析和优化)(中)
MySQL 查询优化:提速查询效率的13大秘籍(索引设计、查询优化、缓存策略、子查询优化以及定期表分析和优化)(中)
|
16天前
|
SQL 关系型数据库 MySQL
【MySQL】慢SQL分析流程
【4月更文挑战第1天】【MySQL】慢SQL分析流程

相关产品

  • 云数据库 RDS MySQL 版
  • 推荐镜像

    更多