第5章MapReduce的介绍和简单使用 MapReduce是Hadoop上原生的分布式计算框架,本章主要对MapReduce计算框架的原理和开发环境的搭建进行介绍。本章内容安排如下。 5.1MapReduce简介 对MapReduce进行简单的介绍。 5.2Map过程 介绍MapReduce的Map过程。 5.3Reduce过程 介绍MapReduce的Reduce过程。 5.4开发环境的搭建 介绍如何搭建使用MapReduce需要的环境。 5.5实验 描述一些简单的MapReduce实验。 通过本章的学习,读者将对MapReduce有初步的了解,对Map和Reduce过程的原理有更清晰的认识,同时通过对MapReduce环境的搭建和简单的程序示例,能够加深读者对MapReduce工作原理的认识。 5.1MapReduce简介 MapReduce (MR)是现今一个非常流行的分布式计算框架,它被设计用于并行计算海量数据,通常是存储在HDFS上TB级和PB级别的数据。其前身是Google公司的MapReduce。MapReduce 框架将复杂的大规模并行计算高度抽象为两个函数: Map函数和Reduce函数。Map(映射)和Reduce(归约)以及其主要思想都是从函数式编程语言中借鉴过来的。Map负责把作业分解为多个任务,Reduce负责把分解后的多个任务处理的结果汇总起来。 当向MapReduce 框架提交一个计算作业(Job)时,它会首先把计算作业拆分成若干个Map 任务(Task),然后以完全并行的方式处理,分配到不同的节点上去执行,每一个Map 任务处理输入数据中的一部分,当Map 任务完成后,它会生成一些中间文件,这些中间文件将会作为Reduce 任务的输入数据。简单地说,MapReduce就是“任务的分解与结果的汇总”这样的一个过程。 在Hadoop中,用于执行MapReduce任务的对象有两个: JobTracker和TaskTracker。JobTracker是用于调度工作的,一个Hadoop集群中只有一个JobTracker,位于Master上。TaskTracker用于执行工作,位于各个Slave上。 需要特别注意的是,MapReduce处理的数据集必须可以分解成许多小数据集,而且每个小的数据集都可以完全独立地并行处理。 5.2Map过程 在第4章中已经介绍过,HDFS存储数据是按块存储,每个块的大小默认为128MB,而一个块为一个分片,一个Map任务处理一个分片,当然,也可以根据需要自主设置块的大小。Map输出的结果会暂时放在一个环形内存缓冲区中(缓冲区默认大小为100MB),当该缓冲区接近溢出时(默认为缓冲区大小的80%),会在本地文件系统中创建一个新文件,将该缓冲区中的数据写入这个文件; 在写入磁盘之前,首先根据Reduce任务的数目将数据划分为相同数目的分区,一个分区的中间数据对应一个Reduce任务。这样做是为了避免数据分配不均匀的情况。 当Map任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件进行合并。合并的过程中会不断地进行排序和合并操作(即combia操作),这样做的目的有以下两个。 (1) 尽量减少每次写入磁盘的数据量。 (2) 尽量减少下一复制阶段网络传输的数据量。 最后合并完成后,会形成一个已分区且已排序的文件。为了减少网络传输的数据量,这里可以将数据进行压缩。 5.3Reduce过程 Reduce会接收到不同Map任务传来的数据,并且每个Map传来的数据都是有序的。如果Reduce端接收的数据量相当小,则直接存储在内存中,如果数据量超过了该缓冲区大小的一定比例,则对数据合并后溢写到磁盘中。 随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样做是为了给后面的合并节省时间。其实,不管在Map端还是Reduce端,MapReduce都在反复地执行排序、合并操作; 合并的过程中会产生许多的中间文件(写入磁盘),但MapReduce会让写入磁盘的数据尽可能的少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到Reduce函数。 所以,一个完整的MapReduce过程如图5.1所示。 图5.1MapReduce过程 该过程的流程说明如下。 1. Map过程 (1) 输入文件,InputFormat产生键值对,并传送到Mapper类的Map函数中。 (2) Map输出键值对到一个没有排序的缓冲内存中。 (3) 当缓冲内存达到给定值或者Map完成时,就会对缓冲区内的键值对进行排序,然后溢写到磁盘上。 (4) 如果有多个溢出文件,那么将这些文件整合到一个文件中,并且这些文件是经过排序的。 (5) 在这些过程中,排序后的键值对等待Reducer获取。 2. Reduce过程 (1) Reducer获取Mapper的记录,作为输入。 (2) 相同的Key被传入同一个Reducer中。 (3) 当一个Mapper完成后,Reducer就开始获取Mapper结果,所有溢出文件被排序后放到一个内存缓冲区。 (4) 当内存缓冲区满后,就会产生溢出文件,存入本地磁盘。 (5) Reducer中所有数据传输完成后,所有溢出文件被整合和排序。 (6) Reducer将结果输出到HDFS。 5.4开发环境的搭建 在Windows环境下使用MapReduce进行实验前,需要搭建用于本地开发的MapReduce环境。本书使用的IDE为Eclipse,因此在搭建MapReduce开发环境前需要安装Eclipse 以及Hadoop插件。 本书在这里就不再对Eclipse的安装进行阐述了,读者可根据需要自行进行安装,接下来主要介绍Hadoop插件的安装。 (1) 下载Hadoop插件,将下载的插件存放到Eclipse的插件目录中,如图5.2所示。 图5.2保存插件到Eclipse的插件目录 (2) 删除Eclipse中configuration目录下的update文件夹,如图5.3所示,让Eclipse重新读取插件。 图5.3删除update文件夹 (3) 解压一份Hadoop插件文件到本地的磁盘,如图5.4所示。 图5.4解压Hadoop文件到本地 (4) 使用在Windows下编译Hadoop中的bin文件替换原本的bin文件,如图5.5所示。 图5.5替换bin文件 (5) 打开Eclipse,在菜单栏中选择Window→Preferences,如图5.6所示。 图5.6打开Eclipse的Preferences对话框 (6) 设置Hadoop文件的目录并添加环境变量,如图5.7所示。 图5.7设置路径及环境变量 (7) 设置Hadoop连接配置。在Eclipse菜单栏中选择Window→Show View→Other→MapReduce Tools,如图5.8所示。 图5.8设置Hadoop连接配置 (8) 在如图5.9所示的界面中单击右上角的“添加”按钮。然后按照如图5.10所示配置连接参数,单击“完成”按钮即可。 图5.9MapReduce Tool窗口 图5.10设置连接数据 (9) 配置成功后,就可以在Eclipse的窗口中看到如图5.11所示的Hadoop连接窗口。 图5.11Hadoop连接配置成功 5.5实验 本节将使用几个简单的实验来加深读者对MapReduce工作原理的理解。 5.5.1单词计数 本节将从项目的创建开始,向读者展示单词计数实验的整个操作过程,在后面的实验中,将只给出源码,其余操作请读者参考本节内容。 (1) 新建一个Map/Reduce项目,如图5.12所示。 图5.12新建Map/Reduce项目 (2) 将项目命名为“WordCounter”,如图5.13所示。 图5.13给项目命名 (3) 配置项目内容,如图5.14所示,单击Finish按钮,项目创建完成。 图5.14配置项目内容 (4) 在项目中新建一个类,输入以下代码。 package word; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCountApp { public static class MyMapper extends Mapper { private Text word = new Text(); private IntWritable one = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { // 得到输入的每一行数据 String line = value.toString(); // 分割数据,通过空格来分割 String[] words = line.split("_"); // 循环遍历并输出 for(String w :words) { word.set(w); context.write(word, one); } } } public static class MyReducer extends Reducer { private IntWritable sum = new IntWritable(); @Override protected void reduce(Text key, Iterable values, Reducer.Context content) throws IOException, InterruptedException { Integer count = 0; for(IntWritable value :values) { count += value.get(); } sum.set(count); content.write(key, sum); } } public static void main(String[] args) throws Exception { if(args.length < 2) { args = new String[]{ "hdfs://10.250.109.123:8020/words", "hdfs://10.250.109.123:8020/out05" }; } // 创建配置对象 Configuration conf = new Configuration(); // 创建job对象 Job job = Job.getInstance(conf, "wordcount"); // 设置运行job的主类 job.setJarByClass(WordCountApp.class); // 设置mapper类 job.setMapperClass(MyMapper.class); // 设置reducer类 job.setReducerClass(MyReducer.class); // 设置mapper输出的key value job.setMapOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 设置reducer输出的key value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 设置输入的路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 提交job boolean b = job.waitForCompletion(true); if(!b) { System.err.println("This task has failed!!!"); } } } (5) 通过文件或者直接输入的方式将数据文件上传到Hadoop,如图5.15所示,右键上传数据文件。本书使用的实验数据如图5.16所示。 图5.15上传数据 图5.16实验数据 (6) 运行程序,如图5.17所示,右击“类”,选择Run As→1 Java Application命令。 图5.17运行程序 (7) 运行完成后,打开如图5.18所示的文件,查看运行结果。 图5.18查看运行结果 5.5.2二次排序实验 (1) 新建一个项目。 (2) 在项目中,新建一个类,这里命名为“IntPair”,类的实现代码如下。 package expBigData.MapReduce.SecondarySort; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.WritableComparable; public class IntPair implements WritableComparable { private IntWritable first; private IntWritable second; public void set(IntWritable first, IntWritable second) { this.first = first; this.second = second; } //注意: 需要添加无参的构造方法,否则反射时会报错 public IntPair() { set(new IntWritable(), new IntWritable()); } public IntPair(int first, int second) { set(new IntWritable(first), new IntWritable(second)); } public IntPair(IntWritable first, IntWritable second) { set(first, second); } //其他成员函数 public IntWritable getFirst() { return first; } public void setFirst(IntWritable first) { this.first = first; } public IntWritable getSecond() { return second; } public void setSecond(IntWritable second) { this.second = second; } public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } public int hashCode() { return first.hashCode() * 163 + second.hashCode(); } public boolean equals(Object o) { if(o instanceof IntPair) { IntPair tp = (IntPair) o; return first.equals(tp.first) && second.equals(tp.second); } return false; } public String toString() { return first + "\\t" + second; } public int compareTo(IntPair tp) { int cmp = first.compareTo(tp.first); if(cmp != 0) { return cmp; } return second.compareTo(tp.second); } } (3) 再新建一个Java文件,命名为“SecondarySort”,在该文件中编写主程序,主程序代码如下。 package expBigData.MapReduce.SecondarySort; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class SecondarySort { static class TheMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("\\t"); int field1 = Integer.parseInt(fields[0]); int field2 = Integer.parseInt(fields[1]); context.write(new IntPair(field1, field2), NullWritable.get()); } } static class TheReduce extends Reducer { // private static final Text SEPARATOR=new Text("----------------------"); @Override protected void reduce(IntPair key, Iterable values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } } public static class FirstPartitioner extends Partitioner { public int getPartition(IntPair key, NullWritable value, int numPartitions) { return Math.abs(key.getFirst().get()) % numPartitions; } } //如果不添加这个类,默认第一列和第二列是升序排序的 //这个类的作用是使第一列升序排序,第二列降序排序 public static class KeyComparator extends WritableComparator { // 必须加上无参构造器,否则报错 protected KeyComparator() { super(IntPair.class, true); } public int compare(WritableComparable a, WritableComparable b) { IntPair ip1 = (IntPair) a; IntPair ip2 = (IntPair) b; // 第一列按升序排列 int cmp = ip1.getFirst().compareTo(ip2.getFirst()); if(cmp != 0) { return cmp; } // 在第一列相等的情况下,第二列按降序排序 return -ip1.getSecond().compareTo(ip2.getSecond()); } } //入口程序 public static void main(String[] args) throws Exception { if(args.length < 2) { args = new String[] { "hdfs://10.250.109.123:8020/dhy/in/secondsort.txt", "hdfs://10.250.109.123:8020/dhy/out/secondarysort_out00" }; } Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(SecondarySort.class); // 设置mapper的相关属性 job.setMapperClass(TheMapper.class); // 当mapper中的输出key和value类型和reducer中的相同时,以下两句省略 job.setMapOutputKeyClass(IntPair.class); job.setMapOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); // 设置分区相关属性 job.setPartitionerClass(FirstPartitioner.class); // 在mapper中对key进行排序 job.setSortComparatorClass(KeyComparator.class); // job.setSortGroupComparatorClass(GroupComparator.class); // 设置reducer的相关属性 job.setReducerClass(TheReduce.class); job.setOutputKeyClass(IntPair.class); job.setOutputValueClass(NullWritable.class); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 设置reducer数量 int reduceNum = 1; if(args.length >= 3 && args[2] != null) { reduceNum = Integer.parseInt(args[2]); } job.setNumReduceTasks(reduceNum); job.waitForCompletion(true); } } 5.5.3计数器实验 (1) 新建一个项目。 (2) 在项目中编写主程序,代码如下。 package cn.cqu.wzl; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class Counters { public static class MyCounterMap extends Mapper{ public static org.apache.hadoop.mapreduce.Counter ct =null; @Override protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { String arr_value[]=value.toString().split("\t"); if(arr_value.length>3) { ct = context.getCounter("ERRorCounter", "toolong"); System.out.println("toolong"); ct.increment(1); }else if(arr_value.length<3) { ct = context.getCounter("ERRorCounter", "tooshort"); System.out.println("tooshort"); ct.increment(1); } } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { if(args.length<2) { args =new String[] { "hdfs://10.250.109.123:8020/datas/counters", "hdfs://10.250.109.123:8020/result/counters" }; } Configuration conf = new Configuration(); Job job = new Job(conf,"Counter"); job.setMapperClass(MyCounterMap.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true)?0:1); } }