数据转换-flatMap

数据转换使用操作符(operator)将一个或多个数据流转换为新的数据流。转换输入可以是一个或多个数据流,转换输出也可以是零个、一个或多个数据流。程序可以将多个转换组合成复杂的数据流拓扑。

flatMap转换

flatMap接受一条记录并输出零条、一条或多条记录。下面是进行flatMap转换的示例代码。

Scala代码:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object TransformerFlatMap{

  def main(args: Array[String]) {
    // 设置批处理执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 得到输入数据,flatMap转换:
    env.fromElements("Good good study", "Day day up")
       .map(_.toLowerCase)
       .flatMap(_.split("\\W+"))     		// 相当于先执行map,再执行flatten
       .print

    // 执行
    env.execute("flink flatmap transformatiion")
  }
}

Java代码:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * flatMap转换
 */
public class TransformerFlatMap {
    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 首先从环境中获取一些数据,再执行map和flatMap转换:
        env.fromElements("Good good study","Day day up")
                .map(String::toLowerCase)
                // 传入一个匿名函数
                .flatMap((FlatMapFunction) (value, out) -> {
                    for(String word: value.split("\\W+")){
                        out.collect(word);
                    }
                }).returns(Types.STRING)
                .print();

        // 对于流程序,只有执行了下面这个方法,流程序才真正开始执行
        env.execute("flink flatmap transformatiion");
    }
}

上面的代码中,flatMap函数的传入参数是一个Lambda表达式,它对于flatMap 的支持是无法猜测出来类型的,必须通过returns(Types.STRING) 指定具体的返回值类型。

执行以上代码,输出结果如下所示:

good
good
study
day
day
up

《PySpark原理深入与编程实战》