作家
登录

从源码看Spark读取Hive表数据小文件和分块的问题

作者: 来源: 2017-12-21 13:15:26 阅读 我要评论

【限时免费】岁尾最强一次云计算大年夜会,看传统、社区、互联网企业若何碰撞?


媒介

本文涉及源码基于Spark2.0.0和Hadoop2.6.0,不合版本代码可能不一致,需本身对应。此外针对TextInputFormat格局的Hive表,其他格局的比如Parquet有Spark本身的高效实现,不在评论辩论范围之内

分析

Spark攫取Hive表是经由过程HadoopRDD扫描上来的,具体可见 org.apache.spark.sql.hive.TableReader类,构建HadoopRDD的代码如下

这里inputFormatClass是Hive创建时指定的,默认不指定为 org.apache.hadoop.mapred.TextInputFormat,由它就涉及到了HDFS文件的FileSplit数,大年夜而决定了上层Spark的partition数。在进入HadoopRDD类查看之前,还有一个参数须要我们留意,就是 _minSplitsPerRDD,它在后面SplitSize的计算中是起了感化的。

在 getPartitions 办法里我们可以看到 FileSplit数最后决定了Spark攫取Hive表的Task数,下面我们再来看看 mapred.TextInputFormat 类里 getSplits 的实现

我们看一下它的定义

  1. private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) { 
  2. 0 // will splitted based on block by default
  3. else { 
  4. math.max(hadoopConf.getInt("mapred.map.tasks", 1), 
  5. sparkSession.sparkContext.defaultMinPartitions) 

在我们指定以--master local模式跑的时刻,它为0,而在其他模式下,则是求的一个最大年夜值。这里重点看 defaultMinPartitions,如下

  1. def defaultMinPartitions: Int = math.min(defaultParallelism, 2) 
  2.  
  3. // defaultParallelism 在yarn和standalone模式下的计算 
  4. def defaultParallelism(): Int = { 
  5. conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)) 

大年夜这里可以看到,defaultMinPartitions的值一般为2,而 mapred.map.tasks 或者 mapreduce.job.maps( 新版参数)是Hadoop的内建参数,其默认值也为2,一般很少去改变它。所以这里_minSplitsPerRDD的值根本就是2了。

下面我们跟到HadoopRDD类里,去看看它的partitions是若何来的

大年夜膳绫擎可以看到,splitSize是大年夜 computeSplitSize(goalSize, minSize, blockSize) 计算来的,这三个参数我们都知道大年夜小,那么计算规矩是怎么样的呢

  1. def getPartitions: Array[Partition] = { 
  2. val jobConf = getJobConf() 
  3. // add the credentials here as this can be called before SparkContext initialized 
  4. SparkHadoopUtil.get.addCredentials(jobConf) 
  5. // inputFormat就是膳绫擎参数inputFormatClass所设备的类的实例 
  6. val inputFormat = getInputFormat(jobConf) 
  7. // 此处获取FileSplit数,minPartitions就是膳绫擎的_minSplitsPerRDD 
  8. val inputSplits = inputFormat.getSplits(jobConf, minPartitions) 
  9. val array = new Array[Partition](inputSplits.size
  10. // 大年夜这里可以看出FileSplit数决定了Spark扫描Hive表的partition数 
  11. for (i <- 0 until inputSplits.size) { 
  12. array(i) = new HadoopPartition(id, i, inputSplits(i)) 
  13. array 

分两步来看,起首是扫描文件,计算文件大年夜小的部分

  1. FileStatus[] files = listStatus(job); 
  2.  1/4    1 2 3 4 下一页 尾页

      推荐阅读

      7nm工艺!AMD Navi显卡现身:性能大提升

    【限时免费】岁尾最强一次云计算大年夜会,看传统、社区、互联网企业若何碰撞? 之后是7nm的Navi(仙后座),再之后是基于7nm+改进版的下下代架构。据ComputerBase报道,代号GFX10的芯片近>>>详细阅读


    本文标题:从源码看Spark读取Hive表数据小文件和分块的问题

    地址:http://www.17bianji.com/lsqh/40042.html

关键词: 探索发现

乐购科技部分新闻及文章转载自互联网,供读者交流和学习,若有涉及作者版权等问题请及时与我们联系,以便更正、删除或按规定办理。感谢所有提供资讯的网站,欢迎各类媒体与乐购科技进行文章共享合作。

网友点评
自媒体专栏

评论

热度

精彩导读
栏目ID=71的表不存在(操作类型=0)