我是 Storm 的新手,我创建了一个程序来读取一定时间内递增的数字。我在 Spout 和“nextTuple()”方法中使用了一个计数器,该计数器正在发出并递增
_collector.emit(new Values(new Integer(currentNumber++)));
/* how this method is being continuously called...*/
并且在元组类的execute()方法中有
public void execute(Tuple input) {
int number = input.getInteger(0);
logger.info("This number is (" + number + ")");
_outputCollector.ack(input);
}
/*this part I am clear as Bolt would receive the input from Spout*/
在我的主类执行中,我有以下代码
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("NumberSpout", new NumberSpout());
builder.setBolt("NumberBolt", new PrimeNumberBolt())
.shuffleGrouping("NumberSpout");
Config config = new Config();
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("NumberTest", config, builder.createTopology());
Utils.sleep(10000);
localCluster.killTopology("NumberTest");
localCluster.shutdown();
程序完美运行良好。目前我在这里查看的是 Storm 框架如何在内部连续调用 nextTuple() 方法。我确信我的理解在这里遗漏了一些东西,并且由于这个差距,我无法连接到这个框架的内部逻辑。
你们中的任何人都可以帮助我清楚地理解这一部分,这对我来说将是一个很大的帮助,因为我将不得不在我的项目中实现这个概念。如果我在这里在概念上很清楚,那么我可以取得重大进展。如果有人可以在这里快速帮助我,我将不胜感激。等待回复...
最佳答案
how does the Storm framework internally calls the nextTuple() method continuously.
我相信这实际上涉及到关于 Storm 拓扑的整个生命周期的非常详细的讨论,以及不同实体(如 worker 、执行者、任务等)的清晰概念。拓扑的实际处理由 StormSubmitter 类及其 submitTopology 方法。
它做的第一件事是使用 Nimbus's Thrift interface 开始上传 jar。然后调用 submitTopology 最终将拓扑提交给 Nimbus。
Nimbus 然后开始规范化拓扑(来自文档:规范化的主要目的是确保每个任务都具有相同的序列化注册,这对于序列化正常工作至关重要)之后是序列化,zookeeper握手,supervisor和worker进程启动等等。它的讨论范围太广,但如果你真的想挖掘更多,你可以浏览 life cycle of storm topology它很好地解释了在整个过程中执行的分步操作。
(文档中的快速注释)
First a couple of important notes about topologies:
The actual topology that runs is different than the topology the user specifies. The actual topology has implicit streams and an implicit "acker" bolt added to manage the acking framework (used to guarantee data processing).
The implicit topology is created via the system-topology! function. system-topology! is used in two places:
- - when Nimbus is creating tasks for the topology code
- - in the worker so it knows where it needs to route messages to code
现在这里有一些我可以尝试分享的线索......
Spout 或 Bolt 实际上是进行实际处理(逻辑)的组件。在 Storm 术语中,它们在整个结构中执行尽可能多的任务。
来自文档页面:每个任务对应一个执行线程
现在,在许多其他任务中,工作进程(阅读 here)在 storm 中的一个典型职责是监视拓扑是否处于 Activity 状态并将该特定状态存储在名为的变量中storm-active-atom。任务使用此变量来确定是否调用 nextTuple 方法。因此,只要您的拓扑处于 Activity 状态(您还没有放置 spout 代码,但假设)直到您计时器处于 Activity 状态(正如您所说的某个时间),它将继续调用 nextTuple 方法。您可以进一步挖掘以了解 Storm 的 Acking framework implementation了解一旦元组被成功处理它是如何理解和确认的Guarantee-message-processing
I am sure that my understanding is missing something here and due to this gap I am unable to connect to the internal logic of this framework
说到这里,我觉得更重要的是弄清楚如何使用storm,而不是早期如何理解storm。例如,与其学习 storm 的内部机制,更重要的是要认识到,如果我们将 spout 设置为逐行读取文件,那么它会使用 _collector.emit 方法继续发射每一行,直到到达 EOF .连接到它的 bolt 在其 execute(tuple input) 方法中接收相同的
希望这有助于您将来与我们分享更多信息
关于java - Storm如何处理Bolt中的nextTuple,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20408895/
总的来说,我对ruby还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
作为我的Rails应用程序的一部分,我编写了一个小导入程序,它从我们的LDAP系统中吸取数据并将其塞入一个用户表中。不幸的是,与LDAP相关的代码在遍历我们的32K用户时泄漏了大量内存,我一直无法弄清楚如何解决这个问题。这个问题似乎在某种程度上与LDAP库有关,因为当我删除对LDAP内容的调用时,内存使用情况会很好地稳定下来。此外,不断增加的对象是Net::BER::BerIdentifiedString和Net::BER::BerIdentifiedArray,它们都是LDAP库的一部分。当我运行导入时,内存使用量最终达到超过1GB的峰值。如果问题存在,我需要找到一些方法来更正我的代
Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题
我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何
我在我的项目中添加了一个系统来重置用户密码并通过电子邮件将密码发送给他,以防他忘记密码。昨天它运行良好(当我实现它时)。当我今天尝试启动服务器时,出现以下错误。=>BootingWEBrick=>Rails3.2.1applicationstartingindevelopmentonhttp://0.0.0.0:3000=>Callwith-dtodetach=>Ctrl-CtoshutdownserverExiting/Users/vinayshenoy/.rvm/gems/ruby-1.9.3-p0/gems/actionmailer-3.2.1/lib/action_mailer
刚入门rails,开始慢慢理解。有人可以解释或给我一些关于在application_controller中编码的好处或时间和原因的想法吗?有哪些用例。您如何为Rails应用程序使用应用程序Controller?我不想在那里放太多代码,因为据我了解,每个请求都会调用此Controller。这是真的? 最佳答案 ApplicationController实际上是您应用程序中的每个其他Controller都将从中继承的类(尽管这不是强制性的)。我同意不要用太多代码弄乱它并保持干净整洁的态度,尽管在某些情况下ApplicationContr
我想向我的Controller传递一个参数,它是一个简单的复选框,但我不知道如何在模型的form_for中引入它,这是我的观点:{:id=>'go_finance'}do|f|%>Transferirde:para:Entrada:"input",:placeholder=>"Quantofoiganho?"%>Saída:"output",:placeholder=>"Quantofoigasto?"%>Nota:我想做一个额外的复选框,但我该怎么做,模型中没有一个对象,而是一个要检查的对象,以便在Controller中创建一个ifelse,如果没有检查,请帮助我,非常感谢,谢谢
我注意到像bundler这样的项目在每个specfile中执行requirespec_helper我还注意到rspec使用选项--require,它允许您在引导rspec时要求一个文件。您还可以将其添加到.rspec文件中,因此只要您运行不带参数的rspec就会添加它。使用上述方法有什么缺点可以解释为什么像bundler这样的项目选择在每个规范文件中都需要spec_helper吗? 最佳答案 我不在Bundler上工作,所以我不能直接谈论他们的做法。并非所有项目都checkin.rspec文件。原因是这个文件,通常按照当前的惯例,只
我正在使用active_admin,我在Rails3应用程序的应用程序中有一个目录管理,其中包含模型和页面的声明。时不时地我也有一个类,当那个类有一个常量时,就像这样:classFooBAR="bar"end然后,我在每个必须在我的Rails应用程序中重新加载一些代码的请求中收到此警告:/Users/pupeno/helloworld/app/admin/billing.rb:12:warning:alreadyinitializedconstantBAR知道发生了什么以及如何避免这些警告吗? 最佳答案 在纯Ruby中:classA