草庐IT

hadoop - 如何通过并行运行的两个 map task 并行读取两个文件

coder 2024-01-10 原文

请对我放轻松一点,因为我才接触 Hadoop 和 Mapreduce 3 个月。

我有 2 个文件,每个文件 120 MB,每个文件中的数据完全是非结构化的,但具有共同的模式。由于数据结构不同,默认的 LineInputFormat 无法满足我的要求。

因此,在读取文件时,我覆盖了 isSplitable() 方法并通过返回 false 来停止拆分。这样 1 个映射器就可以访问一个完整的文件,我可以执行我的逻辑并实现要求。

我的机器可以并行运行两个映射器,所以通过停止拆分,我通过为每个文件一个一个地运行映射器而不是为一个文件并行运行两个映射器来降低性能。

我的问题是如何为两个文件并行运行两个映射器以提高性能。

例如

When split was allowed:
    file 1: split 1 (1st mapper) || split 2 (2nd mapper)------ 2 min 
    file 2: split 1 (1st mapper) || split 2 (2nd mapper)------ 2 min

    Total Time for reading two files =====  4 min

When Split not allowed:
    file 1: no parallel jobs so (1st mapper)---------4 min
    file 2: no parallel jobs so (1st mapper)---------4 min

    Total Time to read two files ===== 8 min (Performance degraded)

What I want
    File 1 (1st Mapper) || file 2 (2nd Mapper) ------4 min

    Total time to read two files ====== 4 min 

基本上我希望两个文件同时被两个不同的映射器读取。

请帮助我实现这个场景。

下面是我的自定义 InputFormat 和自定义 RecordReader 代码。

public class NSI_inputformatter extends FileInputFormat<NullWritable, Text>{
@Override
public boolean isSplitable(FileSystem fs, Path filename)
{
    //System.out.println("Inside the isSplitable Method of NSI_inputformatter");
    return false;
}

@Override
public RecordReader<NullWritable, Text> getRecordReader(InputSplit split,
        JobConf job_run, Reporter reporter) throws IOException {
    // TODO Auto-generated method stub
    //System.out.println("Inside the getRecordReader method of NSI_inputformatter");

    return new NSI_record_reader(job_run, (FileSplit)split);
}

}

记录阅读器:

public class NSI_record_reader implements RecordReader<NullWritable, Text> 
{
FileSplit split;
JobConf job_run;
String text;
public boolean processed=false;
public NSI_record_reader(JobConf job_run, FileSplit split)
{
    //System.out.println("Inside the NSI_record_reader constructor");
    this.split=split;
    this.job_run=job_run;

    //System.out.println(split.toString());
}
@Override
public boolean next(NullWritable key, Text value) throws IOException {
    // TODO Auto-generated method stub
    //System.out.println("Inside the next method of the NLI_record_reader");
    if (!processed)
    {
        byte [] content_add=new byte[(int)(split.getLength())];
        Path file=split.getPath();
        FileSystem fs=file.getFileSystem(job_run);
        FSDataInputStream input=null;


        try{
            input=fs.open(file);
            System.out.println("the input is " +input+ input.toString());
            IOUtils.readFully(input, content_add, 0, content_add.length);
            value.set(content_add, 0, content_add.length);
        }
        finally
        {
            IOUtils.closeStream(input);

        }
        processed=true;
        return true;
    }

    return false;
}

@Override
public void close() throws IOException {
    // TODO Auto-generated method stub

}

@Override
public NullWritable createKey() {
    System.out.println("Inside createkey() mrthod of NSI_record_reader");
    // TODO Auto-generated method stub
    return  NullWritable.get();
}

@Override
public Text createValue() {
    System.out.println("Inside createValue() mrthod of NSI_record_reader");
    // TODO Auto-generated method stub
    return new Text();
}

@Override
public long getPos() throws IOException {
    // TODO Auto-generated method stub
    System.out.println("Inside getPs() mrthod of NSI_record_reader");
    return processed ? split.getLength() : 0;
}

@Override
public float getProgress() throws IOException {
    // TODO Auto-generated method stub
    System.out.println("Inside getProgress() mrthod of NSI_record_reader");
    return processed ? 1.0f : 0.0f;
}

}

输入样本:

<Dec 12, 2013 1:05:56 AM CST> <Error> <HTTP> <BEA-101017>       <[weblogic.servlet.internal.WebAppServletContext@42e87d99 - appName: 'Agile', name:    '/Agile', context-path: '/Agile', spec-version: 'null'] Root cause of ServletException.
  javax.servlet.jsp.JspException: Connection reset by peer: socket write error
at com.agile.ui.web.taglib.common.FormTag.writeFormHeader(FormTag.java:498)
at com.agile.ui.web.taglib.common.FormTag.doStartTag(FormTag.java:429)
at jsp_servlet._default.__login_45_cms._jspService(__login_45_cms.java:929)
at weblogic.servlet.jsp.JspBase.service(JspBase.java:34)
at    weblogic.servlet.internal.StubSecurityHelper$ServletServiceAction.run(StubSecurityHelper.ja va:227)
Truncated. see log file for complete stacktrace
>
Retrieving the value for the attribute Page Two.Validation Status for the Object 769630
Retrieving the value for the attribute Page Two.Pilot Required for the Object 769630
Retrieving the value for the attribute Page Two.NPO Contact for the Object 769630
<Dec 12, 2013 1:12:13 AM CST> <Warning> <Socket> <BEA-000449> <Closing socket as no         data read from it during the configured idle timeout of 0 secs> 

谢谢。

最佳答案

您可以尝试设置属性 -D mapred.min.split.size=209715200。在这种情况下,FileInputFormat 不应拆分您的文件,因为它们小于 mapred.min.split.size

关于hadoop - 如何通过并行运行的两个 map task 并行读取两个文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23415498/

有关hadoop - 如何通过并行运行的两个 map task 并行读取两个文件的更多相关文章

  1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

    我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

  2. ruby - 如何从 ruby​​ 中的字符串运行任意对象方法? - 2

    总的来说,我对ruby​​还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用

  3. ruby - 使用 RubyZip 生成 ZIP 文件时设置压缩级别 - 2

    我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看ruby​​zip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d

  4. ruby - 其他文件中的 Rake 任务 - 2

    我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时

  5. ruby-on-rails - 在 Rails 中将文件大小字符串转换为等效千字节 - 2

    我的目标是转换表单输入,例如“100兆字节”或“1GB”,并将其转换为我可以存储在数据库中的文件大小(以千字节为单位)。目前,我有这个:defquota_convert@regex=/([0-9]+)(.*)s/@sizes=%w{kilobytemegabytegigabyte}m=self.quota.match(@regex)if@sizes.include?m[2]eval("self.quota=#{m[1]}.#{m[2]}")endend这有效,但前提是输入是倍数(“gigabytes”,而不是“gigabyte”)并且由于使用了eval看起来疯狂不安全。所以,功能正常,

  6. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  7. ruby-on-rails - Rails 3 中的多个路由文件 - 2

    Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题

  8. ruby-on-rails - 如何验证 update_all 是否实际在 Rails 中更新 - 2

    给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru

  9. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

    我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

  10. ruby - 将差异补丁应用于字符串/文件 - 2

    对于具有离线功能的智能手机应用程序,我正在为Xml文件创建单向文本同步。我希望我的服务器将增量/差异(例如GNU差异补丁)发送到目标设备。这是计划:Time=0Server:hasversion_1ofXmlfile(~800kiB)Client:hasversion_1ofXmlfile(~800kiB)Time=1Server:hasversion_1andversion_2ofXmlfile(each~800kiB)computesdeltaoftheseversions(=patch)(~10kiB)sendspatchtoClient(~10kiBtransferred)Cl

随机推荐