Flink低级操作API_ProcessFunction
从之前的内容我们知道,Flink的转换操作是无法访问事件的时间戳信息和水印信息的。例如我们常用的MapFunction转换操作就无法访问时间戳或者当前事件的事件时间。而在一些应用场景下,访问事件的时间戳信息和水印信息极为重要。因此,Flink DataStream API提供了一系列的低级(Low-Level)转换操作,可以访问时间戳、水印以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。这一类的低级API,被称为ProcessFunction。
ProcessFunction将事件处理与计时器和状态相结合,使其成为流处理应用程序的强大组件。这是使用Flink创建事件驱动应用程序的基础。
ProcessFunction
ProcessFunction是一个低级的流处理操作,允许访问所有(非循环)流应用程序的基本构件:
- events:数据流中的元素。
- state:状态,用于容错和一致性,仅用于keyed stream。
- timer:定时器,支持事件时间和处理时间,仅用于keyed stream。
ProcessFunction可以被认为是一个访问keyed state和定时器的FlatMapFunction。通过为输入流中接收的每个事件调用它来处理事件。
ProcessFunction用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的窗口函数和转换算子无法实现)。例如,Flink SQL就是使用ProcessFunction实现的。
Flink提供了8个Process Function:
- ProcessFunction:用于DataStream。
- KeyedProcessFunction:用于KeyedStream,keyBy之后的流处理。
- CoProcessFunction:用于connect连接的流。
- ProcessJoinFunction:用于join流操作。
- BroadcastProcessFunction:用于广播。
- KeyedBroadcastProcessFunction:keyBy之后的广播。
- ProcessWindowFunction:窗口增量聚合。
- ProcessAllWindowFunction:全窗口聚合。
KeyedProcessFunction
KeyedProcessFunction是RichFunction的一种,是最常用的ProcessFunction之一。
作为一个RichFunction,它可以访问使用托管keyed state所需的open和getRuntimeContext方法。对于容错状态,KeyedProcessFunction可以通过 RuntimeContext 访问Flink的keyed state,这与其他有状态函数访问keyed state的方式类似。
作为ProcessFunction的扩展,KeyedProcessFunction在其onTimer(…)方法中通过OnTimerContext提供了对计时器key的访问。
【示例】维护数据流中每个key的计数,并在每过10秒钟(以事件时间)而未更新该key时,发出一个key/count对。
在下面的示例中,KeyedProcessFunction维护每个key的计数,并在每过10秒钟(以事件时间)而未更新该key时,发出一个key/count对:
- 监听Socket数据源,获取输入字符串。
- 把key(单词)、计数和最后修改时间戳存储在一个ValueState状态中, ValueState的作用域是通过key隐式确定的。
- 对于每个记录,KeyedProcessFunction递增计数器并设置最后修改时间戳。
- 该函数还安排了一个10秒后的回调(以事件时间)。
- 在每次回调时,它根据存储的计数的最后修改时间检查回调的事件时间时间戳,并在它们匹配时发出key/count(即,在10秒钟内这个单词没有再次出现,就把这个单词和它出现的总次数发送到下游算子)。
Scala代码:
import java.text.SimpleDateFormat import java.time.Duration import java.util.Date import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy} import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector /** * 维护每个key的计数,并在每过10秒钟(以事件时间)而未更新该key时,发出一个key/count对: */ object KeyedProcessFunDemo { // 存储在状态中的数据类型 case class CountWithTimestamp(key: String, count: Long, lastModified: Long) def main(args: Array[String]) { // 设置流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 并行度1 env.setParallelism(1) // 源数据流 val stream = env .socketTextStream("localhost", 9999) .flatMap(_.split("\\W+")) .map((_,1)) // 为事件分配时间戳和水印 stream .assignTimestampsAndWatermarks(WatermarkStrategy .forMonotonousTimestamps[(String, Int)]() .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int)] { override def extractTimestamp(t: (String, Int), ts: Long): Long = System.currentTimeMillis() }) .withIdleness(Duration.ofSeconds(5)) ) .keyBy(_._1) .process(new CountWithTimeoutFunction()) .print() // 执行流程序 env.execute("Process Function") } /** * KeyedProcessFunction的子类,维护计数和超时。 * 它的作用是将每个单词最新出现时间记录到backend,并创建定时器, * 定时器触发的时候,检查这个单词距离上次出现是否已经达到10秒,如果是,就发射给下游算子 */ class CountWithTimeoutFunction extends KeyedProcessFunction[String, (String, Int), (String, Long)] { /** * 首先获得由这个处理函数(process function)维护的状态 * 通过 RuntimeContext 访问Flink的keyed state */ lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext.getState( new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]) ) /** * 对于在输入流中接收到的每一个事件,此函数就会被调用以处理该事件 * @param value 输入元素 * @param ctx 上下文件环境 * @param out * @throws Exception */ override def processElement(value: (String, Int), ctx: KeyedProcessFunction[String, (String, Int), (String, Long)]#Context, out: Collector[(String, Long)]): Unit = { // 初始化或检索/更新状态 val current = state.value match { case null => CountWithTimestamp(value._1, 1, ctx.timestamp) // 如果是第一个事件,设初始计数为1 case CountWithTimestamp(key, count, lastModified) => // 先撤销之前的定时器 // ctx.timerService().deleteEventTimeTimer(lastModified + 10000); ctx.timerService.deleteProcessingTimeTimer(lastModified + 10000) CountWithTimestamp(key, count + 1, ctx.timestamp) // 如果不是第一个事件,则累加 } // 将修改过后的状态写回 state.update(current) // 从当前事件时间开始安排下一个定时器10秒 // 为当前单词创建定时器,十秒后后触发 val timer = current.lastModified + 10000 // ctx.timerService.registerEventTimeTimer(timer) ctx.timerService.registerProcessingTimeTimer(timer) // 打印所有信息,用于核对数据正确性 println(s"process, ${current.key}, ${current.count}, " + s"lastModified : ${current.lastModified}, (${time(current.lastModified)}), " + s"timer : ${timer} (${time(timer)})\n") } /** 定时器触发后执行的方法 * 如果一分钟内没有新来的相同的单词,则发出 key/count对 * * @param timestamp 这个时间戳代表的是该定时器的触发时间 * @param ctx * @param out * @throws Exception */ override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, (String, Int), (String, Long)]#OnTimerContext, out: Collector[(String, Long)]): Unit = { // 取得当前单词 val currentKey = ctx.getCurrentKey // 获取调度此计时器的key的状态(即此单词的状态) val result = state.value // 当前元素是否已经连续10秒未出现的标志 var isTimeout = false // timestamp是定时器触发时间 // 如果等于最后一次更新时间+10秒,就表示这10秒内已经收到过该单词了, // 这种连续10秒没有出现的元素,被发送到下游算子 result match { case CountWithTimestamp(key, count, lastModified) if timestamp >= lastModified + 10000 => out.collect((key, count)) // 发出key/count对 isTimeout = true case _ => } // 打印数据,用于核对是否符合预期 println(s"ontimer, ${currentKey}, ${result.count}, " + s"lastModified : ${result.lastModified}, (${time(result.lastModified)}), " + s"stamp : ${timestamp} (${time(timestamp)}),isTimeout: ${isTimeout}\n") } } private def time(timeStamp: Long) = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(timeStamp)) }
Java代码:
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.StringUtils; import java.text.SimpleDateFormat; import java.time.Duration; import java.util.Date; /** * 维护每个key的计数,并在每过10秒钟(以事件时间)而未更新该key时,发出一个key/count对: */ public class KeyedProcessFunDemo { // 存储在状态中的数据类型,POJO类 public static class CountWithTimestamp { public String key; public long count; public long lastModified; } public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 并行度1 env.setParallelism(1); // 源数据流 DataStream<Tuple2<String, Integer>> stream = env .socketTextStream("localhost",9999) .flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { if(StringUtils.isNullOrWhitespaceOnly(s)) { System.out.println("invalid line"); return; } for(String word : s.split("\\W+")){ collector.collect(new Tuple2<>(word,1)); } } }); stream .assignTimestampsAndWatermarks(WatermarkStrategy .<Tuple2<String, Integer>>forMonotonousTimestamps() .withTimestampAssigner((event, ts) -> System.currentTimeMillis()) .withIdleness(Duration.ofSeconds(5)) ) .keyBy(t -> t.f0) .process(new CountWithTimeoutFunction()) .print(); // 执行 env.execute("flink streaming job"); } /** * KeyedProcessFunction的子类,维护计数和超时。 * 它的作用是将每个单词最新出现时间记录到backend,并创建定时器, * 定时器触发的时候,检查这个单词距离上次出现是否已经达到10秒,如果是,就发射给下游算子 */ public static class CountWithTimeoutFunction extends KeyedProcessFunction< String, // key Tuple2<String, Integer>, // input Tuple2<String, Long>> { // output // 由此函数所维护的存储状态 private ValueState<CountWithTimestamp> state; // 首先获得由这个处理函数维护的状态 // 通过 RuntimeContext 访问Flink的keyed state @Override public void open(Configuration parameters) throws Exception { // 初始化状态,状态名称是myState state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class)); } /** * 对于在输入流中接收到的每一个事件,此函数就会被调用以处理该事件 * @param value 输入元素 * @param context 上下文件环境 * @param out * @throws Exception */ @Override public void processElement( Tuple2<String, Integer> value, Context context, Collector<Tuple2<String, Long>> out) throws Exception { // 取得当前是哪个单词 // String currentKey = context.getCurrentKey(); // 从backend取得当前单词的myState状态 CountWithTimestamp current = state.value(); // 如果myState还从未没有赋值过,就在此初始化 if (current == null) { current = new CountWithTimestamp(); current.key = value.f0; } // 先撤销之前的定时器 // context.timerService().deleteEventTimeTimer(current.lastModified + 10000); context.timerService().deleteProcessingTimeTimer(current.lastModified + 10000); // 更新状态计数值(单词数量加1) current.count++; // 将状态的时间戳设置为记录分配的事件时间戳 // 取当前元素的时间戳,作为该单词最后一次出现的时间 current.lastModified = context.timestamp(); // 重新保存到backend,包括该单词出现的次数,以及最后一次出现的时间 state.update(current); // 为当前单词创建定时器,10秒后后触发 long timer = current.lastModified + 10000; context.timerService().registerEventTimeTimer(timer); // context.timerService().registerProcessingTimeTimer(timer); // 打印所有信息,用于核对数据正确性 System.out.println(String.format("process, %s, %d, lastModified : %d (%s), timer : %d (%s)\n", current.key, current.count, current.lastModified, time(current.lastModified), timer, time(timer))); } /** 定时器触发后执行的方法 * 如果一分钟内没有新来的相同的单词,则发出 key/count对 * * @param timestamp 这个时间戳代表的是该定时器的触发时间 * @param ctx * @param out * @throws Exception */ @Override public void onTimer( long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception { // 取得当前单词 String currentKey = ctx.getCurrentKey(); // 获取调度此计时器的key的状态(即此单词的状态) CountWithTimestamp result = state.value(); // 当前元素是否已经连续10秒未出现的标志 boolean isTimeout = false; // timestamp是定时器触发时间 // 超过10秒钟该单词(key)没有再出现,就发送给下游 if (timestamp >= result.lastModified + 10000) { out.collect(new Tuple2<>(result.key, result.count)); isTimeout = true; } // 打印数据,用于核对是否符合预期 System.out.println(String.format("ontimer, %s, %d, lastModified : %d (%s), stamp : %d (%s), isTimeout : %s\n\n", currentKey, result.count, result.lastModified, time(result.lastModified), timestamp, time(timestamp), String.valueOf(isTimeout))); } } private static String time(long timeStamp) { return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(timeStamp)); } }
在processElement方法中,state.value()可以取得当前单词的状态,state.update(current)可以设置当前单词的状态执行以上代码。
请按以下步骤执行上面的程序:
1)在控制台执行命令nc -lk 9999,这样就可以从控制台向9999端口发送字符串了;
2)在IDEA上直接执行上同的程序代码,程序运行就开始监听本机的9999端口了;
3)在netcat的控制台输入aaa再回车,连续两次,中间间隔不要超过10秒;然后再输入ttt回车。如下图所示: