mapreduce源码分析作业分配过程
前面提到,作业初始化时会创建一系列的TaskInProgress并缓存到内存中,等待各个tasktracker结点向jobtracker发送心跳来请求任务,再由jobtracker端的调度器分配任务。默认调度器为JobQueueTaskScheduler,具体实现对应其assignTasks方法
assignTasks核心算法 :
1、对于某个tasktracker,计算可用的slot数目,调度器会尽量将任务均匀分布到各个结点上,实现负载均衡.
具体做法是:
分别针对reduce和map计算:
首先算出一个负载因子factor: Σ(作业期望的总任务数 - 该作业已完成的任务数) / 集群总的slot容量 (扫描jobqueue,对其中各个处于RUNNING状态的作业累加求和)
再算可用的slot数: min(ceil(factor * 该结点总的slot数), 该结点总的slot数) - 该结点正在运行的任务数 (对于reduce,每次心跳最多只分配1个)
2、先后调用JobInProgress的obtainNewLocalMapTask、obtainNewNonLocalMapTask、obtainNewReduceTask方法,返回Task类任务,再以LaunchAction的形式封装发回到tasktracker去执行。以obtainNewLocalMapTask为例,最终调用的是同一个类中的findNewMapTask方法,findNewMapTask会返回离tasktracker最近的task在maps[]中的下标(依次从本结点、本机架、本数据中心去选择,从未运行任务缓存nonRunningMapCache中去取;该缓存类型为Map<Node, List<TaskInProgress>>,由作业初始化时的createCache创建赋值)
部分核心代码:
assignTasks方法
/**
 * Chooses the tasks to hand to one TaskTracker in response to its status
 * report.
 *
 * <p>Outline of the visible logic:
 * <ol>
 *   <li>Sum the unfinished map and reduce work of every RUNNING job in the
 *       queue and divide by the cluster capacity to get a load factor for
 *       each task kind.</li>
 *   <li>Scale this tracker's capacity by that factor (rounded up, never
 *       above the tracker's own capacity) and subtract what it is already
 *       running to get the number of slots to fill.</li>
 *   <li>Fill map slots by walking the job queue in order, preferring local
 *       map tasks; stop after the first non-local task.</li>
 *   <li>Assign at most one reduce task.</li>
 * </ol>
 *
 * <p>NOTE(review): this listing is an excerpt; part of the method body is
 * omitted near the end (see the marker before the return statement).
 *
 * @param taskTracker status of the tracker asking for work; supplies its
 *                    map/reduce capacities and running task counts
 * @return the tasks selected for this tracker, maps first and then at most
 *         one reduce; empty when nothing could be assigned
 * @throws IOException declared by the signature; presumably propagated from
 *                     the task-obtaining calls on the jobs - confirm
 */
public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
throws IOException {
// Snapshot of cluster-wide totals used for the load-factor computation.
ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
final int numTaskTrackers = clusterStatus.getTaskTrackers();
final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();
Collection<JobInProgress> jobQueue =
jobQueueJobInProgressListener.getJobQueue();
//
// Get map + reduce counts for the current tracker.
//
final int trackerMapCapacity = taskTracker.getMaxMapTasks();
final int trackerReduceCapacity = taskTracker.getMaxReduceTasks();
final int trackerRunningMaps = taskTracker.countMapTasks();
final int trackerRunningReduces = taskTracker.countReduceTasks();
// Here taskTracker is the TaskTrackerStatus sent with the heartbeat; it carries
// the node's maximum map/reduce counts and its currently running map/reduce counts.
// Assigned tasks
List<Task> assignedTasks = new ArrayList<Task>();
//
// Compute (running + pending) map and reduce task numbers across pool
//
int remainingReduceLoad = 0;
int remainingMapLoad = 0;
// The queue is locked while it is iterated.
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
// Only jobs in the RUNNING state contribute to the load.
if (job.getStatus().getRunState() == JobStatus.RUNNING) {
remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
// Reduce work is counted only once the job reports scheduleReduces().
if (job.scheduleReduces()) {
remainingReduceLoad +=
(job.desiredReduces() - job.finishedReduces());
}
}
}
}
// Compute the 'load factor' for maps and reduces
// (remaining work / cluster capacity; stays 0.0 when the capacity is 0,
// which also avoids dividing by zero).
double mapLoadFactor = 0.0;
if (clusterMapCapacity > 0) {
mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
}
double reduceLoadFactor = 0.0;
if (clusterReduceCapacity > 0) {
reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
}
//
// In the below steps, we allocate first map tasks (if appropriate),
// and then reduce tasks if appropriate. We go through all jobs
// in order of job arrival; jobs only get serviced if their
// predecessors are serviced, too.
//
//
// We assign tasks to the current taskTracker if the given machine
// has a workload that's less than the maximum load of that kind of
// task.
// However, if the cluster is close to getting loaded i.e. we don't
// have enough _padding_ for speculative executions etc., we only
// schedule the "highest priority" task i.e. the task from the job
// with the highest priority.
//
// This tracker's share of the map load: capacity scaled by the load factor,
// rounded up, and clamped to the tracker's own capacity.
final int trackerCurrentMapCapacity =
Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity),
trackerMapCapacity);
int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
boolean exceededMapPadding = false;
// The padding check is made only when there is at least one slot to fill.
if (availableMapSlots > 0) {
exceededMapPadding =
exceededPadding(true, clusterStatus, trackerMapCapacity);
}
int numLocalMaps = 0;
int numNonLocalMaps = 0;
// One outer iteration per free map slot; each iteration rescans the job
// queue from the start and takes at most one task.
scheduleMaps:
for (int i=0; i < availableMapSlots; ++i) {
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
if (job.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
Task t = null;
// Try to schedule a node-local or rack-local Map task
t =
job.obtainNewLocalMapTask(taskTracker, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
if (t != null) {
assignedTasks.add(t);
++numLocalMaps;
// Don't assign map tasks to the hilt!
// Leave some free slots in the cluster for future task-failures,
// speculative tasks etc. beyond the highest priority job
if (exceededMapPadding) {
break scheduleMaps;
}
// Try all jobs again for the next Map task
break;
}
// No local task was available from this job: try to schedule a
// non-local (off-switch or speculative) Map task instead
t =
job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
if (t != null) {
assignedTasks.add(t);
++numNonLocalMaps;
// We assign at most 1 off-switch or speculative task
// This is to prevent TaskTrackers from stealing local-tasks
// from other TaskTrackers.
break scheduleMaps;
}
}
}
}
// Only map tasks have been added so far, so the list size is the map count.
// Not read in the visible portion; presumably used by the omitted code below.
int assignedMaps = assignedTasks.size();
//
// Same thing, but for reduce tasks
// However we _never_ assign more than 1 reduce task per heartbeat
//
final int trackerCurrentReduceCapacity =
Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity),
trackerReduceCapacity);
// Math.min(..., 1) caps the number of reduce slots considered at one.
final int availableReduceSlots =
Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
boolean exceededReducePadding = false;
if (availableReduceSlots > 0) {
exceededReducePadding = exceededPadding(false, clusterStatus,
trackerReduceCapacity);
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
// Skip jobs that are not running or that have no reduce tasks at all.
if (job.getStatus().getRunState() != JobStatus.RUNNING ||
job.numReduceTasks == 0) {
continue;
}
Task t =
job.obtainNewReduceTask(taskTracker, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts()
);
// The first job that yields a reduce task ends the search.
if (t != null) {
assignedTasks.add(t);
break;
}
// Don't assign reduce tasks to the hilt!
// Leave some free slots in the cluster for future task-failures,
// speculative tasks etc. beyond the highest priority job
if (exceededReducePadding) {
break;
}
}
}
}
// NOTE(review): the article omits part of the original method at the marker
// below; the closing brace that follows it has no opening brace in this
// listing and presumably belongs to the omitted code.
。。。。
}
return assignedTasks;
}
obtainNewLocalMapTask方法
/**
 * Picks a new map task for the given tracker, limiting the search to this
 * job's {@code maxLevel}.
 *
 * @param tts            status of the tracker requesting work
 * @param clusterSize    number of trackers in the cluster, forwarded to
 *                       {@code findNewMapTask}
 * @param numUniqueHosts number of distinct hosts, forwarded to
 *                       {@code findNewMapTask}
 * @return the task to launch, or {@code null} when the job's tasks are not
 *         initialized yet, no candidate index was found, or the chosen
 *         entry produced no task to run
 * @throws IOException declared by the signature
 */
public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,
                                               int clusterSize,
                                               int numUniqueHosts)
    throws IOException {
  // Nothing can be handed out before the job's tasks have been set up.
  final boolean initialized = tasksInited.get();
  if (!initialized) {
    LOG.info("Cannot create task split for " + profile.getJobID());
    return null;
  }

  // Index into maps[] of the selected entry; -1 signals "nothing suitable".
  final int tipIndex = findNewMapTask(tts,
                                      clusterSize,
                                      numUniqueHosts,
                                      maxLevel,
                                      status.mapProgress());
  if (tipIndex == -1) {
    return null;
  }

  // maps[] caches the job's TaskInProgress entries; getTaskToRun returns a
  // Task (or MapTask / ReduceTask) for the tracker.
  final Task launched = maps[tipIndex].getTaskToRun(tts.getTrackerName());
  if (launched == null) {
    return null;
  }

  // Register the new attempt as running on this tracker.
  addRunningTaskToTIP(maps[tipIndex], launched.getTaskID(), tts, true);
  return launched;
}
findNewMapTask方法:
返回最近的maps[ ]任务列表对应的下标
private synchronized int findNewMapTask(final TaskTrackerStatus tts,final int clusterSize,
final int numUniqueHosts,
final int maxCacheLevel,
final double avgProgress) {
if (numMapTasks == 0) {
LOG.info("No maps to schedule for " + profile.getJobID());
return -1;
}
String taskTracker = tts.getTrackerName();
TaskInProgress tip = null;
//
// Update the last-known clusterSize
//
this.clusterSize = clusterSize;
if (!shouldRunOnTaskTracker(taskTracker)) {
return -1;
}
// Check to ensure this TaskTracker has enough resources to
// run tasks from this job
long outSize = resourceEstimator.getEstimatedMapOutputSize();
long availSpace = tts.getResourceStatus().getAvailableSpace();
if(availSpace < outSize) {
LOG.warn("No room for map task. Node " + tts.getHost() +
" has " + availSpace +
" bytes free; but we expect map to take " + outSize);
return -1; //see if a different TIP might work better.
}
// For scheduling a map task, we have two caches and a list (optional)
// I) one for non-running task
// II) one for running task (this is for handling speculation)
// III) a list of TIPs that have empty locations (e.g., dummy splits),
// the list is empty if all TIPs have associated locations
// First a look up is done on the non-running cache and on a miss, a look
// up is done on the running cache. The order for lookup within the cache:
// 1. from local node to root [bottom up]
// 2. breadth wise for all the parent nodes at max level
// We fall to linear scan of the list (III above) if we have misses in the
// above caches
Node node = jobtracker.getNode(tts.getHost());
//
// I) Non-running TIP :
//
// 1. check from local node to the root [bottom up cache lookup]
// i.e if the cache is available and the host has been resolved
// (node!=null)
if (node != null) {
Node key = node;
int level = 0;
// maxCacheLevel might be greater than this.maxLevel if findNewMapTask is
// called to schedule any task (local, rack-local, off-switch or speculative)
// tasks or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if findNewMapTask is
// (i.e. -1) if findNewMapTask is to only schedule off-switch/speculative
// tasks
int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
for (level = 0;level < maxLevelToSchedule; ++level) {
List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
if (cacheForLevel != null) {
tip = findTaskFromList(cacheForLevel, tts,
numUniqueHosts,level == 0);
if (tip != null) {
// Add to running cache
scheduleMap(tip);
// remove the cache if its empty
if (cacheForLevel.size() == 0) {
nonRunningMapCache.remove(key);
}
return tip.getIdWithinJob();
}
}
key = key.getParent();
}
// Check if we need to only schedule a local task (node-local/rack-local)
if (level == maxCacheLevel) {
return -1;
}
}
//2. Search breadth-wise across parents at max level for non-running
// TIP if
// - cache exists and there is a cache miss
// - node information for the tracker is missing (tracker's topology
// info not obtained yet)
// collection of node at max level in the cache structure
Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();
// get the node parent at max level
Node nodeParentAtMaxLevel =
(node == null) ? null : JobTracker.getParentNode(node, maxLevel - 1);
for (Node parent : nodesAtMaxLevel) {
// skip the parent that has already been scanned
if (parent == nodeParentAtMaxLevel) {
continue;
}