本文为稀土技能社区首发签约文章,14天内禁止转载,14天后未获授权禁止转载,侵权必究!
张风捷特烈 – 出品
一、剖析 Stream 目标
要了解一个事物,最好去考虑它存在的 价值
。当你能够意识到某个事物的效果,短少它会有什么弊端,天然会有爱好去了解它。而不是稀里糊涂的看他人怎么用,自己死记硬背 API
有哪些,别离表明什么意思。一味的堆砌常识点,这样不管学什么都是流于外表,茫无头绪。
1. Stream 存在的必要性
可能许多朋友都没有在开发中运用过 Stream
目标,知道它挺重要,但又不知道他的具体的用处。有种只可远观,不行亵玩的距离感。Stream
能够弥补 Future
的短板,它关于异步来说是一块很重要的版块。
一个 Future
目标诞生的那一刻,不管成败,它最终注定只要一个成果。就像一个一般的网络接口,一次恳求只会有一个呼应成果。运用开发在绝大多数场景是一个 因
,对应一个 果
,所以和 Future
打交道比较多。
但有些场景,使命无法一次完结,关于 一次
恳求,会有 若干次
呼应。比方现实生活中,你追更一部小说,在你订阅后,作者每次新时,都会告诉你。在这个场景下,小说完结代表使命完毕,期间会触发多次呼应告诉,这是 Future
无法处理的。
另外,事情告诉的时间不确定的,作者创作的进程也是非常耗时的,所以机体没有必要处于同步等待
的阻塞状况。像这种 异步事情序列
被形象的称之为 Stream 流
。
在人类科学中,一件重要事物的存在,必定有其发挥效用的场所,在这片范畴之下,它是所向无敌的王。在触摸新常识、新概念时,感知这片范畴非常重要,一个东西只要在合适的场景下,才能发挥最大的效能。
2.从读取文件知道 Stream 的运用
File
目标能够经过 readAsString
异步办法读取文件内容,回来 Future<String>
类型目标。而 Future
异步使命只要一次呼应机会,经过 then
回调,所以该办法会将文件中的 一切字符
读取出来。
---->[File#readAsString]---
Future<String> readAsString({Encoding encoding = utf8});
但有些场景中没有必要
或 不能
悉数读取。比方,想要在一个大文件中寻觅一些字符,找到后就 中止读取
;想要在读取文件时 显现
读取进展。这时,只能呼应一次事情的 Future
就爱莫能助了,而这正是 Stream
大显身手的范畴。在 File
类中有 openRead
办法回来 Stream
目标,咱们先经过这个办法了解一下 Stream
的运用办法。
Stream<List<int>> openRead([int? start, int? end]);
现在的场景和上面 追更小说
是很相似的:
-
小说作者
无需一次性向读者
供给一切的章节;小说是一章章
进行更新的,每次更新章节,都需求告诉读者
进行阅览。 -
操作系统
不必一次性读取悉数文件内容,回来给恳求的机体
;文件是一块块
进行读取的,每块文件读取完,需求告诉机体
进行处理。
在对 Stream
的了解中,需求认清两个人物: 发布者
和 订阅者
。其间发布者是真正处理使命的机体,是成果的生产者,比方 作者
、操作系统
、服务器
等,它们有 发送告诉
的义务。订阅者是发送恳求的机体,关于异步使命,其本身并不参加到执行进程中,能够监听告诉来获取需求的成果数据。
代码处理中 Stream
目标运用 listen
办法 监听告诉
,该办法的榜首入参是回调函数,每次告诉时都会被触发。回调函数的参数类型是 Stream
的泛型,表明此次告诉时携带的成果数据。
StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});
如下是经过 Stream
事情读取文件,显现读取进展的处理逻辑。当 openRead
使命分发之后,操作系统会一块一块地对文件进行读取,每读一块会发送告诉。Dart
代码中经过 _onData
函数进行监听,回调的 bytes
便是读取的字节数组成果。
在 _onData
函数中依据每次回调的字节数,就能够很轻松地计算出读取的进展。 onDone
指定的函数,会在使命完结时被触发,使命完结也就表明不会再有事情告诉了。
void readFile() async {
File file = File(path.join(Directory.current.path, "assets", "Jane Eyre.txt"));
print("开端读取 Jane Eyre.txt ");
fileLength = await file.length();
Stream<List<int>> stream = file.openRead();
stream.listen(_onData,onDone: _onDone);
}
void _onData(List<int> bytes) {
counter += bytes.length;
double progress = counter * 100 / fileLength;
DateTime time = DateTime.now();
String timeStr = "[${time.hour}:${time.minute}:${time.second}:${time.millisecond}]";
print(timeStr + "=" * (progress ~/ 2) + '[${progress.toStringAsFixed(2)}%]');
}
void _onDone() {
print("读取 Jane Eyre.txt 完毕");
}
3.开始知道 StreamSubscription
Stream#listen
办法监听后,会回来一个 StreamSubscription
目标,表明此次对流的订阅。
StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});
经过这个订阅目标,能够暂停 pause
或康复 resume
对流的监听,以及经过 cancel
撤销对流的监听。
---->[StreamSubscription]----
void pause([Future<void>? resumeSignal]);
void resume();
Future<void> cancel();
比方下面当进展大于 50
时,撤销对流的订阅:经过打印日志能够看出 54.99%
时,订阅撤销,流也随之中止,能够注意一个细节。此时 onDone
回调并未触发,表明当 Stream
使命被撤销订阅时,不能算作完结。
late StreamSubscription<List<int>> subscription;
void readFile() async {
File file = File(path.join(Directory.current.path, "assets", "Jane Eyre.txt"));
print("开端读取 Jane Eyre.txt ");
fileLength = await file.length();
Stream<List<int>> stream = file.openRead();
// listen 办法回来 StreamSubscription 目标
subscription = stream.listen(_onData,onDone: _onDone);
}
void _onData(List<int> bytes) async{
counter += bytes.length;
double progress = counter * 100 / fileLength;
DateTime time = DateTime.now();
String timeStr = "[${time.hour}:${time.minute}:${time.second}:${time.millisecond}]";
print(timeStr + "=" * (progress ~/ 2) + '[${progress.toStringAsFixed(2)}%]');
if(progress > 50){
subscription.cancel(); // 撤销订阅
}
}
二、结合运用了解 Stream 的运用
单看 Dart
代码在操控台打印,实在有些不过瘾。下面经过一个风趣的小例子,介绍 Stream
在 Flutter
项目中的运用。这样能够更形象地知道 Stream
的用处,便于进一步了解。
1. 场景剖析
现实生活中假如仔细观察,会发现许多 Stream
概念的身影。比方在银行办理事务时,客户能够看作 Stream
中的一个元素,广播依次播报商标,事务员需求对某个元素进行处理。在饭馆中,每桌的客人能够看作 Stream
中的一个元素,客人下单完结,厨师依据恳求准备饭菜进行处理。这儿,经过模仿 红绿灯
的状况改变,来阐明 Stream
的运用。
能够想象,在一个时间轴上,信号灯的改变是一个接二连三的事情。咱们能够将每次的改变视为 Stream
中的一个元素,信号灯每秒的状况信息都会不同。也便是说,这个 Stream
每秒会产出一个状况,要在运用中模仿红绿灯,只需求监听每次的告诉,更新界面显现即可。
这儿将信号灯的状况信息经过 SignalState
类来封装,成员变量有当前秒数 counter
和信号灯类型 type
。 其间信号灯类型经过 SignalType
枚举表明,有如下三种类型:
const int _kAllowMaxCount = 10;
const int _kWaitMaxCount = 3;
const int _kDenialMaxCount = 10;
class SignalState {
final int counter;
final SignalType type;
SignalState({
required this.counter,
required this.type,
});
}
enum SignalType {
allow, // 答应 - 绿灯
denial, // 回绝 - 红灯
wait, // 等待 - 黄灯
}
2. 信号灯组件的构建
如下所示,信号灯由三个 Lamp
组件和数字构成。三个灯别离表明 红、黄、绿
,某一时间只会量一盏,不亮的运用灰色暗示。三个灯水平排列,有一个黑色布景装修,和文字呈上下结构。
先看灯 Lamp
组件的构建:逻辑非常简略,运用 Container
组件显现圆形,结构时可指定色彩值,为 null
时显现灰色。
class Lamp extends StatelessWidget {
final Color? color;
const Lamp({Key? key, required this.color}) : super(key: key);
@override
Widget build(BuildContext context) {
return Container(
width: 40,
height: 40,
decoration: BoxDecoration(
color: color ?? Colors.grey.withOpacity(0.8),
shape: BoxShape.circle,
),
);
}
}
如下是 SignalLamp
组件的展现效果,其依赖于 SignalState
目标进行显现。依据 SignalType
确定显现的色彩和需求点亮的灯,状况中的 counter
成员用于展现数字。
class SignalLamp extends StatelessWidget {
final SignalState state;
const SignalLamp({Key? key, required this.state}) : super(key: key);
Color get activeColor {
switch (state.type) {
case SignalType.allow:
return Colors.green;
case SignalType.denial:
return Colors.red;
case SignalType.wait:
return Colors.amber;
}
}
@override
Widget build(BuildContext context) {
return Column(
children: [
Container(
padding: const EdgeInsets.symmetric(horizontal: 15, vertical: 10),
decoration: BoxDecoration(
color: Colors.black, borderRadius: BorderRadius.circular(30),),
child: Wrap(
alignment: WrapAlignment.center,
crossAxisAlignment: WrapCrossAlignment.center,
spacing: 15,
children: [
Lamp(color: state.type == SignalType.denial ? activeColor : null),
Lamp(color: state.type == SignalType.wait ? activeColor : null),
Lamp(color: state.type == SignalType.allow ? activeColor : null),
],
),
),
Text(
state.counter.toString(),
style: TextStyle(
fontWeight: FontWeight.bold, fontSize: 50, color: activeColor,
),
)
],
);
}
}
4. Stream 事情的增加与监听
这样,指定不同的 SignalState
就会呈现相应的效果,如下是黄灯的 2 s
:
SignalLamp(
state: SignalState(counter: 2, type: SignalType.wait),
)
在运用 Stream
触发更新之前,先说一下思路。Stream
能够监听一系列事情的触发,每次监听会获取新的信号状况,依据新状况烘托界面即可。如下在 SignalState
中定义 next
办法,便于产出下一状况。逻辑很简略,假如数值大于一,类型不变,数值减一,比方 红灯 6
的下一状况是 红灯 5
; 假如数值等于一,会进入下一类型的最大数值,比方 红灯 1
的下一状况是 黄灯 3
。
---->[SignalState]----
SignalState next() {
if (counter > 1) {
return SignalState(type: type, counter: counter - 1);
} else {
switch (type) {
case SignalType.allow:
return SignalState(
type: SignalType.denial, counter: _kDenialMaxCount);
case SignalType.denial:
return SignalState(type: SignalType.wait, counter: _kWaitMaxCount);
case SignalType.wait:
return SignalState(type: SignalType.allow, counter: _kAllowMaxCount);
}
}
}
把每个事情告诉看做元素,Stream
运用处理事情序列,只不过序列中的元素在此刻是不知道的,何时触发也是不定的。Stream
基于 发布-订阅
的思想经过监听来处理这些事情。 其间两个非常重要的人物: 发布者
是元素的生产者,订阅者
是元素的消费者。
在引擎中的 async
包中封装了 StreamController
类用于操控元素的增加操作,同时供给 Stream
目标用于监听。代码处理如下,tag1
处,监听 streamController
的 stream
目标。事情到来时触发 emit
办法 ( 办法名恣意
),在 emit
中会回调出 SignalState
目标,依据这个新状况更新界面即可。然后延迟 1s
持续增加下一状况。
---->[_MyHomePageState]----
final StreamController<SignalState> streamController = StreamController();
SignalState _signalState = SignalState(counter: 10, type: SignalType.denial);
@override
void initState() {
super.initState();
streamController.stream.listen(emit); // tag1
streamController.add(_signalState);
}
@override
void dispose() {
super.dispose();
streamController.close();
}
void emit(SignalState state) async {
_signalState = state;
setState(() {});
await Future.delayed(const Duration(seconds: 1));
streamController.add(state.next());
}
这样 streamController
增加元素,作为 发布者
;增加的元素能够经过 StreamController
的 stream
成员进行监听。
5. Stream 的操控与反常监听
在前面介绍过 Stream#listen
办法会回来一个 StreamSubscription
的订阅目标,经过该目标能够暂停、康复、撤销对流的监听。如下所示,经过点击按钮执行 _toggle
办法,能够到达 暂停/康复
切换的效果:
---->[_MyHomePageState]----
late StreamSubscription<SignalState> _subscription;
@override
void initState() {
super.initState();
_subscription = streamController.stream.listen(emit);
streamController.add(_signalState);
}
void _toggle() {
if(_subscription.isPaused){
_subscription.resume();
}else{
_subscription.pause();
}
setState(() {});
}
另外,StreamController
在结构时能够传入四个函数来监听流的状况:
final StreamController<SignalState> streamController = StreamController(
onListen: ()=> print("=====onListen====="),
onPause: ()=> print("=====onPause====="),
onResume: ()=> print("=====onResume====="),
onCancel: ()=> print("=====onCancel====="),
);
onListen
会在 stream
成员被监听时触发一次;onPause
、onResume
、onCancel
别离对应订阅者的 pause
、 resume
、cancel
办法。如下是点击暂停和康复的日志信息:
在 Stream#listen
办法中还有另外两个可选参数用于反常的处理。 onError
是过错的回调函数,cancelOnError
标识用于操控触发反常时,是否撤销 Stream
。
StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});
如下所示,在 emit
中故意在 红 7
时经过 addError
增加一个反常元素。这儿界面简略显现过错信息,在 3 s
后反常被修正,持续增加新元素。
void emit(SignalState state) async {
_signalState = state;
setState(() {});
await Future.delayed(const Duration(seconds: 1));
SignalState nextState = state.next();
if (nextState.counter == 7 && nextState.type == SignalType.denial) {
streamController.addError(Exception('Error Signal State'));
} else {
streamController.add(nextState);
}
}
在 listen
办法中运用 onError
监听反常事情,进行处理:其间逻辑是烘托过错界面,三秒后修正反常,持续产出下一状况:
_subscription = streamController.stream.listen(
emit,
onError: (err) async {
print(err);
renderError();
await Future.delayed(const Duration(seconds: 3));
fixError();
emit(_signalState.next());
},
cancelOnError: false,
);
关于反常的处理,这儿简略地供给 hasError
标识进行构建逻辑的区别:
bool hasError = false;
void renderError(){
hasError = true;
setState(() {});
}
void fixError(){
hasError = false;
}
最后说一下 listen
中 cancelOnError
的效果,它默许是 false
。假如 cancelOnError = true
,在监听到反常之后,就会撤销监听 stream
,也便是说之后操控器增加的元素就会监听了。这样反常时 StreamController
会触发 onCancel
回调:
三、异步生成器函数与 Stream
前面介绍了经过 StreamController
获取 Stream
进行处理的办法,下面再来看另一种获取 Stream
的办法 – 异步生成器函数
。
1. 考虑 Stream 与 Iterable
经过前面临 Stream
的知道,咱们知道它是在 时间线
上可拥有若干个可监听的事情元素。而 Iterable
也能够拥有多个元素,两者之间是有很大距离的。Iterable
在 时间
和 空间
上都对元素坚持持有关系;而 Stream
只是在时间上监听若干元素的到来,并不在恣意时间都持有元素,更不会在空间上坚持持有关系。
关于一个 Type
类型的数据,在异步使命中,Stream<T>
是 Future<T>
便是多值和单值的区别,它们的成果都不能在 当前时间
得到,只能经过监听在 未来
得到值。 与之相对的便是 Iterable<Type>
和 Type
,它们代表此时此刻,实实在在的目标,能够随时运用。
单值 | 多值 | |
---|---|---|
同步 | Type |
Iterable<Type> |
异步 | Future<Type> |
Stream<Type> |
2. 经过异步生成器函数获取 Stream 目标
Future
目标能够经过 async/awiat
关键字,简化书写,更便利的获取异步使命成果。 关于 Stream
也有相似的 async*/yield
关键字。 如下所示, async*
润饰的办法需求回来一个 Stream
目标。
在办法体中经过 yield
关键字 产出
泛型成果目标,如下是对 信号状况流
元素产生出的逻辑:遍历 count
次,每隔 1 s
产出一个状况。
class SignalStream{
SignalState _signalState = SignalState(counter: 10, type: SignalType.denial);
Stream<SignalState> createStream({int count = 100}) async*{
for(int i = 0 ; i < count; i++){
await Future.delayed(const Duration(seconds: 1));
_signalState = _signalState.next();
yield _signalState;
}
}
}
这样,在 _MyHomePageState
中经过 signalStream.createStream()
就能够创立一个有 100
个元素的流,进行监听。每次接收到新状况时,更新界面,也能够到达意图:
---->[_MyHomePageState]---
final SignalStream signalStream = SignalStream();
_subscription = signalStream.createStream().listen(
emit,
);
void emit(SignalState state) async {
_signalState = state;
setState(() {});
}
到这儿,关于 Stream
的开始知道就完毕了,当然 Stream
的常识还有许多,在后面会陆续介绍。经过本文,你只需求理解 Stream
是什么,经过它咱们精干什么就行了。下一篇咱们将剖析一下 FutureBuilder
和 StreamBuilder
组件的运用和源码完成。它们是 Flutter
对异步目标的封装组件,经过对它们的知道,也能加深咱们对 Future
和 Stream
的立即。 那本文就到这儿,谢谢观看 ~