RDD Action操作

Action是返回一个结果给驱动程序或将结果写入存储的操作,并开始一个计算,如count()和first()。

一旦创建了RDD,就只有在执行了action时才会执行各种转换。一个action的执行结果可以是将数据写回存储系统,或者返回到驱动程序,以便在本地进行进一步的计算。

常用的action操作函数

reduce(func)
使用函数func(接受两个参数并返回一个参数)聚合数据集的元素。这个函数应该是交换律和结合律,这样才能并行地正确地计算它。
collect()
将RDD操作的所有结果返回给驱动程序。这通常对产生足够小的数据集的操作很有用。
count()
这会返回数据集中的元素数量或RDD操作的结果输出。
first()
返回数据集的第一个元素或RDD操作产生的结果输出。它的工作原理类似于take(1)函数。
take(n)
返回RDD的前n个元素。它首先扫描一个分区,然后使用该分区的结果来估计满足该限制所需的其他分区的数量。这个方法应该只在预期得到的数组很小的情况下使用,因为所有的数据都加载到驱动程序的内存中。
top(n)
按照指定的隐式排序[T]从这个RDD中取出最大的k个元素,并维护排序。这与takeOrdered相反。这个方法应该只在预期得到的数组很小的情况下使用,因为所有的数据都加载到驱动程序的内存中。
takeSample(withReplacement, num, [seed])
返回一个数组,其中包含来自数据集的元素的num个随机样本。它有三个参数,如下:
  • withReplacement/withoutReplacement:这表示采样时有或没有替换(在取多个样本时,它指示是否将旧的样本替换回集合,然后取一个新的样本或者在不替换的情况下取样本)。对于withReplacement,参数应该是True和False。
  • num:这表示样本中元素的数量。
  • seed:这是一个随机数生成器种子(可选)。
takeOrdered(n)
返回RDD的前n个(最小的)元素,并维护排序。这和top是相反的。这个方法应该只在预期得到的数组很小的情况下使用,因为所有的数据都加载到驱动程序的内存中。
saveAsTextFile(path)
将数据集的元素作为文本文件(或文本文件集)写入本地文件系统、HDFS或任何其他hadoop支持的文件系统的给定目录中。Spark将对每个元素调用toString,将其转换为文件中的一行文本。
countByKey()
仅在类型(K, V)的RDDs上可用。返回(K, Int)对的hashmap和每个键的计数。
foreach(func)
在数据集的每个元素上运行函数func。

action操作代码

假设一个RDD,包含{1, 2, 3, 3}。下面是一些常用action操作代码示例。

// 构造RDD
val rdd1 = spark.sparkContext.parallelize(Array(4,5,1,2,8,9,3,6,7,10))

rdd.count
rdd.first
rdd.collect

// countByValue: 返回每个元素在RDD中出现的次数
rdd.countByValue

// take(n):返回RDD中的前n个元素
rdd.take(3)

// top(n):返回RDD中的top(n)个元素
rdd.top(3)

// takeOrdered(n):返回n个元素,基于隐含的顺序
rdd.takeOrdered(3)

// takeSample(withReplacement,num,[seed]):随机返回num个元素
rdd.takeSample(false,2)

几个重点action函数

下面我们着重介绍其中几个action函数。

reduce

这是一个action,并且一个宽依赖操作。例如,在下面的示例中,使用reduce计算List(1,2,3,3)中所有元素的和。

// 构造RDD
val rdd = sc.parallelize(List(1,2,3,3))

// reduce(func)
rdd.reduce((x,y) => x + y)

// 等价
rdd.reduce(_+_)

aggregate(zeroValue)(seqOp,combOp)

类似于reduce,但用来返回不同的类型。这个函数聚合每个分区的元素,然后使用给定的combine组合函数和一个中性的“零值”,对所有分区的结果进行聚合。其中各参数的含义如下:

  • zeroValue:seqOp操作符的每个分区的累积结果的初始值,combOp操作符的不同分区的合并结果的初始值—这通常是中性元素(例如,列表连接为Nil或求和为0或求积为1)。
  • seqOp:用于在分区内累积结果的运算符。
  • combOp:用于合并来自不同分区的结果的关联运算符。

这个aggregate函数类似于reduce()和fold()。但reduce和fold这两个函数有一个问题,那就是它们的返回值必须与RDD的数据类型相同。aggregate()函数就打破了这个限制。比如可以返回(Int, Int)元组,这在要计算平均值的时候很有用。

【示例】使用Spark RDD aggregate()函数计算RDD元素的总和。

import org.apache.spark.sql.SparkSession

object AggregateExample2 extends App {

  val spark = SparkSession.builder()
    .appName("SparkByExamples.com")
    .master("local")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  // 构造RDD
  val listRdd = spark.sparkContext.parallelize(List(1,2,3,4,5,3,2))

  // 定义分区计算函数
  def param0= (accu:Int, v:Int) => accu + v

  // 定义分区结果合并计算函数
  def param1= (accu1:Int,accu2:Int) => accu1 + accu2

  // 执行aggregate计算
  val result = listRdd.aggregate(0)(param0,param1)
  println("输出: " + result)
}

执行以上代码,输出结果如下:

输出: 20

【示例】使用aggregate计算List(1,2,3,3)中所有元素的平均值。

要算平均值,需求计算出两个值,一个是RDD的各元素的累加和,另一个是元素计数。对于加法计算,要初始化为(0, 0)。

import org.apache.spark.sql.SparkSession

object AggregateExample2 extends App {

  val spark = SparkSession.builder()
    .appName("SparkByExamples.com")
    .master("local")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  // 构造一个RDD,指定有两个分区
  val rdd = sc.parallelize(List(1,2,3,3), 2)

  // 查看分区数
  println(s"分区数: ${rdd.partitions.size}")

  // 计算所有元素的平均值
  def param1= (accu:Int, v:Int) => accu + v
  def param2= (accu1:Int, accu2:Int) => accu1 + accu2

  val result = rdd.aggregate((0,0))((acc,e)=>(acc._1+e, acc._2+1),
(acc1,acc2)=>(acc1._1+acc2._1, acc1._2+acc2._2))
  val avg = result._1/result._2.toDouble

  println(s"RDD中所有元素的平均值是:${avg}")
}

执行以上代码,输出结果如下:

分区数: 2
RDD中所有元素的平均值是:2.25

《Flink原理深入与编程实战》