草庐IT

Iterator-reducer

全部标签

python - 如何使用 Python 在 MapReduce 中的 reducer 中输出键值对,以便 1 小时内的时间结束?

我有一种情况需要处理一个非常大的文本文件,格式如下:ID\ttime\tduration\tDescription\tstatus我想利用MapReduce来帮助我处理这个文件。我知道MapReduce基于键值对工作。Mapper将输出键和一些值,而MapReduce将确保所有相同的键最终都在1个reducer中。我想要在reducer中结束的是时间间隔在1小时以内的行。然后在reducer中,我想访问所有其他信息以及ID、持续时间、状态来做其他事情。所以我猜想输出的值是一个列表还是什么?我有一些Python代码来处理输入数据。映射器.py#!/usr/bin/envpythonimp

file - java eclipse hadoop map reduce程序无法访问我存储在hdfs中的文件

我的javaeclipsehadoopmapreduce程序显示无法定位输入文件的错误。我已经使用hadoop命令通过终端将文件复制到hadoop目录。我可以在javaeclipsedfs位置看到这些文件。并且还在终端中使用命令hadoopdfs-ls。当我创建一个普通文件夹(不是hdfs)时,问题就解决了。但是随后程序正在从本地文件系统访问文件。我已经在redhat服务器32位上安装了hadoop1.2.1,使用javaeclipseluna,我已经包含了hadoop插件和来自hadoop库的外部jar文件。输入和输出路径通过运行时参数给出 最佳答案

java - 如何为 map reducer 作业在 java 中为 hadoop 输入自定义选择列读取

Hadoop新手,我想了解Hadoop如何读取文件输入:我能够使用下面的代码从2列(键/值)输入文件运行Hadoop作业:但是如果我有5列并且我想要的(键/值)是A&E(而不是A&B)我需要准确修改哪个函数呢?publicclassInverterCounterextendsConfiguredimplementsTool{publicstaticclassMapClassextendsMapReduceBaseimplementsMapper{publicvoidmap(Textkey,Textvalue,OutputCollectoroutput,Reporterreporter)

hadoop - 哪种方法阻止 reducer 在 hadoop yarn 中启动实际的 reduce 阶段?

我是hadoopyarn的新手,希望reducers在完成所有映射之前开始实际的缩减过程。我试图找出调用reducer但找不到的类。任何人都可以在这方面帮助我吗? 最佳答案 在所有映射器完成之前,reducer只能开始收集映射器的输出。这称为shuffle阶段。但是,它们无法启动sorting和reduce阶段,因为它们需要在开始工作之前拥有ALLmap输出记录在他们。原因很简单:想象一下wordcount示例,您想要计算一个词的出现频率。在reduce阶段,如果您在获取所有映射器的输出(即一些计数是缺少这个词),那么,你可能会给出

hadoop - 将 reducer 设置为默认值,但最后我有两个文件

我正在运行一个mapreduce作业,其中reducer的数量设置为默认值(一个reducer)。理论上,每个reducer的输出必须是一个文件,但是当我运行我的工作时,我有两个文件部分-r-00000和部分-r-00001为什么会这样?我的集群中只有一个节点。我的驱动类:publicclassDriverDateextendsConfiguredimplementsTool{@Overridepublicintrun(String[]args)throwsException{if(args.length!=2){System.out.printf("Usage:AvgWordLeng

java - Hadoop,成功的 Map Reduce 作业但没有输出

目标:我正在尝试使用MapReduce合并数据。我在同一个文件夹中有多组数据。方法:所以我在一个程序/流程中连续多次运行MapReduce合并作业。问题:我面临的问题不是失败的工作,而是没有输出的成功工作。第一个(有时是两个)迭代总是有输出(part-r-00000)但不是以下。我正在使用大小和体积都非常小的示例数据集(1~2kb,大约5个文件)我尝试了什么:让线程在每次运行后hibernate5秒,但无济于事。过了一会儿,我尝试使用webhdfs检查,仍然没有这样的文件。请问您能给我解释一下吗?提前致谢。图片:代码:/**Tochangethislicenseheader,choos

java - 合并来自 hadoop map-reduce 的结果

我有一个Mapper,NullWritable,Text,Text>它有效地接收电子邮件并多次吐出电子邮件地址的键和找到它的字段的值(发件人、收件人、抄送等)。然后我有一个Reducer接受电子邮件地址和字段名称。它吐出一个NullWritable键和一个地址在给定字段中出现的次数的计数。例如...{"address":"joe.bloggs@gmail.com","toCount":12,"fromCount":4}我正在使用FileUtil.copyMerge来合并作业的输出,但(显然)不同reducer的结果没有合并,所以在实践中我看到:{"address":"joe.blogg

hadoop - HDFS 联邦 : Submission of Map Reduce jobs among multiple Name nodes

根据HdfsFederation上的Apache文档,系统可通过多个名称节点的联合进行隔离扩展。多个名称节点/namespace为了横向扩展名称服务,联邦使用多个独立的名称节点/namespace。名称节点是联合的;Namenodes是独立的,不需要相互协调。Datanodes被所有Namenodes用作block的公共(public)存储。我唯一的疑问:我没有看到名称节点之间有任何中央协调器,因为所有节点都在运行隔离。对如何提交和处理作业感到困惑。1)如果我提交一个map-reduce作业,哪个名称节点将处理它?或者2)客户端是否应该知道必须为其提交作业的名称节点?如果客户端不知道哪

scala - 如何将 Scalding TypedPipe 转换为 Iterator

在我的Scaldinghadoop作业中,我在管道上有一些分组逻辑,然后我需要处理每个组:valgeorecs:TypedPipe[GeoRecord]=getRecordsgeorecs.map(r=>(getRegion(r),r)).groupBy(_._1).mapValueStream(xs=>clusterRecords(xs)).values.write(out)在clusterRecords内部,我需要将传入的迭代器转换为TypedPipe,以便我可以1)对其进行采样和2)取叉积://turntheiteratortoapipesowecansampleitvalsam

java - Hadoop 1.2.1 - 多节点集群 - Wordcount 程序的 Reducer 阶段挂起?

我的问题在这里听起来可能有些多余,但之前问题的解决方案都是临时的。我尝试过的很少,但还没有成功。实际上,我正在研究hadoop-1.2.1(在ubuntu14上),最初我有singlenodeset-up然后我运行了WordCount编程成功。然后我根据this给它加了一个节点。教程。它成功启动,没有任何错误,但现在当我运行相同的WordCount程序时,它卡在减少阶段。我查看了任务跟踪器日志,它们如下所示:-INFOorg.apache.hadoop.mapred.TaskTracker:LaunchTaskAction(registerTask):attempt_201509110