目录:
- 入门级协程协程使用3步骤
- 入门级协程整体架构图
- 高阶版本协程整体思路
1. 协程使用3步骤
三步思想:
1). 启动协程
2). 挂起,delay的实现
3). 协程调度
fun main() {
val builder = CoroutineBuilder()
// 定义一个简单的挂起函数
val suspendFunction: SuspendFunction<Unit> = { continuation ->
println("Coroutine is running")
continuation.resumeWith(Unit) // 完成协程
}
// 启动协程
runCoroutine(builder, suspendFunction)
println("Main thread continues")
}
2. 整体架构图
手写kotlin协程2.jpg
2.1 协程的状态 :
// 协程状态
enum class CoroutineStatus {
ACTIVE,
COMPLETED,
CANCELLED
}
2.2 // 协程上下文接口
interface CoroutineContext {
// 可能的上下文元素,如调度器、取消标志等
}
2.3 协程引用
class Continuation<T> {
var result: T? = null
var status: CoroutineStatus = CoroutineStatus.ACTIVE
var context: CoroutineContext? = null
// 恢复协程
fun resumeWith(result: T) {
this.result = result
status = CoroutineStatus.COMPLETED
// 这里应该有一个调度器来决定如何恢复协程
// 但为了简单起见,我们直接调用 resume 函数
resume()
}
// 恢复协程的占位符方法,需要由子类实现
open fun resume() {
throw IllegalStateException("Must be implemented by subclass")
}
}
2.4 // 挂起函数类型别名
typealias SuspendFunction = (Continuation) -> Unit
2.5 // 协程构建器
class CoroutineBuilder {
fun <T> build(start: SuspendFunction<T>): T {
// 创建一个初始的 Continuation 对象
var continuation = object : Continuation<T>() {
override fun resume() {
// 这里是协程的入口点,我们可以开始执行挂起函数
start(this)
}
}
// 启动协程执行
continuation.resume()
// 等待协程完成并返回结果
while (continuation.status == CoroutineStatus.ACTIVE) {
// 简单的忙等待,实际中应该有更复杂的调度逻辑
}
return continuation.result ?: throw IllegalStateException("Coroutine did not complete")
}
}
// 协程启动函数
fun <T> runCoroutine(builder: CoroutineBuilder, start: SuspendFunction<T>): T {
return builder.build(start)
}
3. 高阶版本协程整体思路
3.1 调用思想:
private fun testWith() {
Log.d("MainActivity","pre")
GlobalScope.launch(Dispatchers.Main) {
loadData()
}
Log.d("MainActivity","end")
}
suspend fun loadData(): String {
Log.d("MainActivity","loadData pre")
withContext(Dispatchers.IO) {
Log.d("MainActivity","loadData withContext")
delay(1000)
}
Log.d("MainActivity","loadData end")
return "MainActivity"
}
协程的执行顺序!
2024-01-27 12:03:27.258 12490-12490 MainActivity com.baidu.cdcos.xiecheng D pre
2024-01-27 12:03:27.259 12490-12490 MainActivity com.baidu.cdcos.xiecheng D end
2024-01-27 12:03:27.259 12490-12490 MainActivity com.baidu.cdcos.xiecheng D loadData pre
2024-01-27 12:03:27.260 12490-12623 MainActivity com.baidu.cdcos.xiecheng D loadData withContext
2024-01-27 12:03:28.264 12490-12490 MainActivity com.baidu.cdcos.xiecheng D loadData end
提供了协程创建、调度、取消、异常处理、作用域等
3.2 ). 接口Continuation
实现delay函数! —–》自定义挂起函数
相当于: 延时+ 挂起
suspendCoroutine() 或者suspendCancellableCoroutine()
接口还是用官方公用的接口Continuation, 接口的定义需要看懂下
@SinceKotlin("1.3")
public interface Continuation<in T> {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}
3.3 ). 协程创建和开始的接口定义
public fun <R, T> (suspend R.() -> T).createCoroutine(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)
public fun <R, T> (suspend R.() -> T).startCoroutine(
receiver: R,
completion: Continuation<T>
) {
createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit) // resume: 启动协程!
}
createCoroutine{}和startCoroutine{}都是扩展函数,而且扩展的接收者类型是(suspend R.() -> T)。
createCoroutineUnintercepted在源代码中只是一个声明,它的具体实现是在IntrinsicsJvm.kt文件中。
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(probeCompletion)//这个协程体会被编译成 SuspendLambda 的子类,其也是 BaseContinuationImpl 的子类对象,因此会走create() 方法
else
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function1<Continuation<T>, Any?>).invoke(it)
}
}
3.4). 协程接口的一些具体实现
ContinuationImpl
BaseContinuationImpl
SuspendLambda
ContinuationImpl继承自BaseContinuationImpl
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
protected override fun releaseIntercepted() {
val intercepted = intercepted
if (intercepted != null && intercepted !== this) {
context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
}
this.intercepted = CompletedContinuation // just in case
}
}
3.5). 协程调用的具体封装Job
3). 协程和线程一样, 有一个对象
start方法有封装么????
coroutine.start 的调用涉及到运算符重载,实际上会调到 CoroutineStart.invoke() 方法:
最后还是调用到了startCoroutine()
Job是干嘛的?
用来给调用者用的, 这样方便!
interface Job : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<Job> // 重要的方法
override val key: CoroutineContext.Key<*> get() = Job // 通过key获取到job
val isActive: Boolean
fun invokeOnCancel(onCancel: OnCancel): Disposable
fun invokeOnCompletion(onComplete: OnComplete): Disposable
fun cancel()
fun remove(disposable: Disposable)
fun attachChild(child: Job): Disposable
suspend fun join()
}
3.6) 协程的状态: 未完成、已取消、已完成, 三种状态, 3个class继承CoroutineState类
sealed class CoroutineState {
protected var disposableList: RecursiveList<Disposable> = RecursiveList.Nil
private set
protected var children: RecursiveList<Job> = RecursiveList.Nil
private set
fun from(state: CoroutineState): CoroutineState {
this.disposableList = state.disposableList
this.children = state.children
return this
}
class InComplete : CoroutineState() // 未完成
class Cancelling: CoroutineState() // 取消
class Complete<T>(val value: T? = null, val exception: Throwable? = null) : CoroutineState() // 完成
状态的原子性迁移
abstract class AbstractCoroutine<T>(context: CoroutineContext) : Job, Continuation<T>, CoroutineScope {
protected val state = AtomicReference<CoroutineState>() // 原子性
override fun resumeWith(result: Result<T>) {
val newState = state.updateAndGet { prevState -> // updateAndGet
when (prevState) {
//although cancelled, flows of job may work out with the normal result.
is CoroutineState.Cancelling,
is CoroutineState.InComplete -> prevState.tryComplete(result)
is CoroutineState.CompleteWaitForChildren<*>,
is CoroutineState.Complete<*> -> {
throw IllegalStateException("Already completed!")
}
}
}
数据结构:
sealed class RecursiveList<out T> {
object Nil: RecursiveList<Nothing>()
class Cons<T>(val head: T, val tail: RecursiveList<T>): RecursiveList<T>()
}
fun <T> RecursiveList<T>.remove(element: T): RecursiveList<T> {
return when(this){
RecursiveList.Nil -> this
is RecursiveList.Cons -> {
if(head == element){
return tail
} else {
RecursiveList.Cons(head, tail.remove(element))
}
}
}
}
3.8) 协程的状态:协程的启动封装
启动返回值: Job
fun CoroutineScope.launch(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> Unit): Job {
val completion = StandaloneCoroutine(newCoroutineContext(context))
block.startCoroutine(completion, completion)
return completion
}
这样开启的
public fun <R, T> (suspend R.() -> T).startCoroutine(
receiver: R,
completion: Continuation<T>
) {
createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit)
}
3.7) 协程完成的封装
完成回调注册
调用的地方在哪?
是不是协程的状态变化了, 就会调用????
从AbstractCoroutine调用到 CoroutineState
* 完成回调的注册
*/
protected fun doOnCompleted(block: (Result<T>) -> Unit): Disposable {
val disposable = CompletionHandlerDisposable(this, block)
val newState = state.updateAndGet { prev ->
when (prev) {
is CoroutineState.InComplete -> {
CoroutineState.InComplete().from(prev).with(disposable)
}
is CoroutineState.Cancelling -> {
CoroutineState.Cancelling().from(prev).with(disposable)
}
is CoroutineState.Complete<*> -> {
prev
}
is CoroutineState.CompleteWaitForChildren<*> -> prev.copy().with(disposable)
}
}
(newState as? CoroutineState.Complete<T>)?.let {
block(
when {
it.exception != null -> Result.failure(it.exception)
it.value != null -> Result.success(it.value)
else -> throw IllegalStateException("Won't happen.")
}
)
}
return disposable
}
/***
* 通知协程完成
*/
fun <T> notifyCompletion(result: Result<T>) {
this.disposableList.loopOn<CompletionHandlerDisposable<T>> {
it.onComplete(result)
}
}
private fun tryCompleteOnChildCompleted(child: Job) {
val newState = state.updateAndGet { prev ->
when (prev) {
is CoroutineState.Cancelling,
is CoroutineState.InComplete -> {
throw IllegalStateException("Should be waiting for children!")
}
is CoroutineState.CompleteWaitForChildren<*> -> {
prev.onChildCompleted(child)
}
is CoroutineState.Complete<*> -> throw IllegalStateException("Already completed!")
}
}
(newState as? CoroutineState.Complete<T>)?.let {
makeCompletion(it)
}
}
private fun makeCompletion(newState: CoroutineState.Complete<T>){
val result = if (newState.exception == null) {
Result.success(newState.value)
} else {
Result.failure<T>(newState.exception)
}
result.exceptionOrNull()?.let(this::tryHandleException)
newState.notifyCompletion(result)
newState.clear()
parentCancelDisposable?.dispose()
}
3.9) 协程调度器的设计
/**
* 调度器的接口定义
*/
interface Dispatcher {
fun dispatch(block: ()->Unit)
}
拦截器一般是干嘛的?
open class DispatcherContext(private val dispatcher: Dispatcher) : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
= DispatchedContinuation(continuation, dispatcher)
}
/**
* 拦截器实现调度
*/
private class DispatchedContinuation<T>(val delegate: Continuation<T>, val dispatcher: Dispatcher) : Continuation<T>{
override val context = delegate.context
override fun resumeWith(result: Result<T>) {
dispatcher.dispatch {
delegate.resumeWith(result)
}
}
}
线程池:
/***
* 默认的调度器
*/
object DefaultDispatcher: Dispatcher {
private val threadGroup = ThreadGroup("DefaultDispatcher")
private val threadIndex = AtomicInteger(0)
private val executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1) { runnable ->
Thread(threadGroup, runnable, "${threadGroup.name}-worker-${threadIndex.getAndIncrement()}").apply { isDaemon = true }
}
override fun dispatch(block: () -> Unit) {
executor.submit(block)
}
}
/**
* 调度器的入库
*/
object Dispatchers {
val Android by lazy {
DispatcherContext(AndroidDispatcher)
}
val Swing by lazy {
DispatcherContext(SwingDispatcher)
}
val Default by lazy {
DispatcherContext(DefaultDispatcher)
}
}
默认调度器
fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = scopeContext + context + CoroutineName("@coroutine#${coroutineIndex.getAndIncrement()}")
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) combined + Dispatchers.Default else combined
}
class CoroutineName(val name: String): CoroutineContext.Element {
companion object Key: CoroutineContext.Key<CoroutineName>
override val key = Key
override fun toString(): String {
return name
}
}
3.9) 协程取消:
/**
* cancel函数的实现
*/
override fun cancel() {
val prevState = state.getAndUpdate { prev ->
when (prev) {
is CoroutineState.InComplete -> {
CoroutineState.Cancelling().from(prev)
}
is CoroutineState.Cancelling,
is CoroutineState.Complete<*> -> prev
is CoroutineState.CompleteWaitForChildren<*> -> {
prev.copy(isCancelling = true)
}
}
}
if (prevState is CoroutineState.InComplete) {
prevState.notifyCancellation()
}
parentCancelDisposable?.dispose()
}
/**
* 支持取消回调的注册
*/
override fun invokeOnCancel(onCancel: OnCancel): Disposable {
val disposable = CancellationHandlerDisposable(this, onCancel)
val newState = state.updateAndGet { prev ->
when (prev) {
is CoroutineState.InComplete -> {
CoroutineState.InComplete().from(prev).with(disposable)
}
is CoroutineState.Cancelling,
is CoroutineState.Complete<*> -> {
prev
}
is CoroutineState.CompleteWaitForChildren<*> -> prev.copy().with(disposable)
}
}
(newState as? CoroutineState.Cancelling)?.let {
// call immediately when Cancelling.
onCancel()
}
return disposable
}
取消的核心类: CancellableContinuation()
异常处理器
interface CoroutineExceptionHandler : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>
fun handleException(context: CoroutineContext, exception: Throwable)
}
inline fun CoroutineExceptionHandler(crossinline handler: (CoroutineContext, Throwable) -> Unit): CoroutineExceptionHandler =
object : AbstractCoroutineContextElement(CoroutineExceptionHandler), CoroutineExceptionHandler {
override fun handleException(context: CoroutineContext, exception: Throwable) =
handler.invoke(context, exception)
}
重要的一些类:
协程的创建
重要的类:
Continuatio
协程的启动
SuspendLambda
BaseContinuationImpl
SafeContinuatio
启动方法:launchCoroutin()
挂起:
CoroutineContext
Element
Element接口中有一个属性key,这个属性很关键
拦截器 :
ContinuationInterceptor
方法 transfer()
协程体的Receiver?