构造DataFrame-从RDD创建
从RDD创建DataFrame有三种方式:
- 使用包含Row数据(以元组的形式)的RDD
- 使用case类
- 明确指定一个模式(schema)
从RDD创建DataFrame有多种方式,但是这些方法都必须提供一个schema。要么显式地提供,要么隐式地提供。
使用包含Row数据(以元组的形式)的RDD创建DataFrame
下面的示例中,调用RDD的toDF显式函数,将RDD转换到DataFrame,使用指定的列名。列的类型是从RDD中的数据推断出来的。
// 用于隐式转换,如将rdd转换为DataFrame
import spark.implicits._
val persons = List(("张三",23),("李四",18),("王老五",35))
val personRDD = spark.sparkContext.parallelize(persons) // RDD[(String, Int)]
val personsDF = personRDD.toDF("name", "age") // rdd to DataFrame
personsDF.printSchema()
personsDF.show()
我们在这里创建了一个RDD,它包含元组元素,然后调用它的toDF方法。请注意,toDF采用的是元组列表,而不是标量元素。每个元组类似于一行。我们可以选择列名,否则,Spark会自行创建一些模糊的名称,比如_1、_2。列的类型推断是隐式的。
执行上面的代码,输出结果如下:
+------+---+ | name|age| +------+---+ | 张三| 23| | 李四| 18| |王老五| 35| +------+---+
使用case class和反射来创建DataFrame
如果已经有了RDD的数据,Spark SQL支持使用反射来推断RDD的模式,该模式包含特定类型(case类)的对象,从而将现有的RDD转换成DataFrame。
使用反射模式,可以实现从RDD到DataFrame间的隐式转换。请看下面的示例:
// 定义一个case class
case class Person(name:String,age:Long)
import spark.implicits._
val peoples = List(
Person("张三",29),
Person("李四",30),
Person("王老五",19)
)
val peopleDF = spark.sparkContext.parallelize(peoples).toDF()
peopleDF.printSchema()
peopleDF.show()
输出结果如下所示:
root |-- name: string (nullable = true) |-- age: long (nullable = false) +------+---+ | name|age| +------+---+ | 张三| 29| | 李四| 30| |王老五| 19| +------+---+
如果上面示例中的集合元素不是Person对象,而是字符串的话,那么需要我们自己通过转换来构造出Person集合来。如下所示:
// 定义一个case class
case class Person(name:String,age:Long)
// 创建一个List集合
val peoples = List("张三,29","李四,30","王老五,19")
// 构造一个RDD,经过转换为RDD[Person]类型后,再转换为DataFrame
val peopleRDD = spark.sparkContext.parallelize(peoples)
val peopleDF = peopleRDD
.map(_.split(","))
.map(x => Person(x(0),x(1).trim.toLong))
.toDF()
peopleDF.printSchema()
peopleDF.show()
使用指定的模式创建DataFrame
第三种方法是通过一个编程接口,先构造出一个特定的模式(schema),然后使用该模式(schema)创建一个DataFrame。这需要使用SparkSession的方法createDataFrame来创建。
虽然这种方法比较冗长,但是在事先不知道列类型的情况下,这种方法允许我们自行构造DataFrame。
创建过程如下面的代码所示:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import spark.implicits._
// 构造一个RDD
val peopleRDD = spark.sparkContext.parallelize(
Seq(
Row("张三",30),
Row("李四",25),
Row("王老五",35)
)
)
// 指定一个Schema(模式)
val fields = Seq(
StructField("name", StringType, nullable = true),
StructField("age", IntegerType, nullable = true)
)
val schema = StructType(fields)
// 从给定的RDD应用给定的Schema创建一个DataFrame
val peopleDF = spark.createDataFrame(peopleRDD, schema)
// 查看DataFrame Schema
peopleDF.printSchema
// 输出
peopleDF.show
输出结果如下所示:
root |-- name: string (nullable = true) |-- age: integer (nullable = true) +------+---+ | name|age| +------+---+ | 张三| 30| | 李四| 25| |王老五| 35| +------+---+