Flink流应用程序剖析

所有的Flink应用程序都以特定的步骤来工作。这些工作步骤如下图所示:

也就是说,每个Flink程序都由相同的基本部分组成:

  1. 获取一个执行环境;
  2. 加载/创建初始数据;
  3. 指定对该数据的转换;
  4. 指定计算结果放在哪里;
  5. 触发程序执行。

1、获取一个执行环境

Flink应用程序从其main()方法中生成一个或多个Flink作业(job)。这些作业可以在本地JVM (LocalEnvironment)中执行,也可以在具有多台机器的集群的远程设置中执行(RemoteEnvironment)。对于每个程序,ExecutionEnvironment提供了控制作业执行(例如设置并行性或容错/检查点参数)和与外部环境交互(数据访问)的方法。

每个Flink应用程序都需要一个执行环境(本例中为env)。流应用程序需要的执行环境使用的是StreamExecutionEnvironment。为了开始编写Flink程序,我们首先需要获得一个现有的执行环境,如果没有的话,就需要先创建一个。根据我们的目的不同,Flink支持以下几种方式:

  • 获得一个已经存在的Flink环境
  • 创建本地环境
  • 创建远程环境

Flink流程序的入口点是StreamExecutionEnvironment类的一个实例,它定义了程序执行的上下文。StreamExecutionEnvironment是所有Flink程序的基础。可以通过使用以下这些静态方法获得一个StreamExecutionEnvironment的实例:

StreamExecutionEnvironment.getExecutionEnvironment()
StreamExecutionEnvironment.createLocalEnvironment()
StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles)

要获得执行环境,通常只需要调用getExecutionEnvironment()方法。这将根据我们的上下文选择正确的执行环境。如果正在IDE中的本地环境上执行,那么它将启动一个本地执行环境。如果是从程序中创建了一个JAR文件,并通过命令行调用它,那么Flink集群管理器将执行main()方法,getExecutionEnvironment()将返回用于在集群上以分布式方式执行程序的执行环境。

在上面的示例程序中,我们使用以下语句来获得流程序的执行环境:

Scala代码:

// 设置流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

Java代码:

// 获得流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamExecutionEnvironment包含ExecutionConfig,可使用它为运行时设置特定于作业的配置值。例如,我们如果要设置设置自动水印发送间隔,可以像下面这样在代码进行配置。

Scala代码:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setAutoWatermarkInterval(long milliseconds)

Java代码:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setAutoWatermarkInterval(long milliseconds);

2、加载/创建初始数据

执行环境可以从多种数据源读取数据,包括文本文件、CSV文件、Socket套接字数据等,也可以使用自定义的数据输入格式。例如,要将文本文件读取为行序列,可以使用以下命令:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream text = env.readTextFile("file:///path/to/file");

数据被逐行读取到内存后,Flink会将它们组织到DataStream中,这是Flink中用来表示流数据的特殊类。

在上一节的示例程序中,我们使用fromElements()方法读取集合数据,并将读取的数据存储为DataStream类型:

Scala代码:

// 读取数据源,构造数据流
val personDS = env.fromElements(
      Person("张三", 21),
      Person("李四", 16),
      Person("王老五", 35)
    )

Java代码:

// 读取数据源,构造DataStream
DataStream personDS = env.fromElements(
				new Person("张三", 21),
				new Person("李四", 16),
				new Person("王老五", 35));

3、对数据进行转换

每个Flink程序都对分布式数据集合执行转换。Flink的DataStream API提供了多种数据转换功能,包括过滤、映射、连接、分组和聚合。例如,下面是一个map转换代码,通过将原始集合中的每个字符串转换为整数来创建一个新的DataStream:

DataStream input = env.fromElements("12","3","25","5","32","6");

DataStream parsed = input.map(new MapFunction() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

在上节的示例程序中,我们使用了filter过滤转换,将原始数据集转换为只包含成年人信息的新DataStream流:

Scala代码:

// 对数据流执行filter转换
val adults = personDS.filter(_.age>18)

Java代码:

// 对数据流执行filter转换
DataStream adults = flintstones.filter(
		new FilterFunction() {
			@Override
			public boolean filter(Person person) throws Exception {
				return person.age >= 18;
			}
		});

这里不必了解每个转换的具体含义,后面我们会详细介绍它们。需要强调的是,Flink中的转换是惰性的,在调用sink操作之前不会真正执行。

4、指定计算结果放在哪里

一旦有了包含最终结果的DataStream,就可以通过创建接收器(sink)将其写入外部系统。在上面的示例程序中,我们将计算结果打印输出到屏幕上。

Scala代码:

// 输出结果
adults.print

Java代码:

// 输出结果
adults.print();

Flink中的接收器(sink)操作触发流的执行,以生成程序所需的结果,例如将结果保存到文件系统或将其打印到标准输出。上面的示例使用adults.print()将结果打印到任务管理器日志中(在IDE中运行时,任务管理器日志将显示在IDE的控制台中)。这将对流的每个元素调用其toString()方法。

5、触发程序执行

一旦写好了程序处理逻辑,就需要通过调用StreamExecutionEnvironment上的execute()来触发程序执行。所有的Flink程序都是延迟执行的:当程序的主方法执行时,数据加载和转换不会直接发生,而是创建每个运算并添加到程序的执行计划中。当执行环境上的execute()调用显式触发执行时,这些操作才实际上被执行。程序是在本地执行还是提交到集群中执行取决于ExecutionEnvironment的类型。延迟计算可以让我们构建复杂的程序,然后Flink将其作为一个整体计划的单元执行。

在上一节的示例程序中,我们使用如下代码来触发流处理程序的执行。

Scala代码:

// 触发流程序执行
env.execute("Flink Streaming Job")		// 参数是程序名称,会显示在Web UI界面上

Java代码:

// 触发流程序执行
env.execute("Flink Streaming Job");		// 参数是程序名称,会显示在Web UI界面上

在应用程序中进行的DataStream API调用将构建一个附加到StreamExecutionEnvironment的作业图(Job Graph)。调用env.execute()时,此图被打包并发送到Flink Master,该Master并行化作业并将其片段分发给TaskManagers以供执行。作业的每个并行片段将在一个task slot(任务槽)中执行。


《PySpark原理深入与编程实战》