文章目录
Kafka入门-概述及部署1.Kafka概述1.1 消息队列(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)(2)发布/订阅模式(一对多,数据产生后,推送给所有订阅者)
1.2 为什么需要消息队列1.3 什么是Kafka1.4 Kafka架构
2.Kafka部署(1)创建安装目录(2)解压安装安装包到安装目录(3)配置环境变量生效环境变量
(4)创建日志保存文件(5)修改配置文件server.properties
(6)启动Kafka后台启动
Kafka简单使用1.新建一个Topic2.查询topic的信息3.查询所有可以使用topics4.修改Topics的分区数(只能增加)5.删除topic6.创建消费者(1)创建测试用topic:test01(2)创建生产者
7.创建消费者8.新生产者(支持0.9版本+)9.新消费者(支持0.9版本+)10.列出消费者组
Kafka入门-概述及部署
1.Kafka概述
1.1 消息队列
(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
点对点模型通常是基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将信息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者处理,即使有多个消息监听者也是如此。
(2)发布/订阅模式(一对多,数据产生后,推送给所有订阅者)
发布/订阅模型则是一个基于推送的消息传送模型。发布订阅模式可以有多种不同的订阅者,临时订阅者只有在主动监听主题时才接受消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。
1.2 为什么需要消息队列
(1)解耦:允许你独立的扩展或修改两边的处理过程,只要确保他们遵守同样的接口约束。
(2)冗余:
消息队列把数据进行持久化直到它们被完全处理,通过这一方式规避了数据丢失的风险。许多消息队列所采用的“插入-获取-删除”范式中,把一个消息从队列中删除之前,需要你处理系统明确的指出该条消息已经被处理完毕,从而确保你的数据被安全的保存知道你是用完毕。
(3)扩展性:因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
(4)灵活性&峰值处理能力
…
1.3 什么是Kafka
在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。
Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接收者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。
无论是kafka集群,还是consumer都依赖于zookeeper集群保存一些meta信息。来保证系统可用性。
1.4 Kafka架构
2.Kafka部署
(1)创建安装目录
[root@hadoop02 ~
]
(2)解压安装安装包到安装目录
[root@hadoop02 kafka-0.10.0.0
]
(3)配置环境变量
vim /etc/profile
export KAFKA_HOME
=/opt/software/kafka/kafka-0.10.0.0
export PATH
=$PATH:$KAFKA_HOME/bin
生效环境变量
source /etc/profile
(4)创建日志保存文件
[root@hadoop02 kafka-0.10.0.0
]
[root@hadoop02 kafka-0.10.0.0
]
bin config libs LICENSE logs NOTICE site-docs
(5)修改配置文件
server.properties
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://hadoop02:9092
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
# A comma seperated list of directories under which to store log files
log.dirs=/opt/software/kafka/kafka-0.10.0.0/logs
# root directory for all kafka znodes.
zookeeper.connect=hadoop02:2181
(6)启动Kafka
[root@hadoop02 kafka
]
后台启动
nohup kafka-server-start.sh
$KAFKA_HOME/config/server.properties
&
Kafka简单使用
1.新建一个Topic
[root@hadoop02 ~
]
Created topic
"hello02".
注意:replication-factor不能大于broker数
2.查询topic的信息
[root@hadoop02 ~
]
Topic:hello02 PartitionCount:2 ReplicationFactor:1 Configs:
Topic: hello02 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: hello02 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
3.查询所有可以使用topics
[root@hadoop02 ~
]
hello01
hello02
4.修改Topics的分区数(只能增加)
[root@hadoop02 ~
]
WARNING: If partitions are increased
for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded
!
[root@hadoop02 ~
]
Topic:hello02 PartitionCount:5 ReplicationFactor:1 Configs:
Topic: hello02 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: hello02 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: hello02 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: hello02 Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: hello02 Partition: 4 Leader: 0 Replicas: 0 Isr: 0
5.删除topic
注意:必须在conf/server.properties中设置delete.topic.enable=true,默认值为false,如果不设置,删除topic只是标记删除,并不是真正的删除。
delete.topic.enable=false
当delete.topic.enable=true时:
重新启动kafka服务器:
[root@hadoop02 ~
]
[1
] 8782
[root@hadoop02 ~
]
[root@hadoop02 ~
]
9036 Jps
3262 QuorumPeerMain
8782 Kafka
删除topic:hello01
[root@hadoop02 ~
]
hello01
[root@hadoop02 ~
]
Topic hello01 is marked
for deletion.
Note: This will have no impact
if delete.topic.enable is not
set to true.
[root@hadoop02 ~
]
[root@hadoop02 ~
]
6.创建消费者
(1)创建测试用topic:test01
[root@hadoop02 ~
]
Created topic
"test01".
(2)创建生产者
[root@hadoop02 ~
]
创建生产者成功,控制台进入等待输入状态。
参数解析
–broker-list:kafka的服务地址[多个用逗号隔开]
–topic:具体的单个topic
7.创建消费者
[root@hadoop02 ~
]
参数解析:
–bootstrap-server:kafka的服务地址
–topic:具体的单个topic
注意:标记为删除状态的topic也可以使用
8.新生产者(支持0.9版本+)
[root@pseudo01 kafka-0.10
]
9.新消费者(支持0.9版本+)
[root@pseudo01 software
]
> --bootstrap-server pseudo01:9092 \
> --topic test02 \
> --new-consumer \
> --from-beginning \
> --consumer.config
$KAFKA_HOME/config/consumer.properties
10.列出消费者组