RDD Transformation操作
Transformation是操作RDD并返回一个新的RDD,如map()和filter()方法,而action是返回一个结果给驱动程序或将结果写入存储的操作,并开始一个计算,如count()和first()。
Spark对于transformation RDD是延迟计算的,只在遇到action时才真正进行计算。许多转换是作用于元素范围内的,也就是一次作用于一个元素。
现在假设有一个RDD,包含元素为{1, 2, 3, 3}。首先,让我们构造出一个RDD:
// 构造一个RDD val rdd = sc.parallelize(List(1,2,3,3))
接下来,学习普通RDD上的各种转换操作方法。
map(func)
// map转换 val rdd1 = rdd.map(x => x + 1) // tansformation rdd1.collect // action
mapPartitions(func)
通过对这个RDD的每个分区应用一个函数来返回一个新的RDD。
mapPartitions()可以作为map()和foreach()的替代方法。可以对每个分区调用mapPartitions(),而对RDD中的每个元素调用map()和foreach()。因此,可以根据每个分区而不是每个元素进行初始化。
// 构造RDD val x = spark.sparkContext.parallelize(Array(1,2,3,4,5,6,7,8,9,10), 2) // mapPartitions转换 x.mapPartitions(iter => Iterator(iter.toArray)) .collect .foreach(item => println(item.toList)) // 自定义函数(迭代器求和) def f(i:Iterator[Int]) = { // 每个分区的每个元素翻倍 Tuple1(i.sum).productIterator } // 应用mapPartitions转换,求每个分区的元素和 val y = x.mapPartitions(f) // 返回结果 y.collect
mapPartitionsWithIndex(func)
通过对这个RDD的每个分区应用一个函数来返回一个新的RDD,同时跟踪原始分区的索引。mapPartitionsWithIndex类似于mapPartitions(),但它提供了第二个参数索引,用于跟踪分区。
val x = spark.sparkContext.parallelize(Array(1,2,3,4,5,6,7,8,9,10), 2) // 定义函数 f def f(partitionIndex:Int, i:Iterator[Int]) = { (partitionIndex, i.sum).productIterator } val y = x.mapPartitionsWithIndex(f) y.glom.collect
flatMap(func)
val rdd2 = rdd.flatMap(x => x.to(3)) rdd2.collect
filter(func)
val rdd3 = rdd.filter(x => x!=1) rdd3.collect
sample(withReplacement, fraction, seed)
返回这个RDD的一个采样子集。其中各参数含义如下:
- withReplacement:是否可以对元素进行多次采样(采样后替换)
- fraction:抽样因子。对于without replacement,每个元素被选中的概率,fraction值必须是[0,1]之间;对于with replacement,每个元素被选择的期望次数,fraction值必须大于等于0。
- seed:用于随机数生成器的种子。
注意:这并不能保证精确地提供给定RDD的计数的因子。
val rdd5 = rdd.sample(false,0.5) rdd5.collect
distinct([numPartitions])):返回一个包含这个RDD中不同元素的新RDD。
val rdd4 = rdd.distinct() rdd4.collect
keyBy(func): RDD[(K, T)]
当在类型为T的数据集上调用时,返回一个(K, T)元组对的数据集。通过应用func函数创建这个RDD中元素的元组。
val x= sc.parallelize(Array("John", "Fred", "Anna", "James")) val y= x.keyBy(w => w.charAt(0)) println(y.collect().mkString(", "))
groupBy(func),groupBy(func, numPartitions),groupBy(func, partitioner)
当在类型为T的数据集上调用时,返回一个(K, Iterable[T])元组的数据集。
返回分组项的RDD。每个组由一个key和一系列映射到该key的元素组成。每个组内元素的顺序不能得到保证,甚至在每次计算结果RDD时可能会有所不同。这个方法有可能会引起数据shuffle。
val x = sc.parallelize(Array("Joseph", "Jimmy", "Tina", "Thomas", "James", "Cory", "Christine", "Jackeline", "Juan"), 3) // 每第一个字符创建一个组 val y = x.groupBy(word => word.charAt(0)) y.collect // 另一个短的语法 val y = x.groupBy(_.charAt(0)) y.collect
sortBy(func,[ascending],[numPartitions])
返回这个按给定key函数排序的RDD。
val data = List(3,1,90,3,5,12) val rdd = sc.parallelize(data) rdd.collect // 默认升序 rdd.sortBy(x => x).collect // 降序 rdd.sortBy(x => x, false).collect val result = rdd.sortBy(x => x, false) result.partitions.size // 改变分区数为1 val result = rdd.sortBy(x => x, false, 1) result.partitions.size
上面的实例对rdd中的元素进行升序排序。并对排序后的RDD的分区个数进行了修改,上面的result就是排序后的RDD,默认的分区个数是2,而我们对它进行了修改,所以最后变成了1。
glom():RDD[Array[T]]
返回将每个分区中的所有元素合并到一个数组中创建的RDD,一个分区一个数组。当在类型为T的RDD上调用时,返回一个Array[T]的RDD。
// 构造RDD val x = spark.sparkContext.parallelize(Array(1,2,3,4,5,6,7,8,9,10), 2) // 将每个分区中的所元素合并成一个数组并返回 x.glom().collect // 分区求和 x.glom().map(_.sum).collect
repartition(numPartitions)
随机地重新shuffle RDD中的数据,以创建更多或更少的分区,并在它们之间进行平衡。repartition()用于增加或减少RDD分区。需要注意的一点是,Sparkcoalesce()是非常昂贵的操作,因为它会跨多个分区转移数据。
val rdd = spark.sparkContext.parallelize(Range(0,20)) println("当前分区数:" + rdd.partitions.size) // 2 val rdd2 = rdd.repartition(4) println("重分区后,现在的分区数:" + rdd2.partitions.size) // 4
coalesce(numPartitions)
将RDD中的分区数量减少到numpartition。coalesce()仅用于以一种有效的方式减少分区数量,适用于过滤大型数据集后更有效地运行操作。这是repartition()的优化或改进版本,其中使用合并可以降低跨分区的数据移动。
需要注意的一点是,Spark repartition()是非常昂贵的操作,因为它会跨多个分区转移数据。
val rdd = spark.sparkContext.parallelize(Range(0,20)) println("当前分区数:" + rdd.partitions.size) // 2 val rdd3 = rdd.coalesce(1) println("合并分区后,现在的分区数:" + rdd3.partitions.size) // 1
randomSplit(weights, seed)
使用提供的权重随机分割这个RDD,以数组形式返回拆分后的RDD(即拆分后的RDD组成的数组并返回)。其中各参数含义如下:
- weights:分割的权重,如果它们的和不等于1,将被标准化。
- seed:随机种子。
// 构造一个RDD val rdd1 = spark.sparkContext.parallelize(Array(1,2,3,4,5,6,7,8,9,10)) // 按80/20分割数据集 val splitedRDD = rdd1.randomSplit(Array(0.8,0.2)) // 查看 splitedRDD(0).collect // Array(2, 4, 5, 6, 7, 8, 9, 10) splitedRDD(0).collect // Array(1, 3)