When managing dependencies with Maven in a Spring Boot project, there are a few things to watch out for: some package conflicts have to be resolved up front, otherwise the application will throw errors at runtime:
(1) First, exclude two packages from the spark-sql dependency:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
        </exclusion>
    </exclusions>
</dependency>

(2) Then re-import them with an explicit version:
<dependency>
    <groupId>org.codehaus.janino</groupId>
    <artifactId>commons-compiler</artifactId>
    <version>2.7.8</version>
</dependency>
<dependency>
    <groupId>org.codehaus.janino</groupId>
    <artifactId>janino</artifactId>
    <version>2.7.8</version>
</dependency>
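You can confirm which versions Maven actually resolves with mvn dependency:tree. If you want to double-check at runtime which jar ends up on the classpath, the following diagnostic sketch (not part of the original post, just an illustration) asks the classloader where the janino and commons-compiler classes are loaded from:

import java.net.URL;

// Diagnostic sketch (illustrative, not from the original post): print the jar
// that supplies janino and commons-compiler after Maven's conflict resolution.
public class JaninoVersionCheck {
    public static void main(String[] args) throws ClassNotFoundException {
        String[] classNames = {
                "org.codehaus.janino.SimpleCompiler",            // provided by janino
                "org.codehaus.commons.compiler.ICompilerFactory" // provided by commons-compiler
        };
        for (String className : classNames) {
            Class<?> clazz = Class.forName(className);
            URL location = clazz.getProtectionDomain().getCodeSource().getLocation();
            System.out.println(className + " -> " + location);
        }
    }
}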
OK, with the preparation done, let's move on to the code.
The scenario demonstrated here is converting JSON strings into temporary tables and then querying them with Spark SQL, which is simple and convenient:
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class SparkJsonSQL {

    public void Exec() {
        // Local Spark context for the demo; change the master for a real cluster.
        SparkConf conf = new SparkConf();
        conf.setMaster("local[2]").setAppName("jsonRDD");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Two RDDs of JSON strings: one holds names/ages, the other names/scores.
        JavaRDD<String> nameRDD = sc.parallelize(Arrays.asList(
                "{\"name\":\"zhangsan\",\"age\":\"18\"}",
                "{\"name\":\"lisi\",\"age\":\"19\"}",
                "{\"name\":\"wangwu\",\"age\":\"20\"}"
        ));
        JavaRDD<String> scoreRDD = sc.parallelize(Arrays.asList(
                "{\"name\":\"zhangsan\",\"score\":\"100\"}",
                "{\"name\":\"lisi\",\"score\":\"200\"}",
                "{\"name\":\"wangwu\",\"score\":\"300\"}"
        ));

        // Let Spark infer the schema from the JSON and register temporary tables.
        Dataset<Row> namedf = sqlContext.read().json(nameRDD);
        Dataset<Row> scoredf = sqlContext.read().json(scoreRDD);
        namedf.registerTempTable("name");
        scoredf.registerTempTable("score");

        // Join the two temporary tables with plain SQL.
        Dataset<Row> result = sqlContext.sql(
                "select name.name, name.age, score.score from name, score where name.name = score.name");
        //Dataset<Row> result = sqlContext.sql("select * from name");
        result.show();
        // Print each joined row; in local mode this runs in the same JVM.
        result.foreach(x -> System.out.print(x));
        sc.stop();
    }
}
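Note that SQLContext and registerTempTable are deprecated from Spark 2.0 onward; the code above still works, but the same flow can be written against the SparkSession API. The sketch below is my adaptation rather than code from the original post, and it assumes Spark 2.2 or later (DataFrameReader.json(Dataset<String>) was added in 2.2):

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Sketch of the same JSON-to-temp-view flow on the Spark 2.x SparkSession API
// (an adaptation for illustration, not the code from the original post).
public class SparkJsonSQLSession {

    public void exec() {
        SparkSession spark = SparkSession.builder()
                .master("local[2]")
                .appName("jsonRDD")
                .getOrCreate();

        // Datasets of JSON strings; read().json(...) infers the schema from them.
        Dataset<String> nameJson = spark.createDataset(Arrays.asList(
                "{\"name\":\"zhangsan\",\"age\":\"18\"}",
                "{\"name\":\"lisi\",\"age\":\"19\"}"), Encoders.STRING());
        Dataset<String> scoreJson = spark.createDataset(Arrays.asList(
                "{\"name\":\"zhangsan\",\"score\":\"100\"}",
                "{\"name\":\"lisi\",\"score\":\"200\"}"), Encoders.STRING());

        // createOrReplaceTempView is the replacement for registerTempTable.
        spark.read().json(nameJson).createOrReplaceTempView("name");
        spark.read().json(scoreJson).createOrReplaceTempView("score");

        Dataset<Row> result = spark.sql(
                "select name.name, name.age, score.score from name, score where name.name = score.name");
        result.show();

        spark.stop();
    }
}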
Let's run the program and see the result: