1.hadoop MapReduce 框架 并行计算的思想、分而治之的思想
2. scala集合高阶函数 处理数据的思想: 将要分析的数据放到集合中去,然后调用集合的高阶函数去处理数据
Apache Spark™ is a unified analytics engine for large-scale data processing. 统一分析引擎为海量数据处理 统一:什么样的数据都能处理分析,什么类型的数据都可以处理,实时,离线,流式都可以。
MapReduce map,reduce函数 将分析的结果放在磁盘中,并且后续进行数据分析的时候,再从磁盘读取,以此往返。。。
Runs Everywhere
spark要处理的数据在哪里 hdfs/hive/HBASE/ORC/parquet/csv/tsv/jdbc/redis/mangdb/ES/。。 spark 能从哪里读数据,就能往哪里写数据
spark程序运行在哪里 本地模式 集群模式
yarn ,standlone, mesos cluster
第一步.数据的读取 将要处理的数据封装到一个RDD中(类别scala中list列表) val inputRDD = sc.textFile("…")
第二步.数据处理(分析)
调用集合RDD中函数(高阶函数)进行处理分析
RDD->函数->另外一个RDD :tranformation函数
val outputRDD = inputRDD.#(基本就是list中函数)
比如:map/flatMap/filter/reduceBy/Key
第三步:数据的输出(输出)
调用RDD函数,返回的不是RDD类型
outputRDD.#此类型函数被称为action函数 foreach/count/sum/first
三步放在一起就是一个链式编程: sc.textFile(…).transaction().action
(1)上传解压 (2)修改配置 mv spark-env.sh.template spark-env.sh 修改spark-env.sh (3)启动 帮助命令: bin/spark-shell –help 启动交互式命令 bin/spark-shell --master local[2] (4)是否启动成功 不报错,进入交互式命令行 webui上查看 4040
spark-shell –master local[2] 运行一个程序在本地模式 [2] ->表示的 线程thread 每个task运行在一个线程Thread里面,都需要1core CPU
对于MapReduce application来讲 一个MapReduce application就是一个Mapreduce job 每个task(map task还是reduce task)运行在进程中(JVM process)
注意:修改了配置文件,需要重启进程才生效
在spark 2.x中不推荐使用sparkContext对象读取数据,而是推荐SparkSession Session是对context的进一步封装,sparkSession中包含SparkContext 读数据: DataFrameReader =spark.read
DataFrameWriter
就是一个集合 在使用的时候,就当做为Scala集合类中List列表
实质 分布式 存储数据 集合 abstract class RDD[T: ClassTag] A Resilient(弹性) Distributed(分布式) Dataset (RDD) Represents(代表) an immutable(不可变), partitioned(分区) collection of elements that can be operated(处理) on in parallel(并行) 将数据划分为很多分区(partitioned),不同分区的数据存在在不同的机器里面,每个分区的数据可以被一个Task进程处理分析This class contains the basic operations available on all RDDs, such as map, filter, and persist 所有的RDD都一些基本的操作,比如map,filter,persist PairRDDFunctions contains operations available only on RDDs of key-value pairs, such as groupByKey and join; RDD中的类型是键值对(二元组),RDD隐式转换PairRDDFunctions,PairRDDFunctions方法我们就可以用,比如有groupByKey,reduceByKey,join
Internally, each RDD is characterized by five main properties: A list of partitions 分区列表 A function for computing each split 每一个分片可以被一个函数处理(并行) A list of dependencies on other RDDs 一个RDD依赖于其他RDD(RDD怎么来) 查看依赖 println(wordCountRDD.toDebugString) Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) 可选,如果RDD的值是二元组,你可以指定分区 Optionally, a list of preferred(最优) locations to compute each split on (e.g. block locations for an HDFS file) 可选,最优的路径中读取分片数据
转换函数Tansformation
一个RDD调用函数之后 转换为另外一个RDD map,filter,reducBykey
action函数 一个RDD调用函数之后,不会转换为一个RDD,触发一个Job的执行 比如 count ->Long first -> 第一条数据 take ->获取前几条数据 top->获取前几条数据(排序) foreach ->打印所有的数据 saveAsTextFile ->保存数据到文件
持久化persisitent def persist(newLevel: StorageLevel) def unpersist(blocking: Boolean = true)
class StorageLevel private( private var _useDisk: Boolean, //磁盘 private var _useMemory: Boolean, //内存 //JVM内存中tachyon(基于内存的分布式文件系统) private var _useOffHeap: Boolean, / private var _deserialized: Boolean, 反序列化 private var _replication: Int = 1) //副本个数 extends Externalizable 什么情况下,RDD数据需要进行持久化呢??? (1)某个RDD数据,被多次使用 重复RDD (2)某个RDD数据来自不易,使用超过1次 经过复杂的处理得到RDD (3)通常选择的策略 MEMORY_ONLY_2 -如果内存足够 MEMORY_AND_DISK_SER_2 -如果内存不够,先内存后磁盘
数据结构RDD,用于存储管理数据
DAG调度 spark中每个job的调度都是DAG调度 DAG:有向无环图 (0)构建DAG图, 倒推法,配合依赖 (1)DAG图划分为多个stage,RDD直接产生了shuffle过程,就会划分stage (2)按照顺序执行stage中task任务,每个stage中可有多个Task
对于RDD中某些函数使用注意
redcueByKey(combiner) = groupBy+ map(变量值相加) redcueByKey可以先进行本地聚合操作
RDD的分区对应一个Task处理数据
def repartition(numPartitions: Int) -产生shuffle def coalesce(numPartitions: Int, shuffle: Boolean = false)
一开始的时候,数据量比较多,可以加到RDD分分区数,增加并行度(在集群资源充足的情况下)当数据预处理之后(尤其过滤清洗之后)。RDD中数据量减少了很多,此时可以考虑减少分区的数目介绍 spark框架自身带的 分布式集群资源管理和任务调度框架,类似于Hadoop Yarn框架
Standalone ------------------>Yarn Master ------------------------>ResourceManager workers------------------------>NodeManager
唯独有一点不同的是: 一台机器只能运行一个Nodemanager,但是spark standalone在一台机器上可以同时运行多个Workers安装部署(伪分布式)
conf/spark-env.sh
SPARK_MASTER_HOST=bigdata-hpsk01.huadian.com SPARK_MASTER_PORT=7077 SPARK_MASTER_WEBUI_PORT=8080 SPARK_WORKER_CORES=2 SPARK_WORKER_MEMORY=2g SPARK_WORKER_PORT=7078 SPARK_WORKER_WEBUI_PORT=8081
mv slaves.template slaves
配置worker节点运行的主机,一行一个 bigdata-hpsk01.huadian.com
启动服务
启动主节点 sbin/start-master.sh启动从节点 sbin/start-slaves.sh 注意:配置免密钥登录 ssh-keygen -t rsa ssh-copy-id huadian@bigdata-hpsk01.huadian.com测试 可以通过webui页面查看 8080/8081
回顾MapReduce程序运行在Yarn集群组成
-1.Application Master 应用的管理者,负责资源的申请,task运行监控调度 -2.Task的运行 MapTask和ReduceTask,都是运行在Nodemanager上Container容器中。spark application运行在集群下,包含2部分(重点)
1.Driver Program -JVM 进程 相当于AppMaster,应用的管理者,主要是调度job的执行 就是程序Main方法,必须创建sparkContext实例对象 端口号4040,提供应用的监控
2.Executor s 每一个Executor是一个JVM进程,里面包含内存和CPU(可以理解为容器container),运行Task—相当于线程池 那么executor可以运行多少个tasks,就看有多少core
相关文档: http://spark.apache.org/docs/2.2.0/cluster-overview.html