1、定义组合key
package com.cr.com.cr.test;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;public class ComKey implements WritableComparable { private int year; private int temp; public int getYear() { return year; } public void setYear(int year) { this.year = year; } public int getTemp() { return temp; } public void setTemp(int temp) { this.temp = temp; } //对key进行比较实现 public int compareTo(ComKey o) { System.out.println("对key进行比较实现compareTo" ); int y1 = o.getYear(); int t1 = o.getTemp(); System.out.println(y1 + "=" + t1); //如果年份相同 if (year == y1) { //气温降序 int result = -(temp - t1); System.out.println("年份相同,比较气温" + result); return result; } else { //年份升序 int result = year - y1; System.out.println("年份升序" + result); return result; } } //串行化过程 public void write(DataOutput out) throws IOException { out.writeInt(year); out.writeInt(temp); } //反串行化过程 public void readFields(DataInput in) throws IOException { this.year = in.readInt(); this.temp = in.readInt(); } @Override public String toString() { return "year:" + year + ",temp:" + temp; }}
2、mapper
package com.cr.com.cr.test;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/** * mapper:输出为组合key,输出value为空值 */public class MaxTempMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { System.out.println("进入mapper"); String line = value.toString(); String[] arr = line.split(" "); ComKey keyout = new ComKey(); keyout.setYear(Integer.parseInt(arr[0])); keyout.setTemp(Integer.parseInt(arr[1])); System.out.println("mapper输出key"+keyout.getYear()+ "==" + keyout.getTemp()); System.out.println("mapper输出value==nullwritable"); context.write(keyout,NullWritable.get()); }}
3、进行分区
package com.cr.com.cr.test;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Partitioner;public class YearPartitioner extends Partitioner { @Override public int getPartition(ComKey comkey, NullWritable nullWritable, int i) { System.out.println("进行分区YearPartitioner" ); int year = comkey.getYear(); System.out.println("分区"+year % i); return year % i; }}
4、对mapper的输出key进行比较,年份升序排列,如果年份相同,按照气温降序排列
package com.cr.com.cr.test;import org.apache.hadoop.io.RawComparator;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;public class ComKeyComparator extends WritableComparator { ComKeyComparator() { super(ComKey.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { System.out.println("进入组合key比较ComKeyComparator"); System.out.println(a + "==" + b); int result = a.compareTo(b); System.out.println(" a.compareTo(b)比较结果:"+result); return result; }}
5、reducer按照key聚合
package com.cr.com.cr.test;import org.apache.hadoop.io.RawComparator;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;/** * 按照年份进行分组对比器实现 */public class YearGroupComparator extends WritableComparator { YearGroupComparator() { super(ComKey.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { System.out.println("年份进行分组对比器YearGroupComparator"); int y1 = ((ComKey) a).getYear(); int y2 = ((ComKey) b).getYear(); int result = y1 - y2; return result; }}
6、对年份进行分组
package com.cr.com.cr.test;import org.apache.hadoop.io.RawComparator;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;/** * 按照年份进行分组对比器实现 */public class YearGroupComparator extends WritableComparator { YearGroupComparator() { super(ComKey.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { System.out.println("年份进行分组对比器YearGroupComparator"); int y1 = ((ComKey) a).getYear(); int y2 = ((ComKey) b).getYear(); int result = y1 - y2; return result; }}
7、主函数
package com.cr.com.cr.test;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class MaxTempApp { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //单例作业 Configuration conf = new Configuration(); conf.set("fs.defaultFS","file:///"); Job job = Job.getInstance(conf); System.setProperty("hadoop.home.dir","E:\\hadoop-2.7.5"); //设置job的各种属性 job.setJobName("MaxTempApp"); //设置job名称 job.setJarByClass(MaxTempApp.class); //设置搜索类 job.setInputFormatClass(TextInputFormat.class); //设置输入路径 FileInputFormat.addInputPath(job,new Path(("D:\\data\\test1.txt"))); //设置输出路径 Path path = new Path("D:\\data\\out"); FileSystem fs = FileSystem.get(conf); if (fs.exists(path)) { fs.delete(path, true); } FileOutputFormat.setOutputPath(job,path); job.setMapperClass(MaxTempMapper.class); //设置mapper类 job.setReducerClass(MaxTempReducer.class); //设置reduecer类 job.setMapOutputKeyClass(ComKey.class); //设置之map输出key job.setMapOutputValueClass(NullWritable.class); //设置map输出value job.setOutputKeyClass(IntWritable.class); //设置mapreduce 输出key job.setOutputValueClass(IntWritable.class); //设置mapreduce输出value //设置分区类 job.setPartitionerClass(YearPartitioner.class); //设置分组对比器 job.setGroupingComparatorClass(YearGroupComparator.class);// //设置排序对比器 job.setSortComparatorClass(ComKeyComparator.class); job.setNumReduceTasks(3); job.waitForCompletion(true); }}
8、运行过程解析
进入mapper18/01/14 16:54:06 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffermapper输出key1995==12mapper输出value==nullwritable进行分区YearPartitioner分区0进入mappermapper输出key1994==13mapper输出value==nullwritable进行分区YearPartitioner分区2进入mappermapper输出key1995==23mapper输出value==nullwritable进行分区YearPartitioner分区0进入mappermapper输出key1998==34mapper输出value==nullwritable进行分区YearPartitioner分区0进入mappermapper输出key1991==23mapper输出value==nullwritable进行分区YearPartitioner分区2进入mappermapper输出key2004==18mapper输出value==nullwritable进行分区YearPartitioner分区0进入mappermapper输出key1995==20mapper输出value==nullwritable进行分区YearPartitioner分区0进入mappermapper输出key2004==11mapper输出value==nullwritable进行分区YearPartitioner分区0
进入组合key比较ComKeyComparatoryear:2004,temp:11==year:1995,temp:20对key进行比较实现compareTo1995=》20年份升序排列 year - y1--》9 a.compareTo(b)比较结果:9进入组合key比较ComKeyComparatoryear:2004,temp:11==year:2004,temp:18对key进行比较实现compareTo2004=》18年份相同,气温降序排列 -(temp - t1)--》7 a.compareTo(b)比较结果:7进入组合key比较ComKeyComparatoryear:1995,temp:20==year:2004,temp:18对key进行比较实现compareTo2004=》18年份升序排列 year - y1--》-9 a.compareTo(b)比较结果:-9进入组合key比较ComKeyComparatoryear:2004,temp:11==year:1998,temp:34对key进行比较实现compareTo1998=》34年份升序排列 year - y1--》6 a.compareTo(b)比较结果:6进入组合key比较ComKeyComparatoryear:2004,temp:18==year:1998,temp:34对key进行比较实现compareTo1998=》34年份升序排列 year - y1--》6 a.compareTo(b)比较结果:6进入组合key比较ComKeyComparatoryear:1995,temp:20==year:1998,temp:34对key进行比较实现compareTo1998=》34年份升序排列 year - y1--》-3 a.compareTo(b)比较结果:-3进入组合key比较ComKeyComparatoryear:2004,temp:11==year:1995,temp:23对key进行比较实现compareTo1995=》23年份升序排列 year - y1--》9 a.compareTo(b)比较结果:9进入组合key比较ComKeyComparatoryear:2004,temp:18==year:1995,temp:23对key进行比较实现compareTo1995=》23年份升序排列 year - y1--》9 a.compareTo(b)比较结果:9进入组合key比较ComKeyComparatoryear:1998,temp:34==year:1995,temp:23对key进行比较实现compareTo1995=》23年份升序排列 year - y1--》3 a.compareTo(b)比较结果:3进入组合key比较ComKeyComparatoryear:1995,temp:20==year:1995,temp:23对key进行比较实现compareTo1995=》23年份相同,气温降序排列 -(temp - t1)--》3 a.compareTo(b)比较结果:3进入组合key比较ComKeyComparatoryear:1991,temp:23==year:1994,temp:13对key进行比较实现compareTo1994=》13年份升序排列 year - y1--》-3 a.compareTo(b)比较结果:-3进入组合key比较ComKeyComparatoryear:2004,temp:11==year:1995,temp:12对key进行比较实现compareTo1995=》12年份升序排列 year - y1--》9 a.compareTo(b)比较结果:9进入组合key比较ComKeyComparatoryear:2004,temp:18==year:1995,temp:12对key进行比较实现compareTo1995=》12年份升序排列 year - y1--》9 a.compareTo(b)比较结果:9进入组合key比较ComKeyComparatoryear:1998,temp:34==year:1995,temp:12对key进行比较实现compareTo1995=》12年份升序排列 year - y1--》3 a.compareTo(b)比较结果:3进入组合key比较ComKeyComparatoryear:1995,temp:20==year:1995,temp:12对key进行比较实现compareTo1995=》12年份相同,气温降序排列 -(temp - t1)--》-8 a.compareTo(b)比较结果:-8
年份进行分组对比器YearGroupComparator进入reducerreducer输出19952318/01/14 16:54:06 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 52 bytes年份进行分组对比器YearGroupComparatorreducer输出19952018/01/14 16:54:06 INFO reduce.MergeManagerImpl: Merged 1 segments, 62 bytes to disk to satisfy reduce memory limit年份进行分组对比器YearGroupComparatorreducer输出19951218/01/14 16:54:06 INFO reduce.MergeManagerImpl: Merging 1 files, 66 bytes from disk年份进行分组对比器YearGroupComparator进入reducer18/01/14 16:54:06 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reducereducer输出199834年份进行分组对比器YearGroupComparator18/01/14 16:54:06 INFO mapred.Merger: Merging 1 sorted segments进入reducerreducer输出200418reducer输出20041118/01/14 16:54:06 INFO mapred.Task: Task:attempt_local1143039137_0001_r_000000_0 is done. And is in the process of 年份进行分组对比器YearGroupComparator18/01/14 16:54:06 INFO reduce.MergeManagerImpl: Merged 1 segments, 22 bytes to disk to satisfy reduce memory limit进入reducerreducer输出199123进入reducer18/01/14 16:54:06 INFO reduce.MergeManagerImpl: Merging 1 files, 26 bytes from diskreducer输出199413