数据转换-聚合转换
DataStream API支持各种聚合,比如min、max、sum等等。这些函数可以应用于KeyedDataStream,以获得滚动聚合。下面是进行聚合转换的示例代码。
Scala代码:
import org.apache.flink.streaming.api.scala._ /** * DataStream API支持各种聚合,比如min、max、sum等等。 * 这些函数可以应用于KeyedDataStream,以获得滚动聚合。 */ object TransformerAgg{ def main(args: Array[String]): Unit = { // 设置流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 首先从环境中获取一些数据,并使用操作符转换 DataStream[String]。比如: val ds_keyed = env.fromElements(("good",1),("good",2),("study",1)).keyBy(_._1) ds_keyed.sum(1).print ds_keyed.min(1).print ds_keyed.max(1).print ds_keyed.minBy(1).print ds_keyed.maxBy(1).print env.execute("flink aggregation transformatiion") } }
Java代码:
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * DataStream API支持各种聚合,比如min、max、sum等等。 * 这些函数可以应用于KeyedDataStream,以获得滚动聚合。 */ public class TransformerAgg { public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 首先从环境中获取一些数据,再执行map和flatMap转换: DataStream<Tuple2<String,Integer>> ds = env.fromElements( new Tuple2<>("good",1), new Tuple2<>("good",2), new Tuple2<>("study",1)); ds.keyBy(t -> t.f0).sum(1).print(); // 参数也可以是字段名 ds.keyBy(t -> t.f0).min(1).print(); ds.keyBy(t -> t.f0).max(1).print(); ds.keyBy(t -> t.f0).minBy(1).print(); ds.keyBy(t -> t.f0).maxBy(1).print(); // 对于流程序,只有执行了下面这个方法,流程序才真正开始执行 env.execute("flink aggregation transformatiion"); } }