数据转换-filter
数据转换使用操作符(operator)将一个或多个数据流转换为新的数据流。转换输入可以是一个或多个数据流,转换输出也可以是零个、一个或多个数据流。程序可以将多个转换组合成复杂的数据流拓扑。
filter转换
filter函数对条件进行评估,如果结果为true,则该条数据输出。filter函数可以输出零个记录。下面是进行filter转换的示例代码。
Scala代码:
import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object TransformerFilter{ def main(args: Array[String]) { // 设置批处理执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 得到输入数据,然后执行filter转换 env.fromElements("Good good study", "Day day up") .map(_.toLowerCase) .filter(_.contains("study")) .print() // 执行 env.execute("flink filter transformatiion") } }
Java代码:
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * filter转换 */ public class TransformerFilter { public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 首先从环境中获取一些数据,再执行map和filter转换: env.fromElements("Good good study","Day day up") .map(String::toLowerCase) .filter((FilterFunction) s -> s.contains("study")) .print(); // 对于流程序,只有执行了下面这个方法,流程序才真正开始执行 env.execute("flink map transformatiion"); } }
输出结果如下所示:
good good study