Java8 中的纯异步编程

jopen 10年前

当系统越来越复杂之后,服务化的模块的接口调用会越来越多,最终模块之间的IO 成为影响整体系统性能的关键因素。传统的阻塞IO + 线程池模型应对这种场景比较无力,只能依靠增加线程数量,但是服务器本身的线程数是有上线的。一个模块接口性能的波动,啥有不慎就会造成调用者线程池被 IO打满,压垮整个服务。这时候纯异步编程就有了用武之地,因为IO 不再占用线程来执行,仅需要开少量的线程用于CPU 密集的操作,模块本身对服务接口超时的容忍程度也大大增加。

所谓纯异步编程,就是中间完全没有阻塞的操作,所有的IO 调用均是异步的。一般讲到纯异步编程,都会让人望而生畏,其实纯异步程序好不好写,和语言的特性是很相关的。像Golang 这样原生支持协程的语言基本上写出的程序就是全异步的,使用者就像在写同步程序一样;而c#, python, 以及最新的nodejs 这样的语言,虽然没有原生协程支持,但是支持Generator,写出的纯异步程序也和同步程序看起来差不多。如果一个语言连Generator 也没有,就只能使用回调的方法来写异步程序,而回调本身是编写难度很高、很容易出错的方式,尤其是还要考虑异常处理、传递这些问题,所谓「回调地狱」即是如此。

为了降低写回调式的纯异步程序的难度,就有了一种可以称之为「Managed callback」的编程模式,其代表有Google guava 的concurrent lib,推ter 使用scala 完成的Finagle 等。这种模式的特点是程序书写的顺序看起来和执行的顺序一致,自动管理异常的捕获传递,避免深层次的嵌套,函数式风格,偏向于使用粒度较小的函数组合来完成程序,使用Immutable的对象等。

Java 中原本的Future 类也是设计用来完成异步编程的,但是Future 本身的接口和功能比较有限,这才有了Guava 中的ListenableFuture 等各种增强的实现。Java8 提供了新的CompletableFuture,并且有了基本的对于函数编程的支持,已经很合适来进行Managed callback 模式的异步编程了。

首先我们基于Netty实现了一个异步的Http Server 和 Http Client,假设有一个Http client 基础接口如下:

CompletableFuture<ImmutableResponse> request(ImmutableRequest request);

 另外有一个Async 的Http Server,可以注册如下接口的Hanlder:

/**   * Async http process interface   *   * @author Dong Liu dongliu@wandoujia.com   */  public interface AsyncHandler {        /**       * handle request.       */      CompletableFuture<ImmutableResponse> handle(ImmutableRequest request);  }

假设我们要写一个纯异步的程序,从 A 接口获取数据,这个数据是一个整数,在这个接口上加 10 再通过Http 接口返回。

首先我们觉得request 方法太原始,封装一个更简单的方法:

public CompletableFuture<String> get(String url) {      ImmutableRequest request = ImmutableRequest.newBuilder()              .withMethod(HttpMethod.GET)              .withUri(url)              .build();      return request(request).thenApply(response -> new String(response.getBody()));  }

这里演示了CompletableFuture 中thenApply 方法的用法,这个方法在CompletableFuture 中管理的回调完成之后进行调用,对结果进行处理。

现在来完成我们的Handler:

public class AddNumberHandler implements AsyncHandler {        private HttpClient httpClient = ...;        @Override      public CompletableFuture<ImmutableResponse> handle(ImmutableRequest request) {          CompletableFuture<String> resultFuture = httpClient.get("http://127.0.0.1/a");          return resultFuture.thenApply(result -> {              int total = Integer.parseInt(result) + 10;              return ImmutableResponse.newBuilder().withBody(String.valueOf(total)).build();          });      }  }

让情况变得更复杂一些。假设我们需要调用A, B, C 三个接口,并把三个接口所返回的数字相加。因为A, B, C三个接口调用是独立的,所以决定并行的来请求三个接口,以提升效率:

public class AddNumberHandler implements AsyncHandler {        private HttpClient httpClient = ...;        @Override      public CompletableFuture<ImmutableResponse> handle(ImmutableRequest request) {          CompletableFuture<String> aFuture = httpClient.get("http://127.0.0.1/a");          CompletableFuture<String> bFuture = httpClient.get("http://127.0.0.1/b");          CompletableFuture<String> cFuture = httpClient.get("http://127.0.0.1/b");            return FutureUtils.combine(aFuture, bFuture, cFuture).thenApply(abc -> {              int total = parseInt(abc._1()) + parseInt(abc._2()) + parseInt(abc._3());              return ImmutableResponse.newBuilder().withBody(String.valueOf(total)).build();          });      }  }

FutureUtils.combine 方法是一个简单的封装,封装了CompletableFuture 的 Combine 方法以方便使用。可以看到,使用CompletableFuture 来写这种并行多个请求的异步程序是很容易的事情。

这中间我们都没有特别的对异常进行处理,如果直接使用callback 的话,这是肯定不可行的,必须手动捕获所有异常,否则这个callback 就永远不会返回了。但是现在,CompletableFuture管理了我们提供的回调函数, 会帮我们捕获异常并进行管理。这个异常在调用thenApply 或者combine 方法的时候是自动传递的,如果第一步就失败了,后面注册的回调就不会被执行。要对异常进行处理的话,可以使用exceptionally 方法:

public class AddNumberHandler implements AsyncHandler {        private HttpClient httpClient = ...;      private Logger logger = ...;        @Override      public CompletableFuture<ImmutableResponse> handle(ImmutableRequest request) {          CompletableFuture<String> aFuture = httpClient.get("http://127.0.0.1/a");          CompletableFuture<String> bFuture = httpClient.get("http://127.0.0.1/b");          CompletableFuture<String> cFuture = httpClient.get("http://127.0.0.1/b");            aFuture.isCompletedExceptionally();            CompletableFuture<Integer> totalFuture = FutureUtils.combine(aFuture, bFuture, cFuture)                  .thenApply(abc -> parseInt(abc._1()) + parseInt(abc._2()) + parseInt(abc._3()))                  .exceptionally(t -> {                      logger.error("", t);                      return -1;                  });            return totalFuture.thenApply(total -> {              return ImmutableResponse.newBuilder().withBody(String.valueOf(total)).build();          });      }  }

 这里举得例子都比较简单,想更多的了解可以参见CompletableFuture 的文档。

</div>

原文链接: http://www.dongliu.net/post/622452