OkHttp分为异步恳求和同步恳求两部分
同步恳求
call.execute()
execute
代码如下:
#okhttp3.internal.connection.RealCall
override fun execute(): Response {
check(executed.compareAndSet(false, true)) { "Already Executed" }
// 1.敞开超时检测
timeout.enter()
callStart()
try {
// 2. 将恳求增加在同步恳求行列
client.dispatcher.executed(this)
//3. 调用getResponseWithInterceptorChain获取Response
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
}
}
-
敞开超时检测
-
将恳求增加在同步恳求行列
-
调用
getResponseWithInterceptorChain
获取Response
异步恳求
-
okHttpClient.newCall
回来值
首要,咱们看一下okHttpClient.newCall
回来的是什么
newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
newCall
回来一个RealCall
目标
-
call.enqueue
的源码
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }
//1.
callStart()
//2.
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
-
callStart()
办法,内部调用的EventListener.callStart
,是一个全局监听恳求进程的办法 - 调用
dispatcher
的enqueue
办法,从字面理解为,恳求需要排队履行,这里的恳求是AsyncCall
目标,AsyncCall
中包括回来结果的监听responseCallback
,也便是咱们在调用call.enqueue
时传入的callBack
。
AsyncCall下面会进行详细介绍
-
Dispatcher的enqueue办法
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
//1.
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
//2.
promoteAndExecute()
}
运用同步锁*synchronized
*防止线程同步问题:
-
将
AsyncCall
目标参加readyAsyncCalls
这个行列中 -
promoteAndExecute()
,推动并履行,咱们看一下详细干了什么 -
Dispatcher的promoteAndExecute办法
经过注释,咱们能够很清楚的知道该函数的作用:
将合法的恳求从readyAsyncCalls
行列里移到runningAsyncCalls
行列里,而且用ExecutorService
履行他们
/**
* Promotes eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and runs them on the
* executor service. Must not be called with synchronization because executing calls can call
* into user code.
*
* @return true if the dispatcher is currently running calls.
*/
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
//1.
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
// 从readyAsyncCalls中移除
i.remove()
asyncCall.callsPerHost.incrementAndGet()
//2. 增加到executableCalls/runningAsyncCalls 行列中
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
// Avoid resubmitting if we can't logically progress
// particularly because RealCall handles a RejectedExecutionException
// by executing on the same thread.
if (executorService.isShutdown) {
...
} else {
for (i in 0 until executableCalls.size) {
// 3.
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
}
return isRunning
}
- 声明一个
AsyncCall
的集合 - 将每个
AsynCall
从readyAsyncCalls
中移除,增加到executableCalls/runningAsyncCalls
行列中 - 从
executableCalls
中取出AsynCall
,调用AsyncCall.executeOn
办法,并传入executorService
executorService
为ExecutorService
目标:
val executorService: ExecutorService get() { if (executorServiceOrNull == null) { executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS, SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false)) } return executorServiceOrNull!! }
ExecutorService
是一个没有核心线程,非核心线程数为Int最大值,线程闲置60s会被回收的线程池
-
AsyncCall.executeOn
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
//1.
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
failRejected(e)
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
运用传入的executorService
线程池履行AsyncCall
使命,而AsyncCall
是一个Runnable
目标,直接查看其run办法
-
AsyncCall.run
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
// 1.
timeout.enter()
try {
// 2.
val response = getResponseWithInterceptorChain()
signalledCallback = true
// 3.
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
//4.
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
// 5.
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
-
调用
timeout.enter()
敞开超时检测 -
调用
getResponseWithInterceptorChain
获取呼应结果 -
获取到
response
后调用responseCallback.response
办法 -
调用
responseCallback.onFailure
,失告诉败结果
getResponseWithInterceptorChain源码分析
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
简单来说,getResponseWithInterceptorChain
办法中,便是构建一个链式拦截器,然后建议恳求,整个进程是经过拦截器的链式调用完结的,即先从左到右调用,再从右到左回来结果
暂时无法在飞书文档外展现此内容
-
RetryAndFollowUpInterceptor
:担任失利重试和重定向 -
BridgeInterceptor
:担任把用户恳求转换为发送到服务器的恳求,并把服务器的呼应转化为用户需要的呼应 -
CacheInterceptor
:担任读取缓存、更新缓存 -
ConnectInterceptor
:担任和服务器树立衔接 -
CallServerInterceptor
:担任向服务器发送数据,从服务器读取呼应数据
addInterceptor 和 addNetworkInterceptor区别
在getResponseWithInterceptorChain
办法中能够看到:拦截器是经过职责链模式调用的,
addInterceptor
增加的Interceptor
会被增加到职责链的初步,因此:
-
任何情况下都会首要履行
-
不受重定向和重试机制的影响,即自定义拦截器只会被履行一次
addNetworkInterceptor
增加的拦截器处在ConnectInterceptor
之后,因此:
- 如果无网衔接失利,该拦截器不会被调用
- 重定向和重试几回,该拦截器就会被调用几回
- 答应操作中心呼应,比方当恳求操作发生重定向(拦截被履行了两次)或许重试等。
- 不答应调用缓存来short-circuit (短路)这个恳求。(意思便是说不能从缓存池中获取缓存目标回来给客户端,必须经过恳求服务的方法获取呼应,也便是Chain.proceed())
- 能够监听数据的传输