数据转换-connect

DataStream的connect操作创建的是ConnectedStreams或BroadcastConnectedStream,它用了两个泛型,即不要求两个dataStream的element是同一类型。

union虽然可以合并多个数据流,但有一个限制,即多个数据流的数据类型必须相同。connect提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:

  • connect只能连接两个数据流,union可以连接多个数据流。
  • connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。
  • 两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。

两个输入流经过connect合并后,可以进一步使用CoProcessFunction、CoMap、CoFlatMap、KeyedCoProcessFunction等API 对两个流分别处理。

下面是进行connect转换的示例代码。

Scala代码:

import org.apache.flink.streaming.api.functions.co.{CoMapFunction, CoProcessFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
  * connect转换操作
  * connect和union都有一个共同的作用,就是将2个流或多个流合成一个流。
  * 但是两者的区别是:union连接的2个流的类型必须一致,connect连接的流可以不一致,但是可以统一处理。
  */
object TransformerConnect {
  def main(args: Array[String]): Unit = {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // connect
    // 第一个数据集
    val ds1:DataStream[(String, Int)] = env.fromElements("good good study").flatMap(_.split("\\W+")).map((_, 1))

    // 第二个数据集
    val ds2:DataStream[String] = env.fromElements("day day up").flatMap(_.split("\\W+"))

    // 连接两个数据集
    val ds:ConnectedStreams[(String, Int),String] = ds1.connect(ds2)

    // 调用process方法
    // CoProcessFunction泛型参数:[输入流1数据类型,输入流2数据类型,输出流数据类型]
    ds.process(new CoProcessFunction[(String, Int), String, (String, Int)]{
      // 处理输入流1的元素
      override def processElement1(in1: (String, Int),
                                   context: CoProcessFunction[(String, Int), String, (String, Int)]#Context,
                                   out: Collector[(String, Int)]): Unit = {
          out.collect(in1)      // 发送给下游算子
      }

      // 处理输入流2的元素
      override def processElement2(in2: String,
                                   context: CoProcessFunction[(String, Int),
                                     String, (String, Int)]#Context,
                                   out: Collector[(String, Int)]): Unit = {
          out.collect((in2,1))  // 将来自输入流2的元素转换为元组,再发送给下游算子
      }
    }).print

    // map
    ds.map(new CoMapFunction[(String, Int), String, (String, Int)] {
      override def map1(in1: (String, Int)): (String, Int) = {
          (in1._1.toUpperCase, in1._2)
      }

      override def map2(in2: String): (String, Int) = {
        (in2, 1)
      }
    }).print

    // 执行
    env.execute("flink connect transformatiion")
  }
}

Java代码

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

/**
 * connect转换操作
 * connect和union都有一个共同的作用,就是将2个流或多个流合成一个流。
 * 但是两者的区别是:union连接的2个流的类型必须一致,connect连接的流可以不一致,但是可以统一处理。
 */
public class TransformerConnect {
    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // connect转换操作
        // 第一个数据集
        DataStream<Tuple2<String, Integer>> ds1 = env.fromElements("good good study")
                .map(String::toLowerCase)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) 
                                throws Exception {
                        for(String word: value.split("\\W+")){
                            out.collect(new Tuple2<>(word,1));
                        }
                    }
                });

        // 第二个数据集
        DataStream<String> ds2 = env.fromElements("day day up")
                .map(String::toLowerCase)
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String value, Collector<String> out) throws Exception {
                        for(String word: value.split("\\W+")){
                            out.collect(word);
                        }
                    }
                });

        // 连接两个数据集
        ConnectedStreams<Tuple2<String, Integer>,String> ds = ds1.connect(ds2);
        
        ds.process(new CoProcessFunction<Tuple2<String,Integer>, String, Tuple2<String,Integer>>() {
            @Override
            public void processElement1(Tuple2<String, Integer> t, 
                                        Context context, 
                                        Collector<Tuple2<String,Integer>> out) throws Exception {
                out.collect(t);
            }

            @Override
            public void processElement2(String s, Context context, Collector<Tuple2<String,Integer>> out)
                                  throws Exception {
                out.collect(new Tuple2<>(s,1));
            }
        }).print("process");

        ds.map(new CoMapFunction<Tuple2<String,Integer>, String, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map1(Tuple2<String, Integer> t) throws Exception {
                return new Tuple2<>(t.f0.toUpperCase(),t.f1);
            }

            @Override
            public Tuple2<String, Integer> map2(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        }).print("map");

        // 执行
        env.execute("flink connect transformatiion");
    }
}

执行以上代码,输出结果如下所示:

process:1> (day,1)
process:1> (day,1)
process:1> (up,1)
map:1> (day,1)
map:1> (day,1)
map:1> (up,1)
map:5> (GOOD,1)
map:5> (GOOD,1)
map:5>> (STUDY,1)
process:5> (good,1)
process:5> (good,1)
process:5> (study,1)

《PySpark原理深入与编程实战》