FutureTask源码分析

OrvilleBurl 8年前
   <p>对于java的并发编程来说,我们都知道Thread和runnable,这是创建一个线程最基本的两种方法,但是这两种方法创建的线程是不支持对线程的执行结果进行返回的。虽然我们可以通过传递引用的方式实现,但是实现起来未免太复杂。这个时候我们可能要用到Callable,callable是一个JDK提供的一个支持线程返回结果的一个接口,通过实现call方法,能返回指定泛型的变量。</p>    <pre>  <code class="language-java">class CallableTask implements Callable<Integer>{        @Override      public Integer call() throws Exception {          System.out.println("call runing");          Thread.sleep(5000);          return 1;      }  }  public class CallableTest {        public static void main(String args[]){            CallableTask task = new CallableTask();          try {              System.out.println("call start");              ExecutorService service = Executors.newSingleThreadExecutor();              Future fu = service.submit(task);              System.out.println(fu.get());              service.shutdown();              System.out.println("call end");          } catch (Exception e) {              e.printStackTrace();          }  }  }</code></pre>    <p>可以通过线程池去实现任务的提交,任务提交后会返回future对象,通过get方法即可获得返回值。</p>    <p>注意:这里其实是不推荐调用call方法的,实际上直接调用call方法和runnable的run方法效果是一样的。</p>    <p>其实JDK提供了一种更好的提交方式,它可以将Runnable和Callable进行封装,以便于提交到线程池。并且可以对线程有更好的控制,比如取消线程的执行,它就是FutureTask。</p>    <p>FutureTask只是简单的对Callable以及Runnable进行了封装,提供了额外的对线程控制的功能以及阻塞获取请求结果的功能,其实对于线程池的submit方法,对于每一个任务都会封装成一个FutureTask来运行。</p>    <pre>  <code class="language-java">/**       * @throws RejectedExecutionException {@inheritDoc}       * @throws NullPointerException       {@inheritDoc}       */      public <T> Future<T> submit(Callable<T> task) {          if (task == null) throw new NullPointerException();          RunnableFuture<T> ftask = newTaskFor(task);          execute(ftask);          return ftask;      }        /**       * Returns a <tt>RunnableFuture</tt> for the given callable task.       *       * @param callable the callable task being wrapped       * @return a <tt>RunnableFuture</tt> which when run will call the       * underlying callable and which, as a <tt>Future</tt>, will yield       * the callable's result as its result and provide for       * cancellation of the underlying task.       * @since 1.6       */      protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {          return new FutureTask<T>(callable);      }</code></pre>    <p>那么FutureTask到底是怎么实现的呢?</p>    <p>首先看构造方法:</p>    <pre>  <code class="language-java">/**       * Creates a <tt>FutureTask</tt> that will, upon running, execute the       * given <tt>Callable</tt>.       *       * @param  callable the callable task       * @throws NullPointerException if callable is null       */      public FutureTask(Callable<V> callable) {          if (callable == null)              throw new NullPointerException();          sync = new Sync(callable);      }        /**       * Creates a <tt>FutureTask</tt> that will, upon running, execute the       * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the       * given result on successful completion.       *       * @param runnable the runnable task       * @param result the result to return on successful completion. If       * you don't need a particular result, consider using       * constructions of the form:       * {@code Future<?> f = new FutureTask<Void>(runnable, null)}       * @throws NullPointerException if runnable is null       */      public FutureTask(Runnable runnable, V result) {          sync = new Sync(Executors.callable(runnable, result));      }</code></pre>    <p>FutureTask可以接受Runnable以及Callable两种类型的参数,在初始化的时候内部构造了一个Sync的AQS实现类的实例,对于runnable类型的线程需要转化成Callable,同时可以指定返回值。</p>    <p>当我们再观察其他方法的时候,几乎都是委托Sync去处理的,那么重点就放在了Sync上。</p>    <p>首先看看Sync里面有几个状态:</p>    <pre>  <code class="language-java">/** State value representing that task is ready to run */          private static final int READY     = 0;//准备就绪          /** State value representing that task is running */          private static final int RUNNING   = 1;//正在运行          /** State value representing that task ran */          private static final int RAN       = 2;//运行完毕          /** State value representing that task was cancelled */          private static final int CANCELLED = 4;//任务取消</code></pre>    <p>一个FutureTask的实例就在上面几个状态之间进行轮转,当执行线程时调用run方法,run方法又委托Syn的innerRun方法:</p>    <pre>  <code class="language-java">/**       * Sets this Future to the result of its computation       * unless it has been cancelled.       */      public void run() {          sync.innerRun();      }     //首先CAS将status置为RUNING,可以防止结束前重复提交          void innerRun() {              if (!compareAndSetState(READY, RUNNING))                  return;                runner = Thread.currentThread();              //double check 防止在此之前被cancel              if (getState() == RUNNING) { // recheck after setting thread                  V result;                  try {                      result = callable.call();                  } catch (Throwable ex) {                      setException(ex);                      return;                  }                  //设置结果                  set(result);              } else {                  //清除runner,唤醒阻塞线程                  releaseShared(0); // cancel              }          }</code></pre>    <p>当执行线程的时候,首先做的是将AQS的状态由READY变成RUNNING,因为Sync是AQS的实现类,这个也是改变AQS的状态,改变状态之后进行double check,此时是为了防止在这之前有Cancel的请求。如果Cancel了,那么releaseShared清除状态并且唤醒get等待的线程。如果为Running状态,接下来调用call方法,这里也就是为什么要提交到线程池执行了,注意call方法调用只是一个方法调用,而不像Thread.start那样会直接返回,并且开启新线程执行。当执行完毕之后,调用Set,Set其实也是委托给Sync的innerSet:</p>    <pre>  <code class="language-java">/**       * Sets the result of this Future to the given value unless       * this future has already been set or has been cancelled.       * This method is invoked internally by the <tt>run</tt> method       * upon successful completion of the computation.       * @param v the value       */      protected void set(V v) {          sync.innerSet(v);      }            void innerSet(V v) {              for (;;) {                  int s = getState();                  if (s == RAN)                      return;                  //收到取消信号,不设置结果,直接返回                  if (s == CANCELLED) {                      // aggressively release to set runner to null,                      // in case we are racing with a cancel request                      // that will try to interrupt runner                      releaseShared(0);                      return;                  }                  //设置结果,并设置当前的状态为RAN                  if (compareAndSetState(s, RAN)) {                      //设置内容                      result = v;                      //唤醒阻塞线程                      releaseShared(0);                      done();                      return;                  }              }          }</code></pre>    <p>这里在Set的时候呢,首先也是判断状态如果是RAN直接返回,如果取消了,那么唤醒get等待的线程,并且返回。如果都没有,那么设置FutureTask状态为RAN,表示线程执行完了,同时设置restult为返回值,唤醒所有的等待线程。</p>    <p>上面其实在执行前和执行后都做了Cancel的检查,如果取消,无论执行前后都是没有结果set给result的。</p>    <p>接下来看看是怎么实现阻塞等待结果的,首先看get方法:</p>    <pre>  <code class="language-java">/**       * @throws CancellationException {@inheritDoc}       */      public V get() throws InterruptedException, ExecutionException {          return sync.innerGet();      }                V innerGet() throws InterruptedException, ExecutionException {              //共享锁,没有完成会阻塞在这              acquireSharedInterruptibly(0);              //如果已经取消,那么抛出异常              if (getState() == CANCELLED)                  throw new CancellationException();              if (exception != null)                  throw new ExecutionException(exception);              return result;          }</code></pre>    <p>同样是委托机制,其实关键在于acquireSharedInterruptibly方法。</p>    <pre>  <code class="language-java">/**       * Acquires in shared mode, aborting if interrupted.  Implemented       * by first checking interrupt status, then invoking at least once       * {@link #tryAcquireShared}, returning on success.  Otherwise the       * thread is queued, possibly repeatedly blocking and unblocking,       * invoking {@link #tryAcquireShared} until success or the thread       * is interrupted.       * @param arg the acquire argument       * This value is conveyed to {@link #tryAcquireShared} but is       * otherwise uninterpreted and can represent anything       * you like.       * @throws InterruptedException if the current thread is interrupted       */      public final void acquireSharedInterruptibly(int arg)              throws InterruptedException {          if (Thread.interrupted())              throw new InterruptedException();          if (tryAcquireShared(arg) < 0) //如果目前是RAN状态或者是Cancel状态的话标识已经完成或者结束              doAcquireSharedInterruptibly(arg);//等待Task运行结束,唤醒阻塞队列      }         /**           * Implements AQS base acquire to succeed if ran or cancelled           */         protected int tryAcquireShared(int ignore) {              return innerIsDone() ? 1 : -1;          }          boolean innerIsDone() {              return ranOrCancelled(getState()) && runner == null;          }          private boolean ranOrCancelled(int state) {              return (state & (RAN | CANCELLED)) != 0;          }</code></pre>    <p>其实这里还是使用了委托的机制,同时呢采用了一个共享锁去实现同步,共享锁有一个特点就是允许多个线程获取锁,其实这里对于get操作,其实多个线程同时get是没有问题的,并且如果使用独占锁会降低性能,这里引入共享锁感觉是比较巧妙的。</p>    <p>上面代码将的是,首先线程回去check当前FutureTask的状态,如果是RAN或者Cancel,表示线程已经结束,那么直接返回,如果当前不是上面状态,证明此时线程没执行或者没执行完,那么需要阻塞等待,所以执行doAcquireSharedInterruptibly,让线程等待,等待innerSet之后或者Cancel之后的releaseShared。releaseShared会逐步的唤醒所有阻塞在get上的线程,这样所以线程都能get到结果。提高了效率。</p>    <p>FutureTask实现不但简单而且巧妙(比如巧妙的运用了共享锁),最重要的是使用的也是十分广泛:</p>    <ol>     <li> <p>做异步处理,对于下载,或者生成PDF这种比较重的场景,我们可以通过将请求异步化,抽象成FutureTask提交到线程池中运行,从而避免占用大量的Worker线程(Tomcat或者RPC框架),导致后面的请求阻塞。</p> </li>     <li> <p>对于服务的同步调用,我们可以利用FutureTask进行服务的并行调用,而在最后进行结果的汇总,这样就能变串行调用为并行调用,大大的减小请求的时间(类似于Fork-Join)。</p> </li>    </ol>    <p>最后,异步线程处理和并行处理是个好东西,需要用起来!!!。</p>    <p> </p>    <p>来自:http://www.jianshu.com/p/dfff17300a87</p>    <p> </p>