对账系统
1 2 3 4 5 6 7 8 9 10 11
| while (existUnreconciledOrders()) { pOrder = getPOrder(); dOrder = getDOrder(); Order diff = check(pOrder, dOrder); save(diff); }
|
性能瓶颈
getPOrder()和getDOrder()最为耗时,并且两个操作没有先后顺序的依赖,可以并行处理
简单并行 - join
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
while (existUnreconciledOrders()) { Thread t1 = new Thread(() -> { pOrder = getPOrder(); }); t1.start();
Thread t2 = new Thread(() -> { dOrder = getDOrder(); }); t2.start();
t1.join(); t2.join();
Order diff = check(pOrder, dOrder); save(diff); }
|
while循环里每次都会创建新的线程,而创建线程是一个耗时的操作,可以考虑线程池来优化
线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| Executor executor = Executors.newFixedThreadPool(2);
while (existUnreconciledOrders()) { executor.execute(() -> { pOrder = getPOrder(); });
executor.execute(() -> { dOrder = getDOrder(); });
Order diff = check(pOrder, dOrder); save(diff); }
|
- 实现等待的简单方案:计数器 + 管程
- 计数器的初始值为2,当执行完getPOrder()或getDOrder()后,计数器减1,主线程会等待计数器等于0
- 等待计数器等于0其实是一个条件变量,可以利用管程来实现,在JUC中提供了类似的工具类_CountDownLatch_
CountDownLatch
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| Executor executor = Executors.newFixedThreadPool(2);
while (existUnreconciledOrders()) { CountDownLatch latch = new CountDownLatch(2); executor.execute(() -> { pOrder = getPOrder(); latch.countDown(); });
executor.execute(() -> { dOrder = getDOrder(); latch.countDown(); });
latch.await();
Order diff = check(pOrder, dOrder); save(diff); }
|
- 此时, getPOrder()和getDOrder()两个查询操作是并行的,但两个查询操作和对账操作check和save还是串行的
- 实际上,在执行对账操作的时候,可以同时去执行下一轮的查询操作,达到_完全的并行_
完全并行
- 两次查询操作能够和对账操作并行,对账操作还依赖于查询操作的结果,类似于_生产者-消费者_
- 既然是生产者-消费者模型,就需要用到队列,用来保存生产者生成的数据,而消费者从这个队列消费数据
- 针对对账系统,可以设计两个队列,这两个队列之间的元素是有一一对应的关系
- 订单查询操作将订单查询结果插入到订单队列
- 派送单查询操作将派送单插入到派送单队列
- 用双队列实现完全的并行
- 线程T1执行订单查询工作,线程T2执行派送单查询工作,当T1和T2各自生产完1条数据后,通知线程T3执行对账
- 隐藏条件:T1和T2工作的相互等待,步调要一致
- 实现方案
- 计数器初始化为2,线程T1和线程T2生产完1条数据后都将计数器减1
- 如果计数器大于0,则线程T1或者T2等待
- 如果计数器等于0,则通知线程T3,并唤醒等待的线程T1或者T2,与此同时,将计数器重置为2
- JUC提供了类似的工具类_CyclicBarrier_
CyclicBarrier
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| private Vector<Order> pos;
private Vector<Order> dos;
private Executor executor = Executors.newFixedThreadPool(1);
private final CyclicBarrier barrier = new CyclicBarrier(2, () -> { executor.execute(this::check); });
private void check() { Order p = pos.remove(0); Order d = dos.remove(0); Order diff = check(p, d); save(diff); }
private void getOrders() { Thread t1 = new Thread(() -> { while (existUnreconciledOrders()) { pos.add(getDOrder()); try { barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); t1.start();
Thread t2 = new Thread(() -> { while (existUnreconciledOrders()) { dos.add(getDOrder()); try { barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); t2.start(); }
|
回调线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| int index = --count; if (index == 0) { boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } }
|
- CyclicBarrier是同步调用回调函数后才唤醒等待的线程的,如果不采用回调线程池,无法提升性能
- 遇到回调函数时,需要考虑执行回调的线程是哪一个
- 执行CyclicBarrier的回调函数线程是将CyclicBarrier内部计数器减到0的那个线程
小结
- CountDownLatch:主要用来解决一个线程等待多个线程的场景
- CyclicBarrier:主要用来解决一组线程之间互相等待的场景
- CountDownLatch的计数器不能循环利用,一旦计数器减到0,再有线程调用await(),该线程会直接通过
- CyclicBarrier的计数器是可以循环利用的,具备自动重置的功能,还支持设置回调函数
参考资料
Java并发编程实战