Kafka -- 位移主题
ZooKeeper
- 老版本Consumer的位移管理依托于Apache ZooKeeper,自动或手动地将位移数据提交到ZK中保存
- 当Consumer重启后,能自动从ZK中读取位移数据,从而在上次消费截止的地方继续消费
- 这种设计使得Kafka Broker不需要保存位移数据,减少了Broker端需要持有的状态空间,有利于实现高伸缩性
- 但ZK并不适用于高频的写操作
位移主题
- 将Consumer的位移数据作为普通的Kafka消息,提交到
__consumer_offsets
(保存Consumer的位移信息)- 提交过程需要实现高持久性,并需要支持高频的写操作
- 位移主题是普通的Kafka主题,同时也是一个内部主题,交由Kafka管理即可
- 位移主题的消息格式由Kafka定义,用户不能修改
- 因此不能随意向位移主题写消息,一旦写入的消息不能满足格式,那Kafka内部无法成功解析,会造成Broker崩溃
- Kafka Consumer有API来提交位移(即向位移主题写消息)
消息格式
- 常用格式:Key-Value
- Key为消息键值,Value为消息体,在Kafka中都是字节数组
- Key
<Group ID, Topic, Partition>
- Value
- Offset + Other MetaData(时间戳等,这是为了执行各种各样的后续操作,例如删除过去位移信息等)
- 用于保存Consumer Group信息的消息
- 用来注册Consumer Group
- 用于删除Group过期位移甚至删除Group的消息
- 专属名词:tombstone消息,即墓碑消息,也称delete mark,主要特点是消息体为null
- 一旦某个Consumer Group下所有的Consumer实例都停止,而且它们的位移数据已被删除
- Kafka会向位移主题的对应分区写入tombstone消息,表明要彻底删除这个Consumer Group
创建位移主题
- Kafka集群中的第一个Consumer程序启动时,会自动创建位移主题
- Broker端参数:offsets.topic.num.partitions=50,Kafka会自动创建50分区的位移主题
- Broker端参数:offsets.topic.replication.factor=3,Kafka会自动创建3副本的位移主题
- 手动创建位移主题
- 在Kafka集群尚未启动任何Consumer之前,使用Kafka API来创建
- 推荐:采用Kafka的自动创建
提交位移
- 自动提交位移
- enable.auto.commit=true
- Consumer在后台默默地定期提交位移,提交间隔由参数控制
auto.commit.interval.ms
- 缺点
- 完全无法把控Consumer端的位移管理
- 很多与Kafka集成的大数据框架都禁用自动提交位移的,如Spark、Flink等
- 只要Consumer一直启动着,就会无限期地向位移主题写入消息
- 假设Consumer当前消费了某个主题的最新一条消息,位移为100,之后该主题就没有产生任何新消息
- 但由于设置了自动提交位移,位移主题会不停地写入位移=100,这就要求位移主题有特定的消息删除策略
- 完全无法把控Consumer端的位移管理
- 手动提交位移
- enable.auto.commit=false
- 需要应用程序手动提交位移
删除过期消息
- 策略:Compaction(整理)
- Kafka使用Compact策略来删除位移主题中的过期消息,避免该主题无限期膨胀
- 过期消息:对于同一个Key的两条消息M1和M2,如果M1的发送时间早于M2,那么M1就是过期消息
- Compact过程:扫描日志的所有消息,剔除过期的消息,然后把剩下的消息整理在一起
- Kafka提供了专门的后台线程(Log Cleaner)来定期巡检待Compact的主题
- 如果位移主题无限期膨胀,占用过多的磁盘空间,检查下Log Cleaner线程的状态
参考资料
All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.