MapReduce案例-二次排序算法_示例2
MapReduce框架基于key-value对的key对Reduce输入数据排序,还基于该key对数据分组。Hadoop为排序后的key中的每个唯一key调用reduce函数,排序后的key带有属于该key的值列表作为第二个参数。
但是,每个key的值列表是无序的。有许多场景需要每个key的Reduce输入值列表基于一定的条件排序。 例如,对于一个给定的key,找出值的最大值或最小值,而不需要迭代整个值列表。再例如,优化Reduce端的join,识别重复的数据产出等。
问题描述
下面我们对HTTP服务器日志项进行分析。 这里我们假定一个日志项由五部分组成:request host、timestamp、request URL、response size和HTTP状态码。如下所示:
192.168.0.2 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/HTTP/1.0" 200 6245
其中:
- 199.72.81.55 客户端用户的ip
- 01/Jul/1995:00:00:01 -0400 访问的时间
- GET HTTP方法,GET或POST
- /history/apollo/ 客户请求的URL
- 200 响应码 404
- 6245 响应内容的大小
要求:对用户访问日志记录进行排序,先按访问用户IP排序;同一个访问用户IP,访问的文件大小按字节排序。
输出数据格式如下:"host [234, 363, 669, 1382, 5866, 6245, 7074, ]"。
129.94.144.152 [0, 7074, ] 199.120.110.21 [1713, 4085, 4179, ] 199.72.81.55 [234, 363, 669, 1382, 5866, 6245, 7074, ] 205.189.154.54 [110, 786, 1204, 1224, 3985, 7634, 40310, ] 205.212.115.106 [3985, 7634, ] alyssa.prodigy.com [12054, ] burger.letters.com [0, 0, 0, ] ......
实现思路
我们可以使用Hadoop框架中一个叫做”二次排序”的机制来排序Reduce输入值。其原理如下。
MapReduce工作流程:
数据流图如下:
实现思路:
要实现二次排序特性,我们必须告诉MapReduce/Hadoop框架:
- 怎样排序reducer keys?
- 怎样分区(partition)传递给reducers的keys(自定义partitioner)?
- 怎样分组(group)到达每个reducer的数据?
Map端处理:
(1)复合键设计
使用Value-to-Key转换设计模式:形成一个复合中间键-(K,V1)。 其中v1是次关键字。这里K被称为自然键。 要将一个value(例如,V1)注入到一个reducer key内,简单地创建一个复合键即可。 也就是说,将原始数据的key值和其对应的数据组合成一个新的key值,然后新的key值对应的value还是原始数据中的value。 那么我们只需要对组合键(新key值)进行排序就OK了。
(2)分区器设计
然后我们需要自定义一个分区处理器,因为我们的目标不是想将新key值相同的记录传到一个reduce中,而是想将新key值中第一个字段相同的记录放到同一个reduce中进行分组合并,所以我们需要根据新key值的第一个字段来自定义一个分区处理器。
分区操作完成之后,再调用自己的自定义排序器对新的key值进行排序。
Reduce端处理:
经过Shuffle处理之后,数据传输到Reducer端了。 在Reducer端按照组合键的第一个字段进行分组,并且每处理完一次分组之后就会调用一次reduce函数来对这个分组进行处理和输出。 所以这里需要自定义一个分组比较器。
一、创建Java Maven项目
Maven依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>HadoopDemo</groupId> <artifactId>com.xueai8</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <!--hadoop依赖--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.3.1</version> </dependency> <!--hdfs文件系统依赖--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>3.3.1</version> </dependency> <!--MapReduce相关的依赖--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>3.3.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId> <version>3.3.1</version> </dependency> <!--junit依赖--> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <!--编译器插件用于编译拓扑--> <plugin> <groupId>org.apache.maven.plugins</groupId> <!--指定maven编译的jdk版本和字符集,如果不指定,maven3默认用jdk 1.5 maven2默认用jdk1.3--> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <!-- 源代码使用的JDK版本 --> <target>1.8</target> <!-- 需要生成的目标class文件的编译版本 --> <encoding>UTF-8</encoding><!-- 字符集编码 --> </configuration> </plugin> </plugins> </build> </project>
SecondarySortWritable.java:
首先,定义一个可比较的key数据类型,用作复合键类型。
package com.xueai8.secondarysort2; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /** * * 组合键(自然键+次关键字),作为Mapper的输出 * 自定义的组合键类型: IP + Size * 因为要用作key,需要实现WritableComparable接口 */ public class SecondarySortWritable implements WritableComparable<SecondarySortWritable>{ // 成员变量 private String ipAddress; // 访问用户的IP地址 private int size; // 响应文件的大小 public SecondarySortWritable() {} public String getIpAddress() { return ipAddress; } public int getSize() { return size; } public void set(String ipAddress,int size) { this.ipAddress = ipAddress; this.size = size; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(ipAddress); out.writeInt(size); } @Override public void readFields(DataInput in) throws IOException { this.ipAddress = in.readUTF(); this.size = in.readInt(); } /* * 自定义比较策略:先对visitorAddress排序, 再对responseSize排序 * 注意:该比较策略用于MapReduce的第一次默认排序 * 也就是发生在Map端的sort阶段 * 发生地点为环形缓冲区(可以通过io.sort.mb进行大小调整) */ @Override public int compareTo(SecondarySortWritable o) { // 先比较ip,如果 ip相同,再比较size if(this.ipAddress.equals(o.getIpAddress())) { return this.size-o.getSize(); // 大小升序 }else { return this.ipAddress.compareTo(o.getIpAddress()); } } // 实现equals方法和hashCode方法 @Override public int hashCode() { return ipAddress.hashCode(); } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; SecondarySortWritable other = (SecondarySortWritable) obj; if (ipAddress == null) { if (other.ipAddress != null) return false; } else if (!ipAddress.equals(other.ipAddress)) return false; // ipAddress.equals(other.ipAddress) 如果走到这里,说明两个ip是相同的 if (size != other.size) return false; return true; } }
LogWritable.java:
然后,再定义一个实现Writable接口的value数据类型。
package com.xueai8.secondarysort2; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; /* * 定义值类型(实现Writable接口),代表日志信息的对象 // 199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245 // "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+)" // group(1) - ip // group(4) - timestamp // group(6) - status // group(7) - responseSize * */ public class LogWritable implements Writable{ private Text userIP; // 客户端的IP地址 private Text timestamp; // 客户访问时间 private Text url; // 客户访问的url private IntWritable status; // 状态码 private IntWritable responseSize;// 服务端响应数据的大小 public LogWritable() { this.userIP = new Text(); this.timestamp = new Text(); this.url = new Text(); this.status = new IntWritable(); this.responseSize = new IntWritable(); } public void set(String userIP, String timestamp, String url, int status, int responseSize) { this.userIP.set(userIP); this.timestamp.set(timestamp); this.url.set(url); this.status.set(status); this.responseSize.set(responseSize); } public Text getUserIP() { return userIP; } public void setUserIP(Text userIP) { this.userIP = userIP; } public Text getTimestamp() { return timestamp; } public void setTimestamp(Text timestamp) { this.timestamp = timestamp; } public Text getUrl() { return url; } public void setUrl(Text url) { this.url = url; } public IntWritable getStatus() { return status; } public void setStatus(IntWritable status) { this.status = status; } public IntWritable getResponseSize() { return responseSize; } public void setResponseSize(IntWritable responseSize) { this.responseSize = responseSize; } // 序列化方法 @Override public void write(DataOutput out) throws IOException { userIP.write(out); timestamp.write(out); url.write(out); status.write(out); responseSize.write(out); } // 反序列化方法 @Override public void readFields(DataInput in) throws IOException { userIP.readFields(in); timestamp.readFields(in); url.readFields(in); status.readFields(in); responseSize.readFields(in); } }
LogMapper.java
Mapper类。在这里组装复合键。
package com.xueai8.secondarysort2; import java.io.IOException; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * * Mapper的输出key是组合键 */ public class LogMapper extends Mapper<LongWritable,Text,SecondarySortWritable,LogWritable>{ // 指定一个可重用的key变量 private SecondarySortWritable keyInfo = new SecondarySortWritable(); // key // private final Text outKey = new Text(); private final LogWritable outValue = new LogWritable(); // value @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 提取相应字段的正则表达式 String regexp = "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+)"; Pattern pattern = Pattern.compile(regexp); Matcher matcher = pattern.matcher(value.toString()); if(!matcher.matches()) { System.out.println("不是一个有效的日志记录"); return; } // 提取相应的字段 String ip = matcher.group(1); String timestamp = matcher.group(4); String url = matcher.group(5); int status = Integer.parseInt(matcher.group(6)); int responseSize = Integer.parseInt(matcher.group(7)); // LogWritable为 value outValue.set(ip, timestamp, url, status, responseSize); // SecondarySortWritable为复合键 keyInfo.set(ip, responseSize); // 写出 context.write(keyInfo, outValue); } }
LogPartitioner.java:
LogPartitioner是一个自定义的分区器类,它按自然key(ip或host)来分区数据。 在Hadoop中,分区阶段发生在map()阶段之后,reduce()阶段之前。 这个分区器(Partitioner)确保所有具有相同key的数据都会被发送到相同的reducer。
package com.xueai8.secondarysort2; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Partitioner; /** * 实现一个自定义的partitioner来对Map输出数据进行分区,只基于组合key中包含的实际的key(ip地址) */ public class LogPartitioner extends Partitioner<SecondarySortWritable, Writable>{ /** * 数据输入来源:map输出 * 自定义分区规则:按IP地址(我们这里根据组合键的第一个值)作为分区 * @param key map输出键值 * @param value map输出value值 * @param numPartitions 分区总数,即reduce task个数 */ @Override public int getPartition(SecondarySortWritable key, Writable value, int numPartitions) { return (key.getIpAddress().hashCode() & Integer.MAX_VALUE) % numPartitions; } }
LogGroupingComparator.java:
在调用Reducer的reduce函数之前,Reducer先通过LogGroupingComparator进行组内排序。
package com.xueai8.secondarysort2; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * * 实现一个自定义的grouping comparator来仅基于实际的key (ip地址)对Reduce输入进行分组 * * 分组比较器:只按ip地址分组 * 自定义分组有两种方式: * 一种是继承WritableComparator * 另外一种是实现RawComparator接口 * */ public class LogGroupingComparator extends WritableComparator { public LogGroupingComparator() { super(SecondarySortWritable.class,true); // 反射要用 } // 在分组时,只比较组合键中的IP地址。相同的IP地址分为一组 @Override public int compare(WritableComparable a, WritableComparable b) { SecondarySortWritable firstKey = (SecondarySortWritable)a; SecondarySortWritable secondKey = (SecondarySortWritable)b; return (firstKey.getIpAddress()).compareTo(secondKey.getIpAddress()); } }
LogReducer.java:
在调用reduce之前,已经由LogGroupingComparator进行了组内排序。 在定义key数据类型时,已经指定了比较器:先按ip排序;相同ip再按日志中response size大小排序。
package com.xueai8.secondarysort2; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class LogReducer extends Reducer<SecondarySortWritable,LogWritable, Text, Text> { private Text keyInfo = new Text(); private Text valueInfo = new Text(); // 修改 @Override protected void reduce(SecondarySortWritable key, Iterable<LogWritable> values, Context context) throws IOException, InterruptedException { StringBuilder builder = new StringBuilder("["); for(LogWritable log : values) { builder.append(log.getResponseSize().get()); builder.append(", "); } builder.append("]"); keyInfo.set(key.getIpAddress()); valueInfo.set(builder.toString()); context.write(keyInfo,valueInfo); // <ip,[sorted size]> } }
LogSecondarySortDriver.java:
驱动程序类,用于将作业提交到Hadoop上执行。
package com.xueai8.secondarysort2; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * * 在驱动程序中配置partitioner、LogGroupingComparator、以及Map输出key类型 */ public class LogSecondarySortDriver { public static void main(String[] args) throws Exception { if(args.length < 2) { System.out.println("用法: LogSecondarySortDriver <input> <output>"); System.exit(1); } Configuration conf = new Configuration(); Job job = Job.getInstance(conf,"日志分析"); job.setJarByClass(LogSecondarySortDriver.class); // set mapper job.setMapperClass(LogMapper.class); job.setMapOutputKeyClass(SecondarySortWritable.class); // 设置map输出类型 job.setMapOutputValueClass(LogWritable.class); job.setPartitionerClass(LogPartitioner.class); // 设置分区器 // set reducer job.setReducerClass(LogReducer.class); // job.setNumReduceTasks(3); // 指定分区数量 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setGroupingComparatorClass(LogGroupingComparator.class); // 设置输入路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 提交作业 System.exit(job.waitForCompletion(true)?0:1); } }
说明:
我们首先实现一个自定义的WritableComparable key类型,它拥有实际的key和value中需要排序的字段。我们确保这个新的组合key类型的排序顺序是实际的key后跟value的排序字段。这将确保Reduce输入数据被基于实际的key首先排序,后跟value中给定的字段。
然后我们实现一个自定义的partitioner,用来对Map输出数据分区(仅基于新的组合key中实际的key字段)。这一步确保每个具有相同实际key的key-value对将被同一个Reducer所处理。
最后,我们实现一个grouping comparator,当grouping reduce的输入key-value对时,它将只考虑新的组合key中实际的key。这确保每个reduce函数输入都将是聚集了属于实际key的值列表的新的组合key。值列表将按照在组合key的comparator中定义的顺序进行排序。
二、配置log4j
在src/main/resources目录下新增log4j的配置文件log4j.properties,内容如下:
log4j.rootLogger = info,stdout ### 输出信息到控制抬 ### log4j.appender.stdout = org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target = System.out log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
三、项目打包
打开IDEA下方的终端窗口terminal,执行"mvn clean package"打包命令,如下图所示:
如果一切正常,会提示打jar包成功。如下图所示:
这时查看项目结构,会看到多了一个target目录,打好的jar包就位于此目录下。如下图所示:
四、项目部署
请按以下步骤执行。
1、启动HDFS集群和YARN集群。在Linux终端窗口中,执行如下的脚本:
$ start-dfs.sh $ start-yarn.sh
查看进程是否启动,集群运行是否正常。在Linux终端窗口中,执行如下的命令:
$ jps
这时应该能看到有如下5个进程正在运行,说明集群运行正常:
5542 NodeManager 5191 SecondaryNameNode 4857 NameNode 5418 ResourceManager 4975 DataNode
2、将日志文件log_sample.txt上传到HDFS的/data/mr/目录下。
$ hdfs dfs -mkdir -p /data/mr $ hdfs dfs -put log_sample.txt /data/mr/ $ hdfs dfs -ls /data/mr/
3、提交作业到Hadoop集群上运行。(如果jar包在Windows下,请先拷贝到Linux中。)
在终端窗口中,执行如下的作业提交命令:
$ hadoop jar com.xueai8-1.0-SNAPSHOT.jar com.xueai8.secondarysort2.LogSecondarySortDriver /data/mr /data/mr-output
4、查看输出结果。
在终端窗口中,执行如下的HDFS命令,查看输出结果:
$ hdfs dfs -ls /data/mr-output
可以看到如下的输出结果:
129.94.144.152 [0, 7074, ] 199.120.110.21 [1713, 4085, 4179, ] 199.72.81.55 [234, 363, 669, 1382, 5866, 6245, 7074, ] 205.189.154.54 [110, 786, 1204, 1224, 3985, 7634, 40310, ] 205.212.115.106 [3985, 7634, ] alyssa.prodigy.com [12054, ] burger.letters.com [0, 0, 0, ] d104.aa.net [786, 1204, 3985, 40310, ] dave.dev1.ihub.com [786, 1204, 3985, 40310, ] dd14-012.compuserve.com [42732, ] dial22.lloyd.com [61716, ] gater3.sematech.org [1204, 40310, ] gater4.sematech.org [786, 3985, ] gayle-gaston.tenet.edu [12040, ] ix-or10-06.ix.netcom.com [4151, 5998, ] ix-orl2-01.ix.netcom.com [1204, 3985, 40310, ] link097.txdirect.net [786, 1204, 1713, 4179, 4377, 6922, 8677, 11417, 11853, ] net-1-141.eden.com [34029, ] netport-27.iu.net [7074, ] onyx.southwind.net [0, 3985, 40310, ] piweba3y.prodigy.com [12054, 55666, ] pm13.j51.com [305722, ] port26.annex2.nwlink.com [234, 363, 669, 1204, 1414, 4441, 9867, 13372, 25218, ] ppp-mia-30.shadow.net [234, 363, 669, 786, 5866, 7074, ] ppp-nyc-3-1.ios.com [52491, 77163, ] ppptky391.asahi-net.or.jp [3977, 11473, ] remote27.compusmart.ab.ca [110, 3985, 7634, 12054, ] scheyer.clark.net [49152, ] slip1.yab.com [6168, 16991, ] smyth-pc.moorecap.com [2261, 18149, 101267, ] unicomp6.unicomp.net [786, 1204, 3214, 3985, 40310, ] waters-gw.starway.net.au [6723, ] www-a1.proxy.aol.com [3985, ] www-b4.proxy.aol.com [70712, ]