PySpark RDD编程案例_数据聚合计算

【示例】给定一些销售数据,数据采用键值对的形式(公司,收入),求出每个公司的总收入和平均收入。

提示:可直接用sc.parallelize在内存中生成数据,在求每个公司总收入时,先分三个分区进行求和,然后再把三个分区进行合并。只需要编写RDD combineByKey函数的前三个参数的实现。

from pyspark.sql import SparkSession

# 构建SparkSession和SparkContext实例
spark = SparkSession.builder \
   .master("spark://xueai8:7077") \
   .appName("pyspark demo") \
   .getOrCreate()

sc = spark.sparkContext

# 构造RDD
data = sc.parallelize([("company-1",92),("company-1",85),("company-1",82),
                   ("company-2",78),("company-2",96),("company-2",85),
                   ("company-3",88),("company-3",94),("company-3",80)], 3)

# 定义createCombiner, mergeValue 和mergeCombiner 函数
def createCombiner(income):
    return (income, 1)
    
def mergeValue(acc, income): 
    return (acc[0] + income, acc[1] + 1)
    
def mergeCombiner(acc1, acc2): 
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])

cbk = data \
    .combineByKey(createCombiner, mergeValue, mergeCombiner) \
    .map(lambda t: (t[0], t[1][0], t[1][0]/float(t[1][1])))

# 查看输出
for row in cbk.collect():
    print(row)

执行以上代码,输出结果如下:

('company-1', 259, 86.33333333333333)
('company-3', 262, 87.33333333333333)
('company-2', 259, 86.33333333333333)

《PySpark原理深入与编程实战》