草庐IT

Guava异步回调

M_lear 2023-03-28 原文

写的比较糙,大家可能会看的比较懵。其实本文就是把debug出来的逻辑给记录下来了而已。

正文

从ListeningExecutorService的submit开始分析。

在AbstractListeningExecutorService中重写了newTaskFor方法。
newTaskFor返回的是TrustedListenableFutureTask对象。

AbstractExecutorService的submit逻辑:

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

所以submit执行的是TrustedListenableFutureTask的run逻辑。

TrustedListenableFutureTask的run逻辑执行的是TrustedFutureInterruptibleTask的run逻辑。

TrustedFutureInterruptibleTask的run逻辑是继承InterruptibleTask的。

InterruptibleTask的run逻辑:

    public final void run() {
        Thread currentThread = Thread.currentThread();
        if (this.compareAndSet((Object)null, currentThread)) {
            boolean run = !this.isDone();
            T result = null;
            Throwable error = null;

            try {
                if (run) {
                    result = this.runInterruptibly();
                }
            } catch (Throwable var9) {
                error = var9;
            } finally {
                if (!this.compareAndSet(currentThread, DONE)) {
                    this.waitForInterrupt(currentThread);
                }

                if (run) {
                    if (error == null) {
                        this.afterRanInterruptiblySuccess(NullnessCasts.uncheckedCastNullableTToT(result));
                    } else {
                        this.afterRanInterruptiblyFailure(error);
                    }
                }

            }

        }
    }

模板模式。
其中调用的runInterruptibly、afterRanInterruptiblySuccess、afterRanInterruptiblyFailure都是抽象方法,在子类实现。

    @ParametricNullness
    abstract T runInterruptibly() throws Exception;

    abstract void afterRanInterruptiblySuccess(@ParametricNullness T var1);

    abstract void afterRanInterruptiblyFailure(Throwable var1);

子类TrustedFutureInterruptibleTask的这三个方法:

        @ParametricNullness
        V runInterruptibly() throws Exception {
            return this.callable.call();
        }

        void afterRanInterruptiblySuccess(@ParametricNullness V result) {
            TrustedListenableFutureTask.this.set(result);
        }

        void afterRanInterruptiblyFailure(Throwable error) {
            TrustedListenableFutureTask.this.setException(error);
        }

runInterruptibly执行的就是Callable的call方法。

如果把子类实现串进去,整体上InterruptibleTask的run逻辑其实类似于JDK FutureTask的run逻辑。

外部类TrustedListenableFutureTask的set和setException方法(继承自TrustedFuture),都会调用complete方法。

complete方法会调用executeListener执行所有的回调逻辑。

回调逻辑封装在CallbackListener的run:

        public void run() {
            if (this.future instanceof InternalFutureFailureAccess) {
                Throwable failure = InternalFutures.tryInternalFastPathGetFailure((InternalFutureFailureAccess)this.future);
                if (failure != null) {
                    this.callback.onFailure(failure);
                    return;
                }
            }

            Object value;
            try {
                value = Futures.getDone(this.future);
            } catch (ExecutionException var3) {
                this.callback.onFailure(var3.getCause());
                return;
            } catch (Error | RuntimeException var4) {
                this.callback.onFailure(var4);
                return;
            }

            this.callback.onSuccess(value);
        }

Futures.getDone

    @ParametricNullness
    @CanIgnoreReturnValue
    public static <V> V getDone(Future<V> future) throws ExecutionException {
        Preconditions.checkState(future.isDone(), "Future was expected to be done: %s", future);
        return Uninterruptibles.getUninterruptibly(future);
    }

Uninterruptibles.getUninterruptibly

    @ParametricNullness
    @CanIgnoreReturnValue
    public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {
        boolean interrupted = false;

        try {
            while(true) {
                try {
                    Object var2 = future.get();
                    return var2;
                } catch (InterruptedException var6) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }

        }
    }

这里的get不会阻塞,因为Callable任务已经执行完了,这里只是单纯获取执行结果。

回到上面的run逻辑,后面就是根据future的get结果调用对应的回调逻辑。

有关Guava异步回调的更多相关文章

  1. ruby-on-rails - 如何在 ruby​​ 中使用两个参数异步运行 exe? - 2

    exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby​​中使用两个参数异步运行exe吗?我已经尝试过ruby​​命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何ruby​​gems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除

  2. ruby - 如何在 Rails 4 中使用表单对象之前的验证回调? - 2

    我有一个服务模型/表及其注册表。在表单中,我几乎拥有服务的所有字段,但我想在验证服务对象之前自动设置其中一些值。示例:--服务Controller#创建Action:defcreate@service=Service.new@service_form=ServiceFormObject.new(@service)@service_form.validate(params[:service_form_object])and@service_form.saverespond_with(@service_form,location:admin_services_path)end在验证@ser

  3. ruby - 有人可以帮助解释类创建的 post_initialize 回调吗 (Sandi Metz) - 2

    我正在阅读SandiMetz的POODR,并且遇到了一个我不太了解的编码原则。这是代码:classBicycleattr_reader:size,:chain,:tire_sizedefinitialize(args={})@size=args[:size]||1@chain=args[:chain]||2@tire_size=args[:tire_size]||3post_initialize(args)endendclassMountainBike此代码将为其各自的属性输出1,2,3,4,5。我不明白的是查找方法。当一辆山地自行车被实例化时,因为它没有自己的initialize方法

  4. ruby-on-rails - 在 Ruby on Rails 中发送响应之前如何等待多个异步操作完成? - 2

    在我做的一些网络开发中,我有多个操作开始,比如对外部API的GET请求,我希望它们同时开始,因为一个不依赖另一个的结果。我希望事情能够在后台运行。我找到了concurrent-rubylibrary这似乎运作良好。通过将其混合到您创建的类中,该类的方法具有在后台线程上运行的异步版本。这导致我编写如下代码,其中FirstAsyncWorker和SecondAsyncWorker是我编写的类,我在其中混合了Concurrent::Async模块,并编写了一个名为“work”的方法来发送HTTP请求:defindexop1_result=FirstAsyncWorker.new.async.

  5. ruby-on-rails - 将保存回调添加到单个 ActiveRecord 实例,可以吗? - 2

    是否可以为单个ActiveRecord实例添加回调?作为进一步的限制,这是继续使用库,所以我无法控制该类(除了对其进行猴子修补)。这或多或少是我想做的:defdo_something_creazymessage=Message.newmessage.on_save_call:do_even_more_crazy_stuffenddefdo_even_more_crazy_stuff(message)puts"Message#{message}hasbeensaved!Hallelujah!"end 最佳答案 你可以通过在创建对象后立

  6. ruby-on-rails - Ruby method_added 回调不触发包括模块 - 2

    我想写一点“Deprecate-It”库并经常使用“method_added”回调。但是现在我注意到在包含模块时不会触发此回调。是否有任何回调或变通方法,以便在某些内容包含到自身时通知类“Foobar”?用于演示的小Demo:#IncludingModulswon'ttriggermethod_addedcallbackmoduleInvisibleMethoddefinvisible"Youwon'tgetacallbackfromme"endendclassFoobardefself.method_added(m)puts"InstanceMethod:'#{m}'addedto'

  7. ruby-on-rails - 使用 before_save 回调或自定义验证器添加验证错误? - 2

    我有一个模型Listingbelongs_to:user。或者,Userhas_many:listings。每个列表都有一个对其进行分类的类别字段(狗、猫等)。User还有一个名为is_premium的bool字段。这是我验证类别的方式...validates_format_of:category,:with=>/(dogs|cats|birds|tigers|lions|rhinos)/,:message=>'isincorrect'假设我只想让高级用户能够添加老虎、狮子和犀牛。我该怎么做?最好在before_save方法中执行此操作吗?before_save:premium_che

  8. ruby-on-rails - 如何在多个环境中处理 OmniAuth 回调? - 2

    我有一个应用程序专门使用Facebook作为身份验证提供程序,并正确设置了生产模式的回调。为了让它工作,您需要为您的Facebook应用程序提供一个站点URL和一个用于回调的站点域,在我的例子中是http://appname.heroku.com和appname。heroku.com分别。问题是我的Controller设置为只允许经过身份验证的session,所以我无法在开发模式下查看我的应用程序,因为Facebook应用程序的域显然没有设置为本地主机。如何在不更改Facebook设置的情况下解决这个问题? 最佳答案 创建另一个域l

  9. ruby-on-rails - Rails 模型中的条件回调? - 2

    只是想知道是否有一种方法可以在Rails中进行条件回调。我知道您可以像这样进行条件验证:validates_uniqueness_of:email,:if=>(1==1)我经常在回调中做这样的事情:classLineItem稍微清理一下就好了。 最佳答案 classLineItem 关于ruby-on-rails-Rails模型中的条件回调?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/quest

  10. ruby - 使用什么异步 Ruby 服务器? - 2

    我们开始使用Ruby开发新游戏项目。我们决定使用其中一种异步Ruby服务器,但我们无法决定选择哪一种。选项是:歌利亚抽筋+消瘦/彩虹rack-fiber_pool+rack+thin/rainbowseventmachine_httpserver它们似乎都在处理HTTP请求。Cramp还支持开箱即用的Websocket和服务器端事件。您知道这些服务器的优缺点吗? 最佳答案 我使用eventmachine_httpserver公开了一个RESTfulAPIinanEventMachine-basedIRCbot绝对不会推荐它用于任何严

随机推荐