数据转换-keyBy

数据转换使用操作符(operator)将一个或多个数据流转换为新的数据流。转换输入可以是一个或多个数据流,转换输出也可以是零个、一个或多个数据流。程序可以将多个转换组合成复杂的数据流拓扑。

有一些转换(如join、coGroup、keyBy、groupBy)要求在元素集合上定义一个key。还有一些转换(如reduce、groupReduce、aggregate、windows)可以应用在按key分组的数据上。

Flink的数据模型不是基于key-value对的。因此,不需要将数据集类型物理打包为key和value。key是“虚拟的”:它们被定义一个函数,该函数可指定数据流中实际数据的哪个字段(或属性)用作key。需要注意的是,如果流元素是POJO,那么它必须重写hashCode()方法。

最简单的情况是对元组的一个或多个字段进行分组。请看下面的示例代码。

Scala代码:

import org.apache.flink.streaming.api.scala._

/**
  * keyBy转换
  */
object TransformerKeyBy{
  def main(args: Array[String]): Unit = {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 得到输入数据,进行转换:
    env.fromElements("Good good study", "Day day up")
      .map(_.toLowerCase)
      .flatMap(_.split("\\W+"))     		// 相当于先map,再flatten
      .map((_,1))
      .keyBy(0)  					// 按元组索引
      // .keyBy(t => t._1)				// 或按元组的第一个字段分区
      .print

    // 触发流程序执行
    env.execute("flink keyBy transformatiion")
  }
}

Java代码:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * keyBy转换
 */
public class TransformerKeyBy {
    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 加载数据源,并执行flatMap转换
        DataStream ds = env.fromElements("good good study","day day up")
                .flatMap(new FlatMapFunction() {
                    @Override
                    public void flatMap(String value, Collector out)
                            throws Exception {
                        for(String word: value.split("\\W+")){
                            out.collect(word);
                        }
                    }
                });

        // 通过map转换,将事件流中事件的数据类型变换为(word,1)元组形式
        DataStream> ds_map =
                ds.map(new MapFunction>() {
                    @Override
                    public Tuple2 map(String s) throws Exception {
                        return new Tuple2<>(s,1);
                    }
                });

        // keyBy转换,按key重分区
        KeyedStream,Tuple> ds_keyed = ds_map.keyBy(0);

        // 输出 
        ds_keyed.print();

        // 执行
        env.execute("flink keyBy transformatiion");
    }
}

输出结果如下:

(up,1)
(study,1)
(day,1)
(day,1)
(good,1)
(good,1)

《PySpark原理深入与编程实战》