读取文件数据源

还可以选择使用基于文件的源函数从文件源中传输数据。

从文件源读取数据的源函数定义有:

  • readTextFile(String path):逐行读取路径指定的文本文件,即符合TextInputFormat规范的文本文件,并以字符串形式返回。
  • readFile(FileInputFormat inputFormat, String path):根据指定的文件输入格式读取(一次)文件。
  • readFile(fileInputFormat, path, watchType, interval, pathFilter):这是前两个方法在内部调用的方法。它根据给定的fileInputFormat读取路径中的文件。

【示例】读取文件,并统计文件内的单词数量。

1、在IDEA中创建Flink项目;

2、在项目的src上单击右键,创建一个名为wc.txt的文本文件。

3、编辑wc.txt,输入以下内容并保存:

good good study
day day up

4、流处理代码实现。

Scala代码:

import org.apache.flink.streaming.api.scala._

object FileSourceDemo {
  def main(args: Array[String]): Unit = {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 加载文件数据源,构造DataStream
    val textPath = "input/wc.txt"
    val text = env.readTextFile(textPath)

    // 对DataStream执行转换操作
    text
      .flatMap { _.toLowerCase.split("\\W+").filter( _.nonEmpty ) }
      .map { (_, 1) }
      .print()

    // 触发流程序执行
    env.execute("Simple Flink File Source Demo")
  }
}

Java代码:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 文件数据源
 */
public class FileSourceDemo {
    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 加载文件数据源,构造DataStream
		String textPath = "input/wc.txt";
		DataStreamSource<String> text = env.readTextFile(textPath);

		// 数据流转换
        text.map(String::toLowerCase)       // 转小写
                .flatMap(new Splitter())    	// flatMap转换
                .print();

        // 触发流程序执行
        env.execute("Simple Flink File Source Demo");
    }

    // 实现FlatMapFunction接口的函数
    public static class Splitter implements FlatMapFunction> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {		// 分词
                out.collect(new Tuple2<>(word, 1));	// 构造(word,1)发送给下游算子
            }
        }
    }
}

5、执行以上程序,输出结果如下:

2> (day,1)
6> (good,1)
2> (day,1)
6> (good,1)
2> (up,1)
6> (study,1)

《Flink原理深入与编程实战》