MapReduce案例-读取数据库表

现在,在ETL的世界中,读取数据库并处理数百万条记录可能是主要的操作。

Hadoop确实提供了各种数据类型,如IntWritable、FloatWritable、DoubleWritable等, 但是要访问数据库的话,我们需要实现自定义OutputWritable,这将使我们能够将数据行写入数据库表。

为了实现自定义输出可写,我们必须实现DBWritable接口。

问题描述

考虑一个假设的情况,我们有一个在线零售商店的数据库,我们有一个包含数百万行的User表,我们有兴趣使用Mapreduce读取这个表。 为了简单起见,我们将读取表,并在HDFS中以key-value对的形式发送记录。

假设有一个名为users的MySql表。表中包含user_id、user_name、department 其3列。

user_id   	user_name        department           
------- -------------- --------------------------------------------- 
1         	张三     	大数据部门 
2         	李四       	人力资源部        
3         	王老五          Java                 
4         	赵小六          Web开发部      
5         	钱小七          设计部

准备工作

1)在MySQL中创建数据表:

-- 创建数据库 xueai8
create database xueai8;

-- 打开数据库 xueai8
use xueai8;

-- 在当前数据库中创建用户表users
CREATE TABLE users (
  user_id       INT NOT NULL AUTO_INCREMENT,
  user_name     VARCHAR(45) NOT NULL,
  Department    VARCHAR(45) NOT NULL,
  PRIMARY KEY (user_id)
);

-- 向users表中插入样本数据
insert into users values
(1, '张三', '大数据部门'),
(2, '李四', '人力资源部'),
(3, '王老五', 'Java'),
(4, '赵小六', 'Web开发部'),
(5, '小七', '设计部');

2)读取RDBMS需要从Hadoop到MySql的数据库连接,所以我们需要将相关的驱动程序jar文件放到Hadoop的lib文件夹中, 所以在我们的例子中,我们已经将mysql-connector-java-5.1.38-bin.jar文件复制到HADOOP common的lib文件夹中。 另外,我们必须将该jar文件复制到启动作业的机器上。

一、创建Java Maven项目

Maven依赖:注意,需要增加mysql驱动依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>HadoopDemo</groupId>
    <artifactId>com.xueai8</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!--hadoop依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!--hdfs文件系统依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!--MapReduce相关的依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!--mysql驱动程序-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
        <!--junit依赖-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!--编译器插件用于编译拓扑-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <!--指定maven编译的jdk版本和字符集,如果不指定,maven3默认用jdk 1.5 maven2默认用jdk1.3-->
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source> <!-- 源代码使用的JDK版本 -->
                    <target>1.8</target> <!-- 需要生成的目标class文件的编译版本 -->
                    <encoding>UTF-8</encoding><!-- 字符集编码 -->
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

DBInputWritable.java:

从数据库读取或写入的对象应该实现DBWritable。 DBWritable类似于Writable,但write方法参数采用PreparedStatement,而readFields方法参数采用ResultSet。 实现负责将对象的字段写入PreparedStatement,并从ResultSet读取对象的字段。

package com.xueai8.fromysql;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class DBInputWritable implements Writable, DBWritable{
    
    private int userId;
    private String userName,department;

    // 写数据表的方法
    public void write(PreparedStatement statement) throws SQLException {
        statement.setInt(1, userId);
        statement.setString(2, userName);
        statement.setString(3, department);

    }
    
    // 读数据表的方法
    public void readFields(ResultSet resultSet) throws SQLException {
        userId = resultSet.getInt(1);
        userName = resultSet.getString(2);
        department=resultSet.getString(3);
    }
    
    public void write(DataOutput out) throws IOException {
    }
    
    public void readFields(DataInput in) throws IOException {
    }
    
    public int getUserId() {
        return userId;
    }
    
    public String getUserName() {
        return userName;
    }
    
    public String getDepartment() {
        return department;
    }
}

DBMapper.java:

Mapper类。使用Mapper并行读取数据表记录,并构建DBInputWritable对象,以key写入到HDFS中。

package com.xueai8.fromysql;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DBMapper extends Mapper<LongWritable, DBInputWritable, Text, NullWritable> {
    
    private Text outKey = new Text();
    
    @Override
    protected void map(LongWritable id, DBInputWritable value, Context context) {
        try{
            String userDetails = value.getUserName() + "," + value.getDepartment();
            outKey.set(userDetails);
            context.write(outKey, NullWritable.get());
        } catch (IOException | InterruptedException ioException) {
            ioException.printStackTrace();
        }
    }
}

DBReducer.java:

无。本例中不需要Reducer。

DBDriver.java:

驱动程序。注意DBInputFormat类和输入源的设置。

package com.xueai8.fromysql;

import com.xueai8.tomysql.DBReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DBDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // 数据输入路径和输出路径
        int ec = ToolRunner.run(new Configuration(), new DBDriver(), args);
        System.exit(ec);
    }

    @Override
    public int run(String[] args) throws Exception {
        if(args.length != 1) {
            System.out.println("用法: DBDriver <output>");
            System.exit(1);
        }

		/*
		 * 配置数据库.主要包括以下几项:
		 * 1、数据库驱动的名称:com.mysql.jdbc.Driver
		 * 2、数据库URL:jdbc:mysql://localhost:3306/xueai8
		 * 3、用户名:root
		 * 4、密码:admin
		 */	
        DBConfiguration.configureDB(getConf(),
                "com.mysql.jdbc.Driver",            // 驱动程序类
                "jdbc:mysql://192.168.190.133:3306/xueai8",     // db url
                "root",                             // 账号
                "admin");                           // 密码

        // 新建一个任务
        Job job = Job.getInstance(getConf(), "DB Demo");
        job.setJarByClass(DBDriver.class);

        job.setMapperClass(DBMapper.class);
        job.setNumReduceTasks(0);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // reducer输出格式
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 需要指定输入格式为DBInputFormat(默认就是TextInputFormat)
        job.setInputFormatClass(DBInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // 设置输入源为DBMS
        DBInputFormat.setInput(job,
                DBInputWritable.class,
                "users",    // input table name
                null,       // where
                null,       // orderBy
                "user_id", "user_name", "department");  // table columns
        // 输出到HDFS路径
        FileOutputFormat.setOutputPath(job, new Path(args[0]));

        // 提交任务
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

二、配置log4j

在src/main/resources目录下新增log4j的配置文件log4j.properties,内容如下:

log4j.rootLogger = info,stdout

### 输出信息到控制抬 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

三、项目打包

打开IDEA下方的终端窗口terminal,执行"mvn clean package"打包命令,如下图所示:

如果一切正常,会提示打jar包成功。如下图所示:

这时查看项目结构,会看到多了一个target目录,打好的jar包就位于此目录下。如下图所示:

四、项目部署

请按以下步骤执行。

1、启动HDFS集群和YARN集群。在Linux终端窗口中,执行如下的脚本:

    $ start-dfs.sh
    $ start-yarn.sh

查看进程是否启动,集群运行是否正常。在Linux终端窗口中,执行如下的命令:

    $ jps

这时应该能看到有如下5个进程正在运行,说明集群运行正常:

    5542 NodeManager
    5191 SecondaryNameNode
    4857 NameNode
    5418 ResourceManager
    4975 DataNode

2、提交作业到Hadoop集群上运行。(如果jar包在Windows下,请先拷贝到Linux中。)

在终端窗口中,执行如下的作业提交命令(注意,只有一个命令行参数,为读取的csv文件的路径):

$ hadoop jar com.xueai8-1.0-SNAPSHOT.jar com.xueai8.fromysql.DBDriver /data/mr 

3、查看输出结果。

在终端窗口中,执行如下的HDFS命令,查看输出结果:

$ hdfs dfs -ls /data/mr-output 

可以看到如下的输出结果:

张三,大数据部门
李四,人力资源部
王老五,Java
赵小六,Web开发部
小七,设计部

《Flink原理深入与编程实战》