Stream 是 Flutter 处理数据呼应的一个重要手法,它供给了一种处理数据流的办法,其效果相似于 Kotlin 中的 Flow,依据发布订阅模式的规划,经过监听Stream,订阅者能不断接收到数据流的最新改变。

Stream 的基本用法

Stream能经过async*StreamController发生,也能经过其它Stream转化而来。相较于async*StreamController由于灵活度更高,因而更为常见,两者在运用场景上也有必定不同。

async*

信任大家必定知道async,但async*就未必,同样作为 Flutter 里异步处理的一环,async主要跟Future打交道,而async*处理的目标是Streamasync*在运用上需求搭配yield。下面这段代码演示了怎么运用async*进行 1 到 10 的相加。

Stream<int> countStream(int to) async* {
  print("stream 被监听");
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}
Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    sum += value;
  }
  return sum;
}
void main() async {
  var stream = countStream(10);
  // 当注释掉下面这行,控制台不会打印出 "stream 被监听",也就表明 async* 办法体没被履行
  var sum = await sumStream(stream);
  print(sum); // 55
}

在上面的示例中,async*办法体里yield在每次的遍历中,都往Stream回来一个数据,经过 await for的监听拿到每次回来的值,接着履行sum操作。值得注意的是,async*这种办法发生的stream,当stream没有被监听时,async* 办法体是不会被履行的。

假如你看着async*还有点别扭,请记住:async回来的是一个Future,而async*回来的是一个Stream

StreamController

日常开发中,通常会经过StreamController创立Stream。只需求构造出StreamController目标,经过这个目标的.stream就能够得到Stream

Stream<int> countStream(int to) {
  // 先创立 StreamController
  late StreamController<int> controller;
  controller = StreamController<int>(onListen: () {
    // 当 Stream 被监听时会触发 onListen 回调
    for (var i = 0; i < to; i++) {
      controller.add(i);
    }
    controller.close();
  });
  return controller.stream;
}
Future<int> listenOn(Stream<int> stream) async {
  var completer = Completer<int>();
  var sum = 0;
  // 监听 stream
  stream.listen(
    (event) {
      sum += event;
    },
    onDone: () => completer.complete(sum),
  );
  return completer.future;
}
void main() async {
  var stream = countStream(10);
  // 当注释掉下面这行,控制台也不会打印出 "stream 被监听"
  var sum = await listenOn(stream);
  print(sum); // 55
}

在创立StreamController的时候传入了一个onListen回调,当流第一次被监听的时候,会触发这个回调,此刻会往流里边依次增加多个数据,listenOn办法里拿到这些数据履行相加操作。这儿运用了streamlisten的办法进行监听。

Stream 左右护法

Flutter 中的 Stream 处理,涉及到三类目标,以发布订阅模式的视点去看的话,能够分为发布者 StreamController数据通道 Stream订阅者 StreamSubscription

class Example {
  var controller = StreamController<int>();
  Stream<int> get stream => controller.stream;
  StreamSubscription<int>? _subscription;
  void initState() {
    _subscription = stream.listen((event) {
      print(event);
    }); 
    for(var i = 0; i <= 10; i++) {
      controller.add(i);
    }
  }
  void dispose() {
    _subscription?.cancel();
    _subscription = null;
  }
}

每一个StreamController都对应着一个Stream,当Stream被订阅时,会得到一个StreamSubscription目标。

上面的比方中,接口运用是简单的,可是他们内部的工作原理是怎么?一个事情从发布到消费中心经过了哪些流程?

数据流向图

深入理解 Flutter 中的 Stream (一)

在事情处理上:Stream在被订阅时,会创立StreamSubscription,并将其间的onData等事情处理的回调传给StreamSubscription

在事情输入输出上:StreamController经过add办法输入事情后,先判别此刻是否存在订阅者StreamSubscription,假如存在则调用StreamSubscriptiononData处理,不存在就先存到_pendingEvents里,等到下次StreamSubscription呈现了再向它输出事情。

能够看到,StreamController 在整个事情流向的处理中肩负着最重要的使命,它控制着事情怎么输入和输出,StreamSubscription担任处理输出到这儿的事情,Stream在得到StreamSubscription后就完成了它的使命挑选“退隐山林”。

这么讲或许还有点“干”,为了更直观的介绍他们各自的责任,接下来咱们从他们界说的接口出发,去考虑他们都能做哪些事情。为了方便呈现,我只取其间最关键的部分。

StreamController

abstract interface class StreamController<T> implements StreamSink<T> {
  // stream 流
  Stream<T> get stream;
  // 流状况的回调
  abstract void Function()? onListen; // 被监听
  abstract void Function()? onPause; // 流暂停
  abstract void Function()? onResume; // 流康复
  abstract void Function()? onCancel; // 流撤销/封闭
  // 流状况
  bool get isClosed;
  bool get isPaused;
  bool get hasLitener; // 当时流是否有订阅者
  // 监听 source,转发给 stream
  Future addStream(Stream<T> source, {bool? cancelOnError});
  // 往流里边增加事情
  void add(T event);
  void addError(Object error, [StackTrace? stackTrace]);
  Future close(); // 封闭流
  // 输出事情
  // 以下这三个接口在 _StreamControllerBase 中
  void _sendData(T data);
  void _sendError(Object error, StackTrace stackTrace);
  void _sendDone();
}
  1. StreamController担任办理事情流的状况,当状况改变时,会触发到相应的回调(onListen/onPause等)。
  2. StreamController担任事情的输入,输入的办法有两种,一种是事情接口addaddError;别的一种是经过监听其它的Stream;一起事情也分为两种,一种是正常事情,一种是错误事情。
  3. StreamController能封闭这个事情流通道,会发生一个onDone事情。
  4. StreamController担任事情的输出,不同的输入对应不同的输出。

Stream

abstract mixin class Stream<T> {
  // 是否地播送流,播送流允许多订阅
  bool get isBroadcast => false;
  // 监听流改变,回来订阅者
  StreamSubscription<T> listen(void onData(T event)?,
    {Function? onError, void onDone()?, bool? cancelOnError});
  // 一系列 Stream 处理和改换操作
  Stream<T> where(bool test(T event)) { ... }
  Stream<S> map<S>(S convert(T event)) { ... }
  Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) { ...}
  Stream<E> asyncExpand<E>(Stream<E>? convert(T event)) { ... }
  Stream<T> handleError(Function onError, {bool test(error)?}) { ... }
  ...
}
  1. Stream露出事情流的订阅办法listen,回来当时订阅者,并把listen办法中的onData等参数注册到当时订阅者里边。
  2. Stream有许多过滤转化等语法糖办法。

StreamSubscription

abstract interface class StreamSubscription<T> {
  // 监听数据改变
  void onData(void handleData(T data)?);
  void onError(Function? handleError);
  void onDone(void handleDone()?);
  // 暂停/康复 监听
  void pause([Future<void>? resumeSignal]);
  void resume();
  bool get isPaused;
  // 撤销监听
  Future<void> cancel();
  // 转成 Future 目标,监听流完毕事情
  Future<E> asFuture<E>([E? futureValue]);
}
  1. StreamSubscription作为事情输出端,担任事情的输出处理。
  2. StreamSubscription也能对自己的订阅行为进行暂停、康复或撤销等动作。

Stream 的分类

Stream有许多子类,对应不同场景的完成。比方关于输入端而言,能够分为同步流和异步流;在输出端上,又可分为播送流和非播送流。

同步和异步

StreamController的工厂办法中,经过sync能够指定同步或许异步。同步和异步的区别是:事情输入后是否会立即输出。 同步流在事情输入后会立刻履行onData,异步流在事情输入后会注册一个异步事情,等到当时EventLoop中的同步事情处理后触发onData

factory StreamController(
    {void onListen()?,
    void onPause()?,
    void onResume()?,
    FutureOr<void> onCancel()?,
    bool sync = false}) {
  return sync
      ? _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
      : _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
}

在完成上看,_SyncStreamController终究输出时运用的是_SyncStreamControllerDispatch_AsyncStreamController运用的是_AsyncStreamControllerDispatch

两者在输出处理不同,_SyncStreamControllerDispatch调用的是subscription_add办法,_AsyncStreamControllerDispatch调用的是subscription_addPending办法。_addPending会先将事情存到行列里,一起假如行列没有在跑就敞开行列的处理,经过scheduleMicrotask对事情进行异步处理,处理完当时事情持续处理行列时的其它事情,直到行列清空。

播送和非播送

上述代码中出产的对错播送流,播送流经过StreamController.broadcast办法创立。播送和非播送的区别是是否允许多次订阅。

factory StreamController.broadcast(
    {void onListen()?, void onCancel()?, bool sync = false}) {
  return sync
      ? _SyncBroadcastStreamController<T>(onListen, onCancel)
      : _AsyncBroadcastStreamController<T>(onListen, onCancel);
}

非播送StreamController承继自_StreamController,播送StreamController承继自_BroadcastStreamController,两者的区别能够经过_subscribe的完成表现。_StreamController的完成如下,当重复订阅后会直接抛出 StateError 反常。

StreamSubscription<T> _subscribe(void onData(T data)?, Function? onError,
    void onDone()?, bool cancelOnError) {
  if (!_isInitialState) {
    throw StateError("Stream has already been listened to.");
  }
  _ControllerSubscription<T> subscription = _ControllerSubscription<T>(
      this, onData, onError, onDone, cancelOnError);
  // ...
  return subscription;
}

_BroadcastStreamController里边有两个目标_firstSubscription_lastSubscription_BroadcastSubscription是双向链表结构。当需求输出事情时,经过整个链表,告诉一切的订阅进行音讯的处理。

_BroadcastSubscription<T>? _firstSubscription;
_BroadcastSubscription<T>? _lastSubscription;

开发实战

经过前面接口和分类的分析,咱们对这 Stream 有了更深入的知道。刀已磨好,接下来便经过两个比方来试试这把刀究竟锋不尖利。

利用 Stream 完成事情的播送

事情的播送,在开发时总会遇到,尤其是在跨组件或跨页面的场景,信任大部分开发者的项目里也都会引进相似EventBus的三方或自研框架。比方:当我在修改个人资料时,Save之后需求告诉其它页面进行改写以展现最新的个人信息。

// user entity
class UserInfo {
  int uid;
  String name;
  UserInfo(this.uid, this.name);
}
// userinfo update
class UserInfoChangeEvent {
  static final _controller = StreamController<UserInfo>.broadcast();
  static StreamSubscription<UserInfo> subscribe(Function(UserInfo) callback) {
    return _controller.stream.listen(callback);
  }
  static void broadcast(UserInfo userInfo) {
    _controller.add(userInfo);
  }
}
// 用户修改页面
class UserProfileViewModel {
  ...
  // 点击 save 时,会调用到 broadcast 办法向外发送事情
  void onSave(int uid, String name) {
    UserInfoChangeEvent.broadcast(UserInfo(uid, name));
  }
}
// 其它页面状况改写
class ViewState extends State<ViewWidget> {
  StreamSubscription<UserInfo>? _subscription;
  UserInfo? _curUserInfo;
  @override
  void initState() {
    super.initState();
    // 初始化时,监听 UserInfoChangeEvent
    _subscription = UserInfoChangeEvent.subscribe((userInfo) {
      setState(() {
        _curUserInfo = userInfo;
      })
    });
  }
  @override
  void dispose() {
    super.dispose();
    // 退出时,要撤销监听。不然会有内存泄漏
    _subscription?.cancel();
    _subscription = null;
  }
}

这儿,UserInfoChangeEvent 界说了播送类型的StreamController,而且向外露出了subscribeboradcast接口,用户修改页面在点击save时走到onSave办法,这个办法里调用了UserInfoChangeEventbroadcast办法向外发送了一个更改信息的事情;

接着ViewState这儿在initState时经过UserInfoChangeEventsubscribe办法注册了监听,接收到了事情赋值到当时_curUserInfo,调用setState改写页面。

StreamBuilder 完成 Widget 主动改写

Flutter 供给了一个组件StreamBuilder,能协助咱们方便的监听Stream并改写 Widget。例如进入一个页面时,通常会有一个数据加载的过程,此刻页面会阅历 Loading -> Loaded/LoadError 的状况变更,不同的状况会呈现不同的页面 UI,这时咱们就需求界说一个 LoadingState 的枚举类型,在数据加载后时经过 StreamController 发布 LoadState 状况,StreamBuilder监听到更新然后会主动触发 Widget 的改写。AsyncSnapShot是快照的意思,保存着此刻最新的事情。

StreamBuilder<LoadingState>(
  stream: viewModel.loadingStateStream,
  initialData: LoadingState.loading,
  builder: (BuildContext context, AsyncSnapshot snapshot) {
    // 依据 snapshot 的数据处理回来
    var data = snapshot.data;
    if (data == LoadingState.Loaded) {
        reutrn Container(child: Text("Loaded Success"));
    } else if (data == LoadingStat.LoadError) {
        reutrn Container(child: Text("Loading Error"));
    } else {
        return LoadingView()
    }
  },
)

不过,StreamBuilder也有坑。咱们知道,关于Stream来说,事情被消费了就会丢掉,无论是StreamController仍是Stream都不会保存上次的值,以页面加载为例,页面进来后ViewModel履行数据加载完成后,向loadingStateStream里发布了Loaded的状况,假如此刻页面还没有布局StreamBuilderStreamBuilder就无法收到这次监听,等到后边StreamBuilder真实增加到界面上时现已错过了上次的事情,AsyncSnapshot拿到的仍是initialData时设置的数据,也就是 loading 态,这样状况就会展现反常。

你或许会有疑问,为什么StreamBuilder不能一开始就增加到页面的build方面里?当然能够,但即便如此也无法保证StreamBuilder的监听必定会比viewModel的状况更新早,由于假如页面的内容较长,一开始StreamBuilder还不在可视区内,它的initState办法就不会履行,也就不会监听loadingStateStream

StreamBuilder会面对这种囧境,归根究竟是由于Stream的规划。

Stream 的改换和处理

前面在介绍Stream的接口时,咱们提到过Stream里边有许多操作办法。在这part,侧重挑几个从姓名上不太好了解的打开讲讲。

Future<E> drain<E>([E? futureValue]);

drain意为“排出、消耗“。这儿指”排掉”这条流中心一切的事情,只呼应完毕信号,当流封闭时回来 futureValue。

final result = await Stream.fromIterable([1, 2, 3]).drain(100);
print(result); // Outputs: 100.

Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine)

事情迭代。依据combine兼并流里边的事情,该办法能够指定回来的类型S,一起能够指定初始值 initialValue

final result = await Stream<int>.fromIterable([2, 4, 6, 8, 10])
     .fold<String>("0", (previous, element) => "$previous - $element");
print(result); // 0 - 2 - 4 - 6 - 8 - 10

Future<T> reduce(T Function(T previous, T element) combine);

也是事情迭代。与fold不同的是,reduce无法指定初始值且它只能回来与原流相同的类型T

final result = await Stream.fromIterable<int>([2, 6, 10, 8, 2])
     .fold<int>(10, (previous, element) => previous + element);
print(result); // 38

Future pipe(StreamConsumer<T> streamConsumer);

流管道拼接。将当时流的事情流向streamConsumer中,streamConsumer的子类完成通常是一个StreamController,拿到事情后告诉给它的订阅者。

var controller = StreamController<int>();
var stream = controller.stream;
stream.listen((event) {    
  print(event); // 2 4 6 8 10
});
var result = await Stream<int>.fromIterable([2, 4, 6, 8, 10]).pipe(controller);  
print("result: $result"); // null

Stream<E> asyncExpand<E>(Stream<E>? Function(T event) convert);

异步打开。将原流中的事情做一次打开操作,得到一个E类型的新流。

var stream = Stream<int>.fromIterable([2, 4, 6, 8, 10]);
var newStream = stream.asyncExpand((event) {
  return Stream<int>.fromIterable([event, event + 1]);
});
newStream.listen((event){
  print(event);  // 2 3 4 5 6 7 8 9 10
});

Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);

异步映射。跟asyncExpand相似,只是转化操作回来的是FutureOr目标,为那些转化过程中涉及到异步处理的场景供给便当。

var newStream = stream.asyncMap((event) async {
  await Future.delayed(const Duration(seconds: 1));
  return event + 1;
});
newStream.listen((event){
  print(event); // 3 5 7 9 11
});

真实忍不住想吐槽一下,有些办法的姓名起的诚心不昨滴,其间部分都有点“挂羊头卖狗肉”的感觉了。。。

你真的懂了吗?

讲了许多,现在来查验一下。假设有一段逻辑,controller会增加三个事情,分别是add(1) add(2) add()3subscription会在每次收到事情时打印output: $event,中心会有一次暂停,3 秒后康复,猜一下在以下几种场景下最终输出的顺序是什么?

1. 同步流

void main() async {
  // 同步流:sync 为 true
  var controller = StreamController<int>(sync: true);
  var subscription = controller.stream.listen((event) {
    print('output: $event');
  });
  controller.add(1);
  controller.add(2);
  controller.add(3);
  print('暂停');
  subscription.pause();
  Future.delayed(const Duration(seconds: 3), () {
    print('3秒后 -> 康复');
    subscription.resume();
  });
}
// will print:
// output: 1
// output: 2
// output: 3
// 暂停
// 3秒后 -> 康复

2. 异步流

保持 1 中其它代码不变,将sync的值设置成false

// will print:
// 暂停
// 3秒后 -> 康复
// output: 1
// output: 2
// output: 3

3. 异步流:运用 Future.delayed 推迟暂停

保持 2 中其它代码不变,将暂停康复推迟Duraiton.zero

  Future.delayed(Duration.zero, (){
    print('暂停');
    subscription.pause();
    Future.delayed(const Duration(seconds: 3), () {
      print('3秒后 -> 康复');
      subscription.resume();
    });
  });
// will print
// output: 1
// output: 2
// output: 3
// 暂停
// 3秒后 -> 康复

4. 异步流:运用 scheduleMicrotask 推迟暂停

保持 3 中其它代码不变,用scheduleMicrotask代替Future.delayed

  scheduleMicrotask(() {
    print('暂停');
    subscription.pause();
    Future.delayed(const Duration(seconds: 3), () {
      print('3秒后 -> 康复');
      subscription.resume();
    });
  });
// will print
// output: 1
// 暂停
// 3秒后 -> 康复
// output: 2
// output: 3

上面的输出是否如你所料?信任假如你了解了之前的介绍,对1 2 3点的输出结果是没有问题的。可是关于第 4 点:虽然同样为推迟暂停,3 和 4 中的输出完全不一样,4 中的输出在输出output: 1后才会触发暂停。这又是为什么呢?要解说这个输出,就要从源码出发了。

结语

所以,咱们首先要从概念上了解他们,其次咱们还要从代码上知道详细的完成。当程度的履行不及预期,缺乏代码完成层面的了解,咱们便会显得手忙脚乱。像前面呈现的StreamBuilder处理中的坑输出顺序的问题,只有阅读底层源码,才干发现原因并精确修复。下一篇文章,将从源码完成上深入分析Stream