[Note Migration][Spark][11] Spark Source Code: Kernel Architecture (4)


Contents

8. TaskScheduler: dispatching Tasks to Executors
9. Executor
10. Task

8. TaskScheduler: dispatching Tasks to Executors

/**
 * Entry point for the TaskScheduler to submit tasks (TaskSchedulerImpl.submitTasks)
 */
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // Create one TaskSetManager per TaskSet; it monitors and manages the execution of that TaskSet's tasks
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    // Put it into the in-memory cache
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  // When SparkContext was initialized and the TaskScheduler was created, the key step was creating the
  // SchedulerBackend; `backend` here is that same object. It is responsible for creating the AppClient
  // and registering the Application with the Master.
  backend.reviveOffers() // underneath: CoarseGrainedSchedulerBackend.reviveOffers { driverEndpoint.send(ReviveOffers) }
}

/**
 * Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of
 * each task, retries tasks if they fail (up to a limited number of times), and
 * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
 * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
 * and statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
 *
 * THREADING: This class is designed to only be called from code with a lock on the
 * TaskScheduler (e.g. its event handlers). It should not be called from other threads.
 *
 * @param sched the TaskSchedulerImpl associated with the TaskSetManager
 * @param taskSet the TaskSet to manage scheduling for
 * @param maxTaskFailures if any particular task fails this number of times, the entire
 *                        task set will be aborted
 */
private[spark] class TaskSetManager(
    sched: TaskSchedulerImpl,
    val taskSet: TaskSet,
    val maxTaskFailures: Int,
    blacklistTracker: Option[BlacklistTracker] = None,
    clock: Clock = new SystemClock()) extends Schedulable with Logging

// CoarseGrainedSchedulerBackend.reviveOffers() -> makeOffers()
// Make fake resource offers on all executors
private def makeOffers() {
  // Make sure no executor is killed while some task is launching on it
  val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
    // Filter out executors under killing
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    val workOffers = activeExecutors.map { case (id, executorData) =>
      new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toIndexedSeq
    // Call TaskSchedulerImpl.resourceOffers() to run the task-allocation algorithm and assign tasks to executors.
    // The argument is every executor available to this Application, each wrapped in a WorkerOffer that
    // represents the number of free CPU cores on that executor.
    scheduler.resourceOffers(workOffers)
  }
  if (!taskDescs.isEmpty) {
    // Once tasks are assigned to executors, launchTasks() sends a LaunchTask message to each target
    // executor, which then starts and runs the task.
    launchTasks(taskDescs)
  }
}

// Launch the tasks on the executors according to the assignment
// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    // Serialize the task information destined for each executor
    val serializedTask = TaskDescription.encode(task)
    if (serializedTask.limit() >= maxRpcMessageSize) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
            "spark.rpc.message.maxSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    } else {
      // Find the target executor
      val executorData = executorDataMap(task.executorId)
      // Subtract the CPU cores the task will occupy
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
        s"${executorData.executorHost}.")
      // Send a LaunchTask message to the executor so it starts the task
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}

/**
 * Called by cluster manager to offer resources on slaves. We respond by asking our active task
 * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
 * that tasks are balanced across the cluster.
 */
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) = new HashSet[String]()
    }
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
  // this here to avoid a separate thread and added synchronization overhead, and also because
  // updating the blacklist is only relevant when task offers are being made.
  blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())

  val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
    offers.filter { offer =>
      !blacklistTracker.isNodeBlacklisted(offer.host) &&
        !blacklistTracker.isExecutorBlacklisted(offer.executorId)
    }
  }.getOrElse(offers)

  // Shuffle the available executors so the load is spread evenly
  val shuffledOffers = shuffleOffers(filteredOffers)
  // Build a list of tasks to assign to each worker.
  // `tasks` is like a 2-D array; each inner ArrayBuffer has a capacity equal to the number of
  // CPU cores available on that executor
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  // Take the sorted TaskSets from rootPool. The rootPool scheduling pool is created by initialize(),
  // which runs after the TaskSchedulerImpl and the SchedulerBackend have been created.
  // Every submitted TaskSet is first put into this pool; when the allocation algorithm runs,
  // the TaskSets are taken out of the pool in sorted order.
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // The core of the allocation algorithm: a nested loop that tries each TaskSet from the best
  // locality level down to the worst.
  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  // PROCESS_LOCAL: the RDD partition and the task are in the same executor process, which is fastest
  // NODE_LOCAL: the RDD partition and the task are in different executor processes but on the same worker node
  // NO_PREF: no locality preference
  // RACK_LOCAL: the RDD partition and the task are on the same rack
  // ANY: any locality level
  for (taskSet <- sortedTaskSets) {
    var launchedAnyTask = false
    var launchedTaskAtCurrentMaxLocality = false
    for (currentMaxLocality <- taskSet.myLocalityLevels) {
      do {
        // For the current TaskSet, first try to launch its tasks on executors at the best (smallest)
        // locality level. If nothing can be launched, break out of this do-while loop and move on to
        // the next (larger) locality level, and so on, until the TaskSet's tasks have all been
        // launched on executors at some locality level.
        launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
          taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
        launchedAnyTask |= launchedTaskAtCurrentMaxLocality
      } while (launchedTaskAtCurrentMaxLocality)
    }
    if (!launchedAnyTask) {
      taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    }
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}

private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]]): Boolean = {
  var launchedTask = false
  // nodes and executors that are blacklisted for the entire application have already been
  // filtered out by this point
  // Iterate over all executors
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    // Only if this executor has at least as many free cores as one task needs (default 1)
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        // TaskSetManager.resourceOffer finds the tasks that can be launched on this executor at this
        // locality level. Roughly, it checks how long the executor has been waiting at this locality
        // level; if the wait time is within the allowed range, the task is considered launchable at
        // this level on this executor.
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          // Record the task in the `tasks` 2-D structure for the chosen executor
          tasks(i) += task
          // Record the assignment in the in-memory caches
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToRunningTaskIds(execId).add(tid)
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}
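The nested loops above are easier to see in isolation. Below is a minimal, self-contained sketch of the same idea, not Spark's API: shuffle the offers, hand out one CPU slot per assigned task, and only widen the locality constraint after a full pass at the tighter level launches nothing. All names (Offer, PendingTask, assign) are hypothetical, and only two "locality levels" (preferred host, then ANY) are modeled.

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

case class Offer(executorId: String, host: String, cores: Int)
case class PendingTask(id: Int, preferredHost: Option[String])

object LocalitySketch {
  val CpusPerTask = 1

  def assign(offers: IndexedSeq[Offer], pending: ArrayBuffer[PendingTask]): Map[String, Seq[Int]] = {
    val shuffled = Random.shuffle(offers)                 // spread load across executors
    val freeCpus = shuffled.map(_.cores).toArray          // mutable CPU budget per offer
    val assigned = shuffled.map(_ => ArrayBuffer[Int]())  // task ids assigned per offer

    // Two passes stand in for Spark's locality levels: first only tasks whose preferred
    // host matches this offer, then any remaining task ("ANY").
    for (hostLocalOnly <- Seq(true, false)) {
      var launchedSomething = true
      while (launchedSomething) {               // keep offering until a full pass launches nothing
        launchedSomething = false
        for (i <- shuffled.indices if freeCpus(i) >= CpusPerTask) {
          val idx = pending.indexWhere(t =>
            !hostLocalOnly || t.preferredHost.contains(shuffled(i).host))
          if (idx >= 0) {
            assigned(i) += pending.remove(idx).id
            freeCpus(i) -= CpusPerTask
            launchedSomething = true
          }
        }
      }
    }
    shuffled.map(_.executorId).zip(assigned.map(_.toSeq)).toMap
  }

  def main(args: Array[String]): Unit = {
    val offers = IndexedSeq(Offer("exec-1", "hostA", 2), Offer("exec-2", "hostB", 2))
    val tasks = ArrayBuffer(
      PendingTask(0, Some("hostA")), PendingTask(1, Some("hostB")),
      PendingTask(2, Some("hostA")), PendingTask(3, None))
    println(assign(offers, tasks))  // Map(exec-1 -> List(0, 2), exec-2 -> List(1, 3))
  }
}

The real resourceOffers() differs in many ways (five locality levels, wait-time based delay scheduling inside TaskSetManager.resourceOffer, fair/FIFO pools), but the shape of the loop is the same.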

9. Executor

   [1] Executor reverse-registration mechanism

override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    // As soon as CoarseGrainedExecutorBackend has started, it sends a RegisterExecutor message to the driver
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    case Success(msg) =>
      // Always receive `true`. Just ignore it
    case Failure(e) =>
      exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
  }(ThreadUtils.sameThread)
}

override def receive: PartialFunction[Any, Unit] = {
  // After the driver has registered the executor, it replies with a RegisteredExecutor message;
  // the CoarseGrainedExecutorBackend then creates the Executor object, which implements most of the functionality
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    try {
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }
  // ... ...
}
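The essential ordering is: ask the driver to register, and only construct the Executor once the driver acknowledges. The toy sketch below mimics that ordering with plain Scala Futures; FakeDriver and the message classes are hypothetical stand-ins for Spark's RpcEnv endpoints, so it illustrates the handshake only, not the real RPC layer.

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

case class RegisterExecutor(executorId: String, hostname: String, cores: Int)
case object RegisteredExecutor

class FakeDriver {
  // The "driver" records the executor and acknowledges the registration.
  def ask(msg: RegisterExecutor)(implicit ec: ExecutionContext): Future[RegisteredExecutor.type] =
    Future {
      println(s"driver: registered ${msg.executorId} on ${msg.hostname} with ${msg.cores} cores")
      RegisteredExecutor
    }
}

object ExecutorBackendSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  def main(args: Array[String]): Unit = {
    val driver = new FakeDriver
    // onStart(): send RegisterExecutor; only "create the Executor" once the driver acks.
    val handshake = driver.ask(RegisterExecutor("exec-1", "worker-1", 4)).andThen {
      case Success(RegisteredExecutor) => println("backend: creating Executor (ready to run tasks)")
      case Failure(e)                  => println(s"backend: cannot register with driver: $e") // real code calls exitExecutor(1, ...)
    }
    Await.ready(handshake, 5.seconds)
  }
}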

   [2] Task launch mechanism

// The driver side (TaskSchedulerImpl via the SchedulerBackend) sends a LaunchTask message to the
// executor to start the tasks that have been assigned to it
case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    // Deserialize the TaskDescription
    val taskDesc = TaskDescription.decode(data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    // Launch the task
    executor.launchTask(this, taskDesc)
  }

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  // TaskRunner extends Runnable
  // Create one TaskRunner thread per task
  val tr = new TaskRunner(context, taskDescription)
  // Put the TaskRunner into the in-memory cache (a ConcurrentHashMap)
  runningTasks.put(taskDescription.taskId, tr)
  // threadPool is created with Executors.newCachedThreadPool; the TaskRunner is submitted to it directly (and queued)
  threadPool.execute(tr)
}
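The pattern here is small enough to isolate: one Runnable per task, tracked in a concurrent map so it can later be killed or inspected, and handed to a cached thread pool. The sketch below reproduces just that pattern with hypothetical names (SketchTaskRunner, runningTasks); it is not the real Executor class.

import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object LaunchTaskSketch {
  private val threadPool = Executors.newCachedThreadPool()                  // same pool type the Executor uses
  private val runningTasks = new ConcurrentHashMap[Long, SketchTaskRunner]  // taskId -> runner

  class SketchTaskRunner(taskId: Long, body: () => Unit) extends Runnable {
    override def run(): Unit =
      try body()                           // the real TaskRunner deserializes and runs the task here
      finally runningTasks.remove(taskId)  // always drop the bookkeeping entry when done
  }

  def launchTask(taskId: Long, body: () => Unit): Unit = {
    val tr = new SketchTaskRunner(taskId, body)
    runningTasks.put(taskId, tr)   // cache the runner so it can be killed / inspected later
    threadPool.execute(tr)         // the pool runs it as soon as a thread is free
  }

  def main(args: Array[String]): Unit = {
    (1L to 3L).foreach(id => launchTask(id, () => println(s"task $id running on ${Thread.currentThread.getName}")))
    threadPool.shutdown()
    threadPool.awaitTermination(5, TimeUnit.SECONDS)
  }
}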

10. Task

Entry point of Task execution (TaskRunner.run)

class TaskRunner(
    execBackend: ExecutorBackend,
    private val taskDescription: TaskDescription)
  extends Runnable {

  // ... ...

  override def run(): Unit = {
    threadId = Thread.currentThread.getId
    Thread.currentThread.setName(threadName)
    val threadMXBean = ManagementFactory.getThreadMXBean
    val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    Thread.currentThread.setContextClassLoader(replClassLoader)
    val ser = env.closureSerializer.newInstance()
    logInfo(s"Running $taskName (TID $taskId)")
    execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
    var taskStart: Long = 0
    var taskStartCpu: Long = 0
    startGCTime = computeTotalGcTime()

    try {
      // Must be set before updateDependencies() is called, in case fetching dependencies
      // requires access to properties contained within (e.g. for access control).
      // Make the task's properties available for deserialization
      Executor.taskDeserializationProps.set(taskDescription.properties)

      // Pull the resources the task needs (files, jars, ...) over the network
      updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)
      // Now perform the real deserialization of the task itself
      task = ser.deserialize[Task[Any]](
        taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
      task.localProperties = taskDescription.properties
      task.setTaskMemoryManager(taskMemoryManager)

      // If this task has been killed before we deserialized it, let's quit now. Otherwise,
      // continue executing the task.
      val killReason = reasonIfKilled
      if (killReason.isDefined) {
        // Throw an exception rather than returning, because returning within a try{} block
        // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
        // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
        // for the task.
        throw new TaskKilledException(killReason.get)
      }

      // The purpose of updating the epoch here is to invalidate executor map output status cache
      // in case FetchFailures have occurred. In local mode `env.mapOutputTracker` will be
      // MapOutputTrackerMaster and its cache invalidation is not based on epoch numbers so
      // we don't need to make any special calls here.
      if (!isLocal) {
        logDebug("Task " + taskId + "'s epoch is " + task.epoch)
        env.mapOutputTracker.asInstanceOf[MapOutputTrackerWorker].updateEpoch(task.epoch)
      }

      // Run the actual task and measure its runtime.
      taskStart = System.currentTimeMillis()
      taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime
      } else 0L
      var threwException = true
      // KEY: the most important part, calling Task.run().
      // For a ShuffleMapTask, `value` is a MapStatus, which records where the data computed by this
      // ShuffleMapTask was written. If the next stage is another ShuffleMapTask, it will contact the
      // MapOutputTracker to obtain the output locations of the previous ShuffleMapTask and pull the
      // data over the network; a ResultTask works the same way.
      val value = try {
        val res = task.run(
          taskAttemptId = taskId,
          attemptNumber = taskDescription.attemptNumber,
          metricsSystem = env.metricsSystem)
        threwException = false
        res
      } finally {
        val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
        val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()

        if (freedMemory > 0 && !threwException) {
          val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
          if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
            throw new SparkException(errMsg)
          } else {
            logWarning(errMsg)
          }
        }

        if (releasedLocks.nonEmpty && !threwException) {
          val errMsg = s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
            releasedLocks.mkString("[", ", ", "]")
          if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false)) {
            throw new SparkException(errMsg)
          } else {
            logInfo(errMsg)
          }
        }
      }
      task.context.fetchFailed.foreach { fetchFailure =>
        // uh-oh. it appears the user code has caught the fetch-failure without throwing any
        // other exceptions. Its *possible* this is what the user meant to do (though highly
        // unlikely). So we will log an error and keep going.
        logError(s"TID ${taskId} completed successfully though internally it encountered " +
          s"unrecoverable fetch failures! Most likely this means user code is incorrectly " +
          s"swallowing Spark's internal ${classOf[FetchFailedException]}", fetchFailure)
      }
      val taskFinish = System.currentTimeMillis()
      val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime
      } else 0L

      // If the task has been killed, let's fail it.
      task.context.killTaskIfInterrupted()

      // Serialize and wrap the result (the MapStatus for a ShuffleMapTask) so it can be sent to the driver
      val resultSer = env.serializer.newInstance()
      val beforeSerialization = System.currentTimeMillis()
      val valueBytes = resultSer.serialize(value)
      val afterSerialization = System.currentTimeMillis()

      // Deserialization happens in two parts: first, we deserialize a Task object, which
      // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
      // Compute the task's metrics (shown on the Spark UI at port 4040): ExecutorDeserializeTime /
      // ExecutorDeserializeCpuTime / ExecutorRunTime / ExecutorCpuTime / JvmGCTime / ResultSerializationTime
      task.metrics.setExecutorDeserializeTime(
        (taskStart - deserializeStartTime) + task.executorDeserializeTime)
      task.metrics.setExecutorDeserializeCpuTime(
        (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
      // We need to subtract Task.run()'s deserialization time to avoid double-counting
      task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
      task.metrics.setExecutorCpuTime(
        (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
      task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
      task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)

      // Expose task metrics using the Dropwizard metrics system.
      // Update task metrics counters
      executorSource.METRIC_CPU_TIME.inc(task.metrics.executorCpuTime)
      executorSource.METRIC_RUN_TIME.inc(task.metrics.executorRunTime)
      executorSource.METRIC_JVM_GC_TIME.inc(task.metrics.jvmGCTime)
      executorSource.METRIC_DESERIALIZE_TIME.inc(task.metrics.executorDeserializeTime)
      executorSource.METRIC_DESERIALIZE_CPU_TIME.inc(task.metrics.executorDeserializeCpuTime)
      executorSource.METRIC_RESULT_SERIALIZE_TIME.inc(task.metrics.resultSerializationTime)
      executorSource.METRIC_SHUFFLE_FETCH_WAIT_TIME
        .inc(task.metrics.shuffleReadMetrics.fetchWaitTime)
      executorSource.METRIC_SHUFFLE_WRITE_TIME.inc(task.metrics.shuffleWriteMetrics.writeTime)
      executorSource.METRIC_SHUFFLE_TOTAL_BYTES_READ
        .inc(task.metrics.shuffleReadMetrics.totalBytesRead)
      executorSource.METRIC_SHUFFLE_REMOTE_BYTES_READ
        .inc(task.metrics.shuffleReadMetrics.remoteBytesRead)
      executorSource.METRIC_SHUFFLE_REMOTE_BYTES_READ_TO_DISK
        .inc(task.metrics.shuffleReadMetrics.remoteBytesReadToDisk)
      executorSource.METRIC_SHUFFLE_LOCAL_BYTES_READ
        .inc(task.metrics.shuffleReadMetrics.localBytesRead)
      executorSource.METRIC_SHUFFLE_RECORDS_READ
        .inc(task.metrics.shuffleReadMetrics.recordsRead)
      executorSource.METRIC_SHUFFLE_REMOTE_BLOCKS_FETCHED
        .inc(task.metrics.shuffleReadMetrics.remoteBlocksFetched)
      executorSource.METRIC_SHUFFLE_LOCAL_BLOCKS_FETCHED
        .inc(task.metrics.shuffleReadMetrics.localBlocksFetched)
      executorSource.METRIC_SHUFFLE_BYTES_WRITTEN
        .inc(task.metrics.shuffleWriteMetrics.bytesWritten)
      executorSource.METRIC_SHUFFLE_RECORDS_WRITTEN
        .inc(task.metrics.shuffleWriteMetrics.recordsWritten)
      executorSource.METRIC_INPUT_BYTES_READ
        .inc(task.metrics.inputMetrics.bytesRead)
      executorSource.METRIC_INPUT_RECORDS_READ
        .inc(task.metrics.inputMetrics.recordsRead)
      executorSource.METRIC_OUTPUT_BYTES_WRITTEN
        .inc(task.metrics.outputMetrics.bytesWritten)
      executorSource.METRIC_OUTPUT_RECORDS_WRITTEN
        .inc(task.metrics.inputMetrics.recordsRead)
      executorSource.METRIC_RESULT_SIZE.inc(task.metrics.resultSize)
      executorSource.METRIC_DISK_BYTES_SPILLED.inc(task.metrics.diskBytesSpilled)
      executorSource.METRIC_MEMORY_BYTES_SPILLED.inc(task.metrics.memoryBytesSpilled)

      // Note: accumulator updates must be collected after TaskMetrics is updated
      val accumUpdates = task.collectAccumulatorUpdates()
      // TODO: do not serialize value twice
      val directResult = new DirectTaskResult(valueBytes, accumUpdates)
      val serializedDirectResult = ser.serialize(directResult)
      val resultSize = serializedDirectResult.limit()

      // directSend = sending directly back to the driver
      val serializedResult: ByteBuffer = {
        if (maxResultSize > 0 && resultSize > maxResultSize) {
          logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
            s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
            s"dropping it.")
          ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
        } else if (resultSize > maxDirectResultSize) {
          val blockId = TaskResultBlockId(taskId)
          env.blockManager.putBytes(
            blockId,
            new ChunkedByteBuffer(serializedDirectResult.duplicate()),
            StorageLevel.MEMORY_AND_DISK_SER)
          logInfo(
            s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
          ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
        } else {
          logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
          serializedDirectResult
        }
      }

      setTaskFinishedAndClearInterruptStatus()
      // KEY: call statusUpdate on the CoarseGrainedExecutorBackend hosting this Executor to send back the result (e.g. the MapStatus)
      execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
    } catch {
      case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) =>
        val reason = task.context.fetchFailed.get.toTaskFailedReason
        if (!t.isInstanceOf[FetchFailedException]) {
          // there was a fetch failure in the task, but some user code wrapped that exception
          // and threw something else. Regardless, we treat it as a fetch failure.
          val fetchFailedCls = classOf[FetchFailedException].getName
          logWarning(s"TID ${taskId} encountered a ${fetchFailedCls} and " +
            s"failed, but the ${fetchFailedCls} was hidden by another " +
            s"exception. Spark is handling this like a fetch failure and ignoring the " +
            s"other exception: $t")
        }
        setTaskFinishedAndClearInterruptStatus()
        execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

      case t: TaskKilledException =>
        logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}")
        setTaskFinishedAndClearInterruptStatus()
        execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason)))

      case _: InterruptedException | NonFatal(_) if
          task != null && task.reasonIfKilled.isDefined =>
        val killReason = task.reasonIfKilled.getOrElse("unknown reason")
        logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
        setTaskFinishedAndClearInterruptStatus()
        execBackend.statusUpdate(
          taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason)))

      case CausedBy(cDE: CommitDeniedException) =>
        val reason = cDE.toTaskCommitDeniedReason
        setTaskFinishedAndClearInterruptStatus()
        execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(reason))

      case t: Throwable =>
        // Attempt to exit cleanly by informing the driver of our failure.
        // If anything goes wrong (or this was a fatal exception), we will delegate to
        // the default uncaught exception handler, which will terminate the Executor.
        logError(s"Exception in $taskName (TID $taskId)", t)

        // SPARK-20904: Do not report failure to driver if if happened during shut down. Because
        // libraries may set up shutdown hooks that race with running tasks during shutdown,
        // spurious failures may occur and can result in improper accounting in the driver (e.g.
        // the task failure would not be ignored if the shutdown happened because of premption,
        // instead of an app issue).
        if (!ShutdownHookManager.inShutdown()) {
          // Collect latest accumulator values to report back to the driver
          val accums: Seq[AccumulatorV2[_, _]] =
            if (task != null) {
              task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
              task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
              task.collectAccumulatorUpdates(taskFailed = true)
            } else {
              Seq.empty
            }

          val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))

          val serializedTaskEndReason = {
            try {
              ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
            } catch {
              case _: NotSerializableException =>
                // t is not serializable so just send the stacktrace
                ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
            }
          }
          setTaskFinishedAndClearInterruptStatus()
          execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
        } else {
          logInfo("Not reporting error to driver during JVM shutdown.")
        }

        // Don't forcibly exit unless the exception was inherently fatal, to avoid
        // stopping other tasks unnecessarily.
        if (!t.isInstanceOf[SparkOutOfMemoryError] && Utils.isFatalError(t)) {
          uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t)
        }
    } finally {
      runningTasks.remove(taskId)
    }
  }

  /**
   * Download any missing dependencies if we receive a new set of files and JARs from the
   * SparkContext. Also adds any new JARs we fetched to the class loader.
   */
  private def updateDependencies(newFiles: Map[String, Long], newJars: Map[String, Long]) {
    // Get the Hadoop configuration
    lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
    // A Java synchronized block protects shared state such as currentFiles, because tasks actually
    // run as Java threads inside a single CoarseGrainedExecutorBackend process
    synchronized {
      // Fetch missing dependencies
      // Iterate over the files to pull
      for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
        logInfo("Fetching " + name + " with timestamp " + timestamp)
        // Fetch file with useCache mode, close cache for local mode.
        // Pull the file from the remote host over the network
        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
          env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
        currentFiles(name) = timestamp
      }
      // Iterate over the jars to pull
      for ((name, timestamp) <- newJars) {
        val localName = new URI(name).getPath.split("/").last
        val currentTimeStamp = currentJars.get(name)
          .orElse(currentJars.get(localName))
          .getOrElse(-1L)
        if (currentTimeStamp < timestamp) {
          logInfo("Fetching " + name + " with timestamp " + timestamp)
          // Fetch file with useCache mode, close cache for local mode.
          Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
            env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
          currentJars(name) = timestamp
          // Add it to our class loader
          val url = new File(SparkFiles.getRootDirectory(), localName).toURI.toURL
          if (!urlClassLoader.getURLs().contains(url)) {
            logInfo("Adding " + url + " to class loader")
            urlClassLoader.addURL(url)
          }
        }
      }
    }
  }

  // ... ...
}

/**
 * Called by [[org.apache.spark.executor.Executor]] to run this task.
 *
 * @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
 * @param attemptNumber how many times this task has been attempted (0 for the first attempt)
 * @return the result of the task along with updates of Accumulators.
 */
final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem): T = {
  SparkEnv.get.blockManager.registerTask(taskAttemptId)
  // Create the execution context (TaskContext)
  context = new TaskContextImpl(
    stageId,
    stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal
    partitionId,
    taskAttemptId,
    attemptNumber,
    taskMemoryManager,
    localProperties,
    metricsSystem,
    metrics)
  TaskContext.setTaskContext(context)
  taskThread = Thread.currentThread()

  if (_reasonIfKilled != null) {
    kill(interruptThread = false, _reasonIfKilled)
  }

  new CallerContext(
    "TASK",
    SparkEnv.get.conf.get(APP_CALLER_CONTEXT),
    appId,
    appAttemptId,
    jobId,
    Option(stageId),
    Option(stageAttemptId),
    Option(taskAttemptId),
    Option(attemptNumber)).setCurrentContext()

  try {
    // KEY: call the abstract method runTask, implemented by the subclasses ShuffleMapTask / ResultTask
    runTask(context)
  } catch {
    case e: Throwable =>
      // Catch all errors; run task failure callbacks, and rethrow the exception.
      try {
        context.markTaskFailed(e)
      } catch {
        case t: Throwable =>
          e.addSuppressed(t)
      }
      context.markTaskCompleted(Some(e))
      throw e
  } finally {
    try {
      // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
      // one is no-op.
      context.markTaskCompleted(None)
    } finally {
      try {
        Utils.tryLogNonFatalError {
          // Release memory used by this thread for unrolling blocks
          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(
            MemoryMode.OFF_HEAP)
          // Notify any tasks waiting for execution memory to be freed to wake up and try to
          // acquire memory again. This makes impossible the scenario where a task sleeps forever
          // because there are no other tasks left to notify it. Since this is safe to do but may
          // not be strictly necessary, we should revisit whether we can remove this in the
          // future.
          val memoryManager = SparkEnv.get.memoryManager
          memoryManager.synchronized { memoryManager.notifyAll() }
        }
      } finally {
        // Though we unset the ThreadLocal here, the context member variable itself is still
        // queried directly in the TaskRunner to check for FetchFailedExceptions.
        TaskContext.unset()
      }
    }
  }
}

/**
 * A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner
 * specified in the ShuffleDependency. Default is HashPartitioner).
 *
 * See [[org.apache.spark.scheduler.Task]] for more information.
 *
 * @param stageId id of the stage this task belongs to
 * @param stageAttemptId attempt id of the stage this task belongs to
 * @param taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized,
 *                   the type should be (RDD[_], ShuffleDependency[_, _, _]).
 * @param partition partition of the RDD this task is associated with
 * @param locs preferred task execution locations for locality scheduling
 * @param localProperties copy of thread-local properties set by the user on the driver side.
 * @param serializedTaskMetrics a `TaskMetrics` that is created and serialized on the driver side
 *                              and sent to executor side.
 *
 * The parameters below are optional:
 * @param jobId id of the job this task belongs to
 * @param appId id of the app this task belongs to
 * @param appAttemptId attempt id of the app this task belongs to
 */
private[spark] class ShuffleMapTask(
    stageId: Int,
    stageAttemptId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    @transient private var locs: Seq[TaskLocation],
    localProperties: Properties,
    serializedTaskMetrics: Array[Byte],
    jobId: Option[Int] = None,
    appId: Option[String] = None,
    appAttemptId: Option[String] = None)
  extends Task[MapStatus](stageId, stageAttemptId, partition.index, localProperties,
    serializedTaskMetrics, jobId, appId, appAttemptId)
  with Logging {

  // ......

  override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    // Many tasks run in parallel across multiple executors, often on different nodes, but all the
    // tasks of one stage operate on the same RDD, so each task reads the part of the RDD it has to
    // process via the broadcast variable.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    var writer: ShuffleWriter[Any, Any] = null
    try {
      // Get a ShuffleWriter from the ShuffleManager
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      // KEY: pass in the partition this task has to process; the core logic lives in RDD.iterator(),
      // which runs the user-defined operators against that partition.
      // The returned records go through the ShuffleWriter and, after being partitioned (e.g. by
      // HashPartitioner), are written to the corresponding bucket.
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      // Finally, return a MapStatus, which records where in the BlockManager the data computed by
      // this ShuffleMapTask is stored.
      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }

  // ......
}

// ResultTask
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTime = System.currentTimeMillis()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  // Basic deserialization
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L

  // Run the user-defined operators via the RDD's iterator
  func(context, rdd.iterator(partition, context))
}

// RDD
/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ''not'' be called by users directly, but is available for implementors of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

/**
 * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
 */
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    // Again an abstract method, implemented by e.g. MapPartitionsRDD
    compute(split, context)
  }
}

// MapPartitionsRDD
/**
 * `f` can be thought of as the user-defined operator function (Spark's internal wrapper adds some
 * extra logic around it). By this point the computation is simply applied to the RDD's partition,
 * returning an iterator over the new RDD's partition.
 */
override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))

// CoarseGrainedExecutorBackend
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  val msg = StatusUpdate(executorId, taskId, state, data)
  driver match {
    case Some(driverRef) => driverRef.send(msg)
    case None => logWarning(s"Drop $msg because has not yet connected to driver")
  }
}

// CoarseGrainedSchedulerBackend
case StatusUpdate(executorId, taskId, state, data) =>
  scheduler.statusUpdate(taskId, state, data.value)
  if (TaskState.isFinished(state)) {
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        executorInfo.freeCores += scheduler.CPUS_PER_TASK
        makeOffers(executorId)
      case None =>
        // Ignoring the update since we don't know about the executor.
        logWarning(s"Ignored task status update ($taskId state $state) " +
          s"from unknown executor with ID $executorId")
    }
  }

// TaskSchedulerImpl
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  var failedExecutor: Option[String] = None
  var reason: Option[ExecutorLossReason] = None
  synchronized {
    try {
      taskIdToTaskSetManager.get(tid) match {
        case Some(taskSet) =>
          // When writing Spark applications you will often see "Task lost": tasks fail for all kinds of reasons
          if (state == TaskState.LOST) {
            // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
            // where each executor corresponds to a single task, so mark the executor as failed.
            val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
              "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
            if (executorIdToRunningTaskIds.contains(execId)) {
              reason = Some(
                SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
              removeExecutor(execId, reason.get)
              failedExecutor = Some(execId)
            }
          }
          // If the task has finished, remove it from the in-memory caches
          if (TaskState.isFinished(state)) {
            cleanupTaskState(tid)
            taskSet.removeRunningTask(tid)
            // Handle success or failure accordingly
            if (state == TaskState.FINISHED) {
              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
            }
          }
        case None =>
          logError(
            ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
              "likely the result of receiving duplicate task finished status updates) or its " +
              "executor has been marked as failed.")
              .format(state, tid))
      }
    } catch {
      case e: Exception => logError("Exception in statusUpdate", e)
    }
  }
  // Update the DAGScheduler without holding a lock on this, since that can deadlock
  if (failedExecutor.isDefined) {
    assert(reason.isDefined)
    dagScheduler.executorLost(failedExecutor.get, reason.get)
    backend.reviveOffers()
  }
}
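One detail worth pulling out of TaskRunner.run() above is how the serialized result travels back to the driver: results larger than the driver's maxResultSize are dropped (only a stub is reported), medium-sized results go through the BlockManager and are fetched indirectly, and small ones ride inside the StatusUpdate message itself. The sketch below isolates just that three-way decision; the threshold values and the ResultRoute type are made up for illustration, and only the config names mirror spark.driver.maxResultSize / spark.task.maxDirectResultSize.

object ResultRouteSketch {
  sealed trait ResultRoute
  case object Dropped         extends ResultRoute // driver only receives an IndirectTaskResult stub
  case object ViaBlockManager extends ResultRoute // driver fetches the bytes from the executor's BlockManager
  case object DirectToDriver  extends ResultRoute // bytes travel inside the StatusUpdate message

  def route(resultSize: Long, maxResultSize: Long, maxDirectResultSize: Long): ResultRoute =
    if (maxResultSize > 0 && resultSize > maxResultSize) Dropped
    else if (resultSize > maxDirectResultSize) ViaBlockManager
    else DirectToDriver

  def main(args: Array[String]): Unit = {
    val maxResultSize = 1L << 30        // pretend spark.driver.maxResultSize is 1g
    val maxDirectResultSize = 1L << 20  // pretend the direct-result limit is 1m
    Seq(4096L, 16L << 20, 2L << 30).foreach { size =>
      println(s"$size bytes -> ${route(size, maxResultSize, maxDirectResultSize)}")
    }
  }
}

Whatever route is chosen, the executor always reports completion through CoarseGrainedExecutorBackend.statusUpdate, which is exactly the StatusUpdate / TaskSchedulerImpl.statusUpdate path shown at the end of the code above.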
