RDD-分区器(Partitioner)

RDD之间的依赖关系如果是Shuffle依赖,上游的RDD该如何确定每个分区的输出将交由下游RDD的那个分区呢?或者下游RDD的各个分区将具体依赖于上游RDD的哪些分区?Spark提供分区计算器来解决这个问题。

ShuffleDependency的partitioner属性的类型是Partitioner,抽象类Partitioner定义了分区计算器的接口规范,ShuffleDependency分区取决于Partitioner的具体实现。Partitioner源码,位于org.apache.spark.Partitioner

Partitioner的伴生对象,位于org.apache.spark.Partitioner

Partitioner的numPartitions方法用于获取分区数量。Partitioner的getPartition方法用于将输入的key映射到下游RDD的从0到numPartitions-1这样范围中的某一个分区。

分区程序必须是确定性的,即它必须在给定相同分区键的情况下返回相同的分区ID。

如果设置了spark.default.parallelism,我们将使用SparkContext defaultParallelism的值作为默认分区号,否则我们将使用上游分区的最大数量。

HashPartition

HashPartitioner是Spark的默认分区程序。 如果我们没有配置任何分区器,那么Spark将使用这个散列分区器来对数据进去分区。

注意 Java数组具有基于数组的身份而不是其内容的hashCode,因此尝试使用HashPartitioner对RDD [Array []]或RDD[(Array [],_)]进行分区将产生意外或不正确的结果。

HashPartitioner源码,位于org.apache.spark.Partitioner

HashPartitioner增加另一个名为partitions的构造器参数作为分区数,重写的numPartitions方法只返回partitions。重写的getPartition方法实际以key的hashCode和numPartitions作为参数调用Utils的nonNegativeMod方法。nonNegativeMod方法将对key的hashCode和numPartitions进行驱魔运算,得到key对应分区索引。

使用哈希和取模的方式,可以方便地计算出下游RDD的各个分区将具体处理那些key。由于上游RDD所处理的key的哈希值在取模后很可能产生数据倾斜,所以HashPartitioner并不是一个均衡的分区计算器。

RangePartitioner

从HashPartitioner分区的实现原理我们可以看出,其结果可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据,这显然不是我们需要的。而RangePartitioner分区则尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,也就是说一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。

RangePartitioner分区器的主要作用就是将一定范围内的数映射到某一个分区内,所以它的实现中分界的算法尤为重要。该分区器的实现方式主要是通过两个步骤来实现的,

  • 第一步:先重整个RDD中抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds;
  • 第二步:判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标;该分区器要求RDD中的KEY类型必须是可以排序的

RangePartitioner源码,位于org.apache.spark.Partitioner

其实RangePartitioner的重点是在于构建rangeBounds数组对象,主要步骤是:

  1. 如果分区数量小于1或者rdd中不存在数据的情况下,直接返回一个空的数组,不需要计算range的边界;如果分区数据大于1的情况下,而且rdd中有数据的情况下,才需要计算数组对象
  1. 计算总体的数据抽样大小sampleSize,计算规则是:至少每个分区抽取20个数据或者最多1M的数据量
  1. 根据sampleSize和分区数量计算每个分区的数据抽样样本数量sampleSizePrePartition
  1. 调用RangePartitioner的sketch函数进行数据抽样,计算出每个分区的样本
  1. 计算样本的整体占比以及数据量过多的数据分区,防止数据倾斜
  1. 对于数据量比较多的RDD分区调用RDD的sample函数API重新进行数据抽取
  1. 将最终的样本数据通过RangePartitoner的determineBounds函数进行数据排序分配,计算出rangeBounds

RangePartitioner的sketch函数的作用是对RDD中的数据按照需要的样本数据量进行数据抽取,主要调用SamplingUtils类的reservoirSampleAndCount方法对每个分区进行数据抽取,抽取后计算出整体所有分区的数据量大小;reservoirSampleAndCount方法的抽取方式是先从迭代器中获取样本数量个数据(顺序获取), 然后对剩余的数据进行判断,替换之前的样本数据,最终达到数据抽样的效果。

RangePartitioner的determineBounds函数的作用是根据样本数据记忆权重大小确定数据边界, 源码:

一般而已,使用默认的HashPartitioner即可,RangePartitioner的使用有一定的局限性


分享到:


相關文章: