03-并发死锁问题

nobility 发布于 2022-03-23 05-Java并发编程 300 次阅读


并发死锁问题

当多个线程或进程互相持有对方所需资源,却又不主动释放,导致所有进程或线程无法继续前进,导致程序陷入无限阻塞,就是死锁

死锁代码

最简单死锁情况

public static void main(String[] args) throws InterruptedException {
  Object lock1 = new Object();
  Object lock2 = new Object();
  new Thread(() -> {
    synchronized (lock1) {
      System.out.println(Thread.currentThread().getName() + "获得了lock1");
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      synchronized (lock2) {
        System.out.println(Thread.currentThread().getName() + "获得了lock2");
      }
    }
  }).start();
  new Thread(() -> {
    synchronized (lock2) {
      System.out.println(Thread.currentThread().getName() + "获得了lock1");
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      synchronized (lock1) {
        System.out.println(Thread.currentThread().getName() + "获得了lock2");
      }
    }
  }).start();
}

银行转账问题

import java.util.Random;

public class Main {

  public static void main(String[] args) throws InterruptedException {
    //mustDeadLock(); //必然死锁
    randomDeadLock(100, 20);  //随机死锁
  }

  public static Account[] initAccount(int count) {  //批量账户初始化,每个账户余额在1000到2000之间
    Account[] accounts = new Account[count];
    Random random = new Random(); //用于生成随机数
    for (int i = 0; i < accounts.length; i++) {
      accounts[i] = new Account("account" + i, 1000 + random.nextInt(1000));
    }
    return accounts;
  }

  public static void randomTransferMoney(Account[] accounts) {  //随机转账
    Random random = new Random(); //用于生成随机数
    for (int i = 0; i < accounts.length; i++) {
      int fromAccountIndex = random.nextInt(accounts.length);
      int toAccountIndex = random.nextInt(accounts.length);
      int amount = 100 + random.nextInt(500);  //转账钱数是100到500之间
      int time = random.nextInt(1000);  //转账延时时间
      new TransferMoney(accounts[fromAccountIndex], accounts[toAccountIndex], time).transferMoney(amount);
    }
  }

  public static void mustDeadLock() {  //必然死锁的情况
    Account accountA = new Account("A", 500);
    Account accountB = new Account("B", 500);
    new Thread(() ->
        new TransferMoney(accountA, accountB, 100).transferMoney(200)
    ).start();
    new Thread(() ->
        new TransferMoney(accountB, accountA, 100).transferMoney(200)
    ).start();
  }

  public static void randomDeadLock(int accountCount, int threadCount) {  //随机死锁的情况
    Account[] accounts = initAccount(accountCount);  //批量初始化账户开启不同线程进行转账,账户量容易出现死锁
    for (int i = 0; i < threadCount; i++) {
      new Thread(() -> {
        randomTransferMoney(accounts);
      }).start();
    }
  }
}

class Account {  //账户类
  private String name;  //账户名
  private int balance;  //账户余额

  public Account(String name, int balance) {
    this.name = name;
    this.balance = balance;
  }

  public void setBalance(int balance) {
    this.balance = balance;
  }

  public int getBalance() {
    return balance;
  }

  public String getName() {
    return name;
  }

  @Override
  public String toString() {
    return "账户" + name + "的余额为" + balance;
  }
}

class TransferMoney {
  private Account from;  //转出账户
  private Account to;  //转入账户
  private int time;  //转账延时时间,单位毫秒,默认0

  public TransferMoney(Account from, Account to, int time) {
    this.from = from;
    this.to = to;
    this.time = time;
  }

  public TransferMoney(Account from, Account to) {
    this(from, to, 0);
  }

  public void transferMoney(int amount) {
    synchronized (from) {
      int balance = from.getBalance();  //查看余额是否充足
      if (balance - amount < 0) {
        System.out.println("余额不足,转账失败");
        return;
      }
      if (time > 0) {  //设置时间就延时,若设置转账延时就会进入死锁状态
        try {
          Thread.sleep(time);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      synchronized (to) {
        from.setBalance(balance - amount);  //转账操作
        to.setBalance(to.getBalance() + amount);
        System.out.println("从" + from.getName() + "到" + to.getName() + ",成功转账" + amount + "元");
      }
    }
  }
}

哲学家就餐问题

问题描述:有五位哲学家围坐在一张圆形餐桌旁,哲学家只能做吃饭或思考这两件事,餐桌中间有饭,每位哲学家之间各有一只筷子,哲学家必须用两只筷子吃东西,并且只能使用自己左右手边的那两只筷子

在不考虑意大利面有多少,也不考虑哲学家的胃有多大,设计一套规则,使得在哲学家们在完全不交谈,也就是无法知道其他人可能在什么时候要吃饭或者思考的情况下,可以在这吃饭思考两种状态下永远交替下去

产生死锁的情况:每个哲学家都拿起左手的筷子,永远等待右边的筷子,反之依然

class Philosopher implements Runnable {  //哲学家类
  private Object leftChopstick;
  private Object rightChopstick;

  public Philosopher(Object leftChopstick, Object rightChopstick) {
    this.leftChopstick = leftChopstick;
    this.rightChopstick = rightChopstick;
  }

  @Override
  public void run() {  //哲学家的动作,吃饭或思考
    try {
      while (true) {
        doAction("思考中");
        synchronized (leftChopstick) {
          doAction("拿起左边的筷子");
          synchronized (rightChopstick) {
            doAction("拿起右边的筷子 -> 开始进食");
            doAction("放下右边的筷子");
          }
          doAction("放行左边的筷子");
        }
      }
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  private void doAction(String action) throws InterruptedException {  //哲学家所作的动作
    System.out.println(Thread.currentThread().getName() + " -> " + action);
    Thread.sleep((long) (Math.random() * 10));  //该动作持续随机时间
  }
}

public class DiningPhilosophers {

  public static Object[] initChopsticks(int count) {
    Object[] chopsticks = new Object[count];
    for (int i = 0; i < chopsticks.length; i++) {  //初始化筷子
      chopsticks[i] = new Object();
    }
    return chopsticks;
  }

  public static Philosopher[] initPhilosophers(Object[] chopsticks) {
    Philosopher[] philosophers = new Philosopher[chopsticks.length];
    for (int i = 0; i < philosophers.length; i++) {  //初始化哲学家
      Object leftChopstick = chopsticks[i];
      Object rightChopstick = chopsticks[(i + 1) % chopsticks.length];  //使用取余方式取到哲学家两测的筷子
      philosophers[i] = new Philosopher(leftChopstick, rightChopstick);
    }
    return philosophers;
  }

  public static void main(String[] args) {
    Object[] chopsticks = initChopsticks(5);
    Philosopher[] philosophers = initPhilosophers(chopsticks);
    for (int i = 0; i < philosophers.length; i++) {
      new Thread(philosophers[i], "哲学家" + (i)).start();  //启动哲学家线程
    }
  }
}

死锁的必要条件

以下四个条件是死锁的必要条件,只要系统发生死锁,这些条件必然成立,而只要一个条件不满足,就不会发生死锁

  1. 互斥条件:一个资源只能同时被一个进程或线程使用
  2. 请求与保持条件:一个进程或线程,因请求资源而阻塞时,对已获得的资源保持不放
  3. 不剥夺条件:进程或线程,已获得的资源,在末使用完之前,不能被外界强行剥夺
  4. 循环等待条件:若干进程或线程之间,形成一种头尾相接的循环等待资源关系

定位死锁

jstack

使用JDK自带命令jstack 进程号对某个Java进程的线程栈进行分析(使用jps命令可查看Java程序的pid),若发现死锁会打印以下信息:对于不明显的死锁可能无法检测出来

Found one Java-level deadlock:
=============================
"Thread-0":
  waiting to lock monitor 0x0000023e7f4e8680 (object 0x0000000089cea620, a java.lang.Object),
  which is held by "Thread-1"
"Thread-1":
  waiting to lock monitor 0x0000023e7ea06f00 (object 0x0000000089cea610, a java.lang.Object),
  which is held by "Thread-0"

Java stack information for the threads listed above:
===================================================
"Thread-0":
        at Main.lambda$main$0(Main.java:17)
        - waiting to lock <0x0000000089cea620> (a java.lang.Object)
        - locked <0x0000000089cea610> (a java.lang.Object)
        at Main$$Lambda$14/0x0000000100066840.run(Unknown Source)
        at java.lang.Thread.run(java.base@11.0.8/Thread.java:834)
"Thread-1":
        at Main.lambda$main$1(Main.java:30)
        - waiting to lock <0x0000000089cea610> (a java.lang.Object)
        - locked <0x0000000089cea620> (a java.lang.Object)
        at Main$$Lambda$15/0x0000000100066c40.run(Unknown Source)
        at java.lang.Thread.run(java.base@11.0.8/Thread.java:834)

Found 1 deadlock.

ThreadMXBean

ThreadMXBean是Java中提供的一个检测死锁情况的工具类

public static void threadMXBean(int timeInterval) {  //每timeInterval时间进行一次死锁检测
  new Thread(() -> {  //新开启一个死锁检测的线程
    ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();  //获取ThreadMXBean实例
    long[] deadlockedThreads = null;
    while (true) {
      try {
        Thread.sleep(timeInterval);  //每timeInterval时间进行一次死锁检测
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      deadlockedThreads = threadMXBean.findDeadlockedThreads();  //查看当前JVM中是否有死锁发生
      if (deadlockedThreads != null && deadlockedThreads.length > 0) {  //返回不为null且数组有内容(线程ID)则发生死锁
        for (int i = 0; i < deadlockedThreads.length; i++) {
          long deadlockedThread = deadlockedThreads[i]; //获取线程id
          ThreadInfo threadInfo = threadMXBean.getThreadInfo(deadlockedThread);  //使用线程ID获取线程西悉尼
          System.out.println("发现死锁线程" + threadInfo.getThreadName() + ",正在进行处理...");  //进行处理
        }
        System.exit(0);  //处理结束后退出程序
      }
    }
  }).start();
}

修复死锁策略

避免策略

银行转账

我们并不在乎获取锁的顺序,所以要避免相反的获取锁的顺序即可避免死锁,将transferMoney()方法做以下修改

public void transferMoney(int amount) {
  int fromHash = from.hashCode();
  int toHash = to.hashCode();
  if (fromHash > toHash) {  //保证互相转账时获取锁的顺序相同
    firstFromSecondTo(amount);
  } else if (fromHash < toHash) {
    firstToSecondFrom(amount);
  } else {
    synchronized (TransferMoney.class) {  //发生hash碰撞时,再次竞争一个全局锁,谁先抢到谁先执行
      firstFromSecondTo(amount);
    }
  }
}

private void firstFromSecondTo(int amount) {  //先from后to
  synchronized (from) {
    int balance = from.getBalance();  //查看余额是否充足
    if (balance - amount < 0) {
      System.out.println("余额不足,转账失败");
      return;
    }
    if (time > 0) {  //设置时间就延时,若设置转账延时就会进入死锁状态
      try {
        Thread.sleep(time);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    synchronized (to) {
      from.setBalance(balance - amount);  //转账操作
      to.setBalance(to.getBalance() + amount);
      System.out.println("从" + from.getName() + "到" + to.getName() + ",成功转账" + amount + "元");
    }
  }
}

private void firstToSecondFrom(int amount) {  //先to后from
  synchronized (to) {
    int balance = from.getBalance();  //查看余额是否充足
    if (balance - amount < 0) {
      System.out.println("余额不足,转账失败");
      return;
    }
    if (time > 0) {  //设置时间就延时,若设置转账延时就会进入死锁状态
      try {
        Thread.sleep(time);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    synchronized (from) {
      from.setBalance(balance - amount);  //转账操作
      to.setBalance(to.getBalance() + amount);
      System.out.println("从" + from.getName() + "到" + to.getName() + ",成功转账" + amount + "元");
    }
  }
}

哲学家就餐

  • 服务员检查:当哲学家在进行拿筷子前就会检查,当拿起筷子就会陷入死锁状态,就不会拿起筷子,从而避免死锁
  • 餐票检查:设置小于筷子数量的餐票,在哲学家进餐之前必须要拿到餐票才能进食,未拿到餐票的哲学家只能等待
  • 环路破坏:我们并不在乎获取筷子(锁)的顺序,所以要改变一个哲学家获取筷子(锁)的顺序,将initPhilosophers()方法做以下修改
public static Philosopher[] initPhilosophers(Object[] chopsticks) {
  Philosopher[] philosophers = new Philosopher[chopsticks.length];
  for (int i = 0; i < philosophers.length; i++) {  //初始化哲学家
    Object leftChopstick = chopsticks[i];
    Object rightChopstick = chopsticks[(i + 1) % chopsticks.length];  //使用取余方式取到哲学家两测的筷子

    /*下面是修改代码*/
    if (i == philosophers.length - 1) {  //若是最后一个哲学家就将其拿筷子顺序颠倒
      philosophers[i] = new Philosopher(rightChopstick, leftChopstick);
    } else {
      philosophers[i] = new Philosopher(leftChopstick, rightChopstick);
    }
    /*上面是修改代码*/
  }
  return philosophers;
}

检测与恢复策略

一段时间检测是否有死锁发生,如果使用以下两种恢复策略

  • 进程或线程终止:逐个将线程终止,直到解除死锁,终止顺序最好采用以下几种规则
    • 重要程度:按照线程对于程序的重要性的优先级顺序
    • 已占用资源:占用资源越少的线程优先终止
    • 已运行时间:运行时间越短的线程优先终止
  • 资源抢占:将已发放出去的资源在回收回来,让线程回退几步,来解除死锁,成本较低
    • 缺点:可能同一个线程一直被抢占,造成该线程一直无法执行,导致饥饿现象

鸵鸟策略

若发生死锁的概率极低直接忽略,直到死锁发生时,再进行人工修复

实际开发中避免死锁

  • 尽量使用同步代码块,可自己指定锁对象
  • 尽量降低锁的粒度,使用不同的锁
  • 尽量专锁专用,避免不同功能使用同一把锁
  • 避免锁的嵌套,一旦获取锁的顺序相反,就会造成死锁
  • 放出资源前先查看是否能收回来,比如银行家算法
  • 为每个线程起一个有意义的名字,方便debug进行死锁排查
  • 使用并发类而不是自己设计锁:ConcurrentHashMapConcurrentLinkedQueuejava.util.concurrent.atomic.*
    • 多用并发集合,效率高,比如ConcurrentHashMap;少用同步集合,效率低,比如Hashtable
    • 原子类不仅简单方便,且效率比Lock更高
  • 设置超时时间:使用LocktryLock()方法,若拿不到锁就会等待指定时间,超时后就会返回false
public static void main(String[] args) throws InterruptedException {
  Lock lock1 = new ReentrantLock();
  Lock lock2 = new ReentrantLock();
  new Thread(() -> {
    try {
      while (true) {
        if (lock1.tryLock(1, TimeUnit.SECONDS)) {
          System.out.println(Thread.currentThread().getName() + "获得了lock1");
          Thread.sleep((long) (Math.random() * 10));  //随机延时
          if (lock2.tryLock(1, TimeUnit.SECONDS)) {
            System.out.println(Thread.currentThread().getName() + "获得了lock2");
            System.out.println(Thread.currentThread().getName() + "同时获取的了两把锁");
            lock1.unlock();  //完成操作后释放同时获取到的两个锁
            lock2.unlock();
            break;
          } else {
            System.out.println(Thread.currentThread().getName() + "获取lock2失败");
            System.out.println(Thread.currentThread().getName() + "释放lock1,并重新获取");
            lock1.unlock();  //无法同时获取两把锁,就释放lock1
          }
        } else {
          System.out.println(Thread.currentThread().getName() + "获取lock1失败");
        }
      }
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }).start();
  new Thread(() -> {
    try {
      while (true) {
        if (lock2.tryLock(2, TimeUnit.SECONDS)) {
          System.out.println(Thread.currentThread().getName() + "获得了lock2");
          Thread.sleep((long) (Math.random() * 10));  //随机延时
          if (lock1.tryLock(2, TimeUnit.SECONDS)) {
            System.out.println(Thread.currentThread().getName() + "获得了lock1");
            System.out.println(Thread.currentThread().getName() + "同时获取的了两把锁");
            lock1.unlock();  //完成操作后释放同时获取到的两个锁
            lock2.unlock();
            break;
          } else {
            System.out.println(Thread.currentThread().getName() + "获取lock1失败");
            System.out.println(Thread.currentThread().getName() + "释放lock2,并重新获取");
            lock2.unlock();  //无法同时获取两把锁,就释放lock2
          }
        } else {
          System.out.println(Thread.currentThread().getName() + "获取lock1失败");
        }
      }
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }).start();
}

其他活跃性问题

活锁

当多个线程或进程互相谦让资源,同时主动释放让对方先执行,导致所有进程或线程依然在运行,但程序却得不到进展,因为线程总是重复做释放资源的事情浪费着CPU资源,就是活锁

public class LiveLock {
  public static void main(String[] args) {
    SingleLogBridge singleLogBridge = new SingleLogBridge();
    Pedestrian zhangsan = new Pedestrian("张三");
    zhangsan.setSingleLogBridge(singleLogBridge);
    Pedestrian lisi = new Pedestrian("李四");
    lisi.setSingleLogBridge(singleLogBridge);
    new Thread(() -> {
      lisi.willGo(zhangsan);
    }).start();
    new Thread(() -> {
      zhangsan.willGo(lisi);
    }).start();
  }
}

class SingleLogBridge {
  private Pedestrian pedestrian;  //桥上的人

  public void setPedestrian(Pedestrian pedestrian) {
    this.pedestrian = pedestrian;
  }

  public Pedestrian getPedestrian() {
    return pedestrian;
  }

  public void goBridge() {  //桥上的人过桥
    System.out.println(pedestrian.getName() + "过桥了");
    pedestrian.setGoBridge(true);  //将这个行人状态设置为已过桥
    this.pedestrian = null;  //过完桥,桥上的人清空
  }
}

class Pedestrian {
  private String name;  //行人名字
  private boolean isGoBridge;  //是否过已经过桥
  private SingleLogBridge singleLogBridge;  //桥

  public Pedestrian(String name) {
    this.name = name;
  }

  public String getName() {
    return name;
  }

  public void setGoBridge(boolean goBridge) {
    isGoBridge = goBridge;
  }

  public boolean isGoBridge() {
    return isGoBridge;
  }

  public void setSingleLogBridge(SingleLogBridge singleLogBridge) {
    this.singleLogBridge = singleLogBridge;
  }

  public void willGo(Pedestrian other) {
    while (!this.isGoBridge()) {  //没过桥就要过桥
      if (singleLogBridge.getPedestrian() == null) {  //桥上没人就要上桥
        singleLogBridge.setPedestrian(this);
      }
      if (singleLogBridge.getPedestrian() != this) {  //桥上已经有人,就等待
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        continue;
      }
      if (!other.isGoBridge()) {  //对方没过,先让对方过
        System.out.println(name + ":" + other.getName() + "你先过");
        singleLogBridge.setPedestrian(other);
        continue;
      }
      singleLogBridge.goBridge();  //自己过桥
    }
  }
}

解决方案:加入随机因素,一定概率的避让资源,将willGo()方法中的避让代码修改为下面这样(比如以太网指数退避算法)

if (!other.isGoBridge() && new Random().nextBoolean()) {  //50%概率的,若对方没过,先让对方过
  System.out.println(name + ":" + other.getName() + "你先过");
  singleLogBridge.setPedestrian(other);
  continue;
}

饥饿

当线程或进程需要某些资源,但是却始终得不到,比如:

  • 线程优先级设置的过低,导致该线程无法执行陷入饥饿
  • 其他某个线程获得锁陷入无限循环不释放,导致该线程无法执行陷入饥饿
  • 某个程序始终占用某文件的写锁,导致该程序无法执行陷入饥饿

解决方案:不应该设置线程优先级,不应该使用完锁不释放

加油啊!即便没有转生到异世界,也要拿出真本事!!!\(`Δ’)/
最后更新于 2022-03-23