构造DataFrame-加载外部数据源

Spark提供了一个接口,DataFrameReader,用来从众多的数据源读取数据到DataFrame,以各种格式,如JSON、CSV、Parquet、Text、Avro、ORC等。

同样,要将DataFrame以特定格式写回数据源,Spark使用DataFrameWriter。

加载存储系统中的文件到DataFrame,可通过SparkSession的read字段,它是DataFrameReader的一个实例。它有个load()方法,可直接从配置的数据源加载数据。另外它还有五个快捷方法:text、csv、json、orc和parquet,相当于先调用format()方法再调用load()方法。

读取文本文件创建DataFrame

文本文件是最常见的数据存储文件。Spark DataFrame API允许开发者将文本文件的内容转换成DataFrame。

让我们仔细看看下面的例子,以便更好地理解(这里我们使用Spark自带的数据文件):

val file = "/data/spark_demo/resources/people.txt"
val txtDF = spark.read.format("text").load(file)		// 加载文本文件
// val txtDF = spark.read.text(file)	// 等价上一句,快捷方法

txtDF.printSchema				// 打印schema
txtDF.show					// 输出

输出内容如下所示:

root
 |-- value: string (nullable = true)

+-----------+
|      value|
+-----------+
|Michael, 29|
|   Andy, 30|
| Justin, 19|
+-----------+

Spark会自动推断出模式,并相应地创建一个单列的DataFrame。因此,没有必要为文本数据定义模式。不过,当加载大数据文件时,定义一个schema要比让Spark来进行推断效率更高。

读取CSV文件创建DataFrame

在Spark 2.x中,加载CSV文件是非常简单的。请看下面的示例。

// 数据源文件
val file = "./src/main/resources/people.csv"

val peopleDF = spark.read.format("csv")
      .option("sep", ";")             		// 字段使用;分隔符
      .option("inferSchema", "true")  		// 指定自动推断模式
      .option("samplingRatio", 0.001)  		// 根据抽样进行模式推断
      .option("header", "true")       		// 说明有标题行
      .load(file)

 // 也可使用快捷方法
/*
val peopleDF = spark.read
      .option("sep", ";")             		// 字段使用;分隔符
      .option("inferSchema", "true")  		// 指定模式自动推断
      .option("samplingRatio", 0.001)  		// 根据抽样进行模式推断
      .option("header", "true")       		// 说明有标题行
      .csv(file)
*/

peopleDF.printSchema				// 打印schema
peopleDF.show()					// 显示

输出结果如下所示:

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)

+-----+---+---------+
| name|age|  job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+

在上面的示例中,使用了模式推断。对于大型的数据源,指定一个schema要比让Spark来进行推断效率更高。

在下面的代码中,我们提供一个schema:

import org.apache.spark.sql.types._

// 数据源文件
val file = "./src/main/resources/people.csv"

// 构造schema
val fields = Seq(
      StructField("p_name", StringType, nullable = true),
      StructField("p_age", LongType, nullable = true),
      StructField("p_job", StringType, nullable = true)
    )
val schema = StructType(fields)

// 读取数据源,创建DataFrame
val peopleDF = spark.read
      .option("sep", ";")             			// 字段使用;分隔符
      .option("header", "true")       			// 说明有标题行
      .schema(schema)                 		// 指定使用的schema
      .csv(file)

peopleDF.printSchema()					// 打印schema
peopleDF.show()						// 显示

输出结果如下所示:

root
 |-- p_name: string (nullable = true)
 |-- p_age: long (nullable = true)
 |-- p_job: string (nullable = true)

+------+-----+---------+
|p_name|p_age|    p_job|
+------+-----+---------+
| Jorge|   30|Developer|
|   Bob|   32|Developer|
+------+-----+---------+

可以看出,它返回了由行和命名列组成的DataFrame,该DataFrame具有模式中指定的类型。

读取JSON文件创建DataFrame

Spark SQL可以自动推断JSON Dataset的模式,并将其加载为Dataset[Row]。这种转换可以在Dataset[String]或JSON文件上使用SparkSession.read.json()完成。

注意,作为json文件提供的文件实际上并不是典型的JSON文件。每一行必须包含一个单独的、自包含的有效JSON对象。对于常规的多行JSON文件,将multiLine选项设置为true。

读取json数据源文件时,Spark会自动从key中自动推断模式,并相应地创建一个DataFrame。因此,没有必要为JSON数据定义模式。此外,Spark极大地简化了访问复杂JSON数据结构中的字段所需的查询语法。

请看下面的示例。

// 数据源文件
// JSON数据集路径可以是单个文件,也可以是存储文件的目录
val file = "./src/main/resources/people.json"

// 读取数据源,创建DataFrame
val df = spark.read.json(file)		// json解析;列名和数据类型隐式地推断

// schema
df.printSchema()

// 显示
df.show()

执行以上代码,输出结果如下:

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

当然,也可以明确指定一个schema,覆盖Spark的推断schema。如下面的代码所示:

import org.apache.spark.sql.types._

// 数据源文件
// JSON数据集路径可以是单个文件,也可以是存储文件的目录
val file = "./src/main/resources/people.json"

// 创建schema。字段名称要与json对象的key名称保持一致
val fields = Seq(
      StructField("name",StringType,nullable = true),
      StructField("age",IntegerType,nullable = true)
    )

// 读取数据源,创建DataFrame,使用自定义的schema
val df = spark.read.schema(StructType(fields)).json(file)

// schema
df.printSchema()

// 显示
df.show()

执行上面的代码,输出结果如下所示:

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+

读取Parquet文件创建DataFrame

Apache Parquet文件是Spark SQL中直接支持的一种常见格式,它们非常节省空间,非常流行。Apache Parquet是一种高效的、压缩的、面向列的开源数据存储格式。它提供了多种存储优化,允许读取单独的列而非整个文件,这不仅节省了存储空间而且提升了读取效率。它是 Spark 默认的文件格式,支持非常有效的压缩和编码方案,也可用于Hadoop生态系统中的任何项目,可以大大提高这类应用程序的性能。

在下面的示例中,先读取Parquet文件内容到DataFrame中,然后打印其schema并输出数据:

// 读取Parquet文件
val parquetFile = "./src/main/resources/users.parquet"

// Parquet是默认的格式,因此当读取时我们不需要指定格式
val usersDF = spark.read.load(parquetFile)
// 如果我们想要更加明确,我们可以指定parquet函数
// val usersDF = spark.read.parquet(parquetFile)

usersDF.printSchema()
usersDF.show()

执行以上代码,输出结果如下所示:

root
 |-- name: string (nullable = true)
 |-- favorite_color: string (nullable = true)
 |-- favorite_numbers: array (nullable = true)
 |    |-- element: integer (containsNull = true)

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|         red|            []|
+------+--------------+----------------+

使用JDBC从数据库创建DataFrame

Spark SQL还包括一个可以使用JDBC从其他关系型数据库读取数据的数据源。开发人员可以使用JDBC创建来自其他数据库的DataFrame,只要确保预定数据库的JDBC驱动程序是可访问的(需要在spark类路径中包含特定数据库的JDBC驱动程序)。

在下面的示例中,我们通过JDBC读取MySQL数据库中的一个peoples数据表,并创建DataFrame。首先,在MySQL中执行如下脚本,创建一外名为xueai8的数据和一个名为peoples的数据表,并向表中插入一些样本记录。

mysql> create database xueai8;
mysql> use xueai8;
mysql> create table peoples(id int not null primary key, name varchar(20), age int);
mysql> insert into peoples values(1,"张三",23),(2,"李四",18),(3,"王老五",35);
mysql> select * from peoples;

然后编写如下的代码,来读取peoples表中的数据到DataFrame中。

val DB_URL= "jdbc:mysql://localhost:3306/xueai8" +			// 数据库名为xueai8
      "?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true"       

val peoplesDF = spark.read.format("jdbc")
      .option("driver", "com.mysql.cj.jdbc.Driver") 	    			// 数据库驱动程序类名
      .option("url", DB_URL)					              // 连接url
      .option("dbtable", "peoples")				         		// 要读取的表
      .option("user", "root")					             	// 连接账户
      .option("password","123456")				       		// 连接密码
      .load()

peoplesDF.printSchema()
peoplesDF.show()

执行以上代码,输出结果如下所示:

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+---+------+---+
| id|  name|age|
+---+------+---+
|  1|  张三| 23|
|  2|  李四| 18|
|  3|王老五| 35|
+---+------+---+

也可以将JDBC参数放到一个Map集合中,做为options的参数传入。代码如下:

val DB_URL= "jdbc:mysql://localhost:3306/cdadb" +       // 数据库名为cdadb
      "?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true"

val jdbcMap = Map(
      "url" -> DB_URL,                	// jdbc url
      "dbtable" -> "peoples",         	// 要读取的数据表
      "user" -> "root",               	// mysql账号
      "password" -> "123456"           	// mysql密码
    )

// 读取JDBC数据源,创建DataFrame
val peoplesDF = spark.read.format("jdbc").options(jdbcMap).load()

peoplesDF.printSchema()
peoplesDF.show()

也可使用jdbc()快捷方法从关系型数据库中加载数据。例如,上面的示例可以改写为如下的代码:

val DB_URL= "jdbc:mysql://localhost:3306/cdadb" +       // 数据库名为cdadb
      "?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true"

// 创建一个Properties()对象来保存参数
import java.util.Properties

val connectionProperties = new Properties()

connectionProperties.put("user", "root")        		// 账号
connectionProperties.put("password", "123456")   		// 密码

val driverClass = "com.mysql.cj.jdbc.Driver"    			// mysql 8 驱动
// val driverClass = "com.mysql.jdbc.Driver"     			// mysql 5 驱动
connectionProperties.put("Driver", driverClass)
// connectionProperties.setProperty("Driver", driverClass)	// 等价上一句

// 使用快捷方式
val peoplesDF = spark.read.jdbc(DB_URL, "peoples", connectionProperties)

peoplesDF.printSchema()
peoplesDF.show()

《Flink原理深入与编程实战》