PySpark SQL Programming Example: Analyzing a User Dataset
[Example] The following example analyzes the people.txt file that ships with the PySpark installation, but specifies the schema programmatically instead of inferring it.
First, load people.txt into an RDD:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Build a SparkSession instance
spark = SparkSession.builder \
    .master("spark://localhost:7077") \
    .appName("pyspark sql demo") \
    .getOrCreate()

# Path to the sample data file
file = "/data/spark/resources/people.txt"

# Create an RDD from the text file
peopleRDD = spark.sparkContext.textFile(file)

# Specify a schema programmatically
fields = [
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
]
schema = StructType(fields)

# Convert RDD[String] to an RDD of (str, int) tuples
rowRDD = peopleRDD \
    .map(lambda line: line.split(",")) \
    .map(lambda arr: (arr[0], int(arr[1])))

# Apply the schema to the RDD
peopleDF = spark.createDataFrame(rowRDD, schema)

# Print the schema and the contents
peopleDF.printSchema()
peopleDF.show()
Running the code above produces the following output:
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
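A detail worth noting in the map chain above: lines in the sample people.txt have the form "Michael, 29", with a space after the comma, so arr[1] arrives as " 29". The conversion still works because Python's int() strips surrounding whitespace itself. A minimal, Spark-free sketch of the same parsing logic (the sample lines here mimic the file's format):

```python
# Sample lines in the same format as people.txt ("name, age")
lines = ["Michael, 29", "Andy, 30", "Justin, 19"]

# Same two-step transformation as the RDD map chain:
# split each line on ",", then build a (name, age) tuple.
# int() tolerates the leading space in " 29", so no .strip() is needed.
rows = [
    (arr[0], int(arr[1]))
    for arr in (line.split(",") for line in lines)
]

print(rows)  # [('Michael', 29), ('Andy', 30), ('Justin', 19)]
```

If the name field's leading/trailing whitespace mattered, arr[0].strip() would be needed as well; int() only helps with the numeric column.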
Next, register a temporary view and query it with SQL:
# Create a temporary view from the DataFrame
peopleDF.createOrReplaceTempView("people")

# SQL can be run over temporary views created from DataFrames
sqlStr = "SELECT name, age FROM people WHERE age BETWEEN 13 AND 19"
results = spark.sql(sqlStr)
results.show()
Running the code above produces the following output (only Justin, age 19, falls in the 13–19 range):
+------+---+
|  name|age|
+------+---+
|Justin| 19|
+------+---+