Kafka -- 重设消费者组位移
背景
- Kafka和传统的消息引擎在设计上有很大的区别,Kafka消费者读取消息是可以重演的
- 像RabbitMQ和ActiveMQ等传统消息中间件,处理和响应消息的方式是破坏性
- 一旦消息被成功处理,就会从Broker上被删除
- Kafka是基于日志结构(Log-based)的消息引擎
- 消费者在消费消息时,仅仅是从磁盘文件中读取数据而已,是只读操作,因为消费者不会删除消息数据
- 同时,由于位移数据是由消费者控制的,因此能够很容易地修改位移值,实现重复消费历史数据的功能
- Kafka Or 传统消息中间件
- 传统消息中间件:消息处理逻辑非常复杂,处理代价高、又不关心消息之间的顺序
- Kafka:需要较高的吞吐量、但每条消息的处理时间很短,又关心消息的顺序
重设位移策略
- 位移维度
- 直接把消费者的位移值重设成给定的位移值
- 时间维度
- 给定一个时间,让消费者把位移调整成大于该时间的最小位移
维度 | 策略 | 含义 |
---|---|---|
位移维度 | Earliest | 把位移调整到当前最早位移处 |
Latest | 把位移调整到当前最新位移处 | |
Current | 把位移调整到当前最新提交位移处 | |
Specified-Offset | 把位移调整成指定位移 | |
Shift-By-N | 把位移调整成到当前位移+N处(N可以是负值) | |
时间维度 | DateTime | 把位移调整到大于给定时间的最小位移处 |
Duration | 把位移调整到距离当前时间指定间隔的位移处 |
- Earliest
- 最早位移不一定是0,在生产环境中,很久远的消息会被Kafka自动删除
- 如果想要重新消费主题的所有消息,可以使用Earliest策略
- Latest
- 如果想要跳过所有历史消息,打算从最新的消息处开始消费,可以使用Latest策略
- Specified-Offset
- 典型使用场景:消费者程序在处理某条错误消息时,可以手动跳过此消息的处理
- Duration
- 格式为
PnDTnHnMnS
,D、H、M、S分别代表天、小时、分钟、秒 - 如果想将位移调回到15分钟前,可以指定**
PT0H15M0S
**
- 格式为
重设位移方式
消费者API
1 | // org.apache.kafka.clients.consumer.Consumer |
- 每次调用
seek
方法只能重设一个分区的位移 seekToBeginning
和seekToEnd
可以一次性重设多个分区
Earliest
1 | Properties consumerProperties = new Properties(); |
Latest
1 | consumer.seekToEnd(consumer.partitionsFor(topic).stream().map( |
Current
1 | consumer.partitionsFor(topic).stream().map( |
Specified-Offset
1 | long targetOffset = 1234L; |
Shift-By-N
1 | for (PartitionInfo info : consumer.partitionsFor(topic)) { |
DateTime
1 | long ts = LocalDateTime.of(2019, 6, 20, 20, 0) |
Duration
1 | Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream().map( |
命令行工具
从Kafka 0.11版本开始引入
Earliest
--to-earliest
1 | $ kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-earliest --execute |
Latest
--to-latest
1 | $ kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-latest --execute |
Current
--to-current
1 | $ kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-current --execute |
Specified-Offset
--to-offset
1 | $ kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset <offset> --execute |
Shift-By-N
--shift-by
1 | $ kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --reset-offsets --shift-by <offset_N> --execute |
DateTime
--to-datetime
1 | $ kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --reset-offsets --to-datetime 2019-09-26T00:00:00.000 --execute |
Duration
--by-duration
1 | $ kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --reset-offsets --by-duration PT0H30M0S --execute |
参考资料
All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.