Kafka -- 生产者管理TCP连接
建立TCP连接
创建KafkaProducer实例
1 | Properties properties = new Properties(); |
bootstrap.servers
是Producer的核心参数之一,指定了Producer启动时要连接的Broker地址- 如果
bootstrap.servers
指定了1000个Broker,那么Producer启动时会首先创建与这1000个Broker的TCP连接 - 因此不建议把集群中所有的Broker信息都配置到
bootstrap.servers
中,通常配置3~4台足够- Producer一旦连接到集群中的任意一台Broker,就能拿到整个集群的Broker信息(metadata request)
- 在创建KafkaProducer实例时启动Sender线程是不合理的
- 在对象构造器中启动线程会造成this指针逃逸,理论上Sender线程能够观测到一个尚未构造完成的KafkaProducer实例
- 在构造对象时创建线程是没有问题的,但最好不要同时启动线程
相关日志
1 | Sender - Starting Kafka producer I/O thread. |
其他场景
- 其他可能创建TCP连接的场景:更新元数据后,消息发送时
- 当Producer更新了集群的元数据后,如果发现与某些Broker当前没有连接,那么Producer会创建一个TCP连接
- 场景1
- 当Producer尝试向不存在的主题发送消息时,Broker会告诉Producer这个主题不存在
- 此时Producer会发送metadata request到Kafka集群,去尝试获取最新的元数据信息
- 与集群中所有的Broker建立TCP连接
- 场景2
- Producer通过
metadata.max.age.ms
参数定期地去更新元数据信息,默认值300000,即5分钟
- Producer通过
- 场景1
- 当Producer要发送消息时,Producer发现与目标Broker(依赖负载均衡算法)还没有连接,也会创建一个TCP连接
关闭TCP连接
- Producer端关闭TCP连接有两种方式:用户主动关闭、Kafka自动关闭
- 用户主动关闭
- 广义的主动关闭,包括用户调用
kill -9
来杀掉Producer,最推荐的方式:producer.close()
- 广义的主动关闭,包括用户调用
- Kafka自动关闭
- Producer端参数
connections.max.idle.ms
,默认值540000,即9分钟 - 如果9分钟内没有任何请求经过某个TCP连接,Kafka会主动把TCP连接关闭
connections.max.idle.ms=-1
会禁用这种机制,TCP连接将成为永久长连接- Kafka创建的Socket连接都开启了keepalive
- 关闭TCP连接的发起方是Kafka客户端,属于被动关闭的场景
- 被动关闭的后果就是会产生大量的CLOSE_WAIT连接
- Producer端或Client端没有机会显式地观测到此TCP连接已被中断
- Producer端参数
参考资料
All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.