Introduction
Spark Streaming is essentially small-batch (micro-batch) Spark RDD computation: the driver repeatedly submits a batch job for each interval, and the executors started by the driver carry out the actual computation.
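As a rough mental model of micro-batching (not Spark API — `MicroBatchSketch` and `batchSums` are illustrative names), a continuous stream can be pictured as an iterator chopped into fixed-size batches, with an ordinary batch computation run on each:

```scala
object MicroBatchSketch {
  // Toy model: `grouped` stands in for the batch interval that slices
  // the stream into small batches, one "RDD" per batch.
  def batchSums(stream: Iterator[Int], batchSize: Int): List[Int] =
    stream.grouped(batchSize).map(_.sum).toList

  def main(args: Array[String]): Unit =
    // batches (1,2,3), (4,5,6), (7,8,9), (10) -> sums 6, 15, 24, 10
    batchSums((1 to 10).iterator, 3).foreach(n => println(s"batch sum = $n"))
}
```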
nc
Install nc:
yum install nc -y
Then start a listener with:
nc -lk 8888
Note: local[2] is used because two threads are needed — one for the receiver and one for the computation; one thread receives the data while the other processes it.
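To see why a single thread is not enough: the receiver occupies one thread while it ingests data, so the computation must run on another. A minimal sketch of that division of labor with plain JVM threads (`ReceiverComputeSketch` and its names are illustrative, not Spark API):

```scala
import java.util.concurrent.ArrayBlockingQueue

object ReceiverComputeSketch {
  def run(lines: Seq[String]): Int = {
    val queue = new ArrayBlockingQueue[String](16)
    // thread 1: the "receiver" ingests lines into a buffer
    val receiver = new Thread(() => lines.foreach(queue.put))
    receiver.start()
    receiver.join()
    // thread 2 (here the main thread): the "computation" drains the
    // buffer and counts the words that were received
    Iterator.continually(queue.poll()).takeWhile(_ != null)
      .flatMap(_.split(" ")).size
  }

  def main(args: Array[String]): Unit =
    println(run(Seq("hello spark", "hello streaming"))) // 4 words received
}
```

In real Spark Streaming the receiver holds its thread permanently, so with local[1] the computation would never get scheduled at all.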
Streaming program
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object StreamingWorldCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingWorldCount")
    val sc = new SparkContext(conf)
    // StreamingContext wraps the SparkContext; with it you can create
    // the DStream abstraction on top of Spark
    val context = new StreamingContext(sc, Milliseconds(3000))
    val lines: ReceiverInputDStream[String] = context.socketTextStream("192.168.18.100", 8888)
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    // start the streaming computation
    context.start()
    // block until the computation terminates
    context.awaitTermination()
  }
}
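The per-batch transformation above can be checked without a cluster: the same flatMap / map / reduceByKey pipeline, expressed on a plain Scala collection, with groupBy + sum standing in for reduceByKey (which only exists on RDDs/DStreams). `LocalWordCount` is an illustrative name, not part of the program above:

```scala
object LocalWordCount {
  // Same word-count pipeline as the DStream version, on a local Seq
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(_._1)                                   // stand-in for reduceByKey
      .map { case (w, pairs) => (w, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit =
    // prints (hello,2), (spark,1), (streaming,1)
    wordCount(Seq("hello spark", "hello streaming")).toSeq.sortBy(_._1)
      .foreach(println)
}
```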
pom.xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
</dependency>
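These fragments belong inside the `<dependencies>` element of the pom.xml, sketched here for orientation:

```xml
<dependencies>
    <!-- the two <dependency> blocks shown above go here -->
</dependencies>
```

Note that spark-streaming_2.11 pulls in spark-core transitively, so `SparkConf` and `SparkContext` resolve without a separate spark-core dependency.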