也许潜在的问题是 node-kafka我正在使用的模块已经实现了一些东西,但也许还没有,所以我们开始吧......
使用 node-kafa 库,我在订阅 consumer.on('message') 事件时遇到了问题。该库使用标准的 events 模块,所以我认为这个问题可能很笼统。
我的实际代码结构又大又复杂,所以这里有一个基本布局的伪示例来突出我的问题。 (注意:此代码片段未经测试,因此我这里可能有错误,但无论如何这里的语法都没有问题)
var messageCount = 0;
var queryCount = 0;
// Getting messages via some event Emitter
consumer.on('message', function(message) {
message++;
console.log('Message #' + message);
// Making a database call for each message
mysql.query('SELECT "test" AS testQuery', function(err, rows, fields) {
queryCount++;
console.log('Query #' + queryCount);
});
})
我在这里看到的是,当我启动我的服务器时,有 100,000 条左右积压的消息是 kafka 想要给我的,它通过事件发射器这样做。所以我开始收到消息。获取并记录所有消息大约需要 15 秒。
假设 mysql 查询相当快,这是我希望看到的输出:
Message #1
Message #2
Message #3
...
Message #500
Query #1
Message #501
Message #502
Query #2
... and so on in some intermingled fashion
我希望这是因为我的第一个 mysql 结果应该很快就准备好了,我希望结果轮流在事件循环中处理响应。我实际得到的是:
Message #1
Message #2
...
Message #100000
Query #1
Query #2
...
Query #100000
在能够处理 mysql 响应之前,我收到了每条消息。所以我的问题是,为什么?为什么在所有消息事件完成之前我无法获得单个数据库结果?
另一个注意事项:我在 node-kafka 中的 .emit('message') 和我的代码中的 mysql.query() 处设置了一个断点,我是以回合制击中他们。因此,在进入我的事件订阅者之前,所有 100,000 次发射似乎都没有预先堆叠起来。因此,我对这个问题提出了第一个假设。
想法和知识将不胜感激:)
最佳答案
node-kafka 驱动程序使用相当大的缓冲区大小 (1M),这意味着它将从 Kafka 获取适合缓冲区的尽可能多的消息。如果服务器积压,并且根据消息大小,这可能意味着一个请求会收到(数万)条消息。
因为 EventEmitter 是同步的(它不使用 Node 事件循环),这意味着驱动程序将向其监听器发出(数万个)数千个事件,并且由于它是同步的,它不会屈服于 Node事件循环,直到所有消息都已传递。
我不认为您可以解决大量的事件传递问题,但我不认为具体的事件传递有问题。更可能的问题是为每个事件启动一个异步操作(在本例中为 MySQL 查询),这可能会使数据库充满查询。
一种可能的解决方法是使用队列而不是直接从事件处理程序执行查询。例如,使用 async.queue您可以限制并发(异步)任务的数量。队列的“ worker ”部分将执行 MySQL 查询,而在事件处理程序中,您只需将消息推送到队列中。
关于javascript - Node.js EventEmitter 事件不共享事件循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30035632/
我脑子里浮现出一些关于一种新编程语言的想法,所以我想我会尝试实现它。一位friend建议我尝试使用Treetop(Rubygem)来创建一个解析器。Treetop的文档很少,我以前从未做过这种事情。我的解析器表现得好像有一个无限循环,但没有堆栈跟踪;事实证明很难追踪到。有人可以指出入门级解析/AST指南的方向吗?我真的需要一些列出规则、常见用法等的东西来使用像Treetop这样的工具。我的语法分析器在GitHub上,以防有人希望帮助我改进它。class{initialize=lambda(name){receiver.name=name}greet=lambda{IO.puts("He
我有多个ActiveRecord子类Item的实例数组,我需要根据最早的事件循环打印。在这种情况下,我需要打印付款和维护日期,如下所示:ItemAmaintenancerequiredin5daysItemBpaymentrequiredin6daysItemApaymentrequiredin7daysItemBmaintenancerequiredin8days我目前有两个查询,用于查找maintenance和payment项目(非排他性查询),并输出如下内容:paymentrequiredin...maintenancerequiredin...有什么方法可以改善上述(丑陋的)代
我正在编写一个gem,我必须在其中fork两个启动两个webrick服务器的进程。我想通过基类的类方法启动这个服务器,因为应该只有这两个服务器在运行,而不是多个。在运行时,我想调用这两个服务器上的一些方法来更改变量。我的问题是,我无法通过基类的类方法访问fork的实例变量。此外,我不能在我的基类中使用线程,因为在幕后我正在使用另一个不是线程安全的库。所以我必须将每个服务器派生到它自己的进程。我用类变量试过了,比如@@server。但是当我试图通过基类访问这个变量时,它是nil。我读到在Ruby中不可能在分支之间共享类变量,对吗?那么,还有其他解决办法吗?我考虑过使用单例,但我不确定这是
我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("
是否有简单的方法来更改默认ISO格式(yyyy-mm-dd)的ActiveAdmin日期过滤器显示格式? 最佳答案 您可以像这样为日期选择器提供额外的选项,而不是覆盖js:=f.input:my_date,as::datepicker,datepicker_options:{dateFormat:"mm/dd/yy"} 关于ruby-on-rails-事件管理员日期过滤器日期格式自定义,我们在StackOverflow上找到一个类似的问题: https://s
我正在尝试将以下SQL查询转换为ActiveRecord,它正在融化我的大脑。deletefromtablewhereid有什么想法吗?我想做的是限制表中的行数。所以,我想删除少于最近10个条目的所有内容。编辑:通过结合以下几个答案找到了解决方案。Temperature.where('id这给我留下了最新的10个条目。 最佳答案 从您的SQL来看,您似乎想要从表中删除前10条记录。我相信到目前为止的大多数答案都会如此。这里有两个额外的选择:基于MurifoX的版本:Table.where(:id=>Table.order(:id).
我是Ruby的新手,有些闭包逻辑让我感到困惑。考虑这段代码:array=[]foriin(1..5)array[5,5,5,5,5]这对我来说很有意义,因为i被绑定(bind)在循环之外,所以每次循环都会捕获相同的变量。使用每个block可以解决这个问题对我来说也很有意义:array=[](1..5).each{|i|array[1,2,3,4,5]...因为现在每次通过时都单独声明i。但现在我迷路了:为什么我不能通过引入一个中间变量来修复它?array=[]foriin1..5j=iarray[5,5,5,5,5]因为j每次循环都是新的,我认为每次循环都会捕获不同的变量。例如,这绝对
我遇到了一个非常奇怪的问题,我很难解决。在我看来,我有一个与data-remote="true"和data-method="delete"的链接。当我单击该链接时,我可以看到对我的Rails服务器的DELETE请求。返回的JS代码会更改此链接的属性,其中包括href和data-method。再次单击此链接后,我的服务器收到了对新href的请求,但使用的是旧的data-method,即使我已将其从DELETE到POST(它仍然发送一个DELETE请求)。但是,如果我刷新页面,HTML与"new"HTML相同(随返回的JS发生变化),但它实际上发送了正确的请求类型。这就是这个问题令我困惑的
这是我在ActiveAdmin中的自定义页面ActiveAdmin.register_page"Settings"doaction_itemdolink_to('Importprojects','settings/importprojects')endcontentdopara"Text"endcontrollerdodefimportprojectssystem"rakedataspider:import_projects_ninja"para"OK"endendend我想做的是,当我单击“导入项目”按钮时,我想在Controller中执行rake任务。但是我无法访问该方法。可能是什
在许多ruby类之间共享记录器实例的最佳(正确)方法是什么?现在我只是将记录器创建为全局$logger=Logger.new变量,但我觉得有更好的方法可以在不使用全局变量的情况下执行此操作。如果我有以下内容:moduleFooclassAclassBclassC...classZend在所有类之间共享记录器实例的最佳方式是什么?我是以某种方式在Foo模块中声明/创建记录器还是只是使用全局$logger没问题? 最佳答案 在模块中添加常量:moduleFooLogger=Logger.newclassAclassBclassC..