PySpark RDD编程案例_数据聚合计算
【示例】给定一些销售数据,数据采用键值对的形式(公司,收入),求出每个公司的总收入和平均收入。
提示:可直接用sc.parallelize在内存中生成数据,在求每个公司总收入时,先分三个分区进行求和,然后再把三个分区进行合并。只需要编写RDD combineByKey函数的前三个参数的实现。
from pyspark.sql import SparkSession # 构建SparkSession和SparkContext实例 spark = SparkSession.builder \ .master("spark://xueai8:7077") \ .appName("pyspark demo") \ .getOrCreate() sc = spark.sparkContext # 构造RDD data = sc.parallelize([("company-1",92),("company-1",85),("company-1",82), ("company-2",78),("company-2",96),("company-2",85), ("company-3",88),("company-3",94),("company-3",80)], 3) # 定义createCombiner, mergeValue 和mergeCombiner 函数 def createCombiner(income): return (income, 1) def mergeValue(acc, income): return (acc[0] + income, acc[1] + 1) def mergeCombiner(acc1, acc2): return (acc1[0] + acc2[0], acc1[1] + acc2[1]) cbk = data \ .combineByKey(createCombiner, mergeValue, mergeCombiner) \ .map(lambda t: (t[0], t[1][0], t[1][0]/float(t[1][1]))) # 查看输出 for row in cbk.collect(): print(row)
执行以上代码,输出结果如下:
('company-1', 259, 86.33333333333333) ('company-3', 262, 87.33333333333333) ('company-2', 259, 86.33333333333333)