使用Java API访问HDFS

HDFS是一个分布式文件系统,既然是文件系统,就可以对其文件进行操作,比如说新建文件、删除文件、读取文件内容等操作。 Java程序使用 HDFS JavaAPI与 HDFS交互。 使用这个 API,我们可以从 Java程序中使用存储在 HDFS中的数据,以及使用其它非 Hadoop计算框架来处理这些数据。

为了以编程方式与Hadoop的文件系统交互,Hadoop提供了多个JAVA类。在名为org.apache.hadoop.fs的包中包含了在Hadoop文件系统中操作文件时有用的类。 这些操作包括:打开、读取、写入和关闭。实际上,Hadoop的文件API是通用的,可以扩展到与除HDFS之外的其他文件系统交互。

对存储在HDFS中的文件操作主要涉及以下几个类:

  • Configuration类:该类的对象封装了客户端或者服务器的配置。它将Hadoop配置信息传递给FileSystem。 它通过类加载器加载core-site.xml和core-default.xml,并保存Hadoop配置信息,如fs.defaultFS、fs.default.name等。
  • FileSystem类:该类的对象代表一个文件系统对象,可以用该对象的一些方法来对文件进行操作。
  • FSDataInputStream和 FSDataOutputStream:这两个类是 HDFS中的输入输出流。分别通过 FileSystem的 open方法和 create方法获得。
  • Path:代表文件或文件对象。下面这个示例演示了怎样通过 Java程序使用 HDFS JavaAPI来执行在 HDFS上的文件。
  • FileStatus:代表文件信息。

Hadoop的org.apache.hadoop.fs.FileSystem是访问和管理位于分布式环境中的HDFS文件/目录的通用类。 Hadoop提供了FileSystem的各种实现,如下所述:

  • DistributedFileSystem:在分布式环境下访问HDFS文件。
  • LocalFileSystem:访问本地系统中的HDFS文件。
  • FTPFileSystem:访问HDFS文件的FTP客户端。
  • WebHdfsFileSystem:通过web访问HDFS文件。

HDFS分布式文件系统的 JAVA API提供了丰富的访问接口。主要包括:目录的创建、列表、查询、删除和文件的创建 (写入 )、读取等。

---------------------------------------------------------------

一、HDFS目录操作

在HDFS上创建目录:

    public void creatDir(){
        String url="hdfs://xueai8:8020/user/test";

        //configuration封装了HDFS客户端或者HDFS集群的配置信息,
        //该方法通过给定的URI方案和权限来确定要使用的文件系统
        Configuration configuration=new Configuration();

        try {
            //通过给定的URI方案和权限来确定要使用的文件系统
            fs = FileSystem.get(URI.create(url), configuration);
            fs.mkdirs(new Path(url));
            System.out.println("========================");
        } catch (IOException e) {
            e.printStackTrace();
        }        
    }

在HDFS上删除目录:

    public void deleteDir(){
        String url="hdfs://xueai8:8020/user/test";
        Configuration configuration=new Configuration();
        try {
            //通过给定的URI方案和权限来确定要使用的文件系统
            fs = FileSystem.get(URI.create(url), configuration);
            fs.delete(new Path(url));
            System.out.println("========================");
        } catch (IOException e) {
            e.printStackTrace();
        }        
    }

列出指定目录下的文件或目录名称:

    public void listFiles(){
        String urls[] = {"hdfs://xueai8:8020/user/","hdfs://xueai8:8020/user/test.txt"};
        Configuration configuration=new Configuration();
        try {
            //通过给定的URI方案和权限来确定要使用的文件系统
            fs = FileSystem.get(URI.create(urls[1]), configuration);

            System.out.println("user目录下的方件有");
            System.out.println("====================================");

            //调用FileSystem中的listStatus()方法返回一个FileStatus[]数组
            FileStatus[] listFiles = fs.listStatus(new Path(urls[0]));

            //遍历listFiles
            for (int i = 0; i < listFiles.length; i++) {
                FileStatus fileStatus = listFiles[i];
                System.out.println(fileStatus.getPath().toString());
            }

            //FileStatus类中封装了文件系统中文件和目录的元数据,包括文件的长度、块大小、复本、所有者、及权限信息
            FileStatus file = fs.getFileStatus(new Path(urls[1]));

            //文件大小
            long lenthg = file.getLen();

            //块大小
            long size = file.getBlockSize();

            //最近修改时间
            long time = file.getModificationTime();

            //副本数
            int n = file.getReplication();

            //所有者
            String owner = file.getOwner();

            //权限信息
            String chmod = file.getPermission().toString();

            System.out.println("user目录下的文件所具有的属性");
            System.out.println("====================================");
            System.out.println("文件大小是:" + lenthg);
            System.out.println("块大小" + size);
            System.out.println("最近修改时间:" + time);
            System.out.println("复本数" + n);
            System.out.println("文件所有者" + owner);
            System.out.println("权限信息" + chmod);

            //关闭输入流
            fs.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

列出指定目录下的内容(文件/子目录):

    static final String hdfsurl = "hdfs://xueai8:8020";
    
    public void list(String dir) throws IOException {
	// 获取配置信息
	Configuration conf = new Configuration();

	// 获取文件系统抽象-hdfs
	FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf);

        // 调用自定义的辅助方法(递归方法)
	listDir(fs, new Path(dir));
    }

    // 列出指定目录下所有的内容
    private void listDir(FileSystem fs, Path dir) throws IOException {
	FileStatus[] status = fs.listStatus(dir);   // 获取目录dir下所有的FileStatus对象

	for (int i = 0; i < status.length; i++) {   // 遍历
            if (status[i].isDirectory()) {          // 如果当前这个FileStatus是一个目录
		Path dirPath = status[i].getPath();
		System.out.println("## " + dirPath.toString());
		// 递归显示
		listDir(fs, status[i].getPath());   // 继续
            } else {
		System.out.println(status[i].getPath().toString());
            }
        }
    } 
---------------------------------------------------------------------------

二、HDFS文件操作

判断一个文件是否存在:

    static final String hdfsurl = "hdfs://xueai8:8020";

    // 判断一个文件是否存在
    public void exists(String file) throws IOException {
        // 获取配置信息
	Configuration conf = new Configuration();

	// 获取文件系统抽象-hdfs
	FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf);	// scheme

	// 执行操作
	boolean flag = fs.exists(new Path(file));

	if (flag) {
            System.out.println("文件存在");
	} else {
            System.out.println("文件不存在");
	}
    }

上传文本文件到HDFS上:

    static final String hdfsurl = "hdfs://xueai8:8020";
	
    public void upload(String src, String dst) throws IOException {
	// 获取配置信息
	Configuration conf = new Configuration();

	// 获取文件系统抽象-hdfs
	FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf);

	// 需要两个流:一个指向源文件的输入流,一个指向目标文件的输出流
	FSDataOutputStream out = fs.create(new Path(dst)); // 输出流
	BufferedReader reader = new BufferedReader(new FileReader(src)); // 输入流

        // 从输入流读取每一行,写出到输出流中
	String line = null;
	while ((line = reader.readLine()) != null) {
            out.writeUTF(line);
	}
	
        // 关闭输入输出流
        reader.close();
	out.close();
	System.out.println("上传结束");
    }

上传非文本文件到HDFS上:

    static final String hdfsurl = "hdfs://xueai8:8020";
    public void upload2(String src, String dst) throws IOException {
	// 获取配置信息
	Configuration conf = new Configuration();

	// 获取文件系统抽象-hdfs
	FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf);

	// 需要两个流:一个指向源文件的输入流,一个指向目标文件的输出 流
	FSDataOutputStream out = fs.create(new Path(dst));  // 输出流
	FileInputStream in = new FileInputStream(src);      // 输入流

	// 使用
	IOUtils.copyBytes(in, out, 4096, true);

        // 关闭输入输出流
        in.close();
	out.close();
	System.out.println("上传结束");
    }

从HDFS上下载文件到本地:

    static final String hdfsurl = "hdfs://xueai8:8020";
	
    public void download(String src, String dst) throws IOException {
	// 获取配置信息
	Configuration conf = new Configuration();

	// 获取文件系统抽象-hdfs
	FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf);

	// 需要两个流:一个指向源文件的输入流,一个指向目标文件的输出 流
	FSDataInputStream in = fs.open(new Path(src));
	FileOutputStream out = new FileOutputStream(dst);

	// 使用
	IOUtils.copyBytes(in, out, 4096);

        // 关闭输入输出流
        in.close();
	out.close();
	System.out.println("下载结束");
    }

在HDFS上创建文件并写入内容:

    static final String hdfsurl = "hdfs://xueai8:8020";

    public void create(String file, String content) throws IOException {
	// 获取配置信息
	Configuration conf = new Configuration();

	// 获取文件系统抽象-hdfs
	FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf);

	FSDataOutputStream out = fs.create(new Path(file));
	// out向DataNode写数据
	out.writeUTF(content);

	out.close();
        System.out.println("文件已创建");
    }

查看HDFS上指定文本文件的内容:

    static final String hdfsurl = "hdfs://xueai8:8020";

    public void read(String file) throws IOException {
	// 获取配置信息
	Configuration conf = new Configuration();

	// 获取文件系统抽象-hdfs
	FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf);

        // 打开文件输入流
	FSDataInputStream in = fs.open(new Path(file));

        // 读取文本内容
	String content = in.readUTF();
	System.out.println(content);

        in.close();
    }

带有进度提示信息的文件写入:

    static final String hdfsurl = "hdfs://xueai8:8020";

    public void writeWithProgress(String file) throws IOException{
	StringBuffer sb = new StringBuffer();		//创建辅助可变字符串
	Random rand = new Random();			//创建一些随机数
	for(int i=0; i<9;i++){				//随机写入字符
            sb.append(rand.nextInt(100));
	}
	byte[] buff = sb.toString().getBytes();		//生成字节数组
		
	Configuration conf = new Configuration();	// 获取配置信息
	FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf);// 获得文件系统引用

        // 创建写出流
	FSDataOutputStream fsout = fs.create(new Path(file),new Progressable(){
            @Override
            public void progress(){		//默认的实用方法
		System.out.println(".");	//打印出序列号
            }
	});

	fsout.write(buff);	// 开始写出操作
	fsout.close();		// 关闭写出流
    }

获取指定文件的属性:

    static final String hdfsurl = "hdfs://xueai8:8020";

    public void status(String file) throws IOException {
	// 获取配置信息
	Configuration conf = new Configuration();

	// 获取文件系统抽象-hdfs
	FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf);

	// 获得指定文件的状态对象
	FileStatus fileStatus = fs.getFileStatus(new Path(file));

	System.out.println(fileStatus.getLen());
	System.out.println(fileStatus.getPath());       //获取绝对路径
	System.out.println(fileStatus.getPath().toUri().getPath()); //获取相对路径
	System.out.println(fileStatus.getBlockSize());  //获取当前Block大小
	System.out.println(fileStatus.getGroup());      //获取所属组
	System.out.println(fileStatus.getOwner());      //获取所有者
    }

查找某个HDFS文件所有的block位置信息

    static final String hdfsurl = "hdfs://xueai8:8020";

    public void getBlockLocations(String file) throws IOException {
	// 获取配置信息
	Configuration conf = new Configuration();

	// 获取文件系统抽象-hdfs
	FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf);

	Path path = new Path(file);
	FileStatus status = fs.getFileStatus(path);
	BlockLocation[] locations = fs.getFileBlockLocations(status, 0, status.getLen());

	for (BlockLocation bl : locations) {
            String[] hosts = bl.getHosts();
            for (String host : hosts) {
		System.out.println(host);
            }
            String[] names = bl.getNames();
            for (String name : names) {
		System.out.println(name);
            }
	}
    }

其中getFileBlockLocations方法签名如下:

    public BlockLocation[] getFileBlockLocations(Path p, long start, long len)

获取集群中所有DataNode节点信息:

    static final String hdfsurl = "hdfs://xueai8:8020";

    public void getAllDataNodes() throws IOException {
	// 获取配置信息
	Configuration conf = new Configuration();

	// 获取文件系统抽象-hdfs
	FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf);
		
	// 将fs转换为DistributedFileSystem
	DistributedFileSystem dfs = (DistributedFileSystem)fs;
	
        // 获取所有DataNode节点信息
	DatanodeInfo[] dnInfo = dfs.getDataNodeStats();
		
        // 遍历输出所有节点信息
	for(DatanodeInfo di : dnInfo) {
            System.out.println(di.getHostName());
            System.out.println(di.getInfoAddr());
            System.out.println(di.getName());
	}
    }

随机读取HDFS文件内容。

FSDaraInputStream实现了Seekable接口的seek (long pos)和getPos()方法。 seek()方法将文件查找到文件开头的给定偏移量,这样read()将流到该位置, 而getPos()方法将返回InputStream上的当前位置。

    static final String hdfsurl = "hdfs://xueai8:8020";

    public void seek(String file) throws IOException{
		
	Configuration conf = new Configuration();		// 获取环境变量
	FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf);

	FSDataInputStream fsin = fs.open(new Path(file));	// 建立输入流
	byte[] buff = new byte[5];	// 建立缓存数组
	int length = 0;			// 辅助长度
	while ((length = fsin.read(buff, 0, 5)) != -1) {	// 将数据读入缓存数组
            System.out.println("缓存数组的长度:" + length);
            System.out.println(new String(buff, 0, length));    // 打印数据
	}

	System.out.println("length=" + fsin.getPos());		// 打印输出流的长度
	fsin.seek(10);	// 返回要读取文件的第11个字节处,空格也算字节
	while ((length = fsin.read(buff, 0, 128)) != -1) {	// 将数据读入缓存
            System.out.println(new String(buff, 0, length));	// 打印数据
	}

	fsin.seek(0);	//返回开始处
	byte[] buff2 = new byte[128];                           // 建立辅助字节数组
	fsin.read(buff2, 0, 128);				// 将数据读入缓存数组
	System.out.println("buff2=" + new String(buff2));	// 打印数据
	System.out.println(buff2.length);                       // 打印数组长度
    }

《PySpark原理深入与编程实战》