hbase源码系列(七)Snapshot的过程

简介: 本文讲解Snapshot的过程 ,在看这一章之前,建议大家先去看一下snapshot的使用。
在看这一章之前,建议大家先去看一下 snapshot的使用。可能有人会有疑问为什么要做Snapshot,hdfs不是自带了3个备份吗,这是个很大的误区,要知道hdfs的3个备份是用于防止网络传输中的失败或者别的异常情况导致数据块丢失或者不正确,它不能避免人为的删除数据导致的后果。它就想是给数据库做备份,尤其是做删除动作之前,不管是hbase还是hdfs,请经常做Snapshot,否则哪天手贱了。。。

直接进入主题吧,上代码。

public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
    // 清空之前完成的备份和恢复的任务
    cleanupSentinels();

    // 设置snapshot的版本
    snapshot = snapshot.toBuilder().setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION)
        .build();

    // if the table is enabled, then have the RS run actually the snapshot work
    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
    AssignmentManager assignmentMgr = master.getAssignmentManager();
    //根据表的状态选择snapshot的类型
    if (assignmentMgr.getZKTable().isEnabledTable(snapshotTable)) {
      snapshotEnabledTable(snapshot);
    }
    // 被禁用的表走这个方法
    else if (assignmentMgr.getZKTable().isDisabledTable(snapshotTable)) {
      snapshotDisabledTable(snapshot);
    } else {
      throw new SnapshotCreationException("Table is not entirely open or closed", tpoe, snapshot);
    }
  }
从代码上看得出来,启用的表和被禁用的表走的是两个不同的方法。

Snapshot启用的表

先看snapshotEnabledTable方法吧,看看在线的表是怎么备份的。

private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
      throws HBaseSnapshotException {
    // snapshot准备工作
    prepareToTakeSnapshot(snapshot);
    
    // new一个handler
    EnabledTableSnapshotHandler handler =
        new EnabledTableSnapshotHandler(snapshot, master, this);
    //通过handler线程来备份
    snapshotTable(snapshot, handler);
 }
这里就两步,先去看看snapshot前的准备工作吧,F3进入prepareToTakeSnapshot方法。这个方法里面也没干啥,就是检查一下是否可以对这个表做备份或者恢复的操作,然后就会重建这个工作目录,这个工作目录在.hbase-snapshot/.tmps下面,每个snapshot都有自己的目录。

在snapshotTable里面把就线程提交一下,让handler来处理了。

handler.prepare();
this.executorService.submit(handler);
this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
这些都不是重点,咱到handler那边去看看吧,EnabledTableSnapshotHandler是继承TakeSnapshotHandler的,prepare方法和process方法都一样,区别在于snapshotRegions方法被重写了。

看prepare方法还是检查表的定义文件在不在,我们直接进入process方法吧。

// 把snapshot的信息写入到工作目录
      SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, this.fs);
      // 开一个线程去复制表信息文件
      new TableInfoCopyTask(monitor, snapshot, fs, rootDir).call();
      monitor.rethrowException();
      //查找该表相关的region和位置
      List<Pair<HRegionInfo, ServerName>> regionsAndLocations =
          MetaReader.getTableRegionsAndLocations(this.server.getCatalogTracker(),
              snapshotTable, false);

      // 开始snapshot
      snapshotRegions(regionsAndLocations);

      // 获取serverNames列表,后面的校验snapshot用到
      Set<String> serverNames = new HashSet<String>();
      for (Pair<HRegionInfo, ServerName> p : regionsAndLocations) {
        if (p != null && p.getFirst() != null && p.getSecond() != null) {
          HRegionInfo hri = p.getFirst();
          if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) continue;
          serverNames.add(p.getSecond().toString());
        }
      }

      // 检查snapshot是否合格
      status.setStatus("Verifying snapshot: " + snapshot.getName());
      verifier.verifySnapshot(this.workingDir, serverNames);
// 备份完毕之后,把临时目录转移到正式的目录
      completeSnapshot(this.snapshotDir, this.workingDir, this.fs);
1、写一个.snapshotinfo文件到工作目录下

2、把表的定义信息写一份到工作目录下,即.tabledesc文件

3、查找和表相关的Region Server和机器

4、开始备份

5、检验snapshot的结果

6、确认没问题了,就把临时目录rename到正式目录

  

我们直接到备份这一步去看吧,方法在EnabledTableSnapshotHandler里面,重写了。

// 用分布式事务来备份在线的,太强悍了
    Procedure proc = coordinator.startProcedure(this.monitor, this.snapshot.getName(),
      this.snapshot.toByteArray(), Lists.newArrayList(regionServers));
    try {
      // 等待完成
      proc.waitForCompleted();
    // 备份split过的region
      Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
      for (Pair<HRegionInfo, ServerName> region : regions) {
        HRegionInfo regionInfo = region.getFirst();
        if (regionInfo.isOffline() && (regionInfo.isSplit() || regionInfo.isSplitParent())) {
          if (!fs.exists(new Path(snapshotDir, regionInfo.getEncodedName()))) {
            LOG.info("Take disabled snapshot of offline region=" + regionInfo);
            snapshotDisabledRegion(regionInfo);
          }
        }
      }
这里用到一个分布式事务,这里被我叫做分布式事务,我也不知道它是不是事务,但是Procedure这个词我真的不好翻译,叫过程也不合适。

分布式事务

我们进入ProcedureCoordinator的startProcedure看看吧。

Procedure proc = createProcedure(fed, procName, procArgs,expectedMembers);
if (!this.submitProcedure(proc)) {
      LOG.error("Failed to submit procedure '" + procName + "'");
      return null;
}

先创建Procedure,然后提交它,这块没什么特别的,继续深入进去submitProcedure方法也找不到什么有用的信息,我们得回到Procedure类里面去,它是一个Callable的类,奥秘就在call方法里面。

final public Void call() {
    try {
      //在acquired节点下面建立实例节点
      sendGlobalBarrierStart();

      // 等待所有的rs回复
      waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired");

      //在reached节点下面建立实例节点
      sendGlobalBarrierReached();

      //等待所有的rs回复
      waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released");

    } finally {
      sendGlobalBarrierComplete();
      completedLatch.countDown();
    }
}

从sendGlobalBarrierStart开始看吧,里面就一句话。

coord.getRpcs().sendGlobalBarrierAcquire(this, args, Lists.newArrayList(this.acquiringMembers));

再追杀下去。

final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames)
      throws IOException, IllegalArgumentException {
    String procName = proc.getName();
    // 获取abort节点的名称
    String abortNode = zkProc.getAbortZNode(procName);
    try {
      // 如果存在abort节点,就广播错误,中断该过程
      if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
        abort(abortNode);
      }
    } catch (KeeperException e) {throw new IOException("Failed while watching abort node:" + abortNode, e);
    }

    // 获得acquire节点名称
    String acquire = zkProc.getAcquiredBarrierNode(procName);
  try {
      // 创建acquire节点,info信息是Snapshot的信息,包括表名
      byte[] data = ProtobufUtil.prependPBMagic(info); 
      ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
    // 监控acquire下面的节点,发现指定的节点,就报告给coordinator
      for (String node : nodeNames) {
        String znode = ZKUtil.joinZNode(acquire, node);if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
          coordinator.memberAcquiredBarrier(procName, node);
        }
      }
    } catch (KeeperException e) {
      throw new IOException("Failed while creating acquire node:" + acquire, e);
    }
  }
1、首先是检查abortNode ,什么是abortNode ?每个procName在zk下面都有一个对应的节点,比如snapshot,然后在procName下面又分了acquired、reached、abort三个节点。检查abort节点下面有没有当前的实例。

2、在acquired节点为该实例创建节点,创建实例节点的时候,把SnapshotDescription的信息(在EnabledTableSnapshotHandler类里面通过this.snapshot.toByteArray()传进去的)放了进去,创建完成之后,在该实例节点下面监控各个Region Server的节点。如果发现已经有了,就更新Procedure中的acquiringMembers列表和inBarrierMembers,把节点从

acquiringMembers中删除,然后添加到inBarrierMembers列表当中。

3、到这一步服务端的工作就停下来了,等到所有RS接收到指令之后通过实例节点当中保存的表信息找到相应的region创建子过程,子过程在acquired节点下创建节点。

4、收到所有RS的回复之后,它才会开始在reached节点创建实例节点,然后继续等待。

5、RS完成任务之后,在reached的实例节点下面创建相应的节点,然后回复。

6、在确定所有的RS都完成工作之后,清理zk当中的相应proName节点。

注意:在这个过程当中,有任务的错误,都会在abort节点下面建立该实例的节点,RS上面的子过程一旦发现abort存在该节点的实例,就会取消该过程。

 Snapshot这块在Region Server是由RegionServerSnapshotManager类里面的ProcedureMemberRpcs负责监测snapshot下面的节点变化,当发现acquired下面有实例之后,启动新任务。

public ZKProcedureMemberRpcs(final ZooKeeperWatcher watcher, final String procType)
      throws KeeperException {
    this.zkController = new ZKProcedureUtil(watcher, procType) {
      @Override
      public void nodeCreated(String path) {
        if (!isInProcedurePath(path)) {
          return;
        }

        String parent = ZKUtil.getParent(path);
        // if its the end barrier, the procedure can be completed
        if (isReachedNode(parent)) {
          receivedReachedGlobalBarrier(path);
          return;
        } else if (isAbortNode(parent)) {
          abort(path);
          return;
        } else if (isAcquiredNode(parent)) {
          startNewSubprocedure(path);
        } else {
          LOG.debug("Ignoring created notification for node:" + path);
        }
      }

     };
  }

这块折叠起来,不是咱们的重点,让大家看看而已。我们直接进入Subprocedure这个类里面看看吧。

final public Void call() {
    try {
      // 目前是什么也没干
      acquireBarrier();
      // 在acquired的实例节点下面建立rs的节点
      rpcs.sendMemberAcquired(this);
      // 等待reached的实例节点的建立
      waitForReachedGlobalBarrier();
     // 干活
      insideBarrier();
      // 活干完了
      rpcs.sendMemberCompleted(this);
     } catch (Exception e) {
     } finally {
          releasedLocalBarrier.countDown();
     }
}
insideBarrier的实现在FlushSnapshotSubprocedure这个类里面,调用了flushSnapshot(),这个方法给每个region都开一个线程去提交。
for (HRegion region : regions) {
      taskManager.submitTask(new RegionSnapshotTask(region));
 }

Snapshot在线的region

我们接下来看看RegionSnapshotTask的call方法。

public Void call() throws Exception {
      // 上锁,暂时不让读了
      region.startRegionOperation();
      try {
        region.flushcache();
        region.addRegionToSnapshot(snapshot, monitor);
      } finally {
        LOG.debug("Closing region operation on " + region);
        region.closeRegionOperation();
      }
      return null;
    }
}
在对region操作之前,先上锁,不让读了。然后就flushCache,这个方法很大,也好难懂哦,不过我们还是要迎接困难上,我折叠起来吧,想看的就看,不想看的就看我下面的写的步骤吧。
MultiVersionConsistencyControl.WriteEntry w = null;
    this.updatesLock.writeLock().lock();
    long flushsize = this.memstoreSize.get();
    List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
    long flushSeqId = -1L;
    //先flush日志,再flush memstore到文件
    try {
      // Record the mvcc for all transactions in progress.
      w = mvcc.beginMemstoreInsert();
      mvcc.advanceMemstore(w);

      if (wal != null) {
        //准备flush日志,进入等待flush的队列,这个startSeqId很重要,在恢复的时候就靠它了,它之前的日志就是已经flush过了,不用恢复
        Long startSeqId = wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
        if (startSeqId == null) {
          return false;
        }
        flushSeqId = startSeqId.longValue();
      } else {
        flushSeqId = myseqid;
      }

      for (Store s : stores.values()) {
        storeFlushCtxs.add(s.createFlushContext(flushSeqId));
      }

      // 给MemStore做个snapshot,它的内部是两个队列,实际是从一个经常访问的队列放到另外一个不常访问的队列,那个队列名叫snapshot
      for (StoreFlushContext flush : storeFlushCtxs) {
        flush.prepare();
      }
    } finally {
      this.updatesLock.writeLock().unlock();
    }
    // 同步未flush的日志到硬盘上
    if (wal != null && !shouldSyncLog()) {
      wal.sync();
    }

    // 等待日志同步完毕
    mvcc.waitForRead(w);
    boolean compactionRequested = false;
    try {//把memstore中的keyvalues全部flush到storefile保存在临时目录当中,把flushSeqId追加到storefile里
      for (StoreFlushContext flush : storeFlushCtxs) {
        flush.flushCache(status);
      }
      // 把之前生成在临时目录的文件转移到正式目录
      for (StoreFlushContext flush : storeFlushCtxs) {
        boolean needsCompaction = flush.commit(status);
        if (needsCompaction) {
          compactionRequested = true;
        }
      }
      storeFlushCtxs.clear();
      // flush之后,就要减掉相应的memstore的大小
      this.addAndGetGlobalMemstoreSize(-flushsize);
1、获取WAL日志的flushId(要写入到hfile当中,以后恢复的时候,要拿日志的flushId和hfile的flushId对比,小于hfile的flushId的就不用恢复了)

2、给MemStore的做snapshot,从kvset集合转移到snapshot集合

3、同步日志,写入到硬盘

4、把MemStore的的snapshot集合当中的内容写入到hfile当中,MemStore当中保存的是KeyValue的集合,写入其实就是一个循环,调用StoreFile.Writer的append方法追加,具体的可以看我的那篇博客《非mapreduce生成Hfile,然后导入hbase当中》

5、上一步的生成的文件是保存在临时目录中的,转移到正式的目录当中

6、更新MemStore当中的大小

好,我们继续看addRegionToSnapshot方法,好累啊,尼玛,这么多步骤。

public void addRegionToSnapshot(SnapshotDescription desc,
      ForeignExceptionSnare exnSnare) throws IOException {// 获取工作目录
    Path rootDir = FSUtils.getRootDir(this.rsServices.getConfiguration());
    Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);

    // 1. 在工作目录创建region目录和写入region的信息
    HRegionFileSystem snapshotRegionFs = HRegionFileSystem.createRegionOnFileSystem(conf,
        this.fs.getFileSystem(), snapshotDir, getRegionInfo());

    // 2. 为hfile创建引用
    for (Store store : stores.values()) {
      // 2.1. 分列族为store创建引用目录,每个store属于不同的列族
      Path dstStoreDir = snapshotRegionFs.getStoreDir(store.getFamily().getNameAsString());
      List<StoreFile> storeFiles = new ArrayList<StoreFile>(store.getStorefiles());// 2.2. 遍历hfile,然后创建引用
      int sz = storeFiles.size();
      for (int i = 0; i < sz; i++) {
        StoreFile storeFile = storeFiles.get(i);
        Path file = storeFile.getPath();

        Path referenceFile = new Path(dstStoreDir, file.getName());
        boolean success = true;
        if (storeFile.isReference()) {
          // 把旧的引用文件的内容写入到新的引用文件当中
          storeFile.getFileInfo().getReference().write(fs.getFileSystem(), referenceFile);
        } else {
          // 创建一个空的引用文件
          success = fs.getFileSystem().createNewFile(referenceFile);
        }
        if (!success) {
          throw new IOException("Failed to create reference file:" + referenceFile);
        }
      }
    }
  }
在工作目录在.hbase-snapshot/.tmps/snapshotName/region/familyName/下面给hfile创建引用文件。在创建引用文件的时候,还要先判断一下这个所谓的hfile是不是真的hfile,还是它本身就是一个引用文件了。

如果已经是引用文件的话,把旧的引用文件里面的内容写入到新的引用文件当中。

如果是一个正常的hfile的话,就创建一个空的引用文件即可,以后我们可以通过它的名字找到它在snapshot下面相应的文件。

okay,到这里,每个RS的工作都完成了。

备份split过的region

完成执行分布式事务,就是备份split过的region了,把之前的代码再贴一次吧,折叠起来,需要的自己看。

if (regionInfo.isOffline() && (regionInfo.isSplit() || regionInfo.isSplitParent())) {
    if (!fs.exists(new Path(snapshotDir, regionInfo.getEncodedName()))) {
          LOG.info("Take disabled snapshot of offline region=" + regionInfo);
          snapshotDisabledRegion(regionInfo);
     }
}

protected void snapshotDisabledRegion(final HRegionInfo regionInfo)
      throws IOException {
    // 创建新的region目录和region信息
    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs,
      workingDir, regionInfo);

     // 把region下的recovered.edits目录的文件复制snapshot的对应region下面
    Path regionDir = HRegion.getRegionDir(rootDir, regionInfo);
    Path snapshotRegionDir = regionFs.getRegionDir();
    new CopyRecoveredEditsTask(snapshot, monitor, fs, regionDir, snapshotRegionDir).call();
    
    // 给每个列族的下面的文件创建引用,所谓引用就是一个同名的空文件
    new ReferenceRegionHFilesTask(snapshot, monitor, regionDir, fs, snapshotRegionDir).call();
}
备份启用的表,现在已经结束了,但是备份禁用的表吧,前面说了区别是snapshotRegions方法,但是方法除了做一些准备工作之外,就是snapshotDisabledRegion。。。。所以snapshot到这里就完了,下面我们 再回顾一遍吧

1、进行snapshot之前的准备,创建目录,复制一些必要的信息文件等。

2、对于启用的表,启动分布式事务,RS接到任务,flush掉WAL日志和MemStore的数据,写入文件。

3、为hfile创建引用文件,这里的引用文件居然是空的文件,而且名字一样,它不是真的备份hfile,这是什么回事呢?这个要到下一章,从snapshot中恢复,才能弄明白了,这个和hbase的归档文件机制有关系,hbase删除文件的时候,不是直接删除,而是把它先放入archive文件夹内。

相关实践学习
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
分布式数据库 Hbase
|
分布式数据库 Hbase
HBase 源码解析
HBase Read读流程源码解析&HBase Write写流程源码解析 &HBase Flush & Compact流程源码解析
4606 0
|
Java 分布式数据库 Ruby
HBase Filter 过滤器之 Comparator 原理及源码学习
HBase所有的比较器实现类都继承于父类ByteArrayComparable,而ByteArrayComparable又实现了Comparable接口;不同功能的比较器差别在于对父类compareTo()方法的重写逻辑不同。 下面分别对HBase Filter默认实现的七大比较器一一进行介绍。 1. BinaryComparator 介绍:二进制比较器,用于按字典顺序比较指定字节数组。 先看一个小例子: public class BinaryComparatorDemo { public static void main(String[] args) {
429 0
|
分布式数据库 Hbase
hbase snapshot源码分析
snapshot操作在硬盘上形式: /hbase/.snapshots /.tmp &lt;---- working directory /[snapshot name] &lt;---...
1338 0
|
分布式数据库 Hbase 分布式计算
hbase源码系列(十五)终结篇&Scan续集-->如何查询出来下一个KeyValue
这是这个系列的最后一篇了,实在没精力写了,本来还想写一下hbck的,这个东西很常用,当hbase的Meta表出现错误的时候,它能够帮助我们进行修复,无奈看到3000多行的代码时,退却了,原谅我这点自私的想法吧。
3380 0
|
Java
hbase源码系列(十四)Compact和Split
本文介绍hbase中的Compact和Split。
4011 0
|
缓存 固态存储 分布式数据库
hbase源码系列(十三)缓存机制MemStore与Block Cache
这一章讲hbase的缓存机制,这里面涉及的内容也是比较多,呵呵,我理解中的缓存是保存在内存中的特定的便于检索的数据结构就是缓存。
2957 0
|
分布式数据库 Hbase
hbase源码系列(十二)Get、Scan在服务端是如何处理?
继上一篇讲了Put和Delete之后,这一篇我们讲Get和Scan, 因为我发现这两个操作几乎是一样的过程,就像之前的Put和Delete一样,上一篇我本来只打算写Put的,结果发现Delete也可以走这个过程,所以就一起写了。
3488 0
|
分布式数据库 Hbase
hbase源码系列(十一)Put、Delete在服务端是如何处理?
在讲完之后HFile和HLog之后,今天我想分享是Put在Region Server经历些了什么?
2116 0
|
分布式数据库 Hbase
hbase源码系列(十)HLog与日志恢复
hbase在写入数据之前会先写入MemStore,成功了再写入HLog,当MemStore的数据丢失的时候,还可以用HLog的数据来进行恢复。本文将介绍“HLog与日志恢复”。
2028 0