Copier Pattern
Every data processing module receives the same input, and each module can run its processing independently and concurrently with the others.
    PCollection<Video> videoDataCollection = ...;

    PCollection<Video> highResolutionVideoCollection = videoDataCollection.apply("highResolutionTransform",
        ParDo.of(new DoFn<Video, Video>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(generateHighResolution(c.element()));
          }
        }));

    PCollection<Video> lowResolutionVideoCollection = videoDataCollection.apply("lowResolutionTransform",
        ParDo.of(new DoFn<Video, Video>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(generateLowResolution(c.element()));
          }
        }));

    PCollection<Image> gifCollection = videoDataCollection.apply("gifTransform",
        ParDo.of(new DoFn<Video, Image>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(generateGIF(c.element()));
          }
        }));

    PCollection<Caption> captionCollection = videoDataCollection.apply("captionTransform",
        ParDo.of(new DoFn<Video, Caption>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(generateCaption(c.element()));
          }
        }));

    PCollection<Report> videoAnalysisCollection = videoDataCollection.apply("videoAnalysisTransform",
        ParDo.of(new DoFn<Video, Report>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(analyzeVideo(c.element()));
          }
        }));
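As a Beam-free illustration of the copier semantics, the sketch below models the idea in plain Java: one read-only input list is handed to several independent functions, and no branch can affect another. The `CopierSketch` class, the `fanOut` helper, the branch names, and the string stand-ins for video processing are all hypothetical and exist only for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical plain-Java model of the copier pattern (not Beam code).
class CopierSketch {
  // Applies every named branch to the same input and collects each branch's output.
  static <T, R> Map<String, List<R>> fanOut(List<T> input, Map<String, Function<T, R>> branches) {
    Map<String, List<R>> results = new LinkedHashMap<>();
    for (Map.Entry<String, Function<T, R>> branch : branches.entrySet()) {
      results.put(branch.getKey(),
          input.stream().map(branch.getValue()).collect(Collectors.toList()));
    }
    return results;
  }

  public static void main(String[] args) {
    // Immutable input, shared by all branches.
    List<String> videos = List.of("a.mp4", "b.mp4");
    Map<String, Function<String, String>> branches = new LinkedHashMap<>();
    branches.put("highResolution", v -> v + "@1080p");
    branches.put("gif", v -> v.replace(".mp4", ".gif"));
    System.out.println(fanOut(videos, branches));
  }
}
```

Because the input is never modified, adding or removing a branch leaves the other branches' results unchanged, which is the property the pipeline above relies on when it applies five transforms to the same `videoDataCollection`.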
Filter Pattern
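One data processing module filters the input dataset, keeps only the records that meet a condition, and passes them on to the next module for separate processing.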
    PCollection<User> userCollection = ...;

    // Stage 1: keep only diamond users; everyone else is dropped.
    PCollection<User> diamondUserCollection = userCollection.apply("filterDiamondUserTransform",
        ParDo.of(new DoFn<User, User>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            if (isDiamondUser(c.element())) {
              c.output(c.element());
            }
          }
        }));

    // Stage 2: consumes the filtered collection, so only diamond users are notified.
    PCollection<User> notifiedUserCollection = diamondUserCollection.apply("notifyUserTransform",
        ParDo.of(new DoFn<User, User>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            if (notifyUser(c.element())) {
              c.output(c.element());
            }
          }
        }));
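The filter-then-process chain can also be modelled without Beam. The plain-Java sketch below rests on assumptions: the `User` class, its `tier` field, and the `notifyDiamondUsers` helper are hypothetical stand-ins, not part of the pipeline above.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical plain-Java model of the filter pattern (not Beam code).
class FilterSketch {
  // Minimal stand-in for the User type; the tier field is an assumption.
  static final class User {
    final String name;
    final String tier;

    User(String name, String tier) {
      this.name = name;
      this.tier = tier;
    }
  }

  // Stage 1: keep only elements that satisfy the predicate; the rest are dropped.
  static <T> List<T> filter(List<T> input, Predicate<T> keep) {
    return input.stream().filter(keep).collect(Collectors.toList());
  }

  // Stage 2: runs only on what stage 1 let through.
  static List<String> notifyDiamondUsers(List<User> users) {
    List<User> diamond = filter(users, u -> "diamond".equals(u.tier));
    return diamond.stream().map(u -> "notified " + u.name).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<User> users = List.of(new User("ann", "diamond"), new User("bob", "gold"));
    System.out.println(notifyDiamondUsers(users));
  }
}
```

In Beam itself, the built-in `Filter.by(...)` transform can express the first stage more compactly than a hand-written `DoFn`.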
Splitter Pattern
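Unlike the Filter Pattern, no data is discarded; the data is split into groups, and each group is processed separately.
Using the side output technique (Beam's current documentation calls these additional outputs), each user group is defined as its own PCollection.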
    // One tag per user group; the trailing {} creates an anonymous subclass so the element type is retained.
    final TupleTag<User> fiveStarMembershipTag = new TupleTag<User>(){};
    final TupleTag<User> goldenMembershipTag = new TupleTag<User>(){};
    final TupleTag<User> diamondMembershipTag = new TupleTag<User>(){};

    PCollection<User> userCollection = ...;

    PCollectionTuple mixedCollection = userCollection.apply(ParDo
        .of(new DoFn<User, User>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            if (isFiveStarMember(c.element())) {
              // Main output, which withOutputTags binds to fiveStarMembershipTag.
              c.output(c.element());
            } else if (isGoldenMember(c.element())) {
              c.output(goldenMembershipTag, c.element());
            } else if (isDiamondMember(c.element())) {
              c.output(diamondMembershipTag, c.element());
            }
            // A user who matches none of the three tiers is not emitted to any output.
          }
        })
        .withOutputTags(fiveStarMembershipTag,
            TupleTagList.of(goldenMembershipTag).and(diamondMembershipTag)));

    // Each group is retrieved by its tag and processed on its own.
    mixedCollection.get(fiveStarMembershipTag).apply(...);
    mixedCollection.get(goldenMembershipTag).apply(...);
    mixedCollection.get(diamondMembershipTag).apply(...);
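To make the contrast with filtering concrete, here is a Beam-free sketch in plain Java in which every element is routed to exactly one group and none is dropped. The `SplitterSketch` class, the `split` helper, and the `name:tier` string encoding are assumptions made for brevity.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical plain-Java model of the splitter pattern (not Beam code).
class SplitterSketch {
  // Routes every element to exactly one group, chosen by the classifier.
  static <T, K> Map<K, List<T>> split(List<T> input, Function<T, K> classifier) {
    return input.stream().collect(Collectors.groupingBy(classifier));
  }

  public static void main(String[] args) {
    // Each string is "name:tier"; this encoding is an assumption for the example.
    List<String> users = List.of("ann:diamond", "bob:golden", "cy:fiveStar", "dee:golden");
    Map<String, List<String>> groups = split(users, u -> u.split(":")[1]);
    System.out.println(groups.get("golden")); // prints [bob:golden, dee:golden]
  }
}
```

The group sizes always add up to the input size, which is the invariant that separates a splitter from a filter. Beam also ships a `Partition` transform, which may fit when all groups share one element type and the number of groups is fixed when the pipeline is built.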
Joiner Pattern
Several different datasets are merged into one combined dataset, which is then processed as a whole.
    PCollectionList<Image> collectionList = PCollectionList.of(internalImages).and(thirdPartyImages);
    PCollection<Image> mergedCollectionWithFlatten = collectionList
        .apply(Flatten.<Image>pCollections());

    mergedCollectionWithFlatten.apply(...);
- Merging multiple PCollections in Beam uses Beam's built-in Flatten transform
- Flatten transform: merges the elements of multiple PCollections of the same type into a single PCollection
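The merge semantics can be sketched in plain Java as a concatenation of same-typed lists. `JoinerSketch` and its `flatten` helper are hypothetical names chosen for this illustration. Unlike this list-based model, Beam does not guarantee any element order in the merged PCollection.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the joiner pattern (not Beam code).
class JoinerSketch {
  // Merges same-typed collections into one; duplicates are kept, nothing is deduplicated.
  @SafeVarargs
  static <T> List<T> flatten(List<? extends T>... inputs) {
    List<T> merged = new ArrayList<>();
    for (List<? extends T> input : inputs) {
      merged.addAll(input);
    }
    return merged;
  }

  public static void main(String[] args) {
    List<String> internalImages = List.of("a.png", "b.png");
    List<String> thirdPartyImages = List.of("b.png", "c.png");
    System.out.println(flatten(internalImages, thirdPartyImages));
  }
}
```

Note that `b.png` appears twice in the result: merging is a plain union of the inputs, so any deduplication would have to be a separate, explicit step.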