自定义数据分区

默认情况下,Flink应用程序是使用key的哈希分区或随机分区。但在有的情况下,我们需要自定义分区规则,这就需要自己来定义分区器(分区程序)。

【示例】(Scala实现)使用自定义分区程序,按年龄对数据流元素划分分区。

1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-scala项目模板。(Flink项目创建过程,请参见2.2节)

2、设置依赖。在pom.xml中添加如下依赖(根据项目模板创建的话,这个依赖会自动添加,此步可省略):

Scala Maven依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.13.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.13.2</version>
    <scope>provided</scope>
</dependency>

3、创建一个case class类,用来表示流中的数据类型。代码如下:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
  * 自定义分区
  */
object PartitionDemo1 {

  // 定义case class类,表示流数据类型
  case class Person(name:String, age:Int)

  def main(args: Array[String]): Unit = {
    // 设置批处理执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 触发流程序开始执行
    env.execute("stream demo")
  }
}

4、自定义分区类,实现了Partitioner接口,按年龄(age字段)划分分区。这里我们按年龄把流数据分为三个分区:20岁以下的、20~30岁以及30岁以上的。代码如下:

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
  * 自定义分区
  */
object PartitionDemo1 {

  // 定义case class类,表示流数据类型
  case class Person(name:String, age:Int)

  // 自定义分区器,以年龄为key
  class AgePartitioner extends Partitioner[Int] {
    // 重写partition方法
    override def partition(key: Int, i: Int): Int = {
      key match {
        case age if age<20 => 0
        case age if age>30 => 2
        case _ => 1
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // 设置批处理执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 触发流程序开始执行
    env.execute("stream demo")
  }
}

5、测试自定义分区逻辑。编辑流处理代码如下:

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
  * 自定义分区
  */
object PartitionDemo1 {

  // 定义case class类,表示流数据类型
  case class Person(name:String, age:Int)

  // 自定义分区器,以年龄为key
  class AgePartitioner extends Partitioner[Int] {
    // 重写partition方法
    override def partition(k: Int, i: Int): Int = {
      k match {
        case age if age<20 => 0
        case age if age>30 => 2
        case _ => 1
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // 设置批处理执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 读取数据源,构造DataStream
    val peoples = List(
      Person("张三", 21),
      Person("李四", 16),
      Person("王老五", 35),
      Person("张三2", 22),
      Person("李四2", 17),
      Person("王老五2", 36)
    )
    val personDS = env.fromCollection(peoples)

    // 应用自定义分区器,按年龄字段进行分组
    // 注:通过字段位置指定key只对元组数据类型有效
    val adults = personDS.partitionCustom(new AgePartitioner, "age")

    // 将结果输出到控制台
    adults.print

    // 触发流程序开始执行
    env.execute("stream demo")
  }
}

6、执行以上程序,输出结果如下:

分区之前分区数:8
......
2> Person(张三,21)
1> Person(李四,16)
3> Person(王老五,35)
1> Person(李四2,17)
2> Person(张三2,22)
3> Person(王老五2,36)

从上面的输出结果可以看到,数据在三个分区中分别并行地被处理。

注意:输出结果前面的整数是分区号。

对于简单的自定义分区器,也可以直接使用匿名内部类,以简化代码。如下所示:

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
  * 自定义分区
  * 对于简单的自定义分区器,也可以直接使用匿名内部类,以简化代码。
  */
object PartitionDemo2 {

  // 定义case class类,表示流数据类型
  case class Person(name:String, age:Int)

  def main(args: Array[String]): Unit = {
    // 设置批处理执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 读取数据源,构造DataStream
    val peoples = List(
      Person("张三", 21),
      Person("李四", 16),
      Person("王老五", 35),
      Person("张三2", 22),
      Person("李四2", 17),
      Person("王老五2", 36)
    )
    val personDS = env.fromCollection(peoples)

    // 应用自定义分区器,按年龄字段进行分组
    // 注:通过字段位置指定key只对元组数据类型有效
    val adults = personDS.partitionCustom(new Partitioner[Int] {
      // 重写partition方法
      override def partition(k: Int, i: Int): Int = {
        k match {
          case age if age<20 => 0
          case age if age>30 => 2
          case _ => 1
        }
      }
    }, "age")     // 第二个分组字段参数,也可以使用_.age或person => person.age

    // 将结果输出到控制台
    adults.print

    // 触发流程序开始执行
    env.execute("stream demo")
  }
}

【示例】(Java实现)使用自定义分区程序,按年龄对数据流元素划分分区。

1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-java项目模板。(Flink项目创建过程,请参见2.2节)

2、设置依赖。在pom.xml中添加如下依赖(根据项目模板创建的话,这个依赖会自动添加,此步可省略):

Java Maven依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.2</version>
    <scope>provided</scope>
</dependency>

3、创建一个POJO类,用来表示流中的数据。代码如下:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionDemo1 {

    // POJO类,表示人员信息实体
    public static class Person {
        public String name;			// 存储姓名
        public Integer age;			// 存储年龄

        // 空构造器
        public Person() {}

        // 构造器,初始化属性
        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        // 用于调试时输出信息
        public String toString() {
            return this.name.toString() + ": age " + this.age.toString();
        }
    }

    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 触发流程序开始执行
        env.execute("stream demo");
    }
}

4、自定义分区类,实现了Partitioner接口,按年龄(age字段)划分分区。这里我们按年龄把流数据分为三个分区:20岁以下的、20~30岁以及30岁以上的。代码如下:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionDemo1 {

    // POJO类,表示人员信息实体
    public static class Person {
        public String name;			// 存储姓名
        public Integer age;			// 存储年龄

        // 空构造器
        public Person() {};

        // 构造器,初始化属性
        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        };

        // 用于调试时输出信息
        public String toString() {
            return this.name.toString() + ": age " + this.age.toString();
        };
    }

    // 自定义分区器,以年龄为key
    public static class AgePartitioner implements Partitioner {
        @Override
        public int partition(Integer key, int numPartitions) {
            if(key < 20){
                return 0;
            }else if(key > 30){
                return 2;
            }else{
                return 1;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 触发流程序开始执行
        env.execute("stream demo");
    }
}

5、测试自定义分区逻辑。编辑流处理代码如下:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionDemo1 {

    // POJO类,表示人员信息实体
    public static class Person {
        public String name;			// 存储姓名
        public Integer age;			// 存储年龄

        // 空构造器
        public Person() {};

        // 构造器,初始化属性
        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        };

        // 用于调试时输出信息
        public String toString() {
            return this.name.toString() + ": age " + this.age.toString();
        };
    }

    // 自定义分区器,以年龄为key
    public static class AgePartitioner implements Partitioner {
        @Override
        public int partition(Integer key, int numPartitions) {
            if(key < 20){
                return 0;
            }else if(key > 30){
                return 2;
            }else{
                return 1;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 读取数据源,构造DataStream
        DataStream<Person> personDS = env.fromElements(
                new Person("张三", 21),
                new Person("李四", 16),
                new Person("王老五", 35),
                new Person("张三2", 22),
                new Person("李四2", 17),
                new Person("王老五2", 36));

        // 应用自定义分区器,按年龄字段进行分组
        // 注:通过字段位置指定key只对元组数据类型有效
        DataStream<Person> adults = personDS.partitionCustom(new AgePartitioner(), p -> p.age);

        // 将结果输出到控制台
        adults.print();

        // 触发流程序开始执行
        env.execute("stream demo");
    }
}

6、执行以上程序,输出结果如下:

1> 李四: age 16
2> 张三: age 21
3> 王老五: age 35
1> 李四2: age 17
3> 王老五2: age 36
2> 张三2: age 22

从上面的输出结果可以看到,数据在三个分区中分别并行地被处理。

注意:输出结果前面的整数是分区号。


《PySpark原理深入与编程实战》