Stream是什么
Stream 是一系列异步事情的序列。其类似于一个异步的 Iterable,不同的是当你向 Iterable 获取下一个事情时它会当即给你,可是 Stream 则不会当即给你而是在它准备好时告诉你。
单个 | 0个或多个 | |
---|---|---|
Sync | int |
Interable<int> |
Async | Future<int> |
Stream<int> |
Stream目标
- StreamController:用于整个Stream过程的控制,供给各类接口用于创立各种事情流。
- StreamSink:事情的入口,
add
,addStream
等。 - Stream:事情源自身,一般可用于监听事情或者对事情进行转换,如
listen
、where
。 - StreamSubscription:事情订阅后的目标,表面上用于办理订阅过各类操作,如
cancel
、pause
,同时在内部也是事情的中转关键。
联系
有一个事情源叫Stream,为了方便控制Stream,官方供给了StreamController作为办理;同时它对外供给了
StreamSink
目标作为事情入口;又为Stream
特点供给了监听和变换,最终得到StreamSubscription
能够办理事情的订阅。
Stream作业原理
-
Stream
在listen
的时分传入onData
回调,这个回调会传入到StreamSubscription
中,之后经过zone.registerUnaryCallback
注册得到_onData
目标 -
StreamSink
在增加事情时,会履行到StreamSubscription
中的_sendData
办法,然后经过_zone.runUnaryGuarded(_onData, data)
履行1中得到的_onData
目标,触发listen
时传入的回调办法
//1.listen传入onData回调到StreamSubscription中
StreamSubscription<T> listen(void onData(T data)?,
{Function? onError, void onDone()?, bool? cancelOnError}) {
cancelOnError ??= false;
StreamSubscription<T> subscription =
_createSubscription(onData, onError, onDone, cancelOnError);
_onListen(subscription);
return subscription;
}
//为节省篇幅,已省掉部分代码
//在此,现已获取到_onData函数目标
_onData = _registerDataHandler<T>(_zone, onData),
//把onData传入进行注册
static void Function(T) _registerDataHandler<T>(
Zone zone, void Function(T)? handleData) {
return zone.registerUnaryCallback<void, T>(handleData ?? _nullDataHandler);
}
仿制代码
//2.sink增加事情StreamSubscription._sendData,然后调用_zone.runUnaryGuarded(_onData, data),
/* _EventDispatch interface. */
void _sendData(T data) {
assert(!_isCanceled);
assert(!_isPaused);
assert(!_inCallback);
bool wasInputPaused = _isInputPaused;
_state |= _STATE_IN_CALLBACK;
_zone.runUnaryGuarded(_onData, data);
_state &= ~_STATE_IN_CALLBACK;
_checkState(wasInputPaused);
}
关于zone
和zone.registerUnaryCallback
,zone.runUnaryGuarded
,详细请看这儿
值得留意的是,Stream是在microtask中被调度的
Stream支撑同步、异步(默认异步),播送与非播送
Stream支撑以下办法:
-
map()
:将此stream的每个元素转换为一个新的stream事情。 -
where()
:经过当时stream创立一个新的stream并依据条件丢弃某些元素 -
distinct()
:假如数据事情与前一个数据事情相等,则越过数据事情。 - ….
Stream的使用
经过Stream结构器创立
-
Stream.fromFuture:经过
Future
创立一个单一订阅stream
-
Stream.fromIterable:经过
Iterable
的数据创立一个单一订阅的stream
-
Stream.fromFutures:经过一组
Future
创立单一个单一订阅流的stream
-
Stream.periodic:经过时段创立一个重复发出事情的
stream
经过StreamController创立
import 'dart:async';
void main() {
//1.创立一个恣意类型StreamController目标
StreamController streamController = StreamController(
onListen: () => print('listen'),
onCancel: () => print('cancel'),
onPause: () => print('pause'),
onResume: () => print('resumr'));
//2.经过sink槽口增加恣意类型事情数据
streamController.sink.add(100);
streamController.sink.add(100.121212);
streamController.sink.add('THIS IS STRING');
streamController.sink.close();//只有手动调用close办法发送一个done事情,onDone才会被回调
//3.注册监听
streamController.stream.listen((event) => print(event),
onDone: () => print('is done'),
onError: (error, stacktrace) => print('is error, errMsg: $error'),
cancelOnError: true);
}
经过async*创立
留意:async*
是Dart的关键字,表明该函数回来的是stream,yield
是回来Iterable的单个数据,而yield*
后边跟stream
asyncGet(31).listen((event) {
print(event);
});
Stream<String> asyncGet(int count) async* {
yield* asyncGetString(count).map((event) => event + 'C');
}
Stream<String> asyncGetString(int count) async* {
for (int i = 0; i < count; i++) {
yield await delayedGet(i);
}
}
Future<String> delayedGet(int i) async {
await Future.delayed(Duration(seconds: 1));
return i.toString() + 'B';
}
参考资料
/post/684490…
/post/694210…