Isolate是Dart中重要的异步通讯方式,它的效果和用法不再赘述,关键是怎样高雅的与它进行数据通讯。这儿实践了一种写法,能够按这种写法结合自己的工程比较高雅地完成数据通讯。
一般写法
在Dart中与ioslate通讯的一般用这种形式,也可看官方的示例代码 :
import 'dart:isolate';
void main() async {
final recv = ReceivePort('main.incoming');
final isolate = await Isolate.spawn<SendPort>(
IsolateObj.setupIsolate,
recv.sendPort,
);
final main = MainObj();
recv.listen(main.handleResponsesFromIsolate);
// call it somewhere
main.create();
...
...
recv.close();
isolate.kill();
}
class MainObj {
void handleResponsesFromIsolate(dynamic msg) {
}
void create() {
_send?.send({
'type': 'create',
});
}
}
class IsolateObj {
static void setupIsolate(SendPort outgoing) async {
final incoming = ReceivePort('_isolate.incoming');
outgoing.send(incoming.sendPort);
}
}
在Isolate中,响应每个音讯的动作或许是不同的,于是不得不有一个switch case
,如果音讯的类型非常多则分支非常巨大。不管这个分支是针对数据类型的还是数据字段的,针对每个分支不得不新增许多处理办法:
class IsolateObj {
final SendPort outgoing;
IsolateObj(this.outgoing);
void _create(Map<String, dynamic> msg) {
}
void _dispose(Map<String, dynamic> msg) {
}
void _handleMessageFromMain(Map<String, dynamic> msg) {
final type = msg['type'];
switch (type) {
case 'create':
_create(msg);
break;
case 'dispose':
_dispose(msg);
break;
}
}
static void setupIsolate(SendPort outgoing) async {
final incoming = ReceivePort('_isolate.incoming');
outgoing.send(incoming.sendPort);
final messages = incoming.cast<Map<String, dynamic>>();
final obj = IsolateObj(outgoing);
messages.listen(obj._handleMessageFromMain);
}
}
另一个别扭的地方是ReceivePort
中接纳的数据类型是不同的,第一个数据元素类型是SendPort
,要让接纳方能够发送数据,其后的元素才干是真正通讯的数据类型。
void handleResponsesFromIsolate(dynamic msg) {
if (msg is SendPort) {
} else if (msg is Map<String, dynamic>) {
}
}
在Isolate中处理完音讯,或许需求再给主线程一些反应,也便是说主线程除了“发送”数据外也要“接纳”数据,来完成“双向通讯”。Dart中发送和接纳这两个操作是分开的,需求分别用SendPort
和ReceivePort
。而一旦接纳数据,主线程中也不得不树立一个巨大的switch case
:
class IsolateObj {
final SendPort outgoing;
IsolateObj(this.outgoing);
void _create(Map<String, dynamic> msg) {
// do some time consuming creation operation
outgoing.send({
'type': 'created',
});
}
void _dispose(Map<String, dynamic> msg) {
// do some dipose
outgoing.send({
'type': 'created',
});
}
void _handleMessageFromMain(Map<String, dynamic> msg) {
final type = msg['type'];
switch (type) {
case 'create':
_create(msg);
break;
case 'dispose':
_dispose(msg);
break;
}
}
}
class MainObj {
void handleResponsesFromIsolate(dynamic msg) {
if (msg is SendPort) {
} else if (msg is Map<String, dynamic>) {
switch (msg['type']) {
case 'created':
_onCreated(msg);
break;
case 'disposed':
break;
}
}
}
void create() {
_send?.send({
'type': 'create',
});
}
void _onCreated(msg) {
final data = msg['data'];
}
}
显然这种写法的第三个缺陷是上下文割裂。发送处和接纳处位于不同的办法体内,许多时候不得不再存储额外的上下文信息。由于时序的原因,一些类成员也不得不声明成可空类型,比如SendPort?
:
class MainObj {
SendPort? _send;
void handleResponsesFromIsolate(dynamic msg) {
if (msg is SendPort) {
_send = msg;
} else if (msg is Map<String, dynamic>) {
}
}
}
上下文关联
已然Isolate通讯是用一种”通道“,咱们能不能像http恳求那样发送完恳求之后直接“等候”当次恳求的回来?就像这样:
class MainObj {
Future<void> create() async {
_send?.send({
'type': 'create',
});
final msg = await getNextElementFromIsolate();
final data = msg['data'];
}
}
这样主线程一侧就能够移除switch case
这一坨,并且create
和onCreated
能够兼并起来。咱们能够发现ReceivePort
其实便是一个Stream
。能够满足获取下一个元素的办法只要Stream.first
,并且只要能获取下一个Stream
元素,有了Stream.first
的另一个妙处是,SendPort
能够防止成为可空类型:
final isolate = await Isolate.spawn<SendPort>(
IsolateObj.setupIsolate,
recv.sendPort,
);
final port = await recv.first;
final data = recv.cast<Map<String, dynamic>>();
final main = MainObj(data, port);
class MainObj {
final Stream<Map<String, dynamic>> _data;
final SendPort _send;
MainObj(this._data, this._send);
Future<void> create() async {
_send.send({
'type': 'create',
});
final msg = await _data.first;
final data = msg['data'];
}
}
太棒了,一下子大大简化了主线程这一侧的操作逻辑!
可是!别高兴太早。这个完成有个很大的问题。
监听时序
咱们的意图其实很清晰,发送一次恳求,接纳这次恳求的成果,也便是恳求和成果1对1。Stream
中的元素应该一个接一个的接纳,上一个元素被消费完,下一个元素再接纳,不然成果就会紊乱。然而问题就在Stream.first
中的完成仅仅是加了一个linstener,等同于Stream.elementAt(0)
。屡次调用Stream.first
,仅仅注册屡次监听,当一个元素数据来了成果仅仅触发屡次告诉,这并不是咱们期望的“1对1”。也便是说:
final results = [
await _data.first,
await _data.first,
]; // [element1, element2]
这样写是成果是对的,在接纳了一个元素之后再去等候下一个元素,但如果同时等候并行的操作成果,那数据则是错误的:
final results = await Future.wait([
_data.first,
_data.first,
]); // [element1, element1]
实际开发中,第二种情况才是最多的,比如在main.create()
调用之后再一次调用了main.create()
,这时候数据还没有在isolate中处理,等处理完之后内部接纳的其实是相同的数据。
流式队列
要处理这个问题,有必要得用新的办法。目的很清晰:在流中一个一个的获取元素对象。多亏了美好的StreamQueue
,已经帮咱们达到这个目的。StreamQueue
放在package:async/async.dart
中。只需求一点点更改:
final port = await recv.first;
final data = recv.cast<Map<String, dynamic>>();
final main = MainObj(StreamQueue(data), port);
class MainObj {
final StreamQueue<Map<String, dynamic>> _data;
final SendPort _send;
MainObj(this._data, this._send);
Future<void> create() async {
_send.send({
'type': 'create',
});
final msg = await _data.next;
final data = msg['data'];
}
}
轻松完成咱们的效果!其有用一个简略的List<Completer<Map<String, dynamic>>
就能够完成以上的效果,有爱好的朋友能够尝试一下!
被迫告诉
现实情况永远是更杂乱的,在异步通讯中,可不是只要主动恳求的情况,还有一种被迫告诉的情况,也便是不需求主线程发送恳求,不守时接纳isolate的告诉。这和网络恳求是类似的,ReceivePort
就像一个长衔接,在长衔接树立起来之后,服务端很或许会主动推送一些事件,客户端然后被迫接纳告诉。这样的情况或许会构成乱序,本来流程是这样:
main isolate
create -----------------> _create
create -----------------> _create
onCreated <-------------- data1
onCreated <-------------- data2
但如果稠浊了被迫告诉:
main isolate
create -----------------> _create
create -----------------> _create
onCreated <-----notify--- data3
onCreated <-------------- data1
? <-------------- data2
所以问题的关键是咱们预设了接纳的响应对应最近一次发送的恳求,典型的客户端服务端形式。但实际情况不是这样,Isolate虽然是执行主线程发出的指令,但在执行的过程中或许会触发某个条件去告诉主线程,在这种情况下,主线程接纳的或许都是错误的成果。
要处理这个问题其实也非常简略。已然被迫告诉的音讯会构成时序紊乱,那把它从流中过滤出来单独构成一个流就能够了。
final port = await recv.first;
final receiving = recv.cast<Map<String, dynamic>>().asBroadcastStream();
final data = receiving.where((e) => e['type'] != 'notify');
final notify = receiving.where((e) => e['type'] == 'notify');
final main = MainObj(StreamQueue(data), port);
notify.listen((e) {
// do something on notified.
});
到这儿,乱序问题其实已经处理了。但这儿给出另一种办法,已然被迫告诉型音讯需求从流中除掉,那干脆就不要放进这个流里——谁说只能有一个通道的?咱们知道ReceivePort
其实便是一个Stream,那针对notify音讯,单独开设一个通道就好,终究代码如下:
import 'dart:isolate';
import 'package:async/async.dart' show StreamQueue;
void main() async {
final recv = ReceivePort('main.incoming');
final notify = ReceivePort('notify.incoming');
final isolate = await Isolate.spawn<(SendPort, SendPort)>(
IsolateObj.setupIsolate,
(recv.sendPort, notify.sendPort),
);
final port = await recv.first;
final receiving = recv.cast<Map<String, dynamic>>().asBroadcastStream();
final main = MainObj(StreamQueue(receiving), port);
notify.cast<Map<String, dynamic>>().listen(main.onNotified);
await main.create();
recv.close();
isolate.kill();
}
class MainObj {
final StreamQueue<Map<String, dynamic>> _data;
final SendPort _send;
MainObj(this._data, this._send);
Future<void> create() async {
_send.send({
'type': 'create',
});
final msg = await _data.next;
final data = msg['data'];
// continue to do something.
}
void onNotified(Map<String, dynamic> data) {
}
}
class IsolateObj {
final SendPort outgoing;
final SendPort notify;
IsolateObj(this.outgoing, this.notify);
void _create(Map<String, dynamic> msg) {
// do some creation
outgoing.send({
'type': 'created',
});
}
void _dispose(Map<String, dynamic> msg) {
}
void _notifyCallback() {
notify.send({
'type': 'notify',
});
}
void _handleMessageFromMain(Map<String, dynamic> msg) {
final type = msg['type'];
switch (type) {
case 'create':
_create(msg);
break;
case 'dispose':
_dispose(msg);
break;
}
}
static void setupIsolate((SendPort, SendPort) r) async {
final (outgoing, notify) = r;
final incoming = ReceivePort('_isolate.incoming');
outgoing.send(incoming.sendPort);
final messages = incoming.cast<Map<String, dynamic>>();
final obj = IsolateObj(outgoing, notify);
messages.listen(obj._handleMessageFromMain);
}
}
多亏了Dart3中的Record,传送多参数现在非常简略了!
这种写法能够作为Isolate通讯的一种新范式,和各种switch case
说再见吧!