Stream
是 Flutter 处理数据呼应的一个重要手法,它供给了一种处理数据流的办法,其效果相似于 Kotlin 中的 Flow
,依据发布订阅模式
的规划,经过监听Stream
,订阅者能不断接收到数据流的最新改变。
Stream 的基本用法
Stream
能经过async*
和StreamController
发生,也能经过其它Stream
转化而来。相较于async*
,StreamController
由于灵活度更高,因而更为常见,两者在运用场景上也有必定不同。
async*
信任大家必定知道async
,但async*
就未必,同样作为 Flutter 里异步处理的一环,async
主要跟Future
打交道,而async*
处理的目标是Stream
,async*
在运用上需求搭配yield
。下面这段代码演示了怎么运用async*
进行 1 到 10 的相加。
Stream<int> countStream(int to) async* {
print("stream 被监听");
for (int i = 1; i <= to; i++) {
yield i;
}
}
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
await for (final value in stream) {
sum += value;
}
return sum;
}
void main() async {
var stream = countStream(10);
// 当注释掉下面这行,控制台不会打印出 "stream 被监听",也就表明 async* 办法体没被履行
var sum = await sumStream(stream);
print(sum); // 55
}
在上面的示例中,async*
办法体里yield
在每次的遍历中,都往Stream
回来一个数据,经过 await for
的监听拿到每次回来的值,接着履行sum
操作。值得注意的是,async*
这种办法发生的stream
,当stream
没有被监听时,async*
办法体是不会被履行的。
假如你看着async*
还有点别扭,请记住:async
回来的是一个Future
,而async*
回来的是一个Stream
。
StreamController
日常开发中,通常会经过StreamController
创立Stream
。只需求构造出StreamController
目标,经过这个目标的.stream
就能够得到Stream
。
Stream<int> countStream(int to) {
// 先创立 StreamController
late StreamController<int> controller;
controller = StreamController<int>(onListen: () {
// 当 Stream 被监听时会触发 onListen 回调
for (var i = 0; i < to; i++) {
controller.add(i);
}
controller.close();
});
return controller.stream;
}
Future<int> listenOn(Stream<int> stream) async {
var completer = Completer<int>();
var sum = 0;
// 监听 stream
stream.listen(
(event) {
sum += event;
},
onDone: () => completer.complete(sum),
);
return completer.future;
}
void main() async {
var stream = countStream(10);
// 当注释掉下面这行,控制台也不会打印出 "stream 被监听"
var sum = await listenOn(stream);
print(sum); // 55
}
在创立StreamController
的时候传入了一个onListen
回调,当流第一次被监听的时候,会触发这个回调,此刻会往流里边依次增加多个数据,listenOn
办法里拿到这些数据履行相加操作。这儿运用了stream
的listen
的办法进行监听。
Stream 左右护法
Flutter 中的 Stream 处理,涉及到三类目标,以发布订阅模式
的视点去看的话,能够分为发布者 StreamController
、数据通道 Stream
、订阅者 StreamSubscription
。
class Example {
var controller = StreamController<int>();
Stream<int> get stream => controller.stream;
StreamSubscription<int>? _subscription;
void initState() {
_subscription = stream.listen((event) {
print(event);
});
for(var i = 0; i <= 10; i++) {
controller.add(i);
}
}
void dispose() {
_subscription?.cancel();
_subscription = null;
}
}
每一个StreamController
都对应着一个Stream
,当Stream
被订阅时,会得到一个StreamSubscription
目标。
上面的比方中,接口运用是简单的,可是他们内部的工作原理是怎么?一个事情从发布到消费中心经过了哪些流程?
数据流向图
在事情处理上:Stream
在被订阅时,会创立StreamSubscription
,并将其间的onData
等事情处理的回调传给StreamSubscription
。
在事情输入输出上:StreamController
经过add
办法输入事情后,先判别此刻是否存在订阅者StreamSubscription
,假如存在则调用StreamSubscription
的onData
处理,不存在就先存到_pendingEvents
里,等到下次StreamSubscription
呈现了再向它输出事情。
能够看到,StreamController
在整个事情流向的处理中肩负着最重要的使命,它控制着事情怎么输入和输出,StreamSubscription
担任处理输出到这儿的事情,Stream
在得到StreamSubscription
后就完成了它的使命挑选“退隐山林”。
这么讲或许还有点“干”,为了更直观的介绍他们各自的责任,接下来咱们从他们界说的接口出发,去考虑他们都能做哪些事情。为了方便呈现,我只取其间最关键的部分。
StreamController
abstract interface class StreamController<T> implements StreamSink<T> {
// stream 流
Stream<T> get stream;
// 流状况的回调
abstract void Function()? onListen; // 被监听
abstract void Function()? onPause; // 流暂停
abstract void Function()? onResume; // 流康复
abstract void Function()? onCancel; // 流撤销/封闭
// 流状况
bool get isClosed;
bool get isPaused;
bool get hasLitener; // 当时流是否有订阅者
// 监听 source,转发给 stream
Future addStream(Stream<T> source, {bool? cancelOnError});
// 往流里边增加事情
void add(T event);
void addError(Object error, [StackTrace? stackTrace]);
Future close(); // 封闭流
// 输出事情
// 以下这三个接口在 _StreamControllerBase 中
void _sendData(T data);
void _sendError(Object error, StackTrace stackTrace);
void _sendDone();
}
-
StreamController
担任办理事情流的状况,当状况改变时,会触发到相应的回调(onListen/onPause等)。 -
StreamController
担任事情的输入,输入的办法有两种,一种是事情接口add
、addError
;别的一种是经过监听其它的Stream
;一起事情也分为两种,一种是正常事情,一种是错误事情。 -
StreamController
能封闭这个事情流通道,会发生一个onDone
事情。 -
StreamController
担任事情的输出,不同的输入对应不同的输出。
Stream
abstract mixin class Stream<T> {
// 是否地播送流,播送流允许多订阅
bool get isBroadcast => false;
// 监听流改变,回来订阅者
StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});
// 一系列 Stream 处理和改换操作
Stream<T> where(bool test(T event)) { ... }
Stream<S> map<S>(S convert(T event)) { ... }
Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) { ...}
Stream<E> asyncExpand<E>(Stream<E>? convert(T event)) { ... }
Stream<T> handleError(Function onError, {bool test(error)?}) { ... }
...
}
-
Stream
露出事情流的订阅办法listen
,回来当时订阅者,并把listen
办法中的onData
等参数注册到当时订阅者里边。 -
Stream
有许多过滤转化等语法糖办法。
StreamSubscription
abstract interface class StreamSubscription<T> {
// 监听数据改变
void onData(void handleData(T data)?);
void onError(Function? handleError);
void onDone(void handleDone()?);
// 暂停/康复 监听
void pause([Future<void>? resumeSignal]);
void resume();
bool get isPaused;
// 撤销监听
Future<void> cancel();
// 转成 Future 目标,监听流完毕事情
Future<E> asFuture<E>([E? futureValue]);
}
-
StreamSubscription
作为事情输出端,担任事情的输出处理。 -
StreamSubscription
也能对自己的订阅行为进行暂停、康复或撤销等动作。
Stream 的分类
Stream
有许多子类,对应不同场景的完成。比方关于输入端而言,能够分为同步流和异步流;在输出端上,又可分为播送流和非播送流。
同步和异步
StreamController
的工厂办法中,经过sync
能够指定同步或许异步。同步和异步的区别是:事情输入后是否会立即输出。 同步流在事情输入后会立刻履行onData
,异步流在事情输入后会注册一个异步事情,等到当时EventLoop
中的同步事情处理后触发onData
。
factory StreamController(
{void onListen()?,
void onPause()?,
void onResume()?,
FutureOr<void> onCancel()?,
bool sync = false}) {
return sync
? _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
: _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
}
在完成上看,_SyncStreamController
终究输出时运用的是_SyncStreamControllerDispatch
,_AsyncStreamController
运用的是_AsyncStreamControllerDispatch
。
两者在输出处理不同,_SyncStreamControllerDispatch
调用的是subscription
的_add
办法,_AsyncStreamControllerDispatch
调用的是subscription
的_addPending
办法。_addPending
会先将事情存到行列里,一起假如行列没有在跑就敞开行列的处理,经过scheduleMicrotask
对事情进行异步处理,处理完当时事情持续处理行列时的其它事情,直到行列清空。
播送和非播送
上述代码中出产的对错播送流,播送流经过StreamController.broadcast
办法创立。播送和非播送的区别是是否允许多次订阅。
factory StreamController.broadcast(
{void onListen()?, void onCancel()?, bool sync = false}) {
return sync
? _SyncBroadcastStreamController<T>(onListen, onCancel)
: _AsyncBroadcastStreamController<T>(onListen, onCancel);
}
非播送StreamController
承继自_StreamController
,播送StreamController
承继自_BroadcastStreamController
,两者的区别能够经过_subscribe
的完成表现。_StreamController
的完成如下,当重复订阅后会直接抛出 StateError 反常。
StreamSubscription<T> _subscribe(void onData(T data)?, Function? onError,
void onDone()?, bool cancelOnError) {
if (!_isInitialState) {
throw StateError("Stream has already been listened to.");
}
_ControllerSubscription<T> subscription = _ControllerSubscription<T>(
this, onData, onError, onDone, cancelOnError);
// ...
return subscription;
}
_BroadcastStreamController
里边有两个目标_firstSubscription
、_lastSubscription
,_BroadcastSubscription
是双向链表结构。当需求输出事情时,经过整个链表,告诉一切的订阅进行音讯的处理。
_BroadcastSubscription<T>? _firstSubscription;
_BroadcastSubscription<T>? _lastSubscription;
开发实战
经过前面接口和分类的分析,咱们对这 Stream 有了更深入的知道。刀已磨好,接下来便经过两个比方来试试这把刀究竟锋不尖利。
利用 Stream 完成事情的播送
事情的播送,在开发时总会遇到,尤其是在跨组件或跨页面的场景,信任大部分开发者的项目里也都会引进相似EventBus
的三方或自研框架。比方:当我在修改个人资料时,Save
之后需求告诉其它页面进行改写以展现最新的个人信息。
// user entity
class UserInfo {
int uid;
String name;
UserInfo(this.uid, this.name);
}
// userinfo update
class UserInfoChangeEvent {
static final _controller = StreamController<UserInfo>.broadcast();
static StreamSubscription<UserInfo> subscribe(Function(UserInfo) callback) {
return _controller.stream.listen(callback);
}
static void broadcast(UserInfo userInfo) {
_controller.add(userInfo);
}
}
// 用户修改页面
class UserProfileViewModel {
...
// 点击 save 时,会调用到 broadcast 办法向外发送事情
void onSave(int uid, String name) {
UserInfoChangeEvent.broadcast(UserInfo(uid, name));
}
}
// 其它页面状况改写
class ViewState extends State<ViewWidget> {
StreamSubscription<UserInfo>? _subscription;
UserInfo? _curUserInfo;
@override
void initState() {
super.initState();
// 初始化时,监听 UserInfoChangeEvent
_subscription = UserInfoChangeEvent.subscribe((userInfo) {
setState(() {
_curUserInfo = userInfo;
})
});
}
@override
void dispose() {
super.dispose();
// 退出时,要撤销监听。不然会有内存泄漏
_subscription?.cancel();
_subscription = null;
}
}
这儿,UserInfoChangeEvent
界说了播送类型的StreamController
,而且向外露出了subscribe
和boradcast
接口,用户修改页面在点击save
时走到onSave
办法,这个办法里调用了UserInfoChangeEvent
的broadcast
办法向外发送了一个更改信息的事情;
接着ViewState
这儿在initState
时经过UserInfoChangeEvent
的subscribe
办法注册了监听,接收到了事情赋值到当时_curUserInfo
,调用setState
改写页面。
StreamBuilder 完成 Widget 主动改写
Flutter 供给了一个组件StreamBuilder
,能协助咱们方便的监听Stream
并改写 Widget。例如进入一个页面时,通常会有一个数据加载的过程,此刻页面会阅历 Loading -> Loaded/LoadError 的状况变更,不同的状况会呈现不同的页面 UI,这时咱们就需求界说一个 LoadingState 的枚举类型,在数据加载后时经过 StreamController 发布 LoadState 状况,StreamBuilder
监听到更新然后会主动触发 Widget 的改写。AsyncSnapShot
是快照的意思,保存着此刻最新的事情。
StreamBuilder<LoadingState>(
stream: viewModel.loadingStateStream,
initialData: LoadingState.loading,
builder: (BuildContext context, AsyncSnapshot snapshot) {
// 依据 snapshot 的数据处理回来
var data = snapshot.data;
if (data == LoadingState.Loaded) {
reutrn Container(child: Text("Loaded Success"));
} else if (data == LoadingStat.LoadError) {
reutrn Container(child: Text("Loading Error"));
} else {
return LoadingView()
}
},
)
不过,StreamBuilder
也有坑。咱们知道,关于Stream
来说,事情被消费了就会丢掉,无论是StreamController
仍是Stream
都不会保存上次的值,以页面加载为例,页面进来后ViewModel
履行数据加载完成后,向loadingStateStream
里发布了Loaded
的状况,假如此刻页面还没有布局StreamBuilder
,StreamBuilder
就无法收到这次监听,等到后边StreamBuilder
真实增加到界面上时现已错过了上次的事情,AsyncSnapshot
拿到的仍是initialData
时设置的数据,也就是 loading 态,这样状况就会展现反常。
你或许会有疑问,为什么StreamBuilder
不能一开始就增加到页面的build
方面里?当然能够,但即便如此也无法保证StreamBuilder
的监听必定会比viewModel
的状况更新早,由于假如页面的内容较长,一开始StreamBuilder
还不在可视区内,它的initState
办法就不会履行,也就不会监听loadingStateStream
。
StreamBuilder
会面对这种囧境,归根究竟是由于Stream
的规划。
Stream 的改换和处理
前面在介绍Stream
的接口时,咱们提到过Stream
里边有许多操作办法。在这part,侧重挑几个从姓名上不太好了解的打开讲讲。
Future<E> drain<E>([E? futureValue]);
drain
意为“排出、消耗“。这儿指”排掉”这条流中心一切的事情,只呼应完毕信号,当流封闭时回来 futureValue。
final result = await Stream.fromIterable([1, 2, 3]).drain(100);
print(result); // Outputs: 100.
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine)
事情迭代。依据combine
兼并流里边的事情,该办法能够指定回来的类型S
,一起能够指定初始值 initialValue
。
final result = await Stream<int>.fromIterable([2, 4, 6, 8, 10])
.fold<String>("0", (previous, element) => "$previous - $element");
print(result); // 0 - 2 - 4 - 6 - 8 - 10
Future<T> reduce(T Function(T previous, T element) combine);
也是事情迭代。与fold
不同的是,reduce
无法指定初始值且它只能回来与原流相同的类型T
。
final result = await Stream.fromIterable<int>([2, 6, 10, 8, 2])
.fold<int>(10, (previous, element) => previous + element);
print(result); // 38
Future pipe(StreamConsumer<T> streamConsumer);
流管道拼接。将当时流的事情流向streamConsumer
中,streamConsumer
的子类完成通常是一个StreamController
,拿到事情后告诉给它的订阅者。
var controller = StreamController<int>();
var stream = controller.stream;
stream.listen((event) {
print(event); // 2 4 6 8 10
});
var result = await Stream<int>.fromIterable([2, 4, 6, 8, 10]).pipe(controller);
print("result: $result"); // null
Stream<E> asyncExpand<E>(Stream<E>? Function(T event) convert);
异步打开。将原流中的事情做一次打开操作,得到一个E
类型的新流。
var stream = Stream<int>.fromIterable([2, 4, 6, 8, 10]);
var newStream = stream.asyncExpand((event) {
return Stream<int>.fromIterable([event, event + 1]);
});
newStream.listen((event){
print(event); // 2 3 4 5 6 7 8 9 10
});
Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);
异步映射。跟asyncExpand
相似,只是转化操作回来的是FutureOr
目标,为那些转化过程中涉及到异步处理的场景供给便当。
var newStream = stream.asyncMap((event) async {
await Future.delayed(const Duration(seconds: 1));
return event + 1;
});
newStream.listen((event){
print(event); // 3 5 7 9 11
});
真实忍不住想吐槽一下,有些办法的姓名起的诚心不昨滴,其间部分都有点“挂羊头卖狗肉”的感觉了。。。
你真的懂了吗?
讲了许多,现在来查验一下。假设有一段逻辑,controller
会增加三个事情,分别是add(1) add(2) add()3
,subscription
会在每次收到事情时打印output: $event
,中心会有一次暂停,3 秒后康复,猜一下在以下几种场景下最终输出的顺序是什么?
1. 同步流
void main() async {
// 同步流:sync 为 true
var controller = StreamController<int>(sync: true);
var subscription = controller.stream.listen((event) {
print('output: $event');
});
controller.add(1);
controller.add(2);
controller.add(3);
print('暂停');
subscription.pause();
Future.delayed(const Duration(seconds: 3), () {
print('3秒后 -> 康复');
subscription.resume();
});
}
// will print:
// output: 1
// output: 2
// output: 3
// 暂停
// 3秒后 -> 康复
2. 异步流
保持 1 中其它代码不变,将sync
的值设置成false
。
// will print:
// 暂停
// 3秒后 -> 康复
// output: 1
// output: 2
// output: 3
3. 异步流:运用 Future.delayed 推迟暂停
保持 2 中其它代码不变,将暂停康复推迟Duraiton.zero
。
Future.delayed(Duration.zero, (){
print('暂停');
subscription.pause();
Future.delayed(const Duration(seconds: 3), () {
print('3秒后 -> 康复');
subscription.resume();
});
});
// will print
// output: 1
// output: 2
// output: 3
// 暂停
// 3秒后 -> 康复
4. 异步流:运用 scheduleMicrotask 推迟暂停
保持 3 中其它代码不变,用scheduleMicrotask
代替Future.delayed
。
scheduleMicrotask(() {
print('暂停');
subscription.pause();
Future.delayed(const Duration(seconds: 3), () {
print('3秒后 -> 康复');
subscription.resume();
});
});
// will print
// output: 1
// 暂停
// 3秒后 -> 康复
// output: 2
// output: 3
上面的输出是否如你所料?信任假如你了解了之前的介绍,对1 2 3点的输出结果是没有问题的。可是关于第 4 点:虽然同样为推迟暂停,3 和 4 中的输出完全不一样,4 中的输出在输出output: 1
后才会触发暂停。这又是为什么呢?要解说这个输出,就要从源码出发了。
结语
所以,咱们首先要从概念上了解他们,其次咱们还要从代码上知道详细的完成。当程度的履行不及预期,缺乏代码完成层面的了解,咱们便会显得手忙脚乱。像前面呈现的StreamBuilder处理中的坑和输出顺序的问题,只有阅读底层源码,才干发现原因并精确修复。下一篇文章,将从源码完成上深入分析Stream
。