1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| private Map<TopicPartition, Long> lagOf(String groupId, String bootstrapServers) throws TimeoutException { Properties props = new Properties(); props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); try (AdminClient client = AdminClient.create(props)) { ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupId); try { Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) { Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet()); return endOffsets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset())); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); return Collections.emptyMap(); } catch (ExecutionException e) { return Collections.emptyMap(); } catch (TimeoutException e) { throw new TimeoutException("Timed out when getting lag for consumer group " + groupId); } } }
@Test public void getLagTest() throws TimeoutException { lagOf("zhongmingmao", "localhost:9092") .forEach((topicPartition, lag) -> log.info("partition: {}, lag: {}", topicPartition, lag)); }
|