引子
面试官:“请完成下面这个并发战略”。
这是一个获取最高价格的恳求战略。恳求价格的接口被分成了并发组(蓝色)和串行组(绿色)。并发即一切恳求一同宣布,串行组即仅当上一恳求回来才恳求下一个。并发组和串行组之间又是并发的关系,当并发组中一切恳求回来时会切断串行组,即停止串行组中后续恳求,然后选取一切回来价格中的最大值。串行组中每个恳求都有底价,当回来价格高于底价时则切断后续恳求,不然持续串行恳求直到比价成功或被并发组切断。
这是一道串并行网络恳求嵌套且有切断操作的杂乱问题。但假如运用 Kotlin 的 Flow,问题就变得反常简略。
关于 Kotlin Flow 根底概念、原理、运用的具体介绍能够点击下面的文章:
- Kotlin 进阶 | 异步数据流 Flow 的运用场景
- Kotlin 异步 | Flow 限流的运用场景及原理
- Kotlin 协程 | CoroutineContext 为什么要规划成 indexed set?(一)
模型的力气
Kotlin Flow 能极大地简化完成代码,是由于“模型的力气”。
好的模型总是以独特视角切入问题,以更准确有效的表达描述问题,终究降低了问题的杂乱性使其易于处理。
比方:3D 图画的旋转、平移、缩放是一个杂乱操作。但当把 3D 图画的点调集笼统为矩阵,就能通过矩阵的运算来表达这些杂乱操作。
Kotlin Flow 便是对多个连续异步进程的笼统模型,它将其理解为异步数据流,即一条时间轴上按序产生数据,其出产者和顾客之间一条管道,出产者从管道的一头刺进数据,顾客从另一头取数据。
同样地,为了简化面试题并发战略的完成逻辑,将网络恳求笼统为“数据类+挂起办法”:
// 数据类
data class Request(
val name: String, // 恳求称号
val delay: Long, // 响应时延
val price: Int, // 价格
val bottomPrice: Int = 0 // 底价
)
// 获取价格
private suspend fun load(request: Request): Request {
delay(request.delay)
return request
}
每个 Request 描述一个恳求,一次 load() 调用表明一次网络恳求。
这样一来,表达串/并行组就很省事了:
// 并行组
private val parallelList = listOf(
Request("parallel1", 1600, 10),
Request("parallel2", 2900, 20),
Request("parallel3", 2000, 11),
Request("parallel4", 3000, 30),
Request("parallel5", 5000, 50),
Request("parallel6", 10000, 60),
)
// 串行流(包含底价)
private val sequenceList = listOf(
Request("sequence1", 2100, 30, 30),
Request("sequence2", 1100, 19, 20),
Request("sequence3", 2000, 15, 16),
Request("sequence4", 2200, 7, 10),
Request("sequence5", 3000, 6, 5),
Request("sequence6", 2000, 5, 5),
Request("sequence7", 400, 5, 5),
)
安排串行流
串行恳求以列表形式存在,运用asFlow()
办法将其转换为 Flow:
val sequenceFlow: Flow<Request> = sequenceList.asFlow()
异步数据流 sequenceFlow 中活动的元素是 Request。
其间 asFlow() 是 Iterable 的扩展办法:
public fun <T> Iterable<T>.asFlow(): Flow<T> =
// 创立一个流
flow {
// 遍历列表并将其间每个元素串行地发送到流上
forEach { value ->
emit(value)
}
}
元素依照列表次序挨个发射,所以这是个串行流。
sequenceFlow 中每一个元素对应一个异步恳求,运用onEach()
办法即可完成该作用:
val sequenceFlow: Flow<Request> =
sequenceList.asFlow().onEach { load(it) }
其间 onEach() 的界说如下:
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> =
transform { value ->
action(value)// 在发射数据前做一件事情
return@transform emit(value)// 总是将数据发送到下流
}
public inline fun <T, R> Flow<T>.transform(
crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> =
// 构建下流流
flow {
// 搜集上游数据(这儿的逻辑在下流流被搜集的时候调用)
collect { value ->
// 处理上游数据
return@collect transform(value)
}
}
几乎一切 Flow 的扩展办法都运用阻拦转发机制完成,即创立一个新的流,它作为中心顾客会订阅上游数据并转发给下流。关于这方面更具体的解说能够点击Kotlin 进阶 | 异步数据流 Flow 的运用场景
当时场景中,下流想消费价格,运用map()
将流上数据进行改换:
val sequenceFlow: Flow<Request> =
sequenceList.asFlow().map { load(it).price }
在 map 之前,sequenceFlow 中活动的是 Request,map 之后就变成了价格。
现在仅仅界说了流,即指定了流中会输入什么数据,履行什么改变,但流并没有被发动,数据并不会自发地从上游流到下流,这种特性叫冷流。就好比你做了一个月后的旅行攻略,但现在还未踏上行程。
触发冷流活动的操作叫搜集:
scope.launch {
sequenceFlow.collect { price ->
// 价格终究会串行地流到这儿
}
}
但先不急着搜集串行流,由于它需要和并行流一同触发。
安排并行流
依葫芦画瓢,先将并行组转化为流:
val parallelFlow = parallelList.asFlow()
此时 parallelFlow 还是个串行流,调用flatMapMerge()
使其并发。该办法名指明晰要做三件事flat展平
、map改换
、merge合流
。
为啥做了这三件过后,就产生了并发?
这先得从“map改换”说起:
public fun <T, R> Flow<T>.flatMapMerge(
transform: suspend (value: T) -> Flow<R>//改换lambda
): Flow<R> = map(transform).flattenMerge()
flatMapMerge() 的完成先用 map() 将元素改换,然后用 flattenMerge() 将其展平再合并。改换被界说为(value: T) -> Flow<R>
,即将流中元素改换成一个新的流。这样就构成了嵌套流,从Flow<T>
到Flow<Flow<R>>
。
有了嵌套就需要展平,即将二维结构摊平到一维。
拿列表举例,比方List<List<Int>>
:
val lists = listOf(
listOf(1,2,3),
listOf(4,5,6)
)
Log.v("ttaylor","${lists.flatten()}") //[1, 2, 3, 4, 5, 6]
Log.v("ttaylor","${lists.flatMap { it.map { it+1 } }}") //[2, 3, 4, 5, 6, 7]
List.flat()
将两层嵌套结构变成单层结构,List.flatMap()
在展平的一同供给了改换内部 List 的时机。
而 flattenMerge() 便是将Flow<Flow<R>>
展平为Flow<R>
然后再将它们合流,最关键的魔法就发生在合流:
public fun <T> Flow<Flow<T>>.flattenMerge(
concurrency: Int = DEFAULT_CONCURRENCY
): Flow<T> {
return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
}
当并发数大于1时,构建 ChannelFlowMerge 目标:
internal class ChannelFlowMerge<T>(
private val flow: Flow<Flow<T>>, // 嵌套流
private val concurrency: Int,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.BUFFERED,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
override suspend fun collectTo(scope: ProducerScope<T>) {
// 信号量控制并发数
val semaphore = Semaphore(concurrency)
// 终究搜集者
val collector = SendingCollector(scope)
val job: Job? = coroutineContext[Job]
// 搜集嵌套流
flow.collect { inner ->
job?.ensureActive()
semaphore.acquire()
// 为每一个内嵌流发动新协程并搜集之(并发根源)
scope.launch {
try {
// 一切内嵌流的元素都会发送给终究搜集者(merge)
inner.collect(collector)
} finally {
semaphore.release()
}
}
}
}
}
一切疑云都消散了:
- 为啥要展平?
由于有嵌套流。
- 为啥要将流中每个元素都转换为一个新的流构成嵌套?
由于每个新的流会在单独协程中被搜集,不同协程间是并发的。
- 改换,展平之后为啥要合流?
由于一切内嵌流都会将数据发送同一个搜集者,以便利下流消费数据。
理解了“展平改换合流”之后,当时场景就应做如下改换:
val parallelFlow = parallelList.asFlow().flatMapMerge {
request -> flow { emit(load(request).price) }
}
每个 Request 都被改换成一个流,该流会履行恳求并将价格发送给下流。
现在只需搜集 parallelFlow 就能并发地拿到价格:
scope.launch {
parallelFlow.collect { price ->
// 价格终究会并行地流到这儿
}
}
但先不急着搜集并发流,由于它需要和串行流一同触发。
串并合流、截流
串/并行流都已具备,是不是履行下面的操作就能完成题意:
scope.launch {
flowOf(sequenceFlow, parallelFlow)
.flattenMerge() // 串行组&并行组并发
.collect { // 获取并/串流中一切价格 }
}
sequenceFlow 和 parallelFlow 都是Flow<Int>
类型的流。flowOf() 在它们外面又套了一层,构成Flow<Flow<Int>>
嵌套结构,接着运用 flattenMerge() 将其展平合流,终究下流能够拿到串/并流中的一切价格。
漏了一个功能点:截流。
完成截流最简略的思路是:算出并发流的最高价,以此价格停止串行流。
求流中元素的最大值无异于求列表元素最大值:以暂时变量暂存最大值,遍历列表,将每个元素和最大值比较,若大于则更新暂时变量。
按此思路,新增扩展办法如下:
public suspend fun <T> Flow<T>.max(): T {
var max: Any? = NULL
// 搜集流
collect { value ->
if(value > max) max = value
}
return max as T
}
在 Flow 中,像 max() 这样的办法称为终端顾客,由于它是 Flow 最下流的搜集者。collect() 是 suspend 办法,它会挂起协程直到一切元素都被消费结束。
Kotlin 供给了丰厚的终端顾客,比方reduce()
也能够完成相同的作用:
public suspend fun <S, T : S> Flow<T>.reduce(
// 累加算法
operation: suspend (accumulator: S, value: T) -> S
): S {
var accumulator: Any? = NULL// 累加器
collect { value ->
// 累加每个元素
accumulator = if (accumulator !== NULL) {
operation(accumulator as S, value)
} else {
value
}
}
return accumulator as S
}
它的算法框架和 max 一模一样,仅仅完成了比取最大值稍杂乱的累加功能。
也能够用 reduce() 完成当时需求:
val parallelMaxPrice = parallelList.asFlow()
.flatMapMerge { request -> flow { emit(load(request).price) } }
.reduce { max, cur -> if (cur > max) cur else max }
对流运用终端顾客后,流就不再是流了,而变成一个值。就像上面的 parallelMaxPrice 是一个 Int 值。
这个值得持续活动起来,才干参加串行流的切断。
在 Flow 中一个屡试不爽的惯例操作便是:“阻拦转发”。
private val parallelMaxFlow = flow {
parallelList.asFlow()
.flatMapMerge { request -> flow { emit(load(request).price) } }
.reduce { max, cur -> if (cur > max) cur else max }
// reduce 的回来值是价格,在新流中发送之
.also { emit(it) }
}
这样一切并发组的恳求流外层就被套了一层新的流,活动的是并发流最高价格。
最终一个问题:怎么中止流?—— 抛反常。
internal actual class AbortFlowException actual constructor(
actual val owner: FlowCollector<*>
) : CancellationException("Flow was aborted, no more elements needed") {}
只需在 Flow 中抛出AbortFlowException
就可完成流的中止。它继承自 CancellationException,所以流的中止依赖于 suspend 办法的中止。关于这方面原理的解说能够点击裸辞-疫情-闭关-复习-大厂offer(一)
Kotlin 为中止流供给了一个更友爱的扩展办法transformWhile()
,套路依然是阻拦转发,即新建下流流,它出产数据的方式是通过搜集上游数据,并将数据转发到一个带有发射数据能力的 lambda 中,当时这个 lambda 有一个回来值,该值决议了是否要停止上游流数据的出产(便是否抛出反常)。关于该办法更具体的介绍能够点击Android 架构之 MVI 初级体 | Flow 替换 LiveData 重构数据链路
当时并/串行流中活动的都是价格,为了便利完成切断,在价分外再包一层:
data class RequestSwitch(val price: Int, val isDone: Boolean)
当 isDone = true 表明整个恳求链应该被停止。用 RequestSwitch 重构串/并行流:
// 串行流
val sequenceFlow = sequenceList.asFlow()
.map {
// 价格 >= 底价,则自行切断
val isDone = it.run { price >= bottomPrice }
RequestSwitch(load(it).price, isDone)
}.transformWhile {
// 总是将串行流中的价格转发到合并流,并且永不切断
emit(RequestSwitch(it.price, false))
!it.isDone // 为true表明串行流自行切断
}
// 并行流
val parallelFlow = flow {
parallelList.asFlow()
.flatMapMerge { request -> flow { emit(load(request).price) } }
.reduce { max, cur -> if (cur > max) cur else max }
// 并行流的 isDone = true 表明切断串行流
.also { emit(RequestSwitch(it, true)) }
}
// 消费合流
scope.launch {
val maxPrice = flowOf(sequenceFlow, parallelFlow)//串并合流
.flattenMerge()
.transformWhile {
emit(it.price) // 总是将上游价格转发至下流
!it.isDone // 这一行为true表明停止合流
}
.reduce { max, value -> if (value > max) value else max }
}
仅用了 31 行代码就表达了如此杂乱的并发战略。还未运用 java 完成过该功能,yy 一下,100 行左右?(欢迎 java 大佬在评论区给出完成计划)
附加题
面试官:“为获取最高价格战略全体设置一个超时,并且当一切恳求都失败时,回来-1。”
欢迎在评论区讨论各种完成计划,在评论区等你哦~
引荐阅览
- Kotlin 进阶 | 异步数据流 Flow 的运用场景
- Kotlin 异步 | Flow 限流的运用场景及原理
- Kotlin 协程 | CoroutineContext 为什么要规划成 indexed set?(一)
- 面试题 | 等候多个异步使命的结果
- Android 架构之 MVI 初级体 | Flow 替换 LiveData 重构数据链路