Data Transformation - reduce

Data transformations use operators to turn one or more data streams into new data streams. A transformation may take one or more streams as input and produce zero, one, or more streams as output. Programs can combine multiple transformations into complex dataflow topologies.

The reduce transformation combines the current element with the last reduced value and emits the new value. The following example code applies a reduce transformation (summing a per-word count over a DataStream).

Scala code:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
  * reduce transformation
  */
object TransformerReduce {

  def main(args: Array[String]): Unit = {
    // Set up the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Load the data source, then apply DataStream transformations
    env.fromElements("Good good study", "Day day up")
      .map(_.toLowerCase)                     // convert to lowercase
      .flatMap(_.split("\\W+"))               // equivalent to map followed by flatten
      .map((_, 1))                            // convert to tuples
      .keyBy(_._1)                            // group by word
      .reduce((a, b) => (a._1, a._2 + b._2))  // rolling reduce
      .print()

    // Trigger execution
    env.execute("flink reduce transformation")
  }
}

Java code:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * reduce transformation
 */
public class TransformerReduce {
    public static void main(String[] args) throws Exception {
        // Set up the stream execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Load the data first, then apply the transformations
        DataStream<Tuple2<String, Integer>> ds = env
                .fromElements("Good good study","Day day up")
                .map(String::toLowerCase)
                .flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        for(String word : s.split("\\W+")){
                            collector.collect(new Tuple2<>(word,1));
                        }
                    }
                });

        ds//.keyBy(0)     // deprecated in favor of key selectors
          .keyBy(t -> t.f0)
          .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2)
                    throws Exception {
                return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
            }
        }).print();

        // For a streaming program, execution only actually starts when this method is called
        env.execute("flink reduce transformation");
    }
}


Running either program produces output like the following (the prefix is the parallel subtask index; the interleaving may vary between runs):

6> (good,1)
4> (up,1)
6> (good,2)
7> (day,1)
7> (day,2)
5> (study,1)
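Note that reduce is a rolling aggregation: it emits one partial result for every input element rather than a single final value, which is why the output contains both (good,1) and (good,2). The following plain-Java sketch (no Flink dependency; the class and method names are made up for illustration) replays the word stream and shows what a keyed reduce emits per element:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingReduceSketch {

    // Replay a keyed rolling reduce: for each element, combine it with the
    // last reduced value for its key and emit the new partial result.
    static List<String> emit(List<String> words) {
        Map<String, Integer> state = new HashMap<>();   // per-key reduce state
        List<String> emitted = new ArrayList<>();
        for (String word : words) {
            // combine the current element (count 1) with the previous value
            int newCount = state.merge(word, 1, Integer::sum);
            // every intermediate result is emitted downstream
            emitted.add("(" + word + "," + newCount + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        // the words of the two input sentences, lower-cased and split, in arrival order
        emit(List.of("good", "good", "study", "day", "day", "up"))
                .forEach(System.out::println);
    }
}
```

This is only a single-threaded illustration of the semantics; in Flink the per-key state is partitioned across parallel subtasks, which is why the printed results carry different subtask prefixes.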
