本文已收录到 AndroidFamily,技能和职场问题,请关注公众号 [彭旭锐] 发问。

前语

大家好,我是小彭。

在上一篇文章里,咱们聊到了 Square 开源的 I/O 结构 Okio 的三个优势:精简且全面的 API、基于同享的缓冲区规划以及超时机制。前两个优势现已剖析过了,今日咱们来剖析 Okio 的超时检测机制。

本文源码基于 Okio v3.2.0。


学习路线图:

Android IO 框架 Okio 的实现原理,如何检测超时?


1. 知道 Okio 的超时机制

超时机制是一项通用的体系规划,能够防止体系长时刻阻塞在某些使命上。例如网络恳求在超时时刻内没有呼应,客户端就会提早中止恳求,并提示用户某些功用不可用。

1.1 说一下 Okio 超时机制的优势

先考虑一个问题,比较于传统 IO 的超时有什么优势呢?我以为首要体现在 2 个方面:

  • 优势 1 – Okio 弥补了部分 IO 操作不支撑超时检测的缺陷:

Java 原生 IO 操作是否支撑超时,完全取决于底层的体系调用是否支撑。例如,网络 Socket 支撑经过 setSoTimeout API 设置单次 IO 操作的超时时刻,而文件 IO 操作就不支撑,运用原生文件 IO 就无法完成超时。

而 Okio 是统一在应用层完成超时检测,不论体系调用是否支撑超时,都能供给统一的超时检测机制。

  • 优势 2 – Okio 不只支撑单次 IO 操作的超时检测,还支撑包括屡次 IO 操作的复合使命超时检测:

Java 原生 IO 操作只能完成对单次 IO 操作的超时检测,无法完成对包括屡次 IO 操作的复合使命超时检测。例如,OkHttp 支撑装备单次 connect、read 或 write 操作的超时检测,还支撑对一次完好 Call 恳求的超时检测,有时候单个操作没有超时,但串联起来的完好 call 却超时了。

而 Okio 超时机制和 IO 操作没有强耦合,不只支撑对 IO 操作的超时检测,还支撑非 IO 操作的超时检测,所以这种复合使命的超时检测也是能够完成的。

1.2 Timeout 类的作用

Timeout 类是 Okio 超时机制的核心类,Okio 对 Source 输入流和 Sink 输出流都供给了超时机制,咱们在构造 InputStreamSource 和 OutputStreamSink 这些流的完成类时,都需求带着 Timeout 目标:

Source.kt

interface Source : Closeable {
    // 返回超时操控目标
    fun timeout(): Timeout
    ...
}

Sink.kt

actual interface Sink : Closeable, Flushable {
    // 返回超时操控目标
    actual fun timeout(): Timeout
    ...
}

Timeout 类供给了两种装备超时时刻的办法(假如两种办法一起存在的话,Timeout 会优先选用更早的截止时刻):

  • 1、timeoutNanos 使命处理时刻: 设置处理单次使命的超时时刻,

终究触发超时的截止时刻是使命的 startTime + timeoutNanos

  • 2、deadlineNanoTime 截止时刻: 直接设置未来的某个时刻点,多个使命整体的超时时刻点。

Timeout.kt

// hasDeadline 这个属性显得没必要
private var hasDeadline = false // 是否设置了截止时刻点
private var deadlineNanoTime = 0L // 截止时刻点(单位纳秒)
private var timeoutNanos = 0L // 处理单次使命的超时时刻(单位纳秒)

创建 Source 和 Sink 目标时,都需求带着 Timeout 目标:

JvmOkio.kt

// ----------------------------------------------------------------------------
// 输入流
// ----------------------------------------------------------------------------
fun InputStream.source(): Source = InputStreamSource(this, Timeout() /*Timeout 目标*/)
// 文件输入流
fun File.source(): Source = InputStreamSource(inputStream(), Timeout.NONE)
// Socket 输入流
fun Socket.source(): Source {
    val timeout = SocketAsyncTimeout(this)
    val source = InputStreamSource(getInputStream(), timeout /*带着 Timeout 目标*/)
    // 包装为异步超时
    return timeout.source(source)
}
// ----------------------------------------------------------------------------
// 输出流
// ----------------------------------------------------------------------------
fun OutputStream.sink(): Sink = OutputStreamSink(this, Timeout() /*Timeout 目标*/)
// 文件输出流
fun File.sink(append: Boolean = false): Sink = FileOutputStream(this, append).sink()
// Socket 输出流
fun Socket.sink(): Sink {
    val timeout = SocketAsyncTimeout(this)
    val sink = OutputStreamSink(getOutputStream(), timeout /*带着 Timeout 目标*/)
    // 包装为异步超时
    return timeout.sink(sink)
}

在 Timeout 类的基础上,Okio 供给了 2 种超时机制:

  • Timeout 是同步超时
  • AsyncTimeout 是异步超时

Okio 结构

Android IO 框架 Okio 的实现原理,如何检测超时?


2. Timeout 同步超时

Timeout 同步超时依赖于 Timeout#throwIfReached() 办法。

同步超时在每次履行使命之前,都需求先调用 Timeout#throwIfReached() 查看当时时刻是否到达超时截止时刻。假如超时则会直接抛出超时反常,不会再履行使命。

JvmOkio.kt

private class InputStreamSource(
    // 输入流
    private val input: InputStream,
    // 超时操控
    private val timeout: Timeout
) : Source {
    override fun read(sink: Buffer, byteCount: Long): Long {
        // 1、参数校验
        if (byteCount == 0L) return 0
        require(byteCount >= 0) { "byteCount < 0: $byteCount" }
        // 2、查看超时时刻
        timeout.throwIfReached()
        // 3、履行输入使命(已简化)
        val bytesRead = input.read(...)
        return bytesRead.toLong()
    }
    ...
}
private class OutputStreamSink(
    // 输出流
    private val out: OutputStream,
    // 超时操控
    private val timeout: Timeout
) : Sink {
    override fun write(source: Buffer, byteCount: Long) {
        // 1、参数校验
        checkOffsetAndCount(source.size, 0, byteCount)
        // 2、查看超时时刻
        timeout.throwIfReached()
        // 3、履行输入使命(已简化)
        out.write(...)
        ...
    }
    ...
}

看一眼 Timeout#throwIfReached 的源码。 能够看到,同步超时只考虑 “deadlineNanoTime 截止时刻”,假如只设置 “timeoutNanos 使命处理时刻” 是无效的,我觉得这个规划容易让开发者出错。

Timeout.kt

@Throws(IOException::class)
open fun throwIfReached() {
    if (Thread.interrupted()) {
        // 传递中止状况
        Thread.currentThread().interrupt() // Retain interrupted status.
        throw InterruptedIOException("interrupted")
    }
    if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
        // 抛出超时反常
        throw InterruptedIOException("deadline reached")
    }
}

有必要解说所谓 “同步” 的意思:

同步超时便是指使命的 “履行” 和 “超时查看” 是同步的。当使命超时时,Okio 同步超时不会直接中止使命履行,而是需求检自动查超时时刻(Timeout#throwIfReached)来判断是否产生超时,再决议是否中止使命履行。

这其实与 Java 的中止机制是十分相似的:

当 Java 线程的中止标记位置位时,并不是真的会直接中止线程履行,而是自动需求查看中止标记位(Thread.interrupted)来判断是否产生中止,再决议是否中止线程使命。所以说 Java 的线程中止机制是一种 “同步中止”。

能够看出,同步超时存在 “滞后性”:

由于同步超时需求自动查看,所以即使在使命履行过程中产生超时,也有必要比及查看时才会发现超时,无法及时触发超时反常。因而,就需求异步超时机制。

同步超时示意图

Android IO 框架 Okio 的实现原理,如何检测超时?


3. AsyncTimeout 异步超时

  • 异步超时监控进入: 异步超时在每次履行使命之前,都需求先调用 AsyncTimeout#enter() 办法将 AsyncTimeout 挂载到超时行列中,并根据超时截止时刻的先后顺序排序,行列头部的节点便是会最早超时的使命;

  • 异步超时监控退出: 在每次使命履行完毕之后,都需求再调用 AsyncTimeout#exit() 办法将 AsyncTimeout 从超时行列中移除。

留意: enter() 办法和 eixt() 办法有必要成对存在。

AsyncTimeout.kt

open class AsyncTimeout : Timeout() {
    // 是否在等候行列中
    private var inQueue = false
    // 后续指针
    private var next: AsyncTimeout? = null
    // 超时截止时刻
    private var timeoutAt = 0L
    // 异步超时监控进入
    fun enter() {
        check(!inQueue) { "Unbalanced enter/exit" }
        val timeoutNanos = timeoutNanos()
        val hasDeadline = hasDeadline()
        if (timeoutNanos == 0L && !hasDeadline) {
            return
        }
        inQueue = true
        scheduleTimeout(this, timeoutNanos, hasDeadline)
    }
    // 异步超时监控退出
    // 返回值:是否产生超时(假如节点不存在,阐明被 WatchDog 线程移除,即产生超时)
    fun exit(): Boolean {
        if (!inQueue) return false
        inQueue = false
        return cancelScheduledTimeout(this)
    }
    // 在 WatchDog 线程调用
    protected open fun timedOut() {}
    companion object {
        // 超时行列头节点(岗兵节点)
        private var head: AsyncTimeout? = null
        // 分发超时监控使命
        private fun scheduleTimeout(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) {
            synchronized(AsyncTimeout::class.java) {
                // 初次增加监控时,需求发动 Watchdog 线程
                if (head == null) {
                    // 岗兵节点
                    head = AsyncTimeout()
                    Watchdog().start()
                }
                // now:当时时刻
                val now = System.nanoTime()
                // timeoutAt 超时截止时刻:核算 now + timeoutNanos 和 deadlineNanoTime 的较小值
                if (timeoutNanos != 0L && hasDeadline) {
                    node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now)
                } else if (timeoutNanos != 0L) {
                    node.timeoutAt = now + timeoutNanos
                } else if (hasDeadline) {
                    node.timeoutAt = node.deadlineNanoTime()
                } else {
                    throw AssertionError()
                }
                // remainingNanos 超时剩余时刻:当时时刻间隔超时产生的时刻
                val remainingNanos = node.remainingNanos(now)
                var prev = head!!
                // 线性遍历超时行列,依照超时截止时刻将 node 节点刺进超时行列
                while (true) {
                    if (prev.next == null || remainingNanos < prev.next!!.remainingNanos(now)) {
                        node.next = prev.next
                        prev.next = node
                        // 假如刺进到行列头部,需求唤醒 WatchDog 线程
                        if (prev === head) {
                            (AsyncTimeout::class.java as Object).notify()
                        }
                        break
                    }
                    prev = prev.next!!
                }
            }
        }
        // 撤销超时监控使命
        // 返回值:是否超时
        private fun cancelScheduledTimeout(node: AsyncTimeout): Boolean {
            synchronized(AsyncTimeout::class.java) {
                // 线性遍历超时行列,将 node 节点移除
                var prev = head
                while (prev != null) {
                    if (prev.next === node) {
                        prev.next = node.next
                        node.next = null
                        return false
                    }
                    prev = prev.next
                }
                // 假如节点不存在,阐明被 WatchDog 线程移除,即产生超时
                return true
            }
        }
    }
}

一起,在初次增加异步超时监控时,AsyncTimeout 内部会开启一个 WatchDog 守护线程,依照 “检测 – 等候” 模型观察超时行列的头节点:

  • 假如产生超时,则将头节点移除,并回调 AsyncTimeout#timeOut() 办法。这是一个空办法,需求由子类完成来自动撤销使命;

  • 假如未产生超时,则 WatchDog 线程会核算间隔超时产生的时刻间隔,调用 Object#wait(时刻间隔) 进入限时等候。

需求留意的是: AsyncTimeout#timeOut() 回调中不能履行耗时操作,否则会影响后续检测的及时性。

有意思的是:咱们会发现 Okio 的超时检测机制和 Android ANR 的超时检测机制十分相似,所以咱们能够说 ANR 也是一种异步超时机制。

AsyncTimeout.kt

private class Watchdog internal constructor() : Thread("Okio Watchdog") {
    init {
        // 守护线程
        isDaemon = true
    }
    override fun run() {
        // 死循环
        while (true) {
            try {
                var timedOut: AsyncTimeout? = null
                synchronized(AsyncTimeout::class.java) {
                    // 取头节点(Maybe wait)
                    timedOut = awaitTimeout()
                    // 超时行列为空,退出线程
                    if (timedOut === head) {
                        head = null
                        return
                    }
                }
                // 超时产生,触发 AsyncTimeout#timedOut 回调
                timedOut?.timedOut()
            } catch (ignored: InterruptedException) {
            }
        }
    }
}
companion object {
    // 超时行列为空时,再等候一轮的时刻
    private val IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60)
    private val IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS)
    @Throws(InterruptedException::class)
    internal fun awaitTimeout(): AsyncTimeout? {
        // Get the next eligible node.
        val node = head!!.next
        // 假如超时行列为空
        if (node == null) {
            // 需求再等候 60s 后再判断(例如在初次增加监控时)
            val startNanos = System.nanoTime()
            (AsyncTimeout::class.java as Object).wait(IDLE_TIMEOUT_MILLIS)
            return if (head!!.next == null && System.nanoTime() - startNanos >= IDLE_TIMEOUT_NANOS) {
                // 退出 WatchDog 线程
                head
            } else {
                // WatchDog 线程从头取一次
                null
            }
        }
        // 核算当时时刻间隔超时产生的时刻
        var waitNanos = node.remainingNanos(System.nanoTime())
        // 未超时,进入限时等候
        if (waitNanos > 0) {
            // Waiting is made complicated by the fact that we work in nanoseconds,
            // but the API wants (millis, nanos) in two arguments.
            val waitMillis = waitNanos / 1000000L
            waitNanos -= waitMillis * 1000000L
            (AsyncTimeout::class.java as Object).wait(waitMillis, waitNanos.toInt())
            return null
        }
        // 超时,将头节点移除
        head!!.next = node.next
        node.next = null
        return node
    }
}

异步超时示意图

Android IO 框架 Okio 的实现原理,如何检测超时?

直接看代码欠好理解,咱们来举个比如:


4. 举例:OkHttp Call 的异步超时监控

在 OkHttp 中,支撑装备一次完好的 Call 恳求上的操作时刻 callTimeout。一次 Call 恳求包括多个 IO 操作的复合使命,运用传统 IO 是不可能监控超时的,所以需求运用 AsyncTimeout 异步超时。

在 OkHttp 的 RealCall 恳求类中,就运用了 AsyncTimeout 异步超时:

  • 1、开端使命: 在 execute() 办法中,调用 AsyncTimeout#enter() 进入异步超时监控,再履行恳求;

  • 2、完毕使命: 在 callDone() 办法中,调用 AsyncTimeout#exit() 退出异步超时监控。剖析源码发现:callDone() 不只在恳求正常时会调用,在撤销恳求时也会回调,确保了 enter() 和 exit() 成对存在;

  • 3、超时回调:AsyncTimeout#timeOut 超时回调中,调用了 Call#cancel() 提早撤销恳求。Call#cancel() 会调用到 Socket#close(),让阻塞中的 IO 操作抛出 SocketException 反常,以到达提早中止的目的,终究也会走到 callDone() 履行 exit() 退出异步监控。

Call 超时监控示意图

Android IO 框架 Okio 的实现原理,如何检测超时?

RealCall

class RealCall(
    val client: OkHttpClient,
    /** The application's original request unadulterated by redirects or auth headers. */
    val originalRequest: Request,
    val forWebSocket: Boolean
) : Call {
    // 3、AsyncTimeout 超时监控
    private val timeout = object : AsyncTimeout() {
        override fun timedOut() {
            // 撤销恳求
            cancel()
        }
    }.apply {
        timeout(client.callTimeoutMillis.toLong(), MILLISECONDS)
    }
    // 撤销恳求
    override fun cancel() {
        if (canceled) return // Already canceled.
        canceled = true
        exchange?.cancel()
        // 终究会调用 Socket#close()
        connectionToCancel?.cancel()
        eventListener.canceled(this)
    }
    // 1、恳求开端(由业务层调用)
    override fun execute(): Response {
        // 1.1 异步超时监控进入
        timeout.enter()
        // 1.2 履行恳求
        client.dispatcher.executed(this)
        return getResponseWithInterceptorChain()
    }
    // 2、恳求完毕(由 OkHttp 引擎层调用,包括正常和反常状况)
    // 除了 IO 操作在抛出反常后会走到 callDone(),在撤销恳求时也会走到 callDone()
    internal fun <E : IOException?> messageDone(
        exchange: Exchange,
        requestDone: Boolean, // 恳求正常完毕
        responseDone: Boolean, // 呼应正常完毕
        e: E
    ): E {
        ...
        if (callDone) {
            return callDone(e)
        }
        return e
    }
    private fun <E : IOException?> callDone(e: E): E {
        ...
        // 查看是否超时
        val result = timeoutExit(e)
        if (e != null) {
            // 恳求反常(包括超时反常)
            eventListener.callFailed(this, result!!)
        } else {
            // 恳求正常完毕
            eventListener.callEnd(this)
        }
        return result
    }
    private fun <E : IOException?> timeoutExit(cause: E): E {
        if (timeoutEarlyExit) return cause
        // 2.1 异步超时监控退出
        if (!timeout.exit()) return cause
        // 2.2 包装超时反常
        val e = InterruptedIOException("timeout")
        if (cause != null) e.initCause(cause)
        return e as E
    }
}

调用 Socket#close() 会让阻塞中的 IO 操作抛出 SocketException 反常:

Socket.java

// Any thread currently blocked in an I/O operation upon this socket will throw a {@link SocketException}.
public synchronized void close() throws IOException {
    synchronized(closeLock) {
        if (isClosed())
            return;
        if (created)
            impl.close();
        closed = true;
    }
}

Exchange 中会捕获 Socket#close() 抛出的 SocketException 反常:

Exchange.kt

private inner class RequestBodySink(
    delegate: Sink,
    /** The exact number of bytes to be written, or -1L if that is unknown. */
    private val contentLength: Long
) : ForwardingSink(delegate) {
    @Throws(IOException::class)
    override fun write(source: Buffer, byteCount: Long) {
        ...
        try {
            super.write(source, byteCount)
            this.bytesReceived += byteCount
        } catch (e: IOException) {
            // Socket#close() 会抛出反常,被这儿拦截
            throw complete(e)
        }
    }
    private fun <E : IOException?> complete(e: E): E {
        if (completed) return e
        completed = true
        return bodyComplete(bytesReceived, responseDone = false, requestDone = true, e = e)
    }
}
fun <E : IOException?> bodyComplete(
    bytesRead: Long,
    responseDone: Boolean,
    requestDone: Boolean,
    e: E
): E {
    ...
    // 回调到上面的 RealCall#messageDone
    return call.messageDone(this, requestDone, responseDone, e)
}

5. OkHttp 超时检测总结

先说一下 Okhttp 界说的 2 种颗粒度的超时:

  • 第 1 种是在单次 connect、read 或 write 操作上的超时;
  • 第 2 种是在一次完好的 call 恳求上的超时,有时候单个操作没有超时,但连接起来的完好 call 却超时。

其实 Socket 支撑经过 setSoTimeout API 设置单次操作的超时时刻,但这个 API 无法满意需求,比如说 Call 超时是包括多个 IO 操作的复合使命,并且不论是 HTTP/1 并行恳求还是 HTTP/2 多路复用,都会存在一个 Socket 连接上一起承载多个恳求的状况,无法区分是哪个恳求超时。

因而,OkHttp 选用了两种超时监测:

  • 关于 connect 操作,OkHttp 持续运用 Socket 级别的超时,没有问题;
  • 关于 call、read 和 write 的超时,OkHttp 运用一个 Okio 的异步超时机制来监测超时。

版权声明

本文为稀土技能社区首发签约文章,14天内禁止转载,14天后未获授权禁止转载,侵权必究!

Android IO 框架 Okio 的实现原理,如何检测超时?