Key-Value Pair RDD

有一类特殊的RDD,其元素是以<key,value>对的形式出现,我们称之为“Pair RDD”。针对key/value pair RDD,Spark专门提供了一些操作,这些操作只在key/value对的RDD上可用。

创建Pair RDD

Spark在包含key/value对的RDD上提供了专门的transformation API,包括reduceByKey、groupByKey、sortByKey和join等。Pair RDD让我们能够在key上并行操作,或者跨网络重新组织数据。Key/value RDD常被用于执行聚合操作,以及常被用来完成初始的ETL(extract, transform, load)以获取key/value格式数据。

注意,除了count操作之外,大多数操作通常都涉及到shuffle,因为与key相关的数据可能并不总是驻留在单个分区上。

创建Pair RDD的方式有多种。第一种创建方式:从文件中加载。请看下面的代码:

file = "/data/spark/rdd/wc.txt"
lines = sc.textFile(file)

# 通过转换,生成Pair RDD
pairRDD = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word,1))
pairRDD.collect()

第二种方式:通过并行集合创建Pair RDD。请看下面的代码:

rdd = sc.parallelize(["Hadoop","Spark","Hive","Spark"])
pairRDD = rdd.map(lambda word: (word,1))
pairRDD.collect()  		# [('Hadoop', 1), ('Spark', 1), ('Hive', 1), ('Spark', 1)]

也可以使用keyBy()函数自定义key的分组规则。

a = sc.parallelize(["black", "blue", "white", "green", "grey"], 2)

# 通过应用指定的函数来创建该RDD中元素的元组(参数函数生成对应的key),返回一个pair RDD
b = a.keyBy(lambda x:len(x))  

for t in b.collect():
    print(t)

执行以上代码,输出结果如下:

(5, 'black')
(4, 'blue')
(5, 'white')
(5, 'green')
(4, 'grey')

更多创建Pair RDD的方法。

pets = sc.parallelize([("cat",1),("dog",1),("cat",2)])
pets.collect()			# [('cat', 1), ('dog', 1), ('cat', 2)]

操作Pair RDD

假设有一个Pair RDD {(1,2),(3,4),(3,6)}。

# 构造pair rdd
pairRDD = sc.parallelize([(1,2),(3,4),(3,6)])
pairRDD.collect()				# [(1, 2), (3, 4), (3, 6)]

1)keys:返回所有的key。

p1 = pairRDD.keys()
p1.collect()					# [1, 3, 3]

2)values:返回所有的value。

p2 = pairRDD.values()
p2.collect()					# [2, 4, 6]

3)mapValues(func):将函数应用到Pair RDD中的每个元素上,只改变value,不改变key。

p3 = pairRDD.mapValues(lambda x: x*x)
p3.collect()					# [(1, 4), (3, 16), (3, 36)]

4)flatMapValues(func):传入(K, U)对,传出(K,TraversableOnce[U])。通过一个flatMap函数传递key-value pair RDD中的每个value,而不改变key值;这也保留了原始的RDD分区。

# flatMapValues(func)
p4 = pairRDD.flatMapValues(lambda x: range(x,6))
p4.collect()					# [(1, 2), (1, 3), (1, 4), (1, 5), (3, 4), (3, 5)]

5)sortByKey([ascending], [numPartitions]):

这是一个transformation操作。按照key进行排序,默认是升序。当对(K, V)对的数据集(其中K实现Ordered)调用时,返回一个(K, V)对的数据集(按键升序或降序排序),按布尔类型的ascending参数中指定的顺序。

p5 = pairRDD.sortByKey()
p5.collect()					# [(1, 2), (3, 4), (3, 6)]


p6 = pairRDD.sortByKey(ascending=False)
p6.collect()					# [(3, 4), (3, 6), (1, 2)]

6)groupByKey([numPartitions]):这是一个transformation操作。它将RDD中每个key的值分组成一个序列。当对(K, V)对的数据集调用时,返回(K, Iterable)对的数据集。

# groupByKey():按照key分组
p7 = pairRDD.groupByKey()
p7.map(lambda x : (x[0], list(x[1]))).collect()	# [(1, [2]), (3, [4, 6])]

7)reduceByKey(func, [numPartitions]),reduceByKey(partitioner, func]):

这是一个transformation操作。它按照key来合并值(相同key的值进行合并)。当对一个(K, V)对的数据集调用时,返回一个(K, V)对的数据集,其中每个key的值使用给定的reduce函数func进行聚合,该reduce函数的类型必须是(V,V) => V。

p8 = pairRDD.reduceByKey(lambda x,y: x + y)
p8.collect()					# [(1, 2), (3, 10)]

8)foldByKey(zeroValue, [numPartitions])(func),foldByKey(zeroValue, [partitioner])(func)

这是一个transformation操作。它使用一个关联函数和一个初始值来合并每个键的值,这个初始值可以任意次数地添加到结果中,并且不能改变结果(例如,列表连接为Nil,加法为0,乘法为1)。

p9 = pairRDD.foldByKey(1,lambda a,b: a + b)
p9.collect()					# [(1, 3), (3, 11)]

9)aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]):

这是一个transformation操作。当对一个(K, V)对的数据集调用时,返回一个(K, U)对的数据集,其中每个key的值使用给定的combine函数和一个中性的“零”值进行聚合。允许与输入值类型不同的聚合值类型,同时避免不必要的分配。(详细用法在3.5.6中讲解)

10)combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions, mapSideCombine):

这是一个transformation操作。合并相同key的值,使用不同的结果类型。(详细用法在3.5.7中讲解)

11)subtractByKey

这是一个transformation操作。它返回这样一个RDD:其中的pair对的键key只在当前RDD中有而在other RDD中没有。它有三个重载的方法:

  • subtractByKey[W](other)
  • subtractByKey[W](other, numPartitions)
  • subtractByKey[W](other, partitioner)

12)sampleByKey(withReplacement, fractions, seed):

这是一个transformation操作。返回按key采样的RDD的一个子集(通过分层采样)。

13)sampleByKeyExact(withReplacement, fractions, seed):

这是一个transformation操作。返回按key采样的RDD的一个子集(通过分层采样),对于每一层(具有相同key的一组对),包含精确的math.ceil(numItems * samplingRate)个元素。

14)连接操作

对两个pair RDD执行join连接,以key为连接条件。

  • join(otherDataset, [numPartitions]):当对类型(K, V)和(K, W)的数据集调用时,返回(K, (V, W))对的数据集,其中包含每个key的所有元素对。通过leftOuterJoin、rightOuterJoin和fullOuterJoin支持外连接。
  • leftOuterJoin:左外连接。
  • rightOuterJoin:右外连接。
  • fullOuterJoin:全外连接。

例如,在下面的代码中,创建了两个Pair RDD,并执行join连接:

# 构造pair rdd
pairRDD1 = sc.parallelize([(1,2),(3,4),(3,6)])
pairRDD2 = sc.parallelize([(1,3),(3,7),(4,9)])

# 执行join连接
joinedRDD = pairRDD1.join(pairRDD2)
joinedRDD.collect()  		# [(1, (2, 3)), (3, (4, 7)), (3, (6, 7))]

15)cogroup(otherDataset, [numPartitions]):

这是一个transformation操作。当对类型(K, V)和(K, W)的数据集调用时,返回一个(K, (Iterable, Iterable))元组的数据集。这个操作也称为groupWith。

16)groupWith[W](other):cogroup的别名。

这是一个transformation操作。groupWith[W1, W2](other1, other2):cogroup的别名。当对类型(K, V)、(K, W1)和(K, W2)的数据集调用时,返回一个(K, (Iterable, Iterable, Iterable))元组的数据集。

groupWith[W1, W2, W3](other1, other2, other3):cogroup的别名。当对类型(K, V)、(K, W1)、(K, W2)和(K, W3)的数据集调用时,返回一个(K, (Iterable, Iterable, Iterable, Iterable))元组的数据集。

17)partitionBy(partitioner):

这是一个transformation操作。返回使用指定分区器分区的RDD的一个副本。

18)repartitionAndSortWithinPartitions(partitioner):

根据给定的分区程序对RDD进行重新分区,并在每个结果分区中根据键对记录进行排序。这比调用repartition然后在每个分区内排序更有效,因为它可以将排序下推到shuffle机制中。

Pair RDD上的action操作

1)countByKey():

这是一个action操作。计算每个key的元素数量,将结果收集到一个本地Map中(Map[K, Long])。

stus = [("计算机系","张三"),("数学系","李四"),("计算机系","王老五"),("数学系","赵老六")]
rdd1 = sc.parallelize(stus)

kvRDD1 = rdd1.keyBy(lambda x:x[0])	# 转换为pair rdd
kvRDD1.countByKey()			# action操作,返回defaultdict(int, {'计算机系': 2, '数学系': 2})

2)collectAsMap()

这是一个action操作。将这个RDD中的键值对作为Map返回给master。这不会返回一个multimap(所以如果一个键有多个值,每个键在返回的map中只保留一个值)。这个方法只应该在结果数据很小的情况下使用,因为所有的数据都被加载到驱动程序的内存中。

stus = [("计算机系","张三"),("数学系","李四"),("计算机系","王老五"),("数学系","赵老六")]
rdd1 = sc.parallelize(stus)

kvRDD1 = rdd1.keyBy(lambda x:x[0])	# 转换为pair rdd
kvRDD1.collectAsMap()			# {'计算机系': ('计算机系', '王老五'), '数学系': ('数学系', '赵老六')}

《PySpark原理深入与编程实战》