我正在玩 Spark。它是来自网站的默认预构建发行版 (0.7.0),具有默认配置、集群模式、一名工作人员(我的本地主机)。我阅读了有关安装的文档,一切似乎都很好。
我有一个 CSV 文件(各种大小,1000 - 100 万行)。如果我用小输入文件(例如 1000 行)运行我的应用程序,一切都很好,程序在几秒钟内完成并产生预期的输出。 但是当我提供一个更大的文件(100.000 行,或 100 万)时,执行失败。我试图挖掘日志,但没有太大帮助(它重复整个过程大约 9-10 次,然后失败退出。此外,还有一些与从某些空源获取失败相关的错误)。
第一个 JavaRDD 返回的结果 Iterable 对我来说是可疑的。如果我返回一个硬编码的单例列表(如 res.add("something"); return res;),一切都很好,即使有一百万行。但是,如果我添加我想要的所有键(28 个长度为 6-20 个字符的字符串),则该过程仅会因大输入而失败。 问题是,我需要所有这些键,这是实际的业务逻辑。
我使用的是 Linux amd64、四核、8GB 内存。最新的 Oracle Java7 JDK。 Spark 配置:
SPARK_WORKER_MEMORY=4g
SPARK_MEM=3g
SPARK_CLASSPATH=$SPARK_CLASSPATH:/my/super/application.jar
我必须提一下,当我启动程序时,它说:
13/05/30 11:41:52 WARN spark.Utils: Your hostname, *** resolves to a loopback address: 127.0.1.1; using 192.168.1.157 instead (on interface eth1)
13/05/30 11:41:52 WARN spark.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
这是我的程序。它基于 JavaWordCount 示例,经过最少修改。
public final class JavaWordCount
{
public static void main(final String[] args) throws Exception
{
final JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
System.getenv("SPARK_HOME"), new String[] {"....jar" });
final JavaRDD<String> words = ctx.textFile(args[1], 1).flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(final String s)
{
// parsing "s" as the line, computation, building res (it's a List<String>)
return res;
}
});
final JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(final String s)
{
return new Tuple2<String, Integer>(s, 1);
}
});
final JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(final Integer i1, final Integer i2)
{
return i1 + i2;
}
});
counts.collect();
for (Tuple2<?, ?> tuple : counts.collect()) {
System.out.println(tuple._1 + ": " + tuple._2);
}
}
}
最佳答案
我设法通过将属性 spark.mesos.coarse 设置为 true 来修复它。更多信息 here .
更新:我已经使用 Spark 玩了几个小时。这些设置对我有一点帮助,但似乎几乎不可能在一台机器上处理约 1000 万行文本。
System.setProperty("spark.serializer", "spark.KryoSerializer"); // kryo is much faster
System.setProperty("spark.kryoserializer.buffer.mb", "256"); // I serialize bigger objects
System.setProperty("spark.mesos.coarse", "true"); // link provided
System.setProperty("spark.akka.frameSize", "500"); // workers should be able to send bigger messages
System.setProperty("spark.akka.askTimeout", "30"); // high CPU/IO load
注意:增加帧大小似乎特别有助于防止:org.apache.spark.SparkException: Error communication with MapOutputTracker
关于java - Spark 集群在更大的输入上失败,适用于小的,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16832429/
大约一年前,我决定确保每个包含非唯一文本的Flash通知都将从模块中的方法中获取文本。我这样做的最初原因是为了避免一遍又一遍地输入相同的字符串。如果我想更改措辞,我可以在一个地方轻松完成,而且一遍又一遍地重复同一件事而出现拼写错误的可能性也会降低。我最终得到的是这样的:moduleMessagesdefformat_error_messages(errors)errors.map{|attribute,message|"Error:#{attribute.to_s.titleize}#{message}."}enddeferror_message_could_not_find(obje
我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/
我已经在Sinatra上创建了应用程序,它代表了一个简单的API。我想在生产和开发上进行部署。我想在部署时选择,是开发还是生产,一些方法的逻辑应该改变,这取决于部署类型。是否有任何想法,如何完成以及解决此问题的一些示例。例子:我有代码get'/api/test'doreturn"Itisdev"end但是在部署到生产环境之后我想在运行/api/test之后看到ItisPROD如何实现? 最佳答案 根据SinatraDocumentation:EnvironmentscanbesetthroughtheRACK_ENVenvironm
我已经构建了一些serverspec代码来在多个主机上运行一组测试。问题是当任何测试失败时,测试会在当前主机停止。即使测试失败,我也希望它继续在所有主机上运行。Rakefile:namespace:specdotask:all=>hosts.map{|h|'spec:'+h.split('.')[0]}hosts.eachdo|host|begindesc"Runserverspecto#{host}"RSpec::Core::RakeTask.new(host)do|t|ENV['TARGET_HOST']=hostt.pattern="spec/cfengine3/*_spec.r
我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www
当我使用has_one时,它工作得很好,但在has_many上却不行。在这里您可以看到object_id不同,因为它运行了另一个SQL来再次获取它。ruby-1.9.2-p290:001>e=Employee.create(name:'rafael',active:false)ruby-1.9.2-p290:002>b=Badge.create(number:1,employee:e)ruby-1.9.2-p290:003>a=Address.create(street:"123MarketSt",city:"SanDiego",employee:e)ruby-1.9.2-p290
我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我
什么是ruby的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht
这篇文章是继上一篇文章“Observability:从零开始创建Java微服务并监控它(一)”的续篇。在上一篇文章中,我们讲述了如何创建一个Javaweb应用,并使用Filebeat来收集应用所生成的日志。在今天的文章中,我来详述如何收集应用的指标,使用APM来监控应用并监督web服务的在线情况。源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。摄入指标指标被视为可以随时更改的时间点值。当前请求的数量可以改变任何毫秒。你可能有1000个请求的峰值,然后一切都回到一个请求。这也意味着这些指标可能不准确,你还想提取最小/
HashMap中为什么引入红黑树,而不是AVL树呢1.概述开始学习这个知识点之前我们需要知道,在JDK1.8以及之前,针对HashMap有什么不同。JDK1.7的时候,HashMap的底层实现是数组+链表JDK1.8的时候,HashMap的底层实现是数组+链表+红黑树我们要思考一个问题,为什么要从链表转为红黑树呢。首先先让我们了解下链表有什么不好???2.链表上述的截图其实就是链表的结构,我们来看下链表的增删改查的时间复杂度增:因为链表不是线性结构,所以每次添加的时候,只需要移动一个节点,所以可以理解为复杂度是N(1)删:算法时间复杂度跟增保持一致查:既然是非线性结构,所以查询某一个节点的时候