存储DataFrame
有时,需要将DataFrame中的数据写到外部存储系统中,例如,在一个典型的ETL数据处理作业中,处理结果通常需要被写到一些存储系统中,如本地文件系统、HDFS、Hive或Amazon S3。下面学习如何存储这些DataFrame。
写出DataFrame
在PySpark SQL中,pyspark.sql.DataFrameWriter类负责将DataFrame中的数据写入外部存储系统。在DataFrame中有一个变量write,它实际上就是DataFrameWriter类的一个实例。与DataFrameWriter交互的模式与DataFrameReader的交互模式有点类似。
与DataFrameWriter交互的常见模式,代码如下:
movies.write \ # DataFrameWriter实例对象 .format(...) \ # 指定存储格式 .mode(...) \ # 指定写出模式:append或overwrite .option(...) \ # 指定选项 .partitionBy(...) \ # 指定分区列 .bucketBy(...) \ # 指定分桶 .sortBy(...) \ # 排序 .save(path) # 保存到指定路径(文件夹)
与DataFrameReader相似,可以使用json、orc和parquet文件存储格式,默认格式是parquet。需要注意的是,save()函数的输入参数是目录名,不是文件名,它将数据直接保存到文件系统,如HDFS、Amazon S3、或者一个本地路径URL。
这些方法大多有相应的快捷方式,如df.write.csv()、df.write.json()、df.write.orc()、df.write.parquet()、df.write.jdbc()等。这些方法相当于先调用format()方法,再调用save()方法。
在下面这个示例中,先读取PySpark自带的数据文件people.json,然后进行简单计算,并把结果DataFrame保存到CSV格式的存储文件中,最后再加载这个结果存储文件到RDD中,代码如下:
from pyspark.sql import SparkSession from pyspark.sql.functions import col # 构建SparkSession实例 spark = SparkSession.builder \ .master("spark://localhost:7077") \ .appName("pyspark sql demo") \ .getOrCreate() # 读取json文件源,创建DataFrame file = "/data/spark/resources/people.json" df = spark.read.json(file) df.printSchema df.show() # 保存目录位置 output = "/data/spark/people-csv-output" # 找出age不是null的信息,保存到csv文件中 df.where(col("age").isNotNull()).write.format("csv").save(output) # df.where(col("age").isNotNull()).write.csv(output) # 等价上一句 # 将保存的csv数据再次加载到RDD中 textFile = spark.sparkContext.textFile(output) for row in textFile.collect(): print(row)
执行以上代码,输出内容如下:
30,Andy 19,Justin
PySpark支持通过JDBC方式连接到其他数据库,并将DataFrame存储到数据库中。下面这个示例中,将分析结果DataFrame写出到MySQL数据库中。
首先,在MySQL中新建一个测试Spark程序的数据库,数据库名称是spark,数据表的名称是student。请登录MySQL数据库,执行以下SQL语句:
mysql> create database spark; mysql> use spark; mysql> create table student (id int(4), name char(20), gender char(4), age int(4)); mysql> insert into student values(1,'张三','F',23); mysql> insert into student values(2,'李四','M',18); mysql> select * from student;
然后,编写Spark应用程序,构造一个DataFrame,包含两行学生信息,代码如下:
from pyspark.sql import SparkSession from pyspark.sql.types import * from pyspark.sql.functions import col # 构建SparkSession实例 spark = SparkSession.builder \ .master("spark://localhost:7077") \ .appName("pyspark sql demo") \ .getOrCreate() # 设置两条数据表示两个学生信息 students = [ (3, "王老五", "F", 44), (4, "赵小虎", "M", 27) ] # 指定一个Schema(模式) fields = [ StructField("id", IntegerType(), True), StructField("name", StringType(), True), StructField("gender", StringType(), True), StructField("age", IntegerType(), True) ] schema = StructType(fields) # 将元组转为DataFrame stusDF = spark.createDataFrame(students, schema=schema) stusDF.printSchema() stusDF.show()
执行以上代码,输出内容如下:
root |-- id: integer (nullable = true) |-- name: string (nullable = true) |-- gender: string (nullable = true) |-- age: integer (nullable = true) +---+------+------+---+ | id| name|gender|age| +---+------+------+---+ | 3| 王老五| F| 44| | 4| 赵小虎| M| 27| +---+------+------+---+
接下来,编写PySpark代码,连接MySQL数据库并且将上面的studentDF写入MySQL保存。在本例中向spark.student表中插入两条记录,代码如下:
# 下面创建一个prop变量用来保存JDBC连接参数 props = { "driver": "org.mariadb.jdbc.Driver", "user": "root", "password": "admin" } # 下面就可以连接数据库,采用append模式,表示追加记录到数据库spark的student表中 url = "jdbc:mysql://localhost:3306/spark?useSSL=false" stuDF.write \ .mode("append") \ .jdbc(url=url, table="spark.students", properties=props)
执行上以代码,然后到MySQL中去查询spark.student表,SQL语句如下:
mysql> select * from spark.student;
可以看到如下的查询结果,证明数据已经正确地写入到了数据库中:
+------+-----------+--------+------+ | id | name | gender | age | +------+-----------+--------+------+ | 1 | 张三 | F | 23 | | 2 | 李四 | M | 18 | | 3 | 王老五 | 男 | 46 | | 4 | 赵小花 | 女 | 27 | +------+-----------+--------+------+
存储模式
DataFrameWriter类中的一个重要选项是save mode,它表示存储模式。在将DataFrame中的数据写出到存储系统上时,默认行为是创建一个新表。如果指定的输出目录或同名表已经存在的话,则抛出错误消息。可以使用PySpark SQL的SaveMode特性来更改此行为。
PySpark所支持的各种存储模式见表
存储模式(mode) | 说明 |
---|---|
"append" | 当保存一个DataFrame到数据源时,如果数据/表已经存在的话,该DataFrame的内容将被追加到已经存储的数据 |
"overwrite" | 当保存一个DataFrame到数据源时,如果数据/表已经存在的话,已经存在的数据将被该DataFrame的内容所覆盖 |
"error"或"errorIfExists"(默认) | 当保存一个DataFrame到数据源时,如果数据已经存在的话,将抛出一个异常 |
"ignore" | 当保存一个DataFrame到数据源时,如果数据/表已经存在的话,save操作不会保存该DataFrame的内容,并且不会改变已经存在的数据。这类似于SQL中的create table if not exists |
例如,将数据以CSV格式写出,但使用‘#’作为分隔符,代码如下:
movies.write \ .format("csv") \ .option("sep", "#") \ .save("/tmp/output/csv")
如果使用overwrite模式写出数据,并同时写出标题行,则代码如下:
movies.write \ .format("csv") \ .mode("overwrite") \ .option("sep", "#") \ .option("header", "true") \ .save("/tmp/output/csv")