临时视图与执行SQL查询

Spark SQL支持直接应用标准SQL语句进行查询。当在Spark SQL中编写SQL命令时,它们会被翻译为DataFrame上的关系操作。在SQL语句内,可以访问所有SQL表达式和内置函数。

这需要使用SparkSession的sql函数执行给定的SQL查询,该查询会返回一个DataFrame。

在Spark程序中运行SQL语句

Spark提供了几种在Spark中运行SQL的方法。

  • Spark SQL CLI(./bin/spark-sql);
  • JDBC/ODBC服务器;
  • 在Spark应用程序以编程方式。

前两种选择提供了与Apache Hive的集成,以利用Hive的元数据。Spark SQL支持使用基本SQL语法或HiveQL编写的SQL查询的执行。

在Spark shell中,spark.sql是自动导入的,所以可以直接使用该函数用来编写SQL命令。例如:

sql("select current_date() as today , 1 + 100 as value").show()

本节只讨论最后一个选项,即在Spark应用程序中以编程方式运行SQL。

下面是执行一个不带注册视图的SQL语句的简单示例:

  def main(args: Array[String]): Unit = {
    val spark:SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("Spark Dataset Demo")
      .getOrCreate()

    val infoDF = spark.sql("select current_date() as today , 1 + 100 as value")
    infoDF.show()
  }

输出结果如下所示:

+----------+-----+
|     today|value|
+----------+-----+
|2021-07-28|  101|
+----------+-----+

从输出结果可以看出,执行sql函数返回的是一个DataFrame。

除了使用read API将文件加载到DataFrame并对其进行查询,Spark也可以使用SQL直接查询该文件。

  def main(args: Array[String]): Unit = {
    val spark:SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("Spark Dataset Demo")
      .getOrCreate()

    val sqlDF = spark.sql("SELECT * FROM parquet.`src/main/resources/users.parquet`")
    sqlDF.show()
  }

执行以上代码,输出内容如下:

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

注册临时视图并查询

DataFrame和Dataset本质上就像数据库中的表一样,可以通过SQL语句来查询它们。不过,在可以发出SQL查询来操纵它们之前,需要将它们注册为一个临时视图,然后,就可以使用SQL查询从临时表中查询数据了。每个临时视图都有一个名字,通过视图的名字来引用该DataFrame,该名字在select子句中用作表名。

请看下面的示例。

【示例】使用SQL语句来查询电影数据集。

  def main(args: Array[String]): Unit = {
    val spark:SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("Spark Dataset Demo")
      .getOrCreate()

    // 定义文件路径
    val parquetFile = "src/main/resources/movies.parquet"

    // 读取到DataFrame中
    val movies = spark.read.parquet(parquetFile)

    // 现在将movies DataFrame注册为一个临时视图
    movies.createOrReplaceTempView("movies")

    // 从视图view查询
    spark.sql("select * from movies where actor_name like '%Jolie%' and produced_year > 2009").show()
  }

输出结果如下所示:

+---------------+---------------+-------------+
|     actor_name|    movie_title|produced_year|
+---------------+---------------+-------------+
|Jolie, Angelina|           Salt|         2010|
|Jolie, Angelina|Kung Fu Panda 2|         2011|
|Jolie, Angelina|    The Tourist|         2010|
+---------------+---------------+-------------+

也可以在sql函数中混合使用SQL语句和DataFrame转换API。请看下面的示例。

【示例】查询电影数据集,找出参演影片超过30部的演员。

  def main(args: Array[String]): Unit = {
    val spark:SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("Spark Dataset Demo")
      .getOrCreate()

    // 定义文件路径
    val parquetFile = "src/main/resources/movies.parquet"

    // 读取到DataFrame中
    val movies = spark.read.parquet(parquetFile)

    // 现在将movies DataFrame注册为一个临时视图
    movies.createOrReplaceTempView("movies")

    // 查询参演影片超过30部的演员:
    import spark.implicits._
    spark.sql("select actor_name, count(*) as count from movies group by actor_name")
      .where('count > 30)
      .orderBy('count.desc)
      .show()
  }

输出结果如下所示:

+------------------+-----+
|        actor_name|count|
+------------------+-----+
|  Tatasciore, Fred|   38|
|     Welker, Frank|   38|
|Jackson, Samuel L.|   32|
|     Harnell, Jess|   31|
+------------------+-----+

当SQL语句较长时,可以利用"""(三引号)来格式化多行SQL语句。请看下面的示例。

【示例】查询电影数据集,使用子查询来计算每年产生的电影数量。

  def main(args: Array[String]): Unit = {
    val spark:SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("Spark Dataset Demo")
      .getOrCreate()

    // 定义文件路径
    val parquetFile = "src/main/resources/movies.parquet"

    // 读取到DataFrame中
    val movies = spark.read.parquet(parquetFile)

    // 现在将movies DataFrame注册为一个临时视图
    movies.createOrReplaceTempView("movies")

    // 使用子查询来计算每年产生的电影数量(利用"""来格式化多行SQL语句)
    import spark.implicits._
    spark.sql("""select produced_year, count(*) as count
                from (select distinct movie_title, produced_year from movies)
                group by produced_year
             """)
         .orderBy('count.desc)
         .show(5)
  }

输出结果如下所示:

+-------------+-----+
|produced_year|count|
+-------------+-----+
|         2011|   86|
|         2006|   86|
|         2004|   86|
|         2005|   85|
|         2008|   82|
+-------------+-----+

注:

Spark实现了ANSI SQL:2003修订版(最流行的RDBMS服务器支持)的一个子集。此外,Spark 2.0通过包含一个新的ANSI SQL解析器扩展了Spark SQL功能,支持子查询和SQL:2003标准。更具体地说,子查询支持现在包括相关/不相关的子查询,以及WHERE / HAVING子句中的IN / NOT IN和EXISTS / NOT EXISTS谓词。


《Flink原理深入与编程实战》