存储DataFrame

有时,我们需要将DataFrame中的数据写到外部存储系统中,例如,本地文件系统、HDFS或Amazon S3。在一个典型的ETL数据处理作业中,处理结果很可能需要被写到一些存储系统中,例如HDFS或Hive。

存储DataFrame的API

在Spark SQL中,org.apache.spark.sql.DataFrameWriter类负责将DataFrame中的数据写入外部存储系统。在DataFrame中有一个变量write,它实际上就是DataFrameWriter类的一个实例。与DataFrameWriter交互的模式与DataFrameReader的交互模式有点类似。

下面描述了与DataFrameWriter交互的常见模式:

movies.write
      .format(...)
      .mode(...)
      .option(...)
      .partitionBy(...)
      .bucketBy(...)
      .sortBy(...)
      .save(path)

与DataFrameReader相似,可以使用json、orc和parquet文件存储格式,默认格式是parquet。需要注意的是,save函数的输入参数是目录名,不是文件名,它将数据直接保存到文件系统,如HDFS、Amazon S3、或者一个本地路径URL。

这些方法大多有相应的快捷方式,如df.write.csv()、df.write.json()、df.write.orc()、df.write.parquet()、df.write.jdbc()等。这些方法相当于先调用format()方法,再调用save()方法。

存储模式

DataFrameWriter类中的一个重要选项是save mode,它表示存储模式。在将DataFrame中的数据写出到存储系统上时,默认行为是创建一个新表。如果指定的输出目录或同名表已经存在的话,则抛出错误消息。可以使用Spark SQL的SaveMode特性来更改此行为。

下面列出了各种支持的存储模式(save mode):

  • append:将DataFrame数据追加到已经存在于指定目标位置下的文件列表。
  • overwrite:使用DataFrame数据完全覆盖已经存在于指定目标位置下的任何数据文件。
  • error,errorIfExists:这是默认模式。如果指定的目标位置存在,那么DataFrameWriter将抛出一个错误。
  • ignore:如果指定的目标位置存在,则简单地什么都不做。换句话说,不写出DataFrame中的数据。

这些保存模式不使用任何锁定,也不是原子性的。此外,在执行overwrite时,将在写入新数据之前删除数据。

示例1:将DataFrame存储到CSV文件

【示例】先读取json数据文件,然后进行简单计算,把结果DataFrame保存到csv存储文件中,最后再加载这个结果文件到RDD中。

val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark Write Demo")
      .getOrCreate()

// 读取json文件源,创建DataFrame
val file = "src/main/resources/people.json"
val df = spark.read.json(file)
// df.show()

// 找出age不是null的信息,保存到csv文件中
import spark.implicits._
val output = "tmp/people-csv-output"
// df.where($"age".isNotNull).write.format("csv").save(output)
df.where($"age".isNotNull).write.csv(output)    // 等价上一句

// 将保存的csv数据再次加载到RDD中
val textFile = spark.sparkContext.textFile(output)
textFile.foreach(println)

注:write.format()支持输出 json、parquet、jdbc、orc、libsvm、csv、text等格式文件,如果要输出text文本文件,可以采用write.format("text")。但是,需要注意,只有select()中只存在一个列时,才允许保存成文本文件,如果存在两个列,比如select("name", "age"),就不能保存成文本文件。

示例2:将DataFrame存储到MySQL表中

Spark支持通过JDBC方式连接到其他数据库,并将DataFrame存储到数据库中。下面这个示例中,将分析结果DataFrame写出到MySQL数据库中。

【示例】将结果DataFrame保存到MySQL的数据表中。

首先,在MySQL中新建一个测试Spark程序的数据库,数据库名称是“spark”,表的名称是“student”。请登录MySQL数据库,执行以下SQL语句。

mysql> create database spark;
mysql> use spark;
mysql> create table student (id int(4), name char(20), gender char(4), age int(4));
mysql> insert into student values(1,'张三','F',23);
mysql> insert into student values(2,'李四','M',18);
mysql> select * from student;

然后,编写Spark应用程序连接MySQL数据库并且向MySQL写入数据。在这里,我们向spark.student表中插入两条记录。

// 首先导入依赖的包:
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode

// 下面我们设置两条数据表示两个学生信息:
val studentRDD = spark.sparkContext
      .parallelize(Array("3,王老五,F,44","4,赵小虎,M,27"))
      .map(_.split(","))

// 下面创建Row 对象,每个Row 对象都是rowRDD 中的一行
// RDD[String] => RDD[Row]
val rowRDD = studentRDD.map(stu => Row(stu(0).toInt, stu(1).trim, stu(2).trim, stu(3).toInt))

// 下面要设置模式信息:
val schema = StructType(
      Seq(
        StructField("id", IntegerType, nullable = true),
        StructField("name", StringType, nullable = true),
        StructField("gender", StringType, nullable = true),
        StructField("age", IntegerType, nullable = true)
      )
)

// 建立起Row 对象和模式之间的对应关系,也就是把数据和模式对应起来
val studentDF = spark.createDataFrame(rowRDD, schema)
// studentDF.printSchema()
// studentDF.show()

// 下面创建一个prop 变量用来保存JDBC 连接参数
val prop = new Properties()
prop.put("user", "root")        				// 表示用户名是root
prop.put("password", "123456")   				// 表示密码是hadoop
prop.put("driver","com.mysql.jdbc.Driver") 			// 表示驱动程序是

//下面就可以连接数据库,采用append 模式,表示追加记录到数据库spark 的students表中
val DB_URL= "jdbc:mysql://localhost:3306/spark?useSSL=false"  	// 数据库名为cda
studentDF.write
      .mode(SaveMode.Append)				// .mode("append")
      .jdbc(DB_URL, "student", prop)

注:上面的代码以MySQL 5为例。如果使用的是MySQL 8数据库,需要相应地修改URL连接及驱动程序名称。请参考4.5.8节修改


《Flink原理深入与编程实战》