Flink窗口操作
在进行流处理时,很自然的,我们想要计算流的有界子集的聚合分析。例如回答以下问题:
- 每分钟的页面浏览(pv)次数
- 每用户每周的会话次数
- 每分钟每传感器的最高温度
- 当电商中发布一个秒杀活动时,想要每隔10分钟了解流量数据
- ......
显然,要回答这些问题,我们需要处理元素组,而不是单个的元素。因此,通常使用窗口来限定在数据流上的聚合(count、sum等)的范围,例如“过去5分钟内的计数”或“最后100个元素的总和”。因此,在处理流时,通常更有意义的是考虑有限窗口上的聚合,而不是整个流。Flink具有非常富有表现力的窗口语义。
理解Flink窗口概念
窗口(window)是处理无限流的核心,使用窗口计算无界流上的聚合。窗口将流分割为有限大小的组,我们可以在这样的组上面应用计算。窗口可以是时间驱动的(例如,每30秒),也可以是数据驱动的(例如,每100个元素)。
简而言之,流窗口允许我们对流中的元素进行分组,并在每个组上执行用户定义的函数。这个用户定义的函数可以返回0、1或多个元素,通过这种方式,它创建了一个新的流,我们可以在单独的系统中处理或存储这个流。
如何对流中的元素进行分组?Flink支持不同类型的窗口。
- 滚动窗口:tumbling window,是在流中创建不重叠的相邻窗口。它们是固定长度的窗口,没有重叠。我们可以根据时间对元素进行分组(比如,从10:00到10:05的所有元素进入一个组),或者根据计数(前50个元素进入一个单独的组)对元素进行分组。例如,我们可以用它来回答这样的问题:“在不重叠的五分钟间隔内计算流中元素的数量”。
- 滑动窗口:sliding window,类似于滚动窗口,但是窗口可以重叠。滑动窗口是固定长度的窗口,通过用户给定的窗口滑动参数与前面的窗口重叠。例如,如果需要计算最后5分钟的指标,但我们希望每分钟显示一个输出时。
- 会话窗口:session window,当对发生的事件进行分组,时间接近的分到一组(即一个窗口中)。还可以提供会话间隔的配置参数,该参数指示在关闭会话之前需要等待多长时间。
- 全局窗口:global window,Flink将所有元素放到一个窗口中。通常在这种情况下,每个元素都被分配给一个单一的per-key全局窗口(per-key global Window)。如果我们不指定任何触发器,就不会触发任何计算。这只有在定义自定义触发器时才有用,该触发器定义了窗口何时结束。
下图形象地展示了这几种不同的窗口语义:
窗口支持的两种流类型
除了选择如何将元素分配给不同的窗口之外,我们还需要选择一个流类型。Flink的窗口支持两种流类型:
- Non-keyed stream:在这种情况下,流中的所有元素将一起被处理,我们的用户定义函数将访问流中的所有元素。这种流类型的缺点是它不提供并行性,集群中只有一台机器能够执行我们的代码。
- Keyed stream:使用这种流类型,Flink将通过一个key(例如,设备的ID)将单个流划分为多个独立的流。当处理keyed stream中的窗口时,定义的函数只能访问具有相同key的项。但是使用多个独立的流可以让Flink并行化工作。
下图是keyed stream分区示意图。
必须在定义窗口之前指定要处理的流是否应该被赋予key。使用keyBy(…)将把无限流分割成逻辑keyed流。如果没有调用keyBy(…),则说明流是non-keyed流。
对于keyed stream,传入事件的任何属性都可以用作key。这时窗口计算可以由多个任务并行执行,因为每个逻辑keyed流都可以独立于其他流进行处理。所有引用相同key的元素将被发送到相同的并行任务进行处理。
在non-keyed stream的情况下,原始流将不会被分割成多个逻辑流,所有的窗口逻辑将由一个任务执行,即并行度为1。
窗口分配器
在指定流是否为keyed流之后,下一步是定义一个窗口分配程序(窗口分配器)。窗口分配器定义如何将元素分配给窗口。这是通过在调用window(…)(针对keyed stream)或windowAll()(针对non-keyed stream)时指定所选择的WindowAssigner来实现的。
WindowAssigner负责将每个传入元素分配给一个或多个窗口。下面的图是一个乱序的基于事件时间的数据流窗口分配示例。
Flink有几种内置的窗口分配程序类型,如下图所示:
Flink为最常见的场景(即滚动时间窗口、滑动时间窗口、全局窗口和会话窗口)提供了预定义的窗口分配器。
- 滚动时间窗口:例如,每分钟PV数据(浏览量)
TumblingEventTimeWindows.of(Time.minutes(1))
SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))
EventTimeSessionWindows.withGap(Time.minutes(30))
可以使用Time.milliseconds(n)、Time.seconds(n)、Time.minutes(n)、Time.hours(n)和Time.days(n)中的一个来指定持续时间。
Flink窗口程序结构
一个Flink窗口程序的总体结构如下所示。
其中第一个片段代表keyed stream,第二个片段代表non-keyed stream。可以看到,惟一的区别是为keyed stream调用keyBy(…)函数,而为non-keyed stream调用windowAll(…)函数。
【示例1】计算每5秒内来自web套接字的字数。
下面我们来看一个滚动窗口示例。请按以下步骤操作。
1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-scala/flink-quickstart-java项目模板。
2、打开项目中的StreamingJob对象文件,编辑流处理代码如下:
Scala实现:
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows import org.apache.flink.streaming.api.windowing.time.Time /** * 窗口操作:窗口单词计数 * Socket数据源: 流窗口单词计数应用程序示例,它在5秒内计算来自web套接字的单词。 */ object WindowOperatorDemo1 { def main(args: Array[String]): Unit = { val host = "localhost" val port = 9999 val env = StreamExecutionEnvironment.getExecutionEnvironment val text = env.socketTextStream(host, port) val counts = text.flatMap { _.toLowerCase.split("\\W+").filter{ _.nonEmpty } } .map { (_, 1) } .keyBy(_._1) // keyed stream .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5秒滚动窗口 .sum(1) // 对元组第二个字段求和 counts.print() env.execute("Window Stream WordCount") } }
Java实现:
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; /** * 窗口操作:窗口单词计数 * Socket数据源: 流窗口单词计数应用程序示例,它在5秒内计算来自web套接字的单词。 */ public class WindowOperatorDemo1 { public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 加载数据-转换-执行窗口操作 // 读取socket数据源 String host = "localhost"; int port = 9999; DataStream<Tuple2<String, Integer>> dataStream = env .socketTextStream(host, port) .map(String::toLowerCase) .flatMap(new Splitter()) .keyBy(t -> t.f0) // keyed stream .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5秒滚动窗口 .sum(1); // 对元组第二个字段求和 dataStream.print(); env.execute("Window Stream WordCount"); } // 自定义flatMap函数 public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { for (String word: sentence.split("\\W+")) { out.collect(new Tuple2<>(word, 1)); } } } }
3、另起一个终端窗口,执行如下命令,起动一个Web Socket服务器:
$ nc -l 9999
4、回到Flink代码文件,并运行程序。
5、在Socket运行窗口,随便输入一些内容,以空格分隔。比如:
good good study day day up
6、在IntelliJ IDEA的运行窗口,可以观察到输出如下的统计结果:
(good,2) (study,1) (up,1) (day,2)
【示例】使用滑动窗口,每5秒计算一次前10秒内来自web套接字的单词计数。
下面我们来看一个滑动窗口示例。
Scala代码:
import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows import org.apache.flink.streaming.api.windowing.time.Time /** * 窗口操作:滑动窗口 * 每5秒钟计算一次过去10秒钟内来自web套接字的单词计数。 */ object WindowOperatorDemo2 { def main(args: Array[String]): Unit = { val host = "localhost" val port = 9999 val env = StreamExecutionEnvironment.getExecutionEnvironment val text = env.socketTextStream(host, port) val counts = text.flatMap { _.toLowerCase.split("\\W+").filter{ _.nonEmpty } } .map { (_, 1) } .keyBy(_._1) // keyed stream .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) // 滑动窗口 .sum(1) // 对元组第二个字段求和 counts.print() env.execute("Window Stream WordCount") } }
Java代码:
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; /** * 使用滑动窗口,每5秒计算一次前10秒内来自web套接字的单词计数。 */ public class WindowOperatorDemo2 { public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 读取socket数据源 String host = "localhost"; int port = 9999; DataStream<Tuple2<String, Integer>> dataStream = env .socketTextStream(host, port) .map(String::toLowerCase) .flatMap(new Splitter()) .keyBy(t -> t.f0) // keyed stream .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) // 滑动窗口 .sum(1); // 对元组第二个字段求和 dataStream.print(); env.execute("Window Stream WordCount"); } // 自定义FlatMapFunction public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { for (String word: sentence.split("\\W+")) { out.collect(new Tuple2<>(word, 1)); } } } }