/**
* Generate the list of files and make them into FileSplits.
*/
public List<InputSplit> getSplits(JobContext job
) throws IOException {
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus>files = listStatus(job);
for (FileStatus file: files) {
Path path = file.getPath();
FileSystem fs = path.getFileSystem(job.getConfiguration());
long length = file.getLen();
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
if ((length != 0) && isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkLocations.length-1].getHosts()));
}
} else if (length != 0) {
splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
} else {
//Create empty hosts array for zero length files
splits.add(new FileSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files in the job-conf
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
LOG.debug("Total # of splits: " + splits.size());
return splits;
} 我们看上面代码的第18行,computeSplitSize(blockSize, minSize, maxSize)方法,进入瞧瞧:protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
} minSize、maxSize、blockSize和上面说明的是一致的,到这里切片的大小计算就相当清楚了。另外,hadoop还维护了一个相当重要的数据结构用于保存每一个map输入文件的切片信息(FileSplit类),其结构:/** A section of an input file. Returned by {@link
* InputFormat#getSplits(JobContext)} and passed to
* {@link InputFormat#createRecordReader(InputSplit,TaskAttemptContext)}. */
public class FileSplit extends InputSplit implements Writable {
private Path file;//InputSplit所在的文件
private long start;//起始位置
private long length;//InputSplit长度
private String[] hosts;//inputSplit所在节点列表(此属性关乎到任务的本地性)
.
.
.
}(二) host选择算法以及任务本地性分析 在InputSplit的切分算法确定,接下来确定每个InputSplit的元数据信息。元数据信息主要由4部分组成:
继续InputSplit结构,前3项非常容易确定,但是hosts的选择策略就没有那么容易了,它关系到任务的本地性问题能够直接地影响作业的运行效率和网络的消耗。 由于Hadoop在HDFS中上的文件时以block为单位的,那么一个大文件在HDFS上可能对应的多个block分散遍布各个节点上,而从hadoop的文件切片算法来看,一个InputSplit可能对应着多个block分布在不同的节点上(包含block副本),因此hadoop不可能实现完全的数据本地性。所以hadoop按照本地性的高低划分为3个等级:node locality、rack locality和dataCenter locality(hadoop暂时没有实现)。 InputSplit对应的block可能分布在不同的节点上,那么是否应该将所有block所有的节点都放到hosts列表中,作为任务本地性的判断标准呢?举个例子,例如有一个切片P1,其对应着5个block分布在集群中的5个节点中。其中5个节点包含该切片的数据量分别是:500(slave1)、400(slave2)、100(salve3)、50(slave4)、50(slave5)(去掉重复的副本),如果将这5个节点都放到hosts列表作为任务本地性的判断标准的话,那么可能会出现这样的情况:当slave5有空闲的slot时,通过心跳包发送给jobTracker,请求任务分配。由于slave5的这个InputSplit的列表中,因此jobTracker将salve5视为该InputSplit的本地节点,创建Map task任务。很明显可以看出slave5只包含了改InputSplit 9%左右的数据量,其他91%的数据需要从其他节点中下载,本性性的效率十分低下。 因此,hadoop考虑到这种任务调度的效率问题,不会将InputSplit包含的block所在的所有节点放到host列表中,而是选择包含改InputSplit数据总量(统计时去掉同一个节点重复的数据)最大的前几个节点(限制最多选择10个,多余的过滤掉)作为任务调度时的本地性判断标准FileInputFormate设计了一个有效的算法:先按照rack包含的数据量对rack进行排序,然后在rack内部按照每个node包含的数据量(统计时去掉同一个节点重复的数据)对node进行排序,最后取前N个node的host作为InputSplit的host列表,这里的N为block副本数。这样,当任务调度器调度Task时,只要将Task调度给位于host列表的节点就可以实现最大效率的本地性。 【实例】hadoop集群的网络拓扑结构如下图所示,HDFS中block副本数为3,某个InputSplit包含3个block,大小依次为100、150和75,那么很容易统计出4个rack包含该InputSplit的数据量为:rack2[250]>rack1[175]>rack3[150]>rack4[75](统计时去掉同一个节点重复的数据),其rack内部节点包含的数据量为:rack2[node4(150)>node3(100)]>rack1[node1(175)>node2(100)]>rack3[node5(150)=node6(150)]>rack4[node7(75)=node8(75),那么依次选择的3个(block副本数)节点为node4、node3和node1作为改InputSplit的host列表。
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
我正在使用的第三方API的文档状态:"[O]urAPIonlyacceptspaddedBase64encodedstrings."什么是“填充的Base64编码字符串”以及如何在Ruby中生成它们。下面的代码是我第一次尝试创建转换为Base64的JSON格式数据。xa=Base64.encode64(a.to_json) 最佳答案 他们说的padding其实就是Base64本身的一部分。它是末尾的“=”和“==”。Base64将3个字节的数据包编码为4个编码字符。所以如果你的输入数据有长度n和n%3=1=>"=="末尾用于填充n%
Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack
我正在编写一个包含C扩展的gem。通常当我写一个gem时,我会遵循TDD的过程,我会写一个失败的规范,然后处理代码直到它通过,等等......在“ext/mygem/mygem.c”中我的C扩展和在gemspec的“扩展”中配置的有效extconf.rb,如何运行我的规范并仍然加载我的C扩展?当我更改C代码时,我需要采取哪些步骤来重新编译代码?这可能是个愚蠢的问题,但是从我的gem的开发源代码树中输入“bundleinstall”不会构建任何native扩展。当我手动运行rubyext/mygem/extconf.rb时,我确实得到了一个Makefile(在整个项目的根目录中),然后当
如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake
我们的git存储库中目前有一个Gemfile。但是,有一个gem我只在我的环境中本地使用(我的团队不使用它)。为了使用它,我必须将它添加到我们的Gemfile中,但每次我checkout到我们的master/dev主分支时,由于与跟踪的gemfile冲突,我必须删除它。我想要的是类似Gemfile.local的东西,它将继承从Gemfile导入的gems,但也允许在那里导入新的gems以供使用只有我的机器。此文件将在.gitignore中被忽略。这可能吗? 最佳答案 设置BUNDLE_GEMFILE环境变量:BUNDLE_GEMFI
在应用开发中,有时候我们需要获取系统的设备信息,用于数据上报和行为分析。那在鸿蒙系统中,我们应该怎么去获取设备的系统信息呢,比如说获取手机的系统版本号、手机的制造商、手机型号等数据。1、获取方式这里分为两种情况,一种是设备信息的获取,一种是系统信息的获取。1.1、获取设备信息获取设备信息,鸿蒙的SDK包为我们提供了DeviceInfo类,通过该类的一些静态方法,可以获取设备信息,DeviceInfo类的包路径为:ohos.system.DeviceInfo.具体的方法如下:ModifierandTypeMethodDescriptionstatic StringgetAbiList()Obt
一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame
作为新的阿里云用户,您可以50免费试用多种优惠,价值高达1,700美元(或8,500美元)。这将让您了解和体验阿里云平台上提供的一系列产品和服务。如果您以个人身份注册免费试用,您将获得价值1,700美元的优惠。但是,如果您是注册公司,您可以选择企业免费试用,提交基本信息通过企业实名注册验证,即可开始价值$8,500的免费试用!本教程介绍了如何设置您的帐户并使用您的免费试用版。关于免费试用在我们开始此试用之前,您还必须遵守以下条款和条件才能访问您的免费试用:只有在一年内创建的账户才有资格获得阿里云免费试用。通过此免费试用优惠,用户可以免费试用免费试用活动页面上列出的每种产品一次。如果您有多个帐
1.1.1 YARN的介绍 为克服Hadoop1.0中HDFS和MapReduce存在的各种问题⽽提出的,针对Hadoop1.0中的MapReduce在扩展性和多框架⽀持⽅⾯的不⾜,提出了全新的资源管理框架YARN. ApacheYARN(YetanotherResourceNegotiator的缩写)是Hadoop集群的资源管理系统,负责为计算程序提供服务器计算资源,相当于⼀个分布式的操作系统平台,⽽MapReduce等计算程序则相当于运⾏于操作系统之上的应⽤程序。 YARN被引⼊Hadoop2,最初是为了改善MapReduce的实现,但是因为具有⾜够的通⽤性,同样可以⽀持其他的分布式计算模