MapReduce表连接_reduce端连接

如果没有一个map-side join技术适合我们的数据集,那么就需要在MapReduce中使用shuffle来排序和连接两个数据集。 这称为reduce-side join,也叫“重分区连接”。

重分区连接是reduce端连接。它利用MapReduce的排序-合并机制来分组数据。它只使用一个单独的MapReduce任务,并支持N-way join,这里N指的是被连接的数据集的数量。

Map阶段负责从多个数据集中读取数据,决定用于每条记录的连接值,并将连接值作为输出key。输出value则包含在reduce阶段所合并的数据集的数据。

Reduce阶段,一个reduce接收map函数传来的每一个join key的所有输出值,并将数据分为N个分区,这里N指的是被连接的数量。在该reducer接收到用于该join value的所有输入记录并在内存中对他们分区之后,它对所有分区执行一个笛卡尔积(Cartersian product),并输出每个join的结果。

下图描述了Hadoop中reduce端连接的整体流程流。

在这里,map端处理发出两个表的连接key和相应的元组值。 作为这种处理的结果,所有具有相同连接key的元组落入相同的reducer中,然后reducer将具有相同连接键的记录连接起来。

案例描述

有两个数据集logs.txt和users.txt(假设两个数据集都很大,大到都无法直接加载到内存中)。其中 logs.txt为基于用户的一些活动(可从应用程序或web服务器日志中抽取出来),包括用户名、活动、源IP地址; users.txt中为用户数据,包括用户名、年龄和所在地区。

文件logs.txt:

anne	22	NY
joe	39	CO
alison	35	NY
mike	69	VA
marie	27	OR
jim	21	OR
bob	71	CA
mary	53	NY
dave	36	VA
dude	50	CA

文件users.txt:

213.59.105.58	CA
166.36.182.90	UK
221.220.8.0	US
229.48.154.221	UK

要求使用MapReduce reduce-side连接这两个数据集。

解决方案

要支持这个技术,MapReduce代码需要满足以下条件:

  • 它需要支持多个map类,每个map处理一个不同的输入数据集。这是通过使用MultipleInputs来完成的。
  • 它需要一个方式来标记由mapper所输出的记录,这样它们才能与它们原始的数据集相关联。这里我们将使用htuple项目来简化MapReduce中组合数据(composite data)的处理。

一、创建Java Maven项目

Maven依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>HadoopDemo</groupId>
    <artifactId>com.xueai8</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!--hadoop依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!--hdfs文件系统依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!--MapReduce相关的依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!--junit依赖-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!--编译器插件用于编译拓扑-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <!--指定maven编译的jdk版本和字符集,如果不指定,maven3默认用jdk 1.5 maven2默认用jdk1.3-->
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source> <!-- 源代码使用的JDK版本 -->
                    <target>1.8</target> <!-- 需要生成的目标class文件的编译版本 -->
                    <encoding>UTF-8</encoding><!-- 字符集编码 -->
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

UserMap.java:

处理users.txt数据的Mapper类。

package com.xueai8.reducejoin;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import static com.xueai8.reducejoin.SimpleReduceJoinDriver.USERS;

public class UserMap extends Mapper<LongWritable, Text, Text, TupleWritable> {

    private Text outputKey = new Text();
    private TupleWritable outputValue = new TupleWritable();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String username = value.toString().split("\t")[0]; // 提取用户名
        outputKey.set(username);
        outputValue.setTable(USERS);
        outputValue.setRecord(value.toString());
        // 写出
        context.write(outputKey, outputValue);
    }
}

UserLogMap.java:

处理logs.txt数据的Mapper类。

package com.xueai8.reducejoin;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import static com.xueai8.reducejoin.SimpleReduceJoinDriver.USER_LOGS;

public class UserLogMap extends Mapper<LongWritable, Text, Text, TupleWritable> {

    private Text outputKey = new Text();
    private TupleWritable outputValue = new TupleWritable();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String username = value.toString().split("\t")[0]; // 提取用户名
        outputKey.set(username);
        outputValue.setTable(USER_LOGS);
        outputValue.setRecord(value.toString());

        context.write(outputKey, outputValue);
    }
}

JoinReduce.java:

Reducer类。在这里对两个Mapper的输出结果执行join连接。

package com.xueai8.reducejoin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import static com.xueai8.reducejoin.SimpleReduceJoinDriver.USERS;
import static com.xueai8.reducejoin.SimpleReduceJoinDriver.USER_LOGS;

/**
 *
 * reduce-join:连接两个mapper的输出
 */
public class JoinReduce extends Reducer<Text, TupleWritable, Text, Text> {

    private Text keyInfo = new Text();
    private Text valueInfo = new Text();

    @Override
    protected void reduce(Text key, Iterable<TupleWritable> values, Context context)
            throws IOException, InterruptedException {
        List<String> users = new ArrayList<>();
        List<String> userLogs = new ArrayList<>();

        // 解析从mapper收到的输入,分别放入相应的List集合中
        for (TupleWritable tuple : values) {
            System.out.println("Tuple: " + tuple);
            switch (tuple.getTable()) {
                case USERS: {           // 来自users.txt
                    users.add(tuple.getRecord());
                    break;
                }
                case USER_LOGS: {	    // 来自logs.txt
                    userLogs.add(tuple.getRecord());
                    break;
                }
            }
        }

        // 笛卡尔积
        for (String user : users) {
            for (String userLog : userLogs) {
                keyInfo.set(user);
                valueInfo.set(userLog);
                context.write(keyInfo, valueInfo);
            }
        }

    }
}

SimpleReduceJoinDriver.java:

驱动程序,MultipleInputs输入格式化类处理两个输入数据集。

package com.xueai8.reducejoin;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SimpleReduceJoinDriver extends Configured implements Tool{

    // 声明代表不同数据表的标志变量
    public static final int USERS = 0;      // 代表记录来自用户信息表
    public static final int USER_LOGS = 1;  // 代表记录来自用户日志记录表

    public static void main(final String[] args) throws Exception {
        int res = ToolRunner.run(new SimpleReduceJoinDriver(), args);
        System.exit(res);
    }

    @Override
    public int run(final String[] args) throws Exception {
        if (args.length < 3) {
            System.err.println("用法: SimpleReduceJoinDriver <userpath> <logpath> <output>");
            System.exit(-1);
        }

        Path usersPath = new Path(args[0]);         // users.txt输入路径
        Path userLogsPath = new Path(args[1]);      // logs.txt输入路径
        Path outputPath = new Path(args[2]);        // reduce-join输出路径

        Job job = Job.getInstance(getConf(),"Simple Repartition Join");
        job.setJarByClass(SimpleReduceJoinDriver.class);

        // Mapper
        // 使用MultipleInputs输入格式化类
        // 分别为不同的输入文件指定不同的InputFormat类和Mapper类
        MultipleInputs.addInputPath(job, usersPath, TextInputFormat.class, UserMap.class);
        MultipleInputs.addInputPath(job, userLogsPath, TextInputFormat.class, UserLogMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TupleWritable.class);

        // Reducer
        job.setReducerClass(JoinReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 如果输出路径已经存在了,先删除它,免得每次修改输出路径名
        FileSystem fs = FileSystem.get(outputPath.toUri(),getConf());
        if(fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}    

二、配置log4j

在src/main/resources目录下新增log4j的配置文件log4j.properties,内容如下:

log4j.rootLogger = info,stdout

### 输出信息到控制抬 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

三、项目打包

打开IDEA下方的终端窗口terminal,执行"mvn clean package"打包命令,如下图所示:

如果一切正常,会提示打jar包成功。如下图所示:

这时查看项目结构,会看到多了一个target目录,打好的jar包就位于此目录下。如下图所示:

四、项目部署

请按以下步骤执行。

1、启动HDFS集群和YARN集群。在Linux终端窗口中,执行如下的脚本:

$ start-dfs.sh
$ start-yarn.sh

查看进程是否启动,集群运行是否正常。在Linux终端窗口中,执行如下的命令:

$ jps

这时应该能看到有如下5个进程正在运行,说明集群运行正常:

    5542 NodeManager
    5191 SecondaryNameNode
    4857 NameNode
    5418 ResourceManager
    4975 DataNode

2、分别将日志数据文件logs.txt和用户数据文件users.txt上传到HDFS的/input/reducejoin/目录下。

$ hdfs dfs -mkdir -p /input/reducejoin
$ hdfs dfs -put logs.txt /input/reducejoin/
$ hdfs dfs -put users.txt /input/reducejoin/
$ hdfs dfs -ls /input/reducejoin/

3、提交作业到Hadoop集群上运行。(如果jar包在Windows下,请先拷贝到Linux中。)

在终端窗口中,执行如下的作业提交命令:

$ hadoop jar com.xueai8-1.0-SNAPSHOT.jar com.xueai8.reducejoin.SimpleReduceJoinDriver /input/reducejoin/users.txt  /input/reducejoin/logs.txt  /output/reducejoin 

然后查看输出结果。在终端窗口中,执行如下的HDFS命令:

$ hdfs dfs -ls /output/reducejoin/part-r-00000 

可以看到如下的join结果:

bob	71	CA	bob	new_tweet	58.133.120.100
jim	21	OR	jim	login	198.184.237.49
jim	21	OR	jim	new_tweet	93.24.237.12
jim	21	OR	jim	logout	93.24.237.12
marie	27	OR	marie	login	58.133.120.100
marie	27	OR	marie	view_user	122.158.130.90
mike	69	VA	mike	logout	55.237.104.36
mike	69	VA	mike	new_tweet	87.124.79.252

《PySpark原理深入与编程实战》