使用集合数据源

也可以直接在内存中将一个数据集合读取为DataStream。Flink提供了如下几个方法。

  • fromCollection(Seq):从Java Java.util. collection创建一个数据流。集合中的所有元素必须具有相同的类型。
  • fromCollection(Iterator):从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。
  • fromElements(elements: _*):根据给定的对象序列创建数据流。所有对象必须具有相同的类型。
  • fromParallelCollection(SplittableIterator):并行地从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。
  • generateSequence(from, to):并行地生成给定区间内的数字序列。

【示例】使用集合数据源。

1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-scala/flink-quickstart-java项目模板。

2、打开项目中的StreamingJob对象文件,编辑流处理代码。

Scala代码:

import org.apache.flink.streaming.api.scala._

object CollectionSourceDemo {

  // 定义事件类
  case class Person(name:String, age:Integer)

  def main(args: Array[String]): Unit = {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 首先从环境中获取一些数据,比如:
    val peoples = List(Person("张三", 21),Person("李四", 16),Person("王老五", 35))
    val personStream = env.fromCollection(peoples)

    // 对DataStream执行转换操作,并输出计算结果
    personStream.filter(_.age>18).print()

    // 触发流程序执行
    env.execute("Flink Collection Source Demo")
  }
}

Java代码:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;

public class CollectionSourceDemo {

	// POJO类
	public static class Person {
		public String name;
		public Integer age;

		public Person() {}

		public Person(String name, Integer age) {
			this.name = name;
			this.age = age;
		}

		public String toString() {
			return this.name.toString() + ": age " + this.age.toString();
		}
	}

	public static void main(String[] args) throws Exception {
		// 设置流执行环境
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 读取数据源,构造DataStream
		List peoples = new ArrayList<>();
		peoples.add(new Person("张三", 21));
		peoples.add(new Person("李四", 16));
		peoples.add(new Person("王老五", 35));

		DataStream personStream = env.fromCollection(peoples);

		// 对DataStream执行filter转换操作
		DataStream adults = personStream.filter(new FilterFunction() {
			@Override
			public boolean filter(Person person) throws Exception {
				return person.age >= 18;
			}
		});

		// 输出流计算结果
		adults.print();

		// 触发流程序执行
		env.execute("Flink File Source");
	}
}

3、执行以上程序,输出结果如下:

张三: age 21
王老五: age 35

《Flink原理深入与编程实战》