一 ,安装 :
1 ,规划 :
node01node02node03
zkzkzkkafkakafkakafka
2 ,安装包下载 :
要这个 : Scala 2.11 - kafka_2.11-0.11.0.0.tgz (asc, md5)
http://kafka.apache.org/downloads.html
3 ,得到 :
kafka_2.11-0.11.0.0.tgz
4 ,上传到 :
/export/softwares
5 ,解压 :
tar -zxvf kafka_2.11-0.11.0.0.tgz -C /export/servers
6 ,创建日志文件夹 :logs
cd /export/servers/kafka_2.11-0.11.0.0
mkdir logs
7 ,修改配置文件
cd /export/servers/kafka_2.11-0.11.0.0/configvim server.properties ( 4 处注意的地方 )
# broker 的全局唯一编号,不能重复( 服务器编号,从 0 开始 ) ( 手动指定 )
broker.id=0
# 删除 topic 功能使能 ( 允许删除数据 ) ( 手动指定 )
delete.topic.enable=true
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘 IO 的线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
# kafka 运行日志存放的路径 ( 手动指定 )
log.dirs=/export/servers/kafka_2.11-0.11.0.0/logs
# topic 在当前 broker 上的分区个数
num.partitions=1
# 用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# segment文件保留的最长时间,超时将被删除
log.retention.hours=168
# 配置连接 Zookeeper 集群地址 ( 手动指定 )
zookeeper.connect=node01:2181,node02:2181,node03:2181
8 ,配置环境变量 :
vi /etc/profile
export KAFKA_HOME
=/export/servers/kafka_2.11-0.11.0.0
export PATH
=$PATH:$KAFKA_HOME/bin
source /etc/profile
9 ,node02 ,node03 安装 kafka :
分发安装文件 :
cd /export/servers
scp -r kafka_2.11-0.11.0.0/ root@node02:$PWD
scp -r kafka_2.11-0.11.0.0/ root@node03:$PWD
修改 brokerid : cd /export/servers/kafka_2.11-0.11.0.0/config vim server.properties 分别为 1,2配置环境变量 : vim /etc/profile source /etc/profile
9 ,启动,关闭 :
启动 zk :三台机器都启动 1 ,启动:zkServer.sh start 2 ,查看状态 :zkServer.sh status启动 kafka :依次在 node01,node02,node03 上启动 kafka 1 ,进入目录 :cd /export/servers/kafka_2.11-0.11.0.0 2 ,启动 :bin/kafka-server-start.sh config/server.properties & 3 ,关闭 :bin/kafka-server-stop.sh stop注意 : 看到 kafka 启动成功后,敲一下回车
二 ,命令行操作 : topic
1 ,topic , partition , message 关系
topic :是一个主题,存储一个类型的数据。partition :分区,一个主题可以有多个分区。message : 具体的消息,存储在具体的分区中。分区 : 不能跨机器,每台机器可以有多个分区。
2 ,查看所有 topic
bin/kafka-topics.sh --zookeeper node01:2181 --list
3 ,创建 topic :
说明 : 1 ,名字 : first 2 ,分区数 : 1 3 ,副本数 : 3 个备份脚本
bin/kafka-topics.sh --zookeeper node01:2181 --create --replication-factor 3 --partitions 1 --topic first
成功的标志 : cd /export/servers/kafka_2.11-0.11.0.0/logs ls 看到 0 号分区
4 ,删除 topic :
bin/kafka-topics.sh --zookeeper node01:2181 --delete --topic first
三 ,命令行操作 : 生产者
1 ,目的 :
用控制台给 kafka 发数据。发送到哪个主题 : first
2 ,命令 :
命令
bin/kafka-console-producer.sh --broker-list node01:9092 --topic first
效果:控制台等待输入我们输入 hello world
3 ,数据存储到哪里去了 :
找到文件,看内容 cd /export/servers/kafka_2.11-0.11.0.0/logs/first-0 cat 00000000000000000000.log看到 hello ,到时还有一堆乱码 说明 : kafka 有自己的序列化存储方式。
4 ,消费某个主题的数据 :理论
默认:从现在开始消费,看不到以往的数据。也可以:从第一条数据开始消费
5 ,消费某个主题的数据 :消费到控制台
目的 : 从开始消费数据代码 :
bin/kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic first
效果 : 得到了所有数据
6 ,查看某个 topic 的详情 :
bin/kafka-topics.sh --zookeeper node01:2181 --describe --topic first