Flink低级操作API_ProcessFunction

从之前的内容我们知道,Flink的转换操作是无法访问事件的时间戳信息和水印信息的。例如我们常用的MapFunction转换操作就无法访问时间戳或者当前事件的事件时间。而在一些应用场景下,访问事件的时间戳信息和水印信息极为重要。因此,Flink DataStream API提供了一系列的低级(Low-Level)转换操作,可以访问时间戳、水印以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。这一类的低级API,被称为ProcessFunction。

ProcessFunction将事件处理与计时器和状态相结合,使其成为流处理应用程序的强大组件。这是使用Flink创建事件驱动应用程序的基础。

ProcessFunction

ProcessFunction是一个低级的流处理操作,允许访问所有(非循环)流应用程序的基本构件:

  • events:数据流中的元素。
  • state:状态,用于容错和一致性,仅用于keyed stream。
  • timer:定时器,支持事件时间和处理时间,仅用于keyed stream。

ProcessFunction可以被认为是一个访问keyed state和定时器的FlatMapFunction。通过为输入流中接收的每个事件调用它来处理事件。

ProcessFunction用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的窗口函数和转换算子无法实现)。例如,Flink SQL就是使用ProcessFunction实现的。

Flink提供了8个Process Function:

  • ProcessFunction:用于DataStream。
  • KeyedProcessFunction:用于KeyedStream,keyBy之后的流处理。
  • CoProcessFunction:用于connect连接的流。
  • ProcessJoinFunction:用于join流操作。
  • BroadcastProcessFunction:用于广播。
  • KeyedBroadcastProcessFunction:keyBy之后的广播。
  • ProcessWindowFunction:窗口增量聚合。
  • ProcessAllWindowFunction:全窗口聚合。

KeyedProcessFunction

KeyedProcessFunction是RichFunction的一种,是最常用的ProcessFunction之一。

作为一个RichFunction,它可以访问使用托管keyed state所需的open和getRuntimeContext方法。对于容错状态,KeyedProcessFunction可以通过 RuntimeContext 访问Flink的keyed state,这与其他有状态函数访问keyed state的方式类似。

作为ProcessFunction的扩展,KeyedProcessFunction在其onTimer(…)方法中通过OnTimerContext提供了对计时器key的访问。

【示例】维护数据流中每个key的计数,并在每过10秒钟(以事件时间)而未更新该key时,发出一个key/count对。

在下面的示例中,KeyedProcessFunction维护每个key的计数,并在每过10秒钟(以事件时间)而未更新该key时,发出一个key/count对:

  • 监听Socket数据源,获取输入字符串。
  • 把key(单词)、计数和最后修改时间戳存储在一个ValueState状态中, ValueState的作用域是通过key隐式确定的。
  • 对于每个记录,KeyedProcessFunction递增计数器并设置最后修改时间戳。
  • 该函数还安排了一个10秒后的回调(以事件时间)。
  • 在每次回调时,它根据存储的计数的最后修改时间检查回调的事件时间时间戳,并在它们匹配时发出key/count(即,在10秒钟内这个单词没有再次出现,就把这个单词和它出现的总次数发送到下游算子)。

Scala代码:

import java.text.SimpleDateFormat
import java.time.Duration
import java.util.Date
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * 维护每个key的计数,并在每过10秒钟(以事件时间)而未更新该key时,发出一个key/count对:
 */
object KeyedProcessFunDemo {

  // 存储在状态中的数据类型
  case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

  def main(args: Array[String]) {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 并行度1
    env.setParallelism(1)

    // 源数据流
    val stream = env
      .socketTextStream("localhost", 9999)
      .flatMap(_.split("\\W+"))
      .map((_,1))

    // 为事件分配时间戳和水印
    stream
      .assignTimestampsAndWatermarks(WatermarkStrategy
        .forMonotonousTimestamps[(String, Int)]()
        .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int)] {
          override def extractTimestamp(t: (String, Int), ts: Long): Long = System.currentTimeMillis()
        })
        .withIdleness(Duration.ofSeconds(5))
      )
      .keyBy(_._1)
      .process(new CountWithTimeoutFunction())
      .print()

    // 执行流程序
    env.execute("Process Function")
  }

  /**
    * KeyedProcessFunction的子类,维护计数和超时。
    * 它的作用是将每个单词最新出现时间记录到backend,并创建定时器,
    * 定时器触发的时候,检查这个单词距离上次出现是否已经达到10秒,如果是,就发射给下游算子
    */
  class CountWithTimeoutFunction extends KeyedProcessFunction[String, (String, Int), (String, Long)] {

    /**
      * 首先获得由这个处理函数(process function)维护的状态
      * 通过 RuntimeContext 访问Flink的keyed state
      */
    lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext.getState(
      new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp])
    )

    /**
      * 对于在输入流中接收到的每一个事件,此函数就会被调用以处理该事件
      * @param value		输入元素
      * @param ctx			上下文件环境
      * @param out
      * @throws Exception
      */
    override def processElement(value: (String, Int),
                                ctx: KeyedProcessFunction[String, (String, Int), (String, Long)]#Context,
                                out: Collector[(String, Long)]): Unit = {

      // 初始化或检索/更新状态
      val current = state.value match {
        case null =>
          CountWithTimestamp(value._1, 1, ctx.timestamp)        // 如果是第一个事件,设初始计数为1
        case CountWithTimestamp(key, count, lastModified) =>
          // 先撤销之前的定时器
          // ctx.timerService().deleteEventTimeTimer(lastModified + 10000);
          ctx.timerService.deleteProcessingTimeTimer(lastModified + 10000)
          CountWithTimestamp(key, count + 1, ctx.timestamp)     // 如果不是第一个事件,则累加
      }

      // 将修改过后的状态写回
      state.update(current)

      // 从当前事件时间开始安排下一个定时器10秒
      // 为当前单词创建定时器,十秒后后触发
      val timer = current.lastModified + 10000
//      ctx.timerService.registerEventTimeTimer(timer)
      ctx.timerService.registerProcessingTimeTimer(timer)

      // 打印所有信息,用于核对数据正确性
      println(s"process, ${current.key}, ${current.count}, " +
        s"lastModified : ${current.lastModified}, (${time(current.lastModified)}), " +
        s"timer : ${timer} (${time(timer)})\n")
    }

    /** 定时器触发后执行的方法
      * 如果一分钟内没有新来的相同的单词,则发出 key/count对
      *
      * @param timestamp 这个时间戳代表的是该定时器的触发时间
      * @param ctx
      * @param out
      * @throws Exception
      */
    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[String, (String, Int), (String, Long)]#OnTimerContext,
                         out: Collector[(String, Long)]): Unit = {
      // 取得当前单词
      val currentKey = ctx.getCurrentKey

      // 获取调度此计时器的key的状态(即此单词的状态)
      val result = state.value

      // 当前元素是否已经连续10秒未出现的标志
      var isTimeout = false

      // timestamp是定时器触发时间
      // 如果等于最后一次更新时间+10秒,就表示这10秒内已经收到过该单词了,
      // 这种连续10秒没有出现的元素,被发送到下游算子
      result match {
        case CountWithTimestamp(key, count, lastModified) if timestamp >= lastModified + 10000 =>
          out.collect((key, count))   // 发出key/count对
          isTimeout = true
        case _ =>
      }

      // 打印数据,用于核对是否符合预期
      println(s"ontimer, ${currentKey}, ${result.count}, " +
        s"lastModified : ${result.lastModified}, (${time(result.lastModified)}), " +
        s"stamp : ${timestamp} (${time(timestamp)}),isTimeout: ${isTimeout}\n")
    }
  }

  private def time(timeStamp: Long) = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(timeStamp))
}

Java代码:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;

/**
 * 维护每个key的计数,并在每过10秒钟(以事件时间)而未更新该key时,发出一个key/count对:
 */
public class KeyedProcessFunDemo {

	// 存储在状态中的数据类型,POJO类
	public static class CountWithTimestamp {
		public String key;
		public long count;
		public long lastModified;
	}

	public static void main(String[] args) throws Exception {
		// 设置流执行环境
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 并行度1
		env.setParallelism(1);

		// 源数据流
		DataStream<Tuple2<String, Integer>> stream = env
				.socketTextStream("localhost",9999)
				.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
					@Override
					public void flatMap(String s, Collector<Tuple2<String, Integer>> collector)
							throws Exception {
						if(StringUtils.isNullOrWhitespaceOnly(s)) {
							System.out.println("invalid line");
							return;
						}

						for(String word : s.split("\\W+")){
							collector.collect(new Tuple2<>(word,1));
						}
					}
				});

		stream
				.assignTimestampsAndWatermarks(WatermarkStrategy
						.<Tuple2<String, Integer>>forMonotonousTimestamps()
						.withTimestampAssigner((event, ts) -> System.currentTimeMillis())
						.withIdleness(Duration.ofSeconds(5))
				)
				.keyBy(t -> t.f0)
				.process(new CountWithTimeoutFunction())
				.print();

		// 执行
		env.execute("flink streaming job");
	}

	/**
	 * KeyedProcessFunction的子类,维护计数和超时。
	 * 它的作用是将每个单词最新出现时间记录到backend,并创建定时器,
	 * 定时器触发的时候,检查这个单词距离上次出现是否已经达到10秒,如果是,就发射给下游算子
	 */
	public static class CountWithTimeoutFunction extends KeyedProcessFunction<
			String, 							// key
            	Tuple2<String, Integer>, 				// input
            	Tuple2<String, Long>> {				// output

		// 由此函数所维护的存储状态
		private ValueState<CountWithTimestamp> state;

		// 首先获得由这个处理函数维护的状态
		// 通过 RuntimeContext 访问Flink的keyed state
		@Override
		public void open(Configuration parameters) throws Exception {
			// 初始化状态,状态名称是myState
			state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
		}

		/**
		 * 对于在输入流中接收到的每一个事件,此函数就会被调用以处理该事件
		 * @param value				输入元素
		 * @param context			上下文件环境
		 * @param out
		 * @throws Exception
		 */
		@Override
		public void processElement(
				Tuple2<String, Integer> value,
				Context context,
				Collector<Tuple2<String, Long>> out) throws Exception {

			// 取得当前是哪个单词
			// String currentKey = context.getCurrentKey();

			// 从backend取得当前单词的myState状态
			CountWithTimestamp current = state.value();

			// 如果myState还从未没有赋值过,就在此初始化
			if (current == null) {
				current = new CountWithTimestamp();
				current.key = value.f0;
			}

			// 先撤销之前的定时器
			// context.timerService().deleteEventTimeTimer(current.lastModified + 10000);
			context.timerService().deleteProcessingTimeTimer(current.lastModified + 10000);

			// 更新状态计数值(单词数量加1)
			current.count++;

			// 将状态的时间戳设置为记录分配的事件时间戳
			// 取当前元素的时间戳,作为该单词最后一次出现的时间
			current.lastModified = context.timestamp();

			// 重新保存到backend,包括该单词出现的次数,以及最后一次出现的时间
			state.update(current);

			// 为当前单词创建定时器,10秒后后触发
			long timer = current.lastModified + 10000;
			context.timerService().registerEventTimeTimer(timer);
			// context.timerService().registerProcessingTimeTimer(timer);

			// 打印所有信息,用于核对数据正确性
			System.out.println(String.format("process, %s, %d, lastModified : %d (%s), timer : %d (%s)\n",
					current.key,
					current.count,
					current.lastModified,
					time(current.lastModified),
					timer,
					time(timer)));
		}

		/** 定时器触发后执行的方法
		 * 如果一分钟内没有新来的相同的单词,则发出 key/count对
		 *
		 * @param timestamp   	这个时间戳代表的是该定时器的触发时间
		 * @param ctx
		 * @param out
		 * @throws Exception
		 */
		@Override
		public void onTimer(
				long timestamp,
				OnTimerContext ctx,
				Collector<Tuple2<String, Long>> out) throws Exception {

			// 取得当前单词
			String currentKey = ctx.getCurrentKey();

			// 获取调度此计时器的key的状态(即此单词的状态)
			CountWithTimestamp result = state.value();

			// 当前元素是否已经连续10秒未出现的标志
			boolean isTimeout = false;

			// timestamp是定时器触发时间
			// 超过10秒钟该单词(key)没有再出现,就发送给下游
			if (timestamp >= result.lastModified + 10000) {				
				out.collect(new Tuple2<>(result.key, result.count));
				isTimeout = true;
			}

			// 打印数据,用于核对是否符合预期
			System.out.println(String.format("ontimer, %s, %d, lastModified : %d (%s), stamp : %d (%s), isTimeout : %s\n\n",
					currentKey,
					result.count,
					result.lastModified,
					time(result.lastModified),
					timestamp,
					time(timestamp),
					String.valueOf(isTimeout)));
		}
	}

	private static String time(long timeStamp) {
		return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(timeStamp));
	}
}

在processElement方法中,state.value()可以取得当前单词的状态,state.update(current)可以设置当前单词的状态执行以上代码。

请按以下步骤执行上面的程序:

1)在控制台执行命令nc -lk 9999,这样就可以从控制台向9999端口发送字符串了;

2)在IDEA上直接执行上同的程序代码,程序运行就开始监听本机的9999端口了;

3)在netcat的控制台输入aaa再回车,连续两次,中间间隔不要超过10秒;然后再输入ttt回车。如下图所示:

4)观察IDEA控制台输出结果,如下图所示:

《Spark原理深入与编程实战》