草庐IT

java - 小文件的 Spark 重新分区数据

coder 2024-01-06 原文

我是 Spark 的新手,我使用的集群主要用于并行化目的。我有一个 100MB 的文件,其中的每一行都经过某种算法处理,这是一个相当繁重且漫长的处理过程。

我想使用 10 节点集群并并行处理。我知道 block 大小超过 100MB,我尝试重新分区 textFile。如果我理解得很好,这个repartition方法增加了分区的数量:

JavaRDD<String> input = sc.textFile(args[0]);
input.repartition(10);

问题是当我部署到集群时,只有一个节点在有效处理。我怎样才能设法并行处理文件?

更新 1:这是我的 spark-submit 命令:

/usr/bin/spark-submit --master yarn --class mypackage.myclass --jars 
myjar.jar 
gs://mybucket/input.txt outfile

更新2:分区后,基本上有2个操作:

JavaPairRDD<String, String> int_input = mappingToPair(input);
JavaPairRDD<String, String> output = mappingValues(int_input, option);
output.saveAsTextFile("hdfs://...");

其中 mappingToPair(...)

public JavaPairRDD<String, String> mappingToPair(JavaRDD<String> input){
        return input.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String line) {
                String[] arrayList = line.split("\t", 2);
                return new Tuple2(arrayList[0], arrayList[1]);
            }
        });
    }

mappingValues(...) 是以下类型的方法:

public JavaPairRDD<String,String> mappingValues(JavaPairRDD<String,String> rdd, final String option){
        return rdd.mapValues(
                new Function<String, String>() {
                    // here the algo processing takes place...
                }
        )
}

最佳答案

这里可能有多个问题:

  1. 文件只有一个 block 大。使用多个执行程序读取它根本没有用,因为 HDFS 节点可以全速服务一个节点,或者以一半速度(加上开销)服务两个节点,等等。当您执行时,执行程序计数变得有用(对于读取步骤)有多个 block 分散在不同的 HDFS 节点上。
  2. 也有可能您以不可拆分的压缩格式存储文件,因此输入步骤只能用一个执行程序读取它,即使它是 block 大小的 100 倍。
  3. 您没有将 repartition(10) 调用链接到您的流程中,因此它根本无效。如果将此行:input.repartition(10); 替换为以下行:input = input.repartition(10); 它将被使用,并且它应该拆分在继续下一步之前将 RDD 分成多个。

请注意,重新分区会使您的过程变得更长,因为数据必须拆分并传输到其他机器,这很容易因网络速度慢而成为瓶颈。

当您使用客户端部署模式时尤其如此。这意味着第一个执行者(驱动程序)是您提交的本地 Spark 实例。所以它会先从集群中下载所有的数据到driver中,分区之后再上传回其他YARN节点。

我可以继续讨论这个,但我想说的主要是:如果你的算法非常简单,而不是分区、传输,然后在一个执行器上运行算法,这个过程甚至可能在一个执行器上运行得更快所有执行者并行。

关于java - 小文件的 Spark 重新分区数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41139575/

有关java - 小文件的 Spark 重新分区数据的更多相关文章

  1. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  2. ruby - 如何在续集中重新加载表模式? - 2

    鉴于我有以下迁移:Sequel.migrationdoupdoalter_table:usersdoadd_column:is_admin,:default=>falseend#SequelrunsaDESCRIBEtablestatement,whenthemodelisloaded.#Atthispoint,itdoesnotknowthatusershaveais_adminflag.#Soitfails.@user=User.find(:email=>"admin@fancy-startup.example")@user.is_admin=true@user.save!ende

  3. ruby-on-rails - active_admin 目录中的常量警告重新声明 - 2

    我正在使用active_admin,我在Rails3应用程序的应用程序中有一个目录管理,其中包含模型和页面的声明。时不时地我也有一个类,当那个类有一个常量时,就像这样:classFooBAR="bar"end然后,我在每个必须在我的Rails应用程序中重新加载一些代码的请求中收到此警告:/Users/pupeno/helloworld/app/admin/billing.rb:12:warning:alreadyinitializedconstantBAR知道发生了什么以及如何避免这些警告吗? 最佳答案 在纯Ruby中:classA

  4. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  5. Ruby rpartition 与分区? - 2

    rpartition和partition有什么区别?我已经阅读了文档,但我认为它们是一样的。只是那些出现在后来的ruby​​版本中吗? 最佳答案 以下示例将有助于识别差异:"abccba".partition("b")#=>["a","b","ccba"]"abccba".rpartition("b")#=>["abcc","b","a"]所以区别在于rpartition搜索最右边的匹配项,而不是最左边的匹配项。 关于Rubyrpartition与分区?,我们在StackOverflow

  6. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  7. ruby - 在 Ruby 中重新分配常量时抛出异常? - 2

    我早就知道Ruby中的“常量”(即大写的变量名)不是真正常量。与其他编程语言一样,对对象的引用是唯一存储在变量/常量中的东西。(侧边栏:Ruby确实具有“卡住”引用对象不被修改的功能,据我所知,许多其他语言都没有提供这种功能。)所以这是我的问题:当您将一个值重新分配给常量时,您会收到如下警告:>>FOO='bar'=>"bar">>FOO='baz'(irb):2:warning:alreadyinitializedconstantFOO=>"baz"有没有办法强制Ruby抛出异常而不是打印警告?很难弄清楚为什么有时会发生重新分配。 最佳答案

  8. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  9. java - 我的模型类或其他类中应该有逻辑吗 - 2

    我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我

  10. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

随机推荐