Key-Value Pair RDD
有一类特殊的RDD,其元素是以<key,value>对的形式出现,我们称之为"Pair RDD"。针对key/value pair RDD,Spark专门提供了一些操作,这些操作只在key/value对的RDD上可用。
配套视频:
创建Pair RDD
Spark在包含key/value对的RDDs上提供了专门的transformation API,包括reduceByKey、groupByKey、sortByKey和join等。Pair RDD让我们能够在key上并行操作,或者跨网络重新组织数据。
Key/value RDD常被用于执行聚合操作,以及常被用来完成初始的ETL(extract, transform, load)以获取key/value格式数据。
注意,除了count操作之外,大多数操作通常都涉及到shuffle,因为与key相关的数据可能并不总是驻留在单个分区上。
创建Pair RDD的方式有多种。第一种创建方式:从文件中加载。
请看下面的代码:
val file = "/data/spark_demo/rdd/wc.txt" val lines = sc.textFile(file) val pairRDD = lines.flatMap(line => line.split(" ")).map(word => (word,1)) // 通过转换,生成Pair RDD pairRDD.collect
第二种方式:通过并行集合创建Pair RDD。
请看下面的代码:
val rdd = sc.parallelize(Seq("Hadoop","Spark","Hive","Spark")) val pairRDD = rdd.map(word => (word,1)) pairRDD.collect val a = sc.parallelize(List("black", "blue", "white", "green", "grey"), 2) // 通过应用指定的函数来创建该RDD中元素的元组(参数函数生成对应的key),返回一个pair RDD val b = a.keyBy(_.length) b.collect
操作Pair RDD
假设有一个Pair RDD {(1,2),(3,4),(3,6)}。
// 构造pair rdd val pairRDD = sc.parallelize(Seq((1,2),(3,4),(3,6))) pairRDD.collect
keys
返回所有的key。
val p3 = pairRDD.keys p3.collect
values
返回所有的value。
val p4 = pairRDD.values p4.collect
mapValues(func)
将函数应用到Pair RDD中的每个元素上,只改变value,不改变key。
val p6 = pairRDD.mapValues(x => x*x) p6.collect
flatMapValues(func)
传入(K, U)对,传出(K,TraversableOnce[U])。通过一个flatMap函数传递key-value pair RDD中的每个value,而不改变key值;这也保留了原始的RDD分区。
val p7 = pairRDD.flatMapValues(x => (x to 5)) p7.collect
sortByKey([ascending], [numPartitions])
这是一个transformation操作。按照key进行排序,默认是升序。当对(K, V)对的数据集(其中K实现Ordered)调用时,返回一个(K, V)对的数据集(按键升序或降序排序),按布尔类型的ascending参数中指定的顺序。
val p5 = pairRDD.sortByKey() p5.collect // pairRDD.sortByKey(ascending=false).collect pairRDD.sortByKey(false).collect
groupByKey([numPartitions]):这是一个transformation操作。它将RDD中每个key的值分组成一个序列。当对(K, V)对的数据集调用时,返回(K, Iterable<v>)对的数据集。
val p2 = pairRDD.groupByKey() p2.collect
reduceByKey(func, [numPartitions]),reduceByKey(partitioner, func])
这是一个transformation操作。它按照key来合并值(相同key的值进行合并)。当对一个(K, V)对的数据集调用时,返回一个(K, V)对的数据集,其中每个key的值使用给定的reduce函数func进行聚合,该reduce函数的类型必须是(V,V) => V。
val p1 = pairRDD.reduceByKey((x,y) => x + y) p1.collect
foldByKey(zeroValue, [numPartitions])(func),foldByKey(zeroValue, [partitioner])(func)
这是一个transformation操作。它使用一个关联函数和一个初始值来合并每个键的值,这个初始值可以任意次数地添加到结果中,并且不能改变结果(例如,列表连接为Nil,加法为0,乘法为1)。
val p7 = pairRDD.foldByKey(1)((a,b) => a + b) p7.collect // 返回Array((1,3), (3,11))
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
这是一个transformation操作。当对一个(K, V)对的数据集调用时,返回一个(K, U)对的数据集,其中每个key的值使用给定的combine函数和一个中性的“零”值进行聚合。允许与输入值类型不同的聚合值类型,同时避免不必要的分配。
combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions, mapSideCombine)
这是一个transformation操作。合并相同key的值,使用不同的结果类型。
- createCombiner:在第一次遇到Key时创建组合器函数,将RDD数据集中的V类型value值转换C类型值(V => C)。
- mergeValue:合并值函数,再次遇到相同的Key时,将createCombiner的C类型值与这次传入的V类型值合并成一个C类型值(C,V)=>C。
- mergeCombiners:合并组合器函数,将C类型值两两合并成一个C类型值。
- partitioner:使用已有的或自定义的分区函数,默认是HashPartitioner。
- mapSideCombine:是否在map端进行Combine操作,默认为true。
subtractByKey
这是一个transformation操作。它返回这样一个RDD:其中的pair对的键key只在当前RDD中有而在other RDD中没有。它有三个重载的方法:
- subtractByKey[W](other)
- subtractByKey[W](other, numPartitions)
- subtractByKey[W](other, partitioner)
sampleByKey(withReplacement, fractions, seed)
这是一个transformation操作。返回按key采样的RDD的一个子集(通过分层采样)。
sampleByKeyExact(withReplacement, fractions, seed)
这是一个transformation操作。返回按key采样的RDD的一个子集(通过分层采样),对于每一层(具有相同key的一组对),包含精确的math.ceil(numItems * samplingRate)个元素。
cogroup(otherDataset, [numPartitions])
这是一个transformation操作。当对类型(K, V)和(K, W)的数据集调用时,返回一个(K, (Iterable<v>, Iterable<w>))元组的数据集。这个操作也称为groupWith。
groupWith[W](other)
cogroup的别名。
这是一个transformation操作。groupWith[W1, W2](other1, other2):cogroup的别名。当对类型(K, V)、(K, W1)和(K, W2)的数据集调用时,返回一个(K, (Iterable<v>, Iterable<w1>, Iterable<w2>))元组的数据集。
groupWith[W1, W2, W3](other1, other2, other3):cogroup的别名。当对类型(K, V)、(K, W1)、(K, W2)和(K, W3)的数据集调用时,返回一个(K, (Iterable<v>, Iterable<w1>, Iterable<w2>, Iterable<w3>))元组的数据集。
partitionBy(partitioner)
这是一个transformation操作。返回使用指定分区器分区的RDD的一个副本。
repartitionAndSortWithinPartitions(partitioner)
根据给定的分区程序对RDD进行重新分区,并在每个结果分区中根据键对记录进行排序。这比调用repartition然后在每个分区内排序更有效,因为它可以将排序下推到shuffle机制中。
Pair RDD上的action操作
countByKey()
这是一个action操作。计算每个key的元素数量,将结果收集到一个本地Map中(Map[K, Long])。
val stus = List(("计算机系","张三"),("数学系","李四"),("计算机系","王老五"),("数学系","赵老六")) val rdd1 = spark.sparkContext.parallelize(stus) val kvRDD1 = rdd1.keyBy(_._1) // 转换为pair rdd kvRDD1.countByKey // action操作,返回Map(计算机系 -> 2, 数学系 -> 2)
collectAsMap()
这是一个action操作。将这个RDD中的键值对作为Map返回给master。这不会返回一个multimap(所以如果一个键有多个值,每个键在返回的map中只保留一个值)。这个方法只应该在结果数据很小的情况下使用,因为所有的数据都被加载到驱动程序的内存中。
Pair RDD的集合和连接操作
可以对两个Pair RDD按key进行join连接。Spark提供了类似于关系数据库中的连接,分别是join、leftOuterJoin、rightOuterJoin、fullOuterJoin。
- join(otherDataset, [numPartitions]):当对类型(K, V)和(K, W)的数据集调用时,返回(K, (V, W))对的数据集,其中包含每个key的所有元素对。通过leftOuterJoin、rightOuterJoin和fullOuterJoin支持外连接。
- leftOuterJoin:左外连接。
- rightOuterJoin:右外连接。
- fullOuterJoin:全外连接。
假设有两个RDD,分别是{(1,2),(3,4),(3,6)}和{(3,9)}。首先,我们构造两个RDD:
val pairRDD1 = sc.parallelize(Seq((1,2),(3,4),(3,6))) val pairRDD2 = sc.parallelize(Seq((3,9)))
接下来,对两个RDD进行转换操作:
1)subtractByKey
val r1 = pairRDD1.subtractByKey(pairRDD2) r1.collect
2)join
内连接
val r2 = pairRDD1.join(pairRDD2) r2.collect
3)leftOuterJoin
左外连接。
val r3 = pairRDD1.leftOuterJoin(pairRDD2) r3.collect
4)rightOuterJoin
右外连接。
val r4 = pairRDD1.rightOuterJoin(pairRDD2) r4.collect
5)fullOuterJoin
全外连接。
val r5 = pairRDD1.fullOuterJoin(pairRDD2) r5.collect
6)cogroup
对来自两个RDD的数据按key分组。
val r6 = pairRDD1.cogroup(pairRDD2) r5.collect