Writable接口与序列化机制
Writable接口与序列化机制
一、序列化概念
序列化(Serialization)是指把结构化对象转化为字节流。
反序列化(Deserialization)是序列化的逆过程。即把字节流转回结构化对象。
Java序列化(java.io.Serializable)
二、Hadoop序列化的特点
序列化格式特点:
- 紧凑:高效使用存储空间。
- 快速:读写数据的额外开销小
- 可扩展:可透明地读取老格式的数据
- 互操作:支持多语言的交互
Hadoop的序列化格式:Writable
三、Hadoop序列化的作用
序列化在分布式环境的两大作用:进程间通信,永久存储。
Hadoop节点间通信。
1
2
3节点1->节点2: 二进制流消息
Note right of 节点2: 二进制流反序列化为消息
Note left of 节点1: 消息序列化为二进制流
四、Writable接口
Writable接口, 是根据 DataInput 和 DataOutput 实现的简单、有效的序列化对象.
MR的任意Key和Value必须实现Writable接口.
1 | package org.apache.hadoop.io; |
MR的任意key必须实现WritableComparable接口
1 | package org.apache.hadoop.io; |
五、常用的Writable实现类
Text一般认为它等价于java.lang.String的Writable。针对UTF-8序列。
例:
Text test = new Text("test");
IntWritable one = new IntWritable(1);
1 | graph RL |
Java基本类型 | Writable实现类 | 序列化字节大小 |
---|---|---|
boolean | BooleanWritable | 1 |
byte | ByteWritable | 1 |
int | IntWritable | 4 |
VintWritable | 1~5 | |
float | FloatWritable | 4 |
Long | LongWritable | 8 |
VlongWritable | 1-9 | |
double | DoubleWritable | 8 |
String(不是基本类型) | Text |
六、自定义Writable类
想要具备序列化 和 反序列的能力,必须实现Writable接口
Writable
write 是把每个对象序列化到输出流
readFields是把输入流字节反序列化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
//实现Writable接口
public class FlowBean implements Writable{
//上行流量
private int upflow;
//下行流量
private int downflow;
//总流量
private int sumflow;
public int getUpflow() {
return upflow;
}
public void setUpflow(int upflow) {
this.upflow = upflow;
}
public int getDownflow() {
return downflow;
}
public void setDownflow(int downflow) {
this.downflow = downflow;
}
public int getSumflow() {
return sumflow;
}
public void setSumflow(int sumflow) {
this.sumflow = sumflow;
}
public FlowBean() {
super();
}
public FlowBean(int upflow, int downflow) {
super();
this.upflow = upflow;
this.downflow = downflow;
this.sumflow = upflow+downflow;
}
//toString 一定重写 不重写 输出地址
public String toString() {
return upflow + "\t" + downflow + "\t" + sumflow;
}
//序列化的方法 原始数据----二进制
public void write(DataOutput out) throws IOException {
//通过参数的输出流 写出 字符串的序列化 writeUTF
out.writeInt(upflow);
out.writeInt(downflow);
out.writeInt(sumflow);
}
//反序列化的方法 二进制-----原始数据
//反序列化的顺序 一定和序列化的顺序一致
public void readFields(DataInput in) throws IOException {
//从流中读取 转换过来 String---readUTF
this.upflow=in.readInt();
this.downflow=in.readInt();
this.sumflow=in.readInt();
}
}Mapper端使用自定义的Writable类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17static class MyMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
Text mk=new Text();
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] datas = value.toString().split("\t");
//获取需要的数据 进行封装发送
mk.set(datas[1]);
//封装fb
int upflow=Integer.parseInt(datas[datas.length-3]);
int downflow=Integer.parseInt(datas[datas.length-2]);
FlowBean fb=new FlowBean(upflow,downflow );
//发送
context.write(mk, fb);
}
}实现WritableComparable.
Java值对象的比较:一般需要重写toString(),hashCode(),equals()方法
七、排序comparable接口
shuffle(快速排序 归并排序)
mapreduce:中会默认按照map输出的key进行排序 默认升序
- 如果map输出key是数值类型 按照大小排序的 升序排序的
- 如果map输出的key是字符串类型的 按照顺序进行排序的 升序排序
- 自定义的类型 具备以下两点需要实现的接口:
1)序列化和反序列功能 —Writable(write readFields)
2)具备比较的能力 — comparable(comparaTo)- 自定义的类型必须同时实现两个接口即实现以下的接口:
WritableComparable—–(write readFields comparaTo)
1 | import java.io.DataInput; |
需求说明
对wordcount的结果(二次MapReduce)进行排序 按照单词出现的次数进行排序, shuffle过程中是可以进行排序的,利用这个排序为我们排序.
1 | hadoop 256 |
实现思路
1 | 想要利用shuffle排序:将需要排序的字段放在map的输出的key上 |
1 | package com.ghgj.cn.sort; |
八、MapReduce输入的处理类
1、FileInputFormat:
- FileInputFormat是所有以文件作为数据源的InputFormat实现的基类,FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方法。至于获得记录的方法是有不同的子类——TextInputFormat进行实现的。
2、InputFormat:
public interface InputFormat<K, V> { InputSplit[] getSplits(JobConf var1, int var2) throws IOException; RecordReader<K, V> getRecordReader(InputSplit var1, JobConf var2, Reporter var3) throws IOException; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
- InputFormat 负责处理MR的输入部分.有三个作用:
- 验证作业的输入是否规范.
- 把输入文件切分成InputSplit.
- 提供RecordReader 的实现类,把InputSplit读到Mapper中进行处理.
- 3、InputSplit:
- 在执行mapreduce之前,原始数据被分割成若干split,每个split作为一个map任务的输入,在map执行过程中split会被分解成一个个记录(key-value对),map会依次处理每一个记录。
- FileInputFormat只划分比HDFS block大的文件,所以FileInputFormat划分的结果是这个文件或者是这个文件中的一部分.
- 如果一个文件的大小比block小,将不会被划分,这也是Hadoop处理大文件的效率要比处理很多小文件的效率高的原因。
- 当Hadoop处理很多小文件(文件大小小于hdfs block大小)的时候,由于FileInputFormat不会对小文件进行划分,所以每一个小文件都会被当做一个split并分配一个map任务,导致效率底下。
- 例如:一个1G的文件,会被划分成16个64MB的split,并分配16个map任务处理,而10000个100kb的文件会被10000个map任务处理。
- 4、TextInputFormat:
- TextInputformat是默认的处理类,处理普通文本文件。
- 文件中每一行作为一个记录,他将每一行在文件中的起始偏移量作为key,每一行的内容作为value。
- 默认以\n或回车键作为一行记录。
- TextInputFormat继承了FileInputFormat。
### 九、InputFormat类的层次结构
```mermaid
graph RL
A[Fileinpputformat] -->B(InputFormat)
C[ComposableInputformat] -->B
D[DBInputformat] -->B
E[Emptyinputformat] -->B
A1[CombinefileInputformat] -->A
A2[TextInputformat] -->A
A3[KeyValueTextInputformat] -->A
A4[NLineInputformat] -->A
A5[SequenceFileInputformat] -->A
A6[StreamInputformat] -->A3
A7[SequenceFileAsBinaryInputformat] -->A5
A8[SequenceFileAsTextInputformat] -->A5
A9[SequenceFileInputfiltter] -->A5
C1[CompositeInputformat] -->C
十、其他输入类
1、CombineFileInputFormat
- 相对于大量的小文件来说,hadoop更合适处理少量的大文件。
- CombineFileInputFormat可以缓解这个问题,它是针对小文件而设计的。
- 相对于大量的小文件来说,hadoop更合适处理少量的大文件。
2、KeyValueTextInputFormat
- 当输入数据的每一行是两列,并用tab分离的形式的时候,KeyValueTextInputformat处理这种格式的文件非常适合。
3、NLineInputformat
- NLineInputformat可以控制在每个split中数据的行数。
4、SequenceFileInputformat
当输入文件格式是sequencefile的时候,要使用SequenceFileInputformat作为输入。
十一、自定义输入格式
1、继承FileInputFormat基类。
2、重写里面的getSplits(JobContext context)方法。
3、重写createRecordReader(InputSplit split,TaskAttemptContext context)方法。
十二、Hadoop的输出
1、TextOutputformat
- 默认的输出格式,key和value中间值用tab隔开的。
2、SequenceFileOutputformat
- 将key和value以sequencefile格式输出。
3、SequenceFileAsOutputFormat
- 将key和value以原始二进制的格式输出。
4、MapFileOutputFormat
- 将key和value写入MapFile中。由于MapFile中的key是有序的,所以写入的时候必须保证记录是按key值顺序写入的。
5、MultipleOutputFormat
- 默认情况下一个reducer会产生一个输出,但是有些时候我们想一个reducer产生多个输出,MultipleOutputFormat和MultipleOutputs可以实现这个功能。
十三、思考
MapReduce框架的结构是什么
Map在整个MR框架中作用是什么
Reduce在整个MR框架中作用是什么
十四、序列化与反序列化例子
数据
1 | 1363157985066 13726230503 2481 24681 200 |
DataBean类
1 | import java.io.DataInput; |
DataCount类
1 | import java.io.IOException; |
public static class DataCountMapper extends Mapper<LongWritable, Text, Text, DataBean>{
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, DataBean>.Context context)
throws IOException, InterruptedException {
String hang=value.toString();
String[] strings=hang.split("\t");
String phone=strings[1];
long up=Long.parseLong(strings[2]);
long down=Long.parseLong(strings[3]);
DataBean dataBean=new DataBean(phone,up, down);
context.write(new Text(phone), dataBean);
}
}
1 | public static class DataCountReducer extends Reducer<Text, DataBean, Text, DataBean>{ |