
hadoop - MapReduce - timeout when the reducer reaches 67%

coder · 2024-01-06

When the reducer reaches 67% we get a timeout exception; I believe this happens after the sort phase and before the reduce phase. Please advise which parameters we should look at to resolve the problem.

16/06/15 16:58:13 INFO mapreduce.Job:  map 100% reduce 0%
16/06/15 16:58:23 INFO mapreduce.Job:  map 100% reduce 24%
16/06/15 16:59:05 INFO mapreduce.Job:  map 100% reduce 28%
16/06/15 16:59:08 INFO mapreduce.Job:  map 100% reduce 30%
16/06/15 16:59:39 INFO mapreduce.Job:  map 100% reduce 33%
16/06/15 17:00:09 INFO mapreduce.Job:  map 100% reduce 52%
16/06/15 17:00:12 INFO mapreduce.Job:  map 100% reduce 67%
16/06/15 17:05:42 INFO mapreduce.Job: Task Id : attempt_1465992294703_0001_r_000000_2, Status : FAILED

Driver class

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CSVLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CSVNLineInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ExchgLogsTransposeDriver extends Configured implements Tool {


    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String inPath;
        String outPath;

        // Fall back to local test paths when no arguments are given.
        if (args == null || args.length == 0) {
            inPath = "C:\\HadoopWS\\infile\\";
            outPath = "C:\\HadoopWS\\outfile\\";
        } else {
            inPath = args[0];
            outPath = args[1];
        }

        Path input = new Path(inPath);
        Path output = new Path(outPath);

        // Delete any previous output so the job does not fail on an existing directory.
        FileSystem hdfs = FileSystem.get(conf);
        if (hdfs.exists(output)) {
            hdfs.delete(output, true);
        }

        conf.set(CSVLineRecordReader.FORMAT_DELIMITER, "\"");
        conf.set(CSVLineRecordReader.FORMAT_SEPARATOR, ",");
        conf.setInt(CSVNLineInputFormat.LINES_PER_MAP, 500000);
        conf.setBoolean(CSVLineRecordReader.IS_ZIPFILE, false);

        Job job = Job.getInstance(conf); // new Job(conf) is deprecated

        job.setJarByClass(ExchgLogsTransposeDriver.class);
        job.setMapperClass(ExchgLogsMapper.class);
        job.setMapOutputKeyClass(CompositeKey.class);
        job.setMapOutputValueClass(CompositeWritable.class);
//        job.setNumReduceTasks(2);
        job.setMapSpeculativeExecution(true);

        job.setPartitionerClass(ActualKeyPartitioner.class);
        job.setGroupingComparatorClass(ActualKeyGroupingComparator.class);
        job.setSortComparatorClass(CompositeKeyComparator.class);
        job.setReducerClass(ExchgLogsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CompositeWritable.class);

        job.getConfiguration().set("mapreduce.output.basename", input.getName());
        job.getConfiguration().set("mapreduce.map.output.compress", "true");
//        job.getConfiguration().set("mapreduce.map.output.compress.codec", "com.hadoop.compression.lzo.LzoCodec");

        job.setInputFormatClass(CSVNLineInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputDirRecursive(job, true);
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String args[]) throws Exception {
        System.exit(ToolRunner.run(new ExchgLogsTransposeDriver(), args));
    }
}

Reducer class

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ExchgLogsReducer extends Reducer<CompositeKey, CompositeWritable, NullWritable, Text> {
    Log log = LogFactory.getLog(ExchgLogsReducer.class);
    public static final String NEW = "NEW";
    public static final String FW = "FW";
    public static final String RE = "RE";
    public static final int ZERO = 0;
    Text res = new Text();

    @Override
    public void reduce(CompositeKey key, Iterable<CompositeWritable> value, Context context)
            throws IOException, InterruptedException {
        List<CompositeValueObj> cache = new ArrayList<CompositeValueObj>();

        StringBuilder response = new StringBuilder();

        // Copy each value into a plain object before caching it: Hadoop reuses
        // the same Writable instance across iterations of the values iterator.
        Iterator<CompositeWritable> it = value.iterator();
        while (it.hasNext()) {
            CompositeWritable currWritable = it.next();
            CompositeValueObj obj = new CompositeValueObj();
            obj.setRecepient(currWritable.getRecepient().toString());
            obj.setSender(currWritable.getSender().toString());
            obj.setType(currWritable.getType().toString());
            obj.setTimestamp(currWritable.getTimestamp().toString());
            cache.add(obj);
        }

        // Pair each message with every later message in the same group; note
        // that this nested scan is O(n^2) in the number of cached records.
        for (int i = 0; i < cache.size(); i++) {
            CompositeValueObj currObj = cache.get(i);
            String receiver = currObj.getRecepient().toString();
            String origSender = currObj.getSender().toString();
            String dateFrom = currObj.getTimestamp().toString();

            for (int j = i + 1; j < cache.size(); j++) {
                response = new StringBuilder(key.getSubject()).append(",").append(receiver).append(",");
                CompositeValueObj nextObj = cache.get(j);

                String dateTo = nextObj.getTimestamp().toString();
                String newSender = nextObj.getSender().toString();
                String newRecepient = nextObj.getRecepient().toString();
                String mailType = nextObj.getType().toString();

                if (receiver.equals(newRecepient)) {
                    response.append(origSender).append(",");
                    response.append("N,0,0,").append(dateFrom);
                    break;
                }

                if (receiver.equals(newSender) && (mailType.equals(RE) || mailType.equals(FW))) {
                    if (mailType.equals(RE)) {
                        response.append(origSender).append(",");
                        response.append("Y,");
                        response.append(getTimeDiff(dateFrom, dateTo));
                        response.append(",0,").append(dateTo);
                        break;
                    }

                    if (mailType.equals(FW)) {
                        response.append(origSender).append(",");
                        response.append("Y,0,");
                        response.append(getTimeDiff(dateFrom, dateTo));
                        response.append(",").append(dateTo);
                        break;
                    }

                } else {
                    response.append(origSender).append(",");
                    response.append("N,0,0,").append(dateFrom);
                }

            }
            if (i + 1 == cache.size()) {
                response = new StringBuilder(key.getSubject()).append(",").append(receiver).append(",");
                response.append(origSender).append(",");
                response.append("N,0,0,").append(dateFrom);
            }
            res.set(response.toString());
            context.write(NullWritable.get(), res);
        }

    }

    private static double getTimeDiff(String date1, String date2) {
        double diff = 0;
        double weekend = 0;
        boolean isWEchain = false;
        boolean isWESent = false;

        if (date1 == null || date2 == null) {
            return 0;
        }
        // Note: the trailing 'Z' (UTC marker) in the input timestamps is not
        // covered by this pattern; SimpleDateFormat.parse() ignores trailing text.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
        try {
            Date from = sdf.parse(date1);
            Date to = sdf.parse(date2);
            Calendar cal1 = Calendar.getInstance();
            Calendar cal2 = Calendar.getInstance();
            cal1.setTime(from);
            cal2.setTime(to);
            int noOfDaysWE = 0;
            // In this data set the weekend is Friday and Saturday.
            boolean fromOnWeekend = Calendar.FRIDAY == cal1.get(Calendar.DAY_OF_WEEK)
                    || Calendar.SATURDAY == cal1.get(Calendar.DAY_OF_WEEK);
            boolean toOnWeekend = Calendar.FRIDAY == cal2.get(Calendar.DAY_OF_WEEK)
                    || Calendar.SATURDAY == cal2.get(Calendar.DAY_OF_WEEK);

            if (fromOnWeekend && toOnWeekend) {
                // Both messages fall on the weekend.
                isWEchain = true;
            } else if (fromOnWeekend && !toOnWeekend) {
                // Sent on the weekend, answered on a weekday: count from
                // Saturday 20:00 instead of the actual send time.
                isWESent = true;
                if (Calendar.FRIDAY == cal1.get(Calendar.DAY_OF_WEEK)) {
                    cal1.add(Calendar.DATE, 1);
                }
                cal1.set(Calendar.HOUR_OF_DAY, 20); // HOUR_OF_DAY, not HOUR (12-hour field)
                cal1.set(Calendar.MINUTE, 0);
                cal1.set(Calendar.SECOND, 0);
                cal1.set(Calendar.MILLISECOND, 0);
            }

            diff = cal2.getTimeInMillis() - cal1.getTimeInMillis();
            if (diff < 0) {
                return 0;
            }

            // Count whole weekend days in the interval and subtract them.
            while (cal1.before(cal2)) {
                if (Calendar.FRIDAY == cal1.get(Calendar.DAY_OF_WEEK)
                        || Calendar.SATURDAY == cal1.get(Calendar.DAY_OF_WEEK)) {
                    noOfDaysWE++;
                }
                cal1.add(Calendar.DATE, 1);
            }

            if (noOfDaysWE != 0) {
                weekend = TimeUnit.DAYS.toMillis(noOfDaysWE);
            }
            if (isWEchain && noOfDaysWE <= 2) {
                return 0;
            }

            diff = diff - weekend;
        } catch (ParseException e) {
            return 0;
        }

        if (diff != 0)
            return diff / 1000;
        else
            return 0;
    }

    public static void main(String[] a) {

        System.out.println(getTimeDiff("2016-06-03T19:41:48.781Z", "2016-06-05T07:21:01.000Z"));
    }

}

Best answer

Take a look at mapred.task.timeout in mapred-site.xml; the value is in milliseconds.
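
For reference, a minimal sketch of the corresponding mapred-site.xml entry, assuming a Hadoop 2.x cluster, where the current property name is mapreduce.task.timeout (mapred.task.timeout is its deprecated alias); the 20-minute value below is an arbitrary example:

<!-- Maximum time a task may run without reporting progress
     before the framework kills it, in milliseconds. -->
<property>
  <name>mapreduce.task.timeout</name>
  <value>1200000</value> <!-- 20 minutes; 0 disables the timeout -->
</property>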

After modifying the property, you need to restart all the trackers (JobTracker + TaskTrackers).

Tip: if you want to print the full runtime configuration to verify that the change has been applied, use the following snippet in the driver. For example:

final JobConf conf = new JobConf(config, this.getClass());

try {
    conf.writeXml(System.out);
} catch (final IOException e) {
    e.printStackTrace();
}
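
A complementary fix, rather than only raising the timeout: make the long-running reduce report progress so the framework knows the task is still alive. A minimal sketch against the reducer above (the 1000-record reporting interval is an arbitrary choice):

// Inside ExchgLogsReducer.reduce(): the nested pairing loop can run for a
// long time on a large group without writing output, so ping the framework
// periodically; each call resets the mapreduce.task.timeout clock.
for (int i = 0; i < cache.size(); i++) {
    if (i % 1000 == 0) {
        context.progress();
        context.setStatus("pairing record " + i + " of " + cache.size());
    }
    for (int j = i + 1; j < cache.size(); j++) {
        // ... existing pairing logic ...
    }
}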

For hadoop - MapReduce - timeout when the reducer reaches 67%, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37836944/
