数据转换-connect
DataStream的connect操作创建的是ConnectedStreams或BroadcastConnectedStream,它用了两个泛型,即不要求两个dataStream的element是同一类型。
union虽然可以合并多个数据流,但有一个限制,即多个数据流的数据类型必须相同。connect提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:
- connect只能连接两个数据流,union可以连接多个数据流。
- connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。
- 两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。
两个输入流经过connect合并后,可以进一步使用CoProcessFunction、CoMap、CoFlatMap、KeyedCoProcessFunction等API 对两个流分别处理。
下面是进行connect转换的示例代码。
Scala代码:
import org.apache.flink.streaming.api.functions.co.{CoMapFunction, CoProcessFunction} import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector /** * connect转换操作 * connect和union都有一个共同的作用,就是将2个流或多个流合成一个流。 * 但是两者的区别是:union连接的2个流的类型必须一致,connect连接的流可以不一致,但是可以统一处理。 */ object TransformerConnect { def main(args: Array[String]): Unit = { // 设置流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // connect // 第一个数据集 val ds1:DataStream[(String, Int)] = env.fromElements("good good study").flatMap(_.split("\\W+")).map((_, 1)) // 第二个数据集 val ds2:DataStream[String] = env.fromElements("day day up").flatMap(_.split("\\W+")) // 连接两个数据集 val ds:ConnectedStreams[(String, Int),String] = ds1.connect(ds2) // 调用process方法 // CoProcessFunction泛型参数:[输入流1数据类型,输入流2数据类型,输出流数据类型] ds.process(new CoProcessFunction[(String, Int), String, (String, Int)]{ // 处理输入流1的元素 override def processElement1(in1: (String, Int), context: CoProcessFunction[(String, Int), String, (String, Int)]#Context, out: Collector[(String, Int)]): Unit = { out.collect(in1) // 发送给下游算子 } // 处理输入流2的元素 override def processElement2(in2: String, context: CoProcessFunction[(String, Int), String, (String, Int)]#Context, out: Collector[(String, Int)]): Unit = { out.collect((in2,1)) // 将来自输入流2的元素转换为元组,再发送给下游算子 } }).print // map ds.map(new CoMapFunction[(String, Int), String, (String, Int)] { override def map1(in1: (String, Int)): (String, Int) = { (in1._1.toUpperCase, in1._2) } override def map2(in2: String): (String, Int) = { (in2, 1) } }).print // 执行 env.execute("flink connect transformatiion") } }
Java代码
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.ConnectedStreams; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.co.CoProcessFunction; import org.apache.flink.util.Collector; /** * connect转换操作 * connect和union都有一个共同的作用,就是将2个流或多个流合成一个流。 * 但是两者的区别是:union连接的2个流的类型必须一致,connect连接的流可以不一致,但是可以统一处理。 */ public class TransformerConnect { public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // connect转换操作 // 第一个数据集 DataStream<Tuple2<String, Integer>> ds1 = env.fromElements("good good study") .map(String::toLowerCase) .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { for(String word: value.split("\\W+")){ out.collect(new Tuple2<>(word,1)); } } }); // 第二个数据集 DataStream<String> ds2 = env.fromElements("day day up") .map(String::toLowerCase) .flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { for(String word: value.split("\\W+")){ out.collect(word); } } }); // 连接两个数据集 ConnectedStreams<Tuple2<String, Integer>,String> ds = ds1.connect(ds2); ds.process(new CoProcessFunction<Tuple2<String,Integer>, String, Tuple2<String,Integer>>() { @Override public void processElement1(Tuple2<String, Integer> t, Context context, Collector<Tuple2<String,Integer>> out) throws Exception { out.collect(t); } @Override public void processElement2(String s, Context context, Collector<Tuple2<String,Integer>> out) throws Exception { out.collect(new Tuple2<>(s,1)); } }).print("process"); ds.map(new CoMapFunction<Tuple2<String,Integer>, String, Tuple2<String,Integer>>() { @Override public Tuple2<String, Integer> map1(Tuple2<String, Integer> t) throws Exception { return new Tuple2<>(t.f0.toUpperCase(),t.f1); } @Override public Tuple2<String, Integer> map2(String s) throws Exception { return new Tuple2<>(s, 1); } }).print("map"); // 执行 env.execute("flink connect transformatiion"); } }
执行以上代码,输出结果如下所示:
process:1> (day,1) process:1> (day,1) process:1> (up,1) map:1> (day,1) map:1> (day,1) map:1> (up,1) map:5> (GOOD,1) map:5> (GOOD,1) map:5>> (STUDY,1) process:5> (good,1) process:5> (good,1) process:5> (study,1)