PySpark SQL Programming Example: Analyzing a User Dataset
[Example] The following example analyzes the people.txt file bundled with the PySpark installation, but specifies the schema programmatically.
First, load people.txt into an RDD:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Build a SparkSession instance
spark = SparkSession.builder \
    .master("spark://localhost:7077") \
    .appName("pyspark sql demo") \
    .getOrCreate()

# Path to the data file
file = "/data/spark/resources/people.txt"

# Create an RDD
peopleRDD = spark.sparkContext.textFile(file)

# Specify a schema
fields = [
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
]
schema = StructType(fields)

# Convert RDD[String] to RDD[(String, Int)]
rowRDD = peopleRDD \
    .map(lambda line: line.split(",")) \
    .map(lambda arr: (arr[0], int(arr[1])))

# Apply the schema to the RDD
peopleDF = spark.createDataFrame(rowRDD, schema)

# Print the schema and the contents
peopleDF.printSchema()
peopleDF.show()
Running this code produces the following output:
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
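Note that the two map() calls above do the line parsing: each line is split on commas, and the age field is cast to an integer. Lines in the bundled people.txt look like "Michael, 29" (with a space after the comma), and the cast still succeeds because Python's int() ignores surrounding whitespace. A plain-Python sketch of the same transformation, with sample lines assumed to mirror the bundled file:

```python
# Sample lines mirroring the format of Spark's bundled people.txt
lines = ["Michael, 29", "Andy, 30", "Justin, 19"]

# The same logic as the two map() calls above:
# split on commas, then cast the age field to int
rows = [(arr[0], int(arr[1])) for arr in (line.split(",") for line in lines)]
print(rows)  # [('Michael', 29), ('Andy', 30), ('Justin', 19)]
```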
Next, register a temporary view and query it with SQL:
# Create a temporary view from the DataFrame
peopleDF.createOrReplaceTempView("people")

# SQL can be run over temporary views created from DataFrames
sqlStr = "SELECT name, age FROM people WHERE age BETWEEN 13 AND 19"
results = spark.sql(sqlStr)
results.show()
Running this code produces the following output:
+------+---+
|  name|age|
+------+---+
|Justin| 19|
+------+---+