本文首要剖析了冷流 和 暖流 的相关完成原理,原理逻辑长而杂乱。特别是触及暖流 SharedFlow 相关完成原理时,逻辑更是笼统,了解比较困难。本文比较长,建议依据目录挑选分段阅览,能够先看根底概念和冷流,再分别看暖流 SharedFlow 和 StateFlow

阅览本文时,能够带着以下问题去思考:

  1. 冷流和暖流 指的是什么?
  2. 在事务开发中,冷流和暖流能够用来做什么或许解决什么问题?
  3. 冷流和暖流 的差异是什么?
  4. 冷流的履行原理是什么?
  5. 暖流 SharedFlow 对它发射的数据是怎么办理的?
  6. 暖流 SharedFlow 对它的订阅者是怎么办理的?
  7. 暖流 StateFlow 和 LiveData 有什么差异?

技术都是为事务服务的,不管是冷流仍是暖流,它们都需求解决事务开发中实践的问题,比如:

  • 协程和冷流能够替换 RxJava 结构进行呼应式编程,在Kotlin 项目中,运用协程和冷流比运用 RxJava 更有优势;
  • 暖流 SharedFlow 能够用来做事情总线,替换 EventBus
  • 暖流 StateFlow 能够用来做事情状况更新,替换 LiveData,并结合 MVI 替换 MVVM

假如本文中出现错误,会及时纠正。欢迎阅览。

根底概念

从上一篇文章:Kotlin Flow 探索,知道 Kotlin Flow 便是 Kotlin 数据流,而数据流需求包含供给方(出产者),中介(中心操作),运用方(顾客):

  • 供给方(出产者):源数据,将数据增加到数据流中;
  • 中介(中心操作):能够修正发送到数据流的值,或修正数据流本身;
  • 运用方(顾客):成果数据,运用数据流中的值。
flow
  .operator1()
  .operator2()
  .operator3()
  .collect(consumer)

创立一个数据流,能够运用 Kotlin 扩展函数 flowOfasFlow , flow{}

 flowOf(1, 2, 3).map { it * it }.collect {}
 (1..3).asFlow().map { it % 2 == 0 }.collect {}
 flow<Int> {
                emit(1)
                emit(2)
                emit(3)
            }.map { it * 2 }.collect {}

在上面创立数据流的方式中,有必要要有运用方(顾客),也便是搜集器 collect{}时,中心操作才会履行,这一点和 Kotlin Sequences 一样。

现在,除了上面的创立数据流方式,还能够运用 SharedFlowStateFlow


class TestFlow {
    private val _sharedFlow = MutableSharedFlow<Int>(
        replay = 0,
        extraBufferCapacity = 0,
        onBufferOverflow = BufferOverflow.SUSPEND
    )
    val sharedFlow: SharedFlow<Int> = _sharedFlow
    fun testSharedFlow() {
        MainScope().launch {
            Log.e("Flow", "sharedFlow:emit 1")
            _sharedFlow.emit(1)
            Log.e("Flow", "sharedFlow:emit 2")
            _sharedFlow.emit(2)
        }
    }
    private val _stateFlow = MutableStateFlow<Int>(value = 1)
    val stateFlow: SharedFlow<Int> = _stateFlow
    fun testStateFlow() {
        MainScope().launch {
            _stateFlow.value = 1
        }
    }
}

在运用SharedFlowStateFlow时创立数据流时,能够没有或许有多个搜集器 collect{} ,它独立于搜集器 collect{}存在,而且不会被asList,asSet等顾客中止。

testFlow.testSharedFlow()
testFlow.testStateFlow()
控制台输出成果:
Flow                    com.wangjiang.example                E  sharedFlow:emit 1
Flow                    com.wangjiang.example                E  sharedFlow:emit 2
Flow                    com.wangjiang.example                E  stateFlow:value 1

能够看到,没有搜集器 collect{}的时分,ShareFlow 和 StateFlow 仍是履行了。下面增加搜集器 collect{},再看一下:

        lifecycleScope.launch {
            testFlow.sharedFlow.collect {
                Log.e("Flow", "SharedFlow Collect1: value=$it")
            }
        }
        lifecycleScope.launch {
            testFlow.sharedFlow.collect {
                Log.e("Flow", "SharedFlow Collect2: value=$it")
            }
        }
        testFlow.testSharedFlow()
        lifecycleScope.launch {
            testFlow.stateFlow.collect {
                Log.e("Flow", "StateFlow Collect1: value=$it")
            }
        }
        lifecycleScope.launch {
            testFlow.stateFlow.collect {
                Log.e("Flow", "StateFlow Collect2: value=$it")
            }
        }
        testFlow.testStateFlow()
控制台输出成果:
Flow                    com.wangjiang.example                E  StateFlow Collect1: value=1
Flow                    com.wangjiang.example                E  StateFlow Collect2: value=1
Flow                    com.wangjiang.example                E  sharedFlow:emit 1
Flow                    com.wangjiang.example                E  SharedFlow Collect1: value=1
Flow                    com.wangjiang.example                E  SharedFlow Collect2: value=1
Flow                    com.wangjiang.example                E  sharedFlow:emit 2
Flow                    com.wangjiang.example                E  SharedFlow Collect1: value=2
Flow                    com.wangjiang.example                E  SharedFlow Collect2: value=2

关于SharedFlow ,它相似事情总线,将事情分发给事情订阅者,同享事情。关于StateFlow ,它相似 LiveData,更新事情最新状况,告知订阅者事情的更新。

现在关于冷流和暖流能够简略区分为:将运用 flowOfasFlow , flow{}等创立的数据流称为冷流,也便是运用 : Flow<T>创立的数据流,它不能独立于搜集器 collect{}存在,且每个数据流需求搜集器 collect{}才干称为一个完好的数据流;将运用: SharedFlow<T>: StateFlow<T> 创立的数据流称为暖流,它能独立于搜集器 collect{}存在,能够没有或多个搜集器 collect{}

冷流

Flow 是一种相似于序列的冷流 — 这段 flow 构建器中的代码直到流被搜集的时分才运转。

Flow 和序列一样,需求有完毕操作符,也便是有搜集器 collect{}asList,asSet等操作的时分,才运转:

        lifecycleScope.launch {
            val flow = flow {
                Log.e("Flow", "emit:1")
                emit(1)
                Log.e("Flow", "emit:2")
                emit(2)
            }.map {
                Log.e("Flow", "map:$it")
                it * it
            }
            flow.collect {
                Log.e("Flow", "collect:$it")
            }
        }
控制台输出成果:
Flow                    com.wangjiang.example                 E  emit:1
Flow                    com.wangjiang.example                 E  map:1
Flow                    com.wangjiang.example                 E  collect:1
Flow                    com.wangjiang.example                 E  emit:2
Flow                    com.wangjiang.example                 E  map:2
Flow                    com.wangjiang.example                 E  collect:4

当运用 collect{}时,开端履行数据出产:发射值 emit,再履行中心操作:map改换,再履行数据消费:collect。在整个过程中,数据流是按照时间顺序产生的,也便是emit:1map:1collect:1emit:2map:2collect:4,而不是emit:1emit:2map:1map:2collect:1collect:4

下面从一个示例来简略看下冷流的履行原理 :

class TestFlow {
    fun testColdFlow() {
        MainScope().launch {
            flow<Int> { emit(1) }.map { it * it }.collect {
                Log.e("Flow", "testColdFlow", Throwable())
            }
        }
    }
}

运转 testColdFlow 办法,控制台输出 collect 办法调用栈信息:

 Flow                    com.wangjiang.example                 E  testColdFlow
java.lang.Throwable
at com.wangjiang.example.flow.TestFlow$testColdFlow$1$3.emit(TestFlow.kt:46)
at com.wangjiang.example.flow.TestFlow$testColdFlow$1$3.emit(TestFlow.kt:45)
at com.wangjiang.example.flow.TestFlow$testColdFlow$1$invokeSuspend$$inlined$map$1$2.emit(Emitters.kt:224)
at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:15)
at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:15)
at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:87)
at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:66)
at com.wangjiang.example.flow.TestFlow$testColdFlow$1$1.invokeSuspend(TestFlow.kt:45)
at com.wangjiang.example.flow.TestFlow$testColdFlow$1$1.invoke(Unknown Source:14)
at com.wangjiang.example.flow.TestFlow$testColdFlow$1$1.invoke(Unknown Source:4)
at kotlinx.coroutines.flow.SafeFlow.collectSafely(Builders.kt:61)
at kotlinx.coroutines.flow.AbstractFlow.collect(Flow.kt:230)
at com.wangjiang.example.flow.TestFlow$testColdFlow$1$invokeSuspend$$inlined$map$1.collect(SafeCollector.common.kt:113)
at com.wangjiang.example.flow.TestFlow$testColdFlow$1.invokeSuspend(TestFlow.kt:45)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at android.os.Handler.handleCallback(Handler.java:900)
at android.os.Handler.dispatchMessage(Handler.java:103)
at android.os.Looper.loop(Looper.java:219)
at android.app.ActivityThread.main(ActivityThread.java:8668)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:513)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:1109)

从上面办法调用栈信息,能够看出大致顺序是:collectAbstractFlow.collectSafeFlow.collectSafelymapSafeCollector.emitemitTestFlow$testColdFlow$1$3.emit。(上面的日志和源码对应不上,这儿能够经过 AndroidStudio 查看 kotlin 的 class 文件。)

这儿重点来了解一下中心操作 map 改换:

import kotlinx.coroutines.flow.unsafeTransform as transform
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
    return@transform emit(transform(value))
}
internal inline fun <T, R> Flow<T>.unsafeTransform(
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = unsafeFlow { // Note: unsafe flow is used here, because unsafeTransform is only for internal use
    collect { value ->
        // kludge, without it Unit will be returned and TCE won't kick in, KT-28938
        return@collect transform(value)
    }
}
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
    return object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            collector.block()
        }
    }
}

从上面代码剖析,map 流程是:maptransformunsafeTransformunsafeFlow { }collect {}履行上一个 flow拿到上一个 flow 的成果@collect transform(value)@transform emit(transform(value))map改换操作成果到下一个 flow 或 顾客 collect

看到这个流程,没错,flow 冷流的履行流程是与 Kotin Sequence 履行流程的原理是相似的。

所以整个冷流的触发流程是能够简略概括为:顾客 collect 触发 中心操作,中心操作 filter, map 改换等 触宣布产者,然后出产者 emit 出产数据,然后将数据交给中心操作改换,最终再将改换后的数据交给顾客。这也便是冷流履行的原理。有点相似,从下往上触发,再从上往下活动的感觉

事务场景

关于 : Flow<T> 的事务场景,与运用 RxJava 相似,用于呼应式编程,例如:

 GlobalScope.launch(Dispatchers.Main) {
            flowOf(bitmap).map { bmp ->
                //在子线程中履行耗时操作,存储 bitmap 到本地
                saveBitmap(bmp)
            }.flowOn(Dispatchers.IO).collect { bitmapLocalPath ->
                //在主线程中处理存储 bitmap 后的本地路地址
            }
        }

在 Kotlin 项目中,能够运用 协程和冷流 替换 RxJava 来做呼应式编程。

总结

冷流需求有数据出产者、0或多个中心操作、数据顾客才干一同构建成为一个完好的流。它的履行原理相似 Kotin Sequence,当有顾客 collect 或其它终端操作时,流开端从下往上触发,然后从上往下活动。

暖流

暖流分为 SharedFlow 和 StateFlow,它们都是独立于搜集器的存在而存在。

SharedFlow

SharedFlow ,顾名思义,它被称作为暖流,首要在于它能让一切搜集器同享它所宣布的值,其间同享的方式是播送,且它的实例能够独立于搜集器的存在而存在。要了解 ShredFlow,就要了解它同享和独立存在的意义。

下面将从 SharedFlow 的创立,发摄 和 搜集来剖析。

创立

创立一个 SharedFlow 运用的是 SharedFlowKt 供给的 MutableSharedFlow 结构办法:

private val _sharedFlow = MutableSharedFlow<Int>(
        replay = 0,
        extraBufferCapacity = 0,
        onBufferOverflow = BufferOverflow.SUSPEND
    )
    val sharedFlow: SharedFlow<Int> = _sharedFlow

参数意义:

  • replay:新订阅者订阅时,从头发送多少个之前已宣布的值给新订阅者(相似粘性数据);
  • extraBufferCapacity:除了 replay 外,缓存的值数量,当缓存空间还有值时,emit 不会 suspend(emit 过快,collect 过慢,emit 的数据将被缓存起来);
  • onBufferOverflow:指定缓存区中已存满要发送的数据项时的处理战略(缓存区巨细由 replay 和 extraBufferCapacity 一同决议)。默认值为 BufferOverflow.SUSPEND,还能够是 BufferOverflow.DROP_LATEST 或 BufferOverflow.DROP_OLDEST(顾名思义)。
public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
    //.....省掉
    //缓存的值数量
    val bufferCapacity0 = replay + extraBufferCapacity
    val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0
    return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}
internal open class SharedFlowImpl<T>(
    private val replay: Int,
    private val bufferCapacity: Int,
    private val onBufferOverflow: BufferOverflow
) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
    //.....省掉
}

MutableSharedFlow 结构办法回来的是一个 SharedFlowImpl 实例,下面来看一下关于 SharedFlowImpl 类相关的类和接口的简略关系:

Kotlin Flow 冷流和热流
从上往下,各个类或接口的职责是:

  • Flow 接口:用于流的消费操作,也便是订阅者订阅 collect,它供给 public suspend fun collect(collector: FlowCollector<T>) 接口办法,办法参数 collecter 便是冷流或暖流的搜集器,所以 Flow 接口依赖于 FlowCollecter 接口;
  • FlowCollecter 接口:用于流的搜集,能够是流的完毕操作或中心操作,它供给 public suspend fun emit(value: T)办法,办法参数 value 便是 数据出产者或许中心操作 emit 发送的值;
  • SharedFlow 接口:承继 Flow 接口,并界说了public val replayCache: List<T>特色,它表明给新订阅者的(replay个数)值缓存快照;
  • MutableSharedFlow 接口:承继 SharedFlow 和 FlowCollecter 接口,那么它就能够 collect(collecter: FlowCollecter<T>) 也能够 emit(value: T)
  • CancellableFlow 接口:承继 Flow 接口,是一个空接口,首要标记这个 Flow 是能够撤销的,也便是 SharedFlow 是能够撤销的;
  • CancellableFlowImpl 类:完成了 Flow 接口的 collect(collector: FlowCollector<T>) 办法;
  • FusibleFlow 接口:与 BufferOverflow 和 flowOn 操作结合起来一同作业;
  • AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> 笼统类:担任对订阅者的办理,接纳 AbstractSharedFlowSlot,承继了 SynchronizedObject类;
  • SynchronizedObject 类:协程内部担任供给加锁 synchronized(lock: SynchronizedObject, block: () -> T) 办法参数 的 lock 锁目标;
  • AbstractSharedFlowSlot<SharedFlowImpl<*>> 笼统类:界说了 fun allocateLocked(flow: F): Booleanfun freeLocked(flow: F): Array<Continuation<Unit>?> 办法,分别表明订阅者相关的 SharedFlowSlot 的申请和开释;
  • SharedFlowSlot 类:承继 AbstractSharedFlowSlot 类,完成了 allocateLockedfreeLocked 笼统办法,并界说了 特色 var index = -1Lvar cont: Continuation<Unit>? = null,其间 index 表明将要在处理的数据在缓存数组中的索引,cont 表明用来保存等候新数据发送的订阅者的续体(包装订阅者);
  • BufferOverflow 枚举类:流中的缓存区溢出处理战略,枚举值 SUSPEND 表明发送或发送值的上游在缓存区已满时挂起,枚举值DROP_OLDEST 表明溢出时删除缓存区中最旧的值,将新值增加到缓存区,不要挂起,枚举值 DROP_LATEST 表明在缓存区溢出时删除当时增加到缓存区的最新值(以便缓存区内容坚持不变),不要挂起;
  • SharedFlowImpl 类:真实的 SharedFlow 完成类,承继了 AbstractSharedFlow<SharedFlowSlot> 笼统类 和 完成了MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> 接口。

从上面的信息,创立一个 SharedFlow 后,它供给的才能,能够简略综合概括为:能够运用 emit 发射数据,发射时触及数据缓存,缓存溢出战略,是否会被挂起等问题;也能够运用 collect 订阅,订阅时触及订阅者的办理,数据获取,是否会被挂起等问题

发射

SharedFlowImpl 类完成了 FlowCollector接口的 emit 办法,当调用 emit 办法时:

  1. 假如有订阅者正在 collect 当时 SharedFlow,且 onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,此调用才可能会被挂起;
  2. 假如没有订阅者正在 collect 当时 SharedFlow,则不会运用缓存区。假如 replay != 0,则最近宣布的值会简略地存储到 replay 缓存中,并替换 replay 缓存中旧的元素;假如 replay=0,则将最近宣布的值丢弃;
  3. emit 办法是被 suspend 修饰的,与它相关的还有一个 非 suspend 修饰的办法:tryEmit
  4. 此办法是线程安全的,能够在没有外部同步的情况下从并发协程中安全地调用。

调用 emit 办法可能会被挂起

首先,看下emit 办法的完成:

    override suspend fun emit(value: T) {
        if (tryEmit(value)) return // fast-path
        emitSuspend(value)
    }

emit 办法会不会被挂起,首要取决于 tryEmit(value) 办法回来值,假如回来 true,那么就不会履行 emitSuspend(value),也便是不会被挂起,不然履行 emitSuspend(value),emit 会被挂起。

下面先看一下 emit 办法 不会挂起和会挂起的比如:

不会挂起,调用的是 tryEmit 办法
class TestFlow {
    private val _sharedFlow = MutableSharedFlow<Int>(
        replay = 0,
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.SUSPEND
    )
    val sharedFlow: SharedFlow<Int> = _sharedFlow
    fun testSharedFlow() {
        MainScope().launch {
            Log.e("Flow", "sharedFlow:emit 1")
            _sharedFlow.emit(1)
        }
    }
  }
lifecycleScope.launch {
  testFlow.sharedFlow.collect(object : FlowCollector<Int> {
        override suspend fun emit(value: Int) {
               Log.e("Flow", "SharedFlow Collect: value=$value", Throwable())
        }
   })
}
控制台输出日志:
Flow                    com.wangjiang.example                 E  sharedFlow:emit 1
Flow                    com.wangjiang.example                 E  SharedFlow Collect: value=1
java.lang.Throwable
at com.wangjiang.example.fragment.TestFlowFragment$initView$5$1.emit(TestFlowFragment.kt:76)
at com.wangjiang.example.fragment.TestFlowFragment$initView$5$1.emit(TestFlowFragment.kt:174)
at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:383)
at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(Unknown Source:15)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:234)
at kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined(DispatchedTask.kt:190)
at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:161)
at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:431)
at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default(CancellableContinuationImpl.kt:420)
at kotlinx.coroutines.CancellableContinuationImpl.resumeWith(CancellableContinuationImpl.kt:328)
at kotlinx.coroutines.flow.SharedFlowImpl.tryEmit(SharedFlow.kt:400)
at kotlinx.coroutines.flow.SharedFlowImpl.emit$suspendImpl(SharedFlow.kt:405)
at kotlinx.coroutines.flow.SharedFlowImpl.emit(Unknown Source:0)
at com.wangjiang.example.flow.TestFlow$testSharedFlow$1.invokeSuspend(TestFlow.kt:20)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at android.os.Handler.handleCallback(Handler.java:900)
at android.os.Handler.dispatchMessage(Handler.java:103)
at android.os.Looper.loop(Looper.java:219)
at android.app.ActivityThread.main(ActivityThread.java:8668)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:513)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:1109)

将上面 MutableSharedFlow 结构办法中的 extraBufferCapacity = 1 修正为 extraBufferCapacity = 0,其它坚持不变:

会挂起,调用的是 emitSuspend 办法
控制台输出日志:
Flow                    com.wangjiang.example                 E  sharedFlow:emit 1
Flow                    com.wangjiang.example                 E  SharedFlow Collect: value=1
java.lang.Throwable
at com.wangjiang.example.fragment.TestFlowFragment$initView$5$1.emit(TestFlowFragment.kt:76)
at com.wangjiang.example.fragment.TestFlowFragment$initView$5$1.emit(TestFlowFragment.kt:174)
at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:383)
at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(Unknown Source:15)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:234)
at kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined(DispatchedTask.kt:190)
at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:161)
at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:431)
at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default(CancellableContinuationImpl.kt:420)
at kotlinx.coroutines.CancellableContinuationImpl.resumeWith(CancellableContinuationImpl.kt:328)
at kotlinx.coroutines.flow.SharedFlowImpl.emitSuspend(SharedFlow.kt:504)
at kotlinx.coroutines.flow.SharedFlowImpl.emit$suspendImpl(SharedFlow.kt:406)
at kotlinx.coroutines.flow.SharedFlowImpl.emit(Unknown Source:0)
at com.wangjiang.example.flow.TestFlow$testSharedFlow$1.invokeSuspend(TestFlow.kt:20)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at android.os.Handler.handleCallback(Handler.java:900)
at android.os.Handler.dispatchMessage(Handler.java:103)
at android.os.Looper.loop(Looper.java:219)
at android.app.ActivityThread.main(ActivityThread.java:8668)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:513)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:1109)

上面两份日志差异在于:

  • emitsuspendImpltryEmitCancellableContinuationImpl.resumeWithDispatchedTaskKt.resumeTestFlowFragment$initView$5$1.emit
  • emitsuspendImplemitSuspendCancellableContinuationImpl.resumeWithDispatchedTaskKt.resumeTestFlowFragment$initView$5$1.emit

从输出日志对比,当 onBufferOverflow战略为BufferOverflow.SUSPEND时,假如缓存空间extraBufferCapacity有值,emit 不会被挂起,不然会被挂起。所以,现在能够综合猜测 onBufferOverflowextraBufferCapacity 的值会影响 tryEmit办法的回来值。

  override fun tryEmit(value: T): Boolean {
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val emitted = synchronized(this) {
            if (tryEmitLocked(value)) {
                resumes = findSlotsToResumeLocked(resumes)
                true
            } else {
                false
            }
        }
        for (cont in resumes) cont?.resume(Unit)
        return emitted
    }
    private fun tryEmitLocked(value: T): Boolean {
        // Fast path without collectors -> no buffering
        if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) // always returns true
        // With collectors we'll have to buffer
        // 假如缓存已满且订阅者消费慢时,不能直接给订阅者值
        if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
            when (onBufferOverflow) {
                BufferOverflow.SUSPEND -> return false // will suspend
                BufferOverflow.DROP_LATEST -> return true // just drop incoming
                BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest instead
            }
        }
        enqueueLocked(value)
        bufferSize++ // value was added to buffer
        // drop oldest from the buffer if it became more than bufferCapacity
        if (bufferSize > bufferCapacity) dropOldestLocked()
        // keep replaySize not larger that needed
        if (replaySize > replay) { // increment replayIndex by one
            updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
        }
        return true
    }

tryEmit 办法的回来值取决于 emitted, emitted 的值又取决于 tryEmitLocked 办法的回来值。tryEmitLocked 的回来值是否为 false 取决于:

if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
            when (onBufferOverflow) {
                BufferOverflow.SUSPEND -> return false // will suspend
                BufferOverflow.DROP_LATEST -> return true // just drop incoming
                BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest instead
            }
        }

字段: bufferSizebufferCapacityminCollectorIndexreplayIndex,它们都是 SharedFlowImp 的全局变量。

private class SharedFlowImpl<T>(
    private val replay: Int, //新订阅者订阅时,从头发送多少个之前已宣布的值给新订阅者
    private val bufferCapacity: Int, // replay+extraBufferCapacity,缓存容量
    private val onBufferOverflow: BufferOverflow //缓存溢出战略
) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
    /*
        Logical structure of the buffer 缓存逻辑结构
                  buffered values
             /-----------------------\
                          replayCache      queued emitters
                          /----------\/----------------------\
         +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
         |   | 1 | 2 | 3 | 4 | 5 | 6 | E | E | E | E | E | E |   |   |   |
         +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
               ^           ^           ^                      ^
               |           |           |                      |
              head         |      head + bufferSize     head + totalSize
               |           |           |
     index of the slowest  |    index of the fastest
      possible collector   |     possible collector
               |           |
               |     replayIndex == new collector's index
               \---------------------- /
          range of possible minCollectorIndex
          head == minOf(minCollectorIndex, replayIndex) // by definition
          totalSize == bufferSize + queueSize // by definition
       INVARIANTS:
          minCollectorIndex = activeSlots.minOf { it.index } ?: (head + bufferSize)
          replayIndex <= head + bufferSize
     */
    // Stored state 缓存存储状况
    private var buffer: Array<Any?>? = null // 缓存数组,用于保存 emit 发送的数据
    private var replayIndex = 0L // 新订阅者从 replayCache 中获取数据的开端方位
    private var minCollectorIndex = 0L //  当时活跃订阅者从缓存数组中获取数据时,对应的方位最小索引
    private var bufferSize = 0 // 缓存数组中 buffered values 的巨细
    private var queueSize = 0 // 缓存数组中 queued emitters 的巨细
    // Computed state 缓存计算状况
    private val head: Long get() = minOf(minCollectorIndex, replayIndex) // 缓存数组的开端方位
    private val replaySize: Int get() = (head + bufferSize - replayIndex).toInt() // 缓存数组中 replay 的巨细
    private val totalSize: Int get() = bufferSize + queueSize // 缓存数组中现已缓存的数据数量
    private val bufferEndIndex: Long get() = head + bufferSize// 缓存数组中 buffered values 的完毕方位的后一位索引,也便是 queued emitters 的开端方位
    private val queueEndIndex: Long get() = head + bufferSize + queueSize 缓存数组中 queued emitters 的完毕方位的后一位索引

从上面 SharedFlowImpl 中的缓存逻辑结构,再结合:

MutableSharedFlow<Int>(
        replay = 0,
        extraBufferCapacity = 10,
        onBufferOverflow = BufferOverflow.SUSPEND
    )

当 extraBufferCapacity = 1 ,调用 emit 办法发射数据时,此刻 bufferSize=0,bufferCapacity=1,minCollectorIndex=0,replayIndex=0,所以 bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex 为 false,所以 tryEmitLocked 回来 true, tryEmit 回来 true, emit 不会被挂起。

当 extraBufferCapacity = 0 ,调用 emit 办法发射数据时,此刻 bufferSize=0,bufferCapacity=0,minCollectorIndex=0,replayIndex=0,所以 bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex 为 true,又由于 onBufferOverflow=BufferOverflow.SUSPEND,所以 tryEmitLocked 回来 false, tryEmit 回来 false, 所以会履行 emitSuspend,emit 会被挂起。

这便是调用 emit 办法可能会被挂起原因。其实,满足bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex 判读条件便是缓存区溢出,这时需求挑选处理战略,是 BufferOverflow.SUSPEND,仍是 BufferOverflow.DROP_LATEST,仍是 BufferOverflow.DROP_OLDEST

缓存区

emit 发送的值存储在缓存数组 buffer 中:

private var buffer: Array<Any?>? = null // 缓存数组,用于保存 emit 发送的数据

buffer 包含: buffered valuesqueued emitters

其间 buffered values 中存储的是 emit(value) 办法中的 value 值,buffered values 的巨细取决于 bufferCapacity=replay + extraBufferCapacity,replay 和 extraBufferCapacity 便是创立 MutableSharedFlow( replay: Int = 0, extraBufferCapacity: Int = 0, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND )传入的值,所以 buffered values 也分为两部分:replay 和 extraBufferCapacity。

其间 queued emitters 中存储的是 emit(value) 办法中的 value 被包装成的 Emitter 值:

private suspend fun emitSuspend(value: T) = suspendCancellableCoroutine<Unit> sc@{ cont ->
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val emitter = synchronized(this) lock@{
            // recheck buffer under lock again (make sure it is really full)
            if (tryEmitLocked(value)) {
                cont.resume(Unit)
                resumes = findSlotsToResumeLocked(resumes)
                return@lock null
            }
            // add suspended emitter to the buffer
            // 将 value 包装成 Emitter 目标存储到 buffer 中,存储方位为 queued emitters 的开端完毕方位规模
            Emitter(this, head + totalSize, value, cont).also {
                enqueueLocked(it)
                queueSize++ // added to queue of waiting emitters
                // synchronous shared flow might rendezvous with waiting emitter
                if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
            }
        }
        // outside of the lock: register dispose on cancellation
        emitter?.let { cont.disposeOnCancellation(it) }
        // outside of the lock: resume slots if needed
        for (r in resumes) r?.resume(Unit)
    }
 private class Emitter(
        @JvmField val flow: SharedFlowImpl<*>,
        @JvmField var index: Long,
        @JvmField val value: Any?,
        @JvmField val cont: Continuation<Unit>
    ) : DisposableHandle {
        override fun dispose() = flow.cancelEmitter(this)
    }

在 emit 挂起时(缓存区 buffered values 已满或巨细为0),才存储 Emitter 值到 buffer 中。

现在存储值到 buffer 中,有下面几条链路:

  1. emittryEmittryEmitLockedtryEmitNoCollectorsLockedenqueueLocked
  2. emittryEmittryEmitLockedenqueueLocked
  3. emitemitSuspendtryEmitLockedtryEmitNoCollectorsLockedenqueueLocked
  4. emitemitSuspendtryEmittryEmitLockedenqueueLocked
  5. emitemitSuspendEmitterenqueueLocked
 // enqueues item to buffer array, caller shall increment either bufferSize or queueSize
    private fun enqueueLocked(item: Any?) {
        val curSize = totalSize
        val buffer = when (val curBuffer = buffer) {
           // 创立一个巨细为 2 的缓存数组
            null -> growBuffer(null, 0, 2)
            // 假如当时缓存数据现已存满,则扩容,扩大为本来的 2 倍
            else -> if (curSize >= curBuffer.size) growBuffer(curBuffer, curSize,curBuffer.size * 2) else curBuffer
        }
        buffer.setBufferAt(head + curSize, item)
    }
没有订阅者正在 collect 当时 SharedFlow

当没有订阅者正在 collect 当时 SharedFlow,存储值到 buffer 中,会测验走链路:

  • emittryEmittryEmitLockedtryEmitNoCollectorsLockedenqueueLocked
  • emitemitSuspendtryEmitLockedtryEmitNoCollectorsLockedenqueueLocked
private fun tryEmitLocked(value: T): Boolean {
        // Fast path without collectors -> no buffering
        if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) // always returns true
        // ..... 省掉
        return true
    }
 private fun tryEmitNoCollectorsLocked(value: T): Boolean {
        assert { nCollectors == 0 }
        if (replay == 0) return true // no need to replay, just forget it now
        enqueueLocked(value) // enqueue to replayCache
        bufferSize++ // value was added to buffer
        // drop oldest from the buffer if it became more than replay
        if (bufferSize > replay) dropOldestLocked()
        minCollectorIndex = head + bufferSize // a default value (max allowed)
        return true
    }

此刻,假如又 replay=0,则不会运用缓存区。不然会将 emit(value) 中 value 存储到 buffer 数组中 buffered values 的 replayCache 开端完毕规模方位内。

replay 的值

当有订阅者正在 collect 当时 SharedFlow,此刻,假如 replay=0,extraBufferCapacity=0,则会测验走链路:

  • emitemitSuspendEmitterenqueueLocked

emit(value) 中 value值 包装成 Emitter 目标存储到 buffer 数组中的 queued emitters 开端完毕规模方位内。当没有订阅者时,emit(value) 中 value 就会被丢弃。

假如 replay!=0 或 extraBufferCapacity!=0,会测验走链路:

  • emittryEmittryEmitLockedenqueueLocked
  • emitemitSuspendtryEmittryEmitLockedenqueueLocked
  • emitemitSuspendEmitterenqueueLocked

emit(value) 办法中的 value 值会被存储到 buffer 数组中 buffered values 或 queued emitters 开端完毕规模方位内,当存储到 buffered values 开端完毕规模方位内,replayCache 开端完毕规模方位内的值会更新,还会遭到缓存溢出战略 onBufferOverflow(BufferOverflow.SUSPEND,BufferOverflow.DROP_LATEST 或 BufferOverflow.DROP_OLDEST)影响。

搜集

依据上面发射的剖析,搜集便是从缓存区 buffer 中去取值,能够从 buffered values 区域直接取出 value 值,或能够从 queued emitters 取出 Emitter 目标拆解出 value 值。

SharedFlowImpl 类完成了 Flow接口的 collect 办法:

 override suspend fun collect(collector: FlowCollector<T>) {
        // 分配一个 SharedFlowSlot
        val slot = allocateSlot()
        try {
           // 假如订阅者是一个 SubscribedFlowCollector,则先告诉订阅者开端订阅
            if (collector is SubscribedFlowCollector) collector.onSubscription()
            // 当时订阅者地点协程
            val collectorJob = currentCoroutineContext()[Job]
            // 死循环
            while (true) {
                var newValue: Any?
                // 死循环
                while (true) {
                   // 经过分配的 slot 去从缓存区 buffer 获取值
                    newValue = tryTakeValue(slot) // attempt no-suspend fast path first
                    // 获取到值
                    if (newValue !== NO_VALUE) break
                    // 没有获取到值,订阅者地点协程会被挂起,等候 emit 发射新数据到缓存区
                    awaitValue(slot) // await signal that the new value is available
                }
                //承认订阅者地点协程是否还存活,假如不存活,会抛出 CancellationException 异常,直接到 finally
                collectorJob?.ensureActive()
                // 将新值给订阅者
                collector.emit(newValue as T)
            }
        } finally {
            // 订阅者不存活时,开释分配的 slot
            freeSlot(slot)
        }
    }

订阅者订阅时首要的步骤是:

  1. 分配一个 SharedFlowSlot:val slot = allocateSlot()
  2. 经过分配的 slot 去从缓存区 buffer 获取值:newValue = tryTakeValue(slot);假如获取值成功,就直接进入下一步,不然订阅者地点协程会被挂起,等候 emit 发射新数据到缓存区:awaitValue(slot)
  3. 承认订阅者地点协程是否还存活,假如不存活,会抛出 CancellationException 异常,直接到 finally:collectorJob?.ensureActive()
  4. 将新值给订阅者:collector.emit(newValue as T)
  5. 订阅者不存活时,开释分配的 slot:freeSlot(slot)

下面剖析一下allocateSlottryTakeValue(slot)awaitValuefreeSlot

allocateSlot

allocateSlot 办法界说在 SharedFlowImpl 承继的 AbstractSharedFlow 笼统类中:

    @Suppress("UNCHECKED_CAST")
    protected var slots: Array<S?>? = null // 用于办理给订阅者分配的 slot
        private set
    protected var nCollectors = 0 // 还存活的订阅者数量
        private set
    private var nextIndex = 0 // 分配下一个 slot 目标在 slots 数组中的索引
    private var _subscriptionCount: MutableStateFlow<Int>? = null // 用一个 StateFlow 来记载订阅者数量
    protected fun allocateSlot(): S {
        // Actually create slot under lock
        var subscriptionCount: MutableStateFlow<Int>? = null
        // 加锁
        val slot = synchronized(this) {
            // 获取一个 Array<SharedFlowSlot?> 目标
            val slots = when (val curSlots = slots) {
                // 新创立一个巨细为 2 的 Array<SharedFlowSlot?> 
                null -> createSlotArray(2).also { slots = it }
                // 扩容,容量扩大为本来 Array<SharedFlowSlot?>  的 2 倍
                else -> if (nCollectors >= curSlots.size) {
                    curSlots.copyOf(2 * curSlots.size).also { slots = it }
                } else {
                    // 直接运用当时的 Array<SharedFlowSlot?>
                    curSlots
                }
            }
            // 下面为从上面的 slots 数组中获取一个 slot 目标
            var index = nextIndex
            var slot: S
            while (true) {
                slot = slots[index] ?: createSlot().also { slots[index] = it }
                index++
                if (index >= slots.size) index = 0
                // 给 slot 的特色 index 赋值,index 的值指向的缓存区 buffer 中的 index
                if ((slot as AbstractSharedFlowSlot<Any>).allocateLocked(this)) break // break when found and allocated free slot
            }
            nextIndex = index
            // 订阅者加1
            nCollectors++
            subscriptionCount = _subscriptionCount // retrieve under lock if initialized
            slot
        }
        // 订阅数量加 1
        subscriptionCount?.increment(1)
        return slot
    }

从上面的代码逻辑,这个办法的首要效果是:为订阅者分配一个 SharedFlowSlot 目标,该目标能够用来相关从缓存区 buffer 获取值的索引,也便是能够用来确定订阅者将要收到的值,以及将订阅者地点协程挂起,等候有新值发送到缓冲区 buffer。

关于 SharedFlowSlot 类:

private class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() {
    @JvmField
    var index = -1L // 指向缓冲区 buffer 中的索引,值为 -1 表明当时 slot 现已被开释
    //......省掉
    override fun allocateLocked(flow: SharedFlowImpl<*>): Boolean {
        if (index >= 0) return false // not free
        index = flow.updateNewCollectorIndexLocked()
        return true
    }
    //......省掉
}
    internal fun updateNewCollectorIndexLocked(): Long {
        val index = replayIndex
        if (index < minCollectorIndex) minCollectorIndex = index
        return index
    }

为订阅者分配的 slot 目标的变量 index ,它从缓存区 buffer 获取值的初始值索引为 replayIndex(index=replayIndex),也便是新订阅者从 replayCache 中的开端方位开端获取值。

tryTakeValue

tryTakeValue 办法的效果是:经过 SharedFlowSlot 的 index 从缓存区 buffer 获取值,index 指向的可能是缓存区 buffer 中的 buffered values 或 queued emitters 开端完毕方位规模。当取值成功后,index 指向缓存区 buffer 的下一个方位slot.index = index + 1

    private fun tryTakeValue(slot: SharedFlowSlot): Any? {
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        // 加锁
        val value = synchronized(this) {
            // 经过 slot ,获取指向的缓存区 buffer 中的 index
            val index = tryPeekLocked(slot)
            if (index < 0) {
                // 没有值
                NO_VALUE
            } else {
                // 记载一下当时 slot 的 index
                val oldIndex = slot.index
                // 经过上面的 index,从缓存区 buffer 中取出对应的值
                val newValue = getPeekedValueLockedAt(index)
                // slot 的 index 指向缓存区 buffer 中的下一位 index+1
                slot.index = index + 1 // points to the next index after peeked one
                // 更新缓存数组的方位,并获取缓存数组与订阅者数组中可康复的续体
                resumes = updateCollectorIndexLocked(oldIndex)
                newValue
            }
        }
        for (resume in resumes) resume?.resume(Unit)
        return value
    }

判别 index 是契合缓存区 buffer 中的 buffered values 仍是 queued emitters 的开端完毕方位规模,首要经过 tryPeekLocked 办法:

// returns -1 if cannot peek value without suspension
    private fun tryPeekLocked(slot: SharedFlowSlot): Long {
        // slot.index 刚开端的值是 replayIndex,也便是指向 buffered values(参看上面的 updateNewCollectorIndexLocked 办法 )
        val index = slot.index
        // 假如 index 在 buffered values  的开端完毕方位规模内,直接回来
        if (index < bufferEndIndex) return index
        // 下面的逻辑都是用来判别是否能在 queued emitters 取值
        // index>=bufferEndIndex ,此刻假如 buffered values 的容量又大于0,找不到值
        if (bufferCapacity > 0) return -1L 
        // 此刻缓存数组只要 queued emitters,不能取开端方位后边的 Emitter,所以找不到值
        // 由于 head=minOf(minCollectorIndex, replayIndex)
        if (index > head) return -1L 
        // 缓存数组巨细为0 ,找不到值
        if (queueSize == 0) return -1L
        // 从 queued emitters 开端完毕方位规模内取值
        return index 
    }

awaitValue

当 tryTakeValue 办法回来 NO_VALUE 值,也便是 tryPeekLocked 办法回来 -1L 时,此刻在缓存区 buffer 找不到对应的 index,就会履行 awaitValue

    // 这个办法是一个挂起办法
    private suspend fun awaitValue(slot: SharedFlowSlot): Unit = suspendCancellableCoroutine { cont ->
        synchronized(this) lock@{
            // 再此测验获取指向的缓存区 buffer 中的 index
            val index = tryPeekLocked(slot) // recheck under this lock
            if (index < 0) {
                // 没有找到,给 slot 目标 cont 赋值,也便是让订阅者地点协程挂起
                slot.cont = cont // Ok -- suspending
            } else {
                // 找到,康复协程,不需求挂起
                cont.resume(Unit) // has value, no need to suspend
                return@lock
            }
            slot.cont = cont // suspend, waiting
        }
    }

该办法的首要效果是:将订阅者封装成 Continuation 接口完成类目标,挂起订阅者地点协程。

slot.cont 是一个 Continuation 接口完成类目标:

public interface Continuation<in T> {
    /**
     *  相关的协程
     */
    public val context: CoroutineContext
    /**
     *  康复相关的协程,传递一个 successful 或 failed 成果值过去
     *  
     */
    public fun resumeWith(result: Result<T>)
}

它的 context 相关的便是订阅者地点的协程。所以 slot.cont 的值存储的是相关订阅者的 Continuation 目标。

freeSlot

freeSlot 办法与 allocateSlot 相对应,当订阅者不再存活时,freeSlot 办法就会履行:

    protected fun freeSlot(slot: S) {
        // 运用 StateFlow 保存订阅数量
        var subscriptionCount: MutableStateFlow<Int>? = null
        // 加锁
        val resumes = synchronized(this) {
            // 订阅者数量减1
            nCollectors--
            subscriptionCount = _subscriptionCount
            //  假如没有订阅者,下一次在 slots 中分配 slot 目标,从索引0开端
            if (nCollectors == 0) nextIndex = 0
            // slot 目标的真实开释
            (slot as AbstractSharedFlowSlot<Any>).freeLocked(this)
        }
        /*
           Resume suspended coroutines.
           This can happens when the subscriber that was freed was a slow one and was holding up buffer.
           When this subscriber was freed, previously queued emitted can now wake up and are resumed here.
        */
        for (cont in resumes) cont?.resume(Unit)
        // 订阅数量减1
        subscriptionCount?.increment(-1)
    }

这个办法首要的效果:记载的订阅者数量减1,以及将 slot 目标中 index 和 cont 重置,也便是 index 不再指向缓存区 buffer 的开端完毕方位规模,cout 不再相关订阅者地点协程。

private class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() {
    @JvmField
    var index = -1L // 指向缓冲区 buffer 中的索引,值为 -1 表明当时 slot 现已被开释
    //......省掉
    @JvmField
    var cont: Continuation<Unit>? = null // 用来保存等候新数据发送的订阅者的续体,当订阅者等候新值时用到
    //......省掉
    override fun freeLocked(flow: SharedFlowImpl<*>): Array<Continuation<Unit>?> {
        assert { index >= 0 }
        val oldIndex = index
        index = -1L
        cont = null // cleanup continuation reference
        return flow.updateCollectorIndexLocked(oldIndex)
    }
}

在 freeLocked中调用的 flow.updateCollectorIndexLocked(oldIndex)办法用于更新缓存数组的方位

到这儿,SharedFlow 的创立,发送 和 搜集剖析完毕。关于它的特色:同享和独立存在有了一个大概的了解。

事务场景

知道了 SharedFlow 的创立,发送 和 搜集原理后,根据它同享和独立存在的特色,能够在事务中用来做事情总线,相似以前运用的 EventBus。下面是一个 SharedFlow 完成的 EventBus 简略比如:

界说事情总线:

object EventBus {
    private val events = ConcurrentHashMap<String, MutableSharedFlow<Event>>()
    private fun getOrPutEventFlow(eventName: String): MutableSharedFlow<Event> {
        return events[eventName] ?: MutableSharedFlow<Event>().also { events[eventName] = it }
    }
    fun getEventFlow(event: Class<Event>): SharedFlow<Event> {
        return getOrPutEventFlow(event.simpleName).asSharedFlow()
    }
    suspend fun produceEvent(event: Event) {
        val eventName = event::class.java.simpleName
        getOrPutEventFlow(eventName).emit(event)
    }
    fun postEvent(event: Event, delay: Long = 0, scope: CoroutineScope = MainScope()) {
        scope.launch {
            delay(delay)
            produceEvent(event)
        }
    }
}
@Keep
open class Event(val value: Int) {
}

事情发射和订阅:

        lifecycleScope.launch {
            EventBus.getEventFlow(Event::class.java).collect {
                Log.e("Flow", "EventBus Collect: value=${it.value}")
            }
        }
        EventBus.postEvent(Event(1), 0, lifecycleScope)
        EventBus.postEvent(Event(2), 0)
控制台输出成果:
Flow                    com.example.wangjaing                E  EventBus Collect: value=1
Flow                    com.example.wangjaing                E  EventBus Collect: value=2

运用 SharedFlow 来做事情总线,有以下优点:

  1. 事情能够延迟发送
  2. 能够界说粘性事情
  3. 事情能够感知 Activity 或 Fragment 的生命周期
  4. 事情是有序的

总结

在暖流 SharedFlow 中,当它创立以后它就存在了,它能够在出产者 emit 数据时,没有顾客 collect 数据而独立运转。当出产者 emit 数据后,这些数据会被缓存下来,新老顾客都能够收到这些数据,然后到达同享数据。

关于发射数据操作,会遭到 MutableSharedFlow 结构办法参数 replay, extraBufferCapacity,onBufferOverflow 值的影响,这些参数会决议发射操作是挂起仍是不挂起。发射的数据,将运用缓存数组进行办理,办理区域分为 buffered values 和 queued emitters。replay 和extraBufferCapacity 参数决议了buffered values 区域的巨细,当 buffered values 区域存满溢出时,会依据溢出战略 onBufferOverflow 进行区域调整。当 replay=0 和 extraBufferCapacity=0 ,或 replay!=0 和 extraBufferCapacity!=0 且 buffered values 区域存满, 发射的数据将被包装成 Emitter 存储到 queued emitters 区域。另外,订阅者数量决议了发射数据是存储到缓存区仍是丢弃。最终,缓存区存储的数据对一切订阅者同享。

关于搜集数据操作,运用 slots: Array<SharedFlowSlot?> 数组来办理订阅者,其间每一个 slot 目标对应一个订阅者,slot 目标的 slot.index 将订阅者要搜集的数据与缓存区相关起来,slot.cont 将订阅者地点协程与 SharedFlow上下文相关起来。假如经过 slot.index 能在缓存区取到值,就直接将值给订阅者。不然就将订阅者封装成 Continuation 接口完成类目标存储到 slot.cont 中,挂起订阅者地点协程,等候缓存区有值时,再康复订阅者协程并给它值。当订阅者协程不存活时,会开释订阅者相关的 slot 目标,也便是重置 slot.inext 和 slot.cont 的值,并从头调整缓存数组的方位。

StateFlow

StateFlow 也是根据 SharedFlow 完成的,所以能够把 StateFlow 了解为 SharedFlow 的一种特别存在。

public interface StateFlow<out T> : SharedFlow<T> {
    /**
     * The current value of this state flow.
     */
    public val value: T
}

StateFlow ,它也是暖流,它也能让一切搜集器同享它所宣布的值,只不过这个值为当时最新的值,它的实例也能够独立于搜集器的存在而存在。

下面将从 StateFlow 的创立,发送 和 搜集来剖析,原理与 SharedFlow 的创立,发送 和 搜集相似,所以这儿只做简略剖析。

创立

创立一个 StateFlow 运用的是 SharedFlowKt 供给的 MutableStateFlow 结构办法:

public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
private class StateFlowImpl<T>(
    initialState: Any // T | NULL
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
   private val _state = atomic(initialState)
   //......省掉
}

StateFlow 有必要要有一个初始值,这个值被缓存在一个原子目标里边:_state = atomic(initialState)。假如这个值没有更新,那么订阅者订阅时,将收到这个值。

发射

StateFlowImpl 类完成了发射数据 emittryEmit 办法:

private class StateFlowImpl<T>(
    initialState: Any // T | NULL
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
    // 缓存当时值原子目标
    private val _state = atomic(initialState) 
    private var sequence = 0 
    public override var value: T
        get() = NULL.unbox(_state.value)
        //发送时,更新当时值,并缓存在 _state 中
        set(value) { updateState(null, value ?: NULL) }
    override fun compareAndSet(expect: T, update: T): Boolean =
        updateState(expect ?: NULL, update ?: NULL)
    //更新原子目标 _state 的值
    private fun updateState(expectedState: Any?, newState: Any): Boolean {
        var curSequence = 0
        // 订阅者相关的 slots
        var curSlots: Array<StateFlowSlot?>? = this.slots 
        synchronized(this) {
            val oldState = _state.value
            if (expectedState != null && oldState != expectedState) return false // CAS 操作
            if (oldState == newState) return true // 假如当时值和新值持平,则不必更新,也不必通知订阅者
            // 将新值更新到缓存中
            _state.value = newState
            curSequence = sequence
            if (curSequence and 1 == 0) { 
                curSequence++ // 
                sequence = curSequence
            } else {
                sequence = curSequence + 2 // change sequence to notify, keep it odd
                return true 
            }
            curSlots = slots // read current reference to collectors under lock
        }
       //......省掉
    }
   //......省掉
   //非挂起发送
    override fun tryEmit(value: T): Boolean {
        this.value = value
        return true
    }
    //挂起发送
    override suspend fun emit(value: T) {
        this.value = value
    }
}

这两个办法都是去更新缓存目标 _state 存储的 value 值,假如当时值和新值持平,则不会更新,不然更新,并把新值给订阅者。

搜集

依据上面发射的剖析,搜集便是从缓存 _state 中去取值。

StateFlowImpl 类完成了 Flow接口的 collect 办法:

    override suspend fun collect(collector: FlowCollector<T>): Nothing {
        // 分配一个 StateFlowSlot
        val slot = allocateSlot()
        try {
            // 假如订阅者是 SubscribedFlowCollector 类型,则告诉订阅者订阅开端
            if (collector is SubscribedFlowCollector) collector.onSubscription()
            // 当时协程
            val collectorJob = currentCoroutineContext()[Job]
            // 记载一下上一个缓存值
            var oldState: Any? = null 
            // 死循环
            while (true) {
                // Here the coroutine could have waited for a while to be dispatched,
                // 获取当时缓存值
                val newState = _state.value
                // 承认订阅者地点协程是否还存活,假如不存活,会抛出 `CancellationException` 异常,直接到 finally
                collectorJob?.ensureActive()
                // 假如上一个缓存值为空或新值不等于上一个缓存值,则将新值给订阅者
                if (oldState == null || oldState != newState) {
                    collector.emit(NULL.unbox(newState))
                    //更新记载的缓存值
                    oldState = newState
                }
                // 判别订阅者是否需求挂起
                if (!slot.takePending()) { 
                    //订阅者地点协程会被挂起,等候 emit 发射新数据到缓存
                    slot.awaitPending()
                }
            }
        } finally {
           // 订阅者不存活时,开释分配的 slot
            freeSlot(slot)
        }
    }

订阅者订阅时首要的步骤是:

  1. 分配一个 StateFlowSlot:val slot = allocateSlot()
  2. 经过分配的 slot 去从缓存 _state 中取值:val newState = _state.value
  3. 承认订阅者地点协程是否还存活,假如不存活,会抛出 CancellationException 异常,直接到 finally:collectorJob?.ensureActive()
  4. 将新值给订阅者:collector.emit(NULL.unbox(newState))
  5. 订阅者不存活时,开释分配的 slot:freeSlot(slot)

事务场景

知道了 StateFlow 的创立,发送 和 搜集的大致原理后,以及它同享最新状况的特色。在事务中能够用来做状况更新(替换 LiveData)。

比如从服务端获取一个列表数据,并把列表数据展示到 UI。下面运用 MVI (Model-View-Intent)来做:

Data Layer:

class FlowRepository private constructor() {
    companion object {
        @JvmStatic
        fun newInstance(): FlowRepository = FlowRepository()
    }
    fun requestList(): Flow<List<ItemBean>> {
        val call = ServiceGenerator
            .createService(FlowListApi::class.java)
            .getList()
        return flow {
            emit(call.execute())
        }.flowOn(Dispatchers.IO).filter { it.isSuccessful }
            .map {
                it.body()?.data
            }
            .filterNotNull().catch {
                emit(emptyList())
            }.onEmpty {
                emit(emptyList())
            }
    }
}

ViewModel:

class ListViewModel : ViewModel() {
    private val repository: FlowRepository = FlowRepository.newInstance()
    private val _uiIntent: Channel<FlowViewIntent> = Channel()
    private val uiIntent: Flow<FlowViewIntent> = _uiIntent.receiveAsFlow()
    private val _uiState: MutableStateFlow<FlowViewState<List<ItemBean>>> =
        MutableStateFlow(FlowViewState.Init())
    val uiState: StateFlow<FlowViewState<List<ItemBean>>> = _uiState
    fun sendUiIntent(intent: FlowViewIntent) {
        viewModelScope.launch {
            _uiIntent.send(intent)
        }
    }
    init {
        viewModelScope.launch {
            uiIntent.collect {
                handleIntent(it)
            }
        }
    }
    private fun handleIntent(intent: FlowViewIntent) {
        viewModelScope.launch {
            repository.requestList().collect {
                if (it.isEmpty()) {
                    _uiState.emit(FlowViewState.Failure(0, "data is invalid"))
                } else {
                    _uiState.emit(FlowViewState.Success(it))
                }
            }
        }
    }
}
data class FlowViewIntent()
sealed class FlowViewState<T> {
    @Keep
    class Init<T> : FlowViewState<T>()
    @Keep
    class Success<T>(val result: T) : FlowViewState<T>()
    @Keep
    class Failure<T>(val code: Int, val msg: String) : FlowViewState<T>()
}

UI:

 private var isRequestingList = false
 private lateinit var listViewModel: ListViewModel
 private fun initData() {
        listViewModel = ViewModelProvider(this)[ListViewModel::class.java]
        lifecycleScope.launchWhenStarted {
            listViewModel.uiStateFlow.collect {
                when (it) {
                    is FlowViewState.Success -> {
                        showList(it.result)
                    }
                    is FlowViewState.Failure -> {
                        showListIfFail()
                    }
                    else -> {}
                }
            }
        }
        requestList()
    } 
  private fun requestList() {
        if (!isRequestingList) {
            isRequestingList = true
            listViewModel.sendUiIntent( FlowViewIntent() )
        }
    }

运用 StateFlow 替换 LiveData ,并用结合 MVI 替换 MVVM 后,能够有以下优点:

  1. 唯一可信数据源:MVVM 中 可能会存在很多 LiveData,这导致数据交互或并行更新出现逻辑不可控,增加 UIState 结合 StateFlow ,数据源只要 UIState;
  2. 数据单向活动:MVVM 中存在数据 UI ⇆ ViewModel 相互活动,而 MVI 中数据只能从 Data Layer → ViewModel → UI 活动,数据是单向活动的。

运用 StateFlow 替换 LiveData 来做事情状况更新,有以下差异:

  • StateFlow 需求将初始状况传递给结构函数,而 LiveData 不需求。
  • 当 View 进入 STOPPED 状况时,LiveData.observe() 会主动撤销注册运用方,而从 StateFlow 或任何其他数据流搜集数据的操作并不会主动中止。如需完成相同的行为,需求从 Lifecycle.repeatOnLifecycle 块搜集数据流。

总结

暖流 StateFlow,根据 SharedFlow 完成,所以它也有独立存在和同享的特色。但在 StateFlow 中发射数据,只要最新的值被缓存下来,所以当新老订阅者订阅时,只会收到它最终一次更新的值,假如发射的新值和当时值持平,订阅者也不会收到通知。