Flink流应用程序剖析
所有的Flink应用程序都以特定的步骤来工作。这些工作步骤如下图所示:
也就是说,每个Flink程序都由相同的基本部分组成:
- 获取一个执行环境;
- 加载/创建初始数据;
- 指定对该数据的转换;
- 指定计算结果放在哪里;
- 触发程序执行。
1、获取一个执行环境
Flink应用程序从其main()方法中生成一个或多个Flink作业(job)。这些作业可以在本地JVM (LocalEnvironment)中执行,也可以在具有多台机器的集群的远程设置中执行(RemoteEnvironment)。对于每个程序,ExecutionEnvironment提供了控制作业执行(例如设置并行性或容错/检查点参数)和与外部环境交互(数据访问)的方法。
每个Flink应用程序都需要一个执行环境(本例中为env)。流应用程序需要的执行环境使用的是StreamExecutionEnvironment。为了开始编写Flink程序,我们首先需要获得一个现有的执行环境,如果没有的话,就需要先创建一个。根据我们的目的不同,Flink支持以下几种方式:
- 获得一个已经存在的Flink环境
- 创建本地环境
- 创建远程环境
Flink流程序的入口点是StreamExecutionEnvironment类的一个实例,它定义了程序执行的上下文。StreamExecutionEnvironment是所有Flink程序的基础。可以通过使用以下这些静态方法获得一个StreamExecutionEnvironment的实例:
StreamExecutionEnvironment.getExecutionEnvironment() StreamExecutionEnvironment.createLocalEnvironment() StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles)
要获得执行环境,通常只需要调用getExecutionEnvironment()方法。这将根据我们的上下文选择正确的执行环境。如果正在IDE中的本地环境上执行,那么它将启动一个本地执行环境。如果是从程序中创建了一个JAR文件,并通过命令行调用它,那么Flink集群管理器将执行main()方法,getExecutionEnvironment()将返回用于在集群上以分布式方式执行程序的执行环境。
在上面的示例程序中,我们使用以下语句来获得流程序的执行环境:
Scala代码:
// 设置流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment
Java代码:
// 获得流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment包含ExecutionConfig,可使用它为运行时设置特定于作业的配置值。例如,我们如果要设置设置自动水印发送间隔,可以像下面这样在代码进行配置。
Scala代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.setAutoWatermarkInterval(long milliseconds)
Java代码:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setAutoWatermarkInterval(long milliseconds);
2、加载/创建初始数据
执行环境可以从多种数据源读取数据,包括文本文件、CSV文件、Socket套接字数据等,也可以使用自定义的数据输入格式。例如,要将文本文件读取为行序列,可以使用以下命令:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamtext = env.readTextFile("file:///path/to/file");
数据被逐行读取到内存后,Flink会将它们组织到DataStream中,这是Flink中用来表示流数据的特殊类。
在上一节的示例程序中,我们使用fromElements()方法读取集合数据,并将读取的数据存储为DataStream类型:
Scala代码:
// 读取数据源,构造数据流 val personDS = env.fromElements( Person("张三", 21), Person("李四", 16), Person("王老五", 35) )
Java代码:
// 读取数据源,构造DataStream DataStreampersonDS = env.fromElements( new Person("张三", 21), new Person("李四", 16), new Person("王老五", 35));
3、对数据进行转换
每个Flink程序都对分布式数据集合执行转换。Flink的DataStream API提供了多种数据转换功能,包括过滤、映射、连接、分组和聚合。例如,下面是一个map转换代码,通过将原始集合中的每个字符串转换为整数来创建一个新的DataStream:
DataStreaminput = env.fromElements("12","3","25","5","32","6"); DataStream parsed = input.map(new MapFunction () { @Override public Integer map(String value) { return Integer.parseInt(value); } });
在上节的示例程序中,我们使用了filter过滤转换,将原始数据集转换为只包含成年人信息的新DataStream流:
Scala代码:
// 对数据流执行filter转换 val adults = personDS.filter(_.age>18)
Java代码:
// 对数据流执行filter转换 DataStreamadults = flintstones.filter( new FilterFunction () { @Override public boolean filter(Person person) throws Exception { return person.age >= 18; } });
这里不必了解每个转换的具体含义,后面我们会详细介绍它们。需要强调的是,Flink中的转换是惰性的,在调用sink操作之前不会真正执行。
4、指定计算结果放在哪里
一旦有了包含最终结果的DataStream,就可以通过创建接收器(sink)将其写入外部系统。在上面的示例程序中,我们将计算结果打印输出到屏幕上。
Scala代码:
// 输出结果 adults.print
Java代码:
// 输出结果 adults.print();
Flink中的接收器(sink)操作触发流的执行,以生成程序所需的结果,例如将结果保存到文件系统或将其打印到标准输出。上面的示例使用adults.print()将结果打印到任务管理器日志中(在IDE中运行时,任务管理器日志将显示在IDE的控制台中)。这将对流的每个元素调用其toString()方法。
5、触发程序执行
一旦写好了程序处理逻辑,就需要通过调用StreamExecutionEnvironment上的execute()来触发程序执行。所有的Flink程序都是延迟执行的:当程序的主方法执行时,数据加载和转换不会直接发生,而是创建每个运算并添加到程序的执行计划中。当执行环境上的execute()调用显式触发执行时,这些操作才实际上被执行。程序是在本地执行还是提交到集群中执行取决于ExecutionEnvironment的类型。延迟计算可以让我们构建复杂的程序,然后Flink将其作为一个整体计划的单元执行。
在上一节的示例程序中,我们使用如下代码来触发流处理程序的执行。
Scala代码:
// 触发流程序执行 env.execute("Flink Streaming Job") // 参数是程序名称,会显示在Web UI界面上
Java代码:
// 触发流程序执行 env.execute("Flink Streaming Job"); // 参数是程序名称,会显示在Web UI界面上
在应用程序中进行的DataStream API调用将构建一个附加到StreamExecutionEnvironment的作业图(Job Graph)。调用env.execute()时,此图被打包并发送到Flink Master,该Master并行化作业并将其片段分发给TaskManagers以供执行。作业的每个并行片段将在一个task slot(任务槽)中执行。