草庐IT

hadoop作业初始化过程详解(源码分析第三篇)

zengzhaozheng 2023-03-28 原文
(一)概述

   我们在上一篇blog已经详细的分析了一个作业从用户输入提交命令到到达JobTracker之前的各个过程。在作业到达JobTracker之后初始化之前,JobTracker会通过submitJob方法,为每个作业都创建一个JobInProgress对象(本文以后简称JIP),用于维护作业的运行时信息以及监控正在运行作业的运行状态和进度。然后检查提交作业的用户是否具有指定队列的作业提交权限,接着检查用户提交作业时候的自己配置的内存量情况是否在管理员的配置范围内,具体点说就是用户提交作业时,配置的mapred.job.map.memory.mb和mapred.job.reduce.memory.mb不能超过管理员配置的

mapred.cluster.max.map.memory.mb和mapred.cluster.max.reduce.memory.mb,一旦提交作业就会直接失败。最后JobTracker通知TaskScheduler初始化作业,作业初始化这一步是我们本文讨论的内容。

(二)作业初始化的时机以及框架调度流程

(1)作业初始化时机

   当JobTracker收到用户提交的作业后,不会马上为其进行初始化而是会交给任务调度器调用EagerTaskInitializationListener对作业进行初始化操作。JobTracker接收到新提交的任务后不对其马上进行初始化是因为考虑到几方面的因素:


  • 从资源角度上考虑

    作业一旦被初始化了就一定会占用一定量的内存资源,如果大量已被初始化的作业在队列中而暂时得不到调度的话,那么会造成不必要内存资源的浪费。因此Hadoop会按照一定可行策略来对作业进行初始化操作以节省内存资源。


  • 从设计合理性

    任务调度器的作用是根据每个节点的资源使用情况以及本地性要求为节点分配最适合的任务,而只有被初始化后的作业才能够被任务调度器调度,这里有一个先后过程,因此将作业的初始化嵌套在任务调度中是一种比较合理的设计方法。


(2)框架调度流程

   Hadoop任务调度器的内部实现都是实现了TaskScheduler接口,具有热插拔性。Hadoop的默认任务调度器是JobQueueTaskScheduler(FIFO),管理员可以通过mapred.jobtracker.taskScheduler参数进行配置,其他比较常用的任务调度器还有Fair Scheduler和Capacity Scheduler。

   JobTracker和TaskScheduler之前采用了“观察者模式”,JobTracker作业主题会将“新增作业、更新作业状态、kill掉作业”等命令通知“观察者”TaskScheduler中的EagerTaskInitializationListener和JobQueueJobInProgressListener。其整体调用框架如下图:

用户提交作业到JobTracker之后,通知任务调度器新增作业的代码定位到JobTracker类的addJob(JobID jobId, JobInProgress job)方法,代码如下:

/**   * Adds a job to the jobtracker. Make sure that the checks are inplace before   * adding a job. This is the core job submission logic   * @param jobId The id for the job submitted which needs to be added   */  private synchronized JobStatus addJob(JobID jobId, JobInProgress job)  throws IOException {    totalSubmissions++;    synchronized (jobs) {      synchronized (taskScheduler) {        jobs.put(job.getProfile().getJobID(), job);        for (JobInProgressListener listener : jobInProgressListeners) {          listener.jobAdded(job);//主要由EagerTaskInitializationListener和JobQueueJobInProgressListener来进行实现        }      }    }    ......  }上面代码会逐个向已经在JobTracker上注册的监听器发放“新增作业”通知,其中listener的jobAdded(job)方法主要由EagerTaskInitializationListener和JobQueueJobInProgressListener来进行实现。

  • EagerTaskInitializationListener中的实现,其主要功能是对将作业添加到初始化队列以及对作业进行排序,代码如下:

/**  * We add the JIP to the jobInitQueue, which is processed  * asynchronously to handle split-computation and build up  * the right TaskTracker/Block mapping.  */ @Override public void jobAdded(JobInProgress job) {   synchronized (jobInitQueue) {     jobInitQueue.add(job);//将作业添加到作业初始化队列中     resortInitQueue();//根据作业的优先级和提交时间对作业进行排序     jobInitQueue.notifyAll();   } }
  • JobQueueJobInProgressListener中的实现,其主要功能是将作业添加到作业列表当中。

public void jobAdded(JobInProgress job) {   jobQueue.put(new JobSchedulingInfo(job.getStatus()), job); }   其中JobSchedulingInfo主要维护了3个变量信息:

private JobPriority priority; private long startTime; private JobID id;(三)作业初始化详细过程分析

任务调度器中的EagerTaskInitializationListener将作业的初始化封装为线程,并且放到了固定大小的线程池(FixedThreadPool)中,其线程池的数量由mapred.jobinit.threads配置,默认大小为4。其线程结构代码如下:

class InitJob implements Runnable {   private JobInProgress job;                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               public InitJob(JobInProgress job) {     this.job = job;   }                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               public void run() {     ttm.initJob(job);   } }变量ttm对象为TaskTrackerManager接口,其实是由JobTracker来实现的,话句话说就是调用了JobTracker里面的initJob方法对作业进行初始化操作的。JobTracker中的initJob方法主要做了2件事情。第一件:构造和初始化Map Task和Reduce Task任务(接下来主要分析的环节),第二件:发通知给任务调度器去更新作业当前状态。大体代码如下:

public void initJob(JobInProgress job) {     if (null == job) {       LOG.info("Init on null job is not valid");       return;     }                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      try {       JobStatus prevStatus = (JobStatus)job.getStatus().clone();       LOG.info("Initializing " + job.getJobID());       job.initTasks();//构造和初始化Map Task和Reduce Task任务       // Inform the listeners if the job state has changed       // Note : that the job will be in PREP state.       JobStatus newStatus = (JobStatus)job.getStatus().clone();       if (prevStatus.getRunState() != newStatus.getRunState()) {         JobStatusChangeEvent event =           new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,               newStatus);         synchronized (JobTracker.this) {           updateJobInProgressListeners(event);//发通知给任务调度器去更新作业当前状态         }       }     }(1)MapTask和ReduceTask任务的构造和初始

   所有Task的普通信息以及运行状态都封装在TaskInProgress类,并且有其对Task进行状态的更新和管理,因此创建TaskInProgress(简称TIP)类其实相当于创建了Task任务。Hadoop中Task任务可以分为Map Task、Reduce Task、Setup Task、Cleanup Task,下面主要分析这几种Task的初始化过程。

  • 第一步,读取InputSplit元数据文件job.splitmetainfo,还原成为描述InputSplit具体信息(上一篇blog已经详细说过)的TaskSplitMetaInfo对象,详细代码可以定位到JobInProgress类的initTasks方法(主要负责任务的初始化工作),还原对象的代码如下:

//    // read input splits and create a map per a split    //    TaskSplitMetaInfo[] splits = createSplits(jobId);//还原TaskSplitMetaInfo对象    if (numMapTasks != splits.length) {      throw new IOException("Number of maps in JobConf doesn't match number of " +            "recieved splits for job " + jobId + "! " +            "numMapTasks=" + numMapTasks + ", #splits=" + splits.length);    }    numMapTasks = splits.length;    // Sanity check the locations so we don't create/initialize unnecessary tasks    for (TaskSplitMetaInfo split : splits) {      NetUtils.verifyHostnames(split.getLocations());    }
  • 第二步,根据第一步还原的TaskSplitMetaInfo对象信息创建InputSplit.length个TIP对应Map Task,具体代码如下:

maps = new TaskInProgress[numMapTasks];    for(int i=0; i < numMapTasks; ++i) {      inputLength += splits[i].getInputDataLength();      maps[i] = new TaskInProgress(jobId, jobFile,                                   splits[i],                                   jobtracker, conf, this, i, numSlotsPerMap);    }
  • 第三步,创建mapred.reduce.tasks(默认配置为1)个TIP对应Reduce Task,另外需要注意的是,Reduce Task虽然已经被初始化了,但是不会立即得到调用的,因为它的输入依赖于Map Task的输出,它要等待Map Task完成一定的比例数目才会得到调用,此完成数目比例由参数mappred.reduce.completed.maps配置,默认为0.05。

......    // Create reduce tasks    //    this.reduces = new TaskInProgress[numReduceTasks];//创建Reduce Task    for (int i = 0; i < numReduceTasks; i++) {      reduces[i] = new TaskInProgress(jobId, jobFile,                                      numMapTasks, i,                                      jobtracker, conf, this, numSlotsPerReduce);      nonRunningReduces.add(reduces[i]);//将新建的Reudce Task放到没有运行的Reduce Task列表中    } // Calculate the minimum number of maps to be complete before     // we should start scheduling reduces     completedMapsForReduceSlowstart = //设定Reduce Task 启动的时机参数       (int)Math.ceil(           (conf.getFloat("mapred.reduce.slowstart.completed.maps",                          DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *            numMapTasks));
  • 第四步,创建2个TIP分别对应Map Cleanup Task和Reduce Cleanup Task,其作用是清除任务运行之后产生的临时文件信息,他们运行完之后整个作业的状态会由原先的RUNNIN状态变为SUCCEEDED状态,具体代码如下:

// create cleanup two cleanup tips, one map and one reduce.    cleanup = new TaskInProgress[2];    // cleanup map tip. This map doesn't use any splits. Just assign an empty    // split.    TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;    cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,            jobtracker, conf, this, numMapTasks, 1);    cleanup[0].setJobCleanupTask();    // cleanup reduce tip.    cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,                       numReduceTasks, jobtracker, conf, this, 1);    cleanup[1].setJobCleanupTask();
  • 第五步,创建2个TIP分别对应Map Setup Task和Reduce Setup Task,其作用就是做一些很简单的初始化工作,比如建临时目录等,其具体实现代码在FileOutputCommitter类的setupJob(JobContext context)方法中,如下:

    public void setupJob(JobContext context) throws IOException {     JobConf conf = context.getJobConf();     Path outputPath = FileOutputFormat.getOutputPath(conf);     if (outputPath != null) {       Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);       FileSystem fileSys = tmpDir.getFileSystem(conf);       if (!fileSys.mkdirs(tmpDir)) {         LOG.error("Mkdirs failed to create " + tmpDir.toString());       }     }   }当此任务完成之后整个作业的状态会由PREP变成RUNNING。

特殊说明:

(1)由上面代码我们可以看出当Setup/Cleanup Task创建时,放的是一个空的InputSplit(e   mptySplit),是因为这2中任务不负责对InputSplit数据进行处理的。其任务状态只有“开始”和“结束”,进度只有0%和100%。

(2)Map/Reduce Setup Task运行时分别占用一个Map slot和一个Reduce slot,但是Hadoop认为他们功能是类似的,他们不能同时运行,话句话说Map Setup Task开始运行就会将Reudce Setup Task杀死,相反Reduce Setup Task开始运行时就会将Map Setup Task杀死。

(四)作业初始化详细过程分析总结

   这里要特别说明一下,上面的图是我按照JobInProgress类的顺序逐步分析的任务初始化结果,但是任务初始化过程的顺序和运行的顺序是不同的。Task运行的过程是这样的:start->TaskInProgress(Setup Task)->TaskInProgress(Map/Reduce Task)->TaskInProgress(CleanUp Task)。


---------------------------------------hadoop源码分析系列------------------------------------------------------------------------------------------------------------hadoop作业分片处理以及任务本地性分析(源码分析第一篇)

hadoop作业提交过程分析(源码分析第二篇)

hadoop作业初始化过程详解(源码分析第三篇)

JobTracker之作业恢复与权限管理机制(源码分析第四篇)

JobTracker之辅助线程和对象映射模型分析(源码分析第五篇)

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

参考文献:

[1]《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》

[2] http://hadoop.apache.org/


有关hadoop作业初始化过程详解(源码分析第三篇)的更多相关文章

  1. ruby-on-rails - 未初始化的常量 Psych::Syck (NameError) - 2

    在我的gem中,我需要yaml并且在我的本地计算机上运行良好。但是在将我的gem推送到ruby​​gems.org之后,当我尝试使用我的gem时,我收到一条错误消息=>"uninitializedconstantPsych::Syck(NameError)"谁能帮我解决这个问题?附言RubyVersion=>ruby1.9.2,GemVersion=>1.6.2,Bundlerversion=>1.0.15 最佳答案 经过几个小时的研究,我发现=>“YAML使用未维护的Syck库,而Psych使用现代的LibYAML”因此,为了解决

  2. ruby-on-rails - 未在 Ruby 中初始化的对象 - 2

    我在Rails工作并有以下类(class):classPlayer当我运行时bundleexecrailsconsole然后尝试:a=Player.new("me",5.0,"UCLA")我回来了:=>#我不知道为什么Player对象不会在这里初始化。关于可能导致此问题的操作/解释的任何建议?谢谢,马里奥格 最佳答案 havenoideawhythePlayerobjectwouldn'tbeinitializedhere它没有初始化很简单,因为你还没有初始化它!您已经覆盖了ActiveRecord::Base初始化方法,但您没有调

  3. ruby-on-rails - ActionController::RoutingError: 未初始化常量 Api::V1::ApiController - 2

    我有用于控制用户任务的Rails5API项目,我有以下错误,但并非总是针对相同的Controller和路由。ActionController::RoutingError:uninitializedconstantApi::V1::ApiController我向您描述了一些我的项目,以更详细地解释错误。应用结构路线scopemodule:'api'donamespace:v1do#=>Loginroutesscopemodule:'login'domatch'login',to:'sessions#login',as:'login',via::postend#=>Teamroutessc

  4. ruby - 这两个 Ruby 类初始化定义有什么区别? - 2

    我正在阅读一本关于Ruby的书,作者在编写类初始化定义时使用的形式与他在本书前几节中使用的形式略有不同。它看起来像这样:classTicketattr_accessor:venue,:datedefinitialize(venue,date)self.venue=venueself.date=dateendend在本书的前几节中,它的定义如下:classTicketattr_accessor:venue,:datedefinitialize(venue,date)@venue=venue@date=dateendend在第一个示例中使用setter方法与在第二个示例中使用实例变量之间是

  5. UE4 源码阅读:从引擎启动到Receive Begin Play - 2

    一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame

  6. hadoop安装之保姆级教程(二)之YARN的配置 - 2

    1.1.1 YARN的介绍 为克服Hadoop1.0中HDFS和MapReduce存在的各种问题⽽提出的,针对Hadoop1.0中的MapReduce在扩展性和多框架⽀持⽅⾯的不⾜,提出了全新的资源管理框架YARN. ApacheYARN(YetanotherResourceNegotiator的缩写)是Hadoop集群的资源管理系统,负责为计算程序提供服务器计算资源,相当于⼀个分布式的操作系统平台,⽽MapReduce等计算程序则相当于运⾏于操作系统之上的应⽤程序。 YARN被引⼊Hadoop2,最初是为了改善MapReduce的实现,但是因为具有⾜够的通⽤性,同样可以⽀持其他的分布式计算模

  7. ruby - 为什么当我调用类的实例方法时,初始化不显示为方法? - 2

    我正在写一篇关于在Ruby中几乎一切都是对象的博客文章,我试图通过以下示例来展示这一点:classCoolBeansattr_accessor:beansdefinitialize@bean=[]enddefcount_beans@beans.countendend所以从类中我们可以看出它有4个方法(当然,除非我错了):它可以在创建新实例时初始化一个默认的空bean数组它可以计算它有多少个bean它可以读取它有多少个bean(通过attr_accessor)它可以向空数组写入(或添加)更多bean(也通过attr_accessor)但是,当我询问类本身它有哪些实例方法时,我没有看到默认

  8. ruby-on-rails - 为什么在 Rails 5.1.1 中删除了 session 存储初始化程序 - 2

    我去了这个website查看Rails5.0.0和Rails5.1.1之间的区别为什么5.1.1不再包含:config/initializers/session_store.rb?谢谢 最佳答案 这是删除它的提交:Setupdefaultsessionstoreinternally,nolongerthroughanapplicationinitializer总而言之,新应用没有该初始化器,session存储默认设置为cookie存储。即与在该初始值设定项的生成版本中指定的值相同。 关于

  9. ruby-on-rails - 在所有延迟的作业之前 Hook - 2

    是否可以在所有delayed_job任务之前运行一个方法?基本上,我们试图确保每个运行delayed_job的服务器都有我们代码的最新实例,所以我们想运行一个方法来在每个作业运行之前检查它。(我们已经有了“check”方法并在别处使用它。问题只是关于如何从delayed_job中调用它。) 最佳答案 现在有一种官方方法可以通过插件来做到这一点。这篇博文通过示例清楚地描述了如何执行此操作http://www.salsify.com/blog/delayed-jobs-callbacks-and-hooks-in-rails(本文中描述

  10. ruby-on-rails - NameError(未初始化常量 Unzipper::Zip)但仅在 Heroku 部署(Rails)上 - 2

    我有一个类unzipper.rb,它使用Rubyzip解压文件。在我的本地环境中,我可以成功解压缩文件,而无需使用require'zip'明确包含依赖项但是在Heroku上,我得到一个NameError(uninitializedconstantUnzipper::Zip)我只能通过使用明确的require来解决问题:为什么这在H​​eroku环境中是必需的,但在本地主机上却不是?我的印象是Rails自动需要所有gem。app/services/unzipper.rbrequire'zip'#OnlyrequiredforHeroku.Workslocallywithout!class

随机推荐