OKHttp
MoMo Lv5

使用方法

分别创建一个OkHttpClient对象,一个Request对象,然后利用他们创建一个Call对象,最后调用同步请求execute()方法或者异步请求enqueue()方法来拿到Response

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private final OkHttpClient client = new OkHttpClient();

Request request = new Request.Builder()
.url("https://github.com/")
.build();

//同步请求
Response response = client.newCall(request).execute();
//todo handle response

//异步请求
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {
//todo handle request failed
}

@Override
public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
//todo handle Response
}
});

基本对象介绍

OkHttpClient

一个请求的配置类,采用了建造者模式,方便用户配置一些请求参数,如配置callTimeoutcookieinterceptor等等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
open class OkHttpClient internal constructor(builder: Builder) : Cloneable, Call.Factory, WebSocket.Factory {

constructor() : this(Builder())

class Builder constructor() {
//调度器
internal var dispatcher: Dispatcher = Dispatcher()
//连接池
internal var connectionPool: ConnectionPool = ConnectionPool()
//整体流程拦截器
internal val interceptors: MutableList<Interceptor> = mutableListOf()
//网络流程拦截器
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
//流程监听器
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
//连接失败时是否重连
internal var retryOnConnectionFailure = true
//服务器认证设置
internal var authenticator: Authenticator = Authenticator.NONE
//是否重定向
internal var followRedirects = true
//是否从HTTP重定向到HTTPS
internal var followSslRedirects = true
//cookie设置
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
//缓存设置
internal var cache: Cache? = null
//DNS设置
internal var dns: Dns = Dns.SYSTEM
//代理设置
internal var proxy: Proxy? = null
//代理选择器设置
internal var proxySelector: ProxySelector? = null
//代理服务器认证设置
internal var proxyAuthenticator: Authenticator = Authenticator.NONE
//socket配置
internal var socketFactory: SocketFactory = SocketFactory.getDefault()
//https socket配置
internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
internal var x509TrustManagerOrNull: X509TrustManager? = null
internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
//协议
internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
//域名校验
internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
internal var certificateChainCleaner: CertificateChainCleaner? = null
//请求超时
internal var callTimeout = 0
//连接超时
internal var connectTimeout = 10_000
//读取超时
internal var readTimeout = 10_000
//写入超时
internal var writeTimeout = 10_000
internal var pingInterval = 0
internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
internal var routeDatabase: RouteDatabase? = null

···省略代码···

Request

同样是请求参数的配置类,也同样采用了建造者模式,只有四个参数,分别是请求URL请求方法请求头请求体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Request internal constructor(
@get:JvmName("url") val url: HttpUrl,
@get:JvmName("method") val method: String,
@get:JvmName("headers") val headers: Headers,
@get:JvmName("body") val body: RequestBody?,
internal val tags: Map<Class<*>, Any>
) {

open class Builder {
//请求的URL
internal var url: HttpUrl? = null
//请求方法,如:GET、POST..
internal var method: String
//请求头
internal var headers: Headers.Builder
//请求体
internal var body: RequestBody? = null
···省略代码···
}
}

Call

请求调用接口,表示这个请求已经准备好可以执行,也可以取消只能执行一次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
interface Call : Cloneable {
/** 返回发起此调用的原始请求 */
fun request(): Request

/**
* 同步请求,立即执行。
*
* 抛出两种异常:
* 1. 请求失败抛出IOException;
* 2. 如果在执行过一回的前提下再次执行抛出IllegalStateException;*/
@Throws(IOException::class)
fun execute(): Response

/**
* 异步请求,将请求安排在将来的某个时间点执行。
* 如果在执行过一回的前提下再次执行抛出IllegalStateException */
fun enqueue(responseCallback: Callback)

/** 取消请求。已经完成的请求不能被取消 */
fun cancel()

/** 是否已被执行 */
fun isExecuted(): Boolean

/** 是否被取消 */
fun isCanceled(): Boolean

/** 一个完整Call请求流程的超时时间配置,默认选自[OkHttpClient.Builder.callTimeout] */
fun timeout(): Timeout

/** 克隆这个call,创建一个新的相同的Call */
public override fun clone(): Call

/** 利用工厂模式来让 OkHttpClient 来创建 Call对象 */
fun interface Factory {
fun newCall(request: Request): Call
}
}

RealCall

OkHttpClient 中,利用 newCall 方法来创建一个 Call 对象,但从源码中可以看出,newCall 方法返回的是一个 RealCall 对象。

1
2
3
4
5
6
7
8
9
10
11
@Override public void enqueue(Callback responseCallback) {
// 不能重复执行
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
// 交给 dispatcher调度器 进行调度
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

synchronized (this) 确保每个call只能被执行一次不能重复执行,如果想要完全相同的call,可以进行克隆

1
2
3
4
@SuppressWarnings("CloneDoesntCallSuperClone")
@Override public RealCall clone() {
return RealCall.newRealCall(client, originalRequest, forWebSocket);
}

利用dispatcher调度器,来进行实际的执行client.dispatcher().enqueue(new AsyncCall(responseCallback))

1
2
override fun newCall(request: Request): Call = 
RealCall(this, request, forWebSocket = false)

RealCallCall接口的具体实现类,是应用端与网络层的连接桥,展示应用端原始的请求与连接数据,以及网络层返回的response及其它数据流。 通过使用方法也可知,创建RealCall对象后,就要调用同步或异步请求方法,所以它里面还包含同步请求 execute()异步请求 enqueue()方法。

AsyncCall

异步请求调用,是RealCall的一个内部类,就是一个Runnable,被调度器中的线程池所执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
inner class AsyncCall(
//用户传入的响应回调方法
private val responseCallback: Callback
) : Runnable {
//同一个域名的请求次数,volatile + AtomicInteger 保证在多线程下及时可见性与原子性
@Volatile var callsPerHost = AtomicInteger(0)
private set

fun reuseCallsPerHostFrom(other: AsyncCall) {
this.callsPerHost = other.callsPerHost
}

···省略代码···

fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()

var success = false
try {
//调用线程池执行
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
//请求失败,调用 Callback.onFailure() 方法
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
//请求失败,调用调度器finish方法
client.dispatcher.finished(this) // This call is no longer running!
}
}
}

override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
//请求成功,获取到服务器返回的response
val response = getResponseWithInterceptorChain()
signalledCallback = true
//调用 Callback.onResponse() 方法,将 response 传递出去
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
//请求失败,调用 Callback.onFailure() 方法
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
//请求出现异常,调用cancel方法来取消请求
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
//请求失败,调用 Callback.onFailure() 方法
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
//请求结束,调用调度器finish方法
client.dispatcher.finished(this)
}
}
}
}

Dispatcher

调度器,用来调度Call对象,同时包含线程池与异步请求队列,用来存放与执行AsyncCall对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Dispatcher constructor() {
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
//创建一个缓存线程池,来处理请求调用
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}

/** 已准备好的异步请求队列 */
@get:Synchronized
private val readyAsyncCalls = ArrayDeque<AsyncCall>()

/** 正在运行的异步请求队列, 包含取消但是还未finish的AsyncCall */
private val runningAsyncCalls = ArrayDeque<AsyncCall>()

/** 正在运行的同步请求队列, 包含取消但是还未finish的RealCall */
private val runningSyncCalls = ArrayDeque<RealCall>()

···省略代码···
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 // 执行异步请求
synchronized void enqueue(AsyncCall call) {
// 同时请求不能超过并发数(64,可配置调度器调整)
// okhttp会使用共享主机即 地址相同的会共享socket
// 同一个host最多允许5条线程通知执行请求
if (runningAsyncCalls.size() < maxRequests &&
runningCallsForHost(call) < maxRequestsPerHost) {
// 加入运行队列 并交给线程池执行
runningAsyncCalls.add(call);
// AsyncCall 是一个runnable,放到线程池中去执行,查看其execute实现
executorService().execute(call);
} else {
// 加入等候队列
readyAsyncCalls.add(call);
}
}

Dispatchercall加入到队列中,然后通过线程池来执行call

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
    // 同时能进行的最大请求数
private int maxRequests = 64;
// 同时请求的相同HOST的最大个数 SCHEME :// HOST [ ":" PORT ] [ PATH [ "?" QUERY ]]
// 如 https://restapi.amap.com restapi.amap.com - host
private int maxRequestsPerHost = 5;
/**
* Ready async calls in the order they'll be run.
* TODO 双端队列,支持首尾两端 双向开口可进可出,方便移除
* 异步等待队列
*
*/
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

/**
* Running asynchronous calls. Includes canceled calls that haven't finished yet.
* TODO 正在进行的异步队列
*/
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

executorService() 这个方法创建了一个线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public synchronized ExecutorService executorService() {
if (executorService == null) {
// 线程池的相关概念 需要理解
// 核心线程 最大线程 非核心线程闲置60秒回收 任务队列
executorService = new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
Util.threadFactory("OkHttp Dispatcher",false)
);
}
return executorService;
}

总结

对象 作用
Call 请求调用接口,表示这个请求已经准备好可以执行,也可以被取消,只能执行一次。
RealCall Call接口的具体实现类,是应用与网络层之间的连接桥,包含OkHttpClientRequest信息。
AsyncCall 异步请求调用,其实就是个Runnable,会被放到线程池中进行处理。
Dispatcher 调度器,用来调度Call对象,同时包含线程池与异步请求队列,用来存放与执行AsyncCall对象。
Request 请求类,包含urlmethodheadersbody
Response 网络层返回的响应数据。
Callback 响应回调函数接口,包含onFailureonResponse 两个方法。

流程分析

OkHttp请求过程中最少只需要接触OkHttpClient、Request、Call、Response,但是框架内部进行大量的逻辑处理。

image

所有的逻辑大部分集中在拦截器中,但是在进入拦截器之前还需要依靠分发器来调配请求任务。

  • 分发器:内部维护队列线程池内,完成请求调配
  • 拦截器:完成整个请求过程。

同步请求

分发器只记录请求,用于判断ldleRunnable是否需要执行

1
client.newCall(request).execute();

newCall方法就是创建一个RealCall对象,然后执行其execute()方法。

RealCall.kt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
override fun execute(): Response {
//CAS判断是否已经被执行了, 确保只能执行一次,如果已经执行过,则抛出异常
check(executed.compareAndSet(false, true)) { "Already Executed" }

//请求超时开始计时
timeout.enter()
//开启请求监听
callStart()
try {
//调用调度器中的 executed() 方法,调度器只是将 call 加入到了runningSyncCalls队列中
client.dispatcher.executed(this)
//调用getResponseWithInterceptorChain 方法拿到 response
return getResponseWithInterceptorChain()
} finally {
//执行完毕,调度器将该 call 从 runningSyncCalls队列中移除
client.dispatcher.finished(this)
}
}

调用调度器executed方法,就是将当前的RealCall对象加入到runningSyncCalls队列中,然后调用getResponseWithInterceptorChain方法拿到response

真正执行请求的是getResponseWithInterceptorChain(); 然后通过回调将Response返回给用户。

值得注意的finally 执行了client.dispatcher().finished(this); 通过调度器移除队列,并且判断是否存在等待队列,如果存在,检查执行队列是否达到最大值,如果没有将等待队列变为执行队列。这样也就确保了等待队列被执行。

异步请求

image

1. 如何决定将请求放入ready还是running?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 执行异步请求
synchronized void enqueue(AsyncCall call) {
// 同时请求不能超过并发数(64,可配置调度器调整)
// okhttp会使用共享主机即 地址相同的会共享socket
// 同一个host最多允许5条线程通知执行请求
if (runningAsyncCalls.size() < maxRequests &&
runningCallsForHost(call) < maxRequestsPerHost) {
// 加入运行队列 并交给线程池执行
runningAsyncCalls.add(call);
// AsyncCall 是一个runnable,放到线程池中去执行,查看其execute实现
executorService().execute(call);
} else {
// 加入等候队列
readyAsyncCalls.add(call);
}
}
同时满足以下两个条件放入running立即执行,否则放入ready等待执行
- 正在执行的异步请求个数不大于64个(maxRequests)
- 同个域名的请求最大数不大于5个(maxRequestsPerHost)

2. 从Ready移动到Running的条件是什么?

1
2
3
4
5
6
7
8
9
override fun execute(): Response {
...
try {
// 拿到response
} finally {

client.dispatcher.finished(this)
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
//将当前请求调用从 正在运行队列 中移除
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}

// 从ready获取任务放到running执行
val isRunning = promoteAndExecute()

...

}

  • 每个请求执行完成就会从running移除,进行第一步相同逻辑的判断,尝试从ready中取出一个任务放到running执行

3. 分发器线程池是怎么定义的?

RealCall.kt

1
2
3
4
5
6
7
8
override fun enqueue(responseCallback: Callback) {
//CAS判断是否已经被执行了, 确保只能执行一次,如果已经执行过,则抛出异常
check(executed.compareAndSet(false, true)) { "Already Executed" }
//开启请求监听
callStart()
//新建一个AsyncCall对象,通过调度器enqueue方法加入到readyAsyncCalls队列中
client.dispatcher.enqueue(AsyncCall(responseCallback))
}

然后调用调度器的enqueue方法,

Dispatcher.kt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
internal fun enqueue(call: AsyncCall) {
//加锁,保证线程安全
synchronized(this) {
//将该请求调用加入到 readyAsyncCalls 队列中
readyAsyncCalls.add(call)

// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
//通过域名来查找有没有相同域名的请求,有则复用。
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
//执行请求
promoteAndExecute()
}


private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()

val executableCalls = mutableListOf<AsyncCall>()
//判断是否有请求正在执行
val isRunning: Boolean
//加锁,保证线程安全
synchronized(this) {
//遍历 readyAsyncCalls 队列
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
//runningAsyncCalls 的数量不能大于最大并发请求数 64
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
//同域名最大请求数5,同一个域名最多允许5条线程同时执行请求
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

//从 readyAsyncCalls 队列中移除,并加入到 executableCalls 及 runningAsyncCalls 队列中
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
//通过运行队列中的请求数量来判断是否有请求正在执行
isRunning = runningCallsCount() > 0
}

//遍历可执行队列,调用线程池来执行AsyncCall
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}

return isRunning
}

调度器的enqueue方法就是将AsyncCall加入到readyAsyncCalls队列中,然后调用promoteAndExecute方法来执行请求,promoteAndExecute方法做的其实就是遍历readyAsyncCalls队列,然后将符合条件的请求用线程池执行,也就是会执行AsyncCall.run()方法。

AsyncCall 方法简单来说就是调用getResponseWithInterceptorChain方法拿到response,然后通过Callback.onResponse方法传递出去。反之,如果请求失败,捕获了异常,就通过Callback.onFailure将异常信息传递出去。 最终,请求结束,调用调度器finish方法。

Dispatcher.kt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/** 异步请求调用结束方法 */
internal fun finished(call: AsyncCall) {
call.callsPerHost.decrementAndGet()
finished(runningAsyncCalls, call)
}

/** 同步请求调用结束方法 */
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}

private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
//将当前请求调用从 正在运行队列 中移除
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}

//继续执行剩余请求,将call从readyAsyncCalls中取出加入到runningAsyncCalls,然后执行
val isRunning = promoteAndExecute()

if (!isRunning && idleCallback != null) {
//如果执行完了所有请求,处于闲置状态,调用闲置回调方法
idleCallback.run()
}
}

获取Response

接着就是看看getResponseWithInterceptorChain方法是如何拿到response的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
internal fun getResponseWithInterceptorChain(): Response {
//拦截器列表
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)

//构建拦截器责任链
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
//如果call请求完成,那就意味着交互完成了,没有更多的东西来交换了
var calledNoMoreExchanges = false
try {
//执行拦截器责任链来获取 response
val response = chain.proceed(originalRequest)
//如果被取消,关闭响应,抛出异常
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}

采用了责任链设计模式,通过拦截器构建了以RealInterceptorChain责任链,然后执行proceed方法来得到response

Interceptor

只声明了一个拦截器方法,在子类中具体实现,还包含一个Chain接口,核心方法是proceed(request)处理请求来获取response

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
fun interface Interceptor {
/** 拦截方法 */
@Throws(IOException::class)
fun intercept(chain: Chain): Response

interface Chain {
/** 原始请求数据 */
fun request(): Request

/** 核心方法,处理请求,获取response */
@Throws(IOException::class)
fun proceed(request: Request): Response

fun connection(): Connection?

fun call(): Call

fun connectTimeoutMillis(): Int

fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain

fun readTimeoutMillis(): Int

fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain

fun writeTimeoutMillis(): Int

fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain
}
}

RealInterceptorChain

拦截器链就是实现Interceptor.Chain接口,重点就是复写的proceed方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class RealInterceptorChain(
internal val call: RealCall,
private val interceptors: List<Interceptor>,
private val index: Int,
internal val exchange: Exchange?,
internal val request: Request,
internal val connectTimeoutMillis: Int,
internal val readTimeoutMillis: Int,
internal val writeTimeoutMillis: Int
) : Interceptor.Chain {

···省略代码···
private var calls: Int = 0
override fun call(): Call = call
override fun request(): Request = request

@Throws(IOException::class)
override fun proceed(request: Request): Response {
check(index < interceptors.size)

calls++

if (exchange != null) {
check(exchange.finder.sameHostAndPort(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
check(calls == 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
}

//index+1, 复制创建新的责任链,也就意味着调用责任链中的下一个处理者,也就是下一个拦截器
val next = copy(index = index + 1, request = request)
//取出当前拦截器
val interceptor = interceptors[index]

//执行当前拦截器的拦截方法
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")

if (exchange != null) {
check(index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
}

check(response.body != null) { "interceptor $interceptor returned a response with no body" }

return response
}
}

链式调用,最终会执行拦截器列表中的每个拦截器,返回Response

拦截器

  1. client.interceptors:这是由开发者设置的,会在所有的拦截器处理之前进行最早的拦截处理,可用于添加一些公共参数,如自定义header自定义log等等。
  2. RetryAndFollowUpInterceptor:这里会对连接做一些初始化工作,以及请求失败的重试工作,重定向的后续请求工作。跟他的名字一样,就是做重试工作还有一些连接跟踪工作。
  3. BridgeInterceptor:是客户端与服务器之间的沟通桥梁,负责将用户构建的请求转换为服务器需要的请求,以及将网络请求返回回来的响应转换为用户可用的响应。
  4. CacheInterceptor:这里主要是缓存的相关处理,会根据用户在OkHttpClient里定义的缓存配置,然后结合请求新建一个缓存策略,由它来判断是使用网络还是缓存来构建response
  5. ConnectInterceptor:这里主要就是负责建立连接,会建立TCP连接或者TLS连接
  6. client.networkInterceptors:这里也是开发者自己设置的,所以本质上和第一个拦截器差不多,但是由于位置不同,所以用处也不同。
  7. CallServerInterceptor:这里就是进行网络数据的请求和响应了,也就是实际的网络I/O操作,将请求头与请求体发送给服务器,以及解析服务器返回的response

client.interceptors

这是用户自己定义的拦截器,称为应用拦截器,会保存在OkHttpClientinterceptors: List<Interceptor>列表中。 他是拦截器责任链中的第一个拦截器,也就是说会第一个执行拦截方法,可以通过它来添加自定义Header信息,如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class HeaderInterceptor implements Interceptor {
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request().newBuilder()
.addHeader("device-android", "xxxxxxxxxxx")
.addHeader("country-code", "ZH")
.build();
return chain.proceed(request);
}
}

//然后在 OkHttpClient 中加入
OkHttpClient client = new OkHttpClient.Builder()
.connectTimeout(60, TimeUnit.SECONDS)
.readTimeout(15, TimeUnit.SECONDS)
.writeTimeout(15, TimeUnit.SECONDS)
.cookieJar(new MyCookieJar())
.addInterceptor(new HeaderInterceptor())//添加自定义Header拦截器
.build();

RetryAndFollowUpInterceptor

第二个拦截器,负责请求失败的重试工作与重定向的后续请求工作,同时它会对连接做一些初始化工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {

@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newExchangeFinder = true
var recoveredFailures = listOf<IOException>()
while (true) {
//这里会新建一个ExchangeFinder,ConnectInterceptor会使用到
call.enterNetworkInterceptorExchange(request, newExchangeFinder)

var response: Response
var closeActiveExchange = true
try {
if (call.isCanceled()) {
throw IOException("Canceled")
}

try {
response = realChain.proceed(request)
newExchangeFinder = true
} catch (e: RouteException) {
//尝试通过路由连接失败。该请求将不会被发送。
if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
throw e.firstConnectException.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e.firstConnectException
}
newExchangeFinder = false
continue
} catch (e: IOException) {
//尝试与服务器通信失败。该请求可能已发送。
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
throw e.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e
}
newExchangeFinder = false
continue
}

// Attach the prior response if it exists. Such responses never have a body.
//尝试关联上一个response,注意:body是为null
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}

val exchange = call.interceptorScopedExchange
//会根据 responseCode 来判断,构建一个新的request并返回来重试或者重定向
val followUp = followUpRequest(response, exchange)

if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}
//如果请求体是一次性的,不需要再次重试
val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}

response.body?.closeQuietly()

//最大重试次数,不同的浏览器是不同的,比如:Chrome为21,Safari则是16
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}

request = followUp
priorResponse = response
} finally {
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}

/** 判断是否要进行重连,false->不尝试重连;true->尝试重连。*/
private fun recover(
e: IOException,
call: RealCall,
userRequest: Request,
requestSendStarted: Boolean
): Boolean {
//客户端禁止重试
if (!client.retryOnConnectionFailure) return false

//不能再次发送该请求体
if (requestSendStarted && requestIsOneShot(e, userRequest)) return false

//发生的异常是致命的,无法恢复,如:ProtocolException
if (!isRecoverable(e, requestSendStarted)) return false

//没有更多的路由来尝试重连
if (!call.retryAfterFailure()) return false

// 对于失败恢复,使用带有新连接的相同路由选择器
return true
}
···省略代码···

BridgeInterceptor

是客户端与服务器之间的沟通桥梁,负责将用户构建的请求转换为服务器需要的请求,比如:添加 Content-Type,添加 Cookie,添加User-Agent等等。再将服务器返回的response做一些处理转换为客户端需要的response。比如:移除响应头中的Content-EncodingContent-Length等等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {

@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
//获取原始请求数据
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
//重新构建请求头,请求体信息
val body = userRequest.body

val contentType = body.contentType()
requestBuilder.header("Content-Type", contentType.toString())
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.header("Host", userRequest.url.toHostHeader())
requestBuilder.header("Connection", "Keep-Alive")

···省略代码···

//添加cookie
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
//添加user-agent
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
//重新构建一个Request,然后执行下一个拦截器来处理该请求
val networkResponse = chain.proceed(requestBuilder.build())

cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)

//创建一个新的responseBuilder,目的是将原始请求数据构建到response中
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)

if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
//修改response header信息,移除Content-Encoding,Content-Length信息
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
//修改response body信息
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}

return responseBuilder.build()

···省略代码···

CacheInterceptor

用户可以通过OkHttpClient.cache来配置缓存,缓存拦截器通过CacheStrategy来判断是使用网络还是缓存来构建response

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
class CacheInterceptor(internal val cache: Cache?) : Interceptor {

@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val call = chain.call()
//通过request从OkHttpClient.cache中获取缓存
val cacheCandidate = cache?.get(chain.request())

val now = System.currentTimeMillis()
//创建一个缓存策略,用来确定怎么使用缓存
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
//为空表示不使用网络,反之,则表示使用网络
val networkRequest = strategy.networkRequest
//为空表示不使用缓存,反之,则表示使用缓存
val cacheResponse = strategy.cacheResponse
//追踪网络与缓存的使用情况
cache?.trackResponse(strategy)
val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
//有缓存但不适用,关闭它
if (cacheCandidate != null && cacheResponse == null) {
cacheCandidate.body?.closeQuietly()
}

//如果网络被禁止,但是缓存又是空的,构建一个code为504的response,并返回
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build().also {
listener.satisfactionFailure(call, it)
}
}

//如果我们禁用了网络不使用网络,且有缓存,直接根据缓存内容构建并返回response
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build().also {
listener.cacheHit(call, it)
}
}
//为缓存添加监听
if (cacheResponse != null) {
listener.cacheConditionalHit(call, cacheResponse)
} else if (cache != null) {
listener.cacheMiss(call)
}

var networkResponse: Response? = null
try {
//责任链往下处理,从服务器返回response 赋值给 networkResponse
networkResponse = chain.proceed(networkRequest)
} finally {
//捕获I/O或其他异常,请求失败,networkResponse为空,且有缓存的时候,不暴露缓存内容。
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}

//如果有缓存
if (cacheResponse != null) {
//且网络返回response code为304的时候,使用缓存内容新构建一个Response返回。
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()

networkResponse.body!!.close()

// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response.also {
listener.cacheHit(call, it)
}
} else {
//否则关闭缓存响应体
cacheResponse.body?.closeQuietly()
}
}

//构建网络请求的response
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()

//如果cache不为null,即用户在OkHttpClient中配置了缓存,则将上一步新构建的网络请求response存到cache中
if (cache != null) {
//根据response的code,header以及CacheControl.noStore来判断是否可以缓存
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// 将该response存入缓存
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {
if (cacheResponse != null) {
listener.cacheMiss(call)
}
}
}
//根据请求方法来判断缓存是否有效,只对Get请求进行缓存,其它方法的请求则移除
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
//缓存无效,将该请求缓存从client缓存配置中移除
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}

return response
}

···省略代码···
  1. 如果从缓存获取的 Response 是nul,那就需要使用网络请求获取响应
  2. 如果是Https请求,但是又丢失了握手信息,那也不能使用缓存,需要进行网络请求
  3. 如果判断响应码不能缓存且响应头有 no-store 标识,那就需要进行网络请求
  4. 如果请求头有 no-cache 标识或者有 If-Modified-since/If-None-Match,那么需要进行网络请求
  5. 如果响应头没有 no-cache 标识,且缓存时间没有超过极限时间,那么可以使用缓存,不需要进行网络请求
  6. 如果缓存过期了,判断响应头是否设置 Etag/Last-Modified/Date,没有那就直接使用网络请求否则需要考虑服务器返回304;

并且,只要需要进行网络请求,请求头中就不能包含only-if-cached,否则框架直接返回504

ConnectInterceptor

负责实现与服务器真正建立起连接

1
2
3
4
5
6
7
8
9
10
11
12
kotlin 代码解读复制代码object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
//初始化一个exchange对象
val exchange = realChain.call.initExchange(chain)
//根据这个exchange对象来复制创建一个新的连接责任链
val connectedChain = realChain.copy(exchange = exchange)
//执行该连接责任链
return connectedChain.proceed(realChain.request)
}
}
  1. 初始化一个exchange对象。
  2. 然后根据这个exchange对象来复制创建一个新的连接责任链。
  3. 执行该连接责任链。

RealCall.kt

1
2
3
4
5
6
7
8
9
10
11
internal fun initExchange(chain: RealInterceptorChain): Exchange {
...省略代码...
//这里的exchangeFinder就是在RetryAndFollowUpInterceptor中创建的
val exchangeFinder = this.exchangeFinder!!
//返回一个ExchangeCodec(是个编码器,为request编码以及为response解码)
val codec = exchangeFinder.find(client, chain)
//根据exchangeFinder与codec新构建一个Exchange对象,并返回
val result = Exchange(this, eventListener, exchangeFinder, codec)
...省略代码...
return result
}

ExchangeFinder.kt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
try {
//查找合格可用的连接,返回一个 RealConnection 对象
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
//根据连接,创建并返回一个请求响应编码器:Http1ExchangeCodec 或者 Http2ExchangeCodec,分别对应Http1协议与Http2协议
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
trackFailure(e.lastConnectException)
throw e
} catch (e: IOException) {
trackFailure(e)
throw RouteException(e)
}
}

ExchangeFinder.kt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
while (true) {
//重点:查找连接
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
//检查该连接是否合格可用,合格则直接返回该连接
if (candidate.isHealthy(doExtensiveHealthChecks)) {
return candidate
}
//如果该连接不合格,标记为不可用,从连接池中移除
candidate.noNewExchanges()
...省略代码...
}
}

通过findConnection方法来查找连接,找到连接后判断是否是合格可用的,合格就直接返回该连接。

所以核心方法就是findConnection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
if (call.isCanceled()) throw IOException("Canceled")

//第一次,尝试重连 call 中的 connection,不需要去重新获取连接
val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
if (callConnection != null) {
var toClose: Socket? = null
synchronized(callConnection) {
if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
toClose = call.releaseConnectionNoEvents()
}
}

//如果 call 中的 connection 还没有释放,就重用它。
if (call.connection != null) {
check(toClose == null)
return callConnection
}

//如果 call 中的 connection 已经被释放,关闭Socket.
toClose?.closeQuietly()
eventListener.connectionReleased(call, callConnection)
}

//需要一个新的连接,所以重置一些状态
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0

//第二次,尝试从连接池中获取一个连接,不带路由,不带多路复用
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}

//连接池中是空的,准备下次尝试连接的路由
val routes: List<Route>?
val route: Route

...省略代码...

//第三次,再次尝试从连接池中获取一个连接,带路由,不带多路复用
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}

route = localRouteSelection.next()
}

//第四次,手动创建一个新连接
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())

//第五次,再次尝试从连接池中获取一个连接,带路由,带多路复用。
//这一步主要是为了校验一下,比如已经有了一条连接了,就可以直接复用,而不用使用手动创建的新连接。
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
val result = call.connection!!
nextRouteToTry = route
newConnection.socket().closeQuietly()
eventListener.connectionAcquired(call, result)
return result
}

synchronized(newConnection) {
//将手动创建的新连接放入连接池
connectionPool.put(newConnection)
call.acquireConnectionNoEvents(newConnection)
}

eventListener.connectionAcquired(call, newConnection)
return newConnection
}

一共做了5次尝试去得到连接:

  1. 第一次,尝试重连 call 中的 connection,不需要去重新获取连接。
  2. 第二次,尝试从连接池中获取一个连接,不带路由,不带多路复用。
  3. 第三次,再次尝试从连接池中获取一个连接,带路由,不带多路复用。
  4. 第四次,手动创建一个新连接。
  5. 第五次,再次尝试从连接池中获取一个连接,带路由,带多路复用。

client.networkInterceptors

该拦截器称为网络拦截器,与client.interceptors一样也是由用户自己定义的,同样是以列表的形式存在OkHttpClient中。

他们的不同都是由于他们所处的位置不同所导致的,应用拦截器处于第一个位置,所以无论如何它都会被执行,而且只会执行一次。而网络拦截器处于倒数第二的位置,它不一定会被执行,而且可能会被执行多次,比如:在RetryAndFollowUpInterceptor失败或者CacheInterceptor直接返回缓存的情况下,网络拦截器是不会被执行的。

CallServerInterceptor

到了这里,客户端与服务器已经建立好了连接,接着就是将请求头与请求体发送给服务器,以及解析服务器返回的response了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {

@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
try {
//写入请求头
exchange.writeRequestHeaders(request)
//如果不是GET请求,并且请求体不为空
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
//当请求头为"Expect: 100-continue"时,在发送请求体之前需要等待服务器返回"HTTP/1.1 100 Continue" 的response,如果没有等到该response,就不发送请求体。
//POST请求,先发送请求头,在获取到100继续状态后继续发送请求体
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
//刷新请求,即发送请求头
exchange.flushRequest()
//解析响应头
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
//写入请求体
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
//如果请求体是双公体,就先发送请求头,稍后在发送请求体
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
//写入请求体
requestBody.writeTo(bufferedRequestBody)
} else {
//如果获取到了"Expect: 100-continue"响应,写入请求体
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
···省略代码···
//请求结束,发送请求体
exchange.finishRequest()
···省略代码···

try {
if (responseBuilder == null) {
//读取响应头
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
···省略代码···
//构建一个response
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
···省略代码···
return response

···省略代码···

写入发送请求头,然后根据条件是否写入发送请求体,请求结束。解析服务器返回的请求头,然后构建一个新的response,并返回。 这里CallServerInterceptor是拦截器责任链中最后一个拦截器了,所以他不会再调用chain.proceed()方法往下执行,而是将这个构建的response往上传递给责任链中的每个拦截器。

总结

image

  1. **addInterceptor(Interceptor)**,处理原始请求和最终的响应:可以添加自定义header、通用参数、参数加密、网关接入。由开发者设置的,会按照开发者的要求,在所有的拦截器处理之前进行最早的拦截处理,比如一些公共参数,Header都可以在这里添加。
  2. RetryAndFollowUpInterceptor,处理错误重试和重定向,这里会对连接做一些初始化工作,以及请求失败的充实工作,重定向的后续请求工作。跟他的名字一样,就是做重试工作还有一些连接跟踪工作。
  3. BridgeInterceptor,应用层和网络层的桥接拦截器,主要工作是为请求添加cookie、添加固定的header,比如Host、Content-Length、Content-Type、User-Agent等等,然后保存响应结果的cookie,如果响应使用gzip压缩过,则还需要进行解压。为用户构建一个能够进行网络访问的请求,同时后续工作将网络请求回来的响应Response转化为用户可用的Response。
  4. CacheInterceptor,这里主要是处理cache相关处理,会根据OkHttpClient对象的配置以及缓存策略对请求值进行缓存,而且如果本地有了可⽤的Cache,就可以在没有网络交互的情况下就返回缓存结果。如果命中缓存则不会发起网络请求
  5. ConnectInterceptor,连接拦截器,内部会维护一个连接池,负责连接复用、创建连接(三次握手等等)、释放连接以及创建连接上的socket流。建立TCP连接或者TLS连接,以及负责编码解码的HttpCodec
  6. networkInterceptors,开发者自己设置的,所以本质上和第一个拦截器差不多,但是由于位置不同,所以用处也不同。这个位置添加的拦截器可以看到请求和响应的数据了,所以可以做一些网络调试。
  7. CallServerInterceptor,这里就是进行网络数据的请求和响应了,也就是实际的网络I/O操作,通过socket读写数据。

完整流程图

image

反思

设计模式

  1. 建造者模式:不论是在OkHttpClientRequest还是Response中都用到了建造者模式,因为这几个类中都有很多参数,需要供用户选择需要的参数来构建其想要的实例
  2. 工厂方法模式:帮助生成复杂对象,如: OkHttpClient.newCall(request Request) 来创建 Call 对象
  3. 责任链模式:将7个拦截器构成拦截器责任链,然后按顺序从上往下执行,得到Response后,从下往上传回去。
    1. 对象行为型模式,为请求创建了一个接收者对象的链,在处理请求的时候执行过滤(各司其职)。
    2. 任链上的处理者负责处理请求,客户只需要将请求发送到责任链即可,无须关心请求的处理细节和请求的传递,所以职责链将请求的发送者和请求的处理者解耦了。

线程安全

AsyncCall 类中的 callsPerHost 变量,使用了 Volatile + AtomicInteger来修饰,从而保证在多线程下的线程安全。

1
2
3
4
5
6
inner class AsyncCall( private val responseCallback: Callback) : Runnable {
//同一个域名的请求次数,volatile + AtomicInteger 保证在多线程下及时可见性与原子性
@Volatile var callsPerHost = AtomicInteger(0)
private set
...省略代码...
}

数据结构

为什么readyAsyncCalls runningAsyncCalls runningSyncCalls采用ArrayDeque

  1. 他们都是用来存放网络请求的,这些请求需要做到先到先得,所以采用队列。

  2. 根据代码所示,当执行enqueue时,我们需要遍历readyAsyncCalls,将符合执行条件的Call加入到runningAsyncCalls,这相对比于链表来说,数组的查找效率要更高,所以采用ArrayDeque

ArrayBlockQueque

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
ArrayBlofkingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);

ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
keepAliveTime:60,TimeUnit.SEcONDs,queue
);

poolExecutor.execute(() → {
System.out.println("任务1:"+ Thread.currentThread());

while(true) {}
});

poolExecutor.execute(() → {
System.out.println("任务2:"+ Thread.currentThread());
});

poolExecutor.execute(() → {
System.out.println("任务2:"+ Thread.currentThread());
});

执行结果为

1
2
3
任务1:Thread[pool-1-thread-1,5,main]
任务3:Thread[pool-1-thread-2,5,main]
任务2:Thread[pool-1-thread-2,5,main]
  1. 任务1立即执行且不会结束(死循环)
  2. 任务2加到等待队列中(queue)
  3. 任务3加到等待队列中,但由于队列数为1从而添加失败
  4. 当前执行的任务数小于Integer.MAX_VALUE,新开一个线程执行
  5. 任务3执行结束后从等待队列中取到任务2并执行

如果只有任务1和任务2那么只会打印“任务1”。因为任务1立即执行且不会结束,任务2加入了等待队列但永远不会被取出

LinkedBlockingQueue同理

怎么实现连接池

一些概念:

  1. Connection,实现为RealConnection:连接,抽象概念,内部维护了Socket
  2. ConnectionPool,持有RealConnectionPool:连接池,管理连接的复用
  3. Exchange:交换器(管理请求和响应、持有ExchangeCodec
  4. ExchangeCodec:编解码器,用于编码请求,解码响应,实现有Http1ExchangeCodecHttp2ExchangeCodec
  5. HTTP 1.1:引入keep-alive机制,支持连接保活,可以多个请求复用一个连接,但请求是串行的
  6. HTTP 2.0:支持多路复用,一个连接的多个请求可以并行

为什么需要连接池?

频繁的进行建立Sokcet连接和断开Socket是非常消耗网络资源和浪费时间的,所以HTTP中的keepalive连接对于降低延迟和提升速度有非常重要的作用。keepalive机制可以在一次TCP连接中可以持续发送多份数据而不会断开连接。所以连接的多次使用,也就是复用就变得格外重要了,而复用连接就需要对连接进行管理,于是就有了连接池的概念。

中使用ConectionPool实现连接池,默认支持5个并发KeepAlive,默认链路生命为5分钟。

怎么实现的?

  1. ConectionPool中维护了一个双端队列Deque,也就是两端都可以进出的队列,用来存储连接。

  2. ConnectInterceptor,也就是负责建立连接的拦截器中,首先会找可用连接,也就是从连接池中去获取连接,具体的就是会调用到ConectionPool的get方法。

1
2
3
4
5
6
7
8
9
10
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}

遍历了双端队列,如果连接有效,就会调用acquire方法计数并返回这个连接。

  1. 如果没找到可用连接,就会创建新连接,并会把这个建立的连接加入到双端队列中,同时开始运行线程池中的线程,其实就是调用了ConectionPool的put方法。
1
2
3
4
5
6
7
8
9
10
public final class ConnectionPool {
void put(RealConnection connection) {
if (!cleanupRunning) {
//没有连接的时候调用
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
}

这个线程池中只有一个线程,是用来清理连接的,也就是上述的cleanupRunnable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private final Runnable cleanupRunnable = new Runnable() {
@Override
public void run() {
while (true) {
//执行清理,并返回下次需要清理的时间。
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
//在timeout时间内释放锁
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {

}
}
}
}
}
};

这个runnable会不停的调用cleanup方法清理线程池,并返回下一次清理的时间间隔,然后进入wait等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
long cleanup(long now) {
synchronized (this) {
//遍历连接
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();

//检查连接是否是空闲状态,
//不是,则inUseConnectionCount + 1
//是 ,则idleConnectionCount + 1
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}

idleConnectionCount++;

// If the connection is ready to be evicted, we're done.
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}

//如果超过keepAliveDurationNs或maxIdleConnections,
//从双端队列connections中移除
if (longestIdleDurationNs >= this.keepAliveDurationNs || idleConnectionCount > this.maxIdleConnections) {
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
//如果空闲连接次数>0,返回将要到期的时间
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// 连接依然在使用中,返回保持连接的周期5分钟
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
cleanupRunning = false;
return -1;
}
}

closeQuietly(longestIdleConnection.socket());

// Cleanup again immediately.
return 0;
}

也就是当如果空闲连接maxIdleConnections超过5个或者keepalive时间大于5分钟,则将该连接清理掉。

怎样属于空闲连接?

acquire计数方法:

1
2
3
4
5
6
7
8
public void acquire(RealConnection connection, boolean reportedAcquired) {
assert (Thread.holdsLock(connectionPool));
if (this.connection != null) throw new IllegalStateException();

this.connection = connection;
this.reportedAcquired = reportedAcquired;
connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}

RealConnection中,有一个StreamAllocation虚引用列表allocations。每创建一个连接,就会把连接对应的StreamAllocationReference添加进该列表中,如果连接关闭以后就将该对象移除。

主要就是管理双端队列Deque<RealConnection>,可以用的连接就直接用,然后定期清理连接,同时通过对StreamAllocation的引用计数实现自动回收。

缓存

缓存的实现是基于请求和响应的header来做的。CacheStrategy即缓存策略,CacheInterceptor拦截器会根据他拿到网络请求networkRequest、缓存响应cacheResponse,从而决定是使用网络还是缓存。

CacheStrategy.java
内部类工厂,生产CacheStrategy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
static class Factory {
//一些字段:servedDate、lastModified、expires、etag...
Factory(long nowMillis, Request request, Response cacheResponse) {
this.nowMillis = nowMillis;
this.request = request;
this.cacheResponse = cacheResponse;
if (cacheResponse != null) {
//解析cacheResponse,把参数赋值给自己的成员变量
this.sentRequestMillis = cacheResponse.sentRequestAtMillis();
//...
Headers headers = cacheResponse.headers();
for (int i = 0, size = headers.size(); i < size; i++) {
String fieldName = headers.name(i);
String value = headers.value(i);
if ("Date".equalsIgnoreCase(fieldName)) {
servedDate = HttpDate.parse(value);
servedDateString = value;
} else if (xxx){
//...
}
}
}
}

CacheStrategy get() {
CacheStrategy candidate = getCandidate();
if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
//返回策略,交给拦截器
return new CacheStrategy(null, null);
}
return candidate;
}

CacheStrategy getCandidate() {
//根据header字段,得到各种策略,交给拦截器...
return new CacheStrategy(xxx);
}
}

getCandidate里面就是根据header字段得到各种策略,然后交给拦截器处理

那么缓存是如何写入磁盘的呢?跟进InternalCache接口,他的实现在Cache类里,

Cache.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
InternalCache internalCache = new InternalCache() {
@Override public Response get(Request request) throws IOException {
return Cache.this.get(request);//读取
}

@Override public CacheRequest put(Response response) throws IOException {
return Cache.this.put(response);//写入
}

Response get(Request request) {
String key = key(request.url()); //键
DiskLruCache.Snapshot snapshot; //缓存快照
Entry entry;
snapshot = cache.get(key); //cache是okhttp的DiskLruCache
if (snapshot == null) {
return null; //没缓存,直接返回
}
//快照得到输入流,用于创建缓存条目
entry = new Entry(snapshot.getSource(ENTRY_METADATA));
//得到响应
Response response = entry.response(snapshot);
return response;
}

CacheRequest put(Response response) {
String requestMethod = response.request().method();
if (!requestMethod.equals("GET")) {
//不是get请求,不缓存
return null;
}
//封装成日志条目
Entry entry = new Entry(response);
DiskLruCache.Editor editor = null;
editor = cache.edit(key(response.request().url()));
//写入缓存
entry.writeTo(editor);
return new CacheRequestImpl(editor);
}
}

缓存默认是关闭的,需要自行开启:

1
2
3
4
5
new OkHttpClient.Builder().cache(new Cache(
new File(MyApp.APP.getCacheDir(),
"okhttp_cache"), //路径
50L * 1024L * 1024L) //大小
).build();

线程池

当一个任务通过execute(Runnable)方法添加到线程池时:

  • 线程数量小于corePoolSize,新建线程(核心)来处理被添加的任务

  • 线程数量大于等于 corePoolSize,新任务被添加到等待队列,添加失败

    • 线程数量小于maximumPoolSize,新建线程执行新任务
    • 线程数量等于maximumPoolSize,使用RejectedExecutionHander拒绝策略
Powered by Hexo & Theme Keep
Unique Visitor Page View