Consumer Lag
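- Consumer Lag: how far a consumer currently trails the producer
- Lag is measured in messages. It is usually discussed at the topic level, but Kafka tracks it per partition, so topic-level figures have to be summed manually
- For a consumer, Lag is the most important metric to monitor, because it directly reflects how well the consumer is running
- A healthy consumer has a small Lag, ideally close to 0
- A large Lag means the consumer cannot keep up with the producer, and the Lag will keep growing
- The data the consumer needs is then very likely no longer in the operating system's page cache, so it can no longer benefit from Zero Copy
- The consumer has to read that data from disk instead, which widens the gap with the producer even further
- Matthew effect: _a consumer whose Lag is already large gets slower and slower, and its Lag grows larger and larger_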
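Because Kafka reports Lag per partition, a topic-level figure has to be summed by hand. The following is a minimal sketch of that roll-up in plain Java, assuming the per-partition offsets have already been collected by some other means. The names `TopicLag`, `PartitionLag` and `sumByTopic`, as well as the topics `orders` and `payments`, are illustrative and not part of Kafka.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: rolls per-partition Lag up to one total per topic. */
final class TopicLag {

    /** Offsets of one partition as seen by one consumer group. */
    static final class PartitionLag {
        final String topic;
        final int partition;
        final long currentOffset; // offset the group has consumed up to
        final long logEndOffset;  // offset of the latest produced message

        PartitionLag(String topic, int partition, long currentOffset, long logEndOffset) {
            this.topic = topic;
            this.partition = partition;
            this.currentOffset = currentOffset;
            this.logEndOffset = logEndOffset;
        }

        /** Lag in messages; clamped at 0 in case the two offsets were sampled at different times. */
        long lag() {
            return Math.max(0L, logEndOffset - currentOffset);
        }
    }

    /** Sums the Lag of all partitions that belong to the same topic. */
    static Map<String, Long> sumByTopic(List<PartitionLag> rows) {
        Map<String, Long> totals = new TreeMap<>();
        for (PartitionLag row : rows) {
            totals.merge(row.topic, row.lag(), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<PartitionLag> rows = Arrays.asList(
                new PartitionLag("orders", 0, 90, 100),
                new PartitionLag("orders", 1, 45, 50),
                new PartitionLag("payments", 0, 7, 7));
        System.out.println(sumByTopic(rows)); // prints {orders=15, payments=0}
    }
}
```

Clamping negative values to 0 is a design choice of this sketch, not something Kafka does for you.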
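Monitoring Lag
Kafka's built-in command
`kafka-consumer-groups` is the most direct tool Kafka provides for monitoring a consumer's progress
- It can also monitor the Lag of standalone consumers. A standalone consumer is a consumer program that does not use the consumer group mechanism, but it still has to set `group.id`
- A consumer group calls `KafkaConsumer.subscribe`, whereas a standalone consumer calls `KafkaConsumer.assign` to consume the specified partitions directly
- Output
- Consumer group, topic, partition, consumer instance ID, the host from which the consumer connects to the Broker, and the consumer's CLIENT-ID
- CURRENT-OFFSET: the offset of the latest message the consumer group has consumed
- LOG-END-OFFSET: the offset of the latest message produced to each partition
- LAG: the difference between LOG-END-OFFSET and CURRENT-OFFSET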
```
$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group zhongmingmao

GROUP         TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST          CLIENT-ID
zhongmingmao  zhongmingmao  1          5               5               0    consumer-1-24d9f1a8-662a-4d20-a360-26a12ddb0902  /192.168.2.1  consumer-1
zhongmingmao  zhongmingmao  0          5               5               0    consumer-1-24d9f1a8-662a-4d20-a360-26a12ddb0902  /192.168.2.1  consumer-1
zhongmingmao  zhongmingmao  4          6               6               0    consumer-1-24d9f1a8-662a-4d20-a360-26a12ddb0902  /192.168.2.1  consumer-1
zhongmingmao  zhongmingmao  3          6               6               0    consumer-1-24d9f1a8-662a-4d20-a360-26a12ddb0902  /192.168.2.1  consumer-1
zhongmingmao  zhongmingmao  2          6               6               0    consumer-1-24d9f1a8-662a-4d20-a360-26a12ddb0902  /192.168.2.1  consumer-1

$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group zhongmingmao

Consumer group 'zhongmingmao' has no active members.

GROUP         TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
zhongmingmao  zhongmingmao  1          5               5               0    -            -     -
zhongmingmao  zhongmingmao  0          5               5               0    -            -     -
zhongmingmao  zhongmingmao  4          6               6               0    -            -     -
zhongmingmao  zhongmingmao  3          6               6               0    -            -     -
zhongmingmao  zhongmingmao  2          6               6               0    -            -     -
```
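When the command is driven from a script, the LAG column can be recomputed from the other two offsets. Below is a minimal sketch of such a parser. It assumes the nine-column layout printed above (the layout may differ between Kafka versions), and `DescribeOutputParser` and `lagByPartition` are hypothetical names, not part of Kafka.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper, not part of Kafka: recomputes LAG from --describe output. */
final class DescribeOutputParser {

    /** Maps "topic-partition" to LOG-END-OFFSET minus CURRENT-OFFSET for every data row. */
    static Map<String, Long> lagByPartition(List<String> lines) {
        Map<String, Long> lags = new LinkedHashMap<>();
        List<String> header = null;
        for (String line : lines) {
            List<String> cells = Arrays.asList(line.trim().split("\\s+"));
            if (header == null) {
                // Ignore prompts and notices until the header row appears.
                if (cells.contains("CURRENT-OFFSET") && cells.contains("LOG-END-OFFSET")) {
                    header = cells;
                }
                continue;
            }
            if (cells.size() < header.size() || cells.equals(header)) {
                continue; // blank line, truncated row, or a repeated header
            }
            String current = cells.get(header.indexOf("CURRENT-OFFSET"));
            String end = cells.get(header.indexOf("LOG-END-OFFSET"));
            if (current.equals("-") || end.equals("-")) {
                continue; // no offset available for this partition
            }
            String key = cells.get(header.indexOf("TOPIC")) + "-"
                    + cells.get(header.indexOf("PARTITION"));
            lags.put(key, Long.parseLong(end) - Long.parseLong(current));
        }
        return lags;
    }

    public static void main(String[] args) {
        List<String> output = Arrays.asList(
                "Consumer group 'zhongmingmao' has no active members.",
                "",
                "GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID",
                "zhongmingmao zhongmingmao 1 5 5 0 - - -",
                "zhongmingmao zhongmingmao 4 6 6 0 - - -");
        System.out.println(lagByPartition(output)); // prints {zhongmingmao-1=0, zhongmingmao-4=0}
    }
}
```

Locating columns by header name rather than by fixed position keeps the sketch tolerant of extra or reordered columns, as long as the four column names it relies on are present.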
Kafka Java Consumer API
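```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// The two methods below live inside a test class (not shown in the original);
// @Test and the `log` logger come from that class's own setup.

private Map<TopicPartition, Long> lagOf(String groupId, String bootstrapServers) throws TimeoutException {
    Properties props = new Properties();
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    try (AdminClient client = AdminClient.create(props)) {
        // Step 1: fetch the group's committed offsets for every partition it consumes.
        ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupId);
        try {
            Map<TopicPartition, OffsetAndMetadata> consumedOffsets =
                    result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Step 2: fetch the log end offset of the same partitions.
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
                // Step 3: Lag = log end offset - committed offset, per partition.
                return endOffsets.entrySet().stream().collect(Collectors.toMap(
                        Map.Entry::getKey,
                        entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Collections.emptyMap();
        } catch (ExecutionException e) {
            return Collections.emptyMap();
        } catch (TimeoutException e) {
            throw new TimeoutException("Timed out when getting lag for consumer group " + groupId);
        }
    }
}

@Test
public void getLagTest() throws TimeoutException {
    lagOf("zhongmingmao", "localhost:9092")
            .forEach((topicPartition, lag) -> log.info("partition: {}, lag: {}", topicPartition, lag));
}
```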
```
partition: zhongmingmao-1, lag: 0
partition: zhongmingmao-0, lag: 0
partition: zhongmingmao-4, lag: 0
partition: zhongmingmao-3, lag: 0
partition: zhongmingmao-2, lag: 0
```
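Kafka JMX metrics
- Both approaches above make it easy to query the Lag of a given consumer group
- In real monitoring setups, however, you usually rely on an existing monitoring framework (such as Zabbix/Grafana)
- In that case you can use the JMX metrics that Kafka exposes by default to monitor consumer Lag
- The consumer exposes JMX metrics under `kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"`. Among them, `records-lag-max` and `records-lead-min` are the maximum Lag and the minimum Lead that this consumer reached within the sampling window
- Lead: the difference between the offset of the consumer's latest consumed message and the offset of the first message currently in the partition. _The larger the Lag, the smaller the Lead_
- Once Lead is seen shrinking, or even approaching 0, the consumer is about to lose messages
- Kafka messages have a retention period, 1 week by default. If the consumer is slow enough, the data it still has to consume is about to be deleted by Kafka
- Once messages are deleted and the consumer is forced to reset its offset, there are two possible consequences
- One is that the consumer consumes the data again from the beginning
- The other is that the consumer starts from the latest offset, skipping every message it had not yet consumed, which looks like message loss
- A Lag that grows from 1,000,000 to 2,000,000 matters far less than a Lead that drops from 200 to 100. In production, monitor both Lag and Lead
- The consumer also exposes additional JMX metrics at the partition level, for monitoring the Lag and Lead of individual partitions: `kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}",topic="{topic}",partition="{partition}"`
- These add `records-lag-avg` and `records-lead-avg`, the average Lag and Lead, which are used frequently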
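The client-level metrics above can also be read programmatically through the standard `javax.management` API. The following is a minimal sketch under two assumptions: the code runs in the same JVM as the consumer (so the platform MBean server holds the Kafka MBeans), and the consumer version exposes `records-lead-min`. `ConsumerFetchMetrics`, `readLagAndLead`, `leadIsCritical` and the threshold of 100 are illustrative choices, not part of Kafka.

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;
import javax.management.JMException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

/** Hypothetical helper: reads client-level Lag and Lead metrics over JMX. */
final class ConsumerFetchMetrics {

    // Matches only MBeans whose keys are exactly type and client-id,
    // so the partition-level MBeans (with topic and partition keys) are excluded.
    static final String CLIENT_LEVEL_PATTERN =
            "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*";

    /** Returns client-id -> {records-lag-max, records-lead-min} for every matching MBean. */
    static Map<String, double[]> readLagAndLead(MBeanServerConnection mbs) {
        Map<String, double[]> result = new HashMap<>();
        try {
            for (ObjectName name : mbs.queryNames(new ObjectName(CLIENT_LEVEL_PATTERN), null)) {
                double lagMax = ((Number) mbs.getAttribute(name, "records-lag-max")).doubleValue();
                double leadMin = ((Number) mbs.getAttribute(name, "records-lead-min")).doubleValue();
                result.put(name.getKeyProperty("client-id"), new double[] {lagMax, leadMin});
            }
        } catch (JMException | IOException e) {
            throw new IllegalStateException("Could not read consumer fetch metrics", e);
        }
        return result;
    }

    /** A shrinking Lead is the more urgent signal; NaN means no sample was recorded yet. */
    static boolean leadIsCritical(double leadMin, double threshold) {
        return !Double.isNaN(leadMin) && leadMin < threshold;
    }

    public static void main(String[] args) {
        // Prints nothing unless a KafkaConsumer is running in this JVM.
        readLagAndLead(ManagementFactory.getPlatformMBeanServer()).forEach((clientId, m) ->
                System.out.printf("%s lag-max=%.0f lead-min=%.0f critical=%b%n",
                        clientId, m[0], m[1], leadIsCritical(m[1], 100.0)));
    }
}
```

For a consumer in another process, the same method should work with an `MBeanServerConnection` obtained from a remote JMX connector, provided remote JMX is enabled on that process.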
References
Kafka核心技术与实战 (Kafka Core Technology and Practice)