草庐IT

mapreduce_shuffle

全部标签

python - 在本地测试 mapreduce 流作业的更优雅的方法?

我有一份用Python编写的mapreduce工作。在将其放入EMR之前,我想在本地对其进行测试。目前我知道的唯一测试方法是运行命令:猫输入文件|python映射器.py|排序-k1,1|pythonreducer>输出文件但管道对我来说有点可怕,因为如果有任何东西破裂我不知道(除了检查此命令的退出代码)。有没有更优雅/pythonic的方式来运行mapreduce并检查它是否成功运行(这样我就可以捕获特定的异常并处理它)?谢谢 最佳答案 一种明显的方法是在本地Hadoop框架中测试您的代码。例如,您可以在VM中使用打包的Hadoo

python - Hadoop mapreduce 任务失败并显示 143

我目前正在学习使用Hadoopmapred,但遇到了这个错误:packageJobJar:[/home/hduser/mapper.py,/home/hduser/reducer.py,/tmp/hadoop-unjar4635332780289131423/][]/tmp/streamjob8641038855230304864.jartmpDir=null16/10/3117:41:12INFOclient.RMProxy:ConnectingtoResourceManagerat/192.168.0.55:805016/10/3117:41:13INFOclient.RMProx

hadoop - 启动 MapReduce 作业的不同方式

在ApacheHadoop中仅使用job.waitForCompletion(true)方法和通过ToolRunner.run(newMyClass(),args)启动mapreduce作业有什么区别?我有一个MapReduce作业通过以下两种方式执行:首先如下:publicclassMaxTemperatureextendsConfiguredimplementsTool{publicstaticvoidmain(String[]args)throwsException{intexitCode=ToolRunner.run(newMaxTemperature(),args);Syst

java - MapReduce:如何将 HashMap 传递给映射器

我正在设计新一代分析系统,该系统需要近乎实时地处理来自多个传感器的多个事件。为此,我想使用一种大数据分析平台,例如Hadoop、SparkStreaming或Flink。为了分析每个事件,我需要使用表(DB)中的一些元数据或至少将其加载到缓存map中。问题是每个映射器将在多个节点上并行化。所以我有两件事要处理:首先,如何将HashMap加载/传递给映射器?有什么方法可以使映射器之间的HashMap保持一致吗? 最佳答案 将HashMap结构序列化为文件,将其存储在HDFS中,并在MapReduce作业配置阶段使用Distribute

hadoop - 仅在 HDFS 文件的一部分上执行 MapReduce 作业

我在HDFS(~20Gb)中有一个大文件,我通常在其上执行MapReduce作业。创建了大约170个映射器。使用的InputFormat是FileInputFormat。现在我想只在文件的一部分(例如,文件的前40Mb)上执行MapReduce作业。有没有简单的方法来执行此操作?感谢您的帮助。 最佳答案 大家好,最后,我找到了一个解决方案,包括派生FileInputFormat类并覆盖getSplits方法,以便仅获取与HDFS文件的所需部分相对应的拆分.在这个方法中,我调用父类(superclass)来获取由InputFileFo

java - 当我在 mapreduce 框架中设置 Split size 大于实际 Block size 时会发生什么?

据我所知,一个mapper将分配给一个split。但是当我将Splitsize设置为大于实际Blocksize时会发生什么?例如:如果我设置Blocksize=128Mb和SplitSize=130Mb,在这些情况下将运行多少映射器。是一个映射器还是多个映射器? 最佳答案 如果InputSplit超过HDFSblock大小,则映射器最终会从多个block读取数据。在您的示例中,如果block大小=128MB且计算的拆分大小=130MB,将生成一个映射任务,该任务将从两个不同的block读取。这两个block究竟是如何被读取的,是HD

hadoop - 读取 csv MapReduce 中的空单元格时的 ArrayIndexOutofBounds

我正在尝试为以下数据运行MapReduce程序。这是我的映射器代码:@Overrideprotectedvoidmap(Objectkey,Textvalue,Mapper.Contextcontext)throwsIOException,ArrayIndexOutOfBoundsException,InterruptedException{Stringtokens[]=value.toString().split(",");if(tokens[6]!=null){context.write(newText(tokens[6]),newIntWritable(1));}}由于我的一些单

Java MapReduce 按日期计数

我是Hadoop的新手,我正在尝试做一个MapReduce程序,以按日期(按月分组)计算讲师的最大前两次出现次数。所以我的输入是这样的:2017-06-01,A,B,A,C,B,E,F2017-06-02,Q,B,Q,F,K,E,F2017-06-03,A,B,A,R,T,E,E2017-07-01,A,B,A,C,B,E,F2017-07-05,A,B,A,G,B,G,G所以,我正在尝试这个MapReducer程序的结果,比如:2017-06,A:4,E:42017-07,A:4,B:4publicclassArrayGiulioTest{publicstaticLoggerlogg

hadoop - Shuffle write 大和 spark task 变得超慢时的优化

有一个SparkSQL将连接4个大表(前3个表5000万,最后一个表2亿)并进行一些分组操作,消耗60天的数据。并且此SQL将需要2小时才能运行,在此期间,我检查到ShuffleWrite正在急剧增加,可能会超过200GB。相比之下,当我将消耗日期范围从60天减少到45天时,运行只需要6.3分钟。我查看了DAG图,对于45天的数据,它在最后一次sortMergeJoin之后输出了10亿条数据。谁能告诉我我可以从哪个方向优化这个场景?谢谢!附言可能的相关信息:Spark.version=2.1.0spark.executor.instances=20spark.executor.memo

python - 使用 yarn 比较器在 MapReduce Python 中进行字数统计排序

我想解决字数统计问题,想得到按照文件中出现频率倒序排序的结果。以下是我为此目的编写的四个文件(2个映射器和2个缩减器,因为一个MapReduce作业无法解决此问题):1)映射器1.pyimportsysimportrereload(sys)sys.setdefaultencoding('utf-8')#requiredtoconverttounicodeforlineinsys.stdin:try:article_id,text=unicode(line.strip()).split('\t',1)exceptValueErrorase:continuewords=re.split("