大数据知识体系
首页
数据结构与算法
  • JVM
  • Java
  • Scala
  • Python
设计模式
  • MySQL
  • Redis
  • HDFS
  • HBase
  • ClickHouse
  • ElasticSearch
  • Iceberg
  • Hudi
  • Spark
  • Flink
  • Hive
  • Yarn
  • Zookeeper
  • Maven
  • Git
  • 数据仓库
  • 用户画像
  • 指标体系
数据治理
关于
首页
数据结构与算法
  • JVM
  • Java
  • Scala
  • Python
设计模式
  • MySQL
  • Redis
  • HDFS
  • HBase
  • ClickHouse
  • ElasticSearch
  • Iceberg
  • Hudi
  • Spark
  • Flink
  • Hive
  • Yarn
  • Zookeeper
  • Maven
  • Git
  • 数据仓库
  • 用户画像
  • 指标体系
数据治理
关于
  • Spark

    • Spark 基础
    • Spark Core
    • Spark 存储体系源码分析
    • Spark RPC 通信源码分析
    • Spark RDD 源码分析
    • Spark Task 源码分析
    • Spark Stage 源码分析
    • Spark DAGScheduler 源码分析
    • Spark TaskScheduler 源码分析
      • 1. TaskScheduler 在调度中的位置
      • 2. Schedulable 详解
        • 2.1 Pool 源码
        • 2.2 TaskSetManager 源码
      • 3. SchedulerBackend 详解
        • 3.1 SchedulerBackend 作用
        • 3.2 SchedulerBackend 实现
      • 4. TaskScheduler 详解
    • Spark Shuffle
    • Spark AppendOnlyMap
  • Flink

    • Flink 概述
    • Flink 架构
    • Flink 快速入门
    • Flink 安装
    • Flink API
    • Flink 状态管理
    • Flink 架构介绍
    • Flink Window
    • Flink Time WaterMark
    • Flink Table Api
    • Flink Sql
    • Flink CEP
    • Flink 面试题
  • Hive

    • Hive 概述
    • Hive 安装及配置参数
    • Hive 客户端的使用
    • Hive 数据类型
    • Hive DDL
    • Hive 表类型
    • Hive DML
    • Hive DQL
    • Hive 内置函数
    • Hive UDF
    • Hive 视图
    • Hive 索引
    • Hive 事务
    • Hive 文件存储
    • Hive HQL 执行原理
    • Hive 数据倾斜
    • Hive 执行计划
    • Hive 调优
    • Hive 面试题
  • 数据处理
  • Spark
Will
2023-11-07
目录

Spark TaskScheduler 源码分析

# 1. TaskScheduler 在调度中的位置

DAGScheduler 实际上是对 Stage 的调度,它将 Stage 中的 TaskSet 提交给 TaskScheduler。所以也可以说 DAGScheduler 是对 TaskSet 的调度。

TaskScheduler 实现了对 Task 的调度,但是它直接调度的对象并不是 Task,而是 Schedulable(可调度实体的抽象),进而通过 Schedulable 的两个实现类 Pool 和 TaskSetManager 对 Task 进行调度(资源分配、状态跟踪、推断执行、本地性、失败重试等)。

TaskScheduler 是比 DAGScheduler 更 low level 的调度。

# 2. Schedulable 详解

Schedulable 是可调度实体的抽象,它有两个实现类:Pool 和 TaskSetManager。

Pool 中维护了可调度的子 Pool 以及 TaskSetManager 的集合。

TaskSetManager 针对在 TaskSchedulerImpl 单个 TaskSet 中的 task 进行调度,同时跟踪每个 task 状态。此类的主要接口是resourceOffer,它问询 TaskSet 是否要在一个节点上运行 task,以及根据 task 的运行结果(成功或失败)进行相应的处理。

Schedulable 源码如下:

private[spark] trait Schedulable {
  // 父 Pool
  var parent: Pool
  // 可调度对象的列表,Schedulable 只有 Pool 和 TaskSetManager 两个子类
  def schedulableQueue: ConcurrentLinkedQueue[Schedulable]
  // 调度模式,可选值有 FAIR、FIFO、NONE
  def schedulingMode: SchedulingMode
  // 公平调度算法的权重
  def weight: Int
  // 公平调度算法的参考值
  def minShare: Int
  // 正在运行的 task 数量
  def runningTasks: Int
  // 调度的优先级
  def priority: Int
  // 所属 StageId
  def stageId: Int
  // PoolName
  def name: String

  // 添加 Schedulable
  def addSchedulable(schedulable: Schedulable): Unit
  // 删除 Schedulable
  def removeSchedulable(schedulable: Schedulable): Unit
  // 根据名称获取 Schedulable
  def getSchedulableByName(name: String): Schedulable
  // 某个 Executor 丢失后的处理
  def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit
  // 检查当前 Pool 中是否有需要推断执行的任务
  def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean
  // 对于当前 Pool 中所有的 TaskSetManager 按照调度算法进行排序
  def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

Pool 源码中有一个很重要的属性:

val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
1

schedulableQueue 是一个 Schedulable 的队列,Schedulable 只有两个实现类 Pool 和 TaskSetManager。所以可以总结出调度池的抽象调度示意图:

这仅仅只是抽象示意图,真正的实现根据调度算法的不同也不一样,如在 FIFO 调度算法中,调度池的抽象不是树状结构,而是一个队列(源码在 FIFOSchedulableBuilder 中):

而当调度算法是 FAIR 时(源码在 FAIRSchedulableBuilder),调度池的抽象就和上面树状图很接近了。

# 2.1 Pool 源码

Pool 源码 实际上也比较简单。

private[spark] class Pool(
    val poolName: String,
    val schedulingMode: SchedulingMode, // 调度模式,可选值有 FAIR、FIFO、NONE
    initMinShare: Int,
    initWeight: Int)
  extends Schedulable with Logging {

  // 可调度对象的列表,Schedulable 只有 Pool 和 TaskSetManager 两个子类
  val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
  val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
  val weight = initWeight

  val minShare = initMinShare
  // 正在运行的 task 数
  var runningTasks = 0
  // 调度优先级
  val priority = 0
  // 所属 StageId
  var stageId = -1
  val name = poolName
  // 父 Pool
  var parent: Pool = null

  // 确认调度算法,调度算法有公平调度和先进先出调度
  // 对应的实现类分别是:FairSchedulingAlgorithm 和 FIFOSchedulingAlgorithm
  private val taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
    schedulingMode match {
      case SchedulingMode.FAIR =>
        new FairSchedulingAlgorithm()
      case SchedulingMode.FIFO =>
        new FIFOSchedulingAlgorithm()
      case _ =>
        val msg = s"Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
        throw new IllegalArgumentException(msg)
    }
  }

  // 添加 Schedulable
  override def addSchedulable(schedulable: Schedulable): Unit = {
    require(schedulable != null)
    schedulableQueue.add(schedulable)
    schedulableNameToSchedulable.put(schedulable.name, schedulable)
    schedulable.parent = this
  }

  // 移除 Schedulable
  override def removeSchedulable(schedulable: Schedulable): Unit = {
    schedulableQueue.remove(schedulable)
    schedulableNameToSchedulable.remove(schedulable.name)
  }

  // 根据名称查找 Schedulable
  override def getSchedulableByName(schedulableName: String): Schedulable = {
    if (schedulableNameToSchedulable.containsKey(schedulableName)) {
      return schedulableNameToSchedulable.get(schedulableName)
    }

    // 如果在当前 Pool 中没找到,则挨个从子 Schedulable 递归查找
    for (schedulable <- schedulableQueue.asScala) {
      val sched = schedulable.getSchedulableByName(schedulableName)
      if (sched != null) {
        return sched
      }
    }
    null
  }

  // 当一个 Executor 丢失后,递归调用各个子 Schedulable 中的 executorLost 方法
  // 实际上最终都执行的是 TaskSetManager executorLost 方法
  override def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit = {
    schedulableQueue.asScala.foreach(_.executorLost(executorId, host, reason))
  }

  // 检查当前 Pool 中是否有需要推断执行的任务,递归调用各个子 Schedulable 的 checkSpeculatableTasks
  // 实际上最终都执行的是 TaskSetManager checkSpeculatableTasks 方法
  override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
    var shouldRevive = false
    for (schedulable <- schedulableQueue.asScala) {
      shouldRevive |= schedulable.checkSpeculatableTasks(minTimeToSpeculation)
    }
    shouldRevive
  }

  // 对于当前 Pool 中所有的 TaskSetManager 按照调度算法进行排序
  // 实际上最终都执行的是 TaskSetManager getSortedTaskSetQueue 方法
  override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
    val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    val sortedSchedulableQueue =
      schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
    }
    sortedTaskSetQueue
  }

  def increaseRunningTasks(taskNum: Int): Unit = {
    runningTasks += taskNum
    if (parent != null) {
      parent.increaseRunningTasks(taskNum)
    }
  }

  def decreaseRunningTasks(taskNum: Int): Unit = {
    runningTasks -= taskNum
    if (parent != null) {
      parent.decreaseRunningTasks(taskNum)
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109

# 2.2 TaskSetManager 源码

TaskSetManager,从名字就可以看出来是 TaskSet 的管理器,那么首先看看 TaskSet 是什么。

TaskSet 的源码如下,比较简单。它是一批 task 的集合,代表着某个 Stage 中一组 partition 的计算任务,这些 task 集合会被一起提交给 TaskScheduler。

private[spark] class TaskSet(
    val tasks: Array[Task[_]], // Task 数组
    val stageId: Int, // 所属的 StageId
    val stageAttemptId: Int, // 所属的 Stage 尝试的 id
    val priority: Int, // 优先级
    val properties: Properties) {
  val id: String = stageId + "." + stageAttemptId

  override def toString: String = "TaskSet " + id
}
1
2
3
4
5
6
7
8
9
10

TaskSetManager 源码的内容比较多,先整体看一下它的属性,再看一些核心的方法。

private[spark] class TaskSetManager(
    sched: TaskSchedulerImpl, // TaskSetManager 所属的 TaskSchedulerImpl
    val taskSet: TaskSet, // 当前 TaskSetManager 管理的 TaskSet
    val maxTaskFailures: Int, // Task 可允许的最大失败次数
    blacklistTracker: Option[BlacklistTracker] = None,
    clock: Clock = new SystemClock()) extends Schedulable with Logging {

  // SparkConf
  private val conf = sched.sc.conf

  // Task 运行需要的 jar 和 file
  private val addedJars = HashMap[String, Long](sched.sc.addedJars.toSeq: _*)
  private val addedFiles = HashMap[String, Long](sched.sc.addedFiles.toSeq: _*)

  // 结果的总字节大小限制
  val maxResultSize = conf.get(config.MAX_RESULT_SIZE)

  // SparkEnv
  val env = SparkEnv.get
  // 序列化器
  val ser = env.closureSerializer.newInstance()

  val tasks = taskSet.tasks
  private[scheduler] val partitionToIndex = tasks.zipWithIndex.map { case (t, idx) => t.partitionId -> idx }.toMap
  val numTasks = tasks.length
  val copiesRunning = new Array[Int](numTasks)

  // 是否启用任务的推测执行,默认不启用
  val speculationEnabled = conf.get(SPECULATION_ENABLED)
  // 机型推测前任务需要完成多少,默认0.75
  val speculationQuantile = conf.get(SPECULATION_QUANTILE)
  // 任务延迟的比例,比如当75%的task都完成,那么取他们的中位数跟还未执行完的任务作对比。如果超过1.5倍,则开启推测执行。
  val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
  val minFinishedForSpeculation = math.max((speculationQuantile * numTasks).floor.toInt, 1)
  val speculationTaskDurationThresOpt = conf.get(SPECULATION_TASK_DURATION_THRESHOLD)
  val speculationTasksLessEqToSlots = numTasks <= Math.max(conf.get(EXECUTOR_CORES) / sched.CPUS_PER_TASK, 1)

  // 记录每个 Task 是否执行成功的数组
  val successful = new Array[Boolean](numTasks)
  // 对每个 Task 执行失败次数机型记录的数组
  private val numFailures = new Array[Int](numTasks)

  // 当 Task 被其它 Task 尝试 kill 时,将被 kill 的 Task 记录到此
  private val killedByOtherAttempt = new HashSet[Long]

  // 对每个 Task 的执行失败次数进行记录的数组
  val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
  private[scheduler] var tasksSuccessful = 0

  // 用于公平调度算法的权重
  val weight = 1
  // 用于公平调度算法的参考值
  val minShare = 0
  // 调度的优先级
  var priority = taskSet.priority
  // 调度池所属的 StageId
  var stageId = taskSet.stageId
  val name = "TaskSet_" + taskSet.id
  var parent: Pool = null
  // 所有 Task 执行总结果的大小
  private var totalResultSize = 0L
  // 计算过的 Task 数量
  private var calculatedTasks = 0

  // 正在运行的 Task 集合
  private[scheduler] val runningTasksSet = new HashSet[Long]

  override def runningTasks: Int = runningTasksSet.size

  // 是否进入僵尸状态
  private[scheduler] var isZombie = false

  private[scheduler] def isBarrier = taskSet.tasks.nonEmpty && taskSet.tasks(0).isBarrier

  // 存储按照本地性首选安排的处于等待状态的任务
  private[scheduler] val pendingTasks = new PendingTasksByLocality()

  // 可推测执行的 Task 集合
  private[scheduler] val speculatableTasks = new HashSet[Int]

  // 按本地性首选存储的可推测 Task 的集合
  private[scheduler] val pendingSpeculatableTasks = new PendingTasksByLocality()

  // Task 身份标识与 TaskInfo
  private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]

  val successfulTaskDurations = new MedianHeap()

  // 异常打印到日志的时间间隔
  val EXCEPTION_PRINT_INTERVAL = conf.getLong("spark.logging.exceptionPrintInterval", 10000)

  private val recentExceptions = HashMap[String, (Int, Long)]()

  val epoch = sched.mapOutputTracker.getEpoch

  // Task 本地性级别的数组
  private[scheduler] var myLocalityLevels = computeValidLocalityLevels()

  private[scheduler] var localityWaits = myLocalityLevels.map(getLocalityWait)

  private var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels
  private var lastLaunchTime = clock.getTimeMillis()  // Time we last launched a task at this level
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103

# 3. SchedulerBackend 详解

# 3.1 SchedulerBackend 作用

在开始看 TaskScheduler 源码之前,需要先看看 SchedulerBackend,因为 TaskScheduler 的实现离不开它。

TaskScheduler 是在 SparkContext 中创建并初始化的,在 SparkContext 中调用 TaskSchedulerImpl(TaskScheduler 的唯一实现类)的初始化方法时需要传入 SchedulerBackend 作为参数。

SchedulerBackend 是 TaskScheduler 的调度后端接口。TaskScheduler 给 Task 分配资源实际是通过 SchedulerBackend 完成的,SchedulerBackend 给 Task 分配完资源后将与分配给 Task 的 Executor 通信,并要求 Executor 运行 Task。

注意

说实话,光看前面这些官话,我自己也看的很懵逼,还是没懂 SchedulerBackend 到底是干嘛的。

接着往下看~

注意前面说的加粗那句话“与分配给 Task 的 Executor 通信”,DAGScheduler 以及 TaskSchedule 都是在 Driver 段运行的,但是最终的 Task 却是在 Executor 运行的。Driver 需要为 Task 分配资源,还要了解 Task 的运行状态(LAUNCHING、RUNNING、FINISHED、FAILED、KILLED、LOST),所以 Driver 必须要和 Executor 进行 RPC 通信。SchedulerBackend 就是为了和 Executor 通信服务的,与 SchedulerBackend 对应的还有一个组件叫 ExecutorBackend。前者是在 Driver 端,后者是在 Executor 端。

SchedulerBackend 和 ExecutorBackend 都是接口。前者的实现较为丰富,而 ExecutorBackend 的唯一实现类是 CoarseGrainedExecutorBackend,Executor 的真正线程就是在 CoarseGrainedExecutorBackend 中执行的。

# 3.2 SchedulerBackend 实现

SchedulerBackend 只是定义了一些接口,它的实现类如下:

  1. LocalSchedulerBackend:local 模式中的调度后端接口。在 local 模式下,Driver、Executor、LocalSchedulerBackend 都运行在同一个 JVM 进程中。
  2. CoarseGrainedSchedulerBackend:等待 CoarseGrainedExecutorBackend 进行连接的 SchedulerBackend 实现。由 CoarseGrainedSchedulerBackend 建立的 CoarseGrainedExecutorBackend 进程将会一直存在,真正的 Executor 线程将在 CoarseGrainedExecutorBackend 进程中执行。
    • StandaloneSchedulerBackend 是部署在 Standalone 模式下的 SchedulerBackend 实现。
    • MesosCoarseGrainedSchedulerBackend:是部署在 Mesos 模式下的实现。
    • YarnSchedulerBackend:是部署在 Yarn 模式下的实现,并且有 YarnClientSchedulerBackend 和 YarnClusterSchedulerBackend 两个子类。

SchedulerBackend 中定义了所有调度后端的接口:

private[spark] trait SchedulerBackend {
  // 与当前 Job 相关联的应用程序的身份标识
  private val appId = "spark-application-" + System.currentTimeMillis

  // 启动 SchedulerBackend
  def start(): Unit

  // 停止 SchedulerBackend
  def stop(): Unit

  // 给调度池中的所有 Task 分配资源
  def reviveOffers(): Unit

  // 获取 Job 的默认并行度
  def defaultParallelism(): Int

  // 杀死指定 Task
  def killTask(
      taskId: Long,
      executorId: String,
      interruptThread: Boolean,
      reason: String): Unit =
    throw new UnsupportedOperationException

  // SchedulerBackend 是否准备就绪
  def isReady(): Boolean = true

  // 获取 appId
  def applicationId(): String = appId

  // 当使用 cluster 模式运行并且集群管理器支持多次尝试时,此方法可以获取应用程序尝试的标识。
  // 当应用程序在 client 模式运行时,将不支持多次尝试。
  def applicationAttemptId(): Option[String] = None

  // 获取 Driver 日志的 URL,这些 URL 会在 Spark UI 的 Executors 标签页中展示
  def getDriverLogUrls: Option[Map[String, String]] = None

  // 获取 Driver 的属性。当指定自定义日志 URL 模式时,这些属性用于替换日志 URL。
  def getDriverAttributes: Option[Map[String, String]] = None

  // 获取当前可并发启动的最大 Task 数
  def maxNumConcurrentTasks(): Int
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

# 4. TaskScheduler 详解

TaskScheduler 是相对 DAGScheduler 更 low level 的调度器的规范,目前由 TaskSchedulerImpl 独家实现。每个 TaskScheduler 为单个 SparkContext 服务。TaskSchedule 从 DAGScheduler 获取提交给它们的 TaskSet,并负责将 Task 发送到集群运行、在出现故障时重试、并通过推断执行以及本地性等措施提高运行效率。

上次更新: 2023/11/09, 09:36:58

← Spark DAGScheduler 源码分析 Spark Shuffle→

Theme by Vdoing | Copyright © 2022-2023 Will 蜀ICP备2022002285号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式