Kafka -- 多线程消费者
Kafka Java Consumer设计原理
- Kafka Java Consumer从Kafka 0.10.1.0开始,KafkaConsumer变成了双线程设计,即用户主线程和心跳线程
- 用户主线程:启动Consumer应用程序main方法的那个线程
- 心跳线程:只负责定期给对应的Broker机器发送心跳请求,以标识消费者应用的存活性
- 引入心跳线程的另一个目的
- 将心跳频率和主线程调用KafkaConsumer.poll方法的频率分开,解耦真实的消息处理逻辑和消费组成员存活性管理
- 虽然有了心跳线程,但实际的消息获取逻辑依然是在用户主线程中完成
- 因此在消费消息的这个层面,依然可以安全地认为KafkaConsumer是单线程的设计
- 老版本Consumer是多线程的架构
- 每个Consumer实例在内部为所有订阅的主题分区创建对应的消息获取线程,即Fetcher线程
- 老版本Consumer同时也是阻塞式的,Consumer实例启动后,内部会创建很多阻塞式的消息获取迭代器
- 但在很多场景下,Consumer端有非阻塞需求,如在流处理应用中执行过滤、分组等操作就不能是阻塞式的
- 基于这个原因,社区为新版本Consumer设计了单线程+轮询的机制,该机制能较好地实现非阻塞的消息获取
- 单线程的设计简化了Consumer端的设计
- Consumer获取到消息后,处理消息的逻辑是否采用多线程,完全由使用者决定
- 不论使用哪一种编程语言,单线程的设计都比较容易实现
- 并不是所有的编程语言都能很好地支持多线程,而单线程设计的Consumer更容易移植到其他语言上
多线程方案
- KafkaConsumer是线程不安全的
- 不能多线程共享一个KafkaConsumer实例,否则会抛出ConcurrentModificationException
- 但KafkaConsumer.wakeup()是线程安全的
方案1
- 消费者程序启动多个线程,每个线程维护专属的KafkaConsumer实例,负责完整的消息获取、消息处理流程
- 优点
- 实现简单,比较符合目前使用Consumer API的习惯
- 多个线程之间没有任何交互,省去了很多保障线程安全方面的开销
- Kafka主题中的每个分区都能保证只被一个线程处理,容易实现分区内的消息消费顺序
- 缺点
- 每个线程都维护自己的KafkaConsumer实例,必然会占用更多的系统资源,如内存、TCP连接等
- 能使用的线程数受限于Consumer订阅主题的总分区数
- 每个线程完整地执行消息获取和消息处理逻辑
- 一旦消息处理逻辑很重,消息处理速度很慢,很容易出现不必要的Rebalance,引发整个消费者组的消费停滞
1 | public class KafkaConsumerRunner implements Runnable { |
方案2
- 消费者程序使用单个或多个线程获取消息,同时创建多个消费线程执行消息处理逻辑
- 获取消息的线程可以是一个,也可以是多个,每个线程维护专属的KafkaConsumer实例
- 处理消息则由特定的线程池来做,从而实现消息获取和消息处理的真正解耦
- 优点
- 把任务切分成消息获取和消息处理两部分,分别由不同的线程来处理
- 相对于方案1,方案2最大的优势是它的高伸缩性
- 可以独立地调节消息获取的线程数,以及消息处理的线程数,不必考虑两者之间是否相互影响
- 缺点
- 实现难度大,因为要分别管理两组线程
- 消息获取和消息处理解耦,无法保证分区内的消费顺序
- 两组线程,使得整个消息消费链路被拉长,最终导致正确位移提交会变得异常困难,可能会出现消息的重复消费
1 | private final KafkaConsumer<String, String> consumer; |
参考资料
All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.