Kafka学习记录之一

安装kafka

kafka安装过程可参考

安装测试

  • 创建并验证主题
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

> Created topic test.

./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test

>Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
> Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
  • 发送测试消息
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# --zookeeper 参数已不支持

borker 配置

常规配置

参考 中的 server.properties部分配置信息的说明。

如何选定分区数量

主题需要选定分区数量并不是一件可有可无的事情,需要考虑如下几个因素 :

  • 主题需要达到多大的吞吐量?如每秒期望写入100KB还是1GB
  • 从单个分区读取数据的最大吞吐量是多少?如消费者写入数据库速度不会超过50MB/S,那么一个分区不需要超过50MB/s
  • 估算和产者向单个分区写入数据的吞吐量,生产者的速度 一般比消息者快,应多估算一些
  • 每个boker包含的分区个数、可用的磁盘空间和网络带宽
  • 如果消息是按照 不同的键来写入分区,那为已有的主题新增分区就会很困难
  • 单个broker对分区个数是有限制的,因为分区越多,占用的内存越多,完成首领选举需要的时间越长

基于以上几个因素,需要很多分区,但不能太多,如果估算出主题的吞吐量和消费者吞吐量,可以用主题吞吐量除以消费者吞吐量算算出分区的个数。 如每秒要从主题上写入和读取1GB的数据,并且每个消费者每秒可以处理50M的数据,那至少20个分区,即可让20个消费者同时读取这些分区,从而达到1GB/S的吞吐量。 若不知道这些信息,可把分区的大小限制在25GB以内达到比较理想的效果。

如何确定需要多少个borker

  • 需要多少磁盘空间来保留数据,单个broker有多少空间可用
  • 若整个集群要保留10TB的数据,每个broker可以存储2TB,那至少需要5个broker。 若开启数据复制,那至少还需要一倍的空间

    • 集群处理清求能力

    网络接口处理客户端流量的能力 若开启数据复制 ,磁盘吞吐量低和系统内存不足也会成为性能问题,可扩展多个broker来解决

    操作系统调优

    1.虚拟内存

  • vm.swappiness 值 设置的小一点。比如1.
  • 此参数指明了虚拟机的子系统将如何使用交换分区,而不是只把内存页从页面缓存里移除。

    内存页和磁盘之间的交换对kafka各方面的性能都有重大影响。kafka大量地使用系统页面缓存,如果虚拟内存被交换到磁盘,说明没有多条内存可以分配 给页面缓存了。

    避免内存交换的方法是不设置任何交换分区(交换分区不是必需的)

    • vm.dirty_background_ratio设为小于10的值

    此参数指的是系统内存的百分比,大部分情况为5即可

    不能设置为0,会导致内核频繁的刷新页面,从而降低内核为底层设置的磁盘写入提供缓冲的能力

    • vm.dirty_ratio 设置为大于20的值(系统内存的百分比)

    此参数可以增加被内核进程刷新到磁盘之前的脏页数量

    60~80是比较合理的区间,调整此参数会带来一些风险,包括未刷新磁盘操作的数量和同步 刷新引起的长时间I/O等待。

    如果该参数设置较高,建议启用kafka的复制功能,避免因系统崩溃造 成的数据丢失。

    在kafka运行期间检查脏页的数量,可在/proc/vmstat文件里查看

    #cat /proc/vmstat | egrep “dirty|writeback”

    nr_dirty 3875

    nr_writeback 29

    nr_writeback_temp0

    2.磁盘

    目前EXT4和XFS最为常见,EXT4可以做的很好,但需要更多的调优,存在较大的风险。 * EXT4引入了块分配延迟算法,一旦系统崩溃,更容易造成数据丢失和文件系统毁坏。 * XFS也使用分配延迟算法,不过比EXT4安全些。XFS为kafka提供 了更好的性能,无需额外的调优。批量磁盘写入具有更高的效率,提升整体的I/O吞吐量

    对挂载点的noatime参数进行合理设置

    文件元数据包含3个时间戳:创建时间(ctime),最后修改时间(mtime)和最后访问时间(atime),默认每次文件被读取都会更新atime,导致大量的磁盘写操作,所以完全可以把它禁用。

    3.网络

    默认系统内核没有针对 快速的大流量网络传输进行优化。

    • 分配给socket读写缓冲区提升网络的传输性能 ,对应的参数分别是net.core.wmem_default 和 net.core.rmem_default,合理的值 是131072(128K),读写缓冲区最大值 对应的参数分别是net.core.wmem_max和net.core.rmem_max,合理值是2097152(2MB)。
    • 设置TCP socket的读写缓冲区,参数分别是net.ipv4.tcp_wmem 和 net.ipv4.tcp_rmem,参数值由3个整数级成,用空格分隔,分别表示最小值,默认值和最大值。最大值不能大于net.core.wmem_max和net.core.rmem_max指定大小
    • net.ipv4.tcp_window_scaling设置为1,启用tcp时间窗扩展,提升客户端传输数据的效率,并可以服务器进行缓冲。
    • net.ipv4.tcp_max_syn_backlog设为比默认值 1024更大的值 ,可以接受更多的并发连接。
    • net.core.netdev_max_backlog设为比默值 1000更大的值有助于应对网络流量的爆发,特别是在使用千兆网络的情况 下,允许更多的数据包排除等待内核处理

    生产环境注意事项

    垃圾回收器选项

  • MaxGCPauseMillis: 每次垃圾回收默认停顿时间(200ms)
  • InitiatingHeapOccupancyPercent: 指定G1启动新一轮垃圾回收之前可以使用的堆内存百分比默认45
  • kafka对堆内存的使用率非常高,容易产生垃圾对象,所以可以把这些值 设置小一些,若一台服务器64G内存,且使用5GB堆内存来运行kafka,可配置MaxGCPauseMillis为20ms,InitiatingHeapOccupancyPercent为35,这样垃圾回收可以早一些。

    kafka的启动脚本未启用G1回收机器,而使用了Parallel New和CMS(Concurrent Mark-Sweep,并发票房和清除)垃圾回收器。可修改环境变量来修改:

    # export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC
    -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapIccupancyPerCent=35
    -XX:+DisableExpilicitGC -Djava.awt.headless=true"

    数据中心布局

    最好把集群的broker安装在不同的机架上,至少不要让它们共享可能出现单点故障的基础设施,如电源和网络。随着时间的推移,机架也需要进行维护,而这会导致机器离线(移动机器或重新连接电源)

    kafka生产者

    创建kafka生产者

    创建一个生产者对象,有3个属性必选:

    • bootstrap.servers

    指定broker的地址清单,地址格式:host:port。建议至少要提供两个boker的信息。

    • key.serializer

    broker希望接收到的消息的键和值 都是字节数组。

    • value.serializer

    指定的类会将值 序列化。

    发送消息主要有以下3种方式 :

    • 发送并忘记(fire-and-forget)
    • 同步发送
    • 异步发送

    生产者的配置

    1.acks

    指定必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的

    若acks=0,生产者在写入消息之前不会等待任何来自服务器的响应。

    若acks=1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。

    若acks=all,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个成功响应

    2.buffer.memory

    用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息

    若应用程序发送消息的速度超过发送到服务器的速度 ,会导致生产者空间不足,导致 阻塞或异常

    取决于block.on.buffer.full参数(新版本换成,max.block.ms,表示抛出异常之前可阻塞一段时间)

    3.compression.type

    默认消息发送时不会被压缩,可设置为snappy,gzip或lz4

    snappy是Goole发明,占用较少CPU,提供较好的性能和压缩比

    Gzip会占用较多的CPU,会提供更高的压缩比

    4.retries

    生产者可以重发消息的次数,默认会在每次重试之间等待100ms,可通过retry.backoff.ms改变间隔

    5.batch.size

    指定一个批次可以使用的内存大小,按照字节数据计算

    半满批次或一个消息的批次均可能被发送,把批次大小设置很大,不会造成延迟,只会占更多内存,过小会频繁发送消息,增加额外开销

    6.linger.ms

    生产者在发送批次之前等待更多消息加入批次的时间。

    7.client.id

    可以是任意字符串,识别消息来源

    8.max.in.flight.requests.per.connection

    生产者在收到服务器响应之前可以发送多少个消息。

    值 越高,越占内存,不过也会提升吞吐量,设置为1,保证消息的顺序写入

    9.timeout.ms、request.timeout.ms和metadata.fetch.timeout.ms

    request.timeout.ms 生产者发送数据时等待服务器返回响应时间

    metadata.fetch.timeout.ms 生产者在获取元数据时等待时间与asks配置相匹配

    10.max.block.ms

    获取元数据时生产者的阻塞时间

    11.max.request.size

    控制生产者发送的请求大小

    12.receive.buffer.bytes和send.buffer.bytes

    分别指定TCP socker接收和发送数据包的缓存大小

    设置为-1,则使用系统默认值

    序列化器

    Json,Avro,Thrift或Protobuf


    分享到:


    相關文章: