HBase结合MapReduce批量导入

简介:

  • Hbase是Hadoop生态体系配置的数据库,我们可以通过HTable api中的put方法向Hbase数据库中插入数据,但是由于put效率太低,不能批量插入大量的数据,文本将详细介绍如何通过MapReduce运算框架向Hbase数据库中导入数据。
    开篇先介绍业务场景:将电信手机上网日志中的数据导入到Hbase数据库中,将部分数据以及相应字段描述列出:
    这里写图片描述
    图片格式描述:
    这里写图片描述
    先介绍一个日期格式的转换:

    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public  class  TestDate
    {
          public  static  void  main(String[] args)
          {
                Date d =  new  Date();
                SimpleDateFormat df =  new  SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );  
                String time = df.format(d);
                System.out.println(time);
          }
    }
    /*2016-05-14 13:32:24*/
    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
      在Java当中,我们经常利用SimpledateFormat这个类将给定的日期转化成指定的格式。
      接下来在归纳一下Hbase结合MapReduce批量导入数据的时候,在代码当中应该注意的事项:
      ①MyReducer类继承的是TableReduce类,而不在是MapReduce中常用的Reducer类
      ②的数值类型没有什么用,通常将k3的数值类型设置为NullWritable即可
      ③只设置map函数的输出类型,不在设置reduce函数的输出类型,因为②的原因
      ④指定对输出文件格式化处理的类改为TableOutputFormat,而不在是TextOutputFormat
      ⑤输出文件的路径改为指定的表名,在Configuration中进行设定,而不在是path的方式
      ⑥如果想过jar包的方式运行程序,貌似还需要加入什么jar包,我没有整出来。
      接下来将贴出我在编程的时候第一次写出的业务代码:当然遇到了很多的问题。
    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    package  IT01;
    import  java.io.IOException;
    import  java.text.SimpleDateFormat;
    import  java.util.Date;
    import  org.apache.hadoop.conf.Configuration;
    import  org.apache.hadoop.fs.Path;
    import  org.apache.hadoop.hbase.client.Put;
    import  org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import  org.apache.hadoop.hbase.mapreduce.TableReducer;
    import  org.apache.hadoop.io.LongWritable;
    import  org.apache.hadoop.io.NullWritable;
    import  org.apache.hadoop.io.Text;
    import  org.apache.hadoop.mapreduce.Job;
    import  org.apache.hadoop.mapreduce.Mapper;
    import  org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import  org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import  org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    public  class  HbaseApp
    {
          public  static  String path1 =  "hdfs://hadoop80:9000/FlowData.txt" ;
          public  static  void  main(String[] args)  throws  Exception
          {
                Configuration conf =  new  Configuration();
                conf.set( "hbaser.rootdir" , "hdfs://hadoop80:9000/hbase" );
                conf.set( "hbase.zookeeper.quorum" , "hadoop80" );
                conf.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log" ); //在这里需要指定表的名字:相当于输出文件的路径
                conf.set( "dfs.socket.timeout" , "2000" );
     
                Job job =  new  Job(conf, "HbaseApp" );
                FileInputFormat.setInputPaths(job,  new  Path(path1));
                job.setInputFormatClass(TextInputFormat. class );
                job.setMapperClass(MyMapper. class );
                job.setMapOutputKeyClass(Text. class );
                job.setMapOutputValueClass(Text. class );
     
                job.setNumReduceTasks( 1 );
                job.setPartitionerClass(HashPartitioner. class );
     
                job.setReducerClass(MyReducer. class );
    //         job.setOutputKeyClass(Text.class);
    //         job.setOutputValueClass(NullWritable.class);
                job.setOutputFormatClass(TableOutputFormat. class );
    //         FileOutputFormat.setOutputPath(job, new Path(path2));
                job.waitForCompletion( true );
          }
          public  static  class  MyMapper  extends  Mapper{
                 protected  void  map(LongWritable k1, Text v1,Context context) throws  IOException, InterruptedException
                 {
                       String[] splited = v1.toString().split( "\t" );
                       String reportTime = splited[ 0 ];
                       String msisdn = splited[ 1 ];
                       Date date =  new  Date(Long.parseLong(reportTime));
                       String time = DateConvert.dateParse(date);
                       String rowkey = msisdn+ ":" +time; //获取到行健
                       context.write( new  Text(rowkey), new  Text(v1.toString()));      
                 }
          }
          public  static  class  MyReducer  extends  TableReducer{
                 protected  void  reduce(Text k2, Iterablev2s,Context context) throws  IOException, InterruptedException
                 {
                       for  (Text v2 : v2s)
                      {
                          String[] splited = v2.toString().split( "\t" );
                          /**添加记录的时候需要指定行健、列族、列名、数值***/
                          Put put =  new  Put(k2.toString().getBytes());
                          put.add( "cf" .getBytes(), "reportTime" .getBytes(), splited[ 0 ].getBytes());
                          put.add( "cf" .getBytes(), "msisdn" .getBytes(), splited[ 1 ].getBytes());
                          put.add( "cf" .getBytes(), "apmac1" .getBytes(), splited[ 2 ].getBytes());
                          put.add( "cf" .getBytes(), "apmac2" .getBytes(), splited[ 3 ].getBytes());
                          put.add( "cf" .getBytes(), "host" .getBytes(), splited[ 4 ].getBytes());
                          put.add( "cf" .getBytes(), "sitetype" .getBytes(), splited[ 5 ].getBytes());
                          put.add( "cf" .getBytes(), "upPackNum" .getBytes(), splited[ 6 ].getBytes());
                          put.add( "cf" .getBytes(), "downPackNum" .getBytes(), splited[ 7 ].getBytes());
                          put.add( "cf" .getBytes(), "upPayLoad" .getBytes(), splited[ 8 ].getBytes());
                          put.add( "cf" .getBytes(), "downPayLoad" .getBytes(), splited[ 9 ].getBytes());
                          put.add( "cf" .getBytes(), "httpstatus" .getBytes(), splited[ 10 ].getBytes());
                          context.write(NullWritable.get(),put);
                      }       
                 }
          }
    }
    class  DateConvert
    {
         public  static  String dateParse(Date  date)
         {
              SimpleDateFormat df =  new  SimpleDateFormat( "yyyyMMddhhmmss" ); //构造一个日期解析器
              return  df.format(date); 
         }
    }

    程序运行完之后:显示如下异常NumberFormatException
    这里写图片描述
    显示的是数字格式异常, 于是我在map函数当中又加了一个throws NumberFormatException

    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
                 protected  void  map(LongWritable k1, Text v1,Context context) throws  IOException, InterruptedException,NumberFormatException
                 {
                       String[] splited = v1.toString().split( "\t" );
                       String reportTime = splited[ 0 ];
                       String msisdn = splited[ 1 ];
                       Date date =  new  Date(Long.parseLong(reportTime));
                       String time = DateConvert.dateParse(date);
                       String rowkey = msisdn+ ":" +time; //获取到行健
                       context.write( new  Text(rowkey), new  Text(v1.toString()));      
                 }

    但是这样我发现也不对,因为当我追踪Mapp这个类的源代码时,我发现父类的map方法并没有抛出NumberFormatException这个异常,根据子类重写方法抛出异常的范围不能大于父类被重写方法抛出异常的范围,我又将上面这段代码用try——catch这种异常处理方式进行处理:

    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    protected  void  map(LongWritable k1, Text v1,Context context) throws  IOException, InterruptedException
                 {
                       try
                       {
                           String[] splited = v1.toString().split( "\t" );
                           String reportTime = splited[ 0 ];
                           String msisdn = splited[ 1 ];
                           Date date =  new  Date(Long.parseLong(reportTime));
                           String time = DateConvert.dateParse(date);
                           String rowkey = msisdn+ ":" +time; //获取到行健
                           context.write( new  Text(rowkey), new  Text(v1.toString()));      
                       } catch (Exception e)
                       {
                           Counter counter = context.getCounter( "NumberExceptionNum" "num" );
                           counter.increment(1L);
                       }
                 }

    当我将代码改成这样的时候,此时程序并没有显示抛出NumberFormatException这个异常,说明异常得到了处理,但是当我去查看Hbase数据的时候,发现HDFS中的日志数据并没有导入到Hbase数据库中,于是我又查看了一下MapReduce的运行日志:
    这里写图片描述
    也就是我的22行数据在map函数中当中并没有输出,这个问题就匪夷所思了,为什么22行数据都会抛出数字格式异常呢,而且都没有输出,于是我想到可能是SimpleDateFZ喎"/kf/ware/vc/" target="_blank" class="keylink">vcm1hdNXiuPbA4LXEzsrM4qOs09rKx87S09a/qsq8uPfW1rDZtsijrLeiz9bN+MnPuty24M7E1cK2vMrHxfrF0NXiuPbA4LXEo6zX7tbV1tXT2tXStb3By87KzOK1xL3ivva3vbC4o6zTw3RyaW0oKdXiuPa3vbeoyKWz/dfWt/u0rsewuvO1xL/VuPG8tL/JoaM8L3A+DQo8cHJlIGNsYXNzPQ=="brush:java;">protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException { try { String[] splited = v1.toString().split("\t"); String reportTime = splited[0].trim(); String msisdn = splited[1].trim(); Date date = new Date(Long.parseLong(reportTime)); String time = DateConvert.dateParse(date); String rowkey = msisdn+":"+time;//获取到行健 context.write(new Text(rowkey),new Text(v1.toString())); }catch(Exception e) { Counter counter = context.getCounter("NumberExceptionNum", "num"); counter.increment(1L); } }

    于是我又开始运行程序,但是当我运行完之后,从MapReduce的计数器当中,我发现第一条数据文本并没有导入:因为数字格式异常的这个原因估计在运行过程中被终止了。下面是计数器的显示:
    这里写图片描述
    于是我又想到了一个解决方案,将第一条数据多复制一条即可,然后重写将数据上传到HDFS中。
    此时在一次 运行程序,显示正确,此时数据也全部导入到Hbase数据库中。
    这里写图片描述
    Hbase中数据查看核实:
    这里写图片描述
    将HDFS中的数据通过MapReduce导入到Hbase数据库时,总结如下:
    核心步骤:先将数据文件上传到HDFS,然后用MapReduce进行处理,将处理后的数据插入到 hbase中
    注意事项:
    1>子类重写方法抛出异常的范围不能大于父类被重写方法抛出异常的范围
    2>用trim()这个方法可以去除字符串前后的空格,换行符。
    3>既然第一条数据总是显示数字格式异常,将第一条数据复制为2份即可。


本文转自 SimplePoint 51CTO博客,原文链接:http://blog.51cto.com/2226894115/1896517,如需转载请自行联系原作者
相关实践学习
云数据库HBase版使用教程
  相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情: https://cn.aliyun.com/product/hbase   ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
存储 分布式计算 Hadoop
【HBase】(八)往 HBase 导入数据的几种操作
【HBase】(八)往 HBase 导入数据的几种操作
2409 0
|
6月前
|
分布式计算 分布式数据库 Hbase
99 MapReduce操作Hbase
99 MapReduce操作Hbase
43 0
|
12月前
|
Shell 分布式数据库 数据库
第7章 HBase操作
第7章 HBase操作
140 0
第7章 HBase操作
|
12月前
|
分布式数据库 Hbase
使用eplicse对hbase进行操作
使用eplicse对hbase进行操作
|
存储 分布式计算 分布式数据库
【HBase】(九)MapReduce 操作 Hbase
【HBase】(九)MapReduce 操作 Hbase
373 0
|
SQL Java 分布式数据库
Hive与HBase的集成
Hive提供了与HBase的集成,使得能够在HBase表上使用HQL语句进行查询 插入操作以及进行Join和Union等复杂查询、同时也可以将hive表中的数据映射到Hbase中。
|
分布式数据库 Hbase 流计算
Flink操作Hbase
现在有这样一个场景,我们需要将hbase做成一个数据流,而不是数据集。根据Flink自带的Flink-Hbase只能帮我们做到数据集,所以这个时候选择了重写Hbase的数据源。 package com.
3748 0
|
SQL 分布式数据库 Apache
hive数据导入云hbase
网络环境 专线:用户需要把hbase集群的VPC相关网络信息配置到专线里面,可直通hbase环境 公有云虚拟机VPC环境:选择和hbase通VPC 其他:需要开hbase公网 注意:默认导入hbase数据,依赖的hbase-common、hbase-client、hbase-server、hbase-protocol使用社区的包即可。
5678 0
|
分布式计算 分布式数据库 Apache