任务视角

  1. 线程池+Future:简单并行任务
  2. CompletableFuture:聚合任务
  3. CompletionService:批量并行任务
  4. Fork/Join:_分治_

分治任务模型

  1. 分治任务模型分为两个阶段:任务分解 + 结果合并
  2. 任务分解:将任务迭代地分解为子任务,直至子任务可以直接计算出结果
    • 任务和分解后的子任务具有相似性(算法相同,只是计算的数据规模不同,往往采用递归算法)
  3. 结果合并:逐层合并子任务的执行结果,直至获得最终结果

Fork/Join

概述

  1. Fork/Join是并行计算的框架,主要用来支持分治任务模型,Fork对应任务分解,Join对应结果合并
  2. Fork/Join框架包含两部分:分治任务ForkJoinTask + 分治任务线程池ForkJoinPool
    • 类似于Runnable + ThreadPoolExecutor
  3. ForkJoinTask最核心的方法是forkjoin
    • fork:异步地执行一个子任务
    • join:阻塞当前线程,等待子任务的执行结果
  4. ForkJoinTask有两个子类:RecursiveAction + RecursiveTask
    • Recursive:通过递归的方式来处理分治任务
    • RecursiveAction.compute:没有返回值
    • RecursiveTask.compute:有返回值

简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 递归任务
@AllArgsConstructor
class Fibonacci extends RecursiveTask<Integer> {
private final int n;

@Override
protected Integer compute() {
if (n <= 1) {
return n;
}
// 创建子任务
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
f2.fork();
// 等待子任务结果并合并
return f1.join() + f2.join();
}
}

// 创建分治任务线程池
ForkJoinPool pool = new ForkJoinPool(4);
// 创建分治任务
Fibonacci fibonacci = new Fibonacci(30);
// 启动分治任务
System.out.println(pool.invoke(fibonacci)); // 832040

ForkJoinPool的工作原理

  1. Fork/Join并行计算的核心组件是ForkJoinPool
  2. ThreadPoolExecutor本质上是生产者-消费者模式的实现
    • 内部有一个任务队列,该任务队列是生产者和消费者通信的媒介
    • ThreadPoolExecutor可以有多个工作线程,但这些工作线程都_共享一个任务队列_
  3. ForkJoinPool本质上也是生产者-消费者模式的实现,但更加智能
    • ThreadPoolExecutor内部只有一个任务队列,而ForkJoinPool内部有多个任务队列
    • 当通过invoke或submit提交任务时,ForkJoinPool会根据一定的路由规则把任务提交到一个任务队列
      • 如果任务在执行过程中创建子任务,那么该子任务被会提交到工作线程对应的任务队列
    • ForkJoinPool支持任务窃取,如果工作线程空闲了,那么它会窃取其他任务队列里的任务
    • ForkJoinPool的任务队列是_双端队列_
      • 工作线程正常获取任务窃取任务分别从任务队列不同的端消费,避免不必要的数据竞争

统计单词数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@AllArgsConstructor
class MapReduce extends RecursiveTask<Map<String, Long>> {
private String[] fc;
private int start;
private int end;

@Override
protected Map<String, Long> compute() {
if (end - start == 1) {
return calc(fc[start]);
} else {
int mid = (start + end) / 2;
// 前半部分数据fork一个递归任务
MapReduce mr1 = new MapReduce(fc, start, mid);
mr1.fork();
// 后半部分数据在当前任务中递归处理
MapReduce mr2 = new MapReduce(fc, mid, end);
// 计算子任务,返回合并的结果
return merge(mr2.compute(), mr1.join());
}
}

// 统计单词数量
private Map<String, Long> calc(String line) {
Map<String, Long> result = new HashMap<>();
String[] words = line.split("\\s+");
for (String word : words) {
if (result.containsKey(word)) {
result.put(word, result.get(word) + 1);
} else {
result.put(word, 1L);
}
}
return result;
}

// 合并结果
private Map<String, Long> merge(Map<String, Long> r1, Map<String, Long> r2) {
Map<String, Long> result = new HashMap<>(r1);
r2.forEach((word, count) -> {
if (result.containsKey(word)) {
result.put(word, result.get(word) + count);
} else {
result.put(word, count);
}
});
return result;
}
}

String[] fc = {"hello world",
"hello me",
"hello fork",
"hello join",
"fork join in world"};
ForkJoinPool pool = new ForkJoinPool(3);
MapReduce mapReduce = new MapReduce(fc, 0, fc.length);
Map<String, Long> result = pool.invoke(mapReduce);
result.forEach((word, count) -> System.out.println(word + " : " + count));

小结

  1. Fork/Join并行计算框架主要解决的是分治任务,分治的核心思想是_分而治之_
  2. Fork/Join并行计算框架的核心组件是ForkJoinPool,支持任务窃取,让所有线程的工作量基本均衡
  3. Java 1.8提供的Stream API里的并行流是以ForkJoinPool为基础的
    • 默认情况下,所有并行流计算都共享一个ForkJoinPool,该共享的ForkJoinPool的线程数是CPU核数
    • 如果存在IO密集型的并行流计算,那可能会因为一个很慢的IO计算而影响整个系统的性能
    • 因此,建议_用不同的ForkJoinPool执行不同类型的计算任务_

参考资料

Java并发编程实战