使用Java API访问HDFS
HDFS是一个分布式文件系统,既然是文件系统,就可以对其文件进行操作,比如说新建文件、删除文件、读取文件内容等操作。 Java程序使用 HDFS JavaAPI与 HDFS交互。 使用这个 API,我们可以从 Java程序中使用存储在 HDFS中的数据,以及使用其它非 Hadoop计算框架来处理这些数据。
为了以编程方式与Hadoop的文件系统交互,Hadoop提供了多个JAVA类。在名为org.apache.hadoop.fs的包中包含了在Hadoop文件系统中操作文件时有用的类。 这些操作包括:打开、读取、写入和关闭。实际上,Hadoop的文件API是通用的,可以扩展到与除HDFS之外的其他文件系统交互。
对存储在HDFS中的文件操作主要涉及以下几个类:
- Configuration类:该类的对象封装了客户端或者服务器的配置。它将Hadoop配置信息传递给FileSystem。 它通过类加载器加载core-site.xml和core-default.xml,并保存Hadoop配置信息,如fs.defaultFS、fs.default.name等。
- FileSystem类:该类的对象代表一个文件系统对象,可以用该对象的一些方法来对文件进行操作。
- FSDataInputStream和 FSDataOutputStream:这两个类是 HDFS中的输入输出流。分别通过 FileSystem的 open方法和 create方法获得。
- Path:代表文件或文件对象。下面这个示例演示了怎样通过 Java程序使用 HDFS JavaAPI来执行在 HDFS上的文件。
- FileStatus:代表文件信息。
Hadoop的org.apache.hadoop.fs.FileSystem是访问和管理位于分布式环境中的HDFS文件/目录的通用类。 Hadoop提供了FileSystem的各种实现,如下所述:
- DistributedFileSystem:在分布式环境下访问HDFS文件。
- LocalFileSystem:访问本地系统中的HDFS文件。
- FTPFileSystem:访问HDFS文件的FTP客户端。
- WebHdfsFileSystem:通过web访问HDFS文件。
HDFS分布式文件系统的 JAVA API提供了丰富的访问接口。主要包括:目录的创建、列表、查询、删除和文件的创建 (写入 )、读取等。
---------------------------------------------------------------一、HDFS目录操作
在HDFS上创建目录:
public void creatDir(){ String url="hdfs://xueai8:8020/user/test"; //configuration封装了HDFS客户端或者HDFS集群的配置信息, //该方法通过给定的URI方案和权限来确定要使用的文件系统 Configuration configuration=new Configuration(); try { //通过给定的URI方案和权限来确定要使用的文件系统 fs = FileSystem.get(URI.create(url), configuration); fs.mkdirs(new Path(url)); System.out.println("========================"); } catch (IOException e) { e.printStackTrace(); } }
在HDFS上删除目录:
public void deleteDir(){ String url="hdfs://xueai8:8020/user/test"; Configuration configuration=new Configuration(); try { //通过给定的URI方案和权限来确定要使用的文件系统 fs = FileSystem.get(URI.create(url), configuration); fs.delete(new Path(url)); System.out.println("========================"); } catch (IOException e) { e.printStackTrace(); } }
列出指定目录下的文件或目录名称:
public void listFiles(){ String urls[] = {"hdfs://xueai8:8020/user/","hdfs://xueai8:8020/user/test.txt"}; Configuration configuration=new Configuration(); try { //通过给定的URI方案和权限来确定要使用的文件系统 fs = FileSystem.get(URI.create(urls[1]), configuration); System.out.println("user目录下的方件有"); System.out.println("===================================="); //调用FileSystem中的listStatus()方法返回一个FileStatus[]数组 FileStatus[] listFiles = fs.listStatus(new Path(urls[0])); //遍历listFiles for (int i = 0; i < listFiles.length; i++) { FileStatus fileStatus = listFiles[i]; System.out.println(fileStatus.getPath().toString()); } //FileStatus类中封装了文件系统中文件和目录的元数据,包括文件的长度、块大小、复本、所有者、及权限信息 FileStatus file = fs.getFileStatus(new Path(urls[1])); //文件大小 long lenthg = file.getLen(); //块大小 long size = file.getBlockSize(); //最近修改时间 long time = file.getModificationTime(); //副本数 int n = file.getReplication(); //所有者 String owner = file.getOwner(); //权限信息 String chmod = file.getPermission().toString(); System.out.println("user目录下的文件所具有的属性"); System.out.println("===================================="); System.out.println("文件大小是:" + lenthg); System.out.println("块大小" + size); System.out.println("最近修改时间:" + time); System.out.println("复本数" + n); System.out.println("文件所有者" + owner); System.out.println("权限信息" + chmod); //关闭输入流 fs.close(); } catch (IOException e) { e.printStackTrace(); } }
列出指定目录下的内容(文件/子目录):
static final String hdfsurl = "hdfs://xueai8:8020"; public void list(String dir) throws IOException { // 获取配置信息 Configuration conf = new Configuration(); // 获取文件系统抽象-hdfs FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf); // 调用自定义的辅助方法(递归方法) listDir(fs, new Path(dir)); } // 列出指定目录下所有的内容 private void listDir(FileSystem fs, Path dir) throws IOException { FileStatus[] status = fs.listStatus(dir); // 获取目录dir下所有的FileStatus对象 for (int i = 0; i < status.length; i++) { // 遍历 if (status[i].isDirectory()) { // 如果当前这个FileStatus是一个目录 Path dirPath = status[i].getPath(); System.out.println("## " + dirPath.toString()); // 递归显示 listDir(fs, status[i].getPath()); // 继续 } else { System.out.println(status[i].getPath().toString()); } } }---------------------------------------------------------------------------
二、HDFS文件操作
判断一个文件是否存在:
static final String hdfsurl = "hdfs://xueai8:8020"; // 判断一个文件是否存在 public void exists(String file) throws IOException { // 获取配置信息 Configuration conf = new Configuration(); // 获取文件系统抽象-hdfs FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf); // scheme // 执行操作 boolean flag = fs.exists(new Path(file)); if (flag) { System.out.println("文件存在"); } else { System.out.println("文件不存在"); } }
上传文本文件到HDFS上:
static final String hdfsurl = "hdfs://xueai8:8020"; public void upload(String src, String dst) throws IOException { // 获取配置信息 Configuration conf = new Configuration(); // 获取文件系统抽象-hdfs FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf); // 需要两个流:一个指向源文件的输入流,一个指向目标文件的输出流 FSDataOutputStream out = fs.create(new Path(dst)); // 输出流 BufferedReader reader = new BufferedReader(new FileReader(src)); // 输入流 // 从输入流读取每一行,写出到输出流中 String line = null; while ((line = reader.readLine()) != null) { out.writeUTF(line); } // 关闭输入输出流 reader.close(); out.close(); System.out.println("上传结束"); }
上传非文本文件到HDFS上:
static final String hdfsurl = "hdfs://xueai8:8020"; public void upload2(String src, String dst) throws IOException { // 获取配置信息 Configuration conf = new Configuration(); // 获取文件系统抽象-hdfs FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf); // 需要两个流:一个指向源文件的输入流,一个指向目标文件的输出 流 FSDataOutputStream out = fs.create(new Path(dst)); // 输出流 FileInputStream in = new FileInputStream(src); // 输入流 // 使用 IOUtils.copyBytes(in, out, 4096, true); // 关闭输入输出流 in.close(); out.close(); System.out.println("上传结束"); }
从HDFS上下载文件到本地:
static final String hdfsurl = "hdfs://xueai8:8020"; public void download(String src, String dst) throws IOException { // 获取配置信息 Configuration conf = new Configuration(); // 获取文件系统抽象-hdfs FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf); // 需要两个流:一个指向源文件的输入流,一个指向目标文件的输出 流 FSDataInputStream in = fs.open(new Path(src)); FileOutputStream out = new FileOutputStream(dst); // 使用 IOUtils.copyBytes(in, out, 4096); // 关闭输入输出流 in.close(); out.close(); System.out.println("下载结束"); }
在HDFS上创建文件并写入内容:
static final String hdfsurl = "hdfs://xueai8:8020"; public void create(String file, String content) throws IOException { // 获取配置信息 Configuration conf = new Configuration(); // 获取文件系统抽象-hdfs FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf); FSDataOutputStream out = fs.create(new Path(file)); // out向DataNode写数据 out.writeUTF(content); out.close(); System.out.println("文件已创建"); }
查看HDFS上指定文本文件的内容:
static final String hdfsurl = "hdfs://xueai8:8020"; public void read(String file) throws IOException { // 获取配置信息 Configuration conf = new Configuration(); // 获取文件系统抽象-hdfs FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf); // 打开文件输入流 FSDataInputStream in = fs.open(new Path(file)); // 读取文本内容 String content = in.readUTF(); System.out.println(content); in.close(); }
带有进度提示信息的文件写入:
static final String hdfsurl = "hdfs://xueai8:8020"; public void writeWithProgress(String file) throws IOException{ StringBuffer sb = new StringBuffer(); //创建辅助可变字符串 Random rand = new Random(); //创建一些随机数 for(int i=0; i<9;i++){ //随机写入字符 sb.append(rand.nextInt(100)); } byte[] buff = sb.toString().getBytes(); //生成字节数组 Configuration conf = new Configuration(); // 获取配置信息 FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf);// 获得文件系统引用 // 创建写出流 FSDataOutputStream fsout = fs.create(new Path(file),new Progressable(){ @Override public void progress(){ //默认的实用方法 System.out.println("."); //打印出序列号 } }); fsout.write(buff); // 开始写出操作 fsout.close(); // 关闭写出流 }
获取指定文件的属性:
static final String hdfsurl = "hdfs://xueai8:8020"; public void status(String file) throws IOException { // 获取配置信息 Configuration conf = new Configuration(); // 获取文件系统抽象-hdfs FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf); // 获得指定文件的状态对象 FileStatus fileStatus = fs.getFileStatus(new Path(file)); System.out.println(fileStatus.getLen()); System.out.println(fileStatus.getPath()); //获取绝对路径 System.out.println(fileStatus.getPath().toUri().getPath()); //获取相对路径 System.out.println(fileStatus.getBlockSize()); //获取当前Block大小 System.out.println(fileStatus.getGroup()); //获取所属组 System.out.println(fileStatus.getOwner()); //获取所有者 }
查找某个HDFS文件所有的block位置信息
static final String hdfsurl = "hdfs://xueai8:8020"; public void getBlockLocations(String file) throws IOException { // 获取配置信息 Configuration conf = new Configuration(); // 获取文件系统抽象-hdfs FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf); Path path = new Path(file); FileStatus status = fs.getFileStatus(path); BlockLocation[] locations = fs.getFileBlockLocations(status, 0, status.getLen()); for (BlockLocation bl : locations) { String[] hosts = bl.getHosts(); for (String host : hosts) { System.out.println(host); } String[] names = bl.getNames(); for (String name : names) { System.out.println(name); } } }
其中getFileBlockLocations方法签名如下:
public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
获取集群中所有DataNode节点信息:
static final String hdfsurl = "hdfs://xueai8:8020"; public void getAllDataNodes() throws IOException { // 获取配置信息 Configuration conf = new Configuration(); // 获取文件系统抽象-hdfs FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf); // 将fs转换为DistributedFileSystem DistributedFileSystem dfs = (DistributedFileSystem)fs; // 获取所有DataNode节点信息 DatanodeInfo[] dnInfo = dfs.getDataNodeStats(); // 遍历输出所有节点信息 for(DatanodeInfo di : dnInfo) { System.out.println(di.getHostName()); System.out.println(di.getInfoAddr()); System.out.println(di.getName()); } }
随机读取HDFS文件内容。
FSDaraInputStream实现了Seekable接口的seek (long pos)和getPos()方法。 seek()方法将文件查找到文件开头的给定偏移量,这样read()将流到该位置, 而getPos()方法将返回InputStream上的当前位置。
static final String hdfsurl = "hdfs://xueai8:8020"; public void seek(String file) throws IOException{ Configuration conf = new Configuration(); // 获取环境变量 FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf); FSDataInputStream fsin = fs.open(new Path(file)); // 建立输入流 byte[] buff = new byte[5]; // 建立缓存数组 int length = 0; // 辅助长度 while ((length = fsin.read(buff, 0, 5)) != -1) { // 将数据读入缓存数组 System.out.println("缓存数组的长度:" + length); System.out.println(new String(buff, 0, length)); // 打印数据 } System.out.println("length=" + fsin.getPos()); // 打印输出流的长度 fsin.seek(10); // 返回要读取文件的第11个字节处,空格也算字节 while ((length = fsin.read(buff, 0, 128)) != -1) { // 将数据读入缓存 System.out.println(new String(buff, 0, length)); // 打印数据 } fsin.seek(0); //返回开始处 byte[] buff2 = new byte[128]; // 建立辅助字节数组 fsin.read(buff2, 0, 128); // 将数据读入缓存数组 System.out.println("buff2=" + new String(buff2)); // 打印数据 System.out.println(buff2.length); // 打印数组长度 }