有界数据 vs 无界数据

  1. 在 Beam 中,可以用同一个 Pipeline 处理有界数据无界数据
  2. 无论是有界数据还是无界数据,在 Beam 中,都可以用窗口把数据按时间分割成一些有限大小的集合
    • 对于无界数据,必须使用窗口对数据进行分割,然后对每个窗口内的数据集进行处理

读取无界数据

withLogAppendTime - 使用 Kafka 的 log append time 作为 PCollection 的时间戳

1
2
3
4
5
6
7
8
Pipeline pipeline = Pipeline.create();
pipeline.apply(
KafkaIO.<String, String>read()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("shakespeare") // use withTopics(List<String>) to read from multiple topics.
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withLogAppendTime());

PCollection + Timestamp

  1. 一般情况下,窗口的使用场景中,时间戳都是原生
    • 从 Kafka 读取消息记录,每一条 Kafka 消息都有时间戳
  2. Beam 允许手动给 PCollection 中的元素添加时间戳
1
2
3
4
5
6
7
8
9
10
11
2019-07-05:  HAMLET

2019-07-06: ACT I

2019-07-06: SCENE I Elsinore. A platform before the castle.

2019-07-07: [FRANCISCO at his post. Enter to him BERNARDO]

2019-07-07: BERNARDO Who's there?

2019-07-07: FRANCISCO Nay, answer me: stand, and unfold yourself.

outputWithTimestamp - 对每一个 PCollection 中的元素附上它所对应的时间戳

1
2
3
4
5
6
7
8
9
10
static class ExtractTimestampFn extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext c) {
String extractedLine = extractLine(c.element());
Instant timestamp =
new Instant(extractTimestamp(c.element());

c.outputWithTimestamp(extractedLine, timestamp);
}
}

PCollection + Window

  1. 无界数据的应用场景中,时间戳往往是数据记录自带
  2. 有界数据的应用场景中,时间戳往往需要指定
  3. PCollection 元素有了时间戳后,就能根据时间戳应用窗口对数据进行划分 - 固定、滑动、会话
  4. 将特定的窗口应用到 PCollection 上,同样使用 PCollection 的 apply 方法
1
2
3
PCollection<String> windowedWords = input
.apply(Window.<String>into(
FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));

复用 DoFn 和 PTransform

  1. Beam 的 Transform 不区分有界数据还是无界数据,可以直接复用
  2. 应用了窗口后,Beam 的 Transform 是在每个窗口内进行数据处理
1
PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());

输出无界数据

输出结果也是针对每个窗口

1
pipeline.apply("Write to PubSub", PubsubIO.writeStrings().to(options.getOutputTopic()));