PySpark SQL编程模型
所有的PySpark SQL应用程序都以特定的步骤来工作。这些工作步骤如下图所示:
也就是说,每个PySpark SQL应用程序都由相同的基本部分组成:
- 从数据源加载数据,构造DataFrame;
- 对DataFrame执行转换(transformation)操作;
- 将最终的DataFrame存储到指定位置。
下面我们实现一个完整的的PySpark SQL应用程序。
【示例】加载json文件中的人员信息,并进行统计。请按以下步骤执行。
1)准备数据源文件。
Spark安装目录中自带了一个people.json文件,位于“examples/src/main/resources/”目录下。其内容如下:
{"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19}
我们将这个people.json文件,拷贝到/home/hduser/data/spark/resources/目录下。
2)创建一个Jupyter Notebook文件,编辑代码如下:
from pyspark.sql import SparkSession # 1)构建SparkSession实例 spark = SparkSession.builder \ .master("spark://xueai8:7077") \ .appName("pyspark sql demo") \ .getOrCreate() # 2)加载数据源,构造DataFrame input = "file:///home/hduser/data/spark/resources/people.json" df = spark.read.json(input) # 3)执行转换操作 from pyspark.sql.functions import * # 找出年龄超过21岁的人 resultDF = df.where(col("age") > 21) # 显示DataFrame数据 resultDF.show() # 4)将结果保存到csv文件中 output = "/data/spark/people-output" resultDF.write.format("csv").save(output) # 5)停止SparkSession spark.stop()
3)执行过程和执行结果如下:
在本例中,我们使用了本地文件系统。大家可自行修改代码,使用HDFS文件系统。