PySpark SQL编程模型
所有的PySpark SQL应用程序都以特定的步骤来工作。这些工作步骤如下图所示:
也就是说,每个PySpark SQL应用程序都由相同的基本部分组成:
- 从数据源加载数据,构造DataFrame;
- 对DataFrame执行转换(transformation)操作;
- 将最终的DataFrame存储到指定位置。
下面我们实现一个完整的的PySpark SQL应用程序。
【示例】加载json文件中的人员信息,并进行统计。请按以下步骤执行。
1)准备数据源文件。
Spark安装目录中自带了一个people.json文件,位于“examples/src/main/resources/”目录下。其内容如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
我们将这个people.json文件,拷贝到/home/hduser/data/spark/resources/目录下。
2)创建一个Jupyter Notebook文件,编辑代码如下:
from pyspark.sql import SparkSession
# 1)构建SparkSession实例
spark = SparkSession.builder \
.master("spark://xueai8:7077") \
.appName("pyspark sql demo") \
.getOrCreate()
# 2)加载数据源,构造DataFrame
input = "file:///home/hduser/data/spark/resources/people.json"
df = spark.read.json(input)
# 3)执行转换操作
from pyspark.sql.functions import *
# 找出年龄超过21岁的人
resultDF = df.where(col("age") > 21)
# 显示DataFrame数据
resultDF.show()
# 4)将结果保存到csv文件中
output = "/data/spark/people-output"
resultDF.write.format("csv").save(output)
# 5)停止SparkSession
spark.stop()
3)执行过程和执行结果如下:
在本例中,我们使用了本地文件系统。大家可自行修改代码,使用HDFS文件系统。