Kafka -- KafkaAdminClient
背景
- 命令行脚本只能运行在控制台上,在应用程序、运维框架或者监控平台中集成它们,会非常困难
- 很多命令行脚本都是通过连接ZK来提供服务的,这会存在潜在的问题,即绕过Kafka的安全设置
- 运行这些命令行脚本需要使用Kafka内部的类实现,也就是Kafka服务端的代码
- 社区是希望用户使用Kafka客户端代码,通过现有的请求机制来运维管理集群
- 基于上述原因,社区于0.11版本正式推出Java客户端版的KafkaAdminClient
功能
- 主题管理
- 主题的创建、删除、查询
- 权限管理
- 具体权限的配置和删除
- 配置参数管理
- Kafka各种资源(Broker、主题、用户、Client-Id等)的参数设置、查询
- 副本日志管理
- 副本底层日志路径的变更和详情查询
- 分区管理
- 创建额外的主题分区
- 消息删除
- 删除指定位移之前的分区消息
- Delegation Token管理
- Delegation Token的创建、更新、过期、查询
- 消费者组管理
- 消费者组的查询、位移查询和删除
- Preferred领导者选举
- 推选指定主题分区的Preferred Broker为领导者
工作原理
- KafkaAdminClient是双线程设计
- 前端主线程
- 负责将用户要执行的操作转换成对应的请求,然后将请求发送到后端IO线程的队列中
- 后端IO线程
- 从队列中读取相应的请求,再发送到对应的Broker节点上,之后把执行结果保存起来,等待前端线程的获取
- 前端主线程
- KafkaAdminClient在内部大量使用生产者-消费者模式将请求生成和处理解耦
- 前端主线程会创建名为Call的请求对象实例,该实例有两个主要任务
- 构建对应的请求对象
- 创建主题:CreateTopicsRequest
- 查询消费者组位移:OffsetFetchRequest
- 指定响应的回调逻辑
- 比如从Broker端接收到CreateTopicsResponse之后要执行的动作
- 构建对应的请求对象
- 后端IO线程使用了3个队列来承载不同时期的请求对象,分别为新请求队列、待发送请求队列和处理中请求队列
- 原因:新请求队列的线程安全是由Java的Monitor锁来保证的
- 为了保证前端线程不会因为Monitor锁被阻塞,后端IO线程会定期地将新请求队列中的所有Call实例全部搬移到待发送请求队列中进行处理
- 待发送请求队列和处理中请求队列只由后端IO线程处理,因为无需任何锁机制来保证线程安全
- 当后端IO线程在处理某个请求时,会显式地将请求保存在处理中请求队列
- 一旦处理完毕,后端IO线程会自动调用Call对象中的回调逻辑完成最后的处理
- 最后,后端IO线程会通知前端主线程说结果已经准备完毕,这样前端主线程就能够及时获取到执行操作的结果
- KafkaAdminClient是使用了Object的wait和notify来实现通知机制
- KafkaAdminClient并没有使用Java已有的队列去实现请求队列
- 而是使用ArrayList和HashMap等简单容器,再配合Monitor锁来保证线程安全
- 后端线程名称:**
kafka-admin-client-thread
,可以用jstack
**去确认程序是否正常工作- 后端IO线程可能由于未捕获某些异常而意外挂掉
- 原因:新请求队列的线程安全是由Java的Monitor锁来保证的
应用场景
创建主题
1 | Properties props = new Properties(); |
查询消费者组位移
1 | String groupId = "zhongmingmao"; |
获取Broker磁盘占用
1 | try (AdminClient client = AdminClient.create(props)) { |
参考资料
All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.