背景

  1. Spark Streaming 将无边界的流数据抽象成 DStream
    • 按特定的时间间隔,把数据流分割成一个个 RDD 进行批处理
    • DStream API 与 RDD API 高度相似,拥有 RDD 的各种性质
  2. DataSet/DataFrame
    • DataSet/DataFrame 是高级 API,提供类似于 SQL 的查询接口,方便熟悉关系型数据库的开发人员使用
    • Spark SQL 执行引擎自动优化 DataSet/DataFrame 程序
      • RDD API 开发的程序本质上需要开发人员手工构造 RDD 的 DAG 执行图,依赖于手工优化
  3. 如果拥有 DataSet/DataFrame API流处理模块
    • 无需去用相对底层DStream API 去处理无边界数据,大大提升开发效率
  4. 2016 年,Spark 2.0 中推出结构化流处理的模块 - Structured Streaming
    • Structured Streaming 基于 Spark SQL 引擎实现
    • 在开发视角,流数据静态数据没有区别,可以像批处理静态数据那样处理流数据
    • 随着流数据持续输入,Spark SQL 引擎会持续地处理新数据,并更新计算结果

模型

  1. 流数据处理最基本的问题是如何对不断更新的无边界数据建模
  2. Spark Streaming
    • 流数据按一定的时间间隔分割成许多个小的数据块进行批处理
  3. Structured Streaming
    • 把数据看成一个无边界关系型数据库表
    • 每一个数据都是表中的一行,不断会有新的数据行被添加到表中
    • 可以对该表做任何类似批处理查询,Spark 会不断对新加入的数据进行处理,并更新计算结果

image-20241118184008593

  1. 与 Spark Streaming 类似,Structured Streaming 也是将输入的数据按照时间间隔(例如 1 秒)划分为数据段
  2. 每一秒都会把新输入的数据添加到表中,Spark 也会每秒更新输出结果
    • 输出结果也是的形式,输出表可以写入到硬盘或者 HDFS

Structured Streaming 的三种输出模式

Mode Desc
完全模式 - Complete Mode 整个更新过的输出表被写入外部存储
附加模式 - Append Mode 上次触发后新增的行才会被写入到外部存储
如果老数据有改动则不适合该模式
更新模式 - Update Mode 上次触发后被更新的行才会被写入外部存储

Structured Streaming 并不会完全存储输入数据

  1. 每个时间间隔,Structured Streaming 都会读取最新的输入,进行处理并更新输出表,然后删除这次输入
  2. Structured Streaming 只会存储更新输出表所需要的信息

Structured Streaming 模型根据事件时间(Event Time)处理数据时十分方便

  1. 事件时间指的是事件发生的时间,是数据本身的属性;处理时间是 Spark 接收数据的时间
  2. 在 Structured Streaming 模型中,每个数据是输入数据表中的一行,那么事件时间就是行中的一列
  3. 依靠 DataSet/DataFrame API 提供的类似于 SQL 的接口 - 很方便地执行基于时间窗口查询

Streaming DataFrame API

Structured Streaming 发布后,DataFrame 即可以代表静态的有边界数据,也可以代表无边界数据

创建 DataFrame

1
2
3
4
5
6
socketDataFrame = spark
.readStream
.format("socket"
.option("host", "localhost")
.option("port", 9999)
.load()
  1. SparkSession.readStream() 返回的 DataStreamReader 可以用于创建 Streaming DataFrame
  2. 支持多种类型的数据流作为输入,如 FileKafkaSocket

查询操作

Streaming DataFrameStatic DataFrame 都支持 SQL 查询(select、where),也支持 RDD 转换操作

1
2
3
4
df = … // 这个DataFrame代表学校学生的数据流,schema是{name: string, age: number, height: number, grade: string}
df.select("name").where("age > 10") // 返回年龄大于10岁的学生名字列表
df.groupBy("grade").count() // 返回每个年级学生的人数
df.sort_values([‘age’], ascending=False).head(100) // 返回100个年龄最大的学生

通过 isStreaming 函数判断一个 DataFrame 是否代表流数据

1
df.isStreaming()

基于事件时间的时间窗口操作 - 在 Spark Streaming 中的热词统计是基于处理时间

1
2
3
4
5
6
7
8
words = ...  # 这个DataFrame代表词语的数据流,schema是 { timestamp: Timestamp, word: String}

windowedCounts = words.groupBy(
window(words.timestamp, "1 minute", "10 seconds"),
words.word
).count()
.sort(desc("count"))
.limit(10)
  1. 基于词语的生成时间(而非 Spark 的处理时间),创建一个窗口长度为 1 分钟,滑动间隔为 10 秒的窗口
  2. 把输入的词语表根据窗口和词语本身聚合起来,并统计每个窗口内词语的数量,在根据词语数量倒排 Top 10

输出结果流

  1. 当经过各种 SQL 查询操作后,创建好代表最终结果DataFrame
  2. 下一步开始对输入数据流的处理,并持续输出结果
1
2
3
4
5
6
7
8
query = wordCounts
.writeStream
.outputMode("complete")
.format("csv")
.option("path", "path/to/destination/dir")
.start()

query.awaitTermination()
  1. 通过 Dataset.writeStream() 返回的 DataStreamWriter 对象去输出结果
  2. 支持多种写入位置,如 File、Kafka、Console、内存等

Structured Streaming vs Spark Streaming

综合来说,Structured Streaming 是比 Spark Streaming 更好的流处理工具

易用性 + 性能

  1. Spark Streaming 提供的 DStream APIRDD API 非常类似,相对底层

    • 编写 Spark Streaming 程序时,本质上是去构造 RDD 的 DAG 执行图,然后通过 Spark Engine 运行

    • 开发者心智负担比较重,需要想办法去提高程序的处理效率

    • 对于一个好的框架来说,开发者只需要专注于业务逻辑上,无需担心配置和优化等繁杂事项

  2. Structured Streaming 提供的 DataFrame API 是一个相对高级的 API

    • 统一的数据抽象可以用一套统一的方案去处理批处理流处理,而无需关心具体的执行细节
    • 而且 DataFrame API 是在 Spark SQL 执行引擎上执行 ,有非常多的优化功能 - 所以性能更佳

实时性

  1. Spark Streaming准实时的,可以做到的最小延迟1 秒左右
  2. 虽然 Structured Streaming 也是类似的微批处理思想
    • 每过一个时间间隔,就去拿最新的数据加入到输入数据表更新结果
    • 但相比于 Spark Streaming,更接近于实时处理,可以做到更小的时间间隔,最小延迟在 100 毫秒左右
    • Spark 2.3 开始,Structured Streaming 引入了连续处理的模式,可以做到真正的毫秒级延迟

事件时间

Spark Streaming - 处理时间
Structured Streaming - 处理时间 or 事件时间

  1. Structured Streaming 对基于事件时间的处理有很好的支持
  2. Spark Streaming 是将数据按照接收到的时间切分成一个个 RDD 来进行批处理
    • 很难基于数据本身的事件时间进行处理,如果某个数据的处理时间与事件时间不一致,很容易出问题