目录:

  1. 入门级协程协程使用3步骤
  2. 入门级协程整体架构
  3. 高阶版本协程整体思路

1. 协程使用3步骤

三步思想:

1). 启动协程
2). 挂起,delay的实现
3). 协程调度
fun main() {
    val builder = CoroutineBuilder()  
    // 定义一个简单的挂起函数  
    val suspendFunction: SuspendFunction<Unit> = { continuation ->  
        println("Coroutine is running")  
        continuation.resumeWith(Unit) // 完成协程  
    }  
    // 启动协程  
    runCoroutine(builder, suspendFunction)  
    println("Main thread continues")  
}

2. 整体架构图

Anroid 手写kotlin简单版协程框架(协程的顶级进阶)!

手写kotlin协程2.jpg

2.1 协程的状态 :

// 协程状态  
enum class CoroutineStatus {  
    ACTIVE,  
    COMPLETED,  
    CANCELLED  
}  
2.2 // 协程上下文接口
interface CoroutineContext {
    // 可能的上下文元素,如调度器、取消标志等  
} 

2.3 协程引用

class Continuation<T> {
    var result: T? = null  
    var status: CoroutineStatus = CoroutineStatus.ACTIVE  
    var context: CoroutineContext? = null  
    // 恢复协程  
    fun resumeWith(result: T) {  
        this.result = result  
        status = CoroutineStatus.COMPLETED  
        // 这里应该有一个调度器来决定如何恢复协程  
        // 但为了简单起见,我们直接调用 resume 函数  
        resume()  
    }  
    // 恢复协程的占位符方法,需要由子类实现  
    open fun resume() {  
        throw IllegalStateException("Must be implemented by subclass")  
    }  
}  

2.4 // 挂起函数类型别名

typealias SuspendFunction = (Continuation) -> Unit

2.5 // 协程构建器

class CoroutineBuilder {
    fun <T> build(start: SuspendFunction<T>): T {  
        // 创建一个初始的 Continuation 对象  
        var continuation = object : Continuation<T>() {  
            override fun resume() {  
                // 这里是协程的入口点,我们可以开始执行挂起函数  
                start(this)  
            }  
        }  
        // 启动协程执行  
        continuation.resume()  
        // 等待协程完成并返回结果  
        while (continuation.status == CoroutineStatus.ACTIVE) {  
            // 简单的忙等待,实际中应该有更复杂的调度逻辑  
        }  
        return continuation.result ?: throw IllegalStateException("Coroutine did not complete")  
    }  
}
// 协程启动函数  
fun <T> runCoroutine(builder: CoroutineBuilder, start: SuspendFunction<T>): T {  
    return builder.build(start)  
}

3. 高阶版本协程整体思路

3.1 调用思想:

    private fun testWith() {
        Log.d("MainActivity","pre")
        GlobalScope.launch(Dispatchers.Main) {
            loadData()
        }
        Log.d("MainActivity","end")
    }
    suspend fun loadData(): String {
        Log.d("MainActivity","loadData pre")
        withContext(Dispatchers.IO) {
            Log.d("MainActivity","loadData withContext")
            delay(1000)
        }
        Log.d("MainActivity","loadData end")
        return "MainActivity"
    }

协程的执行顺序!

2024-01-27 12:03:27.258 12490-12490 MainActivity            com.baidu.cdcos.xiecheng             D  pre
2024-01-27 12:03:27.259 12490-12490 MainActivity            com.baidu.cdcos.xiecheng             D  end
2024-01-27 12:03:27.259 12490-12490 MainActivity            com.baidu.cdcos.xiecheng             D  loadData pre
2024-01-27 12:03:27.260 12490-12623 MainActivity            com.baidu.cdcos.xiecheng             D  loadData withContext
2024-01-27 12:03:28.264 12490-12490 MainActivity            com.baidu.cdcos.xiecheng             D  loadData end

提供了协程创建、调度、取消、异常处理、作用域等

3.2 ). 接口Continuation

实现delay函数! —–》自定义挂起函数
相当于: 延时+ 挂起
suspendCoroutine() 或者suspendCancellableCoroutine()
接口还是用官方公用的接口Continuation, 接口的定义需要看懂下

@SinceKotlin("1.3")
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext
    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

3.3 ). 协程创建和开始的接口定义

public fun <R, T> (suspend R.() -> T).createCoroutine(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> =
    SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)
public fun <R, T> (suspend R.() -> T).startCoroutine(
    receiver: R,
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit)  // resume: 启动协程!
}

createCoroutine{}和startCoroutine{}都是扩展函数,而且扩展的接收者类型是(suspend R.() -> T)。
createCoroutineUnintercepted在源代码中只是一个声明,它的具体实现是在IntrinsicsJvm.kt文件中。

public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(probeCompletion)//这个协程体会被编译成 SuspendLambda 的子类,其也是 BaseContinuationImpl 的子类对象,因此会走create() 方法
    else
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function1<Continuation<T>, Any?>).invoke(it)
        }
}

3.4). 协程接口的一些具体实现

ContinuationImpl
BaseContinuationImpl
SuspendLambda
ContinuationImpl继承自BaseContinuationImpl

internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
    public override val context: CoroutineContext
        get() = _context!!
    @Transient
    private var intercepted: Continuation<Any?>? = null
    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
    protected override fun releaseIntercepted() {
        val intercepted = intercepted
        if (intercepted != null && intercepted !== this) {
            context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
        }
        this.intercepted = CompletedContinuation // just in case
    }
}

3.5). 协程调用的具体封装Job

3). 协程和线程一样, 有一个对象
start方法有封装么????
coroutine.start 的调用涉及到运算符重载,实际上会调到 CoroutineStart.invoke() 方法:
最后还是调用到了startCoroutine()
Job是干嘛的?
用来给调用者用的, 这样方便!

interface Job : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<Job>         // 重要的方法
    override val key: CoroutineContext.Key<*> get() = Job   // 通过key获取到job
    val isActive: Boolean
    fun invokeOnCancel(onCancel: OnCancel): Disposable
    fun invokeOnCompletion(onComplete: OnComplete): Disposable
    fun cancel()
    fun remove(disposable: Disposable)
    fun attachChild(child: Job): Disposable
    suspend fun join()
}

3.6) 协程的状态: 未完成、已取消、已完成, 三种状态, 3个class继承CoroutineState类

sealed class CoroutineState {
    protected var disposableList: RecursiveList<Disposable> = RecursiveList.Nil
        private set
    protected var children: RecursiveList<Job> = RecursiveList.Nil
        private set
    fun from(state: CoroutineState): CoroutineState {
        this.disposableList = state.disposableList
        this.children = state.children
        return this
    }
    class InComplete : CoroutineState()    // 未完成
    class Cancelling: CoroutineState()      // 取消
    class Complete<T>(val value: T? = null, val exception: Throwable? = null) : CoroutineState() // 完成

状态的原子性迁移

abstract class AbstractCoroutine<T>(context: CoroutineContext) : Job, Continuation<T>, CoroutineScope {
    protected val state = AtomicReference<CoroutineState>() // 原子性 
    override fun resumeWith(result: Result<T>) {
        val newState = state.updateAndGet { prevState ->    // updateAndGet
            when (prevState) {
                //although cancelled, flows of job may work out with the normal result.
                is CoroutineState.Cancelling,
                is CoroutineState.InComplete -> prevState.tryComplete(result)
                is CoroutineState.CompleteWaitForChildren<*>,
                is CoroutineState.Complete<*> -> {
                    throw IllegalStateException("Already completed!")
                }
            }
        }

数据结构:

sealed class RecursiveList<out T> {
    object Nil: RecursiveList<Nothing>()
    class Cons<T>(val head: T, val tail: RecursiveList<T>): RecursiveList<T>()
}
fun <T> RecursiveList<T>.remove(element: T): RecursiveList<T> {
    return when(this){
        RecursiveList.Nil -> this
        is RecursiveList.Cons -> {
            if(head == element){
                return tail
            } else {
                RecursiveList.Cons(head, tail.remove(element))
            }
        }
    }
}

3.8) 协程的状态:协程的启动封装

启动返回值: Job

fun CoroutineScope.launch(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> Unit): Job {
    val completion = StandaloneCoroutine(newCoroutineContext(context))
    block.startCoroutine(completion, completion)
    return completion
}

这样开启的

public fun <R, T> (suspend R.() -> T).startCoroutine(
    receiver: R,
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit)
}

3.7) 协程完成的封装

完成回调注册
调用的地方在哪?
是不是协程的状态变化了, 就会调用????
从AbstractCoroutine调用到 CoroutineState

   * 完成回调的注册
     */
    protected fun doOnCompleted(block: (Result<T>) -> Unit): Disposable {
        val disposable = CompletionHandlerDisposable(this, block)
        val newState = state.updateAndGet { prev ->
            when (prev) {
                is CoroutineState.InComplete -> {
                    CoroutineState.InComplete().from(prev).with(disposable)
                }
                is CoroutineState.Cancelling -> {
                    CoroutineState.Cancelling().from(prev).with(disposable)
                }
                is CoroutineState.Complete<*> -> {
                    prev
                }
                is CoroutineState.CompleteWaitForChildren<*> -> prev.copy().with(disposable)
            }
        }
        (newState as? CoroutineState.Complete<T>)?.let {
            block(
                    when {
                        it.exception != null -> Result.failure(it.exception)
                        it.value != null -> Result.success(it.value)
                        else -> throw IllegalStateException("Won't happen.")
                    }
            )
        }
        return disposable
    }
    /***
     * 通知协程完成
     */
    fun <T> notifyCompletion(result: Result<T>) {
        this.disposableList.loopOn<CompletionHandlerDisposable<T>> {
            it.onComplete(result)
        }
    }
    private fun tryCompleteOnChildCompleted(child: Job) {
        val newState = state.updateAndGet { prev ->
            when (prev) {
                is CoroutineState.Cancelling,
                is CoroutineState.InComplete -> {
                    throw IllegalStateException("Should be waiting for children!")
                }
                is CoroutineState.CompleteWaitForChildren<*> -> {
                    prev.onChildCompleted(child)
                }
                is CoroutineState.Complete<*> -> throw IllegalStateException("Already completed!")
            }
        }
        (newState as? CoroutineState.Complete<T>)?.let {
            makeCompletion(it)
        }
    }
    private fun makeCompletion(newState: CoroutineState.Complete<T>){
        val result = if (newState.exception == null) {
            Result.success(newState.value)
        } else {
            Result.failure<T>(newState.exception)
        }
        result.exceptionOrNull()?.let(this::tryHandleException)
        newState.notifyCompletion(result)
        newState.clear()
        parentCancelDisposable?.dispose()
    }

3.9) 协程调度器的设计

/**
 * 调度器的接口定义
 */
interface Dispatcher {
    fun dispatch(block: ()->Unit)
}

拦截器一般是干嘛的?

open class DispatcherContext(private val dispatcher: Dispatcher) : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
            = DispatchedContinuation(continuation, dispatcher)
}
/**
 * 拦截器实现调度
 */
private class DispatchedContinuation<T>(val delegate: Continuation<T>, val dispatcher: Dispatcher) : Continuation<T>{
    override val context = delegate.context
    override fun resumeWith(result: Result<T>) {
        dispatcher.dispatch {
            delegate.resumeWith(result)
        }
    }
}

线程池:

/***
 * 默认的调度器
 */
object DefaultDispatcher: Dispatcher {
    private val threadGroup = ThreadGroup("DefaultDispatcher")
    private val threadIndex = AtomicInteger(0)
    private val executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1) { runnable ->
        Thread(threadGroup, runnable, "${threadGroup.name}-worker-${threadIndex.getAndIncrement()}").apply { isDaemon = true }
    }
    override fun dispatch(block: () -> Unit) {
        executor.submit(block)
    }
}
/**
 * 调度器的入库 
 */
object Dispatchers {
    val Android by lazy {
        DispatcherContext(AndroidDispatcher)
    }
    val Swing by lazy {
        DispatcherContext(SwingDispatcher)
    }
    val Default by lazy {
        DispatcherContext(DefaultDispatcher)
    }
}

默认调度器

fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val combined = scopeContext + context + CoroutineName("@coroutine#${coroutineIndex.getAndIncrement()}")
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) combined + Dispatchers.Default else combined
}
class CoroutineName(val name: String): CoroutineContext.Element {
    companion object Key: CoroutineContext.Key<CoroutineName>
    override val key = Key
    override fun toString(): String {
        return name
    }
}

3.9) 协程取消:

    /**
     * cancel函数的实现
     */
    override fun cancel() {
        val prevState = state.getAndUpdate { prev ->
            when (prev) {
                is CoroutineState.InComplete -> {
                    CoroutineState.Cancelling().from(prev)
                }
                is CoroutineState.Cancelling,
                is CoroutineState.Complete<*> -> prev
                is CoroutineState.CompleteWaitForChildren<*> -> {
                    prev.copy(isCancelling = true)
                }
            }
        }
        if (prevState is CoroutineState.InComplete) {
            prevState.notifyCancellation()
        }
        parentCancelDisposable?.dispose()
    }
    /**
     * 支持取消回调的注册
     */
    override fun invokeOnCancel(onCancel: OnCancel): Disposable {
        val disposable = CancellationHandlerDisposable(this, onCancel)
        val newState = state.updateAndGet { prev ->
            when (prev) {
                is CoroutineState.InComplete -> {
                    CoroutineState.InComplete().from(prev).with(disposable)
                }
                is CoroutineState.Cancelling,
                is CoroutineState.Complete<*> -> {
                    prev
                }
                is CoroutineState.CompleteWaitForChildren<*> -> prev.copy().with(disposable)
            }
        }
        (newState as? CoroutineState.Cancelling)?.let {
            // call immediately when Cancelling.
            onCancel()
        }
        return disposable
    }

取消的核心类: CancellableContinuation()
异常处理器

interface CoroutineExceptionHandler : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>
    fun handleException(context: CoroutineContext, exception: Throwable)
}
inline fun CoroutineExceptionHandler(crossinline handler: (CoroutineContext, Throwable) -> Unit): CoroutineExceptionHandler =
        object : AbstractCoroutineContextElement(CoroutineExceptionHandler), CoroutineExceptionHandler {
            override fun handleException(context: CoroutineContext, exception: Throwable) =
                    handler.invoke(context, exception)
        }

重要的一些类:
协程的创建
重要的类:
Continuatio
协程的启动
SuspendLambda
BaseContinuationImpl
SafeContinuatio
启动方法:launchCoroutin()
挂起:
CoroutineContext
Element
Element接口中有一个属性key,这个属性很关键

拦截器 :
ContinuationInterceptor
方法 transfer()
协程体的Receiver?