Kafka -- 无消息丢失
持久化保证
- Kafka只对已提交的消息做有限度的持久化保证
- 已提交的消息
- 当Kafka的若干个Broker成功地接收到一条消息并写入到日志文件后,会告诉生产者这条消息已经成功提交
- 有限度的持久化保证
- Kafka不保证在任何情况下都能做到不丢失消息,例如机房着火等极端情况
消息丢失
生产者丢失
- 目前Kafka Producer是异步发送消息的,
Producer.send(record)
立即返回,但不能认为消息已经发送成功 - 丢失场景:网络抖动,导致消息没有到达Broker;消息太大,超过Broker的承受能力,Broker拒收
- 解决方案:Producer永远要使用带有回调通知的发送API,即**
Producer.send(record, callback)
**- callback能够准确地告知Producer消息是不是真的提交成功,一旦出现消息提交失败,可以进行针对性的处理
消费者丢失
- Consumer端丢失数据主要体现在Consumer端要消费的消息不见了
- Consumer程序有位移的概念,表示该Consumer当前消费到Topic分区的位置
- 丢失原因:Consumer接收一批消息后,在未处理完所有消息之前,就直接更新位移
- 解决方案:先消费消息,再更新位移
- 这种方式能最大限度地保证消息不丢失,但带来了重复消息的问题,因此Consumer需要支持幂等
- 如果采用多线程异步处理消息,Consumer程序要关闭自动提交位移,由应用程序手动提交位移
最佳实践
Producer
- 使用带有回调的发送API:
Producer.send(record, callback)
acks = all
,表示所有Broker都要接收到,该消息才算是已提交- 将
retries
设置为一个较大的值,Producer自动重试的次数
Broker
unclean.leader.election.enable = false
,控制哪些Broker有资格竞选分区Leader- 如果一个落后很多的Broker也能参与竞选并且成为新的Leader,必然会造成消息丢失
replication.factor >= 3
,将消息多保存几份副本,目前防止消息丢失的主要机制是冗余min.insync.replicas > 1
,消息至少被写入多少个副本才算已提交,生产环境中不能使用默认值1replication.factor > min.insync.replicas
,如果两者相等,只要有一个副本宕机,整个分区就无法正常工作了- 应该在不降低可用性的基础上,改善消息的持久性,防止数据丢失
- 推荐设置为**
replication.factor = min.insync.replicas + 1
**
Consumer
enable.auto.commit = false
,确保消息消费完再手动提交
参考资料
All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.