MapReduce表连接_map端连接

在Hadoop数据处理管道中,通过使用过滤和投影可以减少处理的数据量,通过使用下推优化技术来改善数据管道。

应该尽可能地靠近数据源执行过滤和投影;在MapReduce中,最好是在mapper中执行这个工作。例如下面的代码执行过滤:

@Override
protected void map(LongWritable offset, Text value, Context context) {
    User user = User.fromText(value);
    if (user.getAge() >= 30) {			// 过滤掉30岁以下的用户
        context.write(new Text(user.getName()),new Text(user.getState()));	// 只投影用户名和所在地区
    }
}   

同样,在对两个数据集进行join连接时,也要尽可能在map端(mapper中)执行。这样可以避免数据到reducer的shuffle和排序。

因此,作为一般策略,首选是map端连接。这里,我们探讨三种不同风格的map端连接:

  • 有一个数据集小到足够放入内存缓存;
  • 有一个数据集经过过滤后可以放入内存缓存中;(在两个数据集中都存在join key);
  • 数据被排序并以某种方式跨文件分发。

我们首先看第一种情况:连接两个数据集,其中有一个数据集小到足够放入内存缓存

实现思路

map端连接也称为“复制连接”。

复制连接得名于它的具体实现:连接中最小的数据集将会被复制到所有的map主机节点。

复制连接有一个假设前提:在被连接的数据集中,有一个数据集足够小到可以缓存在内存中。 使用分布式缓存来缓存较小的那个数据集,当较大的那个数据集流向mapper时与其执行连接。

MapReduce复制连接工作原理如下:

  • 1) 使用分布式缓存(Districubted cache)将这个小数据集复制到所有运行map任务的节点。
  • 2) 用各个map任务初始化方法将这个小数据集装载到一个哈希表(hashtable)中。
  • 3) 使用输入到map函数的大数据集的每条记录的key来查找这个小数据集的哈希表,并在这个大数据集记录和匹配该连接值的小数据集的所有记录间执行一个连接。
  • 4) 输出符合连接条件的结果。

案例描述

有两个数据集mylogs_tsv.txt和ip_country_tsv.txt。其中mylogs_tsv.txt中数据为web网站日志记录,ip_country_tsv.txt中数据为ip地址和国家代码映射信息。

文件mylogs_tsv.txt:

221.220.8.0	1341391325000	/index.html	200	140	Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
166.36.182.90	1341365098000	/cart.html	200	140	Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
221.220.8.0	1341376657000	/about.html	200	140	Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
213.59.105.58	1341388286000	/index.html	200	140	Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
229.48.154.221	1341393014000	/index.html	200	140	Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
166.36.182.90	1341385550000	/cart.html	200	140	Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
213.59.105.58	1341383826000	/index.html	200	140	Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
166.36.182.90	1341397694000	/index.html	200	140	Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
213.59.105.58	1341397811000	/index.html	200	140	Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
229.48.154.221	1341381934000	/about.html	200	140	Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
......

文件ip_country_tsv.txt:

213.59.105.58	CA
166.36.182.90	UK
221.220.8.0	US
229.48.154.221	UK

希望经过MapReduce连接处理以后,输出如下形式的结果:

US      221.220.8.0     1341391325000   /index.html     200     140     Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
UK      166.36.182.90   1341365098000   /cart.html      200     140     Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
US      221.220.8.0     1341376657000   /about.html     200     140     Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
CA      213.59.105.58   1341388286000   /index.html     200     140     Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
UK      229.48.154.221  1341393014000   /index.html     200     140     Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
UK      166.36.182.90   1341385550000   /cart.html      200     140     Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
CA      213.59.105.58   1341383826000   /index.html     200     140     Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
UK      166.36.182.90   1341397694000   /index.html     200     140     Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
CA      213.59.105.58   1341397811000   /index.html     200     140     Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
UK      229.48.154.221  1341381934000   /about.html     200     140     Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
UK      166.36.182.90   1341365729000   /index.html     200     140     Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
CA      213.59.105.58   1341372154000   /index.html     200     140     Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
......

一、创建Java Maven项目

Maven依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>HadoopDemo</groupId>
    <artifactId>com.xueai8</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!--hadoop依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!--hdfs文件系统依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!--MapReduce相关的依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!--junit依赖-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!--编译器插件用于编译拓扑-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <!--指定maven编译的jdk版本和字符集,如果不指定,maven3默认用jdk 1.5 maven2默认用jdk1.3-->
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source> <!-- 源代码使用的JDK版本 -->
                    <target>1.8</target> <!-- 需要生成的目标class文件的编译版本 -->
                    <encoding>UTF-8</encoding><!-- 字符集编码 -->
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

WeblogMapper.java:

Mapper类。使用分布式缓存,将小的数据集缓存到各个节点。然后在Mapper中读取出来。

package com.xueai8.mapjoin;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *
 * map端连接:将其中较小的数据集加载到内存中
 */
public class WeblogMapper extends Mapper<Object, Text, Text, Text> {

    private static final Logger logger = LoggerFactory.getLogger(WeblogMapper.class);

    public static final String IP_COUNTRY_TABLE_FILENAME = "ip_country_tsv.txt";
    private Map<String, String> ipCountryMap = new HashMap<>();

    private Text outputKey = new Text();
    private Text outputValue = new Text();

    // 先从分布式缓存中读取被缓存到本地的文件
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] patternsURIs = context.getCacheFiles();               // 获取缓存文件的uri
        Path patternsPath = new Path(patternsURIs[0].getPath());    // 这里我们只缓存了一个文件
        String patternsFileName = patternsPath.getName();           // 获得缓存文件的文件名
        logger.warn("patternsFileName :"+patternsFileName);

        // 从分布式缓存中读取ip_country_tsv.txt,并存入到HashMap中
        if (IP_COUNTRY_TABLE_FILENAME.equals(patternsFileName)) {
            BufferedReader br = new BufferedReader(new FileReader(patternsFileName));
            String line;
            while ((line=br.readLine()) != null) {
                String[] tokens = line.split("\t");
                String ip = tokens[0];
                String country = tokens[1];
                ipCountryMap.put(ip, country);                      // 放入HashMap中
            }
            br.close();
        }

        if (ipCountryMap.isEmpty()) {
            throw new IOException("无法加载 ip country 表!");
        }
    }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String row = value.toString();
        String[] tokens = row.split("\t");

        String ip = tokens[0];			// 取每条日志的ip字段
        String country = ipCountryMap.get(ip);	// 根据ip,找到对应的国家名称

        outputKey.set(country);
        outputValue.set(row);

        context.write(outputKey, outputValue);
    }
}

不需要Reducer类。

MapSideJoin.java:

驱动程序类。在这里要对小数据集进行分布式缓存。

package com.xueai8.mapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 *
 * map端连接:只有mapper,没有reducer,避免了数据 shuffle
 */
public class MapSideJoin extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int returnCode = ToolRunner.run(new MapSideJoin(), args);
        System.exit(returnCode);
    }

    public int run(String[] args) throws Exception {
        // 需要从命令行传入输入输出路径参数
        if (args.length < 2) {
            System.err.println("用法: MapSideJoin <input_path> <output_path> ");
            System.exit(-1);
        }

        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "MapSideJoin");

        // 将nobots_ip_country_tsv.txt文件放入分布式缓存中
        job.addCacheFile(new Path("libs/ip_country_tsv.txt").toUri());

        job.setJarByClass(getClass());
        job.setMapperClass(WeblogMapper.class);
        job.setNumReduceTasks(0);     // 没有reducer

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean flag = job.waitForCompletion(true);
        return flag? 0 : 1;
    }
}

二、配置log4j

在src/main/resources目录下新增log4j的配置文件log4j.properties,内容如下:

log4j.rootLogger = info,stdout

### 输出信息到控制抬 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

三、项目打包

打开IDEA下方的终端窗口terminal,执行"mvn clean package"打包命令,如下图所示:

如果一切正常,会提示打jar包成功。如下图所示:

这时查看项目结构,会看到多了一个target目录,打好的jar包就位于此目录下。如下图所示:

四、项目部署

请按以下步骤执行。

1、启动HDFS集群和YARN集群。在Linux终端窗口中,执行如下的脚本:

$ start-dfs.sh
$ start-yarn.sh

查看进程是否启动,集群运行是否正常。在Linux终端窗口中,执行如下的命令:

$ jps

这时应该能看到有如下5个进程正在运行,说明集群运行正常:

    5542 NodeManager
    5191 SecondaryNameNode
    4857 NameNode
    5418 ResourceManager
    4975 DataNode

2、将数据文件mylogs_tsv.txt上传到HDFS的/data/mr/目录下。

$ hdfs dfs -mkdir -p /data/mr
$ hdfs dfs -put mylogs_tsv.txt /data/mr/
$ hdfs dfs -ls /data/mr/

3、将用于缓存的小数据文件ip_country_tsv.txt上传到HDFS的libs/目录下。

$ hdfs dfs -mkdir -p libs
$ hdfs dfs -put ip_country_tsv.txt libs/
$ hdfs dfs -ls libs/

4、提交作业到Hadoop集群上运行。(如果jar包在Windows下,请先拷贝到Linux中。)

在终端窗口中,执行如下的作业提交命令:

$ hadoop jar com.xueai8-1.0-SNAPSHOT.jar com.xueai8.mapjoin.MapSideJoin /data/mr /data/mr-output 

5、查看输出结果。

在终端窗口中,执行如下的HDFS命令,查看输出结果:

$ hdfs dfs -ls /data/mr-output 

可以看到如下的输出结果:

US	221.220.8.0	1341391325000	/index.html	200	140	Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
UK	166.36.182.90	1341365098000	/cart.html	200	140	Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
US	221.220.8.0	1341376657000	/about.html	200	140	Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
CA	213.59.105.58	1341388286000	/index.html	200	140	Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
UK	229.48.154.221	1341393014000	/index.html	200	140	Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
UK	166.36.182.90	1341385550000	/cart.html	200	140	Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201
......

《Flink原理深入与编程实战》