我正在使用 Spark 应用程序和 Mongos 控制台运行相同的聚合管道。在控制台上,数据在眨眼间获取,只需第二次使用“它”即可检索所有预期数据。 然而,根据 Spark WebUI,Spark 应用程序需要将近两分钟的时间。
如您所见,正在启动 242 个任务来获取结果。我不确定为什么在 MongoDB 聚合仅返回 40 个文档时启动了如此大量的任务。看起来开销很大。
我在 Mongos 控制台上运行的查询:
db.data.aggregate([
{
$match:{
signals:{
$elemMatch:{
signal:"SomeSignal",
value:{
$gt:0,
$lte:100
}
}
}
}
},
{
$group:{
_id:"$root_document",
firstTimestamp:{
$min:"$ts"
},
lastTimestamp:{
$max:"$ts"
},
count:{
$sum:1
}
}
}
])
Spark 应用程序代码
JavaMongoRDD<Document> rdd = MongoSpark.load(sc);
JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
Document.parse(
"{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"),
Document.parse(
"{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }")));
JavaRDD<String> outputRdd = aggregatedRdd.map(new Function<Document, String>() {
@Override
public String call(Document arg0) throws Exception {
String output = String.format("%s;%s;%s;%s", arg0.get("_id").toString(),
arg0.get("firstTimestamp").toString(), arg0.get("lastTimestamp").toString(),
arg0.get("count").toString());
return output;
}
});
outputRdd.saveAsTextFile("/user/spark/output");
之后,我使用 hdfs dfs -getmerge/user/spark/output/output.csv 并比较结果。
为什么聚合这么慢?调用 withPipeline 不是为了减少需要传输到 Spark 的数据量吗?看起来它并没有像 Mongos 控制台那样进行聚合。在 Mongos 控制台上,它运行得非常快。我正在使用 Spark 1.6.1 和 mongo-spark-connector_2.10 版本 1.1.0。
编辑:我想知道的另一件事是启动了两个执行程序(因为我使用的是默认执行设置 atm),但只有一个执行程序完成所有工作。为什么第二个执行者不做任何工作?
编辑 2:当使用不同的聚合管道并调用 .count() 而不是 saveAsTextFile(..) 时,还会创建 242 个任务。这次将返回 65,000 份文件。
最佳答案
大量任务是由默认的 Mongo Spark 分区程序策略引起的。它在计算分区时忽略了聚合管道,主要原因有两个:
但是,正如您所发现的那样,它们会生成空分区,这在您的情况下成本很高。
修复的选择可能是:
改变分区策略
选择一个替代的分区器来减少分区的数量。例如,PaginateByCount 会将数据库拆分为一定数量的分区。
创建您自己的分区器 - 只需实现该特征,您就可以应用聚合管道并对结果进行分区。查看HalfwayPartitioner和 custom partitioner test举个例子。
使用 $out 将结果预先聚合到一个集合中并从那里读取。
coalesce(N) 将分区合并在一起并减少分区的数量。spark.mongodb.input.partitionerOptions.partitionSizeMB 配置以生成更少的分区。自定义分区器应该会产生最佳解决方案,但有一些方法可以更好地利用可用的默认分区器。
如果您认为应该有一个使用聚合管道计算分区的默认分区程序,那么请向 MongoDB 添加一个票证 Spark Jira project .
关于MongoDB Spark Connector - 聚合速度慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40419921/
不知何故,我似乎无法获得包含我的聚合的响应...使用curl它按预期工作:HBZUMB01$curl-XPOST"http://localhost:9200/contents/_search"-d'{"size":0,"aggs":{"sport_count":{"value_count":{"field":"dwid"}}}}'我收到回复:{"took":4,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":90,"max_score":0.0,"hits":[]},"a
什么是Linq聚合方法的ruby等价物。它的工作原理是这样的varfactorial=new[]{1,2,3,4,5}.Aggregate((acc,i)=>acc*i);每次将数组序列中的值传递给lambda时,变量acc都会累积。 最佳答案 这在数学以及几乎所有编程语言中通常称为折叠。它是更普遍的变形概念的一个实例。Ruby从Smalltalk中继承了这个特性的名称,它被称为inject:into:(像aCollectioninject:aStartValueinto:aBlock一样使用。)所以,在Ruby中,它称为inj
按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visitthehelpcenter指导。关闭10年前。在我在网上找到的每个基准测试中,Ruby似乎都很慢,比Java慢得多。Ruby的人只是说这无关紧要。您能举个例子说明RubyonRails(以及Ruby本身)的速度真的无关紧要吗?
我有这段代码:date_counter=Time.mktime(2011,01,01,00,00,00,"+05:00")@weeks=Array.new(date_counter..Time.now).step(1.week)do|week|logger.debug"WEEK:"+week.inspect@weeks从技术上讲,代码有效,输出:SatJan0100:00:00-05002011SatJan0800:00:00-05002011SatJan1500:00:00-05002011etc.但是执行时间完全是垃圾!每周计算大约需要四秒钟。我在这段代码中是否遗漏了一些奇怪的低效
我想知道NokogiriXPath或CSS解析是否可以更快地处理HTML文件。速度有何不同? 最佳答案 Nokogiri没有XPath或CSS解析。它将XML/HTML解析为单个DOM,然后您可以使用CSS或XPath语法进行查询。CSS选择器在要求libxml2执行查询之前在内部转换为XPath。因此(对于完全相同的选择器)XPath版本会快一点点,因为CSS不需要先转换成XPath。但是,您的问题没有通用答案;这取决于您选择的是什么,以及您的XPath是什么样的。很有可能,您不会编写与Nokogiri创建的相同的XPath。例如
我在使用Arel聚契约(Contract)一查询中的2列时遇到了问题。当我运行它时,在railsdev-server崩溃之前,整个服务器会卡住一分钟。我怀疑是无限循环:)。也许我误解了Arel的整个概念,如果有人能看一下,我将不胜感激。这个查询的预期结果是这样的:[{:user_id=>1,:sum_account_charges=>300,:sum_paid_debts=>1000},...]a_account_charges=Table(:account_charges)a_paid_debts=Table(:paid_debts)a_participants=Table(:exp
过程和lambdadiffer关于方法范围和return关键字的效果。我对它们之间的性能差异很感兴趣。我写了一个测试,如下所示:deftime(&block)start=Time.nowblock.callp"thattook#{Time.now-start}"enddeftest(proc)time{(0..10000000).each{|n|proc.call(n)}}enddeftest_block(&block)time{(0..10000000).each{|n|block.call(n)}}enddefmethod_testtime{(1..10000000).each{|
我想同时执行多个聚合函数,例如获取按状态分组的最大和最小id:Model.maximum(:id).minimum(:id).group(:status)这行不通(至少对于Rails3.1.1是这样)——你在最小调用时收到一个错误,说它没有在Fixnum上定义。NoMethodError:undefinedmethod`minimum'for22377:Fixnum我可以为它做原始sql-但只是想知道是否有更高级别/Rails选项...谢谢,克里斯 最佳答案 我有一个类似的问题,我在Rails4中使用groupwithpluck解决
我正在使用RubyonRails3.2.2、FactoryGirl3.1.0、FactoryGirlRails3.1.0、Rspec2.9.0和RspecRails2.9.0。为了测试我的应用程序,我必须在数据库中创建大量记录(大约5000条),但是该操作非常慢(创建记录需要10多分钟)。我这样进行:before(:each)do5000.timesdoFactoryGirl.create(:article,)endend如何改进我的规范代码以加快速度?注意:可能速度较慢是由在每个文章创建过程前后运行的(5)个文章回调引起的,但我可以跳过这些(因为我唯一需要测试的是文章和不是关联的模型
从来源(database_cleaner,active_record)来看,它们应该同样快。但是有人声称使用database_cleaner的事务策略会降低Controller和模型规范的速度(forexample)。我手头没有用于基准测试的大型测试套件。任何人有任何见解或比较两者? 最佳答案 我花了一点时间在广泛使用ActiveRecord固定装置的中型代码库上比较两者。当我将其切换为使用DatabaseCleaner而不是use_transactional_fixtures时,模型规范开始花费大约两倍的时间。在进行了与您相同的比