Kafka 快速起步

it2022-05-08  7

Kafka 快速起步

2017-01-05 杜亦舒 性能与架构 性能与架构 性能与架构

微信号 yogoup

功能介绍 网站性能提升与架构设计

主要内容:1. kafka 安装、启动2. 消息的 生产、消费3. 配置启动集群4. 集群下的容错测试5. 从文件中导入数据,并导出到文件

单机示例

安装

tar -xzf kafka_2.10-0.10.1.1.tgz cd kafka_2.10-0.10.1.1

启动

> bin/zookeeper-server-start.sh \config/zookeeper.properties> bin/kafka-server-start.sh \config/server.properties

创建topic

打开一个新的终端窗口

bin/kafka-topics.sh --create \--zookeeper localhost:2181 \--replication-factor 1 \--partitions 1 \--topic test

发送消息

打开一个新的终端窗口

bin/kafka-console-producer.sh \--broker-list localhost:9092 \--topic test

进入输入模式,随意输入信息,例如:

hello world hi

获取消息

打开一个新的终端窗口

bin/kafka-console-consumer.sh \--bootstrap-server localhost:9092 \--topic test \--from-beginning

便会显示出刚才发送的两条消息:

hello world hi

这时可以打开发送消息的终端窗口,输入新的信息,再返回来就可以看到自动接收到了新消息

配置集群

新建两个启动配置文件

> cp config/server.properties \config/server-1.properties> cp config/server.properties \ config/server-2.properties

修改 config/server-1.properties 的以下几项配置:

broker.id=1 listeners=PLAINTEXT://:9093 log.dir=logs/kafka-logs-1

修改 config/server-2.properties 的以下几项配置: 

broker.id=2 listeners=PLAINTEXT://:9094 log.dir=logs/kafka-logs-2

启动

> bin/kafka-server-start.sh \config/server-1.properties &> bin/kafka-server-start.sh \ config/server-2.properties &

创建一个topic,设置3个复制

bin/kafka-topics.sh --create \--zookeeper localhost:2181 \--replication-factor 3 \--partitions 1 \--topic my-replicated-topic

发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 \--topic my-replicated-topic

输入消息:

my test message 1 my test message 2

获取消息

bin/kafka-console-consumer.sh \--bootstrap-server localhost:9092 \--from-beginning \--topic my-replicated-topic

可以正常取得消息

容错测试

# 取得server1的进程号 ps aux | grep server-1.properties # 杀掉进程 kill -9 43116

读取消息

bin/kafka-console-consumer.sh \--bootstrap-server localhost:9092 \--from-beginning \--topic my-replicated-topic

返回信息:

my test message 1 my test message 2

仍然可以正常取得消息

Kafka Connect

Kafka 中的 connecter 可以与外部系统进行连接,例如文件系统、数据库

下面实验一个简单文件系统交互,从一个文件中导入数据,然后导出到另一个文件中

创建一个测试文件,用于导入数据使用

echo -e "foo\nbar" > test.txt

启动 connect,执行数据的导入导出

bin/connect-standalone.sh \ config/connect-standalone.properties \ config/connect-file-source.properties \ config/connect-file-sink.properties

命令执行后,会输出一系列的日志信息,等待执行完毕

查看导出结果

cat test.sink.txt

返回结果:

foo bar

成功导出了 test.txt 中的数据 

过程分析

执行第2步的命令后,为什么是去读test.txt?为什么写入了test.sink.txt?中间的过程是什么样的?

原因是在于两个配置文件

config/connect-file-source.properties (导入配置)

name=local-file-source connector.class=FileStreamSource tasks.max=1 file=test.txt topic=connect-test  

file指定了是从test.txt中导入数据

topic指定了把数据发送到connect-test这个topic

connect-file-sink.properties(导出配置)

name=local-file-sink connector.class=FileStreamSink tasks.max=1 file=test.sink.txt topics=connect-test

file指定了把数据导出到test.txt中导入数据

topic指定从connect-test这个topic中读取数据

查看一下connect-test这个topic

bin/kafka-console-consumer.sh \--bootstrap-server localhost:9092 \--topic connect-test \--from-beginning

结果为:

{"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"}

现在向test.txt中添加一条新数据:

echo "Another line" >> test.txt

再次执行 cat test.sink.txt 就会看到刚刚添加的数据:

foo bar Another line        

更多介绍: http://www.cnblogs.com/ximengchj/p/6423704.html

相关文章:

分布式消息队列 Kafka

Kafka 消息存储及检索

Kafka 高可用设计

Kafka是如何实现高吞吐率的

点击 “阅读原文” 查看 文章列表

赞赏

人赞赏

阅读原文 阅读 投诉 精选留言

该文章作者已设置需关注才可以留言

写留言

该文章作者已设置需关注才可以留言

写留言

加载中 以上留言由公众号筛选后显示

了解留言功能详情

转载于:https://www.cnblogs.com/wangdaijun/p/6498041.html

相关资源:数据结构—成绩单生成器

最新回复(0)