Beam - Streaming
有界数据 vs 无界数据 在 Beam 中,可以用同一个 Pipeline 处理有界数据和无界数据 无论是有界数据还是无界数据,在 Beam 中,都可以用窗口把数据按时间分割成一些有限大小的集合 对于无界数据,必须使用窗口对数据进行分割,然后对每个窗口内的数据集进行处理 读取无界数据 withLogAppendTime - 使用 Kafka 的 log append time 作为 PCollection 的时间戳 12345678Pipeline pipeline = Pipeline.create();pipeline.apply( KafkaIO.<String, String>read() .withBootstrapServers("broker_1:9092,broker_2:9092") .withTopic("shakespeare") // use withTopics(List<String>) to read from multiple topics. .wi...
Beam - Window
Window 在 Beam 中,Window 将 PCollection 里的每个元素根据时间戳划分成不同的有限数据集合 要将一些聚合操作应用在 PCollection 上时,或者对不同的 PCollection 进行 Join 操作 Beam 将这些操作应用在这些被 Window 划分好的不同的数据集上 无论是有界数据还是无界数据,Beam 都会按同样的规则进行处理 在用 IO Connector 读取有界数据集的过程中,Read Transform 会默认为每个元素分配一个相同的时间戳 一般情况下,该时间戳为运行 Pipeline 的时间,即处理时间 - Processing Time Beam 会为该 Pipeline 默认分配一个全局窗口 - Global Window - 从无限小到无限大的时间窗口 Global Window 可以显式将一个全局窗口赋予一个有界数据集 12PCollection<String> input = p.apply(TextIO.read().from(filepath));PCollection<String> batchI...
Beam - WordCount
步骤 用 Pipeline IO 读取文本 用 Transform 对文本进行分词和词频统计 用 Pipeline IO 输出结果 将所有步骤打包成一个 Pipeline 创建 Pipeline 默认情况下,将采用 DirectRunner 在本地运行 1PipelineOptions options = PipelineOptionsFactory.create(); 一个 Pipeline 实例会构建数据处理的 DAG,以及这个 DAG 所需要的 Transform 1Pipeline p = Pipeline.create(options); 应用 Transform TextIO.Read - 读取外部文件,生成一个 PCollection,包含所有文本行,每个元素都是文本中的一行 123String filepattern = "file:///Users/zhongmingmao/workspace/java/hello-beam/corpus/shakespeare.txt";PCollection<String> lines = ...
Beam - Execution Engine
Pipeline 读取输入数据到 PCollection 对读进来的 PCollection 进行 Transform,得到另一个 PCollection 输出结果 PCollection 1234567891011121314// Start by defining the options for the pipeline.PipelineOptions options = PipelineOptionsFactory.create();// Then create the pipeline.Pipeline pipeline = Pipeline.create(options);PCollection<String> lines = pipeline.apply( "ReadLines", TextIO.read().from("gs://some/inputData.txt"));PCollection<String> filteredLines = lines.apply(new FilterLines());filtere...
Beam - Pipeline Test
Context 设计好的 Pipeline 通常需要放在分布式环境下执行,具体每一步的 Transform 都会被分配到任意机器上执行 如果 Pipeline 运行出错,则需要定位到具体机器,再到上面去做调试是不现实的 另一种办法,读取一些样本数据集,再运行整个 Pipeline 去验证哪一步逻辑出错 - 费时费力 正式将 Pipeline 放在分布式环境上运行之前,需要先完整地测试整个 Pipeline 逻辑 Solution Beam 提供了一套完整的测试 SDK 可以在开发 Pipeline 的同时,能够实现对一个 Transform 逻辑的单元测试 也可以对整个 Pipeline 的 End-to-End 测试 在 Beam 所支持的各种 Runners 中,有一个 DirectRunner DirectRunner 即本地机器,整个 Pipeline 会放在本地机器上运行 DoFnTester - 让用户传入一个自定义函数来进行测试 - UDF - User Defined Function DoFnTester 接收的对象是用户继承实现的 DoFn 不应该将 DoFn 当成...
Beam - Pattern
Copier Pattern 每个数据处理模块的输入都是相同的,并且每个数据处理模块都可以单独并且同步地运行处理 1234567891011121314151617181920212223242526272829303132333435363738394041PCollection<Video> videoDataCollection = ...;// 生成高画质视频PCollection<Video> highResolutionVideoCollection = videoDataCollection.apply("highResolutionTransform", ParDo.of(new DoFn<Video, Video>(){ @ProcessElement public void processElement(ProcessContext c) { c.output(generateHighResolution(c.element())); }}));// 生成低画质视频...
Beam - Pipeline IO
读取数据集 一个输入数据集的读取通常是通过 Read Transform 来完成 Read Transform 从外部源读取数据 - 本地文件、数据库、OSS、MQ Read Transform 返回一个 PCollection,该 PCollection 可以作为一个输入数据集,应用在各种 Transform 上 Pipeline 没有限制调用 Read Transform 的时机 可以在 Pipeline 最开始的时候调用 也可以在经过 N 个步骤的 Transforms 后再调用它来读取另外的数据集 本地文件 1PCollection<String> inputs = p.apply(TextIO.read().from(filepath)); Beam 支持从多个文件路径中读取数据集,文件名匹配规则与 Linux glob 一样 glob 操作符的匹配规则最终要和所使用的底层文件系统挂钩 从不同的外部源读取同一类型的数据来统一作为输入数据集 - 利用 flatten 操作将数据集合并 12345PCollection<String> input1 ...
Beam - Pipeline
创建 在 Beam 中,所有的数据处理逻辑都会被抽象成 Pipeline 来运行 Pipeline 是对数据处理逻辑的一个封装 包括一整套流程 - 读取数据集、将数据集转换成想要的结果、输出结果数据集 创建 Pipeline 12PipelineOptions options = PipelineOptionsFactory.create();Pipeline p = Pipeline.create(options); 应用 PCollection 具有不可变性 一个 PCollection 一旦生成,就不能在增加或者删除里面的元素了 在 Beam 中,每次 PCollection 经过一个 Transform 之后,Pipeline 都会创建一个新的 PCollection 新创建的 PCollection 又成为下一个 Transform 的输入 原先的 PCollection 不会有任何改变 对同一个 PCollection 可以应用多种不同的 Transform 处理模型 Pipeline 的底层思想依然是 MapReduce 在分布式环境下,整个 Pipeline...
Beam - Transform
DAG Transform 是 Beam 中数据处理的最基本单元 Beam 把数据转换抽象成有向图 反直觉 - PCollection 是有向图中的边,而 Transform 是有向图中的节点 区分节点和边的关键是看一个 Transform 是不是有一个多余的输入和输出 每个 Transform 都可能有大于一个的输入 PCollection,也可能输出大于一个的输出 PCollection Apply Beam 中的 PCollection 有一个抽象的成员函数 Apply,使用任何一个 Transform 时,都需要调用 Apply 123final_collection = input_collection.apply(Transform1).apply(Transform2).apply(Transform3) Transform概述 ParDo - Parallel Do - 表达的是很通用的并行处理数据操作 GroupByKey - 把一个 Key/Value 的数据集按照 Key 归并 可以用 ParDo 实现 GroupByKey 简单实现 - 放...
Beam - PCollection
数据抽象 Spark RDD 不同的技术系统有不同的数据结构, 如在 C++ 中有 vector、unordered_map 几乎所有的 Beam 数据都能表达为 PCollection PCollection - Parallel Collection - 可并行计算的数据集,与 Spark RDD 非常类似 在一个分布式计算系统中,需要为用户隐藏实现细节,包括数据是怎样表达和存储的 数据可能来自于内存的数据,也可能来自于外部文件,或者来自于 MySQL 数据库 如果没有一个统一的数据抽象的话,开发者需要不停地修改代码,无法专注于业务逻辑 Coder 将数据类型进行序列化和反序列化,便于在网络上传输 需要为 PCollection 的元素编写 Coder Coder 的作用与 Beam 的本质紧密相关 计算流程最终会运行在一个分布式系统 所有的数据都可能在网络上的计算机之间相互传递 Coder 就是告诉 Beam 如何将数据类型进行序列化和反序列化,以便于在网络上传输 Coder 需要注册进全局的 CoderRegistry 为自定义的数据类型建立与 Coder 的对应关系,无...















