侧输出流

除了DataStream操作产生的主流之外,还可以生成任意数量的附加侧输出(side output)结果流。

什么是侧输出流?

很多场景下都需要从一个Flink操作符输出一个以上的输出流,比如:

  • 异常
  • 格式不正确的事件
  • 延迟事件
  • 操作警报,例如到外部服务的超时连接

除了错误报告之外,侧输出也是实现流的N-路拆分的好方法。

每个侧输出通道都与一个OutputTag关联。标记具有与侧输出的DataStream类型对应的泛型类型,并且具有名称。具有相同名称的两个OutputTag应该具有相同的类型,并将引用相同的侧输出。结果流中的数据类型不必与主流中的数据类型匹配,不同端输出的类型也可以不同。当希望分割一个数据流时,这个操作非常有用。

当使用侧输出时,首先需要定义一个OutputTag,用于标识侧输出流:

Java代码:

// 这需要是一个匿名的内部类,以便我们可以分析类型
OutputTag outputTag = new OutputTag("side-output") {};

Scala代码:

val outputTag = OutputTag[String]("side-output")

请注意OutputTag是如何根据侧输出流包含的元素类型进行类型化的。

可以通过以下函数将数据发送到侧输出流:

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

可以使用上面函数中向用户公开的Context参数,将数据发送到OutputTag标识的侧输出。

要获得侧输出流,可以对DataStream操作的结果使用getSideOutput(OutputTag)。这将得到一个DataStream,这是输入到侧输出流的结果。

侧输出流应用示例

下面我们通过一个示例来理解侧输出流的应用方法。

【示例】下面是一个从ProcessFunction发出侧输出数据的例子,将数据集中的负数挑出来,输出到侧输出中。

Scala代码:

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
  * 侧输出示例:将数据集中的负数挑出来,输出到侧输出中
  */
object SideOutputDemo {

  def main(args: Array[String]): Unit = {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 侧输出
    val numbers = List(1, 2, -3, 4, 5, -6, 7, 8, -9, 10)
    val ds = env.fromElements(numbers:_*)

    // 定义用来标记侧输出的标签,注意泛型参数是侧输出数据的类型
    val outputTag = new OutputTag[Int]("side-output") {}

    // 分流处理
    val mainDataStream = ds.process(new ProcessFunction[Int, Int] {
      override def processElement(value: Int,
                                  ctx: ProcessFunction[Int, Int]#Context,
                                  out: Collector[Int]): Unit = {
        if (value > 0) out.collect(value) 	  // 将数据发送到常规输出
        else ctx.output(outputTag, value) 	  // 向侧输出发送负数
      }
    })

    // 获取侧输出结果
    val sideOutputStream = mainDataStream.getSideOutput[Int](outputTag)

    // 打印注数据流
    mainDataStream.print("主数据流")

    // 打印侧输出流
    sideOutputStream.print("侧输出流")

    // 执行
    env.execute("flink transformatiion")
  }
}

Java代码:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * 侧输出示例:将数据集中的负数挑出来,输出到侧输出中
 */
public class SideOutputDemo {
    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 侧输出
        DataStream<Integer> ds = env.fromElements(1,2,-3,4,5,-6,7,8,-9,10);

        // 定义用来标记侧输出的标签,注意泛型参数是侧输出数据的类型
        final OutputTag<Integer> outputTag = new OutputTag<Integer>("side-output"){};

        // 分流处理
        SingleOutputStreamOperator<Integer> mainDataStream = ds
                .process(new ProcessFunction<Integer, Integer>() {
                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out)
                            throws Exception {
                        if(value > 0){
                            out.collect(value); 			// 将数据发送到常规输出
                        }else{
                            ctx.output(outputTag, value); 		// 向侧输出发送负数
                        }
                    }
                });

        // 获取侧输出结果
        DataStream<Integer> sideOutputStream = mainDataStream.getSideOutput(outputTag);

        // 打印主输出流
        // mainDataStream.print("主数据流");

        // 打印侧输出流
        sideOutputStream.print("侧输出流");

        // 执行
        env.execute("flink transformatiion");
    }
}

执行以上代码,输出结果如下所示:

侧输出流:8> -3
侧输出流:3> -6
侧输出流:6> -9

《PySpark原理深入与编程实战》