RDD案例_电影评分数据集分析
任务描述:请使用Spark RDD实现对电影数据集进行分析,找出平均评分超过4.0的电影列表。
- 数据集说明:
- 这里我们使用推荐领域一个著名的开放测试数据集movielens。我们将使用其中的电影评分数据集ratings.csv以及电影数据集movies.csv。
- 数据集位于PBLP平台的“~/data/spark/movielens/”目录下。
【例】请找出平均评分超过4.0的电影,列表显示,并将查找结果保存到文件中。
请按以下步骤执行。
1) 加载数据,构造RDD。
val ratingsFile = "/home/hduser/data/spark/movielens/ratings.csv" // 评分数据集 val moviesFile = "/home/hduser/data/spark/movielens/movies.csv" // 电影数据集 val ratingsRDD = sc.textFile(ratingsFile) val moviesRDD = sc.textFile(moviesFile)
2) 简单探索。
println("电影评分数据集中记录数量:" + ratingsRDD.count()) println("电影数据集中记录数量:" + moviesRDD.count())
3) 从评分数据集中抽取每部电影的评分,以(movieid, rating)的形式返回。
val movieAvgScore = ratingsRDD // 因为第一行是标题行,所以需要过滤掉 .filter(line => !line.startsWith("userId")) // 转换为(movieid, rating)元组形式 .map(line => { val fields = line.split(",") (fields(1).trim.toInt, fields(2).trim.toDouble) }) // 按电影id分组 .groupByKey() // 计算每部 电影的平均评分,格式为元组(movieid, avg_rating) .map(t => (t._1, t._2.sum/t._2.size)) // 过滤出平均评分超过4.0的 .filter(t => t._2 > 4.0)
4) 从电影数据集中抽取电影名称,以(movieId, title)的形式返回。
val moviesInfo = moviesRDD // 因为第一行是标题行,所以需要过滤掉 .filter(line => !line.startsWith("movieId")) // 转换为(movieid, title)元组形式 .map(line => { val fields = line.split(",") (fields(0).toInt, fields(1)) })
5) 将两个数据集连接起来,得到(movieId, title, avgScore)类型结果。
val result = movieAvgScore.join(moviesInfo) .map(f => (f._2._1,(f._1, f._2._2, f._2._1))) .sortByKey(ascending = false) .map(t => t._2)
6) 结果列表显示。
result.collect.foreach(println)
7) 将查询结果保存到HDFS文件系统中。
result.saveAsTextFile("/data/spark/movielens/result")
完整示例代码
完整示例代码如下:
import org.apache.spark.sql.SparkSession object MoviesDemo { def main(args: Array[String]): Unit = { // 创建一个SparkContext来初始化Spark // 2.0 以前的用法 // val conf = new SparkConf().setMaster("local").setAppName("movie demo") // val sc = new SparkContext(conf) // 2.0 以后的用法 val spark = SparkSession.builder().master("local[*]").appName("movie demo").getOrCreate() val sc = spark.sparkContext // 加载数据,构造RDD(注:这里数据集放在项目的src/data/movielens/目录下) val ratingsFile = "src/data/movielens/ratings.csv" // 评分数据集 val moviesFile = "src/data/movielens/movies.csv" // 电影数据集 val ratingsRDD = sc.textFile(ratingsFile) val moviesRDD = sc.textFile(moviesFile) // 从评分数据集中抽取每部电影的评分,以(movieid, rating)的形式返回 // 因为第一行是标题行,所以过滤掉 val movieAvgScore = ratingsRDD .filter(line => !line.startsWith("userId")) .map(line => {val fields = line.split(","); (fields(1).trim.toInt, fields(2).trim.toDouble)}) .groupByKey() .map(t => (t._1, t._2.sum/t._2.size)) .filter(t => t._2 > 4.0) // 从电影数据集中抽取电影名称,以(movieId, movieName)的形式返回 // 因为第一行是标题行,所以过滤掉 val moviesInfo = moviesRDD .filter(line => !line.startsWith("movieId")) .map(line => {val fields = line.split(","); (fields(0).toInt, fields(1))}) // 将两个数据集连接起来,得到(movieId, movieName, avgScore) val result = movieAvgScore.join(moviesInfo) .map(f => (f._2._1,(f._1, f._2._2, f._2._1))) .sortByKey(ascending = false) .map(t => t._2) // 列表显示 result.collect.foreach(println) // 将查询结果保存到HDFS文件系统中 result.saveAsTextFile("/data/spark/movielens/result") } }