Steps

  1. Pipeline IO reads the text
  2. A Transform tokenizes the text and counts word frequencies
  3. Pipeline IO writes out the results
  4. All of the steps are packaged into a single Pipeline

Creating the Pipeline

By default, the pipeline is run locally with the DirectRunner

PipelineOptions options = PipelineOptionsFactory.create();
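
To target a different runner, you can set it on the options explicitly. A minimal sketch, assuming the Direct Runner artifact (beam-runners-direct-java) is on the classpath:

import org.apache.beam.runners.direct.DirectRunner;

// Pin the runner explicitly; omitting this falls back to the DirectRunner.
options.setRunner(DirectRunner.class);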

A Pipeline instance builds the DAG of the data processing job, along with the Transforms that the DAG requires

Pipeline p = Pipeline.create(options);

Applying Transforms

TextIO.Read - reads an external file and produces a PCollection containing all the text lines, with one element per line

String filepattern =
    "file:///Users/zhongmingmao/workspace/java/hello-beam/corpus/shakespeare.txt";
PCollection<String> lines = p.apply(TextIO.read().from(filepattern));
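
from() takes a filepattern rather than a single filename, so a glob such as file:///.../corpus/*.txt (hypothetical path) would read every matching file into the same PCollection.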

Splitting lines into words

PCollection<String> words =
    lines.apply(
        "ExtractWords",
        FlatMapElements.into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))));
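
Note that split("[^\\p{L}]+") emits an empty string when a line starts with a non-letter character, so empty tokens can slip into words. A hedged sketch of one way to drop them with Beam's Filter transform:

import org.apache.beam.sdk.transforms.Filter;

// Keep only non-empty tokens.
PCollection<String> nonEmptyWords = words.apply(Filter.by((String word) -> !word.isEmpty()));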

Count Transform - turns any PCollection into a PCollection of key-value pairs
Each key is a distinct element of the original PCollection, and each value is the number of times that element occurs - e.g. ["hi", "hi", "sue"] yields KV("hi", 2) and KV("sue", 1)

PCollection<KV<String, Long>> counts = words.apply(Count.<String>perElement());

Convert the key-value PCollection into the output format

PCollection<String> formatted =
    counts.apply(
        "FormatResults",
        MapElements.into(TypeDescriptors.strings())
            .via(
                (KV<String, Long> wordCount) ->
                    wordCount.getKey() + ": " + wordCount.getValue()));

TextIO.Write - writes the final PCollection to text files, with each element written as a separate line

formatted.apply(TextIO.write().to("/tmp/wordcounts"));
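
By default the output is sharded across several files named after the prefix (e.g. /tmp/wordcounts-00000-of-00003). A hedged sketch of the knobs for controlling that, assuming a single output file is acceptable for a corpus this small:

formatted.apply(
    TextIO.write()
        .to("/tmp/wordcounts") // output filename prefix
        .withSuffix(".txt") // appended to each shard name
        .withNumShards(1)); // force a single output file, at the cost of parallelism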

Running the Pipeline

Pipeline.run - optimizes the Transforms contained in the Pipeline and hands them to the execution Runner - asynchronous by default

p.run().waitUntilFinish();
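
Since run() is asynchronous, the returned PipelineResult can be inspected before blocking. A minimal sketch (note that the DirectRunner blocks inside run() by default):

import org.apache.beam.sdk.PipelineResult;

PipelineResult result = p.run(); // may return while the pipeline is still running
System.out.println(result.getState()); // e.g. RUNNING
result.waitUntilFinish(); // block until the pipeline terminates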

Refactoring

Standalone DoFn

Improves: readability + reusability + testability

ExtractWordsFn

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

public class ExtractWordsFn extends DoFn<String, String> {

  private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
  private final Distribution lineLenDist =
      Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");

  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<String> receiver) {
    // Record custom metrics: the distribution of line lengths and a count of empty lines.
    lineLenDist.update(element.length());
    if (element.trim().isEmpty()) {
      emptyLines.inc();
    }

    // Split the line into words.
    String[] words = element.split("[^\\p{L}]+", -1);
    // Output each word encountered into the output PCollection.
    for (String word : words) {
      if (!word.isEmpty()) {
        receiver.output(word);
      }
    }
  }
}
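
The emptyLines counter and lineLenDistro distribution only surface once they are queried from the PipelineResult after the run. A hedged sketch, assuming the runner reports metrics:

import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

PipelineResult result = p.run();
result.waitUntilFinish();

// Query only the emptyLines counter registered in ExtractWordsFn.
MetricQueryResults metrics =
    result
        .metrics()
        .queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.named(ExtractWordsFn.class, "emptyLines"))
                .build());
for (MetricResult<Long> counter : metrics.getCounters()) {
  System.out.println(counter.getName() + ": " + counter.getAttempted());
}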

FormatAsTextFn

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

public class FormatAsTextFn extends DoFn<KV<String, Long>, String> {

  @ProcessElement
  public void processElement(
      @Element KV<String, Long> wordCount, OutputReceiver<String> receiver) {
    receiver.output(wordCount.getKey() + ": " + wordCount.getValue());
  }
}

PTransform

PTransform - bundles a set of related Transforms into a single composite transform
Input and output types - the initial input and the final output of the whole chain of Transforms

import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {

  @Override
  public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
    // Convert lines of text into individual words.
    PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

    // Count the number of times each word occurs.
    PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
    return wordCounts;
  }
}

Parameterized PipelineOptions

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

public interface WordCountOptions extends PipelineOptions {

  @Description("Path of the file to read from")
  @Validation.Required
  String getInputFile();

  void setInputFile(String value);

  @Description("Path of the file to write to")
  @Validation.Required
  String getOutput();

  void setOutput(String value);
}
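
PipelineOptionsFactory derives the command-line flag names from the getter names, so this interface can be populated with arguments such as --inputFile=/tmp/shakespeare.txt --output=/tmp/wordcounts (hypothetical paths).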

main

public static void main(String[] args) {
  WordCountOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);

  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
      .apply(new CountWords())
      .apply(ParDo.of(new FormatAsTextFn()))
      .apply("WriteCounts", TextIO.write().to(options.getOutput()));

  p.run().waitUntilFinish();
}

Unit Testing

Encapsulating the data processing logic in DoFns and PTransforms makes each piece independently testable

import org.apache.beam.sdk.transforms.DoFnTester;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;

public class ExtractWordsFnTest {

  @Test
  public void testExtractWordsFn() throws Exception {
    // Use DoFnTester to test the ExtractWordsFn DoFn.
    DoFnTester<String, String> extractWordsFn = DoFnTester.of(new ExtractWordsFn());

    Assert.assertThat(
        extractWordsFn.processBundle(" some input words "),
        CoreMatchers.hasItems("some", "input", "words"));
    Assert.assertThat(extractWordsFn.processBundle(" "), CoreMatchers.hasItems());
    Assert.assertThat(
        extractWordsFn.processBundle(" some ", " input", " words"),
        CoreMatchers.hasItems("some", "input", "words"));
  }
}
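
DoFnTester has been deprecated in more recent Beam releases; the recommended approach is to run the transform under test on a TestPipeline and assert on its output with PAssert. A hedged sketch for the composite CountWords transform, assuming JUnit 4 and the beam-sdks-java-core test artifact are on the classpath (CountWordsTest is a hypothetical name):

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class CountWordsTest {

  @Rule public final transient TestPipeline p = TestPipeline.create();

  @Test
  public void testCountWords() {
    PCollection<KV<String, Long>> counts =
        p.apply(Create.of("hi there", "hi")).apply(new CountWords());

    // PAssert verifies the full contents of the PCollection when the pipeline runs.
    PAssert.that(counts).containsInAnyOrder(KV.of("hi", 2L), KV.of("there", 1L));

    p.run().waitUntilFinish();
  }
}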