本文为稀土技能社区首发签约文章,14天内禁止转载,14天后未获授权禁止转载,侵权必究!
张风捷特烈 – 出品

一、剖析 Stream 目标

要了解一个事物,最好去考虑它存在的 价值 。当你能够意识到某个事物的效果,短少它会有什么弊端,天然会有爱好去了解它。而不是稀里糊涂的看他人怎么用,自己死记硬背 API 有哪些,别离表明什么意思。一味的堆砌常识点,这样不管学什么都是流于外表,茫无头绪。


1. Stream 存在的必要性

可能许多朋友都没有在开发中运用过 Stream 目标,知道它挺重要,但又不知道他的具体的用处。有种只可远观,不行亵玩的距离感。Stream 能够弥补 Future 的短板,它关于异步来说是一块很重要的版块。

一个 Future 目标诞生的那一刻,不管成败,它最终注定只要一个成果。就像一个一般的网络接口,一次恳求只会有一个呼应成果。运用开发在绝大多数场景是一个 ,对应一个 ,所以和 Future 打交道比较多。

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

但有些场景,使命无法一次完结,关于 一次 恳求,会有 若干次 呼应。比方现实生活中,你追更一部小说,在你订阅后,作者每次新时,都会告诉你。在这个场景下,小说完结代表使命完毕,期间会触发多次呼应告诉,这是 Future 无法处理的。

另外,事情告诉的时间不确定的,作者创作的进程也是非常耗时的,所以机体没有必要处于同步等待 的阻塞状况。像这种 异步事情序列 被形象的称之为 Stream 流

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

在人类科学中,一件重要事物的存在,必定有其发挥效用的场所,在这片范畴之下,它是所向无敌的王。在触摸新常识、新概念时,感知这片范畴非常重要,一个东西只要在合适的场景下,才能发挥最大的效能。


2.从读取文件知道 Stream 的运用

File 目标能够经过 readAsString 异步办法读取文件内容,回来 Future<String> 类型目标。而 Future 异步使命只要一次呼应机会,经过 then 回调,所以该办法会将文件中的 一切字符 读取出来。

---->[File#readAsString]---
Future<String> readAsString({Encoding encoding = utf8});

但有些场景中没有必要不能 悉数读取。比方,想要在一个大文件中寻觅一些字符,找到后就 中止读取 ;想要在读取文件时 显现 读取进展。这时,只能呼应一次事情的 Future 就爱莫能助了,而这正是 Stream 大显身手的范畴。在 File 类中有 openRead 办法回来 Stream 目标,咱们先经过这个办法了解一下 Stream 的运用办法。

Stream<List<int>> openRead([int? start, int? end]);

现在的场景和上面 追更小说 是很相似的:

  • 小说作者 无需一次性向 读者 供给一切的章节;小说是 一章章 进行更新的,每次更新章节,都需求 告诉读者 进行阅览。
  • 操作系统 不必一次性读取悉数文件内容,回来给恳求的 机体;文件是 一块块 进行读取的,每块文件读取完,需求 告诉机体 进行处理。

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

在对 Stream 的了解中,需求认清两个人物: 发布者订阅者 。其间发布者是真正处理使命的机体,是成果的生产者,比方 作者操作系统服务器 等,它们有 发送告诉 的义务。订阅者是发送恳求的机体,关于异步使命,其本身并不参加到执行进程中,能够监听告诉来获取需求的成果数据。

代码处理中 Stream 目标运用 listen 办法 监听告诉 ,该办法的榜首入参是回调函数,每次告诉时都会被触发。回调函数的参数类型是 Stream 的泛型,表明此次告诉时携带的成果数据。

StreamSubscription<T> listen(void onData(T event)?,
    {Function? onError, void onDone()?, bool? cancelOnError});

如下是经过 Stream 事情读取文件,显现读取进展的处理逻辑。当 openRead 使命分发之后,操作系统会一块一块地对文件进行读取,每读一块会发送告诉。Dart 代码中经过 _onData 函数进行监听,回调的 bytes 便是读取的字节数组成果。

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

_onData 函数中依据每次回调的字节数,就能够很轻松地计算出读取的进展。 onDone 指定的函数,会在使命完结时被触发,使命完结也就表明不会再有事情告诉了。

void readFile() async {
  File file = File(path.join(Directory.current.path, "assets", "Jane Eyre.txt"));
  print("开端读取 Jane Eyre.txt ");
  fileLength = await file.length();
  Stream<List<int>> stream = file.openRead();
  stream.listen(_onData,onDone: _onDone);
}
void _onData(List<int> bytes) {
  counter += bytes.length;
  double progress = counter * 100 / fileLength;
  DateTime time = DateTime.now();
  String timeStr = "[${time.hour}:${time.minute}:${time.second}:${time.millisecond}]";
  print(timeStr + "=" * (progress ~/ 2) + '[${progress.toStringAsFixed(2)}%]');
}
void _onDone() {
  print("读取 Jane Eyre.txt 完毕");
}

3.开始知道 StreamSubscription

Stream#listen 办法监听后,会回来一个 StreamSubscription 目标,表明此次对流的订阅。

StreamSubscription<T> listen(void onData(T event)?,
    {Function? onError, void onDone()?, bool? cancelOnError});

经过这个订阅目标,能够暂停 pause 或康复 resume 对流的监听,以及经过 cancel 撤销对流的监听。

---->[StreamSubscription]----
void pause([Future<void>? resumeSignal]);
void resume();
Future<void> cancel();

比方下面当进展大于 50 时,撤销对流的订阅:经过打印日志能够看出 54.99% 时,订阅撤销,流也随之中止,能够注意一个细节。此时 onDone 回调并未触发,表明当 Stream 使命被撤销订阅时,不能算作完结。

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

late StreamSubscription<List<int>> subscription;
void readFile() async {
  File file = File(path.join(Directory.current.path, "assets", "Jane Eyre.txt"));
  print("开端读取 Jane Eyre.txt ");
  fileLength = await file.length();
  Stream<List<int>> stream = file.openRead();
  // listen 办法回来 StreamSubscription 目标
  subscription = stream.listen(_onData,onDone: _onDone);
}
void _onData(List<int> bytes) async{
  counter += bytes.length;
  double progress = counter * 100 / fileLength;
  DateTime time = DateTime.now();
  String timeStr = "[${time.hour}:${time.minute}:${time.second}:${time.millisecond}]";
  print(timeStr + "=" * (progress ~/ 2) + '[${progress.toStringAsFixed(2)}%]');
  if(progress > 50){
    subscription.cancel(); // 撤销订阅
  }
}

二、结合运用了解 Stream 的运用

单看 Dart 代码在操控台打印,实在有些不过瘾。下面经过一个风趣的小例子,介绍 StreamFlutter 项目中的运用。这样能够更形象地知道 Stream 的用处,便于进一步了解。


1. 场景剖析

现实生活中假如仔细观察,会发现许多 Stream 概念的身影。比方在银行办理事务时,客户能够看作 Stream 中的一个元素,广播依次播报商标,事务员需求对某个元素进行处理。在饭馆中,每桌的客人能够看作 Stream 中的一个元素,客人下单完结,厨师依据恳求准备饭菜进行处理。这儿,经过模仿 红绿灯 的状况改变,来阐明 Stream 的运用。

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

能够想象,在一个时间轴上,信号灯的改变是一个接二连三的事情。咱们能够将每次的改变视为 Stream 中的一个元素,信号灯每秒的状况信息都会不同。也便是说,这个 Stream 每秒会产出一个状况,要在运用中模仿红绿灯,只需求监听每次的告诉,更新界面显现即可。

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

这儿将信号灯的状况信息经过 SignalState 类来封装,成员变量有当前秒数 counter 和信号灯类型 type 。 其间信号灯类型经过 SignalType 枚举表明,有如下三种类型:

const int _kAllowMaxCount = 10;
const int _kWaitMaxCount = 3;
const int _kDenialMaxCount = 10;
class SignalState {
  final int counter;
  final SignalType type;
  SignalState({
    required this.counter,
    required this.type,
  });
}
enum SignalType {
  allow, // 答应 - 绿灯
  denial, // 回绝 - 红灯
  wait, // 等待 - 黄灯
}

2. 信号灯组件的构建

如下所示,信号灯由三个 Lamp 组件和数字构成。三个灯别离表明 红、黄、绿 ,某一时间只会量一盏,不亮的运用灰色暗示。三个灯水平排列,有一个黑色布景装修,和文字呈上下结构。

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用


先看灯 Lamp 组件的构建:逻辑非常简略,运用 Container 组件显现圆形,结构时可指定色彩值,为 null 时显现灰色。

class Lamp extends StatelessWidget {
  final Color? color;
  const Lamp({Key? key, required this.color}) : super(key: key);
  @override
  Widget build(BuildContext context) {
    return Container(
      width: 40,
      height: 40,
      decoration: BoxDecoration(
        color: color ?? Colors.grey.withOpacity(0.8),
        shape: BoxShape.circle,
      ),
    );
  }
}

如下是 SignalLamp 组件的展现效果,其依赖于 SignalState 目标进行显现。依据 SignalType 确定显现的色彩和需求点亮的灯,状况中的 counter 成员用于展现数字。

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

class SignalLamp extends StatelessWidget {
  final SignalState state;
  const SignalLamp({Key? key, required this.state}) : super(key: key);
  Color get activeColor {
    switch (state.type) {
      case SignalType.allow:
        return Colors.green;
      case SignalType.denial:
        return Colors.red;
      case SignalType.wait:
        return Colors.amber;
    }
  }
  @override
  Widget build(BuildContext context) {
    return Column(
      children: [
        Container(
          padding: const EdgeInsets.symmetric(horizontal: 15, vertical: 10),
          decoration: BoxDecoration(
              color: Colors.black, borderRadius: BorderRadius.circular(30),),
          child: Wrap(
            alignment: WrapAlignment.center,
            crossAxisAlignment: WrapCrossAlignment.center,
            spacing: 15,
            children: [
              Lamp(color: state.type == SignalType.denial ? activeColor : null),
              Lamp(color: state.type == SignalType.wait ? activeColor : null),
              Lamp(color: state.type == SignalType.allow ? activeColor : null),
            ],
          ),
        ),
        Text(
          state.counter.toString(),
          style: TextStyle(
              fontWeight: FontWeight.bold, fontSize: 50, color: activeColor,
          ),
        )
      ],
    );
  }
}

4. Stream 事情的增加与监听

这样,指定不同的 SignalState 就会呈现相应的效果,如下是黄灯的 2 s

SignalLamp(
  state: SignalState(counter: 2, type: SignalType.wait),
)

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

在运用 Stream 触发更新之前,先说一下思路。Stream 能够监听一系列事情的触发,每次监听会获取新的信号状况,依据新状况烘托界面即可。如下在 SignalState 中定义 next 办法,便于产出下一状况。逻辑很简略,假如数值大于一,类型不变,数值减一,比方 红灯 6 的下一状况是 红灯 5 ; 假如数值等于一,会进入下一类型的最大数值,比方 红灯 1 的下一状况是 黄灯 3

---->[SignalState]----
SignalState next() {
  if (counter > 1) {
    return SignalState(type: type, counter: counter - 1);
  } else {
    switch (type) {
      case SignalType.allow:
        return SignalState(
            type: SignalType.denial, counter: _kDenialMaxCount);
      case SignalType.denial:
        return SignalState(type: SignalType.wait, counter: _kWaitMaxCount);
      case SignalType.wait:
        return SignalState(type: SignalType.allow, counter: _kAllowMaxCount);
    }
  }
}

把每个事情告诉看做元素,Stream 运用处理事情序列,只不过序列中的元素在此刻是不知道的,何时触发也是不定的。Stream 基于 发布-订阅 的思想经过监听来处理这些事情。 其间两个非常重要的人物: 发布者 是元素的生产者,订阅者 是元素的消费者。

在引擎中的 async 包中封装了 StreamController 类用于操控元素的增加操作,同时供给 Stream 目标用于监听。代码处理如下,tag1 处,监听 streamControllerstream 目标。事情到来时触发 emit 办法 ( 办法名恣意 ),在 emit 中会回调出 SignalState 目标,依据这个新状况更新界面即可。然后延迟 1s 持续增加下一状况。

---->[_MyHomePageState]----
final StreamController<SignalState> streamController = StreamController();
SignalState _signalState = SignalState(counter: 10, type: SignalType.denial);
@override
void initState() {
  super.initState();
  streamController.stream.listen(emit); // tag1
  streamController.add(_signalState);
}
@override
void dispose() {
  super.dispose();
  streamController.close();
}
void emit(SignalState state) async {
  _signalState = state;
  setState(() {});
  await Future.delayed(const Duration(seconds: 1));
  streamController.add(state.next());
}

这样 streamController 增加元素,作为 发布者;增加的元素能够经过 StreamControllerstream 成员进行监听。

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用


5. Stream 的操控与反常监听

在前面介绍过 Stream#listen 办法会回来一个 StreamSubscription 的订阅目标,经过该目标能够暂停、康复、撤销对流的监听。如下所示,经过点击按钮执行 _toggle 办法,能够到达 暂停/康复 切换的效果:

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

---->[_MyHomePageState]----
late StreamSubscription<SignalState> _subscription;
@override
void initState() {
  super.initState();
  _subscription = streamController.stream.listen(emit);
  streamController.add(_signalState);
}
void _toggle() {
  if(_subscription.isPaused){
    _subscription.resume();
  }else{
    _subscription.pause();
  }
  setState(() {});
}

另外,StreamController 在结构时能够传入四个函数来监听流的状况:

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

final StreamController<SignalState> streamController = StreamController(
  onListen: ()=> print("=====onListen====="),
  onPause: ()=> print("=====onPause====="),
  onResume: ()=> print("=====onResume====="),
  onCancel: ()=> print("=====onCancel====="),
);

onListen 会在 stream 成员被监听时触发一次;onPauseonResumeonCancel 别离对应订阅者的 pauseresumecancel 办法。如下是点击暂停和康复的日志信息:

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用


Stream#listen 办法中还有另外两个可选参数用于反常的处理。 onError 是过错的回调函数,cancelOnError 标识用于操控触发反常时,是否撤销 Stream

StreamSubscription<T> listen(void onData(T event)?,
    {Function? onError, void onDone()?, bool? cancelOnError});

如下所示,在 emit 中故意在 红 7 时经过 addError 增加一个反常元素。这儿界面简略显现过错信息,在 3 s 后反常被修正,持续增加新元素。

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

void emit(SignalState state) async {
  _signalState = state;
  setState(() {});
  await Future.delayed(const Duration(seconds: 1));
  SignalState nextState = state.next();
  if (nextState.counter == 7 && nextState.type == SignalType.denial) {
    streamController.addError(Exception('Error Signal State'));
  } else {
    streamController.add(nextState);
  }
}

listen 办法中运用 onError 监听反常事情,进行处理:其间逻辑是烘托过错界面,三秒后修正反常,持续产出下一状况:

_subscription = streamController.stream.listen(
  emit,
  onError: (err) async {
    print(err);
    renderError();
    await Future.delayed(const Duration(seconds: 3));
    fixError();
    emit(_signalState.next());
  },
  cancelOnError: false,
);

关于反常的处理,这儿简略地供给 hasError 标识进行构建逻辑的区别:

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

bool hasError = false;
void renderError(){
  hasError = true;
  setState(() {});
}
void fixError(){
  hasError = false;
}

最后说一下 listencancelOnError 的效果,它默许是 false 。假如 cancelOnError = true ,在监听到反常之后,就会撤销监听 stream ,也便是说之后操控器增加的元素就会监听了。这样反常时 StreamController 会触发 onCancel 回调:

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用


三、异步生成器函数与 Stream

前面介绍了经过 StreamController 获取 Stream 进行处理的办法,下面再来看另一种获取 Stream 的办法 – 异步生成器函数


1. 考虑 Stream 与 Iterable

经过前面临 Stream 的知道,咱们知道它是在 时间线 上可拥有若干个可监听的事情元素。而 Iterable 也能够拥有多个元素,两者之间是有很大距离的。Iterable时间空间 上都对元素坚持持有关系;而 Stream 只是在时间上监听若干元素的到来,并不在恣意时间都持有元素,更不会在空间上坚持持有关系。

关于一个 Type 类型的数据,在异步使命中,Stream<T>Future<T> 便是多值和单值的区别,它们的成果都不能在 当前时间 得到,只能经过监听在 未来 得到值。 与之相对的便是 Iterable<Type>Type ,它们代表此时此刻,实实在在的目标,能够随时运用。

单值 多值
同步 Type Iterable<Type>
异步 Future<Type> Stream<Type>

2. 经过异步生成器函数获取 Stream 目标

Future 目标能够经过 async/awiat 关键字,简化书写,更便利的获取异步使命成果。 关于 Stream 也有相似的 async*/yield 关键字。 如下所示, async* 润饰的办法需求回来一个 Stream 目标。

在办法体中经过 yield 关键字 产出 泛型成果目标,如下是对 信号状况流 元素产生出的逻辑:遍历 count 次,每隔 1 s 产出一个状况。

class SignalStream{
  SignalState _signalState = SignalState(counter: 10, type: SignalType.denial);
  Stream<SignalState> createStream({int count = 100}) async*{
    for(int i = 0 ; i < count; i++){
      await Future.delayed(const Duration(seconds: 1));
      _signalState = _signalState.next();
      yield _signalState;
    }
  }
}

这样,在 _MyHomePageState 中经过 signalStream.createStream() 就能够创立一个有 100 个元素的流,进行监听。每次接收到新状况时,更新界面,也能够到达意图:

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

---->[_MyHomePageState]---
final SignalStream signalStream = SignalStream();
_subscription = signalStream.createStream().listen(
  emit,
);
void emit(SignalState state) async {
  _signalState = state;
  setState(() {});
}

到这儿,关于 Stream 的开始知道就完毕了,当然 Stream 的常识还有许多,在后面会陆续介绍。经过本文,你只需求理解 Stream 是什么,经过它咱们精干什么就行了。下一篇咱们将剖析一下 FutureBuilderStreamBuilder 组件的运用和源码完成。它们是 Flutter 对异步目标的封装组件,经过对它们的知道,也能加深咱们对 FutureStream 的立即。 那本文就到这儿,谢谢观看 ~