kafka最新版 3.1 集群部署测试

        Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。

        关于kafka的信息可以参考官方文档: https://kafka.apache.org/documentation/ 

        

        虚拟机环境,3台服务器,对应ip:

        192.168.142.128

        192.168.142.129

        192.168.142.130

        先把主机名改掉, 分别到3台机器上改自己的

##192.168.142.128
[root@localhost ~]# hostnamectl set-hostname kafka1
 
##192.168.142.129
[root@localhost ~]# hostnamectl set-hostname kafka2
 
##192.168.142.130
[root@localhost ~]# hostnamectl set-hostname kafka3

        关掉防火墙

[root@kafka1 ~]# systemctl stop firewalld.service
[root@kafka1 ~]# systemctl disable firewalld.service

[root@kafka2 ~]# systemctl stop firewalld.service
[root@kafka2 ~]# systemctl disable firewalld.service

[root@kafka3 ~]# systemctl stop firewalld.service
[root@kafka3 ~]# systemctl disable firewalld.service


        下载kafka,并修改配置文件

##3台服务器都需处理
[root@kafka1  ~]# wget -O kafka_2.13-3.1.0.tgz  https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz
[root@kafka1  ~]# tar -zxvf kafka_2.13-3.1.0.tgz
[root@kafka1  ~]# mv kafka_2.13-3.1.0/ /usr/local/kafka
[root@kafka1  ~]# cd /usr/local/kafka/
[root@kafka1 kafka]#
[root@kafka1 kafka]# ls
bin  config  libs  LICENSE  licenses  NOTICE  site-docs

        配置zookeeper集群,修改配置文件zookeeper.properties

#每个节点都要执行

[root@kafka1 kafka]# vim config/zookeeper.properties 
dataDir=/data/zookeeper
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10
server.1=192.168.142.128:2182:2183
server.2=192.168.142.129:2182:2183
server.3=192.168.142.130:2182:2183

[root@kafka2 kafka]# vim config/zookeeper.properties 
dataDir=/data/zookeeper
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10
server.1=192.168.142.128:2182:2183
server.2=192.168.142.129:2182:2183
server.3=192.168.142.130:2182:2183

[root@kafka3 kafka]# vim config/zookeeper.properties 
dataDir=/data/zookeeper
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10
server.1=192.168.142.128:2182:2183
server.2=192.168.142.129:2182:2183
server.3=192.168.142.130:2182:2183

       tickTime=2000为zk的基本时间单元,毫秒
        initLimit=10Leader-Follower初始通信时限(tickTime*10)
        syncLimit=5Leader-Follower同步通信时限(tickTime*5)
        server.实例集群标识=实例地址:数据通信端口:选举通信端口

        tickTime: 这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳

        initLimit:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒

        syncLimit:这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒

        clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求

        server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里

        192.168.142.128为集群里的IP地址,第一个端口是master和slave之间的通信端口,默认是2182,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是2183)


        创建zookeeper所需的目录

        分别在3个机器上执行,为实例添加集群标识:

第一台:192.168.142.128

[root@kafka1 ~]# mkdir -p /data/zookeeper
[root@kafka1 ~]# echo "1" > /data/zookeeper/myid

第二台:192.168.142.129

[root@kafka2 ~]# mkdir -p /data/zookeeper
[root@kafka2 ~]# echo "2" > /data/zookeeper/myid

第三台:192.168.142.130

[root@kafka3 ~]# mkdir -p /data/zookeeper
[root@kafka3 ~]# echo "3" > /data/zookeeper/myid


        启动集群服务Zookeeper

##192.168.142.128 kafka1
[root@kafka1 kafka]# bin/zookeeper-server-start.sh config/zookeeper.properties		

##192.168.142.129 kafka2
[root@kafka2 kafka]# bin/zookeeper-server-start.sh config/zookeeper.properties		

##192.168.142.130 kafka3
[root@kafka3 kafka]# bin/zookeeper-server-start.sh config/zookeeper.properties		

        效果图

冷暖自知一抹茶ck


        配置Kafka集群环境

        Kafka集群节点>=2时便可对外提供高可用服务


        1)修改Kafka配置文件config/server.properties

        注意:注释掉所有节点的broker.id

root@kafka1 kafka]# vim config/server.properties
log.dirs=/data/kafka-logs
#broker.id=0
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181
listeners=PLAINTEXT://192.168.142.128:9092
advertised.listeners=PLAINTEXT://192.168.142.129:9092

[root@kafka2 kafka]# vim config/server.properties
log.dirs=/data/kafka-logs
#broker.id=0
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181
listeners=PLAINTEXT://192.168.142.129:9092
advertised.listeners=PLAINTEXT://192.168.142.129:9092

[root@kafka3 kafka]# vim config/server.properties
log.dirs=/data/kafka-logs
#broker.id=0
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181
listeners=PLAINTEXT://192.168.142.130:9092
advertised.listeners=PLAINTEXT://192.168.142.130:9092

        创建日志目录

[root@kafka1 kafka]# mkdir -p /data/kafka-logs

[root@kafka2 kafka]# mkdir -p /data/kafka-logs

[root@kafka3 kafka]# mkdir -p /data/kafka-logs

        分别在3台机器上启动kafka

[root@kafka1 kafka]# bin/kafka-server-start.sh config/server.properties

[root@kafka2 kafka]# bin/kafka-server-start.sh config/server.properties

[root@kafka3 kafka]# bin/kafka-server-start.sh config/server.properties

        创建topic测试

[root@kafka1 kafka]#  bin/kafka-topics.sh --create --bootstrap-server kafka1:9092 --replication-factor 3 --partitions 1 --topic my-topic
Created topic my-topic.

        --replication-factor 2:副本集数量 

        --partition 4:分区数

        查看所有的topic:

[root@kafka1 kafka]# bin/kafka-topics.sh --list  --bootstrap-server kafka1:9092
tp1
my-topic

        模拟一个数据:

[root@kafka3 kafka]# bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic my-topic
>message two 
>1111111
>222
>333
>444
>

        起一个消费者:

[root@kafka1 kafka]# bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic my-topic --from-beginning
1111
222
222
444
测试数据
哈哈

        查看哪个borke【kafka服务器】在工作

        可以描述Topic分区数/副本数/副本Leader/副本ISR等信息:

[root@kafka2 kafka]# bin/kafka-topics.sh --describe --bootstrap-server kafka1:9092 --topic my-topic
Topic: my-topic	TopicId: FthKwxf8TjKb-U8VZ7cztQ	PartitionCount: 1	ReplicationFactor: 3	Configs: segment.bytes=1073741824
	Topic: my-topic	Partition: 0	Leader: 1002	Replicas: 1002,1001,1003	Isr: 1002,1001,1003

        leader:是负责给定分区所有读写操作的节点。每个节点都是随机选择的部分分区的领导者。(哪个broker在读写“leader”)

        replicas:是复制分区日志的节点列表,不管这些节点是leader还是仅仅活着(当前可以正常工作的kafka集群。当leader挂掉时会自动替补)

        isr:是一组“同步”replicas,是replicas列表的子集,它活着并被指到leader(同步消息的列表集合)


        删除Topic

        注意,只是删除Topic在zk的元数据,日志数据仍需手动删除。

bin/kafka-topics.sh --bootstrap-server kafka1:9092  --delete --topic my-topic



        消费者

        新模式,offset存储在borker
        --new-consumer Use new consumer. This is the default.
        --bootstrap-server <server to connectto> REQUIRED (unless old consumer is used): The server to connect to.
        老消费模式,offset存储在zk
        --zookeeper <urls> REQUIRED (only when using old consumer): The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.






        【待测】

        到这里kafka集群已经ok

        把zookeeper和kafka做成系统服务并开机自动启动(可以把之前在终端执行的启动命令关了,不然起不来)

        在3台机器上分别运行

[root@kafka1 ~]# mkdir /etc/cluster/

        创建zookeeper的service文件

[root@kafka1 cluster]# cat /etc/cluster/zookeeper.service 
 
[Unit]
Description=zookeeper
 
[Service]
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
SyslogIdentifier=zookeeper
 
[Install]
WantedBy=multi-user.target

        加为系统服务并开机启动

[root@kafka1 cluster]# ln -s /etc/cluster/zookeeper.service /lib/systemd/system
 
[root@kafka1 cluster]# systemctl start zookeeper
 
### 开机启动:
[root@kafka1 cluster]# systemctl enable zookeeper

        设置kafka系统服务并开机启动

[root@kafka1 cluster]# cat /etc/cluster/kafka.service 
 
[Unit]
Description=kafka
 
[Service]
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties 
SyslogIdentifier=kafka
 
[Install]
WantedBy=multi-user.target

        设置开机启动

[root@kafka1 cluster]# ln -s /etc/cluster/kafka.service /lib/systemd/system/
[root@kafka1 cluster]# systemctl start kafka
[root@kafka1 cluster]# systemctl enable kafka

        查看服务状态:

[root@kafka1 cluster]# systemctl status kafka
● kafka.service - kafka
   Loaded: loaded (/etc/cluster/kafka.service; enabled; vendor preset: disabled)
   Active: active (running) since Sun 2021-12-05 20:49:52 EST; 45min ago
 Main PID: 138011 (java)
    Tasks: 74 (limit: 48706)
   Memory: 383.6M

        查看服务日志:

[root@kafka1 cluster]# journalctl -u kafka -f



参考:

        https://blog.csdn.net/weixin_30878501/article/details/99027529

冷暖自知一抹茶ck
请先登录后发表评论
  • 最新评论
  • 总共0条评论