Spark之Stream高级知识分享二(MapWithState +foreachRDD+Window+transform)

it2024-04-20  12

1.MapWithState 小案列

Spark Stream:以批处理为主,用微批处理来处理流数据

Flink:真正的流式处理,以流处理为主,用流处理来处理批数据

但是Spark的Strurctured Stream 确实是真正的流式处理来处理批数据

但是Spark的structured Stream确实是真正的流式处理,也是未来的Spark流式处理的未来方向,新的Stream特性也是加载那里了。

1)MapWithState可以实现和UpdateStateByKey一样对不同批次的数据的分析,但是他是实验性方法,慎用,可能下一版本就没了

2)MapWithState,只有当前批次出现了该key才会显示该key的所有的批次分析数据

3)最好的方式还是写DB

 

MapWithStateTest { def main(args: Array[String]) { //第一个参数是key,第二参数是当前value,第三个参数之前的value val mappingFunction = (key: String, value: Option[Int], state: State[Int])=> { val sum = value.getOrElse(0)+state.getOption().getOrElse(0) state.update(sum) (key,sum) } val sparkConf = new SparkConf() .setAppName("StatefulNetworkWordCount") .setMaster("local[3]") // Create the context with a 5 second batch size val ssc = new StreamingContext(sparkConf, Seconds(5)) ssc.checkpoint(".") // Initial RDD input to updateStateByKey val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1))) // Create a ReceiverInputDStream on target ip:port and count the // words in input stream of \n delimited test (eg. generated by 'nc') val lines = ssc.socketTextStream("192.168.76.120", 1314) val words = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_+_) val stateDstream = words.mapWithState(StateSpec.function(mappingFunction)) stateDstream.print() ssc.start() ssc.awaitTermination()

  

2.(重要)foreachRDD小案列

1)foreachRDD是生产中用到最多的output(egger)函数,通过它可将DStream转换为RDD以及DF\DS,然后进行操作

2)forearchRDD的方法是在driver中进行的,故写DB时它的获取链接代码必须写在第二层循环的foreachPatition中,不然会爆序列化错误。

3)踩坑,在foreachPatition中数据的获取使用的是迭代器,不管是java还是scala,迭代器只能使用一次,第二次就为空了

4)踩坑,操作DB时要使用连接池

 

import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} object ForeachRDDApp { def main(args: Array[String]): Unit = {{ val sc = new SparkConf() .setAppName("word count") .setMaster("local[3]") val ssc = new StreamingContext(sc, Seconds(5)) val lines = ssc.socketTextStream("192.168.76.120", 1314) //当数据多次被使用,数据持久化 // lines.persist(StorageLevel.MEMORY_ONLY) val wordsDStrem = lines.flatMap(_.split(" ")) wordsDStrem.foreachRDD(rdd =>{ val spark = SparkSession .builder .config(rdd.sparkContext.getConf) .getOrCreate() import spark.implicits._ val wordDF = rdd.toDF("word") wordDF.createOrReplaceGlobalTempView("words") val countDF = spark.sql("select word, count(*) as total from words group by word") countDF.show() }) ssc.start() ssc.awaitTermination() } } }

  

3.Window小案列

1)窗口的长度以及滑动时间必须是批出来间隔的整数倍关系

2)窗口的功能完全可通过每次存储DB,然后查询多批次来实现

 

import javax.sql.ConnectionPoolDataSource import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object WindowTFTest { def main(args: Array[String]): Unit = { val sc = new SparkConf() .setAppName("word count") .setMaster("local[3]") val ssc = new StreamingContext(sc, Seconds(5)) val lines = ssc.socketTextStream("192.168.76.120", 1314) val wordContDS = lines.flatMap(_.split(" ")) .map((_, 1)) .reduceByKey(_ + _) // wordContDS.print() //window val windowDS = wordContDS.window(Seconds(20), Seconds(10)) windowDS.print() //save // wordContDS.saveAsTextFiles("C:\\Users\\admin\\Desktop\\spark学习\\outPutData\\") ssc.start() ssc.awaitTermination() } }

  

4. transform 白名单小案例

1)transform是Transformatition(lazy)类型操作,用它可以将Dstream转换成RDD,这样我们可以通过RDD的编程去实现业务逻辑,如白名单过滤等

 

import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object TransformApp { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local[2]") .setAppName("Transform App") //每隔一秒的数据为一个batch val ssc = new StreamingContext(conf, Seconds(5)) val whiteRDD = ssc.sparkContext.parallelize(List("17")).map((_, true)) //读取的机器以及端口,读取的数据的格式是:老二,3,1 (姓名,年龄,性别) val lines = ssc.socketTextStream("192.168.43.125", 1314) val result = lines.map(x => (x.split(",")(0), x)) .transform(rdd => { //(老二,((老二,3,1),Option[])) rdd.leftOuterJoin(whiteRDD) .filter(_._2._2.getOrElse(false) != true) .map(_._2._1) }) result.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate } }

  

 

转载于:https://blog.csdn.net/qq_32641659/article/details/90748175

 

转载于:https://www.cnblogs.com/xuziyu/p/11263877.html

最新回复(0)