Forver.微笑

面带微笑未必真的开心,但笑起的那一刻,心中的那些不开心的事已经不重要了~

0%

OkHttp使用详解及重要源码分析

OkHttp的优势

OkHttp 是 Square 的一款应用于 Android 和 Java 的 Http 和 Http/2 客户端。

  • 能够高效的执行 http,数据加载速度更快,更省流量
  • 支持 GZIP 压缩,提升速度,节省流量
  • 缓存响应数据,避免了重复的网络请求
  • 使用简单,支持同步阻塞调用和带回调的异步调用

OkHttp基本用法

添加依赖

使用的时候只需要在 Gradle 里面加入下面一行依赖即可引入:

1
implementation 'com.squareup.okhttp3:okhttp:latestVersion'

发起HTTP GET 请求的步骤

  1. 新建OkHttpClient对象
  2. 构造Request对象
  3. 将Request对象封装为Call
  4. 通过Call来执行同步或异步请求

同步写法

实现Get请求

1
2
3
4
5
6
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()//builder默认构造了get请求,可以省略get()
.url(url)
.build();
//耗时操作,实际使用时注意规范写法
Response response = client.newCall(request).execute();

异步写法

1
2
3
4
5
6
7
8
9
10
11
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
Log.d(TAG, "onFailure: " + e);
}

@Override
public void onResponse(Call call, Response response) throws IOException {
Log.d(TAG, "OnResponse: " + response.body().toString());
}
});

注意:

  • Response对象获取结果时response.body()从可以获取不同类型的数据,获取字符串使用的string()方法,而不是toString()方法。
  • string()方法只能调用一次

发起HTTP POST 请求的步骤

发起post请求

实现POST请求,同get请求相比,只需要在构造Request对象时,使用post方法传入body数据

1
2
3
4
5
6
7
8
OkHttpClient client = new OkHttpClient();
//此处省略body参数的生成方式,支持多种格式,写法见下文
Request request = new Request.Builder()
.url(url)
.post(body)
.build();
//耗时操作,实际使用时注意规范写法
Response response = client.newCall(request).execute()

为post请求生成body

  • 纯数据请求生成body

    1
    2
    3
    4
    5
    //使用body传递String对象写法
    RequestBody mediaType=MediaType.parse("application/json; charset=UTF-8");
    //避免手写错误最好使用三方库
    String bodyContent = "{\"name\":\"zhangsan\",\"age\":\"20\"}";
    RequestBody body = RequestBody.create(mediaType, bodyContent);
  • 上传文件请求生成body

    1
    2
    3
    4
    //使用body上传文件写法
    RequestBody mediaType=MediaType.parse("application/octet-stream")
    File bodyContent = new File(filePath);
    RequestBody body = RequestBody.create(mediaType, bodyContent);
  • 为既有数据又有文件的请求生成body

    1
    2
    3
    4
    5
    6
    7
    MultipartBody body = new MultipartBody.Builder()
    .setType(MultipartBody.FORM)
    .addFormDataPart("name", "zhangsan")
    .addFormDataPart("age", "20")
    .addFormDataPart("file", file.getName(),
    RequestBody.create(MediaType.parse("application/octet-stream"), file))
    .build();

OkHttp源码分析

从代码看,请求流程是从OKHttpClient.newCall(Request)开始的,调用该方法前,主要是一些初始化工作。
OkHttp时序图

关键流程

将Request对象封装为Call对象

1
2
3
4
@Override 
public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}

Call是接口,实现类是RealCall

1
2
3
4
5
6
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}

然后有Call对象发起同步或者异步请求。

发起请求

深入源码会发现,同步或异步请求都是通过分发器Dispatcher将任务加入到不同的队列进行管理。

  • 同步请求会调用getResponseWithInterceptorChain()立即执行请求,执行完毕后,将请求从队列中移除。
  • 异步请求会创建AsyncCall对象,AsyncCall是继承自Runnable的可执行对象。分发器为异步处理创建了线程池,通过线程池对任务队列进行异步处理。

最后还是通过RealCall的execute方法去执行单个请求。

同步请求

RealCall.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
timeout.enter();
eventListener.callStart(this);
try {
client.dispatcher().executed(this);
//立即执行同步请求得到相应的结果
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
e = timeoutExit(e);
eventListener.callFailed(this, e);
throw e;
} finally {
//无论成功失败,移除请求
client.dispatcher().finished(this);
}
}

Dispatcher.java

1
2
3
4
synchronized void executed(RealCall call) {
//加入同步队列
runningSyncCalls.add(call);
}

异步请求

RealCall.java

1
2
3
4
5
6
7
8
9
10
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
//将任务封装为AsyncCall
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

Dispatcher.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
void enqueue(AsyncCall call) {
synchronized (this) {
//加入异步队列
readyAsyncCalls.add(call);
}
//通过线程池管理异步队列
promoteAndExecute();
}

private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));

List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();

if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.

i.remove();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}

for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
//创建线程池,管理请求队列,深入源码会发现仍然会回到RealCall的execute()方法。
asyncCall.executeOn(executorService());
}

return isRunning;
}

//执行异步任务的线程池
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}

异步请求单任务的执行过程与同步类似,执行完成后将结果交给回调返回
AsyncCall.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override protected void execute() {
boolean signalledCallback = false;
timeout.enter();
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
e = timeoutExit(e);
if (signalledCallback) {
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}

拦截器

从代码可以看到,整个请求流程从请求到取到返回结果前,通过责任链模式,将请求交给一系列的拦截器进行处理,得到最终Response对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));

Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());

return chain.proceed(originalRequest);
}

RealInterceptorChain.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Override public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}


public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();

calls++;

// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}

// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}

// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);

// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}

// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}

if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}

return response;
}

从源码可以看出,责任链中不仅有预先定义好的拦截器,也可以像OKHttpClient对象添加自定义的拦截器对象。
责任链按照添加的数据进行处理,处理的顺序为:

  • 自定义拦截器
  • RetryAndFollowUpInterceptor
  • BridgeInterceptor
  • CacheInterceptor
  • ConnectInterceptor
  • 自定义networkInterceptors
  • CallServerInterceptor
    对上述拦截器稍作调整,先依次分析预定义拦截器,再讲解自定义拦截器。

RetryAndFollowUpInterceptor

从源码看,RetryAndFollowUpInterceptor主要是根据请求发生的错误或服务器返回的错误信息,进行重试或重定向。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();

//StreamAllocation :负责维护服务器的链接、并发流和请求之间的关系
//streamAllocation在拦截器中ConnectInterceptor被处理
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;

//记录重试次数
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}

Response response;
boolean releaseConnection = true;
try {
// 发起请求
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// RouteException:尝试连接路由失败,请求没有被发送出去
// 尝试从失败中恢复,恢复保持连接继续向下执行,不能恢复抛出异常,结束重试
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getFirstConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// 尝试与服务器通信失败,请求可能没有被发送到出去
// 尝试从失败中恢复,恢复保持连接继续向下执行,不能恢复抛出异常,结束重试
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// 如果重试失败释放资源,成功则保持连接
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}

// 如果存在priorResponse,则将其绑定到response中,并且不保留priorResponse的body信息
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}

Request followUp;
try {
//根据响应对request进行处理,以发起重试或重定向
followUp = followUpRequest(response, streamAllocation.route());
} catch (IOException e) {
//重试失败,释放资源,结束重试
streamAllocation.release();
throw e;
}

//重试失败,释放资源,结束重试
if (followUp == null) {
streamAllocation.release();
return response;
}

closeQuietly(response.body());

//超过最大重试次数,停止重试
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}

if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
//是否能够复用链接,无法复用则创建新的连接
if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}

request = followUp;
priorResponse = response;
}
}

BridgeInterceptor

BridgeInterceptor比较简单,主要负责对请求中没有指定的一些header进行完善和基础优化,并对响应进行相应的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();

RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}

long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}

if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}

if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}

// 如果header没有指定“Accept-Encoding”和“Range”,为请求添加header信息"Accept-Encoding: gzip",并且注意对返回结果解压
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}

List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}

if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}

Response networkResponse = chain.proceed(requestBuilder.build());

HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);

if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}

return responseBuilder.build();
}

CacheInterceptor

CacheInterceptor负责处理请求过程中的缓存问题。cache(InternalCache对象)是全局变量,在Cache类内部实现,使用DiskLruCache对缓存进行管理,缓存在磁盘上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
@Override public Response intercept(Chain chain) throws IOException {

//读取缓存中的响应信息
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;

long now = System.currentTimeMillis();

//根据请求和缓存响应创建缓存策略
//CacheStrategy.networkRequest为空时表示不使用网络
//CacheStrategy.cacheResponse为空时表示不使用缓存
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;

//保留符合缓存策略的响应报文
if (cache != null) {
cache.trackResponse(strategy);
}

//没有缓存可用
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}

// 不使网络,也没有缓存信息时,返回504错误信息
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}

// 不使用网络时,直接读缓存,但是忽略body
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}

Response networkResponse = null;
try {
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}

//有缓存更新缓存
if (cacheResponse != null) {
// 304,直接使用缓存
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();

// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
// 更新缓存
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}

//无缓存,则将响应报文按照缓存策略存入缓存
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();

if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}

if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}

return response;
}

ConnectInterceptor

ConnectInterceptor通过StreamAllocation类对连接中的流信息进行处理.是所有拦截器中最核心的拦截器。
并且涉及到一些比较复杂的类。
HttpCodec类负责对Http的请求报文和响应报文进行编解码
StreamAllocation :从连接中获取一个连接,并从连接中获取编解码的流信息。
获取连接时,先判断当前连接是否可用,如果可用使用当前连接;如果没有则从连接池获取一个连接;如果都没有可用连接,则创建一个新的连接,并进行握手,然后将其放到连接池中。
注意获取链接的过程中,即使新创建了连接,如果查到了已有的可用连接,仍然优先选择已有的可用连接。
ConnectionPool: 管理连接,复用已有连接池,减少网络延迟。

ConnectInterceptor.java

1
2
3
4
5
6
7
8
9
10
11
12
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();

boolean doExtensiveHealthChecks = !request.method().equals("GET");
//从连接中获取请求和响应流信息
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();

return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

StreamAllocation.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
//获取一个连接,并从连接中获取编解码的流信息
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();

try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
//获取一个连接
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);

synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
//检查连接是否健康可用
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}

return candidate;
}
}

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
Connection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");

// 尝试使用当前连接,但是需要注意已有的连接是否被限制创建新的流。
releasedConnection = this.connection;
toClose = releaseIfNoNewStreams();
if (this.connection != null) {
// 当前连接可用
result = this.connection;
releasedConnection = null;
}
if (!reportedAcquired) {
releasedConnection = null;
}

//当前连接不可用,则从连接池获取一个连接
if (result == null) {
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
//从连接池得到了可用的连接
result = connection;
} else {
selectedRoute = route;
}
}
}
closeQuietly(toClose);

if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
//当前连接可用或连接池找到了可用连接,结束查找连接
if (result != null) {
return result;
}

boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}

synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");

if (newRouteSelection) {
// 如果当前连接不可用或连接池获得了可用的链接,那么根据一系列的IP地址,再次尝试从连接池获取连接
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
//仍然不能从连接池获取连接,则新建一个连接
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}

route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);
}
}

//第二次尝试从连接池获取到了连接,结束查找连接
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}

//对新创建的连接建立连接(TCP和TLS握手)
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());

Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;

//并将连接存入连接池
Internal.instance.put(connectionPool, result);

//如果连接池有重复的链接,则使用已有的连接
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);

eventListener.connectionAcquired(call, result);
return result;
}

ConnectionPool.java
1
2
3
4
5
6
7
8
9
10
//使用双端队列管理连接池
private final Deque<RealConnection> connections = new ArrayDeque<>();
//连接池中的闲置的连接,闲置多久后被清理
private final int maxIdleConnections;
private final long keepAliveDurationNs;
//默认最大可闲置的连接数为5个,超过5分钟连接会被清理。
清理任务通过线程池完成。此处不再列出源码
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}

CallServerInterceptor

CallServerInterceptor是最后一个拦截器,将Request对象写入流信息,发送给服务器,并将从服务器返回的流信息转化为和Resposne对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();

long sentRequestMillis = System.currentTimeMillis();

realChain.eventListener().requestHeadersStart(realChain.call());
httpCodec.writeRequestHeaders(request);
realChain.eventListener().requestHeadersEnd(realChain.call(), request);

Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(true);
}

if (responseBuilder == null) {
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);

request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
streamAllocation.noNewStreams();
}
}

httpCodec.finishRequest();

if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(false);
}

Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();

int code = response.code();
if (code == 100) {
responseBuilder = httpCodec.readResponseHeaders(false);

response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();

code = response.code();
}

realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);

if (forWebSocket && code == 101) {
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}

if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}

if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}

return response;
}

自定义拦截器

如果项目中有特殊需求,还可以自定义拦截器,处理一些特殊的需求。
只需要继承Interceptor,并实现intercept(Chain chain)方法。
只需要使用OKHttpClient.Builder的addInterceptor或addNetworkInterceptor插入即可。
自定义拦截器目前可以用于以下场景:

  • OKHttp自带了一个自定义拦截器HttpLoggingInterceptor,用于打印请求和响应报文的日志信息。
  • 开源库chuck实现了在手机通知栏监控http请求。
  • 封装业务拦截器,处理业务上统一行为,比如插入全局请求参数,统一参数格式,参数加密,封装统一响应结果解析等操作。

参考