Flink架构和执行原理

在大数据领域,有许多流计算框架,但是通常很难兼顾延迟性和吞吐量。Apache Storm提供低延迟,但目前不提供高吞吐量,也不支持在发生故障时正确处理状态。Apache Spark Streaming的微批处理方法实现了高吞吐量的容错性,但是难以实现真正的低延时和实时处理,并且表达能力方面也不是特别丰富。而Apache Flink兼顾了低延迟和高吞吐量,是企业部署流计算时的首选。 表1.1 三种流计算框架比较

流处理框架 高吞吐量 低延迟 易于使用和表达 正确的时间/窗口语义 压力下保持正确性
Storm × × × ×
Spark Streaming × × ×
Flink

Flink架构

Flink 是可以运行在多种不同的环境中的,例如,它可以通过单进程多线程的方式直接运行,从而提供调试的能力。它也可以运行在 Yarn 或者 K8S 这种资源管理系统上面,也可以在各种云环境中执行。

Flink的整体架构如下图所示。

针对不同的执行环境,Flink 提供了一套统一的分布式作业执行引擎,也就是 Flink Runtime(Flink运行时)这一层。Flink 在 Runtime 层之上提供了 DataStream 和 DataSet 两套 API,分别用来编写流作业与批作业,以及一组更高级的 API 来简化特定作业的编写。

Flink runtime是Flink的核心计算结构,这是一个分布式系统,它接受流数据流程序,并在一台或多台机器上以容错的方式执行这些数据流程序。这个运行时可以作为YARN的应用程序在集群中运行,也可以很快在Mesos集群中运行,或者在一台机器中运行(通常用于调试Flink应用程序)。

Flink Runtime 层的主要架构如下图所示,它展示了一个 Flink 集群的基本结构。Flink Runtime 层的整个架构采用了标准 Master-Slave 的结构,即总是由一个Flink Master和一个或多个Flink TaskManager组成。在下面的架构图中,其中左侧的AM(Application Manager)部分即是Master,它负责管理整个集群中的资源并处理作业提交、作业监督;而右侧的两个 TaskExecutor 则是 Slave,这是工作(worker)进程,负责提供具体的资源并实际执行作业。

一个Flink集群总是由一个Flink Master和一个或多个Flink TaskManager组成。Flink Master负责处理作业提交、作业监督以及资源管理。Flink TaskManager是工作(worker)进程,负责执行组成Flink作业的实际任务。

Flink Master是Flink集群的主进程。它包含三个不同的组件:Resource Manager、Dispatcher以及每个运行时Flink作业的JobManager。这三个组件都包含在 AppMaster 进程中。

  • Dispatcher 负责接收用户提供的作业,并且负责为这个新提交的作业拉起一个新的 JobManager 组件。
  • ResourceManager 负责资源的管理,在整个 Flink 集群中只有一个 ResourceManager。
  • JobManager 负责管理作业的执行,在一个 Flink 集群中可能有多个作业同时执行,每个作业都有自己的 JobManager 组件。

TaskManager是一个Flink集群的工作(worker)进程。任务(Tasks)被调度给TaskManager执行。它们彼此通信以在后续任务之间交换数据。

总体来说,Flink运行时由两种类型的进程组成:

  • JobManager:协调分布式执行。他们安排任务、协调检查点、协调故障恢复,等等。至少有一个JobManager。一个高可用性的设置将有多个JobManager,其中一个总是leader,其他的都是standby。
  • TaskManager:执行数据流的任务(或者更具体地说,子任务),并缓冲和交换数据流。必须始终至少有一个TaskManager。

JobManager和TaskManager可以多种方式启动:直接在机器上作为独立集群(standalone)启动,或者在容器中启动,或者由诸如YARN或Mesos之类的资源框架管理。

客户端不是运行时和程序执行的一部分,而是用于准备和向JobManager发送数据流。之后,客户端可以断开连接,或保持连接以接收作业进度报告。客户端可以作为触发执行的Java/Scala程序的一部分运行,也可以在命令行进程(./bin/flink run)中运行。

任务槽和资源

每个worker (TaskManager)都是一个JVM进程,可以在单独的线程中执行一个或多个子任务。为了控制一个worker接受多少任务,一个worker具有所谓的"任务插槽"(task slots,至少一个)。

每个task slot表示TaskManager资源的一个固定子集。例如,一个有三个插槽的TaskManager会将其1/3的托管内存分配给每个插槽。对资源进行插槽化意味着子任务不会与来自其他作业的子任务争夺托管内存,而是拥有一定数量的预留托管内存。注意,这里没有发生CPU隔离;当前插槽只分隔任务的托管内存。

通过调整任务槽的数量,用户可以定义子任务如何彼此隔离。每个TaskManager有一个插槽(slot)意味着每个任务组运行在各自的JVM中(例如,可以在单独的容器中启动JVM)。拥有多个插槽意味着更多的子任务共享同一个JVM。相同JVM中的任务共享TCP连接(通过多路复用)和心跳消息。它们还可以共享数据集和数据结构,从而减少每个任务的开销。

默认情况下,Flink允许子任务共享插槽,即使它们是不同任务的子任务,只要它们来自相同的作业。结果是一个槽可以容纳作业的整个管道。允许这个插槽共享(slot sharing)有两个主要好处:

  • Flink集群需要的任务插槽与作业中使用的最高并行度一样多。不需要计算一个程序总共包含多少任务(具有不同的并行度)。
  • 更容易得到更好的资源利用。如果没有插槽共享,非密集型source/map()子任务将阻塞与资源密集型窗口子任务一样多的资源。使用插槽共享,将我们示例中的基本并行度从2提高到6,可以充分利用插槽资源,同时确保繁重的子任务在TaskManager中得到公平分配。

API还包括一个资源组(resource group)机制,可用于防止不需要的插槽共享。

根据经验,一个好的默认任务槽数应该是CPU内核的数量。使用超线程,每个槽将接受2个或更多的硬件线程上下文。

Flink资源管理

Apache Flink是一个分布式系统,需要计算资源才能执行应用程序。实际上,Flink作业调度可以看做是对资源和任务进行匹配的过程。Flink集成了所有常见的集群资源管理器,如Hadoop YARN、Apache Mesos和Kubernetes,但也可以设置为作为独立集群运行。

补充:在部署Flink应用程序时,Flink根据应用程序配置的并行性自动标识所需的资源,并从资源管理器中请求这些资源。如果发生故障,Flink通过请求新的资源来替换失败的容器。所有提交或控制应用程序的通信都是通过REST调用进行的。这简化了Flink在许多环境中的集成。

在 Flink 中,资源是由 TaskExecutor 上的 Slot 来表示的,每个 Slot 可以用来执行不同的任务(Task)。而 Job 中实际的 Task,包含了待执行的用户逻辑。作业调度的主要目的就是为了给 Task 找到匹配的 Slot。

补充:逻辑上来说,每个 Slot 都应该有一个向量来描述它所能提供的各种资源的量,每个 Task 也需要相应的说明它所需要的各种资源的量。但是实际上在 1.9 之前,Flink 是不支持细粒度的资源描述的,而是统一的认为每个 Slot 提供的资源和 Task 需要的资源都是相同的。从 1.9 开始,Flink 开始增加对细粒度的资源匹配的支持的实现,但这部分功能目前仍在完善中。

在 ResourceManager 中,有一个子组件叫做 SlotManager,它维护了当前集群中所有 TaskExecutor 上的 Slot 的信息与状态,如该 Slot 在哪个 TaskExecutor 中,该 Slot 当前是否空闲等。如下图所示:

当 JobManger 为特定 Task 申请资源的时候,根据当前是 Per-job 还是 Session 模式,ResourceManager 可能会去申请资源来启动新的 TaskExecutor。当 TaskExecutor 启动之后,它会通过服务发现找到当前活跃的 ResourceManager 并进行注册。在注册信息中,会包含该 TaskExecutor中所有 Slot 的信息。 ResourceManager 收到注册信息后,其中的 SlotManager 就会记录下相应的 Slot 信息。当 JobManager 为某个 Task 来申请资源时, SlotManager 就会从当前空闲的 Slot 中按一定规则选择一个空闲的 Slot 进行分配。当分配完成后,RM 会首先向 TaskManager 发送 RPC 要求将选定的 Slot 分配给特定的 JobManager。TaskManager 如果还没有执行过该 JobManager 的 Task 的话,它需要首先向相应的 JobManager 建立连接,然后发送提供 Slot 的 RPC 请求。在 JobManager 中,所有 Task 的请求会缓存到 SlotPool 中。当有 Slot 被提供之后,SlotPool 会从缓存的请求中选择相应的请求并结束相应的请求过程。

当 Task 结束之后,无论是正常结束还是异常结束,都会通知 JobManager 相应的结束状态,然后在 TaskManager 端将 Slot 标记为已占用但未执行任务的状态。JobManager 会首先将相应的 Slot 缓存到 SlotPool 中,但不会立即释放。这种方式避免了如果将 Slot 直接还给 ResourceManager,在任务异常结束之后需要重启时,需要立刻重新申请 Slot 的问题。通过延时释放,Failover 的 Task 可以尽快调度回原来的 TaskManager,从而加快 Failover 的速度。当 SlotPool 中缓存的 Slot 超过指定的时间仍未使用时,SlotPool 就会发起释放该 Slot 的过程。与申请 Slot 的过程对应,SlotPool 会首先通知 TaskManager 来释放该 Slot,然后 TaskExecutor 通知 ResourceManager 该 Slot 已经被释放,从而最终完成释放的逻辑。

除了正常的通信逻辑外,在 ResourceManager 和 TaskExecutor 之间还存在定时的心跳消息来同步 Slot 的状态。在分布式系统中,消息的丢失、错乱不可避免,这些问题会在分布式系统的组件中引入不一致状态,如果没有定时消息,那么组件无法从这些不一致状态中恢复。此外,当组件之间长时间未收到对方的心跳时,就会认为对应的组件已经失效,并进入到容错的流程。

在 Slot 管理基础上,Flink 可以将 Task 调度到相应的 Slot 当中。如上文所述,Flink 尚未完全引入细粒度的资源匹配,默认情况下,每个 Slot 可以分配给一个 Task。但是,这种方式在某些情况下会导致资源利用率不高。如下图所示,假如 A、B、C 依次执行计算逻辑,那么给 A、B、C 分配单独的 Slot 就会导致资源利用率不高。为了解决这一问题,Flink 提供了 Share Slot 的机制。如图中所示,基于 Share Slot,每个 Slot 中可以部署来自不同 JobVertex(作业向量)的多个任务,但是不能部署来自同一个 JobVertex 的 Task。如图中所示,每个 Slot 中最多可以部署同一个 A、B 或 C 的 Task,但是可以同时部署 A、B 和 C 的各一个 Task。当单个 Task 占用资源较少时,Share Slot 可以提高资源利用率。 此外,Share Slot 也提供了一种简单的保持负载均衡的方式。

图 共享Slot

基于上述 Slot 管理和分配的逻辑,JobManager 负责维护作业中 Task执行的状态。如上文所述,客户端会向 JobManager 提交一个 JobGraph,它代表了作业的逻辑结构。JobManager 会根据 JobGraph 按并发展开,从而得到 JobManager 中关键的 ExecutionGraph。ExecutionGraph 的结构如下图所示,与 JobGraph 相比,ExecutionGraph 中对于每个 Task 与中间结果等均创建了对应的对象,从而可以维护这些实体的信息与状态。

图 ExecutionGraph 是 JobGraph 按并发展开所形成的,它是 JobMaster中的核心数据结构

在一个 Flink Job 中是包含多个 Task 的,因此另一个关键的问题是在 Flink 中按什么顺序来调度 Task。如下图所示,目前 Flink 提供了两种基本的调度逻辑,即延迟调度(Lazy From Source)和即时调度(Eager调度) 。即时调度会在作业启动时申请资源将所有的Task 调度起来。这种调度算法主要用来调度可能没有终止的流作业。与之对应,延迟调度则是从Source开始,按拓扑顺序来进行调度。简单来说,延迟调度会先调度没有上游任务的Source任务,当这些任务执行完成时,它会将输出数据缓存到内存或者写入到磁盘中。然后,对于后续的任务,当它的前驱任务全部执行完成后,Flink 就会将这些任务调度起来。这些任务会从读取上游缓存的输出数据进行自己的计算。这一过程继续进行直到所有的任务完成计算。

图 Flink中两种基本的调度策略

《PySpark原理深入与编程实战》