前言
协程到现在,咱们已经差不多学完了一切基础知识,包括协程发动办法、挂起函数、结构化并发、反常处理、Channel
以及Flow
等,而关于Flow
的进阶运用以及协程更多进阶运用,在后面还需求继续探究。
在之前有一篇文章,咱们简单完结了一个Retrofit
,并且运用协程的API完结了挂起函数,让咱们能够用同步的办法写异步代码。文章地址:# 协程(09) | 完结一个简易Retrofit。
那这还不行过瘾,因为咱们之前学习Flow
的时候,知道Flow
就像是一条河流,那假设咱们从网络获取的数据,就像是河流相同流淌下来,咱们运用各种中间操作符进行处理,最终再展示出来,运用链式调用,不仅大大简化代码编写,还让逻辑愈加明晰。
本章内容咱们就来完结一个简易的支撑Flow
回来类型的Retrofit
。和支撑挂起函数相同,咱们分为2个方向:第一个方向是不改动本来SDK代码,把Callback
类型改成支撑Flow
,这种适合咱们没有第三方库源码的状况;第二个方向是直接有权限修正源码,在源码阶段支撑Flow
。
正文
代码完结仍是继续第9篇中的简易Retrofit
代码,所以这儿简易先看之前的文章。和完结挂起函数相同,咱们先来改造Callback
。
Callback
转Flow
和完结挂起函数相同,咱们给KtCall
类型再加一个扩展函数asFlow
:
/**
* 把本来[CallBack]方式的代码,改成[Flow]样式的,即消除回调。其实和扩展挂起函数相同,大致有如下过程:
* * 调用一个高阶函数,关于成功数据进行回来,即[trySendBlocking]办法
* * 关于失利的数据进行回来反常,即[close]办法
* * 一起要能够响应撤销,即[awaitClose]办法
* */
fun <T: Any> KtCall<T>.asFlow(): Flow<T> =
callbackFlow {
//开端网络恳求
val c = call(object : CallBack<T>{
override fun onSuccess(data: T) {
//回来正确的数据,可是要调用close()
trySendBlocking(data)
.onSuccess { close() }
.onFailure { close(it) }
}
override fun onFail(throwable: Throwable) {
//回来反常信息
close(throwable)
}
})
awaitClose {
//响应外部撤销恳求
c.cancel()
}
}
这儿的代码比较简单,可是有许多细节知识点,咱们来简单分析一下:
- 经过
callbackFlow
高阶函数完结功用,回来Flow
类型的数据,该函数界说:
public fun <T> callbackFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit):
Flow<T> = CallbackFlowBuilder(block)
该办法经过ProducerScope
,向block
代码块中提供SendChannel
实例,经过SendChannel
实例,咱们能够向其间发射元素,从而创立出一个冷的Flow
。
这个函数的界说,在之前文章中咱们反复强调过,block
是高阶函数类型,它的接收者是ProducerScope
:
public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
public val channel: SendChannel<E>
}
经过该接口中的默许属性channel
,咱们能够发送数据到Channel
中,比方上面代码块中的trySendBlocking
和close
办法,这也就阐明该办法完结是运用了Channel
。
因为该办法回来的Flow
是冷流,只有当一个终端操作符被调用时,该block
才会履行。
- 该结构者
builder
能确保线程安全和上下文保存,因此提供的ProducerScope
能够在任何上下文中运用,比方基于Callback
的API中,也便是本例测试代码中。
成果Flow
会在block
代码块履行完当即结束,所以应该调用awaitClose
挂起函数来保证flow
在运转,不然channel
会在block
履行完结当即close
,这也便是为什么在上面代码中,写完事务代码,还要调用awaitClose
挂起函数的原因。
-
awaitClose
是一个高阶函数,它的参数block
会在Flow
的消费者手动撤销Flow
的搜集,或许基于Callback
的API中调用SendChannel
的close
办法时被履行。
所以awaitClose
能够用来做一些block
完结后的收尾作业,比方上面代码中咱们用来撤销OkHttp
的恳求,或许在反注册一些Callback
。
一起awaitClose
是必须要调用的,能够避免当flow
被撤销时产生内存泄漏,不然代码会一向履行,即使flow
的搜集已经完结了。
为了杜绝上面状况,咱们在Callback
中,假如事务代码履行完结,不论是成功仍是失利,都需求调用close
,就比方上面代码中回来成功和回来失利都要调用close
,并且在失利时,还需求传递参数。
写完上面代码,咱们也做了一个简单分析,主要是一些规则要履行,现在咱们就来在代码中运用一下:
findViewById<TextView>(R.id.flowCall).setOnClickListener {
val dataFlow = KtHttp.create(ApiService::class.java).reposAsync(language = "Kotlin", since = "weekly").asFlow()
dataFlow
.onStart {
Toast.makeText(this@MainActivity, "开端恳求", Toast.LENGTH_SHORT).show()
}
.onCompletion {
Toast.makeText(this@MainActivity, "恳求完结", Toast.LENGTH_SHORT).show()
}
.onEach {
findViewById<TextView>(R.id.result).text = it.toString()
}
.catch {
Log.i("Flow", "catch exception: $it")
}
.launchIn(lifecycleScope)
}
现在咱们的网络恳求回来值就变成了Flow
类型,咱们就能够运用Flow
的API进行链式调用,在编码和逻辑上都愈加便利。
直接支撑Flow
上面代码运用Callback
转为Flow
适用于一些第三方库,咱们无权修正源码,可是大多数状况下,咱们仍是能够修正源码的。
就比方本章所说的简易Retrofit
,没看过之前的代码完结仍是建议看一下,这儿咱们依据之前完结异步作用相同,来界说一个直接回来Flow
类型的办法:
/**
* [reposFlow]用于异步调用,一起回来类型是[Flow]
* */
@GET("/repo")
fun reposFlow(
@Field("lang") language: String,
@Field("since") since: String
): Flow<RepoList>
然后仍是判别办法的回来值,类似于之前判别回来值类型是否是KtCall
相同,咱们判别回来值是否是Flow
类型:
/**
* 判别办法回来值类型是否是[Flow]类型
* */
private fun isFlowReturn(method: Method) =
getRawType(method.genericReturnType) == Flow::class.java
然后在具体调用的invoke
办法中进行处理:
/**
* 调用[OkHttp]功用进行网络恳求,这儿依据办法的回来值类型挑选不同的策略。
* @param path 这个是HTTP恳求的url
* @param method 界说在[ApiService]中的办法,在里面完结中,假设办法的回来值类型是[KtCall]带
* 泛型参数的类型,则认为需求进行异步调用,进行封装,让调用者传入[CallBack]。假设回来类型是一般的
* 类型,则直接进行同步调用。
* @param args 办法的参数。
* */
private fun <T: Any> invoke(path: String, method: Method, args: Array<Any>): Any?{
if (method.parameterAnnotations.size != args.size) return null
...
//泛型判别
return when{
isKtCallReturn(method) -> {
val genericReturnType = getTypeArgument(method)
KtCall<T>(call, gson, genericReturnType)
}
isFlowReturn(method) -> {
logX("Start Out")
flow<T> {
logX("Start In")
val genericReturnType = getTypeArgument(method)
val response = okHttpClient.newCall(request).execute()
val json = response.body?.string()
val result = gson.fromJson<T>(json, genericReturnType)
// 传出成果
logX("Start Emit")
emit(result)
logX("End Emit")
}
}
else -> {
val response = okHttpClient.newCall(request).execute()
val genericReturnType = method.genericReturnType
val json = response.body?.string()
Log.i("zyh", "invoke: json = $json")
//这儿这个调用,必须要传入泛型参数
gson.fromJson<Any?>(json, genericReturnType)
}
}
}
在isFlowReturn
分支中,咱们首先加了一些能够打印协程信息的log,便利咱们看线程切换作用。然后便是咱们十分了解的flow{}
高阶函数,它是Flow
的上游操作符,在创立Flow
的一起,运用emit
发送数据,这部分知识点在Flow
的文章中,咱们已经十分了解了。
最终咱们来进行调用:
findViewById<TextView>(R.id.flowReturnCall).setOnClickListener {
KtHttp.create(ApiService::class.java).reposFlow(language = "Kotlin", since = "weekly")
.flowOn(Dispatchers.IO)
.onStart {
Toast.makeText(this@MainActivity, "开端恳求", Toast.LENGTH_SHORT).show()
}
.onCompletion {
Toast.makeText(this@MainActivity, "恳求完结", Toast.LENGTH_SHORT).show()
}
.catch {
Log.i("Flow", "catch exception: $it")
}
.onEach {
logX("Display UI")
findViewById<TextView>(R.id.result).text = it.toString()
}
.launchIn(lifecycleScope)
}
相同的,咱们运用flowOn
来切换该操作符之前的操作的线程,然后运用launchIn
在搜集数据的一起指定Scope
。
打印如下:
在红框中,代码履行在主线程,网络恳求部分履行在作业线程,这样就完结了异步恳求,也不会形成Android的UI卡顿了。
总结
本篇文章从2个方面来介绍了Flow
的运用,当咱们运用第三方库时,能够运用第一种办法来支撑Flow
;当是新代码时,咱们就能够直接让其支撑Flow
。
本篇文章涉及的代码:github.com/horizon1234…