CentOS 6.9安装Kafka并使用自带的ZooKeeper配置集群

消息队列 2019年03月10日

本篇笔记记录了在Linux的CentOS 6.9版本下安装Kafka的过程,以及使用内置的ZooKeeper配置集群的方式,并测试了消息的生产和消费。

测试节点IP
Node2:192.168.75.132
Node2:192.168.75.134
Node3:192.168.75.135
ZooKeeper依赖java虚拟机,因此需要安装java环境
安装java环境
查看java包列表

yum list java*

安装java

yum -y install java-1.8.0-openjdk*

查看java版本

java -version

下载安装Kafka(所有Node操作)
下载Kafka

wget -c http://mirror.bit.edu.cn/apache/kafka/2.1.1/kafka_2.11-2.1.1.tgz

解压

tar zxvf kafka_2.11-2.1.1.tgz
mv kafka_2.11-2.1.1 /usr/local/kafka
cd /usr/local/kafka

配置Kafka(所有Node操作)
添加环境变量

vim /etc/profile

追加如下配置

export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin

使环境变量立即生效

source /etc/profile

修改Kafka配置

vim /usr/local/kafka/config/server.properties

如下修改

#服务器ID,分别是1,2,3
broker.id=1
#各自的ip地址
listeners=PLAINTEXT://192.168.75.132:9092
#各自的ip地址
advertised.listeners=PLAINTEXT://192.168.75.132:9092
#log地址
log.dirs=/var/log/kafka-logs
#集群地址和端口
zookeeper.connect=192.168.75.132:2181,192.168.75.134:2181,192.168.75.135:2181

修改ZooKeeper配置

vim /usr/local/kafka/config/zookeeper.properties

如下修改

dataDir=/data/zookeeper
initLimit=5
syncLimit=2
maxClientCnxns=50

server.1=192.168.75.132:2888:3888
server.2=192.168.75.134:2888:3888
server.3=192.168.75.135:2888:3888

创建相应目录

mkdir -p /data/zookeeper
mkdir -p /var/log/kafka-logs

启动与测试(所有Node操作)
启动zookeeper(测试启动没问题了再加-daemon)

zookeeper-server-start.sh -daemon  $KAFKA_HOME/config/zookeeper.properties

如果报错
Caused by: java.lang.IllegalArgumentException: /data/zookeeper/myid file is missing
各节点创建myid文件,把各自的broker.id写进去

vim /data/zookeeper/myid
1

启动kafka

kafka-server-start.sh -daemon  $KAFKA_HOME/config/server.properties

在192.168.75.132上创建一个测试的Topic

kafka-topics.sh --create --zookeeper 192.168.75.132:2181 --replication-factor 3 --partitions 4 --topic testtopic
Created topic "testtopic".

换个节点,在192.168.75.134上查看Topic

kafka-topics.sh --list --zookeeper 192.168.75.134:2181
testtopic

在192.168.75.135上打开一个生产者,准备产生消息

kafka-console-producer.sh --broker-list 192.168.75.135:9092 --topic testtopic
>

在192.168.75.132和192.168.75.134上打开两个消费者,准备消费消息

kafka-console-consumer.sh --bootstrap-server 192.168.75.132:9092 --topic testtopic --group group1 --from-beginning
kafka-console-consumer.sh --bootstrap-server 192.168.75.134:9092 --topic testtopic --group group1 --from-beginning

192.168.75.135生产情况

192.168.75.132消费情况

192.168.75.134消费情况

需要注意的事项:
1.确保各节点防火墙端口正确开启
2.多个消费者时要带--group参数,否则每个消费者会产生默认的group id,导致同一个消息被多个消费者消费
3.--from-beginning参数指定开始时从日志中读取未消费的消息,不带这个参数则读取最新消息
4.当只需一个消费者时,如收集log时,可以开启多个生产者和一个消费者,--group参数可以不指定