草庐IT

当任务完成时,Java executors : how to be notified, 没有阻塞?

coder 2023-04-27 原文

假设我有一个队列,里面装满了需要提交给执行器服务的任务。我希望他们一次处理一个。我能想到的最简单的方法是:

  1. 从队列中获取任务
  2. 提交给执行者
  3. 在返回的 Future 上调用 .get 并阻塞直到结果可用
  4. 从队列中获取另一个任务...

但是,我试图完全避免阻塞。如果我有 10,000 个这样的队列,它们需要一次处理一个任务,我将用完堆栈空间,因为它们中的大多数将保留阻塞的线程。

我想要的是提交一个任务并提供一个在任务完成时调用的回调。我将使用该回调通知作为发送下一个任务的标志。 (functionaljava 和 jetlang 显然使用了这样的非阻塞算法,但是我看不懂他们的代码)

如何使用 JDK 的 java.util.concurrent 来做到这一点,而不是编写自己的执行器服务?

(为我提供这些任务的队列本身可能会阻塞,但这是稍后要解决的问题)

最佳答案

定义一个回调接口(interface)来接收你想在完成通知中传递的任何参数。然后在任务结束时调用它。

您甚至可以为 Runnable 任务编写一个通用包装器,并将它们提交给 ExecutorService。或者,请参阅下面的 Java 8 内置机制。

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}

CompletableFuture ,Java 8 包含了一种更精细的方法来组合管道,其中流程可以异步和有条件地完成。这是一个人为但完整的通知示例。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}

关于当任务完成时,Java executors : how to be notified, 没有阻塞?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/826212/

有关当任务完成时,Java executors : how to be notified, 没有阻塞?的更多相关文章

  1. ruby - 其他文件中的 Rake 任务 - 2

    我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时

  2. ruby - 难道Lua没有和Ruby的method_missing相媲美的东西吗? - 2

    我好像记得Lua有类似Ruby的method_missing的东西。还是我记错了? 最佳答案 表的metatable的__index和__newindex可以用于与Ruby的method_missing相同的效果。 关于ruby-难道Lua没有和Ruby的method_missing相媲美的东西吗?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/7732154/

  3. ruby-on-rails - rails 目前在重启后没有安装 - 2

    我有一个奇怪的问题:我在rvm上安装了ruby​​onrails。一切正常,我可以创建项目。但是在我输入“railsnew”时重新启动后,我有“程序'rails'当前未安装。”。SystemUbuntu12.04ruby-v"1.9.3p194"gemlistactionmailer(3.2.5)actionpack(3.2.5)activemodel(3.2.5)activerecord(3.2.5)activeresource(3.2.5)activesupport(3.2.5)arel(3.0.2)builder(3.0.0)bundler(1.1.4)coffee-rails(

  4. ruby - 如何使用 RSpec::Core::RakeTask 创建 RSpec Rake 任务? - 2

    如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake

  5. ruby - 在没有 sass 引擎的情况下使用 sass 颜色函数 - 2

    我想在一个没有Sass引擎的类中使用Sass颜色函数。我已经在项目中使用了sassgem,所以我认为搭载会像以下一样简单:classRectangleincludeSass::Script::FunctionsdefcolorSass::Script::Color.new([0x82,0x39,0x06])enddefrender#hamlengineexecutedwithcontextofself#sothatwithintemlateicouldcall#%stop{offset:'0%',stop:{color:lighten(color)}}endend更新:参见上面的#re

  6. 没有类的 Ruby 方法? - 2

    大家好!我想知道Ruby中未使用语法ClassName.method_name调用的方法是如何工作的。我头脑中的一些是puts、print、gets、chomp。可以在不使用点运算符的情况下调用这些方法。为什么是这样?他们来自哪里?我怎样才能看到这些方法的完整列表? 最佳答案 Kernel中的所有方法都可用于Object类的所有对象或从Object派生的任何类。您可以使用Kernel.instance_methods列出它们。 关于没有类的Ruby方法?,我们在StackOverflow

  7. ruby-on-rails - Rails 3,嵌套资源,没有路由匹配 [PUT] - 2

    我真的为这个而疯狂。我一直在搜索答案并尝试我找到的所有内容,包括相关问题和stackoverflow上的答案,但仍然无法正常工作。我正在使用嵌套资源,但无法使表单正常工作。我总是遇到错误,例如没有路线匹配[PUT]"/galleries/1/photos"表格在这里:/galleries/1/photos/1/edit路线.rbresources:galleriesdoresources:photosendresources:galleriesresources:photos照片Controller.rbdefnew@gallery=Gallery.find(params[:galle

  8. ruby-on-rails - 有没有办法为 CarrierWave/Fog 设置上传进度指示器? - 2

    我在Rails应用程序中使用CarrierWave/Fog将视频上传到AmazonS3。有没有办法判断上传的进度,让我可以显示上传进度如何? 最佳答案 CarrierWave和Fog本身没有这种功能;你需要一个前端uploader来显示进度。当我不得不解决这个问题时,我使用了jQueryfileupload因为我的堆栈中已经有jQuery。甚至还有apostonCarrierWaveintegration因此您只需按照那里的说明操作即可获得适用于您的应用的进度条。 关于ruby-on-r

  9. ruby - 没有类方法获取 Ruby 类名 - 2

    如何在Ruby中获取BasicObject实例的类名?例如,假设我有这个:classMyObjectSystem我怎样才能使这段代码成功?编辑:我发现Object的实例方法class被定义为returnrb_class_real(CLASS_OF(obj));。有什么方法可以从Ruby中使用它? 最佳答案 我花了一些时间研究irb并想出了这个:classBasicObjectdefclassklass=class这将为任何从BasicObject继承的对象提供一个#class您可以调用的方法。编辑评论中要求的进一步解释:假设你有对象

  10. ruby - 没有轨道的 ActiveRecord 时区 - 2

    我在非Rails项目中使用ActiveRecord。在Rails中,我可以这样做:config.time_zone='EasternTime(US&Canada)'config.active_record.default_timezone='EasternTime(US&Canada)'但如果我不使用rails,我该如何设置时区? 最佳答案 ActiveRecord::Base.default_timezone='EasternTime(US&Canada)' 关于ruby-没有轨道的A

随机推荐