操作DataFrame

从Spark 2.0开始,Spark将DataFrame看作是Dataset的一个特例,即Dataset[Row]。Dataset/DataFrame为结构化数据操作提供了一种特定于领域的语言(DSL)。其中DataFrame所支持的这些操作也被称为“无类型转换”,同时提供Scala、Java、Python和R语言的支持。而Dataset所支持的这些操作也被称为“类型化转换”,只提供强类型的Scala和Java语言支持的API。

在本节中,我们将重点讨论可以在DataFrame上执行的各种操作。这些操作被分为两类,transformation和action。开发人员链接多个操作来选择、过滤、转换、聚合和排序在DataFrame中的数据。

多种方式引用列

在学习操作DataFrame之前,首先需要掌握Spark所提供的引用DataFrame中的列的多种方式。在下面的代码中,演示了这些方式:

import org.apache.spark.sql.functions._
import spark.implicits._

// 使用元组序列创建一个DataFrame
val kvDF = Seq((1,2),(3,4)).toDF("key","value")

// 要在一个DataFrame中显示列名,可以调用columns函数
kvDF.columns

// 以不同的方式选择特定的列
kvDF.select("key").show             	// 列为字符串类型
kvDF.select(col("key")).show        	// col是内置函数,它返回Column类型
kvDF.select(column("key")).show     	// column是内置函数,它返回Column类型
kvDF.select(expr("key")).show     	// expr与col方法调用相同
kvDF.select($"key").show            	// Scala中构造Column类型的语法糖
kvDF.select('key).show              	// 同上

// 也可以使用DataFrame的col函数
kvDF.select(kvDF.col("key")).show

执行上面的代码,都输出相同的内容:

+---+
|key|
+---+
|  1|
|  3|
+---+

准备数据

在演示DataFrame的各种操作方法之前,我们先准备好数据。首先加载数据集到DataFrame中,代码如下所示:

// 加载电影数据集文件到DataFrame中
val file = "./src/main/resources/movies.csv"
val movies = spark.read
      .option("header","true")
      .option("inferSchema","true")
      .csv(file)

movies.printSchema()
movies.show(5)

执行以上代码,输出结果如下所示:

root
 |-- actor: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)

+-----------------------+----------------+-------+
|            actor|        title|year|
+-----------------------+----------------+------+
|McClure, Marc (I)|Freaky Friday|2003|
|McClure, Marc (I)| Coach Carter|2005|
|McClure, Marc (I)|  Superman II|1980|
|McClure, Marc (I)|    Apollo 13|1995|
|McClure, Marc (I)|   Superman|1978|
+-----------------------+----------------+------+

对DataFrame进行转换操作

DataFrame API提供有许多函数用来执行关系运算,这些函数模拟了SQL关系操作:

  • 选择数据:select
  • 删除某列:drop
  • 过滤数据:where和filter(同义的)
  • 限制返回的数量:limit
  • 重命名列:withColumnRenamed
  • 增加一个新的列:withColumn
  • 数据分组:groupBy
  • 数据排序:orderBy和sort(等价的)

接下来,学习DataFrame的各种转换操作函数。

1)select函数:选择指定的列。

下面的代码选取原DF中的两列,返回一个新的DataFrame:

    movies.select("title","year").show(5)

输出结果如下:

+-------------+----+
|        title|year|
+-------------+----+
|Freaky Friday|2003|
| Coach Carter|2005|
|  Superman II|1980|
|    Apollo 13|1995|
|     Superman|1978|
+-------------+----+

下面的代码使用column表达式将电影上演的年份转换到年代表示,给赋予一个别名:

import spark.implicits._
movies.select($"title",($"year"-$"year" % 10).as("decade")).show(5)

输出结果如下所示:

+-------------+------+
|        title|decade|
+-------------+------+
|Freaky Friday|  2000|
| Coach Carter|  2000|
|  Superman II|  1980|
|    Apollo 13|  1990|
|     Superman|  1970|
+-------------+------+

2)selectExpr:使用SQL表达式选择列。

下面的代码中,用通配符星号(*)来表示选择所有的列,并增加一个新的列“decade”,新列的值是通过对year列值计算得到的电影上映的年代:

movies.selectExpr("*","(year - year % 10) as decade").show(5)

输出内容如下所示:

+-----------------+-------------+----+------+
|            actor|        title|year|decade|
+-----------------+-------------+----+------+
|McClure, Marc (I)|Freaky Friday|2003|  2000|
|McClure, Marc (I)| Coach Carter|2005|  2000|
|McClure, Marc (I)|  Superman II|1980|  1980|
|McClure, Marc (I)|    Apollo 13|1995|  1990|
|McClure, Marc (I)|     Superman|1978|  1970|
+-----------------+-------------+----+------+

下面的代码中使用了SQL表达式和内置函数,用来查询电影数量和演员数量这两个值:

movies.selectExpr("count(distinct(title)) as movies","count(distinct(actor)) as actors").show()

输出结果如下所示:

+------+------+
|movies|actors|
+------+------+
|  1409|  6527|
+------+------+

可以看出,数据集中的电影共有1409部,演员共有6527名。

3)filter, where:实现过滤。这两个函数等价。

首先找出2000年以前上映的电影,在filter函数或where函数中指定过滤条件:

import spark.implicits._
movies.filter($"year" < 2000).show(5)
// movies.where($"year" < 2000).show(5)		// 等价

输出结果如下所示:

+-----------------+--------------------+----+
|            actor|               title|year|
+-----------------+--------------------+----+
|McClure, Marc (I)|         Superman II|1980|
|McClure, Marc (I)|           Apollo 13|1995|
|McClure, Marc (I)|            Superman|1978|
|McClure, Marc (I)|  Back to the Future|1985|
|McClure, Marc (I)|Back to the Futur...|1990|
+-----------------+--------------------+----+

找出2000年及以后上映的电影:

import spark.implicits._

movies.filter($"year" >= 2000).show(5)
movies.where($"year" >= 2000).show(5)

输出结果如下所示:

+-----------------+--------------------+----+
|            actor|               title|year|
+-----------------+--------------------+----+
|McClure, Marc (I)|       Freaky Friday|2003|
|McClure, Marc (I)|        Coach Carter|2005|
|Cooper, Chris (I)|  Me, Myself & Irene|2000|
|Cooper, Chris (I)|              Capote|2005|
|Cooper, Chris (I)|The Bourne Supremacy|2004|
+-----------------+--------------------+----+

找出2000年上映的电影。注意,在Scala中,相等比较要求3个等号。

import spark.implicits._

movies.filter($"year" === 2000).show(5)
movies.where($"year" === 2000).show(5)

输出结果如下所示:

+-----------------+--------------------+----+
|            actor|               title|year|
+-----------------+--------------------+----+
|Cooper, Chris (I)|  Me, Myself & Irene|2000|
|Cooper, Chris (I)|         The Patriot|2000|
|  Jolie, Angelina|Gone in Sixty Sec...|2000|
|   Yip, Françoise|      Romeo Must Die|2000|
|   Danner, Blythe|    Meet the Parents|2000|
+-----------------+--------------------+----+

找出非2000年上映的电影。注意,在Scala中,不相等比较使用的操作符是 =!=。

import spark.implicits._

movies.select("title","year").filter('year =!= 2000).show(5)
movies.select("title","year").where('year =!= 2000).show(5)

输出结果如下所示:

+-------------+----+
|        title|year|
+-------------+----+
|Freaky Friday|2003|
| Coach Carter|2005|
|  Superman II|1980|
|    Apollo 13|1995|
|     Superman|1978|
+-------------+----+

4)distinct(),dropDuplicates()

返回一个新数据集,其中仅包含此数据集中的唯一行。实际上,distinct()是dropDuplicates()的别名。例如,想知道数据集中共有多少条唯一行(去重),代码如下:

movies.distinct().count()
movies.dropDuplicates().count()    // 与上句等价

执行以上代码,输出内容如下:

31394

但是,如果想要知道数据集中共包含有多少部电影,则需要基于title字段进行唯一值统计,代码如下:

movies.select("title").distinct().count()

// 其实也可以使用SQL的distinct函数
// movies.selectExpr("count(distinct(title)) as movies").show()

执行以上代码,输出内容如下:

1409

5)dropDuplicates(colNames: Array[String])

仅考虑列的子集,返回删除(按列的子集)重复行的新数据集。例如,同样需要统计数据集中共包含多少部电影,代码如下:

movies.dropDuplicates(Array("title")).count()

执行以上代码,输出内容如下:

1409

6)sort(sortExprs: Column*),orderBy(sortExprs: Column*)

相当于SQL中的order by子句,它返回按指定的列排序后的新数据集。例如,要按电影名称长度顺序以及上映年份倒序显示,代码如下:

movies.dropDuplicates(Seq("title", "year"))       // 去重
      .selectExpr("title", "length(title) as title_length", "year")
      .orderBy('title_length.asc, 'year.desc)     // asc:升序,desc:倒序
      .show(10,false)

执行以上代码,输出内容如下:

+-----+------------+----+
|title|title_length|year|
+-----+------------+----+
|Up    |2             |2009|
|21    |2             |2008|
|12    |2             |2007|
|RV    |2             |2006|
|X2    |2             |2003|
|Rio   |3             |2011|
|Hop   |3             |2011|
|300   |3             |2006|
|Saw   |3             |2004|
|Elf   |3             |2003|
+-----+------------+----+
only showing top 10 rows

7)groupBy(cols: Column*),groupBy(col1: String, cols: String*)

相当于SQL中的group by子句,按指定的列对数据进行分组,以便执行聚合统计操作。例如,统计每年上映的电影数量并按数量倒序显示,代码如下:

movies.groupBy("year")
      .count()
      .orderBy($"count".desc)
      .show(10)

执行以上代码,输出内容如下:

+----+-----+
|year|count|
+----+-----+
|2006| 2078|
|2004| 2005|
|2007| 1986|
|2005| 1960|
|2011| 1926|
|2008| 1892|
|2009| 1890|
|2010| 1843|
|2002| 1834|
|2001| 1687|
+----+-----+
only showing top 10 rows

如果要统计上映电影数量超过2000部的年份,代码如下:

movies.groupBy("year")
      .count()
      .where($"count" > 2000)
      .show()

执行以上代码,输出内容如下:

+----+-----+
|year|count|
+----+-----+
|2006| 2078|
|2004| 2005|
+----+-----+

8)limit(n: Int)

通过获取前n行返回一个新的Dataset,相当于SQL中的limit子句。通常将其与orderBy配合来实现Top N算法。例如,统计上映电影数量最多的5个年份,代码如下:

movies.groupBy("year")
       .count()
       .orderBy($"count".desc)
       .limit(5)
       .show()

执行以上代码,输出内容如下:

+----+-----+
|year|count|
+----+-----+
|2006| 2078|
|2004| 2005|
|2007| 1986|
|2005| 1960|
|2011| 1926|
+----+-----+

查询电影名称最长的5部电影,代码如下:

movies.dropDuplicates("title")       // 去重
      .selectExpr("title", "length(title) as title_length")
      .orderBy(col("title_length").desc)     // 倒序
      .limit(5)
      .show(false)

执行以上代码,输出内容如下:

+-----------------------------------------------------------------------------------+------------+
|title                                                                              |title_length|
+-----------------------------------------------------------------------------------+------------+
|Borat: Cultural Learnings of America for Make Benefit Glorious Nation of Kazakhstan|83          |
|The Chronicles of Narnia: The Lion, the Witch and the Wardrobe                     |62          |
|Hannah Montana & Miley Cyrus: Best of Both Worlds Concert                          |57          |
|The Chronicles of Narnia: The Voyage of the Dawn Treader                           |56          |
|Istoriya pro Richarda, milorda i prekrasnuyu Zhar-ptitsu                           |56          |
+-----------------------------------------------------------------------------------+------------+

9)union(other: Dataset[T]),unionAll(other: Dataset[T])

按列位置合并两个数据集的行,相当于SQL中的union all。假设现在想在名称为“12”的电影中添加一个缺失的演员,可以采用如下的方法:另外创建一个DataFrame,其中包含所缺失的演员信息,然后将原数据集与这个含有缺失演员信息的新数据集执行一个union操作,将两个数据集合并在一起。这样就将缺失的演员添加到了原数据集中。

首先获得电影“12”的数据集,代码如下:

val shortNameMovieDF = movies.where($"title" === "12")
shortNameMovieDF.show()

执行以上代码,输出内容如下:

+--------------------+-----+----+
|                  actor|title|year|
+--------------------+-----+----+
|     Efremov, Mikhail|   12|2007|
|      Stoyanov, Yuriy|   12|2007|
|      Gazarov, Sergey|   12|2007|
|Verzhbitskiy, Viktor|   12|2007|
+--------------------+-----+----+

可以看到,该部电影有四个演员,但是缺少了演员Brychta,Edita的信息。接下来,另外创建一个DataFrame,包含所缺失演员Brychta,Edita的信息。然后将这个DataFrame和上面的DataFrame进行合并,这样就将演员Brychta,Edita的信息合并到上面的数据集中了,代码如下:

// 另创建一个DataFrame
val forgottenActor = Seq(("Brychta, Edita", "12", 2007L))
val forgottenActorDF = forgottenActor.toDF("actor","title","year")

// 通过合并两个DataFrame,实现添加缺失的演员姓名
val completeShortNameMovieDF = shortNameMovieDF.union(forgottenActorDF)
completeShortNameMovieDF.show()

执行以上代码,输出内容如下:

+--------------------+-----+----+
|                  actor|title|year|
+--------------------+-----+----+
|     Efremov, Mikhail|   12|2007|
|      Stoyanov, Yuriy|   12|2007|
|      Gazarov, Sergey|   12|2007|
|Verzhbitskiy, Viktor|   12|2007|
|       Brychta, Edita|   12|2007|
+--------------------+-----+----+

如果要实现类SQL中UNION的功能,则在union()操作之后使用distinct()操作,就可以去掉重复的数据了。

10) withColumn(colName: String, col: Column)

通过添加列或替换具有相同名称的现有列,返回一个新的数据集。例如,向movies数据集增加一个新列decade,该列的值是基于year这个列的表达式,计算的结果是该电影上映的年代,代码如下:

movies.withColumn("decade", $"year" - $"year" % 10).show(5)

执行以上代码,输出内容如下:

+-----------------+-------------+----+------+
|              actor|          title|year|decade|
+-----------------+-------------+----+------+
|McClure, Marc (I)|Freaky Friday|2003|  2000|
|McClure, Marc (I)| Coach Carter|2005|  2000|
|McClure, Marc (I)|  Superman II|1980|  1980|
|McClure, Marc (I)|     Apollo 13|1995|  1990|
|McClure, Marc (I)|      Superman|1978|  1970|
+-----------------+-------------+----+------+
only showing top 5 rows

如果传给withColumn函数的列名与现有列名相同,则意味着用新值替换旧值。例如,将year这一列的值替换为年代(原值为年份),代码如下:

movies.withColumn("year", $"year" - $"year" % 10).show(5)

执行以上代码,输出内容如下:

+-----------------+-------------+----+
|              actor|          title|year|
+-----------------+-------------+----+
|McClure, Marc (I)|Freaky Friday|2000|
|McClure, Marc (I)| Coach Carter|2000|
|McClure, Marc (I)|  Superman II|1980|
|McClure, Marc (I)|     Apollo 13|1990|
|McClure, Marc (I)|      Superman|1970|
+-----------------+-------------+----+
only showing top 5 rows

11) withColumnRenamed(existingName: String, newName: String)

返回一个重命名列的新数据集。如果模式不包含existingName,则不做任何操作。例如,将movies数据集中的列名改为新的名称,代码如下:

movies
    .withColumnRenamed("actor", "actor_name")
    .withColumnRenamed("title", "movie_title")
    .withColumnRenamed("year", "produced_year")
    .show(5)

执行以上代码,输出内容如下:

+-----------------+-------------+-------------+
|        actor_name|   movie_title|produced_year|
+-----------------+-------------+-------------+
|McClure, Marc (I)| Freaky Friday|         2003|
|McClure, Marc (I)|   Coach Carter|         2005|
|McClure, Marc (I)|    Superman II|         1980|
|McClure, Marc (I)|      Apollo 13|         1995|
|McClure, Marc (I)|        Superman|         1978|
+-----------------+-------------+-------------+
only showing top 5 rows

12) drop(col: Column),drop(colNames: String*)

返回一个删除了指定列的新Dataset。如果要被删除的列不存在,则不做任何操作。例如,删除movies数据集中指定的列,代码如下:

// 删除指定的列,包括一个并不存在的列”me”
val dropedMovies = movies.drop("actor", "me")

dropedMovies.printSchema()
dropedMovies.show(5)

执行以上代码,输出内容如下:

root
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)

+-------------+----+
|         title|year|
+-------------+----+
|Freaky Friday|2003|
| Coach Carter|2005|
|  Superman II|1980|
|     Apollo 13|1995|
|      Superman|1978|
+-------------+----+
only showing top 5 rows

从输出结果可以看出,actor这一列被删除了,而me这一列在原DataFrame中并不存在,所以删除不存在的列没有任何影响。

13) sample(fraction: Double)

数据抽样。通过使用随机种子对部分行进行抽样(不进行替换),返回一个新的Dataset。抽样比例由fraction参数决定,它指定要生成的行的比例,范围[0.0,1.0]。这个方法有多个不同的重载版本:

  • (1) sample(fraction: Double, seed: Long):指定种子。
  • (2) sample(withReplacement: Boolean, fraction: Double):指定是否替换,使用随机种子。
  • (3) sample(withReplacement: Boolean, fraction: Double, seed: Long):同上,但指定种子。

例如,指定无放回抽样和抽样比例,代码如下:

movies.sample(false, 0.0003).show(false)

执行以上代码,输出内容如下(随机的):

+--------------------+-------------------------------------------+----+
|actor                  |title                                              |year|
+--------------------+-------------------------------------------+----+
|Baskin, Elya          |Austin Powers: International Man of Mystery|1997|
|Rapace, Noomi         |Prometheus                                       |2012|
|Hatcher, Teri         |Coraline                                          |2009|
|Damon, Matt           |The Bourne Supremacy                            |2004|
|Novak, B.J.           |Knocked Up                                        |2007|
|Masamune, Tohoru     |Jumper                                             |2008|
|Cruise, Tom           |Collateral                                        |2004|
|Harvey, Steve (I)    |Racing Stripes                                   |2005|
|Giuliani, Rudolph W.|Anger Management                                 |2003|
|Sloan, Amy            |The Aviator                                       |2004|
|Krumholtz, David (I)|The Mexican                                       |2001|
+--------------------+-------------------------------------------+----+

执行有放回抽样并指定比例因子、种子的抽样,代码如下:

movies.sample(true, 0.0003, 123456).show()

执行以上代码,输出内容如下(保持不变的):

+-----------------+--------------------+----+
|              actor|                  title|year|
+-----------------+--------------------+----+
|      Piddock, Jim|         The Prestige|2006|
|      Reed, Tanoai|  The Stepford Wives|2004|
|      Moyo, Masasa|      Angels & Demons|2009|
| Zemeckis, Leslie|                Beowulf|2007|
|     Huston, Danny|X-Men Origins: Wo...|2009|
|     Pompeo, Ellen|            Old School|2003|
| Utt, Kenneth (I)|          Philadelphia|1993|
|Cannon, Kevin (I)|                Cop Out|2010|
+-----------------+--------------------+----+

对DataFrame进行action操作

与RDD类似,当在DataFrame上执行action操作时,会触发真正的计算。这些action操作相对都比较简单,在下面的代码中演示了这些action方法的使用:

// 查看前5条数据。第2个参数指定当列内容较长时,是否截断显示,false为不截断
movies.show(5,false)

// 返回数据集中的数量
movies.count

// 返回数据集中第1条数据
movies.first()

// 等价于first方法
movies.head()

// 返回数据集中前3条数据,以Array形式
movies.head(3)

// 返回数据集中前3条数据,以Array形式
movies.take(3)

// 返回数据集中前3条数据,以List形式
movies.takeAsList(3) 

// 返回一个包含数据集中所有行的数组
movies.collect

// 返回一个包含数据集中所有行的数组,以List形式
movies.collectAsList

// 返回数据集的数据类型,以Array形式
df.types

// 返回数据集的列名,以Array形式
df.columns

《Spark原理深入与编程实战》