HDFS Java API编程示例
HDFS是一个分布式文件系统,既然是文件系统,就可以对其文件进行操作,比如说新建文件、删除文件、读取文件内容等操作。 Java程序使用 HDFS JavaAPI与 HDFS交互。 使用这个 API,我们可以从 Java程序中使用存储在 HDFS中的数据,以及使用其它非 Hadoop计算框架来处理这些数据。
一、HDFS Java API读写示例
【示例】通过 Java程序使用 HDFS JavaAPI操作 HDFS文件系统。
该 Java程序会在 HDFS中创建一个新的文件,并在其中写入一些文本,然后从 HDFS读取回这个文件。
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class HDFSJavaAPIDemo {
public static String hdfsUrl = "hdfs://xueai8:8020";
public static void main(String[] args) throws IOException {
readWriteHDFSFile();
}
// 读写HDFS上文件的方法
private static void readWriteHDFSFile() throws IOException{
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(hdfsUrl), conf);
Path file = new Path("demo.txt");
if(fs.exists(file)){
System.out.println("文件已经存在。");
}else{
//写文件
FSDataOutputStream outStream = fs.create(file);
//在给定路径创建一个新的文件
outStream.writeUTF("欢迎使用 HDFS JavaAPI !!!");
outStream.close();
}
//读文件
FSDataInputStream inStream = fs.open(file);
String data = inStream.readUTF();
System.out.println("从 HDFS文件中读取到本地的内容:" + data);
inStream.close();
fs.close();
}
}
二、HDFS Java API文件过滤和合并示例
【示例】显示HDFS文件系统中指定路径下的目录中所有的文件信息。
- 1)从该目录中过滤出所有后缀名为".abc"的文件。
- 2)对过滤之后的文件进行读取。
- 3)最后,将这些文件的内容合并到文件merge.txt中。
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IOUtils;
// 自定义过滤器类
class MyPathFilter implements PathFilter{
String reg = null; // 接收指定的正则表达式
MyPathFilter(String reg){
this.reg = reg;
}
// 过滤指定路径下的文件
public boolean accept(Path path) {
if((path.toString().matches(reg))) {
return true;
}
return false;
}
}
/**
* 利用FSDataOutputStream和FSDataInputStream合并HDFS中的文件
*/
class Merge{
Path inputPath = null; // 待合并的文件所在的目录的路径
Path outputPath = null; // 输出文件的路径
public Merge(String input, String output) {
this.inputPath = new Path(input);
this.outputPath = new Path(output);
}
public void doMerge() throws IOException, InterruptedException{
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://xueai8:8020"), conf, "hduser");
FSDataOutputStream out = fs.create(outputPath);
// 下面过滤掉输入目录中后缀为.abc的文件
FileStatus[] sourceStatus = fs.listStatus(inputPath,new MyPathFilter(".*\\.abc"));
System.out.println(sourceStatus.length);
// 下面分别读取过滤之后的每个文件的内容,并输出到同一个文件中
for(FileStatus sta : sourceStatus) {
// 下面打印后缀为.abc的文件的路径、文件大小
System.out.println("路径:" + sta.getPath()
+ " 文件大小:" + sta.getLen()
+ " 权限:" + sta.getPermission()
+ " 内容:");
FSDataInputStream in = fs.open(sta.getPath()); // 打开输入流
byte[] data = new byte[1024];
int length = -1;
PrintStream ps = new PrintStream(System.out);
while((length=in.read(data))>0) {
ps.write(data,0,length); // 输出到屏幕
out.write(data,0,length); // 输出到合并文件
}
in.close();
ps.close();
}
out.close();
}
// 下面这个方法使用了IOUtils的copyBytes方法,以及seek方法重定位
public void doMerge2() throws IOException, InterruptedException{
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://xueai8:8020"), conf, "hduser");
FSDataOutputStream out = fs.create(outputPath);
// 下面过滤出输入目录中后缀为.abc的文件
FileStatus[] sourceStatus = fs.listStatus(inputPath,new MyPathFilter(".*\\.abc"));
// 下面分别读取过滤之后的每个文件的内容,并输出到同一个文件中
for(FileStatus sta : sourceStatus) {
// 下面打印后缀为.abc的文件的路径、文件大小
System.out.println("\n路径:" + sta.getPath()
+ " 文件大小:" + sta.getLen()
+ " 权限:" + sta.getPermission()
+ " 内容:");
FSDataInputStream in = fs.open(sta.getPath()); // 打开输出流
PrintStream ps = new PrintStream(System.out);
// 使用IOUtils的copyBytes方法
IOUtils.copyBytes(in, out, 4096);
in.seek(0); // 重定位
IOUtils.copyBytes(in, ps, 4096);
}
out.close();
}
}
// 测试
public class PathFilterDemo {
public static void main(String[] args) throws IOException, InterruptedException {
String path1 = "/";
String path2 = "/merge5.txt";
Merge merge = new Merge(path1, path2);
merge.doMerge2();
}
}
三、java.net.URL
对象java.net.URL用于读取文件的内容。
首先,我们需要让Java识别Hadoop的HDFS URL schema。这是通过在URL对象上调用setURLStreamHandlerFactory方法来完成的, 并将FsUrlStreamHandlerFactory的一个实例传递给它。 每个JVM只需要执行这个方法一次,因此它被封装在一个静态块中 (这个限制意味着如果程序的其他组件已经声明了一个setURLStreamHandlerFactory实例,将无法使用这种方法从hadoop中读取数据)。
【示例】对象java.net.URL用于读取文件的内容。
public class URLCat {
static {
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
/**
* 读取hadoop文件系统中文件的内容(不推荐使用)
*/
public static void main(String[] args) throws Exception {
InputStream in = null;
try {
// 调用java.net.URL对象打开数据流
in = new URL("hdfs://xueai8:8020/test.txt").openStream();
// 调用copyBytes函数,可以在输入流和输出流之间复制数据,
// 输出到控制台,第三个参数是设置缓冲区大小,最后一个,设置复制结束后是否关闭数据流
IOUtils.copyBytes(inputStream, System.out, 4096, false);
} catch (IOException e) {
e.printStackTrace();
}finally{
//关闭数据流
IOUtils.closeStream(in);
}
}
}