Kafka -- 可靠性
可靠性保证
- 可靠性保证:确保系统在各种不同的环境下能够发生一致的行为
- Kafka的保证
- 保证_分区消息的顺序_
- 如果使用同一个生产者往同一个分区写入消息,而且消息B在消息A之后写入
- 那么Kafka可以保证消息B的偏移量比消息A的偏移量大,而且消费者会先读取消息A再读取消息B
- 只有当消息被写入分区的所有同步副本时(文件系统缓存),它才被认为是已提交
- 生产者可以选择接收不同类型的确认,控制参数
acks
- 生产者可以选择接收不同类型的确认,控制参数
- 只要还有一个副本是活跃的,那么已提交的消息就不会丢失
- 消费者只能读取已经提交的消息
- 保证_分区消息的顺序_
复制
- Kafka可靠性保证的核心:_复制机制_ + 分区的多副本架构
- 把消息写入多个副本,可以使Kafka在发生崩溃时仍能保证消息的持久性
- Kafka的主题被分成多个分区,分区是基本的数据块,分区存储在单个磁盘上
- Kafka可以保证分区里的事件总是有序的,分区可以在线(可用),也可以离线(不可用)
- 每个分区可以有多个副本,其中一个副本是首领副本
- 所有的事件都直接发送给首领副本,或者直接从首领副本读取事件
- 其他副本只需要与首领副本保持同步,并及时复制最新的事件即可
- 当首领副本不可用时,其中一个同步副本将成为新的首领
- _分区首领是同步副本_,对于跟随者副本来说,需要满足下列的全部条件才被认为是同步的
- 与ZooKeeper之间有一个活跃的会话(6S内的心跳)
- 在过去的10S内从首领副本那里获取过最新的消息(几乎零延迟)
- 一个不同步的副本通过与ZooKeeper重新建立连接,并从首领副本那里获取最新的消息,可以重新变成同步的
- 这个过程在网络出现临时问题时很快就能得到修复,但如果Broker发生崩溃就需要较长的时间
- 如果一个或多个副本在同步和非同步状态之间_快速切换_
- 说明集群内部出现了问题,通常是由于Java不恰当的垃圾回收配置导致的
- 不恰当的垃圾回收配置会造成几秒钟的停顿,从而导致Broker和ZooKeeper之间断开连接
- 最后变成不同步,进而发生状态切换
- 一个滞后的同步副本会导致生产者和消费者变慢
- 因为消息在被认为已提交之前,客户端会等待所有同步副本接收消息
- 如果一个副本不再同步了,那么我们将不再关心它是否已经接收到消息
- 因此,_非同步副本不会对性能造成任何影响_
- 但更少的同步副本意味着更低的有效复制系数,在发生宕机时丢失数据的风险就会变大
Broker配置
Broker有3个配置参数会影响到Kafka消息存储的可靠性,可以应用于Broker级别(所有主题),也可以应用于主题级别
复制系数
- 主题级别的配置参数为
replication.factor
,Broker级别的配置参数为default.replication.factor
- Kafka的默认复制系数为3,即使在主题创建后,仍然可以通过新增或移除副本来改变复制系数
- 如果复制系数为
N
,那么在N-1
个Broker失效的情况下,仍然能够从主题读取数据或向主题写入数据- 更高的复制系数可以带来_更高的可用性、可靠性和更少的故障_
- 但会占用_更多的磁盘空间_
- 主题的复制系数与主题的重要程度成正相关
- 在要求可用性的场景下,把复制系数设置为3,已经足够安全了,银行可能会使用5个副本
- 副本的分布也很重要
- Kafka会确保分区的每个副本被放在不同的Broker上
- 同时,为了避免机架级别的故障,建议把Broker分布在不同的机架上,控制参数为
broker.rack
不完全的首领选举
unclean.leader.election.enable
只能在Broker级别进行设置,默认值为false- 当分区首领不可用时,一个同步副本会被选为新的分区首领
- 如果选举过程中没有丢失数据,即提交到旧首领的数据同时存在于所有的同步副本上,那么这个选举过程是完全的
- 在首领不可用时,其他副本都不同步的场景
- 分区有3个副本,其中两个跟随者不可用,这时如果生产者继续往首领写入数据,所有消息都会得到确认并被提交
- 因为首领是唯一同步的副本
- 如果首领也不可用了,恰巧之前的一个跟随者重新启动,该跟随者就成为分区的唯一不同步副本
- 分区有3个副本,因为网络问题导致两个跟随者复制消息滞后,尽管它们还在复制消息,但已经不同步了
- 首领作为唯一同步的副本继续接收消息
- 如果首领变为不可用,另外两个副本再也无法变成同步的了
- 分区有3个副本,其中两个跟随者不可用,这时如果生产者继续往首领写入数据,所有消息都会得到确认并被提交
两难选择
- 如果不同步的副本不能被提升为新首领,那么分区在旧首领恢复之前是不可用的,牺牲了可用性
- 如果不同步的副本可以被提升为新首领,那么这个副本变为不同步之后写入旧首领的消息会全部消失,导致数据不一致
- 假设在副本0和副本1不可用时,偏移量100~200的消息被写入副本2(首领)
- 现在副本2也变为不可用,而副本0变成了可用,副本0只包含0~100的消息,不包含偏移量100~200的消息
- 如果允许副本0成为新首领,生产者可以继续写入数据,消费者可以继续读取数据,保证了可用性
- 于是,新首领(副本0)就有了偏移量100~200的新消息
- 但是,部分消费者会读到100~200的旧消息,部分消费者会读到为100~200的新消息,部分消费者读到两者的混合
- 小结
- 如果不允许不同步的副本成为新首领,那么就要接受较低的可用性
- 如果允许不同步的副本成为新首领,就要承担丢失数据和出现数据不一致的风险
最少同步副本
min.insync.replicas
,可以在主题级别和Broker级别上进行配置- 尽管为一个主题配置了3个副本,但还是会出现只有一个同步副本的情况
- Kafka对可靠性保证的定义:消息只有被写入到所有同步副本之后才被认为是已提交的
- 如果所有同步副本只剩下一个,那么在这个副本变为不可用时,数据就会丢失
- 如果
min.insync.replicas=2
,那么_至少要存在两个同步副本才能向分区写入数据_- 如果只有一个同步副本,那么Broker就会停止接受生产者的请求
- 此时Broker变成了只读
- 尝试发送数据的生产者会收到
NotEnoughReplicasException
异常 - 消费者仍然可以继续读取已有的数据
- 尝试发送数据的生产者会收到
- 这是为了避免发生不完全选举时数据的写入和读取出现非预期的行为
在可靠的系统里使用生产者
反例
反例1
- 为Broker配置了3个副本,并且禁用了不完全首领选举
- 把生产者的
acks
设置为1
(只要首领接收到消息就可以认为消息写入成功) - 生产者发送一个消息给首领,首领成功写入,但跟随者副本还没有收到这个消息
- 首领向生产者发送一个响应,告诉生产者消息写入成功,然后首领崩溃了,此时消息还没有被其他副本复制过去
- 此时另外两个副本仍然被认为是同步的(判断一个副本不同步需要一小段时间)
- 其中一个副本成为了新的首领,因为消息还没有被写入这个副本,所以消息丢失了
- 但生产者却认为消息已经成功写入了
- 因为消费者看不到丢失的消息,所以此时的系统仍然是一致的(因为副本没有收到这个消息,所以消息不算已提交)
- 但从生产者角度来看,它丢失了一个消息
反例2
- 为Broker配置了3个副本,禁用了不完全首领选举,并且把生产者的
acks
设置为all
- 假设现在往Kafka发送给消息,分区的首领刚好崩溃,新的首领正在选举当中,Kafka会往生产者返回首领不可用的响应
- 此时,如果生产者没能正确处理这个错误,也没有重试发送消息直到发送成功,那么消息也有可能丢失
- 但这不能算是Broker的可靠性问题,因为Broker并没有收到这个消息
- 也不是一致性问题,因为消费者并没有读到这个消息
小结
- 根据可靠性需求配置恰当的
acks
值 - 在参数配置和代码里正确地处理错误
发送确认
acks=0
:如果生产者能够通过网络把消息发送出去,那么就认为消息已经成功写入Kafka- 如果分区离线或者整个集群长时间不可用,那么就不会收到任何错误
- 即使在完全首领选举的情况下,仍有可能丢失消息,因为在新首领选举过程中,生产者并不知道首领已经不可用了
- 在该模式下,运行速度是非常快的,可以得到惊人的吞吐量和带宽利用率,但会丢失一些数据
acks=1
:首领在收到消息并把它写入到分区数据文件(Linux文件系统缓存)时返回确认或错误响应- 在该模式下,如果发生正常的首领选举,生产者会在选举时收到
LeaderNotAvailableException
异常 - 如果生产者能够恰当地处理该异常,那么它就会重试发送消息,最终消息会安全到达新首领
- 但仍有可能丢失数据,例如消息已经成功写入首领,但在消息被复制到跟随者副本之前首领发生崩溃
- 在该模式下,如果发生正常的首领选举,生产者会在选举时收到
acks=all
:首领在返回确认或者错误响应之前,会等待所有同步副本都收到消息- 如果和
min.insync.replicas
结合,就能决定在返回确认前至少有多少个副本能够收到消息 - 这是最保险的做法,生产者会一直重试直到消息被成功提交
- 但这也是最慢的做法,生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息
- 可以通过使用异步模式和更大的批次来加快速度,但这样通常会降低吞吐量
- 如果和
重试配置
- 生产者向Broker发送消息时,Broker可以返回一个成功响应码或者一个错误响应码
- 错误响应码分类:一种是可以通过重试解决,一种是无法通过重试解决
- 如果Broker返回的是
LeaderNotAvailableException
,生产者可以通过尝试重新发送消息来解决 - 如果Broker返回的是
InvalidConfigurationException
,即使通过重试也无法改变配置选项
- 如果Broker返回的是
- 如果目标是不丢失任何消息,最好让生产者遇到可重试错误时能够_保持重试_
- 重试发送一个已经失败的消息会带来一些风险,如果两个消息都写入成功,会导致_消息重复_
- 重试和恰当的错误处理可以保证每个消息至少被保存一次
- 目前的Kafka版本无法保证每个消息只被保存一次
- 现实中的很多应用程序在消息里加入唯一标识符,用于检测重复消息
- 另外还需要应用程序可以做到消息的幂等
在可靠的系统里使用消费者
- 只有被提交到Kafka(已经被写入所有同步副本)的消息,对消费者才是可用的
- 对消费者而言,读取到的消息已经具备了一致性
- 消费者唯一要做的是要跟踪哪些消息已经读取过,哪些没有被读取过
- 从分区读取数据时,消费者会获取一批事件,检查这批事件里最大的偏移量,然后从这个偏移量开始读取另一批事件
- 这样保证消费者总能以正确的顺序获取新数据,不会错过任何事件
- 如果一个消费者退出,另一个消费者需要知道前一个消费者在退出前处理的最后一个偏移量是多少
- 因此消费者需要提交偏移量
- 消费者把当前读取的偏移量保存起来,在退出之后,同一个群组里的其他消费者就可以接手它的工作
- 如果消费者提交了偏移量却未能处理完消息,那么就可能会造成_消息丢失_
- 已提交消息 VS 已提交偏移量
- 已提交消息:已经被写入所有同步副本并且对消费者可见的消息
- 已提交偏移量:消费者发送给Kafka的偏移量,用于确认它已经收到并处理好的消息位置
消费者的可靠性配置
- group.id
- 如果两个消费者具有相同的
group.id
,并且订阅了同一个主题,那么每个消费者会分到主题分区的一个_子集_
- 如果两个消费者具有相同的
- auto.offset.reset
- 指定了在没有偏移量可提交时或者请求的偏移量在Broker不存在时,消费者的行为
earliest
:消费者会从分区的开始位置读取数据,不管偏移量是否有效- 导致消费者读取大量的重复数据,但可以保证最少的数据丢失
latest
:消费者会从分区的末尾开始读取数据- 可以减少重复处理消息,也有可能会错过一些消息
- enable.auto.commit
- 消费者基于任务调度自动提交偏移量
- 如果消费者在轮询操作里处理完所有的数据,那么自动提交可以保证_只提交已经处理过的偏移量_
- 自动提交的主要缺点
- 无法控制重复处理消息(比如消费者在自动提交偏移量之前停止处理消息)
- 如果把消息交给另一个后台线程去处理,自动提交机制可能会在消息还没处理完毕就提交偏移量
- auto.commit.interval.ms
- 默认是每5S提交一次,频繁提交会增加额外的开销,但也会降低重复处理消息的概率
显式提交偏移量
- 总是在处理完事件后再提交偏移量
- 如果所有的处理都在轮询里完成,并且不需要在轮询之间维护状态(例如为了实现聚合操作)
- 那么可以使用自动提交,或者在轮询结束后进行手动提交
- 提交频率是性能和重复消息之间的权衡
- 即使在最简单的场景里,仍然可以在一个循环里多次提交偏移量
- 也可以在每处理完一个事件之后,或者多个循环里只提交一次
- 确保对提交的偏移量心里有数
- 在轮询过程中提交偏移量有个不好的地方
- 就是提交的偏移量有可能是读取到的最新偏移量,而不是处理过的最新偏移量
- 因此,必须确保处理完消息后再提交偏移量,否则会导致消费者错过消息
- 在轮询过程中提交偏移量有个不好的地方
- 再均衡
- 在设计应用程序时要注意处理消费者的再均衡问题
- 例如,一般要在分区被撤销之前提交偏移量,并在分配到新分区时清理之前的状态
- 消费者可能需要重试
- 场景:在进行轮询之后,有些消息没有被完全处理,需要稍候再来处理
- 例如要把Kafka的数据写到数据库里,不过在那个时间数据库恰好不可用,需要稍候再试
- 提交的是偏移量,而不是对消息的确认
- 如果记录#30处理失败,但记录#31处理成功,那么就不应该提交#31
- 否则会导致#31以内的偏移量都被提交,包括#30
- 解决方案
- 方案1
- 在遇到可重试错误时,提交最后一个处理成功的偏移量,然后把还没有处理好的消息保存到缓存区里
- 调用消费者的
pause()
方法来确保其他的轮询不会返回数据,_在保持轮询的同时尝试重新处理_ - 如果重试成功,或者重试次数达到上限并决定放弃,那么把错误记录下来并丢弃消息
- 然后调用
resume()
方法让消费者继续从轮询里获取新数据
- 方案2
- 在遇到可重试错误时,把错误写入一个独立的主题,然后继续
- 一个独立的消费者群组负责从该主题上读取错误消息,并进行重试
- 该模式有点类似其他消息系统的
dead-letter-queue
- 方案1
- 场景:在进行轮询之后,有些消息没有被完全处理,需要稍候再来处理
- 消费者可能需要维护状态
- 有时会希望在多个轮询之间维护状态
- 例如想计算消息的移动平均数,希望在首次轮询之后计算平均数,然后在后续的轮询中更新这个结果
- 提交偏移量的同时把最近计算的平均数写到一个结果的主题上
- 消费者线程在重新启动之后,就可以拿到最近的平均事并接着计算
- 由于Kafka并没有提供事务支持,消费者有可能写入平均数之后来不及提交偏移量就崩溃了
- 有时会希望在多个轮询之间维护状态
- 长时间处理
- 有时候处理数据需要很长时间
- 但是暂停轮询的时间不能超过几秒钟,即使不想获得更多的数据,也要保持轮询,这样客户端才能往Broker发送心跳
- 解决方案
- 使用一个线程池来处理数据,使用多个线程可以进行并行处理,从而加快处理速度
- 把数据移交给线程池去处理之后,就可以暂停消费者,然后保持轮询,但不获取新数据,直到处理完成
- 在工作线程处理完成之后,让消费者继续获取新数据
- 消费者一直保持轮询,心跳会正常发送,就不会发生再均衡
- 仅一次传递
- 应用程序不仅仅需要至少一次(
at-least-once
,没有数据丢失)语义,还需要仅一次(exactly-once
)语义 - 目前Kafka还不能完全支持仅一次语义,消费者采用其他办法来保证Kafka里的每个消息只被写到外部系统一次
- 但不会处理向Kafka写入数据时可能出现的重复数据
- 实现仅一次处理最简单且最常用的办法是把结果写到一个支持唯一键的系统里,比如键值存储引擎,关系型数据库等
- 这种情况下
- 要么消息本身包含一个唯一键
- 要么使用主题、分区和偏移量的组合来创建唯一键(唯一标识一个Kafka记录)
- 如果你把消息和一个唯一键写入系统,然后恰巧又读到一个相同的消息,只要把原先的键值覆盖掉即可
- 数据存储引擎会覆盖已经存在的键值对,就像没有出现过重复数据一样,这个模式叫作_幂等性写入_
- 这种情况下
- 如果写入消息的系统支持事务
- 最简单的是使用关系型数据库,把消息和偏移量放到同一个事务里,这样它们就能保持同步
- 在消费者启动时,会获取最近处理过的消息偏移量,然后调用
seek()
方法从该偏移量继续读取数据
- 应用程序不仅仅需要至少一次(
All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.