Key-Value Pair RDD
有一类特殊的RDD,其元素是以<key,value>对的形式出现,我们称之为“Pair RDD”。针对key/value pair RDD,Spark专门提供了一些操作,这些操作只在key/value对的RDD上可用。
创建Pair RDD
Spark在包含key/value对的RDD上提供了专门的transformation API,包括reduceByKey、groupByKey、sortByKey和join等。Pair RDD让我们能够在key上并行操作,或者跨网络重新组织数据。Key/value RDD常被用于执行聚合操作,以及常被用来完成初始的ETL(extract, transform, load)以获取key/value格式数据。
注意,除了count操作之外,大多数操作通常都涉及到shuffle,因为与key相关的数据可能并不总是驻留在单个分区上。
创建Pair RDD的方式有多种。第一种创建方式:从文件中加载。请看下面的代码:
file = "/data/spark/rdd/wc.txt" lines = sc.textFile(file) # 通过转换,生成Pair RDD pairRDD = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word,1)) pairRDD.collect()
第二种方式:通过并行集合创建Pair RDD。请看下面的代码:
rdd = sc.parallelize(["Hadoop","Spark","Hive","Spark"]) pairRDD = rdd.map(lambda word: (word,1)) pairRDD.collect() # [('Hadoop', 1), ('Spark', 1), ('Hive', 1), ('Spark', 1)]
也可以使用keyBy()函数自定义key的分组规则。
a = sc.parallelize(["black", "blue", "white", "green", "grey"], 2) # 通过应用指定的函数来创建该RDD中元素的元组(参数函数生成对应的key),返回一个pair RDD b = a.keyBy(lambda x:len(x)) for t in b.collect(): print(t)
执行以上代码,输出结果如下:
(5, 'black') (4, 'blue') (5, 'white') (5, 'green') (4, 'grey')
更多创建Pair RDD的方法。
pets = sc.parallelize([("cat",1),("dog",1),("cat",2)]) pets.collect() # [('cat', 1), ('dog', 1), ('cat', 2)]
操作Pair RDD
假设有一个Pair RDD {(1,2),(3,4),(3,6)}。
# 构造pair rdd pairRDD = sc.parallelize([(1,2),(3,4),(3,6)]) pairRDD.collect() # [(1, 2), (3, 4), (3, 6)]
1)keys:返回所有的key。
p1 = pairRDD.keys() p1.collect() # [1, 3, 3]
2)values:返回所有的value。
p2 = pairRDD.values() p2.collect() # [2, 4, 6]
3)mapValues(func):将函数应用到Pair RDD中的每个元素上,只改变value,不改变key。
p3 = pairRDD.mapValues(lambda x: x*x) p3.collect() # [(1, 4), (3, 16), (3, 36)]
4)flatMapValues(func):传入(K, U)对,传出(K,TraversableOnce[U])。通过一个flatMap函数传递key-value pair RDD中的每个value,而不改变key值;这也保留了原始的RDD分区。
# flatMapValues(func) p4 = pairRDD.flatMapValues(lambda x: range(x,6)) p4.collect() # [(1, 2), (1, 3), (1, 4), (1, 5), (3, 4), (3, 5)]
5)sortByKey([ascending], [numPartitions]):
这是一个transformation操作。按照key进行排序,默认是升序。当对(K, V)对的数据集(其中K实现Ordered)调用时,返回一个(K, V)对的数据集(按键升序或降序排序),按布尔类型的ascending参数中指定的顺序。
p5 = pairRDD.sortByKey() p5.collect() # [(1, 2), (3, 4), (3, 6)] p6 = pairRDD.sortByKey(ascending=False) p6.collect() # [(3, 4), (3, 6), (1, 2)]
6)groupByKey([numPartitions]):这是一个transformation操作。它将RDD中每个key的值分组成一个序列。当对(K, V)对的数据集调用时,返回(K, Iterable
# groupByKey():按照key分组 p7 = pairRDD.groupByKey() p7.map(lambda x : (x[0], list(x[1]))).collect() # [(1, [2]), (3, [4, 6])]
7)reduceByKey(func, [numPartitions]),reduceByKey(partitioner, func]):
这是一个transformation操作。它按照key来合并值(相同key的值进行合并)。当对一个(K, V)对的数据集调用时,返回一个(K, V)对的数据集,其中每个key的值使用给定的reduce函数func进行聚合,该reduce函数的类型必须是(V,V) => V。
p8 = pairRDD.reduceByKey(lambda x,y: x + y) p8.collect() # [(1, 2), (3, 10)]
8)foldByKey(zeroValue, [numPartitions])(func),foldByKey(zeroValue, [partitioner])(func)
这是一个transformation操作。它使用一个关联函数和一个初始值来合并每个键的值,这个初始值可以任意次数地添加到结果中,并且不能改变结果(例如,列表连接为Nil,加法为0,乘法为1)。
p9 = pairRDD.foldByKey(1,lambda a,b: a + b) p9.collect() # [(1, 3), (3, 11)]
9)aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]):
这是一个transformation操作。当对一个(K, V)对的数据集调用时,返回一个(K, U)对的数据集,其中每个key的值使用给定的combine函数和一个中性的“零”值进行聚合。允许与输入值类型不同的聚合值类型,同时避免不必要的分配。(详细用法在3.5.6中讲解)
10)combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions, mapSideCombine):
这是一个transformation操作。合并相同key的值,使用不同的结果类型。(详细用法在3.5.7中讲解)
11)subtractByKey
这是一个transformation操作。它返回这样一个RDD:其中的pair对的键key只在当前RDD中有而在other RDD中没有。它有三个重载的方法:
- subtractByKey[W](other)
- subtractByKey[W](other, numPartitions)
- subtractByKey[W](other, partitioner)
12)sampleByKey(withReplacement, fractions, seed):
这是一个transformation操作。返回按key采样的RDD的一个子集(通过分层采样)。
13)sampleByKeyExact(withReplacement, fractions, seed):
这是一个transformation操作。返回按key采样的RDD的一个子集(通过分层采样),对于每一层(具有相同key的一组对),包含精确的math.ceil(numItems * samplingRate)个元素。
14)连接操作
对两个pair RDD执行join连接,以key为连接条件。
- join(otherDataset, [numPartitions]):当对类型(K, V)和(K, W)的数据集调用时,返回(K, (V, W))对的数据集,其中包含每个key的所有元素对。通过leftOuterJoin、rightOuterJoin和fullOuterJoin支持外连接。
- leftOuterJoin:左外连接。
- rightOuterJoin:右外连接。
- fullOuterJoin:全外连接。
例如,在下面的代码中,创建了两个Pair RDD,并执行join连接:
# 构造pair rdd pairRDD1 = sc.parallelize([(1,2),(3,4),(3,6)]) pairRDD2 = sc.parallelize([(1,3),(3,7),(4,9)]) # 执行join连接 joinedRDD = pairRDD1.join(pairRDD2) joinedRDD.collect() # [(1, (2, 3)), (3, (4, 7)), (3, (6, 7))]
15)cogroup(otherDataset, [numPartitions]):
这是一个transformation操作。当对类型(K, V)和(K, W)的数据集调用时,返回一个(K, (Iterable
16)groupWith[W](other):cogroup的别名。
这是一个transformation操作。groupWith[W1, W2](other1, other2):cogroup的别名。当对类型(K, V)、(K, W1)和(K, W2)的数据集调用时,返回一个(K, (Iterable
groupWith[W1, W2, W3](other1, other2, other3):cogroup的别名。当对类型(K, V)、(K, W1)、(K, W2)和(K, W3)的数据集调用时,返回一个(K, (Iterable
17)partitionBy(partitioner):
这是一个transformation操作。返回使用指定分区器分区的RDD的一个副本。
18)repartitionAndSortWithinPartitions(partitioner):
根据给定的分区程序对RDD进行重新分区,并在每个结果分区中根据键对记录进行排序。这比调用repartition然后在每个分区内排序更有效,因为它可以将排序下推到shuffle机制中。
Pair RDD上的action操作
1)countByKey():
这是一个action操作。计算每个key的元素数量,将结果收集到一个本地Map中(Map[K, Long])。
stus = [("计算机系","张三"),("数学系","李四"),("计算机系","王老五"),("数学系","赵老六")] rdd1 = sc.parallelize(stus) kvRDD1 = rdd1.keyBy(lambda x:x[0]) # 转换为pair rdd kvRDD1.countByKey() # action操作,返回defaultdict(int, {'计算机系': 2, '数学系': 2})
2)collectAsMap()
这是一个action操作。将这个RDD中的键值对作为Map返回给master。这不会返回一个multimap(所以如果一个键有多个值,每个键在返回的map中只保留一个值)。这个方法只应该在结果数据很小的情况下使用,因为所有的数据都被加载到驱动程序的内存中。
stus = [("计算机系","张三"),("数学系","李四"),("计算机系","王老五"),("数学系","赵老六")] rdd1 = sc.parallelize(stus) kvRDD1 = rdd1.keyBy(lambda x:x[0]) # 转换为pair rdd kvRDD1.collectAsMap() # {'计算机系': ('计算机系', '王老五'), '数学系': ('数学系', '赵老六')}