【限时免费】岁尾最强一次云计算大年夜会,看传统、社区、互联网企业若何碰撞?
媒介
本文涉及源码基于Spark2.0.0和Hadoop2.6.0,不合版本代码可能不一致,需本身对应。此外针对TextInputFormat格局的Hive表,其他格局的比如Parquet有Spark本身的高效实现,不在评论辩论范围之内
分析
Spark攫取Hive表是经由过程HadoopRDD扫描上来的,具体可见 org.apache.spark.sql.hive.TableReader类,构建HadoopRDD的代码如下
这里inputFormatClass是Hive创建时指定的,默认不指定为 org.apache.hadoop.mapred.TextInputFormat,由它就涉及到了HDFS文件的FileSplit数,大年夜而决定了上层Spark的partition数。在进入HadoopRDD类查看之前,还有一个参数须要我们留意,就是 _minSplitsPerRDD,它在后面SplitSize的计算中是起了感化的。
在 getPartitions 办法里我们可以看到 FileSplit数最后决定了Spark攫取Hive表的Task数,下面我们再来看看 mapred.TextInputFormat 类里 getSplits 的实现
我们看一下它的定义
- private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) {
- 0 // will splitted based on block by default.
- } else {
- math.max(hadoopConf.getInt("mapred.map.tasks", 1),
- sparkSession.sparkContext.defaultMinPartitions)
- }
在我们指定以--master local模式跑的时刻,它为0,而在其他模式下,则是求的一个最大年夜值。这里重点看 defaultMinPartitions,如下
- def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
- // defaultParallelism 在yarn和standalone模式下的计算
- def defaultParallelism(): Int = {
- conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
- }
大年夜这里可以看到,defaultMinPartitions的值一般为2,而 mapred.map.tasks 或者 mapreduce.job.maps( 新版参数)是Hadoop的内建参数,其默认值也为2,一般很少去改变它。所以这里_minSplitsPerRDD的值根本就是2了。
下面我们跟到HadoopRDD类里,去看看它的partitions是若何来的
大年夜膳绫擎可以看到,splitSize是大年夜 computeSplitSize(goalSize, minSize, blockSize) 计算来的,这三个参数我们都知道大年夜小,那么计算规矩是怎么样的呢
- def getPartitions: Array[Partition] = {
- val jobConf = getJobConf()
- // add the credentials here as this can be called before SparkContext initialized
- SparkHadoopUtil.get.addCredentials(jobConf)
- // inputFormat就是膳绫擎参数inputFormatClass所设备的类的实例
- val inputFormat = getInputFormat(jobConf)
- // 此处获取FileSplit数,minPartitions就是膳绫擎的_minSplitsPerRDD
- val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
- val array = new Array[Partition](inputSplits.size)
- // 大年夜这里可以看出FileSplit数决定了Spark扫描Hive表的partition数
- for (i <- 0 until inputSplits.size) {
- array(i) = new HadoopPartition(id, i, inputSplits(i))
- }
- array
- }
分两步来看,起首是扫描文件,计算文件大年夜小的部分
- FileStatus[] files = listStatus(job);
推荐阅读
【限时免费】岁尾最强一次云计算大年夜会,看传统、社区、互联网企业若何碰撞? 之后是7nm的Navi(仙后座),再之后是基于7nm+改进版的下下代架构。据ComputerBase报道,代号GFX10的芯片近>>>详细阅读
本文标题:从源码看Spark读取Hive表数据小文件和分块的问题
地址:http://www.17bianji.com/lsqh/40042.html
1/2 1