Flink流处理程序编程模型
Flink程序的基本构建块是stream和transformation(流和转换)。从概念上讲,stream是数据记录的流(可能永远不会结束),transformation是一个运算,它接受一个或多个流作为输入,经过处理/计算后生成一个或多个输出流。
下面我们实现一个完整的、可工作的Flink流应用程序示例。
【示例】将有关人员的记录流作为输入,并从中筛选出未成年人信息。Scala实现
import org.apache.flink.streaming.api.scala._ /** * 将有关人员的记录流作为输入,并从中筛选出未成年人信息。 */ object StreamingJobDemo1 { // 定义事件类 case class Person(name:String, age:Integer) def main(args: Array[String]) { // 设置流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 读取数据源,构造数据流 val peoples = env.fromElements( Person("张三", 21), Person("李四", 16), Person("王老五", 35) ) // 对数据流执行filter转换 val adults = peoples.filter(_.age>18) // 输出结果 adults.print // 执行 env.execute("Flink Streaming Job") } }
执行代码,输出结果如下:
7> Person(张三,21) 1> Person(王老五,35)
Java实现
(1)在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-java项目模板。
(2)设置依赖。在pom.xml中添加如下依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency>
(3)创建一个POJO类,用来表示流中的数据。代码如下。
// POJO类,表示人员信息实体 public class Person { public String name; // 存储姓名 public Integer age; // 存储年龄 // 空构造器 public Person() {}; // 构造器,初始化属性 public Person(String name, Integer age) { this.name = name; this.age = age; }; // 用于调试时输出信息 public String toString() { return this.name.toString() + ": age " + this.age.toString(); }; }
(4)打开项目中的StreamingJob对象文件,编辑流处理代码如下。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.api.common.functions.FilterFunction; public class StreamingJobDemo1 { public static void main(String[] args) throws Exception { // 获得流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 读取数据源,构造DataStream DataStreampersonDS = env.fromElements( new Person("张三", 21), new Person("李四", 16), new Person("王老五", 35) ); // 执行转换运算(这里是过滤年龄不小于18岁的人) // 注意,这里我们使用了匿名函数 DataStream<Person> adults = personDS.filter(new FilterFunction () { @Override public boolean filter(Person person) throws Exception { return person.age >= 18; } }); // 将结果输出到控制台 adults.print(); // 触发流程序开始执行 env.execute("stream demo"); } }
(5)执行以上程序,输出结果如下。
张三: age 21 王老五: age 35