存储DataFrame
有时,我们需要将DataFrame中的数据写到外部存储系统中,例如,本地文件系统、HDFS或Amazon S3。在一个典型的ETL数据处理作业中,处理结果很可能需要被写到一些存储系统中,例如HDFS或Hive。
存储DataFrame的API
在Spark SQL中,org.apache.spark.sql.DataFrameWriter类负责将DataFrame中的数据写入外部存储系统。在DataFrame中有一个变量write,它实际上就是DataFrameWriter类的一个实例。与DataFrameWriter交互的模式与DataFrameReader的交互模式有点类似。
下面描述了与DataFrameWriter交互的常见模式:
movies.write .format(...) .mode(...) .option(...) .partitionBy(...) .bucketBy(...) .sortBy(...) .save(path)
与DataFrameReader相似,可以使用json、orc和parquet文件存储格式,默认格式是parquet。需要注意的是,save函数的输入参数是目录名,不是文件名,它将数据直接保存到文件系统,如HDFS、Amazon S3、或者一个本地路径URL。
这些方法大多有相应的快捷方式,如df.write.csv()、df.write.json()、df.write.orc()、df.write.parquet()、df.write.jdbc()等。这些方法相当于先调用format()方法,再调用save()方法。
存储模式
DataFrameWriter类中的一个重要选项是save mode,它表示存储模式。在将DataFrame中的数据写出到存储系统上时,默认行为是创建一个新表。如果指定的输出目录或同名表已经存在的话,则抛出错误消息。可以使用Spark SQL的SaveMode特性来更改此行为。
下面列出了各种支持的存储模式(save mode):
- append:将DataFrame数据追加到已经存在于指定目标位置下的文件列表。
- overwrite:使用DataFrame数据完全覆盖已经存在于指定目标位置下的任何数据文件。
- error,errorIfExists:这是默认模式。如果指定的目标位置存在,那么DataFrameWriter将抛出一个错误。
- ignore:如果指定的目标位置存在,则简单地什么都不做。换句话说,不写出DataFrame中的数据。
这些保存模式不使用任何锁定,也不是原子性的。此外,在执行overwrite时,将在写入新数据之前删除数据。
示例1:将DataFrame存储到CSV文件
【示例】先读取json数据文件,然后进行简单计算,把结果DataFrame保存到csv存储文件中,最后再加载这个结果文件到RDD中。
val spark = SparkSession.builder() .master("local[*]") .appName("Spark Write Demo") .getOrCreate() // 读取json文件源,创建DataFrame val file = "src/main/resources/people.json" val df = spark.read.json(file) // df.show() // 找出age不是null的信息,保存到csv文件中 import spark.implicits._ val output = "tmp/people-csv-output" // df.where($"age".isNotNull).write.format("csv").save(output) df.where($"age".isNotNull).write.csv(output) // 等价上一句 // 将保存的csv数据再次加载到RDD中 val textFile = spark.sparkContext.textFile(output) textFile.foreach(println)
注:write.format()支持输出 json、parquet、jdbc、orc、libsvm、csv、text等格式文件,如果要输出text文本文件,可以采用write.format("text")。但是,需要注意,只有select()中只存在一个列时,才允许保存成文本文件,如果存在两个列,比如select("name", "age"),就不能保存成文本文件。
示例2:将DataFrame存储到MySQL表中
Spark支持通过JDBC方式连接到其他数据库,并将DataFrame存储到数据库中。下面这个示例中,将分析结果DataFrame写出到MySQL数据库中。
【示例】将结果DataFrame保存到MySQL的数据表中。
首先,在MySQL中新建一个测试Spark程序的数据库,数据库名称是“spark”,表的名称是“student”。请登录MySQL数据库,执行以下SQL语句。
mysql> create database spark; mysql> use spark; mysql> create table student (id int(4), name char(20), gender char(4), age int(4)); mysql> insert into student values(1,'张三','F',23); mysql> insert into student values(2,'李四','M',18); mysql> select * from student;
然后,编写Spark应用程序连接MySQL数据库并且向MySQL写入数据。在这里,我们向spark.student表中插入两条记录。
// 首先导入依赖的包: import java.util.Properties import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.sql.SaveMode // 下面我们设置两条数据表示两个学生信息: val studentRDD = spark.sparkContext .parallelize(Array("3,王老五,F,44","4,赵小虎,M,27")) .map(_.split(",")) // 下面创建Row 对象,每个Row 对象都是rowRDD 中的一行 // RDD[String] => RDD[Row] val rowRDD = studentRDD.map(stu => Row(stu(0).toInt, stu(1).trim, stu(2).trim, stu(3).toInt)) // 下面要设置模式信息: val schema = StructType( Seq( StructField("id", IntegerType, nullable = true), StructField("name", StringType, nullable = true), StructField("gender", StringType, nullable = true), StructField("age", IntegerType, nullable = true) ) ) // 建立起Row 对象和模式之间的对应关系,也就是把数据和模式对应起来 val studentDF = spark.createDataFrame(rowRDD, schema) // studentDF.printSchema() // studentDF.show() // 下面创建一个prop 变量用来保存JDBC 连接参数 val prop = new Properties() prop.put("user", "root") // 表示用户名是root prop.put("password", "123456") // 表示密码是hadoop prop.put("driver","com.mysql.jdbc.Driver") // 表示驱动程序是 //下面就可以连接数据库,采用append 模式,表示追加记录到数据库spark 的students表中 val DB_URL= "jdbc:mysql://localhost:3306/spark?useSSL=false" // 数据库名为cda studentDF.write .mode(SaveMode.Append) // .mode("append") .jdbc(DB_URL, "student", prop)
注:上面的代码以MySQL 5为例。如果使用的是MySQL 8数据库,需要相应地修改URL连接及驱动程序名称。请参考4.5.8节修改