MapReduce source code analysis: the task assignment process


As described earlier, job initialization creates a series of TaskInProgress objects and caches them in memory, where they wait for the TaskTracker nodes to request work by sending heartbeats to the JobTracker. The scheduler on the JobTracker side then assigns tasks; the default scheduler is JobQueueTaskScheduler, and the concrete implementation is its assignTasks method.
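Before diving into the scheduler, here is a schematic sketch of that pull model. Everything in it is a hypothetical stand-in (simplified types, not the real JobTracker/TaskTracker API): the TaskTracker reports its status in a heartbeat, the JobTracker asks the configured scheduler for tasks, and each returned Task is wrapped in a launch action carried back in the heartbeat response.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class HeartbeatFlowSketch {
  // Hypothetical stand-ins for the real Hadoop types.
  interface Task {}
  interface TaskTrackerStatus {}
  interface Scheduler {                                  // stand-in for the pluggable scheduler
    List<Task> assignTasks(TaskTrackerStatus status) throws IOException;
  }
  static class LaunchAction {                            // stand-in for the "run this task" directive
    final Task task;
    LaunchAction(Task task) { this.task = task; }
  }

  private final Scheduler scheduler;                     // e.g. a JobQueueTaskScheduler-like object
  HeartbeatFlowSketch(Scheduler scheduler) { this.scheduler = scheduler; }

  /** Called once per TaskTracker heartbeat: ask the scheduler, wrap results as launch actions. */
  List<LaunchAction> onHeartbeat(TaskTrackerStatus status) throws IOException {
    List<LaunchAction> actions = new ArrayList<LaunchAction>();
    for (Task t : scheduler.assignTasks(status)) {
      actions.add(new LaunchAction(t));                  // sent back in the heartbeat response
    }
    return actions;
  }
}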

The core algorithm of assignTasks:

1. For a given TaskTracker, compute the number of available slots; the scheduler tries to spread tasks evenly across the nodes so that the load stays balanced.

Concretely (a small worked example follows this list):

The calculation is done separately for map slots and reduce slots:

First, compute a load factor for this kind of task: factor = (total tasks requested by the running jobs - tasks those jobs have already finished) / the cluster's total slot capacity (obtained by scanning every job in the jobQueue).

Then compute the slots available on this node: available slots = min(ceil(factor * the node's total slots), the node's total slots) - the number of tasks currently running on the node.

2. assignTasks then calls JobInProgress's obtainNewLocalMapTask, obtainNewNonLocalMapTask, and obtainNewReduceTask methods in turn. Each returns a Task, which is wrapped as a LaunchTaskAction and sent back to the TaskTracker for execution. Taking obtainNewLocalMapTask as an example, it ultimately calls findNewMapTask in the same class; findNewMapTask returns the task closest to the TaskTracker (candidates are tried from the same node, then the same rack, then the same data center, and are taken from the non-running task cache, the Map<Node, List<TaskInProgress>> that createCache builds and populates during job initialization).
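To make step 1 concrete, here is a minimal, self-contained sketch of the slot arithmetic. The class name and all the numbers are hypothetical; the real computation is in assignTasks, quoted below.

public class SlotMathSketch {
  public static void main(String[] args) {
    // Hypothetical cluster-wide numbers:
    int clusterMapCapacity = 100;  // total map slots in the cluster
    int remainingMapLoad   = 40;   // sum over running jobs of (desiredMaps - finishedMaps)

    // Hypothetical numbers for the TaskTracker that sent the heartbeat:
    int trackerMapCapacity = 4;    // map slots on this node
    int trackerRunningMaps = 1;    // map tasks already running on this node

    // factor = remaining work / total cluster capacity
    double mapLoadFactor = (double) remainingMapLoad / clusterMapCapacity;                   // 0.4

    // current capacity = min(ceil(factor * node slots), node slots)
    int trackerCurrentMapCapacity =
        Math.min((int) Math.ceil(mapLoadFactor * trackerMapCapacity), trackerMapCapacity);   // 2

    // available slots = current capacity - tasks already running on this node
    int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;                  // 1

    System.out.println("loadFactor=" + mapLoadFactor
        + ", currentCapacity=" + trackerCurrentMapCapacity
        + ", availableSlots=" + availableMapSlots);
  }
}

So on this lightly loaded cluster (factor 0.4) the tracker is only offered up to 2 of its 4 map slots, and with 1 map already running it can receive at most 1 new map task in this heartbeat.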


Key parts of the source code:


The assignTasks method

public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
      throws IOException {

    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
    final int numTaskTrackers = clusterStatus.getTaskTrackers();
    final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
    final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();

    Collection<JobInProgress> jobQueue =
      jobQueueJobInProgressListener.getJobQueue();

    //
    // Get map + reduce counts for the current tracker.
    //
    final int trackerMapCapacity = taskTracker.getMaxMapTasks();
    final int trackerReduceCapacity = taskTracker.getMaxReduceTasks();
    final int trackerRunningMaps = taskTracker.countMapTasks();
    final int trackerRunningReduces = taskTracker.countReduceTasks();
    // taskTracker here is the TaskTrackerStatus sent in the heartbeat; it carries the
    // node's maximum map/reduce slot counts and the currently running map/reduce counts.

    // Assigned tasks
    List<Task> assignedTasks = new ArrayList<Task>();

    //
    // Compute (running + pending) map and reduce task numbers across pool
    //
    int remainingReduceLoad = 0;
    int remainingMapLoad = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() == JobStatus.RUNNING) {
          remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
          if (job.scheduleReduces()) {
            remainingReduceLoad += 
              (job.desiredReduces() - job.finishedReduces());
          }
        }
      }
    }

    // Compute the 'load factor' for maps and reduces
    double mapLoadFactor = 0.0;
    if (clusterMapCapacity > 0) {
      mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
    }
    double reduceLoadFactor = 0.0;
    if (clusterReduceCapacity > 0) {
      reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
    }

    //
    // In the below steps, we allocate first map tasks (if appropriate),
    // and then reduce tasks if appropriate.  We go through all jobs
    // in order of job arrival; jobs only get serviced if their 
    // predecessors are serviced, too.
    //

    //
    // We assign tasks to the current taskTracker if the given machine 
    // has a workload that's less than the maximum load of that kind of
    // task.
    // However, if the cluster is close to getting loaded i.e. we don't
    // have enough _padding_ for speculative executions etc., we only 
    // schedule the "highest priority" task i.e. the task from the job 
    // with the highest priority.
    //

    final int trackerCurrentMapCapacity = 
      Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity), 
                              trackerMapCapacity);
    int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
    boolean exceededMapPadding = false;
    if (availableMapSlots > 0) {
      exceededMapPadding = 
        exceededPadding(true, clusterStatus, trackerMapCapacity);
    }

    int numLocalMaps = 0;
    int numNonLocalMaps = 0;
    scheduleMaps:
    for (int i=0; i < availableMapSlots; ++i) {
      synchronized (jobQueue) {
        for (JobInProgress job : jobQueue) {
          if (job.getStatus().getRunState() != JobStatus.RUNNING) {
            continue;
          }

          Task t = null;

          // Try to schedule a node-local or rack-local Map task
          t = 
            job.obtainNewLocalMapTask(taskTracker, numTaskTrackers,
                                      taskTrackerManager.getNumberOfUniqueHosts());
          if (t != null) {
            assignedTasks.add(t);
            ++numLocalMaps;

            // Don't assign map tasks to the hilt!
            // Leave some free slots in the cluster for future task-failures,
            // speculative tasks etc. beyond the highest priority job
            if (exceededMapPadding) {
              break scheduleMaps;
            }

            // Try all jobs again for the next Map task 
            break;
          }

          // Try to schedule a non-local (off-switch) Map task
          t = 
            job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers,
                                   taskTrackerManager.getNumberOfUniqueHosts());

          if (t != null) {
            assignedTasks.add(t);
            ++numNonLocalMaps;

            // We assign at most 1 off-switch or speculative task
            // This is to prevent TaskTrackers from stealing local-tasks
            // from other TaskTrackers.
            break scheduleMaps;
          }
        }
      }
    }

    int assignedMaps = assignedTasks.size();

    //
    // Same thing, but for reduce tasks
    // However we _never_ assign more than 1 reduce task per heartbeat
    //
    final int trackerCurrentReduceCapacity = 
      Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity), 
               trackerReduceCapacity);
    final int availableReduceSlots = 
      Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
    boolean exceededReducePadding = false;
    if (availableReduceSlots > 0) {
      exceededReducePadding = exceededPadding(false, clusterStatus, 
                                              trackerReduceCapacity);
      synchronized (jobQueue) {
        for (JobInProgress job : jobQueue) {
          if (job.getStatus().getRunState() != JobStatus.RUNNING ||
              job.numReduceTasks == 0) {
            continue;
          }

          Task t = 
            job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
                                    taskTrackerManager.getNumberOfUniqueHosts()
                                    );
          if (t != null) {
            assignedTasks.add(t);
            break;
          }

          // Don't assign reduce tasks to the hilt!
          // Leave some free slots in the cluster for future task-failures,
          // speculative tasks etc. beyond the highest priority job
          if (exceededReducePadding) {
            break;
          }
        }
      }
    }

    // ...

    return assignedTasks;
  }
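The exceededMapPadding / exceededReducePadding flags above implement the "don't schedule to the hilt" idea: keep a little headroom in the cluster for task failures and speculative execution. The following is only a rough, hypothetical illustration of that idea (the class and all numbers are made up; the real check is the scheduler's exceededPadding method, which also walks the job queue):

public class PaddingSketch {
  public static void main(String[] args) {
    // All numbers here are hypothetical.
    int totalTaskCapacity = 100;   // total map slots in the cluster
    int totalRunningTasks = 99;    // map tasks currently running cluster-wide
    double padFraction    = 0.01;  // fraction of capacity held back as headroom
    int maxTrackerSlots   = 4;     // slot count of a single tracker

    // Reserve a small amount of headroom, capped by one tracker's slot count.
    int padding = Math.min(maxTrackerSlots, (int) (totalTaskCapacity * padFraction));

    // If the running tasks plus the reserve would fill the cluster, stop handing out
    // further tasks beyond the highest-priority job.
    boolean exceededPadding = totalRunningTasks + padding >= totalTaskCapacity;

    System.out.println("padding=" + padding + ", exceeded=" + exceededPadding);  // padding=1, exceeded=true
  }
}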

 

The obtainNewLocalMapTask method

  public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,
                                                     int clusterSize, 
                                                     int numUniqueHosts)
  throws IOException {
    if (!tasksInited.get()) {
      LOG.info("Cannot create task split for " + profile.getJobID());
      return null;
    }

    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel,
                                status.mapProgress());
    if (target == -1) {
      return null;
    }

    Task result = maps[target].getTaskToRun(tts.getTrackerName());
    // maps[] caches the job's TaskInProgress objects; getTaskToRun returns a Task
    // (concretely a MapTask or a ReduceTask).
    if (result != null) {
      addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
    }

    return result;
  }
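findNewMapTask (quoted below) walks the locality cache from the TaskTracker's own node upwards: node, then rack, and so on, up to maxLevel. As a reading aid, here is a much simplified, hypothetical sketch of that bottom-up lookup over the Map<Node, List<TaskInProgress>> built by createCache during job initialization; the Node and TaskInProgress interfaces here are stand-ins, not the real Hadoop classes:

import java.util.List;
import java.util.Map;

class LocalityLookupSketch {
  interface Node { Node getParent(); }                    // stand-in for the topology node
  interface TaskInProgress { int getIdWithinJob(); }      // stand-in for the real TIP

  /** Walk from the tracker's node up the topology, at most maxLevel levels. */
  static int findLocalTask(Node trackerNode, int maxLevel,
                           Map<Node, List<TaskInProgress>> nonRunningCache) {
    Node key = trackerNode;
    for (int level = 0; level < maxLevel && key != null; ++level) {
      List<TaskInProgress> tips = nonRunningCache.get(key);
      if (tips != null && !tips.isEmpty()) {
        TaskInProgress tip = tips.remove(0);   // the real code also filters failed/killed hosts
        return tip.getIdWithinJob();           // index into the job's maps[] array
      }
      key = key.getParent();                   // node -> rack -> ... towards the root
    }
    return -1;                                 // nothing sufficiently local was found
  }
}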

The findNewMapTask method: returns the index into the maps[] array of the task closest to the requesting TaskTracker.

private synchronized int findNewMapTask(final TaskTrackerStatus tts, 
                                          final int clusterSize,
                                          final int numUniqueHosts,
                                          final int maxCacheLevel,
                                          final double avgProgress) {
    if (numMapTasks == 0) {
      LOG.info("No maps to schedule for " + profile.getJobID());
      return -1;
    }


    String taskTracker = tts.getTrackerName();
    TaskInProgress tip = null;
    
    //
    // Update the last-known clusterSize
    //
    this.clusterSize = clusterSize;


    if (!shouldRunOnTaskTracker(taskTracker)) {
      return -1;
    }


    // Check to ensure this TaskTracker has enough resources to 
    // run tasks from this job
    long outSize = resourceEstimator.getEstimatedMapOutputSize();
    long availSpace = tts.getResourceStatus().getAvailableSpace();
    if(availSpace < outSize) {
      LOG.warn("No room for map task. Node " + tts.getHost() + 
               " has " + availSpace + 
               " bytes free; but we expect map to take " + outSize);


      return -1; //see if a different TIP might work better. 
    }
    
    
    // For scheduling a map task, we have two caches and a list (optional)
    //  I)   one for non-running task
    //  II)  one for running task (this is for handling speculation)
    //  III) a list of TIPs that have empty locations (e.g., dummy splits),
    //       the list is empty if all TIPs have associated locations


    // First a look up is done on the non-running cache and on a miss, a look 
    // up is done on the running cache. The order for lookup within the cache:
    //   1. from local node to root [bottom up]
    //   2. breadth wise for all the parent nodes at max level


    // We fall to linear scan of the list (III above) if we have misses in the 
    // above caches


    Node node = jobtracker.getNode(tts.getHost());
    
    //
    // I) Non-running TIP :
    // 


    // 1. check from local node to the root [bottom up cache lookup]
    //    i.e if the cache is available and the host has been resolved
    //    (node!=null)
    if (node != null) {
      Node key = node;
      int level = 0;
      // maxCacheLevel might be greater than this.maxLevel if findNewMapTask is
      // called to schedule any task (local, rack-local, off-switch or speculative),
      // or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if findNewMapTask is to
      // only schedule off-switch/speculative tasks
      int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
      for (level = 0;level < maxLevelToSchedule; ++level) {
        List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
        if (cacheForLevel != null) {
          tip = findTaskFromList(cacheForLevel, tts, 
              numUniqueHosts,level == 0);
          if (tip != null) {
            // Add to running cache
            scheduleMap(tip);


            // remove the cache if its empty
            if (cacheForLevel.size() == 0) {
              nonRunningMapCache.remove(key);
            }


            return tip.getIdWithinJob();
          }
        }
        key = key.getParent();
      }
      
      // Check if we need to only schedule a local task (node-local/rack-local)
      if (level == maxCacheLevel) {
        return -1;
      }
    }


    //2. Search breadth-wise across parents at max level for non-running 
    //   TIP if
    //     - cache exists and there is a cache miss 
    //     - node information for the tracker is missing (tracker's topology
    //       info not obtained yet)


    // collection of node at max level in the cache structure
    Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();


    // get the node parent at max level
    Node nodeParentAtMaxLevel = 
      (node == null) ? null : JobTracker.getParentNode(node, maxLevel - 1);
    
    for (Node parent : nodesAtMaxLevel) {


      // skip the parent that has already been scanned
      if (parent == nodeParentAtMaxLevel) {
        continue;
      }