关于aggregateByKey

Apache Spark aggregateByKey函数聚合每个key的值,使用给定的combine函数和一个中性的“零值”,并为该key返回不同类型的值。

这个aggregateByKey函数总共接受3个参数:

  • zeroValue:它是累加值或累加器的初值。如果聚合类型是对所有的值求和,那么它可以是0。如果聚合目标是找出最小值,这个值可以是Double.MaxValue。如果聚合目标是找出最大值,这个值可以使用Double.MinValue。或者,如果我们只是想要一个各自的集合作为每个key的输出的话,也可以使用一个空的List或Map对象。
  • seqOp:是聚合单个分区的所有值的操作。它将一种类型[V]的数据转换/合并为另一种类型[U]的序列操作函数。
  • combOp:类似于seqOp,进一步聚合来自不同分区的所有聚合值。它将多个转换后的类型[U]合并为一个单一类型[U]的组合操作函数。

例如,我们有这样的RDD:PairRDD[String, (String, Double)]。其中,key是学生姓名,数据类型为String,值是课程名称和成绩,数据类型为(String,Double)。现在我们要找出每个学生的最好成绩,过程如下:

【示例】给出每个学生每门功课的分数,请使用aggregateByKey函数计算每个学生的最好成绩。

# 使用key-value对创建PairRDD studentRDD
student_rdd = spark.sparkContext.parallelize([
          ("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91), 
          ("Joseph", "Biology", 82),   ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62), 
          ("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80), ("Tina", "Maths", 78), 
          ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87),
          ("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91), 
          ("Thomas", "Biology", 74), ("Cory", "Maths", 56), ("Cory", "Physics", 65), 
          ("Cory", "Chemistry", 71), ("Cory", "Biology", 68), ("Jackeline", "Maths", 86), 
          ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83),
          ("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64), 
          ("Juan", "Biology", 60)], 2)
 
    
# 定义Seqencial Operation and Combiner Operations
# Sequence operation : 从单个分区查找最大成绩
def seq_op(accumulator, element):
    if(accumulator > element[1]):
        return accumulator 
    else: 
        return element[1]
 
 
# Combiner Operation : 从所有分区累加器中找出最大成绩
def comb_op(accumulator1, accumulator2):
    if(accumulator1 > accumulator2):
        return accumulator1 
    else:
        return accumulator2
 

# 在我们的情况下,零值将是零,因为我们正在寻找最大的成绩
zero_val = 0
aggr_rdd = student_rdd.map(lambda t: (t[0], (t[1], t[2]))).aggregateByKey(zero_val, seq_op, comb_op) 
 
# 查看输出
for tpl in aggr_rdd.collect():
    print(tpl)

执行以上代码,输出结果如下:

('Jimmy', 97)
('Tina', 87)
('Thomas', 93)
('Joseph', 91)
('Cory', 71)
('Jackeline', 86)
('Juan', 69)

【示例】在上例的基础上,请修改代码,要求要同时输出每个学生最高成绩及该成绩所属的课程。

# 定义Seqencial Operation 和Combiner Operations
def seq_op(accumulator, element):
    if(accumulator[1] > element[1]):
        return accumulator 
    else: 
        return element
 
 
# Combiner Operation : 从所有分区累加器中找出最大成绩
def comb_op(accumulator1, accumulator2):
    if(accumulator1[1] > accumulator2[1]):
        return accumulator1 
    else:
        return accumulator2
    
 
# 在我们的情况下,零值将是零,因为我们正在寻找最大的成绩
zero_val = ('', 0)
aggr_rdd = student_rdd.map(lambda t: (t[0], (t[1], t[2]))).aggregateByKey(zero_val, seq_op, comb_op) 
 
# 查看输出
for tpl in aggr_rdd.collect():
    print(tpl)

执行以上代码,输出结果如下:

('Jimmy', 77.0)
('Tina', 76.5)
('Thomas', 86.25)
('Joseph', 82.5)
('Cory', 65.0)
('Jackeline', 76.5)
('Juan', 64.0)

aggregateByKey函数的特点总结如下:

  • 性能方面的aggregateByKey是一个优化的transformation操作;
  • aggregateByKey是一个宽依赖的转换;
  • 当聚合需求加上输入和输出RDD类型不同时,我们应该使用aggregateByKey;
  • 当聚合需求加上输入和输出RDD类型相同时,我们应该使用reduceByKey。

《Spark原理深入与编程实战》