接下来就是上代码了,这里使用java调用API说明。
配置环境:
这里是在windows下的环境配置,我在mac环境下没有配置程序也能够运行。
将hadoop的插件包放置在eclipse的安装目录的plugins
hadoop-eclipse-plugin-2.7.5.jar
添加过去。
编译好的jarhttps://github.com/HuangDongdong666/Hadoop-eclipse-plugin-2.8.3
配置本地的hadoop环境 安装单机版的windows下的hadoop ,用于代码调试
解压hadoop的安装包
配置hadoop的windows下的环境变量
1 2 HADOOP_HOME=C:\soft\hadoop-3.2.1 PATH= 末尾追加 ;%HADOOP_HOME%\bin;%HADOOP_HOME%\sbin
添加插件https://github.com/cdarlint/winutils
hadoop.dll放在windows下的System32文件夹下
将winutil.exe放在hadoop安装目录的bin目录下
重启eclipse
配置eclipse的可视化操作界面
配置eclipse下的hadoop的环境变量
windows>>prefrences>>左侧菜单栏搜索 hadoop >> browse 导入hadoop的windows下的安装目录
配置hadoop的控制台界面
window>>>show view>>other>>搜索map 选择map/reduce,面板会出现黄色小象
配置hadoop集群的本地连接
前提:先配置windows下的hosts文件
修改eclipse的显示方式:project explorer
创建工程 导入依赖包
有下面3种方式,为了方便我使用maven进行依赖管理
1 2 3 4 5 6 7 1)在工程下建一个lib的包 bulid PATH 工程移动的时候比较方便 不能解决jar包冲入问题 工程比较臃肿 2)maven 自动解决jar包依赖冲突问题 便于项目的管理 工程移动的时候需要重新构建 构建工程比较麻烦 3)自己创建一个本地的library 导入
案例 本地文件系统对象上传 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package pers.sfl.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.IOException; public class TestHdfs1 { public static void main(String[] args) throws IOException { // Configuration 配置文件对象 加载配置文件 或设置配置文件 Configuration conf = new Configuration(); // 获取hdfs的操作对象 获取的是hdfs的操作句柄对象 FileSystem对象 // hdfs的文件系统对象 想要操作hdfs必须先拿到这个对象 FileSystem fileSystem = FileSystem.get(conf); // org.apache.hadoop.fs.LocalFileSystem@37574691 本地文件系统对象:文件上传和下载都在本地操作的 System.out.println(fileSystem); Path src=new Path("/Users/sun/Documents/idea/bigdata/src/main/resources/name.txt"); Path dst=new Path("/Users/sun/Documents/idea/bigdata/src/main/resources/name2.txt"); /** * * 文件上传 * put 本地 hdfs路径 * copyfromlocal-----api * movefromlocal */ //文件上传 上传到本地 文件传到了工程的根目录下 //文件上传的时候生成了两个文件 原始文件name2.txt 另外一个文件 .name2.txt.crc fileSystem.copyFromLocalFile(src, dst); fileSystem.close(); } }
结果
1 2 3 4 5 6 7 8 9 SUN:resources sun$ pwd /Users/sun/Documents/idea/bigdata/src/main/resources SUN:resources sun$ ll total 24 drwxr-xr-x 5 sun staff 160 Oct 24 16:01 ./ drwxr-xr-x 4 sun staff 128 Oct 22 20:19 ../ -rw-r--r-- 1 sun staff 12 Oct 24 16:01 .name2.txt.crc -rw-r--r-- 1 sun staff 18 Oct 24 15:58 name.txt -rw-r--r-- 1 sun staff 18 Oct 24 16:01 name2.txt
分布式文件系统的对象
conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
FileSystem fs=FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package pers.sfl.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.IOException; public class TestHdfs1 { public static void main(String[] args) throws IOException { // Configuration 配置文件对象 加载配置文件 或设置配置文件 Configuration conf = new Configuration(); //没有指定集群的连接入口 想要获取集群的需要指定集群的连接入口 //在conf对象上设置连接入口 /** * 参数1:配置文件的属性名 * 参数2:配置文件的属性的值 */ //conf.set("dfs.replication", "4"); //conf.set("fs.defaultFS", "hdfs://hadoop01:9000"); //参数1:namenode的连接入口 FileSystem fs=FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop"); // 获取hdfs的操作对象 获取的是hdfs的操作句柄对象 FileSystem对象 FileSystem fileSystem = FileSystem.get(conf); // org.apache.hadoop.fs.LocalFileSystem@37574691 本地文件系统对象:文件上传和下载都在本地操作的 System.out.println(fileSystem); Path src=new Path("/Users/sun/Documents/idea/bigdata/src/main/resources/name.txt"); Path dst=new Path("/Users/sun/Documents/idea/bigdata/src/main/resources/name3.txt"); //文件上传 fileSystem.copyFromLocalFile(src, dst); fileSystem.close(); } }
结果
1 2 3 4 5 6 7 8 9 10 11 [root@hadoop01 ~]# hadoop fs -ls /wordcount/output Found 3 items -rw-r--r-- 3 root supergroup 0 2019-10-22 13:28 /wordcount/output/_SUCCESS -rw-r--r-- 3 root supergroup 77 2019-10-22 13:28 /wordcount/output/part-r-00000 -rw-r--r-- 3 root supergroup 119 2019-10-22 13:28 /wordcount/output/part-r-00001 [root@hadoop01 ~]# hadoop fs -ls /wordcount/output Found 4 items -rw-r--r-- 3 root supergroup 0 2019-10-22 13:28 /wordcount/output/_SUCCESS -rw-r--r-- 3 root supergroup 18 2019-10-23 00:16 /wordcount/output/name3.txt -rw-r--r-- 3 root supergroup 77 2019-10-22 13:28 /wordcount/output/part-r-00000 -rw-r--r-- 3 root supergroup 119 2019-10-22 13:28 /wordcount/output/part-r-00001
文件上传的时候报一个权限的问题
原因是用户在eclipse中进行上传文件的时候 使用windows下的用户 而不是hdfs的安装用户。
解决权限问题:
在代码提交的时候指定用户
1 2 3 4 5 代码运行的时候 右键》》 run configurations>> 配置运行代码需要的参数 -DHADOOP_USER_NAME=hadoop -> program arguments:代码程序运行需要的参数 代码中需要控制台传入的参数 这个参数通过main(String[] args) -> VM arguments:JVM运行过程中需要的参数 比如jvm内存大小 jvm运行的用户指定
在代码中指定用户
指定系统的运行参数 System
1 2 3 4 5 System.setProperty("HADOOP_USER_NAME", "hadoop"); //或者 FileSystem fs=FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
windows下添加一个hadoop用户不建议
Configuration配置文件加载说明 配置文件加载的是默认的配置文件
默认加载的配置文件是 工程下hadoop-hdfs-3.2.1.jar的jar包下的hdfs-default.xml这个文件
进行文件上传的时候 所有的参数都是默认的参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Configuration对象默认加载: * hdfs-default.xml * mapred-default.xml * yarn-default.xml * core-default.xml * 想要加载自己的配置文件,需要将自己的配置文件拷贝到工程的src下才能,默认加载到自己的配置文件 * 自动加载的配置文件只能识别 * hdfs-default.xml * hdfs-site.xml * mapred-default.xml * mapred-site.xml * yarn-default.xml * yarn-site.xml * core-default.xml * core-site.xml * 配置如果没有放在src下?需要代码中手动加载配置文件 * conf.addResource("conf/hdfs-site.xml"); * 在代码中也可以手动设置配置文件
配置文件的加载顺序:
1 2 3 4 5 * 1)jar包 * 2)src下 * 3)代码中的 * 生效: * 3 >> 2 >> 1
API 操作
上传
下载
创建文件夹
删除文件夹和文件
判断文件是否存在
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 package pers.sfl.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; public class TestAPI { public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop"); //文件上传 本地---hdfs fs.copyFromLocalFile(new Path("D:\\movies.dat"), new Path("/movie_tt")); //文件下载 fs.copyToLocalFile(new Path("/movie_tt"), new Path("D:\\movie01.avi")); //创建文件夹 hdfs dfs -mkdir 级联创建文件夹 boolean isfinish = fs.mkdirs(new Path("/bd1807/aa/ss/ff")); // System.out.println(isfinish); //delete() 既可以删除文件 也可以删除文件夹的 //删除文件 返回值 bool 删除--true 否则--false System.out.println(fs.delete(new Path("/aa001"),false)); //删除文件夹 /** * 参数1:需要删除的路径 文件 文件夹 * 参数2:是否需要级联删除 true需要 false --不需要 */ fs.delete(new Path("/ss"),true); //判断文件夹/文件是否存在 如果存在 则返回true 否则--false boolean ise=fs.exists(new Path("/bd1807")); //mkdir/delete //System.out.println(ise); if(fs.exists(new Path("/movie_tt"))){ fs.delete(new Path("/movie_tt"),false); } fs.close(); } }
API查看文件的信息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 package pers.sfl.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; public class TestAPI { public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop"); //1.查看文件的信息 指定路径下的文件信息 参数1--需要查看的路径 参数2--是否级联 ls -R RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), false); //迭代器 迭代 hasNext next() //LocatedFileStatus 文件状态对象 //封装的文件状态 -- 文件的路径 文件的大小 文件的用户 组 文件的副本 while(listFiles.hasNext()){ System.out.println("------------------------------"); //next对象 获取的是文件的状态信息 一个next代表的是一个文件 LocatedFileStatus next = listFiles.next(); System.out.println(next.getPath()); System.out.println(next.getBlockSize());//获取的配置文件的分块大小 128M System.out.println(next.getLen());//获取文件的真实大小 块的实际大小 System.out.println(next.getReplication()); //获取的是文件的切块信息 每一个文件的切块信息 //BlockLocation 数据块的封装对象 //包括数据块的存储位置 数据块的大小 数据块的所属用户 组 BlockLocation[] blockLocations = next.getBlockLocations(); //[0,268435456,hadoop03,hadoop02, 268435456,90761455,hadoop03,hadoop02] //[0起始的偏移量, 268435456 当前块的实际大小,hadoop03,hadoop02, 副本存放的位置==> 文件的第一个块 //268435456,90761455,hadoop03,hadoop02] ==> 第二个块 System.out.println(Arrays.toString(blockLocations)); //循环遍历每一个文件的每一个数据块 for(BlockLocation bl:blockLocations){ System.out.print("这个块的起始偏移量"+bl.getOffset()+"\t"); System.out.print("这个块的长度"+bl.getLength()+"\t"); //这个是同一个数据块的所有副本存放的位置 String[] hosts = bl.getHosts(); System.out.println("这个数据块的副本存放位置"+Arrays.toString(hosts)); } } fs.close(); } } ------------------------------ hdfs://hadoop01:9000/hadoop-3.2.1.tar.gz 268435456 359196911 2 [0,268435456,hadoop03,hadoop02, 268435456,90761455,hadoop03,hadoop02] 这个块的起始偏移量0 这个块的长度268435456 这个数据块的副本存放位置[hadoop03, hadoop02] 这个块的起始偏移量268435456 这个块的长度90761455 这个数据块的副本存放位置[hadoop03, hadoop02] ------------------------------ hdfs://hadoop01:9000/jdk-8u221-linux-x64.tar.gz 268435456 195094741 2 [0,195094741,hadoop03,hadoop02] 这个块的起始偏移量0 这个块的长度195094741 这个数据块的副本存放位置[hadoop03, hadoop02]
API查看文件夹的信息2
看不到块信息
可以看到文件夹
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 package pers.sfl.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; public class TestAPI { public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop"); //2.查看指定目录的文件或文件夹的状态信息 //FileStatus 文件或文件夹的状态对象 封装的文件或文件夹的 用户 组 长度 路径 FileStatus[] listStatus = fs.listStatus(new Path("/")); for (FileStatus fss : listStatus) { System.out.println("======================="); System.out.println(fss.getPath()); System.out.println(fss.getBlockSize()); System.out.println(fss.getLen()); System.out.println(fss.getReplication()); } fs.close(); } } ======================= hdfs://hadoop01:9000/hadoop-3.2.1.tar.gz 268435456 359196911 2 ======================= hdfs://hadoop01:9000/jdk-8u221-linux-x64.tar.gz 268435456 195094741 2 ======================= hdfs://hadoop01:9000/wordcount 0 0 0 Process finished with exit code 0
IO流本地上传 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package pers.sfl.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import java.io.FileInputStream; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; public class TestIO { public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop"); // 文件上传 // 本地 -> 读 -> 输入流 // hdfs -> 写 -> 输出流 //创建本地的输入流 FileInputStream in = new FileInputStream("D:\\movie01.avi"); //创建hdfs的输出流 fs FSDataOutputStream out = fs.create(new Path("/movie_yy01")); //进行读写 IOUtils hadoop进行文件读写的工具类 //参数1:输入流 参数2:输出流 参数3:缓冲大小 IOUtils.copyBytes(in, out, 1024); //关闭流 in.close(); out.close(); //参数1:输入流 参数2:输出流 参数3:缓冲大小 参数4:执行完成 是否关闭流 IOUtils.copyBytes(in, out, 1024, true); //参数3---long 读取的字节数 读取指定的字节数 IOUtils.copyBytes(in, out, 2L, true); } }
IO 下载 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package pers.sfl.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; public class testIO2 { public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop"); //文件下载 // hdfs -> 读 -> 输入流 // 本地 -> 写 -> 输出流 //hdfs的输入流 fs FSDataInputStream in = fs.open(new Path("/movie_yy")); //这个方法将流的指针设置到某一个字节开始读取 参数--偏移量 in.seek(10L); //创建本地的输出流 FileOutputStream out = new FileOutputStream("D:\\moo02"); //进行流的复制 //参数3是Long类型:读取的字节数 读取指定的字节数 IOUtils.copyBytes(in, out, 100L, true); } }
crc校验
检测数据是否损坏的措施是,在数据第一次引入系统时候计算校验和(checksum),并在数据通过一个不可靠的通道时候进行传输时再次计算校验和,这样就能发现数据是否损坏了,如果两次计算的校验和不匹配,你就认为数据已经损坏了,但是该技术不能修复数据,它只能检测出错误。常用的错误检测码是CRC-32(循环冗余校验),任何大小的数据输入均计算得到一个32位的整数校验和。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package pers.sfl.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; public class TestCrc { public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException { Configuration conf=new Configuration(); FileSystem fs=FileSystem.get(new URI("hdfs://hadoop01:9000"), conf,"hadoop"); Path src=new Path("/testcrc"); Path dst=new Path("D:\\test04"); /** * 文件下载过程中 只要有一个是没有损坏的副本 下载是没有损坏的块 crc校验是可通过的 * * crc校验的时候:校验的内容只是原始文件的偏移量内的内容 * 只要这部分内容没有发生变化 -> 校验通过 * 这部分内容 发生变化 -> 校验不通过的 */ fs.copyToLocalFile(src, dst); fs.close(); } }