Kafka -- 调优
调优目标
- 主要目标:高吞吐量、低延时
- 吞吐量
- 即TPS,指的是Broker端进程或Client端应用程序每秒能处理的字节数或消息数
- 延时,可以有两种理解
- 从Producer发送消息到Broker持久化完成之间的时间间隔
- 端到端的延时,即从Producer发送消息到Consumer成功消费该消息的总时长
优化漏斗
优化漏斗是调优过程中的分层漏斗,层级越靠上,调优的效果越明显
操作系统层
mount -o noatime
- 在挂载文件系统时禁用atime(Access Time)更新,记录的是文件最后被访问的时间
- 记录atime需要操作系统访问inode资源,禁用atime可以避免inode访问时间的写入操作
- 文件系统选择ext4、XFS、ZFS
- 将swappiness设置成一个很小的值(1~10,默认是60),防止Linux的
OOM Killer
开启随机杀掉进程- swappiness=0,并不会禁止对swap的使用,只是最大限度地降低使用swap的可能性
- 因为一旦设置为0,当物理内存耗尽时,操作系统会触发OOM Killer
- OOM Killer会随机挑选一个进程然后kill掉,不会给出任何预警
- swappiness=N,表示内存使用**
(100-N)%
**时,开始使用Swap
- swappiness=0,并不会禁止对swap的使用,只是最大限度地降低使用swap的可能性
ulimit -n
设置大一点,否则可能会出现Too Many File Open错误vm.max_map_count
也设置大一点(如655360,默认值65530)- 在一个主题数超多的机器上,可能会碰到OutOfMemoryError:Map failed错误
- 页缓存大小
- 给Kafka预留的页缓存至少也要容纳一个日志段的大小(
log.segment.bytes
,默认值为1GB) - 消费者程序在消费时能直接命中页缓存,从而避免昂贵的物理磁盘IO操作
- 给Kafka预留的页缓存至少也要容纳一个日志段的大小(
JVM层
- 堆大小,经验值为6~8GB
- 如果需要精确调整,关注Full GC后堆上存活对象的总大小,然后将堆大小设置为该值的1.5~2倍
jmap -histo:live <pid>
可以人为触发Full GC
- 选择垃圾收集器
- 推荐使用G1,主要原因是优化难度比CMS小
- 如果使用G1后,频繁Full GC,配置
-XX:+PrintAdaptiveSizePolicy
,查看触发Full GC的原因 - 使用G1的另一个问题是大对象,即
too many humongous allocations
- 大对象一般指的是至少占用半个Region大小的对象,大对象会被直接分配在大对象区
- 可以适当增大
-XX:+G1HeapRegionSize=N
- 尽量避免Full GC!!
框架层
尽量保持客户端版本和Broker端版本一致,否则可能会丧失很多性能收益,如Zero Copy
应用程序层
- 不要频繁创建Producer和Consumer对象实例,构造这些对象的开销很大
- 用完及时关闭
- Producer对象和Consumer对象会创建很多物理资源,如Socket连接、ByteBuffer缓冲区,很容易造成资源泄露
- 合理利用多线程来改善性能,Kafka的Java Producer是线程安全的,而Java Consumer不是线程安全的
性能指标调优
TPS != 1000 / Latency(ms)
- 假设Kafka Producer以2ms的延时来发送消息,如果每次都只发送一条消息,那么
TPS=500
- 但如果Producer不是每次只发送一条消息,而是在发送前等待一段时间,然后统一发送一批消息
- 如Producer每次发送前等待8ms,总共缓存了1000条消息,总延时累加到了10ms,但
TPS=100,000
- 虽然延时增加了4倍,但TPS却增加了200倍,这就是批次化或微批次化的优势
- 用户一般愿意用较小的延时增加的代价,去换取TPS的显著提升,Kafka Producer就是采用了这样的思路
- 基于的前提:内存操作(几百纳秒)和网络IO操作(毫秒甚至秒级)的时间量级不同
调优吞吐量
- Broker端
- 适当增加**
num.replica.fetchers
(默认值为1),但不用超过CPU核数**- 生产环境中,配置了
acks=all
的Producer程序吞吐量被拖累的首要因素,就是副本同步性能
- 生产环境中,配置了
- 调优GC参数避免频繁Full GC
- 适当增加**
- Producer端
- 适当增加
batch.size
(默认值为16KB,可以增加到512KB或1MB)- 增加消息批次的大小
- 适当增加
linger.ms
(默认值为0,可以增加到10~100)- 增加消息批次的缓存时间
- 修改
compression.type
(默认值为none,可以修改为lz4或zstd,适配最好) - 修改
acks
(默认值为1,可以修改为0或1)- 优化的目标是吞吐量,不要开启
acks=all
(引入的副本同步时间通常是吞吐量的瓶颈)
- 优化的目标是吞吐量,不要开启
- 修改
retries
(修改为0)- 优化的目标是吞吐量,不要开启重试
- 如果多线程共享同一个Producer,增加
buffer.memory
(默认为32MB
)TimeoutException:Failed to allocate memory within the configured max blocking time
- 适当增加
- Consumer端
- 采用多Consumer进程或线程同时消费数据
- 适当增加
fetch.min.bytes
(默认值为1Byte,可以修改为1KB或更大)
调优延时
- Broker端
- 适当增加
num.replica.fetchers
- 适当增加
- Producer端
- 设置
linger.ms=0
- 设置
compression.type=none
- 压缩会消耗CPU时间
- 设置
acks=1
- 设置
- Consumer端
- 设置
fetch.min.bytes=1
- 设置
参考资料
All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.