自定义数据分区
默认情况下,Flink应用程序是使用key的哈希分区或随机分区。但在有的情况下,我们需要自定义分区规则,这就需要自己来定义分区器(分区程序)。
【示例】(Scala实现)使用自定义分区程序,按年龄对数据流元素划分分区。
1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-scala项目模板。(Flink项目创建过程,请参见2.2节)
2、设置依赖。在pom.xml中添加如下依赖(根据项目模板创建的话,这个依赖会自动添加,此步可省略):
Scala Maven依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.12</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency>
3、创建一个case class类,用来表示流中的数据类型。代码如下:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment /** * 自定义分区 */ object PartitionDemo1 { // 定义case class类,表示流数据类型 case class Person(name:String, age:Int) def main(args: Array[String]): Unit = { // 设置批处理执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 触发流程序开始执行 env.execute("stream demo") } }
4、自定义分区类,实现了Partitioner接口,按年龄(age字段)划分分区。这里我们按年龄把流数据分为三个分区:20岁以下的、20~30岁以及30岁以上的。代码如下:
import org.apache.flink.api.common.functions.Partitioner import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment /** * 自定义分区 */ object PartitionDemo1 { // 定义case class类,表示流数据类型 case class Person(name:String, age:Int) // 自定义分区器,以年龄为key class AgePartitioner extends Partitioner[Int] { // 重写partition方法 override def partition(key: Int, i: Int): Int = { key match { case age if age<20 => 0 case age if age>30 => 2 case _ => 1 } } } def main(args: Array[String]): Unit = { // 设置批处理执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 触发流程序开始执行 env.execute("stream demo") } }
5、测试自定义分区逻辑。编辑流处理代码如下:
import org.apache.flink.api.common.functions.Partitioner import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment /** * 自定义分区 */ object PartitionDemo1 { // 定义case class类,表示流数据类型 case class Person(name:String, age:Int) // 自定义分区器,以年龄为key class AgePartitioner extends Partitioner[Int] { // 重写partition方法 override def partition(k: Int, i: Int): Int = { k match { case age if age<20 => 0 case age if age>30 => 2 case _ => 1 } } } def main(args: Array[String]): Unit = { // 设置批处理执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 读取数据源,构造DataStream val peoples = List( Person("张三", 21), Person("李四", 16), Person("王老五", 35), Person("张三2", 22), Person("李四2", 17), Person("王老五2", 36) ) val personDS = env.fromCollection(peoples) // 应用自定义分区器,按年龄字段进行分组 // 注:通过字段位置指定key只对元组数据类型有效 val adults = personDS.partitionCustom(new AgePartitioner, "age") // 将结果输出到控制台 adults.print // 触发流程序开始执行 env.execute("stream demo") } }
6、执行以上程序,输出结果如下:
分区之前分区数:8 ...... 2> Person(张三,21) 1> Person(李四,16) 3> Person(王老五,35) 1> Person(李四2,17) 2> Person(张三2,22) 3> Person(王老五2,36)
从上面的输出结果可以看到,数据在三个分区中分别并行地被处理。
注意:输出结果前面的整数是分区号。
对于简单的自定义分区器,也可以直接使用匿名内部类,以简化代码。如下所示:
import org.apache.flink.api.common.functions.Partitioner import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment /** * 自定义分区 * 对于简单的自定义分区器,也可以直接使用匿名内部类,以简化代码。 */ object PartitionDemo2 { // 定义case class类,表示流数据类型 case class Person(name:String, age:Int) def main(args: Array[String]): Unit = { // 设置批处理执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 读取数据源,构造DataStream val peoples = List( Person("张三", 21), Person("李四", 16), Person("王老五", 35), Person("张三2", 22), Person("李四2", 17), Person("王老五2", 36) ) val personDS = env.fromCollection(peoples) // 应用自定义分区器,按年龄字段进行分组 // 注:通过字段位置指定key只对元组数据类型有效 val adults = personDS.partitionCustom(new Partitioner[Int] { // 重写partition方法 override def partition(k: Int, i: Int): Int = { k match { case age if age<20 => 0 case age if age>30 => 2 case _ => 1 } } }, "age") // 第二个分组字段参数,也可以使用_.age或person => person.age // 将结果输出到控制台 adults.print // 触发流程序开始执行 env.execute("stream demo") } }
【示例】(Java实现)使用自定义分区程序,按年龄对数据流元素划分分区。
1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-java项目模板。(Flink项目创建过程,请参见2.2节)
2、设置依赖。在pom.xml中添加如下依赖(根据项目模板创建的话,这个依赖会自动添加,此步可省略):
Java Maven依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency>
3、创建一个POJO类,用来表示流中的数据。代码如下:
import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class PartitionDemo1 { // POJO类,表示人员信息实体 public static class Person { public String name; // 存储姓名 public Integer age; // 存储年龄 // 空构造器 public Person() {} // 构造器,初始化属性 public Person(String name, Integer age) { this.name = name; this.age = age; } // 用于调试时输出信息 public String toString() { return this.name.toString() + ": age " + this.age.toString(); } } public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 触发流程序开始执行 env.execute("stream demo"); } }
4、自定义分区类,实现了Partitioner接口,按年龄(age字段)划分分区。这里我们按年龄把流数据分为三个分区:20岁以下的、20~30岁以及30岁以上的。代码如下:
import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class PartitionDemo1 { // POJO类,表示人员信息实体 public static class Person { public String name; // 存储姓名 public Integer age; // 存储年龄 // 空构造器 public Person() {}; // 构造器,初始化属性 public Person(String name, Integer age) { this.name = name; this.age = age; }; // 用于调试时输出信息 public String toString() { return this.name.toString() + ": age " + this.age.toString(); }; } // 自定义分区器,以年龄为key public static class AgePartitioner implements Partitioner{ @Override public int partition(Integer key, int numPartitions) { if(key < 20){ return 0; }else if(key > 30){ return 2; }else{ return 1; } } } public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 触发流程序开始执行 env.execute("stream demo"); } }
5、测试自定义分区逻辑。编辑流处理代码如下:
import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class PartitionDemo1 { // POJO类,表示人员信息实体 public static class Person { public String name; // 存储姓名 public Integer age; // 存储年龄 // 空构造器 public Person() {}; // 构造器,初始化属性 public Person(String name, Integer age) { this.name = name; this.age = age; }; // 用于调试时输出信息 public String toString() { return this.name.toString() + ": age " + this.age.toString(); }; } // 自定义分区器,以年龄为key public static class AgePartitioner implements Partitioner{ @Override public int partition(Integer key, int numPartitions) { if(key < 20){ return 0; }else if(key > 30){ return 2; }else{ return 1; } } } public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 读取数据源,构造DataStream DataStream<Person> personDS = env.fromElements( new Person("张三", 21), new Person("李四", 16), new Person("王老五", 35), new Person("张三2", 22), new Person("李四2", 17), new Person("王老五2", 36)); // 应用自定义分区器,按年龄字段进行分组 // 注:通过字段位置指定key只对元组数据类型有效 DataStream<Person> adults = personDS.partitionCustom(new AgePartitioner(), p -> p.age); // 将结果输出到控制台 adults.print(); // 触发流程序开始执行 env.execute("stream demo"); } }
6、执行以上程序,输出结果如下:
1> 李四: age 16 2> 张三: age 21 3> 王老五: age 35 1> 李四2: age 17 3> 王老五2: age 36 2> 张三2: age 22
从上面的输出结果可以看到,数据在三个分区中分别并行地被处理。
注意:输出结果前面的整数是分区号。