这里最重要的是这个Utils.nonNegativeMod(key.hashCode, numPartitions),它决定了数据进入到哪个分区。
说白了,就是基于这个key获取它的hashCode,然后对分区个数取模。因为HashCode可能为负,这里直接断定下,如不雅小于0,再加上分区个数即可。
那么这个界线是怎么肯定的呢?
- def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
- val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
- for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
- return r.partitioner.get
- }
- if (rdd.context.conf.contains("spark.default.parallelism")) {
- new HashPartitioner(rdd.context.defaultParallelism)
- } else {
- new HashPartitioner(bySize.head.partitions.size)
- }
- }
算法参考源码:
是以,基于hash的分区,只要包管你的key是分散的,那么最终数据就不会出现数据倾斜的情况。
这个分区器,合适想要把数据打散的场景,然则如不雅雷同的key反复量很大年夜,依然会出现数据倾斜的情况。
每个分区器,最核心的办法,就是getPartition
- def getPartition(key: Any): Int = {
- val k = key.asInstanceOf[K]
- var partition = 0
- if (rangeBounds.length <= 128) {
- // If we have less than 128 partitions naive search
- while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
- partition += 1
- }
- } else {
- // Determine which binary search method to use only once.
- partition = binarySearch(rangeBounds, k)
- // binarySearch either returns the match location
推荐阅读
跟着数字化企业尽力寻求最佳安然解决筹划来保护其赓续扩大的收集,很多企颐魅正在寻求供给互操作性功能的下一代对象。软件定义收集(SDN)具有很多的优势,经由过程将多个设备的┞菲握平面整>>>详细阅读
本文标题:Spark源码分析之分区器的作用
地址:http://www.17bianji.com/lsqh/34919.html
1/2 1