我正在参加「启航计划」
1、前语
经过观察我的更新频率,你或许会得出我是一个鸽子的结论。不过请听我狡辩一下。最近我沉迷于编写服务器端的运用,由于虽然咱们说Android运用Java,但实际上还有一些小技巧。今天,我决定痛改前非,并为大家带来一篇关于Android WebSocket的教程。当然,咱们将从零开端,一步一步地构建它。(咕咕咕)一同学习、一同前进。假如写的欠好,或者有过错之处,恳请在评论、私信、邮箱指出,万分感谢
2、WebScoket
它的主要功用是允许服务器主意向客户端推送信息,一起也允许客户端主意向服务器发送信息。
WebSocket是一种依据TCP的全双工通讯协议,经过它能够在客户端和服务器之间树立一个耐久的衔接,完成实时的双向数据传输。与传统的HTTP恳求-响应形式不同,WebSocket提供了更高效、实时性更强的通讯方式。
由于WebSocket是一种协议,因而不能直接运用。但是幸运的是,咱们有许多老练的开源库和结构能够帮助咱们轻松地运用WebSocket功用。
- OKHttp:OKHttp是一个强大的网络库,提供了对WebSocket的完好支撑。它具有杰出的文档和活跃的维护,是许多Android开发者首选的WebSocket计划。
- Java-WebSocket:Java-WebSocket是一个纯Java完成的WebSocket库,能够在Android项目中运用。它具有简略易用的API,并支撑WebSocket协议的各种功用。
- AndroidAsync:AndroidAsync是一个依据Java NIO的异步网络库,提供了对WebSocket的支撑。它具有简洁的API和高性能,适用于处理高并发的WebSocket衔接。
- Autobahn Android:Autobahn Android是依据Autobahn库的Android版本,它完成了WebSocket协议的客户端和服务器端功用。它支撑高档特性如RPC(远程进程调用)和发布-订阅形式。
这儿我选择OKHttp,假如你问我为什么。我只能说,正经人都在用Retrofit,干嘛不用OKHttp呢。
3、OKHttp WebScoket
- 增加依赖项到你的build.gradle(随便选个版本吧)。
implementation 'com.squareup.okhttp3:okhttp:4.9.3'
//or
implementation("com.squareup.okhttp3:okhttp:4.9.3")
-
创立WebSocket客户端
相信假如你了解过Retrofit或者OKHttp,这是十分简略的
private val wsHttpClient by lazy { OkHttpClient.Builder() .pingInterval(10, TimeUnit.SECONDS) // 设置 PING 帧发送间隔 .build() } val request = Request.Builder() .url("ws://xixixixixix") .build()
-
树立一个WS衔接~
wsHttpClient.newWebSocket(request, object : WebSocketListener() { override fun onOpen(webSocket: WebSocket, response: Response) { super.onOpen(webSocket, response) Log.i(TAG,"WS connection successful") // WebSocket 衔接树立 } override fun onMessage(webSocket: WebSocket, text: String) { super.onMessage(webSocket, text) Log.i(TAG,"openWs onMessage $text") // 收到服务端发送来的 String 类型音讯 } override fun onMessage(webSocket: WebSocket, bytes: ByteString) { super.onMessage(webSocket, bytes) // 收到服务端发送来的 ByteString 类型音讯 Log.i(TAG,"openWs onMessage $text") } override fun onClosing(webSocket: WebSocket, code: Int, reason: String) { super.onClosing(webSocket, code, reason) Log.i(TAG,"openWs onClosing") // 收到服务端发来的 CLOSE 帧音讯,准备封闭衔接 } override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { super.onClosed(webSocket, code, reason) Log.i(TAG,"openWs onClosed") // WebSocket 衔接封闭 } override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { super.onFailure(webSocket, t, response) Log.i(TAG,"openWs onFailure : Ws衔接失利 ${response}") // WebSocket衔接失利 } })
对于接纳音讯是onMessage,不过咱们暂时按下不表
-
发送音讯至服务端
能够看到,咱们树立衔接成功后,在onOpen办法中会回来webSocket: WebSocket,运用这个目标,你能够向服务器发送音讯
// 发送文本音讯 webSocket.send("Hello, WebSocket!") // 发送二进制音讯 webSocket.send("Hello, WebSocket!".toByteString())
记得在运用WebSocket发送音讯时,要保证衔接现已成功树立。不然,假如在衔接未树立或已封闭的状况下测验发送音讯,将会导致过错。
4、onMessage
在运用OKHttp WebSocket时,一旦成功树立WebSocket衔接并接纳到onOpen
办法的回调后,基本上一个WebSocket的设置就完成了。
然而,在处理服务器发送的音讯时,你需求将onMessage
中接纳到的音讯进行适当的分发,以便依据你的需求进行后续处理。这样能够保证接纳到的音讯能够被正确地传递和处理,然后完成更灵活的音讯处理机制。
一般而言你需求如此:(接下来的代码不要抄!!!!)
1)监听器
-
创立WebSocket监听器接口:在WebSocket通讯类外部创立一个接口,用于界说
onMessage
回调办法。例如:interface OnListener { fun onMessageReceived(message: String) }
-
在WebSocket通讯类中持有监听器引证:在WebSocket通讯类中增加一个成员变量来持有
WebSocketListener
接口的引证。例如:class WebSocketClient(private val webSocketListener: WebSocketListener) { // ... }
-
在WebSocket通讯类的
onMessage
回调中调用监听器办法:在WebSocket通讯类的onMessage
回调中,将接纳到的音讯传递给监听器的办法。例如:wsHttpClient.newWebSocket(request, object : WebSocketListener() { // ... override fun onMessage(webSocket: WebSocket, text: String) { // 收到文本音讯 webSocketListener.onMessageReceived(text) } // ... })
-
在另一个类中完成WebSocketListener接口:在另一个类中完成WebSocketListener接口,并完成
onMessageReceived
办法来接纳音讯。例如:class MyWebSocketListener : WebSocketListener { override fun onMessageReceived(message: String) { // 处理接纳到的音讯 // ... } }
-
运用自界说的WebSocketListener:在您的运用程序中,创立一个实例化了自界说的WebSocketListener的WebSocketClient目标。例如:
val webSocketListener = MyWebSocketListener() val webSocketClient = WebSocketClient(webSocketListener)
以上1-5,都是骗你的,由于不或许只要一个办法或类能够处理音讯,所以能够运用观察者形式或事情总线来完成。
2)观察者形式
遗忘上面的代码
- 创立一个音讯事情类:创立一个表明WebSocket音讯的事情类。
data class MessageEvent(val message: String)
- 创立一个WebSocket事情观察者接口:创立一个WebSocket事情观察者接口,界说用于接纳WebSocket音讯的办法。
interface OnMessageListener {
fun onWebSocketMessage(event: MessageEvent)
}
- 在WebSocket通讯类中增加观察者列表:在WebSocket通讯类中增加一个观察者列表和相应的办法,用于增加、移除和告诉观察者。
object WsManager {
private val observers = mutableListOf<OnMessageListener>()
fun addWebSocketListener(observer: OnMessageListener) {
observers.add(observer)
}
fun removeWebSocketListener(observer: OnMessageListener) {
observers.remove(observer)
}
private fun notifyWebSocketMessage(message: String) {
val event = MessageEvent(message)
for (observer in observers) {
observer.onWebSocketMessage(event)
}
}
// WebSocket onMessage回调
private val webSocketListener = object : WebSocketListener() {
override fun onMessage(webSocket: WebSocket, text: String) {
notifyWebSocketMessage(text)
}
}
// 其他WebSocket相关办法
}
- 在接纳WebSocket音讯的办法或类中完成OnMessageListener接口:在需求处理WebSocket音讯的办法或类中,完成OnMessageListener接口,并在
onWebSocketMessage
办法中处理音讯。
class MyWebSocketActivity : OnMessageListener {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
WsManager.addWebSocketListener(this)
}
override fun onDestroy() {
super.onDestroy()
WsManager.removeWebSocketListener(this)
}
override fun onWebSocketMessage(event: MessageEvent) {
val message = event.message
// 处理WebSocket音讯
// ...
}
}
现在,当WebSocket接纳到音讯时,WebSocket通讯类将告诉一切注册的观察者,它们将独立地接纳和处理WebSocket音讯。
这种办法运用观察者形式将onMessage
回调的音讯分发给多个办法或类,让它们能够独立地处理音讯。您能够依据需求增加更多的观察者,并在notifyWebSocketMessage
办法中告诉它们。
当然,这只是一个未完善的版本,你还需求考虑多线程的,粘性音讯等。咱们接着说,总会说完的
3)事情总线
1、EventButs
运用事情总线来完成WebSocket音讯的分发,能够考虑运用第三方库,如EventBus或RxJava,来简化事情的发布和订阅进程。
以下是运用EventBus库的示例:
-
首先,保证已将EventBus库增加到您的项目中。您能够在Gradle文件的dependencies块中增加以下行:
implementation("org.greenrobot:eventbus:3.2.0")
-
然后,界说一个WebSocket音讯事情类:
data class WebSocketMessageEvent(val message: String)
-
将WebSocket接纳到的音讯发布到事情总线:
wsHttpClient.newWebSocket(request, object : WebSocketListener() { // ... override fun onMessage(webSocket: WebSocket, text: String) { // 收到文本音讯 EventBus.getDefault().post(WebSocketMessageEvent(message)) } // ... })
-
在需求处理WebSocket音讯的办法或类中,订阅WebSocket音讯事情并界说相应的处理办法:
class MyWebSocketActivity : OnMessageListener { override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) EventBus.getDefault().register(this) } override fun onDestroy() { super.onDestroy() EventBus.getDefault().unregister(this) } // 界说处理WebSocket音讯事情的办法 @Subscribe(threadMode = ThreadMode.MAIN) fun onWebSocketMessage(event: WebSocketMessageEvent) { val message = event.message println("Received message: $message") // 处理接纳到的音讯 // ... } }
2、自界说EventBus
为了愈加直观的,看到分发的原理,咱们自己写一个,嘻嘻嘻。
-
WsModel
EventBus是经过目标类型来判别是否需求接纳,界说音讯结构。但是我想经过自己操控传入一个标识,所以咱们需求一个
enum class WsModel {
MAIN, CHAT
}
-
@WsSubscribe
首先咱们需求一个订阅的注解
@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.FUNCTION)
annotation class WsSubscribe(val model: WsModel, val priority: Int = 0)
运用的话就很简略了
@WsSubscribe(WsModel.CHAT)
fun onChatMessage(any: Any) {}
-
改造WsManger
咱们需求改造下WsManger,用来register、unRegister办法
fun register(subscriber: Any) {}
@Synchronized
fun unregister(subscriber: Any) {}
-
WsSubscriberMethod
在开端之前register,咱们创立一个
WsSubscriberMethod
- 存储办法信息:
WsSubscriberMethod
目标用于存储订阅办法的相关信息,包含办法名、参数类型、线程形式等。经过将这些信息封装到WsSubscriberMethod
目标中,能够方便地管理和传递办法相关的信息。 - 办法调用:在事情发布时,需求找到对应的订阅办法,并将事情传递给这些办法进行处理。
WsSubscriberMethod
目标中存储了办法的相关信息,包含办法名和参数类型,能够在事情发布时快速找到对应的办法并进行调用。 - 线程调度:
WsSubscriberMethod
目标还能够存储线程形式信息,指示订阅办法应该在哪个线程上履行。这能够用于完成事情的异步处理或跨线程通讯。经过存储线程形式信息,能够保证订阅办法在适当的线程上履行,防止了线程安全性问题或堵塞主线程的状况。 - 注册和撤销注册:
WsSubscriberMethod
目标能够用于注册和撤销注册订阅办法。在注册进程中,能够经过WsSubscriberMethod
目标将订阅办法与事情类型树立相关,然后完成事情的订阅。在撤销注册时,能够经过WsSubscriberMethod
目标找到对应的订阅办法,并将其从事情体系中移除。
- 存储办法信息:
-
Register
注册订阅者目标,将目标中带有 @WsSubscribe 注解的订阅办法增加到订阅列表中
- 体系获取注册目标中的一切办法。
- 对于每个办法,体系查看是否存在
@WsSubscribe
注解。 - 假如存在
@WsSubscribe
注解,则提取相关的事情类型信息。 - 将事情类型和对应的办法树立映射关系,将办法注册为事情处理程序或订阅者。
看代码
/** * @param subscriber 要注册的订阅者目标 */ fun register(subscriber: Any) { // 查找订阅者目标中的订阅办法 val subscriberMethods: List<WsSubscriberMethod> = findSubscriberMethods(subscriber) // 对订阅操作进行同步,防止多线程竞赛问题 synchronized(this) { // 遍历订阅办法列表,将每个办法增加到订阅列表中 for (subscriberMethod in subscriberMethods) { subscribe(subscriber, subscriberMethod) } } }
先看看奇特的
findSubscriberMethods
/** * 查找目标中带有 @WsSubscribe 注解的订阅办法并回来它们的信息列表 * * @param obj 要查找的目标 * @return 订阅办法的信息列表 */ private fun findSubscriberMethods(obj: Any): List<WsSubscriberMethod> { // 存储订阅办法的列表 val subscribers = mutableListOf<WsSubscriberMethod>() // 获取目标的类信息 val objClass = obj.javaClass // 获取目标类中声明的一切办法 val declaredMethods = objClass.declaredMethods // 遍历每个办法 for (method in declaredMethods) { // 获取办法的修饰符 val modifiers = method.modifiers // 结构完好的办法名,格式为 "类名.办法名" val methodName = "${method.declaringClass.name}.${method.name}" // 查看办法的修饰符,满意条件才进行处理 if ((modifiers and Modifier.PUBLIC) != 0 && (modifiers and MODIFIERS_IGNORE) == 0) { // 获取办法的参数类型列表 val parameterTypes = method.parameterTypes // 查看办法的参数个数,有必要为 1 if (parameterTypes.size == 1) { // 获取办法上的 @WsSubscribe 注解 val subscribeAnnotation: WsSubscribe? = method.getAnnotation(WsSubscribe::class.java) // 查看注解是否存在 if (subscribeAnnotation != null) { // 将订阅办法的信息封装为 WsSubscriberMethod 目标,并增加到列表中 subscribers.add( WsSubscriberMethod( subscribeAnnotation.model, method, subscribeAnnotation.priority, ) ) } } else if (method.isAnnotationPresent(WsSubscribe::class.java)) { // 参数个数不为 1,但办法上存在 @WsSubscribe 注解,抛出反常(为什么一定要一个呢,不是两个呢,不是0个呢?你能够考虑下,嘻嘻) throw IllegalArgumentException("@WsSubscribe method $methodName must have exactly 1 parameter but has ${parameterTypes.size}") } } else { // 办法的修饰符不符合要求,但办法上存在 @WsSubscribe 注解,抛出反常 if (method.isAnnotationPresent(WsSubscribe::class.java)) { throw IllegalArgumentException("$methodName is a illegal @WsSubscribe method: must be public, non-static, and non-abstract") } } } // 回来订阅办法的信息列表 return subscribers }
结合注释,你必定能够的啦。
你或许留意到了
subscriptionsByModelType
,他的声明如下:用于存储依照WsModel
类型分类的订阅列表。方便到时候依据WsModel分发~private val subscriptionsByModelType: MutableMap<WsModel, CopyOnWriteArrayList<WsSubscription>> = EnumMap(WsModel::class.java)
当然了,register中还有一个办法subscribe,是在
findSubscriberMethods
后,对回来值进行遍历那么它是private fun subscribe(subscriber: Any, subscriberMethod: WsSubscriberMethod) { // 获取订阅的模型类型 val modelType: WsModel = subscriberMethod.modelType // 创立新的订阅目标 val newSubscription = WsSubscription(subscriber, subscriberMethod) // 获取模型类型对应的订阅列表 var subscriptions: CopyOnWriteArrayList<WsSubscription>? = subscriptionsByModelType[modelType] // 假如订阅列表为空,则创立一个新的订阅列表并将其相关到模型类型 if (subscriptions == null) { subscriptions = CopyOnWriteArrayList<WsSubscription>() subscriptionsByModelType[modelType] = subscriptions } else { // 假如订阅列表不为空,查看是否已存在相同的订阅目标,若存在则抛出反常 if (subscriptions.contains(newSubscription)) { throw IllegalArgumentException("Subscriber ${subscriber.javaClass} already registered to event $modelType") } } // 在适宜的位置插入新的订阅目标,依据优先级从高到低排序 val size = subscriptions.size for (i in 0..size) { if (i == size || subscriberMethod.priority > subscriptions[i].subscriberMethod.priority) { subscriptions.add(i, newSubscription) break } } // 更新订阅者订阅的事情列表 var subscribedEvents: MutableList<WsModel>? = typesBySubscriber[subscriber] if (subscribedEvents == null) { subscribedEvents = ArrayList() typesBySubscriber[subscriber] = subscribedEvents } subscribedEvents.add(modelType) }
总的来说便是将一个订阅者目标和其对应的订阅办法增加到事情总线中进行订阅。具体来说:
-
获取订阅办法对应的模型类型。
-
创立一个新的订阅目标,将订阅者目标和订阅办法封装起来。
-
依据模型类型从 subscriptionsByModelType 中获取对应的订阅列表。
- 假如订阅列表不存在,就创立一个新的
CopyOnWriteArrayList<WsSubscription>
并将其相关到模型类型。 - 假如订阅列表现已存在,查看是否现已存在相同的订阅目标,假如存在则抛出反常。
- 假如订阅列表不存在,就创立一个新的
-
依据订阅办法的优先级,将新的订阅目标插入到订阅列表中的适宜位置,以保持订阅办法的优先级有序。
-
更新订阅者目标的订阅事情列表,将模型类型增加到该列表中。
经过履行这些进程,订阅者就能成功订阅指定的事情,而且事情总线会将订阅者的订阅办法按优先级有序地存储起来,以便在事情发布时依照订阅者的要求进行调用。
-
UnRegister
Register都写好了,Unregister就更简略了
-
从
typesBySubscriber
中获取订阅者目标现已订阅的事情类型列表。 -
假如获取到的
subscribedTypes
不为 null,则表明该订阅者目标有进行过订阅操作。- 遍历订阅者目标现已订阅的事情类型列表。
- 对于每个事情类型,调用
unsubscribeByEventType(subscriber, eventType)
办法进行撤销订阅操作。 - 从
typesBySubscriber
中移除该订阅者目标的订阅记载。
-
假如获取到的
subscribedTypes
为 null,则表明该订阅者目标在事情总线中未进行过注册。
@Synchronized fun unregister(subscriber: Any) { val subscribedTypes: List<WsModel>? = typesBySubscriber[subscriber] if (subscribedTypes != null) { for (eventType in subscribedTypes) { unsubscribeByEventType(subscriber, eventType) } typesBySubscriber.remove(subscriber) } else { Log.i( TAG, "WsManager Subscriber to unregister was not registered before: ${subscriber.javaClass}" ) } }
-
-
Post
嘻嘻嘻,注册、反注册都好了,就只要Post啦;那么如何Post呢
首先咱们如何将事情发布给订阅者的办法呢?
private fun postToSubscription(subscription: WsSubscription,event: Any) { try { // 运用反射调用订阅办法来处理事情 subscription.subscriberMethod.method.invoke(subscription.subscriber, event) } catch (e: InvocationTargetException) { Log.i(TAG, "WsManager $e") // 处理订阅者反常(暂未完成) } catch (e: IllegalAccessException) { throw IllegalStateException("Unexpected exception", e) } }
直接运用subscriberMethod.method.invoke就完事了~
接着直接在for
循环中调用postToSubscription()
办法,那么就完事了。NO,NO,NO
在多线程环境下,直接在for
循环中调用postToSubscription()
办法或许会导致并发拜访的问题,然后导致线程安全性问题。因而,运用PostingThreadState
类来维护当时线程的发布状况,以保证线程安全性。
/**
* 用于保存当时线程的发布状况的内部类。
*/
internal class PostingThreadState {
val eventQueue: MutableList<Any> = arrayListOf()
var isPosting = false
var isMainThread = false
var subscription: WsSubscription? = null
var event: Any? = null
var canceled = false
}
/**
* 将单个事情发布给其订阅者,针对给定的事情类型。
* 假如存在订阅者,回来true;不然回来false。
*/
private fun postSingleEventForEventType(
event: Any,
postingState: PostingThreadState,
eventModel: WsModel
): Boolean {
var subscriptions: CopyOnWriteArrayList<WsSubscription>?
synchronized(this) { subscriptions = subscriptionsByModelType[eventModel] }
Log.i(TAG, "subscriptions $subscriptions")
if (!subscriptions.isNullOrEmpty()) {
// 遍历订阅者列表,依次发布事情
for (subscription in subscriptions!!) {
postingState.event = event
postingState.subscription = subscription
// 发布事情并查看是否被撤销
val aborted: Boolean = try {
postToSubscription(subscription, event)
postingState.canceled
} finally {
postingState.event = null
postingState.subscription = null
postingState.canceled = false
}
// 假如事情发布被撤销,则终止发布进程
if (aborted) {
break
}
}
return true
}
return false
}
/**
* 将单个事情发布给其订阅者,针对特定的事情类型。
*/
@Throws(Error::class)
private fun postSingleEvent(model: WsModel, event: Any, postingState: PostingThreadState) {
// 测验发布事情并回来是否找到订阅者
val subscriptionFound: Boolean = postSingleEventForEventType(event, postingState, model)
// 假如没有订阅者,则记载日志
if (!subscriptionFound) {
Log.i(TAG, "WsManager No subscribers registered for event $model")
}
}
post代码在这
/**
* 将给定的事情发布到事情总线。
*/
private fun post(model: WsModel, event: Any) {
// 获取当时线程的发布状况
val postingState: PostingThreadState = currentPostingThreadState.get() as PostingThreadState
// 获取事情队列
val eventQueue: MutableList<Any> = postingState.eventQueue
// 将事情增加到队列中
eventQueue.add(event)
// 假如当时没有正在发布事情,则开端进行事情发布
if (!postingState.isPosting) {
// 判别是否在主线程中发布事情
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper()
postingState.isPosting = true
// 查看发布状况是否被撤销
if (postingState.canceled) {
throw IllegalArgumentException("Internal error. Abort state was not reset")
}
try {
// 循环处理事情队列中的事情
while (eventQueue.isNotEmpty()) {
postSingleEvent(model, eventQueue.removeAt(0), postingState)
}
} finally {
// 发布完成后重置发布状况
postingState.isPosting = false
postingState.isMainThread = false
}
}
}
结合注释看,一会儿就明白了~~
好了现在咱们只需求在WsManager中调用一下就完事了(这儿我和服务器约定的是在回来数据的data中带上model:WsModel),总之这儿其实和恳求数据接纳一个意思。
大约长这样
{
"code": 200,
"msg": "string",
"data": {
"model": "CHAT"
}
}
wsHttpClient.newWebSocket(request, object : WebSocketListener() {
// ...
override fun onMessage(webSocket: WebSocket, text: String) {
try {
val msg: BaseResponse<WsMessage>? = text.toObject()
msg?.getData()?.model?.let {
post(it, msg)
} ?: run {
LogW("openWs onMessage error msg?.getData()?.model?")
}
} catch (e: ClassCastException) {
LogW("openWs onMessage error $e")
}
}
// ...
})
这样你的@WsSubscribe就能够收到对应model类型的音讯啦。
5、下个华章
由于篇幅原因,咱们先到这,哈,还有很要害的重连战略没有说,我还没写完,马上~咕咕咕。
假如您有任何疑问、对文章写的不满意、发现过错或者有更好的办法,欢迎在评论、私信或邮件中提出,十分感谢您的支撑。
6、感谢
- 校稿:ChatGpt
- 文笔优化:ChatGpt
代码都在这儿啦,或许有所出入,大差不差啦