在一些特定场景,例如streamingRDD需要和历史数据进行join从而获得一些profile信息,此时形成较小的新数据RDD和很大的历史RDD的join。 Spark中直接join实际上效率不高:
RDD没有索引,join操作实际上是相互join的RDD进行hash然后shuffle到一起;实际上,如果历史数据的RDD有索引,我们可以循环遍历streaming中的每一条数据,并向历史数据发送point query,即loop + indexed get。Streaming的数据是小数据,这样坐的性能会高很多。(这种小数据和大量历史数据的join模式在物联网/互联网场景下很常见)
另外,
spark中的RDD是只读的,增量信息无法直接更新到历史RDD中虽然我们可以使用streaming的窗口操作来缓存一定量的历史数据,但这会增加业务逻辑的复杂度。
IndexedRDD能够解决上述的两个问题,即对RDD内存数据建立索引,并且可以更新RDD。但是IndexRDD不支持事务,如果需要对同一个key做更新就存在数据更新冲突,导致数据不一致。另外,IndexRDD单纯是RDD的数据结构和接口的增强,不支持Spark之外的组件对其的访问。
本文将介绍基于Apache Geode和Spark相结合:
基于Geode的RDD借助Geode的内存数据存储和数据索引,其join操作是loop + indexed get方式,可以提高流数据和历史数据相join的效率;Geode 是目前性能和生产可用性最高的IMDG之一,基本满足ACID;Spark 中通过GeodeRDD的写操作实际上是将数据写入Geode,我们还可以通过JDBC等方式访问数据,甚至进行OLAP操作。需要手动编译spark-connector,参照GitHub上的流程操作即可。https://github.com/apache/geode/blob/rel/v1.1.1/geode-spark-connector/doc/1_building.md 最终会编译三个文件: The following jar files will be created:
geode-spark-connector/target/scala-2.10/geode-spark-connector_2.10-0.5.0.jargeode-functions/target/scala-2.10/geode-functions_2.10-0.5.0.jargeode-spark-demos/target/scala-2.10/geode-spark-demos_2.10-0.5.0.jarStart Geode cluster with 1 locator and 2 servers:
gfsh gfsh>start locator --name=locator1 --port=55221 gfsh>start server --name=server1 --locators=localhost[55221] --server-port=0 gfsh>start server --name=server2 --locators=localhost[55221] --server-port=0Then create two demo regions:
gfsh>create region --name=str_str_region --type=PARTITION --key-constraint=java.lang.String --value-constraint=java.lang.String gfsh>create region --name=int_str_region --type=PARTITION --key-constraint=java.lang.Integer --value-constraint=java.lang.StringDeploy Spark Geode Connector's geode-function jar (geode-functions_2.10-0.5.0.jar):
gfsh>deploy --jar=<path to connector project>/geode-functions/target/scala-2.10/geode-functions_2.10-0.5.0.jar官网下载spark1.6.0-bin-hadoop2.6。解压后运行./sbin/start-all。
Check Geode locator property in the Spark shell:
scala> sc.getConf.get("spark.geode.locators") res0: String = localhost[55221]Geode可以认为是类似hdfs/hbase的数据集,不同的是:
基于Geode数据形成的RDD可以被修改;普通的RDD可以和Geode Region数据快速join;使用Geode Spark Connector的代码中首先import一下org.apache.geode.spark.connector._。引入所有的implicit函数。
scala> import org.apache.geode.spark.connector._ import org.apache.geode.spark.connector._In the Spark shell, create a simple pair RDD and save it to Geode:
scala> val data = Array(("1", "one"), ("2", "two"), ("3", "three")) data: Array[(String, String)] = Array((1,one), (2,two), (3,three)) scala> val distData = sc.parallelize(data) distData: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:14 scala> distData.saveToGemfire("str_str_region") 15/02/17 07:11:54 INFO DAGScheduler: Job 0 finished: runJob at GemFireRDDFunctions.scala:29, took 0.341288 s此时Geode中相应region就有了刚才save的数据了gfsh:
gfsh>query --query="select key,value from /str_str_region.entries" Result : true startCount : 0 endCount : 20 Rows : 3 key | value --- | ----- 1 | one 3 | three 2 | two NEXT_STEP_NAME : ENDSaving non-pair RDD to Geode requires an extra function that converts each element of RDD to a key-value pair. Here's sample session in Spark shell:
scala> val data2 = Array("a","ab","abc") data2: Array[String] = Array(a, ab, abc) scala> val distData2 = sc.parallelize(data2) distData2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:17 scala> distData2.saveToGemfire("int_str_region", e => (e.length, e)) [info 2015/02/17 12:43:21.174 PST <main> tid=0x1] ... 15/02/17 12:43:21 INFO DAGScheduler: Job 0 finished: runJob at GemFireRDDFunctions.scala:52, took 0.251194 sVerify the result with gfsh:
gfsh>query --query="select key,value from /int_str_region.entrySet" Result : true startCount : 0 endCount : 20 Rows : 3 key | value --- | ----- 2 | ab 3 | abc 1 | a NEXT_STEP_NAME : ENDThe same API is used to expose both replicated and partitioned region as RDDs.
scala> val rdd = sc.geodeRegion[String, String]("str_str_region") rdd: org.apache.geode.spark.connector.rdd.GemFireRDD[String,String] = GemFireRDD[2] at RDD at GemFireRDD.scala:19 scala> rdd.foreach(println) (1,one) (3,three) (2,two) scala> val rdd2 = sc.geodeRegion[Int, String]("int_str_region") rdd2: org.apache.geode.spark.connector.rdd.GemFireRDD[Int,String] = GemFireRDD[3] at RDD at GemFireRDD.scala:19 scala> rdd2.foreach(println) (2,ab) (1,a) (3,abc)10万条数据的性能差别有将近10倍。
具体来说,RDD跟Geode Regioin的join是循环+get操作,类似于map-only 的join。具体代码参见GeodeJoinRdd.scala
private def computeWithoutFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, V)] = { val leftPairs = left.iterator(split, context).toList.asInstanceOf[List[(K, _)]] val leftKeys = leftPairs.map { case (k, v) => k}.toSet // Note: get all will return (key, null) for non-exist entry, so remove those entries val rightPairs = region.getAll(leftKeys).filter { case (k, v) => v != null} leftPairs.filter{case (k, v) => rightPairs.contains(k)} .map {case (k, v) => ((k, v).asInstanceOf[T], rightPairs.get(k).get)}.toIterator }而RDD跟RDD的普通join操作需要数据的shuffle,会带来很多额外的开销。如下图所示。
可以推断一下,在一些特定场景,例如streamingRDD需要和历史数据进行join从而获得一些profile信息,此时形成较小的新数据RDD和很大的历史RDD的join。此时loop + index get的性能会高很多。这种小数据和大量历史数据的join模式在物联网/互联网场景下很常见。
此外IndexedRdd也可以作为一个备选方案。但是IndexedRdd无法向Geode这样能够被Spark世界之外访问,只能作为提高spark计算的一种方案.
转载于:https://www.cnblogs.com/luweiseu/p/7698986.html
相关资源:2017年4月自考试卷,04-18年另外打包,需要自取