reducetask的并行度
reducetask的并行度
一个yarnchild 对应一个maptask
1个reucetask任务对应的就是MyReducer的计算量, 运行reduce任务的并行度
默认情况下 reducetask的个数只有1个, 一个reducetask只能运行在一个节点上。
通过wordcount的结果我们可以看出也是只有一个reducetask。
- 当数据量很大时候 只有一个reducetask不合理的
- 这个reducetask的压力很大
- 负载不均衡
设置reducetask个数
reducetask也应该根据我们的实际的数据 设置多个
如何设置:
代码设置
1 | //设置reducetask的个数 |
参数代表就是reducetask的个数 需要几个reducetask的时候 设置为几就可以。
这个参数值的默认为,默认情况只运行一个reducetask。
job.setNumReduceTask(3);
发现输出的结果3个文件 1个标志文件part-r-00000
part-r-00001
part-r-00002
结论:一个reducetask最终输出一个对应的结果文件
reduce中的数据划分
默认情况下:hash分割数据
一个reducetask的数据对应的是一个分区的数据
分区:对map输出的数据进行一个按照一定的规则划分,每一部分称为一个分区
1 | key.hash%reducetask的个数 |
- partition,这里的分割规则叫分区算法
推断:默认的分区算法:hash算法 1)散列 2)唯一
分区的底层实现
默认的抽象类 **Partitioner **抽象类 定义分区算法/分区规则
1 | public abstract class Partitioner<KEY, VALUE> { |
默认调用的实现类:HashPartitioner
默认的分区算法 取map的key的hash值%reducetask的个数 获取返回值就是分区编号
1 | //泛型指的是map输出的k v的类型 |
key.hashCode() & Integer.MAX_VALUE
目的:防止溢出 范围控制integer_max1
2
3
4
51011001100111010101010101
&
111111111111111111111111
--------------------------
011001100111010101010101
小结
默认情况下
- 一个分区—-一个reducetask—-一个输出结果文件
- part-r-00001 00001—代表分区编号
- 分区算法采用hash分区
(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
- 分区算法 和 reducetask的个数一一对应的
- reducetask的并行度,取决于reducetask的个数 ,job.setNumReduceTasks
- reducetask的数据如何分配,取决于分区算法的,默认的hash分区
自定义分区:自己定义的
- 分区算法决定map输出的数据如何分配给reduce
自定义分区
默认的分区算法 并不能满足所有的需求,不能自定义数据的去向。
1 | 假设:有一堆数据,地区有江苏,浙江,上海,内蒙等地区 |
自定义分区说明
1 | 写法: |
例子
分析
1 | 例子:将流量汇总统计结果按照手机归属地不同省份输出到不同文件中 |
1 | //指定自定义分区 |
1 | import org.apache.hadoop.io.Text; |
报错
Illegal partition for 15013685858 (3)
分区个数说明:
- reducetask的个数为1的时候,所有的分区的数据默认全部到一个reducetask中
- reducetask的个数大于1, 按照分区编号对应的分区到对应的reducetask中
- 设置的时候
reducetask的个数=分区个数
分区编号值设置为3----说明reducetask-3个
1)reducetask0=分区0
2)reducetask1=分区1
3)reducetask2=分区2
报错:
Illegal partition for 15013685858 (3)
原因:分区和reducetask的个数不匹配
注意:设定分区编号的时候,最好顺序递增的,不要跳数(从0开始)
分区个数3个(getPartition方法返回的个数):reducetask的个数(job.setNumReduceTasks可以设置:
1) 1 === 可以:对应一个输出结果
2) 2 === 不可以:
3) 3 可以
4) >3 可以
数据倾斜:分区不合理
多个reducetask并行计算的时候,某一个reducetask分配数据不均匀,这个时候产生现象数据倾斜。
后果:reducetask的整体运行的性能低
- 数据倾斜一定需要尽量避免的
- 原因:分区算法没有合理的分配均匀数据
1 | 假设有100个reducetask |
解决
合理设计分区算法,根据实际的数据抽样做测试,进行合理设计分区,业务,数据。
比如购物的的时候,西藏等地方的数据量就会少于北上广等地的数据。
reducetask数据倾斜的原因
mapreduce阶段:
maptask:1个切片默认128M—1maptask
- maptask的并行度比较高,切片个数—数据块的个数(最后一个切片有可能跨块的)
- maptask数据一个切片为128M,对应的数据量比较小。
reducetask:容易产生数据倾斜
并行度不高,job.setNumReduceTasks()
经验值:reducetask最大值=datanode*0.95 即100个节点设置reducetask的个数为95。
reducetask的数据分配取决于分区算法
reducetask的对应的数据量本身比较大
combiner组件
- 这个组件不适用所有的场景的
- 优化组件:提升性能的作用 ,这个组件默认不加的
- 作用:对到reducetask之前的数据做预处理:
- 帮助reducetask做预处理,减少reducetask的数据量,提升性能
组件怎么加?
- 这个组件就是提前帮助reducetask做一些自己的工作,减轻redcetask的压力.
- combiner和reducetask的业务逻辑一样
- 作用的时间点在maptask之后,reducetask之前
实现:这个组件不会影响业务逻辑和reducetask的实现是一样
- 继承Reducer类
- 重写reduce方法
- job中设置添加combiner组件
1 | //实现和reducetask的实现一样 |
场景
可以适用
- 求和
- 求最大值
- 求最小值
不适用:
- 求平均值(这是是相当于平均值再取平均值)
例子
流量汇总统计结果按照手机归属地不同省份输出到不同文件中
1 | import java.io.IOException; |