Pipeline
- 读取输入数据到 PCollection
- 对读进来的 PCollection 进行 Transform,得到另一个 PCollection
- 输出结果 PCollection
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);
PCollection<String> lines = pipeline.apply( "ReadLines", TextIO.read().from("gs://some/inputData.txt"));
PCollection<String> filteredLines = lines.apply(new FilterLines());
filteredLines.apply("WriteMyFile", TextIO.write().to("gs://some/outputData.txt"));
pipeline.run().waitUntilFinish();
|
- 任何一个 Beam 程序都需要先创建一个 Pipeline 实例 - 用来表达 Pipeline 类型
- 二进制程序可以动态包含多个 Pipeline 实例
- 每个 Pipeline 实例都是独立的,封装了要进行操作的数据,以及要进行操作 Transform
- Beam 是延迟运行的
- 在 Pipeline.run 之前,只是构建了 Beam 所需要的数据处理 DAG 用来优化和分配计算资源,未开始计算
- pipeline.run().waitUntilFinish() - 数据真正开始被处理
- Pipeline 可以通过 PipelineOption 动态选择计算引擎
TestPipeline
TestPipeline 是一种特殊的 Pipeline,能够在单机上运行小规模的数据集
1 2 3 4 5 6 7 8 9
| Pipeline p = TestPipeline.create();
PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
PCollection<String> output = input.apply(new CountWords());
PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);
p.run();
|
PipelineOptions
- Beam 把应用层的数据处理业务逻辑与底层的运算引擎分离
- 无需修改 Pipeline 代码,就可以在本地、Spark、Flink 上运行 - PipelineOptions
- SparkPipelineOptions / FlinkPipelineOptions
1 2
| options = PipelineOptionsFactory.as(SparkPipelineOptions.class); Pipeline pipeline = Pipeline.create(options);
|
- 通常一个 PipelineOption 是用 PipelineOptionsFactory 来创建的
- PipelineOptionsFactory.as(Class)
- PipelineOptionsFactory.create()
- 更常见的创建方法是从命令行中读取参数来创建 PipelineOption
- PipelineOptionsFactory#fromArgs(String[])
1 2 3 4
| public static void main(String[] args) { PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); Pipeline p = Pipeline.create(options); }
|
Direct
- 在本地运行测试或者调试时使用的运行模式
- 在 Direct 模式时,Beam 会在单机上用多线程来模拟分布式的并行处理
Maven
1 2 3 4 5 6
| <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-direct-java</artifactId> <version>2.60.0</version> <scope>test</scope> </dependency>
|
根据命令行参数选择生成不同的 PipelineOptions 子类
1 2
| PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
|
1 2
| $ mvn compile exec:java -Dexec.mainClass=YourMainClass \ -Dexec.args="--runner=DirectRunner" -Pdirect-runner
|
显式使用 DirectRunner
1 2 3 4 5
| PipelineOptions options = PipelineOptionsFactory.create(); options.setRunner(DirectRunner.class);
options = PipelineOptionsFactory.as(DirectRunner.class); Pipeline pipeline = Pipeline.create(options);
|
Spark
- SparkRunner 在执行 Beam 程序时,与原生的 Spark 程序一样
- SparkRunner 为在 Spark 上运行 Beam Pipeline 提供了以下功能
- Batch Pipeline + Streaming Pipeline
- 与原生 RDD 和 DStream 一样的容错保证
- 与原生 Spark 一样的安全性能
- 可以使用 Spark 的数据回报系统
- 使用 Spark Broadcast 实现 Beam side-input
- ≥ Spark 2.2
Maven
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-spark</artifactId> <version>2.13.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>${spark.version}</version> </dependency>
|
命令行参数
1 2 3 4
| $ mvn exec:java -Dexec.mainClass=YourMainClass \ -Pspark-runner \ -Dexec.args="--runner=SparkRunner \ --sparkMaster=<spark master url>"
|
在 Spark 独立集群上运行
1
| $ spark-submit --class YourMainClass --master spark://HOST:PORT target/...jar --runner=SparkRunner
|
Flink
- FlinkRunner 用来在 Flink 上运行 Beam Pipeline
- 可选的计算集群 - 如 Yarn / Kubernetes / Mesos / 本地
- FlinkRunner 适合大规模且连续的数据处理问题
- FlinkRunner 提供的功能
- 以 Streaming 为中心,支持流处理和批处理
- 与原生 Flink 一样的容错性,同样支持 exactly-once 处理语义
- 可以自定义内存管理模型
- 与 Apache Hadoop 生态整合比较好
Maven
1 2 3 4 5
| <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-flink-1.6</artifactId> <version>2.13.0</version> </dependency>
|
命令行参数
1 2 3 4
| $ mvn exec:java -Dexec.mainClass=YourMainClass \ -Pflink-runner \ -Dexec.args="--runner=FlinkRunner \ --flinkMaster=<flink master url>"
|