Kafka入门01

it2022-05-05  162

文章目录

Kafka入门-概述及部署1.Kafka概述1.1 消息队列(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)(2)发布/订阅模式(一对多,数据产生后,推送给所有订阅者) 1.2 为什么需要消息队列1.3 什么是Kafka1.4 Kafka架构 2.Kafka部署(1)创建安装目录(2)解压安装安装包到安装目录(3)配置环境变量生效环境变量 (4)创建日志保存文件(5)修改配置文件server.properties (6)启动Kafka后台启动 Kafka简单使用1.新建一个Topic2.查询topic的信息3.查询所有可以使用topics4.修改Topics的分区数(只能增加)5.删除topic6.创建消费者(1)创建测试用topic:test01(2)创建生产者 7.创建消费者8.新生产者(支持0.9版本+)9.新消费者(支持0.9版本+)10.列出消费者组

Kafka入门-概述及部署

1.Kafka概述

1.1 消息队列

(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)

​ 点对点模型通常是基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将信息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者处理,即使有多个消息监听者也是如此。

(2)发布/订阅模式(一对多,数据产生后,推送给所有订阅者)

​ 发布/订阅模型则是一个基于推送的消息传送模型。发布订阅模式可以有多种不同的订阅者,临时订阅者只有在主动监听主题时才接受消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。

1.2 为什么需要消息队列

(1)解耦:允许你独立的扩展或修改两边的处理过程,只要确保他们遵守同样的接口约束。

(2)冗余:

消息队列把数据进行持久化直到它们被完全处理,通过这一方式规避了数据丢失的风险。许多消息队列所采用的“插入-获取-删除”范式中,把一个消息从队列中删除之前,需要你处理系统明确的指出该条消息已经被处理完毕,从而确保你的数据被安全的保存知道你是用完毕。

(3)扩展性:因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。

(4)灵活性&峰值处理能力

1.3 什么是Kafka

​ 在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。

​ Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接收者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。

​ 无论是kafka集群,还是consumer都依赖于zookeeper集群保存一些meta信息。来保证系统可用性。

1.4 Kafka架构

2.Kafka部署

(1)创建安装目录

[root@hadoop02 ~]# mkdir /opt/software/kafka

(2)解压安装安装包到安装目录

[root@hadoop02 kafka-0.10.0.0]# tar -zxvf kafka_2.11-0.10.0.0.tgz -C /opt/software/kafka/

(3)配置环境变量

vim /etc/profile #Set Kafka Environment export KAFKA_HOME=/opt/software/kafka/kafka-0.10.0.0 export PATH=$PATH:$KAFKA_HOME/bin

生效环境变量

source /etc/profile

(4)创建日志保存文件

[root@hadoop02 kafka-0.10.0.0]# mkdir logs [root@hadoop02 kafka-0.10.0.0]# ls bin config libs LICENSE logs NOTICE site-docs

(5)修改配置文件

server.properties

# listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://hadoop02:9092 # The id of the broker. This must be set to a unique integer for each broker. broker.id=0 # A comma seperated list of directories under which to store log files log.dirs=/opt/software/kafka/kafka-0.10.0.0/logs # root directory for all kafka znodes. zookeeper.connect=hadoop02:2181

(6)启动Kafka

[root@hadoop02 kafka]# kafka-server-start.sh $KAFKA_HOME/config/server.properties

后台启动

nohup kafka-server-start.sh $KAFKA_HOME/config/server.properties &

Kafka简单使用

1.新建一个Topic

[root@hadoop02 ~]# kafka-topics.sh --create --zookeeper hadoop02:2181 --replication-factor 1 --partitions 2 --topic hello02 Created topic "hello02".

注意:replication-factor不能大于broker数

2.查询topic的信息

[root@hadoop02 ~]# kafka-topics.sh --describe --zookeeper hadoop02:2181 --topic hello02 Topic:hello02 PartitionCount:2 ReplicationFactor:1 Configs: Topic: hello02 Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: hello02 Partition: 1 Leader: 0 Replicas: 0 Isr: 0

3.查询所有可以使用topics

[root@hadoop02 ~]# kafka-topics.sh --list --zookeeper hadoop02:2181 hello01 hello02

4.修改Topics的分区数(只能增加)

[root@hadoop02 ~]# kafka-topics.sh --alter --zookeeper hadoop02:2181 --partitions 5 --topic hello02 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded! [root@hadoop02 ~]# kafka-topics.sh --describe --zookeeper hadoop02:2181 --topic hello02 Topic:hello02 PartitionCount:5 ReplicationFactor:1 Configs: Topic: hello02 Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: hello02 Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: hello02 Partition: 2 Leader: 0 Replicas: 0 Isr: 0 Topic: hello02 Partition: 3 Leader: 0 Replicas: 0 Isr: 0 Topic: hello02 Partition: 4 Leader: 0 Replicas: 0 Isr: 0

5.删除topic

注意:必须在conf/server.properties中设置delete.topic.enable=true,默认值为false,如果不设置,删除topic只是标记删除,并不是真正的删除。

delete.topic.enable=false

当delete.topic.enable=true时:

重新启动kafka服务器:

[root@hadoop02 ~]# nohup kafka-server-start.sh $KAFKA_HOME/config/server.properties & [1] 8782 [root@hadoop02 ~]# nohup: ignoring input and appending output to ‘nohup.out’ [root@hadoop02 ~]# jps 9036 Jps 3262 QuorumPeerMain 8782 Kafka

删除topic:hello01

[root@hadoop02 ~]# kafka-topics.sh --list --zookeeper hadoop02:2181 hello01 [root@hadoop02 ~]# kafka-topics.sh --delete -zookeeper hadoop02:2181 --topic hello01 Topic hello01 is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true. [root@hadoop02 ~]# kafka-topics.sh --list --zookeeper hadoop02:2181 [root@hadoop02 ~]#

6.创建消费者

(1)创建测试用topic:test01

[root@hadoop02 ~]# kafka-topics.sh --create --zookeeper hadoop02:2181 --replication-factor 1 --partitions 2 --topic test01 Created topic "test01".

(2)创建生产者

[root@hadoop02 ~]# kafka-console-producer.sh --broker-list hadoop02:9092 --topic test01

创建生产者成功,控制台进入等待输入状态。

参数解析

–broker-list:kafka的服务地址[多个用逗号隔开]

–topic:具体的单个topic

7.创建消费者

[root@hadoop02 ~]# kafka-console-consumer.sh --bootstrap-server hadoop02:9092 --topic test01 --zookeeper hadoop02:2181 --from-beginning

参数解析:

–bootstrap-server:kafka的服务地址

–topic:具体的单个topic

注意:标记为删除状态的topic也可以使用

8.新生产者(支持0.9版本+)

[root@pseudo01 kafka-0.10]# kafka-console-producer.sh --broker-list pseudo01:9092 --topic test02 --producer.config $KAFKA_HOME/config/producer.properties

9.新消费者(支持0.9版本+)

[root@pseudo01 software]# kafka-console-consumer.sh \ > --bootstrap-server pseudo01:9092 \ > --topic test02 \ > --new-consumer \ > --from-beginning \ > --consumer.config $KAFKA_HOME/config/consumer.properties

10.列出消费者组


最新回复(0)