Custom Hadoop OutputFormat
In many scenarios the default output format and RecordWriter classes are not the best fit for a particular requirement. We can create our own custom output class by extending the FileOutputFormat class and overriding its methods to achieve our goal. In this section we discuss how to create a custom output class and a custom RecordWriter, and how to use them in a MapReduce program.
A custom OutputFormat class must extend FileOutputFormat and implement the getRecordWriter method, adding whatever logic is required. This method essentially contains the logic for constructing the appropriate RecordWriter.
The RecordWriter is responsible for writing the output key-value pairs produced by the mapper or reducer phase to the output file, so we can create a custom RecordWriter class by extending the RecordWriter class.
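For orientation, these are the methods the two subclasses have to provide (signatures recalled from the Hadoop 3.x MapReduce API; the complete implementations follow later in this article):

// In the FileOutputFormat<K, V> subclass: build and return the RecordWriter
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
        throws IOException, InterruptedException;

// In the RecordWriter<K, V> subclass: write one key-value pair, then release the stream
public void write(K key, V value) throws IOException, InterruptedException;
public void close(TaskAttemptContext context) throws IOException, InterruptedException;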
Problem Description
We know that the default record separator of TextOutputFormat is NEW_LINE. Consider a scenario where the values produced by our mapper/reducer may themselves contain NEW_LINE characters, so we would like to configure a different record separator instead of the NEW_LINE character.
In this example, to keep things easy to follow, we use the word-count program together with a custom output format class. We will set the custom record separator through the custom configuration property mapreduce.output.textoutputformat.recordseparator.
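To make the goal concrete, here is roughly how the word counts from this example would look with the stock TextOutputFormat (tab field separator, NEW_LINE record separator) compared with the custom separators configured later in the driver (an illustration only; the real output appears at the end of this article):

Default TextOutputFormat:
bye	3
every	1
hadoop	4
hello	4
one	1
world	2

With field separator ";" and record separator "<==>":
bye;3<==>every;1<==>hadoop;4<==>hello;4<==>one;1<==>world;2<==>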
1. Create a Java Maven Project
Maven dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xueai8</groupId>
    <artifactId>HadoopDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- Hadoop common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!-- HDFS file system -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!-- MapReduce dependencies -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!-- JUnit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compiler plugin: specify the JDK version and character encoding for compilation.
                 If omitted, Maven 3 defaults to JDK 1.5 and Maven 2 defaults to JDK 1.3. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>       <!-- JDK version of the source code -->
                    <target>1.8</target>       <!-- bytecode version of the generated class files -->
                    <encoding>UTF-8</encoding> <!-- character encoding -->
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
WordCountLineRecordWriter.java:
This is the custom RecordWriter class, which must extend the RecordWriter<K, V> class. It closely follows Hadoop's built-in LineRecordWriter, except that the record separator is configurable instead of being hard-coded to NEW_LINE.
package com.xueai8.customoutput;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Custom RecordWriter class.
 */
public class WordCountLineRecordWriter<K, V> extends RecordWriter<K, V> {

    protected DataOutputStream out;
    private final byte[] recordSeprator;   // record separator
    private final byte[] fieldSeprator;    // field separator

    // Constructor with explicit field and record separators
    public WordCountLineRecordWriter(DataOutputStream out, String fieldSeprator, String recordSeprator) {
        this.out = out;
        this.fieldSeprator = fieldSeprator.getBytes(StandardCharsets.UTF_8);
        this.recordSeprator = recordSeprator.getBytes(StandardCharsets.UTF_8);
    }

    // Constructor with the default field and record separators
    public WordCountLineRecordWriter(DataOutputStream out) {
        this(out, "\t", "\n");
    }

    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            this.out.write(to.getBytes(), 0, to.getLength());
        } else {
            this.out.write(o.toString().getBytes(StandardCharsets.UTF_8));
        }
    }

    @Override
    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (!nullKey || !nullValue) {
            if (!nullKey) {
                this.writeObject(key);
            }
            if (!nullKey && !nullValue) {
                this.out.write(this.fieldSeprator);
            }
            if (!nullValue) {
                this.writeObject(value);
            }
            this.out.write(recordSeprator);   // write the custom record separator instead of NEW_LINE
        }
    }

    @Override
    public synchronized void close(TaskAttemptContext context) throws IOException {
        this.out.close();
    }
}
WordCountOutputFormat.java:
The custom Hadoop OutputFormat class, which formats the output records.
package com.xueai8.customoutput;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.DataOutputStream;
import java.io.IOException;

public class WordCountOutputFormat<K, V> extends FileOutputFormat<K, V> {

    public static String FIELD_SEPARATOR = "mapreduce.output.textoutputformat.separator";
    public static String RECORD_SEPARATOR = "mapreduce.output.textoutputformat.recordseparator";

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);

        // field separator, "\t" by default
        String fieldSeprator = conf.get(FIELD_SEPARATOR, "\t");
        // record separator, "\n" by default
        String recordSeprator = conf.get(RECORD_SEPARATOR, "\n");

        // output compression logic
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
            Class codecClass = getOutputCompressorClass(job, GzipCodec.class);
            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }

        Path file = this.getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);

        if (isCompressed) {
            return new WordCountLineRecordWriter<>(
                    new DataOutputStream(codec.createOutputStream(fileOut)), fieldSeprator, recordSeprator);
        } else {
            return new WordCountLineRecordWriter<>(fileOut, fieldSeprator, recordSeprator);
        }
    }
}
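Note that the compression branch in getRecordWriter is only taken when output compression is enabled for the job. If you wanted to exercise it, two lines along these lines could be added to the driver after the Job is created (a hypothetical addition, not part of the driver shown below; it also requires importing org.apache.hadoop.io.compress.GzipCodec in the driver):

// Hypothetical: enable gzip-compressed output so that getRecordWriter wraps the stream in a codec
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);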
WordCountMapper.java:
The Mapper class.
package com.xueai8.customoutput;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        /*
         * 1. Convert the line to lowercase.
         * 2. Replace all punctuation with spaces.
         * 3. Tokenize the input line.
         * 4. Write each (word, 1) pair to the context.
         */
        String line = value.toString().toLowerCase().replaceAll("\\p{Punct}", " ");
        StringTokenizer st = new StringTokenizer(line, " ");
        while (st.hasMoreTokens()) {
            word.set(st.nextToken());
            context.write(word, one);
        }
    }
}
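As a quick sanity check of the normalization logic, the snippet below runs the same lowercase / strip-punctuation / tokenize steps on one line of the sample input (word.txt) used later in this article (a standalone sketch, not part of the project):

import java.util.StringTokenizer;

public class TokenizeDemo {
    public static void main(String[] args) {
        // Same normalization as WordCountMapper.map()
        String line = "hello, every one!".toLowerCase().replaceAll("\\p{Punct}", " ");
        // line is now "hello  every one " -- punctuation replaced by spaces
        StringTokenizer st = new StringTokenizer(line, " ");
        while (st.hasMoreTokens()) {
            System.out.println(st.nextToken());   // prints: hello, every, one
        }
    }
}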
WordCountReducer.java:
The Reducer class.
package com.xueai8.customoutput;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum = sum + value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
WordCountDriver.java:
The driver class. Note that we use the ToolRunner interface here.
package com.xueai8.customoutput;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new WordCountDriver(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("This program requires two arguments:");
            System.out.println("[ 1 ] input path");
            System.out.println("[ 2 ] output path");
            return -1;
        }

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        Path input = new Path(otherArgs[0]);
        Path output = new Path(otherArgs[1]);

        /*
         * Uncomment the following three lines to enable local debugging of the MapReduce job.
         */
        /*
        conf.set("fs.defaultFS", "local");
        conf.set("mapreduce.job.maps", "1");
        conf.set("mapreduce.job.reduces", "1");
        */

        Job job = Job.getInstance(conf, "Hadoop Example");
        job.setJarByClass(WordCountDriver.class);

        // set mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // set reducer
        job.setReducerClass(WordCountReducer.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // set input format and output format
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(WordCountOutputFormat.class);

        // custom record separator, set in the job configuration
        job.getConfiguration().set("mapreduce.output.textoutputformat.recordseparator", "<==>");
        // custom field separator instead of the default "\t" character
        job.getConfiguration().set("mapreduce.output.textoutputformat.separator", ";");

        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        job.setSpeculativeExecution(false);   // disable speculative execution for this job

        boolean success = job.waitForCompletion(true);
        return (success ? 0 : 1);
    }
}
2. Configure log4j
Add a log4j configuration file named log4j.properties under the src/main/resources directory, with the following content:
log4j.rootLogger = info,stdout

### send log output to the console ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
3. Package the Project
Open the terminal window at the bottom of IDEA and run the mvn clean package command.
If everything goes well, Maven reports that the jar was built successfully.
Looking at the project structure afterwards, you will see a new target directory; the packaged jar is located in this directory.
4. Deploy the Project
Follow the steps below.
Step 1. Start the HDFS and YARN clusters. In a Linux terminal window, run the following commands:
$ start-dfs.sh
$ start-yarn.sh
Check that the processes have started and that the cluster is running normally. In the Linux terminal window, run the following command:
$ jps
You should see the following 5 processes running, which indicates that the cluster is healthy:
5542 NodeManager
5191 SecondaryNameNode
4857 NameNode
5418 ResourceManager
4975 DataNode
Step 2. Create a local input data file named word.txt and edit its content as follows:
Hello World Bye World
Hello Hadoop Bye Hadoop
Bye Hadoop Hello Hadoop
hello, every one!
Step 3. Upload the data file word.txt to the /data/mr/ directory in HDFS:
$ hdfs dfs -mkdir -p /data/mr
$ hdfs dfs -put word.txt /data/mr/
$ hdfs dfs -ls /data/mr/
Step 4. Submit the job to the Hadoop cluster. (If the jar was built on Windows, copy it to the Linux machine first.)
In the terminal window, run the following job-submission command:
$ hadoop jar HadoopDemo-1.0-SNAPSHOT.jar com.xueai8.customoutput.WordCountDriver /data/mr /data/mr-output
Step 5. Check the output.
In the terminal window, run the following HDFS commands to view the output:
$ hdfs dfs -ls /data/mr-output
$ hdfs dfs -cat /data/mr-output/part-r-00000
The final counts look like this. Because the record separator is now <==> instead of a newline and the field separator is ; instead of a tab, all of the (word, count) pairs appear on a single line:
bye;3<==>every;1<==>hadoop;4<==>hello;4<==>one;1<==>world;2<==>