RDD案例_电影评分数据集分析

任务描述:请使用Spark RDD实现对电影数据集进行分析,找出平均评分超过4.0的电影列表。

数据集说明:
这里我们使用推荐领域一个著名的开放测试数据集movielens。我们将使用其中的电影评分数据集ratings.csv以及电影数据集movies.csv。
数据集位于PBLP平台的“~/data/spark/movielens/”目录下。

【例】请找出平均评分超过4.0的电影,列表显示,并将查找结果保存到文件中。

请按以下步骤执行。

1) 加载数据,构造RDD。

val ratingsFile = "/home/hduser/data/spark/movielens/ratings.csv"      // 评分数据集
val moviesFile = "/home/hduser/data/spark/movielens/movies.csv"        // 电影数据集

val ratingsRDD = sc.textFile(ratingsFile)
val moviesRDD = sc.textFile(moviesFile)

2) 简单探索。

println("电影评分数据集中记录数量:" + ratingsRDD.count())
println("电影数据集中记录数量:" + moviesRDD.count())

3) 从评分数据集中抽取每部电影的评分,以(movieid, rating)的形式返回。

val movieAvgScore = ratingsRDD
      // 因为第一行是标题行,所以需要过滤掉
      .filter(line => !line.startsWith("userId"))    
      // 转换为(movieid, rating)元组形式
      .map(line => {
            val fields = line.split(",")
            (fields(1).trim.toInt, fields(2).trim.toDouble)
      })
      // 按电影id分组
      .groupByKey()
      // 计算每部 电影的平均评分,格式为元组(movieid, avg_rating)
      .map(t => (t._1, t._2.sum/t._2.size))
      // 过滤出平均评分超过4.0的
      .filter(t => t._2 > 4.0)

4) 从电影数据集中抽取电影名称,以(movieId, title)的形式返回。

val moviesInfo = moviesRDD
      // 因为第一行是标题行,所以需要过滤掉
      .filter(line => !line.startsWith("movieId"))
      // 转换为(movieid, title)元组形式
      .map(line => {
             val fields = line.split(",")
             (fields(0).toInt, fields(1))
      })

5) 将两个数据集连接起来,得到(movieId, title, avgScore)类型结果。

val result =  movieAvgScore.join(moviesInfo)
      .map(f => (f._2._1,(f._1, f._2._2, f._2._1)))
      .sortByKey(ascending = false)
      .map(t => t._2)

6) 结果列表显示。

result.collect.foreach(println)

7) 将查询结果保存到HDFS文件系统中。

result.saveAsTextFile("/data/spark/movielens/result")

完整示例代码

完整示例代码如下:

import org.apache.spark.sql.SparkSession

object MoviesDemo {
  def main(args: Array[String]): Unit = {

    // 创建一个SparkContext来初始化Spark
    // 2.0 以前的用法
    // val conf = new SparkConf().setMaster("local").setAppName("movie demo")
    // val sc = new SparkContext(conf)

    // 2.0 以后的用法
    val spark = SparkSession.builder().master("local[*]").appName("movie demo").getOrCreate()
    val sc = spark.sparkContext

    // 加载数据,构造RDD(注:这里数据集放在项目的src/data/movielens/目录下)
    val ratingsFile = "src/data/movielens/ratings.csv"      // 评分数据集
    val moviesFile = "src/data/movielens/movies.csv"        // 电影数据集

    val ratingsRDD = sc.textFile(ratingsFile)
    val moviesRDD = sc.textFile(moviesFile)

    // 从评分数据集中抽取每部电影的评分,以(movieid, rating)的形式返回
    // 因为第一行是标题行,所以过滤掉
    val movieAvgScore = ratingsRDD
      .filter(line => !line.startsWith("userId"))
      .map(line => {val fields = line.split(","); (fields(1).trim.toInt, fields(2).trim.toDouble)})
      .groupByKey()
      .map(t => (t._1, t._2.sum/t._2.size))
      .filter(t => t._2 > 4.0)

    // 从电影数据集中抽取电影名称,以(movieId, movieName)的形式返回
    // 因为第一行是标题行,所以过滤掉
    val moviesInfo = moviesRDD
      .filter(line => !line.startsWith("movieId"))
      .map(line => {val fields = line.split(","); (fields(0).toInt, fields(1))})

    // 将两个数据集连接起来,得到(movieId, movieName, avgScore)
    val result =  movieAvgScore.join(moviesInfo)
      .map(f => (f._2._1,(f._1, f._2._2, f._2._1)))
      .sortByKey(ascending = false)
      .map(t => t._2)

    // 列表显示
    result.collect.foreach(println)

    // 将查询结果保存到HDFS文件系统中
    result.saveAsTextFile("/data/spark/movielens/result")
  }
}

《Spark原理深入与编程实战》