DAG

Transform 是 Beam 中数据处理最基本单元

image-20241120160357180

  1. Beam 把数据转换抽象成有向图
  2. 反直觉 - PCollection 是有向图中的,而 Transform 是有向图中的节点
    • 区分节点的关键是看一个 Transform 是不是有一个多余输入输出
    • 每个 Transform 都可能有大于一个输入 PCollection,也可能输出大于一个输出 PCollection

Apply

Beam 中的 PCollection 有一个抽象的成员函数 Apply,使用任何一个 Transform 时,都需要调用 Apply

1
2
3
final_collection = input_collection.apply(Transform1)
.apply(Transform2)
.apply(Transform3)

image-20241120161250556

Transform

概述

  1. ParDo - Parallel Do - 表达的是很通用并行处理数据操作
  2. GroupByKey - 把一个 Key/Value 的数据集按照 Key 归并

可以用 ParDo 实现 GroupByKey

  1. 简单实现 - 放一个全局的哈希表,然后在 ParDo 中把一个个的元素插入到该哈希表
  2. 可能不可用,面对大规模数据时,可能无法放进一个内存哈希表
  3. 而且,PCollection 会把计算分发到不同的机器上执行

ParDo

在实际应用中,80% 的数据处理流水使用基本的 ParDo 和 DoFn

  1. 在编写 ParDo 时,输入是一个 PCollection 中的单个元素,而输出可以是 0 个1 个多个元素
  2. 只需要考虑好怎么处理一个元素,其余事项,Beam 会在框架层面进行优化并行
  3. 使用 ParDo 时,需要继承它提供的 DoFn
    • 可以将 DoFn 看作 ParDo 的一部分,ParDo 和 DoFn 是一个有机整体
1
2
3
4
5
6
7
8
9
10
static class UpperCaseFn extends DoFn<String, String> {
@ProcessElement
public void processElement(@Element String word, OutputReceiver<String> out) {
out.output(word.toUpperCase());
}
}

PCollection<String> upperCaseWords = words.apply(
ParDo
.of(new UpperCaseFn()));

编程界面

1
pcollection.apply(ParDo.of(new DoFn()))

Filter

挑出符合条件的元素

1
2
3
4
5
6
@ProcessElement
public void processElement(@Element T input, OutputReceiver<T> out) {
if (IsNeeded(input)) {
out.output(input);
}
}

Format

对数据集进行格式转换

1
2
3
4
@ProcessElement
public void processElement(@Element String csvLine, OutputReceiver<tf.Example> out) {
out.output(ConvertToTfExample(csvLine));
}

Extract

提取数据集中的特定值(属性)

1
2
3
4
@ProcessElement
public void processElement(@Element Item item, OutputReceiver<Integer> out) {
out.output(item.price());
}

Stateful Transform

Statefullness - side input/side output

  1. 简单场景都是无状态
    • 每个 DoFn 的 processElement 函数中,输出只依赖于输入
    • 对应的 DoFn 类不需要维持一个成员变量
  2. 无状态的 DoFn 能保证最大的并行运算能力
    • 因为 DoFn 的 processElement 可以分发到不同的机器或者不同的进程
  3. 如果 processElement 的运行需要另外的信息 - 有状态的 DoFn
1
2
3
4
5
6
7
8
static class FindUserNameFn extends DoFn<String, String> {
@ProcessElement
public void processElement(@Element String userId, OutputReceiver<String> out) {
out.output(database.FindUserName(userId));
}

Database database;
}
  1. 因为有了共享状态(数据库连接),在使用有状态的 DoFn 时,需要格外注意 Beam 的并行特性
  2. Beam 不仅仅会把处理函数分发到不同线程和进程,也会分发到不同的机器上执行
    • 当共享数据库的读取操作时,很容易引发数据库的 QPS 过高

需要共享的状态来自于另一些 Beam 的数据处理的中间结果 - side input/side output

1
2
3
4
5
6
7
8
9
10
11
12
13
14
PCollectionView<Integer> mediumSpending = ...;

PCollection<String> usersBelowMediumSpending =
userIds.apply(ParDo
.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(@Element String userId, OutputReceiver<String> out, ProcessContext c) {
int medium = c.sideInput(mediumSpending);
if (findSpending(userId) <= medium) {
out.output(userId);
}
}
}).withSideInputs(mediumSpending)
);
  1. 需要根据之前处理得到的结果,即用户中位数消费数据,找到消费低于该中位数的用户
  2. 可以通过 side input 把这个中位数传递进 DoFn 中,然后可以在 ProcessContext 中取出该 side input

优化

Beam 中的数据操作都是 lazy execution

1
Pcollection1 = pcollection2.apply(Transform)
  1. 真正的计算完全没有被执行
  2. 仅仅只是让 Beam 知道用户的计算意图,需要让 Beam 构建数据处理的 DAG
  3. 然后 Beam 的处理优化器会对处理操作进行优化

image-20241120165851206

  1. 没必要过度优化 DoFn 代码,希望在一个 DoFn 中就把所有计算都做了
  2. 可以用分步的 DoFn 将计算意图表达出来,然后交给 Beam 的优化器合并操作