A Solution for Batch-Killing Applications in YARN

jopen, 9 years ago

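Preface

On a Hadoop cluster, every job ultimately runs as an Application, whether it is a MapReduce program you wrote yourself or an MR job generated from a Hive SQL query. Once an Application finishes, its details can be viewed in JobHistory, and Hadoop is genuinely thorough in this respect. But JobHistory is after-the-fact analysis, and sometimes what we need is to control the Applications that are running right now. For example, suppose a user accidentally launches dozens of applications, all of them resource-hungry. The cluster administrator will want to kill them as quickly as possible; otherwise they hold the resources and nobody else can get work done. Unfortunately, YARN currently offers no batch mode for killing applications. There is only the basic kill-plus-appID form, one application per invocation, so the outcome tends to be "the applications were killed, but it took half an hour." This article works through a solution for batch-killing applications, from the use cases to the design to the final implementation.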


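Use cases

1. A large number of applications launched by mistake. This is the scenario described at the start of the article. It rarely happens on a small cluster with few users, but once the cluster reaches a certain size and serves several groups of users, it becomes much more likely. With that many users the problem has to be solved, and solved efficiently.

2. The second case arises during Application recovery. If YARN's recovery feature is enabled on your cluster, then when the ResourceManager restarts it reads the Application information saved in the rmStateStore and recovers all of it. If you do not want to recover the applications that were interrupted when the RM service stopped, you need a way to batch-kill the applications being recovered at that moment. Unlike the first case, this one is not an accident; it arises as a matter of course under certain conditions.

To sum up, one of these two cases is driven by external factors and the other by internal ones, but both show that we need a way to kill applications in bulk.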


The existing Kill Application command

The existing command for killing an application in YARN is simple:

bin/yarn application -kill <applicationId>
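If you have not used this command before, try it out on your own test cluster. yarn application also accepts a number of other options; running the following command prints the help text: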

bin/yarn application
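
To make the cost of the one-at-a-time approach concrete, here is a minimal Java sketch of the workaround an administrator is left with today: scrape application IDs out of some listing text and build one kill command per ID. The class name, method names and sample lines are hypothetical; the only things taken from YARN are the application_<clusterTimestamp>_<sequence> ID format and the -kill option shown above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class KillCommandBuilder {
    // YARN application IDs look like application_<clusterTimestamp>_<sequence>.
    private static final Pattern APP_ID = Pattern.compile("application_\\d+_\\d+");

    /** Extracts the first application ID found on each line; lines without one are skipped. */
    public static List<String> extractAppIds(List<String> lines) {
        List<String> ids = new ArrayList<>();
        for (String line : lines) {
            Matcher m = APP_ID.matcher(line);
            if (m.find()) {
                ids.add(m.group());
            }
        }
        return ids;
    }

    /** Builds one kill command per application ID: N applications mean N invocations. */
    public static List<String> buildKillCommands(List<String> appIds) {
        List<String> commands = new ArrayList<>();
        for (String id : appIds) {
            commands.add("bin/yarn application -kill " + id);
        }
        return commands;
    }

    public static void main(String[] args) {
        // Hypothetical listing text, for illustration only.
        List<String> listing = Arrays.asList(
            "Total number of applications: 2",
            "application_1451900000000_0001  job-a  RUNNING",
            "application_1451900000000_0002  job-b  ACCEPTED");
        for (String cmd : buildKillCommands(extractAppIds(listing))) {
            System.out.println(cmd);
        }
    }
}
```

Each generated command starts its own client process and makes its own RPC round trip to the ResourceManager, which is why this approach gets slow once dozens of applications are involved.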


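Designing the batch Kill Applications CLI

CLI stands for command-line interface. The -kill <applicationId> option from the previous section is defined in YARN's ApplicationCLI, which handles commands that operate on Applications. It is exposed through the yarn command, which is why the yarn prefix appears in front. Our goal, then, is to create commands that kill applications in batches. That raises another question: if applications are to be killed in bulk, they must share some similar or identical attribute, otherwise there is no way to group them. So the first task is to find out which attributes an Application has in YARN. Here is an excerpt from the code: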

@Private
public class Application {
  private static final Log LOG = LogFactory.getLog(Application.class);

  private AtomicInteger taskCounter = new AtomicInteger(0);

  private AtomicInteger numAttempts = new AtomicInteger(0);
  final private String user;
  final private String queue;
  final private ApplicationId applicationId;
  ...

And the definition from another project:

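public interface Application extends EventHandler<ApplicationEvent> {

  String getUser();

  Map<ContainerId, Container> getContainers();

  ApplicationId getAppId();

  ApplicationState getApplicationState();

}

Putting these two definitions together, we can pick out three common attributes to use as CLI parameters:

First, user: the user name.

Second, queue: the queue the application belongs to.

Third, applicationState: the application state. For example, I may want to kill applications in the RUNNING state, or those that are ACCEPTED but have not started running yet. This is another useful dimension.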
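
As a rough illustration of how these three attributes can select a batch of applications, here is a small self-contained Java sketch. The App class and State enum are hypothetical, simplified stand-ins for YARN's own types, and a null criterion is assumed to mean "do not filter on this attribute".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;

class AppFilterSketch {
    /** Hypothetical, simplified stand-in for YARN's application states. */
    public enum State { ACCEPTED, RUNNING, FINISHED, KILLED }

    /** Hypothetical view of an application: an ID plus the three filter attributes. */
    public static final class App {
        final String id;
        final String user;
        final String queue;
        final State state;

        public App(String id, String user, String queue, State state) {
            this.id = id;
            this.user = user;
            this.queue = queue;
            this.state = state;
        }
    }

    /** Returns the IDs of the apps that match every non-null criterion. */
    public static List<String> select(List<App> apps, String user, String queue,
                                      EnumSet<State> states) {
        List<String> ids = new ArrayList<>();
        for (App app : apps) {
            boolean userOk = user == null || user.equals(app.user);
            boolean queueOk = queue == null || queue.equals(app.queue);
            boolean stateOk = states == null || states.contains(app.state);
            if (userOk && queueOk && stateOk) {
                ids.add(app.id);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<App> apps = Arrays.asList(
            new App("app_1", "alice", "default", State.RUNNING),
            new App("app_2", "bob", "etl", State.ACCEPTED),
            new App("app_3", "alice", "etl", State.FINISHED));
        // Select by state only, then by user only.
        System.out.println(select(apps, null, null, EnumSet.of(State.RUNNING, State.ACCEPTED)));
        System.out.println(select(apps, "alice", null, null));
    }
}
```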

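The end result we are aiming for looks like this:

 -killByAppStates <States>       The states of application that will be killed.
 -killByUser <User name>         Kill running-state applications of specific user.
 -killOfQueue <Queue Name>       Kill the applications of specific queue.

One suggestion here concerns the second command, -killByUser. Because it operates directly on a named user, an ordinary user will not necessarily have permission to kill other people's applications, so this command belongs in RMAdminCLI rather than in the yarn CLI. The implementation described later makes this distinction.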


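The ResourceManager's CLI-related services

Before implementing the new command interface, we need to understand the corresponding services inside the RM. The ResourceManager's design here is rather clever: to keep the service behind client commands from affecting the service behind the super-administrator's commands, it handles the two separately, as AdminService and ClientRMService, which are called remotely by RMAdminCLI and ApplicationCLI respectively. The structure is as follows:

[Figure: RMAdminCLI calls AdminService, and ApplicationCLI calls ClientRMService, both inside the ResourceManager]

So the code changes that follow are made in the main classes shown in the figure.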
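
The effect of this split on who may kill what can be sketched as follows. This is a deliberately simplified model, not YARN's actual access-control code: the class names, method names and the owner-only rule on the client side are assumptions made for illustration (the real ClientRMService consults application and queue ACLs, as the checkAccess call later in this article shows).

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class ServiceSplitSketch {
    /** Stand-in for the client-facing service: access is decided per application. */
    public static final class ClientService {
        // Simplification: only the owner may kill an application.
        public boolean mayKill(String caller, String appOwner) {
            return caller.equals(appOwner);
        }
    }

    /** Stand-in for the admin-facing service: only configured administrators get through. */
    public static final class AdminService {
        private final Set<String> admins;

        public AdminService(Set<String> admins) {
            this.admins = admins;
        }

        // A kill-by-user request targets someone else's applications, so it is
        // gated on the caller being an administrator rather than on ownership.
        public boolean mayKillByUser(String caller) {
            return admins.contains(caller);
        }
    }

    public static void main(String[] args) {
        ClientService client = new ClientService();
        AdminService admin = new AdminService(new HashSet<>(Arrays.asList("yarn")));
        System.out.println("alice kills own app:      " + client.mayKill("alice", "alice"));
        System.out.println("alice kills bob's app:    " + client.mayKill("alice", "bob"));
        System.out.println("alice uses kill-by-user:  " + admin.mayKillByUser("alice"));
        System.out.println("yarn uses kill-by-user:   " + admin.mayKillByUser("yarn"));
    }
}
```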


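Implementing the batch Kill Application operation

The hard part of adding a new command to YARN is not the implementation itself but the workflow: some of the code is generated, so you have to modify a few .proto files. The steps below show how to change ApplicationCLI; RMAdminCLI is covered only briefly.

Step 1. Define the RPC method and the request and response messages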

import "Security.proto";
import "yarn_service_protos.proto";

service ApplicationClientProtocolService {
  ....
  rpc killApplicationsByAppStates (KillApplicationsByAppStatesRequestProto) returns (KillApplicationsByAppStatesResponseProto);
  rpc killApplicationsOfQueue (KillApplicationsOfQueueRequestProto) returns (KillApplicationsOfQueueResponseProto);
}

...
message KillApplicationsByAppStatesRequestProto {
  repeated YarnApplicationStateProto application_states = 1;
}

message KillApplicationsByAppStatesResponseProto {
  optional bool is_kill_completed = 1 [default = false];
}

message KillApplicationsOfQueueRequestProto {
  required string queue = 1;
}

message KillApplicationsOfQueueResponseProto {
  optional bool is_kill_completed = 1 [default = false];
}

If anything here is unclear, consult the Protocol Buffers documentation. Finally, rebuild with a command such as "mvn package -Dmaven.test.skip=true" to generate the corresponding code.

Step 2. Implement the Request and Response classes and their concrete Impl classes

Taking the KillByAppStates command as an example:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.api.protocolrecords;

import java.util.EnumSet;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.util.Records;

/**
 * <p>
 * The request sent by the client to the <code>ResourceManager</code> to abort
 * the applications by appStates.
 * </p>
 * <p>
 * The client, via {@link KillApplicationsByAppStatesRequest} provides the
 * {@link YarnApplicationState} of the applications to be aborted.
 * </p>
 *
 * @see ApplicationClientProtocol#killApplicationsByAppStates(KillApplicationsByAppStatesRequest)
 */
@Public
@Stable
public abstract class KillApplicationsByAppStatesRequest {
  @Private
  @Unstable
  public static KillApplicationsByAppStatesRequest newInstance(
      EnumSet<YarnApplicationState> applicationStates) {
    KillApplicationsByAppStatesRequest request =
        Records.newRecord(KillApplicationsByAppStatesRequest.class);
    request.setApplicationStates(applicationStates);
    return request;
  }

  /**
   * Get the application states to filter applications on
   *
   * @return Set of Application states to filter on
   */
  @Public
  @Stable
  public abstract EnumSet<YarnApplicationState> getApplicationStates();

  /**
   * Set the application states to filter applications on
   *
   * @param applicationStates A Set of Application states to filter on. If not
   *          defined, match all running applications
   */
  @Private
  @Unstable
  public abstract void setApplicationStates(
      EnumSet<YarnApplicationState> applicationStates);
}

public class KillApplicationsByAppStatesRequestPBImpl extends
    KillApplicationsByAppStatesRequest {

  // Locally cached value; merged into the protobuf message on getProto().
  EnumSet<YarnApplicationState> applicationStates = null;

  KillApplicationsByAppStatesRequestProto proto =
      KillApplicationsByAppStatesRequestProto.getDefaultInstance();
  KillApplicationsByAppStatesRequestProto.Builder builder = null;
  // True when "proto" is the current source of truth, false when "builder" is.
  boolean viaProto = false;

  public KillApplicationsByAppStatesRequestPBImpl() {
    builder = KillApplicationsByAppStatesRequestProto.newBuilder();
  }

  public KillApplicationsByAppStatesRequestPBImpl(
      KillApplicationsByAppStatesRequestProto proto) {
    this.proto = proto;
    viaProto = true;
  }

  public KillApplicationsByAppStatesRequestProto getProto() {
    mergeLocalToProto();
    proto = viaProto ? proto : builder.build();
    viaProto = true;
    return proto;
  }

  private void mergeLocalToProto() {
    if (viaProto)
      maybeInitBuilder();
    mergeLocalToBuilder();
    proto = builder.build();
    viaProto = true;
  }

  private void mergeLocalToBuilder() {
    if (applicationStates != null && !applicationStates.isEmpty()) {
      builder.clearApplicationStates();
      builder.addAllApplicationStates(Iterables.transform(applicationStates,
          new Function<YarnApplicationState, YarnApplicationStateProto>() {
            @Override
            public YarnApplicationStateProto apply(YarnApplicationState input) {
              return ProtoUtils.convertToProtoFormat(input);
            }
          }));
    }
  }

  private void maybeInitBuilder() {
    if (viaProto || builder == null) {
      builder = KillApplicationsByAppStatesRequestProto.newBuilder(proto);
    }
    viaProto = false;
  }

  private void initApplicationStates() {
    if (this.applicationStates != null) {
      return;
    }
    KillApplicationsByAppStatesRequestProtoOrBuilder p =
        viaProto ? proto : builder;
    List<YarnApplicationStateProto> appStatesList =
        p.getApplicationStatesList();
    this.applicationStates = EnumSet.noneOf(YarnApplicationState.class);

    for (YarnApplicationStateProto c : appStatesList) {
      this.applicationStates.add(ProtoUtils.convertFromProtoFormat(c));
    }
  }

  @Override
  public EnumSet<YarnApplicationState> getApplicationStates() {
    initApplicationStates();
    return this.applicationStates;
  }

  @Override
  public void setApplicationStates(
      EnumSet<YarnApplicationState> applicationStates) {
    maybeInitBuilder();
    if (applicationStates == null) {
      builder.clearApplicationStates();
    }
    this.applicationStates = applicationStates;
  }

  @Override
  public int hashCode() {
    return getProto().hashCode();
  }

  @Override
  public boolean equals(Object other) {
    if (other == null)
      return false;
    if (other.getClass().isAssignableFrom(this.getClass())) {
      return this.getProto().equals(this.getClass().cast(other).getProto());
    }
    return false;
  }

  @Override
  public String toString() {
    return TextFormat.shortDebugString(getProto());
  }
}

The Impl class uses the code that protobuf generated in the previous step.
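
The viaProto/builder bookkeeping in the PBImpl class is easier to follow once the protobuf details are stripped away. The sketch below models the same idea in plain Java: a mutable record keeps a local value and folds it into an immutable message only when getProto() is called. Message stands in for a generated protobuf class and a plain list stands in for its Builder; both, like all the names here, are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;

class LazyRecordSketch {
    public enum State { ACCEPTED, RUNNING }

    /** Hypothetical immutable message, standing in for a protobuf-generated class. */
    public static final class Message {
        public final List<String> states;

        Message(List<String> states) {
            this.states = Collections.unmodifiableList(new ArrayList<>(states));
        }
    }

    /** Mutable wrapper: holds a local EnumSet and merges it into a Message on demand. */
    public static final class Record {
        private Message proto = new Message(new ArrayList<String>());
        private List<String> builder = new ArrayList<>(); // stands in for Message.Builder
        private boolean viaProto = false;                 // true when proto is authoritative
        private EnumSet<State> local = null;              // cached, not yet merged

        public Record() {
        }

        public Record(Message proto) {
            this.proto = proto;
            this.viaProto = true;
        }

        public void setStates(EnumSet<State> states) {
            maybeInitBuilder();
            if (states == null) {
                builder.clear();
            }
            this.local = states;
        }

        public EnumSet<State> getStates() {
            if (local == null) {
                // Decode lazily from whichever side currently holds the data.
                List<String> source = viaProto ? proto.states : builder;
                local = EnumSet.noneOf(State.class);
                for (String name : source) {
                    local.add(State.valueOf(name));
                }
            }
            return local;
        }

        public Message getProto() {
            if (viaProto) {
                maybeInitBuilder();
            }
            if (local != null && !local.isEmpty()) {
                builder.clear();
                for (State s : local) {
                    builder.add(s.name());
                }
            }
            proto = new Message(builder);
            viaProto = true;
            return proto;
        }

        private void maybeInitBuilder() {
            if (viaProto || builder == null) {
                builder = new ArrayList<>(proto.states);
            }
            viaProto = false;
        }
    }

    public static void main(String[] args) {
        Record sender = new Record();
        sender.setStates(EnumSet.of(State.RUNNING, State.ACCEPTED));
        Message wire = sender.getProto();       // what would be sent over RPC
        Record receiver = new Record(wire);     // what the server would rebuild
        System.out.println(receiver.getStates());
    }
}
```

The point of the pattern is that setters stay cheap and the conversion to the wire format happens once, at the moment the message is actually needed.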

Step 3. Define the protocol interface method and implement it in the implementing classes

/**
 * <p>The protocol between clients and the <code>ResourceManager</code>
 * to submit/abort jobs and to get information on applications, cluster metrics,
 * nodes, queues and ACLs.</p>
 */
@Public
@Stable
public interface ApplicationClientProtocol extends ApplicationBaseProtocol {
  .....

  /**
   * <p>
   * The interface used by clients to request the <code>ResourceManager</code>
   * to abort applications by appStates.
   * </p>
   *
   * <p>
   * The client, via {@link KillApplicationsByAppStatesRequest} provides the
   * {@link YarnApplicationState} of the applications to be aborted.
   * </p>
   *
   * <p>
   * In secure mode,the <code>ResourceManager</code> verifies access to the
   * application, queue etc. before terminating the application.
   * </p>
   *
   * <p>
   * Currently, the <code>ResourceManager</code> returns an empty response on
   * success and throws an exception on rejecting the request.
   * </p>
   *
   * @param request request to abort the application by appStates
   * @return <code>ResourceManager</code> returns an empty response on success
   *         and throws an exception on rejecting the request
   * @throws YarnException
   * @throws IOException
   * @see #getQueueUserAcls(GetQueueUserAclsInfoRequest)
   */
  @Public
  @Stable
  @Idempotent
  public KillApplicationsByAppStatesResponse killApplicationsByAppStates(
      KillApplicationsByAppStatesRequest request) throws YarnException,
      IOException;
}

There are generally two kinds of implementing class: one on the client side and one on the server side.

@Private
public class ApplicationClientProtocolPBClientImpl implements ApplicationClientProtocol,
    Closeable {

  private ApplicationClientProtocolPB proxy;

  public ApplicationClientProtocolPBClientImpl(long clientVersion,
      InetSocketAddress addr, Configuration conf) throws IOException {
    RPC.setProtocolEngine(conf, ApplicationClientProtocolPB.class,
      ProtobufRpcEngine.class);
    proxy = RPC.getProxy(ApplicationClientProtocolPB.class, clientVersion, addr, conf);
  }

  @Override
  public void close() {
    if (this.proxy != null) {
      RPC.stopProxy(this.proxy);
    }
  }

  ...

  @Override
  public KillApplicationsByAppStatesResponse killApplicationsByAppStates(
      KillApplicationsByAppStatesRequest request) throws YarnException,
      IOException {
    // Unwrap the record into its protobuf form before sending it over RPC.
    KillApplicationsByAppStatesRequestProto requestProto =
        ((KillApplicationsByAppStatesRequestPBImpl) request).getProto();
    try {
      return new KillApplicationsByAppStatesResponsePBImpl(
          proxy.killApplicationsByAppStates(null, requestProto));
    } catch (ServiceException e) {
      RPCUtil.unwrapAndThrowException(e);
      return null;
    }
  }
}

There are two on the server side: the PB server that pairs with the PB client, and the service that actually does the work, which here is ClientRMService.

public class ApplicationClientProtocolPBServiceImpl implements ApplicationClientProtocolPB {

  private ApplicationClientProtocol real;

  public ApplicationClientProtocolPBServiceImpl(ApplicationClientProtocol impl) {
    this.real = impl;
  }

  ...

  @Override
  public KillApplicationsByAppStatesResponseProto killApplicationsByAppStates(
      RpcController controller, KillApplicationsByAppStatesRequestProto proto)
      throws ServiceException {
    // Wrap the incoming protobuf message and delegate to the real service.
    KillApplicationsByAppStatesRequestPBImpl request =
        new KillApplicationsByAppStatesRequestPBImpl(proto);
    try {
      KillApplicationsByAppStatesResponse response =
          real.killApplicationsByAppStates(request);
      return ((KillApplicationsByAppStatesResponsePBImpl) response).getProto();
    } catch (YarnException e) {
      throw new ServiceException(e);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }
}

The implementation in ClientRMService:

/**
 * The client interface to the Resource Manager. This module handles all the rpc
 * interfaces to the resource manager from the client.
 */
public class ClientRMService extends AbstractService implements
    ApplicationClientProtocol {
  ....

  @SuppressWarnings("unchecked")
  @Override
  public KillApplicationsByAppStatesResponse killApplicationsByAppStates(
      KillApplicationsByAppStatesRequest request) throws YarnException,
      IOException {
    UserGroupInformation callerUGI;
    try {
      callerUGI = UserGroupInformation.getCurrentUser();
    } catch (IOException ie) {
      LOG.info("Error getting UGI ", ie);
      RMAuditLogger.logFailure("UNKNOWN", AuditConstants.KILL_APP_REQUEST,
          "UNKNOWN", "ClientRMService", "Error getting UGI");
      throw RPCUtil.getRemoteException(ie);
    }

    // Collect the IDs of all applications currently in the requested states.
    EnumSet<YarnApplicationState> appStates = request.getApplicationStates();
    List<ApplicationReport> reports =
        getApplicationReportListByAppStates(callerUGI, appStates);
    List<ApplicationId> applicationIds = new ArrayList<ApplicationId>();

    if (reports != null && reports.size() > 0) {
      for (ApplicationReport report : reports) {
        applicationIds.add(report.getApplicationId());
      }
    }

    // Apply the same checks as the single-application kill to each ID in turn.
    for (ApplicationId appId : applicationIds) {
      RMApp application = this.rmContext.getRMApps().get(appId);
      if (application == null) {
        RMAuditLogger.logFailure(callerUGI.getUserName(),
            AuditConstants.KILL_APP_REQUEST, "UNKNOWN", "ClientRMService",
            "Trying to kill an absent application", appId);
        throw new ApplicationNotFoundException("Trying to kill an absent"
            + " application " + appId);
      }

      if (!checkAccess(callerUGI, application.getUser(),
          ApplicationAccessType.MODIFY_APP, application)) {
        RMAuditLogger.logFailure(callerUGI.getShortUserName(),
            AuditConstants.KILL_APP_REQUEST,
            "User doesn't have permissions to "
                + ApplicationAccessType.MODIFY_APP.toString(),
            "ClientRMService", AuditConstants.UNAUTHORIZED_USER, appId);
        throw RPCUtil.getRemoteException(new AccessControlException("User "
            + callerUGI.getShortUserName() + " cannot perform operation "
            + ApplicationAccessType.MODIFY_APP.name() + " on " + appId));
      }

      // Already in a stored final state: nothing left to kill.
      if (application.isAppFinalStateStored()) {
        RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
            AuditConstants.KILL_APP_REQUEST, "ClientRMService", appId);
        continue;
      }

      this.rmContext.getDispatcher().getEventHandler()
          .handle(new RMAppEvent(appId, RMAppEventType.KILL));
    }

    return KillApplicationsByAppStatesResponse.newInstance(true);
  }
  ...

As you can see, ClientRMService also implements ApplicationClientProtocol: whenever a new client command is added, there must be a corresponding server-side implementation. The killApplicationsByAppStates method above is modeled on the existing method for killing a single Application, shown below:

  @SuppressWarnings("unchecked")
  @Override
  public KillApplicationResponse forceKillApplication(
      KillApplicationRequest request) throws YarnException {

    ApplicationId applicationId = request.getApplicationId();

    UserGroupInformation callerUGI;
    try {
      callerUGI = UserGroupInformation.getCurrentUser();
    } catch (IOException ie) {
      LOG.info("Error getting UGI ", ie);
      RMAuditLogger.logFailure("UNKNOWN", AuditConstants.KILL_APP_REQUEST,
          "UNKNOWN", "ClientRMService" , "Error getting UGI",
          applicationId);
      throw RPCUtil.getRemoteException(ie);
    }

    RMApp application = this.rmContext.getRMApps().get(applicationId);
    if (application == null) {
      RMAuditLogger.logFailure(callerUGI.getUserName(),
          AuditConstants.KILL_APP_REQUEST, "UNKNOWN", "ClientRMService",
          "Trying to kill an absent application", applicationId);
      throw new ApplicationNotFoundException("Trying to kill an absent"
          + " application " + applicationId);
    }

    if (!checkAccess(callerUGI, application.getUser(),
        ApplicationAccessType.MODIFY_APP, application)) {
      RMAuditLogger.logFailure(callerUGI.getShortUserName(),
          AuditConstants.KILL_APP_REQUEST,
          "User doesn't have permissions to "
              + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
          AuditConstants.UNAUTHORIZED_USER, applicationId);
      throw RPCUtil.getRemoteException(new AccessControlException("User "
          + callerUGI.getShortUserName() + " cannot perform operation "
          + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
    }

    if (application.isAppFinalStateStored()) {
      RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
          AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId);
      return KillApplicationResponse.newInstance(true);
    }

    this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.KILL));

    // For UnmanagedAMs, return true so they don't retry
    return KillApplicationResponse.newInstance(
        application.getApplicationSubmissionContext().getUnmanagedAM());
  }

In both cases, the kill event is wrapped up and handed to the corresponding event dispatcher.
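
Stripped of auditing and access checks, the batch method boils down to a loop that hands one kill event per matching application to a dispatcher. The following self-contained sketch models only that control flow; the Dispatcher class, the State enum and the map of applications are hypothetical stand-ins, not the ResourceManager's real types.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class BatchKillSketch {
    public enum State { ACCEPTED, RUNNING, FINISHED, KILLED }

    /** Records kill events instead of delivering them; stands in for the RM dispatcher. */
    public static final class Dispatcher {
        public final List<String> killEvents = new ArrayList<>();

        public void handleKill(String appId) {
            killEvents.add(appId);
        }
    }

    static boolean isFinal(State state) {
        return state == State.FINISHED || state == State.KILLED;
    }

    /**
     * Hands one kill event to the dispatcher for every app whose state is in
     * the target set, skipping apps that have already reached a final state.
     * Returns the number of events handed over.
     */
    public static int killByStates(Map<String, State> apps, Set<State> targets,
                                   Dispatcher dispatcher) {
        int sent = 0;
        for (Map.Entry<String, State> entry : apps.entrySet()) {
            State state = entry.getValue();
            if (!targets.contains(state) || isFinal(state)) {
                continue;
            }
            dispatcher.handleKill(entry.getKey());
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        Map<String, State> apps = new LinkedHashMap<>();
        apps.put("app_1", State.RUNNING);
        apps.put("app_2", State.ACCEPTED);
        apps.put("app_3", State.FINISHED);

        Dispatcher dispatcher = new Dispatcher();
        int sent = killByStates(apps, EnumSet.of(State.RUNNING, State.FINISHED), dispatcher);
        System.out.println(sent + " kill event(s): " + dispatcher.killEvents);
    }
}
```

Note that in this model a returned count only says how many events were handed to the dispatcher; it says nothing about whether the applications have actually stopped yet.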

Step 4. Write the client command class and its handler method

      ...
      // Register the new option; it takes a comma-separated list of states.
      Option killAppStateOpt =
          new Option(APP_KILL_BY_APPSTATES_CMD, true,
              "The states of application that will be killed"
                  + ", input comma-separated list of application states.");
      killAppStateOpt.setValueSeparator(',');
      killAppStateOpt.setArgs(Option.UNLIMITED_VALUES);
      killAppStateOpt.setArgName("States");
      opts.addOption(killAppStateOpt);
      ...

    ....
    } else if (cliParser.hasOption(APP_KILL_BY_APPSTATES_CMD)) {
      if (args.length != 3) {
        printUsage(title, opts);
        return exitCode;
      }

      // Parse each state name, rejecting anything that is not a valid state.
      EnumSet<YarnApplicationState> appStates =
          EnumSet.noneOf(YarnApplicationState.class);
      String[] states = cliParser.getOptionValues(APP_KILL_BY_APPSTATES_CMD);
      if (states != null) {
        for (String state : states) {
          if (!state.trim().isEmpty()) {
            try {
              appStates.add(YarnApplicationState.valueOf(StringUtils
                  .toUpperCase(state).trim()));
            } catch (IllegalArgumentException ex) {
              sysout.println("The application state " + state + " is invalid.");
              sysout.println(getAllValidApplicationStates());
              return exitCode;
            }
          }
        }
      }

      try {
        killApplicationsByAppStates(appStates);
      } catch (ApplicationNotFoundException e) {
        return exitCode;
      }
    }
    ....

The method it calls is as follows:

  /**
   * Kill the applications by appStates
   *
   * @param appStates
   * @throws IOException
   */
  private void killApplicationsByAppStates(
      EnumSet<YarnApplicationState> appStates) throws YarnException,
      IOException {
    if (appStates == null || appStates.isEmpty()) {
      sysout.println("The appStates should not be null.");
      return;
    }

    // Applications in a terminal state cannot be killed again.
    if (appStates.contains(YarnApplicationState.FAILED)
        || appStates.contains(YarnApplicationState.KILLED)
        || appStates.contains(YarnApplicationState.FINISHED)) {
      sysout
          .println("The appState should not contain state failed, killed, finished");
      return;
    }

    sysout.println("Killing applications of specific states.");
    client.killApplicationsByAppStates(appStates);
  }

The client-side method should be kept as simple as possible: it just passes the parameter along, and the real work is done on the server.
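
The client-side parsing and validation can be tried in isolation with the sketch below. It mirrors the two checks from this step, turning a comma-separated argument into an EnumSet and refusing terminal states, but uses a local State enum that merely copies the names of YarnApplicationState; the class and method names are hypothetical.

```java
import java.util.EnumSet;
import java.util.Locale;

class StateArgParser {
    /** Local stand-in that copies the state names of YarnApplicationState. */
    public enum State {
        NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED
    }

    /**
     * Parses a comma-separated, case-insensitive list of state names.
     * Blank entries are ignored; an unknown name raises IllegalArgumentException.
     */
    public static EnumSet<State> parse(String arg) {
        EnumSet<State> states = EnumSet.noneOf(State.class);
        for (String token : arg.split(",")) {
            String name = token.trim();
            if (name.isEmpty()) {
                continue;
            }
            states.add(State.valueOf(name.toUpperCase(Locale.ROOT)));
        }
        return states;
    }

    /** A request is acceptable only if it is non-empty and names no terminal state. */
    public static boolean isKillable(EnumSet<State> states) {
        if (states.isEmpty()) {
            return false;
        }
        return !(states.contains(State.FAILED)
            || states.contains(State.KILLED)
            || states.contains(State.FINISHED));
    }

    public static void main(String[] args) {
        EnumSet<State> ok = parse("running, submitted");
        System.out.println(ok + " killable: " + isKillable(ok));

        EnumSet<State> bad = parse("RUNNING,KILLED");
        System.out.println(bad + " killable: " + isKillable(bad));
    }
}
```

Rejecting terminal states on the client saves a pointless RPC round trip, although the server still has to guard against applications that finish between the check and the kill.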

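Step 5. Write unit tests to verify the result

When no test cluster is available, this is the best way to try things out. Write one test case for the client side and one for the server side.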

  // Client-side test (ApplicationCLI)
  @Test
  public void testKillApplicationsByAppStates() throws Exception {
    ApplicationCLI cli = createAndGetAppCLI();
    EnumSet<YarnApplicationState> appStates =
        EnumSet.noneOf(YarnApplicationState.class);
    appStates.add(YarnApplicationState.RUNNING);
    appStates.add(YarnApplicationState.SUBMITTED);

    int result =
        cli.run(new String[] { "application", "-killByAppStates",
            "RUNNING,SUBMITTED" });
    assertEquals(0, result);
    verify(client).killApplicationsByAppStates(appStates);
    verify(sysOut).println("Killing applications of specific states.");
  }

  // Server-side test (ClientRMService)
  @Test
  public void testKillApplicationsByAppStates() throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    MockRM rm = new MockRM();
    rm.init(conf);
    rm.start();

    ClientRMService rmService = rm.getClientRMService();
    GetApplicationsRequest getRequest =
        GetApplicationsRequest.newInstance(EnumSet
            .of(YarnApplicationState.KILLED));

    EnumSet<YarnApplicationState> appStates =
        EnumSet.noneOf(YarnApplicationState.class);
    appStates.add(YarnApplicationState.ACCEPTED);
    RMApp app1 = rm.submitApp(1024);
    RMApp app2 = rm.submitApp(1024);

    // No application has been killed yet.
    assertEquals("Incorrect number of apps in the RM", 0, rmService
        .getApplications(getRequest).getApplicationList().size());

    KillApplicationsByAppStatesRequest killRequest =
        KillApplicationsByAppStatesRequest.newInstance(appStates);

    KillApplicationsByAppStatesResponse killResponse =
        rmService.killApplicationsByAppStates(killRequest);
    assertTrue("Kill the applications successfully",
        killResponse.getIsKillCompleted());
  }

The other two dimensions, user and queue, work the same way, so I will not describe them again; the code is in the patch linked below. I have submitted this feature proposal to the open-source Hadoop community as YARN-4529.


Related links

Issue link: https://issues.apache.org/jira/browse/YARN-4529

GitHub patch link: https://github.com/linyiqun/open-source-patch/tree/master/yarn/YARN-4529


Source: http://blog.csdn.net/androidlushangderen/article/details/50457963