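Steps
Read the text with a Pipeline I/O connector
Split the text into words and count word frequencies with Transforms
Write the results with a Pipeline I/O connector
Package all the steps into a single Pipeline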
Create the Pipeline
By default, the Pipeline runs locally on the DirectRunner.
```java
PipelineOptions options = PipelineOptionsFactory.create();
```
A Pipeline instance builds the DAG (directed acyclic graph) of the data processing job, together with the Transforms that make up that DAG.
```java
Pipeline p = Pipeline.create(options);
```
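Building the Pipeline only describes the computation; the work is carried out later, when the Pipeline is run. As a rough plain-Java analogy (not Beam code), `java.util.stream` pipelines behave the same way: intermediate operations are lazy and only execute when a terminal operation is invoked. The class `DeferredExecutionDemo` is hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo: describing a computation vs. executing it (analogy only, no Beam).
final class DeferredExecutionDemo {
  // Counts how many elements have actually been processed.
  static final AtomicInteger processed = new AtomicInteger();

  // Like apply()-ing Transforms: returns a description, processes nothing yet.
  static Stream<String> build(List<String> lines) {
    return lines.stream()
        .map(String::toUpperCase)
        .peek(s -> processed.incrementAndGet());
  }

  public static void main(String[] args) {
    Stream<String> described = build(List.of("a", "b", "c"));
    System.out.println("processed after build: " + processed.get());
    // Like p.run(): the terminal operation triggers the actual work.
    List<String> result = described.collect(Collectors.toList());
    System.out.println("processed after collect: " + processed.get() + " " + result);
  }
}
```

The counter stays at zero until `collect` runs, just as no Transform executes until the Pipeline is run.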
TextIO.Read: reads an external file and produces a PCollection containing all of its lines; each element is one line of text.
```java
String filepattern = "file:///Users/zhongmingmao/workspace/java/hello-beam/corpus/shakespeare.txt";
PCollection<String> lines = p.apply(TextIO.read().from(filepattern));
```
Split lines into words
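```java
// Split each line on runs of non-letter characters, then drop the empty strings
// that split() yields for lines starting with a non-letter and for empty lines.
PCollection<String> words = lines
    .apply("ExtractWords", FlatMapElements.into(TypeDescriptors.strings())
        .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
    .apply(Filter.by((String word) -> !word.isEmpty()));
```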
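The regular expression `[^\\p{L}]+` splits on runs of non-letter characters. The plain-Java sketch below (class and method names are hypothetical) shows why empty tokens need to be removed before counting: `String.split` yields a leading empty token when a line starts with a non-letter, and a single empty token for an empty line. Left in place, these would be counted as a word `""`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helpers illustrating String.split with the tokenizing regex.
final class SplitDemo {
  // Raw split, as in the FlatMapElements lambda.
  static List<String> rawTokens(String line) {
    return Arrays.asList(line.split("[^\\p{L}]+"));
  }

  // Same split, with empty tokens removed before they reach a counting step.
  static List<String> words(String line) {
    return rawTokens(line).stream()
        .filter(word -> !word.isEmpty())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(rawTokens("  Hello, world!").size()); // 3: "", "Hello", "world"
    System.out.println(rawTokens("").size());                // 1: a single ""
    System.out.println(words("  Hello, world!"));            // [Hello, world]
  }
}
```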
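Count transform: turns any PCollection into a PCollection of key-value pairs, where each key is a distinct element of the original PCollection and the value is the number of times that element occurs.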
```java
PCollection<KV<String, Long>> counts = words.apply(Count.<String>perElement());
```
Convert the PCollection of key-value pairs into the output format (one string per word).
```java
PCollection<String> formatted = counts.apply("FormatResults",
    MapElements.into(TypeDescriptors.strings())
        .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()));
```
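For intuition, here is a plain-Java, single-machine analogue of `Count.perElement()` followed by the "FormatResults" step, built on `Collectors.groupingBy` with `Collectors.counting()`. It is an illustration only (class and method names are hypothetical): Beam performs the grouping over a distributed PCollection, and its output has no guaranteed order, whereas this sketch sorts keys with a `TreeMap` purely for readability.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical single-machine analogue of Count.perElement() + formatting.
final class CountAndFormatDemo {
  // One entry per distinct word; the value is how many times it occurs.
  static Map<String, Long> perElement(List<String> words) {
    return words.stream()
        .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
  }

  // Mirrors the MapElements lambda: key + ": " + value.
  static List<String> format(Map<String, Long> counts) {
    return counts.entrySet().stream()
        .map(e -> e.getKey() + ": " + e.getValue())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> words = List.of("to", "be", "or", "not", "to", "be");
    format(perElement(words)).forEach(System.out::println);
  }
}
```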
TextIO.Write: writes the final PCollection to text files; each element becomes a separate line in the output.
```java
formatted.apply(TextIO.write().to("/tmp/wordcounts"));
```
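`TextIO.write().to("/tmp/wordcounts")` treats the path as a filename prefix: by default the output is split into shards with names such as `/tmp/wordcounts-00000-of-00003`, and the number of shards is chosen by the runner unless constrained with `withNumShards(...)` or `withoutSharding()`. The plain-Java helper below (hypothetical name, not part of Beam) gathers the lines of all shards sharing a prefix; it assumes the default naming, i.e. every shard name starts with the prefix followed by `-`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: concatenates the lines of all output shards sharing a prefix.
final class ShardReader {
  static List<String> readShards(Path dir, String prefix) throws IOException {
    List<Path> shards;
    // Files.list holds an open directory handle, hence try-with-resources.
    try (Stream<Path> files = Files.list(dir)) {
      shards = files
          .filter(p -> p.getFileName().toString().startsWith(prefix + "-"))
          .sorted() // shard indexes are zero-padded, so name order matches shard order
          .collect(Collectors.toList());
    }
    List<String> lines = new ArrayList<>();
    for (Path shard : shards) {
      lines.addAll(Files.readAllLines(shard));
    }
    return lines;
  }

  public static void main(String[] args) throws IOException {
    readShards(Paths.get("/tmp"), "wordcounts").forEach(System.out::println);
  }
}
```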
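Run the Pipeline
Pipeline.run: optimizes the Transforms contained in the Pipeline and submits them to the configured Runner for execution. Execution is asynchronous by default, so call waitUntilFinish() on the returned result to block until the job completes.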
```java
p.run().waitUntilFinish();
```
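Since `run()` is described as asynchronous, `waitUntilFinish()` is what makes the caller block until the job completes. The plain-Java sketch below mimics that contract with `CompletableFuture`; it is an analogy only, and the class name and the `"DONE"` string are hypothetical stand-ins for a `PipelineResult` and its final state.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical analogy for p.run() / waitUntilFinish(); no Beam classes involved.
final class AsyncRunDemo {
  // Like p.run(): starts the work in the background and returns a handle immediately.
  static CompletableFuture<String> run() {
    return CompletableFuture.supplyAsync(() -> {
      // The actual processing would happen here.
      return "DONE";
    });
  }

  public static void main(String[] args) {
    CompletableFuture<String> result = run(); // returns without waiting
    String state = result.join();             // like waitUntilFinish(): block until done
    System.out.println(state);
  }
}
```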
Refactoring: a standalone DoFn
Improves readability, reusability, and testability.
```java
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

public class ExtractWordsFn extends DoFn<String, String> {
  // Metric: number of blank lines seen.
  private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
  // Metric: distribution (count, sum, min, max) of line lengths.
  private final Distribution lineLenDist =
      Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");

  @DoFn.ProcessElement
  public void processElement(@DoFn.Element String element, OutputReceiver<String> receiver) {
    lineLenDist.update(element.length());
    if (element.trim().isEmpty()) {
      emptyLines.inc();
    }

    // Split on runs of non-letter characters; limit -1 keeps trailing empty strings,
    // which are skipped below together with any other empty tokens.
    String[] words = element.split("[^\\p{L}]+", -1);
    for (String word : words) {
      if (!word.isEmpty()) {
        receiver.output(word);
      }
    }
  }
}
```
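Pulling the logic into `ExtractWordsFn` also means its per-element behavior can be reasoned about on its own. The plain-Java mirror below (a hypothetical class, not Beam code) reproduces `processElement`, with the `Counter` replaced by a `long` field and the `Distribution` replaced by `IntSummaryStatistics`, which tracks the same count, sum, min, and max that a Beam distribution reports.

```java
import java.util.ArrayList;
import java.util.IntSummaryStatistics;
import java.util.List;

// Hypothetical plain-Java mirror of ExtractWordsFn.processElement.
final class ExtractWordsLogic {
  long emptyLines = 0;                                                 // stand-in for the Counter
  final IntSummaryStatistics lineLenDist = new IntSummaryStatistics(); // stand-in for the Distribution

  List<String> process(String element) {
    lineLenDist.accept(element.length());
    if (element.trim().isEmpty()) {
      emptyLines++;
    }
    List<String> out = new ArrayList<>();
    for (String word : element.split("[^\\p{L}]+", -1)) {
      if (!word.isEmpty()) {
        out.add(word); // corresponds to receiver.output(word)
      }
    }
    return out;
  }

  public static void main(String[] args) {
    ExtractWordsLogic fn = new ExtractWordsLogic();
    System.out.println(fn.process(" some input words "));
    System.out.println(fn.process("   "));
    System.out.println("emptyLines=" + fn.emptyLines + ", lineLen=" + fn.lineLenDist);
  }
}
```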
FormatAsTextFn
```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

public class FormatAsTextFn extends DoFn<KV<String, Long>, String> {
  @DoFn.ProcessElement
  public void processElement(
      @DoFn.Element KV<String, Long> wordCount, DoFn.OutputReceiver<String> receiver) {
    receiver.output(wordCount.getKey() + ": " + wordCount.getValue());
  }
}
```
PTransform: bundles several related Transforms into one. Its input and output types are the initial input and the final output of that chain of Transforms.
```java
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class CountWords
    extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
    PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
    PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
    return wordCounts;
  }
}
```
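`CountWords` hides its two inner steps behind a single `PCollection<String>` to `PCollection<KV<String, Long>>` signature. As an analogy only, plain `java.util.function.Function` composition gives the same shape: two small functions chained with `andThen` into one reusable unit whose callers see only the outer input and output types. The class and constant names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical analogy for a composite transform built from two smaller steps.
final class CompositeDemo {
  // Inner step 1: lines -> non-empty words.
  static final Function<List<String>, List<String>> EXTRACT_WORDS = lines -> lines.stream()
      .flatMap(line -> Arrays.stream(line.split("[^\\p{L}]+")))
      .filter(word -> !word.isEmpty())
      .collect(Collectors.toList());

  // Inner step 2: words -> occurrences per distinct word.
  static final Function<List<String>, Map<String, Long>> COUNT = words -> words.stream()
      .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

  // Composite: callers only see List<String> in and Map<String, Long> out.
  static final Function<List<String>, Map<String, Long>> COUNT_WORDS = EXTRACT_WORDS.andThen(COUNT);

  public static void main(String[] args) {
    System.out.println(COUNT_WORDS.apply(List.of("a b", "a")).get("a"));
  }
}
```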
Parameterizing with PipelineOptions
```java
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

public interface WordCountOptions extends PipelineOptions {
  // Set on the command line as --inputFile=...
  @Description("Path of the file to read from")
  @Validation.Required
  String getInputFile();
  void setInputFile(String value);

  // Set on the command line as --output=...
  @Description("Path of the file to write to")
  @Validation.Required
  String getOutput();
  void setOutput(String value);
}
```
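With this interface, `PipelineOptionsFactory.fromArgs(args)` maps command-line flags to the getters by property name, so the options are supplied as `--inputFile=...` and `--output=...`, and `withValidation()` rejects a run in which a `@Validation.Required` option is missing. The toy parser below is not Beam's implementation; it is a hypothetical sketch of that `--name=value` plus required-check behavior, and it handles only that one flag form.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy parser; Beam's PipelineOptionsFactory supports far more.
final class ToyOptionsParser {
  // Parses "--name=value" arguments and fails if any required name is absent.
  static Map<String, String> parse(String[] args, List<String> required) {
    Map<String, String> values = new HashMap<>();
    for (String arg : args) {
      int eq = arg.indexOf('=');
      if (!arg.startsWith("--") || eq < 0) {
        throw new IllegalArgumentException("Expected --name=value, got: " + arg);
      }
      values.put(arg.substring(2, eq), arg.substring(eq + 1));
    }
    for (String name : required) {
      if (!values.containsKey(name)) {
        throw new IllegalArgumentException("Missing required option --" + name);
      }
    }
    return values;
  }

  public static void main(String[] args) {
    // Example paths are hypothetical.
    Map<String, String> opts = parse(
        new String[] {"--inputFile=/tmp/shakespeare.txt", "--output=/tmp/wordcounts"},
        List.of("inputFile", "output"));
    System.out.println(opts.get("inputFile") + " -> " + opts.get("output"));
  }
}
```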
main
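```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;

// The enclosing class name is illustrative.
public class WordCount {
  public static void main(String[] args) {
    // Parse --inputFile / --output and enforce the @Validation.Required annotations.
    WordCountOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
        .apply(new CountWords())
        .apply(ParDo.of(new FormatAsTextFn()))
        .apply("WriteCounts", TextIO.write().to(options.getOutput()));

    p.run().waitUntilFinish();
  }
}
```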
Unit testing
Because the processing logic is encapsulated in DoFns and PTransforms, each piece can be tested in isolation.
```java
import org.apache.beam.sdk.transforms.DoFnTester;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;

public class ExtractWordsFnTest {
  // Note: DoFnTester is deprecated in newer Beam releases in favor of TestPipeline with PAssert.
  @Test
  public void testExtractWordsFn() throws Exception {
    DoFnTester<String, String> extractWordsFn = DoFnTester.of(new ExtractWordsFn());

    Assert.assertThat(
        extractWordsFn.processBundle(" some input words "),
        CoreMatchers.hasItems("some", "input", "words"));
    // A blank line must produce no words; hasItems() with no arguments would match anything.
    Assert.assertTrue(extractWordsFn.processBundle(" ").isEmpty());
    Assert.assertThat(
        extractWordsFn.processBundle(" some ", " input", " words"),
        CoreMatchers.hasItems("some", "input", "words"));
  }
}
```