构造DataFrame-从RDD创建

从RDD创建DataFrame有三种方式:

  • 使用包含Row数据(以元组的形式)的RDD
  • 使用case类
  • 明确指定一个模式(schema)

从RDD创建DataFrame有多种方式,但是这些方法都必须提供一个schema。要么显式地提供,要么隐式地提供。

使用包含Row数据(以元组的形式)的RDD创建DataFrame

下面的示例中,调用RDD的toDF显式函数,将RDD转换到DataFrame,使用指定的列名。列的类型是从RDD中的数据推断出来的。

// 用于隐式转换,如将rdd转换为DataFrame
import spark.implicits._

val persons = List(("张三",23),("李四",18),("王老五",35))
val personRDD = spark.sparkContext.parallelize(persons)    	// RDD[(String, Int)]
val personsDF = personRDD.toDF("name", "age")			// rdd to DataFrame
personsDF.printSchema()
personsDF.show()

我们在这里创建了一个RDD,它包含元组元素,然后调用它的toDF方法。请注意,toDF采用的是元组列表,而不是标量元素。每个元组类似于一行。我们可以选择列名,否则,Spark会自行创建一些模糊的名称,比如_1、_2。列的类型推断是隐式的。

执行上面的代码,输出结果如下:

+------+---+
|  name|age|
+------+---+
|  张三| 23|
|  李四| 18|
|王老五| 35|
+------+---+

使用case class和反射来创建DataFrame

如果已经有了RDD的数据,Spark SQL支持使用反射来推断RDD的模式,该模式包含特定类型(case类)的对象,从而将现有的RDD转换成DataFrame。

使用反射模式,可以实现从RDD到DataFrame间的隐式转换。请看下面的示例:

// 定义一个case class
case class Person(name:String,age:Long)

import spark.implicits._

val peoples = List(
      Person("张三",29),
      Person("李四",30),
      Person("王老五",19)
    )

val peopleDF = spark.sparkContext.parallelize(peoples).toDF()
peopleDF.printSchema()
peopleDF.show()

输出结果如下所示:

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = false)

+------+---+
|  name|age|
+------+---+
|  张三| 29|
|  李四| 30|
|王老五| 19|
+------+---+

如果上面示例中的集合元素不是Person对象,而是字符串的话,那么需要我们自己通过转换来构造出Person集合来。如下所示:

// 定义一个case class
case class Person(name:String,age:Long)

// 创建一个List集合
val peoples = List("张三,29","李四,30","王老五,19")

// 构造一个RDD,经过转换为RDD[Person]类型后,再转换为DataFrame
val peopleRDD = spark.sparkContext.parallelize(peoples)
val peopleDF = peopleRDD
      .map(_.split(","))
      .map(x => Person(x(0),x(1).trim.toLong))
      .toDF()
peopleDF.printSchema()
peopleDF.show()

使用指定的模式创建DataFrame

第三种方法是通过一个编程接口,先构造出一个特定的模式(schema),然后使用该模式(schema)创建一个DataFrame。这需要使用SparkSession的方法createDataFrame来创建。

虽然这种方法比较冗长,但是在事先不知道列类型的情况下,这种方法允许我们自行构造DataFrame。

创建过程如下面的代码所示:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import spark.implicits._

// 构造一个RDD
val peopleRDD = spark.sparkContext.parallelize(
      Seq(
        Row("张三",30),
        Row("李四",25),
        Row("王老五",35)
      )
    )

// 指定一个Schema(模式)
val fields = Seq(      
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true)
    )
val schema = StructType(fields)

// 从给定的RDD应用给定的Schema创建一个DataFrame
val peopleDF = spark.createDataFrame(peopleRDD, schema)

// 查看DataFrame Schema
peopleDF.printSchema

// 输出
peopleDF.show

输出结果如下所示:

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+------+---+
|  name|age|
+------+---+
|  张三| 30|
|  李四| 25|
|王老五| 35|
+------+---+

《Flink原理深入与编程实战》