开启生长之旅!这是我参与「日新计划 12 月更文应战」的第30天,点击检查活动概况
在剖析Kotlin协程—协程的并发中用Actor
也完成了并发安全,而Actor是本质是基于Channel管音讯完成的,所以Channel是一个线程安全的数据管道,依据Channel的这个安全特性它最常见的用法是建立CSP通讯模型。
CSP通讯模型(Communicating Sequential Processes),是一种形式化言语,用于描述并发体系中的交互模式。CSP 允许依据独立运行的组件进程来描述体系,而且仅经过音讯传递通讯相互交互。
1.为什么能够经过Channel完成CSP通讯模型?
剖析Channel完成CSP通讯模型首先要从Channel的数据结构下手,先来看一段demo
fun main() = runBlocking {
val scope = CoroutineScope(Job())
// 1,创立管道
val channel = Channel<Int>()
scope.launch {
// 2,在一个单独的协程傍边发送管道音讯
repeat(3) {
channel.send(it)
println("send: $it")
}
channel.close()
}
scope.launch {
// 3,在一个单独的协程傍边接收管道音讯
repeat(3) {
val result = channel.receive()
println("result $result")
}
}
println("end")
Thread.sleep(2000000L)
}
//输出成果:
//end
//result 0
//send: 0
//result 1
//send: 1
//result 2
//send: 2
代码分为三个部分,创立Channel、经过channel发送数据,经过channel接收数据。
2.Channel的数据结构
在注释1这儿创立了一个Channel。Channel是一个接口,一起它还完成了SendChannel
和ReceiveChannel
,可是注释1中创立时调用的Channel其实是一个一般的顶层函数,它的效果是一个结构函数,所以首字母是大写的,在前面的Kotlin协程—CoroutineScope是怎么办理协程的也是提到过的。
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
...
public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
when (capacity) {
RENDEZVOUS -> {
if (onBufferOverflow == BufferOverflow.SUSPEND)
RendezvousChannel(onUndeliveredElement) // 一种有用的交会通道的完成
else
ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // 经过缓冲通道支撑缓冲区溢出
}
CONFLATED -> {
require(onBufferOverflow == BufferOverflow.SUSPEND) {
"CONFLATED capacity cannot be used with non-default onBufferOverflow"
}
ConflatedChannel(onUndeliveredElement)
}
UNLIMITED -> LinkedListChannel(onUndeliveredElement) // 疏忽onBufferOverflow:它有缓冲区,但它永久不会溢出
BUFFERED -> ArrayChannel( // 运用默许容量与SUSPEND
if (onBufferOverflow == BufferOverflow.SUSPEND) CHANNEL_DEFAULT_CAPACITY else 1,
onBufferOverflow, onUndeliveredElement
)
else -> {
if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST)
ConflatedChannel(onUndeliveredElement) // 合并完成效率更高,但工作方法似乎相同
else
ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement)
}
}
}
经过Channel函数中能够发现它的核心逻辑便是一个when表达式,依据传入的参数创立不同的Channel实例:RendezvousChannel
、ArrayChannel
、ConflatedChannel
、LinkedListChannel
,而且这几个Channel的父类都是AbstractChannel
。AbstractChannel
的父类又是AbstractSendChannel
。
/**
* 它是一切发送通道完成的基类。
*/
internal abstract class AbstractSendChannel<E>(
@JvmField protected val onUndeliveredElement: OnUndeliveredElement<E>?
) : SendChannel<E> {
protected val queue = LockFreeLinkedListHead()
...
//抽象的发送/接收通道。它是一切通道完成的基类。
internal abstract class AbstractChannel<E>(
onUndeliveredElement: OnUndeliveredElement<E>?
) : AbstractSendChannel<E>(onUndeliveredElement), Channel<E> {
}
}
AbstractSendChannel
类中还有一个变量val queue = LockFreeLinkedListHead()
,Channel的核心逻辑便是依靠它完成的。
public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() {
public actual val isEmpty: Boolean get() = next === this
}
public actual open class LockFreeLinkedListNode {
private val _next = atomic<Any>(this) // Node | Removed | OpDescriptor
private val _prev = atomic(this) // Node to the left (cannot be marked as removed)
private val _removedRef = atomic<Removed?>(null) // lazily cached removed ref to this
}
LockFreeLinkedListHead 其实继承自 LockFreeLinkedListNode,而 LockFreeLinkedListNode 则是完成 Channel 核心功用的关键数据结构。
看到LinkedList根本能够确认这是一个链表,而LockFreeLinkedListHead是一个循环双向链表,LockFreeLinkedListHead是一个岗兵节点。这个节点本身不会用于存储任何数据,它的 next 指针会指向整个链表的头节点,而它的 prev 指针会指向整个链表的尾节点。
岗兵节点是为了简化处理链表边界条件而引入的附加链表节点,岗兵节点一般位于链表头部它的值没有任何意义,在一个有岗兵节点的链表中,从第二个节点开端才真实保存有意义的信息。
这儿先用两张图来别离标识空链表和有元素的链表的结构
链表为空的时候LockFreeLinkedListHead
的next指针和prev指针都指向本身的,这就意味着这个节点是不会存储数据的,也是不会被删去的。
链表中有2个或两个以上的元素时LockFreeLinkedListHead
的next指针才是第一个节点,prev则指向尾结点。
寻常的循环双向链表是能够在首尾增加元素的,一起也支撑“正向遍历、逆向遍历”的。而Channel内部的数据结构只能在结尾增加,遍历顺序则是从队头开端,这样的设计就让他的行为变成了先进先出单向行列的一起还完成了队尾增加操作,只需要O(1)的时间复杂度。
能够说,正是由于 LockFreeLinkedList 这个数据结构,才能运用 Channel 完成 CSP 通讯模型。
3.发送和接收的流程
前面的demo中启动了两个协程,在这两个协程中发送了三次数据和接收了三次数据,程序首先会履行send()
,可是由于Channel默许状况下容量是0,所以send()
首先会被挂起。
public final override suspend fun send(element: E) {
//1
if (offerInternal(element) === OFFER_SUCCESS) return
//2
return sendSuspend(element)
}
/**
* 测验将元素增加到缓冲区或行列接收方
*/
protected open fun offerInternal(element: E): Any {
while (true) {
//3
val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
val token = receive.tryResumeReceive(element, null)
if (token != null) {
assert { token === RESUME_TOKEN }
receive.completeResumeReceive(element)
return receive.offerResult
}
}
}
private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable sc@ { cont ->
loop@ while (true) {
if (isFullImpl) {
//4
val send = if (onUndeliveredElement == null)
SendElement(element, cont) else
SendElementWithUndeliveredHandler(element, cont, onUndeliveredElement)
val enqueueResult = enqueueSend(send)
when {
enqueueResult == null -> { // enqueued successfully
//5
cont.removeOnCancellation(send)
return@sc
}
enqueueResult is Closed<*> -> {
cont.helpCloseAndResumeWithSendException(element, enqueueResult)
return@sc
}
enqueueResult === ENQUEUE_FAILED -> {} // try to offer instead
enqueueResult is Receive<*> -> {} // try to offer instead
else -> error("enqueueSend returned $enqueueResult")
}
}
// hm... receiver is waiting or buffer is not full. try to offer
val offerResult = offerInternal(element)
when {
offerResult === OFFER_SUCCESS -> {
cont.resume(Unit)
return@sc
}
offerResult === OFFER_FAILED -> continue@loop
offerResult is Closed<*> -> {
cont.helpCloseAndResumeWithSendException(element, offerResult)
return@sc
}
else -> error("offerInternal returned $offerResult")
}
}
}
- 注释1:测验向Channel发送数据,在有消费这的状况下if判别便是true,那么此时就会return,可是如果是第一次调用Channel就不会有顾客,所以
offerInternal
函数中想要从行列中取出顾客是不可能的(注释3),所以第一次调用就进入了sendSuspend
函数。 - 注释2:这个挂起函数是由高阶函数
suspendCancellableCoroutineReusable
完成的,这个高阶函数的效果便是露出挂起函数的Continuation
目标。 - 注释4:这儿的效果便是把发送的元素封装成
SendElement
目标,然后调用enqueueSend()
办法将其增加到LockFreeLinkedList
行列的结尾,增加成功则进入注释5开端履行。 - 注释5:这儿便是将
SendElement
从行列中删去。
到这儿应该会有一个疑问:send()
函数一直是被挂起的啊,它会在什么时候康复呢?
答案便是接收数据的时候:receive()
。
public final override suspend fun receive(): E {
// 1
val result = pollInternal()
@Suppress("UNCHECKED_CAST")
if (result !== POLL_FAILED && result !is Closed<*>) return result as E
// 2
return receiveSuspend(RECEIVE_THROWS_ON_CLOSE)
}
protected open fun pollInternal(): Any? {
while (true) {
//3
val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
val token = send.tryResumeSend(null)
if (token != null) {
assert { token === RESUME_TOKEN }
//4
send.completeResumeSend()
return send.pollResult
}
send.undeliveredElement()
}
}
private suspend fun <R> receiveSuspend(receiveMode: Int): R = suspendCancellableCoroutineReusable sc@ { cont ->
val receive = if (onUndeliveredElement == null)
ReceiveElement(cont as CancellableContinuation<Any?>, receiveMode) else
ReceiveElementWithUndeliveredHandler(cont as CancellableContinuation<Any?>, receiveMode, onUndeliveredElement)
while (true) {
if (enqueueReceive(receive)) {
removeReceiveOnCancel(cont, receive)
return@sc
}
// hm... something is not right. try to poll
val result = pollInternal()
if (result is Closed<*>) {
receive.resumeReceiveClosed(result)
return@sc
}
if (result !== POLL_FAILED) {
cont.resume(receive.resumeValue(result as E), receive.resumeOnCancellationFun(result as E))
return@sc
}
}
}
- 注释1:测验从 LockFreeLinkedList 行列傍边找出是否有正在被挂起的发送方;
- 注释2:当 LockFreeLinkedList 行列傍边没有正在挂起的发送方时,它会履行
receiveSuspend()
,而 receiveSuspend() 也相同会被挂起,这个逻辑跟前面的sendSuspend()
是相似的,先封装成receiveElement
目标并增加行列的结尾,如果增加成功的话这个receiveSuspend()
就会持续被挂起,这也就意味着receive()
也会被挂起,而它的康复机遇则是在offerInternal()
中; - 注释3:在
send()
流程中这一步是将发送的元素封装成SendElement
目标,那么在receive()
中这一步是取出封装好的SendElement
目标,然后调用注释4; - 注释4:这儿其实调用的是
override fun completeResumeSend() = cont.completeResume(RESUME_TOKEN)
这儿的cont
其实是continuation
,前面聊过,它是一个状态机,cont.completeResume(RESUME_TOKEN)
会回调CancellableContinuationImpl
中的completeResume
override fun completeResume(token: Any) {
assert { token === RESUME_TOKEN }
dispatchResume(resumeMode)
}
private fun dispatchResume(mode: Int) {
if (tryResume()) return
dispatch(mode)
}
//DispatchedTask
internal fun <T> DispatchedTask<T>.dispatch(mode: Int) {
assert { mode != MODE_UNINITIALIZED } // invalid mode value for this method
val delegate = this.delegate
val undispatched = mode == MODE_UNDISPATCHED
if (!undispatched && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
// dispatch directly using this instance's Runnable implementation
val dispatcher = delegate.dispatcher
val context = delegate.context
if (dispatcher.isDispatchNeeded(context)) {
dispatcher.dispatch(context, this)
} else {
resumeUnconfined()
}
} else {
// delegate is coming from 3rd-party interceptor implementation (and does not support cancellation)
// or undispatched mode was requested
resume(delegate, undispatched)
}
}
终究调用dispatch(mode)
,而 dispatch(mode)
其实便是 DispatchedTask 的 dispatch()
,而这个dispatch()
便是协程体傍边的代码在线程履行的机遇,终究,它会履行在 Java 的 Executor 之上。至此,之前被挂起的 send() 办法,就算是康复了。
4.总结:
- Channel是一个线程安全的管道,常见用法是完成CSP通讯模型,它之所以能够用这种方法来完成CSP通讯模型首要还是由于底层的数据结构是
LockFreeLinkedList
; -
LockFreeLinkedList
是一个双向循环的链表可是在Channel源码中会被作为先进先出的单向行列,只是在行列结尾刺进节点,且只会正向遍历; - Channel的发送分为两种状况,一种是当时行列中现已存在发送方,这时候
send()
会康复 Receive 节点的履行,并将数据发送给对方;另一种状况是当时行列中不存在发送方,便是初次发送,则会挂起并将元素封装成SendElement
后增加到LockFreeLinkedList
行列的结尾,等候被receive()
康复履行; - Channel的接收分为两种状况,一种是当时行列中现已存在被挂起的发送方,这时候
receive()
会康复Send节点的履行,而且取出Send节点带过来的数据 ; 另一种状况是当时行列中不存在被挂起的发送方,那么这时候receive()
就会被挂起,一起会封装元素为ReceiveElement
并增加到行列的结尾,等候被下一个send()
康复履行。