构造DataFrame-从RDD创建
从RDD创建DataFrame有三种方式:
- 使用包含Row数据(以元组的形式)的RDD
- 使用case类
- 明确指定一个模式(schema)
从RDD创建DataFrame有多种方式,但是这些方法都必须提供一个schema。要么显式地提供,要么隐式地提供。
使用包含Row数据(以元组的形式)的RDD创建DataFrame
下面的示例中,调用RDD的toDF显式函数,将RDD转换到DataFrame,使用指定的列名。列的类型是从RDD中的数据推断出来的。
// 用于隐式转换,如将rdd转换为DataFrame import spark.implicits._ val persons = List(("张三",23),("李四",18),("王老五",35)) val personRDD = spark.sparkContext.parallelize(persons) // RDD[(String, Int)] val personsDF = personRDD.toDF("name", "age") // rdd to DataFrame personsDF.printSchema() personsDF.show()
我们在这里创建了一个RDD,它包含元组元素,然后调用它的toDF方法。请注意,toDF采用的是元组列表,而不是标量元素。每个元组类似于一行。我们可以选择列名,否则,Spark会自行创建一些模糊的名称,比如_1、_2。列的类型推断是隐式的。
执行上面的代码,输出结果如下:
+------+---+ | name|age| +------+---+ | 张三| 23| | 李四| 18| |王老五| 35| +------+---+
使用case class和反射来创建DataFrame
如果已经有了RDD的数据,Spark SQL支持使用反射来推断RDD的模式,该模式包含特定类型(case类)的对象,从而将现有的RDD转换成DataFrame。
使用反射模式,可以实现从RDD到DataFrame间的隐式转换。请看下面的示例:
// 定义一个case class case class Person(name:String,age:Long) import spark.implicits._ val peoples = List( Person("张三",29), Person("李四",30), Person("王老五",19) ) val peopleDF = spark.sparkContext.parallelize(peoples).toDF() peopleDF.printSchema() peopleDF.show()
输出结果如下所示:
root |-- name: string (nullable = true) |-- age: long (nullable = false) +------+---+ | name|age| +------+---+ | 张三| 29| | 李四| 30| |王老五| 19| +------+---+
如果上面示例中的集合元素不是Person对象,而是字符串的话,那么需要我们自己通过转换来构造出Person集合来。如下所示:
// 定义一个case class case class Person(name:String,age:Long) // 创建一个List集合 val peoples = List("张三,29","李四,30","王老五,19") // 构造一个RDD,经过转换为RDD[Person]类型后,再转换为DataFrame val peopleRDD = spark.sparkContext.parallelize(peoples) val peopleDF = peopleRDD .map(_.split(",")) .map(x => Person(x(0),x(1).trim.toLong)) .toDF() peopleDF.printSchema() peopleDF.show()
使用指定的模式创建DataFrame
第三种方法是通过一个编程接口,先构造出一个特定的模式(schema),然后使用该模式(schema)创建一个DataFrame。这需要使用SparkSession的方法createDataFrame来创建。
虽然这种方法比较冗长,但是在事先不知道列类型的情况下,这种方法允许我们自行构造DataFrame。
创建过程如下面的代码所示:
import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import spark.implicits._ // 构造一个RDD val peopleRDD = spark.sparkContext.parallelize( Seq( Row("张三",30), Row("李四",25), Row("王老五",35) ) ) // 指定一个Schema(模式) val fields = Seq( StructField("name", StringType, nullable = true), StructField("age", IntegerType, nullable = true) ) val schema = StructType(fields) // 从给定的RDD应用给定的Schema创建一个DataFrame val peopleDF = spark.createDataFrame(peopleRDD, schema) // 查看DataFrame Schema peopleDF.printSchema // 输出 peopleDF.show
输出结果如下所示:
root |-- name: string (nullable = true) |-- age: integer (nullable = true) +------+---+ | name|age| +------+---+ | 张三| 30| | 李四| 25| |王老五| 35| +------+---+