SparkCore:Shared Variables、Broadcast Variables

it2024-08-02  73

官网:Shared Variables http://spark.apache.org/docs/2.4.2/rdd-programming-guide.html#shared-variables

1、Shared Variables

一般,Spark的每个Task操作的是变量的一个独立副本数据,比如定义了一个List,List定义的代码在Driver端执行,而Action操作是在Executor端,那么在Executor端每一个Task都会有一份List数据。 共享变量,可以让每一个Executor中只保留一份List数据。 Spark提供了两种方式共享变量,广播变量/Broadcast Variables,累加器(Accumulator)。 这里只讲广播变量/Broadcast Variables。

2、Broadcast Variables

广播是把一份数据发布到每一台机器上,广播变量的值不可更改,只读;广播的是RDD的结果,只能在driver端,所以需要collect,一般使用collectAsMap,因为这个高效,然后在executor端使用 下面举例说明: 这是一个普通的join

scala> val info1=sc.parallelize(Array(("601","zhangsan"),("602","lisi"))) scala> val info2=sc.parallelize(Array(("601","dongnan","20"),("602","keda","19"),("603","zheda","18"))).map(x=>(x._1,x)) scala> info1.join(info2).foreach(println) (602,(lisi,(602,keda,19))) (601,(zhangsan,(601,dongnan,20))) scala> info1.join(info2).map(x=>{ | x._1 +","+x._2._1+","+x._2._2._2 | })foreach(println) 601,zhangsan,dongnan 602,lisi,keda

图中可以看出stage0,stage1,到stage2是有shuffle的。

广播变量,广播的是小表,那么大表关联小表的时候,就不需要再join了,因为小表已经通过广播在每台机器上存在了,只需要匹配想要的值就可以了。

//这里需要在driver端,collect收集RDD的结果,然后广播出去 scala> val info1=sc.parallelize(Array(("601","zhangsan"),("602","lisi"))).collectAsMap() scala> val infobroadcoast=sc.broadcast(info1) scala> val info2=sc.parallelize(Array(("601","dongnan","20"),("602","keda","19"),("603","zheda","18"))).map(x=>(x._1,x)) //yield的主要作用是记住每次迭代中的有关值,并逐一存入到一个数组中 x是一个tuple(602,(602,keda,19)) scala> info2.mapPartitions(x=>{ | val broadcastMap=infobroadcoast.value | for((key,value)<- x if(broadcastMap.contains(key))) | yield (key,broadcastMap.get(key).getOrElse(""),value._2) | }).foreach(print) (601,zhangsan,dongnan)(602,lisi,keda)

这里可以看到,结果一样,但是没有shuffle的产生。

最新回复(0)