OkHttp3 源码浅析
DiaCeh
8年前
<p>之前的底层网络库基本就是Apache HttpClient和HttpURLConnection。由于HttClient比较难用,官方在Android2.3以后就不建议用了,并且在Android5.0以后废弃了HttpClient,在Android6.0更是删除了HttpClient。</p> <p>HttpURLConnection是一种多用途、轻量极的HTTP客户端,使用它来进行HTTP操作可以适用于大多数的应用程序,但是在Android 2.2版本之前存在一些bug,所以官方建议在Android2.3以后替代HttpClient,Volley就是按版本分区使用这两个网络库。</p> <p>然而随着开源届扛把子Square的崛起,OkHttp的开源,这两个网络库只能被淹没在历史洪流中。Android4.4以后HttpURLConnection的底层已经替换成OkHttp实现。OkHttp配合同样是Square开源的Retrofit,网络请求变得更简便,功能更强大。</p> <h2>OkHttp</h2> <p>OkHttp是一个现代,快速,高效的网络库,OkHttp 库的设计和实现的首要目标是高效。</p> <ul> <li>支持 HTTP/2和SPDY,这使得对同一个主机发出的所有请求都可以共享相同的套接字连接;</li> <li>如果 HTTP/2和SPDY不可用,OkHttp会使用连接池来复用连接以提高效率。</li> <li>支持Gzip降低传输内容的大小</li> <li>支持Http缓存</li> <li>会从很多常用的连接问题中自动恢复。如果服务器配置了多个IP地址,OkHttp 会自动重试一个主机的多个 IP 地址。</li> <li>使用Okio来大大简化数据的访问与存储,提高性能</li> </ul> <h2>简单使用</h2> <p>简单的异步请求</p> <pre> <code class="language-java"> OkHttpClient client = new OkHttpClient(); Request request = new Request.Builder() .url(url) .build(); client.newCall(request).enqueue(new Callback() { public void onFailure(Request request, IOException e) { } public void onResponse(Response response) throws IOException { System.out.println(response.body().string()); } }); </code></pre> <p>使用非常的简答,发送请求,拿到异步结果。</p> <h2>OkHttpClient</h2> <p>跟下源码,OkHttpClient.newCall实现</p> <pre> <code class="language-java">public class OkHttpClient implements Cloneable, Call.Factory{ public static final class Builder { Dispatcher dispatcher; Proxy proxy; List<Protocol> protocols; List<ConnectionSpec> connectionSpecs; final List<Interceptor> interceptors = new ArrayList<>(); final List<Interceptor> networkInterceptors = new ArrayList<>(); ProxySelector proxySelector; CookieJar cookieJar; Cache cache; InternalCache internalCache; SocketFactory socketFactory; SSLSocketFactory sslSocketFactory; CertificateChainCleaner certificateChainCleaner; HostnameVerifier hostnameVerifier; CertificatePinner certificatePinner; Authenticator proxyAuthenticator; Authenticator authenticator; ConnectionPool connectionPool; Dns dns; boolean followSslRedirects; boolean followRedirects; boolean retryOnConnectionFailure; int connectTimeout; int readTimeout; int writeTimeout; } ... ... @Override public Call newCall(Request request) { return new RealCall(this, request); ... ... } } </code></pre> <p>OkHttpClient通过Builder实例化,实现了Call.Factory接口创建了一个RealCall的实例,而RealCall是Call接口的实现。</p> <pre> <code class="language-java">public interface Call { Request request(); Response execute() throws IOException; void enqueue(Callback responseCallback); void cancel(); boolean isExecuted(); boolean isCanceled(); interface Factory { Call newCall(Request request); } } </code></pre> <h2>RealCall</h2> <p>RealCall中封装了OKHttpClient和Request</p> <pre> <code class="language-java"> protected RealCall(OkHttpClient client, Request originalRequest) { this.client = client; this.originalRequest = originalRequest; } @Override public void enqueue(Callback responseCallback) { enqueue(responseCallback, false); } void enqueue(Callback responseCallback, boolean forWebSocket) { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } client.dispatcher().enqueue(new AsyncCall(responseCallback, forWebSocket)); } final class AsyncCall extends NamedRunnable { private final Callback responseCallback; private final boolean forWebSocket; private AsyncCall(Callback responseCallback, boolean forWebSocket) { super("OkHttp %s", redactedUrl().toString()); this.responseCallback = responseCallback; this.forWebSocket = forWebSocket; } ... @Override protected void execute() { boolean signalledCallback = false; try { Response response = getResponseWithInterceptorChain(forWebSocket); if (canceled) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { responseCallback.onFailure(RealCall.this, e); } } finally { //注意这一句代码 client.dispatcher().finished(this); } } } </code></pre> <p>调用enqueue封装成AsyncCall交给OKHttpClient的dispatcher线程池执行。</p> <h2>Dispatcher线程池</h2> <p>OkHttp的dispatcher参数是直接new出来的。先看下enqueue方法,将AsyncCall当做参数传递进来</p> <pre> <code class="language-java">public final class Dispatcher { /** 最大并发请求数为64 */ private int maxRequests = 64; /** 每个主机最大请求数为5 */ private int maxRequestsPerHost = 5; /** 线程池 */ private ExecutorService executorService; /** 准备执行的请求 */ private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); /** 正在执行的异步请求,包含已经取消但未执行完的请求 */ private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); /** 正在执行的同步请求,包含已经取消单未执行完的请求 */ private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { runningAsyncCalls.add(call); executorService().execute(call); } else { readyAsyncCalls.add(call); } } public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; } } </code></pre> <p>构造一个线程池ExecutorService:</p> <pre> <code class="language-java">executorService = new ThreadPoolExecutor( 0, //corePoolSize 最小并发线程数,如果是0的话,空闲一段时间后所有线程将全部被销毁。 Integer.MAX_VALUE, //maximumPoolSize: 最大线程数,当任务进来时可以扩充的线程最大值,当大于了这个值就会根据丢弃处理机制来处理 60, //keepAliveTime: 当线程数大于corePoolSize时,多余的空闲线程的最大存活时间 TimeUnit.SECONDS,//单位秒 new SynchronousQueue<Runnable>(),//工作队列,先进先出 Util.threadFactory("OkHttp Dispatcher", false));//单个线程的工厂 </code></pre> <p>构建了一个最大线程数为Integer.MAX_VALUE的线程池,也就是说,是个不设最大上限的线程池(其实有限制64个),有多少任务添加进来就新建多少线程,以保证I/O任务中高阻塞低占用的过程中,不会长时间卡在阻塞上。当工作完成后,线程池会在60s内相继关闭所有线程。</p> <p>还记得刚才在AsyncCall.execute() finally中的内容吗</p> <pre> <code class="language-java">finally { client.dispatcher().finished(this); } ... /** Used by {@code AsyncCall#run} to signal completion. */ synchronized void finished(AsyncCall call) { if (!runningAsyncCalls.remove(call)) throw new AssertionError("AsyncCall wasn't running!"); promoteCalls(); } //Dispatcher.java private void promoteCalls() { //超过阈值 返回 if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity. if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote. for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall call = i.next(); if (runningCallsForHost(call) < maxRequestsPerHost) { i.remove(); runningAsyncCalls.add(call); executorService().execute(call); } if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity. } } </code></pre> <p>当AsyncCall执行完成后,调用Disptcher的finish()方法,调用promoteCalls()方法,如果超过阈值,继续等待,否则取出缓存区的任务执行,顺序是先进先出。</p> <p>Dispatcher线程池总结</p> <ul> <li>调度线程池Disptcher实现了高并发,低阻塞的实现</li> <li>采用Deque作为缓存,先进先出的顺序执行</li> <li>任务在try/finally中调用了finished函数,控制任务队列的执行顺序,而不是采用锁,减少了编码复杂性提高性能</li> </ul> <h2>Interceptor</h2> <p>调度基本整明白了,AsyncCall 中的execute具体内容还没有分析,主要就一行代码。</p> <pre> <code class="language-java">@Override protected void execute() { boolean signalledCallback = false; try { ... Response response = getResponseWithInterceptorChain(forWebSocket); ... } finally { client.dispatcher().finished(this); } } private Response getResponseWithInterceptorChain(boolean forWebSocket) throws IOException { Interceptor.Chain chain = new ApplicationInterceptorChain(0, originalRequest, forWebSocket); return chain.proceed(originalRequest); } </code></pre> <p>从方法名字基本可以猜到是干嘛的,调用 chain.proceed(originalRequest); 将request传递进来,从拦截器链里拿到返回结果。那么拦截器Interceptor是干嘛的,Chain是干嘛的呢?继续往下看ApplicationInterceptorChain</p> <pre> <code class="language-java">class ApplicationInterceptorChain implements Interceptor.Chain { private final int index; private final Request request; ApplicationInterceptorChain(int index, Request request, boolean forWebSocket) { this.index = index; this.request = request; this.forWebSocket = forWebSocket; } @Override public Connection connection() { return null; } @Override public Request request() { return request; } @Override public Response proceed(Request request) throws IOException { // If there's another interceptor in the chain, call that. if (index < client.interceptors().size()) { Interceptor.Chain chain = new ApplicationInterceptorChain(index + 1, request, forWebSocket); Interceptor interceptor = client.interceptors().get(index); Response interceptedResponse = interceptor.intercept(chain); if (interceptedResponse == null) { throw new NullPointerException("application interceptor " + interceptor + " returned null"); } return interceptedResponse; } // No more interceptors. Do HTTP. return getResponse(request, forWebSocket); } } </code></pre> <p>ApplicationInterceptorChain实现了Interceptor.Chain接口,持有Request的引用。</p> <pre> <code class="language-java">public interface Interceptor { Response intercept(Chain chain) throws IOException; interface Chain { Request request(); Response proceed(Request request) throws IOException; Connection connection(); } } </code></pre> <p>proceed方法中判断index(此时为0)是否小于client.interceptors(List )的大小,如果小于也就是说client.interceptors还有Interceptor,那么就再封装一个ApplicationInterceptorChain,只不过index + 1,然后取出第index个Interceptor将chain传递进去。传递进去干嘛呢?我们看一个用法,以实际项目为例</p> <pre> <code class="language-java">HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor(new RetrofitLogger()); interceptor.setLevel(HttpLoggingInterceptor.Level.BODY); OkHttpClient client = new OkHttpClient.Builder() .addInterceptor(interceptor) .retryOnConnectionFailure(true) .connectTimeout(15, TimeUnit.SECONDS) .addInterceptor(getCommonParameterInterceptor()) .addNetworkInterceptor(getTokenInterceptor()) .build(); @Override protected Interceptor getCommonParameterInterceptor() { return new Interceptor() { @Override public Response intercept(Chain chain) throws IOException { Request originalRequest = chain.request(); Request request = originalRequest; if (!originalRequest.method().equalsIgnoreCase("POST")) { HttpUrl modifiedUrl = originalRequest.url().newBuilder() .addQueryParameter("version_code", String.valueOf(AppUtils.getVersionCode())) .addQueryParameter("app_key", "nicepro") .addQueryParameter("app_device", "Android") .addQueryParameter("app_version", AppUtils.getVersionName()) .addQueryParameter("token", AccountUtils.getToken()) .build(); request = originalRequest.newBuilder().url(modifiedUrl).build(); } return chain.proceed(request); } }; } @Override protected Interceptor getTokenInterceptor() { return new Interceptor() { @Override public Response intercept(Chain chain) throws IOException { Request originalRequest = chain.request(); Request authorised = originalRequest.newBuilder() .header("app-key", "nicepro") .header("app-device", "Android") .header("app-version", AppUtils.getVersionName()) .header("os", AppUtils.getOs()) .header("os-version", AppUtils.getAndroidVersion() + "") .header("Accept", "application/json") .header("User-Agent", "Android/retrofit") .header("token", AccountUtils.getToken()) .build(); return chain.proceed(authorised); } }; } </code></pre> <p>可以看到每个Interceptor的intercept方法中做了一些操作后,最后都会调用 chain.proceed(request) 方法,而这个chain就是每次prceed方法中生成的ApplicationInterceptorChain,用index+1的方式递归调用OkHttClient中的Interceptors,进行拦截操作,比如可以用来监控log,修改请求,修改结果,供开发者自定义参数添加等等,然后最终调用的还是最初的index=0的那个chain的proceed方法中的 getResponse(request, forWebSocket); 。</p> <p>可以说OkHttp是用chain串联起拦截器,而每个拦截器都有能力返回Response,返回Response即终止整个调用链,这种设计模式称为 <a href="/misc/goto?guid=4959717657491465716" rel="nofollow,noindex">责任链模式</a> 。这种模式为OkHttp提供了强大的装配能力,极大的提高了OkHttp的扩展性和可维护性。</p> <p>在Android系统中最典型的责任链模式就是View的Touch传递机制,一层一层传递直到被消费。</p> <p>官方的一张图就能很好的解释Interceptor</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/50e636bc980df68994d882d0c84909d8.png"></p> <p>整个流程很清晰。这种设计真是太棒了,值得学习!</p> <h2>连接池复用</h2> <p>我们知道进行一次tcp网络请求,一般要三次握手连接,四次握手断开连接。一次完整的http请求过程见下图。</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/dd7ed72d55a40f9b731fe617d460ee29.jpg"></p> <p>如果请求重复的地址,那么重复的连接和断开连接就成了延长整个时间的的重要因素,特别是在复杂的网络环境下,每次请求传输数据的大小将不再是请求速度的决定性因素。</p> <p>http有一种 keepalive connections 的机制,可以在传输后仍然保持连接,当客户端需要再次获取数据时,直接使用刚刚空闲下来的连接而不需要再次握手。</p> <p>Okhttp支持5个并发KeepAlive,默认链路生命为5分钟(链路空闲后,保持存活的时间) 。</p> <h2>DNS解析</h2> <p>对比上一张图的一次完整的Http请求,在复杂的天朝网络环境下,相信大多数开发者都碰到过很奇怪的网络问题,比如运营商动态插入辣鸡html代码嵌入广告,比如运营商缓存请求数据导致用户请求到的数据不是最新的问题,比如某些运营商只支持 put\post 请求,而不支持 delete 请求,比如运营商。。。这些问题大部分都跟DNS相关。</p> <p>为了解决DNS劫持的问题,我们在薄荷app上做了很多优化工作,比如使用HTTP DNS(我们使用的DNSPod)代替系统自带的libc库去查询运营商的DNS服务器,直接拿到IP地址进行IP直连,其中又做了一些缓存和选择最优IP的一些操作。解决掉了很大一部分用户反馈的网络问题。</p> <p>而在OkHttp中,可以直接配置DNS,默认是系统自带的 Dns.SYSTEM 。</p> <pre> <code class="language-java">// Try each address for best behavior in mixed IPv4/IPv6 environments. List<InetAddress> addresses = address.dns().lookup(socketHost); for (int i = 0, size = addresses.size(); i < size; i++) { InetAddress inetAddress = addresses.get(i); inetSocketAddresses.add(new InetSocketAddress(inetAddress, socketPort)); } </code></pre> <p>注意结果是数组,即一个域名可能会有多个IP,如果一个IP不通,会自动重连下一个IP。</p> <p>开发者就可以新建一个Dns类复写 lookup 方法通过HTTP DNS请求IP地址,其中新建一个 HttpDNSClient 来请求DNS,插入拦截器来配置缓存时间,容错处理等等,然后在构建OkHttpClient时加入 dns 方法即可。</p> <pre> <code class="language-java">client = new OkHttpClient.Builder().addNetworkInterceptor(getLogger()) .dispatcher(getDispatcher()) //配置DNS查询实现 .dns(HTTP_DNS) .build(); </code></pre> <p>这样的全局HTTP DNS解析真是足够简单高效,并且完全是无侵入性的,丝毫不影响正常的网络请求。</p> <h2>总结</h2> <p>本文基本讲了下OkHttp3的大概流程,Interceptor的基本原理,DNS的可选配置等。涉及到socket和Okio流相关的都没有讲到,有兴趣的读者可以在参考文章自行搜索。总结来说,OkHttp基本可以满足日常开发的需求,并且性能足够强大,配合Retrofit + Rxjava更是效率翻倍。如果你在开发新的项目,强烈建议你扔掉Volley,拥抱Retrofit。</p> <p> </p> <p>来自:http://w4lle.github.io/2016/12/06/OkHttp/</p> <p> </p>