Spark Streaming + Kafka Integration in Practice


1. Dependencies in pom.xml

<groupId>groupId</groupId>
<artifactId>day142.0</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
    <scala.version>2.11.8</scala.version>
    <hadoop.version>2.7.4</hadoop.version>
    <spark.version>2.0.2</spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- required for Spark SQL -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- required for HiveContext -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-flume_2.10</artifactId>
        <version>2.0.2</version>
    </dependency>
    <!-- the Kafka-related dependency, for Spark 2.x and later -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>

2. Start Kafka on the virtual machines. Depending on your environment, some of the commands below need to be adjusted.

(1) Start the ZooKeeper cluster; it must be started on every node:
zkServer.sh start

(2) Start the Kafka cluster. Run this command from the Kafka installation directory on each broker:
bin/kafka-server-start.sh config/server.properties

(3) Create a topic. Use your own topic name and IP addresses; port 9092 does not change:
/opt/software/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.192.132:9092,192.168.192.133:9092,192.168.192.134:9092 --replication-factor 3 --partitions 3 --topic cyy

(4) Produce data into the topic, i.e. start a producer. The topic name must match the one you created and the IP addresses must be correct. Send messages to the topic from the shell:
/opt/software/kafka/bin/kafka-console-producer.sh --broker-list 192.168.192.132:9092,192.168.192.133:9092,192.168.192.134:9092 --topic cyy

(5) To check that the Kafka cluster started successfully, start a consumer and verify that it receives the produced messages:
/opt/software/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.192.132:9092,192.168.192.133:9092,192.168.192.134:9092 --topic cyy --from-beginning

Once Kafka is up and running, go to IDEA and run the code.

3. The Code

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.immutable

//todo: use Spark Streaming to receive data from Kafka and do a word count -- receiver-based approach
object SparkStreamingKafka_Receiver_checkpoint {

  def updateFunc(a: Seq[Int], b: Option[Int]): Option[Int] = {
    Some(a.sum + b.getOrElse(0))
  }

  def main(args: Array[String]): Unit = {
    val checkpointPath = "./kafka-receiver"
    val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
      createFunc(checkpointPath)
    })
    ssc.start()
    ssc.awaitTermination()
  }

  def createFunc(checkpointPath: String): StreamingContext = {
    //todo: 1. create the SparkConf
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkStreamingKafka_Receiver_checkpoint")
      .setMaster("local[4]")
      //todo: enable the write-ahead log (WAL)
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    //todo: 2. create the SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    //todo: 3. create the StreamingContext
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(checkpointPath)

    //todo: 4. specify the ZooKeeper quorum
    val zkServer = "node1:2181,node2:2181,node3:2181"

    //todo: 5. specify the consumer group id
    val groupId = "spark-kafka-receiver01"

    //todo: 6. specify the topics; one consumer group can consume several topics,
    // (topic_name -> numPartitions) sets the number of consumer threads per topic
    val topics = Map("kafka_spark" -> 1)

    //todo: 7. run several receivers in parallel to read the Kafka topic; here we use 3
    val resultDStream: immutable.IndexedSeq[DStream[String]] = (1 to 3).map(x => {
      //todo: 8. use KafkaUtils.createStream to receive data from the Kafka topic and build a DStream
      val kafkaDataDStream: DStream[String] = KafkaUtils.createStream(ssc, zkServer, groupId, topics).map(x => x._2)
      kafkaDataDStream
    })

    //todo: 9. use the StreamingContext to union all the DStreams into one
    val kafkaDStream: DStream[String] = ssc.union(resultDStream)

    //todo: 10. split every line and map each word to 1
    val wordAndOne: DStream[(String, Int)] = kafkaDStream.flatMap(_.split(" ")).map((_, 1))

    //todo: 11. accumulate the counts of identical words across batches
    val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)

    //todo: print the result
    result.print()

    ssc
  }
}
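The cross-batch counting is driven by updateFunc together with updateStateByKey: for each word, Spark passes in the new counts from the current batch (a) and the running total from previous batches (b, which is None the first time the word is seen). The following standalone sketch (not part of the original post, plain Scala with no Spark or Kafka needed) illustrates that behavior:

// Standalone sketch illustrating the updateFunc used with updateStateByKey above.
object UpdateFuncDemo {
  // Same logic as in the streaming job: add the new per-batch counts to the running total.
  def updateFunc(a: Seq[Int], b: Option[Int]): Option[Int] =
    Some(a.sum + b.getOrElse(0))

  def main(args: Array[String]): Unit = {
    // First batch: the word appears twice and has no previous state.
    println(updateFunc(Seq(1, 1), None))        // Some(2)
    // Next batch: the word appears three more times; the previous total was 2.
    println(updateFunc(Seq(1, 1, 1), Some(2)))  // Some(5)
  }
}

Because this state is saved in the checkpoint directory (./kafka-receiver) and the job is built with StreamingContext.getOrCreate, restarting the job resumes the counts instead of starting from zero.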

Run the code and send a few words through the console producer; the accumulated word counts are printed to the console every 5-second batch.

If the job fails with java.lang.NoClassDefFoundError: org/apache/spark/Logging, the Kafka dependency version in pom.xml is wrong: that class was moved in Spark 2.x, so connector artifacts built against older Spark versions cannot load it. Use the dependencies provided above.

