RDD Action操作
Action是返回一个结果给驱动程序或将结果写入存储的操作,并开始一个计算,如count()和first()。
一旦创建了RDD,就只有在执行了action时才会执行各种转换。一个action的执行结果可以是将数据写回存储系统,或者返回到驱动程序,以便在本地进行进一步的计算。
常用的action操作函数
- reduce(func)
- 使用函数func(接受两个参数并返回一个参数)聚合数据集的元素。这个函数应该是交换律和结合律,这样才能并行地正确地计算它。
- collect()
- 将RDD操作的所有结果返回给驱动程序。这通常对产生足够小的数据集的操作很有用。
- count()
- 这会返回数据集中的元素数量或RDD操作的结果输出。
- first()
- 返回数据集的第一个元素或RDD操作产生的结果输出。它的工作原理类似于take(1)函数。
- take(n)
- 返回RDD的前n个元素。它首先扫描一个分区,然后使用该分区的结果来估计满足该限制所需的其他分区的数量。这个方法应该只在预期得到的数组很小的情况下使用,因为所有的数据都加载到驱动程序的内存中。
- top(n)
- 按照指定的隐式排序[T]从这个RDD中取出最大的k个元素,并维护排序。这与takeOrdered相反。这个方法应该只在预期得到的数组很小的情况下使用,因为所有的数据都加载到驱动程序的内存中。
- takeSample(withReplacement, num, [seed])
-
返回一个数组,其中包含来自数据集的元素的num个随机样本。它有三个参数,如下:
- withReplacement/withoutReplacement:这表示采样时有或没有替换(在取多个样本时,它指示是否将旧的样本替换回集合,然后取一个新的样本或者在不替换的情况下取样本)。对于withReplacement,参数应该是True和False。
- num:这表示样本中元素的数量。
- seed:这是一个随机数生成器种子(可选)。
- takeOrdered(n)
- 返回RDD的前n个(最小的)元素,并维护排序。这和top是相反的。这个方法应该只在预期得到的数组很小的情况下使用,因为所有的数据都加载到驱动程序的内存中。
- saveAsTextFile(path)
- 将数据集的元素作为文本文件(或文本文件集)写入本地文件系统、HDFS或任何其他hadoop支持的文件系统的给定目录中。Spark将对每个元素调用toString,将其转换为文件中的一行文本。
- countByKey()
- 仅在类型(K, V)的RDDs上可用。返回(K, Int)对的hashmap和每个键的计数。
- foreach(func)
- 在数据集的每个元素上运行函数func。
action操作代码
假设一个RDD,包含{1, 2, 3, 3}。下面是一些常用action操作代码示例。
// 构造RDD val rdd1 = spark.sparkContext.parallelize(Array(4,5,1,2,8,9,3,6,7,10)) rdd.count rdd.first rdd.collect // countByValue: 返回每个元素在RDD中出现的次数 rdd.countByValue // take(n):返回RDD中的前n个元素 rdd.take(3) // top(n):返回RDD中的top(n)个元素 rdd.top(3) // takeOrdered(n):返回n个元素,基于隐含的顺序 rdd.takeOrdered(3) // takeSample(withReplacement,num,[seed]):随机返回num个元素 rdd.takeSample(false,2)
几个重点action函数
下面我们着重介绍其中几个action函数。
reduce
这是一个action,并且一个宽依赖操作。例如,在下面的示例中,使用reduce计算List(1,2,3,3)中所有元素的和。
// 构造RDD val rdd = sc.parallelize(List(1,2,3,3)) // reduce(func) rdd.reduce((x,y) => x + y) // 等价 rdd.reduce(_+_)
aggregate(zeroValue)(seqOp,combOp)
类似于reduce,但用来返回不同的类型。这个函数聚合每个分区的元素,然后使用给定的combine组合函数和一个中性的“零值”,对所有分区的结果进行聚合。其中各参数的含义如下:
- zeroValue:seqOp操作符的每个分区的累积结果的初始值,combOp操作符的不同分区的合并结果的初始值—这通常是中性元素(例如,列表连接为Nil或求和为0或求积为1)。
- seqOp:用于在分区内累积结果的运算符。
- combOp:用于合并来自不同分区的结果的关联运算符。
这个aggregate函数类似于reduce()和fold()。但reduce和fold这两个函数有一个问题,那就是它们的返回值必须与RDD的数据类型相同。aggregate()函数就打破了这个限制。比如可以返回(Int, Int)元组,这在要计算平均值的时候很有用。
【示例】使用Spark RDD aggregate()函数计算RDD元素的总和。
import org.apache.spark.sql.SparkSession object AggregateExample2 extends App { val spark = SparkSession.builder() .appName("SparkByExamples.com") .master("local") .getOrCreate() spark.sparkContext.setLogLevel("ERROR") // 构造RDD val listRdd = spark.sparkContext.parallelize(List(1,2,3,4,5,3,2)) // 定义分区计算函数 def param0= (accu:Int, v:Int) => accu + v // 定义分区结果合并计算函数 def param1= (accu1:Int,accu2:Int) => accu1 + accu2 // 执行aggregate计算 val result = listRdd.aggregate(0)(param0,param1) println("输出: " + result) }
执行以上代码,输出结果如下:
输出: 20
【示例】使用aggregate计算List(1,2,3,3)中所有元素的平均值。
要算平均值,需求计算出两个值,一个是RDD的各元素的累加和,另一个是元素计数。对于加法计算,要初始化为(0, 0)。
import org.apache.spark.sql.SparkSession object AggregateExample2 extends App { val spark = SparkSession.builder() .appName("SparkByExamples.com") .master("local") .getOrCreate() spark.sparkContext.setLogLevel("ERROR") // 构造一个RDD,指定有两个分区 val rdd = sc.parallelize(List(1,2,3,3), 2) // 查看分区数 println(s"分区数: ${rdd.partitions.size}") // 计算所有元素的平均值 def param1= (accu:Int, v:Int) => accu + v def param2= (accu1:Int, accu2:Int) => accu1 + accu2 val result = rdd.aggregate((0,0))((acc,e)=>(acc._1+e, acc._2+1), (acc1,acc2)=>(acc1._1+acc2._1, acc1._2+acc2._2)) val avg = result._1/result._2.toDouble println(s"RDD中所有元素的平均值是:${avg}") }
执行以上代码,输出结果如下:
分区数: 2 RDD中所有元素的平均值是:2.25