水印策略

Flink提供了用于处理事件时间、时间戳和水印的API。

什么是水印策略?

为了处理事件时间,Flink流程序需要知道事件的时间戳,这意味着流中的每个元素都需要分配其事件时间戳。这通常是通过使用TimestampAssigner从元素中的某个字段访问/提取时间戳来实现的。

时间戳分配与生成水印密切相关,生成水印是告诉系统事件时间进度的一种方式。可以通过指定WatermarkGenerator来配置水印。 TimestampAssigner和WatermarkGenerator形成了一个水印策略,它定义了如何在流源中生成水印。Flink API需要一个同时包含TimestampAssigner和WatermarkGenerator的WatermarkStrategy。

使用水印策略

在Flink应用程序中有两个地方可以使用WatermarkStrategy:

  • 1)直接在源上使用;
  • 2)在非源操作之后使用。

第一种选择更可取,因为它允许源利用关于水印逻辑中的分片/分区/分割的知识。源通常可以在更细的水平上跟踪水印,源产生的整体水印将更准确。直接在源上指定WatermarkStrategy通常意味着必须使用一个源特定的接口。

第二个选项(在任意操作后设置WatermarkStrategy)应该只在不能直接在源上设置策略的情况下使用。

内置水印生成器

Flink提供了抽象,允许程序员分配他们自己的时间戳并发出他们自己的水印。更具体地说,可以通过实现WatermarkGenerator接口来实现。

不过,为了进一步简化此类任务的编程工作,Flink附带了一些预先实现的时间戳赋值器,包括:

  • 单调递增时间戳
  • 固定的迟到时间

分配时间戳和水印示例

下面通过一个示例来掌握如何为数据流中的事件分配时间戳和水印。

【示例】(Scala实现)为数据流中的事件分配时间戳和水印。

请按以下步骤执行。

1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-scala项目模板。(Flink项目创建过程,请参见2.2节)

2、在pom.xml中添加依赖。

3、创建流应用程序类。代码如下:

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.scala._

/**
 * 功能:为数据流中的事件分配时间戳和水印
 */
object AssignerDemo2 {

  // case类,表示流元素数据类型
  case class MessageInfo(hostname: String, status: String)

  // main方法
  def main(args: Array[String]) {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 数据流源
    val stream = env.fromElements(
          MessageInfo("host1", "1234"),
          MessageInfo("host2", "2234"),
          MessageInfo("host3", "1234")
    )

    // 因为模拟数据没有时间戳,所以用此方法添加递增时间戳和水印
    // 为数据流中的元素分配时间戳,并生成标记以指示事件时间进展
    val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(
      WatermarkStrategy.forMonotonousTimestamps[MessageInfo]()
                       .withTimestampAssigner(new SerializableTimestampAssigner[MessageInfo]() {
          // 取当前时间戳为事件时间戳
          override def extractTimestamp(t: MessageInfo, ts: Long): Long = {
            System.currentTimeMillis()
          }
        })
    )

    // 将结果输出到控制台
    withTimestampsAndWatermarks.print

    // 执行
    env.execute("Flink Watermark Strategy ")
  }
}

【示例】(Java实现)为数据流中的事件分配时间戳和水印。

请按以下步骤执行。

1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-java项目模板。(Flink项目创建过程,请参见2.2节)

2、在pom.xml中添加依赖。

3、创建事件数据结构。代码如下:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 功能:为数据流中的事件分配时间戳和水印
 */
public class AssignerDemo {

	public static class MessageInfo {
		public String hostname;
		public String status;

		public MessageInfo() {
		}

		public MessageInfo(String hostname, String status){
			this.hostname = hostname;
			this.status = status;
		}

		@Override
		public String toString() {
			return "MessageInfo{" +
					"hostname='" + hostname + '\'' +
					", status='" + status + '\'' +
					'}';
		}
	}

	public static void main(String[] args) throws Exception {
		// 设置流执行环境
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 触发流程序执行
		env.execute("Flink Watermark Strategy");
	}
}

4、创建流应用程序类。代码如下:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 功能:为数据流中的事件分配时间戳和水印
 */
public class AssignerDemo {

	public static class MessageInfo {
		public String hostname;
		public String status;

		public MessageInfo() {
		}

		public MessageInfo(String hostname, String status){
			this.hostname = hostname;
			this.status = status;
		}

		@Override
		public String toString() {
			return "MessageInfo{" +
					"hostname='" + hostname + '\'' +
					", status='" + status + '\'' +
					'}';
		}
	}

	public static void main(String[] args) throws Exception {
		// 设置流执行环境
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 源数据流
		DataStream<MessageInfo> stream = env.fromElements(
				new MessageInfo("host1","1234"),
				new MessageInfo("host2","2234"),
				new MessageInfo("host3","1234")
		);

		// 因为模拟数据没有时间戳,所以用此方法添加单调增加时间戳和水印
		DataStream<MessageInfo> withTimestampsAndWatermarks = stream
			.assignTimestampsAndWatermarks(WatermarkStrategy
                              .<MessageInfo>forMonotonousTimestamps()
                              .withTimestampAssigner((MessageInfo, ts) -> System.currentTimeMillis()));

		// 将结果输出到控制台
		withTimestampsAndWatermarks.print();

		// 触发流程序执行
		env.execute("Flink Watermark Strategy");
	}
}

《Spark原理深入与编程实战》