就拿groupbykey来说:
比来因为手抖,在Spark中给本身挖了一个数据倾斜的坑。为懂得决这个问题,顺逼揭捉?究了下Spark分区器的道理,趁着周末加班总结一下~
先说说数据倾斜
数据倾斜是指Spark中的RDD在计算的时刻,每个RDD内部的分区包含的数据不平均。比如一共有5个分区,个一一个占领了90%的数据,这就导致本来5个分区可以5小我一路并行干活,结不雅四小我不怎么干活,工作全都压到一小我身上了。碰到这种问题,网上有很多的解决办法。
比如你想要对某个rdd做groupby,然后做join操作,如不雅分组的key就是分布不平均的,那么真样都是无法优化的。因为一旦这个key被切分,就无法完全的做join了,如不雅纰谬这个key切分,必定会造查对应的分区数据倾斜。
不过,懂得数据为什么会倾斜照样很重要的,持续往下看吧!
分区的感化
在PairRDD即(key,value)这种格局的rdd中,很多操作都是基于key的,是以为了自力瓜分义务,会按照key对数据进行重组。比如groupbykey
重组肯定是须要一个规矩的,最常见的就是基于Hash,Spark还供给了一种稍微复杂点的基于抽样的Range分区办法。
下面我们先看看分区器在Spark计算流程中是怎么应用的:
Paritioner的应用
- def groupByKey(): JavaPairRDD[K, JIterable[V]] =
- fromRDD(groupByResultToJava(rdd.groupByKey()))
它会调用PairRDDFunction的groupByKey()办法
- def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
- groupByKey(defaultPartitioner(self))
- }
RangePartitioner
在这个办法琅绫擎创建了默认的分区器。默认的分区器是如许定义的:
起首获取当前分区的分区个数,如不雅没有设置spark.default.parallelism参数,则创建一个跟之前分区个数一样的Hash分区器。
当然,用户也可以自定义分区器,或者应用其他供给的分区器。API琅绫擎也是支撑的:
- // 传入分区器对象
- def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] =
- fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))
- // 传入分区的个数
- def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] =
- fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))
HashPatitioner
Hash分区器,是最简单也是默认供给的分区器,懂得它的分区规矩,对我们处理数据倾斜或者设计分组的key时,照样很有赞助的。
- class HashPartitioner(partitions: Int) extends Partitioner {
- require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
- def numPartitions: Int = partitions
- // 经由过程key寂?驿HashCode,并根据分区数取模。如不雅结不雅小于0,直接加上分区数。
- def getPartition(key: Any): Int = key match {
- case null => 0
- case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
- }
- // 比较两个分区器是否雷同,直接比较其分区个数就行
推荐阅读
跟着数字化企业尽力寻求最佳安然解决筹划来保护其赓续扩大的收集,很多企颐魅正在寻求供给互操作性功能的下一代对象。软件定义收集(SDN)具有很多的优势,经由过程将多个设备的┞菲握平面整>>>详细阅读
本文标题:Spark源码分析之分区器的作用
地址:http://www.17bianji.com/lsqh/34919.html
1/2 1