Copier Pattern

Every data-processing module receives the same input, and each module can run independently and at the same time as the others.

PCollection<Video> videoDataCollection = ...;

// Generate the high-resolution video
PCollection<Video> highResolutionVideoCollection = videoDataCollection.apply("highResolutionTransform", ParDo.of(new DoFn<Video, Video>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(generateHighResolution(c.element()));
  }
}));

// Generate the low-resolution video
PCollection<Video> lowResolutionVideoCollection = videoDataCollection.apply("lowResolutionTransform", ParDo.of(new DoFn<Video, Video>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(generateLowResolution(c.element()));
  }
}));

// Generate the GIF animation
PCollection<Image> gifCollection = videoDataCollection.apply("gifTransform", ParDo.of(new DoFn<Video, Image>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(generateGIF(c.element()));
  }
}));

// Generate the video captions
PCollection<Caption> captionCollection = videoDataCollection.apply("captionTransform", ParDo.of(new DoFn<Video, Caption>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(generateCaption(c.element()));
  }
}));

// Analyze the video
PCollection<Report> videoAnalysisCollection = videoDataCollection.apply("videoAnalysisTransform", ParDo.of(new DoFn<Video, Report>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(analyzeVideo(c.element()));
  }
}));
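
The fan-out idea above can also be modeled without Beam. The following is a minimal plain-Java sketch, not Beam code: `CopierSketch`, `branch`, and the sample file names are hypothetical and exist only for illustration. It shows several branches reading one shared input while leaving that input untouched.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical, Beam-free model of the Copier pattern.
final class CopierSketch {
    // One branch: maps every element of the shared input into a new list.
    // The shared input itself is never modified, so any number of branches can reuse it.
    static <T, R> List<R> branch(List<T> sharedInput, Function<T, R> transform) {
        return sharedInput.stream().map(transform).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> videos = Arrays.asList("intro.mp4", "demo.mp4");

        // Three independent branches, all fed by the same input.
        List<String> highRes = branch(videos, v -> v + "@1080p");
        List<String> lowRes = branch(videos, v -> v + "@360p");
        List<Integer> nameLengths = branch(videos, String::length);

        System.out.println(highRes + " " + lowRes + " " + nameLengths);
    }
}
```

Because no branch depends on another branch's output, the branches could in principle be evaluated in any order or in parallel, which is the property the pattern relies on.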

Filter Pattern

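A data-processing module filters the input dataset, keeps only the elements that satisfy a condition, and passes them on to the next module for separate processing.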

PCollection<User> userCollection = ...;

// Keep only the diamond users; every other element is dropped
PCollection<User> diamondUserCollection = userCollection.apply("filterDiamondUserTransform", ParDo.of(new DoFn<User, User>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    if (isDiamondUser(c.element())) {
      c.output(c.element());
    }
  }
}));

// Consume the filtered collection, so that only diamond users reach this step
PCollection<User> notifiedUserCollection = diamondUserCollection.apply("notifyUserTransform", ParDo.of(new DoFn<User, User>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    if (notifyUser(c.element())) {
      c.output(c.element());
    }
  }
}));
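
As a rough illustration of how the filter stage feeds the next stage, here is a plain-Java sketch that does not use Beam. The `User` class, the tier strings, and the `keepDiamond` and `buildNotifications` helpers are assumptions made up for this example; they are not part of the original code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, Beam-free model of the Filter pattern.
final class FilterSketch {
    // Minimal stand-in for the User type used in the text.
    static final class User {
        final String name;
        final String tier;

        User(String name, String tier) {
            this.name = name;
            this.tier = tier;
        }
    }

    // Filter stage: elements that fail the condition are dropped.
    static List<User> keepDiamond(List<User> users) {
        return users.stream()
                .filter(u -> "diamond".equals(u.tier))
                .collect(Collectors.toList());
    }

    // Downstream stage: it only ever sees what the filter let through.
    static List<String> buildNotifications(List<User> users) {
        return users.stream()
                .map(u -> "notify " + u.name)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<User> users = Arrays.asList(
                new User("ann", "diamond"),
                new User("bob", "golden"),
                new User("cy", "diamond"));
        System.out.println(buildNotifications(keepDiamond(users)));
    }
}
```

The downstream stage takes the output of the filter stage as its input, not the original collection; that chaining is what distinguishes this pattern from the Copier Pattern.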

Splitter Pattern

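Unlike the Filter Pattern, the Splitter Pattern does not discard any data; instead, it divides the data into groups that are processed separately.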

Using the side input/side output technique (tagged side outputs in the code that follows), each user group is defined as its own PCollection.

// First, define a tag for each output
final TupleTag<User> fiveStarMembershipTag = new TupleTag<User>(){};
final TupleTag<User> goldenMembershipTag = new TupleTag<User>(){};
final TupleTag<User> diamondMembershipTag = new TupleTag<User>(){};

PCollection<User> userCollection = ...;

PCollectionTuple mixedCollection =
    userCollection.apply(ParDo
        .of(new DoFn<User, User>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            if (isFiveStarMember(c.element())) {
              // An untagged output goes to the main output (fiveStarMembershipTag)
              c.output(c.element());
            } else if (isGoldenMember(c.element())) {
              c.output(goldenMembershipTag, c.element());
            } else if (isDiamondMember(c.element())) {
              c.output(diamondMembershipTag, c.element());
            }
          }
        })
        .withOutputTags(fiveStarMembershipTag,
            TupleTagList.of(goldenMembershipTag).and(diamondMembershipTag)));

// Separate out the different user groups
mixedCollection.get(fiveStarMembershipTag).apply(...);

mixedCollection.get(goldenMembershipTag).apply(...);

mixedCollection.get(diamondMembershipTag).apply(...);
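
To make the "nothing is discarded" property concrete, here is a small plain-Java sketch of the same splitting idea. It is not Beam code: the `Tier` enum, the points thresholds in `tierOf`, and the catch-all `OTHER` group are all assumptions introduced for this example. Users are represented only by a points value to keep the sketch short.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical, Beam-free model of the Splitter pattern.
final class SplitterSketch {
    enum Tier { FIVE_STAR, GOLDEN, DIAMOND, OTHER }

    // Invented classification rule: the thresholds are illustrative only.
    static Tier tierOf(int points) {
        if (points >= 1000) return Tier.DIAMOND;
        if (points >= 500) return Tier.GOLDEN;
        if (points >= 100) return Tier.FIVE_STAR;
        return Tier.OTHER; // catch-all group, so no element is lost
    }

    // Routes every element to exactly one group; groups with no members are absent from the map.
    static Map<Tier, List<Integer>> split(List<Integer> userPoints) {
        return userPoints.stream()
                .collect(Collectors.groupingBy(SplitterSketch::tierOf));
    }

    public static void main(String[] args) {
        Map<Tier, List<Integer>> groups = split(Arrays.asList(50, 120, 700, 1500, 1000));
        System.out.println(groups);
    }
}
```

The sizes of the groups always add up to the size of the input, which is the difference from the Filter Pattern. Without a catch-all branch, elements that match none of the conditions would be silently lost.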

Joiner Pattern

Merges several different datasets into a single combined dataset so that they can be processed together.

PCollectionList<Image> collectionList = PCollectionList.of(internalImages).and(thirdPartyImages);
PCollection<Image> mergedCollectionWithFlatten = collectionList
    .apply(Flatten.<Image>pCollections());

mergedCollectionWithFlatten.apply(...);

  1. Merging multiple PCollections in Beam uses Beam's built-in Flatten transform.
  2. Flatten transform: merges elements of the same type from multiple PCollections into a single PCollection.
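
The merging behavior described in the list above can be approximated outside Beam with a few lines of plain Java. This is only a sketch: `JoinerSketch`, `flatten`, and the sample image names are hypothetical, and the element order seen here comes from Java lists, so it should not be read as a statement about ordering in Beam.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, Beam-free model of the Joiner pattern.
final class JoinerSketch {
    // Merges any number of same-typed lists into one list.
    // The shared type parameter T plays the role of the "same element type" requirement.
    @SafeVarargs
    static <T> List<T> flatten(List<T>... sources) {
        return Arrays.stream(sources)
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> internalImages = Arrays.asList("logo.png", "banner.png");
        List<String> thirdPartyImages = Arrays.asList("stock.jpg");

        // After merging, a single downstream step can handle all images.
        List<String> merged = flatten(internalImages, thirdPartyImages);
        System.out.println(merged);
    }
}
```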