Kafka -- 提交位移
消费位移
- Consumer的消费位移,记录了Consumer要消费的下一条消息的位移
- 假设一个分区中有10条消息,位移分别为0到9
- 某个Consumer消费了5条消息,实际消费了位移0到4的5条消息,此时Consumer的位移为5,指向下一条消息的位移
- Consumer需要向Kafka汇报自己的位移数据,这个汇报过程就是提交位移
- Consumer能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的
- Consumer需要为分配给它的每个分区提交各自的位移数据
- 提交位移主要是为了表征Consumer的消费进度
- 当Consumer发生故障重启后,能够从Kafka中读取之前提交的位移值,然后从相应的位移处继续消费
- 位移提交的语义
- 如果提交了位移X,那么Kafka会认为位移值小于X的消息都已经被成功消费了
灵活
- 位移提交非常灵活,可以提交任何位移值,但要承担相应的后果
- 假设Consumer消费了位移为0~9的10条消息
- 如果提交的位移为20,位移位于10~19的消息可能会丢失
- 如果提交的位移为5,位移位于5~9的消息可能会被重复消费
- 位移提交的语义保障由应用程序保证,Kafka只会无脑地接受
- 位移提交的方式
- 从用户角度来看,分为自动提交和手动提交
- 从Consumer端来看,分为同步提交和异步提交
自动提交
- 自动提交:Kafka Consumer在后台默默地提交位移
- 参数
enable.auto.commit
,默认值为true,启用自动提交 - 参数
auto.commit.interval.ms
,默认值为5秒,Kafka每5秒会自动提交一次位移 - Kafka会保证在开始调用poll方法时,提交上次poll返回的所有消息
- poll方法的逻辑:先提交上一批消息的位移,再处理下一批消息,因此能够保证_消息不丢失_
- 自动提交可能会出现_重复消费_
- Consumer每5秒提交一次位移,若提交位移后3秒发生Rebalance,所有Consumer从上次提交的位移处继续消费
1 | Properties props = new Properties(); |
手动提交
enable.auto.commit=false
KafkaConsumer#commitSync()
- 提交
KafkaConsumer#poll()
返回的最新位移 - 同步操作,一直等待,直到位移被成功提交才会返回
- 需要处理完poll方法返回的所有消息后,才提交位移,否则会出现消息丢失
- Consumer处于阻塞状态,直到远端的Broker返回提交结果,才会结束
- 因为应用程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的TPS
- 提交
KafkaConsumer#commitAsync()
- 异步操作,立即返回,不会阻塞,不会影响Consumer应用的TPS,Kafka也提供了回调函数
- **
commitAsync
不能代替commitSync
**,因为commitAsync
不会自动重试- 如果异步提交后再重试,提交的位移值很可能已经过期,因此异步提交的重试是没有意义的
- 手动提交需要组合
commitSync
和commitAsync
,达到最优效果- 利用
commitSync
的自动重试来规避瞬时错误,如网络瞬时抖动、Broker端的GC等 - 利用
commitAsync
的非阻塞性,保证Consumer应用的TPS
- 利用
1 | // 同步提交 |
1 | // 异步提交 |
1 | // 同步提交 + 异步提交 |
精细化提交
- 上面的位移提交方式,都是提交poll方法返回的所有消息的位移,即提交最新一条消息的位移
- 精细化提交
commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
1 | Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); |
参考资料
All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.