Beam - Pipeline Test
Context
- 设计好的 Pipeline 通常需要放在分布式环境下执行,具体每一步的 Transform 都会被分配到任意机器上执行
- 如果 Pipeline 运行出错,则需要定位到具体机器,再到上面去做调试是不现实的
- 另一种办法,读取一些样本数据集,再运行整个 Pipeline 去验证哪一步逻辑出错 - 费时费力
- 正式将 Pipeline 放在分布式环境上运行之前,需要先完整地测试整个 Pipeline 逻辑
Solution
- Beam 提供了一套完整的测试 SDK
- 可以在开发 Pipeline 的同时,能够实现对一个 Transform 逻辑的单元测试
- 也可以对整个 Pipeline 的 End-to-End 测试
- 在 Beam 所支持的各种 Runners 中,有一个 DirectRunner
- DirectRunner 即本地机器,整个 Pipeline 会放在本地机器上运行
- DoFnTester - 让用户传入一个自定义函数来进行测试 - UDF - User Defined Function
- DoFnTester 接收的对象是用户继承实现的 DoFn
- 不应该将 DoFn 当成一个单元来进行测试
- 在 Beam 中,数据转换的逻辑都是被抽象成 Transform,而不是 Transform 里面的 ParDo 的具体实现
- 一个简单的 Transform 可以用一个 ParDo 来表示
- 每个 Runner 具体怎么运行这些 ParDo,对用户来说应该是透明的
- 从 Beam 2.4.0 后,DoFnTester 被标记为 Deprecated,推荐使用 TestPipeline
Unit
- 创建 TestPipeline 实例
- 创建一个静态的、用于测试的输入数据集
- 使用 Create Transform 来创建一个 PCollection 作为输入数据集
- 在测试数据集上调用业务实现的 Transform 并将结果保存在一个 PCollection 上
- 使用 PAssert 类的相关函数来验证输出的 PCollection 是否符合预期
继承 DoFn 类来实现一个产生偶数的 Transform,输入和输出的数据类型都是 Integer
1 | static class EvenNumberFn extends DoFn<Integer, Integer> { |
创建 TestPipeline 实例
1 | ... |
创建静态输入数据集
1 | ... |
使用 Create Transform 创建 PCollection
Create Transform - 将 Java Collection 的数据转换成 Beam 的数据抽象 PCollection
1 | ... |
调用业务 Transform 的处理逻辑
1 | ... |
验证输出结果 - PAssert
1 | ... |
运行 TestPipeline - PAssert 必须在 TestPipeline.run 之前
1 | final class TestClass { |
End-to-End
- 现实应用中,一般都是多步骤 Pipeline,可能会涉及到多个输入数据集,也可能会有多个输出
- 在 Beam 中,端到端的测试与 Transform 的单元测试非常相似
- 唯一不同点,需要为所有的输入数据集创建测试集,而不仅仅只针对一个 Transform
- 对于 Pipeline 中每个应用到 Write Transform 的地方,都需要用到 PAssert 来验证数据集
步骤
- 创建 TestPipeline 实例
- 对于多步骤 Pipeline 的每个输入数据源,创建相对应的静态测试数据集
- 使用 Create Transform,将所有的静态测试数据集转换成 PCollection 作为输入数据集
- 按照真实的 Pipeline 逻辑,调用所有的 Transforms 操作
- 在 Pipeline 中所有应用到 Write Transform 的地方,都使用 PAssert 来替换 Write Transform
- 并验证输出的结果是否符合预期
All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.