协程中,经过调用操作符shareIn与stateIn,能够将一个冷流转化成一个暖流,这两个办法的区别如下:

  • shareIn:将一个冷流转化成一个标准的暖流——SharedFlow类型的目标。
  • stateIn:将一个冷流转化成一个单数据更新的暖流——StateFlow类型的目标。

shareIn办法与stateIn办法的运用与完结的原理类似,下面以shareIn办法为例进行剖析。

一.shareIn办法的运用

shareIn办法用于将一个冷流转化成一个暖流目标,并在指定的协程效果域中依据不同的发动停止战略发动暖流,将上游发射的数据发射给下流的多个订阅者,代码如下:

public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T> {
    ...
}

shareIn办法共有三个参数,含义如下:

  • scope:表明暖流发动的协程效果域。
  • started:暖流的发动停止战略,共三种:

    • Eagerly:立刻发动,而且不会停止。
    • Lazily:当第一个订阅者出现时发动,而且不会停止。
    • WhileSubscribed:默许情况下,当第一个订阅者出现时发动,当最终一个订阅者消失时停止,并保存replayCache中的数据。
  • replay:SharedFlow中replayCache的最大容量,有必要大于等于零。

1.典型场景的运用

shareIn办法用于当创建或获取一个冷流成本较高,一起还有多个订阅者需求获取冷流发射的数据的场景。比方:经过网络衔接获取数据,IO读取文件等,会耗费大量的时刻和设备资源,这时就能够运用shareIn办法,代码如下:

private suspend fun test() {
    val flow = flow {
        // 网络衔接
        connectToNet()
        try {
            while (true) {
                // 获取数据并发射
                emit(getDataFromNet())
            }
        } finally {
            // 断开衔接
            disconnectFromNet()
        }
    }
    // 不运用shareIn办法
    // 10次网络衔接
    for (i in 0..10)
        launch {
            flow.collect {
                Log.d("liduo", "test: $it")
            }
        }
    // 运用shareIn办法
    val sharedFlow = flow.shareIn(GlobalScope, SharingStarted.Eagerly, 1)
    // 1次网络衔接
    for (i in 0..10)
        launch {
            sharedFlow.collect {
                Log.d("liduo", "test1: $it")
            }
        }
}

2.调配操作符的运用

1)感知上游流的完毕

当上游流正常履行完毕完毕时,订阅者无法感知,因为暖流不会完毕。假如需求告诉订阅者上游的流履行完结,能够在shareIn操作符前运用onCompletion操作符,代码如下:

val sharedFlow = flow {
    // 向下流发射100
    emit(100)
    // 向下流发射200
    emit(200)
}.onCompletion {
    // 假如不是因为反常而完毕
    if (it == null)
        // 发射0
        emit(0)
    else // 假如是因为反常完毕,则发射-1
        emit(-1)
}.shareIn(GlobalScope, SharingStarted.Eagerly, 1)

2)感知上游流的反常

当上游的流产生反常导致暖流取消时,会直接被暖流地点的协程效果域处理,因此订阅者是没有感知的。假如需求告诉订阅者,能够在shareIn操作符前运用catch或retry操作符,代码如下:

val sharedFlow = flow {
    // 向下流发射100
    emit(100)
    // 向下流发射200
    emit(200)
}.catch {
    // 产生反常,发射-1告诉下流
    emit(-1)
}.shareIn(GlobalScope, SharingStarted.Eagerly, 1)

3)感知上游流的发动

当上游流发动时,订阅者是无法感知的。假如需求告诉订阅者,能够在shareIn操作符前运用onStart操作符,代码如下:

val sharedFlow = flow {
    // 向下流发射100
    emit(100)
    // 向下流发射200
    emit(200)
}.onStart {
    // 向下流发射1
    emit(1)
}.shareIn(GlobalScope, SharingStarted.Eagerly, 1)

4)显示指定缓存最大容量

shareIn操作符发动的暖流在独立的协程中运转,而且暖流中缓存数组的buffered values的最大容量为replay或CHANNEL_DEFAULT_CAPACITY中较大的。假如需求显示的指定buffered values的最大容量,能够在shareIn操作符前运用buffer或conflate操作符,运用规矩如下:

  • buffer(0).shareIn(scope, started, 0):replay = 0,extraBufferCapacity = 0,没有缓存。
  • buffer(b).shareIn(scope, started, r):replay = r,extraBufferCapacity = b。
  • conflate().shareIn(scope, started, r):replay = r,onBufferOverflow = DROP_OLDEST,假如r等于0,则extraBufferCapacity = 1。

二.暖流的发动停止战略

在前面的介绍中,说到在shareIn办法中,暖流的发动停止战略界说在接口SharingStarted中,代码如下:

public interface SharingStarted {
    public companion object {
        // 立刻发动,而且不会停止
        public val Eagerly: SharingStarted = StartedEagerly()
        // 当第一个订阅者出现时发动,而且不会停止
        public val Lazily: SharingStarted = StartedLazily()
        // 默许情况下,当第一个订阅者出现时发动,
        // 当最终一个订阅者消失时停止,并保存replayCache中的数据
        // 参数stopTimeoutMillis表明当最终一个订阅者消失后多长时刻停止,默许为0——立刻停止
        // 参数replayExpirationMillis表明在停止后多长时刻去铲除replayCache,默许为Int最大值——不铲除
        @Suppress("FunctionName")
        public fun WhileSubscribed(
            stopTimeoutMillis: Long = 0,
            replayExpirationMillis: Long = Long.MAX_VALUE
        ): SharingStarted =
            StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
    }
    // 中心接口办法
    public fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand>
}

暖流的三个发动停止战略分别对应StartedEagerly、StartedLazily、StartedWhileSubscribed这三个类的目标。除了三个发动停止战略外,接口中还界说了一个中心办法command,用于将SharedFlow类型目标的全局变量subscriptionCount,转化为泛型SharingCommand的Flow类型目标,实际上就是经过监听订阅者数量的改变来宣布不同的控制指令。

StartedEagerly、StartedLazily、StartedWhileSubscribed这三个类都完结了SharingStarted接口,偏重写了command办法。假如我们需求自界说一个新的发动停止战略,也能够经过完结SharingStarted接口重写command办法来完结。

1.暖流的控制指令

SharingCommand类是一个枚举类,界说了控制暖流的指令,代码如下:

public enum class SharingCommand {
    // 发动暖流,并触发上游流的履行
    START,
    // 停止暖流,并取消上游流的履行
    STOP,
    // 停止暖流,并取消上游流的履行,一起将replayCache重置为初始状况
    // 假如暖流的类型为StateFlow,则将replayCache重置为初始值
    // 假如暖流的类型为SharedFlow,则调用resetReplayCache办法,清空replayCache
    STOP_AND_RESET_REPLAY_CACHE
}

连续发射相同的指令不会有任何效果。先发射STOP指令,再发射START指令,能够触发暖流的重启,偏从头触发上游流的履行。

2.Eagerly战略的完结

Eagerly战略表明立刻发动暖流,而且不会停止,由StartedEagerly类完结,代码如下:

private class StartedEagerly : SharingStarted {
    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
        flowOf(SharingCommand.START)
    override fun toString(): String = "SharingStarted.Eagerly"
}
...
public fun <T> flowOf(value: T): Flow<T> = flow {
    emit(value)
}

Eagerly战略不关心订阅者的数量,在触发后直接向下流发射START指令。

3.Lazily战略的完结

Lazily战略表明当第一个订阅者出现时发动暖流,而且不会停止,由StartedLazily类完结,代码如下:

private class StartedLazily : SharingStarted {
    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = flow {
        // 标志位,默许为false
        var started = false
        // 监听订阅者数量的改变
        subscriptionCount.collect { count ->
            // 假如订阅者数量大于0,且之前没有发射过指令
            if (count > 0 && !started) {
                // 设置标志位为true
                started = true
                // 发射START指令
                emit(SharingCommand.START)
            }
        }
    }
    override fun toString(): String = "SharingStarted.Lazily"
}

Eagerly战略只要当订阅者数量大于0的时分,才会向下流发射START指令,而且只会发射一次。

4.WhileSubscribed战略的完结

WhileSubscribed战略默许情况下表明当第一个订阅者出现时发动暖流,并在最终一个订阅者消失时停止,保存replayCache中的数据,由StartedWhileSubscribed类完结,代码如下:

private class StartedWhileSubscribed(
    private val stopTimeout: Long,
    private val replayExpiration: Long
) : SharingStarted {
    ...
    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = 
        // 监听订阅者改变,并对上游发射的数据进行转化
        subscriptionCount.transformLatest { count ->
            // 假如订阅者数量大于0
            if (count > 0) {
                // 发射START指令
                emit(SharingCommand.START)
            } else { // 假如订阅者数量等于0
                // 推迟指定的暖流停止时刻
                delay(stopTimeout)
                // 假如指定的铲除缓存时刻大于0
                if (replayExpiration > 0) {
                    // 发射STOP指令
                    emit(SharingCommand.STOP)
                    // 推迟指定的铲除缓存时刻
                    delay(replayExpiration)
                }
                // 发射STOP_AND_RESET_REPLAY_CACHE指令
                emit(SharingCommand.STOP_AND_RESET_REPLAY_CACHE)
            }
        } // 只要当START指令发射后,才会向下流发射
        .dropWhile { it != SharingCommand.START }
        .distinctUntilChanged()// 只要当时后指令不一起,才会向下流发射
    ...
}

WhileSubscribed战略在订阅者数量大于0的时分向下流发射START指令,在订阅者数量等于0的时分依据不同的推迟时刻参数向下流发射STOP指令和STOP_AND_RESET_REPLAY_CACHE指令。而且有必要先发射START指令,相邻重复的指令也不会被发射到下流。

三.shareIn办法的完结

代码如下:

public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T> {
    // 核算暖流的参数
    val config = configureSharing(replay)
    // 创建一个类型为MutableSharedFlow的目标
    val shared = MutableSharedFlow<T>(
        replay = replay,
        extraBufferCapacity = config.extraBufferCapacity,
        onBufferOverflow = config.onBufferOverflow
    )
    // 在指定的协程效果域内发动暖流地点的协程
    @Suppress("UNCHECKED_CAST")
    scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
    // 回来SharedFlow类型的目标,控制单一数据发射源
    return shared.asSharedFlow()
}

在shareIn办法中,首要调用configureSharing办法,得到暖流的基本参数,这些参数会被封装成一个类型为SharingConfig的目标,代码如下:

private class SharingConfig<T>(
    // 上游的流
    @JvmField val upstream: Flow<T>,
    // extraBufferCapacity参数
    @JvmField val extraBufferCapacity: Int,
    // 溢出战略
    @JvmField val onBufferOverflow: BufferOverflow,
    // 协程上下文
    @JvmField val context: CoroutineContext
)

接下来,会依据核算出的参数,创建一个MutableSharedFlow类型的目标,并调用launchSharing办法发动暖流地点的协程,最终回来一个SharedFlow类型的目标。

1.暖流参数的核算

configureSharing办法是Flow的扩展办法,依据不同的战略核算出暖流的参数,代码如下:

// Flow的扩展办法
private fun <T> Flow<T>.configureSharing(replay: Int): SharingConfig<T> {
    assert { replay >= 0 }
    // 核算的extraBufferCapacity默许值
    // 成果为0与(Channel.CHANNEL_DEFAULT_CAPACITY - replay)中较大的一个
    val defaultExtraCapacity = replay.coerceAtLeast(Channel.CHANNEL_DEFAULT_CAPACITY) - replay
    // 假如上游流是通道流,则与上游的通道流融合
    if (this is ChannelFlow) {
        // 获取上游流的上游,查看上游流是否能够在不依赖Channel的情况下履行
        val upstream = dropChannelOperators()
        // 假如不为空,阐明能够不依赖Channel
        if (upstream != null) {
            // 回来一个SharingConfig类型的目标
            return SharingConfig(
                // 上游流的上游
                upstream = upstream,
                // 核算extraBufferCapacity,依据通道流的容量进行判断
                extraBufferCapacity = when (capacity) {
                    // 假如容量为默许值或0
                    Channel.OPTIONAL_CHANNEL, Channel.BUFFERED, 0 ->
                        when {
                            // 假如溢出战略为挂起
                            onBufferOverflow == BufferOverflow.SUSPEND ->
                                // 假如容量为0,则回来0,不然回来extraBufferCapacity的默许值
                                if (capacity == 0) 0 else defaultExtraCapacity
                            // 假如replayCache的最大容量为0,一起溢出战略又不是挂起,
                            // 阐明至少需求一个缓存,回来1
                            replay == 0 -> 1
                            // 走到这儿阐明replayCache存在,且不需求挂起,回来0
                            else -> 0
                        }
                    // 容量为其他情况,回来通道流的容量
                    else -> capacity
                },
                // 通道流的溢出战略
                onBufferOverflow = onBufferOverflow,
                // 通道流的上下文
                context = context
            )
        }
    }
    // 假如上游不为通道流,会走到这儿
    return SharingConfig(
        // 上游的流
        upstream = this,
        // 默许的extraBufferCapacity
        extraBufferCapacity = defaultExtraCapacity,
        // 默许溢出战略为挂起
        onBufferOverflow = BufferOverflow.SUSPEND,
        // 空上下文
        context = EmptyCoroutineContext
    )
}

2.暖流协程的发动

launchSharing办法是CoroutineScope的扩展办法,用于发动暖流地点的协程,代码如下:

private fun <T> CoroutineScope.launchSharing(
    context: CoroutineContext,
    upstream: Flow<T>,
    shared: MutableSharedFlow<T>,
    started: SharingStarted,
    initialValue: T
) {
    // 依据指定的上下文发动一个新的协程
    launch(context) {
        // 依据暖流发动停止战略进行判断
        when {
            // Eagerly战略
            started === SharingStarted.Eagerly -> {
                // 触发上游的履行,并将暖流作为一个FlowCollector类型的目标
                upstream.collect(shared)
            }
            // Lazily战略
            started === SharingStarted.Lazily -> {
                // 监听订阅者数量
                // first用于回来上游发射的第一个满意条件的数据,即订阅者数量大于0
                // 因为subscriptionCount为暖流,因此在没有新数据时,会挂起当时协程
                shared.subscriptionCount.first { it > 0 }
                // 走到这儿,阐明订阅者数量大于0
                // 触发上游的履行,并将暖流作为一个FlowCollector类型的目标
                upstream.collect(shared)
            }
            // WhileSubscribed战略或者自界说战略
            else -> {
                // 调用command办法获取指令
                started.command(shared.subscriptionCount)
                    // 只要当时后指令产生改变时才会发射给下流 
                    .distinctUntilChanged()
                    // 触发上游流的履行
                    .collectLatest {
                        // 依据暖流控制指令进行判断
                        when (it) {
                            // 假如为发动指令,则触发上游的履行,并将暖流作为一个FlowCollector类型的目标
                            SharingCommand.START -> upstream.collect(shared)
                            // 假如为停止指令,什么都不做
                            SharingCommand.STOP -> { }
                            // 假如为停止并清空replayCache指令
                            SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> {
                                // 假如暖流的类型为SharedFlow,即当时办法在shareIn办法中调用
                                if (initialValue === NO_VALUE) {
                                    // 调用resetReplayCache办法,铲除replayCache
                                    shared.resetReplayCache()
                                } else { // 假如暖流的类型为StateFlow,即当时办法在stateIn办法中调用
                                    // 设置状况为初始值
                                    shared.tryEmit(initialValue)
                                }
                            }
                        }
                    }
            }
        }
    }
}