Demo entry 6754915


Submitted on Jul 23, 2018 at 10:08
Language: Scala. Code size: 6.2 kB.
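
The snippet below is the submitMissingTasks method from Apache Spark's DAGScheduler (Spark 2.x). Given a stage and a job id, it determines which partitions still need to be computed, records a new stage attempt, serializes the stage's RDD together with its shuffle dependency or result function into a broadcast task binary, builds one ShuffleMapTask or ResultTask per missing partition, and submits the resulting TaskSet to the TaskScheduler; any failure along the way aborts the stage.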

private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
   logDebug("submitMissingTasks(" + stage + ")")  
  
   // First figure out the indexes of partition ids to compute.  
   val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()  
  
   // Use the scheduling pool, job group, description, etc. from an ActiveJob associated  
   // with this Stage  
   val properties = jobIdToActiveJob(jobId).properties  
  
   runningStages += stage  
   // SparkListenerStageSubmitted should be posted before testing whether tasks are  
   // serializable. If tasks are not serializable, a SparkListenerStageCompleted event  
   // will be posted, which should always come after a corresponding SparkListenerStageSubmitted  
   // event.  
   stage match {  
     case s: ShuffleMapStage =>  
       outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)  
     case s: ResultStage =>  
       outputCommitCoordinator.stageStart(  
         stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)  
   }  
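   // Compute the preferred executor locations for each missing partition (based on
   // cached copies and the RDD's own preferred locations) so tasks can be scheduled
   // close to their input data.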
   val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {  
     stage match {  
       case s: ShuffleMapStage =>  
         partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap  
       case s: ResultStage =>  
         partitionsToCompute.map { id =>  
           val p = s.partitions(id)  
           (id, getPreferredLocs(stage.rdd, p))  
         }.toMap  
     }  
   } catch {  
     case NonFatal(e) =>  
       stage.makeNewStageAttempt(partitionsToCompute.size)  
       listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))  
       abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))  
       runningStages -= stage  
       return  
   }  
  
   stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)  
   listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))  
  
   // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.  
   // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast  
   // the serialized copy of the RDD and for each task we will deserialize it, which means each  
   // task gets a different copy of the RDD. This provides stronger isolation between tasks that  
   // might modify state of objects referenced in their closures. This is necessary in Hadoop  
   // where the JobConf/Configuration object is not thread-safe.  
   var taskBinary: Broadcast[Array[Byte]] = null  
   try {  
     // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).  
     // For ResultTask, serialize and broadcast (rdd, func).  
     val taskBinaryBytes: Array[Byte] = stage match {  
       case stage: ShuffleMapStage =>  
         JavaUtils.bufferToArray(  
           closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))  
       case stage: ResultStage =>  
         JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))  
     }  
  
     taskBinary = sc.broadcast(taskBinaryBytes)  
   } catch {  
     // In the case of a failure during serialization, abort the stage.  
     case e: NotSerializableException =>  
       abortStage(stage, "Task not serializable: " + e.toString, Some(e))  
       runningStages -= stage  
  
       // Abort execution  
       return  
     case NonFatal(e) =>  
       abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))  
       runningStages -= stage  
       return  
   }  
  
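   // Build one task per missing partition: ShuffleMapTasks compute and write shuffle
   // output, while ResultTasks run the job's result function on their partition.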
   val tasks: Seq[Task[_]] = try {  
     val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()  
     stage match {  
       case stage: ShuffleMapStage =>  
         stage.pendingPartitions.clear()  
         partitionsToCompute.map { id =>  
           val locs = taskIdToLocations(id)  
           val part = stage.rdd.partitions(id)  
           stage.pendingPartitions += id  
           new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,  
             taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),  
             Option(sc.applicationId), sc.applicationAttemptId)  
         }  
  
       case stage: ResultStage =>  
         partitionsToCompute.map { id =>  
           val p: Int = stage.partitions(id)  
           val part = stage.rdd.partitions(p)  
           val locs = taskIdToLocations(id)  
           new ResultTask(stage.id, stage.latestInfo.attemptId,  
             taskBinary, part, locs, id, properties, serializedTaskMetrics,  
             Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)  
         }  
     }  
   } catch {  
     case NonFatal(e) =>  
       abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))  
       runningStages -= stage  
       return  
   }  
  
   if (tasks.nonEmpty) {
     logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +  
       s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")  
     taskScheduler.submitTasks(new TaskSet(  
       tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))  
     stage.latestInfo.submissionTime = Some(clock.getTimeMillis())  
   } else {  
     // Because we posted SparkListenerStageSubmitted earlier, we should mark  
     // the stage as completed here in case there are no tasks to run  
     markStageAsFinished(stage, None)  
  
     val debugString = stage match {  
       case stage: ShuffleMapStage =>  
         s"Stage ${stage} is actually done; " +  
           s"(available: ${stage.isAvailable}," +  
           s"available outputs: ${stage.numAvailableOutputs}," +  
           s"partitions: ${stage.numPartitions})"  
       case stage : ResultStage =>  
         s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"  
     }  
     logDebug(debugString)  
  
     submitWaitingChildStages(stage)  
   }  
 }  
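
The comment above the taskBinary broadcast explains the key design point: the driver serializes the RDD once, broadcasts the bytes, and each task deserializes its own private copy, so a task that mutates objects referenced in its closure cannot affect other tasks. Here is a minimal, self-contained sketch of that pattern, using plain Java serialization as a stand-in for Spark's closureSerializer (all names in the sketch are illustrative, not Spark's API):

import java.io._

object PerTaskCopySketch {
  // Stand-in for closureSerializer.serialize(...): turn a value into bytes once.
  def serialize(value: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(value)
    oos.close()
    bos.toByteArray
  }

  // Stand-in for per-task deserialization: every call yields a fresh, independent copy.
  def deserialize[T](bytes: Array[Byte]): T = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new java.util.ArrayList[String]()
    original.add("initial")
    val taskBinary = serialize(original) // broadcast these bytes once

    // Two "tasks" each deserialize their own copy of the same binary.
    val taskA = deserialize[java.util.ArrayList[String]](taskBinary)
    val taskB = deserialize[java.util.ArrayList[String]](taskBinary)
    taskA.add("mutated by task A")

    println(taskA) // [initial, mutated by task A]
    println(taskB) // [initial] -- task B's copy is unaffected
  }
}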

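All three catch blocks above use scala.util.control.NonFatal, an extractor that matches ordinary exceptions but lets fatal JVM errors (OutOfMemoryError, LinkageError, and the like) and control-flow throwables propagate, so the scheduler never swallows an unrecoverable failure. A small sketch of the same pattern (the attempt helper is illustrative, not part of Spark):

import scala.util.control.NonFatal

object NonFatalSketch {
  // Run a block, recovering from ordinary failures while letting
  // fatal errors such as OutOfMemoryError propagate.
  def attempt[T](body: => T): Option[T] =
    try Some(body)
    catch { case NonFatal(e) => println(s"recoverable failure: $e"); None }

  def main(args: Array[String]): Unit = {
    println(attempt(42 / 2)) // Some(21)
    println(attempt(42 / 0)) // prints the ArithmeticException, then None
  }
}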