临时视图与SQL查询
PySpark SQL支持直接应用标准SQL语句进行查询。当在PySpark SQL中编写SQL命令时,它们会被翻译为DataFrame上的关系操作。在SQL语句内,可以访问所有SQL表达式和内置函数。
这需要使用SparkSession的sql()函数执行给定的SQL查询,该查询会返回一个DataFrame。
1. 在PySpark程序中执行SQL语句
PySpark SQL支持使用基本SQL语法或HiveQL编写的SQL查询的执行。在pyspark shell或Zeppelin Notebook中,会自动导入spark.sql,所以可以直接使用该函数用来编写SQL命令,代码如下:
spark.sql("select current_date() as today , 1 + 100 as value").show()
SparkSession的sql()函数执行给定的SQL查询,该查询会返回一个DataFrame。
本节只讨论最后一个选项,即在PySpark应用程序中以编程方式运行SQL。例如,执行一个不带注册视图的SQL语句,代码如下:
from pyspark.sql import SparkSession # 构建SparkSession实例 spark = SparkSession.builder \ .master("spark://localhost:7077") \ .appName("pyspark sql demo") \ .getOrCreate() infoDF = spark.sql("select current_date() as today , 1 + 100 as value") infoDF.show()
执行以上代码,输出内容如下:
+----------+-----+ | today|value| +----------+-----+ |2022-02-11| 101| +----------+-----+
除了使用PySpark读API将文件加载到DataFrame并对其进行查询外,PySpark也可以使用SQL语句直接查询该数据文件,代码如下:
sqlDF = spark.sql(""" SELECT * FROM parquet.`/data/spark/resources/users.parquet` """) sqlDF.show()
执行以上代码,输出内容如下:
+------+--------------+----------------+ | name|favorite_color|favorite_numbers| +------+--------------+----------------+ |Alyssa| null| [3, 9, 15, 20]| | Ben| red| []| +------+--------------+----------------+
2. 注册临时视图并执行SQL查询
DataFrame本质上就像数据库中的表一样,可以通过SQL语句来查询它们。不过,在可以发出SQL查询来操纵它们之前,需要将它们注册为一个临时视图,然后,就可以使用SQL查询从临时表中查询数据了。每个临时视图都有一个名字,通过视图的名字来引用该DataFrame,该名字在select子句中用作表名。
例如,要查询电影数据集movies.parquet文件,代码如下:
from pyspark.sql import SparkSession # 构建SparkSession实例 spark = SparkSession.builder \ .master("spark://localhost:7077") \ .appName("pyspark sql demo") \ .getOrCreate() # 定义文件路径 parquetFile = "/data/spark/movies2/movies.parquet" # 读取到DataFrame中 movies = spark.read.parquet(parquetFile) # 现在将movies DataFrame注册为一个临时视图 movies.createOrReplaceTempView("movies") # 从视图view查询 sql = """ select * from movies where actor_name like '%Jolie%' and produced_year > 2009 """ spark.sql(sql).show()
执行以上代码,输出内容如下:
+---------------+---------------+-------------+ | actor_name| movie_title|produced_year| +---------------+---------------+-------------+ |Jolie, Angelina| Salt| 2010| |Jolie, Angelina|Kung Fu Panda 2| 2011| |Jolie, Angelina| The Tourist| 2010| +---------------+---------------+-------------+
也可以在sql()函数中混合使用SQL语句和DataFrame转换API。例如,查询电影数据集,找出参演影片超过30部的演员,代码如下:
from pyspark.sql import SparkSession from pyspark.sql.functions import col # 构建SparkSession实例 spark = SparkSession.builder \ .master("spark://localhost:7077") \ .appName("pyspark sql demo") \ .getOrCreate() # 定义文件路径 parquetFile = "/data/spark/movies2/movies.parquet" # 读取到DataFrame中 movies = spark.read.parquet(parquetFile) # 现在将movies DataFrame注册为一个临时视图 movies.createOrReplaceTempView("movies") # 从视图view查询 spark.sql("select actor_name, count(*) as count from movies group by actor_name") \ .where('count > 30') \ .orderBy(col("count").desc()) \ .show()
执行以上代码,输出内容如下:
+------------------+-----+ | actor_name|count| +------------------+-----+ | Tatasciore, Fred| 38| | Welker, Frank| 38| |Jackson, Samuel L.| 32| | Harnell, Jess| 31| +------------------+-----+
当SQL语句较长时,可以利用"""(三引号)来格式化多行SQL语句。例如,查询电影数据集,使用子查询来计算每年产生的电影数量,代码如下:
from pyspark.sql import SparkSession from pyspark.sql.functions import col # 构建SparkSession实例 spark = SparkSession.builder \ .master("spark://localhost:7077") \ .appName("pyspark sql demo") \ .getOrCreate() # 定义文件路径 parquetFile = "/data/spark/movies2/movies.parquet" # 读取到DataFrame中 movies = spark.read.parquet(parquetFile) # 现在将movies DataFrame注册为一个临时视图 movies.createOrReplaceTempView("movies") # 使用子查询来计算每年产生的电影数量(利用"""来格式化多行SQL语句) spark.sql("""select produced_year, count(*) as count from (select distinct movie_title, produced_year from movies) group by produced_year """) \ .orderBy(col("count").desc()) \ .show(5)
执行以上代码,输出内容如下:
+-------------+-----+ |produced_year|count| +-------------+-----+ | 2011| 86| | 2004| 86| | 2006| 86| | 2005| 85| | 2008| 82| +-------------+-----+ only showing top 5 rows
3. 直接使用数据源注册临时视图
在前面的示例中,都是先将数据加载到一个DataFrame中,然后将该DataFrame注册为临时视图或全局视图。除此之外,也可以使用SparkSession的sql()方法从注册的数据源直接加载数据注册临时视图。
例如,注册一个Parquet文件并加载它的内容,代码如下:
from pyspark.sql import SparkSession # 构建SparkSession实例 spark = SparkSession.builder \ .master("spark://localhost:7077") \ .appName("pyspark sql demo") \ .getOrCreate() # 从parquet数据源创建临时视图 spark.sql("create temporary view usersParquet "+ "using org.apache.spark.sql.parquet "+ "options(path '/data/spark/resources/users.parquet')") # 查询临时视图 spark.sql("select * from usersParquet").show()
执行以上代码,输出结果如下:
+------+--------------+----------------+ | name|favorite_color|favorite_numbers| +------+--------------+----------------+ |Alyssa| null| [3, 9, 15, 20]| | Ben| red| []| +------+--------------+----------------+
下面是另一个使用内置数据源的例子。从JDBC注册一个临时视图,然后使用SQL语句查询该临时视图(这里连接的是MySQL 5数据库,读者如果使用的是其他版本的MySQL,请自行修改为相应版本的连接参数),代码如下:
from pyspark.sql import SparkSession # 构建SparkSession实例 spark = SparkSession.builder \ .master("spark://localhost:7077") \ .appName("pyspark sql demo") \ .getOrCreate() # 从jdbc数据源创建临时视图 spark.sql( """ create temporary view peoplesjdbc using org.apache.spark.sql.jdbc options( url 'jdbc:mysql://localhost:3306/xueai8', dbtable 'peoples', user 'root', password 'admin' ) """ ) # 在临时视图上执行查询 spark.sql("select * from peoplesjdbc").show()
执行以上代码,输出结果如下:
+---+------+---+ | id| name|age| +---+------+---+ | 1| 张三| 23| | 2| 李四| 18| | 3| 王老五| 35| +---+------+---+