reducetask的并行度

一个yarnchild 对应一个maptask

1个reucetask任务对应的就是MyReducer的计算量, 运行reduce任务的并行度

默认情况下 reducetask的个数只有1个, 一个reducetask只能运行在一个节点上

通过wordcount的结果我们可以看出也是只有一个reducetask。

  • 当数据量很大时候 只有一个reducetask不合理的
    1. 这个reducetask的压力很大
    2. 负载不均衡

设置reducetask个数

reducetask也应该根据我们的实际的数据 设置多个

如何设置:

代码设置

1
2
//设置reducetask的个数
job.setNumReduceTasks(tasks);
  • 参数代表就是reducetask的个数 需要几个reducetask的时候 设置为几就可以。

  • 这个参数值的默认为,默认情况只运行一个reducetask。

  • job.setNumReduceTask(3);发现输出的结果3个文件 1个标志文件

    • part-r-00000

    • part-r-00001

    • part-r-00002

    • 结论:一个reducetask最终输出一个对应的结果文件

reduce中的数据划分

默认情况下:hash分割数据

  • 一个reducetask的数据对应的是一个分区的数据

  • 分区:对map输出的数据进行一个按照一定的规则划分,每一部分称为一个分区

1
key.hash%reducetask的个数
  • partition,这里的分割规则叫分区算法
    推断:默认的分区算法:hash算法 1)散列 2)唯一

分区的底层实现

默认的抽象类 **Partitioner **抽象类 定义分区算法/分区规则

1
2
3
4
5
6
public abstract class Partitioner<KEY, VALUE> {
//返回值:int 这里的返回值代表的是分区编号,每一个分区的唯一标志,默认从0开始,顺序递增的
//参数:参数1-map输出的key 参数2-map输出的value 参数3-分区个数(job.setNumReduceTask())
public abstract int getPartition
(KEY key, VALUE value, int numPartitions);
}

默认调用的实现类:HashPartitioner

默认的分区算法 取map的key的hash值%reducetask的个数 获取返回值就是分区编号

1
2
3
4
5
6
7
8
9
//泛型指的是map输出的k   v的类型
public class HashPartitioner<K, V> extends Partitioner<K, V> {
//默认的参数3 numReduceTasks
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
  • key.hashCode() & Integer.MAX_VALUE 目的:防止溢出 范围控制integer_max

    1
    2
    3
    4
    5
    1011001100111010101010101
    &
    111111111111111111111111
    --------------------------
    011001100111010101010101

小结

默认情况下

  • 一个分区—-一个reducetask—-一个输出结果文件
    • part-r-00001 00001—代表分区编号
  • 分区算法采用hash分区
    (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  • 分区算法 和 reducetask的个数一一对应的
  • reducetask的并行度,取决于reducetask的个数 ,job.setNumReduceTasks
  • reducetask的数据如何分配,取决于分区算法的,默认的hash分区

自定义分区:自己定义的

  • 分区算法决定map输出的数据如何分配给reduce

自定义分区

默认的分区算法 并不能满足所有的需求,不能自定义数据的去向。

1
2
3
假设:有一堆数据,地区有江苏,浙江,上海,内蒙等地区
如果我们想按照地域进行划分reducetask的数据,
默认的就不可以,我们需要自定义分区

自定义分区说明

1
2
3
写法:
1)继承Partitioner类
2)重写getPartition 方法,这个方法的返回值,代表就是分区编号

例子

分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
例子:将流量汇总统计结果按照手机归属地不同省份输出到不同文件中

1. 手机号归属地--假设手机号的前三位
2. 输出到不同的文件-----使用不同的reducetask输出到不同分区
3. 分区算法:按照手机号的归属地进行划分的


自定义分区:
1. 分区字段:map的key的位置的数据
2. 按照手机号分区

shuffle过程
map端:
key:手机号
value:剩下的
reduce端:
输出
1
2
3
4
//指定自定义分区
job.setPartitionerClass(MyPartitioner.class);
//指定reducetask的个数
job.setNumReduceTasks(3);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* 泛型 map端输出的<key,value>
* @author Administrator
*/
public class MyPartitioner extends Partitioner<Text, Text>{
/**
* 参数1---map大输出key 参数2:map大输出的value 参数3:分区个数
*/
@Override
public int getPartition(Text key, Text value, int numPartitions) {
String mk = key.toString();
if(mk.startsWith("134")||mk.startsWith("135")||mk.startsWith("136")){
return 0;//part-r-00000
}else if(mk.startsWith("137")||mk.startsWith("138")||mk.startsWith("139")){
return 1;//part-r-00001
}else{
return 2;//part-r-00002
}
}
}

报错

Illegal partition for 15013685858 (3)

分区个数说明:

  • reducetask的个数为1的时候,所有的分区的数据默认全部到一个reducetask中
  • reducetask的个数大于1, 按照分区编号对应的分区到对应的reducetask中
  • 设置的时候reducetask的个数=分区个数
分区编号值设置为3----说明reducetask-3个
    1)reducetask0=分区0
    2)reducetask1=分区1
    3)reducetask2=分区2

报错:
Illegal partition for 15013685858 (3)

原因:分区和reducetask的个数不匹配

注意:设定分区编号的时候,最好顺序递增的,不要跳数(从0开始)
分区个数3个(getPartition方法返回的个数):reducetask的个数(job.setNumReduceTasks可以设置:
    1) 1 === 可以:对应一个输出结果
    2) 2 === 不可以:
    3) 3 可以
    4) >3  可以

数据倾斜:分区不合理

多个reducetask并行计算的时候,某一个reducetask分配数据不均匀,这个时候产生现象数据倾斜。
后果:reducetask的整体运行的性能低

  • 数据倾斜一定需要尽量避免的
  • 原因:分区算法没有合理的分配均匀数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
假设有100个reducetask
其中99reducetask--1T 需要10min
剩下1个reducetask---100T1000min
那么进度:
map 100% reduce 99%
map 100% reduce 99%
map 100% reduce 99%
map 100% reduce 99%
map 100% reduce 99%
map 100% reduce 99%
map 100% reduce 99%
map 100% reduce 99%
...............

大数据中不怕数据量大 怕数据倾斜

解决

合理设计分区算法,根据实际的数据抽样做测试,进行合理设计分区,业务,数据。

比如购物的的时候,西藏等地方的数据量就会少于北上广等地的数据。

reducetask数据倾斜的原因

  • mapreduce阶段:

    • maptask:1个切片默认128M—1maptask

      1. maptask的并行度比较高,切片个数—数据块的个数(最后一个切片有可能跨块的)
      2. maptask数据一个切片为128M,对应的数据量比较小。
    • reducetask:容易产生数据倾斜

      1. 并行度不高,job.setNumReduceTasks()

        经验值:reducetask最大值=datanode*0.95 即100个节点设置reducetask的个数为95。

      2. reducetask的数据分配取决于分区算法

      3. reducetask的对应的数据量本身比较大

combiner组件

  • 这个组件不适用所有的场景的
  • 优化组件:提升性能的作用 ,这个组件默认不加的
  • 作用:对到reducetask之前的数据做预处理:
    • 帮助reducetask做预处理,减少reducetask的数据量,提升性能

组件怎么加?

  • 这个组件就是提前帮助reducetask做一些自己的工作,减轻redcetask的压力.
  • combiner和reducetask的业务逻辑一样
  • 作用的时间点在maptask之后,reducetask之前

实现:这个组件不会影响业务逻辑和reducetask的实现是一样

  1. 继承Reducer类
  2. 重写reduce方法
  3. job中设置添加combiner组件
1
2
3
//实现和reducetask的实现一样
//实际的开发过程直接使用reducer的类作为combiner的类
job.setCombinerClass(MyCombiner.class);

场景

可以适用

  1. 求和
  2. 求最大值
  3. 求最小值

不适用:

  1. 求平均值(这是是相当于平均值再取平均值)

例子

流量汇总统计结果按照手机归属地不同省份输出到不同文件中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class FlowPartition {
//key--手机号 value---其他
static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{
Text mk=new Text();
Text mv=new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String[] datas = value.toString().split("\t");
mk.set(datas[0]);
mv.set(datas[1]+"\t"+datas[2]+"\t"+datas[3]);
context.write(mk, mv);
}

}
static class MyReducer extends Reducer<Text, Text, Text, Text>{
//分组 按照手机号分组的
@Override
protected void reduce(Text key, Iterable<Text> values,
Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
for(Text v:values){
context.write(key, v);
}
}
}
public static void main(String[] args) {
System.setProperty("HADOOP_USER_NAME", "hadoop");
//将mapper reducer类进行一个封装 封装为一个任务----job(作业)
//加载配置文件
Configuration conf=new Configuration();
//启动一个Job 创建一个job对象
try {
Job job=Job.getInstance(conf);
//设置这个job
//设置整个job的主函数入口
job.setJarByClass(FlowPartition.class);

//设置job的mappper的类
job.setMapperClass(MyMapper.class);

//设置job的reducer的类
job.setReducerClass(MyReducer.class);


//如果map输出的key value的类型 和reduce输出的key value的类型相同 这个时候只设置最终输出
//设置reduce的输出的k v类型 以下方法设置的是mr的最终输出
//这个设置实际上 map和reduce的输出
//如果输出的类型不一致 一定分开设置
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

//指定自定义分区
job.setPartitionerClass(MyPartitioner.class);
//指定reducetask的个数
job.setNumReduceTasks(2);


//指定需要统计的文件的输入路径 FileInputFormat 文件输入类
Path inpath=new Path("hdfs://hadoop01:9000/flow_out01");
FileInputFormat.addInputPath(job, inpath);

//指定输出目录 输出路径不能存在的 否则会报错 默认输出是覆盖式的输出 如果输出目录存在 有可能造成原始数据的丢失
Path outpath=new Path("hdfs://hadoop01:9000/flow_partition_out05");
FileOutputFormat.setOutputPath(job, outpath);

//提交job 执行这一句的时候 job才会提交 上面做的一系列的工作 都是设置job
//job.submit();
job.waitForCompletion(true);

} catch (Exception e) {

e.printStackTrace();
}
}

}