安装kafka
kafka安装过程可参考
安装测试
- 创建并验证主题
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
> Created topic test.
./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
>Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
> Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
- 发送测试消息
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# --zookeeper 参数已不支持
borker 配置
常规配置
参考 中的 server.properties部分配置信息的说明。
如何选定分区数量
主题需要选定分区数量并不是一件可有可无的事情,需要考虑如下几个因素 :
- 主题需要达到多大的吞吐量?如每秒期望写入100KB还是1GB
- 从单个分区读取数据的最大吞吐量是多少?如消费者写入数据库速度不会超过50MB/S,那么一个分区不需要超过50MB/s
- 估算和产者向单个分区写入数据的吞吐量,生产者的速度 一般比消息者快,应多估算一些
- 每个boker包含的分区个数、可用的磁盘空间和网络带宽
- 如果消息是按照 不同的键来写入分区,那为已有的主题新增分区就会很困难
- 单个broker对分区个数是有限制的,因为分区越多,占用的内存越多,完成首领选举需要的时间越长
基于以上几个因素,需要很多分区,但不能太多,如果估算出主题的吞吐量和消费者吞吐量,可以用主题吞吐量除以消费者吞吐量算算出分区的个数。 如每秒要从主题上写入和读取1GB的数据,并且每个消费者每秒可以处理50M的数据,那至少20个分区,即可让20个消费者同时读取这些分区,从而达到1GB/S的吞吐量。 若不知道这些信息,可把分区的大小限制在25GB以内达到比较理想的效果。
如何确定需要多少个borker
若整个集群要保留10TB的数据,每个broker可以存储2TB,那至少需要5个broker。 若开启数据复制,那至少还需要一倍的空间
- 集群处理清求能力
网络接口处理客户端流量的能力 若开启数据复制 ,磁盘吞吐量低和系统内存不足也会成为性能问题,可扩展多个broker来解决
操作系统调优
1.虚拟内存
此参数指明了虚拟机的子系统将如何使用交换分区,而不是只把内存页从页面缓存里移除。
内存页和磁盘之间的交换对kafka各方面的性能都有重大影响。kafka大量地使用系统页面缓存,如果虚拟内存被交换到磁盘,说明没有多条内存可以分配 给页面缓存了。
避免内存交换的方法是不设置任何交换分区(交换分区不是必需的)
- vm.dirty_background_ratio设为小于10的值
此参数指的是系统内存的百分比,大部分情况为5即可
不能设置为0,会导致内核频繁的刷新页面,从而降低内核为底层设置的磁盘写入提供缓冲的能力
- vm.dirty_ratio 设置为大于20的值(系统内存的百分比)
此参数可以增加被内核进程刷新到磁盘之前的脏页数量
60~80是比较合理的区间,调整此参数会带来一些风险,包括未刷新磁盘操作的数量和同步 刷新引起的长时间I/O等待。
如果该参数设置较高,建议启用kafka的复制功能,避免因系统崩溃造 成的数据丢失。
在kafka运行期间检查脏页的数量,可在/proc/vmstat文件里查看
#cat /proc/vmstat | egrep “dirty|writeback”
nr_dirty 3875
nr_writeback 29
nr_writeback_temp0
2.磁盘
目前EXT4和XFS最为常见,EXT4可以做的很好,但需要更多的调优,存在较大的风险。 * EXT4引入了块分配延迟算法,一旦系统崩溃,更容易造成数据丢失和文件系统毁坏。 * XFS也使用分配延迟算法,不过比EXT4安全些。XFS为kafka提供 了更好的性能,无需额外的调优。批量磁盘写入具有更高的效率,提升整体的I/O吞吐量
对挂载点的noatime参数进行合理设置
文件元数据包含3个时间戳:创建时间(ctime),最后修改时间(mtime)和最后访问时间(atime),默认每次文件被读取都会更新atime,导致大量的磁盘写操作,所以完全可以把它禁用。
3.网络
默认系统内核没有针对 快速的大流量网络传输进行优化。
- 分配给socket读写缓冲区提升网络的传输性能 ,对应的参数分别是net.core.wmem_default 和 net.core.rmem_default,合理的值 是131072(128K),读写缓冲区最大值 对应的参数分别是net.core.wmem_max和net.core.rmem_max,合理值是2097152(2MB)。
- 设置TCP socket的读写缓冲区,参数分别是net.ipv4.tcp_wmem 和 net.ipv4.tcp_rmem,参数值由3个整数级成,用空格分隔,分别表示最小值,默认值和最大值。最大值不能大于net.core.wmem_max和net.core.rmem_max指定大小
- net.ipv4.tcp_window_scaling设置为1,启用tcp时间窗扩展,提升客户端传输数据的效率,并可以服务器进行缓冲。
- net.ipv4.tcp_max_syn_backlog设为比默认值 1024更大的值 ,可以接受更多的并发连接。
- net.core.netdev_max_backlog设为比默值 1000更大的值有助于应对网络流量的爆发,特别是在使用千兆网络的情况 下,允许更多的数据包排除等待内核处理
生产环境注意事项
垃圾回收器选项
kafka对堆内存的使用率非常高,容易产生垃圾对象,所以可以把这些值 设置小一些,若一台服务器64G内存,且使用5GB堆内存来运行kafka,可配置MaxGCPauseMillis为20ms,InitiatingHeapOccupancyPercent为35,这样垃圾回收可以早一些。
kafka的启动脚本未启用G1回收机器,而使用了Parallel New和CMS(Concurrent Mark-Sweep,并发票房和清除)垃圾回收器。可修改环境变量来修改:
# export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapIccupancyPerCent=35
-XX:+DisableExpilicitGC -Djava.awt.headless=true"
数据中心布局
最好把集群的broker安装在不同的机架上,至少不要让它们共享可能出现单点故障的基础设施,如电源和网络。随着时间的推移,机架也需要进行维护,而这会导致机器离线(移动机器或重新连接电源)
kafka生产者
创建kafka生产者
创建一个生产者对象,有3个属性必选:
- bootstrap.servers
指定broker的地址清单,地址格式:host:port。建议至少要提供两个boker的信息。
- key.serializer
broker希望接收到的消息的键和值 都是字节数组。
- value.serializer
指定的类会将值 序列化。
发送消息主要有以下3种方式 :
- 发送并忘记(fire-and-forget)
- 同步发送
- 异步发送
生产者的配置
1.acks
指定必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的
若acks=0,生产者在写入消息之前不会等待任何来自服务器的响应。
若acks=1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。
若acks=all,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个成功响应
2.buffer.memory
用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息
若应用程序发送消息的速度超过发送到服务器的速度 ,会导致生产者空间不足,导致 阻塞或异常
取决于block.on.buffer.full参数(新版本换成,max.block.ms,表示抛出异常之前可阻塞一段时间)
3.compression.type
默认消息发送时不会被压缩,可设置为snappy,gzip或lz4
snappy是Goole发明,占用较少CPU,提供较好的性能和压缩比
Gzip会占用较多的CPU,会提供更高的压缩比
4.retries
生产者可以重发消息的次数,默认会在每次重试之间等待100ms,可通过retry.backoff.ms改变间隔
5.batch.size
指定一个批次可以使用的内存大小,按照字节数据计算
半满批次或一个消息的批次均可能被发送,把批次大小设置很大,不会造成延迟,只会占更多内存,过小会频繁发送消息,增加额外开销
6.linger.ms
生产者在发送批次之前等待更多消息加入批次的时间。
7.client.id
可以是任意字符串,识别消息来源
8.max.in.flight.requests.per.connection
生产者在收到服务器响应之前可以发送多少个消息。
值 越高,越占内存,不过也会提升吞吐量,设置为1,保证消息的顺序写入
9.timeout.ms、request.timeout.ms和metadata.fetch.timeout.ms
request.timeout.ms 生产者发送数据时等待服务器返回响应时间
metadata.fetch.timeout.ms 生产者在获取元数据时等待时间与asks配置相匹配
10.max.block.ms
获取元数据时生产者的阻塞时间
11.max.request.size
控制生产者发送的请求大小
12.receive.buffer.bytes和send.buffer.bytes
分别指定TCP socker接收和发送数据包的缓存大小
设置为-1,则使用系统默认值
序列化器
Json,Avro,Thrift或Protobuf
閱讀更多 碼代碼 的文章