Spark SQL编程案例1_单词计数

下面这个示例中,我们使用DataFrame和SQL两种方式来实现单词计数功能。

【例】使用Spark Dataset API统计某个英文文本中的词频。

实现过程和代码如下所示。

1)准备数据文件。请自行创建一个纯文本文件words.txt,并编辑内容如下:

good good study
day day up
to be or not to be
this is a question

2)方法一:使用DSL API实现单词计数。

  def main(args: Array[String]): Unit = {
    // 0) 创建SparkSession的实例
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark Basic Example")
      .getOrCreate()

    // 定义文件路径
    val filePath = "src/main/resources/words.txt"
    val wordDS = spark.read.textFile(filePath)

    // wordDS.printSchema()
    // wordDS.show()

    // implicit object提供了隐式转换,用于将Scala对象(包括rdd)转换为Dataset、DataFrame、Columns
    // implicit object是在SparkSession内部定义的
    // implicit object继承了SQLImplicits抽象类
    import spark.implicits._

    // 对Dataset进行一系列处理,产生一个包含最终结果的Dataset
    val wordDF = wordDS
      .flatMap(_.split("\\s+"))
      .filter(_.size>0)
      .groupByKey(_.toLowerCase)
      .count
      .toDF("word","count")

    wordDF.show()

    // 获得前3个出现频率最高的词
    val top3 = wordDF.orderBy($"count".desc).limit(3)

    // 输出结果
    top3.show()
  }

执行以上代码,输出结果如下所示:

+--------+-----+
|    word|count|
+--------+-----+
|     day|    2|
|     not|    1|
|   study|    1|
|      be|    2|
|      is|    1|
|      up|    1|
|question|    1|
|    good|    2|
|       a|    1|
|    this|    1|
|      or|    1|
|      to|    2|
+--------+-----+

+----+-----+
|word|count|
+----+-----+
|  be|    2|
|  to|    2|
|good|    2|
+----+-----+

3)方法二:使用SQL语句

  def main(args: Array[String]): Unit = {
    // 0) 创建SparkSession的实例
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark Basic Example")
      .getOrCreate()

    // 读取输入文件
    val filePath = "src/main/resources/words.txt"
    val wordDS2 = spark.read.textFile(filePath)

    import spark.implicits._

    // 转换操作,然后返回加了列标题的DataFrame
    val wordDF2 = wordDS2
      .flatMap(_.split("\\s+"))
      .filter(_.size>0)
      .toDF("word")

    // 注册为临时view
    wordDF2.createOrReplaceTempView("wc_tb")

    // 执行SQL查询,分析产生结果
    val sql = """
          select word,count(1) as count
          from wc_tb
          group by word
          order by count desc
      """
    val resultDF = spark.sql(sql)
    resultDF.show()
  }

执行以上代码,输出结果如下所示:

+--------+-----+
|    word|count|
+--------+-----+
|    good|    2|
|      be|    2|
|     day|    2|
|      to|    2|
|      is|    1|
|     not|    1|
|      up|    1|
|   study|    1|
|question|    1|
|    this|    1|
|       a|    1|
|      or|    1|
+--------+-----+

《Flink原理深入与编程实战》