将流计算结果写出到CSV文件

在使用 Flink 进行数据处理时,数据经数据源流入,然后通过系列转换,最终可以通过Data Sink将计算结果进行输出,Flink Data Sinks(数据接收器)就是用于定义数据流最终的输出位置,它消费数据流并将它们转发到文件、套接字、外部系统或打印输出。

内置数据接收器

Flink提供了多种内置的Data Sink API 用于日常的开发,具体如下:

  • writeAsText("/path/to/file"):用于将计算结果以字符串的方式并行地写入到指定文件夹下。这些字符串是通过调用每个元素的toString()方法获得的,使用的输出类是TextOutputFormat。这个方法除了路径参数是必选外,还可以通过指定第二个参数来定义输出模式。输出模式有以下两个可选值:
    • WriteMode.NO_OVERWRITE:当指定路径上不存在任何文件时,才执行写出操作;
    • WriteMode.OVERWRITE:不论指定路径上是否存在文件,都执行写出操作;如果原来已有文件,则进行覆盖。
  • writeAsCsv("/path/to/file"):用于将计算结果以CSV的文件格式写出到指定目录。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。使用的输出类是CsvOutputFormat。
  • print()/printToErr():在标准输出/标准错误流上打印输出每个元素的toString()值。可选地,可以提供输出的前缀,这有助于区分不同的print调用。如果并行度大于1,输出也将以产生输出的任务的id作为前缀。print/printToErr是测试当中最常用的方式,用于将计算结果以标准输出流或错误输出流的方式打印到控制台上。
  • writeUsingOutputFormat():自定义文件输出的基类和方法。支持自定义对象到字节的转换。在定义自定义格式时,需要继承自FileOutputFormat,它负责序列化和反序列化。上面介绍的 writeAsText 和 writeAsCsv 其底层调用的都是该方法。
  • writeToSocket(host, port, SerializationSchema):用于将计算结果以指定的格式写出到指定的socket套接字。为了正确的序列化和格式化,需要定义SerializationSchema。
  • addSink:调用自定义接收器函数。Flink与作为接收器函数实现的其他系统(如Apache Kafka)的连接器捆绑在一起。

注:以上方法中,以writeAs*开头的方法,在Flink API文档中,已经标识为"Deprecated",即弃用状态,在未来的版本中有可能被删除,因此使用时要慎重。原因请参见下一节。

【示例】分析流数据,并将分析结果写出到csv文件中。

下面我们通过示例来掌所致常用Flink Data Sink的用法。请按以下步骤执行。

1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-scala/flink-quickstart-java项目模板。(Flink项目创建过程,请参见2.2节)

2、设置依赖。在pom.xml中添加如下依赖:

Scala Maven依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.13.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.13.2</version>
    <provided</scope>
</dependency>

Java Maven依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.2</version>
    <scope>provided</scope>
</dependency>

3、创建流应用程序类。代码如下。

Scala代码:

import org.apache.flink.streaming.api.scala._

/**
  * Data Sink:writeAsCSV方法,将结果写入到csv文件
  */
object DataSinkAsCSV {

  def main(args: Array[String]): Unit = {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 得到输入数据,进行转换:
    env.fromElements("Good good study", "Day day up")
        .map(_.toLowerCase)
        .flatMap(_.split("\\W+"))     		// 相当于先map,再flatten
        .map((_,1))                   		// 转换为元组
        .writeAsCsv("result.csv")     		// 写出到结果文件中
        .setParallelism(1)            		// 结果写到单个文件中

    env.execute("Data Sink Demo")
  }
}

Java代码:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Data Sink:writeAsCSV方法,将结果写入到csv文件
 */
public class DataSinkDemo1 {
    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 获得数据,执行map和flatMap转换
        env.fromElements("Good good study","Day day up")
                .map(String::toLowerCase)
                .flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String,Integer>> out) throws Exception {
                        for(String word : s.split("\\W+")){
                            out.collect(new Tuple2<>(word,1));
                        }
                    }
                })
                .writeAsCsv("result.csv")       	// 写出到指定的结果文件中
                .setParallelism(1);             	// 写出到一个结果文件中

        // 执行流程序
        env.execute("Data Sink Demo");
    }
}

4、执行以上程序,可以看到,在项目的根目录下生成了一个结果文件result。查看输出的结果文件result,内容如下所示:

good,1
good,1
study,1
day,1
day,1
up,1

《PySpark原理深入与编程实战》