在协程中,经过调用操作符shareIn与stateIn,能够将一个冷流转化成一个暖流,这两个办法的区别如下:
- shareIn:将一个冷流转化成一个标准的暖流——SharedFlow类型的目标。
- stateIn:将一个冷流转化成一个单数据更新的暖流——StateFlow类型的目标。
shareIn办法与stateIn办法的运用与完结的原理类似,下面以shareIn办法为例进行剖析。
一.shareIn办法的运用
shareIn办法用于将一个冷流转化成一个暖流目标,并在指定的协程效果域中依据不同的发动停止战略发动暖流,将上游发射的数据发射给下流的多个订阅者,代码如下:
public fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T> {
...
}
shareIn办法共有三个参数,含义如下:
- scope:表明暖流发动的协程效果域。
-
started:暖流的发动停止战略,共三种:
- Eagerly:立刻发动,而且不会停止。
- Lazily:当第一个订阅者出现时发动,而且不会停止。
- WhileSubscribed:默许情况下,当第一个订阅者出现时发动,当最终一个订阅者消失时停止,并保存replayCache中的数据。
- replay:SharedFlow中replayCache的最大容量,有必要大于等于零。
1.典型场景的运用
shareIn办法用于当创建或获取一个冷流成本较高,一起还有多个订阅者需求获取冷流发射的数据的场景。比方:经过网络衔接获取数据,IO读取文件等,会耗费大量的时刻和设备资源,这时就能够运用shareIn办法,代码如下:
private suspend fun test() {
val flow = flow {
// 网络衔接
connectToNet()
try {
while (true) {
// 获取数据并发射
emit(getDataFromNet())
}
} finally {
// 断开衔接
disconnectFromNet()
}
}
// 不运用shareIn办法
// 10次网络衔接
for (i in 0..10)
launch {
flow.collect {
Log.d("liduo", "test: $it")
}
}
// 运用shareIn办法
val sharedFlow = flow.shareIn(GlobalScope, SharingStarted.Eagerly, 1)
// 1次网络衔接
for (i in 0..10)
launch {
sharedFlow.collect {
Log.d("liduo", "test1: $it")
}
}
}
2.调配操作符的运用
1)感知上游流的完毕
当上游流正常履行完毕完毕时,订阅者无法感知,因为暖流不会完毕。假如需求告诉订阅者上游的流履行完结,能够在shareIn操作符前运用onCompletion操作符,代码如下:
val sharedFlow = flow {
// 向下流发射100
emit(100)
// 向下流发射200
emit(200)
}.onCompletion {
// 假如不是因为反常而完毕
if (it == null)
// 发射0
emit(0)
else // 假如是因为反常完毕,则发射-1
emit(-1)
}.shareIn(GlobalScope, SharingStarted.Eagerly, 1)
2)感知上游流的反常
当上游的流产生反常导致暖流取消时,会直接被暖流地点的协程效果域处理,因此订阅者是没有感知的。假如需求告诉订阅者,能够在shareIn操作符前运用catch或retry操作符,代码如下:
val sharedFlow = flow {
// 向下流发射100
emit(100)
// 向下流发射200
emit(200)
}.catch {
// 产生反常,发射-1告诉下流
emit(-1)
}.shareIn(GlobalScope, SharingStarted.Eagerly, 1)
3)感知上游流的发动
当上游流发动时,订阅者是无法感知的。假如需求告诉订阅者,能够在shareIn操作符前运用onStart操作符,代码如下:
val sharedFlow = flow {
// 向下流发射100
emit(100)
// 向下流发射200
emit(200)
}.onStart {
// 向下流发射1
emit(1)
}.shareIn(GlobalScope, SharingStarted.Eagerly, 1)
4)显示指定缓存最大容量
shareIn操作符发动的暖流在独立的协程中运转,而且暖流中缓存数组的buffered values的最大容量为replay或CHANNEL_DEFAULT_CAPACITY中较大的。假如需求显示的指定buffered values的最大容量,能够在shareIn操作符前运用buffer或conflate操作符,运用规矩如下:
- buffer(0).shareIn(scope, started, 0):replay = 0,extraBufferCapacity = 0,没有缓存。
- buffer(b).shareIn(scope, started, r):replay = r,extraBufferCapacity = b。
- conflate().shareIn(scope, started, r):replay = r,onBufferOverflow = DROP_OLDEST,假如r等于0,则extraBufferCapacity = 1。
二.暖流的发动停止战略
在前面的介绍中,说到在shareIn办法中,暖流的发动停止战略界说在接口SharingStarted中,代码如下:
public interface SharingStarted {
public companion object {
// 立刻发动,而且不会停止
public val Eagerly: SharingStarted = StartedEagerly()
// 当第一个订阅者出现时发动,而且不会停止
public val Lazily: SharingStarted = StartedLazily()
// 默许情况下,当第一个订阅者出现时发动,
// 当最终一个订阅者消失时停止,并保存replayCache中的数据
// 参数stopTimeoutMillis表明当最终一个订阅者消失后多长时刻停止,默许为0——立刻停止
// 参数replayExpirationMillis表明在停止后多长时刻去铲除replayCache,默许为Int最大值——不铲除
@Suppress("FunctionName")
public fun WhileSubscribed(
stopTimeoutMillis: Long = 0,
replayExpirationMillis: Long = Long.MAX_VALUE
): SharingStarted =
StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
}
// 中心接口办法
public fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand>
}
暖流的三个发动停止战略分别对应StartedEagerly、StartedLazily、StartedWhileSubscribed这三个类的目标。除了三个发动停止战略外,接口中还界说了一个中心办法command,用于将SharedFlow类型目标的全局变量subscriptionCount,转化为泛型SharingCommand的Flow类型目标,实际上就是经过监听订阅者数量的改变来宣布不同的控制指令。
StartedEagerly、StartedLazily、StartedWhileSubscribed这三个类都完结了SharingStarted接口,偏重写了command办法。假如我们需求自界说一个新的发动停止战略,也能够经过完结SharingStarted接口重写command办法来完结。
1.暖流的控制指令
SharingCommand类是一个枚举类,界说了控制暖流的指令,代码如下:
public enum class SharingCommand {
// 发动暖流,并触发上游流的履行
START,
// 停止暖流,并取消上游流的履行
STOP,
// 停止暖流,并取消上游流的履行,一起将replayCache重置为初始状况
// 假如暖流的类型为StateFlow,则将replayCache重置为初始值
// 假如暖流的类型为SharedFlow,则调用resetReplayCache办法,清空replayCache
STOP_AND_RESET_REPLAY_CACHE
}
连续发射相同的指令不会有任何效果。先发射STOP指令,再发射START指令,能够触发暖流的重启,偏从头触发上游流的履行。
2.Eagerly战略的完结
Eagerly战略表明立刻发动暖流,而且不会停止,由StartedEagerly类完结,代码如下:
private class StartedEagerly : SharingStarted {
override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
flowOf(SharingCommand.START)
override fun toString(): String = "SharingStarted.Eagerly"
}
...
public fun <T> flowOf(value: T): Flow<T> = flow {
emit(value)
}
Eagerly战略不关心订阅者的数量,在触发后直接向下流发射START指令。
3.Lazily战略的完结
Lazily战略表明当第一个订阅者出现时发动暖流,而且不会停止,由StartedLazily类完结,代码如下:
private class StartedLazily : SharingStarted {
override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = flow {
// 标志位,默许为false
var started = false
// 监听订阅者数量的改变
subscriptionCount.collect { count ->
// 假如订阅者数量大于0,且之前没有发射过指令
if (count > 0 && !started) {
// 设置标志位为true
started = true
// 发射START指令
emit(SharingCommand.START)
}
}
}
override fun toString(): String = "SharingStarted.Lazily"
}
Eagerly战略只要当订阅者数量大于0的时分,才会向下流发射START指令,而且只会发射一次。
4.WhileSubscribed战略的完结
WhileSubscribed战略默许情况下表明当第一个订阅者出现时发动暖流,并在最终一个订阅者消失时停止,保存replayCache中的数据,由StartedWhileSubscribed类完结,代码如下:
private class StartedWhileSubscribed(
private val stopTimeout: Long,
private val replayExpiration: Long
) : SharingStarted {
...
override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
// 监听订阅者改变,并对上游发射的数据进行转化
subscriptionCount.transformLatest { count ->
// 假如订阅者数量大于0
if (count > 0) {
// 发射START指令
emit(SharingCommand.START)
} else { // 假如订阅者数量等于0
// 推迟指定的暖流停止时刻
delay(stopTimeout)
// 假如指定的铲除缓存时刻大于0
if (replayExpiration > 0) {
// 发射STOP指令
emit(SharingCommand.STOP)
// 推迟指定的铲除缓存时刻
delay(replayExpiration)
}
// 发射STOP_AND_RESET_REPLAY_CACHE指令
emit(SharingCommand.STOP_AND_RESET_REPLAY_CACHE)
}
} // 只要当START指令发射后,才会向下流发射
.dropWhile { it != SharingCommand.START }
.distinctUntilChanged()// 只要当时后指令不一起,才会向下流发射
...
}
WhileSubscribed战略在订阅者数量大于0的时分向下流发射START指令,在订阅者数量等于0的时分依据不同的推迟时刻参数向下流发射STOP指令和STOP_AND_RESET_REPLAY_CACHE指令。而且有必要先发射START指令,相邻重复的指令也不会被发射到下流。
三.shareIn办法的完结
代码如下:
public fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T> {
// 核算暖流的参数
val config = configureSharing(replay)
// 创建一个类型为MutableSharedFlow的目标
val shared = MutableSharedFlow<T>(
replay = replay,
extraBufferCapacity = config.extraBufferCapacity,
onBufferOverflow = config.onBufferOverflow
)
// 在指定的协程效果域内发动暖流地点的协程
@Suppress("UNCHECKED_CAST")
scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
// 回来SharedFlow类型的目标,控制单一数据发射源
return shared.asSharedFlow()
}
在shareIn办法中,首要调用configureSharing办法,得到暖流的基本参数,这些参数会被封装成一个类型为SharingConfig的目标,代码如下:
private class SharingConfig<T>(
// 上游的流
@JvmField val upstream: Flow<T>,
// extraBufferCapacity参数
@JvmField val extraBufferCapacity: Int,
// 溢出战略
@JvmField val onBufferOverflow: BufferOverflow,
// 协程上下文
@JvmField val context: CoroutineContext
)
接下来,会依据核算出的参数,创建一个MutableSharedFlow类型的目标,并调用launchSharing办法发动暖流地点的协程,最终回来一个SharedFlow类型的目标。
1.暖流参数的核算
configureSharing办法是Flow的扩展办法,依据不同的战略核算出暖流的参数,代码如下:
// Flow的扩展办法
private fun <T> Flow<T>.configureSharing(replay: Int): SharingConfig<T> {
assert { replay >= 0 }
// 核算的extraBufferCapacity默许值
// 成果为0与(Channel.CHANNEL_DEFAULT_CAPACITY - replay)中较大的一个
val defaultExtraCapacity = replay.coerceAtLeast(Channel.CHANNEL_DEFAULT_CAPACITY) - replay
// 假如上游流是通道流,则与上游的通道流融合
if (this is ChannelFlow) {
// 获取上游流的上游,查看上游流是否能够在不依赖Channel的情况下履行
val upstream = dropChannelOperators()
// 假如不为空,阐明能够不依赖Channel
if (upstream != null) {
// 回来一个SharingConfig类型的目标
return SharingConfig(
// 上游流的上游
upstream = upstream,
// 核算extraBufferCapacity,依据通道流的容量进行判断
extraBufferCapacity = when (capacity) {
// 假如容量为默许值或0
Channel.OPTIONAL_CHANNEL, Channel.BUFFERED, 0 ->
when {
// 假如溢出战略为挂起
onBufferOverflow == BufferOverflow.SUSPEND ->
// 假如容量为0,则回来0,不然回来extraBufferCapacity的默许值
if (capacity == 0) 0 else defaultExtraCapacity
// 假如replayCache的最大容量为0,一起溢出战略又不是挂起,
// 阐明至少需求一个缓存,回来1
replay == 0 -> 1
// 走到这儿阐明replayCache存在,且不需求挂起,回来0
else -> 0
}
// 容量为其他情况,回来通道流的容量
else -> capacity
},
// 通道流的溢出战略
onBufferOverflow = onBufferOverflow,
// 通道流的上下文
context = context
)
}
}
// 假如上游不为通道流,会走到这儿
return SharingConfig(
// 上游的流
upstream = this,
// 默许的extraBufferCapacity
extraBufferCapacity = defaultExtraCapacity,
// 默许溢出战略为挂起
onBufferOverflow = BufferOverflow.SUSPEND,
// 空上下文
context = EmptyCoroutineContext
)
}
2.暖流协程的发动
launchSharing办法是CoroutineScope的扩展办法,用于发动暖流地点的协程,代码如下:
private fun <T> CoroutineScope.launchSharing(
context: CoroutineContext,
upstream: Flow<T>,
shared: MutableSharedFlow<T>,
started: SharingStarted,
initialValue: T
) {
// 依据指定的上下文发动一个新的协程
launch(context) {
// 依据暖流发动停止战略进行判断
when {
// Eagerly战略
started === SharingStarted.Eagerly -> {
// 触发上游的履行,并将暖流作为一个FlowCollector类型的目标
upstream.collect(shared)
}
// Lazily战略
started === SharingStarted.Lazily -> {
// 监听订阅者数量
// first用于回来上游发射的第一个满意条件的数据,即订阅者数量大于0
// 因为subscriptionCount为暖流,因此在没有新数据时,会挂起当时协程
shared.subscriptionCount.first { it > 0 }
// 走到这儿,阐明订阅者数量大于0
// 触发上游的履行,并将暖流作为一个FlowCollector类型的目标
upstream.collect(shared)
}
// WhileSubscribed战略或者自界说战略
else -> {
// 调用command办法获取指令
started.command(shared.subscriptionCount)
// 只要当时后指令产生改变时才会发射给下流
.distinctUntilChanged()
// 触发上游流的履行
.collectLatest {
// 依据暖流控制指令进行判断
when (it) {
// 假如为发动指令,则触发上游的履行,并将暖流作为一个FlowCollector类型的目标
SharingCommand.START -> upstream.collect(shared)
// 假如为停止指令,什么都不做
SharingCommand.STOP -> { }
// 假如为停止并清空replayCache指令
SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> {
// 假如暖流的类型为SharedFlow,即当时办法在shareIn办法中调用
if (initialValue === NO_VALUE) {
// 调用resetReplayCache办法,铲除replayCache
shared.resetReplayCache()
} else { // 假如暖流的类型为StateFlow,即当时办法在stateIn办法中调用
// 设置状况为初始值
shared.tryEmit(initialValue)
}
}
}
}
}
}
}
}