Key-Value Pair RDD

有一类特殊的RDD,其元素是以<key,value>对的形式出现,我们称之为"Pair RDD"。针对key/value pair RDD,Spark专门提供了一些操作,这些操作只在key/value对的RDD上可用。

配套视频:

Spark Pair RDD视频

创建Pair RDD

Spark在包含key/value对的RDDs上提供了专门的transformation API,包括reduceByKey、groupByKey、sortByKey和join等。Pair RDD让我们能够在key上并行操作,或者跨网络重新组织数据。

Key/value RDD常被用于执行聚合操作,以及常被用来完成初始的ETL(extract, transform, load)以获取key/value格式数据。

注意,除了count操作之外,大多数操作通常都涉及到shuffle,因为与key相关的数据可能并不总是驻留在单个分区上。

创建Pair RDD的方式有多种。第一种创建方式:从文件中加载。

请看下面的代码:

val file = "/data/spark_demo/rdd/wc.txt"
val lines = sc.textFile(file)

val pairRDD = lines.flatMap(line => line.split(" ")).map(word => (word,1))   // 通过转换,生成Pair RDD
pairRDD.collect

第二种方式:通过并行集合创建Pair RDD。

请看下面的代码:

val rdd = sc.parallelize(Seq("Hadoop","Spark","Hive","Spark"))
val pairRDD = rdd.map(word => (word,1))
pairRDD.collect

val a = sc.parallelize(List("black", "blue", "white", "green", "grey"), 2)
// 通过应用指定的函数来创建该RDD中元素的元组(参数函数生成对应的key),返回一个pair RDD
val b = a.keyBy(_.length) 
b.collect

操作Pair RDD

假设有一个Pair RDD {(1,2),(3,4),(3,6)}。

// 构造pair rdd
val pairRDD = sc.parallelize(Seq((1,2),(3,4),(3,6)))
pairRDD.collect

keys

返回所有的key。

val p3 = pairRDD.keys
p3.collect

values

返回所有的value。

val p4 = pairRDD.values
p4.collect

mapValues(func)

将函数应用到Pair RDD中的每个元素上,只改变value,不改变key。

val p6 = pairRDD.mapValues(x => x*x)
p6.collect

flatMapValues(func)

传入(K, U)对,传出(K,TraversableOnce[U])。通过一个flatMap函数传递key-value pair RDD中的每个value,而不改变key值;这也保留了原始的RDD分区。

val p7 = pairRDD.flatMapValues(x => (x to 5))
p7.collect

sortByKey([ascending], [numPartitions])

这是一个transformation操作。按照key进行排序,默认是升序。当对(K, V)对的数据集(其中K实现Ordered)调用时,返回一个(K, V)对的数据集(按键升序或降序排序),按布尔类型的ascending参数中指定的顺序。

val p5 = pairRDD.sortByKey()
p5.collect

// pairRDD.sortByKey(ascending=false).collect
pairRDD.sortByKey(false).collect

groupByKey([numPartitions]):这是一个transformation操作。它将RDD中每个key的值分组成一个序列。当对(K, V)对的数据集调用时,返回(K, Iterable<v>)对的数据集。

val p2 = pairRDD.groupByKey()
p2.collect

reduceByKey(func, [numPartitions]),reduceByKey(partitioner, func])

这是一个transformation操作。它按照key来合并值(相同key的值进行合并)。当对一个(K, V)对的数据集调用时,返回一个(K, V)对的数据集,其中每个key的值使用给定的reduce函数func进行聚合,该reduce函数的类型必须是(V,V) => V。

val p1 = pairRDD.reduceByKey((x,y) => x + y)
p1.collect

foldByKey(zeroValue, [numPartitions])(func),foldByKey(zeroValue, [partitioner])(func)

这是一个transformation操作。它使用一个关联函数和一个初始值来合并每个键的值,这个初始值可以任意次数地添加到结果中,并且不能改变结果(例如,列表连接为Nil,加法为0,乘法为1)。

val p7 = pairRDD.foldByKey(1)((a,b) => a + b)
p7.collect						// 返回Array((1,3), (3,11))

aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])

这是一个transformation操作。当对一个(K, V)对的数据集调用时,返回一个(K, U)对的数据集,其中每个key的值使用给定的combine函数和一个中性的“零”值进行聚合。允许与输入值类型不同的聚合值类型,同时避免不必要的分配。

combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions, mapSideCombine)

这是一个transformation操作。合并相同key的值,使用不同的结果类型。

  • createCombiner:在第一次遇到Key时创建组合器函数,将RDD数据集中的V类型value值转换C类型值(V => C)。
  • mergeValue:合并值函数,再次遇到相同的Key时,将createCombiner的C类型值与这次传入的V类型值合并成一个C类型值(C,V)=>C。
  • mergeCombiners:合并组合器函数,将C类型值两两合并成一个C类型值。
  • partitioner:使用已有的或自定义的分区函数,默认是HashPartitioner。
  • mapSideCombine:是否在map端进行Combine操作,默认为true。

subtractByKey

这是一个transformation操作。它返回这样一个RDD:其中的pair对的键key只在当前RDD中有而在other RDD中没有。它有三个重载的方法:

  • subtractByKey[W](other)
  • subtractByKey[W](other, numPartitions)
  • subtractByKey[W](other, partitioner)

sampleByKey(withReplacement, fractions, seed)

这是一个transformation操作。返回按key采样的RDD的一个子集(通过分层采样)。

sampleByKeyExact(withReplacement, fractions, seed)

这是一个transformation操作。返回按key采样的RDD的一个子集(通过分层采样),对于每一层(具有相同key的一组对),包含精确的math.ceil(numItems * samplingRate)个元素。

cogroup(otherDataset, [numPartitions])

这是一个transformation操作。当对类型(K, V)和(K, W)的数据集调用时,返回一个(K, (Iterable<v>, Iterable<w>))元组的数据集。这个操作也称为groupWith。

groupWith[W](other)

cogroup的别名。

这是一个transformation操作。groupWith[W1, W2](other1, other2):cogroup的别名。当对类型(K, V)、(K, W1)和(K, W2)的数据集调用时,返回一个(K, (Iterable<v>, Iterable<w1>, Iterable<w2>))元组的数据集。

groupWith[W1, W2, W3](other1, other2, other3):cogroup的别名。当对类型(K, V)、(K, W1)、(K, W2)和(K, W3)的数据集调用时,返回一个(K, (Iterable<v>, Iterable<w1>, Iterable<w2>, Iterable<w3>))元组的数据集。

partitionBy(partitioner)

这是一个transformation操作。返回使用指定分区器分区的RDD的一个副本。

repartitionAndSortWithinPartitions(partitioner)

根据给定的分区程序对RDD进行重新分区,并在每个结果分区中根据键对记录进行排序。这比调用repartition然后在每个分区内排序更有效,因为它可以将排序下推到shuffle机制中。

Pair RDD上的action操作

countByKey()

这是一个action操作。计算每个key的元素数量,将结果收集到一个本地Map中(Map[K, Long])。

val stus = List(("计算机系","张三"),("数学系","李四"),("计算机系","王老五"),("数学系","赵老六"))
val rdd1 = spark.sparkContext.parallelize(stus)

val kvRDD1 = rdd1.keyBy(_._1)   	// 转换为pair rdd
kvRDD1.countByKey			// action操作,返回Map(计算机系 -> 2, 数学系 -> 2)

collectAsMap()

这是一个action操作。将这个RDD中的键值对作为Map返回给master。这不会返回一个multimap(所以如果一个键有多个值,每个键在返回的map中只保留一个值)。这个方法只应该在结果数据很小的情况下使用,因为所有的数据都被加载到驱动程序的内存中。

Pair RDD的集合和连接操作

可以对两个Pair RDD按key进行join连接。Spark提供了类似于关系数据库中的连接,分别是join、leftOuterJoin、rightOuterJoin、fullOuterJoin。

  • join(otherDataset, [numPartitions]):当对类型(K, V)和(K, W)的数据集调用时,返回(K, (V, W))对的数据集,其中包含每个key的所有元素对。通过leftOuterJoin、rightOuterJoin和fullOuterJoin支持外连接。
  • leftOuterJoin:左外连接。
  • rightOuterJoin:右外连接。
  • fullOuterJoin:全外连接。

假设有两个RDD,分别是{(1,2),(3,4),(3,6)}和{(3,9)}。首先,我们构造两个RDD:

val pairRDD1 = sc.parallelize(Seq((1,2),(3,4),(3,6)))
val pairRDD2 = sc.parallelize(Seq((3,9)))

接下来,对两个RDD进行转换操作:

1)subtractByKey

val r1 = pairRDD1.subtractByKey(pairRDD2)
r1.collect

2)join

内连接

val r2 = pairRDD1.join(pairRDD2)
r2.collect

3)leftOuterJoin

左外连接。

val r3 = pairRDD1.leftOuterJoin(pairRDD2)
r3.collect

4)rightOuterJoin

右外连接。

val r4 = pairRDD1.rightOuterJoin(pairRDD2)
r4.collect

5)fullOuterJoin

全外连接。

val r5 = pairRDD1.fullOuterJoin(pairRDD2)
r5.collect

6)cogroup

对来自两个RDD的数据按key分组。

val r6 = pairRDD1.cogroup(pairRDD2)
r5.collect

《Flink原理深入与编程实战》