草庐IT

hadoop作业分片处理以及任务本地性分析(源码分析第一篇)

zengzhaozheng 2023-03-28 原文

(一) Map输入数据块的切分算法(基于hadoop源码 1.0.1):
 (1)分片算法

    MapTask的个数据主要取决于InputFormat通过对输入数据调用getSplit()方法分割为若干个分片数据,即InputSplit数。

hadoop中切片大小主要由以下几个因素:

blockSize:块大小

minSize:最小分片大小,由参数mapred.min.split.size设置,默认为1

maxSize:最大分片大小,由参数mapred.max.split.size设置,默认Long.MAX-VALUE

   分片大小为:SplitSize=Math.max(minSize, Math.min(maxSize, blockSize));

/**    * Generate the list of files and make them into FileSplits.    */   public List<InputSplit> getSplits(JobContext job                                     ) throws IOException {     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));     long maxSize = getMaxSplitSize(job);     // generate splits     List<InputSplit> splits = new ArrayList<InputSplit>();     List<FileStatus>files = listStatus(job);     for (FileStatus file: files) {       Path path = file.getPath();       FileSystem fs = path.getFileSystem(job.getConfiguration());       long length = file.getLen();       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);       if ((length != 0) && isSplitable(job, path)) {         long blockSize = file.getBlockSize();         long splitSize = computeSplitSize(blockSize, minSize, maxSize);         long bytesRemaining = length;         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {           int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);           splits.add(new FileSplit(path, length-bytesRemaining, splitSize,                                    blkLocations[blkIndex].getHosts()));           bytesRemaining -= splitSize;         }                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        if (bytesRemaining != 0) {           splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,                      blkLocations[blkLocations.length-1].getHosts()));         }       } else if (length != 0) {         splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));       } else {         //Create empty hosts array for zero length files         splits.add(new FileSplit(path, 0, length, new String[0]));       }     }                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                // Save the number of input files in the job-conf     job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());     LOG.debug("Total # of splits: " + splits.size());     return splits;   }  我们看上面代码的第18行,computeSplitSize(blockSize, minSize, maxSize)方法,进入瞧瞧:

protected long computeSplitSize(long blockSize, long minSize,                                 long maxSize) {   return Math.max(minSize, Math.min(maxSize, blockSize)); }  minSize、maxSize、blockSize和上面说明的是一致的,到这里切片的大小计算就相当清楚了。另外,hadoop还维护了一个相当重要的数据结构用于保存每一个map输入文件的切片信息(FileSplit类),其结构:

/** A section of an input file.  Returned by {@link  * InputFormat#getSplits(JobContext)} and passed to  * {@link InputFormat#createRecordReader(InputSplit,TaskAttemptContext)}. */ public class FileSplit extends InputSplit implements Writable {   private Path file;//InputSplit所在的文件   private long start;//起始位置   private long length;//InputSplit长度   private String[] hosts;//inputSplit所在节点列表(此属性关乎到任务的本地性)             .             .             . }

(二) host选择算法以及任务本地性分析    在InputSplit的切分算法确定,接下来确定每个InputSplit的元数据信息。元数据信息主要由4部分组成:

<file,start,length,hosts>,也就是上面的FileSplit对应的结构,其实hadoop在提交作业的时候会将FileSplit的元数据信息映射为2部分,分别是InputSplit元数据信息和原始InputSplit信息。其中元数据信息被JobTracker用于生成Task本地性相关数据结构,而原始InputSplit信息是在Map Task初始化的时候使用的,以被map获取到自己要进行处理的数据,他们分别对应的文件为jobMetainfo和job.split。在50070页面中我们可以看到(${mapreduce.jobtracker.staging.root.dir}/${user}/.staging/${jobId}):

继续InputSplit结构,前3项非常容易确定,但是hosts的选择策略就没有那么容易了,它关系到任务的本地性问题能够直接地影响作业的运行效率和网络的消耗。

   由于Hadoop在HDFS中上的文件时以block为单位的,那么一个大文件在HDFS上可能对应的多个block分散遍布各个节点上,而从hadoop的文件切片算法来看,一个InputSplit可能对应着多个block分布在不同的节点上(包含block副本),因此hadoop不可能实现完全的数据本地性。所以hadoop按照本地性的高低划分为3个等级:node locality、rack locality和dataCenter locality(hadoop暂时没有实现)。

   InputSplit对应的block可能分布在不同的节点上,那么是否应该将所有block所有的节点都放到hosts列表中,作为任务本地性的判断标准呢?举个例子,例如有一个切片P1,其对应着5个block分布在集群中的5个节点中。其中5个节点包含该切片的数据量分别是:500(slave1)、400(slave2)、100(salve3)、50(slave4)、50(slave5)(去掉重复的副本),如果将这5个节点都放到hosts列表作为任务本地性的判断标准的话,那么可能会出现这样的情况:当slave5有空闲的slot时,通过心跳包发送给jobTracker,请求任务分配。由于slave5的这个InputSplit的列表中,因此jobTracker将salve5视为该InputSplit的本地节点,创建Map task任务。很明显可以看出slave5只包含了改InputSplit 9%左右的数据量,其他91%的数据需要从其他节点中下载,本性性的效率十分低下。

   因此,hadoop考虑到这种任务调度的效率问题,不会将InputSplit包含的block所在的所有节点放到host列表中,而是选择包含改InputSplit数据总量(统计时去掉同一个节点重复的数据)最大的前几个节点(限制最多选择10个,多余的过滤掉)作为任务调度时的本地性判断标准FileInputFormate设计了一个有效的算法:先按照rack包含的数据量对rack进行排序,然后在rack内部按照每个node包含的数据量(统计时去掉同一个节点重复的数据)对node进行排序,最后取前N个node的host作为InputSplit的host列表,这里的N为block副本数。这样,当任务调度器调度Task时,只要将Task调度给位于host列表的节点就可以实现最大效率的本地性。

   【实例】hadoop集群的网络拓扑结构如下图所示,HDFS中block副本数为3,某个InputSplit包含3个block,大小依次为100、150和75,那么很容易统计出4个rack包含该InputSplit的数据量为:rack2[250]>rack1[175]>rack3[150]>rack4[75](统计时去掉同

一个节点重复的数据),其rack内部节点包含的数据量为:rack2[node4(150)>node3(100)]>rack1[node1(175)>node2(100)]>rack3[node5(150)=node6(150)]>rack4[node7(75)=node8(75),那么依次选择的3个(block副本数)节点为node4、node3和node1作为改InputSplit的host列表。



    根据上面的分析,我们可以得出一个结论:当InputSplit尺寸大于block并且其对应的所有block(包含副本)不在同一个节点上时,Map Task不可能完全实现数据的本地化,也就是说,总有一部分数据需要从远程节点上读取,因此得出,当使用基于FileInputFormat实现InputFormat时,为了提高数据本地性,应该尽量使InputSplit大小与block大小一致。

(三) 任务本地性和多用户调度器联系多任务调度器Capacity Scheduler和Fair Scheduler在任务调度时,会根据任务的本地性而选择相应去延迟调度任务。

   (a)Capacity Scheduler 采用的作业任务延迟调度策略:

    当选择一个作业后,如果在该作业中没有找到满足本地性要求的任务,那么Capacity Scheduler调度器会让该作业跳过一定数目的调度机会,直到找到满足本地性要求的任务或者达到跳过次数上限(requiredSlots*localityWaitFactor),其中localityWaitFactor可以通过参数

mapreduce.job.locality.wait.factor配置,默认情况下localityWaitFactor=min{jobNodes/clusterNodes,1},其中

                jobNodes表示:该作业输入数据所在节点总数。

                clusterNodes表示:集群节点总数。

requiredSlots=min{(numMapTasks-finishedMapTask),numTaskTrackers},其中

                numMapTasks表示:该作业的MapTask数目

                finishedMapTask表示:改作业已经完成的MapTask数目

                numTaskTrackers表示:taskTracker数目


(b)Fair Scheduler 采用的作业任务延迟调度策略:

    当出现一个空闲slot时,如果选中的作业中没有node-local或者rack-local任务,则暂时把资源让给其他作业,直到

找到一个满足数据本地性的任务或者达到一个时间阀值,此时不得不选在一个非本地性的任务执行。



---------------------------------------hadoop源码分析系列------------------------------------------------------------------------------------------------------------hadoop作业分片处理以及任务本地性分析(源码分析第一篇)

hadoop作业提交过程分析(源码分析第二篇)

hadoop作业初始化过程详解(源码分析第三篇)

JobTracker之作业恢复与权限管理机制(源码分析第四篇)

JobTracker之辅助线程和对象映射模型分析(源码分析第五篇)

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------参考文献:

[1]《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》

[2] http://hadoop.apache.org/



有关hadoop作业分片处理以及任务本地性分析(源码分析第一篇)的更多相关文章

  1. ruby - 其他文件中的 Rake 任务 - 2

    我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时

  2. ruby - 什么是填充的 Base64 编码字符串以及如何在 ruby​​ 中生成它们? - 2

    我正在使用的第三方API的文档状态:"[O]urAPIonlyacceptspaddedBase64encodedstrings."什么是“填充的Base64编码字符串”以及如何在Ruby中生成它们。下面的代码是我第一次尝试创建转换为Base64的JSON格式数据。xa=Base64.encode64(a.to_json) 最佳答案 他们说的padding其实就是Base64本身的一部分。它是末尾的“=”和“==”。Base64将3个字节的数据包编码为4个编码字符。所以如果你的输入数据有长度n和n%3=1=>"=="末尾用于填充n%

  3. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  4. ruby - 使用 C 扩展开发 ruby​​gem 时,如何使用 Rspec 在本地进行测试? - 2

    我正在编写一个包含C扩展的gem。通常当我写一个gem时,我会遵循TDD的过程,我会写一个失败的规范,然后处理代码直到它通过,等等......在“ext/mygem/mygem.c”中我的C扩展和在gemspec的“扩展”中配置的有效extconf.rb,如何运行我的规范并仍然加载我的C扩展?当我更改C代码时,我需要采取哪些步骤来重新编译代码?这可能是个愚蠢的问题,但是从我的gem的开发源代码树中输入“bundleinstall”不会构建任何native扩展。当我手动运行rubyext/mygem/extconf.rb时,我确实得到了一个Makefile(在整个项目的根目录中),然后当

  5. ruby - 如何使用 RSpec::Core::RakeTask 创建 RSpec Rake 任务? - 2

    如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake

  6. ruby - 是否可以覆盖 gemfile 进行本地开发? - 2

    我们的git存储库中目前有一个Gemfile。但是,有一个gem我只在我的环境中本地使用(我的团队不使用它)。为了使用它,我必须将它添加到我们的Gemfile中,但每次我checkout到我们的master/dev主分支时,由于与跟踪的gemfile冲突,我必须删除它。我想要的是类似Gemfile.local的东西,它将继承从Gemfile导入的gems,但也允许在那里导入新的gems以供使用只有我的机器。此文件将在.gitignore中被忽略。这可能吗? 最佳答案 设置BUNDLE_GEMFILE环境变量:BUNDLE_GEMFI

  7. 【鸿蒙应用开发系列】- 获取系统设备信息以及版本API兼容调用方式 - 2

    在应用开发中,有时候我们需要获取系统的设备信息,用于数据上报和行为分析。那在鸿蒙系统中,我们应该怎么去获取设备的系统信息呢,比如说获取手机的系统版本号、手机的制造商、手机型号等数据。1、获取方式这里分为两种情况,一种是设备信息的获取,一种是系统信息的获取。1.1、获取设备信息获取设备信息,鸿蒙的SDK包为我们提供了DeviceInfo类,通过该类的一些静态方法,可以获取设备信息,DeviceInfo类的包路径为:ohos.system.DeviceInfo.具体的方法如下:ModifierandTypeMethodDescriptionstatic StringgetAbiList​()Obt

  8. UE4 源码阅读:从引擎启动到Receive Begin Play - 2

    一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame

  9. 阿里云国际版免费试用:如何注册以及注意事项 - 2

    作为新的阿里云用户,您可以50免费试用多种优惠,价值高达1,700美元(或8,500美元)。这将让您了解和体验阿里云平台上提供的一系列产品和服务。如果您以个人身份注册免费试用,您将获得价值1,700美元的优惠。但是,如果您是注册公司,您可以选择企业免费试用,提交基本信息通过企业实名注册验证,即可开始价值$8,500的免费试用!本教程介绍了如何设置您的帐户并使用您的免费试用版。​关于免费试用在我们开始此试用之前,您还必须遵守以下条款和条件才能访问您的免费试用:只有在一年内创建的账户才有资格获得阿里云免费试用。通过此免费试用优惠,用户可以免费试用免费试用活动页面上列出的每种产品一次。如果您有多个帐

  10. hadoop安装之保姆级教程(二)之YARN的配置 - 2

    1.1.1 YARN的介绍 为克服Hadoop1.0中HDFS和MapReduce存在的各种问题⽽提出的,针对Hadoop1.0中的MapReduce在扩展性和多框架⽀持⽅⾯的不⾜,提出了全新的资源管理框架YARN. ApacheYARN(YetanotherResourceNegotiator的缩写)是Hadoop集群的资源管理系统,负责为计算程序提供服务器计算资源,相当于⼀个分布式的操作系统平台,⽽MapReduce等计算程序则相当于运⾏于操作系统之上的应⽤程序。 YARN被引⼊Hadoop2,最初是为了改善MapReduce的实现,但是因为具有⾜够的通⽤性,同样可以⽀持其他的分布式计算模

随机推荐