存储DataFrame
有时,我们需要将DataFrame中的数据写到外部存储系统中,例如,本地文件系统、HDFS或Amazon S3。在一个典型的ETL数据处理作业中,处理结果很可能需要被写到一些存储系统中,例如HDFS或Hive。
存储DataFrame的API
在Spark SQL中,org.apache.spark.sql.DataFrameWriter类负责将DataFrame中的数据写入外部存储系统。在DataFrame中有一个变量write,它实际上就是DataFrameWriter类的一个实例。与DataFrameWriter交互的模式与DataFrameReader的交互模式有点类似。
下面描述了与DataFrameWriter交互的常见模式:
movies.write
.format(...)
.mode(...)
.option(...)
.partitionBy(...)
.bucketBy(...)
.sortBy(...)
.save(path)
与DataFrameReader相似,可以使用json、orc和parquet文件存储格式,默认格式是parquet。需要注意的是,save函数的输入参数是目录名,不是文件名,它将数据直接保存到文件系统,如HDFS、Amazon S3、或者一个本地路径URL。
这些方法大多有相应的快捷方式,如df.write.csv()、df.write.json()、df.write.orc()、df.write.parquet()、df.write.jdbc()等。这些方法相当于先调用format()方法,再调用save()方法。
存储模式
DataFrameWriter类中的一个重要选项是save mode,它表示存储模式。在将DataFrame中的数据写出到存储系统上时,默认行为是创建一个新表。如果指定的输出目录或同名表已经存在的话,则抛出错误消息。可以使用Spark SQL的SaveMode特性来更改此行为。
下面列出了各种支持的存储模式(save mode):
- append:将DataFrame数据追加到已经存在于指定目标位置下的文件列表。
- overwrite:使用DataFrame数据完全覆盖已经存在于指定目标位置下的任何数据文件。
- error,errorIfExists:这是默认模式。如果指定的目标位置存在,那么DataFrameWriter将抛出一个错误。
- ignore:如果指定的目标位置存在,则简单地什么都不做。换句话说,不写出DataFrame中的数据。
这些保存模式不使用任何锁定,也不是原子性的。此外,在执行overwrite时,将在写入新数据之前删除数据。
示例1:将DataFrame存储到CSV文件
【示例】先读取json数据文件,然后进行简单计算,把结果DataFrame保存到csv存储文件中,最后再加载这个结果文件到RDD中。
val spark = SparkSession.builder()
.master("local[*]")
.appName("Spark Write Demo")
.getOrCreate()
// 读取json文件源,创建DataFrame
val file = "src/main/resources/people.json"
val df = spark.read.json(file)
// df.show()
// 找出age不是null的信息,保存到csv文件中
import spark.implicits._
val output = "tmp/people-csv-output"
// df.where($"age".isNotNull).write.format("csv").save(output)
df.where($"age".isNotNull).write.csv(output) // 等价上一句
// 将保存的csv数据再次加载到RDD中
val textFile = spark.sparkContext.textFile(output)
textFile.foreach(println)
注:write.format()支持输出 json、parquet、jdbc、orc、libsvm、csv、text等格式文件,如果要输出text文本文件,可以采用write.format("text")。但是,需要注意,只有select()中只存在一个列时,才允许保存成文本文件,如果存在两个列,比如select("name", "age"),就不能保存成文本文件。
示例2:将DataFrame存储到MySQL表中
Spark支持通过JDBC方式连接到其他数据库,并将DataFrame存储到数据库中。下面这个示例中,将分析结果DataFrame写出到MySQL数据库中。
【示例】将结果DataFrame保存到MySQL的数据表中。
首先,在MySQL中新建一个测试Spark程序的数据库,数据库名称是“spark”,表的名称是“student”。请登录MySQL数据库,执行以下SQL语句。
mysql> create database spark; mysql> use spark; mysql> create table student (id int(4), name char(20), gender char(4), age int(4)); mysql> insert into student values(1,'张三','F',23); mysql> insert into student values(2,'李四','M',18); mysql> select * from student;
然后,编写Spark应用程序连接MySQL数据库并且向MySQL写入数据。在这里,我们向spark.student表中插入两条记录。
// 首先导入依赖的包:
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
// 下面我们设置两条数据表示两个学生信息:
val studentRDD = spark.sparkContext
.parallelize(Array("3,王老五,F,44","4,赵小虎,M,27"))
.map(_.split(","))
// 下面创建Row 对象,每个Row 对象都是rowRDD 中的一行
// RDD[String] => RDD[Row]
val rowRDD = studentRDD.map(stu => Row(stu(0).toInt, stu(1).trim, stu(2).trim, stu(3).toInt))
// 下面要设置模式信息:
val schema = StructType(
Seq(
StructField("id", IntegerType, nullable = true),
StructField("name", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("age", IntegerType, nullable = true)
)
)
// 建立起Row 对象和模式之间的对应关系,也就是把数据和模式对应起来
val studentDF = spark.createDataFrame(rowRDD, schema)
// studentDF.printSchema()
// studentDF.show()
// 下面创建一个prop 变量用来保存JDBC 连接参数
val prop = new Properties()
prop.put("user", "root") // 表示用户名是root
prop.put("password", "123456") // 表示密码是hadoop
prop.put("driver","com.mysql.jdbc.Driver") // 表示驱动程序是
//下面就可以连接数据库,采用append 模式,表示追加记录到数据库spark 的students表中
val DB_URL= "jdbc:mysql://localhost:3306/spark?useSSL=false" // 数据库名为cda
studentDF.write
.mode(SaveMode.Append) // .mode("append")
.jdbc(DB_URL, "student", prop)
注:上面的代码以MySQL 5为例。如果使用的是MySQL 8数据库,需要相应地修改URL连接及驱动程序名称。请参考4.5.8节修改