本文共 3629 字,大约阅读时间需要 12 分钟。
Mapreduce的编程流程主要分八个阶段:两个MAP阶段,四个shuffle阶段,两个reduce阶段。
Map两个阶段:
1:设置inputformat类,将数据分为key-value对(k1 v1),并将其输入到第二步。
2:自定义Map逻辑将第一步的结果转换为另外的键值对(k2,v2),并输出。
shuffle四个阶段:
3:对输出的键值对进行分区。
4:对不同分区的数据按照相同的key排序。
5:(可选),对分组后的数据进行初步规约,降低数据的网络拷贝。
6:对数据进行分组,将相同key的value放入同一个集合中。
Reduce两个阶段:
7:对多个Map任务的结果进行排序以及合并,编写Reduce函数实现自己的逻辑,对输入的键对值进行处理,转换为新的键对值(k3 v3),输出。
8:设置outputformat类处理并保存Reduce输出的键值对。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pbJTboOf-1603113436059)(https://s1.ax1x.com/2020/09/29/0eqy8g.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-K0BpE1tg-1603113436066)(https://s1.ax1x.com/2020/09/29/0eq4aV.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UlWoI5sv-1603113436069)(https://s1.ax1x.com/2020/09/29/0eLkqI.png)]
链接:
重写Mapper类:
package org.example.wordcount;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class WcMapper extends Mapper{ Text k = new Text(); IntWritable v = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 切割 String[] words = line.split(" "); // 3 输出 for (String word : words) { k.set(word); context.write(k, v); } }}
重写Reducer类:
package org.example.wordcount;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class WcReducer extends Reducer{ int sum; IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException { // 1 累加求和 sum = 0; for (IntWritable count : values) { sum += count.get(); } // 2 输出 v.set(sum); context.write(key,v); }}
main函数主驱动类:
package org.example.wordcount;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.junit.Test;public class WcDriver { @Test public void main() throws IOException, ClassNotFoundException, InterruptedException { // 1 获取配置信息以及封装任务 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2 设置jar加载路径 job.setJarByClass(WcDriver.class); // 3 设置map和reduce类 job.setMapperClass(WcMapper.class); job.setReducerClass(WcReducer.class); // 4 设置map输出 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5 设置最终输出kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 6 设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path("D:/IDEA java项目/data/a.txt")); FileOutputFormat.setOutputPath(job, new Path("D:/IDEA java项目/data/output")); // 7 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }}
1:将mapreduce程序提交给Yarn集群,分发到许多节点并发执行。
2:处理和输出的数据应位于hdfs文件系统中。
3:将文件打为jar包并上传,在集群中使用Hadoop命令执行。
应使用IDEA将map程序打包成jar包。
map程序用在本地用单进程执行,且处理和输出的数据在本地文件系统中。
转载地址:http://zicki.baihongyu.com/