Scenario

    int p1 = getPriceByS1();
    save(p1);
    int p2 = getPriceByS2();
    save(p2);
    int p3 = getPriceByS3();
    save(p3);

ThreadPoolExecutor + Future

    ExecutorService pool = Executors.newFixedThreadPool(3);
    // Fetch the three prices concurrently
    Future<Integer> f1 = pool.submit(() -> getPriceByS1());
    Future<Integer> f2 = pool.submit(() -> getPriceByS2());
    Future<Integer> f3 = pool.submit(() -> getPriceByS3());

    // get() blocks in submission order: if f1 is slow, p2 and p3 are not
    // saved until f1 finishes, even when they are already available
    int p1 = f1.get();
    pool.execute(() -> save(p1));
    int p2 = f2.get();
    pool.execute(() -> save(p2));
    int p3 = f3.get();
    pool.execute(() -> save(p3));

BlockingQueue

    ExecutorService pool = Executors.newFixedThreadPool(3);
    Future<Integer> f1 = pool.submit(() -> getPriceByS1());
    Future<Integer> f2 = pool.submit(() -> getPriceByS2());
    Future<Integer> f3 = pool.submit(() -> getPriceByS3());

    // One helper task per Future puts the result on the queue as soon as it is ready
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
    pool.execute(() -> {
        try {
            queue.put(f1.get());
        } catch (Exception ignored) {
        }
    });
    pool.execute(() -> {
        try {
            queue.put(f2.get());
        } catch (Exception ignored) {
        }
    });
    pool.execute(() -> {
        try {
            queue.put(f3.get());
        } catch (Exception ignored) {
        }
    });

    // Results come off the queue in completion order, not submission order
    for (int i = 0; i < 3; i++) {
        int price = queue.take();
        pool.execute(() -> save(price));
    }

CompletionService
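- How CompletionService works: it maintains an internal blocking queue and adds each task's Future to that queue once the task has completed
- The implementation class of CompletionService is ExecutorCompletionService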
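The queue-based mechanism described above can be sketched in a few lines. The class below is a hypothetical, simplified stand-in (the name `MiniCompletionService` and its two methods are illustrative, not the JDK source). It wraps each task in a `FutureTask` and overrides the `done()` hook, which the JDK calls once the task has finished, to place the future on a `LinkedBlockingQueue`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical, simplified stand-in for ExecutorCompletionService. */
class MiniCompletionService<V> {
    private final Executor executor;
    // Holds futures of finished tasks, in the order they finished
    private final BlockingQueue<Future<V>> completed = new LinkedBlockingQueue<>();

    MiniCompletionService(Executor executor) {
        this.executor = executor;
    }

    Future<V> submit(Callable<V> task) {
        FutureTask<V> future = new FutureTask<V>(task) {
            @Override
            protected void done() {
                // Called when the task completes normally, fails, or is cancelled
                completed.add(this);
            }
        };
        executor.execute(future);
        return future;
    }

    /** Blocks until some submitted task has finished, then returns its Future. */
    Future<V> take() throws InterruptedException {
        return completed.take();
    }
}
```

With a two-thread pool, submitting a slow task followed by a fast one and calling `take()` twice should hand back the fast task's future first.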
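Constructors

    // Uses an unbounded LinkedBlockingQueue as the completion queue
    public ExecutorCompletionService(Executor executor);
    // Uses the caller-supplied completion queue
    public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue);
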
Basic usage

    ExecutorService pool = Executors.newFixedThreadPool(3);
    CompletionService<Integer> service = new ExecutorCompletionService<>(pool);

    // Submit through the CompletionService, not the pool directly; otherwise
    // nothing reaches its completion queue and take() blocks forever
    service.submit(() -> getPriceByS1());
    service.submit(() -> getPriceByS2());
    service.submit(() -> getPriceByS3());

    for (int i = 0; i < 3; i++) {
        // take() returns the Future of whichever task finished first
        Integer price = service.take().get();
        pool.execute(() -> save(price));
    }

take + poll
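
    // Retrieves and removes the next completed Future, blocking until one is available
    public Future<V> take() throws InterruptedException;
    // Retrieves and removes the next completed Future, or returns null if none is available
    public Future<V> poll();
    // Like poll(), but waits up to the given timeout before returning null
    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
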
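To make the difference between the three retrieval methods concrete, here is a minimal sketch. The class name `TakePollDemo`, the 500 ms simulated task, and the 50 ms timeout are all assumptions chosen for illustration. It submits one slow task and then calls `poll()`, `poll(timeout, unit)`, and `take()` in turn.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class TakePollDemo {
    /** Returns "<poll was null>,<timed poll was null>,<result from take>". */
    static String run() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletionService<String> service = new ExecutorCompletionService<>(pool);
            service.submit(() -> {
                Thread.sleep(500); // simulated slow task (assumed duration)
                return "done";
            });

            // poll(): never blocks; null here because nothing has completed yet
            Future<String> immediate = service.poll();
            // poll(timeout): waits at most 50 ms; still null, the task needs about 500 ms
            Future<String> timed = service.poll(50, TimeUnit.MILLISECONDS);
            // take(): blocks until the task has completed
            Future<String> blocking = service.take();

            return (immediate == null) + "," + (timed == null) + "," + blocking.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

The two `poll` variants suit callers that have other work to do or need a deadline, while `take()` is the simplest choice when the caller can only proceed with a result.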
Forking Cluster
A forking cluster calls several query services in parallel and returns as soon as any one of them succeeds. This can be implemented with CompletionService.

    ExecutorService pool = Executors.newFixedThreadPool(3);
    CompletionService<Integer> service = new ExecutorCompletionService<>(pool);
    List<Future<Integer>> futures = new ArrayList<>(3);

    futures.add(service.submit(() -> geoCoderByS1()));
    futures.add(service.submit(() -> geoCoderByS2()));
    futures.add(service.submit(() -> geoCoderByS3()));

    Integer result = null;
    try {
        // Block until the fastest service finishes and use its result
        result = service.take().get();
    } catch (Exception ignored) {
    } finally {
        // Cancel the remaining tasks whether or not the first result succeeded
        for (Future<Integer> future : futures) {
            future.cancel(true);
        }
    }

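The forking pattern can also be written so that a failed service does not end the call: keep taking completed futures until one succeeds, and cancel the rest in a `finally` block. The sketch below is one possible shape. The helper `firstSuccess`, the class `ForkingDemo`, and the three simulated replicas with their delays are hypothetical names and values, not part of any framework.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ForkingDemo {
    /** Returns the first successful result; rethrows the last failure if all tasks fail. */
    static <T> T firstSuccess(ExecutorService pool, List<Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        if (tasks.isEmpty()) {
            throw new IllegalArgumentException("no tasks");
        }
        CompletionService<T> service = new ExecutorCompletionService<>(pool);
        List<Future<T>> futures = new ArrayList<>(tasks.size());
        for (Callable<T> task : tasks) {
            futures.add(service.submit(task));
        }
        ExecutionException lastFailure = null;
        try {
            for (int i = 0; i < tasks.size(); i++) {
                try {
                    return service.take().get(); // first successful completion wins
                } catch (ExecutionException e) {
                    lastFailure = e; // this task failed; wait for the next completion
                }
            }
            throw lastFailure;
        } finally {
            // Cancel whatever is still running; has no effect on finished tasks
            for (Future<T> future : futures) {
                future.cancel(true);
            }
        }
    }

    /** Simulated replicas: A fails at once, B answers quickly, C is slow. */
    static String demo() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Callable<String>> replicas = new ArrayList<>();
            replicas.add(() -> { throw new IllegalStateException("replica A down"); });
            replicas.add(() -> { Thread.sleep(100); return "B"; });
            replicas.add(() -> { Thread.sleep(2000); return "C"; });
            return firstSuccess(pool, replicas);
        } finally {
            pool.shutdown();
        }
    }
}
```

In `demo()`, replica A's failure is skipped, B's answer is returned, and C is interrupted by `cancel(true)` instead of running to completion. The JDK's `ExecutorService.invokeAny` offers comparable semantics when no custom handling between completions is needed.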
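Summary
- Use case for CompletionService: _submitting asynchronous tasks in batches_
- CompletionService combines a thread pool (Executor) with a blocking queue (BlockingQueue), which makes batches of asynchronous tasks easier to manage
- CompletionService orders results by completion: the task that finishes first enters the blocking queue first, which avoids needless waiting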
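The completion-order behavior summarized above can be observed with a small experiment. This is a sketch under assumed conditions: the class `CompletionOrderDemo`, the helper `slowCall`, and the delays of 600, 200, and 400 ms are invented for illustration, and the pool has one thread per task so that all three start at about the same time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {
    /** Submits S1, S2, S3 in that order and returns their names in completion order. */
    static List<String> completionOrder() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CompletionService<String> service = new ExecutorCompletionService<>(pool);
            service.submit(() -> slowCall("S1", 600));
            service.submit(() -> slowCall("S2", 200));
            service.submit(() -> slowCall("S3", 400));

            List<String> order = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                // Each take() yields the next task to finish, whatever its submission order
                order.add(service.take().get());
            }
            return order;
        } finally {
            pool.shutdown();
        }
    }

    // Hypothetical stand-in for a remote call with the given latency
    private static String slowCall(String name, long millis) throws InterruptedException {
        Thread.sleep(millis);
        return name;
    }
}
```

With these delays the shortest task (S2) is expected to come out first and the longest (S1) last, so the consumer never waits on a slow task while a faster result is already available.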
References
Java并发编程实战