关于combineByKey
Pair RDD的combineByKey转换与Hadoop MapReduce编程中的combiner非常相似。是一个宽依赖的操作,它在最后阶段需要shuffle数据。另外,它内部会按分区合并元素。
Pair RDD的combineByKey是一个通用函数,它使用一组自定义的聚合函数组合每个key的元素。内部combineByKey函数通过应用聚合函数有效地组合了Pair RDD分区的值。combineByKey转换的主要目标是将任何PairRDD[(K,V)]转换为RDD[(K,C)],其中C是键K下所有值的任何聚合的结果。
Pair RDD的combineByKey函数使用如下三个函数作为参数:
- createCombiner:在第一次遇到Key时创建组合器函数,将RDD数据集中的V类型value值转换C类型值(V => C);
- mergeValue:mergeValue:合并值函数,再次遇到相同的Key时,将createCombiner的C类型值与这次传入的V类型值合并成一个C类型值(C,V)=>C。
- mergeCombiners:合并组合器函数,将C类型值两两合并成一个C类型值。
下面我们通一个示例来理解combineByKey的用法。
首先,假设我们有一个由studentName、subjectName和marks构成的RDD,我们想要得到每个学生的平均成绩。下面是用combineByKey变换求解的步骤。

【示例】给出每个学生每门功课的分数,请使用combinerByKey函数计算每个学生的平均成绩。
在这个例子中,因为要计算平均成绩,需要做sum和count聚合。所以这里的createCombiner函数应该用一个元组(sum, count)来初始化它。对于初始聚合,它应该是(value,1)。
在这个例子中,mergeValue有一个累加器元组(sum, count)。因此,每当我们得到一个新值,marks将被添加到第一个元素,而第二个值(即计数器)将增加1。
在这个例子中,mergeCombiners合并来自各个分区的结果(sum, count)。因此,对于同一个key,需要将各个元组中的sum和count都累加,得到每个学生的总成绩和总课目数。
代码实现如下所示:
# 创建PairRDD student_rdd student_rdd = sc.parallelize([ ("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91), ("Joseph", "Biology", 82), ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62), ("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80), ("Tina", "Maths", 78), ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87), ("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91), ("Thomas", "Biology", 74), ("Cory", "Maths", 56), ("Cory", "Physics", 65), ("Cory", "Chemistry", 71), ("Cory", "Biology", 68), ("Jackeline", "Maths", 86), ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83), ("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64), ("Juan", "Biology", 60)], 3) # 定义createCombiner, mergeValue 和mergeCombiner 函数 def createCombiner(tpl): return (tpl[1], 1) def mergeValue(accumulator, element): return (accumulator[0] + element[1], accumulator[1] + 1) def mergeCombiner(accumulator1, accumulator2): return (accumulator1[0] + accumulator2[0], accumulator1[1] + accumulator2[1]) comb_rdd = student_rdd.map(lambda t: (t[0], (t[1], t[2]))) \ .combineByKey(createCombiner, mergeValue, mergeCombiner) \ .map(lambda t: (t[0], t[1][0]/t[1][1])) # 查看输出 for tpl in comb_rdd.collect(): print(tpl)
执行以上代码,输出结果如下:
('Jimmy', 77.0) ('Tina', 76.5) ('Thomas', 86.25) ('Juan', 64.0) ('Joseph', 82.5) ('Cory', 65.0) ('Jackeline', 76.5)
combineByKey转换函数的特点总结如下:
- combineByKey是一个通用的转换,而groupByKey、reduceByKey和aggregateByKey转换的内部实现使用了combineByKey;
- combineByKey转换可灵活执行map或reduce端combine;
- combineByKey转换的使用更加复杂;
- 总是需要实现三个函数:createCombiner、mergeValue、mergeCombiner;
- combineByKey是一个转换操作,因此它的计算是惰性的;
- 它是一个宽依赖的操作,因为它在聚合的最后阶段shuffle数据并创建另一个RDD。