Spark Streaming (1): socket word count

2025-10-16

Contents

Introduction
nc
Streaming program

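Introduction

Spark Streaming is essentially Spark RDD computation applied to small batches (micro-batches). The driver keeps running: for every batch interval it generates a new job over the data received in that interval and dispatches the job's tasks to the executors.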
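The micro-batch idea can be illustrated without Spark at all. The following is a minimal sketch on plain Scala collections, not Spark's actual implementation: the `Record` type, the `toBatches` helper, and the sample arrival times are all hypothetical names and values chosen for illustration. It only shows how records arriving over time fall into fixed-length batches, here 3000 ms long to match the program later in this post.

```scala
object MicroBatchSketch {
  // Hypothetical type: a line of text tagged with its arrival time in ms.
  final case class Record(arrivalMs: Long, line: String)

  /** Groups records into consecutive batches of `intervalMs` milliseconds,
    * keyed by the start time of the batch each record falls into. */
  def toBatches(records: Seq[Record], intervalMs: Long): Map[Long, Seq[String]] =
    records
      .groupBy(r => r.arrivalMs / intervalMs * intervalMs)
      .map { case (start, rs) => start -> rs.map(_.line) }

  def main(args: Array[String]): Unit = {
    val records = Seq(
      Record(100, "hello spark"),
      Record(2900, "hello streaming"),
      Record(3100, "hello again")
    )
    // Print the batches in time order.
    toBatches(records, 3000).toSeq.sortBy(_._1).foreach { case (start, lines) =>
      println(s"batch starting at $start ms: ${lines.mkString(" | ")}")
    }
  }
}
```

In this sketch the first two records land in the batch starting at 0 ms and the third in the batch starting at 3000 ms. In Spark Streaming each such batch becomes an RDD, and the same transformation is applied to every one of them.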

nc

Install nc

yum install nc -y


Start a listener on port 8888 with:

nc -lk 8888

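Note: why local[2]? It starts two threads, because the program needs both a receiver and a computation thread: one receives the data and the other processes it. With a single thread, the receiver would occupy it and no batch would ever be processed.

Streaming program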
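The reason for needing two threads can be reproduced with an ordinary thread pool. This is only an analogy under stated assumptions, not how Spark schedules tasks internally: `LocalThreadsSketch` and `processingRuns` are hypothetical names, and a fixed-size `java.util.concurrent` pool stands in for the `local[n]` executor threads. A long-running "receiver" task is submitted first, followed by a "processing" task, and the sketch checks whether the processing task ever gets a thread.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object LocalThreadsSketch {
  /** Returns true if a processing task gets to run while a long-running
    * "receiver" task occupies one thread of a pool with `threads` threads. */
  def processingRuns(threads: Int): Boolean = {
    val pool = Executors.newFixedThreadPool(threads)
    val stop = new CountDownLatch(1)      // released to let the receiver finish
    val processed = new CountDownLatch(1) // counted down by the processing task
    try {
      // Stand-in for the receiver: holds its thread until told to stop.
      pool.submit(new Runnable { def run(): Unit = stop.await() })
      // Stand-in for batch processing: needs a free thread to run.
      pool.submit(new Runnable { def run(): Unit = processed.countDown() })
      // Wait briefly to see whether the processing task was scheduled.
      processed.await(500, TimeUnit.MILLISECONDS)
    } finally {
      stop.countDown()
      pool.shutdownNow()
    }
  }

  def main(args: Array[String]): Unit = {
    println(s"1 thread:  processing ran = ${processingRuns(1)}")
    println(s"2 threads: processing ran = ${processingRuns(2)}")
  }
}
```

With one thread the receiver stand-in holds the only thread, so the processing task stays queued; with two threads it runs right away. This mirrors why `local` or `local[1]` is not enough for a receiver-based stream.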

import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object StreamingWorldCount {
  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the receiver, one for processing
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingWorldCount")
    val sc = new SparkContext(conf)
    // StreamingContext wraps the SparkContext and is the entry point for creating DStreams;
    // the batch interval here is 3000 ms
    val context = new StreamingContext(sc, Milliseconds(3000))
    // Read lines from the nc listener (use the address of the host running nc)
    val lines: ReceiverInputDStream[String] = context.socketTextStream("192.168.18.100", 8888)
    // Split each line into words and count them within each batch
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    // Start receiving and processing data
    context.start()
    // Block until the computation is stopped or fails
    context.awaitTermination()
  }
}
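The transformation chain in the program runs once per batch, so the printed counts cover only the lines received in that batch and are not cumulative. A rough sketch of the same logic on plain Scala collections makes this visible; `BatchWordCountSketch`, `countWords`, and the sample batches are hypothetical, and `groupBy` plus `sum` stands in for `reduceByKey(_ + _)`.

```scala
object BatchWordCountSketch {
  /** Counts words in one batch of lines, mirroring
    * flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) on plain collections. */
  def countWords(batch: Seq[String]): Map[String, Int] =
    batch
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(_._1) // stand-in for the shuffle that reduceByKey performs
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    // Two hypothetical batches, as if typed into nc within different intervals.
    val batch1 = Seq("hello spark", "hello streaming")
    val batch2 = Seq("hello again")
    println(countWords(batch1))
    println(countWords(batch2))
  }
}
```

Here "hello" is counted twice in the first batch and once in the second; the second result does not include the first batch's total, which matches what the streaming program prints every 3 seconds.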

pom.xml

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
</dependency>
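For readers who build with sbt instead of Maven (an assumption; this post itself uses Maven), roughly the same dependencies could be declared in a build.sbt along these lines. The project name is hypothetical; the versions are the ones from the pom.xml above.

```scala
// build.sbt: a sketch equivalent to the Maven dependencies in this post
name := "streaming-word-count" // hypothetical project name

// Setting scalaVersion also brings in the matching scala-library
scalaVersion := "2.11.8"

// %% appends the Scala binary version, resolving to spark-streaming_2.11
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.2.0"
```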