开启成长之旅!这是我参与「日新方案 12 月更文应战」的第6天,点击检查活动概况

flow介绍

挂起函数能够异步回来单个值,那如何异步屡次回来多个值呢? 运用flow,flow的特点:

  • flow{…}块中的代码能够挂起
  • 运用flow,suspend修饰符能够省略
  • 流运用emit函数发射值
  • 流运用collect的函数搜集值
  • flow相似冷流,flow中代码直到流被搜集(调用collect)的时分才运转,相似lazy,什么时分用,什么时分履行。
  • 流的连续性:流搜集都是按顺序搜集的
  • flowOn可更改流发射的上下文,即能够指定在主线程或子线程中履行
  • 与之相对的是暖流,咱们行将介绍的 StateFlow 和 SharedFlow 是暖流,在垃圾回收之前,都是存在内存之中,并且处于活泼状况的。
   //运用flow,suspend修饰符能够省略
    fun doflow() = flow<Int> {
        for (i in 1..5) {
            //这里是挂起,不是堵塞
            delay(500)
            emit(i)
        }
    }.flowOn(Dispatchers.IO)
//调用
  runBlocking {
            doflow().collect {
                log("value=$it")
            }
        }
打印(屡次回来多个值)
com.z.zjetpack V/zx: value=1
com.z.zjetpack V/zx: value=2
com.z.zjetpack V/zx: value=3
com.z.zjetpack V/zx: value=4
com.z.zjetpack V/zx: value=5

flow的运用场景

文件下载场景

    //正在下载(文件总巨细为5)
     fun doflow() = flow<Double> {
        for (i in 1..5) {
            delay(500)
            emit(i.toDouble())
        }
     //flowOn来指定在IO线程中下载
    }.flowOn(Dispatchers.IO)
//读取进展
 runBlocking {
            doflow().collect {
                log("当时下载=${it / 5 * 100}%")
            }
        }
打印:
com.z.zjetpack V/zx: 当时下载=20.0%
com.z.zjetpack V/zx: 当时下载=40.0%
com.z.zjetpack V/zx: 当时下载=60.0%
com.z.zjetpack V/zx: 当时下载=80.0%
com.z.zjetpack V/zx: 当时下载=100.0%

流构建器

flowof 和asflow

 runBlocking {
            flowOf(1, 2, 3)
                .onEach { delay(500) }
                .collect {
                    log("value = $it")
                }
            (5..8).asFlow()
                .onEach { delay(500) }
                .collect {
                    log("value = $it")
                }
        }

运用launchin替换collect在单独的协程中发动搜集流。

        fun event() = (1..3)
        .asFlow()
        .onEach {
            delay(500)
        }.flowOn(Dispatchers.IO)
//调用
        runBlocking {
            val job =   event().onEach {
                log("value = $it")
            }.launchIn(CoroutineScope(Dispatchers.IO))
            //主线程可用this
            //.launchIn(this)
            job.join()
        }

流的撤销

超时的时分撤销

    fun cancelFlow() = flow<Int> {
        for (i in 1..5) {
            delay(1000)
            emit(i)
        }
    }
//调用
        runBlocking {
            //超时的时分撤销流
            withTimeoutOrNull(2500) {
                cancelFlow().collect {
                    log("value = $it")
                }
            }
        }
打印:在2.5秒的时分超时了,撤销了
com.z.zjetpack V/zx: value = 1
com.z.zjetpack V/zx: value = 2

直接撤销

        runBlocking {
                cancelFlow().collect {
                    log("value = $it")
                    if(it == 3){
                        cancel()
                    }
                }
        }

繁忙的使命是不能直接撤销的,需求检测撤销(cancellable)

        runBlocking {
                (1..5).asFlow().cancellable().collect {
                    if(it == 3) {
                        cancel()
                    }
                }
        }

背压:生产者功率 > 消费者功率

Kotlin 之 协程(三)Flow异步流
运用缓冲和flowon来处理背压

buffer():并发运转流中发射元素的代码 conflate():兼并发射项,不对每个值处理 collectLatest():撤销偏重新发送最终一个值

模拟背压代码:

    fun preFlow() = flow<Int> {
        for (i in 1..5) {
            delay(100)
            emit(i)
            log("发送$i")
        }
    }
//调用
        //100ms发送一次,300ms接纳一次就产生了背压
        runBlocking {
            val time = measureTimeMillis {
                preFlow()
                    //buffer能够增加缓冲,提高功率
                    //.buffer(100)
                    //flowOn自带缓冲功用
                    //.flowOn(Dispatchers.IO)
                    //conflate不对每个值处理
                    //.conflate()
                    //.collect
                    //撤销偏重新发送最终一个值
                    .collectLatest {
                        delay(300)
                        log("接纳到:$it")
                    }
            }
            log("总耗时 $time")
        }
打印:
com.z.zjetpack V/zx: 接纳到:1
com.z.zjetpack V/zx: 发送1
com.z.zjetpack V/zx: 接纳到:2
com.z.zjetpack V/zx: 发送2
com.z.zjetpack V/zx: 接纳到:3
com.z.zjetpack V/zx: 发送3
com.z.zjetpack V/zx: 接纳到:4
com.z.zjetpack V/zx: 发送4
com.z.zjetpack V/zx: 接纳到:5
com.z.zjetpack V/zx: 发送5
com.z.zjetpack V/zx: 总耗时 2033
运用buffer后
com.z.zjetpack V/zx: 发送1
com.z.zjetpack V/zx: 发送2
com.z.zjetpack V/zx: 发送3
com.z.zjetpack V/zx: 接纳到:1
com.z.zjetpack V/zx: 发送4
com.z.zjetpack V/zx: 发送5
com.z.zjetpack V/zx: 接纳到:2
com.z.zjetpack V/zx: 接纳到:3
com.z.zjetpack V/zx: 接纳到:4
com.z.zjetpack V/zx: 接纳到:5
com.z.zjetpack V/zx: 总耗时 1634
运用flowOn后
com.z.zjetpack V/zx: 发送1
com.z.zjetpack V/zx: 发送2
com.z.zjetpack V/zx: 发送3
com.z.zjetpack V/zx: 接纳到:1
com.z.zjetpack V/zx: 发送4
com.z.zjetpack V/zx: 发送5
com.z.zjetpack V/zx: 接纳到:2
com.z.zjetpack V/zx: 接纳到:3
com.z.zjetpack V/zx: 接纳到:4
com.z.zjetpack V/zx: 接纳到:5
com.z.zjetpack V/zx: 总耗时 1639
运用conflate后
com.z.zjetpack V/zx: 发送1
com.z.zjetpack V/zx: 发送2
com.z.zjetpack V/zx: 发送3
com.z.zjetpack V/zx: 接纳到:1
com.z.zjetpack V/zx: 发送4
com.z.zjetpack V/zx: 发送5
com.z.zjetpack V/zx: 接纳到:3
com.z.zjetpack V/zx: 接纳到:5
com.z.zjetpack V/zx: 总耗时 1034
运用collectLatest后
com.z.zjetpack V/zx: 发送1
com.z.zjetpack V/zx: 发送2
com.z.zjetpack V/zx: 发送3
com.z.zjetpack V/zx: 发送4
com.z.zjetpack V/zx: 发送5
com.z.zjetpack V/zx: 接纳到:5
com.z.zjetpack V/zx: 总耗时 843

操作符

转化操作符:map ,transform 限长操作符:取指定数量,take 末端操作符:末端操作符用于发动流搜集的挂起函数,collect,tolist,toset,reduce,fold 组合操作符:zip 展平操作符:flatMapConcat(连接),flatMapMerge(兼并),flatMapLatest(最新)

map

    suspend fun perRequest(req: Int): String {
        delay(1000)
        return "转化 $req"
    }
        runBlocking {
            (1..3).asFlow().map {
                perRequest(it)
            }.collect {
                log(it)
            }
      }
打印:
com.z.zjetpack V/zx: 转化 1
com.z.zjetpack V/zx: 转化 2
com.z.zjetpack V/zx: 转化 3

transform

 runBlocking {
(5..6).asFlow().transform {
                emit("s $it")
                emit(perRequest(it))
                emit("e $it")
            }
                //.take(4)
                .collect {
                    log(it)
                }
      }
打印:
com.z.zjetpack V/zx: s 5
com.z.zjetpack V/zx: 转化 5
com.z.zjetpack V/zx: e 5
com.z.zjetpack V/zx: s 6
com.z.zjetpack V/zx: 转化 6
com.z.zjetpack V/zx: e 6

take

加上take之后
com.z.zjetpack V/zx: s 5
com.z.zjetpack V/zx: 转化 5
com.z.zjetpack V/zx: e 5
com.z.zjetpack V/zx: s 6

末端操作符:collect,tolist,toset,reduce,fold

 runBlocking {
            val sum = (1..5).asFlow().map { it * it }.reduce { a, b -> a + b }
            log("sum = $sum")
            val nList = (1..5).asFlow().toList()
            log("nList = $nList")
            val nSet = listOf(1, 2, 2, 3, 3, 5).asFlow().toSet()
            log("nSet = $nSet")
       }
打印:
com.z.zjetpack V/zx: sum = 55
com.z.zjetpack V/zx: nList = [1, 2, 3, 4, 5]
com.z.zjetpack V/zx: nSet = [1, 2, 3, 5]

展平操作符

只运用map的时分

	//回来值是一个flow
    fun reqFlow(i: Int) = flow<String> {
        emit("start $i")
        delay(500)
        emit("end $i")
    }
 runBlocking {
     (0..1).asFlow().map {
                reqFlow(it)
            }.collect {
                log("初次collect = $it")
                it.collect {
                    log("二次 = $it")
                }
            }
   }
打印:由于回来是flow所以需求collect 两次才能拿到值,Flow<Flow<String>>
com.z.zjetpack V/zx: 初次collect = kotlinx.coroutines.flow.SafeFlow@63db1bf
com.z.zjetpack V/zx: 二次 = start 0
com.z.zjetpack V/zx: 二次 = end 0
com.z.zjetpack V/zx: 初次collect = kotlinx.coroutines.flow.SafeFlow@d27108c
com.z.zjetpack V/zx: 二次 = start 1
com.z.zjetpack V/zx: 二次 = end 1

flatMapConcat

 runBlocking {
    (0..1).asFlow().flatMapConcat {
                reqFlow(it)
            }.collect {
                log("初次collect = $it")
            }
      }
打印:直接展开了
com.z.zjetpack V/zx: 初次collect = start 0
com.z.zjetpack V/zx: 初次collect = end 0
com.z.zjetpack V/zx: 初次collect = start 1
com.z.zjetpack V/zx: 初次collect = end 1
 runBlocking {
  (0..1).asFlow().flatMapMerge {
                reqFlow(it)
            }.collect {
                log("初次collect = $it")
            }
      }
打印:
com.z.zjetpack V/zx: 初次collect = start 0
com.z.zjetpack V/zx: 初次collect = start 1
com.z.zjetpack V/zx: 初次collect = end 0
com.z.zjetpack V/zx: 初次collect = end 1

flatMapLatest

 runBlocking {
            (0..1).asFlow().flatMapLatest {
                reqFlow(it)
            }.collect {
                log("初次collect = $it")
            }
    }
打印:
com.z.zjetpack V/zx: 初次collect = start 0
com.z.zjetpack V/zx: 初次collect = start 1
com.z.zjetpack V/zx: 初次collect = end 1

流的反常处理

catch函数 和 try catch

            flow {
                emit(1)
                throw NullPointerException()
                //catch函数只捕获上游的反常
            }.catch {
                log("exception $it")
                //在反常后恢复
                emit(20)
            }.flowOn(Dispatchers.IO)
                .collect {
                    log("msg $it")
                }
打印:
com.z.zjetpack V/zx: exception java.lang.NullPointerException
com.z.zjetpack V/zx: msg 1
com.z.zjetpack V/zx: msg 20
            //不主张经过这种办法捕获上游的反常,违反了flow准则,这种适合捕获下流的反常
            try {
                (1..3).asFlow().collect {
                    check(it > 2) {
                        "ex $it"
                    }
                }
            } catch (e: Exception) {
                log("反常 $e")
            }
打印:
com.z.zjetpack V/zx: 反常 java.lang.IllegalStateException: ex 1

流的完结

finally 和 onCompletion

try {
                (1..3).asFlow().collect {
                    check(it > 2) {
                        "ex $it"
                    }
                }
            } catch (e: Exception) {
                log("反常 $e")
            } finally {
                log("流已完结")
            }
            //产生反常onCompletion能够拿到反常信息,但不会捕获
            try {
                (1..3).asFlow().onCompletion {
                    log("onCompletion $it")
                }.collect {
                    check(it > 2) {
                        "ex $it"
                    }
                }
            } catch (e: Exception) {
                log("反常 $e")
            }
打印:
com.z.zjetpack V/zx: 反常 java.lang.IllegalStateException: ex 1
com.z.zjetpack V/zx: 流已完结
com.z.zjetpack V/zx: onCompletion java.lang.IllegalStateException: ex 1
com.z.zjetpack V/zx: 反常 java.lang.IllegalStateException: ex 1

StateFlow

StateFlow 是一个状况容器式可观察数据流,能够向其搜集器宣布当时状况更新和新状况更新。

  1. StateFlow运用 榜首步:创建 MutableStateFlow 并设置初始化的值。
class MainViewModel : ViewModel() {
    val selected = MutableStateFlow<Boolean>(false)
}

第二步:同 Flow 相同,运用 collect 办法:

lifecycleScope.launch {
    viewModel.selected.collect {
        // ... 引起UI产生的改变
        // 比方 某个按钮是否选中状况
    }
}

第三步:能够给 selected设置值,从而引起 Ui 层的改变:

class MainViewModel : ViewModel() {
    val selected = MutableStateFlow<Boolean>(false)
    fun doSomeThing(value: Boolean) {
        selected.value = value
    }
}

一般的 Flow,是不具备 selected.value = value 这种能力的

StateFlow 和 LiveData 有什么差异? 有两点差异:

榜首点,StateFlow 必须有初始值,LiveData 不需求。 第二点,当 View 变为 STOPPED 状况时,LiveData.observe() 会主动撤销注册运用方,而从 StateFlow 或任何其他数据流搜集数据则不会撤销注册运用方。 关于 StateFlow 在界面毁掉的时仍处于活泼状况,有两种解决办法:

运用 ktx 将 Flow 转化为 LiveData。 在界面毁掉的时分,手动撤销(这很容易被忘记)。

class LatestNewsActivity : AppCompatActivity() {
    ...
    // Coroutine listening for UI states
    private var uiStateJob: Job? = null
    override fun onStart() {
        super.onStart()
        // Start collecting when the View is visible
        uiStateJob = lifecycleScope.launch {
            latestNewsViewModel.uiState.collect { uiState -> ... }
        }
    }
    override fun onStop() {
        // Stop collecting when the View goes to the background
        uiStateJob?.cancel()
        super.onStop()
    }
}

SharedFlow

SharedFlow:数据共享,有点相似广播 和 StateFlow 相同,SharedFlow 也是暖流,它能够将已发送过的数据发送给新的订阅者,并且具有高的装备性。

  1. SharedFlow运用场景 总的来说,SharedFlow 和 StateFlow 相似,他们都是暖流,都能够用来存储状况,但 SharedFlow 装备灵敏。

当你有如下场景时,需求运用 SharedFlow:

产生订阅时,需求将过去已经更新的n个值,同步给新的订阅者。 装备缓存战略。 2. SharedFlow的运用 简略写一个 Demo吧。

榜首步:创建一个 MutableSharedFlow,对应的参数解说在注释中

class MainViewModel : ViewModel() {
    val sharedFlow = MutableSharedFlow<Int>(
        5 // 参数一:当新的订阅者Collect时,发送几个已经发送过的数据给它
        , 3 // 参数二:减去replay,MutableSharedFlow还缓存多少数据
        , BufferOverflow.DROP_OLDEST // 参数三:缓存战略,三种 丢掉最新值、丢掉最旧值和挂起
    )
}

第二步:运用emit或者tryEmit办法

class MainViewModel : ViewModel() {
    val sharedFlow = MutableSharedFlow<Int>(
        // ....
    )
    // 初始化时调用
    init {
        for (i in 0..10) {
            sharedFlow.tryEmit(i)
        }
    }
    // 在按钮中调用
    fun doAsClick() {
        for (i in 11..20) {
            sharedFlow.tryEmit(i)
        }
    }
}

当 MutableSharedFlow 中缓存数据量超过阈值时,emit 办法和 tryEmit 办法的处理办法会有不同:

emit 办法:当缓存战略为 BufferOverflow.SUSPEND 时,emit 办法会挂起,直到有新的缓存空间。 tryEmit 办法:tryEmit 会回来一个 Boolean 值,true 代表传递成功,false 代表会产生一个回调,让这次数据发射挂起,直到有新的缓存空间。 第三步:接纳数据 接纳数据的办法,跟一般的 Flow 没什么差异。

下面是我的全部代码:

class MainActivity : AppCompatActivity() {
    private lateinit var viewModel: MainViewModel
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        viewModel = ViewModelProvider(this).get(com.example.coroutinedemo.viewmodel.MainViewModel::class.java)
        val tvContent = findViewById<TextView>(R.id.tv_content)
        // 发动榜首个协程,接纳初始化的数据
        lifecycleScope.launch {
            val sb = StringBuffer()
            viewModel.sharedFlow.collect {
                sb.append("<<${it}")
                tvContent.text = sb
            }
        }
        val btnGo = findViewById<Button>(R.id.btn_go)
        val tvTwo = findViewById<TextView>(R.id.tv_2)
        btnGo.setOnClickListener {
            // 发送新的数据
            viewModel.doAsClick()
            // 发送新的数据今后,发动第二个协程
            lifecycleScope.launch {
                val sb = StringBuffer()
                viewModel.sharedFlow.collect {
                    sb.append("<<${it}")
                    tvTwo.text = sb.toString()
                }
            }
        }
    }
}
  1. 将冷流转化为SharedFlow 直接运用官网的代码,办法是运用 Flow 的扩展办法 shareIn:
class NewsRemoteDataSource(...,
    private val externalScope: CoroutineScope,
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        ...
    }.shareIn(
        externalScope,
        replay = 1,
        started = SharingStarted.WhileSubscribed() // 发动政策
    )
}

重点是参数三,别离供给了三个发动战略:

SharingStarted.WhileSubscribed():存在订阅者时,将使上游供给方保持活泼状况。 SharingStarted.Eagerly:当即发动供给方。 SharingStarted.Lazily:在榜首个订阅者出现后开始共享数据,并使数据流永远保持活泼状况。 总结 Flow 给我的感觉就像古老的印刷术,版面定了就不可更改,不过,该版面可印刷多张内容;StateFlow 给我的感觉就像活字印刷,能够不停的更改版面,也能够运用同一个版面印刷很多内容。

假如你要运用 Flow 记载数据的状况,StateFlow 和 SharedFlow 会是一个不错的选择。StateFlow 和 SharedFlow 供给了在 Flow 中运用 LiveData 式更新数据的能力,但是假如要在 UI 层运用,需求留意生命周期的问题。

StateFlow 和 SharedFlow 比较,StateFlow 需求供给初始值,SharedFlow 装备灵敏,可供给旧数据同步和缓存装备的功用。 协程进阶技巧 – StateFlow和SharedFlow