在看 flow 相关的源码,总是看到 FusibleFlow、ChannelFlow、ChannelFlowOperatorImpl 等内容,所以把 Channel 相关内容代码都过一遍
// flow 的操作符 public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> { checkFlowContext(context) return when { context == EmptyCoroutineContext -> this this is FusibleFlow -> fuse(context = context) else -> ChannelFlowOperatorImpl(this, context = context) } }
Channel是什么
Kotlin
版的 BlockingQueue
, 我的个人理解是支撑在协程环境下的运用的队列数据结构
根据 capacity
和 onBufferOverflow
获取不同 Channel
实例
public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
when (capacity) {
RENDEZVOUS -> {
if (onBufferOverflow == BufferOverflow.SUSPEND)
// step1.1: 溢出战略为挂起的时分,无须缓存区,挂起就好
RendezvousChannel(onUndeliveredElement)
else
// step1.2: 溢出丢掉战略,设置一个缓存区寄存
ArrayChannel(1, onBufferOverflow, onUndeliveredElement)
}
// step2: 设置兼并容量形式,由于会自动扔掉最老的数据, 其实我感觉这儿只生效 DROP_OLDEST, 要么不判别,要么判别全
CONFLATED -> {
require(onBufferOverflow == BufferOverflow.SUSPEND) {
"CONFLATED capacity cannot be used with non-default onBufferOverflow"
}
ConflatedChannel(onUndeliveredElement)
}
// step3: 设置无限制形式,底层运用链表完结,由于容量无限制,相对应的溢出也不存在了
UNLIMITED -> LinkedListChannel(onUndeliveredElement)
// step4: 设置缓存形式,假如为挂起战略则容量为64(默许), 反之容量为1,由于剩余2种溢出战略不是丢掉最新的就是丢掉最老的,所以容量1足够
BUFFERED -> ArrayChannel(
if (onBufferOverflow == BufferOverflow.SUSPEND) CHANNEL_DEFAULT_CAPACITY else 1,
onBufferOverflow, onUndeliveredElement
)
// step5: 设置指定容量的的时分,创立 ArrayChannel
else -> {
if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST)
step5.1: 与 CONFLATED相同直接复用
ConflatedChannel(onUndeliveredElement)
else
ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement)
}
}
- Tips: 其实容量还有一个特别值为
OPTIONAL_CHANNEL
, 是ChannelFlow
内部运用的,代表可选择是否运用Channel
,即能不必就不必;场景为假如非ContinuationInterceptor
上下文切换 而且 无buffer 的时分,不需要Channel
的介入
class ChannelFlowOperator{
// ...
override suspend fun collect(collector: FlowCollector<T>) {
// step1: 假如channel是可选的且没有buffer(flowOn/flowWith操作符)
if (capacity == Channel.OPTIONAL_CHANNEL) {
val collectContext = coroutineContext
val newContext = collectContext + context
// step2: 没有上下文产生改变,则开始原始流搜集
if (newContext == collectContext)
return flowCollect(collector)
// step3: 不需要改变协程的线程环境
if (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor])
return collectWithContextUndispatched(collector, newContext)
}
// step4: 创立 channel 开始搜集
super.collect(collector)
}
// ...
}
让 Channel
转成 Flow
: 意图让 Channel
具备 Flow
的运算符
Channel
转换 flow
有2种完结方法,实质是同一种完结方法,仅仅是否可以被多次 collect
-
ReceiveChannel#consumeAsFlow()
: 只能被collect
一次,多次collect
时分会抛反常; -
ReceiveChannel#receiveAsFlow
: 能被collect
多次,但Channel
中的一条消息只能被一个collector
所消费, 俗称扇形消费
public fun <T> ReceiveChannel<T>.receiveAsFlow(): Flow<T> = ChannelAsFlow(this, consume = false)
public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T> = ChannelAsFlow(this, consume = true)
private class ChannelAsFlow<T>(
private val channel: ReceiveChannel<T>,
private val consume: Boolean,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
private val consumed = atomic(false)
private fun markConsumed() {
if (consume) {
check(!consumed.getAndSet(true)) { "ReceiveChannel.consumeAsFlow can be collected just once" }
}
}
// ...
override suspend fun collect(collector: FlowCollector<T>) {
// step1: 从 receiveAsFlow 或许 consumeAsFlow 构造可知,只会进入此分支
if (capacity == Channel.OPTIONAL_CHANNEL) {
// step2: 检查是否消费过
markConsumed()
// step3: 把 channel 的交给 collector 搜集
collector.emitAllImpl(channel, consume)
} else {
super.collect(collector)
}
}
// ...
}
private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
//step4: 检查协程是否正常
ensureActive()
var cause: Throwable? = null
try {
while (true) {
// step5: 获取 channel 中队头 => 由这儿知道当时值被此 collector 消费,其他 collector 就获取不到了
val result = run { channel.receiveCatching() }
// step6: channel关闭的,则跳出循环
if (result.isClosed) {
result.exceptionOrNull()?.let { throw it }
break
}
// step7: 未关闭,弹出获取的值
emit(result.getOrThrow())
}
} catch (e: Throwable) {
cause = e
throw e
} finally {
// step8: 撤销 channel
if (consume) channel.cancelConsumed(cause)
}
}
BroadcastChannel
: 广播Channel(现已被 StatedFlow
和 SharedFlow
替换)
-
当容量
capacity
设置为Conflated
, 实际会生成ConflatedBroadcastChannel
, 相当于溢出的时分每次丢掉最老的,而且内部只要一个状况存储值, 等价于StatedFlow
-
当容量
capacity
设置为大于0的时分,实际生成ArrayBroadcastChannel
, 彻底被SharedFlow
功用替代,而且还多了replay
功用
ChannelFlow: 意图扩展Flow
功用,比方切换协程履行上下文、增加buffer等
FusibleFlow
: 意图是提供一个融合方法,可以返回一个进行本身上下文的切换、容量和溢出战略装备后的流
public interface FusibleFlow<T> : Flow<T> {
public fun fuse(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T>
}
ChannelFlow
: 继承与FusibleFlow
ChannelFlow
其实算 Channel
场景的一个应用;Channel
本身提供了 capacity
和 onBufferOverflow
装备,可以装备当作背压来运用;一起 Channel
本身也是一个出产消费者模型的数据结构,可以在其他协程出产,在 collector
协程进行消费;所以 Kotlin
官方运用 Channel 来完结 flowOn
、buffer
流操作符的完结
则最初所贴的代码就好理解了
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
checkFlowContext(context)
return when {
// step1: 假如没有上下文切换,则返回本身
context == EmptyCoroutineContext -> this
// step2: 假如本身支撑切换的话,则调用本身方法进行切换
this is FusibleFlow -> fuse(context = context)
// step3: ChannelFlowOperatorImpl 继承 ChannelFlow,ChannelFlow 继承 FusibleFlow,走到此分支,则代表是一个普通流,运用 ChanelFlowOperatorImpl 进行包装切换上下文
else -> ChannelFlowOperatorImpl(this, context = context)
}
}
ChannelFlowOperatorImpl
源码解析: 从 ChannelFlowOperatorImpl#collect
为起点
internal class ChannelFlowOperatorImpl<T>(
flow: Flow<T>,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {
internal val collectToFun: suspend (ProducerScope<T>) -> Unit
get() = { collectTo(it) }
// Tips: 从上述 flowOn 可知, 不会对流进行无限包装,则返回原始流即可
override fun dropChannelOperators(): Flow<T> = flow
// step1: 开始搜集内容
override suspend fun collect(collector: FlowCollector<T>): Unit =
coroutineScope {
val channel: ReceiveChannel<T> = produceImpl(this)
// step10: 传入的 collector 接受 channel 传过来的内容
collector.emitAll(channel)
}
public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
// step2: 发动一个新的协程去履行 collectToFun,即 collectTo
scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)
protected override suspend fun collectTo(scope: ProducerScope<T>) =
// step8: 创立 collector 进行初始流搜集
flowCollect(SendingCollector(scope))
override suspend fun flowCollect(collector: FlowCollector<T>) =
flow.collect(collector)
}
public class SendingCollector<T>(
private val channel: SendChannel<T>
) : FlowCollector<T> {
// step9: 把初始流的内容发射到 channel 中
override suspend fun emit(value: T): Unit = channel.send(value)
}
internal fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
start: CoroutineStart = CoroutineStart.DEFAULT,
onCompletion: CompletionHandler? = null,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
// step3: 按照容量和溢出战略装备channel
val channel = Channel<E>(capacity, onBufferOverflow)
// step4: 取得新的协程环境
val newContext = newCoroutineContext(context)
// step5: channel 和 协程上下文
val coroutine = ProducerCoroutine(newContext, channel)
// step6: 注册协程完结回调
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
// step7: 发动协程,会履行 ChannelFlowOperator#collectToFun => ChannelFlowOperator#collectTo
coroutine.start(start, coroutine, block)
return coroutine
}