前语
现在谈起网络恳求,咱们必定下意识想到的便是 okhttp
或许 retrofit
这样的三方恳求库。固然,现在有越来越多的三方库协助着咱们快速开发,可是关于现在的程序员来说,咱们不仅要学会怎样去用,更重要的是要清楚里边的原理,了解里边的思想,终究转化到咱们自己的实践代码中去。
⚡ okhttp
运用起来仍是比较方便,优点也有很多:支持GZIP紧缩
,衔接池复用底层TCP
,恳求主动重试重定向
…
现在 Google官方 也将源码傍边的 HttpURLConnection
底层完结改成 okhttp
了,同时 retrofit
的底层也是 okhttp
,足以阐明其在日常开发中的重要性。现在咱们正式进入今日的正题。
✔️ 本文阅读时长约为:20min
OkHttp版别:4.0.1 —-> 留意此版别起是Kotlin版别哦
okhttp的根本运用流程
// 1.创立client
OkHttpClient client = new OkHttpClient().newBuilder()
.cookieJar(CookieJar.NO_COOKIES)
.callTimeout(10000, TimeUnit.MILLISECONDS)
.build();
// 2.创立request
Request request = new Request.Builder()
.url("http://10.34.12.156:68080/admin-api")
.addHeader("Content-Type", "application/json")
.get();
.build();
// 3.构建call目标
Call call = client.newCall(request);
// 4.1调用call目标的同步恳求办法
Response response = call.execute();// response目标中保存的有回来的呼应参数
// 4.2调用call目标的异步恳求办法
call.enqueue(new Callback() {
@Override
public void onFailure(@NonNull Call call, @NonNull IOException e) {
Log.d(TAG, "onFailure: ");// 失利回调
}
@Override
public void onResponse(@NonNull Call call, @NonNull Response response) {
Log.d(TAG, "onResponse: ");// 成功回调
}
});
okhttp 的优点就在于,咱们完结一次网络恳求最少只需求触摸 OkHttpClient
,Request
,Call
这三个目标,显然是很轻松的。此外还需强调一点,OkHttpClient 和 Request 都是经过建造者形式构建的,这样的优点就在于用户能够根据自己的需求轻松简练的装备一些可选参数,而不必经过传统方式将不需求的参数写成 null
。
同步恳求和异步恳求的流程
1.同步恳求的履行流程
咱们将 OkhttpClient
中的 newCall()
作为入口,敞开整个同步恳求的进程。
// OkHttpClient.kt
// 这儿是构建一个 RealCall 目标
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
RealCall
完结了 Call
接口,咱们先来看看 Call
接口长什么样。
// Call.kt
interface Call : Cloneable {
// 同步恳求办法
@Throws(IOException::class)
fun execute(): Response
// 异步恳求办法
fun enqueue(responseCallback: Callback)
// OkHttpClient完结了Factory接口
// 所以才有newCall办法
fun interface Factory {
fun newCall(request: Request): Call
}
}
现在咱们经过 newCall()
得到了一个 RealCall
目标,然后就能经过 RealCall
傍边的 execute()
和 enqueue()
进行网络恳求。
// RealCall.kt
override fun execute(): Response {
// 一个call目标只能履行一次execute办法
// 这儿用CAS思想进行比较,能够进步功率
check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter()
// 这儿首要是个监听器,表明开端进行网络恳求了
callStart()
// 重点关注这块
try {
// 经过分发器进行使命分发
// 其实这儿还表现不出分发器的效果,仅仅是将当时恳求加入到一个同步行列傍边
client.dispatcher.executed(this)
// 经过 getResponseWithInterceptorChain() 取得相应成果
return getResponseWithInterceptorChain()
} finally {
// 完结一些收尾作业,在同步恳求中,几乎没什么用
client.dispatcher.finished(this)
}
}
现在来看看同步恳求中分发器做了什么作业呢?
// Dispatcher.kt
// 正在履行的同步恳求行列
private val runningSyncCalls = ArrayDeque<RealCall>()
// 简略的将当时恳求加入到同步恳求行列中
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
// finish办法 --> 前面说的首尾作业办法
// 可是在同步恳求中用途不大
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
private fun <T> finished(calls: Deque<T>, call: T) {
// 顾名思义,分发器闲暇时得回调
val idleCallback: Runnable?
synchronized(this) {
// 关于完结恳求的 call ,在这儿移除掉
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
// 这儿是进行首尾作业的
// 首要表现在异步恳求中,因而这个办法先放一放
val isRunning = promoteAndExecute()
// 进行闲暇回调办法
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
能够看到,在整个同步恳求的进程中,分发器仅仅是将当时的恳求加入到一个同步恳求行列中,恳求完结后再将其移除。由于在同步恳求中 finished()
办法只有一个回调效果,因而咱们将它放一放,重点看一看异步恳求中的 finished()
。
2.异步恳求的履行流程
异步恳求就比同步恳求稍微复杂了一点,咱们依然是从 RealCall
中看起。
// RealCall.kt
override fun enqueue(responseCallback: Callback) {
// 一个call目标只能履行一次execute办法
// 这儿用CAS思想进行比较,能够进步功率
check(executed.compareAndSet(false, true)) { "Already Executed" }
// 这儿边仍旧加上了监听
callStart()
// 构建一个 AsyncCall目标,再交给dispatcher进行分发流程
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
AsyncCall
是 RealCall
的内部类, 它完结了 Runnable 接口,首要是为了能在线程池中去履行它的 run()
办法。
// RealCall.kt
internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
@Volatile var callsPerHost = AtomicInteger(0)
private set
fun reuseCallsPerHostFrom(other: AsyncCall) {
// callPerHost代表了衔接同一个host的衔接数
// 此衔接数必须小于5
this.callsPerHost = other.callsPerHost
}
val host: String
get() = originalRequest.url.host
val request: Request
get() = originalRequest
val call: RealCall
get() = this@RealCall
// 将asyncCall增加到线程池中去履行的办法
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
// 线程池去履行当时AsyncCall目标的run办法
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
// 收尾作业
// 其实内部调用的是 promoteAndExecute()
client.dispatcher.finished(this)
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
// getResponseWithInterceptorChain() 取得恳求的呼应成果
val response = getResponseWithInterceptorChain()
signalledCallback = true
// 恳求成功的回调
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
// 恳求失利的回调
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
// 进行收尾作业
// 相比同步恳求的finished办法,这儿更重要
client.dispatcher.finished(this)
}
}
}
}
现在咱们回到 Dispatcher
中去。
// Dispatcher.kt
// 预备履行的异步恳求行列
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
// 正在履行的异步恳求行列
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
// 加当时asyncCall加到预备履行的异步恳求行列中
readyAsyncCalls.add(call)
if (!call.call.forWebSocket) {
// 这儿是得到衔接同一个 host 的恳求数
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
// dispatcher进行分发call使命的办法
promoteAndExecute()
}
// 关键办法,dispatcher进行使命分发的办法
// 进行收尾作业时,也是调用的它
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
// 需求开端履行的使命调集
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
// 迭代等候履行异步恳求
while (i.hasNext()) {
val asyncCall = i.next()
// 正在履行异步恳求的总使命数不能大于64个
// 否则直接退出这个循环,不再将恳求加到异步恳求行列中
if (runningAsyncCalls.size >= this.maxRequests) break
// 同一个host的恳求数不能大于5
// 否则直接越过此call目标的增加,去遍历下一个asyncCall目标
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue
i.remove()
// 假如拿到了符合条件的asyncCall目标,就将其callPerHost值加1
// callPerHost代表了衔接同一个host的数量
asyncCall.callsPerHost.incrementAndGet()
// 加到需求开端履行的使命调集中
executableCalls.add(asyncCall)
// 将当时call加到正在履行的异步行列傍边
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
// 遍历每一个调集中的asyncCall目标
// 将其增加到线程池中,履行它的run办法
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
现在咱们经过 Dispatcher
将 AsyncCall
目标经过选择,加到了线程池中。选择的限制有两个:
1.当时履行的总恳求数要小于64个。
2.关于衔接的同一个host恳求,要确保数量小于5。
现在,咱们再回头看看将 AsyncCall
目标加到线程池后的一些细节吧!
// Dispatcher.kt
// 将asyncCall增加到线程池中去履行的办法
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
// 这儿是之前自定义了创立了一个ExecutorService
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
// 这儿也是会履行收尾作业
client.dispatcher.finished(this)
}
}
}
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
// !!这儿的corePoolSize是 0
// !!阻塞行列是 SynchronousQueue
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
✊ 咱们先来看 executeOn()
办法,它的首要作业便是履行增加到线程池的 AsyncCall
目标的 run()
办法,去进行网络恳求。其次咱们目光移动到 finally
句子块,会发现每次履行完 run()
办法后,即完结网络恳求后,都会去履行这个 finished()
办法。前面讲到过,内部其实是再次调用了 promoteAndExecute()
办法。那这是为什么呢?
还记住到咱们从预备履行的异步行列中选择一些 AsyncCall
目标拿到线程池中履行吗?假如记住,那你是否还记住咱们是有选择条件的,正因如此,或许在预备履行的异步恳求行列中会有一些 AsyncCall
目标不满足条件依然留在行列里!那咱们莫非终究就不履行这些网络恳求了吗?当然不是!本来每完结一次网络恳求就会再次触发 Dispatcher
去分发 AsyncCall
目标!本来如此。
✊ 然后咱们再来看看这儿用到的线程池是一个什么样的线程池。在上面我贴出来的代码中能够看到,这个线程池的 corePoolSize
是 0
,BlockingQueue
是 SynchronousQueue
,这样构建出来的线程池有什么特别之处吗?了解线程池的同学都应该知道,当使命数超过了 corePoolSize
就会将其加到阻塞行列傍边。也便是说这些使命不会立马履行,而咱们的网络恳求可不想被阻塞着,因而这儿的 corePoolSize
就设置成了 0
。BlockingQueue
设置成 SynchronousQueue
也是类似道理,SynchronousQueue
是不贮存元素的,只需提交的使命数小于最大线程数就会立刻新起线程去履行使命。
3.okhttp网络恳求履行进程总结
总结一下整个 okhttp
网络恳求的整个进程。首要经过咱们经过 构造者 的方式构建好了 OkHttpClient
和 Request
目标,然后调用 OkHttpClient
目标的 newCall()
办法得到一个 RealCall
目标,终究再调用其 execute()
或许 enqueue()
办法进行同步或许异步恳求。
然后假如是同步恳求,Dispatacher
分发器去仅仅简略的将其加入到正在履行的同步恳求行列中做一个符号,假如是异步恳求就会根据 两个条件 去挑选适宜的恳求,并将其发送给一个特定的线程池中去进行网络恳求,终究经过 getResponseWithInterceptorChain()
得到终究成果。
恳求使命之五大阻拦器各司其职
okhttp的又一大特点是整个恳求流程是由五大阻拦器一层层分发下去,终究得到成果再一层层回来上来。 就像这样:
okhttp
内置了五大阻拦器,这五大阻拦器各司其职,经过职责链形式将恳求逐层分发下去,每层完结自己这层该做的事,终究拿到相应成果逐层往上回来成果:
1️⃣. RetryAndFollowUpInterceptor:恳求失利主动重试,假如 DNS
设置了多个ip地址会主动重试其余ip地址。
2️⃣. BridgeInterceptor:会补全咱们恳求中的恳求头,例如Host
,Cookie
,Accept-Encoding
等。
3️⃣. CacheInterceptor:会选择性的将呼应成果进行保存,以便下次直接读取,不再需求再向服务器索要数据。
4️⃣. ConnectInterceptor:树立衔接并得到对应的socket;办理衔接池,从中存取衔接,以便到达衔接复用的目的。
5️⃣. CallServerInterceptor:与服务器树立衔接,详细进行网络恳求,并将成果逐层回来的地方。
留意:这五个是体系内置的阻拦器,咱们也能够经过 addInterceptor()
加入咱们自己写的阻拦器,后面会说到。
现在咱们从敞开阻拦器的入口函数 getResponseWithInterceptorChain()
看起,
// RealCall.kt
// 五大阻拦器的开始入口
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// 用一个调集保存一切的阻拦器
val interceptors = mutableListOf<Interceptor>()
// 这个interceptor便是咱们自己能够加的第一个阻拦器
// 由于位于一切阻拦器的第一个,与咱们的运用直接相连
// 因而这个阻拦器又被称为 Application Interceptor
interceptors += client.interceptors
// 重试重定向阻拦器
interceptors += RetryAndFollowUpInterceptor(client)
// 桥接阻拦器
interceptors += BridgeInterceptor(client.cookieJar)
// 缓存阻拦器
interceptors += CacheInterceptor(client.cache)
// 衔接阻拦器
interceptors += ConnectInterceptor
if (!forWebSocket) {
// 这个interceptor也是咱们自己能够加的一个阻拦器
// 由于位于真实恳求回来成果的阻拦器前面,能够拿到服务器回来的最原始的成果
// 因而这个阻拦器又被称为 Network Interceptor
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
// 构建RealInterceptorChain目标,咱们正是经过此目标将恳求逐层往下传递的
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
// 调用RealInterceptorChain的proceed()办法,将恳求向下一个衔接器传递
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
// 放回呼应成果
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
从这个办法中咱们大概能够总结出,它将一切的阻拦器包含用户自定义的阻拦器悉数经过一个调集保存了下来,然后构建出了 RealInterceptorChain
目标,并调用其 proceed()
办法开端了阻拦器逐层分发作业。
那么它是怎样做到逐层分发的呢?其实很简略,每一个阻拦器中都会经过 proceed()
办法再构建一个 RealInterceptorChain
目标,然后调用 intercpt
去履行下个阻拦器中的使命,如此循环,终究走到终究一个阻拦器后退出。
// RealInterceptorChain.kt -----> 完结了 Chain 接口
override fun proceed(request: Request): Response {
// 查看是否走完了一切的阻拦器,是则退出
check(index < interceptors.size
...
// 这个办法便是再次构建了 RealInterceptorChain 目标 ==> next
// 去履行下个阻拦器中的使命
val next = copy(index = index + 1, request = request)// 这个办法内部就一行代码 new RealInterceptorChain()
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
// 经过调用intercept(next)去履行下一个阻拦器中的使命
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
...
// 将成果放回到上一个阻拦器中
return response
}
以上咱们搞清楚了阻拦器是怎样一步一步往下传递使命,并逐层往上回来成果的,现在咱们来详细看看每个阻拦器都做了什么工作。
1.RetryAndFollowUpInterceptor阻拦器
// RetryAndFollowUpInterceptor.kt
// 一切阻拦求都完结了 Interceptor 接口
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newExchangeFinder = true
var recoveredFailures = listOf<IOException>()
// 这是个死循环,意思便是假如恳求失利就需求一直重试,直到主动退出循环(followUpCount>20)
while (true) {
// ExchangeFinder: 获取衔接 ---> ConnectInterceptor中运用
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
// 呼应成果
var response: Response
var closeActiveExchange = true
try {
if (call.isCanceled()) {
throw IOException("Canceled")
}
try {
// 调用下一个阻拦器,即 BridgeInterceptor
// 整个恳求或许会失利,需求捕获然后重试重定向,因而有一个try catch
response = realChain.proceed(request)
newExchangeFinder = true
} catch (e: RouteException) { //1.进行重试
// 1.1 道路反常,查看是否需求重试
if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
throw e.firstConnectException.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e.firstConnectException
}
newExchangeFinder = false
// 失利持续重试
continue
} catch (e: IOException) {
// 1.2 IO反常 HTTP2才会有ConnectionShutdownException 代表衔接中止
//假如是由于IO反常,那么requestSendStarted=true (若是HTTP2的衔接中止反常依然为false)
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
throw e.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e
}
newExchangeFinder = false
// 失利持续重试
continue
}
// priorResponse:上一次恳求的呼应
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}
val exchange = call.interceptorScopedExchange
// 2.根据回来的response进行重定向,构建新的Request目标
val followUp = followUpRequest(response, exchange)
// 2.1 followUp为空,代表没有重定向,直接回来成果response
if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}
// 2.2 followUp不为空,可是body中设置了只能恳求一次(默许),回来重定向后的成果response
val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}
// 重试次数大于20次,抛出反常
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}
// 将之前重定向后的Request目标赋值给request进行重新恳求
request = followUp
priorResponse = response
} finally {
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}
简略来说,RetryAndFollowUpInterceptor阻拦器帮咱们干了两件事。第一是重试,第二是重定向。
✅ 咱们先来看看什么状况下它会进行重试。
// RetryAndFollowUpInterceptor.kt
// 这个办法便是来判别当时恳求是否需求重试的
private fun recover(
e: IOException,
call: RealCall,
userRequest: Request,
requestSendStarted: Boolean
): Boolean {
// 构建OkHttpClient时装备不重试,则回来false
if (!client.retryOnConnectionFailure) return false
// 回来false
// 1、假如是IO反常(非http2中止反常)表明恳求或许发出
// 2、假如恳求体只能被运用一次(默许为false)
if (requestSendStarted && requestIsOneShot(e, userRequest)) return false
// 回来false
// 协议反常、IO中止反常(除Socket读写超时之外),ssl认证反常
if (!isRecoverable(e, requestSendStarted)) return false
// 无更多的道路,回来false
if (!call.retryAfterFailure()) return false
// 以上状况都不是,则回来true 表明能够重试
return true
}
这儿画个图做个总结:
✅ 再来看看怎样判别是否需求重定向的。
@Throws(IOException::class)
private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? {
val route = exchange?.connection?.route()
val responseCode = userResponse.code
val method = userResponse.request.method
when (responseCode) {
// 407 署理需求授权,经过proxyAuthenticator取得request,向其中增加恳求头 Proxy-Authorization
// 然后构建一个新的request目标回来出去,预备再次恳求
HTTP_PROXY_AUTH -> {
val selectedProxy = route!!.proxy
if (selectedProxy.type() != Proxy.Type.HTTP) {
throw ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy")
}
return client.proxyAuthenticator.authenticate(route, userResponse)
}
// 401 服务器恳求需授权,经过authenticator取得到了Request,增加Authorization恳求头
// 然后构建一个新的request目标回来出去,预备再次恳求
HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse)
// 回来的呼应码是3xx,这就预备进行重定向,构建新的Request目标
HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {
return buildRedirectRequest(userResponse, method)
}
// 408 恳求超时
HTTP_CLIENT_TIMEOUT -> {
// 用户设置是否能够进行重试(默许答应)
if (!client.retryOnConnectionFailure) {
return null
}
val requestBody = userResponse.request.body
if (requestBody != null && requestBody.isOneShot()) {
return null
}
val priorResponse = userResponse.priorResponse
// 假如上次也是由于408导致重试,这次恳求又回来的408,则不会再去重试了,直接回来null
if (priorResponse != null && priorResponse.code == HTTP_CLIENT_TIMEOUT) {
return null
}
// 服务器回来的 Retry-After:0 或许未呼应Retry-After就不会再次去恳求
if (retryAfter(userResponse, 0) > 0) {
return null
}
// 回来当时的request目标,预备再次恳求
return userResponse.request
}
// 503 服务不可用
HTTP_UNAVAILABLE -> {
val priorResponse = userResponse.priorResponse
// 和408类似,假如两次都是503导致恳求重试,那么这次就不会再重试了,直接回来null
if (priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE) {
return null
}
// 服务端回来的有Retry-After: 0,则当即重试
if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {
return userResponse.request
}
return null
}
// 421 当时客户端的IP地址衔接到服务器的数量超过了服务器答应的规模
HTTP_MISDIRECTED_REQUEST -> {
val requestBody = userResponse.request.body
if (requestBody != null && requestBody.isOneShot()) {
return null
}
if (exchange == null || !exchange.isCoalescedConnection) {
return null
}
// 运用另一个衔接目标建议恳求
exchange.connection.noCoalescedConnections()
return userResponse.request
}
else -> return null
}
}
老规矩,仍旧给咱们画张图,便于了解。
2.BridgeInterceptor阻拦器
接下来,来到第二个阻拦器 BridgeInterceptor
。这个阻拦器前面说过,首要便是用来补全恳求头的,除此之外便是假如呼应头中有Content-Encoding: gzip
,则会用 GzipSource
进行解析。
// BridgeInterceptor.kt
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
// 这儿没有什么多说的,便是补全恳求头
val body = userRequest.body
if (body != null) {
val contentType = body.contentType()
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString())
}
val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", userRequest.url.toHostHeader())
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive")
}
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
// 去调用下一个阻拦器,并得到呼应成果
val networkResponse = chain.proceed(requestBuilder.build())
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
// 根据呼应头中的 Content-Encoding,判别是否需求gzip解析
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
// 进行gzip解析
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()
}
桥接阻拦器其实作业内容也很简略,在恳求之前,向咱们的恳求头中增加必要的参数,然后拿到恳求的呼应后,根据呼应头中的参数去判别是否需求 gzip
解析,假如需求则用 GzipSource
去解析就好了。
3.CacheInterceptor阻拦器
在讲 CacheInterceptor
阻拦器之前,咱们先来了解一下 HTTP的缓存规矩。 咱们按照其行为将其分为两大类:强缓存和洽谈缓存。
1️⃣强缓存:浏览器并不会将恳求发送给服务器。强缓存是利用 http
的回来头中的 Expires
或许 Cache-Control
两个字段来控制的,用来表明资源的缓存时刻。
2️⃣洽谈缓存:浏览器会将恳求发送至服务器。服务器根据 http
头信息中的 Last-Modify/If-Modify-Since
或 Etag/If-None-Match
来判别是否射中洽谈缓存。假如射中,则 http
回来码为 304
,客户端从本地缓存中加载资源。
搞清楚 Http的缓存战略
后,咱们来看看 CacheInterceptor
阻拦器做了什么作业吧。
// CacheInterceptor.kt
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val call = chain.call()
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
// 代表需求建议恳求
val networkRequest = strategy.networkRequest
// 代表直接运用本地缓存
val cacheResponse = strategy.cacheResponse
...
// networkRequest 和 cacheResponse 都是null
// 阐明服务器要求运用缓存,可是本地没有缓存,直接失利
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)// 构建一个空的response回来曩昔
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
}
// networkRequest为null cacheResponse不为null
// 阐明运用强缓存,成功
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build().also {
listener.cacheHit(call, it)
}
}
...
var networkResponse: Response? = null
try {
// 走到这儿阐明需求恳求一次,判别是洽谈缓存仍是重新去服务器上获取资源
// 因而 调用下一个阻拦器去持续恳求
networkResponse = chain.proceed(networkRequest)
} finally {
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
// 洽谈缓存
// 假如咱们本地有缓存,并且服务器放回给咱们304呼应码,直接运用本地的缓存
if (cacheResponse != null) {
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
return response
...
} else {
cacheResponse.body?.closeQuietly()
}
}
// 走到这,阐明呼应码是200 表明咱们需求用这次新恳求的资源
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
if (cache != null) {
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// 将本次最新得到的呼应存到cache中去
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {
}
}
...
}
// 将这次新恳求的资源回来给上一层阻拦器
return response
}
⚡ 总结一下缓存阻拦器处理缓存的流程:首要得到 RealInterceptorChain
目标,然后经过它再得到两个很重要的目标:networkRequest
和 cacheResponse
。networkRequest
代表去建议一个网络恳求, cacheResponse
代表运用本地缓存。经过这两个目标是否为 null
来判别此次恳求是运用直接缓存,仍是去恳求新的资源,仍是去运用洽谈缓存。终究便是会更新缓存,把每次新恳求的资源都重新保存至 cache
中。
4.ConnectInterceptor阻拦器
衔接阻拦器首要便是做树立衔接和衔接复用的作业,它会从衔接池中取出符合条件的衔接,避免重复创立,从而提升恳求功率。
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
// 获取衔接 Exchange:数据交换(封装了衔接)
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
// 持续调用下一个阻拦器去恳求数据
return connectedChain.proceed(realChain.request)
}
}
能够看到,这个衔接阻拦器中的代码比较少,首要的逻辑都在 initExchange()
办法傍边,这个办法的效果便是拿到一个衔接目标树立衔接。
// RealCall.kt
internal fun initExchange(chain: RealInterceptorChain): Exchange {
synchronized(this) {
check(expectMoreExchanges) { "released" }
check(!responseBodyOpen)
check(!requestBodyOpen)
}
val exchangeFinder = this.exchangeFinder!!
// ExchangeCodec: 编解码器
// find():查找衔接Realconnection
// 内部调用 findHealthy 去找到可用衔接
val codec = exchangeFinder.find(client, chain)
// Exchange:数据交换器 包含了exchangecodec与Realconnection
val result = Exchange(this, eventListener, exchangeFinder, codec)
this.interceptorScopedExchange = result
this.exchange = result
synchronized(this) {
this.requestBodyOpen = true
this.responseBodyOpen = true
}
if (canceled) throw IOException("Canceled")
// 便是拿到了ExChange目标回来出去
return result
}
那么 find()
是怎样找到衔接的呢?咱们直接点到 findConnection()
里边去。
// ExchangeFinder.kt
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
// 当时恳求被取消,抛反常
if (call.isCanceled()) throw IOException("Canceled")
...
// 封闭衔接
toClose?.closeQuietly()
eventListener.connectionReleased(call, callConnection)
}
// 1.从衔接池傍边去寻觅衔接,到达复用的目的
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
...
// 树立新的衔接
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
// 2.走到这儿阐明没有找到适宜的衔接,因而现在去创立一个新的衔接
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
// 对新树立的衔接进行保存,以便下次复用
synchronized(newConnection) {
connectionPool.put(newConnection)
call.acquireConnectionNoEvents(newConnection)
}
...
return newConnection
}
看到这儿,咱们发现其实树立衔接有两种办法,一种是从 衔接池傍边复用旧的衔接,另一种便是去 树立一个新的衔接 。
1️⃣ 衔接池默许的闲暇衔接数是5个,闲暇的衔接时刻也是5分钟,假如闲暇衔接数超出了5个,那么就整理掉最近最久未运用的衔接数,直到衔接池中的衔接数小于等于5个。现在咱们来看看衔接池是个什么姿态的吧。
// RealConnectionPool.kt
class RealConnectionPool(
taskRunner: TaskRunner,
private val maxIdleConnections: Int,
keepAliveDuration: Long,
timeUnit: TimeUnit
) {
//
private val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration)
private val cleanupQueue: TaskQueue = taskRunner.newQueue()
// 这是个使命相当于守时使命 主动cleanup去整理剩余或许超时的衔接
private val cleanupTask = object : Task("$okHttpName ConnectionPool") {
override fun runOnce() = cleanup(System.nanoTime())
}
// 从衔接池中取到适宜的衔接的办法
fun callAcquirePooledConnection(
address: Address,
call: RealCall,
routes: List<Route>?,
requireMultiplexed: Boolean
): Boolean {
for (connection in connections) {
synchronized(connection) {
if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
// 这儿会比较衔接傍边的每一个参数,必须与当时恳求的参数悉数相同才干获取到
// 比如:host, ip, 协议类型,证书...
if (!connection.isEligible(address, routes)) return@synchronized
call.acquireConnectionNoEvents(connection)
return true
}
}
return false
}
// 闲暇衔接的数量
fun idleConnectionCount(): Int {
return connections.count {
synchronized(it) { it.calls.isEmpty() }
}
}
// 总的衔接数
fun connectionCount(): Int {
return connections.size
}
// 往衔接池中存衔接的办法
fun put(connection: RealConnection) {
connection.assertThreadHoldsLock()
connections.add(connection)
// 放入衔接池时,也会启动一次整理使命
cleanupQueue.schedule(cleanupTask)
}
// 定期做整理作业的办法
fun `cleanup`(now: Long): Long {
// 在运用的衔接数量
var inUseConnectionCount = 0
// 闲暇的衔接数量
var idleConnectionCount = 0
// 最长搁置的衔接
var longestIdleConnection: RealConnection? = null
// 最大搁置时刻
var longestIdleDurationNs = Long.MIN_VALUE
// 遍历每个衔接,判别是否到达整理条件
for (connection in connections) {
synchronized(connection) {
// 记载正在运用与现已搁置的衔接数
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++
} else {
idleConnectionCount++
// 记载最长搁置时刻的衔接longestIdleConnection
val idleDurationNs = now - connection.idleAtNs
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs
longestIdleConnection = connection
} else {
Unit
}
}
}
}
when {
// 最长搁置时刻的衔接超过了答应搁置时刻 或许 搁置数量超过答应数量,整理此衔接
longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections -> {
val connection = longestIdleConnection!!
synchronized(connection) {
if (connection.calls.isNotEmpty()) return 0L // No longer idle.
if (connection.idleAtNs + longestIdleDurationNs != now) return 0L // No longer oldest.
connection.noNewExchanges = true
connections.remove(longestIdleConnection)
}
connection.socket().closeQuietly()
if (connections.isEmpty()) cleanupQueue.cancelAll()
return 0L
}
// 存在搁置衔接,下次履行整理使命在 答应搁置时刻-现已搁置时候 后
idleConnectionCount > 0 -> {
return keepAliveDurationNs - longestIdleDurationNs
}
// 存在运用中的衔接,下次整理在 答应搁置时刻后
inUseConnectionCount > 0 -> {
// 一切点衔接都在用,至少需求等候 一个答应搁置时刻 后再整理
return keepAliveDurationNs
}
else -> {
// 没有任何衔接回来 -1
return -1
}
}
}
}
2️⃣ 衔接池的部分总的来说便是当要复用衔接的时候,调用 isEligible()
办法,去遍历比较当时衔接的 Host
, url
, ip
等是否彻底一样,满足条件就取出这个衔接。另外对衔接进行缓存时,假如当时的衔接数超过了5个,那么会将最近最久未运用的衔接进行铲除。cleanup()
办法会每过一个整理周期主动做一次整理作业。
咱们现在来看假如没有从衔接池中拿到适宜的衔接,是此时怎样创立新的衔接的。
不过在此之前仍是先来看看有关 Http署理
的相关常识。
Http署理 分为一般署理和地道署理,一般署理就起到一个中间人的效果,它会在收到客户端恳求报文时,正确处理恳求和衔接状况,然后向服务器发送行的恳求。收到服务器的成果后,再将呼应成果包装成一个呼应体回来给客户端。而地道署理是将恳求无脑转发给服务端,地道署理需求建议 CONNECT 恳求,发送到署理服务器,代表树立好了地道,然后将 CONNECT 恳求后的一切数据,悉数转发给终究的服务端。
// RealConnection.kt
// 树立阻拦的办法
fun connect(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
call: Call,
eventListener: EventListener
) {
...
while (true) {
...
if (route.requiresTunnel()) {
// 地道署理,会做两件事
// 1.connectSocket 衔接socket
// 2.发送CONNECT恳求头 树立地道署理
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
if (rawSocket == null) {
// We were unable to connect the tunnel but properly closed down our resources.
break
}
} else {
// 一般署理 衔接socket
connectSocket(connectTimeout, readTimeout, call, eventListener)
}
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
break
...
}
connectTunnel()
内部也是会调用 connectSocket()
,只不过由于地道署理会先发一个 CONNECT
恳求头,因而比 connectSocket()
办法内部多了一个 CONNECT
恳求头的发送。
对衔接池的流程做个简略总结:
5.CallServerInterceptor阻拦器
// CallServerInterceptor.kt
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()
// 将恳求头写入缓存中
exchange.writeRequestHeaders(request)
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// 大容量恳求领会带有 Expect: 100-continue 字段,服务器辨认同意后,才干持续发送恳求给服务端
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
// 与服务器进行恳求
exchange.flushRequest()
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// 大部分状况都是走这儿,经过IO流把呼应成果写入response中
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
exchange.noRequestBody()
if (!exchange.connection.isMultiplexed) {
// 没有呼应 Expect:100 continue则阻挠此衔接得到复用,并且会封闭相关的socket
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}
...
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
if (code == 100) {
// 回来100 表明接纳大恳求体恳求 持续发送恳求体
// 得到response后回来成果
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
// 构建呼应体
response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}
response = if (forWebSocket && code == 101) {
// 假如状况码是101并且是webSocket就回来空的response
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
return response
}
在 okhttp
中,面对比较大的恳求体时,会先去问询服务器是否接纳此恳求体,假如服务器接纳并回来呼应码 200
,则 okhttp
持续发送恳求体,否则就直接回来给客户端。假如服务器疏忽此恳求,则不会呼应,终究客户端会超时抛出反常。
阻拦器部分完毕~
整个 okhttp
功用的完结就在这五个默许的阻拦器中,所以先了解阻拦器形式的作业机制是先决条件。这五个阻拦器分别为: 重试阻拦器、桥接阻拦器、缓存阻拦器、衔接阻拦器、恳求服务阻拦器。每一个阻拦器担任的作业不一样,就好像工厂流水线,终究经过这五道工序,就完结了终究的产品。
可是与流水线不同的是,okhttp
中的阻拦器每次建议恳求都会在交给下一个阻拦器之前干一些工作,在取得了成果之后又干一些工作。整个进程在恳求向是次序的,而呼应则是逆序。
写在终究
篇幅很长,能看到终究很不容易,给自己一个 大大的赞 吧!
假如觉得写的不错,就给个赞再走吧~
创造实属不易,你的必定是我创造的动力,下次见!
假如本篇博客有任何错误的地方,请咱们批评指正,不胜感激。