Flink流处理程序编程模型

Flink程序的基本构建块是stream和transformation(流和转换)。从概念上讲,stream是数据记录的流(可能永远不会结束),transformation是一个运算,它接受一个或多个流作为输入,经过处理/计算后生成一个或多个输出流。

下面我们实现一个完整的、可工作的Flink流应用程序示例。

【示例】将有关人员的记录流作为输入,并从中筛选出未成年人信息。

Scala实现

import org.apache.flink.streaming.api.scala._

/**
  * 将有关人员的记录流作为输入,并从中筛选出未成年人信息。
 */
object StreamingJobDemo1 {
  // 定义事件类
  case class Person(name:String, age:Integer)

  def main(args: Array[String]) {

    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 读取数据源,构造数据流
    val peoples = env.fromElements(
      Person("张三", 21),
      Person("李四", 16),
      Person("王老五", 35)
    )

    // 对数据流执行filter转换
    val adults = peoples.filter(_.age>18)

    // 输出结果
    adults.print

    // 执行
    env.execute("Flink Streaming Job")
  }
}

执行代码,输出结果如下:

7> Person(张三,21)
1> Person(王老五,35)

Java实现

(1)在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-java项目模板。

(2)设置依赖。在pom.xml中添加如下依赖:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-java</artifactId>
	<version>1.13.2</version>
	<scope>provided</scope>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-streaming-java_2.12</artifactId>
	<version>1.13.2</version>
	<scope>provided</scope>
</dependency>

(3)创建一个POJO类,用来表示流中的数据。代码如下。

// POJO类,表示人员信息实体
public class Person {
	public String name;			// 存储姓名
	public Integer age;			// 存储年龄

	// 空构造器
	public Person() {};		

	// 构造器,初始化属性
	public Person(String name, Integer age) {
		this.name = name;
		this.age = age;
	};

	// 用于调试时输出信息
	public String toString() {
		return this.name.toString() + ": age " + this.age.toString();
	};
}

(4)打开项目中的StreamingJob对象文件,编辑流处理代码如下。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;

public class StreamingJobDemo1 {

	public static void main(String[] args) throws Exception {
		// 获得流执行环境
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 读取数据源,构造DataStream
		DataStream personDS = env.fromElements(
				new Person("张三", 21),
				new Person("李四", 16),
				new Person("王老五", 35)
		);

		// 执行转换运算(这里是过滤年龄不小于18岁的人)
		// 注意,这里我们使用了匿名函数 
		DataStream<Person> adults = personDS.filter(new FilterFunction() {
			@Override
			public boolean filter(Person person) throws Exception {
				return person.age >= 18;
			}
		});

		// 将结果输出到控制台
		adults.print();

		// 触发流程序开始执行
		env.execute("stream demo");
	}	
}

(5)执行以上程序,输出结果如下。

张三: age 21
王老五: age 35

《PySpark原理深入与编程实战》