安装步骤

Kafka与Zookeeper

Kafka使用Zookeeper保存集群的元数据信息消费者信息

安装Zookeeper和Kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
$ brew install kafka
==> Installing dependencies for kafka: zookeeper
==> Caveats
==> zookeeper
To have launchd start zookeeper now and restart at login:
brew services start zookeeper
Or, if you don't want/need a background service you can just run:
zkServer start
==> kafka
To have launchd start kafka now and restart at login:
brew services start kafka
Or, if you don't want/need a background service you can just run:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

启动Zookeeper和Kafka

当前服务列表

1
2
3
4
5
$ brew services list
Name Status User Plist
kafka stopped
mysql started zhongmingmao /Users/zhongmingmao/Library/LaunchAgents/homebrew.mxcl.mysql.plist
zookeeper stopped

启动Zookeeper

1
2
$ brew services start zookeeper
==> Successfully started `zookeeper` (label: homebrew.mxcl.zookeeper)

启动Kakfa

1
2
$ brew services start kafka
==> Successfully started `kafka` (label: homebrew.mxcl.kafka)

当前服务列表

1
2
3
4
5
$ brew services list
Name Status User Plist
kafka started zhongmingmao /Users/zhongmingmao/Library/LaunchAgents/homebrew.mxcl.kafka.plist
mysql started zhongmingmao /Users/zhongmingmao/Library/LaunchAgents/homebrew.mxcl.mysql.plist
zookeeper started zhongmingmao /Users/zhongmingmao/Library/LaunchAgents/homebrew.mxcl.zookeeper.plist

基本测试

创建主题

1
2
3
4
5
6
7
8
9
10
$ kafka-topics --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic zhongmingmao
Created topic "zhongmingmao".

$ kafka-topics --zookeeper localhost:2181 --list
__consumer_offsets
zhongmingmao

$ kafka-topics --zookeeper localhost:2181 --describe --topic zhongmingmao
Topic:zhongmingmao PartitionCount:1 ReplicationFactor:1 Configs:
Topic: zhongmingmao Partition: 0 Leader: 0 Replicas: 0 Isr: 0

发布消息

1
2
3
4
5
$ kafka-console-producer --broker-list localhost:9092 --topic zhongmingmao
>zhongmingmao
>is
>learning kafka
>

读取消息

1
2
3
4
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic zhongmingmao --from-beginning
zhongmingmao
is
learning kafka

Broker配置

常规配置

broker.id

  1. 每个Broker都需要一个标识符,使用broker.id来表示,默认为0,也可以被设置成任意整数
  2. 这个值在整个Kafka集群里必须是唯一
  3. 建议把它们设置成与机器名具有相关性的整数

zookeeper.connect

  1. 指定用来保存Broker元数据的Zookeeper地址
  2. 例如 hostname1:port1/path,hostname2:port2/path,hostname3:port3/path
    • /path 是可选的Zookeeper路径,作为Kafka集群的chroot环境
    • 如果不指定,默认使用根路径
    • 如果指定的chroot不存在,Broker会在启动的时候创建它
    • 在Kafka集群里使用chroot路径是一种最佳实践

log.dirs

  1. Kafka把所有的消息都保存在磁盘上,存放这些日志片段的目录是通过log.dirs指定的
  2. 一组逗号分隔的本地文件系统路径
  3. 如果指定了多个路径,那么Broker会依据最少使用原则,_把同一个分区的日志片段保存在同一个路径下_
    • Broker会往拥有最少分区数目的路径新增分区
    • 而不是往拥有最少磁盘空间的路径新增分区

num.recovery.threads.per.data.dir

Specify the maximum number of threads that are used for log recovery for each data directory.

处理时机
  1. 服务器正常启动,用于打开每个分区的日志片段
  2. 服务器崩溃后重启,用于检查和截断每个分区的日志片段
  3. 服务器正常关闭,用于关闭日志片段
多线程和单目录
  1. 默认情况下,每个日志目录只使用一个线程
    • 这些线程只在服务器启动或者关闭时会用到,所以完全可以设置大量的线程来达到并行操作的目的
  2. 所配置的数字对应的是log.dirs指定的单个日志目录
    • 如果num.recovery.threads.per.data.dir=8log.dirs=3,那么总共需要24个线程

auto.create.topics.enable

自动创建时机
  1. 当一个生产者开始往主题写入消息
  2. 当一个消费者开始从主题读取消息
  3. 当任意一个客户端向主题发送元数据请求

主题配置

num.partitions

  1. 指定新创建的主题包含多少个分区
  2. 如果启用了主题的自动创建功能(默认启用)
    • 可以增加主题分区的个数,但不能减少分区的个数
    • 如果要让一个主题分区的个数少于num.partitions指定的值,需要手动创建该主题
选定分区数量
  1. 主题 需要达到多大的吞吐量?100KB/S还是1GB/S?
  2. 单个分区读取数据的最大吞吐量是多少?
    • 每个分区一般都会有一个消费者
    • 如果消费者将数据写入数据库的速度不会超过50MB/S
      • 那么一个分区读取数据的吞吐量不需要超过50MB/S
  3. 通过类似的方法估算生产者向单个分区写入数据的吞吐量
    • 不过生产者的速度一般比消费者快得多,最好为生产者多估算一些吞吐量
  4. 每个Broker包含的分区个数可用的磁盘空间网络带宽
  5. 如果消息是按照不同的键写入分区的,那么为已有的主题新增分区就会很困难
  6. 单个Broker对分区个数是有限制的
    • 因为分区越多占用的内存越多,完成首领选举需要的时间也越长
    • 如果已经估算出主题的吞吐量消费者的吞吐量,用主题吞吐量除以消费者吞吐量算出分区的个数
    • 如果不知道这些信息,把单个分区的大小限制在25GB以内可以得到比较理想的效果

log.retention.{hours,minutes,ms}

  1. 决定消息多久以后会被删除,_默认一周_,推荐使用log.retention.ms
  2. 如果指定了不止一个参数,会优先使用具有最小值的那个参数
  3. 根据时间保留数据是通过检查磁盘上日志片段文件最后修改时间来实现的
    • 一般来说,最后修改时间指的是日志片段的关闭时间,也就是_日志片段文件里最后一个消息的时间戳_

log.retention.bytes

  1. 通过保留的字节数来判断消息是否过期,作用在每一个分区上,默认1GB
  2. 若同时指定了log.retention.mslog.retention.bytes,只要满足任意一个条件,消息就会被删除

log.segment.bytes

  1. 当消息到达Broker时,它们被追加到分区的当前日志片段
  2. 日志片段大小达到了log.segment.bytes指定的上限(默认1GB
    • 当前日志片段就会被关闭(记录最后修改时间),一个新的日志片段就会被打开
    • 如果一个日志片段被关闭,就开始等待过期
    • 这个参数越小,就会越频繁地关闭和分配新文件,从而降低磁盘写入的整体效率
  3. 如果主题的消息量不大,如何调整这个参数的大小变得尤为重要
    • 如果一个主题每天只接收100MB的消息,而log.segment.bytes采用默认值(1GB)
      • 那么需要10天的时间才能填满一个日志片段
    • 日志片段在被关闭之前,消息是不会过期的
    • log.retention.ms采用默认值(7天),那么日志片段需要17天才会过期
      • 需要等到日志片段里的最后一个消息过期才能被删除
  4. 日志片段的大小 会影响使用时间戳获取偏移量
    • 在使用时间戳获取日志偏移量
      • Kafka会检查分区里最后修改时间大于指定时间戳的日志片段,返回该日志片段开头的偏移量
      • 例如某个分区有3个日志片段:S1:T1,S2:T2,S3:T3,T1.5会返回S2开头的偏移量
    • 对于使用时间戳获取偏移量的操作来说,_日志片段越小,结果越准确_

segment.ms

  1. 指定多长时间之后日志片段会被关闭
  2. log.segment.bytessegment.ms不存在互斥关系,看哪个条件先得到满足
  3. 默认情况下,segment.ms没有设定值,所以一般依据log.segment.bytes来关闭日志片段
  4. 在使用基于时间的日志片段时,要着重考虑并行关闭多个日志片段对磁盘性能的影响

message.max.bytes

  1. 限制单个消息的大小默认1MB
  2. 该参数指的是_压缩后的消息大小_
  3. 值越大,那么负责网络连接和请求的线程就需要花越多的时间来处理这些请求
    • 而且还会增加磁盘写入块的大小,从而影响IO吞吐量
  4. 如果消费者客户端设置的fetch.message.max.bytesmessage.max.bytes
    • 那么消费者就无法读取比较大的消息,导致出现消费者被阻塞的情况

硬件选择

磁盘吞吐量

  1. 生产者客户端的性能直接受到服务器端磁盘吞吐量的影响
  2. 生产者生成的消息必须被提交到服务器保存

磁盘容量

  1. 需要多大的磁盘容量取决于需要保留的消息数量
  2. 在决定扩展Kafka集群规模时,存储容量是一个需要考虑的因素
  • 通过让主题拥有多个分区集群的总流量可以被均衡到整个集群
  • 如果单个Broker无法支持全部容量,可以让其它Broker提供可用的容量
  • 存储容量的选择同时受到集群复制策略的影响

内存

  1. 磁盘性能影响生产者,内存影响消费者
  2. 消费者读取的消息会直接存放在系统的页面缓存里,这比从磁盘上重新读取要快得多
  3. 运行Kafka的JVM不需要太大的内存
    • 剩余的系统内存可以用作页面缓存,或者用来缓存正在使用中的日志片段
    • 不建议把Kafka与其它重要的应用程序部署在一起,因为需要共享页面缓存
      • 最终会导致降低Kafka消费者的性能

网络

  1. 网络吞吐量决定了Kafka能处理的最大数据流量,它和磁盘存储是制约Kafka扩展规模的主要因素
  2. Kafka支持多个消费者,造成流入和流出的网络流量不平衡
  3. 集群复制镜像也会占用网络流量
    • 如果网络接口出现饱和,那么集群的复制出现延时就会在所难免,从而让集群不堪一击

CPU

  1. 与磁盘和内存相比,_Kafka对计算处理能力的要求相对较低_
  2. 计算处理
    • 客户端为了优化网络磁盘空间,会对消息进行压缩
    • 服务器需要对消息进行批量解压设置偏移量,然后重新进行批量压缩,再保存在磁盘上