1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| @AllArgsConstructor class MapReduce extends RecursiveTask<Map<String, Long>> { private String[] fc; private int start; private int end;
@Override protected Map<String, Long> compute() { if (end - start == 1) { return calc(fc[start]); } else { int mid = (start + end) / 2; MapReduce mr1 = new MapReduce(fc, start, mid); mr1.fork(); MapReduce mr2 = new MapReduce(fc, mid, end); return merge(mr2.compute(), mr1.join()); } }
private Map<String, Long> calc(String line) { Map<String, Long> result = new HashMap<>(); String[] words = line.split("\\s+"); for (String word : words) { if (result.containsKey(word)) { result.put(word, result.get(word) + 1); } else { result.put(word, 1L); } } return result; }
private Map<String, Long> merge(Map<String, Long> r1, Map<String, Long> r2) { Map<String, Long> result = new HashMap<>(r1); r2.forEach((word, count) -> { if (result.containsKey(word)) { result.put(word, result.get(word) + count); } else { result.put(word, count); } }); return result; } }
String[] fc = {"hello world", "hello me", "hello fork", "hello join", "fork join in world"}; ForkJoinPool pool = new ForkJoinPool(3); MapReduce mapReduce = new MapReduce(fc, 0, fc.length); Map<String, Long> result = pool.invoke(mapReduce); result.forEach((word, count) -> System.out.println(word + " : " + count));
|