HBase2 使用协处理器删除指定qualifier的全部数据

简介:

用户画像的场景中,通常会开发很多标签,每个标签作为一个qualifier,其中有一些不再使用后需要下线,但hbase提供的delete相关api都只能针对单行,要清理某个qualifier的全部数据不太容易,这里提供一个基于协处理器的实现方案;

hbase对于compact过程提供了以下5个hook可以嵌入自定义代码:

  • preCompactSelection
  • postCompactSelection
  • preCompactScannerOpen
  • preCompact
  • postCompact

而preCompact会在创建了storeScanner之后读取数据之前调用,因此这里的思路就是对scanner进行代理,创建一个新的scanner实现其next方法,进而对读取到的原始数据进行加工;

代码如下,参考了hbase-examples模块中的ValueRewritingObserver类:

public class QualifierDeletingObserver implements RegionObserver, RegionCoprocessor {
  private static final Logger LOG = LoggerFactory.getLogger(QualifierDeletingObserver.class);

  private byte[] qualifierToDelete = null;
  private Bytes.ByteArrayComparator comparator;

  @Override
  public Optional<RegionObserver> getRegionObserver() {
    // Extremely important to be sure that the coprocessor is invoked as a RegionObserver
    return Optional.of(this);
  }

  @Override
  public void start(
      @SuppressWarnings("rawtypes") CoprocessorEnvironment env) throws IOException {
    RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env;
    qualifierToDelete = Bytes.toBytes(renv.getConfiguration().get("qualifier.to.delete"));
    comparator = new Bytes.ByteArrayComparator();
  }

  @Override
  public InternalScanner preCompact(
      ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      final InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) {
    InternalScanner modifyingScanner = new InternalScanner() {
      @Override
      public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
        boolean ret = scanner.next(result, scannerContext);
        for (int i = 0; i < result.size(); i++) {
          Cell c = result.get(i);
          byte[] qualifier = CellUtil.cloneQualifier(c);
          if (comparator.compare(qualifier, qualifierToDelete) == 0) {
            result.remove(i);
          }
        }
        return ret;
      }

      @Override
      public void close() throws IOException {
        scanner.close();
      }
    };

    return modifyingScanner;
  }
}

打成jar包上传到hdfs;

以下是简单的测试过程展示;

create 'cp_test','f'

put 'cp_test','rk1','f:q1','123'
put 'cp_test','rk1','f:q2','123'
put 'cp_test','rk2','f:q1','123'
put 'cp_test','rk2','f:q2','123'
put 'cp_test','rk2','f:q3','123'


hbase(main):015:0> scan 'cp_test'
ROW                                    COLUMN+CELL                                                                                                     
 rk1                                   column=f:q1, timestamp=1590567958995, value=123                                                                 
 rk1                                   column=f:q2, timestamp=1590567959023, value=123                                                                 
 rk2                                   column=f:q1, timestamp=1590567959048, value=123                                                                 
 rk2                                   column=f:q2, timestamp=1590567959073, value=123                                                                 
 rk2                                   column=f:q3, timestamp=1590567959842, value=123        
 

alter 'cp_test' \
, METHOD => 'table_att', 'coprocessor'=>'hdfs://xxx.jar|xxx.QualifierDelexxxtingObserver|1024|qualifier.to.delete=q1'

flush 'cp_test'

major_compact 'cp_test'

hbase(main):017:0> scan 'cp_test'
ROW                                    COLUMN+CELL                                                                                                     
 rk1                                   column=f:q2, timestamp=1590567959023, value=123                                                                 
 rk2                                   column=f:q2, timestamp=1590567959073, value=123                                                                 
 rk2                                   column=f:q3, timestamp=1590567959842, value=123  
相关实践学习
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
分布式计算 Java Hadoop
|
分布式数据库 Hbase 设计模式
|
分布式数据库 Hbase 存储
hbase协处理器Coprocessor(简介)
一:介绍 把一部分计算也移动到数据的存放端;允许用户执行region级的操作;可以动态加载。 二:使用场景: 1、使用钩子来关联行修改操作来维护辅助索引,或维护一些数据间的引用完整性。
1716 0
|
存储 分布式数据库 Hbase
Hbase协处理器coprocessor
对每个region进行处理,弥补了scan的时候有限的几个过滤器的不足 分为两种类型 observer 观察者相当于触发器 Endpoint终端相当于存储过程 下面的观察者实现查询之前替换掉行键为Jack的KeyValue import java.io.IOException; import java.util.List; import org.apache.hadoop.hba
2098 0
|
4月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
82 0
|
8月前
|
SQL 分布式计算 Hadoop
Hadoop集群hbase的安装
Hadoop集群hbase的安装
140 0
|
4月前
|
分布式计算 Hadoop 关系型数据库
Hadoop任务scan Hbase 导出数据量变小分析
Hadoop任务scan Hbase 导出数据量变小分析
53 0
|
3月前
|
存储 分布式计算 Hadoop
Hadoop中的HBase是什么?请解释其作用和用途。
Hadoop中的HBase是什么?请解释其作用和用途。
40 0
|
4月前
|
SQL 分布式计算 Hadoop
Hadoop学习笔记(HDP)-Part.16 安装HBase
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
82 1
Hadoop学习笔记(HDP)-Part.16 安装HBase