Spark---------------------spark框架的知识点及使用

it2022-05-08  8

1.spark框架是如何处理数据的

1.hadoop MapReduce 框架 并行计算的思想、分而治之的思想

2. scala集合高阶函数 处理数据的思想: 将要分析的数据放到集合中去,然后调用集合的高阶函数去处理数据

2.spark是什么

Apache Spark™ is a unified analytics engine for large-scale data processing. 统一分析引擎为海量数据处理 统一:什么样的数据都能处理分析,什么类型的数据都可以处理,实时,离线,流式都可以。

MapReduce map,reduce函数 将分析的结果放在磁盘中,并且后续进行数据分析的时候,再从磁盘读取,以此往返。。。

Runs Everywhere

spark要处理的数据在哪里 hdfs/hive/HBASE/ORC/parquet/csv/tsv/jdbc/redis/mangdb/ES/。。 spark 能从哪里读数据,就能往哪里写数据

spark程序运行在哪里 本地模式 集群模式

yarn ,standlone, mesos cluster

3.spark分析数据的过程(3步骤)

第一步.数据的读取 将要处理的数据封装到一个RDD中(类别scala中list列表) val inputRDD = sc.textFile("…")

第二步.数据处理(分析)

调用集合RDD中函数(高阶函数)进行处理分析

RDD->函数->另外一个RDD :tranformation函数

val outputRDD = inputRDD.#(基本就是list中函数)

比如:map/flatMap/filter/reduceBy/Key

第三步:数据的输出(输出)

调用RDD函数,返回的不是RDD类型

outputRDD.#此类型函数被称为action函数 foreach/count/sum/first

三步放在一起就是一个链式编程: sc.textFile(…).transaction().action

4.spark版本与依赖

目前用的最多的版本 2.2.0 1.6.1 http://spark.apache.org/docs/2.2.0/spark安装的依赖 JDK8 scala spark

5.spark安装

(1)上传解压 (2)修改配置 mv spark-env.sh.template spark-env.sh 修改spark-env.sh (3)启动 帮助命令: bin/spark-shell –help 启动交互式命令 bin/spark-shell --master local[2] (4)是否启动成功 不报错,进入交互式命令行 webui上查看 4040

6.spark appliction与MapReduce application的区别

spark-shell –master local[2] 运行一个程序在本地模式 [2] ->表示的 线程thread 每个task运行在一个线程Thread里面,都需要1core CPU

对于MapReduce application来讲 一个MapReduce application就是一个Mapreduce job 每个task(map task还是reduce task)运行在进程中(JVM process)

7.权限问题

注意:修改了配置文件,需要重启进程才生效

8.SparkSession

在spark 2.x中不推荐使用sparkContext对象读取数据,而是推荐SparkSession Session是对context的进一步封装,sparkSession中包含SparkContext 读数据: DataFrameReader =spark.read

DataFrameWriter

9.RDD

RDD是什么

就是一个集合 在使用的时候,就当做为Scala集合类中List列表

实质 分布式 存储数据 集合 abstract class RDD[T: ClassTag] A Resilient(弹性) Distributed(分布式) Dataset (RDD) Represents(代表) an immutable(不可变), partitioned(分区) collection of elements that can be operated(处理) on in parallel(并行) 将数据划分为很多分区(partitioned),不同分区的数据存在在不同的机器里面,每个分区的数据可以被一个Task进程处理分析

This class contains the basic operations available on all RDDs, such as map, filter, and persist 所有的RDD都一些基本的操作,比如map,filter,persist PairRDDFunctions contains operations available only on RDDs of key-value pairs, such as groupByKey and join; RDD中的类型是键值对(二元组),RDD隐式转换PairRDDFunctions,PairRDDFunctions方法我们就可以用,比如有groupByKey,reduceByKey,join

Internally, each RDD is characterized by five main properties: A list of partitions 分区列表 A function for computing each split 每一个分片可以被一个函数处理(并行) A list of dependencies on other RDDs 一个RDD依赖于其他RDD(RDD怎么来) 查看依赖 println(wordCountRDD.toDebugString) Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) 可选,如果RDD的值是二元组,你可以指定分区 Optionally, a list of preferred(最优) locations to compute each split on (e.g. block locations for an HDFS file) 可选,最优的路径中读取分片数据

RDD创建方式

RDD中函数的分类

转换函数Tansformation

一个RDD调用函数之后 转换为另外一个RDD map,filter,reducBykey

action函数 一个RDD调用函数之后,不会转换为一个RDD,触发一个Job的执行 比如 count ->Long first -> 第一条数据 take ->获取前几条数据 top->获取前几条数据(排序) foreach ->打印所有的数据 saveAsTextFile ->保存数据到文件

持久化persisitent def persist(newLevel: StorageLevel) def unpersist(blocking: Boolean = true)

class StorageLevel private( private var _useDisk: Boolean, //磁盘 private var _useMemory: Boolean, //内存 //JVM内存中tachyon(基于内存的分布式文件系统) private var _useOffHeap: Boolean, / private var _deserialized: Boolean, 反序列化 private var _replication: Int = 1) //副本个数 extends Externalizable 什么情况下,RDD数据需要进行持久化呢??? (1)某个RDD数据,被多次使用 重复RDD (2)某个RDD数据来自不易,使用超过1次 经过复杂的处理得到RDD (3)通常选择的策略 MEMORY_ONLY_2 -如果内存足够 MEMORY_AND_DISK_SER_2 -如果内存不够,先内存后磁盘

10.spark 框架的优势

数据结构RDD,用于存储管理数据

DAG调度 spark中每个job的调度都是DAG调度 DAG:有向无环图 (0)构建DAG图, 倒推法,配合依赖 (1)DAG图划分为多个stage,RDD直接产生了shuffle过程,就会划分stage (2)按照顺序执行stage中task任务,每个stage中可有多个Task

11.Spark性能优化:RDD方法优化

对于RDD中某些函数使用注意

(1)能不使用groupByKey函数就不使用,除非不得已

redcueByKey(combiner) = groupBy+ map(变量值相加) redcueByKey可以先进行本地聚合操作

(2)尽量使用XXPartition函数代替XX函数

xx:map/foreach/zip def foreach(f: T => Unit): Unit f:针对RDD中每个元素进行的操作处理的 def foreachPartition(f: Iterator[T] => Unit): Unit f:针对RDD中每个分区的元素进行操作处理的 比如RDD中2个分区,100条数据,现将数据保存到MYSQL表中 foreach item ->mysql connection ->创建100次 foreachPartition 对每个分区中数据 只要获取2个连接即可

(3)适当的降低或者增加RDD分区数目

RDD的分区对应一个Task处理数据

def repartition(numPartitions: Int) -产生shuffle def coalesce(numPartitions: Int, shuffle: Boolean = false)

一开始的时候,数据量比较多,可以加到RDD分分区数,增加并行度(在集群资源充足的情况下)当数据预处理之后(尤其过滤清洗之后)。RDD中数据量减少了很多,此时可以考虑减少分区的数目

12.spark standalone

介绍 spark框架自身带的 分布式集群资源管理和任务调度框架,类似于Hadoop Yarn框架

Standalone ------------------>Yarn Master ------------------------>ResourceManager workers------------------------>NodeManager

唯独有一点不同的是: 一台机器只能运行一个Nodemanager,但是spark standalone在一台机器上可以同时运行多个Workers

安装部署(伪分布式)

conf/spark-env.sh

SPARK_MASTER_HOST=bigdata-hpsk01.huadian.com SPARK_MASTER_PORT=7077 SPARK_MASTER_WEBUI_PORT=8080 SPARK_WORKER_CORES=2 SPARK_WORKER_MEMORY=2g SPARK_WORKER_PORT=7078 SPARK_WORKER_WEBUI_PORT=8081

mv slaves.template slaves

配置worker节点运行的主机,一行一个 bigdata-hpsk01.huadian.com

启动服务

启动主节点 sbin/start-master.sh启动从节点 sbin/start-slaves.sh 注意:配置免密钥登录 ssh-keygen -t rsa ssh-copy-id huadian@bigdata-hpsk01.huadian.com

测试 可以通过webui页面查看 8080/8081

13.spark application运行的过程

回顾MapReduce程序运行在Yarn集群组成

-1.Application Master 应用的管理者,负责资源的申请,task运行监控调度 -2.Task的运行 MapTask和ReduceTask,都是运行在Nodemanager上Container容器中。

spark application运行在集群下,包含2部分(重点)

1.Driver Program -JVM 进程 相当于AppMaster,应用的管理者,主要是调度job的执行 就是程序Main方法,必须创建sparkContext实例对象 端口号4040,提供应用的监控

2.Executor s 每一个Executor是一个JVM进程,里面包含内存和CPU(可以理解为容器container),运行Task—相当于线程池 那么executor可以运行多少个tasks,就看有多少core

相关文档: http://spark.apache.org/docs/2.2.0/cluster-overview.html


最新回复(0)