Hadoop MapReduce(WordCount) Java编程

简介:

编写WordCount程序数据如下:

hello beijing

hello shanghai

hello chongqing

hello tianjin

hello guangzhou

hello shenzhen

...


1、WCMapper:

package com.hadoop.testHadoop;


import java.io.IOException;


import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;


//4个泛型中,前两个是指定mapper输入数据的类型,KEYIN是输入的key的类型,VALUEIN是输入的value的类型

//map 和 reduce 的数据输入输出都是以 key-value对的形式封装的

//默认情况下,框架传递给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量,value为这一行的内容

//LongWritable  Text 是hadoop为了序列化 定义的数据类型

public class WCMapper extends  Mapper<LongWritable,Text,Text,LongWritable>{


   //mapreduce框架每读一行数据就调用一次该方法

@Override

protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {

String line=value.toString();

String [] words = line.split(" ");

for(String word:words){

context.write(new Text(word), new LongWritable(1));

}

}

}

2、WCReducer:

package com.hadoop.testHadoop;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

//框架在map处理完成之后,将所有kv对缓存起来,进行分组,然后传递一个组<key,valus{}>,调用一次reduce方法

//<hello,{1,1,1,1,1,1.....}>

@Override

protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws java.io.IOException ,InterruptedException {

long count=0;

for(LongWritable value:values){

count+=value.get();

}

context.write(key, new LongWritable(count));

}


}


3、WCRunner:

package com.hadoop.testHadoop;


import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WCRunner {

public static void main(String[] args) throws Exception {

Configuration conf=new Configuration();

Job job = Job.getInstance(conf);

//设置整个job所用的那些类在哪个jar包

   job.setJarByClass(WCRunner.class);

job.setMapperClass(WCMapper.class);

job.setReducerClass(WCReducer.class);

//map输出数据kv类型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(LongWritable.class);

//reduce输出数据kv类型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(LongWritable.class);

//执行输入数据的路径

FileInputFormat.setInputPaths(job, new Path("/wordcount/inpput"));

//执行输出数据的路径

FileOutputFormat.setOutputPath(job, new Path("/wordcount/outputmy"));

//将job提交给集群运行 

job.waitForCompletion(true);

}

}


















本文转自lzf0530377451CTO博客,原文链接:http://blog.51cto.com/8757576/1839294,如需转载请自行联系原作者




相关文章
|
3月前
|
分布式计算 Hadoop Java
MapReduce编程:自定义分区和自定义计数器
MapReduce编程:自定义分区和自定义计数器
29 0
|
4月前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
40 1
|
7月前
|
分布式计算 Hadoop 大数据
Hadoop学习:深入解析MapReduce的大数据魔力之数据压缩(四)
Hadoop学习:深入解析MapReduce的大数据魔力之数据压缩(四)
|
7月前
|
分布式计算 Hadoop 大数据
Hadoop学习:深入解析MapReduce的大数据魔力(三)
Hadoop学习:深入解析MapReduce的大数据魔力(三)
|
4月前
|
存储 分布式计算 负载均衡
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
59 0
|
11天前
|
机器学习/深度学习 分布式计算 监控
面经:MapReduce编程模型与优化策略详解
【4月更文挑战第10天】本文是关于MapReduce在大数据处理中的关键作用的博客摘要。作者分享了面试经验,强调了MapReduce的基本原理、Hadoop API、优化策略和应用场景。MapReduce包含Map和Reduce两个主要阶段,Map阶段处理输入数据生成中间键值对,Reduce阶段进行聚合计算。面试重点包括理解MapReduce工作流程、使用Hadoop API编写Map/Reduce函数、选择优化策略(如分区、Combiner和序列化)以及应用场景,如日志分析和机器学习。
19 2
|
3月前
|
存储 分布式计算 监控
Hadoop的JobTracker和TaskTracker在MapReduce中的作用是什么?
Hadoop的JobTracker和TaskTracker在MapReduce中的作用是什么?
53 0
|
3月前
|
分布式计算 Java Hadoop
MapReduce编程:检索特定群体搜索记录和定义分片操作
MapReduce编程:检索特定群体搜索记录和定义分片操作
28 0
|
3月前
|
分布式计算 Java Hadoop
MapReduce编程:数据过滤保存、UID 去重
MapReduce编程:数据过滤保存、UID 去重
40 0
|
3月前
|
缓存 分布式计算 Java
MapReduce编程:join操作和聚合操作
MapReduce编程:join操作和聚合操作
32 0

热门文章

最新文章