数据分区和自定义数据分区

数据分区(partition)是 Spark 中的重要概念,是Spark在集群中的多个节点之间划分数据的机制。它是RDD的最小单元,RDD是由分布在各个节点上的分区组成的。Spark使用分区来管理数据,分区的数量决定了task的数量,每个task对应着一个数据分区。这些分区有助于并行化分布式数据处理。

配套视频:

RDD数据重分区

默认情况下,为每个HDFS block块创建一个分区,该分区默认为128MB(Spark 2.x)。例如,当从本地文件系统加载一个文本文件到Spark时,文件的内容被分成几个分区,这些分区均匀地分布在集群中的节点上。在同一个节点上可能会出现不止一个分区。所有这些分区的总和形成了RDD。这就是“弹性分布数据集”中“分布”一词的来源。

查看RDD的分区数量

使用下面的代码查看RDD的分区数量:

// 构造一个pair rdd
val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))

// 查看分区数量
pairs.partitions.size

分区的数量可以在创建 RDD 时指定。例如,在调用 textFile和parallelize方法创建 RDD 时,可手动指定分区个数。方法签名如下:

deftextFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
defparallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]

如果未指定 RDD 的分区数量,则在创建 RDD 时,Spark 将使用默认分区数,默认值为“spark.default.parallelism”配置的参数。

调整RDD分区数

RDD从数据源生成的时候,数据通常是随机分配到不同的分区或者保持数据源的分区。RDD分区数的多少,会对Spark程序的执行产生一定的影响。因为除了影响整个集群中的数据分布之外,它还直接决定了将要运行RDD转换的任务的数量。

如果分区数量太少,则直接影响是集群计算资源不能被充分利用。例如分配 8 个核,但分区数量为 4,则将有一半的核没有利用到。此外,因为数据集可能会变得太大,当无法装入executor的内存中时可能会导致内存问题。

如果分区数量太多,虽然计算资源能够充分利用,但会导致 task 数量过多,而 task 数量过多会影响执行效率,主要是 task 在序列化和网络传输过程带来较大的时间开销。

根据Spark RDD Programming Guide上的建议,集群节点的每个核分配 2-4 个分区比较合理,也就是说,建议将分区数设置为集群中CPU核数的三到四倍。

最主要的两种调整数据分区的方法是coalesce和repartition函数。

coalesce转换用于更改分区的数量。它可以触发的RDD shuffling,这取决于shuffle标志(默认禁用,即false)。(repartition(N)方法相当于coalesce(N, true)方法)。在减少分区时,coalesce并没有对所有数据进行移动,仅仅是在原来分区的基础之上进行了合并而已,这样的操作可以减少数据的移动,所以效率较高。

【示例】对RDD进行重分区。

// 在创建时,指定分区个数
val rdd1 = sc.parallelize(Seq(1,2,3,4,5,6,7,8), 4)
rdd1.collect

rdd1.partitions.size                // rdd1的分区数量, 目前为4

// 对于通过转换得到的新RDD,直接调用repartition方法重新分区
val rdd2 = rdd1.map((x) => x*x)     // 转换得到rdd2
rdd2.collect

val rdd3 = rdd2.repartition(8)      // 重新分区,得到rdd3
rdd3.partitions.size                // rdd3的分区数量,这时为8

使用数据分区器

当需要对Pair RDD进行重分区时,RDD的分区由org.apache.spark.Partitioner对象执行,该分区器对象将一个分区索引赋给每个RDD元素(在每个key和分区ID间建立起映射,分区ID的值从0到numPartitions - 1)。

Spark内置提供了两个Partitioner分区器实现,分别是:

  • org.apache.spark.HashPartitioner
  • org.apache.spark.RangePartitioner

HashPartitioner是Spark的默认分区器,它基于一个元素的Java散列码(或者是Pair RDDs中的key的散列码)计算的分区索引。计算公式如下:

   partitionIndex = key hashCode % numberOfPartitions

分区索引是准随机的;因此,分区很可能不会完全相同大小。然而,在具有相对较少分区的大型数据集中,该算法可能会在其中均匀地分布数据。

当使用HashPartitioner时,数据分区的默认数量是由Spark配置参数“spark.default.parallelism”决定的。如果该参数没有被用户指定,那么它将被设置为集群中的核的数量。

RangePartitioner将已排序的RDD的数据划分为大致相等的范围。它对传递给它的RDD的内容进行了采样,并根据采样数据确定了范围边界。一般不太可能直接使用RangePartitioner。

partitionBy()方法

当在Pair RDD上调用partitionBy方法进行重分区时,需要向它传递一个参数:期望的Partitioner(分区器)对象。如果分区器与之前使用的分区器相同,则保留分区,RDD保持不变。否则,就会安排一次shuffle,并创建一个新的RDD。

【示例】在调用partitionBy方法进行重分区时,使用指定的分区器。

val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
pairs.partitioner				// 查看所使用的分区器
pairs.partitions.size				// 初始分区数量

val repairs = pairs.repartition(4)              // 重新分区 
repairs.partitions.size			        // 重分区之后的数量

import org.apache.spark.HashPartitioner

val partitionedRDD = pairs.partitionBy(new HashPartitioner(2))   // 使用指定的分区器
partitionedRDD.persist() 	                // 持久化,以便后续操作重复使用partitioned
partitionedRDD.partitioner			// 查看所使用的分区器
partitionedRDD.partitions.size		        // 查看分区数量

上面代码中,传给partitionBy()的参数值2,代表分区的数量,它将控制有多少并行的tasks执行未来的RDD上的操作(如join)。一般来说,这个值至少与集群中核的数量一样多。

只有当一个数据集在面向key的操作中被重用多次的情况下(例如join操作),控制分区才有意义。

自定义数据分区器

当需要精确地在分区中放置数据时,也可以自定义分区器。自定义分区只可以在Pair RDD上使用。

大多数Pair RDD转换有两个额外的重载方法:一个接受额外的Int参数(所需的分区数量),另一个则接受一个(定制)Partitioner类型的附加参数。其中第一个方法使用默认的HashPartitioner。

例如,下面两行代码是相等的,因为它们都应用了100个分区的HashPartitioner:

  rdd.foldByKey(afunction, 100)      // 使用默认的HashPartitioner
  rdd.foldByKey(afunction, new HashPartitioner(100))

如果Pair RDD转换没有指定一个分区器,那么所使用的分区数量将是父RDD(转换为这个分区的RDD)的最大分区数。如果父RDD中没有一个定义了分区器,那么将使用HashPartitioner,分区数由spark.default.parallelism参数指定。

另一种改变Pair RDD中分区之间数据的默认位置的方法是使用默认的HashPartitioner,但是根据某种算法更改key的散列码(hash code)。

【示例】使用自定义分区器对Pair RDD进行重分区,使得Pair RDD中所有偶数写到一个输出文件,所有奇数写到另一个输出文件。

// 自定义分区器
class UsridPartitioner(numParts:Int) extends org.apache.spark.Partitioner{

    //覆盖分区数
    override def numPartitions: Int = numParts  

    //覆盖分区号获取函数
    override def getPartition(key: Any): Int = {
         key.toString.toInt%10       // 取key的最后一位数字
    }
}

// 构造一个RDD
val rdd1 = spark.sparkContext.parallelize(1 to 100000)
rdd1.getNumPartitions

// 使用自定义的分区器进行重分区,并指定分区数量
val rdd2 = rdd1.map((_,null)).partitionBy(new NumberPartitioner(2)) 
rdd2.getNumPartitions

// 将结果输出到文件中存储
rdd2.map(_._1).saveAsTextFile("file:///home/hduser/data/spark/files-output2")  

// 加载上面存储的结果文件1,会发现都是偶数
sc.textFile("file:///home/hduser/data/spark/files-output2/part-00000")
  .map(_.toInt)
  .takeOrdered(10)

// 加载上面存储的结果文件2,会发现都是奇数
sc.textFile("file:///home/hduser/data/spark/files-output2/part-00001")
  .map(_.toInt)
  .takeOrdered(10)

也可以在终端窗口中使用如下的Linux命令来查看生成的结果文件:

$ ls /home/hduser/data/spark/files-output2
$ head -10 /home/hduser/data/spark/files-output2/part-00000
$ head -10 /home/hduser/data/spark/files-output2/part-00001

《PySpark原理深入与编程实战》