|
用户提交作业到JobTracker之后,通知任务调度器新增作业的代码定位到JobTracker类的addJob(JobID jobId, JobInProgress job)方法,代码如下:/**
* Adds a job to the jobtracker. Make sure that the checks are inplace before
* adding a job. This is the core job submission logic
* @param jobId The id for the job submitted which needs to be added
*/
private synchronized JobStatus addJob(JobID jobId, JobInProgress job)
throws IOException {
totalSubmissions++;
synchronized (jobs) {
synchronized (taskScheduler) {
jobs.put(job.getProfile().getJobID(), job);
for (JobInProgressListener listener : jobInProgressListeners) {
listener.jobAdded(job);//主要由EagerTaskInitializationListener和JobQueueJobInProgressListener来进行实现
}
}
}
......
}上面代码会逐个向已经在JobTracker上注册的监听器发放“新增作业”通知,其中listener的jobAdded(job)方法主要由EagerTaskInitializationListener和JobQueueJobInProgressListener来进行实现。/**
* We add the JIP to the jobInitQueue, which is processed
* asynchronously to handle split-computation and build up
* the right TaskTracker/Block mapping.
*/
@Override
public void jobAdded(JobInProgress job) {
synchronized (jobInitQueue) {
jobInitQueue.add(job);//将作业添加到作业初始化队列中
resortInitQueue();//根据作业的优先级和提交时间对作业进行排序
jobInitQueue.notifyAll();
}
}public void jobAdded(JobInProgress job) {
jobQueue.put(new JobSchedulingInfo(job.getStatus()), job);
} 其中JobSchedulingInfo主要维护了3个变量信息:private JobPriority priority;
private long startTime;
private JobID id;(三)作业初始化详细过程分析任务调度器中的EagerTaskInitializationListener将作业的初始化封装为线程,并且放到了固定大小的线程池(FixedThreadPool)中,其线程池的数量由mapred.jobinit.threads配置,默认大小为4。其线程结构代码如下:class InitJob implements Runnable {
private JobInProgress job;
public InitJob(JobInProgress job) {
this.job = job;
}
public void run() {
ttm.initJob(job);
}
}变量ttm对象为TaskTrackerManager接口,其实是由JobTracker来实现的,话句话说就是调用了JobTracker里面的initJob方法对作业进行初始化操作的。JobTracker中的initJob方法主要做了2件事情。第一件:构造和初始化Map Task和Reduce Task任务(接下来主要分析的环节),第二件:发通知给任务调度器去更新作业当前状态。大体代码如下:public void initJob(JobInProgress job) {
if (null == job) {
LOG.info("Init on null job is not valid");
return;
}
try {
JobStatus prevStatus = (JobStatus)job.getStatus().clone();
LOG.info("Initializing " + job.getJobID());
job.initTasks();//构造和初始化Map Task和Reduce Task任务
// Inform the listeners if the job state has changed
// Note : that the job will be in PREP state.
JobStatus newStatus = (JobStatus)job.getStatus().clone();
if (prevStatus.getRunState() != newStatus.getRunState()) {
JobStatusChangeEvent event =
new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
newStatus);
synchronized (JobTracker.this) {
updateJobInProgressListeners(event);//发通知给任务调度器去更新作业当前状态
}
}
}(1)MapTask和ReduceTask任务的构造和初始 所有Task的普通信息以及运行状态都封装在TaskInProgress类,并且有其对Task进行状态的更新和管理,因此创建TaskInProgress(简称TIP)类其实相当于创建了Task任务。Hadoop中Task任务可以分为Map Task、Reduce Task、Setup Task、Cleanup Task,下面主要分析这几种Task的初始化过程。//
// read input splits and create a map per a split
//
TaskSplitMetaInfo[] splits = createSplits(jobId);//还原TaskSplitMetaInfo对象
if (numMapTasks != splits.length) {
throw new IOException("Number of maps in JobConf doesn't match number of " +
"recieved splits for job " + jobId + "! " +
"numMapTasks=" + numMapTasks + ", #splits=" + splits.length);
}
numMapTasks = splits.length;
// Sanity check the locations so we don't create/initialize unnecessary tasks
for (TaskSplitMetaInfo split : splits) {
NetUtils.verifyHostnames(split.getLocations());
}maps = new TaskInProgress[numMapTasks];
for(int i=0; i < numMapTasks; ++i) {
inputLength += splits[i].getInputDataLength();
maps[i] = new TaskInProgress(jobId, jobFile,
splits[i],
jobtracker, conf, this, i, numSlotsPerMap);
}......
// Create reduce tasks
//
this.reduces = new TaskInProgress[numReduceTasks];//创建Reduce Task
for (int i = 0; i < numReduceTasks; i++) {
reduces[i] = new TaskInProgress(jobId, jobFile,
numMapTasks, i,
jobtracker, conf, this, numSlotsPerReduce);
nonRunningReduces.add(reduces[i]);//将新建的Reudce Task放到没有运行的Reduce Task列表中
}
// Calculate the minimum number of maps to be complete before
// we should start scheduling reduces
completedMapsForReduceSlowstart = //设定Reduce Task 启动的时机参数
(int)Math.ceil(
(conf.getFloat("mapred.reduce.slowstart.completed.maps",
DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *
numMapTasks));// create cleanup two cleanup tips, one map and one reduce.
cleanup = new TaskInProgress[2];
// cleanup map tip. This map doesn't use any splits. Just assign an empty
// split.
TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
jobtracker, conf, this, numMapTasks, 1);
cleanup[0].setJobCleanupTask();
// cleanup reduce tip.
cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
numReduceTasks, jobtracker, conf, this, 1);
cleanup[1].setJobCleanupTask();public void setupJob(JobContext context) throws IOException {
JobConf conf = context.getJobConf();
Path outputPath = FileOutputFormat.getOutputPath(conf);
if (outputPath != null) {
Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
FileSystem fileSys = tmpDir.getFileSystem(conf);
if (!fileSys.mkdirs(tmpDir)) {
LOG.error("Mkdirs failed to create " + tmpDir.toString());
}
}
}当此任务完成之后整个作业的状态会由PREP变成RUNNING。| 特殊说明:(1)由上面代码我们可以看出当Setup/Cleanup Task创建时,放的是一个空的InputSplit(e mptySplit),是因为这2中任务不负责对InputSplit数据进行处理的。其任务状态只有“开始”和“结束”,进度只有0%和100%。(2)Map/Reduce Setup Task运行时分别占用一个Map slot和一个Reduce slot,但是Hadoop认为他们功能是类似的,他们不能同时运行,话句话说Map Setup Task开始运行就会将Reudce Setup Task杀死,相反Reduce Setup Task开始运行时就会将Map Setup Task杀死。 |
这里要特别说明一下,上面的图是我按照JobInProgress类的顺序逐步分析的任务初始化结果,但是任务初始化过程的顺序和运行的顺序是不同的。Task运行的过程是这样的:start->TaskInProgress(Setup Task)->TaskInProgress(Map/Reduce Task)->TaskInProgress(CleanUp Task)。在我的gem中,我需要yaml并且在我的本地计算机上运行良好。但是在将我的gem推送到rubygems.org之后,当我尝试使用我的gem时,我收到一条错误消息=>"uninitializedconstantPsych::Syck(NameError)"谁能帮我解决这个问题?附言RubyVersion=>ruby1.9.2,GemVersion=>1.6.2,Bundlerversion=>1.0.15 最佳答案 经过几个小时的研究,我发现=>“YAML使用未维护的Syck库,而Psych使用现代的LibYAML”因此,为了解决
我在Rails工作并有以下类(class):classPlayer当我运行时bundleexecrailsconsole然后尝试:a=Player.new("me",5.0,"UCLA")我回来了:=>#我不知道为什么Player对象不会在这里初始化。关于可能导致此问题的操作/解释的任何建议?谢谢,马里奥格 最佳答案 havenoideawhythePlayerobjectwouldn'tbeinitializedhere它没有初始化很简单,因为你还没有初始化它!您已经覆盖了ActiveRecord::Base初始化方法,但您没有调
我有用于控制用户任务的Rails5API项目,我有以下错误,但并非总是针对相同的Controller和路由。ActionController::RoutingError:uninitializedconstantApi::V1::ApiController我向您描述了一些我的项目,以更详细地解释错误。应用结构路线scopemodule:'api'donamespace:v1do#=>Loginroutesscopemodule:'login'domatch'login',to:'sessions#login',as:'login',via::postend#=>Teamroutessc
我正在阅读一本关于Ruby的书,作者在编写类初始化定义时使用的形式与他在本书前几节中使用的形式略有不同。它看起来像这样:classTicketattr_accessor:venue,:datedefinitialize(venue,date)self.venue=venueself.date=dateendend在本书的前几节中,它的定义如下:classTicketattr_accessor:venue,:datedefinitialize(venue,date)@venue=venue@date=dateendend在第一个示例中使用setter方法与在第二个示例中使用实例变量之间是
一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame
1.1.1 YARN的介绍 为克服Hadoop1.0中HDFS和MapReduce存在的各种问题⽽提出的,针对Hadoop1.0中的MapReduce在扩展性和多框架⽀持⽅⾯的不⾜,提出了全新的资源管理框架YARN. ApacheYARN(YetanotherResourceNegotiator的缩写)是Hadoop集群的资源管理系统,负责为计算程序提供服务器计算资源,相当于⼀个分布式的操作系统平台,⽽MapReduce等计算程序则相当于运⾏于操作系统之上的应⽤程序。 YARN被引⼊Hadoop2,最初是为了改善MapReduce的实现,但是因为具有⾜够的通⽤性,同样可以⽀持其他的分布式计算模
我正在写一篇关于在Ruby中几乎一切都是对象的博客文章,我试图通过以下示例来展示这一点:classCoolBeansattr_accessor:beansdefinitialize@bean=[]enddefcount_beans@beans.countendend所以从类中我们可以看出它有4个方法(当然,除非我错了):它可以在创建新实例时初始化一个默认的空bean数组它可以计算它有多少个bean它可以读取它有多少个bean(通过attr_accessor)它可以向空数组写入(或添加)更多bean(也通过attr_accessor)但是,当我询问类本身它有哪些实例方法时,我没有看到默认
我去了这个website查看Rails5.0.0和Rails5.1.1之间的区别为什么5.1.1不再包含:config/initializers/session_store.rb?谢谢 最佳答案 这是删除它的提交:Setupdefaultsessionstoreinternally,nolongerthroughanapplicationinitializer总而言之,新应用没有该初始化器,session存储默认设置为cookie存储。即与在该初始值设定项的生成版本中指定的值相同。 关于
是否可以在所有delayed_job任务之前运行一个方法?基本上,我们试图确保每个运行delayed_job的服务器都有我们代码的最新实例,所以我们想运行一个方法来在每个作业运行之前检查它。(我们已经有了“check”方法并在别处使用它。问题只是关于如何从delayed_job中调用它。) 最佳答案 现在有一种官方方法可以通过插件来做到这一点。这篇博文通过示例清楚地描述了如何执行此操作http://www.salsify.com/blog/delayed-jobs-callbacks-and-hooks-in-rails(本文中描述
我有一个类unzipper.rb,它使用Rubyzip解压文件。在我的本地环境中,我可以成功解压缩文件,而无需使用require'zip'明确包含依赖项但是在Heroku上,我得到一个NameError(uninitializedconstantUnzipper::Zip)我只能通过使用明确的require来解决问题:为什么这在Heroku环境中是必需的,但在本地主机上却不是?我的印象是Rails自动需要所有gem。app/services/unzipper.rbrequire'zip'#OnlyrequiredforHeroku.Workslocallywithout!class