Kafka -- 拦截器
设计思路
- 基本思想:允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件处理逻辑链
- 拦截器能够在主业务操作的前后多个时间点上插入对应的拦截逻辑
- 以配置拦截器类的方式动态插入到应用程序中,可以快速地切换不同的拦截器,而不影响主程序逻辑
Kafka拦截器
- Kafka拦截器自0.10.0.0版本被引入后并未得到太多的实际应用
- Kafka拦截器分为生产者拦截器和消费者拦截器
- 生产者拦截器:允许在发送消息前以及消息提交成功后植入拦截逻辑
- 消费者拦截器:允许在消费消息前以及提交位移后植入拦截逻辑
- Kafka拦截器支持链式调用,Kafka会按照添加顺序依次执行拦截器逻辑
- Kafka拦截器通过参数
interceptor.classes
来配置(生产者和消费者一致)- 指定拦截器类时需要使用全限定名
1 | Properties props = new Properties(); |
生产者拦截器
1 | public interface ProducerInterceptor<K, V> extends Configurable { |
消费者拦截器
1 | public interface ConsumerInterceptor<K, V> extends Configurable { |
案例:端到端延时
- Kafka默认提供的监控指标都是针对单个客户端或者Broker,缺少消息维度的监控
- 如何追踪一条消息在集群间的流转路径
- 如何监控一条消息从生产到消费的端到端延时
生产者拦截器
1 | public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> { |
消费者拦截器
1 | public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> { |
参考资料
All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.