前言

本文首要基于详细场景介绍StateFlow和ShareFlow的运用

场景实战

主页1s改写一次

// onresume
fun startRefreshUi() {
    refreshJob = launch(Dispatchers.Main) {
        while (_isRefreshUiRunning) {
            _todayDataFlow.emitAll(
                getTodayUseCase(
                    GetTodayUseCase.Params(uid, KevaUtil.getBoolean(NEED_REQUEST_KEY, true))
                )
            )
            delay(REFRESH_DATA_DURATION)
        }
    }
}
// onstop
fun stopRefreshUi() {
    Log.d(TAG, "stopRefreshUi")
    _isRefreshUiRunning = false
}
  • onResume开始改写
  • onStop 中止改写

优化如下

/**
    * [REFRESH_DATA_DURATION] 改写一次today
    */

private val refreshTodayFlow = flow {
    while (true) {

        Log.i("yyyyyyyyyy", ": emitAll OnlineOrOffline")

        emit(Unit)

        delay(REFRESH_DATA_DURATION)

    }

}
val todayDataFlow: SharedFlow<Res<TodayData>> = refreshTodayFlow.transform {
    UserBean.getInstance()?.uniqid?.let { uid ->

        emitAll(getTodayUseCase(

            GetTodayUseCase.Params(uid, KevaUtil.getBoolean(NEED_REQUEST_KEY, true))

        ))

    }

}.stateIn(viewModelScope, SharingStarted.WhileSubscribed() , Res.Loading)

SharingStarted.WhileSubscribed()在onStop时 会当即中止上游更新。ShareFlow和StateFlow中的生命周期办理 – 文章 – ByteTech (bytedance.net)

分页处理

场景分析

运用jetpack paging3进行分页, paging3内部做了flow的适配

val pagingRankingData: MutableLiveData<Flow<PagingData<RankingsItem>>?> = MutableLiveData()
private val pagerChangedFlow = combineTransform(
        rakingKeyWrapper,

        rakingDuration,

        updateLocationSignal,

        reloadSignal

    ) { rankingKeyWrapper, duration, updateLocationSignal, reload ->

        Log.i("yyyyy", "1231")

        val rankingKey = rankingKeyWrapper?.data

        nowRankingKey = rankingKey

        nowRankingDuration = duration

        // 1. 之前这样处理的原因是, 如果emitAll(pager.flow),只要第一次能执行,后续都不能进入combineTransform的 {}

        pagingRankingData.value = Pager(PagingConfig(pageSize = pageSize)) {

                    val source = RankingListSource(

                        rankingKey?.key,

                        duration,

                        pageSize

                    ) { rankingData, page ->

                        if (page == RankingListSource.FIRST_PAGE) {

                            //只要加载第一页的时分咱们才改写这个

                            viewModelScope.launch {

                                _rankingExtData.emit(rankingData)

                            }

                        }

                    }

                    return@Pager source

                }.flow             

        emit(emptyFlow<Int>())

    }.stateIn(viewModelScope, SharingStarted.Eagerly, 1) // 2. SharingStarted.Eagerly不太对

问题

  • 只要第一次能够打印yyyyy,原理见combineTranform原理 – 文章 – ByteTech (bytedance.net)
  • 退回后台时没有封闭流

简化模型

简化场景模型如下:

private val pageFlowSignal = MutableSharedFlow<Int>(0)
private val pageFlow = MutableStateFlow(0)
private val testFlow1 = MutableStateFlow(1)
private val testFlow2 = MutableStateFlow("1")
val testFlow = combineTransform(testFlow1, testFlow2, pageFlowSignal) { a, b, _ ->
    Log.i("yyyyyyy", ": combineTransform")
    // emitAll 一个热流
    emitAll(pageFlow)
}
// 每次点击button触发
fun test() {
    viewModelScope.launch {
        pageFlowSignal.emit((0..1100).random())
    }
}

经过点击button按钮触发test函数,但只要第一次会打印yyyyyyy

  • 在combineTransform里不要emitAll(热流)

改造

val pagerChangedFlow = combineTransform(
        rakingKeyWrapper,

        rakingDuration,

        updateLocationSignal,

        reloadSignal

    ) { rankingKeyWrapper, duration, updateLocationSignal, reload ->

        val rankingKey = rankingKeyWrapper?.data

        nowRankingKey = rankingKey

        nowRankingDuration = duration

        val rankingKeyJson = GsonUtils.toJson(rankingKeyWrapper?.data)

        KevaUtil.putString(GetInitedRankingKeyUseCase.SAVED_RANKING_KEY, rankingKeyJson)

        KevaUtil.putString(

            GetInitedRankingDurationUseCase.SAVED_RANKING_DURATION,

            duration.duration

        )

        emit(Pair(rankingKey, duration))

    }.flatMapLatest {

        XLog.tag(TAG).i(" enter page flow ")

        val (rankingKey, duration) = it

        if (rankingKey?.isNeedLocate == true && !fitLocationUtil.hasLocationPermission()) {

            XLog.tag(TAG)

                .d("pagingRankingData needLocate true and don't hasLocationPermission.")

            flow {

                emit(PagingData.empty<RankingsItem>())

            }

        } else {

            val pageSize = 50

            Pager(PagingConfig(pageSize = pageSize)) {

                val source = RankingListSource(

                    rankingKey?.key,

                    duration,

                    pageSize

                ) { rankingData, page ->

                    if (page == RankingListSource.FIRST_PAGE) {

                        //只要加载第一页的时分咱们才改写这个

                        viewModelScope.launch {

                            _rankingExtData.emit(rankingData)

                        }

                    }

                }

                return@Pager source

            }.flow

        }

    }.cachedIn(viewModelScope)
  • 运用flatMapLatest平滑combineTransFrom。相当于把pageFlow的内层循环拿出来了

部分改写

场景介绍

  • 授权办理list
  • 点击详细运用,弹出弹窗查看授权详情
  • 在详情页面可分别对基础信息,运动数据授权

建立模型

整体运用了SDK原生的数据结构,其中有两个特色比较影响流模型构建

  • 原生数据结构无法序列化,很难向DialoFragment传递序列化数据作为arguments,需求共用一个ViewModel
  • 原生数据结构无法直接改值
// 授权办理列表的uiState,SharingStarted.WhileSubscribed(5000, 1000)第二个参数表示重放的过期保存的时间,默许是MAX_VALUE
val authList = refreshAuthListSignal.transform {
        // 得到授权列表
        emitAll(getSportAuthListUseCase(Unit))
    }.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000, 1000), Res.Loading)
// 详情弹窗的ui state
// sportAuthorizeInfo为sdk里的数据结构SportAuthorizeInfo
val detailScopes: StateFlow<List<ScopeInfo>> = sportAuthorizeInfo.transform { sportAuthInfo ->
    if (sportAuthInfo != null) {
        XLog.tag(TAG).i("sportAuthInfo = $sportAuthInfo")
        emit(sportAuthInfo.sportAuthScopes.map { it })
    }
}.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), emptyList())
  • 部分改写只会改detailScopes,而不会改sportAuthorizeInfo。所以sportAuthorizeInfo的状况会有一些问题。所以部分改写尽量不要这么用即sportAuthorizeInfo作为一个stateFlow
  • ScopeInfo 无法直接改值,所以需求自己的数据结构

改造

基于以上模型,做如下改造

// 用于部分改写
val partRefreshSignal = MutableSharedFlow<Unit>()
val detailScopes: StateFlow<List<Scope>> = _refreshInfoSignal.transform { action ->
    val pos = if (action is Action.RefreshAllAction) action.pos else _curPos
    val authInfo = curAuthList[pos]
    val scopes = when (action) {
        is Action.RemoveAllAction -> {
            authInfo.sportAuthScopes.map {
                it.newBuilder().name(it.name).isAuthorized(false).build()
            }
        }
        else -> {
            authInfo.sportAuthScopes
        }
    }
    XLog.tag(TAG).i("sportAuthInfo = $authInfo")
    emit(scopes.map { Scope(authInfo.client_key, it.name, it.isAuthorized) })
}.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), emptyList())
fun removeScope(scopeName: String) {
    viewModelScope.launch {
        val scope = curDetailScopes.first { it.name == scopeName }
        val res = removeSportAuthUseCase(RemoveSportAuthUseCase.Params(scope.clientKey, scopeName))
        if (res is Res.Success) {
            XLog.tag(TAG).i("remove sport auth success")
            scope.isAuthorized = false
            XLog.tag(TAG).i(" removeScope scope = $scope ")
            // 部分改写
            partRefreshSignal.emit(Unit)
            AuthManagerTracker.change_app_authorize(trackerCurPos, curName, "data_detail", toTrackerDataStatus(userInfoStatus), toTrackerDataStatus(sportInfoStatus))
        } else {
            _actionErrorSignal.emit(Unit)
            AuthManagerTracker.change_app_authorize(trackerCurPos, curName, "data_detail", toTrackerDataStatus(userInfoStatus), toTrackerDataStatus(sportInfoStatus))
        }
    }
}
// View层,新增一个collect
launchAndRepeatWithViewLifecycle {
    model.partRefreshSignal.collect {
        binding.bindChecked(model.curDetailScopes)
    }
}
  • google提出的MVI是一个理想模型,在部分改写场景中很难完成,由于部分数据改动需求让整个数据改变是不科学的,浪费功能。

下载 & 上报

下载和上报这个场景,生命周期和View不太共同,当View退出的时分,两者应该不能被销毁,所以获取流的时分,在第二个参数应该为`SharingStarted.Eagerly `如下

stateIn(viewModelScope, SharingStarted.Eagerly, SyncState.Init)

这样就会在View退出的时分,上游(生产者)不会被封闭,下流(顾客)会被封闭。当从头进入View的时分,下流会康复,上游会把状况emit给下流

我这儿View是一个loading且不行封闭的view。如果是在可封闭的view场景,逻辑上是需求Service去处理的

在静态类中办理生命周期

咱们在惯例的开发中能够看到注册与反注册,比方在Activity的onCreate注册,在onDestory反注册,在ViewModel init时注册,在onClear时反注册。可是现在有了`lifecycelScope` ViewModelScope能够已sdk的方法供给出去了。接入方不必考虑生命周期了。比方考虑如下场景

  • 调用方企图在KV组件的Key改变时,触发某些行为,比方Keva

object UploadHistoryFlagger {
    fun flagHadUploadHistory() {
        KevaUtil.putBoolean(
                Constant.HAD_UPLOAD_HISTORY_KEY + "_" + UserBean.getInstance()?.uniqid,
                true
            )
         hasUploadListener?.invoke()
    }
}

然后在ViewModel声明这样的Listener

UploadHistoryFlagger.hasUploadListener = {
 //在之前没有上传数据,现在上传数据之后,需求改写
    XLog.tag(TAG).d("hasUploadListener invoke")
 }
override fun onCleared() {
    // 在ViewModel置null
    UploadHistoryFlagger.hasUploadListener = null
    super.onCleared()
}

改造

咱们利用viewModelScopeFlow(本质也是回调)来改造下,即在ViewModelScope的生命周期内用flow监听keva的key的改变,完成如下:

fun CoroutineScope.kevaChannelFlow(): Flow<String> {
    return getKevaKey()
}
 /**
 * 内部和viewScope的
 */
private fun CoroutineScope.getKevaKey(): Flow<String> {
    val keyShareFlow = MutableSharedFlow<String>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    var listener: Keva.OnChangeListener? = null
    // key回调
    Keva.getRepo(KEVA_MAP_ID, MODE_SINGLE_PROCESS)
        ?.registerChangeListener(Keva.OnChangeListener { repo, key ->
  launch {
 keyShareFlow.tryEmit(key)
            }
 } .also {
 listener = it
 } )
        // 咱们在viewModelScope结束的时分反注册
    return keyShareFlow.onCompletion {
 XLog.tag(TAG).i(" onCompletion ")
        Keva.getRepo(KEVA_MAP_ID, MODE_SINGLE_PROCESS)?.unRegisterChangeListener(listener)
    }
}

在ViewModel

val kevaDataFlow = viewModelScope.kevaChannelFlow()
    .filter { it == Constant.HAD_UPLOAD_HISTORY_KEY + "_" + UserBean.getInstance()?.uniqid }
init {
    kevaDataFlow.collect {
    XLog.tag(TAG).d("hasUploadListener invoke")
        reload()
    }
}

利用combine的特性,进行杂乱的埋点

无法仿制加载中的内容

如上图所示,需求紫色流(todayDataFlow)和绿色流(badgeFlow),todayDataFlow是一个守时循环流,两个流都是异步使命。咱们需求把埋点时机控制在紫色流的第一个节点和绿色流的节点都发射后触发埋点。难点在于都是异步流,无法确守时机,有或许变成如上图所示的错误流序列

下图说明了怎么完成:

无法仿制加载中的内容

即运用filter过滤掉2类型节点,然后和绿色流combine即可,如下代码为完成


 /**
 * trackerBadgeFlow 朴实是为了埋点
 * 逻辑:等候badgeFlow,todayDataFlow一起回来,才会displayHomePage
 */
private val trackerBadgeFlow = _badgeFlow.transform { badge ->
 Log.i(TAG, "badge: $badge")
    if (badge is Res.Success) {
        if (badge.data != null) emit(badge.data?.status ?: false)
    } else {
        emit(false)
    }
 }
val trackerDisplayHomeFlow = combineTransform(trackerBadgeFlow, todayDataFlow.filter { today ->
 (today is Res.Success && today.data.weekInfo.second > 0)
 } ) { newBadge, today ->
 if (today is Res.Success) {
        val weekInfo = today.data.weekInfo
        Log.i(TAG, "weekInfo: ${weekInfo.second} today = $today")
        if (weekInfo.second > 0) {
            emit(Pair(weekInfo, newBadge))
        }
    }
 } .shareIn(viewModelScope, SharingStarted.WhileSubscribed())
fun fetchBadge(showBadge: Boolean) {
    viewModelScope.launch {
 if (!showBadge) {
            _badgeFlow.emit(Res.Error(IllegalArgumentException()))
        } else {
            val res = getBadgeInfoUseCase(Unit)
            _badgeFlow.emit(res)
        }
    }
}

ShareFlow与StateFlow实战

生命周期收效逻辑

ShareFlow与StateFlow实战

上游会以下流得生命周期Eagerly为准,其实比较相似RxJava

总结

从头考虑呼应式 vs 指令式

无法仿制加载中的内容

  • 呼应式面向流编程,监听流的改变,呼应改变
  • 用声明的方法呼应未来产生的事情流
  • 呼应式面向成果,指令式面向进程
  • Flutter在界面布局上是不能经过指令式去写的,只能在布局上描绘成果。然后监听布局相关的数据改变,然后改变UI
  • 呼应式相似y=f(x),
  1. 所以基于shareFlow和stateFlow的ViewModel类一般长这样(相似Flutter,Compose的写法)
val detailScopes: StateFlow<List<Scope>> = _refreshInfoSignal.transform { action ->
 val pos = if (action is Action.RefreshAllAction) action.pos else _curPos
    val authInfo = curAuthList.getOrNull(curPos) ?: return@transform
    val scopes = when (action) {
        is Action.RemoveAllAction -> {
            authInfo.sportAuthScopes.map {
 it.newBuilder().name(it.name).isAuthorized(false).build()
            }
 }
        else -> {
            authInfo.sportAuthScopes
        }
    }
    XLog.tag(TAG).i("sportAuthInfo = $authInfo")
    emit(scopes.map { Scope(authInfo.client_key, it.name, it.isAuthorized) } )
 } .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), emptyList())
  • detailScopes描绘了成果
  • _refreshInfoSignal描绘了输入
  • transform一般为转换函数
  1. LiveData的写法一般长这样
val detailScopes = MutableLiveData<List<Scope>>()
fun removeAllAction() {
     val scopes = authInfo.sportAuthScopes.map {
 it.newBuilder().name(it.name).isAuthorized(false).build()
            }
     detailScopes.value = scopes
}
fun refreshAllAction() {
     //   
     detailScopes.value = scopes
}

能够看到两种写法的不同点

  • 第一种写法detailScopes是一个成果,能够直接定义。第二种写法需求指令式的写`detailScopes.value = scopes`

在普通的android开发中运用声明式的写法对错常理想化的。由于其不具备相似flutter和compose的树形结构(该结构会进步UI的改写功率),所以一般是会有多个uiState用来绑定每个部分的ui。