草庐IT

死锁的3种死法

博学谷狂野架构师 2023-04-16 原文

1. 什么是死锁

在多线程环境中,多个进程可以竞争有限数量的资源。当一个进程申请资源时,如果这时没有可用资源,那么这个进程进入等待状态。有时,如果所申请的资源被其他等待进程占有,那么该等待进程有可能再也无法改变状态。这种情况称为死锁

在Java中使用多线程,就会有可能导致死锁问题。死锁会让程序一直住,不再往下执行。我们只能通过中止并重启的方式来让程序重新执行。

2. 造成死锁的原因

  • 当前线程拥有其他线程需要的资源
  • 当前线程等待其他线程已拥有的资源
  • 都不放弃自己拥有的资源

3. 死锁的必要条件

3.1 互斥

进程要求对所分配的资源(如打印机)进行排他性控制,即在一段时间内某资源仅为一个进程所占有。此时若有其他进程请求该资源,则请求进程只能等待。

3.2 不可剥夺

进程所获得的资源在未使用完毕之前,不能被其他进程强行夺走,即只能由获得该资源的进程自己来释放(只能是主动释放)。

3.3 请求与保持

进程已经保持了至少一个资源,但又提出了新的资源请求,而该资源已被其他进程占有,此时请求进程被阻塞,但对自己已获得的资源保持不放。

3.4 循环等待

是指进程发生死锁后,必然存在一个进程–资源之间的环形链,通俗讲就是你等我的资源,我等你的资源,大家一直等。

4. 死锁的分类

4.1 静态顺序型死锁

线程之间形成相互等待资源的环时,就会形成顺序死锁lock-ordering deadlock,多个线程试图以不同的顺序来获取相同的锁时,容易形成顺序死锁,如果所有线程以固定的顺序来获取锁,就不会出现顺序死锁问题

经典案例是LeftRightDeadlock,两个方法,分别是leftRigth、rightLeft。如果一个线程调用leftRight,另一个线程调用rightLeft,且两个线程是交替执行的,就会发生死锁。

public class LeftRightDeadLock {

    //左边锁
    private static Object left = new Object();
    //右边锁
    private static Object right = new Object();

    /**
     * 现持有左边的锁,然后获取右边的锁
     */
    public static void leftRigth() {
        synchronized (left) {
            System.out.println("leftRigth: left lock,threadId:" + Thread.currentThread().getId());
            //休眠增加死锁产生的概率
            sleep(100);
            synchronized (right) {
                System.out.println("leftRigth: right lock,threadId:" + Thread.currentThread().getId());
            }
        }
    }

    /**
     * 现持有右边的锁,然后获取左边的锁
     */
    public static void rightLeft() {
        synchronized (right) {
            System.out.println("rightLeft: right lock,threadId:" + Thread.currentThread().getId());
            //休眠增加死锁产生的概率
            sleep(100);
            synchronized (left) {
                System.out.println("rightLeft: left lock,threadId:" + Thread.currentThread().getId());
            }
        }
    }

    /**
     * 休眠
     *
     * @param time
     */
    private static void sleep(long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        //创建一个线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        executorService.execute(() -> leftRigth());
        executorService.execute(() -> rightLeft());
        executorService.shutdown();
    }
}

输出:

leftRigth: left lock,threadId:12
rightLeft: right lock,threadId:13

我们发现,12号线程锁住了左边要向右边获取锁,13号锁住了右边,要向左边获取锁,因为两边都不释放自己的锁,互不相让,就产生了死锁。

4.1.1 解决方案

固定加锁的顺序(针对锁顺序死锁)

只要交换下锁的顺序,让线程来了之后先获取同一把锁,获取不到就等待,等待上一个线程释放锁再获取锁。

public static void leftRigth() {
       synchronized (left) {
         ...
           synchronized (right) {
            ...
           }
       }
   }

   public static void rightLeft() {
       synchronized (left) {
         ...
           synchronized (right) {
            ...
           }
       }
   }

4.2 动态锁顺序型死锁

由于方法入参由外部传递而来,方法内部虽然对两个参数按照固定顺序进行加锁,但是由于外部传递时顺序的不可控,而产生锁顺序造成的死锁,即动态锁顺序死锁。

上例告诉我们,交替的获取锁会导致死锁,且锁是固定的。有时候锁的执行顺序并不那么清晰,参数导致不同的执行顺序。经典案例是银行账户转账,from账户向to账户转账,在转账之前先获取两个账户的锁,然后开始转账,如果这是to账户向from账户转账,角色互换,也会导致锁顺序死锁。

/**
 * 动态顺序型死锁
 * 转账业务
 */
public class TransferMoneyDeadlock {

    public static void transfer(Account from, Account to, int amount) {
        //先锁住转账的账户
        synchronized (from) {
            System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + from.name + "】账户锁成功");
            //休眠增加死锁产生的概率
            sleep(100);
            //在锁住目标账户
            synchronized (to) {
                System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + to.name + "】账户锁成功");
                if (from.balance < amount) {
                    System.out.println("余额不足");
                    return;
                } else {
                    from.debit(amount);
                    to.credit(amount);
                    System.out.println("线程【" + Thread.currentThread().getId() + "】从【" + from.name + "】账户转账到【" + to.name + "】账户【" + amount + "】元钱成功");
                }
            }
        }
    }

    private static class Account {
        String name;
        int balance;

        public Account(String name, int balance) {
            this.name = name;
            this.balance = balance;
        }

        void debit(int amount) {
            this.balance = balance - amount;
        }

        void credit(int amount) {
            this.balance = balance + amount;
        }
    }


    /**
     * 休眠
     *
     * @param time
     */
    private static void sleep(long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        //创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //创建账户A
        Account A = new Account("A", 100);
        //创建账户B
        Account B = new Account("B", 200);
        //A -> B 的转账
        executorService.execute(() -> transfer(A, B, 5));
        //B -> A 的转账
        executorService.execute(() -> transfer(B, A, 10));
        executorService.shutdown();
    }
}

输出:

线程【12】获取【A】账户锁成功
线程【13】获取【B】账户锁成功

然后就没有然后了,产生了死锁,我们发现 因为对象的调用关系,产生了互相锁住资源的问题。

4.2.1 解决方案

根据传入对象的hashCode硬性确定加锁顺序,消除可变性,避免死锁。

package com.test.thread.deadlock;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 动态顺序型死锁解决方案
 */
public class TransferMoneyDeadlock {
    /**
     * 监视器,第三把锁,为了方式HASH冲突
     */
    private static Object lock = new Object();

    /**
     * 我们经过上一次得失败,明白了不能依赖参数名称简单的确定锁的顺序,因为参数是
     * 具有动态性的,所以,我们改变一下思路,直接根据传入对象的hashCode()大小来
     * 对锁定顺序进行排序(这里要明白的是如何排序不是关键,有序才是关键)。
     *
     * @param from
     * @param to
     * @param amount
     */
    public static void transfer(Account from, Account to, int amount) {
        /**
         * 这里需要说明一下为什么不使用HashCode()因为HashCode方法可以被重写,
         * 所以,我们无法简单的使用父类或者当前类提供的简单的hashCode()方法,
         * 所以,我们就使用系统提供的identityHashCode()方法,该方法保证无论
         * 你是否重写了hashCode方法,都会在虚拟机层面上调用一个名为JVM_IHashCode
         * 的方法来根据对象的存储地址来获取该对象的hashCode(),HashCode如果不重写
         * 的话,其实也是通过这个虚拟机层面上的方法,JVM_IHashCode()方法实现的
         * 这个方法是用C++实现的。
         */
        int fromHash = System.identityHashCode(from);
        int toHash = System.identityHashCode(to);
        if (fromHash > toHash) {
            //先锁住转账的账户
            synchronized (from) {
                System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + from.name + "】账户锁成功");
                //休眠增加死锁产生的概率
                sleep(100);
                //在锁住目标账户
                synchronized (to) {
                    System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + to.name + "】账户锁成功");
                    if (from.balance < amount) {
                        System.out.println("余额不足");
                        return;
                    } else {
                        from.debit(amount);
                        to.credit(amount);
                        System.out.println("线程【" + Thread.currentThread().getId() + "】从【" + from.name + "】账户转账到【" + to.name + "】账户【" + amount + "】元钱成功");
                    }
                }
            }
        } else if (fromHash < toHash) {
            //先锁住转账的账户
            synchronized (to) {
                System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + from.name + "】账户锁成功");
                //休眠增加死锁产生的概率
                sleep(100);
                //在锁住目标账户
                synchronized (from) {
                    System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + to.name + "】账户锁成功");
                    if (from.balance < amount) {
                        System.out.println("余额不足");
                        return;
                    } else {
                        from.debit(amount);
                        to.credit(amount);
                        System.out.println("线程【" + Thread.currentThread().getId() + "】从【" + from.name + "】账户转账到【" + to.name + "】账户【" + amount + "】元钱成功");
                    }
                }
            }
        } else {
            //如果传入对象的Hash值相同,那就加让加第三层锁
            synchronized (lock) {
                //先锁住转账的账户
                synchronized (from) {
                    System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + from.name + "】账户锁成功");
                    //休眠增加死锁产生的概率
                    sleep(100);
                    //在锁住目标账户
                    synchronized (to) {
                        System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + to.name + "】账户锁成功");
                        if (from.balance < amount) {
                            System.out.println("余额不足");
                            return;
                        } else {
                            from.debit(amount);
                            to.credit(amount);
                            System.out.println("线程【" + Thread.currentThread().getId() + "】从【" + from.name + "】账户转账到【" + to.name + "】账户【" + amount + "】元钱成功");
                        }
                    }
                }
            }
        }

    }

    private static class Account {
        String name;
        int balance;

        public Account(String name, int balance) {
            this.name = name;
            this.balance = balance;
        }

        void debit(int amount) {
            this.balance = balance - amount;
        }

        void credit(int amount) {
            this.balance = balance + amount;
        }
    }


    /**
     * 休眠
     *
     * @param time
     */
    private static void sleep(long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        //创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //创建账户A
        Account A = new Account("A", 100);
        //创建账户B
        Account B = new Account("B", 200);
        //A -> B 的转账
        executorService.execute(() -> transfer(A, B, 5));
        //B -> A 的转账
        executorService.execute(() -> transfer(B, A, 10));
        executorService.shutdown();
    }
}

输出

线程【12】获取【A】账户锁成功
线程【12】获取【B】账户锁成功
线程【12】从【A】账户转账到【B】账户【5】元钱成功
线程【13】获取【B】账户锁成功
线程【13】获取【A】账户锁成功
线程【13】从【B】账户转账到【A】账户【10】元钱成功

4.3 协作对象间的死锁

在协作对象之间可能存在多个锁获取的情况,但是这些获取多个锁的操作并不像在LeftRightDeadLock或transferMoney中那么明显,这两个锁并不一定必须在同一个方法中被获取。如果在持有锁时调用某个外部方法,那么这就需要警惕死锁问题,因为在这个外部方法中可能会获取其他锁,或者阻塞时间过长,导致其他线程无法及时获取当前被持有的锁。

上述两例中,在同一个方法中获取两个锁。实际上,锁并不一定在同一方法中被获取。经典案例,如出租车调度系统。

/**
 * 协作对象间的死锁
 */
public class CoordinateDeadlock {
    /**
     * Taxi 类
     */
    static class Taxi {
        private String location;
        private String destination;
        private Dispatcher dispatcher;

        public Taxi(Dispatcher dispatcher, String destination) {
            this.dispatcher = dispatcher;
            this.destination = destination;
        }

        public synchronized String getLocation() {
            return this.location;
        }

        /**
         * 该方法先获取Taxi的this对象锁后,然后调用Dispatcher类的方法时,又需要获取
         * Dispatcher类的this方法。
         *
         * @param location
         */
        public synchronized void setLocation(String location) {
            this.location = location;
            System.out.println(Thread.currentThread().getName() + " taxi set location:" + location);
            if (this.location.equals(destination)) {
                dispatcher.notifyAvailable(this);
            }
        }
    }

    /**
     * 调度类
     */
    static class Dispatcher {
        private Set<Taxi> taxis;
        private Set<Taxi> availableTaxis;

        public Dispatcher() {
            taxis = new HashSet<Taxi>();
            availableTaxis = new HashSet<Taxi>();
        }

        public synchronized void notifyAvailable(Taxi taxi) {
            System.out.println(Thread.currentThread().getName() + " notifyAvailable.");
            availableTaxis.add(taxi);
        }

        /**
         * 打印当前位置:有死锁风险
         * 持有当前锁的时候,同时调用Taxi的getLocation这个外部方法;而这个外部方法也是需要加锁的
         * reportLocation的锁的顺序与Taxi的setLocation锁的顺序完全相反
         */
        public synchronized void reportLocation() {
            System.out.println(Thread.currentThread().getName() + " report location.");
            for (Taxi t : taxis) {
                t.getLocation();
            }
        }

        public void addTaxi(Taxi taxi) {
            taxis.add(taxi);
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        final Dispatcher dispatcher = new Dispatcher();
        final Taxi taxi = new Taxi(dispatcher, "软件园");
        dispatcher.addTaxi(taxi);
        //先获取dispatcher锁,然后是taxi的锁
        executorService.execute(() -> dispatcher.reportLocation());
        //先获取taxi锁,然后是dispatcher的锁
        executorService.execute(() -> taxi.setLocation("软件园"));
        executorService.shutdown();
    }
}

4.3.1 解决方案

使用开放调用,开放调用指调用该方法不需要持有锁。

开放调用,是指在调用某个方法时不需要持有锁。开放调用可以避免死锁,这种代码更容易编写。上述调度算法完全可以修改为开发调用,修改同步代码块的范围,使其仅用于保护那些涉及共享状态的操作,避免在同步代码块中执行方法调用。修改Dispatcher的reportLocation方法:

4.3.1.1 setLocation方法
/**
    * 开放调用,不持有锁期间进行外部方法调用
    *
    * @param location
    */
   public void setLocation(String location) {
       synchronized (this) {
           this.location = location;
       }
       System.out.println(Thread.currentThread().getName() + " taxi set location:" + location);
       if (this.location.equals(destination)) {
           dispatcher.notifyAvailable(this);
       }
   }
4.3.1.2 reportLocation 方法
/**
       * 同步块只包含对共享状态的操作代码
       */
      public synchronized void reportLocation() {
          System.out.println(Thread.currentThread().getName() + " report location.");
          Set<Taxi> taxisCopy;
          synchronized (this) {
              taxisCopy = new HashSet<Taxi>(taxis);
          }
          for (Taxi t : taxisCopy) {
              t.getLocation();
          }
      }

本文由传智教育博学谷教研团队发布。

如果本文对您有帮助,欢迎关注点赞;如果您有任何建议也可留言评论私信,您的支持是我坚持创作的动力。

转载请注明出处!

有关死锁的3种死法的更多相关文章

  1. ruby - 使用 SizedQueue 在 ruby​​ 代码中出现死锁 - 2

    我认为我对线程在ruby​​中的工作原理存在根本性的误解,我希望获得一些见解。我想要一个简单的生产者和消费者。首先,生产者线程从文件中提取行并将它们粘贴到SizedQueue中;当那些用完时,在末端贴上一些token,让消费者知道事情已经完成。require'thread'numthreads=2filename='edition-2009-09-11.txt'bq=SizedQueue.new(4)producerthread=Thread.new(bq)do|queue|File.open(filename)do|f|f.eachdo|r|queue现在有几个消费者。为简单起见,让

  2. ruby - 是什么导致我的 Ruby `trap` block 出现这种死锁? - 2

    我正在通读JesseStorimer的优秀著作,WorkingwithUnixProcesses.在有关从已退出的子进程捕获信号的部分中,他提供了一个代码示例。我稍微修改了该代码(见下文)以更清楚地了解正在发生的事情:父级在信号之间恢复自己的执行(我可以通过它的puts看到),wait在一个trap语句中为多个child执行(有时我得到“收到CHLD信号”,然后是多个“childpid退出”)。预期输出通常下面代码的输出类似于:parentisworkinghardReceivedaCHLDsignalchildpid73408exitedparentisworkinghardpare

  3. ruby - 如何跟踪 Ruby 中的死锁 - 2

    我使用BrB为我用Process#forkfork的Ruby1.9中的各种工作进程共享数据源:Thread.abort_on_exception=trueforkdoputs"Initializingdatasourceprocess...(PID:#{Process.pid})"data=DataSource.new(files)BrB::Service.start_service(:object=>data,:verbose=>false,:host=>host,:port=>port)EM.reactor_thread.joinendworkerfork如下:8.timesdo|

  4. ruby - 如何修复 Ruby 中 join() 中的死锁 - 2

    我在Ruby中从事多线程工作。代码片段是:threads_array=Array.new(num_of_threads)1.upto(num_of_threads)do|i|Thread.abort_on_exception=truethreads_array[i-1]=Thread.new{catch(:exit)doprint"s#{i}"user_id=nilloopdouser_id=user_ids.pop()ifuser_id==nilprint"a#{i}"Thread.stop()enddosomething(user_id)endend}end#puts"aftert

  5. ruby - 线程池中的死锁 - 2

    我找不到适合Ruby的ThreadPool实现,所以我写了我的(部分基于这里的代码:http://web.archive.org/web/20081204101031/http://snippets.dzone.com:80/posts/show/3276,但更改为等待/信号和ThreadPool关闭的其他实现。但是在运行一段时间后(有100个线程并处理大约1300个任务),它在第25行死锁-它在那里等待新工作。任何想法,为什么会发生?require'thread'beginrequire'fastthread'rescueLoadError$stderr.puts"Usingther

  6. 【Unity】Unity 欧拉角、四元数、万向节死锁、四元数转轴角 - 2

    文章目录欧拉角(Euler)万向节欧拉角旋转特性欧拉角优点欧拉角缺点方位的表达方式不唯一万向节锁(GimbalLock)四元数(Quaternion)四元数转轴角四元数优点四元数缺点Quaternion类欧拉角(Euler)什么是欧拉角?百科上是这样解释的:用来确定定点转动刚体位置的3个一组独立角参量,由章动角θ、旋进角(即进动角)ψ和自转角φ组成,为欧拉首先提出而得名。很难理解吧?其实我们没有必要把欧拉角想得太复杂。对于开发者来说,欧拉角就是用一个Vector3变量来记录物体沿着x、y、z轴的旋转。注意,虽然这是一个Vector3变量,但它并不是向量,这个变量的x、y、z三个分量是用来描述旋

  7. ruby-on-rails - PG::TRDeadlockDetected:错误:检测到死锁 - 2

    我正在通过bundleexecpumactl-Fconfig/puma.rbphased-restart重新启动8个pumaworker,效果很好。现在我收到越来越多的postgres错误:PG::TRDeadlockDetected:ERROR:deadlockdetected我发现大约有50个空闲的postgres进程在运行:postgres:myappmyapp_production127.0.0.1(59950)idlepostgres:myappmyapp_production127.0.0.1(60141)idle...当我运行bundleexecpumactl-Fconf

  8. go - golang 中的死锁 - 2

    我知道交换第15行和第17行不会出错,但是,我不明白为什么不交换会导致死锁packagemainimport("fmt")funcgreet(cchanstring){fmt.Println("Hello"+fatalerror:所有goroutines都睡着了-死锁! 最佳答案 channelc是无缓冲的。在发送方和接收方都准备就绪之前,无缓冲channel上的通信不会继续。程序死锁是因为当主goroutine执行发送操作时没有接收者准备好。 关于go-golang中的死锁,我们在St

  9. go - 所有goroutine都处于 sleep 状态-死锁(无限循环+选择) - 2

    我有一个应用程序,每隔几秒钟就创建一个从api获取当前价格的例程。然后它将响应发送到监视例程以进行分析。如果监视器发现价格有明显变化,它会发送一个通知。如果每次执行程序之间的延迟较大,则可以正常工作。如果它很小,它不会:“致命错误:所有goroutine都处于休眠-死锁状态!”被触发,程序崩溃。我猜(?)死锁是由以下原因造成的:(1)监视器充斥着新的价格信息(并且未能及时分析这些信息);或(2)主功能被监视器的消息所淹没。在主函数将新价格附加到一个切片上的同时,监视器正在遍历它,这一事实可能也有一些缺点。如何解决这个问题?在阅读其他文章时,我认为“select”语句是一种神奇的治疗方法

  10. go - 不明显的死锁情况 - 2

    您能解释一下为什么会出现这种僵局吗?packagemainimport("sync""fmt""runtime")funcmain(){m:=sync.RWMutex{}gofunc(){m.RLock()runtime.Gosched()m.RLock()m.RUnlock()m.RUnlock()}()runtime.Gosched()m.Lock()m.Unlock()fmt.Println("works")}我不太清楚为什么这种死锁总是经常发生。这会不会是调度器的一个怪癖? 最佳答案 来自RWMutex文档:Ifagorou

随机推荐