CommitFailedException

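  1. CommitFailedException is a serious, unrecoverable exception that the Consumer client throws when it commits offsets
  2. If the error is a recoverable, transient one, the offset-commit API methods retry automatically; commitSync is one example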
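The split between retriable and non-retriable errors can be sketched without a broker. This is a minimal, hypothetical model: `TransientCommitException`, `FatalCommitException` and `commitWithRetry` are made-up names, not Kafka APIs (real clients mark the first kind with subclasses of `RetriableException`). Transient failures are retried with a fixed back-off; anything else, in the role of CommitFailedException, propagates at once.

```java
import java.util.function.Supplier;

/**
 * Hypothetical sketch of "retry transient errors, fail fast on fatal ones".
 * The two exception types are illustrative stand-ins, not Kafka classes.
 */
public class CommitRetry {

    /** Recoverable error, e.g. the coordinator is briefly unavailable. */
    public static class TransientCommitException extends RuntimeException {
        public TransientCommitException(String msg) { super(msg); }
    }

    /** Unrecoverable error, playing the role of CommitFailedException. */
    public static class FatalCommitException extends RuntimeException {
        public FatalCommitException(String msg) { super(msg); }
    }

    /**
     * Runs commit, retrying up to maxAttempts times on TransientCommitException.
     * Any other exception (including FatalCommitException) is not caught and
     * propagates immediately.
     */
    public static <T> T commitWithRetry(Supplier<T> commit, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        TransientCommitException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return commit.get();
            } catch (TransientCommitException e) {
                last = e;
                if (attempt < maxAttempts) sleep(backoffMs); // back off before the next try
            }
        }
        throw last; // retries exhausted
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during back-off", ie);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice with a transient error, then succeeds on the third attempt.
        String result = commitWithRetry(() -> {
            if (++calls[0] < 3) throw new TransientCommitException("coordinator not available");
            return "committed";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts"); // committed after 3 attempts
    }
}
```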

Explanation

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

Scenarios

Scenario 1

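```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// At most 5 s may pass between two consecutive poll() calls
props.put("max.poll.interval.ms", 5000);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    // Simulate 6 s of processing: longer than max.poll.interval.ms,
    // so the group rebalances and this consumer loses its partitions
    Thread.sleep(6000L);
    consumer.commitSync(); // throws CommitFailedException
}
```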
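In the loop above, roughly 6000 ms pass between poll() calls against a 5000 ms limit. The same arithmetic applies to a full batch: approximately max.poll.records × per-record processing time has to fit inside max.poll.interval.ms. `PollBudget` below is a hypothetical helper, not a Kafka API, that sketches this check and derives a max.poll.records value from an estimated per-record cost; the safety margin is an assumption, not a Kafka setting.

```java
/**
 * Hypothetical helper (not a Kafka API): checks a poll-loop time budget and
 * derives a max.poll.records value from an estimated per-record cost.
 */
public class PollBudget {

    /** True if the work done between two poll() calls exceeds max.poll.interval.ms. */
    public static boolean exceedsInterval(long msBetweenPolls, long maxPollIntervalMs) {
        return msBetweenPolls > maxPollIntervalMs;
    }

    /**
     * Largest max.poll.records whose estimated batch time stays within
     * safetyPercent% of max.poll.interval.ms; never returns less than 1.
     */
    public static int maxSafePollRecords(long maxPollIntervalMs, long perRecordMs, int safetyPercent) {
        if (perRecordMs <= 0) throw new IllegalArgumentException("perRecordMs must be > 0");
        long budgetMs = maxPollIntervalMs * safetyPercent / 100;
        return (int) Math.max(1, budgetMs / perRecordMs);
    }

    public static void main(String[] args) {
        // Scenario 1: about 6000 ms between polls against a 5000 ms limit.
        System.out.println(exceedsInterval(6000, 5000));            // true
        // 300000 ms limit, ~2000 ms per record, keep 20% headroom.
        System.out.println(maxSafePollRecords(300_000, 2_000, 80)); // 120
    }
}
```

With the defaults (max.poll.interval.ms = 300000, max.poll.records = 500), a 2000 ms per-record cost would need about 1,000,000 ms per batch, far past the limit, which is why lowering max.poll.records helps.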
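  1. **When the total message-processing time exceeds the configured max.poll.interval.ms**, the Consumer throws CommitFailedException
  2. Solutions
    • Shorten the time spent processing each message
    • Increase max.poll.interval.ms
      • With a client older than 0.10.1.0, tune session.timeout.ms instead
      • session.timeout.ms carries other meanings as well; max.poll.interval.ms was split off from session.timeout.ms
    • Reduce max.poll.records
    • Use multiple threads to speed up consumption
      • Committing offsets correctly from multiple threads is easy to get wrong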
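One way to keep multi-threaded offset commits correct is to process a polled batch in parallel but compute offsets only after every record in the batch has finished. The sketch below models this with plain JDK types; `Rec` and `processAndComputeCommit` are hypothetical stand-ins (Rec plays the role of ConsumerRecord). In a real consumer the resulting map would be converted into the `Map<TopicPartition, OffsetAndMetadata>` that commitSync accepts. It relies on the Kafka convention that the committed offset is the next offset to read, i.e. the last processed offset + 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

/**
 * Sketch: process one polled batch on a thread pool and compute the offsets
 * to commit only after every record has finished. Rec stands in for
 * ConsumerRecord; it is not a Kafka type.
 */
public class ParallelBatch {

    public record Rec(int partition, long offset, String value) {}

    /** Returns partition -> next offset to commit (last processed offset + 1). */
    public static Map<Integer, Long> processAndComputeCommit(
            List<Rec> batch, int threads, Consumer<Rec> handler) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Rec>> futures = new ArrayList<>();
            for (Rec r : batch) {
                futures.add(pool.submit(() -> { handler.accept(r); return r; }));
            }
            Map<Integer, Long> toCommit = new TreeMap<>();
            for (Future<Rec> f : futures) {
                Rec done = f.get(); // waits for this record; rethrows its failure
                toCommit.merge(done.partition(), done.offset() + 1, Math::max);
            }
            return toCommit;
        } catch (ExecutionException e) {
            // A record failed: return nothing to commit so the batch is consumed again.
            throw new IllegalStateException("record processing failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the batch", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Rec> batch = List.of(
                new Rec(0, 10, "a"), new Rec(0, 11, "b"),
                new Rec(1, 7, "c"), new Rec(1, 8, "d"), new Rec(1, 9, "e"));
        Map<Integer, Long> offsets = processAndComputeCommit(batch, 3, r -> { /* real work here */ });
        System.out.println(offsets); // {0=12, 1=10}
    }
}
```

If any record fails, nothing is committed and the batch is redelivered (at-least-once). The whole batch still has to finish within max.poll.interval.ms, because poll() is not called while it runs.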

Scenario 2

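  1. The Kafka Java Consumer provides an independent consumer called the Standalone Consumer
    • It has no notion of a consumer group; each standalone consumer instance works on its own, with no connection to any other
  2. A standalone consumer commits offsets through the same mechanism as a consumer group, so it must also set group.id to commit offsets
  3. If a consumer-group application and a standalone consumer application are running with the same group.id
    • When the standalone consumer commits offsets manually, it throws CommitFailedException, indicating that it is not a valid member of the consumer group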
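Why the standalone consumer's commit is rejected can be illustrated with a toy coordinator. This is a simplified model, not broker code: it accepts a commit from a current member of the group, or from a member-less committer (a standalone consumer using assign()) only while the group has no active members. `ToyCoordinator` and `CommitRejectedException` are hypothetical names; on a real client the rejection surfaces as CommitFailedException.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Toy model, not broker code: accepts an offset commit from a current group
 * member, or from a member-less committer (a standalone consumer) only while
 * the group has no active members.
 */
public class ToyCoordinator {

    /** Hypothetical; a real client reports this as CommitFailedException. */
    public static class CommitRejectedException extends RuntimeException {
        public CommitRejectedException(String msg) { super(msg); }
    }

    private final Map<String, Set<String>> membersByGroup = new HashMap<>();
    private final Map<String, Long> committed = new HashMap<>();

    /** A consumer that called subscribe() joins the group under groupId. */
    public void join(String groupId, String memberId) {
        membersByGroup.computeIfAbsent(groupId, g -> new HashSet<>()).add(memberId);
    }

    /** memberId == null models a standalone consumer that used assign(). */
    public void commit(String groupId, String memberId, String partition, long offset) {
        Set<String> members = membersByGroup.getOrDefault(groupId, Set.of());
        boolean isMember = memberId != null && members.contains(memberId);
        boolean standaloneOnEmptyGroup = memberId == null && members.isEmpty();
        if (!isMember && !standaloneOnEmptyGroup) {
            throw new CommitRejectedException("not a valid member of group " + groupId);
        }
        committed.put(groupId + "/" + partition, offset);
    }

    public Long committedOffset(String groupId, String partition) {
        return committed.get(groupId + "/" + partition);
    }

    public static void main(String[] args) {
        ToyCoordinator coord = new ToyCoordinator();
        coord.join("test-group", "consumer-1");                       // group member
        coord.commit("test-group", "consumer-1", "test-topic-0", 42); // accepted
        try {
            coord.commit("test-group", null, "test-topic-0", 43);     // standalone, same group.id
        } catch (CommitRejectedException e) {
            System.out.println("rejected: " + e.getMessage()); // rejected: not a valid member of group test-group
        }
    }
}
```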

References

Kafka核心技术与实战 (Kafka Core Technology and Practice)