使用集合数据源
也可以直接在内存中将一个数据集合读取为DataStream。Flink提供了如下几个方法。
- fromCollection(Seq):从Java Java.util. collection创建一个数据流。集合中的所有元素必须具有相同的类型。
- fromCollection(Iterator):从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。
- fromElements(elements: _*):根据给定的对象序列创建数据流。所有对象必须具有相同的类型。
- fromParallelCollection(SplittableIterator):并行地从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。
- generateSequence(from, to):并行地生成给定区间内的数字序列。
【示例】使用集合数据源。
1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-scala/flink-quickstart-java项目模板。
2、打开项目中的StreamingJob对象文件,编辑流处理代码。
Scala代码:
import org.apache.flink.streaming.api.scala._ object CollectionSourceDemo { // 定义事件类 case class Person(name:String, age:Integer) def main(args: Array[String]): Unit = { // 设置流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 首先从环境中获取一些数据,比如: val peoples = List(Person("张三", 21),Person("李四", 16),Person("王老五", 35)) val personStream = env.fromCollection(peoples) // 对DataStream执行转换操作,并输出计算结果 personStream.filter(_.age>18).print() // 触发流程序执行 env.execute("Flink Collection Source Demo") } }
Java代码:
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.ArrayList; import java.util.List; public class CollectionSourceDemo { // POJO类 public static class Person { public String name; public Integer age; public Person() {} public Person(String name, Integer age) { this.name = name; this.age = age; } public String toString() { return this.name.toString() + ": age " + this.age.toString(); } } public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 读取数据源,构造DataStream Listpeoples = new ArrayList<>(); peoples.add(new Person("张三", 21)); peoples.add(new Person("李四", 16)); peoples.add(new Person("王老五", 35)); DataStream personStream = env.fromCollection(peoples); // 对DataStream执行filter转换操作 DataStream adults = personStream.filter(new FilterFunction () { @Override public boolean filter(Person person) throws Exception { return person.age >= 18; } }); // 输出流计算结果 adults.print(); // 触发流程序执行 env.execute("Flink File Source"); } }
3、执行以上程序,输出结果如下:
张三: age 21 王老五: age 35