本文共 3211 字,大约阅读时间需要 10 分钟。
Zookeepr版本 3.4.10
首先安装java
关闭防火墙 selinux
首先搭建zookeeper集群
创建/zookeep /zookeeper/zkdate /zookeeper/zkdatalog(安装目录,快照目录,日志目录)
把 下载的zookeeper包解压至/zookee下 修改配置文件
进入conf目录 把zoo_sample.cfg 拷贝一份 改名zoo.cfg
修改zoo.cfg
配置端口
clientPort=12181(自由选择端口)
添加两行
dataDir=/zookeeper/zookeeper/zkdata (快照目录)
datalogDir=/zookeeper/zookeeper/zkdatalog (日志目录)
添加
server.1=172.16.81.29:2888:3888 (1是唯一标记 和myid里的内容呼应 后面分别是集群相互联系端口 和 选举端口)
server.2=172.16.81.28:2888:3888
在zkdata下创建myid文件
内容为本机标记
此例为1
在/zookeeper/zookeeper/bin
执行zkServer.sh
后面跟 start 启动
{start|start-foreground|stop|restart|status|upgrade|print-cmd}
后面跟不同命令 有不同操作
配置文件优化项
tickTime=2000 时间单位 后面的时间都是这个数的倍数 单位是毫秒 2000 就是2秒
initLimit=10 集群启动时间 超过时间没启动就认为集群失败
syncLimit=5 心跳时间 超过未响应leader认为此机器宕机
ZK集群不会清理日志和快照文件 时间一长会非常多 建议增加清理功能或自己写脚本
autopurge.purgeInterval=1 这个参数指定了清理频率,单位是小时,需要填写一个1或更大的整数,默认是0,表示不开启自己清理功能。
autopurge.snapRetainCount=1 这个参数和上面的参数搭配使用,这个参数指定了需要保留的文件数目。默认是保留3个。
Kafka版本2.11.0.11.0.0
搭建kafka集群
创建 kafka目录
/kafka
把包解压到此目录下
创建/kafka/kafkalogs 存放kafka的消息
进入config目录下
修改server.properties配置文件
修改 broker.id=0 数字在集群中是唯一的
port=9092 (服务端口)
添加 host.name=172.16.81.29 本机IP(不加也可以。防止DNS解析错误)
修改为指定目录 log.dirs=/kafka/kafka/kafkalogs
添加
message.max.bytes=102400 (消息最大大小)
default.replication.factor=2 (默认副本数量)
replica.fetch.max.bytes=102400
修改zookee集群的地址和端口 使用逗号分隔
zookeeper.connect=172.16.81.29:12181,172.16.81.28:12181
Consumer机器还要配置consumer.properties
修改group.id=dealgroup (归属集群名字)
bin/kafka-server-start.sh config/server.properties
意为启动kafka集群 配置文件用 config/server.properties
一台机器上配置两个kafka
注意 broke.id listeners=PLAINTEXT://192.168.1.1:9093 port=9093 三个数据都要改
优化配置项 (按需要 有改 无添加)
num.network.threads =4 broker处理消息的最大线程数,一般情况下数量为cpu核数
num.io.threads =8 broker处理磁盘IO的线程数,数值为cpu核数2倍
queued.max.requests =500 等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息
socket.send.buffer.bytes=100*1024 socket的发送缓冲区 不要太小 避免频繁操作
socket.receive.buffer.bytes =100*1024 接受缓冲区 不要太小
socket.request.max.bytes =100*1024*1024 socket请求的最大数值 message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
log.segment.bytes =1024*1024*1024 topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.roll.hours =24*7这个参数设置在日志segment没有达到log.segment.bytes设置的大小,但是时间到了 也会强制新建一个segment 会被 topic创建时的指定参数覆盖
log.retention.hours=24
数据文件保留多长时间, 存储的最大时间超过这个时间会根据log.cleanup.policy设置数据清除策略
log.retention.bytes=-1 topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes。-1没有大小限 单位比特
log.retention.check.interval.ms=5minutes 文件大小检查的周期时间 检查到期的按策略执行
log.cleaner.enable=false 是否开启日志清理
log.flush.scheduler.interval.ms =3000 检查是否需要固化到硬盘的时间间隔
auto.create.topics.enable =true 是否允许自动创建topic,若是false,就需要通过命令创建
topic每个topic的默认分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
num.partitions =1
命令
创建一个叫做“test”的topic在116.62.143.68上,它只有一个分区,一个副本
bin/kafka-topics.sh --create --zookeeper 116.62.143.68:12181 --replication-factor 1 --partitions 1 --topic test
查看116.62.143.68上的topic
bin/kafka-topics.sh --list --zookeeper 116.62.143.68:12181
运行在114.62.143.68上的发布者(producer)并在控制台中输一些消息,这些消息将被发送到服务端
bin/kafka-console-producer.sh --broker-list116.62.143.68:9092 --topic test
在订阅者端订阅116.62.143.68 读取消息并输出到标准输出
bin/kafka-console-consumer.sh --zookeeper 116.62.143.68:12181 --topic test --from-beginning
--from-beginning意为从第一行开始读 不加即从当前开始
转载地址:http://scdrf.baihongyu.com/