Window

  1. 在 Beam 中,WindowPCollection 里的每个元素根据时间戳划分成不同的有限数据集合
  2. 要将一些聚合操作应用在 PCollection 上时,或者对不同的 PCollection 进行 Join 操作
    • Beam 将这些操作应用在这些被 Window 划分好的不同的数据集
  3. 无论是有界数据还是无界数据,Beam 都会按同样的规则进行处理
  4. 在用 IO Connector 读取有界数据集的过程中,Read Transform 会默认为每个元素分配一个相同的时间戳
    • 一般情况下,该时间戳为运行 Pipeline 的时间,即处理时间 - Processing Time
    • Beam 会为该 Pipeline 默认分配一个全局窗口 - Global Window - 从无限小无限大的时间窗口

Global Window

可以显式将一个全局窗口赋予一个有界数据集

1
2
PCollection<String> input = p.apply(TextIO.read().from(filepath));
PCollection<String> batchInputs = input.apply(Window.<String>into(new GlobalWindows()));
  1. 在处理有界数据集时,可以不用显式地分配一个窗口
  2. 在处理无界数据集时,必须显式地分配一个窗口
    • 并且不能是全局窗口,否则运行 Pipeline 时会直接报错

Fixed Window

  1. 固定窗口(Fixed Window)也称为滚动窗口(Tumbling Window)
  2. Fixed Window 通常由一个静态的窗口大小来定义
  3. 一个 PCollection 中的所有元素,会根据自身的时间戳被分配到相应的固定窗口中
  4. 固定窗口本质上不会重叠在一起,PCollection 中的每一个元素只会落入唯一一个窗口

Window Transform

1
2
3
4
PCollection<String> input =
p.apply(KafkaIO.<Long, String>read()).apply(Values.<String>create());
PCollection<String> fixedWindowedInputs =
input.apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1))));

Sliding Window

  1. 滑动窗口由一个静态的窗口大小和一个滑动周期来定义
  2. Beam 对滑动周期的大小没有做任何限制
    • 滑动周期 < 滑动窗口,会有部分重叠,在一个 PCollection 中,同一个元素会被分配到不同的滑动窗口
    • 滑动周期 = 滑动窗口,降级为固定窗口

Window Transform

1
2
3
4
5
6
PCollection<String> input =
p.apply(KafkaIO.<Long, String>read()).apply(Values.<String>create());
PCollection<String> slidingWindowedInputs =
input.apply(
Window.<String>into(
SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardMinutes(30))));

Sessions Window

  1. 会话窗口没有一个固定的窗口长度
  2. 会话窗口主要用于记录持续一段时间的活动数据集
    • 在一个会话窗口中的数据集,将它里面所有的元素按照时间戳来排序
    • 任意相邻的两个元素,它们的时间戳相差不会超过一个定义好的静态间隔时间(Gap Duration)

假设会话窗口的静态时间间隔为 5 分钟
value1value3 在第一个会话窗口,而 value4value5 在第二个会话窗口

1
2
3
4
5
(key1, value1, [7:44:00 AM,7:44:00 AM))
(key1, value2, [7:45:00 AM,7:45:00 AM))
(key1, value3, [7:49:00 AM,7:49:00 AM))
(key1, value4, [8:01:00 AM,8:01:00 AM))
(key1, value5, [8:02:00 AM,8:02:00 AM))
1
2
3
4
PCollection<String> input =
p.apply(KafkaIO.<Long, String>read()).apply(Values.<String>create());
PCollection<String> sessionWindowedInputs =
input.apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(5))));