草庐IT

javascript - 多次同时调用 `cursor.next()`导致驱动崩溃

coder 2023-10-31 原文

动机:

我有一个架构涉及许多像这样“消费”文档的工作人员:

worker.on('readyForAnotherDoc', () => worker.consume( await cursor.next() ));

这是一种伪代码 - 我正在检查真实代码中的 cursor.hasNext()。有数百个工作人员,因此 cursor.next() 可能会同时被突然爆发的 200 个请求击中。

我正在尝试解决 mongodb node.js 驱动程序中的一个错误/怪癖,如果我对 cursor.next() 的请求太多,就会导致错误彼此“重叠”巧合的是。

背景:

似乎 MongoDB Node.js 驱动程序没有正确处理 cursor.next 向其抛出大量请求的情况。尝试运行这段代码:

(async function() {

  // create a collection for testing:
  let db = await require('mongodb').MongoClient.connect('mongodb://localhost:27017/tester-db-478364');
  await db.collection("test").drop();
  for(let i = 0; i < 1000; i++) {
    await db.collection("test").insertOne({num:i, foo:'bar'});
  }

  let cursor = await db.collection("test").find({});

  async function go() {
    let doc = await cursor.next();
    console.log(doc.num);
  }

  // start 100 simulataneous requests to `cursor.next()`
  for(let i = 0; i < 1000; i++) {
    go();
  }

})();

这是它为我输出的内容:

0
1
2
3
4
5
6
7
8
9
/home/joe/Downloads/testtt/node_modules/mongodb-core/lib/connection/pool.js:410
    if(workItem.socketTimeout) {
               ^

TypeError: Cannot read property 'socketTimeout' of null
    at Connection.messageHandler (/home/me/Downloads/testtt/node_modules/mongodb-core/lib/connection/pool.js:410:16)
    at Socket.<anonymous> (/home/me/Downloads/testtt/node_modules/mongodb-core/lib/connection/connection.js:361:20)
    at emitOne (events.js:115:13)
    at Socket.emit (events.js:210:7)
    at addChunk (_stream_readable.js:252:12)
    at readableAddChunk (_stream_readable.js:239:11)
    at Socket.Readable.push (_stream_readable.js:197:10)
    at TCP.onread (net.js:589:20)

所以在当前批处理用完之前看起来一切正常。但这很奇怪,因为如果您在 .find({}) 之后添加 .batchSize(100),那么它不会修复它。但有趣的是,如果您添加 .batchSize(5),您会得到:

0
1
2
3
4
0
1
2
3
/home/joe/Downloads/testtt/node_modules/mongodb-core/lib/connection/pool.js:410
    if(workItem.socketTimeout) {
               ^

TypeError: Cannot read property 'socketTimeout' of null
etc...

不知道那里发生了什么......

尝试解决它:

但假设我们此时正在寻找变通办法。假设我们稍微更改了 go 函数:

let cursorBusy = false;
async function go() {
  if(cursorBusy) await waitForCursor();
  cursorBusy = true;
  let doc = await cursor.next();
  cursorBusy = false;
  console.log(doc.num);
}
function waitForCursor() {
  return new Promise(resolve => {
    let si = setInterval(() => {
      if(!cursorBusy) {
        resolve();
        clearInterval(si);
      }
    }, 50);
  });
}

这会导致一个新的错误,它似乎出现在各处的 console.log(doc.num) 中:

...
359
415
466
(node:16259) UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 12): MongoError: clientcursor already in use? driver problem?
427
433
459
...

我认为这并不能避免错误,因为 setInterval 存在某种“竞争条件”。有趣的是,这是一条不同的错误消息。

问题:有什么方法可以检测光标当前是否“忙”?在修复此错误(如果它甚至是错误)之前,这里还有其他潜在的解决方法吗?

This question有一些相似(但绝对不相同)的行为,并且similar issues似乎出现在第三方 node.js 库中。

最佳答案

您的列表中有一些错误。所以真的只是稍微清理一下:

const MongoClient = require('mongodb').MongoClient;

(async function() {

  let db;

  try {
    db = await MongoClient.connect('mongodb://localhost/test');

    await db.collection('test').drop();

    await db.collection('test').insertMany(
      Array(1000).fill(1).map((e,num) => ({ num, foo: 'bar' }))
    );

    // This is not async. It returns immediately
    let cursor = db.collection('test').find();

    async function go() {
      let doc = await cursor.next();   // This awaits before continuing. Not concurrent.
      console.log(doc.num);
    }

    for ( let i = 0; i < 100; i++ ) {
      go();  // Note that these "await" internally
    }

  } catch(e) {
    console.error(e);
  } finally {
    db.close();
  }

})();

真正防弹它,那么您真的应该等待每个操作。因此,在返回时添加 Promise.resolve()await go() 作为良好的衡量标准,并通过减小批量大小来强制打破条件:

const MongoClient = require('mongodb').MongoClient;

(async function() {

  let db;

  try {
    db = await MongoClient.connect('mongodb://localhost/test');

    await db.collection('test').drop();

    await db.collection('test').insertMany(
      Array(1000).fill(1).map((e,num) => ({ num, foo: 'bar' }))
    );

    let cursor = db.collection('test').find().batchSize(1);

    async function go() {
      let doc = await cursor.next();
      console.log(doc.num);
      return Promise.resolve();
    }

    for ( let i = 0; i < 100; i++ ) {
      await go();
    }

    console.log('done');


  } catch(e) {
    console.error(e);
  } finally {
    db.close();
  }


})();

正确地按顺序打印出来。缩短了,但实际上达到了预期的 99:

0
1
2
3
4
5
6
7
8
9
10
(etc..)

解释主要在代码的注释中,您似乎遗漏了哪些是异步,哪些是不是

因此返回一个 Cursor来自.find() 不是 async 方法,并立即返回。这是因为它只是一个操作句柄,此时不执行任何操作。 MongoDB 驱动程序(所有驱动程序)不会联系服务器或在该端建立游标,直到发出“获取”数据的实际请求。

当您调用 .next() 时是在与服务器进行实际通信并返回“一批”结果时。 “批处理”实际上只影响后续调用是否实际返回服务器或不检索数据,因为“批处理”可能已经有“更多”结果,可以在另一个“批处理”请求之前“清空” "制成。无论如何,每次调用.next() 视为异步,无论是否有外部 I/O。

通常您会调用 .hasNext()包装每个迭代(也是 async )因为调用 .next()Cursor 上没有更多结果是错误的。它通常也是一种“循环控制”的方式,如下所示:

(async function() {

  let db;

  try {
    db = await MongoClient.connect('mongodb://localhost/test');

    await db.collection('test').drop();

    await db.collection('test').insertMany(
      Array(1000).fill(1).map((e,num) => ({ num, foo: 'bar' }))
    );

    let cursor = db.collection('test').find();

    async function go() {
      let doc = await cursor.next();
      console.log(doc.num);
    }

    //for ( let i = 0; i < 100; i++ ) {
    while( await cursor.hasNext() ) {  // Check the cursor still has results
      go();
    }

  } catch(e) {
    console.error(e);
  } finally {
    db.close();
  }

})();

然后变化到循环直到光标结束。

关于“并发”的注意事项还在于,它通常不是您在这里所期望的。如果您确实想并行发出多个请求,那么您仍然需要等待当前游标提取。如果您不这样做,那么您就是在要求服务器针对所有请求返回相同数据,而不是“迭代”游标时的顺序数据。

您似乎将其与一些实用函数(尤其是 Mongoose asyncEach())在实现并行“获取”时所做的事情混淆了。代码(来自内存)基本上是人为地插入一个setTimeout() 以等待“下一个滴答声”,这基本上意味着每个 .next() 都必须实际触发。

如前所述,它是“人工的”,因为批处理只是有效地 .map()(在底层代码中的某处)变成了一个更大的批处理。

但正如所证明的那样。由于实际“等待”每个 .next(),基本预期用途确实按预期工作。就像你应该做的那样。

关于javascript - 多次同时调用 `cursor.next()`导致驱动崩溃,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46780632/

有关javascript - 多次同时调用 `cursor.next()`导致驱动崩溃的更多相关文章

  1. ruby - 多次弹出/移动 ruby​​ 数组 - 2

    我的代码目前看起来像这样numbers=[1,2,3,4,5]defpop_threepop=[]3.times{pop有没有办法在一行中完成pop_three方法中的内容?我基本上想做类似numbers.slice(0,3)的事情,但要删除切片中的数组项。嗯...嗯,我想我刚刚意识到我可以试试slice! 最佳答案 是numbers.pop(3)或者numbers.shift(3)如果你想要另一边。 关于ruby-多次弹出/移动ruby​​数组,我们在StackOverflow上找到一

  2. ruby - 检查 "command"的输出应该包含 NilClass 的意外崩溃 - 2

    为了将Cucumber用于命令行脚本,我按照提供的说明安装了arubagem。它在我的Gemfile中,我可以验证是否安装了正确的版本并且我已经包含了require'aruba/cucumber'在'features/env.rb'中为了确保它能正常工作,我写了以下场景:@announceScenario:Testingcucumber/arubaGivenablankslateThentheoutputfrom"ls-la"shouldcontain"drw"假设事情应该失败。它确实失败了,但失败的原因是错误的:@announceScenario:Testingcucumber/ar

  3. Ruby Readline 在向上箭头上使控制台崩溃 - 2

    当我在Rails控制台中按向上或向左箭头时,出现此错误:irb(main):001:0>/Users/me/.rvm/gems/ruby-2.0.0-p247/gems/rb-readline-0.4.2/lib/rbreadline.rb:4269:in`blockin_rl_dispatch_subseq':invalidbytesequenceinUTF-8(ArgumentError)我使用rvm来管理我的ruby​​安装。我正在使用=>ruby-2.0.0-p247[x86_64]我使用bundle来管理我的gem,并且我有rb-readline(0.4.2)(人们推荐的最少

  4. 使用 ACL 调用 upload_file 时出现 Ruby S3 "Access Denied"错误 - 2

    我正在尝试编写一个将文件上传到AWS并公开该文件的Ruby脚本。我做了以下事情:s3=Aws::S3::Resource.new(credentials:Aws::Credentials.new(KEY,SECRET),region:'us-west-2')obj=s3.bucket('stg-db').object('key')obj.upload_file(filename)这似乎工作正常,除了该文件不是公开可用的,而且我无法获得它的公共(public)URL。但是当我登录到S3时,我可以正常查看我的文件。为了使其公开可用,我将最后一行更改为obj.upload_file(file

  5. c# - 如何在 ruby​​ 中调用 C# dll? - 2

    如何在ruby​​中调用C#dll? 最佳答案 我能想到几种可能性:为您的DLL编写(或找人编写)一个COM包装器,如果它还没有,则使用Ruby的WIN32OLE库来调用它;看看RubyCLR,其中一位作者是JohnLam,他继续在Microsoft从事IronRuby方面的工作。(估计不会再维护了,可能不支持.Net2.0以上的版本);正如其他地方已经提到的,看看使用IronRuby,如果这是您的技术选择。有一个主题是here.请注意,最后一篇文章实际上来自JohnLam(看起来像是2009年3月),他似乎很自在地断言RubyCL

  6. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  7. ruby - 调用其他方法的 TDD 方法的正确方法 - 2

    我需要一些关于TDD概念的帮助。假设我有以下代码defexecute(command)casecommandwhen"c"create_new_characterwhen"i"display_inventoryendenddefcreate_new_character#dostufftocreatenewcharacterenddefdisplay_inventory#dostufftodisplayinventoryend现在我不确定要为什么编写单元测试。如果我为execute方法编写单元测试,那不是几乎涵盖了我对create_new_character和display_invent

  8. ruby-on-rails - 如何在 Ruby on Rails 中实现由 JSF 2.0 (Primefaces) 驱动的 UI 魔法 - 2

    按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visitthehelpcenter指导。关闭10年前。问题1)我想知道ruby​​onrails是否有功能类似于primefaces的gem。我问的原因是如果您使用primefaces(http://www.primefaces.org/showcase-labs/ui/home.jsf),开发人员无需担心javascript或jquery的东西。据我所知,JSF是一个规范,基于规范的各种可用实现,prim

  9. 【鸿蒙应用开发系列】- 获取系统设备信息以及版本API兼容调用方式 - 2

    在应用开发中,有时候我们需要获取系统的设备信息,用于数据上报和行为分析。那在鸿蒙系统中,我们应该怎么去获取设备的系统信息呢,比如说获取手机的系统版本号、手机的制造商、手机型号等数据。1、获取方式这里分为两种情况,一种是设备信息的获取,一种是系统信息的获取。1.1、获取设备信息获取设备信息,鸿蒙的SDK包为我们提供了DeviceInfo类,通过该类的一些静态方法,可以获取设备信息,DeviceInfo类的包路径为:ohos.system.DeviceInfo.具体的方法如下:ModifierandTypeMethodDescriptionstatic StringgetAbiList​()Obt

  10. FOHEART H1数据手套驱动Optitrack光学动捕双手运动(Unity3D) - 2

    本教程将在Unity3D中混合Optitrack与数据手套的数据流,在人体运动的基础上,添加双手手指部分的运动。双手手背的角度仍由Optitrack提供,数据手套提供双手手指的角度。 01  客户端软件分别安装MotiveBody与MotionVenus并校准人体与数据手套。MotiveBodyMotionVenus数据手套使用、校准流程参照:https://gitee.com/foheart_1/foheart-h1-data-summary.git02  数据转发打开MotiveBody软件的Streaming,开始向Unity3D广播数据;MotionVenus中设置->选项选择Unit

随机推荐