DAGScheduler: Submitting Tasks

Submitting Tasks

Once a stage has been submitted for execution, DAGScheduler's submitMissingTasks method splits it into as many tasks as the stage has partitions to compute. These tasks are grouped into a TaskSet and handed to the TaskScheduler for processing.

A ResultStage produces ResultTasks, while a ShuffleMapStage produces ShuffleMapTasks. Each TaskSet contains all tasks of the corresponding stage; the tasks share exactly the same processing logic and differ only in the data they process, namely their respective partitions. DAGScheduler's submitMissingTasks method is shown below; it is called when the stage has no unavailable parent stages and submits the stage's not-yet-finished tasks:

  private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")

    // First figure out the indexes of partition ids to compute.
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

    // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
    // with this Stage
    val properties = jobIdToActiveJob(jobId).properties

    runningStages += stage
    // SparkListenerStageSubmitted should be posted before testing whether tasks are
    // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
    // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
    // event.
    stage match {
      case s: ShuffleMapStage =>
        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
      case s: ResultStage =>
        outputCommitCoordinator.stageStart(
          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
    }
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

    // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
    // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
    // the serialized copy of the RDD and for each task we will deserialize it, which means each
    // task gets a different copy of the RDD. This provides stronger isolation between tasks that
    // might modify state of objects referenced in their closures. This is necessary in Hadoop
    // where the JobConf/Configuration object is not thread-safe.
    var taskBinary: Broadcast[Array[Byte]] = null
    try {
      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
      // For ResultTask, serialize and broadcast (rdd, func).
      val taskBinaryBytes: Array[Byte] = stage match {
        case stage: ShuffleMapStage =>
          JavaUtils.bufferToArray(
            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
        case stage: ResultStage =>
          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
      }

      taskBinary = sc.broadcast(taskBinaryBytes)
    } catch {
      // In the case of a failure during serialization, abort the stage.
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString, Some(e))
        runningStages -= stage

        // Abort execution
        return
      case NonFatal(e) =>
        abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId)
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    if (tasks.size > 0) {
      logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
        s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)

      val debugString = stage match {
        case stage: ShuffleMapStage =>
          s"Stage ${stage} is actually done; " +
            s"(available: ${stage.isAvailable}," +
            s"available outputs: ${stage.numAvailableOutputs}," +
            s"partitions: ${stage.numPartitions})"
        case stage : ResultStage =>
          s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
      }
      logDebug(debugString)

      submitWaitingChildStages(stage)
    }
  }

The execution of submitMissingTasks can be summarized as follows:

  • Clear the current Stage's pendingPartitions (for a ShuffleMapStage). Since the Stage's tasks are only just starting to run, the set is cleared so it can record the partitions that still need to be computed.
  • Call the Stage's findMissingPartitions method to find the indices of all partitions of the current Stage whose computation has not yet finished.
  • Fetch the ActiveJob's properties, which carry the current Job's scheduling pool, job group, description, and other attributes.
  • Add the current Stage to the runningStages set, marking it as running.
  • Call outputCommitCoordinator's stageStart method to start coordinating the commit of the current Stage's output to HDFS.
  • Call DAGScheduler's getPreferredLocs method to obtain the preferred locations of every partition in partitionsToCompute. If any exception is thrown, call the Stage's makeNewStageAttempt method to start a new stage attempt, post a SparkListenerStageSubmitted event to the listenerBus, abort the Stage, and return.
  • Call the Stage's makeNewStageAttempt method to start a new stage attempt and post a SparkListenerStageSubmitted event to the listenerBus.
  • If the current Stage is a ShuffleMapStage, serialize the Stage's rdd together with its ShuffleDependency; if it is a ResultStage, serialize the Stage's rdd together with the func that computes each RDD partition.
  • Call SparkContext's broadcast method to broadcast the serialized bytes produced in the previous step.
  • If the current Stage is a ShuffleMapStage, create one ShuffleMapTask for each of its partitions; if it is a ResultStage, create one ResultTask for each of its partitions (see the sketch after this list).
  • If at least one Task was created in the previous step, record the partition indices handled by these Tasks in the Stage's pendingPartitions, wrap the Tasks in a TaskSet, and submit it with TaskScheduler's submitTasks method.
  • If no Task was created, the current Stage has no Tasks to submit, so call DAGScheduler's markStageAsFinished method to mark the Stage as finished and then call submitWaitingChildStages to submit the Stage's child Stages.
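
As a minimal, hedged illustration of the two task types (assuming an existing SparkContext named sc): a job with a shuffle boundary is split into exactly the two kinds of stages, and therefore tasks, described above.

  // Minimal sketch, assuming an existing SparkContext `sc`.
  // reduceByKey introduces a ShuffleDependency, so this job is split into a
  // ShuffleMapStage (its partitions run as ShuffleMapTasks writing shuffle output)
  // followed by a ResultStage (its partitions run as ResultTasks producing the result).
  val pairs  = sc.parallelize(1 to 100, numSlices = 4).map(i => (i % 10, 1))
  val counts = pairs.reduceByKey(_ + _)   // shuffle boundary: upstream ShuffleMapStage
  counts.collect()                        // action: downstream ResultStage, one ResultTask per partition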

The markStageAsFinished method:

  private def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = {
    val serviceTime = stage.latestInfo.submissionTime match {
      case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
      case _ => "Unknown"
    }
    if (errorMessage.isEmpty) {
      logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
      stage.latestInfo.completionTime = Some(clock.getTimeMillis())

      // Clear failure count for this stage, now that it's succeeded.
      // We only limit consecutive failures of stage attempts, so that if a stage is
      // re-used many times in a long-running job, unrelated failures don't eventually cause the
      // stage to be aborted.
      stage.clearFailures()
    } else {
      stage.latestInfo.stageFailed(errorMessage.get)
      logInfo(s"$stage (${stage.name}) failed in $serviceTime s due to ${errorMessage.get}")
    }

    outputCommitCoordinator.stageEnd(stage.id)
    listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
    runningStages -= stage
  }

The execution of markStageAsFinished proceeds as follows:

1. Compute how long the Stage has been running.

2. If no error occurred while the Stage ran, set the completion time in the Stage's latestInfo to the current system time and call the Stage's clearFailures method to clear fetchFailedAttemptIds.

3. If an error did occur, call StageInfo's stageFailed method to record the failure reason and the Stage's completion time.

4. Call outputCommitCoordinator's stageEnd method to stop coordinating the commit of the current Stage's output to HDFS.

5. Post a SparkListenerStageCompleted event to the listenerBus (such stage lifecycle events can be observed from application code, as sketched below).

6. Remove the current Stage from the set of running Stages.
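
A small, hedged sketch (assuming an existing SparkContext `sc`): the SparkListenerStageSubmitted and SparkListenerStageCompleted events that submitMissingTasks and markStageAsFinished post to the listenerBus can be observed by registering a custom SparkListener. The class name StageLoggingListener is ours, not Spark's.

  import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerStageSubmitted}

  // Logs the stage lifecycle events posted by the DAGScheduler.
  class StageLoggingListener extends SparkListener {
    override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit =
      println(s"stage ${event.stageInfo.stageId} submitted, ${event.stageInfo.numTasks} tasks")

    override def onStageCompleted(event: SparkListenerStageCompleted): Unit =
      println(s"stage ${event.stageInfo.stageId} completed, " +
        s"failureReason = ${event.stageInfo.failureReason}")
  }

  sc.addSparkListener(new StageLoggingListener)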

The submitWaitingChildStages method submits all waiting child Stages of the given Stage:

  private def submitWaitingChildStages(parent: Stage) {
    logTrace(s"Checking if any dependencies of $parent are now runnable")
    logTrace("running: " + runningStages)
    logTrace("waiting: " + waitingStages)
    logTrace("failed: " + failedStages)
    val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
    waitingStages --= childStages
    for (stage <- childStages.sortBy(_.firstJobId)) {
      submitStage(stage)
    }
  }
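
For illustration only, here is a tiny self-contained sketch of the same "release the waiting children" pattern, using a hypothetical SimpleStage class rather than Spark's Stage; it highlights that children are submitted in the order of the job that first needed them (firstJobId):

  import scala.collection.mutable

  // Hypothetical, simplified stand-ins for Spark's Stage and submitStage.
  case class SimpleStage(id: Int, firstJobId: Int, parents: Seq[SimpleStage] = Nil)

  object SubmitWaitingSketch {
    private val waitingStages = mutable.HashSet.empty[SimpleStage]

    private def submitStage(stage: SimpleStage): Unit =
      println(s"submitting stage ${stage.id}")

    def submitWaitingChildStages(parent: SimpleStage): Unit = {
      // Take every waiting stage that lists `parent` among its parents...
      val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
      waitingStages --= childStages
      // ...and submit them ordered by the job that first needed them.
      for (stage <- childStages.sortBy(_.firstJobId)) {
        submitStage(stage)
      }
    }
  }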

The Algorithm for Computing a Task's Best Locations

The locality information of each task depends on the type of the current Stage: for a ShuffleMapStage it is obtained from getPreferredLocs(stage.rdd, id); for a ResultStage it is obtained from getPreferredLocs(stage.rdd, p), where p is the partition that the ResultStage maps the task index id to.

The relevant fragment of the submitMissingTasks method:

  private def submitMissingTasks(stage: Stage, jobId: Int) {
    // ......
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
      case s: ResultStage =>
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
    // ......
  }

Here partitionsToCompute holds the ids of the partitions that still need to be computed:

 val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

The data-locality algorithm for a single partition is implemented in the getPreferredLocs method:

  private[spark] def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
    getPreferredLocsInternal(rdd, partition, new HashSet)
  }

getPreferredLocsInternal is the recursive implementation behind getPreferredLocs. It is thread-safe because it only accesses DAGScheduler state through the thread-safe method getCacheLocs().

The source of getPreferredLocsInternal is as follows:

  private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
    // If the partition has already been visited, no need to re-visit.
    // This avoids exponential path exploration. SPARK-695
    if (!visited.add((rdd, partition))) {
      // Nil has already been returned for previously visited partitions.
      return Nil
    }
    // If the partition is cached, return the cache locations
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }

    // If the RDD has some placement preferences (as is the case for input RDDs), get those
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }

    // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
    // that has any placement preferences. Ideally we would choose based on transfer sizes,
    // but this will do for now.
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }

      case _ =>
    }

    Nil
  }

In the getPreferredLocsInternal code:

  • It first checks whether visited already contains the current (rdd, partition) pair; if it does, Nil is returned immediately.
  • If the partition is cached, getCacheLocs(rdd)(partition) is invoked with the rdd and the partition to look up the cached locations; if locations are found, they are returned.
The source of getCacheLocs:

  private[scheduler] def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
    // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
    if (!cacheLocs.contains(rdd.id)) {
      // Note: if the storage level is NONE, we don't need to get locations from block manager.
      val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
        IndexedSeq.fill(rdd.partitions.length)(Nil)
      } else {
        val blockIds =
          rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
        blockManagerMaster.getLocations(blockIds).map { bms =>
          bms.map(bm => TaskLocation(bm.host, bm.executorId))
        }
      }
      cacheLocs(rdd.id) = locs
    }
    cacheLocs(rdd.id)
  }

The cacheLocs used in getCacheLocs is a HashMap holding the cached locations of every RDD's partitions. The map key is the RDD id; the value is an array indexed by partition number, and each element of that array is the set of locations where the corresponding RDD partition is cached.

 private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
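
For illustration only, here is a hypothetical sketch of the shape of a cacheLocs entry, with TaskLocation simplified to a plain string: for an RDD with id 3 and four partitions, two of which are cached, the entry could look like this.

  import scala.collection.mutable.HashMap

  // Hypothetical shape of a cacheLocs entry; locations are simplified to strings here.
  val cacheLocsSketch = new HashMap[Int, IndexedSeq[Seq[String]]]
  cacheLocsSketch(3) = IndexedSeq(
    Seq("executor_host-1_1"),  // partition 0 is cached on an executor of host-1
    Nil,                       // partition 1 is not cached
    Seq("executor_host-2_3"),  // partition 2 is cached on an executor of host-2
    Nil                        // partition 3 is not cached
  )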

When getPreferredLocsInternal runs, it first checks whether the DAGScheduler's in-memory structures already contain locality information for the current partition; if so, that information is returned directly. If not, it then calls rdd.preferredLocations, which delegates to the RDD's getPreferredLocations.

If you write a custom RDD, you must implement getPreferredLocations; placement preference is one of the five core properties of an RDD. Implementing getPreferredLocations is the key to ensuring data locality for task computation: move the code to the data rather than the data to the code (a minimal custom-RDD sketch follows the source excerpt below).

  final def preferredLocations(split: Partition): Seq[String] = {
    checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
      getPreferredLocations(split)
    }
  }

  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
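
Below is a minimal, hedged sketch of a custom RDD that overrides getPreferredLocations. The class name HostPinnedRDD, the host list, and the one-host-per-partition layout are all hypothetical; the point is only where the override goes and what it returns.

  import org.apache.spark.{Partition, SparkContext, TaskContext}
  import org.apache.spark.rdd.RDD

  // Hypothetical partition type: one preferred host per partition.
  case class HostPartition(index: Int, host: String) extends Partition

  class HostPinnedRDD(sc: SparkContext, hosts: Seq[String])
    extends RDD[String](sc, Nil) {

    override protected def getPartitions: Array[Partition] =
      hosts.zipWithIndex.map { case (host, i) => HostPartition(i, host): Partition }.toArray

    override def compute(split: Partition, context: TaskContext): Iterator[String] =
      Iterator(split.asInstanceOf[HostPartition].host)

    // One preferred host per partition; the DAGScheduler will try to place the task there.
    override protected def getPreferredLocations(split: Partition): Seq[String] =
      Seq(split.asInstanceOf[HostPartition].host)
  }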

Data locality is settled before the job runs, because the metadata is already available when the RDD is constructed.

When the DAGScheduler computes data locality, it cleverly reuses the information in the RDD's own getPreferredLocations to maximize efficiency, because getPreferredLocations describes the locality of each partition. Although the current partition may have been persisted or checkpointed, by default the locality of the persisted or checkpointed data is consistent with what getPreferredLocations reports for that partition.
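
A small, hedged usage illustration (assuming an existing SparkContext `sc`): once an RDD has been persisted and its blocks materialized, getCacheLocs returns the BlockManager locations of those blocks, so getPreferredLocsInternal finds them before it ever consults rdd.preferredLocations for that partition.

  import org.apache.spark.storage.StorageLevel

  // Hedged illustration, assuming an existing SparkContext `sc`.
  val base   = sc.parallelize(1 to 1000, numSlices = 8)
  val cached = base.map(_ * 2).persist(StorageLevel.MEMORY_ONLY)
  cached.count()          // the first action materializes the cached blocks on executors
  cached.reduce(_ + _)    // later stages over `cached` prefer the executors holding those blocks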

