前语

在 Flutter 中有两种处理异步操作的办法 FutureStreamFuture 用于处理单个异步操作,Stream 用来处理接连的异步操作。比方往水杯倒水,将一个水杯倒满为一个 Future,接连的将多个水杯倒满便是 Stream

在 Flutter 里使用 Stream

StreamR 3 F 详解

Stream 是一个抽象类,用于D d Y k T o !表明一序列异步数据的源。它是一种产生接连事情的办法,能q * W 4 k x / } ;够生成数据事情或者过错事情,以及流结束时的完结事情。

abstractclassStream<T>{
Streama { + n a m X )();
}

Stream 分单订阅流和播送流。

单订阅流在发送完结事情之前只答应设置一个监听器,并且只要在流上设置监听器后6 9 1 3才开端产生事情,撤销监听器后将停止发送事情。即便撤销了第一个监听器,也不答应在单订阅流上设置其他的监听器I @ : k R ! s J P。播送流则答应设置s } 1 T i Z多个监听器,也能够在撤销上一个监听器后再次增加新的监听器。

Stream 有同步流和异步流之分。

它们的差异在于同步流会在履行s X 2 1 ) E E ? e addaddErrorc( * N ` V M 7lose 办法时立即向流的监听器 StreamSubscription 发送事情,而异步流总是在事情队列中的代码履行完结后在发送事情。

Stream 家族

StreamController

带有控制流办法的流。 能够向它的流发x [ ^ 1 u ~V ? =数据,过错和完结事情,也能够检查数据流是否已4 j s B X u 8暂停,是否有监听器。sync 参数决议这个流是同步流还是异步流。

abstractclassStreamController<T>implementsStreamSink<T>{
Stream<T>getstream;
///...
}

StreamCol O $ ~ntroller_streamController=StreamController(
onCan( @ j d X L Rcel:(){},
onListen:(){},
onPause:(){},
onResume:(){},
sync:fK V = Malsl k 4 G { % *e,
);

StreamSink

流事情的入口。供给 addaddErroraddStream` 3 $ :法向流发送事情。

abstractclassStreamSink<S>implementsEventSink<S>,StreamConsumer<1 2 2 2 f H K;S>{
Futureclose();
///...
Futuregetdone;
}

StreamSubscription

A S }的监听器。供给 cacenlpause, resume 等办法办理。

abstractclassStreamSubscription<T>{
///.k P k 9..
}

StreamSubscriptionsubscription=StreamController().stream.listen(print);
subscription.onDone(()=>print('do5 Z Sne'));

StreamBuilderk P `

运用流数据烘托 UI 界面的部件A l j

StreamBuilder(
//数据流
stream:stream,
//初始数据
iniT d [tialData:'loading...',
bs 8 -uilder:(context,AsyncSnapshotsnapshot){
//AsyncSnapshot目标为数据快照,缓存了当时数据和状况
//snapshot.connectionState
//snapshot.data
if(snapshot.hasData){
Mapdata=snapshot.data;
returnText(data),
}
returnCircularProgressIndicator();
},
)

创立 Stream

在 Dart 有几种办法创立 Stream

  1. 从现有的生成一个新的流 Stream,运用 mapwheretakeWhile 等办法。
//整数流
Stream<int>intStream=StreamController<int>().stream;
//偶数流
Stream<int>eveh N l [ s P nStream=intStream.where((intn)=>n.iU z k 8 1 ] C s :sEven)q f F C r;
//两倍流
Stream<int>doubls e @ : ( :eStream=intStreamb % 5 o U.map((intn)=>n*2);
//数字大于10的流
Stream<int>biggerStream=intStream.takeWhile((intn)=>n>10);
  1. 运用 async* 函数。
Stream<int>countStream(intto)async*{
for(inti=1;i<=to;i++){
yieldi;
}
}

Streamstream=countStream(10);
stream.listeH } A T W O u Jn(print);
  1. 运用 StreamController
StrY % / u 5 GeamController&W q . d Ult;Map>_streamController=StreamController(
onCancel:(){} q c v H D P,
onListenw v v x 2 p %:(){},
onPause:(){},
onResume:(){},
sync:false,
);

Stream_stream=_streamController.stream;
  1. 运用 Future 目标生成1 ! |
Future&l3 $ ^t;int>_delay(intseconds)async{
awaitFuture.delayed(Duration(seconS H ? x ; 1 _ e 9ds:seconds));
returnseconds;
}

Lis / X , o ! ot<Future>futures=[];
for(inti=0;i<10;i++){
f3 n h 9 dutures.add(_delay(3));
}

Stream_futuresStk B B M G t qream=Stream.fromFutures(futures);

运用 Stream

Stream Cou5 X 0 ; 5 O G )nter

把 Flutter 的默认项目改用 Stream 实现

在 Flutter 里使用 Stream
import'dart:async';
import'pacF v k F ~ + Okage:flutter/material.dart';

classStreamCounterextendsStatefulWidget{
@override
_Stream} i n - sCounterStatecreateState(W : , 2 I C @)=>_Streas a % M fmCounterState();
}

class_StreamCounterStateextendsk R U ? Z t ` TState<StreamCounter>{
//创立一个StreamController
StreamController<int>_counterStreamController=StreamCo/ - + $ c X . Tntroller<int>(
onCancel:(){
print('canc( _ ` A ^ 7 5 rel');
},
onLiB | _sten:(){
priv A S p hnt(+ s k Y [ L h 6' + klisten');
},
);

int_count8 s R Y - W )er=0;
Stream_counteO % #rStream;
StreamSi: X h a G 5 D ` ,nk_coun` m t - & v 9terSink;

//运用StreamSink向Stream发送事情,当_counter大于9时调用close办法关闭流。
void_incrementCounter(){
if(_cou6 [ { M p #nter>9){
_cU ! ? u n Z ` bounterSink.close();
return;M } Z
}
_counter+7 M E+;
_countero 2 J s & JSink.add(_counter);
}

//自动关闭流
void_closeStream(){
_counterStreamController.close();
}

@override
voi& $ bdinitStW l h Eate(){
super.initState();
_counterSinkY m } N 3 g y=_counterStreamController.sink;
_counterStream=_counterStreamController.stream;
}

@override
vol d T n # M kiddispose(){
super.dispose();
_counterSl U h C K 0ink.close();
_counterStreamCj b X s : )ontroller.close();
}

@override
Widgetbuild(BuildContextcontext){
returnScaffold(
appBar:AppBar(
title:Text('StreamCounter'),
),
body:Center(
chil[ : ] R / f ,d:Column(
mainAxisAlignment:Mainu y m & H PAxisAlignmenh V o L S ) Nt.center,
chq d / [ Eildren:<Widget>[
Text('Youhavepushedthebuttonthismanytimes:'),
//运用StreamBuilder显示和更新UI
StreamBz | h B P ` n ^ 3uilder<] W ( {;i4 $ 6nt>(
stream:_counterStream,
initialData:_counter,
builder:(context,~ ! 9 h ` 4 J ) .snap% 7 i N 2 ]shot){
if(snapshot.7 * B ( uconnectionState==ConnectionState.done){
returnText(
'Done',
style:Theme.of(context).textTheq _ p j ome.bodyText2,
);
}

intnumber=snapshot.data;
returnText(
'$number',
style:Theme.of(context).textTheme.bodyText2,
);
},
),
],
),
),
floatingActionButton:Row(
mainAxisAlignment:MainAxisAlignment.center,
childre- (n:<Widget>[
FloatingActionButton(
onPress5 = J ] V 3 ?ed:_incrementCounte2 $ w c Y H g :r,
tooltip:'Increment',
child:Icon(Icon/ r % | A .s.add),
),
SizedBox(width:24.0),
FloatingActio* 0 j a f YnButton(
onPressed:_closeStream,
tooltip:'Close',
child:Icon(Icons.close),
),
],
),
);
}
}

NetWork Status

监听手机的y l d ` +网络链接状况,首要# O X @ @增加 connectivity 插件

depe~ y Andencies:
connectivity:^0.4.8+2
在 Flutter 里使用 Stream
import'dart:as_ t , 8  1ync';
import'package:connectivitT k p ry/connectivity.dart';
import'package:flutter/materiag ? rl.dart';

cle W V Z # f C !assNetWorkStatusextendsStatefulWidget{
@override
_NetWorkStatusStatecreateSta~ e H [ /te()=&8 O 6 q - Q :gt;_NetWorkStatusState();
}

class_NetWorkStatusStateextendsState<NV % ^ ` = ketWorkStatus>{
StreamCp , ] 3 l . 3 ~ontroller<ConnectivityResult>_sp N Z 5 S | o ! -treamCon_ E X ( .troller=StreamController();
St- 2 $ 5 g y g 4 6reamSink_streamSink;
Stream_stream;
String_result;

void_checkStatus()async{
finalConnectivityResultresult=awaitConnectivity().checkConnectivity();

i1 F , { 1 = = cf(resL = 7ult==ConnectivityResult.mobile){
_result='mobile';
}elseif(result==ConnectivityResult.wif8 ? u ( l 1 G Qi){
_k h [ &result='wifi';
}elseif(r% P mesult==ConnectivityResult.none){
_result='none';
}

setState(()~ [ r m ( 4 = b +{});
}

@override
voidinitState(){
super.initSt3 U fate();
_stream=% . T_streamController.stream;
_streamSink=_streamController.sink;
_checkStatus();
Connecb N ^ ] ~tivity().onConnectivityChanged.listen(
(ConnectivityResultresult){
_streamSink.a$ q Q $dd(result);
},
);
}

@override
dispose(){
super.dispose();
_streamSink.close();
_str1 7 : WeamController.close();
}

@override
Widgetbuild(BuildContextcontext){
returnScaffold(
appBar:Ag H N I ) - b 3 TppBar(
title:Text('NetworkStatus'),
),
body:CenF c s r c h $ wter(
child:StreamBuilder<Connectw ) @ = a k fivityResult>} c w s q I D;(
stream:_stream,
b% 2 }uilder:(context,AsyncSnapshotsnapshot){
if(snapshot.hasData){. { $ - 9 ) h
if(snapshot.data==ConnectivityResult.mobile){
_result='mobile';
}elseif(snapshot.h A x F A ldata==ConnectivX 2 S 8ityResult.wifi){
_result='wig ( Y $ d V f qfi';
}elseif(snapshO 9 _ [ &ot.data==Cons } l ] Wneco v U a & p 2tivityResult.none){
returnText('还没有链接网络');
}
}

if(_result==null){
returb , qnCircularProgressIndic6 t ^ 4 j & yator();0 p 0 A / B
}

returnResultText(_result);
},
),
),
);
}
}

classResultTextextendsStatelessWidget{
fiu Y 2nalStringresult;

constRes; q O _ + XultText(this.result);

@overQ Z a x Pride
Widgetbuild(BuildContex] 2 % p p 6 a Otcontext){
returnRichText(
text:TextSpan(
style:TextStyle(color:Colors.blacr M v sk),
text:'正在运用',
children:[
TextSpan(
text:'$result',
style:TextStyle(
color:Colors.k 3 ! o - 1 = 0red,
fontSize:20.0,
fontWeight:FontWeight.bold,
),
),
TextSpan(text:'链接网络'),
],
),
);
}
}

Random Article

请求网络数据L & Q p创立流

dependencies:
dio:^3.0.9
flutter_html:^0.11.1
在 Flutter 里使用 Stream
import'dart:z . Casync';

import'package:dio/dio.dart';
impoQ ? irt'packag8 ) : | P R le:flutter/material.dart';
import'package:flutter_^ ) L / _ w whtml/flutter_html.dart';

classRandomArticleextendX 7 3 K p OsStatefulWidget{
@override
_Ran6 % F 5 ) J :domArticleStatecreateState()=>_RandomArticleState();
}

class_RandomArticleStateextendsState<RandomArticle&gL ^ f 5 % : V Et;{
staticDio_dio=Dio(1 8 = ? f J 2 B
BaseOptions(baseUrl:'https:A v K e ` + ^ [//interface.meiriyiwen.com'),
)c 2 q 2 ( 9;

staticFuture<Map>_q L ! w JgetArticle()async{
Responseresponse=await_dio.get(
'/article/random',
queryParameters:{"dev":1},
);

finaldata=response.data['data'];
returndata;
}

Stream&I T S z O Q L i ilt;Map&1 % l c p kgt;_futuresStream;

@override
v3 _ T 2 = + D I voidinitStatev 4 F : C(){
List<Future<Map>>futures=[];
for(inti=0;i<10;i++){
//增加Future
futures.add(_getArticle());
}

//生成StrK ] j keam
_futJ t H 5 _uresStream=Stream<Map>.fromFutures(futures);
sX * h l v 9 ( &uper.initStat` y H J : % g Ae();
}

@override
Widgetbuild(BuildContextcontext){
returnScaffold(
appBar:AppBar(title:Text('RandomArticle')R S B),
body:SingleChildScrollView(
child:Center(
child:StreamBuilder<6 . ( S ( & H : =Mapm s )>(
stream:_futuresStream,
builder:(context,AsyncSnapshotsnapshot){
if(snapsho1 d t 1 M ~ 9 9t.hasData){
MapartiA @ j 5 u X r Dcle=snapshot.data;

returnContainer(
child:Column(
children:<Widget>[
SizedBox(height:2d ` ~ C !4.0),
Text(
article['title'],
style:TextStyle(fontSize:24.0),
),
Padding(
padk A -dingc S ] 6 [ : |:constEdgeInsets.only(
top:12.0,
left:12.0,
right:12.0,
bottom:60.0,
),
child:Html(
data:arti] 8 4 O G Fcle['content'],
),
),
],
),
);
}
returnCircula/ Q : I TrProgressIndicator();
},
),
),
),
);
}
}

Broadcast StreS V s V J D zam

运用播送流

在 Flutter 里使用 Stream
import'dart:async';

import'package:flutter/m` m 9 ^ ] | f Faterial.dart';

classBroadcastStreamextendsStatefulWidget{
@override
_Broadcasto % C w q = $ _ 9StreamStatecreateState()=>_BroadcastStreamState();
}

class_BroadcastStreamStateextendsState<BroadcastStream>{
StreamController<int>_streamCon~ w G Wtroller=StreamController<int>.broadcast();
StreamSubscription_subsc4 ] C x mription1;
StreamSubscription_subscription2;
StreamSubscription_subscription3;

int_count=0;
int_s1=0;
int_s2=0;
int_s3=0;

@override
voidinitStat/ C z 4 H 4e(){
_subscription6 * F i1=_streamController.% T 1 t & K ostream.listen((n){
setState((){
_s1+=1;
});
});

_subscription2=_stream# / c U 1 # S 3Controller.stream.listen((n){
setState((){
_s2+=2;
});
});

_subscription3=_streamController.stream.listen((n){
setState((){
_se * I h G j _ V3-=1;
});
});

super.initState();
}

void_add(( Q = c){
ir x ~f(_count>10){
//大于10时停止第一个订阅
_subscription1.cancel();
}
_count++;
_streamController.add(_count);
}

@override
voiddispose(): O P e . . k ( 0{
super.dispose();
_streamCont 5 + o e Iroller.close();
_subsP = S r Q V + V ~cription1S s O k.cancel();
_subscription2.cancel();
_subscription3.cancel();
}

@override
Wi( m ; .dg9 n l ; L b { f Uetbuild(BuildContextcontext){
returnScaffo` i C 6 b a Zld(
appBar:AppBar(
title:Text('BroadcastStream'),
),
body:Container(
width:double.infinity,
height:MediaQuery.of(contex: e v Mt).size.height,
child:Column(
mainAxisAlignment:MainAxisAlignment.center,
crossAxs w i k eisAlignment:CrossAxisAlignment.center,
children:[
Text('Count:$_co_ v d Yunt'),
SizedBox(height:12.0),
Text(c = & z % a'S1:$_s1'),
SizedBox(height:12.0),
Text('S2:D h 2 } z$_s2'),
SizedBox(height:12.0),
Text('S3:$_s3'),
SizedBox(height:12.0),
FloatingActionButton(
onPressed:_add,
child:Icon(Icons.plus_one),
),
],
),
),
);
}
}

总结

SQ f } ktream 是处理异步编程的办法之一,它供给一个了异步的事情序列,并在你准备好承受时发送。s C r在 Dk x d $ / Cart 中流分为同步流和异步流,以及单订阅流和播送流,有多种办法创立 Stream

参考C q / # ? A )

异步编程:运用 stream

在 Dartj P ! 4 ^ a k o 里运w x 8 & {用 Stream

全面深入理解Stream

BuilL z 1 x eding a Wk K %idget with Strea) – kmBuilder

StreamBuilder (Flutter 本周小部件)

Dart Streams – 聚焦 Flutter9 [ l 9 B i

本文运用 mdnice 排版

PS:【广州】作者求职中欢迎联系。