Flink窗口操作

在进行流处理时,很自然的,我们想要计算流的有界子集的聚合分析。例如回答以下问题:

  • 每分钟的页面浏览(pv)次数
  • 每用户每周的会话次数
  • 每分钟每传感器的最高温度
  • 当电商中发布一个秒杀活动时,想要每隔10分钟了解流量数据
  • ......

显然,要回答这些问题,我们需要处理元素组,而不是单个的元素。因此,通常使用窗口来限定在数据流上的聚合(count、sum等)的范围,例如“过去5分钟内的计数”或“最后100个元素的总和”。因此,在处理流时,通常更有意义的是考虑有限窗口上的聚合,而不是整个流。Flink具有非常富有表现力的窗口语义。

理解Flink窗口概念

窗口(window)是处理无限流的核心,使用窗口计算无界流上的聚合。窗口将流分割为有限大小的组,我们可以在这样的组上面应用计算。窗口可以是时间驱动的(例如,每30秒),也可以是数据驱动的(例如,每100个元素)。

简而言之,流窗口允许我们对流中的元素进行分组,并在每个组上执行用户定义的函数。这个用户定义的函数可以返回0、1或多个元素,通过这种方式,它创建了一个新的流,我们可以在单独的系统中处理或存储这个流。

如何对流中的元素进行分组?Flink支持不同类型的窗口。

  • 滚动窗口:tumbling window,是在流中创建不重叠的相邻窗口。它们是固定长度的窗口,没有重叠。我们可以根据时间对元素进行分组(比如,从10:00到10:05的所有元素进入一个组),或者根据计数(前50个元素进入一个单独的组)对元素进行分组。例如,我们可以用它来回答这样的问题:“在不重叠的五分钟间隔内计算流中元素的数量”。
  • 滑动窗口:sliding window,类似于滚动窗口,但是窗口可以重叠。滑动窗口是固定长度的窗口,通过用户给定的窗口滑动参数与前面的窗口重叠。例如,如果需要计算最后5分钟的指标,但我们希望每分钟显示一个输出时。
  • 会话窗口:session window,当对发生的事件进行分组,时间接近的分到一组(即一个窗口中)。还可以提供会话间隔的配置参数,该参数指示在关闭会话之前需要等待多长时间。
  • 全局窗口:global window,Flink将所有元素放到一个窗口中。通常在这种情况下,每个元素都被分配给一个单一的per-key全局窗口(per-key global Window)。如果我们不指定任何触发器,就不会触发任何计算。这只有在定义自定义触发器时才有用,该触发器定义了窗口何时结束。

下图形象地展示了这几种不同的窗口语义:

窗口支持的两种流类型

除了选择如何将元素分配给不同的窗口之外,我们还需要选择一个流类型。Flink的窗口支持两种流类型:

  • Non-keyed stream:在这种情况下,流中的所有元素将一起被处理,我们的用户定义函数将访问流中的所有元素。这种流类型的缺点是它不提供并行性,集群中只有一台机器能够执行我们的代码。
  • Keyed stream:使用这种流类型,Flink将通过一个key(例如,设备的ID)将单个流划分为多个独立的流。当处理keyed stream中的窗口时,定义的函数只能访问具有相同key的项。但是使用多个独立的流可以让Flink并行化工作。

下图是keyed stream分区示意图。

必须在定义窗口之前指定要处理的流是否应该被赋予key。使用keyBy(…)将把无限流分割成逻辑keyed流。如果没有调用keyBy(…),则说明流是non-keyed流。

对于keyed stream,传入事件的任何属性都可以用作key。这时窗口计算可以由多个任务并行执行,因为每个逻辑keyed流都可以独立于其他流进行处理。所有引用相同key的元素将被发送到相同的并行任务进行处理。

在non-keyed stream的情况下,原始流将不会被分割成多个逻辑流,所有的窗口逻辑将由一个任务执行,即并行度为1。

窗口分配器

在指定流是否为keyed流之后,下一步是定义一个窗口分配程序(窗口分配器)。窗口分配器定义如何将元素分配给窗口。这是通过在调用window(…)(针对keyed stream)或windowAll()(针对non-keyed stream)时指定所选择的WindowAssigner来实现的。

WindowAssigner负责将每个传入元素分配给一个或多个窗口。下面的图是一个乱序的基于事件时间的数据流窗口分配示例。

Flink有几种内置的窗口分配程序类型,如下图所示:

Flink为最常见的场景(即滚动时间窗口、滑动时间窗口、全局窗口和会话窗口)提供了预定义的窗口分配器。

  • 滚动时间窗口:例如,每分钟PV数据(浏览量)
  • TumblingEventTimeWindows.of(Time.minutes(1))
    
  • 滑动时间窗口:例如,每10秒计算一次每分钟的页面浏览量
  • SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))
    
  • 会话窗口:例如,每个会话的PV数据,其中会话定义为会话之间至少30分钟的间隔
  • EventTimeSessionWindows.withGap(Time.minutes(30))
    

可以使用Time.milliseconds(n)、Time.seconds(n)、Time.minutes(n)、Time.hours(n)和Time.days(n)中的一个来指定持续时间。

Flink窗口程序结构

一个Flink窗口程序的总体结构如下所示。

其中第一个片段代表keyed stream,第二个片段代表non-keyed stream。可以看到,惟一的区别是为keyed stream调用keyBy(…)函数,而为non-keyed stream调用windowAll(…)函数。

【示例1】计算每5秒内来自web套接字的字数。

下面我们来看一个滚动窗口示例。请按以下步骤操作。

1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-scala/flink-quickstart-java项目模板。

2、打开项目中的StreamingJob对象文件,编辑流处理代码如下:

Scala实现:

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * 窗口操作:窗口单词计数
  * Socket数据源: 流窗口单词计数应用程序示例,它在5秒内计算来自web套接字的单词。
  */
object WindowOperatorDemo1 {
  def main(args: Array[String]): Unit = {
    val host = "localhost"
    val port = 9999

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream(host, port)

    val counts = text.flatMap { _.toLowerCase.split("\\W+").filter{ _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(_._1)          // keyed stream
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))  // 5秒滚动窗口
      .sum(1)               // 对元组第二个字段求和

    counts.print()

    env.execute("Window Stream WordCount")
  }
}

Java实现:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * 窗口操作:窗口单词计数
 * Socket数据源: 流窗口单词计数应用程序示例,它在5秒内计算来自web套接字的单词。
 */
public class WindowOperatorDemo1 {
    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 加载数据-转换-执行窗口操作
        // 读取socket数据源
        String host = "localhost";
        int port = 9999;

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream(host, port)
                .map(String::toLowerCase)
                .flatMap(new Splitter())
                .keyBy(t -> t.f0)               	// keyed stream
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))  // 5秒滚动窗口
                .sum(1);                        // 对元组第二个字段求和

        dataStream.print();

        env.execute("Window Stream WordCount");
    }

    // 自定义flatMap函数
    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split("\\W+")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

3、另起一个终端窗口,执行如下命令,起动一个Web Socket服务器:

$ nc -l 9999

4、回到Flink代码文件,并运行程序。

5、在Socket运行窗口,随便输入一些内容,以空格分隔。比如:

good good study
day day up

6、在IntelliJ IDEA的运行窗口,可以观察到输出如下的统计结果:

 (good,2)
 (study,1)
 (up,1)
 (day,2)

【示例】使用滑动窗口,每5秒计算一次前10秒内来自web套接字的单词计数。

下面我们来看一个滑动窗口示例。

Scala代码:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * 窗口操作:滑动窗口
  * 每5秒钟计算一次过去10秒钟内来自web套接字的单词计数。
  */
object WindowOperatorDemo2 {
  def main(args: Array[String]): Unit = {
    val host = "localhost"
    val port = 9999

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream(host, port)

    val counts = text.flatMap { _.toLowerCase.split("\\W+").filter{ _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(_._1)                        	// keyed stream
      .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) // 滑动窗口
      .sum(1) 							// 对元组第二个字段求和

    counts.print()

    env.execute("Window Stream WordCount")
  }
}

Java代码:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * 使用滑动窗口,每5秒计算一次前10秒内来自web套接字的单词计数。
 */
public class WindowOperatorDemo2 {
    
    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 读取socket数据源
        String host = "localhost";
        int port = 9999;

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream(host, port)
                .map(String::toLowerCase)
                .flatMap(new Splitter())
                .keyBy(t -> t.f0)                       // keyed stream
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) // 滑动窗口
                .sum(1);                             // 对元组第二个字段求和

        dataStream.print();

        env.execute("Window Stream WordCount");
    }

    // 自定义FlatMapFunction
    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split("\\W+")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

《Flink原理深入与编程实战》