侧输出流
除了DataStream操作产生的主流之外,还可以生成任意数量的附加侧输出(side output)结果流。
什么是侧输出流?
很多场景下都需要从一个Flink操作符输出一个以上的输出流,比如:
- 异常
- 格式不正确的事件
- 延迟事件
- 操作警报,例如到外部服务的超时连接
除了错误报告之外,侧输出也是实现流的N-路拆分的好方法。
每个侧输出通道都与一个OutputTag
当使用侧输出时,首先需要定义一个OutputTag,用于标识侧输出流:
Java代码:
// 这需要是一个匿名的内部类,以便我们可以分析类型 OutputTagoutputTag = new OutputTag ("side-output") {};
Scala代码:
val outputTag = OutputTag[String]("side-output")
请注意OutputTag是如何根据侧输出流包含的元素类型进行类型化的。
可以通过以下函数将数据发送到侧输出流:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
可以使用上面函数中向用户公开的Context参数,将数据发送到OutputTag标识的侧输出。
要获得侧输出流,可以对DataStream操作的结果使用getSideOutput(OutputTag)。这将得到一个DataStream,这是输入到侧输出流的结果。
侧输出流应用示例
下面我们通过一个示例来理解侧输出流的应用方法。
【示例】下面是一个从ProcessFunction发出侧输出数据的例子,将数据集中的负数挑出来,输出到侧输出中。
Scala代码:
import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector /** * 侧输出示例:将数据集中的负数挑出来,输出到侧输出中 */ object SideOutputDemo { def main(args: Array[String]): Unit = { // 设置流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 侧输出 val numbers = List(1, 2, -3, 4, 5, -6, 7, 8, -9, 10) val ds = env.fromElements(numbers:_*) // 定义用来标记侧输出的标签,注意泛型参数是侧输出数据的类型 val outputTag = new OutputTag[Int]("side-output") {} // 分流处理 val mainDataStream = ds.process(new ProcessFunction[Int, Int] { override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = { if (value > 0) out.collect(value) // 将数据发送到常规输出 else ctx.output(outputTag, value) // 向侧输出发送负数 } }) // 获取侧输出结果 val sideOutputStream = mainDataStream.getSideOutput[Int](outputTag) // 打印注数据流 mainDataStream.print("主数据流") // 打印侧输出流 sideOutputStream.print("侧输出流") // 执行 env.execute("flink transformatiion") } }
Java代码:
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; /** * 侧输出示例:将数据集中的负数挑出来,输出到侧输出中 */ public class SideOutputDemo { public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 侧输出 DataStream<Integer> ds = env.fromElements(1,2,-3,4,5,-6,7,8,-9,10); // 定义用来标记侧输出的标签,注意泛型参数是侧输出数据的类型 final OutputTag<Integer> outputTag = new OutputTag<Integer>("side-output"){}; // 分流处理 SingleOutputStreamOperator<Integer> mainDataStream = ds .process(new ProcessFunction<Integer, Integer>() { @Override public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception { if(value > 0){ out.collect(value); // 将数据发送到常规输出 }else{ ctx.output(outputTag, value); // 向侧输出发送负数 } } }); // 获取侧输出结果 DataStream<Integer> sideOutputStream = mainDataStream.getSideOutput(outputTag); // 打印主输出流 // mainDataStream.print("主数据流"); // 打印侧输出流 sideOutputStream.print("侧输出流"); // 执行 env.execute("flink transformatiion"); } }
执行以上代码,输出结果如下所示:
侧输出流:8> -3 侧输出流:3> -6 侧输出流:6> -9