草庐IT

node.js - 从单个 REDIS 实例读取的 Nodejs 集群架构

coder 2023-07-19 原文

我正在使用 Nodejs cluster 模块让多个 worker 运行。 我创建了一个基本架构,其中将有一个 MASTER 进程,它基本上是一个处理多个请求的快速服务器,MASTER 的主要任务是将来自请求的传入数据写入 REDIS 实例。其他 worker (numOfCPUs - 1)将是非主人,即他们不会处理任何请求,因为他们只是消费者。我有两个功能,即 ABC 和 DEF。我通过分配类型将非 master worker 平均分布在各个功能中。

例如:在 8 核机器上:

1 将是通过 express 服务器处理请求的 MASTER 实例

剩余的 (8 - 1 = 7) 将平均分配。 4 个特征:ABD 和 3 个特征:DEF。

非 master worker 基本上是消费者,即他们从 REDIS 中读取数据,其中只有 MASTER worker 可以写入数据。

下面是相同的代码:

if (cluster.isMaster) {
  // Fork workers.
  for (let i = 0; i < numCPUs - 1; i++) {
    ClusteringUtil.forkNewClusterWithAutoTypeBalancing();
  }

  cluster.on('exit', function(worker) {
    console.log(`Worker ${worker.process.pid}::type(${worker.type}) died`);
    ClusteringUtil.removeWorkerFromList(worker.type);
    ClusteringUtil.forkNewClusterWithAutoTypeBalancing();
  });

  // Start consuming on server-start
  ABCConsumer.start();
  DEFConsumer.start();

  console.log(`Master running with process-id: ${process.pid}`);
} else {
  console.log('CLUSTER  type', cluster.worker.process.env.type, 'running on', process.pid);
  if (
    cluster.worker.process.env &&
    cluster.worker.process.env.type &&
    cluster.worker.process.env.type === ServerTypeEnum.EXPRESS
  ) {
    // worker for handling requests
    app.use(express.json());
    ...
  }
{

除了消费者从 REDIS 读取数据外,一切正常。 由于特定功能有多个消费者,每个消费者都读取相同的消息并开始单独处理,这是我不希望的。如果有 4 个消费者,其中 1 个被标记为繁忙并且在有空之前不能消费,则有 3 个可用。一旦 MASTER 在 REDIS 中写入该特定功能的消息,问题是该功能的所有 3 个可用消费者都开始使用。这意味着对于单个消息,工作是根据可用消费者的数量完成的。

const stringifedData = JSON.stringify(req.body);
  const key = uuidv1();

  const asyncHsetRes = await asyncHset(type, key, stringifedData);

  if (asyncHsetRes) {
    await asyncRpush(FeatureKeyEnum.REDIS.ABC_MESSAGE_QUEUE, key);
    res.send({ status: 'success', message: 'Added to processing queue' });
  } else {
    res.send({ error: 'failure', message: 'Something went wrong in adding to queue' });
  }

消费者简单地接受消息并在忙碌时停止

module.exports.startHeartbeat = startHeartbeat = async function(config = {}) {
  if (!config || !config.type || !config.listKey) {
    return;
  }

  heartbeatIntervalObj[config.type] = setInterval(async () => {
    await asyncLindex(config.listKey, -1).then(async res => {
      if (res) {
        await getFreeWorkerAndDoJob(res, config);
        stopHeartbeat(config);
      }
    });
  }, HEARTBEAT_INTERVAL);
};

理想情况下,一条消息应该只被该特定功能的一个消费者阅读。消费后,它被标记为忙碌,所以它不会进一步消费,直到空闲(我已经处理过这个)。下一条消息只能由其他可用消费者中的一个消费者处理。

请帮我解决这个问题。同样,我希望只有一个免费消费者可以阅读一条消息,而其他免费消费者应该等待新消息。

谢谢

最佳答案

我不确定我是否完全理解您的 Redis 消费者架构,但我觉得它与 Redis 本身的用例相矛盾。您要实现的本质上是一种基于队列的消息传递,能够在消息完成后提交消息。

Redis 有自己的发布/订阅功能,但它是建立在火后遗忘原则之上的。它不区分消费者 - 它只是将数据发送给所有消费者,假设它们的逻辑是处理传入数据。

我建议您使用 RabbitMQ 等队列服务器。您可以使用 AMQP 0-9-1 支持的一些功能来实现您的目标:消息确认、消费者的预取计数等。您可以使用非常灵活的配置来设置您的集群,例如 ok,我想要 X 个消费者,并且每个消费者一次可以处理 1 个唯一的(!)消息,并且只有在他们让服务器(rabbitmq ) 知道他们已成功完成消息处理。这是高度可配置和健壮的。

但是,如果您想通过一些完全托管的服务实现无服务器,这样您就不会像虚拟机或其他任何东西一样配置来运行您选择的消息队列服务器,您可以使用 AWS SQS。它具有非常相似的 API 和功能列表。

希望对您有所帮助!

关于node.js - 从单个 REDIS 实例读取的 Nodejs 集群架构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56198787/

有关node.js - 从单个 REDIS 实例读取的 Nodejs 集群架构的更多相关文章

  1. ruby - 如何将脚本文件的末尾读取为数据文件(Perl 或任何其他语言) - 2

    我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚

  2. ruby-on-rails - 如何使用 instance_variable_set 正确设置实例变量? - 2

    我正在查看instance_variable_set的文档并看到给出的示例代码是这样做的:obj.instance_variable_set(:@instnc_var,"valuefortheinstancevariable")然后允许您在类的任何实例方法中以@instnc_var的形式访问该变量。我想知道为什么在@instnc_var之前需要一个冒号:。冒号有什么作用? 最佳答案 我的第一直觉是告诉你不要使用instance_variable_set除非你真的知道你用它做什么。它本质上是一种元编程工具或绕过实例变量可见性的黑客攻击

  3. ruby 正则表达式 - 如何替换字符串中匹配项的第 n 个实例 - 2

    在我的应用程序中,我需要能够找到所有数字子字符串,然后扫描每个子字符串,找到第一个匹配范围(例如5到15之间)的子字符串,并将该实例替换为另一个字符串“X”。我的测试字符串s="1foo100bar10gee1"我的初始模式是1个或多个数字的任何字符串,例如,re=Regexp.new(/\d+/)matches=s.scan(re)给出["1","100","10","1"]如果我想用“X”替换第N个匹配项,并且只替换第N个匹配项,我该怎么做?例如,如果我想替换第三个匹配项“10”(匹配项[2]),我不能只说s[matches[2]]="X"因为它做了两次替换“1fooX0barXg

  4. Ruby 写入和读取对象到文件 - 2

    好的,所以我的目标是轻松地将一些数据保存到磁盘以备后用。您如何简单地写入然后读取一个对象?所以如果我有一个简单的类classCattr_accessor:a,:bdefinitialize(a,b)@a,@b=a,bendend所以如果我从中非常快地制作一个objobj=C.new("foo","bar")#justgaveitsomerandomvalues然后我可以把它变成一个kindaidstring=obj.to_s#whichreturns""我终于可以将此字符串打印到文件或其他内容中。我的问题是,我该如何再次将这个id变回一个对象?我知道我可以自己挑选信息并制作一个接受该信

  5. ruby-on-rails - Rails - 从另一个模型中创建一个模型的实例 - 2

    我有一个正在构建的应用程序,我需要一个模型来创建另一个模型的实例。我希望每辆车都有4个轮胎。汽车模型classCar轮胎模型classTire但是,在make_tires内部有一个错误,如果我为Tire尝试它,则没有用于创建或新建的activerecord方法。当我检查轮胎时,它没有这些方法。我该如何补救?错误是这样的:未定义的方法'create'forActiveRecord::AttributeMethods::Serialization::Tire::Module我测试了两个环境:测试和开发,它们都因相同的错误而失败。 最佳答案

  6. ruby-on-rails - RSpec:避免使用允许接收的任何实例 - 2

    我正在处理旧代码的一部分。beforedoallow_any_instance_of(SportRateManager).toreceive(:create).and_return(true)endRubocop错误如下:Avoidstubbingusing'allow_any_instance_of'我读到了RuboCop::RSpec:AnyInstance我试着像下面那样改变它。由此beforedoallow_any_instance_of(SportRateManager).toreceive(:create).and_return(true)end对此:let(:sport_

  7. ruby-on-rails - 使用 ruby​​ 将多个实例变量转换为散列的更好方法? - 2

    我收到格式为的回复#我需要将其转换为哈希值(针对活跃商家)。目前我正在遍历变量并执行此操作:response.instance_variables.eachdo|r|my_hash.merge!(r.to_s.delete("@").intern=>response.instance_eval(r.to_s.delete("@")))end这有效,它将生成{:first="charlie",:last=>"kelly"},但它似乎有点hacky和不稳定。有更好的方法吗?编辑:我刚刚意识到我可以使用instance_variable_get作为该等式的第二部分,但这仍然是主要问题。

  8. 世界前沿3D开发引擎HOOPS全面讲解——集3D数据读取、3D图形渲染、3D数据发布于一体的全新3D应用开发工具 - 2

    无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD

  9. python - 如何读取 MIDI 文件、更改其乐器并将其写回? - 2

    我想解析一个已经存在的.mid文件,改变它的乐器,例如从“acousticgrandpiano”到“violin”,然后将它保存回去或作为另一个.mid文件。根据我在文档中看到的内容,该乐器通过program_change或patch_change指令进行了更改,但我找不到任何在已经存在的MIDI文件中执行此操作的库.他们似乎都只支持从头开始创建的MIDI文件。 最佳答案 MIDIpackage会为您完成此操作,但具体方法取决于midi文件的原始内容。一个MIDI文件由一个或多个音轨组成,每个音轨是十六个channel中任何一个上的

  10. STM32读取串口传感器数据(颗粒物传感器,主动上传) - 2

    文章目录1.开发板选择*用到的资源2.串口通信(个人理解)3.代码分析(注释比较详细)1.主函数2.串口1配置3.串口2配置以及中断函数4.注意问题5.源码链接1.开发板选择我用的是STM32F103RCT6的板子,不过代码大概在F103系列的板子上都可以运行,我试过在野火103的霸道板上也可以,主要看一下串口对应的引脚一不一样就行了,不一样的就更改一下。*用到的资源keil5软件这里用到了两个串口资源,采集数据一个,串口通信一个,板子对应引脚如下:串口1,TX:PA9,RX:PA10串口2,TX:PA2,RX:PA32.串口通信(个人理解)我就从串口采集传感器数据这个过程说一下我自己的理解,

随机推荐