HDFS Java API编程示例
HDFS是一个分布式文件系统,既然是文件系统,就可以对其文件进行操作,比如说新建文件、删除文件、读取文件内容等操作。 Java程序使用 HDFS JavaAPI与 HDFS交互。 使用这个 API,我们可以从 Java程序中使用存储在 HDFS中的数据,以及使用其它非 Hadoop计算框架来处理这些数据。
一、HDFS Java API读写示例
【示例】通过 Java程序使用 HDFS JavaAPI操作 HDFS文件系统。
该 Java程序会在 HDFS中创建一个新的文件,并在其中写入一些文本,然后从 HDFS读取回这个文件。
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class HDFSJavaAPIDemo { public static String hdfsUrl = "hdfs://xueai8:8020"; public static void main(String[] args) throws IOException { readWriteHDFSFile(); } // 读写HDFS上文件的方法 private static void readWriteHDFSFile() throws IOException{ Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(hdfsUrl), conf); Path file = new Path("demo.txt"); if(fs.exists(file)){ System.out.println("文件已经存在。"); }else{ //写文件 FSDataOutputStream outStream = fs.create(file); //在给定路径创建一个新的文件 outStream.writeUTF("欢迎使用 HDFS JavaAPI !!!"); outStream.close(); } //读文件 FSDataInputStream inStream = fs.open(file); String data = inStream.readUTF(); System.out.println("从 HDFS文件中读取到本地的内容:" + data); inStream.close(); fs.close(); } }
二、HDFS Java API文件过滤和合并示例
【示例】显示HDFS文件系统中指定路径下的目录中所有的文件信息。
- 1)从该目录中过滤出所有后缀名为".abc"的文件。
- 2)对过滤之后的文件进行读取。
- 3)最后,将这些文件的内容合并到文件merge.txt中。
import java.io.IOException; import java.io.PrintStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.IOUtils; // 自定义过滤器类 class MyPathFilter implements PathFilter{ String reg = null; // 接收指定的正则表达式 MyPathFilter(String reg){ this.reg = reg; } // 过滤指定路径下的文件 public boolean accept(Path path) { if((path.toString().matches(reg))) { return true; } return false; } } /** * 利用FSDataOutputStream和FSDataInputStream合并HDFS中的文件 */ class Merge{ Path inputPath = null; // 待合并的文件所在的目录的路径 Path outputPath = null; // 输出文件的路径 public Merge(String input, String output) { this.inputPath = new Path(input); this.outputPath = new Path(output); } public void doMerge() throws IOException, InterruptedException{ Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create("hdfs://xueai8:8020"), conf, "hduser"); FSDataOutputStream out = fs.create(outputPath); // 下面过滤掉输入目录中后缀为.abc的文件 FileStatus[] sourceStatus = fs.listStatus(inputPath,new MyPathFilter(".*\\.abc")); System.out.println(sourceStatus.length); // 下面分别读取过滤之后的每个文件的内容,并输出到同一个文件中 for(FileStatus sta : sourceStatus) { // 下面打印后缀为.abc的文件的路径、文件大小 System.out.println("路径:" + sta.getPath() + " 文件大小:" + sta.getLen() + " 权限:" + sta.getPermission() + " 内容:"); FSDataInputStream in = fs.open(sta.getPath()); // 打开输入流 byte[] data = new byte[1024]; int length = -1; PrintStream ps = new PrintStream(System.out); while((length=in.read(data))>0) { ps.write(data,0,length); // 输出到屏幕 out.write(data,0,length); // 输出到合并文件 } in.close(); ps.close(); } out.close(); } // 下面这个方法使用了IOUtils的copyBytes方法,以及seek方法重定位 public void doMerge2() throws IOException, InterruptedException{ Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create("hdfs://xueai8:8020"), conf, "hduser"); FSDataOutputStream out = fs.create(outputPath); // 下面过滤出输入目录中后缀为.abc的文件 FileStatus[] sourceStatus = fs.listStatus(inputPath,new MyPathFilter(".*\\.abc")); // 下面分别读取过滤之后的每个文件的内容,并输出到同一个文件中 for(FileStatus sta : sourceStatus) { // 下面打印后缀为.abc的文件的路径、文件大小 System.out.println("\n路径:" + sta.getPath() + " 文件大小:" + sta.getLen() + " 权限:" + sta.getPermission() + " 内容:"); FSDataInputStream in = fs.open(sta.getPath()); // 打开输出流 PrintStream ps = new PrintStream(System.out); // 使用IOUtils的copyBytes方法 IOUtils.copyBytes(in, out, 4096); in.seek(0); // 重定位 IOUtils.copyBytes(in, ps, 4096); } out.close(); } } // 测试 public class PathFilterDemo { public static void main(String[] args) throws IOException, InterruptedException { String path1 = "/"; String path2 = "/merge5.txt"; Merge merge = new Merge(path1, path2); merge.doMerge2(); } }
三、java.net.URL
对象java.net.URL用于读取文件的内容。
首先,我们需要让Java识别Hadoop的HDFS URL schema。这是通过在URL对象上调用setURLStreamHandlerFactory方法来完成的, 并将FsUrlStreamHandlerFactory的一个实例传递给它。 每个JVM只需要执行这个方法一次,因此它被封装在一个静态块中 (这个限制意味着如果程序的其他组件已经声明了一个setURLStreamHandlerFactory实例,将无法使用这种方法从hadoop中读取数据)。
【示例】对象java.net.URL用于读取文件的内容。
public class URLCat { static { URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); } /** * 读取hadoop文件系统中文件的内容(不推荐使用) */ public static void main(String[] args) throws Exception { InputStream in = null; try { // 调用java.net.URL对象打开数据流 in = new URL("hdfs://xueai8:8020/test.txt").openStream(); // 调用copyBytes函数,可以在输入流和输出流之间复制数据, // 输出到控制台,第三个参数是设置缓冲区大小,最后一个,设置复制结束后是否关闭数据流 IOUtils.copyBytes(inputStream, System.out, 4096, false); } catch (IOException e) { e.printStackTrace(); }finally{ //关闭数据流 IOUtils.closeStream(in); } } }