Writable接口与序列化机制

一、序列化概念

  • 序列化(Serialization)是指把结构化对象转化为字节流。

  • 反序列化(Deserialization)是序列化的逆过程。即把字节流转回结构化对象。

  • Java序列化(java.io.Serializable)

二、Hadoop序列化的特点

  • 序列化格式特点:

    • 紧凑:高效使用存储空间。
    • 快速:读写数据的额外开销小
    • 可扩展:可透明地读取老格式的数据
    • 互操作:支持多语言的交互
  • Hadoop的序列化格式:Writable

三、Hadoop序列化的作用

  • 序列化在分布式环境的两大作用:进程间通信,永久存储。

  • Hadoop节点间通信。

    1
    2
    3
    节点1->节点2: 二进制流消息
    Note right of 节点2: 二进制流反序列化为消息
    Note left of 节点1: 消息序列化为二进制流

四、Writable接口

Writable接口, 是根据 DataInput 和 DataOutput 实现的简单、有效的序列化对象.

MR的任意Key和Value必须实现Writable接口.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package org.apache.hadoop.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public interface Writable {
void write(DataOutput var1) throws IOException;

void readFields(DataInput var1) throws IOException;
}

MR的任意key必须实现WritableComparable接口

1
2
3
4
5
6
7
8
9
10
package org.apache.hadoop.io;

import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public interface WritableComparable<T> extends Writable, Comparable<T> {
}

五、常用的Writable实现类

Text一般认为它等价于java.lang.String的Writable。针对UTF-8序列。

例:

  Text test = new Text("test");
  IntWritable one = new IntWritable(1);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
graph RL
A[WritableComparable] -->B(Writable)
C[ArrayWritable] -->B(Writable)
D[TwodArrayWritable] -->B(Writable)
E[AbstractMapWritable] -->B(Writable)
A1[BooleanWritable] -->A(WritableComparable)
A2[ByteWritable] -->A(WritableComparable)
A3[IntWritable] -->A(WritableComparable)
A4[VintWritable] -->A(WritableComparable)
A5[FloatWritable] -->A(WritableComparable)
A6[LongWritable] -->A(WritableComparable)
A7[VlongWritable] -->A(WritableComparable)
A8[DoubleWritable] -->A(WritableComparable)
A9[NullWritable] -->A(WritableComparable)
A10[Text] -->A(WritableComparable)
A11[BytesWritable] -->A(WritableComparable)
A12[MD5Hash] -->A(WritableComparable)
A13[ObjectWritable] -->A(WritableComparable)
A14[GenericWritable] -->A(WritableComparable)
E1(MapWritable)--> E[AbstractMapWritable]
E2(SortedMapWritable)--> E[AbstractMapWritable]
Java基本类型 Writable实现类 序列化字节大小
boolean BooleanWritable 1
byte ByteWritable 1
int IntWritable 4
VintWritable 1~5
float FloatWritable 4
Long LongWritable 8
VlongWritable 1-9
double DoubleWritable 8
String(不是基本类型) Text

六、自定义Writable类

想要具备序列化 和 反序列的能力,必须实现Writable接口

  • Writable

    • write 是把每个对象序列化到输出流

    • readFields是把输入流字节反序列化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    //实现Writable接口

    public class FlowBean implements Writable{
    //上行流量
    private int upflow;
    //下行流量
    private int downflow;
    //总流量
    private int sumflow;
    public int getUpflow() {
    return upflow;
    }
    public void setUpflow(int upflow) {
    this.upflow = upflow;
    }
    public int getDownflow() {
    return downflow;
    }
    public void setDownflow(int downflow) {
    this.downflow = downflow;
    }
    public int getSumflow() {
    return sumflow;
    }
    public void setSumflow(int sumflow) {
    this.sumflow = sumflow;
    }
    public FlowBean() {
    super();
    }
    public FlowBean(int upflow, int downflow) {
    super();
    this.upflow = upflow;
    this.downflow = downflow;
    this.sumflow = upflow+downflow;
    }

    //toString 一定重写 不重写 输出地址
    @Override
    public String toString() {
    return upflow + "\t" + downflow + "\t" + sumflow;
    }

    //序列化的方法 原始数据----二进制
    @Override
    public void write(DataOutput out) throws IOException {
    //通过参数的输出流 写出 字符串的序列化 writeUTF
    out.writeInt(upflow);
    out.writeInt(downflow);
    out.writeInt(sumflow);

    }

    //反序列化的方法 二进制-----原始数据
    //反序列化的顺序 一定和序列化的顺序一致
    @Override
    public void readFields(DataInput in) throws IOException {
    //从流中读取 转换过来 String---readUTF
    this.upflow=in.readInt();
    this.downflow=in.readInt();
    this.sumflow=in.readInt();

    }

    }

    Mapper端使用自定义的Writable类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    static class MyMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
    Text mk=new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
    String[] datas = value.toString().split("\t");
    //获取需要的数据 进行封装发送
    mk.set(datas[1]);
    //封装fb
    int upflow=Integer.parseInt(datas[datas.length-3]);
    int downflow=Integer.parseInt(datas[datas.length-2]);
    FlowBean fb=new FlowBean(upflow,downflow );
    //发送
    context.write(mk, fb);
    }

    }
  • 实现WritableComparable.

  • Java值对象的比较:一般需要重写toString(),hashCode(),equals()方法

七、排序comparable接口

shuffle(快速排序 归并排序)

mapreduce:中会默认按照map输出的key进行排序 默认升序

  • 如果map输出key是数值类型 按照大小排序的 升序排序的
  • 如果map输出的key是字符串类型的 按照顺序进行排序的 升序排序
  • 自定义的类型 具备以下两点需要实现的接口:
    1)序列化和反序列功能 —Writable(write readFields)
    2)具备比较的能力 — comparable(comparaTo)
  • 自定义的类型必须同时实现两个接口即实现以下的接口:
    WritableComparable—–(write readFields comparaTo)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
/**
* 自定义类排序:实现writable&comparable
* 泛型---用于比较的对象的类型
*/
public class FlowBean implements WritableComparable<FlowBean>{
private String phoneno;
private int upflow;
private int downflow;
private int sumflow;
public String getPhoneno() {
return phoneno;
}
public void setPhoneno(String phoneno) {
this.phoneno = phoneno;
}
public int getUpflow() {
return upflow;
}
public void setUpflow(int upflow) {
this.upflow = upflow;
}
public int getDownflow() {
return downflow;
}
public void setDownflow(int downflow) {
this.downflow = downflow;
}
public int getSumflow() {
return sumflow;
}
public void setSumflow(int sumflow) {
this.sumflow = sumflow;
}
public FlowBean() {
super();
}
public FlowBean(String phoneno, int upflow, int downflow, int sumflow) {
super();
this.phoneno = phoneno;
this.upflow = upflow;
this.downflow = downflow;
this.sumflow = sumflow;
}
@Override
public String toString() {
return phoneno + "\t" + upflow + "\t" + downflow + "\t" + sumflow;
}

//序列化
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(phoneno);
out.writeInt(upflow);
out.writeInt(downflow);
out.writeInt(sumflow);

}
//反序列化
@Override
public void readFields(DataInput in) throws IOException {
this.phoneno=in.readUTF();
this.upflow=in.readInt();
this.downflow=in.readInt();
this.sumflow=in.readInt();

}
//比较 同java中
@Override
public int compareTo(FlowBean o) {
//先比较:1.总流量 2.比较手机号
//总流量倒序排
int tmp=o.getSumflow()-this.getSumflow();
if(tmp==0){
//手机号正序排
tmp=this.getPhoneno().compareTo(o.getPhoneno());
}
return tmp;
}
}

需求说明

wordcount的结果(二次MapReduce)进行排序 按照单词出现的次数进行排序, shuffle过程中是可以进行排序的,利用这个排序为我们排序.

1
2
3
4
5
6
7
8
hadoop    256
hadoophello 4
hello 164
hive 176
lily 84
spark 168
word 84
ww 84

实现思路

1
2
3
4
5
6
7
8
9
10
11
想要利用shuffle排序:将需要排序的字段放在map的输出的key上
输出结果还想要(单词--次数)
reduce端输出的时候:将map的结果进行一个反转

map端:
获取一行数据:切分
key:次数
value:单词
reduce端:
将map输过来的数据 进行对调之后 输出就可以
降序排序:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package com.ghgj.cn.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
* 对wc的结果进行排序的
* @author Administrator
*
*/
public class WCSort {
//key----次数 value---单词
static class MyMapper extends Mapper<LongWritable, Text, IntWritable, Text>{
IntWritable mk=new IntWritable();
Text mv=new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, Text>.Context context)
throws IOException, InterruptedException {
//单词,次数
String[] word_count = value.toString().split("\t");
//封装 发送
/*
* hello,5 world,5 a,5
* -5 -7 -8
* -8 -7 -5
*/
mk.set(-Integer.parseInt(word_count[1]));
mv.set(word_count[0]);
//5,hello 5,world 5,a
context.write(mk, mv);
}
}
//key--text value---int
static class MyReducer extends Reducer<IntWritable, Text, Text, IntWritable>{

/*
* 一组调用一次 相同的key为一组
* 相同的次数分到一组中
* @see org.apache.hadoop.mapreduce.Reducer#reduce(KEYIN, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
*
* key=5
* values=<hello,world,a>
*/
@Override
protected void reduce(IntWritable key, Iterable<Text> values,
Reducer<IntWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
//循环遍历values 对调输出
for(Text v:values){
context.write(v, new IntWritable(-key.get()));
}
}

}

public static void main(String[] args) {
System.setProperty("HADOOP_USER_NAME", "hadoop");
//将mapper reducer类进行一个封装 封装为一个任务----job(作业)
//加载配置文件
Configuration conf=new Configuration();
//启动一个Job 创建一个job对象
try {
Job job=Job.getInstance(conf);
//设置这个job
//设置整个job的主函数入口
job.setJarByClass(WCSort.class);

//设置job的mappper的类
job.setMapperClass(MyMapper.class);

//设置job的reducer的类
job.setReducerClass(MyReducer.class);


//设置map输出key value的类型
//指定了泛型 这里为什么还要设置一次 泛型的作用范围 编译的时候生效 运行的时候泛型会自动擦除
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);


//设置reduce的输出的k v类型 以下方法设置的是mr的最终输出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);


//指定需要统计的文件的输入路径 FileInputFormat 文件输入类
Path inpath=new Path("hdfs://hadoop01:9000/wc_out");
FileInputFormat.addInputPath(job, inpath);

//指定输出目录 输出路径不能存在的 否则会报错 默认输出是覆盖式的输出 如果输出目录存在 有可能造成原始数据的丢失
Path outpath=new Path("hdfs://hadoop01:9000/wc_sort_02");
FileOutputFormat.setOutputPath(job, outpath);

//提交job 执行这一句的时候 job才会提交 上面做的一系列的工作 都是设置job
//job.submit();
job.waitForCompletion(true);

} catch (Exception e) {

e.printStackTrace();
}

}

}

八、MapReduce输入的处理类

  • 1、FileInputFormat:

    • FileInputFormat是所有以文件作为数据源的InputFormat实现的基类,FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方法。至于获得记录的方法是有不同的子类——TextInputFormat进行实现的。
  • 2、InputFormat:

    • public interface InputFormat<K, V> {
          InputSplit[] getSplits(JobConf var1, int var2) throws IOException;
      
          RecordReader<K, V> getRecordReader(InputSplit var1, JobConf var2, Reporter var3) throws IOException;
      }
      
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44

      - InputFormat 负责处理MR的输入部分.有三个作用:

      - 验证作业的输入是否规范.
      - 把输入文件切分成InputSplit.
      - 提供RecordReader 的实现类,把InputSplit读到Mapper中进行处理.

      - 3、InputSplit:

      - 在执行mapreduce之前,原始数据被分割成若干split,每个split作为一个map任务的输入,在map执行过程中split会被分解成一个个记录(key-value对),map会依次处理每一个记录。
      - FileInputFormat只划分比HDFS block大的文件,所以FileInputFormat划分的结果是这个文件或者是这个文件中的一部分.
      - 如果一个文件的大小比block小,将不会被划分,这也是Hadoop处理大文件的效率要比处理很多小文件的效率高的原因。
      - 当Hadoop处理很多小文件(文件大小小于hdfs block大小)的时候,由于FileInputFormat不会对小文件进行划分,所以每一个小文件都会被当做一个split并分配一个map任务,导致效率底下。

      - 例如:一个1G的文件,会被划分成16个64MB的split,并分配16个map任务处理,而10000个100kb的文件会被10000个map任务处理。

      - 4、TextInputFormat:

      - TextInputformat是默认的处理类,处理普通文本文件。

      - 文件中每一行作为一个记录,他将每一行在文件中的起始偏移量作为key,每一行的内容作为value。

      - 默认以\n或回车键作为一行记录。

      - TextInputFormat继承了FileInputFormat。

      ### 九、InputFormat类的层次结构

      ```mermaid
      graph RL
      A[Fileinpputformat] -->B(InputFormat)
      C[ComposableInputformat] -->B
      D[DBInputformat] -->B
      E[Emptyinputformat] -->B
      A1[CombinefileInputformat] -->A
      A2[TextInputformat] -->A
      A3[KeyValueTextInputformat] -->A
      A4[NLineInputformat] -->A
      A5[SequenceFileInputformat] -->A
      A6[StreamInputformat] -->A3
      A7[SequenceFileAsBinaryInputformat] -->A5
      A8[SequenceFileAsTextInputformat] -->A5
      A9[SequenceFileInputfiltter] -->A5
      C1[CompositeInputformat] -->C

十、其他输入类

  • 1、CombineFileInputFormat

    • 相对于大量的小文件来说,hadoop更合适处理少量的大文件。
      • CombineFileInputFormat可以缓解这个问题,它是针对小文件而设计的。
  • 2、KeyValueTextInputFormat

    • 当输入数据的每一行是两列,并用tab分离的形式的时候,KeyValueTextInputformat处理这种格式的文件非常适合。
  • 3、NLineInputformat

    • NLineInputformat可以控制在每个split中数据的行数。
  • 4、SequenceFileInputformat

  • 当输入文件格式是sequencefile的时候,要使用SequenceFileInputformat作为输入。

十一、自定义输入格式

  • 1、继承FileInputFormat基类。

  • 2、重写里面的getSplits(JobContext context)方法。

  • 3、重写createRecordReader(InputSplit split,TaskAttemptContext context)方法。

十二、Hadoop的输出

  • 1、TextOutputformat

    • 默认的输出格式,key和value中间值用tab隔开的。
  • 2、SequenceFileOutputformat

    • 将key和value以sequencefile格式输出。
  • 3、SequenceFileAsOutputFormat

    • 将key和value以原始二进制的格式输出。
  • 4、MapFileOutputFormat

    • 将key和value写入MapFile中。由于MapFile中的key是有序的,所以写入的时候必须保证记录是按key值顺序写入的。
  • 5、MultipleOutputFormat

    • 默认情况下一个reducer会产生一个输出,但是有些时候我们想一个reducer产生多个输出,MultipleOutputFormat和MultipleOutputs可以实现这个功能。

十三、思考

  • MapReduce框架的结构是什么

  • Map在整个MR框架中作用是什么

  • Reduce在整个MR框架中作用是什么

十四、序列化与反序列化例子

数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1363157985066	13726230503	2481	24681	200
1363157995052 13826544101 264 0 200
1363157991076 13926435656 132 1512 200
1363154400022 13926251106 240 0 200
1363157993044 18211575961 1527 2106 200
1363157995074 84138413 4116 1432 200
1363157993055 13560439658 1116 954 200
1363157995033 15920133257 3156 2936 200
1363157983019 13719199419 240 0 200
1363157984041 13660577991 6960 690 200
1363157973098 15013685858 3659 3538 200
1363157986029 15989002119 1938 180 200
1363157992093 13560439658 918 4938 200
1363157986041 13480253104 180 180 200
1363157984040 13602846565 1938 2910 200

13726230503 2481 24681 sum

DataBean类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class DataBean implements Writable{

//电话号码
private String phone;
//上行流量
private Long upPayLoad;
//下行流量
private Long downPayLoad;
//总流量
private Long totalPayLoad;

public DataBean(){}

public DataBean(String phone,Long upPayLoad, Long downPayLoad) {
super();
this.phone=phone;
this.upPayLoad = upPayLoad;
this.downPayLoad = downPayLoad;
this.totalPayLoad=upPayLoad+downPayLoad;
}

/**
* 序列化
* 注意:序列化和反序列化的顺序和类型必须一致
*/
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeUTF(phone);
out.writeLong(upPayLoad);
out.writeLong(downPayLoad);
out.writeLong(totalPayLoad);
}

/**
* 反序列化
*/
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
this.phone=in.readUTF();
this.upPayLoad=in.readLong();
this.downPayLoad=in.readLong();
this.totalPayLoad=in.readLong();
}

@Override
public String toString() {
return upPayLoad +"\t"+ downPayLoad +"\t"+ totalPayLoad;
}

public String getPhone() {
return phone;
}

public void setPhone(String phone) {
this.phone = phone;
}

public Long getUpPayLoad() {
return upPayLoad;
}

public void setUpPayLoad(Long upPayLoad) {
this.upPayLoad = upPayLoad;
}

public Long getDownPayLoad() {
return downPayLoad;
}

public void setDownPayLoad(Long downPayLoad) {
this.downPayLoad = downPayLoad;
}

public Long getTotalPayLoad() {
return totalPayLoad;
}

public void setTotalPayLoad(Long totalPayLoad) {
this.totalPayLoad = totalPayLoad;
}
}

DataCount类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DataCount {

public static void main(String[] args) throws IOException, ClassNotFoundException,
InterruptedException {
// TODO Auto-generated method stub
Job job=Job.getInstance(new Configuration());

job.setJarByClass(DataCount.class);

job.setMapperClass(DataCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DataBean.class);
FileInputFormat.setInputPaths(job, args[0]);

job.setReducerClass(DataCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DataBean.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);
}
public static class DataCountMapper extends Mapper<LongWritable, Text, Text, DataBean>{

@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, DataBean>.Context context)
throws IOException, InterruptedException {
String hang=value.toString();
String[] strings=hang.split("\t");
String phone=strings[1];
long up=Long.parseLong(strings[2]);
long down=Long.parseLong(strings[3]);
DataBean dataBean=new DataBean(phone,up, down);

context.write(new Text(phone), dataBean);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static class DataCountReducer extends Reducer<Text, DataBean, Text, DataBean>{

@Override
protected void reduce(Text k2, Iterable<DataBean> v2,
Reducer<Text, DataBean, Text, DataBean>.Context context)
throws IOException, InterruptedException {
long upSum=0;
long downSum=0;

for(DataBean dataBean:v2){
upSum += dataBean.getUpPayLoad();
downSum += dataBean.getDownPayLoad();
}

DataBean dataBean=new DataBean(k2.toString(),upSum,downSum);

context.write(new Text(k2), dataBean);
}

}