Hbase 学习(三)Coprocessors

简介: 最近在狂啃hadoop的书籍,这部《hbase:权威指南》就进入我的视野里面了,啃吧,因为是英文的书籍,有些个人理解不对的地方,欢迎各位拍砖。

Coprocessors

之前我们的filter都是在客户端定义,然后传到服务端去执行的,这个Coprocessors是在服务端定义,在客户端调用,然后在服务端执行,他有点儿想我们熟悉的存储过程,传一些参数进去,然后进行我们事先定义好的操作,我们常常用它来做一些比如二次索引啊,统计函数什么的,它也和自定义filter一样,需要事先定好,然后在hbase-env.sh中的HBASE_CLASSPATH中指明,就像我的上一篇中的写的那样。
Coprocessors分两种,observer和endpoint。
(1)observer就像触发器一样,当某个事件发生的时候,它就出发。
已经有一些内置的接口让我们去实现,RegionObserver、MasterObserver、WALObserver,看名字就大概知道他们是干嘛的。
(2)endpoint可以认为是自定义函数,可以把这个理解为关系数据库的存储过程。
所有的Coprocessor都是实现自Coprocessor 接口,它分SYSTEM和USER,前者的优先级比后者的优先级高,先执行。
它有两个方法,start和stop方法,两个方法都有一个相同的上下文对象CoprocessorEnvironment。
void start(CoprocessorEnvironment env) throws IOException; 
void stop(CoprocessorEnvironment env) throws IOException;
这是CoprocessorEnvironment的方法。
47e2112edd36ecfb6f7a13b0e78c6de42c1d9b2d
Working with Tables
对表进行操作的时候,必须先调用getTable方法活得HTable,不可以自己定义一个HTable,目前貌似没有禁止,但是将来会禁止。
并且在对表操作的时候,不能对行加锁。
Coprocessor Loading
Coprocessor加载需要在配置文件里面全局加载,比如在hbase-site.xml中设置。
<property>
    <name>hbase.coprocessor.region.classes</name>
    <value>coprocessor.RegionObserverExample,coprocessor.AnotherCoprocessor</value>
</property>
<property>
    <name>hbase.coprocessor.master.classes</name>
    <value>coprocessor.MasterObserverExample</value>
</property>
<property>
    <name>hbase.coprocessor.wal.classes</name>
    <value>coprocessor.WALObserverExample,bar.foo.MyWALObserver</value>
</property>
我们自定义的时间可以注册到三个配置项上,分别是hbase.coprocessor.region.classes,hbase.coprocessor.master.classes,
hbase.coprocessor.wal.classes上,他们分别负责region,master,wal,注册到region的要特别注意小心,因为它是针对所有表的。
<property>
    <name>hbase.coprocessor.region.classes</name>
    <value>coprocessor.RegionObserverExample</value>
</property>
注册到这三个触发器上,可以监控到几乎所有我们的操作上面,非常恐怖。。可以说是想要什么就有什么,详细的代码大家自己去摸索。
EndPoint的可以用来定义聚合函数,我们可以调用CoprocessorProtocol中的方法来实现我们的需求。
调用coprocessorProxy() 传一个单独的row key,这是在单独一个region上操作的。
要在所有region上面操作,我们要调用coprocessorExec()方法 传一个开始row key 和结束row key。
Demo
说了那么多废话,我都不好意思再说了,来个例子吧,统计行数的。
public interface RowCountProtocol extends CoprocessorProtocol {
    long getRowCount() throws IOException;

    long getRowCount(Filter filter) throws IOException;

    long getKeyValueCount() throws IOException;
}

public class RowCountEndpoint extends BaseEndpointCoprocessor implements
        RowCountProtocol {
    private long getCount(Filter filter, boolean countKeyValues)
            throws IOException {
        Scan scan = new Scan();
        scan.setMaxVersions(1);
        if (filter != null) {
            scan.setFilter(filter);
        }
        RegionCoprocessorEnvironment environment = (RegionCoprocessorEnvironment) getEnvironment();
        // use an internal scanner to perform scanning.
        InternalScanner scanner = environment.getRegion().getScanner(scan);
        int result = 0;
        try {
            List<KeyValue> curVals = new ArrayList<KeyValue>();
            boolean done = false;
            do {
                curVals.clear();
                done = scanner.next(curVals);
                result += countKeyValues ? curVals.size() : 1;
            } while (done);
        } finally {
            scanner.close();
        }
        return result;
    }

    @Override
    public long getRowCount() throws IOException {
        return getRowCount(new FirstKeyOnlyFilter());
    }

    @Override
    public long getRowCount(Filter filter) throws IOException {
        return getCount(filter, false);
    }

    @Override
    public long getKeyValueCount() throws IOException {
        return getCount(null, true);
    }
}
写完之后,注册一下吧。
<property>
    <name>hbase.coprocessor.region.classes</name>
    <value>coprocessor.RowCountEndpoint</value>
</property>

JAVA 客户端调用

在服务端定义之后,我们怎么在客户端用java代码调用呢,看下面的例子你就明白啦!
public class EndPointExample {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "testtable");

        try {
            Map<byte[], Long> results = table.coprocessorExec(
                    RowCountProtocol.class, null, null,
                    new Batch.Call<RowCountProtocol, Long>() {
                        @Override
                        public Long call(RowCountProtocol counter)
                                throws IOException {
                            return counter.getRowCount();
                        }
                    });
            long total = 0;
            for (Map.Entry<byte[], Long> entry : results.entrySet()) {
                total += entry.getValue().longValue();
                System.out.println("Region: " + Bytes.toString(entry.getKey())
                        + ", Count: " + entry.getValue());
            }
            System.out.println("Total Count: " + total);
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
    }

}
通过table的coprocessorExec方法调用,然后调用RowCountProtocol接口的getRowCount()方法。
然后遍历每个Region返回的结果,合起来就是最终的结果,打印结果如下。
Region:
testtable,,1303417572005.51f9e2251c29ccb2...cbcb0c66858f.,
Count: 2
Region:
testtable,row3,1303417572005.7f3df4dcba3f...dbc99fce5d87.,
Count: 3
Total Count: 5
在上面的例子当中,我们是用Batch.Call()方法来调用接口当中的方法,我们可以用另外一个方法来简化上述代码,来看例子。
Batch.Call call =Batch.forMethod(RowCountProtocol.class,"getKeyValueCount");
Map<byte[], Long> results = table.coprocessorExec(RowCountProtocol.class, null, null, call);

采用Batch.Call方法调用同时调用多个方法

Map<byte[], Pair<Long, Long>> results =table.coprocessorExec(
RowCountProtocol.class,
null, null,
new Batch.Call<RowCountProtocol, Pair<Long, Long>>()
{
    public Pair<Long, Long> call(RowCountProtocol counter) throws IOException {
        return new Pair(counter.getRowCount(),counter.getKeyValueCount());
    }
});
long totalRows = 0;
long totalKeyValues = 0;
for (Map.Entry<byte[], Pair<Long, Long>> entry :results.entrySet()) {
    totalRows +=
    entry.getValue().getFirst().longValue();
    totalKeyValues +=entry.getValue().getSecond().longValue();
    System.out.println("Region: " +Bytes.toString(entry.getKey()) +", Count: " + entry.getValue());
}
System.out.println("Total Row Count: " + totalRows);
System.out.println("Total KeyValue Count: " +totalKeyValues);

调用coprocessorProxy()在单个region上执行

RowCountProtocol protocol = table.coprocessorProxy(RowCountProtocol.class, Bytes.toBytes("row4"));
long rowsInRegion = protocol.getRowCount();
System.out.println("Region Row Count: " +rowsInRegion);
上面这个例子是查找row4行所在region的数据条数,这个可以帮助我们统计每个region上面的数据分布。
相关实践学习
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
8月前
|
SQL 缓存 Java
【大数据】HBase入门学习 3
【大数据】HBase入门学习
84 0
|
8月前
|
存储 大数据 分布式数据库
【大数据】HBase入门学习 2
【大数据】HBase入门学习
80 0
|
8月前
|
存储 分布式计算 Hadoop
【大数据】HBase入门学习 1
【大数据】HBase入门学习
82 0
|
存储 NoSQL 关系型数据库
每日积累【Day 3】Hbase架构深入学习
每日积累【Day 3】Hbase架构深入学习
每日积累【Day 3】Hbase架构深入学习
|
大数据 Shell 分布式数据库
基于mac构建大数据伪分布式学习环境(六)-部署HBase
本文主要讲解列式数据库HBase的单机部署方式与配置
107 0
|
Java 分布式数据库 Ruby
HBase Filter 过滤器之 Comparator 原理及源码学习
HBase所有的比较器实现类都继承于父类ByteArrayComparable,而ByteArrayComparable又实现了Comparable接口;不同功能的比较器差别在于对父类compareTo()方法的重写逻辑不同。 下面分别对HBase Filter默认实现的七大比较器一一进行介绍。 1. BinaryComparator 介绍:二进制比较器,用于按字典顺序比较指定字节数组。 先看一个小例子: public class BinaryComparatorDemo { public static void main(String[] args) {
429 0
|
Hbase 分布式数据库 Serverless
1元包年,阿里云HBase Serverless开启大数据学习与测试的新时代
阿里云HBase Serverless 版是基于HBase,使用Serverless架构构建的一套新型的HBase 服务。 阿里云HBase Serverless版真正把HBase变成了一个服务,用户无需提前规划资源,选择CPU,内存资源数量,购买集群。在应对业务高峰,业务空间增长时,也无需进行扩容
4659 0
1元包年,阿里云HBase Serverless开启大数据学习与测试的新时代
|
存储 监控 物联网
HBase全网最佳学习资料汇总
前言 HBase这几年在国内使用的越来越广泛,在一定规模的企业中几乎是必备存储引擎,互联网企业阿里巴巴、京东、小米都有数千台的HBase集群,中国电信的话单、中国人寿的保单都是存储在HBase中。注意大公司有数十个数百个HBase集群,此点跟Hadoop集群很不相同。
5698 0
|
关系型数据库 分布式数据库 Hbase
hive_学习_02_hive整合hbase(失败)
一、前言 本文承接上一篇:hive_学习_01_hive环境搭建(单机) ,主要是记录 hive 整合hbase的流程 二、环境准备 1.环境准备 操作系统 : linux CentOS 6.
1830 0
|
分布式数据库 Apache Hbase
hbase_学习_00_资源帖
一、官方资料 1.官网:http://hbase.apache.org/ 2.官方文档:HBase 官方文档中文版   二、apache软件下载基地 1. Apache Software Foundation Distribution Directory archive.
1087 0