PySpark SQL编程案例:单词计数
下面这个示例中,使用DataFrame和SQL两种方式来实现单词计数功能。
【示例】统计某个英文文本中的词频,找出出现频次最高的三个单词。实现过程和代码如下所示。
(1) 准备数据文件。请自行创建一个纯文本文件word.txt,并编辑内容如下:
good good study day day up
将该文件上传到HDFS的/data/spark/目录下。
(2) 方法一:使用关系型 API实现单词计数,代码如下:
from pyspark.sql import SparkSession # 构建SparkSession实例 spark = SparkSession.builder \ .master("spark://localhost:7077") \ .appName("pyspark sql demo") \ .getOrCreate() # 定义文件路径 filePath = "/data/spark/words.txt" df = spark.read.text(filePath) # df.printSchema() # df.show() # 对DataFrame进行一系列处理,产生一个包含最终结果的DataFrame wordDF = wordDF.rdd \ .flatMap(lambda line: line.value.split(" ")) \ .map(lambda word: (word, 1)) \ .toDF(["word", "one"]) # wordDF.show() # 获得前3个出现频率最高的词 from pyspark.sql.functions import col top3 = wordDF \ .groupBy("word") \ .count() \ .orderBy(col("count").desc()) \ .limit(3) # 输出结果 top3.show()
执行以上代码,输出结果如下:
+----+-----+ |word|count| +----+-----+ | day| 2| |good| 2| | up| 1| +----+-----+
(3) 方法二:使用SQL语句,代码如下:
from pyspark.sql import SparkSession # 构建SparkSession实例 spark = SparkSession.builder \ .master("spark://localhost:7077") \ .appName("pyspark sql demo") \ .getOrCreate() # 定义文件路径 filePath = "/data/spark/words.txt" df = spark.read.text(filePath) # df.printSchema() # df.show() # 对DataFrame进行一系列处理,产生一个包含最终结果的DataFrame wordDF = wordDF.rdd \ .flatMap(lambda line: line.value.split(" ")) \ .map(lambda word: (word, 1)) \ .toDF(["word", "one"]) # wordDF.show() # 注册为临时view wordDF.createOrReplaceTempView("wc_tb") # 执行SQL查询,分析产生结果 sql = """ select word,count(1) as count from wc_tb group by word order by count desc """ resultDF = spark.sql(sql) resultDF.limit(3).show()
执行以上代码,输出结果如下:
+----+-----+ |word|count| +----+-----+ | day| 2| |good| 2| | up| 1| +----+-----+