构造DataFrame-加载外部数据源
Spark提供了一个接口,DataFrameReader,用来从众多的数据源读取数据到DataFrame,以各种格式,如JSON、CSV、Parquet、Text、Avro、ORC等。
同样,要将DataFrame以特定格式写回数据源,Spark使用DataFrameWriter。
加载存储系统中的文件到DataFrame,可通过SparkSession的read字段,它是DataFrameReader的一个实例。它有个load()方法,可直接从配置的数据源加载数据。另外它还有五个快捷方法:text、csv、json、orc和parquet,相当于先调用format()方法再调用load()方法。
读取文本文件创建DataFrame
文本文件是最常见的数据存储文件。Spark DataFrame API允许开发者将文本文件的内容转换成DataFrame。
让我们仔细看看下面的例子,以便更好地理解(这里我们使用Spark自带的数据文件):
val file = "/data/spark_demo/resources/people.txt" val txtDF = spark.read.format("text").load(file) // 加载文本文件 // val txtDF = spark.read.text(file) // 等价上一句,快捷方法 txtDF.printSchema // 打印schema txtDF.show // 输出
输出内容如下所示:
root |-- value: string (nullable = true) +-----------+ | value| +-----------+ |Michael, 29| | Andy, 30| | Justin, 19| +-----------+
Spark会自动推断出模式,并相应地创建一个单列的DataFrame。因此,没有必要为文本数据定义模式。不过,当加载大数据文件时,定义一个schema要比让Spark来进行推断效率更高。
读取CSV文件创建DataFrame
在Spark 2.x中,加载CSV文件是非常简单的。请看下面的示例。
// 数据源文件 val file = "./src/main/resources/people.csv" val peopleDF = spark.read.format("csv") .option("sep", ";") // 字段使用;分隔符 .option("inferSchema", "true") // 指定自动推断模式 .option("samplingRatio", 0.001) // 根据抽样进行模式推断 .option("header", "true") // 说明有标题行 .load(file) // 也可使用快捷方法 /* val peopleDF = spark.read .option("sep", ";") // 字段使用;分隔符 .option("inferSchema", "true") // 指定模式自动推断 .option("samplingRatio", 0.001) // 根据抽样进行模式推断 .option("header", "true") // 说明有标题行 .csv(file) */ peopleDF.printSchema // 打印schema peopleDF.show() // 显示
输出结果如下所示:
root |-- name: string (nullable = true) |-- age: integer (nullable = true) |-- job: string (nullable = true) +-----+---+---------+ | name|age| job| +-----+---+---------+ |Jorge| 30|Developer| | Bob| 32|Developer| +-----+---+---------+
在上面的示例中,使用了模式推断。对于大型的数据源,指定一个schema要比让Spark来进行推断效率更高。
在下面的代码中,我们提供一个schema:
import org.apache.spark.sql.types._ // 数据源文件 val file = "./src/main/resources/people.csv" // 构造schema val fields = Seq( StructField("p_name", StringType, nullable = true), StructField("p_age", LongType, nullable = true), StructField("p_job", StringType, nullable = true) ) val schema = StructType(fields) // 读取数据源,创建DataFrame val peopleDF = spark.read .option("sep", ";") // 字段使用;分隔符 .option("header", "true") // 说明有标题行 .schema(schema) // 指定使用的schema .csv(file) peopleDF.printSchema() // 打印schema peopleDF.show() // 显示
输出结果如下所示:
root |-- p_name: string (nullable = true) |-- p_age: long (nullable = true) |-- p_job: string (nullable = true) +------+-----+---------+ |p_name|p_age| p_job| +------+-----+---------+ | Jorge| 30|Developer| | Bob| 32|Developer| +------+-----+---------+
可以看出,它返回了由行和命名列组成的DataFrame,该DataFrame具有模式中指定的类型。
读取JSON文件创建DataFrame
Spark SQL可以自动推断JSON Dataset的模式,并将其加载为Dataset[Row]。这种转换可以在Dataset[String]或JSON文件上使用SparkSession.read.json()完成。
注意,作为json文件提供的文件实际上并不是典型的JSON文件。每一行必须包含一个单独的、自包含的有效JSON对象。对于常规的多行JSON文件,将multiLine选项设置为true。
读取json数据源文件时,Spark会自动从key中自动推断模式,并相应地创建一个DataFrame。因此,没有必要为JSON数据定义模式。此外,Spark极大地简化了访问复杂JSON数据结构中的字段所需的查询语法。
请看下面的示例。
// 数据源文件 // JSON数据集路径可以是单个文件,也可以是存储文件的目录 val file = "./src/main/resources/people.json" // 读取数据源,创建DataFrame val df = spark.read.json(file) // json解析;列名和数据类型隐式地推断 // schema df.printSchema() // 显示 df.show()
执行以上代码,输出结果如下:
root |-- age: long (nullable = true) |-- name: string (nullable = true) +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+
当然,也可以明确指定一个schema,覆盖Spark的推断schema。如下面的代码所示:
import org.apache.spark.sql.types._ // 数据源文件 // JSON数据集路径可以是单个文件,也可以是存储文件的目录 val file = "./src/main/resources/people.json" // 创建schema。字段名称要与json对象的key名称保持一致 val fields = Seq( StructField("name",StringType,nullable = true), StructField("age",IntegerType,nullable = true) ) // 读取数据源,创建DataFrame,使用自定义的schema val df = spark.read.schema(StructType(fields)).json(file) // schema df.printSchema() // 显示 df.show()
执行上面的代码,输出结果如下所示:
root |-- name: string (nullable = true) |-- age: integer (nullable = true) +-------+----+ | name| age| +-------+----+ |Michael|null| | Andy| 30| | Justin| 19| +-------+----+
读取Parquet文件创建DataFrame
Apache Parquet文件是Spark SQL中直接支持的一种常见格式,它们非常节省空间,非常流行。Apache Parquet是一种高效的、压缩的、面向列的开源数据存储格式。它提供了多种存储优化,允许读取单独的列而非整个文件,这不仅节省了存储空间而且提升了读取效率。它是 Spark 默认的文件格式,支持非常有效的压缩和编码方案,也可用于Hadoop生态系统中的任何项目,可以大大提高这类应用程序的性能。
在下面的示例中,先读取Parquet文件内容到DataFrame中,然后打印其schema并输出数据:
// 读取Parquet文件 val parquetFile = "./src/main/resources/users.parquet" // Parquet是默认的格式,因此当读取时我们不需要指定格式 val usersDF = spark.read.load(parquetFile) // 如果我们想要更加明确,我们可以指定parquet函数 // val usersDF = spark.read.parquet(parquetFile) usersDF.printSchema() usersDF.show()
执行以上代码,输出结果如下所示:
root |-- name: string (nullable = true) |-- favorite_color: string (nullable = true) |-- favorite_numbers: array (nullable = true) | |-- element: integer (containsNull = true) +------+--------------+----------------+ | name|favorite_color|favorite_numbers| +------+--------------+----------------+ |Alyssa| null| [3, 9, 15, 20]| | Ben| red| []| +------+--------------+----------------+
使用JDBC从数据库创建DataFrame
Spark SQL还包括一个可以使用JDBC从其他关系型数据库读取数据的数据源。开发人员可以使用JDBC创建来自其他数据库的DataFrame,只要确保预定数据库的JDBC驱动程序是可访问的(需要在spark类路径中包含特定数据库的JDBC驱动程序)。
在下面的示例中,我们通过JDBC读取MySQL数据库中的一个peoples数据表,并创建DataFrame。首先,在MySQL中执行如下脚本,创建一外名为xueai8的数据和一个名为peoples的数据表,并向表中插入一些样本记录。
mysql> create database xueai8; mysql> use xueai8; mysql> create table peoples(id int not null primary key, name varchar(20), age int); mysql> insert into peoples values(1,"张三",23),(2,"李四",18),(3,"王老五",35); mysql> select * from peoples;
然后编写如下的代码,来读取peoples表中的数据到DataFrame中。
val DB_URL= "jdbc:mysql://localhost:3306/xueai8" + // 数据库名为xueai8 "?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true" val peoplesDF = spark.read.format("jdbc") .option("driver", "com.mysql.cj.jdbc.Driver") // 数据库驱动程序类名 .option("url", DB_URL) // 连接url .option("dbtable", "peoples") // 要读取的表 .option("user", "root") // 连接账户 .option("password","123456") // 连接密码 .load() peoplesDF.printSchema() peoplesDF.show()
执行以上代码,输出结果如下所示:
root |-- id: integer (nullable = true) |-- name: string (nullable = true) |-- age: integer (nullable = true) +---+------+---+ | id| name|age| +---+------+---+ | 1| 张三| 23| | 2| 李四| 18| | 3|王老五| 35| +---+------+---+
也可以将JDBC参数放到一个Map集合中,做为options的参数传入。代码如下:
val DB_URL= "jdbc:mysql://localhost:3306/cdadb" + // 数据库名为cdadb "?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true" val jdbcMap = Map( "url" -> DB_URL, // jdbc url "dbtable" -> "peoples", // 要读取的数据表 "user" -> "root", // mysql账号 "password" -> "123456" // mysql密码 ) // 读取JDBC数据源,创建DataFrame val peoplesDF = spark.read.format("jdbc").options(jdbcMap).load() peoplesDF.printSchema() peoplesDF.show()
也可使用jdbc()快捷方法从关系型数据库中加载数据。例如,上面的示例可以改写为如下的代码:
val DB_URL= "jdbc:mysql://localhost:3306/cdadb" + // 数据库名为cdadb "?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true" // 创建一个Properties()对象来保存参数 import java.util.Properties val connectionProperties = new Properties() connectionProperties.put("user", "root") // 账号 connectionProperties.put("password", "123456") // 密码 val driverClass = "com.mysql.cj.jdbc.Driver" // mysql 8 驱动 // val driverClass = "com.mysql.jdbc.Driver" // mysql 5 驱动 connectionProperties.put("Driver", driverClass) // connectionProperties.setProperty("Driver", driverClass) // 等价上一句 // 使用快捷方式 val peoplesDF = spark.read.jdbc(DB_URL, "peoples", connectionProperties) peoplesDF.printSchema() peoplesDF.show()