Spark Streaming program: count the words in each 2-second batch and write them to files

2022-05-05

Spark Streaming program that counts the words in each 2-second batch:

    import java.io.{File, FileOutputStream}
    import java.text.SimpleDateFormat
    import java.util.Date

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
     * Spark Streaming program: counts the words in each 2-second batch
     * and appends the counts to local files.
     */
    object WordCountSaveFileScala {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setAppName("StreamingWordCountSaveFile")
        conf.setMaster("local[*]")

        // Streaming context with a 2-second batch interval
        val ssc = new StreamingContext(conf, Seconds(2))

        // Create the socket text stream
        // val lines = ssc.socketTextStream("localhost", 8888)
        val lines = ssc.socketTextStream("master", 8888)

        // Inspect the number of partitions of each batch RDD
        lines.foreachRDD(rdd => println(rdd.getNumPartitions))

        // Split each line into words
        val words = lines.flatMap(_.split(" "))
        // Pair each word with a count of 1
        val pair = words.map((_, 1))
        // Sum the counts per word within the batch
        val result = pair.reduceByKey(_ + _)

        result.foreachRDD(rdd => {
          // Tag every record with the index of its partition
          val rdd2 = rdd.mapPartitionsWithIndex((idx, it) => it.map(t => (idx, t)))

          rdd2.foreachPartition(it => {
            // Buffer the iterator so the first record can be inspected
            // without being consumed, which keeps it in the output
            val records = it.buffered
            if (records.hasNext) {
              // Store the data in one file per hour
              val sdf = new SimpleDateFormat("yyyy-MM-dd-HH")
              // Formatted time string
              val strDate = sdf.format(new Date())
              // Partition index
              val par = records.head._1
              // Data file name
              val file = strDate + "-" + par + ".dat"
              // Open the file in append mode; the directory must already exist
              val fout = new FileOutputStream(new File("d:///stream", file), true)
              try {
                for (t <- records) {
                  val word = t._2._1
                  val cnt = t._2._2
                  fout.write((word + "\t" + cnt + "\r\n").getBytes())
                }
                fout.flush()
              } finally {
                fout.close()
              }
            }
          })
        })

        // Start the context
        ssc.start()
        // Wait for termination
        ssc.awaitTermination()
      }
    }
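The output file name is built from the current hour and the partition index, so every batch that arrives within the same hour appends to the same file for a given partition. Below is a minimal sketch of that naming scheme in plain Java (the language of the test server in this post), without Spark. The class `HourlyFileName`, the method `dataFileName`, and the explicit `TimeZone` parameter are illustrative assumptions, not part of the program above, which formats the time in the JVM's default time zone.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical helper, for illustration only.
final class HourlyFileName {
    // Builds "<yyyy-MM-dd-HH>-<partition>.dat".
    // The time zone is passed in explicitly (an assumption) so results are reproducible.
    static String dataFileName(Date now, int partition, TimeZone zone) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");
        sdf.setTimeZone(zone);
        return sdf.format(now) + "-" + partition + ".dat";
    }

    public static void main(String[] args) {
        TimeZone utc = TimeZone.getTimeZone("UTC");
        Date start = new Date(0L);                       // 1970-01-01T00:00:00Z
        Date sameHour = new Date(30L * 60 * 1000);       // 30 minutes later
        Date nextHour = new Date(60L * 60 * 1000);       // 60 minutes later
        System.out.println(dataFileName(start, 3, utc));    // 1970-01-01-00-3.dat
        System.out.println(dataFileName(sameHour, 3, utc)); // same file as above
        System.out.println(dataFileName(nextHour, 3, utc)); // 1970-01-01-01-3.dat
    }
}
```

Because the file is opened in append mode, a word that shows up in several batches during the same hour will likely appear on several lines of the same file, one line per batch.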

 

Testing:

On Linux, run nc -lk 8888 and enter input such as:

a

a

a

a

b

b

bb

Any similar data works for testing.
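To check the output against the sample input, the per-batch counting can be reproduced without Spark. The sketch below is plain Java and assumes that all seven sample lines arrive within the same 2-second batch; if they are typed more slowly, each batch is counted separately. The class `BatchWordCount` and its `count` method are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper, for illustration only.
final class BatchWordCount {
    // Counts the words in the lines of one batch, splitting each line
    // on a single space as the streaming job does.
    static Map<String, Long> count(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.groupingBy(w -> w, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // The sample input from the nc session, assumed to fall into one batch
        List<String> batch = Arrays.asList("a", "a", "a", "a", "b", "b", "bb");
        System.out.println(count(batch)); // {a=4, b=2, bb=1}
    }
}
```

Under that assumption the data files should contain the pairs a/4, b/2 and bb/1, spread over the files of whichever partitions the words were assigned to.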

Alternatively, start a local ServerSocket for testing (and connect the stream to "localhost" instead of "master"):

    import java.io.OutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;

    public class MyServerSocket {
        public static void main(String[] args) throws Exception {
            ServerSocket ss = new ServerSocket(8888);
            while (true) {
                // Block until a client (the Spark receiver) connects
                Socket sock = ss.accept();
                System.out.println("Client connected");
                OutputStream oos = sock.getOutputStream();
                int i = 0;
                // Send one line every 10 ms; a failed write after the
                // client disconnects ends the program with an exception
                for (;;) {
                    oos.write(("hello world tom" + i + "\r\n").getBytes());
                    oos.flush();
                    Thread.sleep(10);
                    i++;
                }
            }
        }
    }
