草庐IT

ThreadPoolExecutor优先级队列PriorityBlockingQueue

丶含光 2023-09-28 原文

前两天重构代码,调试的时候,发现有个使用到线程池的地方抛出java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to异常
这个代码是线上在跑的一个逻辑,不该出现问题才对,最后还是翻了下源码确定原因

原因:之前向线程池提交任务用的是execute方法,复制的时候错用成了submit方法,改回execute方法即可

既然遇到了,顺便记录下

自定义提交到线程池的任务
@Data
@AllArgsConstructor
class TestRunnable implements Runnable {

    private Integer i;

    @Override
    public void run() {
        try {
            Thread.sleep(1000 * i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(i);
    }
}
  • 线程池配合优先级队列的使用execute
    public static void main(String[] args) {
        // 创建优先级队列,指定队列初始大小 指定队列中的任务比较器
        // 优先级队列是无界的 指定的只是初始大小
        // 可以使用lambda简化
        PriorityBlockingQueue queue = new PriorityBlockingQueue(100, new Comparator<TestRunnable>() {

            @Override
            public int compare(TestRunnable o1, TestRunnable o2) {
                return o1.getI() - o2.getI();
            }
        });
        // 创建线程池 传入优先级队列
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, queue);

        threadPoolExecutor.execute(new TestRunnable(5));

        threadPoolExecutor.execute(new TestRunnable(2));

        threadPoolExecutor.execute(new TestRunnable(1));

        threadPoolExecutor.execute(new TestRunnable(3));
    }

执行结果

5
1
2
3

根据执行结果可以看到任务已经被排过序

源码笔记
ThreadPoolExecutor execute
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 线程池有正在执行的任务,其余任务会在指定的队列上排队
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

这里指定的是PriorityBlockingQueue,可以看下该队列增加任务的方法

    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            // cmp是创建队列时指定的比较器
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

siftUpUsingComparator方法

    private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                       Comparator<? super T> cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            // 可以看到调用了比较器的compare方法,以确定任务的优先级
            // x e是指定的任务
            if (cmp.compare(x, (T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = x;
    }
  • 线程池配合优先级队列的使用submit
    public static void main(String[] args) {
        // 创建优先级队列,指定队列初始大小 指定队列中的任务比较器
        // 优先级队列是无界的 指定的只是初始大小
        // 可以使用lambda简化
        PriorityBlockingQueue queue = new PriorityBlockingQueue(4, new Comparator<TestRunnable>() {

            @Override
            public int compare(TestRunnable o1, TestRunnable o2) {
                return o1.getI() - o2.getI();
            }
        });
        // 创建线程池 传入优先级队列
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, queue);

        threadPoolExecutor.submit(new TestRunnable(5));

        threadPoolExecutor.submit(new TestRunnable(2));

        threadPoolExecutor.submit(new TestRunnable(1));

        threadPoolExecutor.submit(new TestRunnable(3));
    }

执行结果

Exception in thread "main" java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to com.tianwen.jdk.TestRunnable
    at com.tianwen.jdk.DemoApplication$1.compare(DemoApplication.java:17)
    at java.util.concurrent.PriorityBlockingQueue.siftUpUsingComparator(PriorityBlockingQueue.java:375)
    at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:492)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at com.tianwen.jdk.DemoApplication.main(DemoApplication.java:31)
5
2

可以看到,除了一个正在被执行的任务和一个排在队列头部的任务,其余的任务添加时,在调用compare方法都会抛出异常

源码笔记
ThreadPoolExecutor submit
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        // 封装成一个Future
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        // 同样的在执行ThreadPoolExecutor的execute方法
        // 但是传入的已经不是最初的任务,而是一个封装过的Future
        execute(ftask);
        return ftask;
    }

实际是在父类AbstractExecutorService中,可以看到submit方法内先将提交给线程池的任务封装成一个Future,再同样执行ThreadPoolExecutorexecute方法。但其实这里的任务已经不是最初指定的任务,而是一个Future,所以在最后尝试将任务放进优先级队列时,调用比较器的compare方法时自然会抛出
java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to

有关ThreadPoolExecutor优先级队列PriorityBlockingQueue的更多相关文章

  1. ruby - 分布式事务和队列,ruby,erlang,scala - 2

    我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和

  2. ruby-on-rails - Ruby 长时间运行的进程对队列事件使用react - 2

    我有一个将某些事件写入队列的Rails3应用。现在我想在服务器上创建一个服务,每x秒轮询一次队列,并按计划执行其他任务。除了创建ruby​​脚本并通过cron作业运行它之外,还有其他稳定的替代方案吗? 最佳答案 尽管启动基于Rails的持久任务是一种选择,但您可能希望查看更有序的系统,例如delayed_job或Starling管理您的工作量。我建议不要在cron中运行某些东西,因为启动整个Rails堆栈的开销可能很大。每隔几秒运行一次它是不切实际的,因为Rails上的启动时间通常为5-15秒,具体取决于您的硬件。不过,每天这样做几

  3. ruby - 了解 Ruby 中赋值和逻辑运算符的优先级 - 2

    在以下示例中,我无法理解Ruby运算符的优先级:x=1&&y=2由于&&的优先级高于=,我的理解是类似于+和*运算符:1+2*3+4解析为1+(2*3)+4它应该等于:x=(1&&y)=2但是,所有Ruby源代码(包括内部语法解析器Ripper)都将其解析为x=(1&&(y=2))为什么?编辑[08.01.2016]让我们关注一个子表达式:1&&y=2根据优先规则,我们应该尝试将其解析为:(1&&y)=2这没有意义,因为=需要特定的LHS(变量、常量、[]数组项等)。但是既然(1&&y)是一个正确的表达式,那么解析器应该如何处理呢?我试过咨询Ruby的parse.y,但它太像意大利面条

  4. ruby - 在不提供其所有属性的情况下获取队列 - 2

    我正在尝试为现有队列编写消费者。RabbbitMQ在一个单独的实例中运行,名为“org-queue”的队列已经创建并绑定(bind)到一个交换器。org-queue是一个持久队列,它还有一些额外的属性。现在我需要从这个队列接收消息。我使用下面的代码来获取队列的实例conn=Bunny.newconn.startch=conn.create_channelq=ch.queue("org-queue")它抛出一个错误,指出不同的耐用属性。默认情况下,Bunny似乎使用durable=false。所以我添加了durabletrue作为参数。现在它说明了其他参数之间的区别。我是否需要指定所有参

  5. ruby - 如何在特定队列中推送作业并使用 sidekiq 限制工作人员数量? - 2

    我知道我们可以做到:sidekiq_optionsqueue:"Foo"但在这种情况下,Worker只分配给一个队列:“Foo”。我需要在特定队列中分配作业(而不是worker)。使用Resque很容易:Resque.enqueue_to(queue_name,my_job)另外,为了并发问题,我需要限制每个队列的Worker数量为1。我该怎么做? 最佳答案 您可能会使用https://github.com/brainopia/sidekiq-limit_fetch然后:Sidekiq::Client.push({'class'=>

  6. Python:每日一题之小张的衣服(优先队列、哈夫曼编码) - 2

    题目描述小张买了 n 件白色的衣服,他觉得所有衣服都是一种颜色太单调,希望对这些衣服进行染色,每次染色时,他会将某种颜色的所有衣服寄去染色厂,第 i 件衣服的邮费为 ai​ 元,染色厂会按照小张的要求将其中一部分衣服染成同一种任意的颜色,之后将衣服寄给小张,请问小张要将 n 件衣服染成不同颜色的最小代价是多少?输入描述第一行为一个整数 n ,表示衣服的数量。第二行包括 n 个整数a1​,a2​...an​ 表示第 i 件衣服的邮费为 ai​ 元。(1≤n≤10^5,1≤ai​≤10^9 )输出描述输出一个整数表示小张所要花费的最小代价。输入输出样例输入551321输出25 思考🤔:题意:意思是

  7. ruby - ruby 中方法参数的优先级和/或相对于方法参数的优先级 - 2

    这里有两个测试:if[1,2,3,4].include?2&&nil.nil?puts:helloend#=>和if[1,2,3,4].include?(2)&&nil.nil?puts:helloend#=>hello上面告诉我&&比方法参数有更高的优先级,所以它逻辑上和2&&nil.nil?是真的,并将它作为参数传递给include?但是,有这个测试:if[1,2,3,4].include?2andnil.nil?puts:helloend#=>hello所以这告诉我方法参数和“and”具有相同的优先级(或者方法参数高于“and”)因为它传递了2以包含?在处理“和”之前。注意:我知

  8. ruby - Resque:每个队列一个 worker - 2

    我目前有一个Rails3.0项目,使用Ruby1.9.2和Resque。我的应用程序有多个工作类和多个队列,它们是动态创建的(在运行时)。此外,有多个worker已启动,可以自由地在任何队列上工作,因为在启动时没有任何现有队列,并且无法预测它们:$COUNT=3QUEUE=*rakeresque:workers根据project的id创建队列:@queue="project_#{project.id}".to_sym对于给定的队列,他们的作业必须按顺序处理,一次处理一个。我的问题是,通过拥有多个工作人员,可以并行处理多个作业。有没有办法设置每个队列的最大worker数(为1)?有没有办

  9. ruby - Amazon SQS 优先级队列 - 2

    是否可以使用Amazon简单排队服务创建优先级队列?最初我找不到关于这个主题的任何内容,这就是我创建两个队列的原因。一个普通队列和一个优先队列。我正在根据我定义的规则将消息排入此队列,但在出列消息时会出现困惑。如何对队列进行长时间轮询,使我的队列组合表现得像一个优先级队列? 最佳答案 我认为您通过创建两个队列走在正确的轨道上-一个普通队列和一个优先级队列。在这种情况下,您不一定需要长时间轮询。由于优先队列中的消息优先于普通队列中的消息,您可以采用如下方法:轮询优先级队列,直到没有更多消息为止。轮询普通队列并在普通队列中的每条消息后重

  10. ruby - AMQP 动态创建订阅队列 - 2

    我正在尝试使用AMQP、Websockets和Ruby构建一个简单的聊天应用程序。我知道这可能不是理解AMQP的最佳用例,但我想了解我哪里出错了。以下是我的amqp-server代码require'rubygems'require'amqp'require'mongo'require'em-websocket'require'json'classMessageParser#messageformat=>"room:harry_potter,nickname:siddharth,room:members"defself.parse(message)parsed_message=JSON.

随机推荐