MapReduce案例-二次排序算法_示例2

MapReduce框架基于key-value对的key对Reduce输入数据排序,还基于该key对数据分组。Hadoop为排序后的key中的每个唯一key调用reduce函数,排序后的key带有属于该key的值列表作为第二个参数。

但是,每个key的值列表是无序的。有许多场景需要每个key的Reduce输入值列表基于一定的条件排序。 例如,对于一个给定的key,找出值的最大值或最小值,而不需要迭代整个值列表。再例如,优化Reduce端的join,识别重复的数据产出等。

问题描述

下面我们对HTTP服务器日志项进行分析。 这里我们假定一个日志项由五部分组成:request host、timestamp、request URL、response size和HTTP状态码。如下所示:

192.168.0.2 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/HTTP/1.0" 200 6245

其中:

  • 199.72.81.55 客户端用户的ip
  • 01/Jul/1995:00:00:01 -0400 访问的时间
  • GET HTTP方法,GET或POST
  • /history/apollo/ 客户请求的URL
  • 200 响应码 404
  • 6245 响应内容的大小

要求:对用户访问日志记录进行排序,先按访问用户IP排序;同一个访问用户IP,访问的文件大小按字节排序。

输出数据格式如下:"host [234, 363, 669, 1382, 5866, 6245, 7074, ]"。

129.94.144.152	[0, 7074, ]
199.120.110.21	[1713, 4085, 4179, ]
199.72.81.55	[234, 363, 669, 1382, 5866, 6245, 7074, ]
205.189.154.54	[110, 786, 1204, 1224, 3985, 7634, 40310, ]
205.212.115.106	[3985, 7634, ]
alyssa.prodigy.com	[12054, ]
burger.letters.com	[0, 0, 0, ]
......

实现思路

我们可以使用Hadoop框架中一个叫做”二次排序”的机制来排序Reduce输入值。其原理如下。

MapReduce工作流程:

数据流图如下:

实现思路:

要实现二次排序特性,我们必须告诉MapReduce/Hadoop框架:

  • 怎样排序reducer keys?
  • 怎样分区(partition)传递给reducers的keys(自定义partitioner)?
  • 怎样分组(group)到达每个reducer的数据?

Map端处理:

(1)复合键设计

使用Value-to-Key转换设计模式:形成一个复合中间键-(K,V1)。 其中v1是次关键字。这里K被称为自然键。 要将一个value(例如,V1)注入到一个reducer key内,简单地创建一个复合键即可。 也就是说,将原始数据的key值和其对应的数据组合成一个新的key值,然后新的key值对应的value还是原始数据中的value。 那么我们只需要对组合键(新key值)进行排序就OK了。

(2)分区器设计

然后我们需要自定义一个分区处理器,因为我们的目标不是想将新key值相同的记录传到一个reduce中,而是想将新key值中第一个字段相同的记录放到同一个reduce中进行分组合并,所以我们需要根据新key值的第一个字段来自定义一个分区处理器。

分区操作完成之后,再调用自己的自定义排序器对新的key值进行排序。

Reduce端处理:

经过Shuffle处理之后,数据传输到Reducer端了。 在Reducer端按照组合键的第一个字段进行分组,并且每处理完一次分组之后就会调用一次reduce函数来对这个分组进行处理和输出。 所以这里需要自定义一个分组比较器。

一、创建Java Maven项目

Maven依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>HadoopDemo</groupId>
    <artifactId>com.xueai8</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!--hadoop依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!--hdfs文件系统依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!--MapReduce相关的依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!--junit依赖-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!--编译器插件用于编译拓扑-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <!--指定maven编译的jdk版本和字符集,如果不指定,maven3默认用jdk 1.5 maven2默认用jdk1.3-->
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source> <!-- 源代码使用的JDK版本 -->
                    <target>1.8</target> <!-- 需要生成的目标class文件的编译版本 -->
                    <encoding>UTF-8</encoding><!-- 字符集编码 -->
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

SecondarySortWritable.java:

首先,定义一个可比较的key数据类型,用作复合键类型。

package com.xueai8.secondarysort2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

/**
 *
 * 组合键(自然键+次关键字),作为Mapper的输出
 * 自定义的组合键类型: IP + Size
 * 因为要用作key,需要实现WritableComparable接口
 */
public class SecondarySortWritable implements WritableComparable<SecondarySortWritable>{
    // 成员变量
    private String ipAddress;       // 访问用户的IP地址
    private int size;               // 响应文件的大小

    public SecondarySortWritable() {}

    public String getIpAddress() {
        return ipAddress;
    }

    public int getSize() {
        return size;
    }

    public void set(String ipAddress,int size) {
        this.ipAddress = ipAddress;
        this.size = size;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(ipAddress);
        out.writeInt(size);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.ipAddress = in.readUTF();
        this.size = in.readInt();
    }

    /*
	 * 自定义比较策略:先对visitorAddress排序, 再对responseSize排序
	 * 注意:该比较策略用于MapReduce的第一次默认排序
	 * 也就是发生在Map端的sort阶段
	 * 发生地点为环形缓冲区(可以通过io.sort.mb进行大小调整)
	 */
    @Override
    public int compareTo(SecondarySortWritable o) {
        // 先比较ip,如果 ip相同,再比较size
        if(this.ipAddress.equals(o.getIpAddress())) {
            return this.size-o.getSize();		// 大小升序
        }else {
            return this.ipAddress.compareTo(o.getIpAddress());
        }
    }

    // 实现equals方法和hashCode方法
    @Override
    public int hashCode() {
        return ipAddress.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;

        SecondarySortWritable other = (SecondarySortWritable) obj;
        if (ipAddress == null) {
            if (other.ipAddress != null)
                return false;
        } else if (!ipAddress.equals(other.ipAddress))
            return false;

        // ipAddress.equals(other.ipAddress) 如果走到这里,说明两个ip是相同的
        if (size != other.size)
            return false;
        return true;
    }
}

LogWritable.java:

然后,再定义一个实现Writable接口的value数据类型。

package com.xueai8.secondarysort2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

/*
 * 定义值类型(实现Writable接口),代表日志信息的对象
// 199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
// "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+)"
// group(1)   - ip
// group(4)   - timestamp
// group(6)	  - status
// group(7)   - responseSize
 *
 */
public class LogWritable implements Writable{
	private Text userIP;		// 客户端的IP地址
	private Text timestamp;		// 客户访问时间
	private Text url;		// 客户访问的url

	private IntWritable status;	// 状态码
	private IntWritable responseSize;// 服务端响应数据的大小

	public LogWritable() {
		this.userIP = new Text();
		this.timestamp = new Text();
		this.url = new Text();
		this.status = new IntWritable();
		this.responseSize = new IntWritable();
	}

	public void set(String userIP, String timestamp, String url, int status, int responseSize) {
		this.userIP.set(userIP);
		this.timestamp.set(timestamp);
		this.url.set(url);
		this.status.set(status);
		this.responseSize.set(responseSize);
	}

	public Text getUserIP() {
		return userIP;
	}

	public void setUserIP(Text userIP) {
		this.userIP = userIP;
	}

	public Text getTimestamp() {
		return timestamp;
	}

	public void setTimestamp(Text timestamp) {
		this.timestamp = timestamp;
	}

	public Text getUrl() {
		return url;
	}

	public void setUrl(Text url) {
		this.url = url;
	}

	public IntWritable getStatus() {
		return status;
	}

	public void setStatus(IntWritable status) {
		this.status = status;
	}

	public IntWritable getResponseSize() {
		return responseSize;
	}

	public void setResponseSize(IntWritable responseSize) {
		this.responseSize = responseSize;
	}

	// 序列化方法
	@Override
	public void write(DataOutput out) throws IOException {
		userIP.write(out);
		timestamp.write(out);
		url.write(out);
		status.write(out);
		responseSize.write(out);
	}

	// 反序列化方法
	@Override
	public void readFields(DataInput in) throws IOException {
		userIP.readFields(in);
		timestamp.readFields(in);
		url.readFields(in);
		status.readFields(in);
		responseSize.readFields(in);
	}
}

LogMapper.java

Mapper类。在这里组装复合键。

package com.xueai8.secondarysort2;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 *
 * Mapper的输出key是组合键
 */
public class LogMapper extends Mapper<LongWritable,Text,SecondarySortWritable,LogWritable>{

    // 指定一个可重用的key变量
    private SecondarySortWritable keyInfo = new SecondarySortWritable(); // key
//    private final Text outKey = new Text();
    private final LogWritable outValue = new LogWritable();		 // value

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 提取相应字段的正则表达式
        String regexp = "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+)";

        Pattern pattern = Pattern.compile(regexp);
        Matcher matcher = pattern.matcher(value.toString());
        if(!matcher.matches()) {
            System.out.println("不是一个有效的日志记录");
            return;
        }

        // 提取相应的字段
        String ip = matcher.group(1);
        String timestamp = matcher.group(4);
        String url = matcher.group(5);
        int status = Integer.parseInt(matcher.group(6));
        int responseSize = Integer.parseInt(matcher.group(7));

        // LogWritable为 value
        outValue.set(ip, timestamp, url, status, responseSize);
        // SecondarySortWritable为复合键
        keyInfo.set(ip, responseSize);

        // 写出
        context.write(keyInfo, outValue);
    }
}

LogPartitioner.java:

LogPartitioner是一个自定义的分区器类,它按自然key(ip或host)来分区数据。 在Hadoop中,分区阶段发生在map()阶段之后,reduce()阶段之前。 这个分区器(Partitioner)确保所有具有相同key的数据都会被发送到相同的reducer。

package com.xueai8.secondarysort2;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * 实现一个自定义的partitioner来对Map输出数据进行分区,只基于组合key中包含的实际的key(ip地址)
 */
public class LogPartitioner extends Partitioner<SecondarySortWritable, Writable>{

    /**
     * 数据输入来源:map输出
     * 自定义分区规则:按IP地址(我们这里根据组合键的第一个值)作为分区
     * @param key map输出键值
     * @param value map输出value值
     * @param numPartitions 分区总数,即reduce task个数
     */
    @Override
    public int getPartition(SecondarySortWritable key, Writable value, int numPartitions) {
        return (key.getIpAddress().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

LogGroupingComparator.java:

在调用Reducer的reduce函数之前,Reducer先通过LogGroupingComparator进行组内排序。

package com.xueai8.secondarysort2;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 *
 * 实现一个自定义的grouping comparator来仅基于实际的key (ip地址)对Reduce输入进行分组
 *
 * 分组比较器:只按ip地址分组
 * 自定义分组有两种方式:
 * 一种是继承WritableComparator
 * 另外一种是实现RawComparator接口
 *
 */
public class LogGroupingComparator extends WritableComparator {

    public LogGroupingComparator() {
        super(SecondarySortWritable.class,true);	// 反射要用
    }

    // 在分组时,只比较组合键中的IP地址。相同的IP地址分为一组
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        SecondarySortWritable firstKey = (SecondarySortWritable)a;
        SecondarySortWritable secondKey = (SecondarySortWritable)b;
        return (firstKey.getIpAddress()).compareTo(secondKey.getIpAddress());
    }

}

LogReducer.java:

在调用reduce之前,已经由LogGroupingComparator进行了组内排序。 在定义key数据类型时,已经指定了比较器:先按ip排序;相同ip再按日志中response size大小排序。

package com.xueai8.secondarysort2;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class LogReducer extends Reducer<SecondarySortWritable,LogWritable, Text, Text> {

    private Text keyInfo = new Text();
    private Text valueInfo = new Text();

    // 修改
    @Override
    protected void reduce(SecondarySortWritable key, Iterable<LogWritable> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder builder = new StringBuilder("[");
        for(LogWritable log : values) {
            builder.append(log.getResponseSize().get());
            builder.append(", ");
        }
        builder.append("]");
        keyInfo.set(key.getIpAddress());
        valueInfo.set(builder.toString());
        context.write(keyInfo,valueInfo);		// <ip,[sorted size]>
    }
}

LogSecondarySortDriver.java:

驱动程序类,用于将作业提交到Hadoop上执行。

package com.xueai8.secondarysort2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 *
 * 在驱动程序中配置partitioner、LogGroupingComparator、以及Map输出key类型
 */
public class LogSecondarySortDriver {
    public static void main(String[] args) throws Exception {
        if(args.length < 2) {
            System.out.println("用法: LogSecondarySortDriver <input> <output>");
            System.exit(1);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf,"日志分析");
        job.setJarByClass(LogSecondarySortDriver.class);

        // set mapper
        job.setMapperClass(LogMapper.class);
        job.setMapOutputKeyClass(SecondarySortWritable.class);  // 设置map输出类型
        job.setMapOutputValueClass(LogWritable.class);

        job.setPartitionerClass(LogPartitioner.class);  	// 设置分区器

        // set reducer
        job.setReducerClass(LogReducer.class);
//        job.setNumReduceTasks(3); 		// 指定分区数量
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setGroupingComparatorClass(LogGroupingComparator.class);

        // 设置输入路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 提交作业
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

说明:

我们首先实现一个自定义的WritableComparable key类型,它拥有实际的key和value中需要排序的字段。我们确保这个新的组合key类型的排序顺序是实际的key后跟value的排序字段。这将确保Reduce输入数据被基于实际的key首先排序,后跟value中给定的字段。

然后我们实现一个自定义的partitioner,用来对Map输出数据分区(仅基于新的组合key中实际的key字段)。这一步确保每个具有相同实际key的key-value对将被同一个Reducer所处理。

最后,我们实现一个grouping comparator,当grouping reduce的输入key-value对时,它将只考虑新的组合key中实际的key。这确保每个reduce函数输入都将是聚集了属于实际key的值列表的新的组合key。值列表将按照在组合key的comparator中定义的顺序进行排序。

二、配置log4j

在src/main/resources目录下新增log4j的配置文件log4j.properties,内容如下:

log4j.rootLogger = info,stdout

### 输出信息到控制抬 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

三、项目打包

打开IDEA下方的终端窗口terminal,执行"mvn clean package"打包命令,如下图所示:

如果一切正常,会提示打jar包成功。如下图所示:

这时查看项目结构,会看到多了一个target目录,打好的jar包就位于此目录下。如下图所示:

四、项目部署

请按以下步骤执行。

1、启动HDFS集群和YARN集群。在Linux终端窗口中,执行如下的脚本:

    $ start-dfs.sh
    $ start-yarn.sh

查看进程是否启动,集群运行是否正常。在Linux终端窗口中,执行如下的命令:

    $ jps

这时应该能看到有如下5个进程正在运行,说明集群运行正常:

    5542 NodeManager
    5191 SecondaryNameNode
    4857 NameNode
    5418 ResourceManager
    4975 DataNode

2、将日志文件log_sample.txt上传到HDFS的/data/mr/目录下。

$ hdfs dfs -mkdir -p /data/mr
$ hdfs dfs -put log_sample.txt /data/mr/
$ hdfs dfs -ls /data/mr/

3、提交作业到Hadoop集群上运行。(如果jar包在Windows下,请先拷贝到Linux中。)

在终端窗口中,执行如下的作业提交命令:

$ hadoop jar com.xueai8-1.0-SNAPSHOT.jar com.xueai8.secondarysort2.LogSecondarySortDriver /data/mr /data/mr-output 

4、查看输出结果。

在终端窗口中,执行如下的HDFS命令,查看输出结果:

$ hdfs dfs -ls /data/mr-output 

可以看到如下的输出结果:

129.94.144.152	[0, 7074, ]
199.120.110.21	[1713, 4085, 4179, ]
199.72.81.55	[234, 363, 669, 1382, 5866, 6245, 7074, ]
205.189.154.54	[110, 786, 1204, 1224, 3985, 7634, 40310, ]
205.212.115.106	[3985, 7634, ]
alyssa.prodigy.com	[12054, ]
burger.letters.com	[0, 0, 0, ]
d104.aa.net	[786, 1204, 3985, 40310, ]
dave.dev1.ihub.com	[786, 1204, 3985, 40310, ]
dd14-012.compuserve.com	[42732, ]
dial22.lloyd.com	[61716, ]
gater3.sematech.org	[1204, 40310, ]
gater4.sematech.org	[786, 3985, ]
gayle-gaston.tenet.edu	[12040, ]
ix-or10-06.ix.netcom.com	[4151, 5998, ]
ix-orl2-01.ix.netcom.com	[1204, 3985, 40310, ]
link097.txdirect.net	[786, 1204, 1713, 4179, 4377, 6922, 8677, 11417, 11853, ]
net-1-141.eden.com	[34029, ]
netport-27.iu.net	[7074, ]
onyx.southwind.net	[0, 3985, 40310, ]
piweba3y.prodigy.com	[12054, 55666, ]
pm13.j51.com	[305722, ]
port26.annex2.nwlink.com	[234, 363, 669, 1204, 1414, 4441, 9867, 13372, 25218, ]
ppp-mia-30.shadow.net	[234, 363, 669, 786, 5866, 7074, ]
ppp-nyc-3-1.ios.com	[52491, 77163, ]
ppptky391.asahi-net.or.jp	[3977, 11473, ]
remote27.compusmart.ab.ca	[110, 3985, 7634, 12054, ]
scheyer.clark.net	[49152, ]
slip1.yab.com	[6168, 16991, ]
smyth-pc.moorecap.com	[2261, 18149, 101267, ]
unicomp6.unicomp.net	[786, 1204, 3214, 3985, 40310, ]
waters-gw.starway.net.au	[6723, ]
www-a1.proxy.aol.com	[3985, ]
www-b4.proxy.aol.com	[70712, ]

《PySpark原理深入与编程实战》