Kafka -- 高水位 + Leader Epoch
高水位
水位的定义
- 经典教科书
- 在时刻T,任意创建时间(Event Time)为
T'
,且T'<=T
的所有事件都已经到达,那么T就被定义为水位
- 在时刻T,任意创建时间(Event Time)为
- 《Streaming System》
- 水位是一个单调增加且表征最早未完成工作的时间戳
- 上图中标注为
Completed
的蓝色部分代表已经完成的工作,标注为In-Flight
的红色部分代表正在进行中的工作- 两者的边界就是水位线
- 在Kafka中,水位不是时间戳,而是与位置信息绑定的,即用消息位移来表征水位
- Kafka中也有低水位(Low Watermark),是与Kafka删除消息相关联的概念
高水位的作用
- 两个作用
- 定义消息可见性,即用来标识分区下的哪些消息可以被消费者消费的
- 帮助Kafka完成副本同步
- 上图是某个分区Leader副本的高水位图,在分区高水位以下的消息被认为是已提交消息,反之为未提交消息
- 消费者只能消费已提交消息,即位移小于8的所有消息
- 暂不讨论Kafka事务,Kafka的事务机制会影响消费者所能看到的消息的范围,不只是简单依赖高水位来判断
- 而是依靠LSO(Log Stable Offset)的位移值来判断事务型消费者的可见性
- 位移值等于高水位的消息也属于未提交消息,即高水位上的消息是不能被消费者消费的
- 图中还有一个日志末端位移(Log End Offset,LEO)的概念,表示副本写入下一条消息的位移值
- LEO为15,方框是虚线,表示当前副本只有15条消息,位移从0到14,下一条新消息的位移为15
[高水位,LEO)
的消息属于未提交消息,在同一个副本对象,高水位值不会大于LEO值- 高水位和LEO是副本对象的两个重要属性
- Kafka所有副本对象都有对应的高水位和LEO,而Kafka使用Leader副本的高水位来定义所在分区的高水位
高水位的更新机制
远程副本
- 每个副本对象都保存了一组高水位和LEO值,Leader副本所在的Broker还保存了_其它Follower副本的LEO值_
- Kafka把Broker 0上保存的Follower副本又称为远程副本(Remote Replica)
- Kafka副本机制在运行过程中
- 会更新
- Broker 1上Follower副本的高水位和LEO值
- Broker 0上Leader副本的高水位和LEO以及所有远程副本的LEO
- 不会更新
- Broker 0所有远程副本的高水位值,即图中标记为灰色的部分
- 会更新
- Broker 0保存远程副本的作用
- 帮助Leader副本确定其高水位,即分区高水位
更新时机
更新对象 | 更新时机 |
---|---|
Broker 0上Leader副本的LEO | Leader副本接收到生产者发送的消息,写入到本地磁盘后,会更新其LEO值 |
Broker 1上Follower副本的LEO | Follower副本从Leader副本拉取消息,写入本地磁盘后,会更新其LEO值 |
Broker 0上远程副本的LEO | Follower副本从Leader副本拉取消息时,会告诉Leader副本从哪个位移开始拉取, Leader副本会使用这个位移值来更新远程副本的LEO |
Broker 0上Leader副本的高水位 | 两个更新时机:一个是Leader副本更新其LEO之后,一个是更新完远程副本LEO之后 具体算法:取Leader副本和所有与Leader同步的远程副本LEO中的最小值 |
Broker 1上Follower副本的高水位 | Follower副本成功更新完LEO后,会比较其LEO与Leader副本发来的高水位值, 并用两者的较小值去更新自己的高水位 |
- 与Leader副本保持同步,需要满足两个条件
- 该远程Follower副本在ISR中
- 该远程Follower副本LEO值落后Leader副本LEO值的时间不超过参数
replica.lag.time.max.ms
(10秒)
- 某个副本能否进入ISR是由第二个条件判断的
- 2个条件判断是为了应对意外情况:Follower副本已经追上Leader,却不在ISR中
- 假设Kafka只判断第1个条件,副本F刚刚重启,并且已经具备进入ISR的资格,但此时尚未进入到ISR
- 由于缺少了副本F的判断,分区高水位有可能超过真正ISR中的副本LEO,而高水位>LEO是不允许的
Leader副本
- 处理生产者请求
- 写入消息到本地磁盘,更新LEO
- 更新分区高水位值
- 获取Leader副本所在Broker端保存的所有远程副本LEO值
{LEO-1, LEO-2,... LEO-n}
- 获取Leader副本的LEO值:
currentLEO
- 更新**
currentHW = min(currentLEO, LEO-1, LEO-2,... LEO-n)
**
- 获取Leader副本所在Broker端保存的所有远程副本LEO值
- 处理Follower副本拉取消息
- 读取磁盘(或页缓存)中的消息数据
- 使用Follower副本发送请求中的位移值来更新远程副本的LEO值
- 更新分区高水位值(与上面一致)
Follower副本
- 从Leader拉取消息
- 写入消息到本地磁盘
- 更新LEO
- 更新高水位值
- 获取Leader发送的高水位值:
currentHW
- 获取步骤2中更新的LEO值:
currentLEO
- 更新高水位**
min(currentHW, currentLEO)
**
- 获取Leader发送的高水位值:
副本同步样例
主题是单分区两副本,首先是初始状态,所有值都是0
当生产者向主题分区发送一条消息后,状态变更为
此时,Leader副本成功将消息写入到本地磁盘,将LEO值更新为1(更新高水位值为0,并把结果发送给Follower副本)
Follower再次尝试从Leader拉取消息,此时有消息可以拉取,Follower副本也成功更新LEO为1(并将高水位更新为0)
此时,Leader副本和Follower副本的LEO都是1,但各自的高水位依然是0,需要等到下一轮的拉取中被更新
在新一轮的拉取请求中,由于位移值为0的消息已经拉取成功,因此Follower副本这次拉取请求的位移值为1
Leader副本接收到此请求后,更新远程副本LEO为1,然后更新Leader高水位值为1
最后,Leader副本会将当前更新过的高水位值1发送给Follower副本,Follower副本接收到后,也会将自己的高水位值更新为1
Leader Epoch
基本概念
- 上面的副本同步过程中,Follower副本的高水位更新需要一轮额外的拉取请求才能实现
- 如果扩展到多个Follower副本,可能需要多轮拉取请求
- 即Leader副本高水位更新和Follower副本高水位更新在时间上存在错配
- 这种错配是很多数据丢失或数据不一致问题的根源
- 因此,社区在0.11版本正式引入了
Leader Epoch
概念,来规避高水位更新错配导致的各种不一致问题
- Leader Epoch可以大致认为是Leader版本,由两部分数据组成
- Epoch
- 一个单调递增的版本号
- 每当副本领导权发生变更时,都会增加该版本号
- 小版本号的Leader被认为是过期Leader,不能再行使Leader权利
- 起始位移(Start Offset)
- Leader副本在该Epoch值上写入的首条消息的位移
- Epoch
- 两个Leader Epoch,
<0,0>
和<1,120>
<0,0>
表示版本号为0,该版本的Leader从位移0开始保存消息,一共保存了120条消息- 之后Leader发生了变更,版本号增加到1,新版本的起始位移是120
- Broker在内存中为每个分区都缓存
Leader Epoch
数据,同时还会定期地将这些数据持久化到一个checkpoint
文件中- 当Leader副本写入消息到磁盘时,Broker会尝试更新这部分缓存
- 如果Leader是首次写入消息,那么Broker会向缓存中增加Leader Epoch条目,否则不做更新
- 这样每次有Leader变更时,新的Leader副本会查询这部分缓存,取出对应的Leader Epoch的起始位移
- 然后进行相关的逻辑判断,避免数据丢失和数据不一致的情况
数据丢失
- 开始时,副本A和副本B都处于正常状态,A是Leader副本
- 某个的生产者(默认acks设置)向A发送了两条消息,A全部写入成功,Kafka会通知生产者说两条消息全部发送成功
- 假设Leader和Follower都写入了这两条消息,而且Leader副本的高水位也更新了,但_Follower副本的高水位还未更新_
- 此时副本B所在的Broker宕机,当它重启回来后,副本B会执行_日志截断!!_
- **将LEO值调整为之前的高水位值!!**,也就是1
- 位移值为1的那条消息被副本B从磁盘中删除,此时副本B的底层磁盘文件中只保留1条消息,即位移为0的消息
- 副本B执行完日志截断操作后,开始从A拉取消息,此时恰好副本A所在的Broker也宕机了,副本B自然成为新的Leader
- 当A回来后,需要执行相同的日志截断操作,但不能超过新Leader,即将高水位调整与B相同的值,也就是1
- 操作完成后,位移值为1的那条消息就从两个副本中被永远抹掉,造成了数据丢失
Leader Epoch规避数据丢失
- Follower副本B重启后,需要向A发送一个特殊的请求去获取Leader的LEO值,该值为2
- 当获知Leader LEO后,B发现该LEO值大于等于自己的LEO,而且缓存中也没有保存任何起始位移值>2的Epoch条目
- B无需执行任何日志截断操作
- 明显改进:_副本是否执行日志截断不再依赖于高水位进行判断_
- A宕机,B成为Leader,当A重启回来后,执行与B相同的逻辑判断,发现同样不需要执行日志截断
- 至此位移值为1的那条消息在两个副本中均得到保留
- 后面生产者向B写入新消息后,副本B所在的Broker缓存中会生成新的Leader Epoch条目:**
[Epoch=1, Offset=2]
**
小结
- 高水位在界定Kafka消息对外可见性以及实现副本机制方面起到非常重要的作用
- 但设计上的缺陷给Kafka留下了很多数据丢失或数据不一致的潜在风险
- 为此,社区引入了**
Leader Epoch
**机制,尝试规避这类风险,并且效果不错
参考资料
All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.