2 ,Kafka 集群部署,启动,关闭,命令行操作

it2022-05-05  134

一 ,安装 :

1 ,规划 :

node01node02node03zkzkzkkafkakafkakafka

2 ,安装包下载 :

要这个 : Scala 2.11 - kafka_2.11-0.11.0.0.tgz (asc, md5)

http://kafka.apache.org/downloads.html

3 ,得到 :

kafka_2.11-0.11.0.0.tgz

4 ,上传到 :

/export/softwares

5 ,解压 :

tar -zxvf kafka_2.11-0.11.0.0.tgz -C /export/servers

6 ,创建日志文件夹 :logs

cd /export/servers/kafka_2.11-0.11.0.0 mkdir logs

7 ,修改配置文件

cd /export/servers/kafka_2.11-0.11.0.0/configvim server.properties ( 4 处注意的地方 ) # broker 的全局唯一编号,不能重复( 服务器编号,从 0 开始 ) ( 手动指定 ) broker.id=0 # 删除 topic 功能使能 ( 允许删除数据 ) ( 手动指定 ) delete.topic.enable=true # 处理网络请求的线程数量 num.network.threads=3 # 用来处理磁盘 IO 的线程数量 num.io.threads=8 # 发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 # 接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 # 请求套接字的缓冲区大小 socket.request.max.bytes=104857600 # kafka 运行日志存放的路径 ( 手动指定 ) log.dirs=/export/servers/kafka_2.11-0.11.0.0/logs # topic 在当前 broker 上的分区个数 num.partitions=1 # 用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir=1 # segment文件保留的最长时间,超时将被删除 log.retention.hours=168 # 配置连接 Zookeeper 集群地址 ( 手动指定 ) zookeeper.connect=node01:2181,node02:2181,node03:2181

8 ,配置环境变量 :

vi /etc/profile

# KAFKA_HOME export KAFKA_HOME=/export/servers/kafka_2.11-0.11.0.0 export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile

9 ,node02 ,node03 安装 kafka :

分发安装文件 : cd /export/servers scp -r kafka_2.11-0.11.0.0/ root@node02:$PWD scp -r kafka_2.11-0.11.0.0/ root@node03:$PWD 修改 brokerid : cd /export/servers/kafka_2.11-0.11.0.0/config vim server.properties 分别为 1,2配置环境变量 : vim /etc/profile source /etc/profile

9 ,启动,关闭 :

启动 zk :三台机器都启动 1 ,启动:zkServer.sh start 2 ,查看状态 :zkServer.sh status启动 kafka :依次在 node01,node02,node03 上启动 kafka 1 ,进入目录 :cd /export/servers/kafka_2.11-0.11.0.0 2 ,启动 :bin/kafka-server-start.sh config/server.properties & 3 ,关闭 :bin/kafka-server-stop.sh stop注意 : 看到 kafka 启动成功后,敲一下回车

二 ,命令行操作 : topic

1 ,topic , partition , message 关系

topic :是一个主题,存储一个类型的数据。partition :分区,一个主题可以有多个分区。message : 具体的消息,存储在具体的分区中。分区 : 不能跨机器,每台机器可以有多个分区。

2 ,查看所有 topic

bin/kafka-topics.sh --zookeeper node01:2181 --list

3 ,创建 topic :

说明 : 1 ,名字 : first 2 ,分区数 : 1 3 ,副本数 : 3 个备份脚本 bin/kafka-topics.sh --zookeeper node01:2181 --create --replication-factor 3 --partitions 1 --topic first 成功的标志 : cd /export/servers/kafka_2.11-0.11.0.0/logs ls 看到 0 号分区

4 ,删除 topic :

bin/kafka-topics.sh --zookeeper node01:2181 --delete --topic first

三 ,命令行操作 : 生产者

1 ,目的 :

用控制台给 kafka 发数据。发送到哪个主题 : first

2 ,命令 :

命令 bin/kafka-console-producer.sh --broker-list node01:9092 --topic first 效果:控制台等待输入我们输入 hello world

3 ,数据存储到哪里去了 :

找到文件,看内容 cd /export/servers/kafka_2.11-0.11.0.0/logs/first-0 cat 00000000000000000000.log看到 hello ,到时还有一堆乱码 说明 : kafka 有自己的序列化存储方式。

4 ,消费某个主题的数据 :理论

默认:从现在开始消费,看不到以往的数据。也可以:从第一条数据开始消费

5 ,消费某个主题的数据 :消费到控制台

目的 : 从开始消费数据代码 : bin/kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic first 效果 : 得到了所有数据

6 ,查看某个 topic 的详情 :

bin/kafka-topics.sh --zookeeper node01:2181 --describe --topic first

最新回复(0)