操作DataFrame
从Spark 2.0开始,Spark将DataFrame看作是Dataset的一个特例,即Dataset[Row]。Dataset/DataFrame为结构化数据操作提供了一种特定于领域的语言(DSL)。其中DataFrame所支持的这些操作也被称为“无类型转换”,同时提供Scala、Java、Python和R语言的支持。而Dataset所支持的这些操作也被称为“类型化转换”,只提供强类型的Scala和Java语言支持的API。
在本节中,我们将重点讨论可以在DataFrame上执行的各种操作。这些操作被分为两类,transformation和action。开发人员链接多个操作来选择、过滤、转换、聚合和排序在DataFrame中的数据。
多种方式引用列
在学习操作DataFrame之前,首先需要掌握Spark所提供的引用DataFrame中的列的多种方式。在下面的代码中,演示了这些方式:
import org.apache.spark.sql.functions._ import spark.implicits._ // 使用元组序列创建一个DataFrame val kvDF = Seq((1,2),(3,4)).toDF("key","value") // 要在一个DataFrame中显示列名,可以调用columns函数 kvDF.columns // 以不同的方式选择特定的列 kvDF.select("key").show // 列为字符串类型 kvDF.select(col("key")).show // col是内置函数,它返回Column类型 kvDF.select(column("key")).show // column是内置函数,它返回Column类型 kvDF.select(expr("key")).show // expr与col方法调用相同 kvDF.select($"key").show // Scala中构造Column类型的语法糖 kvDF.select('key).show // 同上 // 也可以使用DataFrame的col函数 kvDF.select(kvDF.col("key")).show
执行上面的代码,都输出相同的内容:
+---+ |key| +---+ | 1| | 3| +---+
准备数据
在演示DataFrame的各种操作方法之前,我们先准备好数据。首先加载数据集到DataFrame中,代码如下所示:
// 加载电影数据集文件到DataFrame中 val file = "./src/main/resources/movies.csv" val movies = spark.read .option("header","true") .option("inferSchema","true") .csv(file) movies.printSchema() movies.show(5)
执行以上代码,输出结果如下所示:
root |-- actor: string (nullable = true) |-- title: string (nullable = true) |-- year: integer (nullable = true) +-----------------------+----------------+-------+ | actor| title|year| +-----------------------+----------------+------+ |McClure, Marc (I)|Freaky Friday|2003| |McClure, Marc (I)| Coach Carter|2005| |McClure, Marc (I)| Superman II|1980| |McClure, Marc (I)| Apollo 13|1995| |McClure, Marc (I)| Superman|1978| +-----------------------+----------------+------+
对DataFrame进行转换操作
DataFrame API提供有许多函数用来执行关系运算,这些函数模拟了SQL关系操作:
- 选择数据:select
- 删除某列:drop
- 过滤数据:where和filter(同义的)
- 限制返回的数量:limit
- 重命名列:withColumnRenamed
- 增加一个新的列:withColumn
- 数据分组:groupBy
- 数据排序:orderBy和sort(等价的)
接下来,学习DataFrame的各种转换操作函数。
1)select函数:选择指定的列。
下面的代码选取原DF中的两列,返回一个新的DataFrame:
movies.select("title","year").show(5)
输出结果如下:
+-------------+----+ | title|year| +-------------+----+ |Freaky Friday|2003| | Coach Carter|2005| | Superman II|1980| | Apollo 13|1995| | Superman|1978| +-------------+----+
下面的代码使用column表达式将电影上演的年份转换到年代表示,给赋予一个别名:
import spark.implicits._ movies.select($"title",($"year"-$"year" % 10).as("decade")).show(5)
输出结果如下所示:
+-------------+------+ | title|decade| +-------------+------+ |Freaky Friday| 2000| | Coach Carter| 2000| | Superman II| 1980| | Apollo 13| 1990| | Superman| 1970| +-------------+------+
2)selectExpr:使用SQL表达式选择列。
下面的代码中,用通配符星号(*)来表示选择所有的列,并增加一个新的列“decade”,新列的值是通过对year列值计算得到的电影上映的年代:
movies.selectExpr("*","(year - year % 10) as decade").show(5)
输出内容如下所示:
+-----------------+-------------+----+------+ | actor| title|year|decade| +-----------------+-------------+----+------+ |McClure, Marc (I)|Freaky Friday|2003| 2000| |McClure, Marc (I)| Coach Carter|2005| 2000| |McClure, Marc (I)| Superman II|1980| 1980| |McClure, Marc (I)| Apollo 13|1995| 1990| |McClure, Marc (I)| Superman|1978| 1970| +-----------------+-------------+----+------+
下面的代码中使用了SQL表达式和内置函数,用来查询电影数量和演员数量这两个值:
movies.selectExpr("count(distinct(title)) as movies","count(distinct(actor)) as actors").show()
输出结果如下所示:
+------+------+ |movies|actors| +------+------+ | 1409| 6527| +------+------+
可以看出,数据集中的电影共有1409部,演员共有6527名。
3)filter, where:实现过滤。这两个函数等价。
首先找出2000年以前上映的电影,在filter函数或where函数中指定过滤条件:
import spark.implicits._ movies.filter($"year" < 2000).show(5) // movies.where($"year" < 2000).show(5) // 等价
输出结果如下所示:
+-----------------+--------------------+----+ | actor| title|year| +-----------------+--------------------+----+ |McClure, Marc (I)| Superman II|1980| |McClure, Marc (I)| Apollo 13|1995| |McClure, Marc (I)| Superman|1978| |McClure, Marc (I)| Back to the Future|1985| |McClure, Marc (I)|Back to the Futur...|1990| +-----------------+--------------------+----+
找出2000年及以后上映的电影:
import spark.implicits._ movies.filter($"year" >= 2000).show(5) movies.where($"year" >= 2000).show(5)
输出结果如下所示:
+-----------------+--------------------+----+ | actor| title|year| +-----------------+--------------------+----+ |McClure, Marc (I)| Freaky Friday|2003| |McClure, Marc (I)| Coach Carter|2005| |Cooper, Chris (I)| Me, Myself & Irene|2000| |Cooper, Chris (I)| Capote|2005| |Cooper, Chris (I)|The Bourne Supremacy|2004| +-----------------+--------------------+----+
找出2000年上映的电影。注意,在Scala中,相等比较要求3个等号。
import spark.implicits._ movies.filter($"year" === 2000).show(5) movies.where($"year" === 2000).show(5)
输出结果如下所示:
+-----------------+--------------------+----+ | actor| title|year| +-----------------+--------------------+----+ |Cooper, Chris (I)| Me, Myself & Irene|2000| |Cooper, Chris (I)| The Patriot|2000| | Jolie, Angelina|Gone in Sixty Sec...|2000| | Yip, Françoise| Romeo Must Die|2000| | Danner, Blythe| Meet the Parents|2000| +-----------------+--------------------+----+
找出非2000年上映的电影。注意,在Scala中,不相等比较使用的操作符是 =!=。
import spark.implicits._ movies.select("title","year").filter('year =!= 2000).show(5) movies.select("title","year").where('year =!= 2000).show(5)
输出结果如下所示:
+-------------+----+ | title|year| +-------------+----+ |Freaky Friday|2003| | Coach Carter|2005| | Superman II|1980| | Apollo 13|1995| | Superman|1978| +-------------+----+
4)distinct(),dropDuplicates()
返回一个新数据集,其中仅包含此数据集中的唯一行。实际上,distinct()是dropDuplicates()的别名。例如,想知道数据集中共有多少条唯一行(去重),代码如下:
movies.distinct().count() movies.dropDuplicates().count() // 与上句等价
执行以上代码,输出内容如下:
31394
但是,如果想要知道数据集中共包含有多少部电影,则需要基于title字段进行唯一值统计,代码如下:
movies.select("title").distinct().count() // 其实也可以使用SQL的distinct函数 // movies.selectExpr("count(distinct(title)) as movies").show()
执行以上代码,输出内容如下:
1409
5)dropDuplicates(colNames: Array[String])
仅考虑列的子集,返回删除(按列的子集)重复行的新数据集。例如,同样需要统计数据集中共包含多少部电影,代码如下:
movies.dropDuplicates(Array("title")).count()
执行以上代码,输出内容如下:
1409
6)sort(sortExprs: Column*),orderBy(sortExprs: Column*)
相当于SQL中的order by子句,它返回按指定的列排序后的新数据集。例如,要按电影名称长度顺序以及上映年份倒序显示,代码如下:
movies.dropDuplicates(Seq("title", "year")) // 去重 .selectExpr("title", "length(title) as title_length", "year") .orderBy('title_length.asc, 'year.desc) // asc:升序,desc:倒序 .show(10,false)
执行以上代码,输出内容如下:
+-----+------------+----+ |title|title_length|year| +-----+------------+----+ |Up |2 |2009| |21 |2 |2008| |12 |2 |2007| |RV |2 |2006| |X2 |2 |2003| |Rio |3 |2011| |Hop |3 |2011| |300 |3 |2006| |Saw |3 |2004| |Elf |3 |2003| +-----+------------+----+ only showing top 10 rows
7)groupBy(cols: Column*),groupBy(col1: String, cols: String*)
相当于SQL中的group by子句,按指定的列对数据进行分组,以便执行聚合统计操作。例如,统计每年上映的电影数量并按数量倒序显示,代码如下:
movies.groupBy("year") .count() .orderBy($"count".desc) .show(10)
执行以上代码,输出内容如下:
+----+-----+ |year|count| +----+-----+ |2006| 2078| |2004| 2005| |2007| 1986| |2005| 1960| |2011| 1926| |2008| 1892| |2009| 1890| |2010| 1843| |2002| 1834| |2001| 1687| +----+-----+ only showing top 10 rows
如果要统计上映电影数量超过2000部的年份,代码如下:
movies.groupBy("year") .count() .where($"count" > 2000) .show()
执行以上代码,输出内容如下:
+----+-----+ |year|count| +----+-----+ |2006| 2078| |2004| 2005| +----+-----+
8)limit(n: Int)
通过获取前n行返回一个新的Dataset,相当于SQL中的limit子句。通常将其与orderBy配合来实现Top N算法。例如,统计上映电影数量最多的5个年份,代码如下:
movies.groupBy("year") .count() .orderBy($"count".desc) .limit(5) .show()
执行以上代码,输出内容如下:
+----+-----+ |year|count| +----+-----+ |2006| 2078| |2004| 2005| |2007| 1986| |2005| 1960| |2011| 1926| +----+-----+
查询电影名称最长的5部电影,代码如下:
movies.dropDuplicates("title") // 去重 .selectExpr("title", "length(title) as title_length") .orderBy(col("title_length").desc) // 倒序 .limit(5) .show(false)
执行以上代码,输出内容如下:
+-----------------------------------------------------------------------------------+------------+ |title |title_length| +-----------------------------------------------------------------------------------+------------+ |Borat: Cultural Learnings of America for Make Benefit Glorious Nation of Kazakhstan|83 | |The Chronicles of Narnia: The Lion, the Witch and the Wardrobe |62 | |Hannah Montana & Miley Cyrus: Best of Both Worlds Concert |57 | |The Chronicles of Narnia: The Voyage of the Dawn Treader |56 | |Istoriya pro Richarda, milorda i prekrasnuyu Zhar-ptitsu |56 | +-----------------------------------------------------------------------------------+------------+
9)union(other: Dataset[T]),unionAll(other: Dataset[T])
按列位置合并两个数据集的行,相当于SQL中的union all。假设现在想在名称为“12”的电影中添加一个缺失的演员,可以采用如下的方法:另外创建一个DataFrame,其中包含所缺失的演员信息,然后将原数据集与这个含有缺失演员信息的新数据集执行一个union操作,将两个数据集合并在一起。这样就将缺失的演员添加到了原数据集中。
首先获得电影“12”的数据集,代码如下:
val shortNameMovieDF = movies.where($"title" === "12") shortNameMovieDF.show()
执行以上代码,输出内容如下:
+--------------------+-----+----+ | actor|title|year| +--------------------+-----+----+ | Efremov, Mikhail| 12|2007| | Stoyanov, Yuriy| 12|2007| | Gazarov, Sergey| 12|2007| |Verzhbitskiy, Viktor| 12|2007| +--------------------+-----+----+
可以看到,该部电影有四个演员,但是缺少了演员Brychta,Edita的信息。接下来,另外创建一个DataFrame,包含所缺失演员Brychta,Edita的信息。然后将这个DataFrame和上面的DataFrame进行合并,这样就将演员Brychta,Edita的信息合并到上面的数据集中了,代码如下:
// 另创建一个DataFrame val forgottenActor = Seq(("Brychta, Edita", "12", 2007L)) val forgottenActorDF = forgottenActor.toDF("actor","title","year") // 通过合并两个DataFrame,实现添加缺失的演员姓名 val completeShortNameMovieDF = shortNameMovieDF.union(forgottenActorDF) completeShortNameMovieDF.show()
执行以上代码,输出内容如下:
+--------------------+-----+----+ | actor|title|year| +--------------------+-----+----+ | Efremov, Mikhail| 12|2007| | Stoyanov, Yuriy| 12|2007| | Gazarov, Sergey| 12|2007| |Verzhbitskiy, Viktor| 12|2007| | Brychta, Edita| 12|2007| +--------------------+-----+----+
如果要实现类SQL中UNION的功能,则在union()操作之后使用distinct()操作,就可以去掉重复的数据了。
10) withColumn(colName: String, col: Column)
通过添加列或替换具有相同名称的现有列,返回一个新的数据集。例如,向movies数据集增加一个新列decade,该列的值是基于year这个列的表达式,计算的结果是该电影上映的年代,代码如下:
movies.withColumn("decade", $"year" - $"year" % 10).show(5)
执行以上代码,输出内容如下:
+-----------------+-------------+----+------+ | actor| title|year|decade| +-----------------+-------------+----+------+ |McClure, Marc (I)|Freaky Friday|2003| 2000| |McClure, Marc (I)| Coach Carter|2005| 2000| |McClure, Marc (I)| Superman II|1980| 1980| |McClure, Marc (I)| Apollo 13|1995| 1990| |McClure, Marc (I)| Superman|1978| 1970| +-----------------+-------------+----+------+ only showing top 5 rows
如果传给withColumn函数的列名与现有列名相同,则意味着用新值替换旧值。例如,将year这一列的值替换为年代(原值为年份),代码如下:
movies.withColumn("year", $"year" - $"year" % 10).show(5)
执行以上代码,输出内容如下:
+-----------------+-------------+----+ | actor| title|year| +-----------------+-------------+----+ |McClure, Marc (I)|Freaky Friday|2000| |McClure, Marc (I)| Coach Carter|2000| |McClure, Marc (I)| Superman II|1980| |McClure, Marc (I)| Apollo 13|1990| |McClure, Marc (I)| Superman|1970| +-----------------+-------------+----+ only showing top 5 rows
11) withColumnRenamed(existingName: String, newName: String)
返回一个重命名列的新数据集。如果模式不包含existingName,则不做任何操作。例如,将movies数据集中的列名改为新的名称,代码如下:
movies .withColumnRenamed("actor", "actor_name") .withColumnRenamed("title", "movie_title") .withColumnRenamed("year", "produced_year") .show(5)
执行以上代码,输出内容如下:
+-----------------+-------------+-------------+ | actor_name| movie_title|produced_year| +-----------------+-------------+-------------+ |McClure, Marc (I)| Freaky Friday| 2003| |McClure, Marc (I)| Coach Carter| 2005| |McClure, Marc (I)| Superman II| 1980| |McClure, Marc (I)| Apollo 13| 1995| |McClure, Marc (I)| Superman| 1978| +-----------------+-------------+-------------+ only showing top 5 rows
12) drop(col: Column),drop(colNames: String*)
返回一个删除了指定列的新Dataset。如果要被删除的列不存在,则不做任何操作。例如,删除movies数据集中指定的列,代码如下:
// 删除指定的列,包括一个并不存在的列”me” val dropedMovies = movies.drop("actor", "me") dropedMovies.printSchema() dropedMovies.show(5)
执行以上代码,输出内容如下:
root |-- title: string (nullable = true) |-- year: integer (nullable = true) +-------------+----+ | title|year| +-------------+----+ |Freaky Friday|2003| | Coach Carter|2005| | Superman II|1980| | Apollo 13|1995| | Superman|1978| +-------------+----+ only showing top 5 rows
从输出结果可以看出,actor这一列被删除了,而me这一列在原DataFrame中并不存在,所以删除不存在的列没有任何影响。
13) sample(fraction: Double)
数据抽样。通过使用随机种子对部分行进行抽样(不进行替换),返回一个新的Dataset。抽样比例由fraction参数决定,它指定要生成的行的比例,范围[0.0,1.0]。这个方法有多个不同的重载版本:
- (1) sample(fraction: Double, seed: Long):指定种子。
- (2) sample(withReplacement: Boolean, fraction: Double):指定是否替换,使用随机种子。
- (3) sample(withReplacement: Boolean, fraction: Double, seed: Long):同上,但指定种子。
例如,指定无放回抽样和抽样比例,代码如下:
movies.sample(false, 0.0003).show(false)
执行以上代码,输出内容如下(随机的):
+--------------------+-------------------------------------------+----+ |actor |title |year| +--------------------+-------------------------------------------+----+ |Baskin, Elya |Austin Powers: International Man of Mystery|1997| |Rapace, Noomi |Prometheus |2012| |Hatcher, Teri |Coraline |2009| |Damon, Matt |The Bourne Supremacy |2004| |Novak, B.J. |Knocked Up |2007| |Masamune, Tohoru |Jumper |2008| |Cruise, Tom |Collateral |2004| |Harvey, Steve (I) |Racing Stripes |2005| |Giuliani, Rudolph W.|Anger Management |2003| |Sloan, Amy |The Aviator |2004| |Krumholtz, David (I)|The Mexican |2001| +--------------------+-------------------------------------------+----+
执行有放回抽样并指定比例因子、种子的抽样,代码如下:
movies.sample(true, 0.0003, 123456).show()
执行以上代码,输出内容如下(保持不变的):
+-----------------+--------------------+----+ | actor| title|year| +-----------------+--------------------+----+ | Piddock, Jim| The Prestige|2006| | Reed, Tanoai| The Stepford Wives|2004| | Moyo, Masasa| Angels & Demons|2009| | Zemeckis, Leslie| Beowulf|2007| | Huston, Danny|X-Men Origins: Wo...|2009| | Pompeo, Ellen| Old School|2003| | Utt, Kenneth (I)| Philadelphia|1993| |Cannon, Kevin (I)| Cop Out|2010| +-----------------+--------------------+----+
对DataFrame进行action操作
与RDD类似,当在DataFrame上执行action操作时,会触发真正的计算。这些action操作相对都比较简单,在下面的代码中演示了这些action方法的使用:
// 查看前5条数据。第2个参数指定当列内容较长时,是否截断显示,false为不截断 movies.show(5,false) // 返回数据集中的数量 movies.count // 返回数据集中第1条数据 movies.first() // 等价于first方法 movies.head() // 返回数据集中前3条数据,以Array形式 movies.head(3) // 返回数据集中前3条数据,以Array形式 movies.take(3) // 返回数据集中前3条数据,以List形式 movies.takeAsList(3) // 返回一个包含数据集中所有行的数组 movies.collect // 返回一个包含数据集中所有行的数组,以List形式 movies.collectAsList // 返回数据集的数据类型,以Array形式 df.types // 返回数据集的列名,以Array形式 df.columns