Kafka 集群部署(KRaft)
版本说明
- Ubuntu 18.04.6
- Kafka 3.6.0
- JDK8
集群配置
操作系统 | ip | 域名 | Kafka Broker 端口 | Kafka Controller 端口 |
---|---|---|---|---|
Ubuntu 18.04.6 | 192.168.50.131 | kafka1.com | 9092 | 9093 |
Ubuntu 18.04.6 | 192.168.50.132 | kafka2.com | 9092 | 9093 |
Ubuntu 18.04.6 | 192.168.50.133 | kafka3.com | 9092 | 9093 |
安装 vim, curl
1 |
|
配置静态 ip 和 hosts
为了使用域名,更加方便的进行配置,这里将虚拟机的 DHCP 改成了静态分配
IP,所以需要手动设置一下每台机器 IP 地址,这里以
192.168.50.131
为例。
找到网络接口名称,运行以下命令:
1
ip addr
查找以
ens
或eth
开头的接口名称。例如,ens33
或eth0
。1
2
3
4
5
6
7
8
9
10
11
12
13hedon@ubuntu:~$ ip addr
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
inet 127.0.0.1/8 scope host lo
valid_lft forever preferred_lft forever
inet6 ::1/128 scope host
valid_lft forever preferred_lft forever
2: ens33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc fq_codel state UP group default qlen 1000
link/ether 00:0c:29:82:9e:69 brd ff:ff:ff:ff:ff:ff
inet 192.168.50.133/24 brd 192.168.50.255 scope global dynamic noprefixroute ens33
valid_lft 1644sec preferred_lft 1644sec
inet6 fe80::c367:c7cc:3ad4:23b3/64 scope link
valid_lft forever preferred_lft forever可以找到
ens33
,其中inet 192.168.50.133/24
表示 IP 地址为192.168.50.133
,子网掩码为/24
(等于255.255.255.0
)。这个 IP 地址是 DHCP 动态分配的,说明宿主机分配给虚拟机的 IP 范围就在
192.168.50.xxx
,所以我们会将静态 IP 配置在这个范围内。获取网关地址
1
ip route | grep default
输出:
1
2hedon@ubuntu:~$ ip route | grep default
default via 192.168.50.2 dev ens33 proto dhcp metric 100说明默认网关是
192.168.50.2
,编辑
/etc/network/interfaces
文件,配置静态 IP 地址,内容如下:1
2
3
4
5
6auto ens33
iface ens33 inet static
address 192.168.50.131
netmask 255.255.255.0
gateway 192.168.50.2
dns-nameservers 8.8.8.8 8.8.4.4重启
1
su reboot
再次查看 ip 地址
1
ip addr
有以下输出便说明静态 IP 配置成功了。
1
2
3
4
5
6
7
8
9
10
11
121: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
inet 127.0.0.1/8 scope host lo
valid_lft forever preferred_lft forever
inet6 ::1/128 scope host
valid_lft forever preferred_lft forever
2: ens33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc fq_codel state UP group default qlen 1000
link/ether 00:0c:29:82:9e:69 brd ff:ff:ff:ff:ff:ff
inet 192.168.50.131/24 brd 192.168.50.255 scope global ens33
valid_lft forever preferred_lft forever
inet6 fe80::20c:29ff:fe82:9e69/64 scope link
valid_lft forever preferred_lft forever配置域名
1
sudo vim /etc/hosts
追加内容如下:
1
2
3192.168.50.131 kafka1.com
192.168.50.132 kafka2.com
192.168.50.133 kafka3.comping 一下
1
2
3
4
5
6
7
8
9hedon@ubuntu:~$ ping kafka1.com
PING kafka1.com (192.168.50.131) 56(84) bytes of data.
64 bytes from kafka1.com (192.168.50.131): icmp_seq=1 ttl=64 time=0.024 ms
64 bytes from kafka1.com (192.168.50.131): icmp_seq=2 ttl=64 time=0.021 ms
64 bytes from kafka1.com (192.168.50.131): icmp_seq=3 ttl=64 time=0.029 ms
^C
--- kafka1.com ping statistics ---
3 packets transmitted, 3 received, 0% packet loss, time 2029ms
rtt min/avg/max/mdev = 0.021/0.024/0.029/0.006 msping 一下百度,看看能不能访问外网
1
2hedon@ubuntu:~$ ping baidu.com
ping: baidu.com: Name or service not known如果这里可以访问,则直接跳过进入下一步,不可以的话,需要配置一下域名解析系统。
配置域名解析系统
Ubuntu 系统使用
systemd-resolved
服务来管理 DNS,你可以在/etc/systemd/resolved.conf
文件中进行 DNS 配置。1
sudo vim /etc/systemd/resolved.conf
取消或添加
DNS
的注释,并修改为:1
2[Resolve]
DNS=8.8.8.8 8.8.4.4重启启动
systemd-resolved
:1
sudo systemctl restart systemd-resolved
再尝试 ping 一下百度:
1
2
3
4
5
6
7
8
9
10hedon@ubuntu:~$ ping www.baidu.com
PING www.a.shifen.com (153.3.238.110) 56(84) bytes of data.
64 bytes from 153.3.238.110 (153.3.238.110): icmp_seq=1 ttl=128 time=15.9 ms
64 bytes from 153.3.238.110 (153.3.238.110): icmp_seq=2 ttl=128 time=15.9 ms
64 bytes from 153.3.238.110 (153.3.238.110): icmp_seq=3 ttl=128 time=16.1 ms
64 bytes from 153.3.238.110 (153.3.238.110): icmp_seq=4 ttl=128 time=15.3 ms
^C
--- www.a.shifen.com ping statistics ---
4 packets transmitted, 4 received, 0% packet loss, time 14104ms
rtt min/avg/max/mdev = 15.368/15.850/16.145/0.291 ms
补充说明:/etc/network/interfaces
文件的配置
这是一个用于配置 Linux 系统上网络接口的文件。在这个示例中,我们为名为ens33
的网络接口配置了静态 IP地址和相关的网络设置。下面是各行的解释:
auto ens33
: 这一行表示在系统启动时自动激活ens33
网络接口。auto
关键字后面跟着接口名称。iface ens33 inet static
: 这一行定义了ens33
网络接口的配置。iface
关键字后面跟着接口名称,inet
表示我们正在配置 IPv4地址,static
表示我们要为接口分配一个静态 IP地址(而不是通过 DHCP 获得)。address 192.168.50.131
: 这一行设置了网络接口的静态IP 地址。在这个例子中,我们为ens33
接口分配了192.168.50.131
IP 地址。IP 地址是 Internet协议(IP)用于在网络中唯一标识设备的数字标签。每个连接到网络的设备都需要一个唯一的IP 地址,以便其他设备可以找到并与之通信。IP 地址通常分为两种版本:IPv4和 IPv6。在此示例中,我们使用了一个 IPv4 地址。
netmask 255.255.255.0
:这一行定义了子网掩码。在这个例子中,子网掩码是255.255.255.0
,表示前三个字节(24位)是网络地址,最后一个字节(8 位)是主机地址。子网掩码用于划分 IP 地址的网络部分和主机部分。子网掩码与 IP地址进行按位与操作,从而得到网络地址。这有助于确定哪些 IP地址属于同一子网,以便正确地将数据包路由到目的地。子网划分有助于组织网络、提高安全性和管理性。
gateway 192.168.50.2
:这一行设置了默认网关。在这个例子中,我们将默认网关设置为192.168.50.2
。默认网关是用于将数据包发送到其他网络的路由器或设备的IP 地址。网关是一个充当网络中数据包传输的中继点的设备,通常是一个路由器。当一个设备需要将数据包发送到不同子网的另一个设备时,它会将数据包发送到网关。网关负责将数据包路由到正确的目的地。默认网关是设备用于将数据包发送到其他网络的首选网关。
dns-nameservers 8.8.8.8 8.8.4.4
: 这一行指定了 DNS服务器的 IP 地址。在这个例子中,我们使用了谷歌的公共 DNS 服务器8.8.8.8
和8.8.4.4
。DNS服务器用于将主机名解析为 IP 地址。域名系统(DNS)是将人类可读的域名(例如 www.baidu.com)IP地址的系统。DNS服务器是负责执行此解析过程的服务器。当您在浏览器中输入一个网址时,计算机会向DNS 服务器查询该域名对应的 IP 地址,然后将请求发送到该 IP地址以获取网页内容。
配置文件中的这些设置将在系统启动时生效。要立即应用更改,您可以使用以下命令重启网络服务:
1 |
|
安装 jdk
1 |
|
验证 java8 是否已经安装成功:
1 |
|
有以下类似输出的话则表明安装成功:
1 |
|
安装 Kafka
下载并解压 Kafka
1
2wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -zxvf kafka_2.13-3.6.0.tgz将解压缩后的文件夹移动到
/opt
目录中:1
sudo mv kafka_2.13-3.6.0 /opt/kafka-3.6.0
使用 Kafka 提供的脚本生成一个 ClusterID
1
export KAFKA_CLUSTER_ID="$(/opt/kafka-3.6.0/bin/kafka-storage.sh random-uuid)"
输出 ClusterID
1
2hedon@ubuntu:/opt/kafka-3.6.0$ echo $KAFKA_CLUSTER_ID
XiMRcbJ-QEO694L7sfDdBQ在其他节点上将
KAFKA_CLUSTER_ID
设置为上面的值:1
export KAFKA_CLUSTER_ID=XiMRcbJ-QEO694L7sfDdBQ
备份配置文件,注意这里的配置文件是
config/kraft/server.properties
,在config
目录下的kraft
目录中:1
cp /opt/kafka-3.6.0/config/kraft/server.properties /opt/kafka-3.6.0/config/kraft/server.properties.bak
修改配置
1
vim /opt/kafka-3.6.0/config/kraft/server.properties
主要修改内容如下:
1
2
3
4
5
6
7
8
9
10
11
12# 节点 ID,分别为 1,2,3
node.id=1
# 日志目录
log.dirs=/opt/kafka-3.6.0/kafka-combined-logs
# 可以成为控制器的节点和它们的端口
controller.quorum.voters=1@kafka1.com:9093,2@kafka2.com:9093,3@kafka3.com:9093
# 定义 Kafka Broker 如何向外部公布它的地址。
# 这是 Kafka Broker 通知 Producer 和 Consumer 如何连接到自己的方式。
# 例如,如果你设置 advertised.listeners=PLAINTEXT://my.public.ip:9092,
# 那么 Kafka Broker 将告诉 Producer 和 Consumer 它的公共 IP 地址是 my.public.ip,并且它在 9092 端口上监听连接。
# 这里我们需要在 3 个节点分别设置对应的地址
advertised.listeners=PLAINTEXT://kafka1.com:9092格式化日志目录
1
/opt/kafka-3.6.0/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /opt/kafka-3.6.0/config/kraft/server.properties
输出:
1
Formatting /opt/kafka-3.6.0/kraft-combined-logs with metadata.version 3.6-IV2.
三个节点都启动 Kafka
1
/opt/kafka-3.6.0/bin/kafka-server-start.sh -daemon /opt/kafka-3.6.0/config/kraft/server.properties
选择任意一个节点创建一个新 topic
1
/opt/kafka-3.6.0/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --replication-factor 1 --partitions=2
输出:
1
Created topic test.
在其他节点获取
test
这个topic
的信息1
/opt/kafka-3.6.0/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test
可以看到关于
test
这个topic
的信息是可以获取到的,说明集群之前信息是互通的,集群搭建完毕。1
2
3Topic: test TopicId: svJClTUpSFa9Z6FWDvkARg PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: test Partition: 1 Leader: 3 Replicas: 3 Isr: 3随便选择一个节点,往
test
里面写入数据:1
/opt/kafka-3.6.0/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
输入数据后按回车即发送一条数据,可以随时按
Ctrl + C
退出:1
2
3
4
5hedon@ubuntu:~/Downloads$ /opt/kafka-3.6.0/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
>msg1
>msg2
>msg 3
>^随便选择一个节点,启动消费者消费
topic
中的数据:1
/opt/kafka-3.6.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
输出:
1
2
3
4
5hedon@ubuntu:/opt/kafka-3.6.0$ /opt/kafka-3.6.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
msg1
msg2
msg 3
^CProcessed a total of 3 messages
至此,Kafka 的 KRaft 版本集群就部署完毕了!
补充说明 - KRaft 配置文件
下面是 Kafka KRaft 版本配置文件每个配置项的解释:
配置项 | 说明 |
---|---|
process.roles | Kafka 服务器的角色,设置此项将 Kafka 置于 KRaft 模式。可能的值包括"broker" 和 "controller"。 |
node.id | 与此实例关联的节点 ID。 |
controller.quorum.voters | 控制器选举的投票节点,格式为 node-id@host:port 。 |
listeners | 服务器监听的地址,格式为listener_name://host_name:port 。 |
inter.broker.listener.name | 用于 broker 之间通信的监听器名称。 |
advertised.listeners | 服务器向客户端宣告的监听器名称、主机名和端口。 |
controller.listener.names | 控制器使用的监听器名称列表。 |
listener.security.protocol.map | 监听器名称到安全协议的映射。默认情况下,它们是相同的。 |
num.network.threads | 服务器用于从网络接收请求和向网络发送响应的线程数。 |
num.io.threads | 服务器用于处理请求(可能包括磁盘 I/O)的线程数。 |
socket.send.buffer.bytes | 服务器用于发送数据的缓冲区大小。 |
socket.receive.buffer.bytes | 服务器用于接收数据的缓冲区大小。 |
socket.request.max.bytes | 服务器接受的请求的最大大小(用于防止内存溢出)。 |
log.dirs | 用于存储日志文件的目录列表。 |
num.partitions | 每个主题的默认日志分区数。 |
num.recovery.threads.per.data.dir | 每个数据目录在启动时用于日志恢复和关闭时用于刷新的线程数。 |
offsets.topic.replication.factor | 内部主题 "__consumer_offsets" 和 "__transaction_state"的复制因子。 |
transaction.state.log.replication.factor | 事务状态日志的复制因子。 |
transaction.state.log.min.isr | 事务状态日志的最小同步副本数。 |
log.flush.interval.messages | 强制将数据刷新到磁盘之前接受的消息数。 |
log.flush.interval.ms | 消息在日志中停留的最大时间,超过这个时间就会强制刷新到磁盘。 |
log.retention.hours | 由于年龄而使日志文件有资格被删除的最小年龄。 |
log.retention.bytes | 基于大小的日志保留策略。 |
log.segment.bytes | 日志段文件的最大大小。 |
log.retention.check.interval.ms | 检查日志段是否可以根据保留策略被删除的间隔。 |
请注意,这只是 Kafka 配置的一部分,Kafka 配置的完整列表可以在