循环等待
- 在《Java并发 – 死锁》中,通过破坏占用且等待条件来规避死锁,核心代码如下
while (!allocator.apply(this, target)) {}
- 如果apply()操作的时间非常短,并且并发不大,该方案还能应付
- 一旦apply()操作比较耗时,或者并发比较大,该方案就不适用了
- 因为这可能需要循环上万次才能获得锁,非常_消耗CPU_
- 最好的方案:_等待-通知_
- 当线程要求的条件不满足,则线程阻塞自己,进入等待状态
- 当线程要求的条件满足后,通知等待的线程,重新开始执行
- 线程阻塞能够避免因循环等待而消耗CPU的问题
- Java语言原生支持等待-通知机制
- 线程首先获取互斥锁,当线程要求的条件不满足时,释放互斥锁,进入等待状态
- 当要求的条件满足时,通知等待的线程,重新获取互斥锁
等待-通知机制
Java语言原生支持的等待-通知机制:**synchronized + wait + notify/notifyAll
**
wait
- 每个互斥锁有两个独立的等待队列,如上图所示,等待队列L和等待队列R
- 左边有一个等待队列(等待队列L),_在同一时刻,只允许一个线程进入synchronized保护的临界区_
- 当已经有一个线程进入synchronized保护的临界区后,其他线程就只能进入等待队列L进行等待
- 当一个线程进入临界区后,由于某些条件不满足,需要进入等待状态,可以调用wait()方法
- 当调用wait()方法后,当前线程就会被阻塞,并且进入到右边的等待队列(等待队列R)
- 线程在进入等待队列R的同时,会释放持有的互斥锁,其他线程就有机会获得锁,并进入临界区
- 关键点:_sleep不会释放互斥锁_
notify/notifyAll
- 当线程要求的条件满足时,可以通过
notify/notifyAll
来通知等待的线程
- 当条件满足时调用notify(),会通知等待队列R(将等待队列L中第1个节点移到等待队列R)中的线程,告知它_条件曾经满足_
- notify()只能保证在通知的时间点,条件是满足的
- 而被通知线程的执行时间点与通知时间点基本上不会重合,当线程执行的时候,条件很可能已经不满足了
- 被通知的线程如果想要重新执行,仍然需要先获取到互斥锁
- 关键点:_执行notify/notifyAll并不会释放互斥锁,在synchronized代码块结束后才真正的释放互斥锁_
编码范式
1 2 3
| while (条件不满足) { wait(); }
|
- 可以解决条件曾经满足的问题
- 当wait()返回时,条件有可能已经改变了,需要重新检验条件是否满足,如果不满足,继续wait()
notifyAll
- 尽量使用notifyAll()
- notify():会随机地通知等待队列R中的一个线程
- 隐含动作:先将等待队列L的第一个节点移动到等待队列R
- notifyAll():会通知等待队列R中的所有线程
- 隐含动作:先将等待队列L的所有节点移动到等待队列R(待确定是否正确)
- notify()的风险:_可能导致某些线程永远不会被通知到_
- 假设有资源A、B、C、D,线程1~4都对应同一个互斥锁L
- 线程1申请到了AB,线程2申请到了CD
- 此时线程3申请AB,会进入互斥锁L的等待队列L,线程4申请CD,也会进入互斥锁L的等待队列L
- 线程1归还AB,通过notify()来通知互斥锁L的等待队列R中的线程,假设为线程4(先被移动到等待队列R)
- 但线程4申请的是CD,不满足条件,执行wait(),而真正该被唤醒的线程3就再也没有机会被唤醒了
等待队列
- wait/notify/notifyAll操作的等待队列都是_互斥锁的等待队列_
- 如果synchronized锁定的是this,那么对应的一定是this.wait()/this.notify()/this.notifyAll()
- 如果synchronized锁定的是target,那么对应的一定是target.wait()/target.notify()/target.notifyAll()
- 上面这3个方法能够被调用的前提是已经获取了相应的互斥锁,都必须在synchronized内部被调用
- 如果在synchronized外部调用,或者锁定的是this,而调用的是target.wait(),JVM会抛出IllegalMonitorStateException
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class IllegalMonitorStateExceptionTest { private Object lockA = new Object(); private Object lockB = new Object();
@Test public void test1() throws InterruptedException { lockA.wait(); }
@Test public void test2() throws InterruptedException { synchronized (lockA) { lockB.wait(); } } }
|
转账实例
Allocator
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class Allocator {
private static class Holder { private static Allocator allocator = new Allocator(); }
public static Allocator getInstance() { return Holder.allocator; }
private Allocator() { }
private List<Object> als = new ArrayList<>();
public synchronized void apply(Object from, Object to) { while (als.contains(from) || als.contains(to)) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } als.add(from); als.add(to); }
public synchronized void free(Object from, Object to) { als.remove(from); als.remove(to); notifyAll(); } }
|
Account
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class Account { private Allocator allocator = Allocator.getInstance(); private int balance;
public void transfer(Account target, int amt) { allocator.apply(this, target);
try { synchronized (this) { synchronized (target) { if (balance > amt) { balance -= amt; target.balance += amt; } } } } finally { allocator.free(this, target); } } }
|
参考资料
Java并发编程实战