草庐IT

java - 如何在 hadoop 中管理连接 - MultipleInputPath

coder 2024-01-08 原文

在 map side join 之后,我在 Reducer 中得到的数据是

key------ book
values
    6
    eraser=>book 2
    pen=>book 4
    pencil=>book 5

我基本上想做的是

eraser=>book = 2/6
pen=>book = 4/6
pencil=>book = 5/6

我最初做的是这样的

public void reduce(Text key,Iterable<Text> values , Context context) throws IOException, InterruptedException{

        System.out.println("key------ "+key);
        System.out.println("Values");
        for(Text value : values){
            System.out.println("\t"+value.toString());
            String v = value.toString();
            double BsupportCnt = 0;
            double UsupportCnt = 0;
            double res = 0;
            if(!v.contains("=>")){
                BsupportCnt = Double.parseDouble(v);
            }
            else{
                String parts[] = v.split(" ");
                UsupportCnt = Double.parseDouble(parts[1]);
            }
//          calculate here
            res = UsupportCnt/BsupportCnt;

        }

如果传入的数据如上,则可以正常工作

但是如果mapper传入的数据是

key------ book
values
    eraser=>book 2
    pen=>book 4
    pencil=>book 5
    6

这行不通 否则我需要将所有 => 存储在一个列表中(如果传入数据是大数据,该列表可能会占用堆空间),一旦我得到一个数字,我就应该进行计算。

更新 由于 Vefthym 要求在值到达 reducer 之前对其进行二次排序。 我使用 htuple 来做同样的事情。 我推荐了this link

在 mapper1 中发出 eraser=>book 2 作为值 所以

public class AprioriItemMapper1 extends Mapper<Text, Text, Text, Tuple>{
    public void map(Text key,Text value,Context context) throws IOException, InterruptedException{
        //Configurations and other stuffs
        //allWords is an ArrayList
        if(allWords.size()<=2)
        {
            Tuple outputKey = new Tuple();
            String LHS1 = allWords.get(1);
            String RHS1 = allWords.get(0)+"=>"+allWords.get(1)+" "+value.toString();
            outputKey.set(TupleFields.ALPHA, RHS1);
            context.write(new Text(LHS1), outputKey);
                 }
//other stuffs

Mapper2 发出 numbers 作为值

public class AprioriItemMapper2 extends Mapper<Text, Text, Text, Tuple>{
    Text valEmit = new Text(); 
    public void map(Text key,Text value,Context context) throws IOException, InterruptedException{
        //Configuration and other stuffs
        if(cnt != supCnt && cnt < supCnt){
            System.out.println("emit");
            Tuple outputKey = new Tuple();
            outputKey.set(TupleFields.NUMBER, value);

            System.out.println("v---"+value);
            System.out.println("outputKey.toString()---"+outputKey.toString());
            context.write(key, outputKey);
        }

Reducer 我只是尝试打印键和值

但是这个发现了错误

Mapper 2: 
line book
Support Count: 2
count--- 1
emit
v---6
outputKey.toString()---[0]='6, 
14/08/07 13:54:19 INFO mapred.LocalJobRunner: Map task executor complete.
14/08/07 13:54:19 WARN mapred.LocalJobRunner: job_local626380383_0003
java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.htuple.Tuple
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:406)
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.htuple.Tuple
    at org.htuple.TupleMapReducePartitioner.getPartition(TupleMapReducePartitioner.java:28)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:601)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:85)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:106)
    at edu.am.bigdata.apriori.AprioriItemMapper1.map(AprioriItemMapper1.java:49)
    at edu.am.bigdata.apriori.AprioriItemMapper1.map(AprioriItemMapper1.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
    at org.apache.hadoop.mapreduce.lib.input.DelegatingMapper.run(DelegatingMapper.java:51)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)

错误在 context.write(new Text(LHS1), outputKey); 来自 AprioriItemMapper1.java:49 但以上打印细节来自Mapper 2

有什么更好的办法吗 请建议。

最佳答案

我建议使用二次排序,这将保证第一个值(按字典顺序排序)是数字值,假设没有以数字开头的单词。

如果这行不通,那么,考虑到您提到的可扩展性限制,我会将 reducer 的值存储在 HashMap<String,Double> 中。缓冲区,其中键是“=>”的左侧部分,值是它们的数值。 您可以存储这些值,直到获得分母的值 BsupportCnt .然后,您可以发出具有正确分数的所有缓冲区内容和所有剩余值,因为它们一个接一个地出现,而无需再次使用缓冲区(因为您现在知道分母)。类似的东西:

public void reduce(Text key,Iterable<Text> values , Context context) throws IOException, InterruptedException{
    Map<String,Double> buffer = new HashMap<>();
    double BsupportCnt = 0;
    double UsupportCnt;
    double res;
    for(Text value : values){
        String v = value.toString();

        if(!v.contains("=>")){
            BsupportCnt = Double.parseDouble(v);
        } else {
            String parts[] = v.split(" ");
            UsupportCnt = Double.parseDouble(parts[1]);

            if (BsupportCnt != 0) { //no need to add things to the buffer any more
               res = UsupportCnt/BsupportCnt;
               context.write(new Text(v), new DoubleWritable(res));
            } else {
               buffer.put(parts[0], UsupportCnt);
            }
        }

    }


    //now emit the buffer's contents
    for (Map<String,Double>.Entry entry : buffer) {
        context.write(new Text(entry.getKey()), new DoubleWritable(entry.getValue()/BsupportCnt));
    }
}

您可以通过仅将“=>”的左侧部分存储为 HashMap 的键来获得更多空间,因为右侧部分始终是 reducer 的输入键。

关于java - 如何在 hadoop 中管理连接 - MultipleInputPath,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25160703/

有关java - 如何在 hadoop 中管理连接 - MultipleInputPath的更多相关文章

  1. ruby - 如何在 Ruby 中顺序创建 PI - 2

    出于纯粹的兴趣,我很好奇如何按顺序创建PI,而不是在过程结果之后生成数字,而是让数字在过程本身生成时显示。如果是这种情况,那么数字可以自行产生,我可以对以前看到的数字实现垃圾收集,从而创建一个无限系列。结果只是在Pi系列之后每秒生成一个数字。这是我通过互联网筛选的结果:这是流行的计算机友好算法,类机器算法:defarccot(x,unity)xpow=unity/xn=1sign=1sum=0loopdoterm=xpow/nbreakifterm==0sum+=sign*(xpow/n)xpow/=x*xn+=2sign=-signendsumenddefcalc_pi(digits

  2. ruby - i18n Assets 管理/翻译 UI - 2

    我正在使用i18n从头开始​​构建一个多语言网络应用程序,虽然我自己可以处理一大堆yml文件,但我说的语言(非常)有限,最终我想寻求外部帮助帮助。我想知道这里是否有人在使用UI插件/gem(与django上的django-rosetta不同)来处理多个翻译器,其中一些翻译器不愿意或无法处理存储库中的100多个文件,处理语言数据。谢谢&问候,安德拉斯(如果您已经在ruby​​onrails-talk上遇到了这个问题,我们深表歉意) 最佳答案 有一个rails3branchofthetolkgem在github上。您可以通过在Gemfi

  3. ruby - 如何在 buildr 项目中使用 Ruby 代码? - 2

    如何在buildr项目中使用Ruby?我在很多不同的项目中使用过Ruby、JRuby、Java和Clojure。我目前正在使用我的标准Ruby开发一个模拟应用程序,我想尝试使用Clojure后端(我确实喜欢功能代码)以及JRubygui和测试套件。我还可以看到在未来的不同项目中使用Scala作为后端。我想我要为我的项目尝试一下buildr(http://buildr.apache.org/),但我注意到buildr似乎没有设置为在项目中使用JRuby代码本身!这看起来有点傻,因为该工具旨在统一通用的JVM语言并且是在ruby中构建的。除了将输出的jar包含在一个独特的、仅限ruby​​

  4. ruby - 什么是填充的 Base64 编码字符串以及如何在 ruby​​ 中生成它们? - 2

    我正在使用的第三方API的文档状态:"[O]urAPIonlyacceptspaddedBase64encodedstrings."什么是“填充的Base64编码字符串”以及如何在Ruby中生成它们。下面的代码是我第一次尝试创建转换为Base64的JSON格式数据。xa=Base64.encode64(a.to_json) 最佳答案 他们说的padding其实就是Base64本身的一部分。它是末尾的“=”和“==”。Base64将3个字节的数据包编码为4个编码字符。所以如果你的输入数据有长度n和n%3=1=>"=="末尾用于填充n%

  5. ruby-on-rails - 如何在 ruby​​ 中使用两个参数异步运行 exe? - 2

    exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby​​中使用两个参数异步运行exe吗?我已经尝试过ruby​​命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何ruby​​gems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除

  6. ruby - 如何在续集中重新加载表模式? - 2

    鉴于我有以下迁移:Sequel.migrationdoupdoalter_table:usersdoadd_column:is_admin,:default=>falseend#SequelrunsaDESCRIBEtablestatement,whenthemodelisloaded.#Atthispoint,itdoesnotknowthatusershaveais_adminflag.#Soitfails.@user=User.find(:email=>"admin@fancy-startup.example")@user.is_admin=true@user.save!ende

  7. ruby - 续集在添加关联时访问many_to_many连接表 - 2

    我正在使用Sequel构建一个愿望list系统。我有一个wishlists和itemstable和一个items_wishlists连接表(该名称是续集选择的名称)。items_wishlists表还有一个用于facebookid的额外列(因此我可以存储opengraph操作),这是一个NOTNULL列。我还有Wishlist和Item具有续集many_to_many关联的模型已建立。Wishlist类也有:selectmany_to_many关联的选项设置为select:[:items.*,:items_wishlists__facebook_action_id].有没有一种方法可以

  8. ruby - 如何在 Ruby 中拆分参数字符串 Bash 样式? - 2

    我正在为一个项目制作一个简单的shell,我希望像在Bash中一样解析参数字符串。foobar"helloworld"fooz应该变成:["foo","bar","helloworld","fooz"]等等。到目前为止,我一直在使用CSV::parse_line,将列分隔符设置为""和.compact输出。问题是我现在必须选择是要支持单引号还是双引号。CSV不支持超过一个分隔符。Python有一个名为shlex的模块:>>>shlex.split("Test'helloworld'foo")['Test','helloworld','foo']>>>shlex.split('Test"

  9. ruby - 如何在 Lion 上安装 Xcode 4.6,需要用 RVM 升级 ruby - 2

    我实际上是在尝试使用RVM在我的OSX10.7.5上更新ruby,并在输入以下命令后:rvminstallruby我得到了以下回复:Searchingforbinaryrubies,thismighttakesometime.Checkingrequirementsforosx.Installingrequirementsforosx.Updatingsystem.......Errorrunning'requirements_osx_brew_update_systemruby-2.0.0-p247',pleaseread/Users/username/.rvm/log/138121

  10. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

随机推荐