将流计算结果写出到CSV文件
在使用 Flink 进行数据处理时,数据经数据源流入,然后通过系列转换,最终可以通过Data Sink将计算结果进行输出,Flink Data Sinks(数据接收器)就是用于定义数据流最终的输出位置,它消费数据流并将它们转发到文件、套接字、外部系统或打印输出。
内置数据接收器
Flink提供了多种内置的Data Sink API 用于日常的开发,具体如下:
- writeAsText("/path/to/file"):用于将计算结果以字符串的方式并行地写入到指定文件夹下。这些字符串是通过调用每个元素的toString()方法获得的,使用的输出类是TextOutputFormat。这个方法除了路径参数是必选外,还可以通过指定第二个参数来定义输出模式。输出模式有以下两个可选值:
- WriteMode.NO_OVERWRITE:当指定路径上不存在任何文件时,才执行写出操作;
- WriteMode.OVERWRITE:不论指定路径上是否存在文件,都执行写出操作;如果原来已有文件,则进行覆盖。
- writeAsCsv("/path/to/file"):用于将计算结果以CSV的文件格式写出到指定目录。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。使用的输出类是CsvOutputFormat。
- print()/printToErr():在标准输出/标准错误流上打印输出每个元素的toString()值。可选地,可以提供输出的前缀,这有助于区分不同的print调用。如果并行度大于1,输出也将以产生输出的任务的id作为前缀。print/printToErr是测试当中最常用的方式,用于将计算结果以标准输出流或错误输出流的方式打印到控制台上。
- writeUsingOutputFormat():自定义文件输出的基类和方法。支持自定义对象到字节的转换。在定义自定义格式时,需要继承自FileOutputFormat,它负责序列化和反序列化。上面介绍的 writeAsText 和 writeAsCsv 其底层调用的都是该方法。
- writeToSocket(host, port, SerializationSchema):用于将计算结果以指定的格式写出到指定的socket套接字。为了正确的序列化和格式化,需要定义SerializationSchema。
- addSink:调用自定义接收器函数。Flink与作为接收器函数实现的其他系统(如Apache Kafka)的连接器捆绑在一起。
注:以上方法中,以writeAs*开头的方法,在Flink API文档中,已经标识为"Deprecated",即弃用状态,在未来的版本中有可能被删除,因此使用时要慎重。原因请参见下一节。
【示例】分析流数据,并将分析结果写出到csv文件中。
下面我们通过示例来掌所致常用Flink Data Sink的用法。请按以下步骤执行。
1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-scala/flink-quickstart-java项目模板。(Flink项目创建过程,请参见2.2节)
2、设置依赖。在pom.xml中添加如下依赖:
Scala Maven依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.12</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.13.2</version> <provided</scope> </dependency>
Java Maven依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency>
3、创建流应用程序类。代码如下。
Scala代码:
import org.apache.flink.streaming.api.scala._ /** * Data Sink:writeAsCSV方法,将结果写入到csv文件 */ object DataSinkAsCSV { def main(args: Array[String]): Unit = { // 设置流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 得到输入数据,进行转换: env.fromElements("Good good study", "Day day up") .map(_.toLowerCase) .flatMap(_.split("\\W+")) // 相当于先map,再flatten .map((_,1)) // 转换为元组 .writeAsCsv("result.csv") // 写出到结果文件中 .setParallelism(1) // 结果写到单个文件中 env.execute("Data Sink Demo") } }
Java代码:
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; /** * Data Sink:writeAsCSV方法,将结果写入到csv文件 */ public class DataSinkDemo1 { public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 获得数据,执行map和flatMap转换 env.fromElements("Good good study","Day day up") .map(String::toLowerCase) .flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() { @Override public void flatMap(String s, Collector<Tuple2<String,Integer>> out) throws Exception { for(String word : s.split("\\W+")){ out.collect(new Tuple2<>(word,1)); } } }) .writeAsCsv("result.csv") // 写出到指定的结果文件中 .setParallelism(1); // 写出到一个结果文件中 // 执行流程序 env.execute("Data Sink Demo"); } }
4、执行以上程序,可以看到,在项目的根目录下生成了一个结果文件result。查看输出的结果文件result,内容如下所示:
good,1 good,1 study,1 day,1 day,1 up,1