Condition
- Condition实现了管程模型中的_条件变量_
- Java内置的管程(synchronized)只有一个条件变量,而Lock&Condition实现的管程支持多个条件变量
- 在很多并发场景下,支持多个条件变量能够让并发程序的可读性更好,也更容易实现
阻塞队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
|
public class BlockedQueue<T> { private static final int MAX_SIZE = 10; private final Lock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); private final Stack<T> stack = new Stack<>();
public void enq(T t) { lock.lock(); try { while (stack.size() >= MAX_SIZE) { notFull.await(); } stack.push(t); notEmpty.signalAll(); } catch (InterruptedException ignored) { } finally { lock.unlock(); } }
public T deq() { lock.lock(); try { while (stack.isEmpty()) { notEmpty.await(); } T pop = stack.pop(); notFull.signalAll(); return pop; } catch (InterruptedException ignored) { } finally { lock.unlock(); } return null; } }
|
- 一个阻塞队列,需要两个条件变量,一个是队列不空(空队列不允许出队),一个是队列不满(队列已满不允许入队)
- Lock&Condition实现的管程,线程等待和通知需要调用_await/signal/signalAll_
- Java内置的管程(synchronized),线程等待和通知需要调用_wait/notify/notifyAll_
同步 + 异步
- 区别:_调用方是否需要等待结果_
- 异步调用:调用方创建一个子线程,在子线程中执行方法调用
- 异步方法:在方法实现的时候,创建一个新的线程执行逻辑,主线程直接return
Dubbo
在TCP协议层面,发送完RPC请求后,系统线程是不会等待RPC的响应结果的,需要RPC框架完成异步转同步的操作
DubboInvoker
1 2 3 4 5 6
| protected Result doInvoke(final Invocation invocation) throws Throwable { ... return (Result) currentClient .request(inv, timeout) .get(); }
|
DefaultFuture
当RPC返回结果之前,阻塞调用线程,让调用线程等待;当RPC返回结果后,唤醒调用线程,让调用线程重新执行
1 2 3 4 5 6 7
| private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition();
private volatile Response response;
private volatile ResponseCallback callback;
|
get
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public boolean isDone() { return response != null; }
public Object get(int timeout) throws RemotingException { ... if (!isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (!isDone()) { done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } ... } return returnFromResponse(); }
|
doReceived
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { done.signalAll(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } }
|
参考资料
Java并发编程实战