数据转换-聚合转换

DataStream API支持各种聚合,比如min、max、sum等等。这些函数可以应用于KeyedDataStream,以获得滚动聚合。下面是进行聚合转换的示例代码。

Scala代码:

import org.apache.flink.streaming.api.scala._

/**
  * DataStream API支持各种聚合,比如min、max、sum等等。
  * 这些函数可以应用于KeyedDataStream,以获得滚动聚合。
  */
object TransformerAgg{
  def main(args: Array[String]): Unit = {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 首先从环境中获取一些数据,并使用操作符转换 DataStream[String]。比如:
    val ds_keyed = env.fromElements(("good",1),("good",2),("study",1)).keyBy(_._1)

    ds_keyed.sum(1).print
    ds_keyed.min(1).print
    ds_keyed.max(1).print
    ds_keyed.minBy(1).print
    ds_keyed.maxBy(1).print

    env.execute("flink aggregation transformatiion")
  }
}

Java代码:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * DataStream API支持各种聚合,比如min、max、sum等等。
 * 这些函数可以应用于KeyedDataStream,以获得滚动聚合。
 */
public class TransformerAgg {
    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 首先从环境中获取一些数据,再执行map和flatMap转换:
        DataStream<Tuple2<String,Integer>> ds = env.fromElements(
                new Tuple2<>("good",1),
                new Tuple2<>("good",2),
                new Tuple2<>("study",1));

        ds.keyBy(t -> t.f0).sum(1).print();			// 参数也可以是字段名
        ds.keyBy(t -> t.f0).min(1).print();
        ds.keyBy(t -> t.f0).max(1).print();
        ds.keyBy(t -> t.f0).minBy(1).print();
        ds.keyBy(t -> t.f0).maxBy(1).print();

        // 对于流程序,只有执行了下面这个方法,流程序才真正开始执行
        env.execute("flink aggregation transformatiion");
    }
}

《PySpark原理深入与编程实战》