数据转换-reduce
数据转换使用操作符(operator)将一个或多个数据流转换为新的数据流。转换输入可以是一个或多个数据流,转换输出也可以是零个、一个或多个数据流。程序可以将多个转换组合成复杂的数据流拓扑。
reduce转换作用于按键分组后的数据流(KeyedStream),它将当前元素与上一次归约(reduce)得到的结果合并,并发出合并后的新值,即一种"滚动"聚合。下面是进行reduce转换的示例代码(按单词分组后,对每个单词的出现次数进行累加求和)。
Scala代码:
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * Demonstrates the `reduce` transformation on a keyed stream.
 *
 * The job splits two sample sentences into lower-case words, pairs each word
 * with a count of 1, groups the pairs by word, and keeps a running sum of the
 * counts per word. Every updated (word, count) pair is printed.
 */
object TransformerReduce {

  /**
   * Builds and runs the streaming word-count job.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Set up the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Load the source elements, then apply the DataStream transformations.
    env
      .fromElements("Good good study", "Day day up")
      .map(_.toLowerCase)            // normalise to lower case
      .flatMap(_.split("\\W+"))      // one element per word (map followed by flatten)
      .filter(_.nonEmpty)            // split yields "" when a line starts with a non-word character
      .map(word => (word, 1))        // pair every word with an initial count of 1
      .keyBy(_._1)                   // group by the word itself
      .reduce((acc, next) => (acc._1, acc._2 + next._2)) // rolling sum of counts per word
      .print()

    // The transformations above only describe the job; execute() actually runs it.
    env.execute("flink reduce transformatiion")
  }
}
Java代码:
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; /** * reduce转换 */ public class TransformerReduce { public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 首先加载数据,然后执行转换 DataStream<Tuple2<String, Integer>> ds = env .fromElements("Good good study","Day day up") .map(String::toLowerCase) .flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { for(String word : s.split("\\W+")){ collector.collect(new Tuple2<>(word,1)); } } }); ds//.keyBy(0) // 已弃用 .keyBy(t -> t.f0) .reduce(new ReduceFunction<Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception { return new Tuple2<>(t1.f0, t1.f1 + t2.f1); } }).print(); // 对于流程序,只有执行了下面这个方法,流程序才真正开始执行 env.execute("flink reduce transformatiion"); } }
程序输出结果如下(每行开头 `N>` 中的数字标识输出该记录的并行子任务,具体编号和各行的先后顺序可能因运行环境而异):
6> (good,1)
4> (up,1)
6> (good,2)
7> (day,1)
7> (day,2)
5> (study,1)