开启成长之旅!这是我参与「日新方案 12 月更文应战」的第6天,点击检查活动概况
flow介绍
挂起函数能够异步回来单个值,那如何异步屡次回来多个值呢? 运用flow,flow的特点:
- flow{…}块中的代码能够挂起
- 运用flow,suspend修饰符能够省略
- 流运用emit函数发射值
- 流运用collect的函数搜集值
- flow相似冷流,flow中代码直到流被搜集(调用collect)的时分才运转,相似lazy,什么时分用,什么时分履行。
- 流的连续性:流搜集都是按顺序搜集的
- flowOn可更改流发射的上下文,即能够指定在主线程或子线程中履行
- 与之相对的是暖流,咱们行将介绍的 StateFlow 和 SharedFlow 是暖流,在垃圾回收之前,都是存在内存之中,并且处于活泼状况的。
//运用flow,suspend修饰符能够省略
fun doflow() = flow<Int> {
for (i in 1..5) {
//这里是挂起,不是堵塞
delay(500)
emit(i)
}
}.flowOn(Dispatchers.IO)
//调用
runBlocking {
doflow().collect {
log("value=$it")
}
}
打印(屡次回来多个值)
com.z.zjetpack V/zx: value=1
com.z.zjetpack V/zx: value=2
com.z.zjetpack V/zx: value=3
com.z.zjetpack V/zx: value=4
com.z.zjetpack V/zx: value=5
flow的运用场景
文件下载场景
//正在下载(文件总巨细为5)
fun doflow() = flow<Double> {
for (i in 1..5) {
delay(500)
emit(i.toDouble())
}
//flowOn来指定在IO线程中下载
}.flowOn(Dispatchers.IO)
//读取进展
runBlocking {
doflow().collect {
log("当时下载=${it / 5 * 100}%")
}
}
打印:
com.z.zjetpack V/zx: 当时下载=20.0%
com.z.zjetpack V/zx: 当时下载=40.0%
com.z.zjetpack V/zx: 当时下载=60.0%
com.z.zjetpack V/zx: 当时下载=80.0%
com.z.zjetpack V/zx: 当时下载=100.0%
流构建器
flowof 和asflow
runBlocking {
flowOf(1, 2, 3)
.onEach { delay(500) }
.collect {
log("value = $it")
}
(5..8).asFlow()
.onEach { delay(500) }
.collect {
log("value = $it")
}
}
运用launchin替换collect在单独的协程中发动搜集流。
fun event() = (1..3)
.asFlow()
.onEach {
delay(500)
}.flowOn(Dispatchers.IO)
//调用
runBlocking {
val job = event().onEach {
log("value = $it")
}.launchIn(CoroutineScope(Dispatchers.IO))
//主线程可用this
//.launchIn(this)
job.join()
}
流的撤销
超时的时分撤销
fun cancelFlow() = flow<Int> {
for (i in 1..5) {
delay(1000)
emit(i)
}
}
//调用
runBlocking {
//超时的时分撤销流
withTimeoutOrNull(2500) {
cancelFlow().collect {
log("value = $it")
}
}
}
打印:在2.5秒的时分超时了,撤销了
com.z.zjetpack V/zx: value = 1
com.z.zjetpack V/zx: value = 2
直接撤销
runBlocking {
cancelFlow().collect {
log("value = $it")
if(it == 3){
cancel()
}
}
}
繁忙的使命是不能直接撤销的,需求检测撤销(cancellable)
runBlocking {
(1..5).asFlow().cancellable().collect {
if(it == 3) {
cancel()
}
}
}
背压:生产者功率 > 消费者功率 运用缓冲和flowon来处理背压
buffer():并发运转流中发射元素的代码 conflate():兼并发射项,不对每个值处理 collectLatest():撤销偏重新发送最终一个值
模拟背压代码:
fun preFlow() = flow<Int> {
for (i in 1..5) {
delay(100)
emit(i)
log("发送$i")
}
}
//调用
//100ms发送一次,300ms接纳一次就产生了背压
runBlocking {
val time = measureTimeMillis {
preFlow()
//buffer能够增加缓冲,提高功率
//.buffer(100)
//flowOn自带缓冲功用
//.flowOn(Dispatchers.IO)
//conflate不对每个值处理
//.conflate()
//.collect
//撤销偏重新发送最终一个值
.collectLatest {
delay(300)
log("接纳到:$it")
}
}
log("总耗时 $time")
}
打印:
com.z.zjetpack V/zx: 接纳到:1
com.z.zjetpack V/zx: 发送1
com.z.zjetpack V/zx: 接纳到:2
com.z.zjetpack V/zx: 发送2
com.z.zjetpack V/zx: 接纳到:3
com.z.zjetpack V/zx: 发送3
com.z.zjetpack V/zx: 接纳到:4
com.z.zjetpack V/zx: 发送4
com.z.zjetpack V/zx: 接纳到:5
com.z.zjetpack V/zx: 发送5
com.z.zjetpack V/zx: 总耗时 2033
运用buffer后
com.z.zjetpack V/zx: 发送1
com.z.zjetpack V/zx: 发送2
com.z.zjetpack V/zx: 发送3
com.z.zjetpack V/zx: 接纳到:1
com.z.zjetpack V/zx: 发送4
com.z.zjetpack V/zx: 发送5
com.z.zjetpack V/zx: 接纳到:2
com.z.zjetpack V/zx: 接纳到:3
com.z.zjetpack V/zx: 接纳到:4
com.z.zjetpack V/zx: 接纳到:5
com.z.zjetpack V/zx: 总耗时 1634
运用flowOn后
com.z.zjetpack V/zx: 发送1
com.z.zjetpack V/zx: 发送2
com.z.zjetpack V/zx: 发送3
com.z.zjetpack V/zx: 接纳到:1
com.z.zjetpack V/zx: 发送4
com.z.zjetpack V/zx: 发送5
com.z.zjetpack V/zx: 接纳到:2
com.z.zjetpack V/zx: 接纳到:3
com.z.zjetpack V/zx: 接纳到:4
com.z.zjetpack V/zx: 接纳到:5
com.z.zjetpack V/zx: 总耗时 1639
运用conflate后
com.z.zjetpack V/zx: 发送1
com.z.zjetpack V/zx: 发送2
com.z.zjetpack V/zx: 发送3
com.z.zjetpack V/zx: 接纳到:1
com.z.zjetpack V/zx: 发送4
com.z.zjetpack V/zx: 发送5
com.z.zjetpack V/zx: 接纳到:3
com.z.zjetpack V/zx: 接纳到:5
com.z.zjetpack V/zx: 总耗时 1034
运用collectLatest后
com.z.zjetpack V/zx: 发送1
com.z.zjetpack V/zx: 发送2
com.z.zjetpack V/zx: 发送3
com.z.zjetpack V/zx: 发送4
com.z.zjetpack V/zx: 发送5
com.z.zjetpack V/zx: 接纳到:5
com.z.zjetpack V/zx: 总耗时 843
操作符
转化操作符:map ,transform 限长操作符:取指定数量,take 末端操作符:末端操作符用于发动流搜集的挂起函数,collect,tolist,toset,reduce,fold 组合操作符:zip 展平操作符:flatMapConcat(连接),flatMapMerge(兼并),flatMapLatest(最新)
map
suspend fun perRequest(req: Int): String {
delay(1000)
return "转化 $req"
}
runBlocking {
(1..3).asFlow().map {
perRequest(it)
}.collect {
log(it)
}
}
打印:
com.z.zjetpack V/zx: 转化 1
com.z.zjetpack V/zx: 转化 2
com.z.zjetpack V/zx: 转化 3
transform
runBlocking {
(5..6).asFlow().transform {
emit("s $it")
emit(perRequest(it))
emit("e $it")
}
//.take(4)
.collect {
log(it)
}
}
打印:
com.z.zjetpack V/zx: s 5
com.z.zjetpack V/zx: 转化 5
com.z.zjetpack V/zx: e 5
com.z.zjetpack V/zx: s 6
com.z.zjetpack V/zx: 转化 6
com.z.zjetpack V/zx: e 6
take
加上take之后
com.z.zjetpack V/zx: s 5
com.z.zjetpack V/zx: 转化 5
com.z.zjetpack V/zx: e 5
com.z.zjetpack V/zx: s 6
末端操作符:collect,tolist,toset,reduce,fold
runBlocking {
val sum = (1..5).asFlow().map { it * it }.reduce { a, b -> a + b }
log("sum = $sum")
val nList = (1..5).asFlow().toList()
log("nList = $nList")
val nSet = listOf(1, 2, 2, 3, 3, 5).asFlow().toSet()
log("nSet = $nSet")
}
打印:
com.z.zjetpack V/zx: sum = 55
com.z.zjetpack V/zx: nList = [1, 2, 3, 4, 5]
com.z.zjetpack V/zx: nSet = [1, 2, 3, 5]
展平操作符
只运用map的时分
//回来值是一个flow
fun reqFlow(i: Int) = flow<String> {
emit("start $i")
delay(500)
emit("end $i")
}
runBlocking {
(0..1).asFlow().map {
reqFlow(it)
}.collect {
log("初次collect = $it")
it.collect {
log("二次 = $it")
}
}
}
打印:由于回来是flow所以需求collect 两次才能拿到值,Flow<Flow<String>>
com.z.zjetpack V/zx: 初次collect = kotlinx.coroutines.flow.SafeFlow@63db1bf
com.z.zjetpack V/zx: 二次 = start 0
com.z.zjetpack V/zx: 二次 = end 0
com.z.zjetpack V/zx: 初次collect = kotlinx.coroutines.flow.SafeFlow@d27108c
com.z.zjetpack V/zx: 二次 = start 1
com.z.zjetpack V/zx: 二次 = end 1
flatMapConcat
runBlocking {
(0..1).asFlow().flatMapConcat {
reqFlow(it)
}.collect {
log("初次collect = $it")
}
}
打印:直接展开了
com.z.zjetpack V/zx: 初次collect = start 0
com.z.zjetpack V/zx: 初次collect = end 0
com.z.zjetpack V/zx: 初次collect = start 1
com.z.zjetpack V/zx: 初次collect = end 1
runBlocking {
(0..1).asFlow().flatMapMerge {
reqFlow(it)
}.collect {
log("初次collect = $it")
}
}
打印:
com.z.zjetpack V/zx: 初次collect = start 0
com.z.zjetpack V/zx: 初次collect = start 1
com.z.zjetpack V/zx: 初次collect = end 0
com.z.zjetpack V/zx: 初次collect = end 1
flatMapLatest
runBlocking {
(0..1).asFlow().flatMapLatest {
reqFlow(it)
}.collect {
log("初次collect = $it")
}
}
打印:
com.z.zjetpack V/zx: 初次collect = start 0
com.z.zjetpack V/zx: 初次collect = start 1
com.z.zjetpack V/zx: 初次collect = end 1
流的反常处理
catch函数 和 try catch
flow {
emit(1)
throw NullPointerException()
//catch函数只捕获上游的反常
}.catch {
log("exception $it")
//在反常后恢复
emit(20)
}.flowOn(Dispatchers.IO)
.collect {
log("msg $it")
}
打印:
com.z.zjetpack V/zx: exception java.lang.NullPointerException
com.z.zjetpack V/zx: msg 1
com.z.zjetpack V/zx: msg 20
//不主张经过这种办法捕获上游的反常,违反了flow准则,这种适合捕获下流的反常
try {
(1..3).asFlow().collect {
check(it > 2) {
"ex $it"
}
}
} catch (e: Exception) {
log("反常 $e")
}
打印:
com.z.zjetpack V/zx: 反常 java.lang.IllegalStateException: ex 1
流的完结
finally 和 onCompletion
try {
(1..3).asFlow().collect {
check(it > 2) {
"ex $it"
}
}
} catch (e: Exception) {
log("反常 $e")
} finally {
log("流已完结")
}
//产生反常onCompletion能够拿到反常信息,但不会捕获
try {
(1..3).asFlow().onCompletion {
log("onCompletion $it")
}.collect {
check(it > 2) {
"ex $it"
}
}
} catch (e: Exception) {
log("反常 $e")
}
打印:
com.z.zjetpack V/zx: 反常 java.lang.IllegalStateException: ex 1
com.z.zjetpack V/zx: 流已完结
com.z.zjetpack V/zx: onCompletion java.lang.IllegalStateException: ex 1
com.z.zjetpack V/zx: 反常 java.lang.IllegalStateException: ex 1
StateFlow
StateFlow 是一个状况容器式可观察数据流,能够向其搜集器宣布当时状况更新和新状况更新。
- StateFlow运用 榜首步:创建 MutableStateFlow 并设置初始化的值。
class MainViewModel : ViewModel() {
val selected = MutableStateFlow<Boolean>(false)
}
第二步:同 Flow 相同,运用 collect 办法:
lifecycleScope.launch {
viewModel.selected.collect {
// ... 引起UI产生的改变
// 比方 某个按钮是否选中状况
}
}
第三步:能够给 selected设置值,从而引起 Ui 层的改变:
class MainViewModel : ViewModel() {
val selected = MutableStateFlow<Boolean>(false)
fun doSomeThing(value: Boolean) {
selected.value = value
}
}
一般的 Flow,是不具备 selected.value = value 这种能力的
StateFlow 和 LiveData 有什么差异? 有两点差异:
榜首点,StateFlow 必须有初始值,LiveData 不需求。 第二点,当 View 变为 STOPPED 状况时,LiveData.observe() 会主动撤销注册运用方,而从 StateFlow 或任何其他数据流搜集数据则不会撤销注册运用方。 关于 StateFlow 在界面毁掉的时仍处于活泼状况,有两种解决办法:
运用 ktx 将 Flow 转化为 LiveData。 在界面毁掉的时分,手动撤销(这很容易被忘记)。
class LatestNewsActivity : AppCompatActivity() {
...
// Coroutine listening for UI states
private var uiStateJob: Job? = null
override fun onStart() {
super.onStart()
// Start collecting when the View is visible
uiStateJob = lifecycleScope.launch {
latestNewsViewModel.uiState.collect { uiState -> ... }
}
}
override fun onStop() {
// Stop collecting when the View goes to the background
uiStateJob?.cancel()
super.onStop()
}
}
SharedFlow
SharedFlow:数据共享,有点相似广播 和 StateFlow 相同,SharedFlow 也是暖流,它能够将已发送过的数据发送给新的订阅者,并且具有高的装备性。
- SharedFlow运用场景 总的来说,SharedFlow 和 StateFlow 相似,他们都是暖流,都能够用来存储状况,但 SharedFlow 装备灵敏。
当你有如下场景时,需求运用 SharedFlow:
产生订阅时,需求将过去已经更新的n个值,同步给新的订阅者。 装备缓存战略。 2. SharedFlow的运用 简略写一个 Demo吧。
榜首步:创建一个 MutableSharedFlow,对应的参数解说在注释中
class MainViewModel : ViewModel() {
val sharedFlow = MutableSharedFlow<Int>(
5 // 参数一:当新的订阅者Collect时,发送几个已经发送过的数据给它
, 3 // 参数二:减去replay,MutableSharedFlow还缓存多少数据
, BufferOverflow.DROP_OLDEST // 参数三:缓存战略,三种 丢掉最新值、丢掉最旧值和挂起
)
}
第二步:运用emit或者tryEmit办法
class MainViewModel : ViewModel() {
val sharedFlow = MutableSharedFlow<Int>(
// ....
)
// 初始化时调用
init {
for (i in 0..10) {
sharedFlow.tryEmit(i)
}
}
// 在按钮中调用
fun doAsClick() {
for (i in 11..20) {
sharedFlow.tryEmit(i)
}
}
}
当 MutableSharedFlow 中缓存数据量超过阈值时,emit 办法和 tryEmit 办法的处理办法会有不同:
emit 办法:当缓存战略为 BufferOverflow.SUSPEND 时,emit 办法会挂起,直到有新的缓存空间。 tryEmit 办法:tryEmit 会回来一个 Boolean 值,true 代表传递成功,false 代表会产生一个回调,让这次数据发射挂起,直到有新的缓存空间。 第三步:接纳数据 接纳数据的办法,跟一般的 Flow 没什么差异。
下面是我的全部代码:
class MainActivity : AppCompatActivity() {
private lateinit var viewModel: MainViewModel
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
viewModel = ViewModelProvider(this).get(com.example.coroutinedemo.viewmodel.MainViewModel::class.java)
val tvContent = findViewById<TextView>(R.id.tv_content)
// 发动榜首个协程,接纳初始化的数据
lifecycleScope.launch {
val sb = StringBuffer()
viewModel.sharedFlow.collect {
sb.append("<<${it}")
tvContent.text = sb
}
}
val btnGo = findViewById<Button>(R.id.btn_go)
val tvTwo = findViewById<TextView>(R.id.tv_2)
btnGo.setOnClickListener {
// 发送新的数据
viewModel.doAsClick()
// 发送新的数据今后,发动第二个协程
lifecycleScope.launch {
val sb = StringBuffer()
viewModel.sharedFlow.collect {
sb.append("<<${it}")
tvTwo.text = sb.toString()
}
}
}
}
}
- 将冷流转化为SharedFlow 直接运用官网的代码,办法是运用 Flow 的扩展办法 shareIn:
class NewsRemoteDataSource(...,
private val externalScope: CoroutineScope,
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
...
}.shareIn(
externalScope,
replay = 1,
started = SharingStarted.WhileSubscribed() // 发动政策
)
}
重点是参数三,别离供给了三个发动战略:
SharingStarted.WhileSubscribed():存在订阅者时,将使上游供给方保持活泼状况。 SharingStarted.Eagerly:当即发动供给方。 SharingStarted.Lazily:在榜首个订阅者出现后开始共享数据,并使数据流永远保持活泼状况。 总结 Flow 给我的感觉就像古老的印刷术,版面定了就不可更改,不过,该版面可印刷多张内容;StateFlow 给我的感觉就像活字印刷,能够不停的更改版面,也能够运用同一个版面印刷很多内容。
假如你要运用 Flow 记载数据的状况,StateFlow 和 SharedFlow 会是一个不错的选择。StateFlow 和 SharedFlow 供给了在 Flow 中运用 LiveData 式更新数据的能力,但是假如要在 UI 层运用,需求留意生命周期的问题。
StateFlow 和 SharedFlow 比较,StateFlow 需求供给初始值,SharedFlow 装备灵敏,可供给旧数据同步和缓存装备的功用。 协程进阶技巧 – StateFlow和SharedFlow