前语
一般来说前端的app和服务器通讯都是用的Http,Http运用方便,恳求流程好操控,但有时分app需求实时接纳服务端的推送或坚持长衔接,这时就需求运用Socket了
java供给的Socket接口仍是比较难用的,而网上有一个开源库OkScoket封装的仍是挺好用的,Github地址:github.com/xuuhaoo/OkS…
但即使如此,其没有一对一回调或同步恳求办法,只能经过一个或几个一致的回调办法,就形成了运用比较费事且简略犯错
而Retrofit运用比较好用,可是原生其只支撑Http,所以我将其封装了一下能够不用修正Retrofit和其接口的代码就能够简略方便安全的运用
正文
下面将一下原理,如果不想看的同学能够直接翻到下面看引证方法和怎么运用
咱们如果无侵入无修正接口的运用能够用两种方案
1.自界说动态署理
2.依据Retrofit供给的接口自界说完成Call.Factory和Call
1.自界说动态署理
该种方法比较灵敏,且代码很少,但无法利用Retrofit的其他优势,比方自界说回来值类型和解析器,比方支撑RxJava就需求自己在写一套或Copy一下,比方解析目标就要自己来处理
所以不运用该办法,但我将完成的代码放出来(只支撑POST和GET的注解,其他注解能够自行支撑)
咱们经过写相应接口的动态署理,并将本身恳求的回调注册到OkSocket的一致回调中,然后自己判别id来取回调,并将其改形成同步(自旋)和异步(回调)的Call在封装适配RxJava
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
import io.reactivex.exceptions.CompositeException
import io.reactivex.exceptions.Exceptions
import io.reactivex.plugins.RxJavaPlugins
import okhttp3.Request
import retrofit2.Call
import retrofit2.Callback
import retrofit2.Response
import retrofit2.http.Field
import retrofit2.http.GET
import retrofit2.http.POST
import retrofit2.http.Query
import java.lang.reflect.*
import java.net.SocketTimeoutException
/**
* creator: lt 2021/1/23 lt.dygzs@qq.com
* effect : 将大部分Http转为Socket
* warning:
*/
object SocketRequest {
/**
* 动态署理单例目标
*/
val instance: IPostRequest = getPostRequest()
//获取动态署理实例目标
private fun getPostRequest(): IPostRequest {
val clazz = IPostRequest::class.java//拿到咱们被署理接口的class目标
return Proxy.newProxyInstance(//调用动态署理生成的办法来生成动态署理
clazz.classLoader,//类加载器目标
arrayOf(clazz),//由于咱们的接口不需求承继其他接口,所以直接传入接口的class就行
PostRequestHandler()//InvocationHandler接口的完成类,用来处理署理目标的办法调用
) as IPostRequest
}
class PostRequestHandler : InvocationHandler {
override fun invoke(proxy: Any, method: Method, args: Array<out Any?>?): Any? {
//处理Object类的办法
if (method.declaringClass == Any::class.java)
return (if (args == null) method.invoke(this) else method.invoke(this, *args))
val tMap = HashMap<String, Any>()
method.parameterAnnotations.forEachIndexed { index, it ->
//拿到参数的key,如果拿不到阐明需求运用http恳求
val key = (it.find { it is Field } as? Field)?.value
?: (it.find { it is Query } as? Query)?.value
?: return method.invoke(PostRequest.getPostRequest(), args)
tMap.put(key, args?.get(index) ?: "")
}
//拿到url,如果不是以斜杠最初就拼接上
var url = method.getAnnotation(POST::class.java)?.value
?: method.getAnnotation(GET::class.java)?.value
?: return method.invoke(PostRequest.getPostRequest(), args)
if (url.startsWith('/').not())
url = "/$url"
//处理回来值
return when (method.returnType) {
Call::class.java -> SocketCall<Any>(url, tMap, getParameterUpperBound(0, method.genericReturnType as ParameterizedType))
Observable::class.java -> createObservable(url, tMap, method)
else -> method.invoke(PostRequest.getPostRequest(), args)
}
}
}
private fun getParameterUpperBound(index: Int, type: ParameterizedType): Type {
val types = type.actualTypeArguments
require(!(index < 0 || index >= types.size)) { "Index " + index + " not in range [0," + types.size + ") for " + type }
val paramType = types[index]
return if (paramType is WildcardType) {
paramType.upperBounds[0]
} else paramType
}
private fun createObservable(url: String, tMap: HashMap<String, Any>, method: Method): Observable<Any?> =
SocketObservable(SocketObservable.CallExecuteObservable(SocketCall(url, tMap, getParameterUpperBound(0, method.genericReturnType as ParameterizedType))))
}
class SocketObservable<T>(private val upstream: Observable<Response<T>>) : Observable<T>() {
override fun subscribeActual(observer: Observer<in T>) {
upstream.subscribe(BodyObserver(observer))
}
private class BodyObserver<R>(val observer: Observer<in R>) : Observer<Response<R>> {
private var terminated = false
override fun onSubscribe(disposable: Disposable) {
observer.onSubscribe(disposable)
}
override fun onNext(response: Response<R>) {
if (response.isSuccessful) {
val body = response.body()
if (body != null) {
observer.onNext(body)
return
}
}
terminated = true
val t: Throwable = retrofit2.HttpException(response)
try {
observer.onError(t)
} catch (inner: Throwable) {
Exceptions.throwIfFatal(inner)
RxJavaPlugins.onError(CompositeException(t, inner))
}
}
override fun onComplete() {
if (!terminated) {
observer.onComplete()
}
}
override fun onError(throwable: Throwable) {
if (!terminated) {
observer.onError(throwable)
} else {
// This should never happen! onNext handles and forwards errors automatically.
val broken: Throwable = AssertionError(
"This should never happen! Report as a bug with the full stacktrace.")
broken.initCause(throwable)
RxJavaPlugins.onError(broken)
}
}
}
class CallExecuteObservable<T>(private val call: Call<T>) : Observable<Response<T>>() {
override fun subscribeActual(observer: Observer<in Response<T>>) {
// Since Call is a one-shot type, clone it for each new observer.
val disposable = CallDisposable(call)
observer.onSubscribe(disposable)
if (disposable.isDisposed) {
return
}
var terminated = false
try {
val response = call.execute()
if (!disposable.isDisposed) {
observer.onNext(response)
}
if (!disposable.isDisposed) {
terminated = true
observer.onComplete()
}
} catch (t: Throwable) {
Exceptions.throwIfFatal(t)
if (terminated) {
RxJavaPlugins.onError(t)
} else if (!disposable.isDisposed) {
try {
observer.onError(t)
} catch (inner: Throwable) {
Exceptions.throwIfFatal(inner)
RxJavaPlugins.onError(CompositeException(t, inner))
}
}
}
}
private class CallDisposable constructor(private val call: Call<*>) : Disposable {
@Volatile
private var disposed = false
override fun dispose() {
disposed = true
call.cancel()
}
override fun isDisposed(): Boolean {
return disposed
}
}
}
}
class SocketCall<T>(
val url: String,
val tMap: HashMap<String, Any>,
val trueReturnType: Type) : Call<T> {
private var canceled = false
private var isExecuted = false//是否运转过,一个call目标只允许运转一次
private var requestId = 0
/**
* 检查网络三十秒,如果没有衔接成功就 sync抛异常,async就回调false
* 传入[asyncCallback]表明async
*/
private fun checkConnect(asyncCallback: ((Boolean) -> Unit)? = null) {
if (SocketManage.manager.isConnect) {
asyncCallback?.invoke(true)
return
}
SocketManage.connect()
val time = System.currentTimeMillis()
if (asyncCallback == null) {
while (true) {
if (SocketManage.currConnStatus == 3)
return
if (System.currentTimeMillis() - time > 30000)
throw SocketTimeoutException()
try {
Thread.sleep(100)
} catch (e: Exception) {
e.printStackTrace()
}
}
} else {
ThreadPool.submitToCacheThreadPool {
while (true) {
if (SocketManage.currConnStatus == 3) {
asyncCallback(true)
return@submitToCacheThreadPool
}
if (System.currentTimeMillis() - time > 30000) {
asyncCallback(false)
return@submitToCacheThreadPool
}
try {
Thread.sleep(100)
} catch (e: Exception) {
e.printStackTrace()
}
}
}
}
}
override fun execute(): Response<T> {
checkConnect()
if (isExecuted) throw IllegalStateException("只能履行一次")
isExecuted = true
val data = TcpSendData(url, tMap)
requestId = data.request_id
"send2 : $url ${tMap.toJson()}".w("SocketManage22")
var a: Any? = null
var t: Throwable? = null
var notFinish = true
SocketManage.addListener(requestId, trueReturnType) { any: Any?, throwable: Throwable? ->
a = any
t = throwable
notFinish = false
}
//发送恳求
SocketManage.manager.send(data)
var whileNumber = 0
while (notFinish && !canceled) {
try {
whileNumber++
if (whileNumber % 20 == 0)
SocketManage.handlerTimeOutedListener()
Thread.sleep(50)
} catch (e: Exception) {
e.printStackTrace()
}
}
t?.let { throw it }
return if (a == null) throw ServerException("服务端回来的result是空") else Response.success(a as T)
}
override fun enqueue(callback: Callback<T>) {
checkConnect {
if (!it) {
HandlerPool.post {
callback.onFailure(this, SocketTimeoutException())
}
return@checkConnect
}
if (isExecuted) throw IllegalStateException("只能履行一次")
isExecuted = true
val data = TcpSendData(url, tMap)
"send : $url ${tMap.toJson()}".w("SocketManage22")
requestId = data.request_id
SocketManage.addListener(requestId, trueReturnType) { any: Any?, throwable: Throwable? ->
if (any != null)
callback.onResponse(this, Response.success(any as T))
else
callback.onFailure(this, throwable ?: ServerException("服务端回来的result是空2"))
}
//发送恳求
SocketManage.manager.send(data)
}
}
override fun clone(): Call<T> = SocketCall(url, tMap, trueReturnType)
override fun isExecuted(): Boolean = isExecuted
override fun cancel() {
canceled = true
// 取消恳求
SocketManage.removeListener(requestId)
}
override fun isCanceled(): Boolean = canceled
override fun request(): Request? = null
}
object : SocketActionAdapter() {
override fun onSocketReadResponse(info: ConnectionInfo?, action: String?, data: OriginalData?) {
data ?: return
...
//处理回调
val (_, type, listener) = listenerMap.remove(requestId) ?: return
try {
val any = Gson().fromJson<Any?>(body.getJSONObject("body").toString(), type)
HandlerPool.post {
if (any == null) listener(null, ServerException("服务端回来的result是空")) else listener(any, null)
}
} catch (t: Throwable) {
HandlerPool.post {
listener(null, t)
}
}
handlerTimeOutedListener()
...
具体能够参阅我之前模仿Retrofit完成的Http动态署理:模仿Retrofit封装一个运用更简略的网络恳求结构_滔lt的博客-CSDN博客
2.依据Retrofit供给的接口自界说完成Call.Factory和Call
ps:该方案被抛弃,由于其比较受约束,并且无法充分利用到Retrofit的功用,并且约束Http比较死,用Socket需求写很多死代码
经过查看Retrofit结构,发现传入的OkHttpClient其实便是一个Call.Factory的完成类,所以只需求自行完成Call.Factory,咱们也能够来操控Call的创立,只要Call能创立出来,其他的如Observable也仅仅对Call的包装
在经过和办法1相同的回调处理,即可完成效果
要害代码如下:
abstract class SocketCallAdapter(private val manager: IConnectionManager) : Call.Factory {
override fun newCall(request: Request): Call {
...
return SocketCall(
manager,
this,
url,
map
)
}
}
/**
* creator: lt 2021/2/27 lt.dygzs@qq.com
* effect : 用于Socket恳求的Call
* warning:
*/
internal class SocketCall(
private val manager: IConnectionManager,
private val adapter: SocketCallAdapter,
private val url: String,
private val tMap: HashMap<String, Any>) : Call {
//和第一种完成相同
...
}
3.依据我from的Retrofit库供给的才能来阻拦Retrofit.Call的创立
运用该方法能够更灵敏且更简略复用Retrofit的才能和插件
参阅更易于运用的Retrofit(不用写注解)_滔lt的博客-CSDN博客第七条和源码:GitHub – ltttttttttttt/Retrofit_SocketCallAdapter: Retrofit能够直接运用OkSocket来进行网络恳求,Retrofit内的东西都不需求修正,只需求将OkHttpClient换成此即可
ps:其实源码很简略,就几个文件且内容都很少,并且想自界说阻拦简直太方便了
运用方法
在根项目的build.gradle文件中参加:
allprojects {
repositories {
...
maven { url 'https://jitpack.io' }
}
}
app的build.gradle中加上
dependencies{
...
implementation 'com.github.ltttttttttttt:Retrofit_SocketCallAdapter:1.1.8'
implementation "com.github.ltttttttttttt:retrofit:1.3.0"
implementation 'org.jetbrains.kotlin:kotlin-reflect:1.4.30'
//todo 默许用的'com.github.ltttttttttttt:retrofit:',所以如果需求用到gson的阻拦器之类的,可是其间包括的有原版的retrofit的引证,会导致冲突,所以能够运用下面的办法来去掉本引证的某个远程依靠
//implementation 'com.squareup.retrofit2:converter-gson:2.7.0' exclude module: 'retrofit'
//todo 由于运用了com.github.ltttttttttttt:retrofit,所以无法运用默许的Retrofit库,由于供给了默许Retrofit库没有的功用,但其原有功用此库都有,所以仍是能够运用默许Retrofit库的一切功用的
}
代码中运用(也能够参阅Github内的InitRetrofit类):
OkSocket的初始化方法参阅:github.com/xuuhaoo/OkS… 运用也很简略
其他运用方法和Retrofit相同(但只支撑get的query和post的field转为socket恳求,其他方法能够评论留言,如果你能够完成直接提交更新!(欢迎大佬来补充更新))
val manager = OkSocket.open(ConnectionInfo(xxx,xxx))
...//初始化OkSocket的manager
val mId = AtomicInteger(0)
val handler = Handler(Looper.getMainLooper())
val socketAdapter = object : SocketAdapter(manager) {
//从响应数据中获取恳求id和body数据
override fun getResponseIdAndBodyBytes(data: OriginalData): Pair<Int, ByteArray>? {
val jb = JSONObject(String(data.bodyBytes))
if (!jb.has("id")) return null
return jb.getInt("id") to jb.getString("body").toByteArray()
}
//回来当时Socket在逻辑意义上是否和服务端连通了
override fun socketIsConnect(): Boolean = manager.isConnect
//依据url和恳求参数生成用于发送的数据和Id
override fun createSendDataAndId(url: String, requestParametersMap: HashMap<String, Any>, returns: (ISendable, Int) -> Unit) {
val id = mId.incrementAndGet()
val data = TcpSendData(id, url, requestParametersMap)//这儿需求你运用自己界说的数据发送类
returns(data, id)
}
//在主线程回调数据
override fun handlerCallbackRunnable(runnable: Runnable) {
handler.post(runnable)
}
}
//Socket的动态署理
val r = Retrofit.Builder()
.baseUrl(HttpConfig.ROOT_URL.toString())
.addConverterFactory(GsonConverterFactory.create())
.client(OkHttpClient())
//这儿设置了Retrofit.Call的生成阻拦,能够重写SocketServiceMethodFactory.createServiceMethod办法回来null表明自己用Socket处理不了而运用Http恳求
.setServiceMethodFactory(SocketServiceMethodFactory(manager,socketAdapter ))
.build()
.create(HttpFunctions::class.java)
源码地址:GitHub – ltttttttttttt/Retrofit_SocketCallAdapter: Retrofit能够直接运用OkSocket来进行网络恳求,Retrofit内的东西都不需求修正,只需求将OkHttpClient换成此即可