存储DataFrame

有时,需要将DataFrame中的数据写到外部存储系统中,例如,在一个典型的ETL数据处理作业中,处理结果通常需要被写到一些存储系统中,如本地文件系统、HDFS、Hive或Amazon S3。下面学习如何存储这些DataFrame。

写出DataFrame

在PySpark SQL中,pyspark.sql.DataFrameWriter类负责将DataFrame中的数据写入外部存储系统。在DataFrame中有一个变量write,它实际上就是DataFrameWriter类的一个实例。与DataFrameWriter交互的模式与DataFrameReader的交互模式有点类似。

与DataFrameWriter交互的常见模式,代码如下:

movies.write 	\		# DataFrameWriter实例对象
      .format(...) \		# 指定存储格式
      .mode(...) \		# 指定写出模式:append或overwrite
      .option(...) \		# 指定选项
      .partitionBy(...) \	# 指定分区列
      .bucketBy(...) \		# 指定分桶
      .sortBy(...) \		# 排序
      .save(path)		# 保存到指定路径(文件夹)

与DataFrameReader相似,可以使用json、orc和parquet文件存储格式,默认格式是parquet。需要注意的是,save()函数的输入参数是目录名,不是文件名,它将数据直接保存到文件系统,如HDFS、Amazon S3、或者一个本地路径URL。

这些方法大多有相应的快捷方式,如df.write.csv()、df.write.json()、df.write.orc()、df.write.parquet()、df.write.jdbc()等。这些方法相当于先调用format()方法,再调用save()方法。

在下面这个示例中,先读取PySpark自带的数据文件people.json,然后进行简单计算,并把结果DataFrame保存到CSV格式的存储文件中,最后再加载这个结果存储文件到RDD中,代码如下:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# 构建SparkSession实例
spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark sql demo") \
   .getOrCreate()

# 读取json文件源,创建DataFrame
file = "/data/spark/resources/people.json"
df = spark.read.json(file)

df.printSchema
df.show()

# 保存目录位置
output = "/data/spark/people-csv-output"
# 找出age不是null的信息,保存到csv文件中
df.where(col("age").isNotNull()).write.format("csv").save(output)
# df.where(col("age").isNotNull()).write.csv(output)  # 等价上一句

# 将保存的csv数据再次加载到RDD中
textFile = spark.sparkContext.textFile(output)
for row in textFile.collect():
    print(row)

执行以上代码,输出内容如下:

30,Andy
19,Justin

PySpark支持通过JDBC方式连接到其他数据库,并将DataFrame存储到数据库中。下面这个示例中,将分析结果DataFrame写出到MySQL数据库中。

首先,在MySQL中新建一个测试Spark程序的数据库,数据库名称是spark,数据表的名称是student。请登录MySQL数据库,执行以下SQL语句:

mysql> create database spark;
mysql> use spark;
mysql> create table student (id int(4), name char(20), gender char(4), age int(4));
mysql> insert into student values(1,'张三','F',23);
mysql> insert into student values(2,'李四','M',18);
mysql> select * from student;

然后,编写Spark应用程序,构造一个DataFrame,包含两行学生信息,代码如下:

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col

# 构建SparkSession实例
spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark sql demo") \
   .getOrCreate()

# 设置两条数据表示两个学生信息
students = [
    (3, "王老五", "F", 44),
    (4, "赵小虎", "M", 27)
]

# 指定一个Schema(模式)
fields = [
    StructField("id", IntegerType(), True), 
    StructField("name", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("age", IntegerType(), True)
] 
schema = StructType(fields) 

# 将元组转为DataFrame                
stusDF = spark.createDataFrame(students, schema=schema)

stusDF.printSchema()
stusDF.show()

执行以上代码,输出内容如下:

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)

+---+------+------+---+
| id|  name|gender|age|
+---+------+------+---+
|  3| 王老五|    F| 44|
|  4| 赵小虎|    M| 27|
+---+------+------+---+

接下来,编写PySpark代码,连接MySQL数据库并且将上面的studentDF写入MySQL保存。在本例中向spark.student表中插入两条记录,代码如下:

# 下面创建一个prop变量用来保存JDBC连接参数
props = {
    "driver": "org.mariadb.jdbc.Driver",
    "user": "root",
    "password": "admin"
}
 
# 下面就可以连接数据库,采用append模式,表示追加记录到数据库spark的student表中
url = "jdbc:mysql://localhost:3306/spark?useSSL=false"
stuDF.write \
     .mode("append") \
     .jdbc(url=url, table="spark.students", properties=props)

执行上以代码,然后到MySQL中去查询spark.student表,SQL语句如下:

mysql> select * from  spark.student;

可以看到如下的查询结果,证明数据已经正确地写入到了数据库中:

+------+-----------+--------+------+
|  id   |  name    | gender | age  |
+------+-----------+--------+------+
|     1 | 张三     | F      |   23 |
|     2 | 李四     | M      |   18 |
|     3 | 王老五   | 男     |   46 |
|     4 | 赵小花   | 女     |   27 |
+------+-----------+--------+------+

存储模式

DataFrameWriter类中的一个重要选项是save mode,它表示存储模式。在将DataFrame中的数据写出到存储系统上时,默认行为是创建一个新表。如果指定的输出目录或同名表已经存在的话,则抛出错误消息。可以使用PySpark SQL的SaveMode特性来更改此行为。

PySpark所支持的各种存储模式见表

存储模式(mode) 说明
"append" 当保存一个DataFrame到数据源时,如果数据/表已经存在的话,该DataFrame的内容将被追加到已经存储的数据
"overwrite" 当保存一个DataFrame到数据源时,如果数据/表已经存在的话,已经存在的数据将被该DataFrame的内容所覆盖
"error"或"errorIfExists"(默认) 当保存一个DataFrame到数据源时,如果数据已经存在的话,将抛出一个异常
"ignore" 当保存一个DataFrame到数据源时,如果数据/表已经存在的话,save操作不会保存该DataFrame的内容,并且不会改变已经存在的数据。这类似于SQL中的create table if not exists

例如,将数据以CSV格式写出,但使用‘#’作为分隔符,代码如下:

movies.write \
      .format("csv") \
      .option("sep", "#") \
      .save("/tmp/output/csv")

如果使用overwrite模式写出数据,并同时写出标题行,则代码如下:

movies.write \
      .format("csv") \
      .mode("overwrite") \
      .option("sep", "#") \
      .option("header", "true") \
      .save("/tmp/output/csv")

《PySpark原理深入与编程实战》