1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private Jedis jedis;
@Override public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) { AtomicLong latency = new AtomicLong(0L); records.forEach(record -> latency.addAndGet(System.currentTimeMillis() - record.timestamp())); jedis.incrBy("totalLatency", latency.get()); long totalLatency = Long.parseLong(jedis.get("totalLatency")); long totalSentMessage = Long.parseLong(jedis.get("totalSentMessage")); jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMessage)); return records; }
@Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { }
@Override public void close() { }
@Override public void configure(Map<String, ?> configs) { } }
|