敞开成长之旅!这是我参加「日新方案 12 月更文挑战」的第37天,点击检查活动概况
文章中源码的OkHttp版别为4.10.0
implementation 'com.squareup.okhttp3:okhttp:4.10.0'
1.简略运用
- okHttp的简略运用代码如下:
//创立OkHttpClient目标
val client = OkHttpClient().newBuilder().build()
//创立Request目标
val request = Request.Builder()
.url("https://wanandroid.com/wxarticle/list/408/1/json") //添加恳求的url地址
.build() //回来一个Request目标
//建议恳求
fun request() {
val response = client
.newCall(request) //创立一个Call目标
.enqueue(object : Callback { //调用enqueue办法履行异步恳求
override fun onFailure(call: Call, e: IOException) {
TODO("Not yet implemented")
}
override fun onResponse(call: Call, response: Response) {
TODO("Not yet implemented")
}
})
}
- 工作流程有四步:
- 创立OkHttpClient目标
- 创立Request目标
- 创立Call目标
- 开端建议恳求,
enqueue
为异步恳求,execute
为同步恳求
2.OkHttpClient目标是怎么创立的
val client = OkHttpClient().newBuilder().build()
OkHttpClient
目标的创立是一个典型的建造者形式,先看一下newBuilder
办法做了什么,源码如下:
//创立了一个Builder目标
open fun newBuilder(): Builder = Builder(this)
class Builder constructor() {
//调度器
internal var dispatcher: Dispatcher = Dispatcher()
//拦截器调集
internal val interceptors: MutableList<Interceptor> = mutableListOf()
//网络拦截器调集
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
...
}
newBuilder
中创立了一个Builder
目标,Builder
目标的结构函数中界说了许多的变量,这儿只保留了3个重要的。
下面看一下build
办法做了什么
//这个this就是上面创立的Builder目标
fun build(): OkHttpClient = OkHttpClient(this)
okHttpClient
源码如下
open class OkHttpClient internal constructor(
builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
@get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher
@get:JvmName("interceptors") val interceptors: List<Interceptor> =
builder.interceptors.toImmutableList()
@get:JvmName("networkInterceptors") val networkInterceptors: List<Interceptor> =
builder.networkInterceptors.toImmutableList()
...
}
经过OkHttpClient
目标的源码能够得知,Builder
创立的调度器、拦截器终究都会交给OkHttpClient
,这是建造者形式的特定。
3.Request目标是怎么创立的
val request = Request.Builder()
.url("https://wanandroid.com/wxarticle/list/408/1/json") //添加恳求的url地址
.build() //回来一个Request目标
open class Builder {
//恳求地址
internal var url: HttpUrl? = null
//恳求办法
internal var method: String
//恳求头
internal var headers: Headers.Builder
//恳求体
internal var body: RequestBody? = null
...
}
4.创立Call目标
val call = client.newCall(request)
override fun newCall(request: Request): Call {
//newRealCall中传递了三个参数,第一个参数是OkHttpClient自身,第二个参数request,
//第三个不用重视
return RealCall.newRealCall(this, request, forWebSocket = false)
}
接着咱们先来看一下RealCall
是什么
class RealCall(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
...
}
从源码可知RealCall
是Call
的子类,那么Call
又是什么呢,往下看
//调用是预备好要履行的恳求。也能够撤销调用。
//由于该目标表明单个恳求/呼应对(流),因而不能履行两次。
interface Call : Cloneable {
//回来建议此调用的原始恳求。
fun request(): Request
@Throws(IOException::class)
//同步恳求,当即调用恳求,并阻塞,直到呼应能够处理或呈现错误。
fun execute(): Response
//异步恳求,承受回调参数
fun enqueue(responseCallback: Callback)
//撤销恳求
fun cancel()
//假如此调用已被履行或进入行列,则回来true。多次履行调用是错误的。
fun isExecuted(): Boolean
//是否是撤销状况
fun isCanceled(): Boolean
//超时时间,
fun timeout(): Timeout
//创立一个与此调用相同的新调用,即便该调用现已进入行列或履行,该调用也能够被加入行列或履行。
public override fun clone(): Call
fun interface Factory {
fun newCall(request: Request): Call
}
}
5.建议恳求
- 以异步恳求为例进行分析
call.enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
println("onFailure:$e")
}
override fun onResponse(call: Call, response: Response) {
println("onResponse:${response.body.toString()}")
}
})
RealCall
是Call
的子类所以enqueue
的详细实现是在RealCall
中
override fun enqueue(responseCallback: Callback) {
//检查是否进行了二次恳求
check(executed.compareAndSet(false, true)) { "Already Executed" }
//恳求后当即调用,相当于监听恳求的开端事情
callStart()
//将恳求交给调度器来决定什么时候开端恳求
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
private fun callStart() {
this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
eventListener.callStart(this)
}
- 疑问:
client.dispatcher.enqueue
是怎么决定什么时候开端恳求的
-
- 已知
client
就是OkHttpClient
- 已知
-
-
dispatcher
是调度器,先来看一下它的源码
-
class Dispatcher constructor() {
//并发履行的最大恳求数。上面的恳求行列在内存中,等候正在运转的调用完结。
//假如在调用这个函数时有超越maxRequests的恳求在运转,那么这些恳求将保持在运转状况。
@get:Synchronized var maxRequests = 64
set(maxRequests) {
require(maxRequests >= 1) { "max < 1: $maxRequests" }
synchronized(this) {
field = maxRequests
}
promoteAndExecute()
}
//每台主机可并发履行的最大恳求数。这约束了URL主机名的恳求。
//留意,对单个IP地址的并发恳求依然可能超越此约束:
//多个主机名可能同享一个IP地址或经过同一个HTTP代理路由。
@get:Synchronized var maxRequestsPerHost = 5
set(maxRequestsPerHost) {
require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
synchronized(this) {
field = maxRequestsPerHost
}
promoteAndExecute()
}
//线程安全的单例形式,线程池的获取用于线程调度。
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
//界说预备发送的行列
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
//界说异步发送行列
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
//界说同步发送行列
private val runningSyncCalls = ArrayDeque<RealCall>()
...
}
-
- 再来看一下
dispatcher.enqueue
的源码
- 再来看一下
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
//将call目标添加到预备发送的行列,这个call目标来自AsyncCall,稍后再讲
readyAsyncCalls.add(call)
//修正AsyncCall,使其同享对同一主机的现有运转调用的AtomicInteger。
if (!call.get().forWebSocket) {
//同享一个现已存在的正在运转调用的AtomicInteger
val existingCall = findExistingCallWithHost(call.host())
//统计发送数量
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
//预备发送恳求
promoteAndExecute()
}
//将契合条件的call从readyAsyncCalls(预备发送的行列)添加到runningAsyncCalls(异步发送行列)中
//并在服务器上履行它们
//不能在同步时调用,由于履行调用能够调用用户代码
//假如调度程序当前正在运转,则为true。
private fun promoteAndExecute(): Boolean {
assert(!Thread.holdsLock(this))
//搜集所有需求履行的恳求
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
//遍历预备发送的行列
while (i.hasNext()) {
val asyncCall = i.next()
//判别现已发送的恳求是大于等于最大恳求个数64,是则跳出循环
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
//判别并发恳求个数是否大于等于最大并发个数5,假如是则跳出循环
if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
//从预备行列中删除
i.remove()
//计数+1
asyncCall.callsPerHost().incrementAndGet()
executableCalls.add(asyncCall)
//将目标添加到异步发送行列中
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
//提交任务到线程池
asyncCall.executeOn(executorService)
}
return isRunning
}
-
- 终究恳求是经过
AsyncCall
的executeOn
发送出去的,AsyncCall
是什么
- 终究恳求是经过
internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
...
}
-
- 接收了一个回调,承继了Runnable
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
//暂时界说为未履行成功
var success = false
try {
//运用线程履行自身
executorService.execute(this)
//履行成功
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
//发送失利
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
//发送结束
client.dispatcher.finished(this) // 中止运转
}
}
}
-
-
AsyncCall
将自己加入到线程池,然后线程池敞开线程履行自己的run办法,那么AsyncCall
加入了一个怎样的线程池呢?
-
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
-
- 这儿界说了一个缓存线程池,具有缓存功用且假如一个恳求履行结束后在60s内再次建议就会复用方才那个线程,提高了功能。
- 交给线程池后,线程池会敞开线程履行AsyncCall的run办法
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
//界说呼应标志位,用于表明恳求是否成功
var signalledCallback = false
timeout.enter()
try {
//发送恳求并得到结果
val response = getResponseWithInterceptorChain()
//过程未出错
signalledCallback = true
//回调结果
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// 不要两次发出回调信号!
Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
} else {
//失利回调
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
在恳求得到结果后最后会调用finish
表明完结,这儿的finish
又做了什么呢?
/**
* 由[AsyncCall.run]用于表明完结。
*/
internal fun finished(call: AsyncCall) {
call.callsPerHost.decrementAndGet()
finished(runningAsyncCalls, call)
}
/**
* 由[Call.execute]用于表明完结。
*/
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
//将完结的任务从行列中删除
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
//用于将等候行列中的恳求移入异步行列,并交由线程池履行
val isRunning = promoteAndExecute()
//假如没有恳求需求履行,回调搁置callback
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
- 流程图如下