《Hadoop与大数据挖掘》——2.4 Hadoop编程开发

简介:

本节书摘来自华章计算机《Hadoop与大数据挖掘》一书中的第2章,第2.4节,作者 张良均 樊哲 位文超 刘名军 许国杰 周龙 焦正升,更多章节内容可以访问云栖社区“华章计算机”公众号查看。

2.4 Hadoop编程开发

Hadoop框架最核心的设计就是HDFS和MapReduce。HDFS为海量的数据提供了存储,则MapReduce为海量的数据提供了计算。本节就MapReduce开发相关内容进行分析,包括HDFS Java API操作、MapReduce原理、MapReduce相关流程组件配置及编程等。最后将给出两个算法:Kmeans算法、Tf-idf算法的动手实践,加深对MapReduce编程的认识和理解。

2.4.1 HDFS Java API操作

Hadoop中关于文件操作类基本上是在org.apache.hadoop.fs包中,这些API能够支持的操作有:打开文件,读写文件,删除文件,创建文件、文件夹,判断是文件或文件夹,判断文件或文件夹是否存在等。

Hadoop类库中最终面向用户提供的接口类是FileSystem,这个类是个抽象类,只能通过类的get方法得到其实例。get方法有几个重载版本,如图2-28所示。

image

比较常用的是第一个,即灰色背景的方法。

FileSystem针对HDFS相关操作的API如表2-5所示。

image

代码清单2-22,是FileSystem API的一个简单示例。该代码首先获取FileSystem的一个实例,然后调用该实例的listStatus方法,获取所有根目录下面的文件或文件夹(注意这里获取的不包含递归子目录);接着,调用create方法创建一个新文件,并写入“Hello World!”;最后,读取刚才创建的文件,并把创建的文件内容打印出来;关闭FileSystem实例。

image
image
image

执行完成后,在HDFS上可以看到创建的文件及内容,如图2-29所示。

image

2.4.2 MapReduce原理

1.通俗理解MapReduce原理

现在你接到一个任务,给你10本长篇英文小说,让你统计这10本书中每一个单词出现的次数。这便是Hadoop编程中赫赫有名的HelloWorld程序:词频统计。这个任务的结果形式如表2-6所示。

image

即在这10本书中a共出现了12300次,ai共出现了63次……依次计算出每一个单词出现多少次。天啊,这个工作必须由专业人士做呀,自己做的话还不累死呀。这时你可以把这个工作外包给一支职业分布式运算工程队做。

分布式运算工程队中按岗位有Mapper、Mapper助理Comb-iner、Mapper助理InputFormat、Mapper助理Patitioner、运输负责Shuffle、Reducer、Reducer助理Sort、Reducer助理OutputFormat。除了Combiner是非必需人员外,其他岗位都是必需的。下面描述一下这个工程队是怎么做这项工作的。

首先把这10本书分别分到10个Mapper手中。Mapper助理InputFormat负责从书中读取记录,Mapper负责记录怎么解析重新组织成新的格式。然后Mapper把自己的处理结果排好序后放到书旁边,等待Shuffle取走结果。Shuffle把取到的结果送给Reducer助理Sort,由Sort负责把所有Mapper的结果排好序,然后送给Reducer来进行汇总,以得到最终的结果;最后,由Reducer助理Outputformat记录到规定位置并存档。

下面说明什么时候需要Combiner。Maper助理InputForormat从书中一行行读取记录,给到Mapper,Mapper从Inputformat的记录中解析出一个个单词,并进行记录。Mapper处理的结果形如“a出现了一次,a出现了一次,ai出现了一次……zhe出现了一次”。工作一段时间后发现负责搬运工作的Suffle有点吃不消,这时就用到Mapper助理Combiner了。由Combiner对的输出结果进行短暂的汇总,把Mapper的结果处理成形如“书本一中单词a共出现1500次,ai出现了14次,are出现了80次……”这样Shuffle的压力顿时减轻了许多。

对于每个岗位工程队都是有默认时限的。但如果默认时限不能满足需求,也可以对工作量进行自定义。

上面的过程描述了一个MapReduce工程队是如何进行配合工作的。这个过程与MapReduce分布式运算是基本对应的。理解了上面的过程也就大概理解了Hadoop的Map-Reduce过程了。

2. MapReduce过程解析

MapReduce过程可以解析为如下所示:

1)文件在HDFS上被分块存储,DataNode存储实际的块。

2)在Map阶段,针对每个文件块建立一个map任务,map任务直接运行在DataNode上,即移动计算,而非数据,如图2-30所示。

3)每个map任务处理自己的文件块,然后输出新的键值对,如图2-31所示。

4)Map输出的键值对经过shuffle/sort阶段后,相同key的记录会被输送到同一个reducer中,同时键是排序的,值被放入一个列表中,如图2-32所示。

5)每个reducer处理从map输送过来的键值对,然后输出新的键值对,一般输出到HDFS上。

image
image

3.单词计数源码解析

上面的分析都是建立在理论基础上的,这样的分析有利于编写MapReduce程序。但是如果要实际编写一个MapReduce的简单程序,还是不够的,需要具体看示例代码。这里直接以官网提供的example代码中的WordCount程序作为示例,进行代码级别分析和说明。

首先,在Hadoop的发行版中找到对应的代码。在解压下载的Hadoop2.6.0的发行版目录中,找到hadoop-2.6.0sharehadoopmapreducesources目录,该目录下面有一个hadoop-mapreduce-examples-2.6.0-sources.jar文件,使用压缩文件解压缩该文件,在目录org/apache/Hadoop/examples中即可找到WordCount.java文件,如图2-33所示。

image

找到该文件后,使用文本软件打开,或拷贝到Eclipse工程中查看,如代码清单2-23所示。

image
image
image

下面对该代码进行分析。

(1)应用程序Driver分析

这里的Driver程序主要指的是main函数,在main函数里面设置MapReduce程序的一些初始化设置,并提交任务等待程序运行完成,如代码清单2-24所示。

image
image

下面,针对WordCount main函数代码进行分析说明。

1)第1部分Configuration代码,初始化相关Hadoop配置。在2.4.1节中也看到过,这里直接新建一个实例即可。如果是在实际的应用程序中,可以通过conf.set()函数添加必要参数,即可直接运行。

2)第2部分代码新建Job,并设置主类。这里的Job实例需要把Configuration的实例传入,后面的“word count”是该MapReduce任务的任务名(注意这里的方式使用的还是不推荐的MRV1的版本,推荐使用MRV2的版本)。

3)第3部分代码设置Mapper、Reducer、Combiner,这里的设置代码都是固定写法,里面的类名可以改变,一般情况下里面的类名为实际任务Mapper、Reducer、Combiner。

4)第4部分代码设置输出键值对格式。在MapReduce任务中涉及三个键值对格式:Mapper输入键值对格式,Mapper输出键值对格式,Reducer输入键值对格式,Reducer输出键值对格式。当Mapper输出键值对格式和Reducer输出键值对格式一样的时候,可以只设置输出键值对的格式(这个其实就是Reducer输出的键值对格式),否则需要设置“job.setMapOutputKeyClass(Text.class); job.setMapOutputKeyClass(IntWritable.class);”。

5)第5、第6部分代码设置输入、输出路径,其实还有输入、输出文件格式的设置,只是这里没有设置,如果不是默认格式,那么还是需要设置的。

6)最后部分代码是提交MapReduce任务运行(是固定写法),并等待任务运行结束。

综合上面的描述,这里给出MapReduce任务初始化以及提交运行的一般代码,如代码清单2-25所示。

image

在实际应用程序中,一般是直接从应用程序提交任务到Hadoop集群的,而非使用yarn jar的方式提交jar包来运行算法的。这里给出通用的提交应用程序到Hadoop集群的代码作为参考,不过在此之前需要简要分析下Configuration这个类。

Configuration是Hadoop系统的基础公共类,可以通过这个类的API加载配置信息,同时在初始化这个类的实例的时候也可以设置Hadoop集群的配置,从而直接针对某个Hadoop集群提交任务,其API如图2-34所示。

image

Configuration各种set API中用得比较多的还是第1个,通用的提交应用程序到Hadoop集群的代码也是使用的第1个,见代码清单2-26。

image

上面的值需要根据实际的Hadoop集群对应配置进行修改。

同时,通过Configuration的set方法也可以实现在Mapper和Reducer任务之间信息共享。比如在Driver中设置一个参数number,在Mapper或Reducer中取出该参数,如代码清单2-27所示(注意,在MapReduce程序中是不能通过全局static变量获取值的,这点需要特别注意)。

image

(2)Mapper分析

对于用户来说,其实比较关心的是Mapper的map函数以及Reducer的reduce函数,这里先分析Mapper的map函数,如代码清单2-28所示。

image
image

1)自定义Mapper需要继承Mapper,同时需要设置输入输出键值对格式,其中输入键值对格式是与输入格式设置的类读取生成的键值对格式匹配,而输出键值对格式需要与Driver中设置的Mapper输出的键值对格式匹配。

2)Mapper有3个函数,分别是setup、map、cleanup,其中实现setup、cleanup函数不是必须要求,Mapper任务启动后首先执行setup函数,该函数主要用于初始化工作;针对每个键值对会执行一次map函数,所有键值对处理完成后会调用cleanup函数,主要用于关闭资源等操作。

3)实现的map函数就是与实际业务逻辑挂钩的代码,主要由用户编写,这里是单词计数程序,所以这里的逻辑是把每个键值对(键值对组成为:<行的偏移量,行字符串>)的值(也就是行字符串)按照空格进行分割,得到每个单词,然后输出每个单词和1这样的键值对。

(3)Reducer分析

Reducer针对Mapper的输出进行整合,同时输入给Reducer的是键值对组,所以其实Reducer中的reduce函数就是针对每个键的所有汇总值的处理。Reducer代码如代码清单2-29所示。

image

1)自定义Reducer同样需要继承Reducer,与Mapper相同,需要设置输入输出键值对格式,这里的输入键值对格式需要与Mapper的输出键值对格式保持一致,输出键值对格式需要与Driver中设置的输出键值对格式保持一致。

2)Reducer也有3个函数:setup、cleanup、reduce,其中setup、cleanup函数其实和Mapper的同名函数功能一致,并且也是setup函数在最开始执行一次,而cleanup函数在最后执行一次。

3)用户一般比较关心reduce函数的实现,这个函数里面写的就是与业务相关的处理逻辑了,比如,这里单词计数,就针对相同键,把其值的列表全部加起来进行输出。

2.4.3 动手实践:编写Word Count程序并打包运行

1)打开Eclipse,新建MapReduce工程,如图2-35、图2-36所示。

需要配置Hadoop的安装目录,因为这里的Eclipse安装在Windows系统上,所以这里的Hadoop安装目录就是指Hadoop安装包的解压目录。

建好的工程如图2-37所示(注意,这里还有相关jar包没有列出)。

2)参考上一节的代码编写单词计数程序。

image

3)使用Eclipse的Export中的JAR file工具打包成jar包,如图2-38、图2-39所示。

image

4)获取导出的jar包,通过Linux连接工具把该jar包上传到Hadoop客户端,并使用命令yarn jar的方式运行。

image

5)查看输出结果信息及相关监控信息,并能进行简要分析。

思考:

1)使用yarn jar的方式运行完程序后,终端输出的信息怎么解读?

2)查看相关监控,除了使用浏览器,还可以使用什么方式查询?

2.4.4 MapReduce组件分析与编程实践

MapReduce整个流程包括以下步骤:输入格式(InputFormat)、Mapper、Combiner、Partitioner、Reducer、输出格式(OutputFormat)。这里会针对流程中的Combiner、Part-itioner、输入/输出格式进行分析,同时,也会介绍相关的编程技巧,如自定义键值对。

1. Combiner分析

Combiner是什么呢?从字面意思理解,Combine即合并。其实,Combiner就是对Mapper的输出进行一定的合并,减少网络输出的组件。所以,其去掉与否不影响最终结果,影响的只是性能。

Combiner是Mapper端的汇总,然后才通过网络发向Reducer。如图2-40所示,经过Combiner后,键值对,被合并为,这样发往Reducer的记录就可以减少一条(当然,实际中肯定不是只减少一条记录),从而减少了网络IO。

对于多个输入数据块,每个数据块产生一个InputSplit,每个InputSplit对应一个map任务,每个map任务会对应0个到多个Combiner,最后再汇总到Reducer。在单词计数的例子中,使用Combiner的情形如图2-41所示。

image

image

需要注意的是,自定义Combiner也是需要集成Reducer的,同样也需要在reduce函数中写入处理逻辑。但是要注意,Combiner的输入键值对格式与输出键值对格式必须保持一致,也正是因为这个要求,很多情况下,采用自定义Combiner的方式在业务或算法处理上行不通。还有,在单词计数程序中,Combiner和Reducer使用的是同一个类代码,这是可能的,但是大多数情况下不能这样做,因为Reducer和Combiner的逻辑在很多情况下是不一样的。

2. Partitioner分析

Partitioner是来做什么的呢?是用来提高性能的吗?非也!Partitioner主要的目的是把键值对分给不同的Reducer。分给不同的Reducer?难道Reducer可以有多个吗?这是当然的,只需要在初始化Job实例的时候进行设置即可,例如设置代码为job.setNum-ReduceTasks(3),这样就可以设置3个Reducer了。

经过前面的分析可以知道,在Reducer的输入端,其键值对组是按照一个键对应一个值列表的。如果同一个键的不同值被发送到了不同的Reducer中,那么(注意,每个Reducer在一个子节点运行,不同Reducer之间不会干扰),经过不同的Reducer处理后,其实我们已经做不到针对一个键,输出一个值了,而是输出了两条记录。我们可以看下Hadoop系统默认的Partitioner实现,默认的Partitioner是HashPartitioner,其源码如代码清单2-30所示。

image

在源码中,可以看到HashPartitiner中只有一个方法,就是getPartition(K key,V value, int numReducTasks)。3个参数分别为键、值、Reducer的个数,输出其实就是Reducer的ID。从代码的实现中可以看出,最终输出的Reducer ID只与键(key)的值有关,这样也就保证了同样的键会被发送到同一个Reducer中处理。

同一个键的记录会被发送到同一个Reducer中处理,一个Reducer可以处理不同的键的记录。

3.输入输出格式/键值类型

一般来说,HDFS一个文件对应多个文件块,每个文件块对应一个InputSplit,而一个InputSplit就对应一个Mapper任务,每个Mapper任务在一个节点上运行,其仅处理当前文件块的数据,但是我们编写Mapper的时候只是关心输入键值对,而不是关心输入文件块。那么,文件块怎么被处理成了键值对呢?这就是Hadoop的输入格式要做的工作了。

在InputFormat中定义了如何分割以及如何进行数据读取从而得到键值对的实现方式,它有一个子类FileInputFormat,如果要自定义输入格式,一般都会集成它的子类File-InputFormat,它里面帮我们实现了很多基本的操作,比如记录跨文件块的处理等。

图2-42所示是InputFormat的类继承结构。

然而,比较常用的则是如表2-7所示的几个实现方式。

同理,可以想象,输出格式(OutputFormat)也与输入格式相同,不过是输入格式的逆过程:把键值对写入HDFS中的文件块中。如图2-43所示是OutputFormat的类继承结构。

image
image
image

在Hadoop中,无论是Mapper或Reducer处理的都是键值对记录,那么Hadoop中有哪些键值对类型呢?Hadoop中常用的键值对类型如图2-44所示。

image

从各个类的命名上其实也可以看出其代表什么类型,比如LongWritable,代表的就是Long的实现,而Text就是String的实现。在前面的单词计数中我们使用过IntWritable以及Text。

这里有两点需要注意:

1)值类型都需实现Writale接口;

2)键需要实现WritableComparable接口。

其实从图2-44中也可以看出,Hadoop已有的键值类型都是实现WritableComparable接口的,然而WritableComparable接口又是实现Writable接口的。所以,Hadoop已有的键值类型既可以作为键类型也可以作为值类型。作为键类型的肯定可以作为值类型,但作为值类型的却不能作为键类型。为什么键类型是实现WritableComparable接口呢?其实,如果你联想到了Shuffle/Sort过程的话,应该不难理解,因为MapReduce框架需要在这里对键进行排序。

4.动手实践:指定输入输出格式

这个实验主要是加深理解Hadoop的输入/输出格式,熟悉常用的SequenceFileInput-Format和SequenceFileOutputFormat。

实验步骤:

1)打开Eclipse,打开已经完成的WordCount程序;

2)设置输出格式为SequenceFileOutputFormat,重新打包,并提交到Linux上运行;

3)查看输出的文件;

4)再次修改WordCount程序,设置输入格式为SequenceFileInputFormat、输入路径为3的输出;设置输出格式为TextFileInputFormat;

5)查看输出结果;

6)针对上面的各个步骤以及输出进行分析,解释对应的输出结构。
思考:

1)第4步中查看的文件是否是乱码?如果是乱码,为什么是乱码?针对这样的数据,如何使用HDFS Java API进行读取?如果不是乱码,看到的是什么?

2)使用SequenceFileInputFormat或SequenceFileOutputFormat有什么优势与劣势?

5.自定义键值类型

Hadoop已经定义了很多键值类型,比如Text、IntWritable、LongWritable等,那为什么需要用到自定键值类型呢?答案其实很简单,不够用。在有些情况下,我们需要一些特殊的键值类型来满足我们的业务需求,这种时候就需要自定义键值类型了。前面已经提到,自定义键需要实现WritableComparable接口,自定义值需要实现Writable接口,那么实现了接口后,还需要做哪些操作呢?

自定义值类型可参考代码清单2-31进行分析。

image

在代码清单2-31中,首先实现了Writable接口,接着定义了两个变量。这两个变量其实是与业务相关的(比如,这里定义了一个counter,一个timestamp)。实现了Writable接口后,需要覆写两个方法(write和readFields),这里需要注意写入和读取的顺序是很重要的,比如这里先把counter写入out输出流,再把timestamp写入out输出流。那么,在读取的时候就需要先读取counter,再读取timestamp(如果两个变量都是int型,那么就更加需要注意区分)。

自定义键类型可参考代码清单2-32进行分析。

image
image

从代码清单2-32中可以看出,自定义键类型其实就是比自定义值类型多了一个比较方法而已,其他都是一样的。

6.动手实践:自定义键值类型

针对source/hadoop/keyvalue.data数据求解每行数据的个数以及平均值,该数据格式如表2-9所示。

image

1)编写Driver程序,main函数接收两个参数和,设置输入格式为KeyValueInputFormat;

2)编写Mapper程序,map函数针对每个value值,使用‘t’进行分隔;接着,对分隔后的数据进行求和以及个数统计(注意将字符串转换为数值),输出平均值和个数,Mapper输出键值对类型为;

3)编写自定义value类型MyValue,定义两个字段,一个是average,一个是num,用于存储平均值和个数;重写toString方法;

4)编写Reducer程序,直接输出即可;

5)对编写的程序进行打包averagejob.jar;

6)上传source/hadoop/keyvalue.data到HDFS,上传averagejob.jar到Linux;

7)使用命令hadoop jar averagejob.jar进行调用;

8)查看输出结果。

思考:

1)Reducer类是否必需?如果不需要,则如何修改?如果去掉reducer,输出结果会有什么不一样?

2)如果想让程序可以直接在Eclipse中运行,应该如何修改程序?

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
20天前
|
存储 分布式计算 Hadoop
大数据处理架构Hadoop
【4月更文挑战第10天】Hadoop是开源的分布式计算框架,核心包括MapReduce和HDFS,用于海量数据的存储和计算。具备高可靠性、高扩展性、高效率和低成本优势,但存在低延迟访问、小文件存储和多用户写入等问题。运行模式有单机、伪分布式和分布式。NameNode管理文件系统,DataNode存储数据并处理请求。Hadoop为大数据处理提供高效可靠的解决方案。
46 2
|
20天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
8天前
|
分布式计算 Hadoop 大数据
[大数据] mac 史上最简单 hadoop 安装过程
[大数据] mac 史上最简单 hadoop 安装过程
|
20天前
|
分布式计算 监控 Hadoop
Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
56 0
|
22天前
|
SQL 分布式计算 Hadoop
利用Hive与Hadoop构建大数据仓库:从零到一
【4月更文挑战第7天】本文介绍了如何使用Apache Hive与Hadoop构建大数据仓库。Hadoop的HDFS和YARN提供分布式存储和资源管理,而Hive作为基于Hadoop的数据仓库系统,通过HiveQL简化大数据查询。构建过程包括设置Hadoop集群、安装配置Hive、数据导入与管理、查询分析以及ETL与调度。大数据仓库的应用场景包括海量数据存储、离线分析、数据服务化和数据湖构建,为企业决策和创新提供支持。
66 1
|
23天前
|
分布式计算 资源调度 Hadoop
Apache Hadoop入门指南:搭建分布式大数据处理平台
【4月更文挑战第6天】本文介绍了Apache Hadoop在大数据处理中的关键作用,并引导初学者了解Hadoop的基本概念、核心组件(HDFS、YARN、MapReduce)及如何搭建分布式环境。通过配置Hadoop、格式化HDFS、启动服务和验证环境,学习者可掌握基本操作。此外,文章还提及了开发MapReduce程序、学习Hadoop生态系统和性能调优的重要性,旨在为读者提供Hadoop入门指导,助其踏入大数据处理的旅程。
96 0
|
2月前
|
消息中间件 SQL 分布式计算
大数据Hadoop生态圈体系视频课程
熟悉大数据概念,明确大数据职位都有哪些;熟悉Hadoop生态系统都有哪些组件;学习Hadoop生态环境架构,了解分布式集群优势;动手操作Hbase的例子,成功部署伪分布式集群;动手Hadoop安装和配置部署;动手实操Hive例子实现;动手实现GPS项目的操作;动手实现Kafka消息队列例子等
21 1
大数据Hadoop生态圈体系视频课程

热门文章

最新文章