一分钟吃透 Spark 之 TaskScheduler
smallstone
8年前
<p>DagScheduler 和 TaskScheduler 的任务交接</p> <p>spark 调度器分为两个部分, 一个是 DagScheduler, 一个是 TaskScheduler, DagScheduler 主要是用来把一个 Job 根据宽依赖划分为多个Stage(阶段),</p> <p>对于划分出来的每个 stage 都抽象为一个 TaskSet任务集 交给 TaskScheduler 来进行进一步的调度运行,</p> <p><img src="https://simg.open-open.com/show/3d1f72c97c22641c91892ed49f4b25b7.jpg"></p> <p>我们来看一张图, 来理清里面的概念, 我们用户编程使用的 RDD, 每个 RDD都有一个分区数, 这个分区数目创建 RDD 的时候有一个初始值,运行过程中,根据配置的 parallelism 参数 和 shuffle 过程中显示指定的分区数目 来调整个数</p> <p>我们可以看到, 一个 task 对应一个 stage 里面一个分区数据的处理任务, task 又分为 ShuffleMapTask 和 ResultTask , 区分任务是中间阶段的任务 还是最后一个阶段的任务。</p> <p>而一个 stage 里面 所有分区的任务集合 就被包装为一个 TaskSet 交给了 TaskScheduler,</p> <h3>TaskScheduler 调度方式</h3> <p>TaskScheduler 会为每个 TaskSet 创建一个 TaskScheduler, 作为一个调度单位, 放在任务池子里面,</p> <p>调度池分为两种, 一种使用 FIFO调度方式 , 还有一种使用 Fair调度方式</p> <p>FIFO调度方式</p> <p style="text-align: center;"><img src="https://simg.open-open.com/show/a79fab8fcbe23ed404eafcc9387218f8.png"></p> <p>这种方式 rootPool 下面直接就是 TaskSetManager , 没有子 Pool,</p> <p>根据 FIFOSchedulingAlgorithm 算法排序, 这种方式排序方式很简单, 直接就是先进先出队列的排序方式对多个 TaskSetManager 进行排队,</p> <p>Fair调度方式</p> <p style="text-align: center;"><img src="https://simg.open-open.com/show/e1df8021c3b6d4a73e127f2c8189ecde.png"></p> <p>这种方式 rootPool 是根 pool, 下一级是 用户定义的 Pool, 这一层是为了给不同的用户定义不同的优先级用的,</p> <p>用户 Pool 下面才是 TaskSetManager</p> <p>FairSchedulingAlgorithm 算法,排序方式是由两个因子控制,</p> <ul> <li> <p>weight: 控制资源池相对其他资源池,可以分配到资源的比例。默认所有资源池的weight都是1。如果你将某个资源池的weight设为2,那么该资源池中的资源将是其他池子的2倍。如果将weight设得很高,如1000,可以实现资源池之间的调度优先级 – 也就是说,weight=1000的资源池总能立即启动其对应的作业。</p> </li> <li> <p>minShare:除了整体weight之外,每个资源池还能指定一个最小资源分配值(CPU个数),管理员可能会需要这个设置。公平调度器总是会尝试优先满足所有活跃(active)资源池的最小资源分配值,然后再根据各个池子的weight来分配剩下的资源。因此,minShare属性能够确保每个资源池都能至少获得一定量的集群资源。minShare的默认值是0。</p> </li> </ul> <p>排序也是递归的, 因为 rootPool 下面有多个 用户自己的 Pool, 要先根据 FairSchedulingAlgorithm 算法对多个 用户的Pool 排序, 然后对一个 Pool 中的多个 TaskSetManager 也使用 FairSchedulingAlgorithm 算法排序。</p> <h3>任务实际运行的触发方式</h3> <p>触发方式有两种,</p> <ul> <li> <p>一堆任务 从 DagScheduler 到 TaskSetManager 提交过来了, 这个时候可能有 大量 executor 的cpu 都闲着呢, 所以要 唤醒他们过来领任务去执行, 这种我们叫做唤醒方式,</p> </li> <li> <p>一个正在执行的任务跑完了, executor 报告任务已经执行完的时候, 这个时候 这个 cpu 又闲着了,看看有没有 任务去领一下, 这种我们叫做干完了手里的活接着干方式</p> </li> <li> <p>有可能增加了新的 Executor ,这个 executor 来注册了, 相当于产生了新的劳动力, 肯定也要去领活干, 我们叫新劳动力领活干方式,</p> </li> </ul> <p>对于 唤醒方式 就是 TaskScheduler 把 TaskScheduler, 作为一个调度单位, 放在任务池子里面后, 然后 调用 reviveOffers 来唤醒, 里面是调用 makeOffers() 方法,</p> <p>对于 干完了手里的活接着干方式, 就是 接受到了 StatusUpdate 消息, 会去调用一下 makeOffers(executorId)</p> <p>对于 新劳动力领活干方式 就是接受到 RegisterExecutor 消息后调用一下 makeOffers()</p> <p>我们来看下 makeOffers() 方法,</p> <pre> val workOffers = IndexedSeq( new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores)) launchTasks(scheduler.resourceOffers(workOffers))</pre> <p>把执行单位包装为 一个 WorkerOffer ,然后调用 taskSchedule 的 resourceOffers方法, 这个方法的注释如下</p> <p>Called by cluster manager to offer resources on slaves. We respond by asking our active task</p> <p>sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so</p> <p>that tasks are balanced across the cluster.</p> <p>这个是 resourceOffers 函数的注释, 就是从把任务池子 中的任务排队, 然后取出最前面的任务, 来和 执行单位相结合, 这里需要注意的是, 会把任务尽可能均匀的分配到每个node 上, 一个任务和一个执行单位的结合包装为一个 TaskDescription, 然后把任务发送到执行单位上去执行,</p> <p style="text-align: center;"><img src="https://simg.open-open.com/show/c0a7658ac19c7e5123e7f6fca2a4fcca.jpg"></p> <p>上图中我们可以看到, 只要有空闲的 executor 就会提供资源给 task, 首先要把 workerOffer shuffle 打乱一下, 免得过分蹂躏个别的 executor。</p> <p> </p> <p>来自:http://mp.weixin.qq.com/s/UaIxXjTFdSauknbj_Ndj1w</p> <p> </p>