数据分区和自定义数据分区
数据分区(partition)是 Spark 中的重要概念,是Spark在集群中的多个节点之间划分数据的机制。它是RDD的最小单元,RDD是由分布在各个节点上的分区组成的。Spark使用分区来管理数据,分区的数量决定了task的数量,每个task对应着一个数据分区。这些分区有助于并行化分布式数据处理。
配套视频:
默认情况下,为每个HDFS block块创建一个分区,该分区默认为128MB(Spark 2.x)。例如,当从本地文件系统加载一个文本文件到Spark时,文件的内容被分成几个分区,这些分区均匀地分布在集群中的节点上。在同一个节点上可能会出现不止一个分区。所有这些分区的总和形成了RDD。这就是“弹性分布数据集”中“分布”一词的来源。
查看RDD的分区数量
使用下面的代码查看RDD的分区数量:
// 构造一个pair rdd val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3))) // 查看分区数量 pairs.partitions.size
分区的数量可以在创建 RDD 时指定。例如,在调用 textFile和parallelize方法创建 RDD 时,可手动指定分区个数。方法签名如下:
deftextFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] defparallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]
如果未指定 RDD 的分区数量,则在创建 RDD 时,Spark 将使用默认分区数,默认值为“spark.default.parallelism”配置的参数。
调整RDD分区数
RDD从数据源生成的时候,数据通常是随机分配到不同的分区或者保持数据源的分区。RDD分区数的多少,会对Spark程序的执行产生一定的影响。因为除了影响整个集群中的数据分布之外,它还直接决定了将要运行RDD转换的任务的数量。
如果分区数量太少,则直接影响是集群计算资源不能被充分利用。例如分配 8 个核,但分区数量为 4,则将有一半的核没有利用到。此外,因为数据集可能会变得太大,当无法装入executor的内存中时可能会导致内存问题。
如果分区数量太多,虽然计算资源能够充分利用,但会导致 task 数量过多,而 task 数量过多会影响执行效率,主要是 task 在序列化和网络传输过程带来较大的时间开销。
根据Spark RDD Programming Guide上的建议,集群节点的每个核分配 2-4 个分区比较合理,也就是说,建议将分区数设置为集群中CPU核数的三到四倍。
最主要的两种调整数据分区的方法是coalesce和repartition函数。
coalesce转换用于更改分区的数量。它可以触发的RDD shuffling,这取决于shuffle标志(默认禁用,即false)。(repartition(N)方法相当于coalesce(N, true)方法)。在减少分区时,coalesce并没有对所有数据进行移动,仅仅是在原来分区的基础之上进行了合并而已,这样的操作可以减少数据的移动,所以效率较高。
【示例】对RDD进行重分区。
// 在创建时,指定分区个数 val rdd1 = sc.parallelize(Seq(1,2,3,4,5,6,7,8), 4) rdd1.collect rdd1.partitions.size // rdd1的分区数量, 目前为4 // 对于通过转换得到的新RDD,直接调用repartition方法重新分区 val rdd2 = rdd1.map((x) => x*x) // 转换得到rdd2 rdd2.collect val rdd3 = rdd2.repartition(8) // 重新分区,得到rdd3 rdd3.partitions.size // rdd3的分区数量,这时为8
使用数据分区器
当需要对Pair RDD进行重分区时,RDD的分区由org.apache.spark.Partitioner对象执行,该分区器对象将一个分区索引赋给每个RDD元素(在每个key和分区ID间建立起映射,分区ID的值从0到numPartitions - 1)。
Spark内置提供了两个Partitioner分区器实现,分别是:
- org.apache.spark.HashPartitioner
- org.apache.spark.RangePartitioner
HashPartitioner是Spark的默认分区器,它基于一个元素的Java散列码(或者是Pair RDDs中的key的散列码)计算的分区索引。计算公式如下:
partitionIndex = key hashCode % numberOfPartitions
分区索引是准随机的;因此,分区很可能不会完全相同大小。然而,在具有相对较少分区的大型数据集中,该算法可能会在其中均匀地分布数据。
当使用HashPartitioner时,数据分区的默认数量是由Spark配置参数“spark.default.parallelism”决定的。如果该参数没有被用户指定,那么它将被设置为集群中的核的数量。
RangePartitioner将已排序的RDD的数据划分为大致相等的范围。它对传递给它的RDD的内容进行了采样,并根据采样数据确定了范围边界。一般不太可能直接使用RangePartitioner。
partitionBy()方法
当在Pair RDD上调用partitionBy方法进行重分区时,需要向它传递一个参数:期望的Partitioner(分区器)对象。如果分区器与之前使用的分区器相同,则保留分区,RDD保持不变。否则,就会安排一次shuffle,并创建一个新的RDD。
【示例】在调用partitionBy方法进行重分区时,使用指定的分区器。
val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3))) pairs.partitioner // 查看所使用的分区器 pairs.partitions.size // 初始分区数量 val repairs = pairs.repartition(4) // 重新分区 repairs.partitions.size // 重分区之后的数量 import org.apache.spark.HashPartitioner val partitionedRDD = pairs.partitionBy(new HashPartitioner(2)) // 使用指定的分区器 partitionedRDD.persist() // 持久化,以便后续操作重复使用partitioned partitionedRDD.partitioner // 查看所使用的分区器 partitionedRDD.partitions.size // 查看分区数量
上面代码中,传给partitionBy()的参数值2,代表分区的数量,它将控制有多少并行的tasks执行未来的RDD上的操作(如join)。一般来说,这个值至少与集群中核的数量一样多。
只有当一个数据集在面向key的操作中被重用多次的情况下(例如join操作),控制分区才有意义。
自定义数据分区器
当需要精确地在分区中放置数据时,也可以自定义分区器。自定义分区只可以在Pair RDD上使用。
大多数Pair RDD转换有两个额外的重载方法:一个接受额外的Int参数(所需的分区数量),另一个则接受一个(定制)Partitioner类型的附加参数。其中第一个方法使用默认的HashPartitioner。
例如,下面两行代码是相等的,因为它们都应用了100个分区的HashPartitioner:
rdd.foldByKey(afunction, 100) // 使用默认的HashPartitioner rdd.foldByKey(afunction, new HashPartitioner(100))
如果Pair RDD转换没有指定一个分区器,那么所使用的分区数量将是父RDD(转换为这个分区的RDD)的最大分区数。如果父RDD中没有一个定义了分区器,那么将使用HashPartitioner,分区数由spark.default.parallelism参数指定。
另一种改变Pair RDD中分区之间数据的默认位置的方法是使用默认的HashPartitioner,但是根据某种算法更改key的散列码(hash code)。
【示例】使用自定义分区器对Pair RDD进行重分区,使得Pair RDD中所有偶数写到一个输出文件,所有奇数写到另一个输出文件。
// 自定义分区器 class UsridPartitioner(numParts:Int) extends org.apache.spark.Partitioner{ //覆盖分区数 override def numPartitions: Int = numParts //覆盖分区号获取函数 override def getPartition(key: Any): Int = { key.toString.toInt%10 // 取key的最后一位数字 } } // 构造一个RDD val rdd1 = spark.sparkContext.parallelize(1 to 100000) rdd1.getNumPartitions // 使用自定义的分区器进行重分区,并指定分区数量 val rdd2 = rdd1.map((_,null)).partitionBy(new NumberPartitioner(2)) rdd2.getNumPartitions // 将结果输出到文件中存储 rdd2.map(_._1).saveAsTextFile("file:///home/hduser/data/spark/files-output2") // 加载上面存储的结果文件1,会发现都是偶数 sc.textFile("file:///home/hduser/data/spark/files-output2/part-00000") .map(_.toInt) .takeOrdered(10) // 加载上面存储的结果文件2,会发现都是奇数 sc.textFile("file:///home/hduser/data/spark/files-output2/part-00001") .map(_.toInt) .takeOrdered(10)
也可以在终端窗口中使用如下的Linux命令来查看生成的结果文件:
$ ls /home/hduser/data/spark/files-output2 $ head -10 /home/hduser/data/spark/files-output2/part-00000 $ head -10 /home/hduser/data/spark/files-output2/part-00001