博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
二次排序解析
阅读量:4710 次
发布时间:2019-06-10

本文共 10569 字,大约阅读时间需要 35 分钟。

1、定义组合key

package com.cr.com.cr.test;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Composite key (year + temperature) used for secondary sort.
 *
 * Sort contract: years ascend; within the same year, temperatures descend.
 * The generic parameter {@code WritableComparable<ComKey>} is required so the
 * framework calls the typed {@link #compareTo(ComKey)} without raw-type casts.
 */
public class ComKey implements WritableComparable<ComKey> {

    private int year;
    private int temp;

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getTemp() {
        return temp;
    }

    public void setTemp(int temp) {
        this.temp = temp;
    }

    /**
     * Compares keys: year ascending; same year → temperature descending.
     * Uses Integer.compare instead of subtraction, which could overflow
     * for extreme int values and corrupt the sort order.
     */
    @Override
    public int compareTo(ComKey o) {
        System.out.println("对key进行比较实现compareTo");
        int y1 = o.getYear();
        int t1 = o.getTemp();
        System.out.println(y1 + "=" + t1);
        if (year == y1) {
            // Same year: larger temperature sorts first (descending).
            int result = Integer.compare(t1, temp);
            System.out.println("年份相同,比较气温" + result);
            return result;
        } else {
            // Different year: ascending order.
            int result = Integer.compare(year, y1);
            System.out.println("年份升序" + result);
            return result;
        }
    }

    /** Serialization: write year then temp; readFields must mirror this order. */
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(temp);
    }

    /** Deserialization: read fields in the exact order written by write(). */
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.temp = in.readInt();
    }

    @Override
    public String toString() {
        return "year:" + year + ",temp:" + temp;
    }
}

2、mapper

package com.cr.com.cr.test;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper: emits a composite key (year + temperature) with a null value.
 *
 * Input lines are expected to be "year temp" separated by a single space;
 * both fields must parse as integers. The generic parameters (stripped in
 * the original paste) are required for the map() override to type-check.
 */
public class MaxTempMapper extends Mapper<LongWritable, Text, ComKey, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        System.out.println("进入mapper");
        String line = value.toString();
        // NOTE(review): assumes exactly one space between year and temp;
        // split("\\s+") would be more tolerant but changes accepted input.
        String[] arr = line.split(" ");
        ComKey keyout = new ComKey();
        keyout.setYear(Integer.parseInt(arr[0]));
        keyout.setTemp(Integer.parseInt(arr[1]));
        System.out.println("mapper输出key" + keyout.getYear() + "==" + keyout.getTemp());
        System.out.println("mapper输出value==nullwritable");
        // Value carries no information; all data lives in the composite key.
        context.write(keyout, NullWritable.get());
    }
}

3、进行分区

package com.cr.com.cr.test;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitioner: routes each composite key by year so that all records for a
 * given year land in the same reduce partition (year % numPartitions).
 *
 * Generic parameters (stripped in the original paste) restored so the
 * getPartition() override type-checks against Partitioner.
 */
public class YearPartitioner extends Partitioner<ComKey, NullWritable> {

    @Override
    public int getPartition(ComKey comkey, NullWritable nullWritable, int i) {
        System.out.println("进行分区YearPartitioner");
        int year = comkey.getYear();
        // Years are positive in this dataset, so % never goes negative here.
        System.out.println("分区" + year % i);
        return year % i;
    }
}

4、对mapper的输出key进行比较,年份升序排列,如果年份相同,按照气温降序排列

package com.cr.com.cr.test;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Sort comparator: delegates to ComKey.compareTo so map output is ordered
 * year-ascending / temperature-descending before reaching the reducer.
 *
 * super(ComKey.class, true) registers the key class and asks the framework
 * to create key instances for the deserialize-and-compare path.
 */
public class ComKeyComparator extends WritableComparator {

    ComKeyComparator() {
        super(ComKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        System.out.println("进入组合key比较ComKeyComparator");
        System.out.println(a + "==" + b);
        // Explicit casts avoid the raw, unchecked compareTo call; the framework
        // only ever passes ComKey instances here (registered via the ctor).
        int result = ((ComKey) a).compareTo((ComKey) b);
        System.out.println(" a.compareTo(b)比较结果:" + result);
        return result;
    }
}

5、reducer按照key聚合

package com.cr.com.cr.test;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer: one call per year group (grouping is by YearGroupComparator),
 * emitting (year, temp) pairs. Because the sort comparator orders each
 * group's keys temperature-descending, iterating the (null) values walks
 * the composite key through every temperature of that year in order —
 * the first pair emitted per group is the year's maximum temperature.
 *
 * NOTE(review): the original article pasted a duplicate of
 * YearGroupComparator under this heading; this reducer is reconstructed
 * from the driver configuration (IntWritable outputs) and the run log.
 */
public class MaxTempReducer extends Reducer<ComKey, NullWritable, IntWritable, IntWritable> {

    @Override
    protected void reduce(ComKey key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        System.out.println("进入reducer");
        for (NullWritable ignored : values) {
            // Iterating values mutates the shared key instance to the next
            // record in the group, so read year/temp inside the loop.
            System.out.println("reducer输出" + key.getYear() + key.getTemp());
            context.write(new IntWritable(key.getYear()), new IntWritable(key.getTemp()));
        }
    }
}

6、对年份进行分组

package com.cr.com.cr.test;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator: compares composite keys by year only, so all records
 * of one year reach a single reduce() call regardless of temperature.
 */
public class YearGroupComparator extends WritableComparator {

    YearGroupComparator() {
        super(ComKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        System.out.println("年份进行分组对比器YearGroupComparator");
        int y1 = ((ComKey) a).getYear();
        int y2 = ((ComKey) b).getYear();
        // Integer.compare instead of (y1 - y2): subtraction can overflow for
        // extreme int values and report the wrong ordering.
        return Integer.compare(y1, y2);
    }
}

7、主函数

package com.cr.com.cr.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver for the secondary-sort job: wires mapper, reducer, partitioner,
 * grouping comparator and sort comparator together, running on the local
 * filesystem with 3 reduce tasks.
 */
public class MaxTempApp {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Local-filesystem job configuration.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf);
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.5");

        // Basic job identity and input format.
        job.setJobName("MaxTempApp");
        job.setJarByClass(MaxTempApp.class);
        job.setInputFormatClass(TextInputFormat.class);

        // Input path.
        FileInputFormat.addInputPath(job, new Path(("D:\\data\\test1.txt")));

        // Output path — remove a stale output directory first, since the
        // framework refuses to run if it already exists.
        Path path = new Path("D:\\data\\out");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        // Mapper / reducer classes.
        job.setMapperClass(MaxTempMapper.class);
        job.setReducerClass(MaxTempReducer.class);

        // Map output types: composite key, empty value.
        job.setMapOutputKeyClass(ComKey.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Final job output types: (year, temp) as ints.
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // Secondary-sort plumbing: partition by year, group by year,
        // sort by (year asc, temp desc).
        job.setPartitionerClass(YearPartitioner.class);
        job.setGroupingComparatorClass(YearGroupComparator.class);
        job.setSortComparatorClass(ComKeyComparator.class);

        job.setNumReduceTasks(3);
        job.waitForCompletion(true);
    }
}

8、运行过程解析

进入mapper18/01/14 16:54:06 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffermapper输出key1995==12mapper输出value==nullwritable进行分区YearPartitioner分区0进入mappermapper输出key1994==13mapper输出value==nullwritable进行分区YearPartitioner分区2进入mappermapper输出key1995==23mapper输出value==nullwritable进行分区YearPartitioner分区0进入mappermapper输出key1998==34mapper输出value==nullwritable进行分区YearPartitioner分区0进入mappermapper输出key1991==23mapper输出value==nullwritable进行分区YearPartitioner分区2进入mappermapper输出key2004==18mapper输出value==nullwritable进行分区YearPartitioner分区0进入mappermapper输出key1995==20mapper输出value==nullwritable进行分区YearPartitioner分区0进入mappermapper输出key2004==11mapper输出value==nullwritable进行分区YearPartitioner分区0
进入组合key比较ComKeyComparatoryear:2004,temp:11==year:1995,temp:20对key进行比较实现compareTo1995=》20年份升序排列 year - y1--》9 a.compareTo(b)比较结果:9进入组合key比较ComKeyComparatoryear:2004,temp:11==year:2004,temp:18对key进行比较实现compareTo2004=》18年份相同,气温降序排列 -(temp - t1)--》7 a.compareTo(b)比较结果:7进入组合key比较ComKeyComparatoryear:1995,temp:20==year:2004,temp:18对key进行比较实现compareTo2004=》18年份升序排列 year - y1--》-9 a.compareTo(b)比较结果:-9进入组合key比较ComKeyComparatoryear:2004,temp:11==year:1998,temp:34对key进行比较实现compareTo1998=》34年份升序排列 year - y1--》6 a.compareTo(b)比较结果:6进入组合key比较ComKeyComparatoryear:2004,temp:18==year:1998,temp:34对key进行比较实现compareTo1998=》34年份升序排列 year - y1--》6 a.compareTo(b)比较结果:6进入组合key比较ComKeyComparatoryear:1995,temp:20==year:1998,temp:34对key进行比较实现compareTo1998=》34年份升序排列 year - y1--》-3 a.compareTo(b)比较结果:-3进入组合key比较ComKeyComparatoryear:2004,temp:11==year:1995,temp:23对key进行比较实现compareTo1995=》23年份升序排列 year - y1--》9 a.compareTo(b)比较结果:9进入组合key比较ComKeyComparatoryear:2004,temp:18==year:1995,temp:23对key进行比较实现compareTo1995=》23年份升序排列 year - y1--》9 a.compareTo(b)比较结果:9进入组合key比较ComKeyComparatoryear:1998,temp:34==year:1995,temp:23对key进行比较实现compareTo1995=》23年份升序排列 year - y1--》3 a.compareTo(b)比较结果:3进入组合key比较ComKeyComparatoryear:1995,temp:20==year:1995,temp:23对key进行比较实现compareTo1995=》23年份相同,气温降序排列 -(temp - t1)--》3 a.compareTo(b)比较结果:3进入组合key比较ComKeyComparatoryear:1991,temp:23==year:1994,temp:13对key进行比较实现compareTo1994=》13年份升序排列 year - y1--》-3 a.compareTo(b)比较结果:-3进入组合key比较ComKeyComparatoryear:2004,temp:11==year:1995,temp:12对key进行比较实现compareTo1995=》12年份升序排列 year - y1--》9 a.compareTo(b)比较结果:9进入组合key比较ComKeyComparatoryear:2004,temp:18==year:1995,temp:12对key进行比较实现compareTo1995=》12年份升序排列 year - y1--》9 a.compareTo(b)比较结果:9进入组合key比较ComKeyComparatoryear:1998,temp:34==year:1995,temp:12对key进行比较实现compareTo1995=》12年份升序排列 year - y1--》3 a.compareTo(b)比较结果:3进入组合key比较ComKeyComparatoryear:1995,temp:20==year:1995,temp:12对key进行比较实现compareTo1995=》12年份相同,气温降序排列 -(temp - t1)--》-8 a.compareTo(b)比较结果:-8
年份进行分组对比器YearGroupComparator进入reducerreducer输出19952318/01/14 16:54:06 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 52 bytes年份进行分组对比器YearGroupComparatorreducer输出19952018/01/14 16:54:06 INFO reduce.MergeManagerImpl: Merged 1 segments, 62 bytes to disk to satisfy reduce memory limit年份进行分组对比器YearGroupComparatorreducer输出19951218/01/14 16:54:06 INFO reduce.MergeManagerImpl: Merging 1 files, 66 bytes from disk年份进行分组对比器YearGroupComparator进入reducer18/01/14 16:54:06 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reducereducer输出199834年份进行分组对比器YearGroupComparator18/01/14 16:54:06 INFO mapred.Merger: Merging 1 sorted segments进入reducerreducer输出200418reducer输出20041118/01/14 16:54:06 INFO mapred.Task: Task:attempt_local1143039137_0001_r_000000_0 is done. And is in the process of 年份进行分组对比器YearGroupComparator18/01/14 16:54:06 INFO reduce.MergeManagerImpl: Merged 1 segments, 22 bytes to disk to satisfy reduce memory limit进入reducerreducer输出199123进入reducer18/01/14 16:54:06 INFO reduce.MergeManagerImpl: Merging 1 files, 26 bytes from diskreducer输出199413

转载于:https://www.cnblogs.com/flyingcr/p/10326950.html

你可能感兴趣的文章
HDU1532 网络流最大流【EK算法】(模板题)
查看>>
PHP使用curl替代file_get_contents
查看>>
Webstorm通用设置
查看>>
jquery倾斜的动画导航菜单
查看>>
JAVA IO流的简单总结+收集日志异常信息
查看>>
类型转换与键盘输入
查看>>
面向对象(2)
查看>>
运算符(1)
查看>>
掷骰子游戏和条件语句
查看>>
循环语句
查看>>
加标签的continue用法
查看>>
递归算法
查看>>
java继承 、方法重写、重写toString方法
查看>>
SQL注入原理-手工联合注入查询技术
查看>>
实验3 SQL注入原理-万能密码注入
查看>>
redis cluster
查看>>
feign传输String json串 自动转义 \ 解决方法
查看>>
本站已稳定运行了XX天,网页时间显示功能实现方法
查看>>
实习的开始阶段
查看>>
搭建第一个node服务器
查看>>