临时视图与执行SQL查询
Spark SQL支持直接应用标准SQL语句进行查询。当在Spark SQL中编写SQL命令时,它们会被翻译为DataFrame上的关系操作。在SQL语句内,可以访问所有SQL表达式和内置函数。
这需要使用SparkSession的sql函数执行给定的SQL查询,该查询会返回一个DataFrame。
在Spark程序中运行SQL语句
Spark提供了几种在Spark中运行SQL的方法。
- Spark SQL CLI(./bin/spark-sql);
- JDBC/ODBC服务器;
- 在Spark应用程序以编程方式。
前两种选择提供了与Apache Hive的集成,以利用Hive的元数据。Spark SQL支持使用基本SQL语法或HiveQL编写的SQL查询的执行。
在Spark shell中,spark.sql是自动导入的,所以可以直接使用该函数用来编写SQL命令。例如:
sql("select current_date() as today , 1 + 100 as value").show()
本节只讨论最后一个选项,即在Spark应用程序中以编程方式运行SQL。
下面是执行一个不带注册视图的SQL语句的简单示例:
def main(args: Array[String]): Unit = { val spark:SparkSession = SparkSession.builder() .master("local[*]") .appName("Spark Dataset Demo") .getOrCreate() val infoDF = spark.sql("select current_date() as today , 1 + 100 as value") infoDF.show() }
输出结果如下所示:
+----------+-----+ | today|value| +----------+-----+ |2021-07-28| 101| +----------+-----+
从输出结果可以看出,执行sql函数返回的是一个DataFrame。
除了使用read API将文件加载到DataFrame并对其进行查询,Spark也可以使用SQL直接查询该文件。
def main(args: Array[String]): Unit = { val spark:SparkSession = SparkSession.builder() .master("local[*]") .appName("Spark Dataset Demo") .getOrCreate() val sqlDF = spark.sql("SELECT * FROM parquet.`src/main/resources/users.parquet`") sqlDF.show() }
执行以上代码,输出内容如下:
+------+--------------+----------------+ | name|favorite_color|favorite_numbers| +------+--------------+----------------+ |Alyssa| null| [3, 9, 15, 20]| | Ben| red| []| +------+--------------+----------------+
注册临时视图并查询
DataFrame和Dataset本质上就像数据库中的表一样,可以通过SQL语句来查询它们。不过,在可以发出SQL查询来操纵它们之前,需要将它们注册为一个临时视图,然后,就可以使用SQL查询从临时表中查询数据了。每个临时视图都有一个名字,通过视图的名字来引用该DataFrame,该名字在select子句中用作表名。
请看下面的示例。
【示例】使用SQL语句来查询电影数据集。
def main(args: Array[String]): Unit = { val spark:SparkSession = SparkSession.builder() .master("local[*]") .appName("Spark Dataset Demo") .getOrCreate() // 定义文件路径 val parquetFile = "src/main/resources/movies.parquet" // 读取到DataFrame中 val movies = spark.read.parquet(parquetFile) // 现在将movies DataFrame注册为一个临时视图 movies.createOrReplaceTempView("movies") // 从视图view查询 spark.sql("select * from movies where actor_name like '%Jolie%' and produced_year > 2009").show() }
输出结果如下所示:
+---------------+---------------+-------------+ | actor_name| movie_title|produced_year| +---------------+---------------+-------------+ |Jolie, Angelina| Salt| 2010| |Jolie, Angelina|Kung Fu Panda 2| 2011| |Jolie, Angelina| The Tourist| 2010| +---------------+---------------+-------------+
也可以在sql函数中混合使用SQL语句和DataFrame转换API。请看下面的示例。
【示例】查询电影数据集,找出参演影片超过30部的演员。
def main(args: Array[String]): Unit = { val spark:SparkSession = SparkSession.builder() .master("local[*]") .appName("Spark Dataset Demo") .getOrCreate() // 定义文件路径 val parquetFile = "src/main/resources/movies.parquet" // 读取到DataFrame中 val movies = spark.read.parquet(parquetFile) // 现在将movies DataFrame注册为一个临时视图 movies.createOrReplaceTempView("movies") // 查询参演影片超过30部的演员: import spark.implicits._ spark.sql("select actor_name, count(*) as count from movies group by actor_name") .where('count > 30) .orderBy('count.desc) .show() }
输出结果如下所示:
+------------------+-----+ | actor_name|count| +------------------+-----+ | Tatasciore, Fred| 38| | Welker, Frank| 38| |Jackson, Samuel L.| 32| | Harnell, Jess| 31| +------------------+-----+
当SQL语句较长时,可以利用"""(三引号)来格式化多行SQL语句。请看下面的示例。
【示例】查询电影数据集,使用子查询来计算每年产生的电影数量。
def main(args: Array[String]): Unit = { val spark:SparkSession = SparkSession.builder() .master("local[*]") .appName("Spark Dataset Demo") .getOrCreate() // 定义文件路径 val parquetFile = "src/main/resources/movies.parquet" // 读取到DataFrame中 val movies = spark.read.parquet(parquetFile) // 现在将movies DataFrame注册为一个临时视图 movies.createOrReplaceTempView("movies") // 使用子查询来计算每年产生的电影数量(利用"""来格式化多行SQL语句) import spark.implicits._ spark.sql("""select produced_year, count(*) as count from (select distinct movie_title, produced_year from movies) group by produced_year """) .orderBy('count.desc) .show(5) }
输出结果如下所示:
+-------------+-----+ |produced_year|count| +-------------+-----+ | 2011| 86| | 2006| 86| | 2004| 86| | 2005| 85| | 2008| 82| +-------------+-----+
注:
Spark实现了ANSI SQL:2003修订版(最流行的RDBMS服务器支持)的一个子集。此外,Spark 2.0通过包含一个新的ANSI SQL解析器扩展了Spark SQL功能,支持子查询和SQL:2003标准。更具体地说,子查询支持现在包括相关/不相关的子查询,以及WHERE / HAVING子句中的IN / NOT IN和EXISTS / NOT EXISTS谓词。