接下来就是上代码了,这里使用java调用API说明。

配置环境:

这里是在windows下的环境配置,我在mac环境下没有配置程序也能够运行。

  1. 将hadoop的插件包放置在eclipse的安装目录的plugins

    hadoop-eclipse-plugin-2.7.5.jar 添加过去。

    编译好的jarhttps://github.com/HuangDongdong666/Hadoop-eclipse-plugin-2.8.3

  2. 配置本地的hadoop环境 安装单机版的windows下的hadoop ,用于代码调试

    1. 解压hadoop的安装包

    2. 配置hadoop的windows下的环境变量

      1
      2
      HADOOP_HOME=C:\soft\hadoop-3.2.1
      PATH= 末尾追加 ;%HADOOP_HOME%\bin;%HADOOP_HOME%\sbin
    3. 添加插件https://github.com/cdarlint/winutils

      • hadoop.dll放在windows下的System32文件夹下
      • 将winutil.exe放在hadoop安装目录的bin目录下
    4. 重启eclipse

    5. 配置eclipse的可视化操作界面

      1. 配置eclipse下的hadoop的环境变量
        • windows>>prefrences>>左侧菜单栏搜索 hadoop >> browse 导入hadoop的windows下的安装目录
      2. 配置hadoop的控制台界面
        • window>>>show view>>other>>搜索map 选择map/reduce,面板会出现黄色小象
      3. 配置hadoop集群的本地连接
        • 前提:先配置windows下的hosts文件
        • 修改eclipse的显示方式:project explorer

创建工程

导入依赖包

有下面3种方式,为了方便我使用maven进行依赖管理

1
2
3
4
5
6
7
1)在工程下建一个lib的包   bulid PATH
工程移动的时候比较方便
不能解决jar包冲入问题 工程比较臃肿
2)maven
自动解决jar包依赖冲突问题 便于项目的管理
工程移动的时候需要重新构建 构建工程比较麻烦
3)自己创建一个本地的library 导入

案例

本地文件系统对象上传

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package pers.sfl.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class TestHdfs1 {
public static void main(String[] args) throws IOException {
// Configuration 配置文件对象 加载配置文件 或设置配置文件
Configuration conf = new Configuration();
// 获取hdfs的操作对象 获取的是hdfs的操作句柄对象 FileSystem对象
// hdfs的文件系统对象 想要操作hdfs必须先拿到这个对象
FileSystem fileSystem = FileSystem.get(conf);
// org.apache.hadoop.fs.LocalFileSystem@37574691 本地文件系统对象:文件上传和下载都在本地操作的
System.out.println(fileSystem);

Path src=new Path("/Users/sun/Documents/idea/bigdata/src/main/resources/name.txt");
Path dst=new Path("/Users/sun/Documents/idea/bigdata/src/main/resources/name2.txt");

/**
*
* 文件上传
* put 本地 hdfs路径
* copyfromlocal-----api
* movefromlocal
*/
//文件上传 上传到本地 文件传到了工程的根目录下
//文件上传的时候生成了两个文件 原始文件name2.txt 另外一个文件 .name2.txt.crc
fileSystem.copyFromLocalFile(src, dst);
fileSystem.close();
}
}

结果

1
2
3
4
5
6
7
8
9
SUN:resources sun$ pwd
/Users/sun/Documents/idea/bigdata/src/main/resources
SUN:resources sun$ ll
total 24
drwxr-xr-x 5 sun staff 160 Oct 24 16:01 ./
drwxr-xr-x 4 sun staff 128 Oct 22 20:19 ../
-rw-r--r-- 1 sun staff 12 Oct 24 16:01 .name2.txt.crc
-rw-r--r-- 1 sun staff 18 Oct 24 15:58 name.txt
-rw-r--r-- 1 sun staff 18 Oct 24 16:01 name2.txt

分布式文件系统的对象

  • conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
  • FileSystem fs=FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package pers.sfl.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class TestHdfs1 {
public static void main(String[] args) throws IOException {
// Configuration 配置文件对象 加载配置文件 或设置配置文件
Configuration conf = new Configuration();

//没有指定集群的连接入口 想要获取集群的需要指定集群的连接入口
//在conf对象上设置连接入口
/**
* 参数1:配置文件的属性名
* 参数2:配置文件的属性的值
*/
//conf.set("dfs.replication", "4");
//conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
//参数1:namenode的连接入口
FileSystem fs=FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
// 获取hdfs的操作对象 获取的是hdfs的操作句柄对象 FileSystem对象
FileSystem fileSystem = FileSystem.get(conf);
// org.apache.hadoop.fs.LocalFileSystem@37574691 本地文件系统对象:文件上传和下载都在本地操作的
System.out.println(fileSystem);

Path src=new Path("/Users/sun/Documents/idea/bigdata/src/main/resources/name.txt");
Path dst=new Path("/Users/sun/Documents/idea/bigdata/src/main/resources/name3.txt");

//文件上传
fileSystem.copyFromLocalFile(src, dst);
fileSystem.close();
}
}

结果

1
2
3
4
5
6
7
8
9
10
11
[root@hadoop01 ~]# hadoop fs -ls /wordcount/output
Found 3 items
-rw-r--r-- 3 root supergroup 0 2019-10-22 13:28 /wordcount/output/_SUCCESS
-rw-r--r-- 3 root supergroup 77 2019-10-22 13:28 /wordcount/output/part-r-00000
-rw-r--r-- 3 root supergroup 119 2019-10-22 13:28 /wordcount/output/part-r-00001
[root@hadoop01 ~]# hadoop fs -ls /wordcount/output
Found 4 items
-rw-r--r-- 3 root supergroup 0 2019-10-22 13:28 /wordcount/output/_SUCCESS
-rw-r--r-- 3 root supergroup 18 2019-10-23 00:16 /wordcount/output/name3.txt
-rw-r--r-- 3 root supergroup 77 2019-10-22 13:28 /wordcount/output/part-r-00000
-rw-r--r-- 3 root supergroup 119 2019-10-22 13:28 /wordcount/output/part-r-00001

文件上传的时候报一个权限的问题

原因是用户在eclipse中进行上传文件的时候 使用windows下的用户 而不是hdfs的安装用户。

解决权限问题:

  1. 在代码提交的时候指定用户

    1
    2
    3
    4
    5
    代码运行的时候  右键》》 run configurations>> 配置运行代码需要的参数 -DHADOOP_USER_NAME=hadoop
    -> program arguments:代码程序运行需要的参数
    代码中需要控制台传入的参数 这个参数通过main(String[] args)
    -> VM arguments:JVM运行过程中需要的参数
    比如jvm内存大小 jvm运行的用户指定
  2. 在代码中指定用户

    • 指定系统的运行参数 System

      1
      2
      3
      4
      5
      System.setProperty("HADOOP_USER_NAME", "hadoop");

      //或者

      FileSystem fs=FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
  3. windows下添加一个hadoop用户不建议

Configuration配置文件加载说明

配置文件加载的是默认的配置文件

  • 默认加载的配置文件是 工程下hadoop-hdfs-3.2.1.jar的jar包下的hdfs-default.xml这个文件
  • 进行文件上传的时候 所有的参数都是默认的参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Configuration对象默认加载:
* hdfs-default.xml
* mapred-default.xml
* yarn-default.xml
* core-default.xml
* 想要加载自己的配置文件,需要将自己的配置文件拷贝到工程的src下才能,默认加载到自己的配置文件
* 自动加载的配置文件只能识别
* hdfs-default.xml
* hdfs-site.xml
* mapred-default.xml
* mapred-site.xml
* yarn-default.xml
* yarn-site.xml
* core-default.xml
* core-site.xml
* 配置如果没有放在src下?需要代码中手动加载配置文件
* conf.addResource("conf/hdfs-site.xml");
* 在代码中也可以手动设置配置文件

配置文件的加载顺序:

1
2
3
4
5
* 1)jar包
* 2)src下
* 3)代码中的
* 生效:
* 3 >> 2 >> 1

API 操作

  • 上传
  • 下载
  • 创建文件夹
  • 删除文件夹和文件
  • 判断文件是否存在
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package pers.sfl.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;

public class TestAPI {
public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");

//文件上传 本地---hdfs
fs.copyFromLocalFile(new Path("D:\\movies.dat"), new Path("/movie_tt"));

//文件下载
fs.copyToLocalFile(new Path("/movie_tt"), new Path("D:\\movie01.avi"));

//创建文件夹 hdfs dfs -mkdir 级联创建文件夹
boolean isfinish = fs.mkdirs(new Path("/bd1807/aa/ss/ff"));
// System.out.println(isfinish);

//delete() 既可以删除文件 也可以删除文件夹的
//删除文件 返回值 bool 删除--true 否则--false
System.out.println(fs.delete(new Path("/aa001"),false));

//删除文件夹
/**
* 参数1:需要删除的路径 文件 文件夹
* 参数2:是否需要级联删除 true需要 false --不需要
*/
fs.delete(new Path("/ss"),true);

//判断文件夹/文件是否存在 如果存在 则返回true 否则--false
boolean ise=fs.exists(new Path("/bd1807")); //mkdir/delete
//System.out.println(ise);

if(fs.exists(new Path("/movie_tt"))){
fs.delete(new Path("/movie_tt"),false);
}
fs.close();
}

}

API查看文件的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package pers.sfl.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;

public class TestAPI {
public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");

//1.查看文件的信息 指定路径下的文件信息 参数1--需要查看的路径 参数2--是否级联 ls -R
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), false);

//迭代器 迭代 hasNext next()
//LocatedFileStatus 文件状态对象
//封装的文件状态 -- 文件的路径 文件的大小 文件的用户 组 文件的副本
while(listFiles.hasNext()){
System.out.println("------------------------------");
//next对象 获取的是文件的状态信息 一个next代表的是一个文件
LocatedFileStatus next = listFiles.next();
System.out.println(next.getPath());
System.out.println(next.getBlockSize());//获取的配置文件的分块大小 128M
System.out.println(next.getLen());//获取文件的真实大小 块的实际大小
System.out.println(next.getReplication());

//获取的是文件的切块信息 每一个文件的切块信息
//BlockLocation 数据块的封装对象
//包括数据块的存储位置 数据块的大小 数据块的所属用户 组
BlockLocation[] blockLocations = next.getBlockLocations();

//[0,268435456,hadoop03,hadoop02, 268435456,90761455,hadoop03,hadoop02]
//[0起始的偏移量, 268435456 当前块的实际大小,hadoop03,hadoop02, 副本存放的位置==> 文件的第一个块
//268435456,90761455,hadoop03,hadoop02] ==> 第二个块
System.out.println(Arrays.toString(blockLocations));
//循环遍历每一个文件的每一个数据块
for(BlockLocation bl:blockLocations){
System.out.print("这个块的起始偏移量"+bl.getOffset()+"\t");
System.out.print("这个块的长度"+bl.getLength()+"\t");
//这个是同一个数据块的所有副本存放的位置
String[] hosts = bl.getHosts();
System.out.println("这个数据块的副本存放位置"+Arrays.toString(hosts));
}
}
fs.close();
}

}
------------------------------
hdfs://hadoop01:9000/hadoop-3.2.1.tar.gz
268435456
359196911
2
[0,268435456,hadoop03,hadoop02, 268435456,90761455,hadoop03,hadoop02]
这个块的起始偏移量0 这个块的长度268435456 这个数据块的副本存放位置[hadoop03, hadoop02]
这个块的起始偏移量268435456 这个块的长度90761455 这个数据块的副本存放位置[hadoop03, hadoop02]
------------------------------
hdfs://hadoop01:9000/jdk-8u221-linux-x64.tar.gz
268435456
195094741
2
[0,195094741,hadoop03,hadoop02]
这个块的起始偏移量0 这个块的长度195094741 这个数据块的副本存放位置[hadoop03, hadoop02]

API查看文件夹的信息2

看不到块信息

可以看到文件夹

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package pers.sfl.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;

public class TestAPI {
public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");

//2.查看指定目录的文件或文件夹的状态信息
//FileStatus 文件或文件夹的状态对象 封装的文件或文件夹的 用户 组 长度 路径
FileStatus[] listStatus = fs.listStatus(new Path("/"));
for (FileStatus fss : listStatus) {
System.out.println("=======================");
System.out.println(fss.getPath());
System.out.println(fss.getBlockSize());
System.out.println(fss.getLen());
System.out.println(fss.getReplication());
}
fs.close();
}
}

=======================
hdfs://hadoop01:9000/hadoop-3.2.1.tar.gz
268435456
359196911
2
=======================
hdfs://hadoop01:9000/jdk-8u221-linux-x64.tar.gz
268435456
195094741
2
=======================
hdfs://hadoop01:9000/wordcount
0
0
0

Process finished with exit code 0

IO流本地上传

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package pers.sfl.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class TestIO {
public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");

// 文件上传
// 本地 -> 读 -> 输入流
// hdfs -> 写 -> 输出流

//创建本地的输入流
FileInputStream in = new FileInputStream("D:\\movie01.avi");

//创建hdfs的输出流 fs
FSDataOutputStream out = fs.create(new Path("/movie_yy01"));

//进行读写 IOUtils hadoop进行文件读写的工具类

//参数1:输入流 参数2:输出流 参数3:缓冲大小
IOUtils.copyBytes(in, out, 1024);
//关闭流
in.close();
out.close();

//参数1:输入流 参数2:输出流 参数3:缓冲大小 参数4:执行完成 是否关闭流
IOUtils.copyBytes(in, out, 1024, true);

//参数3---long 读取的字节数 读取指定的字节数
IOUtils.copyBytes(in, out, 2L, true);
}
}

IO 下载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package pers.sfl.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class testIO2 {
public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");

//文件下载
// hdfs -> 读 -> 输入流
// 本地 -> 写 -> 输出流

//hdfs的输入流 fs
FSDataInputStream in = fs.open(new Path("/movie_yy"));

//这个方法将流的指针设置到某一个字节开始读取 参数--偏移量
in.seek(10L);

//创建本地的输出流
FileOutputStream out = new FileOutputStream("D:\\moo02");

//进行流的复制
//参数3是Long类型:读取的字节数 读取指定的字节数
IOUtils.copyBytes(in, out, 100L, true);

}

}

crc校验

检测数据是否损坏的措施是,在数据第一次引入系统时候计算校验和(checksum),并在数据通过一个不可靠的通道时候进行传输时再次计算校验和,这样就能发现数据是否损坏了,如果两次计算的校验和不匹配,你就认为数据已经损坏了,但是该技术不能修复数据,它只能检测出错误。常用的错误检测码是CRC-32(循环冗余校验),任何大小的数据输入均计算得到一个32位的整数校验和。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package pers.sfl.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class TestCrc {
public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
Configuration conf=new Configuration();
FileSystem fs=FileSystem.get(new URI("hdfs://hadoop01:9000"), conf,"hadoop");
Path src=new Path("/testcrc");
Path dst=new Path("D:\\test04");
/**
* 文件下载过程中 只要有一个是没有损坏的副本 下载是没有损坏的块 crc校验是可通过的
*
* crc校验的时候:校验的内容只是原始文件的偏移量内的内容
* 只要这部分内容没有发生变化 -> 校验通过
* 这部分内容 发生变化 -> 校验不通过的
*/

fs.copyToLocalFile(src, dst);
fs.close();
}

}