PySpark SQL编程案例:单词计数

下面这个示例中,使用DataFrame和SQL两种方式来实现单词计数功能。

【示例】统计某个英文文本中的词频,找出出现频次最高的三个单词。实现过程和代码如下所示。

(1) 准备数据文件。请自行创建一个纯文本文件word.txt,并编辑内容如下:

good good study
day day up

将该文件上传到HDFS的/data/spark/目录下。

(2) 方法一:使用关系型 API实现单词计数,代码如下:

from pyspark.sql import SparkSession

# 构建SparkSession实例
spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark sql demo") \
   .getOrCreate()

# 定义文件路径
filePath = "/data/spark/words.txt"
df = spark.read.text(filePath)

# df.printSchema()
# df.show()

# 对DataFrame进行一系列处理,产生一个包含最终结果的DataFrame
wordDF = wordDF.rdd \
    .flatMap(lambda line: line.value.split(" ")) \
    .map(lambda word: (word, 1)) \
    .toDF(["word", "one"])

# wordDF.show()

# 获得前3个出现频率最高的词
from pyspark.sql.functions import col
top3 = wordDF \
	.groupBy("word") \
	.count() \
	.orderBy(col("count").desc()) \
	.limit(3)

# 输出结果
top3.show()

执行以上代码,输出结果如下:

+----+-----+
|word|count|
+----+-----+
| day|    2|
|good|    2|
|  up|     1|
+----+-----+

(3) 方法二:使用SQL语句,代码如下:

from pyspark.sql import SparkSession

# 构建SparkSession实例
spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark sql demo") \
   .getOrCreate()

# 定义文件路径
filePath = "/data/spark/words.txt"
df = spark.read.text(filePath)

# df.printSchema()
# df.show()

# 对DataFrame进行一系列处理,产生一个包含最终结果的DataFrame
wordDF = wordDF.rdd \
    .flatMap(lambda line: line.value.split(" ")) \
    .map(lambda word: (word, 1)) \
    .toDF(["word", "one"])

# wordDF.show()

# 注册为临时view
wordDF.createOrReplaceTempView("wc_tb")

# 执行SQL查询,分析产生结果
sql = """
          select word,count(1) as count
          from wc_tb
          group by word
          order by count desc
      """
resultDF = spark.sql(sql)
resultDF.limit(3).show()

执行以上代码,输出结果如下:

+----+-----+
|word|count|
+----+-----+
| day|    2|
|good|    2|
|  up|     1|
+----+-----+

《Flink原理深入与编程实战》