引子

面试官:“请完成下面这个并发战略”。

面试题 | 异步任务的串并行嵌套及截断
这是一个获取最高价格的恳求战略。恳求价格的接口被分成了并发组(蓝色)和串行组(绿色)。并发即一切恳求一同宣布,串行组即仅当上一恳求回来才恳求下一个。并发组和串行组之间又是并发的关系,当并发组中一切恳求回来时会切断串行组,即停止串行组中后续恳求,然后选取一切回来价格中的最大值。串行组中每个恳求都有底价,当回来价格高于底价时则切断后续恳求,不然持续串行恳求直到比价成功或被并发组切断。

这是一道串并行网络恳求嵌套且有切断操作的杂乱问题。但假如运用 Kotlin 的 Flow,问题就变得反常简略。

关于 Kotlin Flow 根底概念、原理、运用的具体介绍能够点击下面的文章:

  • Kotlin 进阶 | 异步数据流 Flow 的运用场景
  • Kotlin 异步 | Flow 限流的运用场景及原理
  • Kotlin 协程 | CoroutineContext 为什么要规划成 indexed set?(一)

模型的力气

Kotlin Flow 能极大地简化完成代码,是由于“模型的力气”。

好的模型总是以独特视角切入问题,以更准确有效的表达描述问题,终究降低了问题的杂乱性使其易于处理。

比方:3D 图画的旋转、平移、缩放是一个杂乱操作。但当把 3D 图画的点调集笼统为矩阵,就能通过矩阵的运算来表达这些杂乱操作。

Kotlin Flow 便是对多个连续异步进程的笼统模型,它将其理解为异步数据流,即一条时间轴上按序产生数据,其出产者和顾客之间一条管道,出产者从管道的一头刺进数据,顾客从另一头取数据。

同样地,为了简化面试题并发战略的完成逻辑,将网络恳求笼统为“数据类+挂起办法”:

// 数据类
data class Request(
    val name: String, // 恳求称号
    val delay: Long, // 响应时延
    val price: Int, // 价格
    val bottomPrice: Int = 0 // 底价
)
// 获取价格
private suspend fun load(request: Request): Request {
    delay(request.delay)
    return request
}

每个 Request 描述一个恳求,一次 load() 调用表明一次网络恳求。

这样一来,表达串/并行组就很省事了:

// 并行组
private val parallelList = listOf(
    Request("parallel1", 1600, 10),
    Request("parallel2", 2900, 20),
    Request("parallel3", 2000, 11),
    Request("parallel4", 3000, 30),
    Request("parallel5", 5000, 50),
    Request("parallel6", 10000, 60),
)
// 串行流(包含底价)
private val sequenceList = listOf(
    Request("sequence1", 2100, 30, 30),
    Request("sequence2", 1100, 19, 20),
    Request("sequence3", 2000, 15, 16),
    Request("sequence4", 2200, 7, 10),
    Request("sequence5", 3000, 6, 5),
    Request("sequence6", 2000, 5, 5),
    Request("sequence7", 400, 5, 5),
)

安排串行流

串行恳求以列表形式存在,运用asFlow()办法将其转换为 Flow:

val sequenceFlow: Flow<Request> = sequenceList.asFlow()

异步数据流 sequenceFlow 中活动的元素是 Request。

其间 asFlow() 是 Iterable 的扩展办法:

public fun <T> Iterable<T>.asFlow(): Flow<T> =
    // 创立一个流
    flow {
        // 遍历列表并将其间每个元素串行地发送到流上
        forEach { value ->
            emit(value)
        }
    }

元素依照列表次序挨个发射,所以这是个串行流。

sequenceFlow 中每一个元素对应一个异步恳求,运用onEach()办法即可完成该作用:

val sequenceFlow: Flow<Request> =
    sequenceList.asFlow().onEach { load(it) }

其间 onEach() 的界说如下:

public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> =
transform { value ->
    action(value)// 在发射数据前做一件事情
    return@transform emit(value)// 总是将数据发送到下流
}
public inline fun <T, R> Flow<T>.transform(
    crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = 
    // 构建下流流
    flow {
        // 搜集上游数据(这儿的逻辑在下流流被搜集的时候调用)
        collect { value ->
            // 处理上游数据
            return@collect transform(value)
        }
}

几乎一切 Flow 的扩展办法都运用阻拦转发机制完成,即创立一个新的流,它作为中心顾客会订阅上游数据并转发给下流。关于这方面更具体的解说能够点击Kotlin 进阶 | 异步数据流 Flow 的运用场景

当时场景中,下流想消费价格,运用map()将流上数据进行改换:

val sequenceFlow: Flow<Request> =
    sequenceList.asFlow().map { load(it).price }

在 map 之前,sequenceFlow 中活动的是 Request,map 之后就变成了价格。

现在仅仅界说了流,即指定了流中会输入什么数据,履行什么改变,但流并没有被发动,数据并不会自发地从上游流到下流,这种特性叫冷流。就好比你做了一个月后的旅行攻略,但现在还未踏上行程。

触发冷流活动的操作叫搜集

scope.launch {
    sequenceFlow.collect { price ->
        // 价格终究会串行地流到这儿
    }
}

但先不急着搜集串行流,由于它需要和并行流一同触发。

安排并行流

依葫芦画瓢,先将并行组转化为流:

val parallelFlow = parallelList.asFlow()

此时 parallelFlow 还是个串行流,调用flatMapMerge()使其并发。该办法名指明晰要做三件事flat展平map改换merge合流

为啥做了这三件过后,就产生了并发?

这先得从“map改换”说起:

public fun <T, R> Flow<T>.flatMapMerge(
    transform: suspend (value: T) -> Flow<R>//改换lambda
): Flow<R> = map(transform).flattenMerge()

flatMapMerge() 的完成先用 map() 将元素改换,然后用 flattenMerge() 将其展平再合并。改换被界说为(value: T) -> Flow<R>,即将流中元素改换成一个新的流。这样就构成了嵌套流,从Flow<T>Flow<Flow<R>>

有了嵌套就需要展平,即将二维结构摊平到一维。

拿列表举例,比方List<List<Int>>

val lists = listOf(
    listOf(1,2,3),
    listOf(4,5,6)
)
Log.v("ttaylor","${lists.flatten()}") //[1, 2, 3, 4, 5, 6]
Log.v("ttaylor","${lists.flatMap { it.map { it+1 } }}") //[2, 3, 4, 5, 6, 7]

List.flat()将两层嵌套结构变成单层结构,List.flatMap()在展平的一同供给了改换内部 List 的时机。

而 flattenMerge() 便是将Flow<Flow<R>>展平为Flow<R>然后再将它们合流,最关键的魔法就发生在合流:

public fun <T> Flow<Flow<T>>.flattenMerge(
    concurrency: Int = DEFAULT_CONCURRENCY
): Flow<T> {
    return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
}

当并发数大于1时,构建 ChannelFlowMerge 目标:

internal class ChannelFlowMerge<T>(
    private val flow: Flow<Flow<T>>, // 嵌套流
    private val concurrency: Int,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
    override suspend fun collectTo(scope: ProducerScope<T>) {
        // 信号量控制并发数
        val semaphore = Semaphore(concurrency)
        // 终究搜集者
        val collector = SendingCollector(scope)
        val job: Job? = coroutineContext[Job]
        // 搜集嵌套流
        flow.collect { inner ->
            job?.ensureActive()
            semaphore.acquire()
            // 为每一个内嵌流发动新协程并搜集之(并发根源)
            scope.launch {
                try {
                    // 一切内嵌流的元素都会发送给终究搜集者(merge)
                    inner.collect(collector)
                } finally {
                    semaphore.release()
                }
            }
        }
    }
}

一切疑云都消散了:

  • 为啥要展平?

由于有嵌套流。

  • 为啥要将流中每个元素都转换为一个新的流构成嵌套?

由于每个新的流会在单独协程中被搜集,不同协程间是并发的。

  • 改换,展平之后为啥要合流?

由于一切内嵌流都会将数据发送同一个搜集者,以便利下流消费数据。

理解了“展平改换合流”之后,当时场景就应做如下改换:

val parallelFlow = parallelList.asFlow().flatMapMerge {
    request -> flow { emit(load(request).price) }
}

每个 Request 都被改换成一个流,该流会履行恳求并将价格发送给下流。

现在只需搜集 parallelFlow 就能并发地拿到价格:

scope.launch {
    parallelFlow.collect { price ->
        // 价格终究会并行地流到这儿
    }
}

但先不急着搜集并发流,由于它需要和串行流一同触发。

串并合流、截流

串/并行流都已具备,是不是履行下面的操作就能完成题意:

scope.launch {
    flowOf(sequenceFlow, parallelFlow)
        .flattenMerge() // 串行组&并行组并发
        .collect { // 获取并/串流中一切价格 }
}

sequenceFlow 和 parallelFlow 都是Flow<Int>类型的流。flowOf() 在它们外面又套了一层,构成Flow<Flow<Int>>嵌套结构,接着运用 flattenMerge() 将其展平合流,终究下流能够拿到串/并流中的一切价格。

漏了一个功能点:截流。

完成截流最简略的思路是:算出并发流的最高价,以此价格停止串行流。

求流中元素的最大值无异于求列表元素最大值:以暂时变量暂存最大值,遍历列表,将每个元素和最大值比较,若大于则更新暂时变量。

按此思路,新增扩展办法如下:

public suspend fun <T> Flow<T>.max(): T {
    var max: Any? = NULL
    // 搜集流
    collect { value ->
        if(value > max) max = value
    }
    return max as T
}

在 Flow 中,像 max() 这样的办法称为终端顾客,由于它是 Flow 最下流的搜集者。collect() 是 suspend 办法,它会挂起协程直到一切元素都被消费结束。

Kotlin 供给了丰厚的终端顾客,比方reduce()也能够完成相同的作用:

public suspend fun <S, T : S> Flow<T>.reduce(
    // 累加算法
    operation: suspend (accumulator: S, value: T) -> S
): S {
    var accumulator: Any? = NULL// 累加器
    collect { value ->
        // 累加每个元素
        accumulator = if (accumulator !== NULL) {
            operation(accumulator as S, value)
        } else {
            value
        }
    }
    return accumulator as S
}

它的算法框架和 max 一模一样,仅仅完成了比取最大值稍杂乱的累加功能。

也能够用 reduce() 完成当时需求:

val parallelMaxPrice = parallelList.asFlow()
    .flatMapMerge { request -> flow { emit(load(request).price) } }
    .reduce { max, cur -> if (cur > max) cur else max }

对流运用终端顾客后,流就不再是流了,而变成一个值。就像上面的 parallelMaxPrice 是一个 Int 值。

这个值得持续活动起来,才干参加串行流的切断。

在 Flow 中一个屡试不爽的惯例操作便是:“阻拦转发”。

private val parallelMaxFlow = flow {
    parallelList.asFlow()
        .flatMapMerge { request -> flow { emit(load(request).price) } }
        .reduce { max, cur -> if (cur > max) cur else max }
        // reduce 的回来值是价格,在新流中发送之
        .also { emit(it) }
}

这样一切并发组的恳求流外层就被套了一层新的流,活动的是并发流最高价格。

最终一个问题:怎么中止流?—— 抛反常。

internal actual class AbortFlowException actual constructor(
    actual val owner: FlowCollector<*>
) : CancellationException("Flow was aborted, no more elements needed") {}

只需在 Flow 中抛出AbortFlowException就可完成流的中止。它继承自 CancellationException,所以流的中止依赖于 suspend 办法的中止。关于这方面原理的解说能够点击裸辞-疫情-闭关-复习-大厂offer(一)

Kotlin 为中止流供给了一个更友爱的扩展办法transformWhile(),套路依然是阻拦转发,即新建下流流,它出产数据的方式是通过搜集上游数据,并将数据转发到一个带有发射数据能力的 lambda 中,当时这个 lambda 有一个回来值,该值决议了是否要停止上游流数据的出产(便是否抛出反常)。关于该办法更具体的介绍能够点击Android 架构之 MVI 初级体 | Flow 替换 LiveData 重构数据链路

当时并/串行流中活动的都是价格,为了便利完成切断,在价分外再包一层:

data class RequestSwitch(val price: Int, val isDone: Boolean)

当 isDone = true 表明整个恳求链应该被停止。用 RequestSwitch 重构串/并行流:

// 串行流
val sequenceFlow = sequenceList.asFlow()
    .map {
        // 价格 >= 底价,则自行切断
        val isDone = it.run { price >= bottomPrice }
        RequestSwitch(load(it).price, isDone)
    }.transformWhile {
        // 总是将串行流中的价格转发到合并流,并且永不切断
        emit(RequestSwitch(it.price, false))
        !it.isDone // 为true表明串行流自行切断
    }
// 并行流
val parallelFlow = flow {
    parallelList.asFlow()
        .flatMapMerge { request -> flow { emit(load(request).price) } }
        .reduce { max, cur -> if (cur > max) cur else max }
        // 并行流的 isDone = true 表明切断串行流
        .also { emit(RequestSwitch(it, true)) }
}
// 消费合流
scope.launch {
    val maxPrice = flowOf(sequenceFlow, parallelFlow)//串并合流
        .flattenMerge()
        .transformWhile {
            emit(it.price) // 总是将上游价格转发至下流
            !it.isDone // 这一行为true表明停止合流
        }
        .reduce { max, value -> if (value > max) value else max }
}

仅用了 31 行代码就表达了如此杂乱的并发战略。还未运用 java 完成过该功能,yy 一下,100 行左右?(欢迎 java 大佬在评论区给出完成计划)

附加题

面试官:“为获取最高价格战略全体设置一个超时,并且当一切恳求都失败时,回来-1。”

欢迎在评论区讨论各种完成计划,在评论区等你哦~

引荐阅览

  • Kotlin 进阶 | 异步数据流 Flow 的运用场景
  • Kotlin 异步 | Flow 限流的运用场景及原理
  • Kotlin 协程 | CoroutineContext 为什么要规划成 indexed set?(一)
  • 面试题 | 等候多个异步使命的结果
  • Android 架构之 MVI 初级体 | Flow 替换 LiveData 重构数据链路