Pipeline

  1. 读取输入数据到 PCollection
  2. 对读进来的 PCollection 进行 Transform,得到另一个 PCollection
  3. 输出结果 PCollection
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();

// Then create the pipeline.
Pipeline pipeline = Pipeline.create(options);

PCollection<String> lines = pipeline.apply(
"ReadLines", TextIO.read().from("gs://some/inputData.txt"));

PCollection<String> filteredLines = lines.apply(new FilterLines());

filteredLines.apply("WriteMyFile", TextIO.write().to("gs://some/outputData.txt"));

pipeline.run().waitUntilFinish();
  1. 任何一个 Beam 程序都需要先创建一个 Pipeline 实例 - 用来表达 Pipeline 类型
  2. 二进制程序可以动态包含多个 Pipeline 实例
    • 每个 Pipeline 实例都是独立的,封装了要进行操作的数据,以及要进行操作 Transform
  3. Beam 是延迟运行
    • Pipeline.run 之前,只是构建了 Beam 所需要的数据处理 DAG 用来优化和分配计算资源,未开始计算
    • pipeline.run().waitUntilFinish() - 数据真正开始被处理
  4. Pipeline 可以通过 PipelineOption 动态选择计算引擎

TestPipeline

TestPipeline 是一种特殊的 Pipeline,能够在单机上运行小规模的数据集

1
2
3
4
5
6
7
8
9
Pipeline p = TestPipeline.create();

PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());

PCollection<String> output = input.apply(new CountWords());

PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);

p.run();

PipelineOptions

  1. Beam 把应用层的数据处理业务逻辑底层的运算引擎分离
  2. 无需修改 Pipeline 代码,就可以在本地SparkFlink 上运行 - PipelineOptions
  3. SparkPipelineOptions / FlinkPipelineOptions
1
2
options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
  1. 通常一个 PipelineOption 是用 PipelineOptionsFactory 来创建的
    • PipelineOptionsFactory.as(Class)
    • PipelineOptionsFactory.create()
  2. 更常见的创建方法是从命令行中读取参数来创建 PipelineOption
    • PipelineOptionsFactory#fromArgs(String[])
1
2
3
4
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
}

Direct

  1. 本地运行测试或者调试时使用的运行模式
  2. 在 Direct 模式时,Beam 会在单机上用多线程模拟分布式并行处理

Maven

1
2
3
4
5
6
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>2.60.0</version>
<scope>test</scope>
</dependency>

根据命令行参数选择生成不同的 PipelineOptions 子类

1
2
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).create();
1
2
$ mvn compile exec:java -Dexec.mainClass=YourMainClass \
-Dexec.args="--runner=DirectRunner" -Pdirect-runner

显式使用 DirectRunner

1
2
3
4
5
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class);
// 或者这样
options = PipelineOptionsFactory.as(DirectRunner.class);
Pipeline pipeline = Pipeline.create(options);

Spark

  1. SparkRunner执行 Beam 程序时,与原生的 Spark 程序一样
  2. SparkRunner 为在 Spark 上运行 Beam Pipeline 提供了以下功能
    • Batch Pipeline + Streaming Pipeline
    • 原生 RDD 和 DStream 一样的容错保证
    • 原生 Spark 一样的安全性能
    • 可以使用 Spark 的数据回报系统
    • 使用 Spark Broadcast 实现 Beam side-input
  3. ≥ Spark 2.2

Maven

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
<version>2.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
</dependency>

命令行参数

1
2
3
4
$ mvn exec:java -Dexec.mainClass=YourMainClass \
-Pspark-runner \
-Dexec.args="--runner=SparkRunner \
--sparkMaster=<spark master url>"

Spark 独立集群上运行

1
$ spark-submit --class YourMainClass --master spark://HOST:PORT target/...jar --runner=SparkRunner

Flink

  1. FlinkRunner 用来在 Flink 上运行 Beam Pipeline
    • 可选的计算集群 - 如 Yarn / Kubernetes / Mesos / 本地
  2. FlinkRunner 适合大规模且连续的数据处理问题
  3. FlinkRunner 提供的功能
    • Streaming中心,支持流处理批处理
    • 原生 Flink 一样的容错性,同样支持 exactly-once 处理语义
    • 可以自定义内存管理模型
    • 与 Apache Hadoop 生态整合比较好

Maven

1
2
3
4
5
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink-1.6</artifactId>
<version>2.13.0</version>
</dependency>

命令行参数

1
2
3
4
$ mvn exec:java -Dexec.mainClass=YourMainClass \
-Pflink-runner \
-Dexec.args="--runner=FlinkRunner \
--flinkMaster=<flink master url>"