MapReduce的思想
MapReduce的思想核心是“先分再合,分而治之”。具体来说,MapReduce通过将复杂任务分解为若干个简单的子任务,这些子任务可以并行处理,彼此之间相对独立,没有依赖关系。MapReduce运行在yarn之上,一旦所有子任务完成,再将中间结果合并,形成对原始问题的完整解答。
MapReduce的工作流程主要包括两个阶段:Map阶段和Reduce阶段
- Map阶段: 负责将一个大的任务划分成小的任务,小任务之间不能有依赖关系
- Reduce阶段: 负责将Map阶段的结果进行汇总
MapReduce的八个步骤
- InputFormat:输入数据被分割成多个小数据块(splits),每个数据块可以在不同的服务器上并行处理12。
- Map阶段:Map任务对每个数据块进行处理,生成一系列的key/value对。Map任务通过RecordReader读取InputSplit,并调用用户定义的map函数处理这些数据12。
- Shuffle阶段:Map任务处理完数据后,将结果写入环形缓冲区。当缓冲区满时,数据会被写入本地磁盘,生成临时文件,并进行排序和合并操作3。
- Partition阶段:对Map输出的key/value对进行分区,决定每个key/value对应该由哪个Reduce任务处理12。
- Copy&Merge阶段:将分区后的数据复制到Reduce任务所在的节点,并进行合并操作1。
- Sort阶段:对Reduce任务接收到的数据进行全局排序3。
- Reduce阶段:Reduce任务对排序后的数据进行处理,生成最终结果12。
- OutputFormat:将Reduce任务的输出写入最终的输出文件中12。
每个步骤的详细解释和作用:
- InputFormat:将输入数据分割成多个小数据块,以便并行处理。例如,TextInputFormat会将文本文件按行分割12。
- Map阶段:Map任务读取分割后的数据块,执行用户定义的map函数,生成key/value对。这些结果存储在环形缓冲区中12。
- Shuffle阶段:当环形缓冲区满时,数据会被写入本地磁盘,生成临时文件。在写入前,会对数据进行排序和合并操作。如果设置了Combiner,还会在写入前对数据进行局部聚合3。
- Partition阶段:根据key或value及Reduce任务的数量,决定每个key/value对应该由哪个Reduce任务处理。默认使用HashPartitioner进行分区12。
- Copy&Merge阶段:将分区后的数据复制到Reduce任务所在的节点,并进行合并操作,生成全局排序后的数据集1。
- Sort阶段:Reduce任务接收到的数据进行全局排序,确保最终结果的有序性3。
- Reduce阶段:Reduce任务对排序后的数据进行处理,生成最终结果12。
- OutputFormat:将Reduce任务的输出写入最终的输出文件中,完成整个MapReduce过程12。
简单概述
- Map阶段2个步骤:
- 1. 设置 InputFormat 类, 将数据切分为 Key-Value(K1和V1) 对, 输入到第二步
- 2. 自定义 Map 逻辑(自己写代码), 将第一步的结果转换成另外的 Key-Value(K2和V2) 键值对对, 并输出结果
- Shuffle 阶段 4 个步骤
- 3. 分区(Partition)
- 4. 排序(Sort)
- 5. 规约(Combiner)
- 6. 分组(Group By)
- Reduce阶段2个步骤
- 7. 对多个 Map 任务的结果进行排序以及合并 , 编写 Reduce 函数实现自己的逻辑 , 对输入的Key-Value 进行处理 , 转为新的 Key-Value ( K3 和 V3 )输出
- 8. 设置 OutputFormat 处理并保存 Reduce 输出的 Key-Value 数据
词频统计实例
实现思路
代码实现
自定义WordCountMapper类,继承Mapper类,并重写Map方法将读取的<k1,v1>转换为<k2,v2>,并将 k2和v2写入上下文 发送到下一个阶段进行处理。
package cn.itcast.mapreuce;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*四个泛型解释:KEYIN :K1的类型 --java(Long)-->(LongWritable)VALUEIN:VI的类型 --java(String)-->(Text)KEYOUT:K2的类型 --java(String)-->(Text)VALUEOUT:V2的类型 --java(Long)-->(LongWritable)
*/
// 1.继承 Mapper类
public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable> {// 2.重写 Map方法// 将 <k1,v1> 转为 <k2,v2>/* 参数* key : k1 行偏移量* value :v1 每一行的文本数据* context :表示上下文对象,桥梁作用* *//*如何将 <k1,v1> 转为 <k2,v2>K1 V10 hello,world,hadoop15 hdfs,hive,hello---------------------------K2 V2hello 1world 1hdfs 1hadoop 1hello 1*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {Text text = new Text();LongWritable longWritable = new LongWritable();// 1.将一行数据的文本数据进行拆分String[] split = value.toString().split(",");// 2.遍历数组,组装 k2和v2for (String word : split) {// 3.将 k2和v2写入上下文 --将数据发送到下一个阶段进行处理
// context.write(new Text(word),new L
// ongWritable(1));text.set(word);longWritable.set(1);context.write(text, longWritable);}}
}
自定义WordCountReduce类,继承Reduce类,并重写reduce方法将读取的<k2,v2>转换为<k3,v3>,并将 k3和v3写入上下文。
package cn.itcast.mapreuce;/*四个泛型解释:KEYIN: K2类型VALULEIN: V2类型KEYOUT: K3类型VALUEOUT:V3类型*/import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class WordCountReducer extends Reducer<Text, LongWritable,Text, LongWritable> {//reduce方法作用: 将新的K2和V2转为 K3和V3 ,将K3和V3写入上下文中/*参数:key : 新K2values: 集合 新 V2context :表示上下文对象----------------------如何将新的K2和V2转为 K3和V3新 K2 V2hello <1,1,1>world <1,1>hadoop <1>------------------------K3 V3hello 3world 2hadoop 1*/@Overrideprotected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {long count = 0;// 1.遍历集合将集合中的数字相加,得到v3for (LongWritable value : values) {count +=value.get();}// 2.将k3和v3写入上下文context.write(key,new LongWritable(count));}
}
定义JobMain主类用于启动MapReduce程序,将上述自定义的WordCountMapper类和WordCountReduce联系起来,并设置文件的输入输出流以及MapReduce程序的八个步骤。
package cn.itcast.mapreuce;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import java.net.URI;public class JobMain extends Configured implements Tool {//该方法用于指定一个job任务@Overridepublic int run(String[] strings) throws Exception {// 1.创建 job任务对象 两个参数 1.configuration 2.job任务名称Job job = Job.getInstance(super.getConf(), "wordcount");// 打包运行出错添加job.setJarByClass(JobMain.class);// 2.配置 job任务对象(八个步骤)// 2.1 读取文件 ---指定读取类job.setInputFormatClass(TextInputFormat.class);// 指点源文件路径TextInputFormat.addInputPath(job,new Path("hdfs://hadoop01:9000/hadoop_mapreduce/wordcount"));// --本地测试--
// TextInputFormat.addInputPath(job,new Path("file:///D:\\input_javaword"));// 2.2 进入指定map阶段处理方式和数据类型// 设置map阶段用的类job.setMapperClass(WordCountMapper.class);// 设置Map阶段K2的类型 --- 单词(字符串)job.setMapOutputKeyClass(Text.class);// 设置Map阶段V2的类型 --- 数字(long)job.setMapOutputValueClass(LongWritable.class);// 2.3(4,5,6) 进入Shuffle阶段 --先采用默认方式处理// 2.7 指定Reduce阶段的处理方式和数据类型job.setReducerClass(WordCountReducer.class);// 设置Reduce阶段K3的类型 --- 单词(字符串)job.setOutputKeyClass(Text.class);// 设置Reduce阶段v3的类型 --- 单词(字符串)job.setOutputValueClass(LongWritable.class);// 2.8 设置输出类型job.setOutputFormatClass(TextOutputFormat.class);// 设置输出路径
// TextOutputFormat.setOutputPath(job,new Path("hdfs://hadoop01:9000/wordcount_out"));// 判断目标目录是否存在,存在则删除Path path = new Path("hdfs://hadoop01:9000/hadoop_mapreduce/wordcount_out");TextOutputFormat.setOutputPath(job,path);// 获取hdfs文件系统FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop01:9000/hadoop_mapreduce/"), new Configuration());// --本地测试--
// FileSystem fileSystem = FileSystem.get(new URI("file:///D:\\output"), new Configuration());// 判断目录是否存在boolean exists = fileSystem.exists(path);if (exists){// 删除目标目录fileSystem.delete(path,true);}// 等待任务结束boolean bl = job.waitForCompletion(true);return bl ? 0:1;}public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();// 启动 job任务 --记录任务执行状态 0表示成功int run = ToolRunner.run(configuration, new JobMain(), args);System.exit(run);}
}
程序启动
将源文件上传到上述定义的HDFS源文件夹中,并将MapReduce程序打包上传到Liunx系统中使用hadoop命令执行jar包运行。如下图所示先清空jar包再重新打包,在Liunx系统中上传压缩后的jab包即可。
运行指令:hadoop jar jar包名称 待运行主类的名称
hadoop jar hadoop_hdfs_operate-1.0-SNAPSHOT.jar demo1.JobMain
总结
上述实例实现了简单的MapReduce程序,对于Shuffle阶段的分区、排序、规约、分组均采用的是默认的方式。
分区实例
在 MapReduce 中, 通过我们指定分区, 会将同一个分区的数据发送到同一个 Reduce 当中进行处理
例如: 为了数据的统计, 可以把一批类似的数据发送到同一个 Reduce 当中, 在同一个 Reduce 当中统计相同类型的数据, 就可以实现类似的数据分区和统计等,其实就是相同类型的数据, 有共性的数据, 送到一起去处理。Reduce 当中默认的分区只有一个。
实现思路
与默认分区不同的是,自定义分区需自定义MyPartitioner类继承Partitioner并重新getPartition方法,之后在主类中需设置分区类和ReduceTask个数。指定的分区类在指定的Mapper类之后Reducer类之前,设置ReduceTack的个数在Reducer类之后
// 指定分区类 job.setPartitionerClass(MyPartitioner.class); // ------设置ReduceTack的个数------- job.setNumReduceTasks(2);
代码实现
// 自定义分区规则package partition;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;public class MyPartitioner extends Partitioner<Text, NullWritable> {// 实现功能 定义分区规则并返回对应的分区编号@Overridepublic int getPartition(Text text, NullWritable nullWritable, int i) {// 拆分行文本数据--k2数据,获取中奖字段的值String[] split = text.toString().split("\t");String numStr = split[5];// 判断中奖字段的值和15的关系,返回对应的分区编号if (Integer.parseInt(numStr)>15){// 表示分区编号为 1return 1;}else {// 表示分区编号为 0return 0;}}
}
// 定义PartitionMapper类package partition;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*四个泛型解释:KEYIN :K1的类型 --java(Long)-->(LongWritable)VALUEIN:VI的类型 --java(String)-->(Text)KEYOUT:K2的类型 --java(String)-->(Text)VALUEOUT:V2的类型 --NullWritable 站位
*/// 2.重写 Map方法
// 将 <k1,v1> 转为 <k2,v2>/* 参数* key : k1 行偏移量* value :v1 每一行的文本数据* context :表示上下文对象,桥梁作用* */
public class PartitionMapper extends Mapper<LongWritable,Text,Text,NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {/*将 <k1,v1> 转为 <k2,v2>*/context.write(value,NullWritable.get());}
}
// 定义PartitionReducer 类
package partition;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*四个泛型解释:KEYIN: K2类型 --TextVALULEIN: V2类型 --NullWritableKEYOUT: K3类型 --TextVALUEOUT:V3类型 --NullWritable*/public class PartitionReducer extends Reducer<Text, NullWritable,Text, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {// K2和V2转为 K3和V3context.write(key,NullWritable.get());}
}
// 定义启动类
package partition;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import java.net.URI;// 固定模版 继承类实现接口
public class JobMain extends Configured implements Tool {@Overridepublic int run(String[] strings) throws Exception {// 1.创建 job任务对象 两个参数 1.configuration 2.job任务名称Job job = Job.getInstance(super.getConf(), "partition_maperduce");// 打包运行出错添加job.setJarByClass(partition.JobMain.class);// 2.配置 job任务对象(八个步骤)// 2.1 读取文件 ---指定读取类job.setInputFormatClass(TextInputFormat.class);// 指点源文件路径TextInputFormat.addInputPath(job,new Path("hdfs://hadoop01:9000/hadoop_mapreduce/partition"));// 2.2 进入指定map阶段处理方式和数据类型// 设置map阶段用的类job.setMapperClass(PartitionMapper.class);// 设置Map阶段K2的类型 --- 单词(字符串)job.setMapOutputKeyClass(Text.class);// 设置Map阶段V2的类型 --- 数字(long)job.setMapOutputValueClass(NullWritable.class);// 2.3(4,5,6) 进入Shuffle阶段 分区-排序-规约-分组// 指定分区类job.setPartitionerClass(MyPartitioner.class);// 4,5,6采用默认// 2.7 指定Reduce阶段的处理方式和数据类型job.setReducerClass(PartitionReducer.class);// 设置Reduce阶段K3的类型 --- 单词(字符串)job.setOutputKeyClass(Text.class);// 设置Reduce阶段v3的类型 --- 单词(字符串)job.setOutputValueClass(NullWritable.class);// ------设置ReduceTack的个数-------job.setNumReduceTasks(2);// 2.8 设置输出类型job.setOutputFormatClass(TextOutputFormat.class);// 设置输出路径Path path = new Path("hdfs://hadoop01:9000/hadoop_mapreduce/partition_out");TextOutputFormat.setOutputPath(job,path);// 获取hdfs文件系统FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop01:9000/hadoop_mapreduce/"), new Configuration());// 判断目录是否存在boolean exists = fileSystem.exists(path);if (exists){// 删除目标目录fileSystem.delete(path,true);}// 等待任务结束boolean bl = job.waitForCompletion(true);return bl ? 0:1;}public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();// 启动 job任务 --记录任务执行状态 0表示成功int run = ToolRunner.run(configuration, new JobMain(), args);System.exit(run);}
}
排序实例
- 序列化 (Serialization) 是指把结构化对象转化为字节流。
- 反序列化 (Deserialization) 是序列化的逆过程. 把字节流转为结构化对象. 当要在进程间传递对象或持久化对象的时候, 就需要序列化对象成字节流, 反之当要将接收到或从磁盘读取的字节流转换为对象, 就要进行反序列化。
- Java 的序列化 (Serializable) 是一个重量级序列化框架, 一个对象被序列化后, 会附带很多额外的信息 (各种校验信息, header, 继承体系等), 不便于在网络中高效传输. 所以, Hadoop 自己开发了一套序列化机制(Writable), 精简高效. 不用像 Java 对象类一样传输多层的父子关系, 需要哪个属性就传输哪个属性值, 大大的减少网络传输的开销。
- Writable 是 Hadoop 的序列化格式, Hadoop 定义了这样一个 Writable 接口. 一个类要支持可序列化只需实现这个接口即可。
- Writable 有一个子接口是 WritableComparable, WritableComparable 是既可实现序列化, 也可以对key进行比较, 我们这里可以通过自定义 Key 实现 WritableComparable 来实现排序功能。
a 1
a 9
b 3
a 7
b 8
b 10
a 5要求 :第一列按照字典顺序进行排列第一列相同的时候 , 第二列按照升序进行排列
实现思路
- 将 Map 端输出的 <key,value> 中的 key 和 value 组合成一个新的 key (newKey), value值不变
- 这里就变成 <(key,value),value> , 在针对 newKey 排序的时候, 如果 key 相同, 就再对 value进行排序
与默认排序不同的是,自定义排序需自定义SortBean类并重写WritableComparable接口,compareTo方法中实现自定义排序逻辑,write(readFields)方法实现序列化(反序列化)方法为固定格式写法,主类方面无需增添排序规则,会默认执行自定义的排序规则。
代码实现
// 自定义排序规则
package sort;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;// 序列化和排序
public class SortBean implements WritableComparable<SortBean> {// 自定义成员变量private String word;private int num;public String getWord() {return word;}public void setWord(String word) {this.word = word;}public int getNum() {return num;}public void setNum(int num) {this.num = num;}@Overridepublic String toString() {return word + "\t" + num;}// 自定义比较器方法,指定升序排序规则 先排word——再排num@Overridepublic int compareTo(SortBean o) {// 字符串比较int result = this.word.compareTo(o.word);// 如果字符串相同则按照第二列排序if (result == 0){return this.num - o.num;// 降序 return o.num - this.num;}return result;}// 序列化方法 固定格式@Overridepublic void write(DataOutput dataOutput) throws IOException {// 自定义字符串进行序列化dataOutput.writeUTF(word);// 自定义数字进行序列化dataOutput.writeInt(num);}// 反序列化方法@Overridepublic void readFields(DataInput dataInput) throws IOException {// 自定义字符串进行反序列化this.word = dataInput.readUTF();// 自定义数字进行反序列化this.num = dataInput.readInt();}
}
// SortMapper 类
package sort;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*map方法将K1和V1转为K2和V2:K1 V10 a 35 b 7----------------------K2 V2SortBean(a 3) NullWritableSortBean(b 7) NullWritable*/public class SortMapper extends Mapper<LongWritable, Text,SortBean, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//1:将行文本数据(V1)拆分,并将数据封装到SortBean对象,就可以得到K2String[] split = value.toString().split("\t");SortBean sortBean = new SortBean();sortBean.setWord(split[0]);sortBean.setNum(Integer.parseInt(split[1]));//2:将K2和V2写入上下文中context.write(sortBean,NullWritable.get());}
}
// SortReducer 类
package sort;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class SortReducer extends Reducer<SortBean, NullWritable,SortBean,NullWritable> {@Overrideprotected void reduce(SortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {//reduce方法将新的K2和V2转为K3和V3 ---直接向后传递context.write(key, NullWritable.get());}
}
// JobMain 主类
package sort;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import java.net.URI;// 固定模版 继承类实现接口
public class JobMain extends Configured implements Tool {@Overridepublic int run(String[] strings) throws Exception {// 1.创建 job任务对象 两个参数 1.configuration 2.job任务名称Job job = Job.getInstance(super.getConf(), "sort_maperduce");// 打包运行出错添加job.setJarByClass(sort.JobMain.class);// 2.配置 job任务对象(八个步骤)// 2.1 读取文件 ---指定读取类job.setInputFormatClass(TextInputFormat.class);// 指点源文件路径TextInputFormat.addInputPath(job,new Path("hdfs://hadoop01:9000/hadoop_mapreduce/sort"));// 2.2 进入指定map阶段处理方式和数据类型// 设置map阶段用的类job.setMapperClass(SortMapper.class);// 设置Map阶段K2的类型 --- 单词(字符串)job.setMapOutputKeyClass(SortBean.class);// 设置Map阶段V2的类型 --- 数字(long)job.setMapOutputValueClass(NullWritable.class);// 2.3(4,5,6) 进入Shuffle阶段 分区-排序-规约-分组// 排序只需指定排序规则即可// 2.7 指定Reduce阶段的处理方式和数据类型job.setReducerClass(SortReducer.class);// 设置Reduce阶段K3的类型 --- 单词(字符串)job.setOutputKeyClass(SortBean.class);// 设置Reduce阶段v3的类型 --- 单词(字符串)job.setOutputValueClass(NullWritable.class);// 2.8 设置输出类型job.setOutputFormatClass(TextOutputFormat.class);// 设置输出路径Path path = new Path("hdfs://hadoop01:9000/hadoop_mapreduce/sort_out");TextOutputFormat.setOutputPath(job,path);// 获取hdfs文件系统FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop01:9000/hadoop_mapreduce/"), new Configuration());// 判断目录是否存在boolean exists = fileSystem.exists(path);if (exists){// 删除目标目录fileSystem.delete(path,true);}// 等待任务结束boolean bl = job.waitForCompletion(true);return bl ? 0:1;}public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();// 启动 job任务 --记录任务执行状态 0表示成功int run = ToolRunner.run(configuration, new JobMain(), args);System.exit(run);}
}
规约实例
每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是对 map 端的输出先做一次合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络IO 性能,是 MapReduce的一种优化手段之一。
1. combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件
2. combiner 组件的父类就是 Reducer
3. combiner 和 reducer 的区别在于运行的位置
Combiner 是在每一个 maptask 所在的节点运行
Reducer 是接收全局所有 Mapper 的输出结果
4. combiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量
实现思路
- 1. 自定义一个 combiner 继承 Reducer,重写 reduce 方法
- 2. 在 job 中设置 job.setCombinerClass(CustomCombiner.class)
代码实现
下述代码继续以词频统计为实现对象,添加combine规约处理。与默认规约不同的是,自定义规约需自定义MyCombiner类重写Reducer类(本案例添加规约即提前执行一遍Reducer流程),之后在主类中需设置规约类 job.setCombinerClass(MyCombiner.class),在指定的Mapper类之后Reducer类之前。
// 自定义规约
package combiner;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class MyCombiner extends Reducer<Text, LongWritable,Text, LongWritable> {@Overrideprotected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {long count = 0;// 1.遍历集合将集合中的数字相加,得到v3for (LongWritable value : values) {count +=value.get();}// 2.将k3和v3写入上下文context.write(key,new LongWritable(count));}
}
package combiner;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*四个泛型解释:KEYIN :K1的类型 --java(Long)-->(LongWritable)VALUEIN:VI的类型 --java(String)-->(Text)KEYOUT:K2的类型 --java(String)-->(Text)VALUEOUT:V2的类型 --java(Long)-->(LongWritable)
*/
// 1.继承 Mapper类
public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable> {// 2.重写 Map方法// 将 <k1,v1> 转为 <k2,v2>/* 参数* key : k1 行偏移量* value :v1 每一行的文本数据* context :表示上下文对象,桥梁作用* *//*如何将 <k1,v1> 转为 <k2,v2>K1 V10 hello,world,hadoop15 hdfs,hive,hello---------------------------K2 V2hello 1world 1hdfs 1hadoop 1hello 1*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {Text text = new Text();LongWritable longWritable = new LongWritable();// 1.将一行数据的文本数据进行拆分String[] split = value.toString().split(",");// 2.遍历数组,组装 k2和v2for (String word : split) {// 3.将 k2和v2写入上下文 --将数据发送到下一个阶段进行处理
// context.write(new Text(word),new L
// ongWritable(1));text.set(word);longWritable.set(1);context.write(text, longWritable);}}
}
package combiner;/*四个泛型解释:KEYIN: K2类型VALULEIN: V2类型KEYOUT: K3类型VALUEOUT:V3类型*/import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class WordCountReducer extends Reducer<Text, LongWritable,Text, LongWritable> {//reduce方法作用: 将新的K2和V2转为 K3和V3 ,将K3和V3写入上下文中/*参数:key : 新K2values: 集合 新 V2context :表示上下文对象----------------------如何将新的K2和V2转为 K3和V3新 K2 V2hello <1,1,1>world <1,1>hadoop <1>------------------------K3 V3hello 3world 2hadoop 1*/@Overrideprotected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {long count = 0;// 1.遍历集合将集合中的数字相加,得到v3for (LongWritable value : values) {count +=value.get();}// 2.将k3和v3写入上下文context.write(key,new LongWritable(count));}
}
package combiner;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import java.net.URI;public class JobMain extends Configured implements Tool {//该方法用于指定一个job任务@Overridepublic int run(String[] strings) throws Exception {// 1.创建 job任务对象 两个参数 1.configuration 2.job任务名称Job job = Job.getInstance(super.getConf(), "add_combiner_wordcount");// 打包运行出错添加job.setJarByClass(JobMain.class);// 2.配置 job任务对象(八个步骤)// 2.1 读取文件 ---指定读取类job.setInputFormatClass(TextInputFormat.class);// 指点源文件路径TextInputFormat.addInputPath(job,new Path("hdfs://hadoop01:9000/hadoop_mapreduce/add_combiner_wordcount"));// --本地测试--
// TextInputFormat.addInputPath(job,new Path("file:///D:\\input_javaword"));// 2.2 进入指定map阶段处理方式和数据类型// 设置map阶段用的类job.setMapperClass(WordCountMapper.class);// 设置Map阶段K2的类型 --- 单词(字符串)job.setMapOutputKeyClass(Text.class);// 设置Map阶段V2的类型 --- 数字(long)job.setMapOutputValueClass(LongWritable.class);// 2.3(4,,6) 进入Shuffle阶段 --先采用默认方式处理// 2.5 ---添加规约---job.setCombinerClass(MyCombiner.class);// 2.7 指定Reduce阶段的处理方式和数据类型job.setReducerClass(WordCountReducer.class);// 设置Reduce阶段K3的类型 --- 单词(字符串)job.setOutputKeyClass(Text.class);// 设置Reduce阶段v3的类型 --- 单词(字符串)job.setOutputValueClass(LongWritable.class);// 2.8 设置输出类型job.setOutputFormatClass(TextOutputFormat.class);// 设置输出路径
// TextOutputFormat.setOutputPath(job,new Path("hdfs://hadoop01:9000/wordcount_out"));// 判断目标目录是否存在,存在则删除Path path = new Path("hdfs://hadoop01:9000/hadoop_mapreduce/out_combiner_wordcount");TextOutputFormat.setOutputPath(job,path);// 获取hdfs文件系统FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop01:9000/hadoop_mapreduce/"), new Configuration());// --本地测试--
// FileSystem fileSystem = FileSystem.get(new URI("file:///D:\\output"), new Configuration());// 判断目录是否存在boolean exists = fileSystem.exists(path);if (exists){// 删除目标目录fileSystem.delete(path,true);}// 等待任务结束boolean bl = job.waitForCompletion(true);return bl ? 0:1;}public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();// 启动 job任务 --记录任务执行状态 0表示成功int run = ToolRunner.run(configuration, new JobMain(), args);System.exit(run);}
}
分组实例
分组是mapreduce当中reduce端的一个功能组件,主要的作用是决定哪些数据作为一组,调用一次reduce的逻辑,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻辑,我们可以自定义分组实现不同的key作为同一个组,调用一次reduce逻辑。
实例需求
求出每一个订单中成交金额最大的一笔交易,订单如下图所示。
实现思路
- 利用“订单id和成交金额”作为key,可以将map阶段读取到的所有订单数据按照id分区,按照金额排序,发送到reduce
- 在reduce端利用分组将订单id相同的kv聚合成组,然后取第一个即是最大值
- 首先第一步定义一个OrderBean,里面定义两个字段,第一个字段是我们的orderId,第二个字段是我们的金额(注意金额一定要使用Double或者DoubleWritable类型,否则没法按照金额顺序排序)定义好排序规则。
- 第二步定义Mapper类封装OrderBean,得到K2,v2是传递过来的v1并将<k2,v2>写入上下文进行后续处理。
- 第三步自定义分区,按照订单id进行分区,把所有订单id相同的数据,都发送到同一个reduce中去。
- 第四步自定义分组按照我们自己的逻辑进行分组,通过比较相同的订单id,将相同的订单id放到一个组里面去,进过分组之后当中的数据,已经全部是排好序的数据,我们只需要取前topN即可
- 第五步定义Reduce类将每个分区中的第一条记录取出即实现需求
- 第六步定义JobMain类将上述步骤串起来打包放在Hadoop上运行查看结果
// 第一步
package mygrouping;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class OrderBean implements WritableComparable<OrderBean> {// 成员变量 订单idprivate String orderid;// 成交金额private Double price;public String getOrderid() {return orderid;}public void setOrderid(String orderid) {this.orderid = orderid;}public Double getPrice() {return price;}public void setPrice(Double price) {this.price = price;}@Overridepublic String toString() {return orderid + '\t' + price ;}// 指定排序规则@Overridepublic int compareTo(OrderBean o) {// 1.比较订单id,若订单id一致则进行金额比较排序(降序)compareTo一致返回0int i = this.orderid.compareTo(o.orderid);if (i==0){// 行金额比较排序(降序)i = this.price.compareTo(o.price) * -1;}return i;}// 实现对象序列化@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(orderid);dataOutput.writeDouble(price);}// 实现对象反序列化@Overridepublic void readFields(DataInput dataInput) throws IOException {this.orderid = dataInput.readUTF();this.price = dataInput.readDouble();}
}
// 第二步
package mygrouping;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class GroupMapper extends Mapper<LongWritable, Text,OrderBean,Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 1.拆分行文本数据,得到订单id和订单金额String[] split = value.toString().split("\t");// 2.封装OrderBean得到 K2OrderBean orderBean = new OrderBean();orderBean.setOrderid(split[0]);orderBean.setPrice(Double.valueOf(split[2]));// 3.写入上下文context.write(orderBean,value);}
}
// 第三步
package mygrouping;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;public class OrderPatition extends Partitioner<OrderBean, Text> {// 分区规则 ---> 根据订单id实现分区/** orderBean -----> k2* text -----> v2* i -----> ReduceTask个数* 返回分区编号* */@Overridepublic int getPartition(OrderBean orderBean, Text text, int i) {return (orderBean.getOrderid().hashCode() & 2147483647) % i;}
}
// 第四步
package mygrouping;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;/*
* 分组--->默认按照k2分组,k2相同的分到一个组
* 实现步骤:
* 1.继承 WritableComparator类
* 2.调用父类的有参构造
* 3.指定分组规则(重写方法)
* */
public class OrderGroupComparator extends WritableComparator {// 自定义无参构造public OrderGroupComparator() {// 调用父类的有参构造 true允许创建 OrderBean实例super(OrderBean.class,true);}// 指定分组规则@Overridepublic int compare(WritableComparable a, WritableComparable b) {// 1.对形参做强类型转换OrderBean first = (OrderBean) a;OrderBean second = (OrderBean) b;// 2.指定分组规则 first.getOrderid()与second.getOrderid()是否相同相同则分到同一组中return first.getOrderid().compareTo(second.getOrderid());}
}
// 第五步
package mygrouping;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class GroupReducer extends Reducer<OrderBean, Text,Text, NullWritable> {@Overrideprotected void reduce(OrderBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {// <k2,v2>转为<k3,v3>int i = 0;for (Text value : values) {context.write(value,NullWritable.get()); // 默认输出集合中所有数据i++;if (i>1){break;}}}
}
// 第六步
package mygrouping;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import java.net.URI;public class JobMain extends Configured implements Tool {//该方法用于指定一个job任务@Overridepublic int run(String[] strings) throws Exception {// 1.创建 job任务对象 两个参数 1.configuration 2.job任务名称Job job = Job.getInstance(super.getConf(), "mygrouping");// 打包运行出错添加job.setJarByClass(JobMain.class);// 2.配置 job任务对象(八个步骤)// 2.1 读取文件 ---指定读取类job.setInputFormatClass(TextInputFormat.class);// 指点源文件路径TextInputFormat.addInputPath(job,new Path("hdfs://hadoop01:9000/hadoop_mapreduce/mygrouping"));// 2.2 进入指定map阶段处理方式和数据类型// 设置map阶段用的类job.setMapperClass(GroupMapper.class);// 设置Map阶段K2的类型 --- 单词(字符串)job.setMapOutputKeyClass(OrderBean.class);// 设置Map阶段V2的类型 --- 数字(long)job.setMapOutputValueClass(Text.class);// 2.3(4,5,6) 进入Shuffle阶段 --先采用默认方式处理// -----分区-----job.setPartitionerClass(OrderPatition.class);// -----排序(定义好自动执行)-----// -----分组-----job.setGroupingComparatorClass(OrderGroupComparator.class);// 2.7 指定Reduce阶段的处理方式和数据类型job.setReducerClass(GroupReducer.class);// 设置Reduce阶段K3的类型 --- 单词(字符串)job.setOutputKeyClass(Text.class);// 设置Reduce阶段v3的类型 --- 单词(字符串)job.setOutputValueClass(NullWritable.class);// ----设置 ReduceTesk个数(默认一个)// 2.8 设置输出类型job.setOutputFormatClass(TextOutputFormat.class);// 设置输出路径
// TextOutputFormat.setOutputPath(job,new Path("hdfs://hadoop01:9000/wordcount_out"));// 判断目标目录是否存在,存在则删除Path path = new Path("hdfs://hadoop01:9000/hadoop_mapreduce/mygrouping_out");TextOutputFormat.setOutputPath(job,path);// 获取hdfs文件系统FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop01:9000/hadoop_mapreduce/"), new Configuration());// --本地测试--
// FileSystem fileSystem = FileSystem.get(new URI("file:///D:\\output"), new Configuration());// 判断目录是否存在boolean exists = fileSystem.exists(path);if (exists){// 删除目标目录fileSystem.delete(path,true);}// 等待任务结束boolean bl = job.waitForCompletion(true);return bl ? 0:1;}public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();// 启动 job任务 --记录任务执行状态 0表示成功int run = ToolRunner.run(configuration, new JobMain(), args);System.exit(run);}
}