数据转换-keyBy
数据转换使用操作符(operator)将一个或多个数据流转换为新的数据流。转换输入可以是一个或多个数据流,转换输出也可以是零个、一个或多个数据流。程序可以将多个转换组合成复杂的数据流拓扑。
有一些转换(如join、coGroup、keyBy、groupBy)要求在元素集合上定义一个key。还有一些转换(如reduce、groupReduce、aggregate、windows)可以应用在按key分组的数据上。
Flink的数据模型不是基于key-value对的。因此,不需要将数据集类型物理打包为key和value。key是“虚拟的”:它们被定义一个函数,该函数可指定数据流中实际数据的哪个字段(或属性)用作key。需要注意的是,如果流元素是POJO,那么它必须重写hashCode()方法。
最简单的情况是对元组的一个或多个字段进行分组。请看下面的示例代码。
Scala代码:
import org.apache.flink.streaming.api.scala._ /** * keyBy转换 */ object TransformerKeyBy{ def main(args: Array[String]): Unit = { // 设置流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 得到输入数据,进行转换: env.fromElements("Good good study", "Day day up") .map(_.toLowerCase) .flatMap(_.split("\\W+")) // 相当于先map,再flatten .map((_,1)) .keyBy(0) // 按元组索引 // .keyBy(t => t._1) // 或按元组的第一个字段分区 .print // 触发流程序执行 env.execute("flink keyBy transformatiion") } }
Java代码:
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; /** * keyBy转换 */ public class TransformerKeyBy { public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 加载数据源,并执行flatMap转换 DataStreamds = env.fromElements("good good study","day day up") .flatMap(new FlatMapFunction () { @Override public void flatMap(String value, Collector out) throws Exception { for(String word: value.split("\\W+")){ out.collect(word); } } }); // 通过map转换,将事件流中事件的数据类型变换为(word,1)元组形式 DataStream > ds_map = ds.map(new MapFunction >() { @Override public Tuple2 map(String s) throws Exception { return new Tuple2<>(s,1); } }); // keyBy转换,按key重分区 KeyedStream ,Tuple> ds_keyed = ds_map.keyBy(0); // 输出 ds_keyed.print(); // 执行 env.execute("flink keyBy transformatiion"); } }
输出结果如下:
(up,1) (study,1) (day,1) (day,1) (good,1) (good,1)