RDD Transformation操作

Transformation是操作RDD并返回一个新的RDD,如map()和filter()方法,而action是返回一个结果给驱动程序或将结果写入存储的操作,并开始一个计算,如count()和first()。

Spark对于transformation RDD是延迟计算的,只在遇到action时才真正进行计算。许多转换是作用于元素范围内的,也就是一次作用于一个元素。

现在假设有一个RDD,包含元素为{1, 2, 3, 3}。首先,让我们构造出一个RDD:

// 构造一个RDD
val rdd = sc.parallelize(List(1,2,3,3))

接下来,学习普通RDD上的各种转换操作方法。

map(func)

// map转换
val rdd1 = rdd.map(x => x + 1)   // tansformation
rdd1.collect      // action

mapPartitions(func)

通过对这个RDD的每个分区应用一个函数来返回一个新的RDD。

mapPartitions()可以作为map()和foreach()的替代方法。可以对每个分区调用mapPartitions(),而对RDD中的每个元素调用map()和foreach()。因此,可以根据每个分区而不是每个元素进行初始化。

// 构造RDD
val x = spark.sparkContext.parallelize(Array(1,2,3,4,5,6,7,8,9,10), 2)

// mapPartitions转换
x.mapPartitions(iter => Iterator(iter.toArray))
 .collect
 .foreach(item => println(item.toList))

// 自定义函数(迭代器求和)
def f(i:Iterator[Int]) = {
    // 每个分区的每个元素翻倍
    Tuple1(i.sum).productIterator
}

// 应用mapPartitions转换,求每个分区的元素和
val y = x.mapPartitions(f)

// 返回结果
y.collect

mapPartitionsWithIndex(func)

通过对这个RDD的每个分区应用一个函数来返回一个新的RDD,同时跟踪原始分区的索引。mapPartitionsWithIndex类似于mapPartitions(),但它提供了第二个参数索引,用于跟踪分区。

val x = spark.sparkContext.parallelize(Array(1,2,3,4,5,6,7,8,9,10), 2)

// 定义函数 f
def f(partitionIndex:Int, i:Iterator[Int]) = {
     (partitionIndex, i.sum).productIterator
}

val y = x.mapPartitionsWithIndex(f)

y.glom.collect

flatMap(func)

val rdd2 = rdd.flatMap(x => x.to(3))
rdd2.collect

filter(func)

val rdd3 = rdd.filter(x => x!=1)
rdd3.collect

sample(withReplacement, fraction, seed)

返回这个RDD的一个采样子集。其中各参数含义如下:

  • withReplacement:是否可以对元素进行多次采样(采样后替换)
  • fraction:抽样因子。对于without replacement,每个元素被选中的概率,fraction值必须是[0,1]之间;对于with replacement,每个元素被选择的期望次数,fraction值必须大于等于0。
  • seed:用于随机数生成器的种子。

注意:这并不能保证精确地提供给定RDD的计数的因子。

val rdd5 = rdd.sample(false,0.5)
rdd5.collect

distinct([numPartitions])):返回一个包含这个RDD中不同元素的新RDD。

val rdd4 = rdd.distinct()
rdd4.collect

keyBy(func): RDD[(K, T)]

当在类型为T的数据集上调用时,返回一个(K, T)元组对的数据集。通过应用func函数创建这个RDD中元素的元组。

val  x= sc.parallelize(Array("John", "Fred", "Anna", "James"))
val  y= x.keyBy(w => w.charAt(0))
println(y.collect().mkString(", "))

groupBy(func),groupBy(func, numPartitions),groupBy(func, partitioner)

当在类型为T的数据集上调用时,返回一个(K, Iterable[T])元组的数据集。

返回分组项的RDD。每个组由一个key和一系列映射到该key的元素组成。每个组内元素的顺序不能得到保证,甚至在每次计算结果RDD时可能会有所不同。这个方法有可能会引起数据shuffle。

val x = sc.parallelize(Array("Joseph", "Jimmy", "Tina", "Thomas", "James", 
"Cory", "Christine", "Jackeline", "Juan"), 3)

// 每第一个字符创建一个组
val y = x.groupBy(word => word.charAt(0))
 
y.collect
 
// 另一个短的语法
val y = x.groupBy(_.charAt(0))
 
y.collect

sortBy(func,[ascending],[numPartitions])

返回这个按给定key函数排序的RDD。

val data = List(3,1,90,3,5,12)
val rdd = sc.parallelize(data)
rdd.collect
 
// 默认升序
rdd.sortBy(x => x).collect
 
// 降序
rdd.sortBy(x => x, false).collect
 
val result = rdd.sortBy(x => x, false) 
result.partitions.size

// 改变分区数为1
val result = rdd.sortBy(x => x, false, 1)

result.partitions.size

上面的实例对rdd中的元素进行升序排序。并对排序后的RDD的分区个数进行了修改,上面的result就是排序后的RDD,默认的分区个数是2,而我们对它进行了修改,所以最后变成了1。

glom():RDD[Array[T]]

返回将每个分区中的所有元素合并到一个数组中创建的RDD,一个分区一个数组。当在类型为T的RDD上调用时,返回一个Array[T]的RDD。

// 构造RDD
val x = spark.sparkContext.parallelize(Array(1,2,3,4,5,6,7,8,9,10), 2)

// 将每个分区中的所元素合并成一个数组并返回
x.glom().collect

// 分区求和
x.glom().map(_.sum).collect

repartition(numPartitions)

随机地重新shuffle RDD中的数据,以创建更多或更少的分区,并在它们之间进行平衡。repartition()用于增加或减少RDD分区。需要注意的一点是,Sparkcoalesce()是非常昂贵的操作,因为它会跨多个分区转移数据。

val rdd = spark.sparkContext.parallelize(Range(0,20))
println("当前分区数:" + rdd.partitions.size)				// 2

val rdd2 = rdd.repartition(4)
println("重分区后,现在的分区数:" + rdd2.partitions.size)		// 4

coalesce(numPartitions)

将RDD中的分区数量减少到numpartition。coalesce()仅用于以一种有效的方式减少分区数量,适用于过滤大型数据集后更有效地运行操作。这是repartition()的优化或改进版本,其中使用合并可以降低跨分区的数据移动。

需要注意的一点是,Spark repartition()是非常昂贵的操作,因为它会跨多个分区转移数据。

val rdd = spark.sparkContext.parallelize(Range(0,20))
println("当前分区数:" + rdd.partitions.size)				// 2

val rdd3 = rdd.coalesce(1)
println("合并分区后,现在的分区数:" + rdd3.partitions.size)	// 1

randomSplit(weights, seed)

使用提供的权重随机分割这个RDD,以数组形式返回拆分后的RDD(即拆分后的RDD组成的数组并返回)。其中各参数含义如下:

  • weights:分割的权重,如果它们的和不等于1,将被标准化。
  • seed:随机种子。
// 构造一个RDD
val rdd1 = spark.sparkContext.parallelize(Array(1,2,3,4,5,6,7,8,9,10))

// 按80/20分割数据集
val splitedRDD = rdd1.randomSplit(Array(0.8,0.2))

// 查看
splitedRDD(0).collect			// Array(2, 4, 5, 6, 7, 8, 9, 10)
splitedRDD(0).collect			// Array(1, 3)

《Spark原理深入与编程实战》