水印策略
Flink提供了用于处理事件时间、时间戳和水印的API。
什么是水印策略?
为了处理事件时间,Flink流程序需要知道事件的时间戳,这意味着流中的每个元素都需要分配其事件时间戳。这通常是通过使用TimestampAssigner从元素中的某个字段访问/提取时间戳来实现的。
时间戳分配与生成水印密切相关,生成水印是告诉系统事件时间进度的一种方式。可以通过指定WatermarkGenerator来配置水印。 TimestampAssigner和WatermarkGenerator形成了一个水印策略,它定义了如何在流源中生成水印。Flink API需要一个同时包含TimestampAssigner和WatermarkGenerator的WatermarkStrategy。
使用水印策略
在Flink应用程序中有两个地方可以使用WatermarkStrategy:
- 1)直接在源上使用;
- 2)在非源操作之后使用。
第一种选择更可取,因为它允许源利用关于水印逻辑中的分片/分区/分割的知识。源通常可以在更细的水平上跟踪水印,源产生的整体水印将更准确。直接在源上指定WatermarkStrategy通常意味着必须使用一个源特定的接口。
第二个选项(在任意操作后设置WatermarkStrategy)应该只在不能直接在源上设置策略的情况下使用。
内置水印生成器
Flink提供了抽象,允许程序员分配他们自己的时间戳并发出他们自己的水印。更具体地说,可以通过实现WatermarkGenerator接口来实现。
不过,为了进一步简化此类任务的编程工作,Flink附带了一些预先实现的时间戳赋值器,包括:
- 单调递增时间戳
- 固定的迟到时间
分配时间戳和水印示例
下面通过一个示例来掌握如何为数据流中的事件分配时间戳和水印。
【示例】(Scala实现)为数据流中的事件分配时间戳和水印。
请按以下步骤执行。
1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-scala项目模板。(Flink项目创建过程,请参见2.2节)
2、在pom.xml中添加依赖。
3、创建流应用程序类。代码如下:
import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.streaming.api.scala._ /** * 功能:为数据流中的事件分配时间戳和水印 */ object AssignerDemo2 { // case类,表示流元素数据类型 case class MessageInfo(hostname: String, status: String) // main方法 def main(args: Array[String]) { // 设置流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 数据流源 val stream = env.fromElements( MessageInfo("host1", "1234"), MessageInfo("host2", "2234"), MessageInfo("host3", "1234") ) // 因为模拟数据没有时间戳,所以用此方法添加递增时间戳和水印 // 为数据流中的元素分配时间戳,并生成标记以指示事件时间进展 val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks( WatermarkStrategy.forMonotonousTimestamps[MessageInfo]() .withTimestampAssigner(new SerializableTimestampAssigner[MessageInfo]() { // 取当前时间戳为事件时间戳 override def extractTimestamp(t: MessageInfo, ts: Long): Long = { System.currentTimeMillis() } }) ) // 将结果输出到控制台 withTimestampsAndWatermarks.print // 执行 env.execute("Flink Watermark Strategy ") } }
【示例】(Java实现)为数据流中的事件分配时间戳和水印。
请按以下步骤执行。
1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-java项目模板。(Flink项目创建过程,请参见2.2节)
2、在pom.xml中添加依赖。
3、创建事件数据结构。代码如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * 功能:为数据流中的事件分配时间戳和水印 */ public class AssignerDemo { public static class MessageInfo { public String hostname; public String status; public MessageInfo() { } public MessageInfo(String hostname, String status){ this.hostname = hostname; this.status = status; } @Override public String toString() { return "MessageInfo{" + "hostname='" + hostname + '\'' + ", status='" + status + '\'' + '}'; } } public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 触发流程序执行 env.execute("Flink Watermark Strategy"); } }
4、创建流应用程序类。代码如下:
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * 功能:为数据流中的事件分配时间戳和水印 */ public class AssignerDemo { public static class MessageInfo { public String hostname; public String status; public MessageInfo() { } public MessageInfo(String hostname, String status){ this.hostname = hostname; this.status = status; } @Override public String toString() { return "MessageInfo{" + "hostname='" + hostname + '\'' + ", status='" + status + '\'' + '}'; } } public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 源数据流 DataStream<MessageInfo> stream = env.fromElements( new MessageInfo("host1","1234"), new MessageInfo("host2","2234"), new MessageInfo("host3","1234") ); // 因为模拟数据没有时间戳,所以用此方法添加单调增加时间戳和水印 DataStream<MessageInfo> withTimestampsAndWatermarks = stream .assignTimestampsAndWatermarks(WatermarkStrategy .<MessageInfo>forMonotonousTimestamps() .withTimestampAssigner((MessageInfo, ts) -> System.currentTimeMillis())); // 将结果输出到控制台 withTimestampsAndWatermarks.print(); // 触发流程序执行 env.execute("Flink Watermark Strategy"); } }