有界数据 vs 无界数据
- 在 Beam 中,可以用同一个 Pipeline 处理有界数据和无界数据
- 无论是有界数据还是无界数据,在 Beam 中,都可以用窗口把数据按时间分割成一些有限大小的集合
- 对于无界数据,必须使用窗口对数据进行分割,然后对每个窗口内的数据集进行处理
读取无界数据
withLogAppendTime - 使用 Kafka 的 log append time 作为 PCollection 的时间戳
1 2 3 4 5 6 7 8
| Pipeline pipeline = Pipeline.create(); pipeline.apply( KafkaIO.<String, String>read() .withBootstrapServers("broker_1:9092,broker_2:9092") .withTopic("shakespeare") .withKeyDeserializer(StringDeserializer.class) .withValueDeserializer(StringDeserializer.class) .withLogAppendTime());
|
PCollection + Timestamp
- 一般情况下,窗口的使用场景中,时间戳都是原生的
- 从 Kafka 读取消息记录,每一条 Kafka 消息都有时间戳
- Beam 允许手动给 PCollection 中的元素添加时间戳
1 2 3 4 5 6 7 8 9 10 11
| 2019-07-05: HAMLET
2019-07-06: ACT I
2019-07-06: SCENE I Elsinore. A platform before the castle.
2019-07-07: [FRANCISCO at his post. Enter to him BERNARDO]
2019-07-07: BERNARDO Who's there?
2019-07-07: FRANCISCO Nay, answer me: stand, and unfold yourself.
|
outputWithTimestamp - 对每一个 PCollection 中的元素附上它所对应的时间戳
1 2 3 4 5 6 7 8 9 10
| static class ExtractTimestampFn extends DoFn<String, String> { @ProcessElement public void processElement(ProcessContext c) { String extractedLine = extractLine(c.element()); Instant timestamp = new Instant(extractTimestamp(c.element());
c.outputWithTimestamp(extractedLine, timestamp); } }
|
PCollection + Window
- 在无界数据的应用场景中,时间戳往往是数据记录自带的
- 在有界数据的应用场景中,时间戳往往需要指定的
- PCollection 元素有了时间戳后,就能根据时间戳应用窗口对数据进行划分 - 固定、滑动、会话
- 将特定的窗口应用到 PCollection 上,同样使用 PCollection 的 apply 方法
1 2 3
| PCollection<String> windowedWords = input .apply(Window.<String>into( FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
|
- Beam 的 Transform 不区分有界数据还是无界数据,可以直接复用
- 应用了窗口后,Beam 的 Transform 是在每个窗口内进行数据处理
1
| PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());
|
输出无界数据
输出结果也是针对每个窗口的
1
| pipeline.apply("Write to PubSub", PubsubIO.writeStrings().to(options.getOutputTopic()));
|