MapReduce基础案例08-大数据排序

“数据排序”是许多实际任务执行时要完成的第一项工作,比如学生成绩评比、数据建立索引等。 这个实例和数据去重类似,都是先对原始数据进行初步处理,为进一步的数据操作打好基础。 下面就进入这个实例的MapReduce程序设计。

对输入文件中数据进行排序。输入文件中的每行内容均为一个数字,即一个数据。 要求在输出中每行有两个间隔的数字,其中,第一个代表原始数据在原始数据集中的位次,第二个代表原始数据。

样本数据文件 file1.txt。内容如下:

2
32
654
32
15
756
65223

样本数据文件 file2.txt。内容如下:

5956
22
650
92

样本数据文件 file3.txt。内容如下:

26
54
6

设计思路

这个实例仅仅要求对输入数据进行排序,而在MapReduce过程中本身就有排序,所以我们可以利用这个默认的排序,而不需要自己再实现具体的排序了。

但是在使用之前首先需要了解它的默认排序规则。它是按照key值进行排序的,如果key为封装int的IntWritable类型,那么MapReduce按照数字大小对key排序,如果key为封装为String的Text类型,那么MapReduce按照字典顺序对字符串排序。

了解了这个细节,我们就知道应该使用封装int的IntWritable型数据结构了。也就是在map中将读入的数据转化成IntWritable型,然后作为key值输出(value任意)。reduce拿到之后,将输入的key作为value输出,并根据value-list中元素的个数决定输出的次数。输出的key(即代码中的linenum)是一个全局变量,它统计当前key的位次。需要注意的是这个程序中没有配置Combiner,也就是在MapReduce过程中不使用Combiner。这主要是因为使用map和reduce就已经能够完成任务了。

一、创建Java Maven项目

Maven依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>HadoopDemo</groupId>
    <artifactId>com.xueai8</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!--hadoop依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!--hdfs文件系统依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!--MapReduce相关的依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!--junit依赖-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!--编译器插件用于编译拓扑-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <!--指定maven编译的jdk版本和字符集,如果不指定,maven3默认用jdk 1.5 maven2默认用jdk1.3-->
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source> <!-- 源代码使用的JDK版本 -->
                    <target>1.8</target> <!-- 需要生成的目标class文件的编译版本 -->
                    <encoding>UTF-8</encoding><!-- 字符集编码 -->
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

SortMapper.java:

package com.xueai8.sort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 *
 * 大数据排序
 */
public class SortMapper extends Mapper<Object,Text,IntWritable,IntWritable>{

    private static IntWritable data = new IntWritable();

    //实现map函数:将输入中的value化成IntWritable类型,作为输出的key
    @Override
    public void map(Object key, Text value, Context context) throws IOException,InterruptedException{
        String line = value.toString();
        data.set(Integer.parseInt(line));
        context.write(data, new IntWritable(1));
    }
}

SortReducer.java

package com.xueai8.sort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 *
 * 大数据排序
 */
public class SortReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {

    // 注意:linenum必须定义在此,并且是static静态类型的
    private static IntWritable linenum = new IntWritable(1);

    // 实现reduce函数
    // 将输入中的key复制到输出数据的key上,
    // 然后根据输入的value-list中元素的个数决定key的输出次数
    // 用全局linenum来代表key的排名
    @Override
    public void reduce(IntWritable key, Iterable<IntWritable> values,Context context)
            throws IOException,InterruptedException{
        for(IntWritable val : values){
            context.write(linenum, key);
            linenum.set(linenum.get() + 1);
        }
    }
}

SortDriver.java:

package com.xueai8.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 *
 * 大数据排序
 */
public class SortDriver {
    public static void main(String[] args)  throws Exception{

        Configuration conf = new Configuration();

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("用法: SortDriver <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "Data Sort");
        job.setJarByClass(SortDriver.class);

        //设置Map和Reduce处理类
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        //设置输出类型
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        //设置输入和输出目录
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        // 提交作业
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

二、配置log4j

在src/main/resources目录下新增log4j的配置文件log4j.properties,内容如下:

log4j.rootLogger = info,stdout

### 输出信息到控制抬 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

三、项目打包

打开IDEA下方的终端窗口terminal,执行"mvn clean package"打包命令,如下图所示:

如果一切正常,会提示打jar包成功。如下图所示:

这时查看项目结构,会看到多了一个target目录,打好的jar包就位于此目录下。如下图所示:

四、项目部署

请按以下步骤执行。

1、启动HDFS集群和YARN集群。在Linux终端窗口中,执行如下的脚本:

    $ start-dfs.sh
    $ start-yarn.sh

查看进程是否启动,集群运行是否正常。在Linux终端窗口中,执行如下的命令:

    $ jps

这时应该能看到有如下5个进程正在运行,说明集群运行正常:

    5542 NodeManager
    5191 SecondaryNameNode
    4857 NameNode
    5418 ResourceManager
    4975 DataNode

2、将数据文件sample.txt上传到HDFS的/data/mr/目录下。

$ hdfs dfs -mkdir -p /data/mr
$ hdfs dfs -put file1.txt /data/mr/
$ hdfs dfs -put file2.txt /data/mr/
$ hdfs dfs -put file3.txt /data/mr/
$ hdfs dfs -ls /data/mr/

3、提交作业到Hadoop集群上运行。(如果jar包在Windows下,请先拷贝到Linux中。)

在终端窗口中,执行如下的作业提交命令:

$ hadoop jar com.xueai8-1.0-SNAPSHOT.jar com.xueai8.sort.SortDriver /data/mr /data/mr-output 

4、查看输出结果。

在终端窗口中,执行如下的HDFS命令,查看输出结果:

$ hdfs dfs -ls /data/mr-output 
$ hdfs dfs -cat /data/mr-output/part-r-00000

可以得到类似下面这样的输出结果:

1	2
2	6
3	15
4	22
5	26
6	32
7	32
8	54
9	92
10	650
11	654
12	756
13	5956
14	65223

《Flink原理深入与编程实战》