部署zookeeper+kafka集群

前提条件
1、部署Kafka集群搭建需要服务器至少3台,奇数台
zookeeper有这样一个特性:集群中只要有过半的机器是正常工作的,那么整个集群对外就是可用的。采用奇数个的节点主要是出于两方面的考虑,一是防止由脑裂造成的集群不可用,二是在容错能力相同的情况下,奇数台更节省资源。
2、Kafka的安装需要java环境,jdk1.8.0版本或以上
3、zookeeper软件版本: apache-zookeeper-3.5.9-bin.tar.gz
4、kafka软件版本: kafka_2.13-2.8.0.tgz
5、/etc/hosts主机解析, 3个节点主机kafka1,kafka2, kafka3 ssh免密码能互相登录

(1)、 部署zookeeper集群
1、下载 apache-zookeeper-3.5.9-bin.tar.gz 并解压到 /usr/local/zookeeper
bin目录: 存放的可执行文件,zookeeper命令都在bin目录下
conf目录: 存放的配置文件。
contrib目录: 存放扩展包。
docs目录: 存放文档。
lib目录: 存放jar包,Zookeeper是使用Java开发的,会用到很多jar包,都存放在lib目录下。
src目录: 存放源码。

2、配置zookeeper
创建zookeeper目录
# mkdir /usr/local/zookeeper/zkdata
# mkdir /usr/local/zookeeper/zkdatalog
修改zookeeper配置文件
# cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
编辑 /usr/local/zookeeper/conf/zoo.cfg 配置文件
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/zkdata
dataLogDir=/usr/local/zookeeper/zkdatalog
clientPort=2181
server.1=kafka1:2888:3888
server.2=kafka2:2888:3888
server.3=kafka3:2888:3888
三台机器上的zoo.cfg文件配置相同,data.Dir 为zookeeper的数据目录,server.1、server.2、server.3 为集群信息。
2888端口号是zookeeper服务之间通信的端口。
3888端口是zookeeper与其他应用程序通信的端口。
tickTime:CS通信心跳数
Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
tickTime 以毫秒为单位。
tickTime:该参数用来定义心跳的间隔时间,zookeeper的客户端和服务端之间也有和web开发里类似的session的概念,而zookeeper里最小的session过期时间就是tickTime的两倍。
initLimit:LF初始通信时限。
集群中的follower服务器(F)与leader服务器(L)之间 初始连接 时能容忍的最多心跳数(tickTime的数量)
syncLimit:LF同步通信时限
集群中的follower服务器(F)与leader服务器(L)之间 请求和应答 之间能容忍的最多心跳数(tickTime的数量)
创建myid文件:进入/usr/local/zookeeper/zkdata目录,创建myid文件,三台服务器上的myid文件内容分别为1,2,3, myid是zookeeper集群用来发现彼此的标识,必须创建,且不能相同。

配置zookeeper环境变量 在/etc/profile文件末尾添加
ZOOKEEPER_HOME=/usr/local/zookeeper
PATH=$ZOOKEEPER_HOME/bin:$PATH
export ZOOKEEPER_HOME
保存文件后执行 source /etc/profile 使得zookeeper环境变量生效。

启动zookeeper
# zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

3、zookeeper 基本操作
登入zookeeper
# zkCli.sh -server 127.0.0.1:2181

3.1、使用 ls 命令来查看当前 ZooKeeper 中所包含的内容:
[zk: 127.0.0.1:2181(CONNECTED) 2] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]

3.2、创建一个新的 znode
[zk: 127.0.0.1:2181(CONNECTED) 3] create /zk Zkdata //这个命令创建了一个新的节点“ zk ”以及与它关联的字符串"Zkdata"
Created /zk

3.3、再次使用 ls 命令来查看现在 zookeeper 中所包含的内容:
[zk: 127.0.0.1:2181(CONNECTED) 8] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zk, zookeeper]
我们看到zk 节点已经被创建。

3.4、查看 znode 中的字符串
[zk: 127.0.0.1:2181(CONNECTED) 9] get /zk
Zkdata

3.5、修改znode 所关联的字符内容
[zk: 127.0.0.1:2181(CONNECTED) 10] set /zk DAKAKA
[zk: 127.0.0.1:2181(CONNECTED) 11] get /zk
DAKAKA

3.6、删除Znode节点
[zk: 127.0.0.1:2181(CONNECTED) 12] delete /zk
[zk: 127.0.0.1:2181(CONNECTED) 13] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
我们看到zk节点被删除了

3.7、使用Zookeeper删除Kafka中的topic
[zk: 127.0.0.1:2181(CONNECTED) 25] ls /brokers/topics
[__consumer_offsets, test]
[zk: 127.0.0.1:2181(CONNECTED) 26] deleteall /brokers/topics/test
[zk: 127.0.0.1:2181(CONNECTED) 27] ls /brokers/topics
[__consumer_offsets]

(2)、部署kafka集群
kafka1, kafka2, kafka3 三个节点均需要配置kafka
解压kafka_2.13-2.8.0.tgz 软件包到/usr/local/kafka/
1、进入目录:/usr/local/kafka/ 创建kafka日志数据目录:mkdir kafkalog
2、修改kafka配置文件/usr/local/kafka/config/server.properties

broker.id=0 // broker.id的值三个节点要配置不同的值,分别配置为0,1,2
advertised.host.name=kafka1 // 在/etc/hosts中配置kafka1, kafka2,kafka3 主机解析,此处是在kafka1主机上的配置,此处就写kafka1主机的name
advertised.port=9092 // 默认端口,不需要改
log.dirs=/usr/local/kafka/kafkalog // kafka日志数据目录
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181 // zookeeper连接地址,多个以逗号隔开
zookeeper.connection.timeout.ms=30000 // zookeeper连接超时时间
group.initial.rebalance.delay.ms=0 // Kafka空消费组延时rebalance
auto.leader.rebalance.enable=true // leader 均衡机制

配置kafka环境变量,编辑/etc/profile文件
ZOOKEEPER_HOME=/usr/local/zookeeper
KAFKA_HOME=/usr/local/kafka
PATH=$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin:$PATH
export ZOOKEEPER_HOME KAFKA_HOME
保存文件后执行 source /etc/profile 使得kafka环境变量生效。

启动kafka集群:
nohup kafka-server-start.sh /usr/local/kafka/config/server.properties &>> /usr/local/kafka/logs/kafka.log &

(3)、kafka集群测试
在kafka1主机上创建topic名称YANG, 分区数为3,副本数2
# kafka-topics.sh --create --zookeeper kafka1:3181,kafka2:2181,kafka3:2181 --partitions 3 --replication-factor 2 --topic YANG
Created topic YANG.

列出已创建的topic列表
# kafka-topics.sh --list --zookeeper localhost:2181
YANG
__consumer_offsets

查看topic的详细情况
# kafka-topics.sh --describe --zookeeper kafka1:2181,kafka2:2181,kafka3:2181
Topic: YANG TopicId: jYufRKC0S-GkzV8atEliEA PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: YANG Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: YANG Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: YANG Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
...
解释
1 Partition: 分区
2 Leader : 负责读写指定分区的节点
3 Replicas : 复制该分区log的节点列表
4 Isr : “in-sync” replicas,当前活跃的副本列表(是一个子集),并且可能成为Leader 。

在kafka1 broker上的topic YANG上生产消息
模拟客户端去发送消息

# kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic YANG // CTRL +D 完成消息创建
>Hello, My name is yang.
>what's your name?
>[root@node1 ~]#

在kafka2, kafka3 主机上模拟客户端去接受消息
# kafka-console-consumer.sh --bootstrap-server kafka2:9092 --topic YANG --from-beginning
Hello, My name is yang.
what's your name?
说明: 新版本中丢弃 --zookeeper 这种启动方式, 使用--bootstrap-server来替代它。

故障测试

停止 kafka1主机上的kafka服务
在kafka2主机上模拟客户端去发送消息
# kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic YANG
>123
>456
>

在 kafka3 主机上模拟客户端去接受消息
# kafka-console-consumer.sh --bootstrap-server kafka3:9092 --topic YANG --from-beginning
Hello, My name is yang.
what's your name??
123
456

至此kafka集群故障配置完成。

(4)、kafka常用命令

1、删除一个topic
# kafka-topics.sh --delete --zookeeper kafka1:2181 --topic YANG

说明:
配置文件server.properties中必须设置delete.topic.enable=true,否则只会标记为删除,而不是真正删除。
执行此脚本的时候,topic的数据会同时被删除。如果由于某些原因导致topic的数据不能完全删除(如其中一个broker down了),此时topic只会被marked for deletion,而不会真正删除。此时创建同名的topic会有冲突。

2、修改topic

修改分区数量:
# kafka-topics.sh --alter --zookeeper kafka1:2181 --topic YANG --partitions 4

增加、修改或者删除一个配置参数:
# kafka-topics.sh —alter --zookeeper kafka1:2181 --topic YANG --config key=value
# kafka-topics.sh —alter --zookeeper 192.168.1.11:2181/kafka --topic YANG --deleteConfig key

相关新闻

联系我们

全国服务热线

400-033-9553

电子邮件:admin@example.com
工作时间:09:00-17:00 周一至周五

在线客服
关注微信
关注微信
分享本页
返回顶部