HDFS Java API编程示例

HDFS是一个分布式文件系统,既然是文件系统,就可以对其文件进行操作,比如说新建文件、删除文件、读取文件内容等操作。 Java程序使用 HDFS JavaAPI与 HDFS交互。 使用这个 API,我们可以从 Java程序中使用存储在 HDFS中的数据,以及使用其它非 Hadoop计算框架来处理这些数据。

一、HDFS Java API读写示例

【示例】通过 Java程序使用 HDFS JavaAPI操作 HDFS文件系统。

该 Java程序会在 HDFS中创建一个新的文件,并在其中写入一些文本,然后从 HDFS读取回这个文件。

import java.io.IOException; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 

public class HDFSJavaAPIDemo { 
    public static String hdfsUrl = "hdfs://xueai8:8020"; 

    public static void main(String[] args) throws IOException { 
        readWriteHDFSFile(); 
    } 

    // 读写HDFS上文件的方法
    private static void readWriteHDFSFile() throws IOException{ 
        Configuration conf = new Configuration(); 
        FileSystem fs = FileSystem.get(URI.create(hdfsUrl), conf); 

        Path file = new Path("demo.txt"); 
        if(fs.exists(file)){ 
            System.out.println("文件已经存在。"); 
        }else{ 
            //写文件 
            FSDataOutputStream outStream = fs.create(file); 

            //在给定路径创建一个新的文件 
            outStream.writeUTF("欢迎使用 HDFS JavaAPI !!!"); 
            outStream.close(); 
        } 

        //读文件 
        FSDataInputStream inStream = fs.open(file); 
        String data = inStream.readUTF(); 
        System.out.println("从 HDFS文件中读取到本地的内容:" + data); 

        inStream.close(); 
        fs.close(); 
    } 
} 

二、HDFS Java API文件过滤和合并示例

【示例】显示HDFS文件系统中指定路径下的目录中所有的文件信息。

  • 1)从该目录中过滤出所有后缀名为".abc"的文件。
  • 2)对过滤之后的文件进行读取。
  • 3)最后,将这些文件的内容合并到文件merge.txt中。
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IOUtils;

// 自定义过滤器类
class MyPathFilter implements PathFilter{
    String reg = null;       // 接收指定的正则表达式
	
    MyPathFilter(String reg){
	this.reg = reg;
    }

    // 过滤指定路径下的文件
    public boolean accept(Path path) {
	if((path.toString().matches(reg))) {
            return true;
	}
	return false;
    }
}

/**
 * 利用FSDataOutputStream和FSDataInputStream合并HDFS中的文件
 */
class Merge{
    Path inputPath = null;	// 待合并的文件所在的目录的路径
    Path outputPath = null;	// 输出文件的路径
	
    public Merge(String input, String output) {
	this.inputPath = new Path(input);
	this.outputPath = new Path(output);
    }
	
    public void doMerge() throws IOException, InterruptedException{
	Configuration conf = new Configuration();		
	FileSystem fs = FileSystem.get(URI.create("hdfs://xueai8:8020"), conf, "hduser");
		
	FSDataOutputStream out = fs.create(outputPath);
		
	// 下面过滤掉输入目录中后缀为.abc的文件
	FileStatus[] sourceStatus = fs.listStatus(inputPath,new MyPathFilter(".*\\.abc"));		
	System.out.println(sourceStatus.length);
		
	// 下面分别读取过滤之后的每个文件的内容,并输出到同一个文件中
	for(FileStatus sta : sourceStatus) {			
            // 下面打印后缀为.abc的文件的路径、文件大小
            System.out.println("路径:" + sta.getPath() 
				+ "  文件大小:" + sta.getLen() 
				+ "  权限:" + sta.getPermission() 
				+ "  内容:");

            FSDataInputStream in = fs.open(sta.getPath());	// 打开输入流
			
            byte[] data = new byte[1024];
            int length = -1;
            PrintStream ps = new PrintStream(System.out);
			
            while((length=in.read(data))>0) {
		ps.write(data,0,length);		// 输出到屏幕
		out.write(data,0,length);		// 输出到合并文件
            }
            in.close();
            ps.close();
	}
	out.close();
    }
	
    // 下面这个方法使用了IOUtils的copyBytes方法,以及seek方法重定位
    public void doMerge2() throws IOException, InterruptedException{
	Configuration conf = new Configuration();
		
	FileSystem fs = FileSystem.get(URI.create("hdfs://xueai8:8020"), conf, "hduser");
	FSDataOutputStream out = fs.create(outputPath);
		
	// 下面过滤出输入目录中后缀为.abc的文件
	FileStatus[] sourceStatus = fs.listStatus(inputPath,new MyPathFilter(".*\\.abc"));		
		
	// 下面分别读取过滤之后的每个文件的内容,并输出到同一个文件中
	for(FileStatus sta : sourceStatus) {
            // 下面打印后缀为.abc的文件的路径、文件大小
            System.out.println("\n路径:" + sta.getPath() 
				+ "  文件大小:" + sta.getLen() 
				+ "  权限:" + sta.getPermission() 
				+ "  内容:");
            FSDataInputStream in = fs.open(sta.getPath());	// 打开输出流				
            PrintStream ps = new PrintStream(System.out);
			
            // 使用IOUtils的copyBytes方法
            IOUtils.copyBytes(in, out, 4096);
            in.seek(0);		// 重定位
            IOUtils.copyBytes(in, ps, 4096);			
        }
	out.close();
    }
}

// 测试
public class PathFilterDemo {

    public static void main(String[] args) throws IOException, InterruptedException {
	String path1 = "/";
	String path2 = "/merge5.txt";
		
	Merge merge = new Merge(path1, path2);
	merge.doMerge2();
    }	
}     

三、java.net.URL

对象java.net.URL用于读取文件的内容。

首先,我们需要让Java识别Hadoop的HDFS URL schema。这是通过在URL对象上调用setURLStreamHandlerFactory方法来完成的, 并将FsUrlStreamHandlerFactory的一个实例传递给它。 每个JVM只需要执行这个方法一次,因此它被封装在一个静态块中 (这个限制意味着如果程序的其他组件已经声明了一个setURLStreamHandlerFactory实例,将无法使用这种方法从hadoop中读取数据)。

【示例】对象java.net.URL用于读取文件的内容。

public class URLCat {

    static {
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    /**
     * 读取hadoop文件系统中文件的内容(不推荐使用)
     */
    public static void main(String[] args) throws Exception {
        InputStream in = null;
        try {
            // 调用java.net.URL对象打开数据流
            in = new URL("hdfs://xueai8:8020/test.txt").openStream();

            // 调用copyBytes函数,可以在输入流和输出流之间复制数据,
            // 输出到控制台,第三个参数是设置缓冲区大小,最后一个,设置复制结束后是否关闭数据流
            IOUtils.copyBytes(inputStream, System.out, 4096, false);
        } catch (IOException e) {
            e.printStackTrace();
        }finally{
            //关闭数据流
            IOUtils.closeStream(in);
        }
    }
}

《PySpark原理深入与编程实战》