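Preface
Flutter offers two ways to handle asynchronous operations: Future and Stream. A Future handles a single asynchronous operation, while a Stream handles a sequence of asynchronous operations. Take pouring water into cups as an analogy: filling one cup is a Future, and filling several cups one after another is a Stream.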
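Stream in detail
Stream is an abstract class that represents a source of a sequence of asynchronous data. It is a way of producing a series of events: it can emit data events or error events, followed by a done event when the stream ends.
abstract class Stream<T> {
  Stream();
}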
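Streams are either single-subscription streams or broadcast streams.
A single-subscription stream allows only one listener over its whole lifetime. It does not start producing events until a listener is attached, and it stops sending events when that listener is cancelled. Even after the first listener has been cancelled, no other listener can be attached to a single-subscription stream. A broadcast stream allows multiple listeners, and a new listener can be added after a previous one has been cancelled.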
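The difference between the two kinds can be checked in plain Dart, without Flutter. The sketch below is only an illustration; the function names secondListenIsRejected and broadcastDeliveries are made up for this example.

```dart
import 'dart:async';

/// Returns true when a second listen() on a single-subscription stream
/// is rejected with a StateError.
bool secondListenIsRejected() {
  final controller = StreamController<int>();
  final first = controller.stream.listen((_) {});
  try {
    // A single-subscription stream refuses a second listener.
    controller.stream.listen((_) {});
    return false;
  } on StateError {
    return true;
  } finally {
    first.cancel();
  }
}

/// Counts how many listeners of a broadcast stream receive a single event.
Future<int> broadcastDeliveries() async {
  final controller = StreamController<int>.broadcast();
  var deliveries = 0;
  controller.stream.listen((_) => deliveries++);
  controller.stream.listen((_) => deliveries++);
  controller.add(1);
  // The returned future completes after both listeners received "done".
  await controller.close();
  return deliveries;
}

Future<void> main() async {
  print(secondListenIsRejected()); // true
  print(await broadcastDeliveries()); // 2
}
```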
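Streams are also either synchronous or asynchronous.
The difference is that a synchronous stream delivers the event to its listener (a StreamSubscription) immediately when add, addError, or close is called, whereas an asynchronous stream always delivers the event later, once the code that is currently running has finished.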
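A small experiment makes the ordering visible. This is a sketch under the assumption that a listener is already attached and not paused when add is called; deliveryOrder is a hypothetical helper, not part of the Dart API.

```dart
import 'dart:async';

/// Records the order in which log entries are written around one add() call.
Future<List<String>> deliveryOrder(bool sync) async {
  final log = <String>[];
  final controller = StreamController<int>(sync: sync);
  controller.stream.listen((n) => log.add('event $n'));
  log.add('before add');
  controller.add(1);
  log.add('after add');
  // Wait until the listener has received the done event.
  await controller.close();
  return log;
}

Future<void> main() async {
  print(await deliveryOrder(true)); // [before add, event 1, after add]
  print(await deliveryOrder(false)); // [before add, after add, event 1]
}
```

With sync: true the listener runs inside the add call itself, which is why synchronous controllers need more care: the listener can observe state that the caller has not finished updating.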
The Stream family
StreamController
A stream together with the methods that control it. You can send data, error, and done events to its stream, and you can check whether the stream is paused and whether it has a listener. The sync parameter decides whether the stream is synchronous or asynchronous.
abstract class StreamController<T> implements StreamSink<T> {
  Stream<T> get stream;
  /// ...
}
StreamController _streamController = StreamController(
  onCancel: () {},
  onListen: () {},
  onPause: () {},
  onResume: () {},
  sync: false,
);
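To see when the controller callbacks fire, the following sketch logs each of them while a listener subscribes, pauses, resumes, and cancels. The function name lifecycleLog is an assumption made for this example.

```dart
import 'dart:async';

/// Logs the controller callbacks triggered by one subscription's lifecycle.
Future<List<String>> lifecycleLog() async {
  final log = <String>[];
  final controller = StreamController<int>(
    onListen: () => log.add('onListen'),
    onPause: () => log.add('onPause'),
    onResume: () => log.add('onResume'),
    onCancel: () => log.add('onCancel'),
  );
  // No listener yet, so hasListener is false.
  log.add('hasListener: ${controller.hasListener}');
  final subscription = controller.stream.listen((_) {});
  subscription.pause();
  subscription.resume();
  await subscription.cancel();
  await controller.close();
  return log;
}

Future<void> main() async {
  final log = await lifecycleLog();
  log.forEach(print);
}
```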
StreamSink
The entry point for stream events. It provides the add, addError, and addStream methods for sending events to the stream.
abstract class StreamSink<S> implements EventSink<S>, StreamConsumer<S> {
  Future close();
  /// ...
  Future get done;
}
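As a rough illustration of the sink methods, the sketch below mixes add with addStream and collects everything the stream emits. pipeThroughSink is a hypothetical name. Note that addStream returns a Future, and no other events may be added until it completes.

```dart
import 'dart:async';

/// Sends events through a sink with add and addStream, then collects them.
Future<List<int>> pipeThroughSink() async {
  final controller = StreamController<int>();
  // toList() subscribes right away and completes when the stream is done.
  final collected = controller.stream.toList();
  controller.sink.add(1);
  // Forward every event of another stream, waiting until it is finished.
  await controller.sink.addStream(Stream<int>.fromIterable([2, 3]));
  controller.sink.add(4);
  await controller.sink.close();
  return await collected;
}

Future<void> main() async {
  print(await pipeThroughSink()); // [1, 2, 3, 4]
}
```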
StreamSubscription
The listener of a stream. It provides methods such as cancel, pause, and resume for managing the subscription.
abstract class StreamSubscription<T> {
  /// ...
}
StreamSubscription subscription = StreamController().stream.listen(print);
subscription.onDone(() => print('done'));
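The sketch below shows what pause and resume do to events that arrive in the meantime: with a StreamController they are buffered and delivered in order after resume. collectWithPause is a made-up helper name.

```dart
import 'dart:async';

/// Adds events while the subscription is paused and after it resumes.
Future<List<int>> collectWithPause() async {
  final received = <int>[];
  final controller = StreamController<int>();
  final subscription = controller.stream.listen(received.add);
  subscription.pause();
  controller.add(1); // buffered while the subscription is paused
  controller.add(2); // buffered as well
  subscription.resume();
  controller.add(3);
  await controller.close();
  return received;
}

Future<void> main() async {
  print(await collectWithPause()); // [1, 2, 3]
}
```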
StreamBuilder
A widget that renders the UI from stream data.
StreamBuilder(
  // The data stream
  stream: stream,
  // Initial data
  initialData: 'loading...',
  builder: (context, AsyncSnapshot snapshot) {
    // An AsyncSnapshot is a snapshot of the data; it caches the latest data and state
    // snapshot.connectionState
    // snapshot.data
    if (snapshot.hasData) {
      // Text expects a String, so convert the data explicitly
      return Text('${snapshot.data}');
    }
    return CircularProgressIndicator();
  },
)
Creating a Stream
Dart offers several ways to create a Stream.
- Derive a new Stream from an existing one with methods such as map, where, and takeWhile.
// Stream of integers
Stream<int> intStream = StreamController<int>().stream;
// Stream of even numbers
Stream<int> evenStream = intStream.where((int n) => n.isEven);
// Stream of doubled values
Stream<int> doubleStream = intStream.map((int n) => n * 2);
// Forwards values while they are greater than 10 and ends at the first value that is not
Stream<int> biggerStream = intStream.takeWhile((int n) => n > 10);
- Use an async* function.
Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}
Stream stream = countStream(10);
stream.listen(print);
- Use a StreamController.
StreamController<Map> _streamController = StreamController(
  onCancel: () {},
  onListen: () {},
  onPause: () {},
  onResume: () {},
  sync: false,
);
Stream _stream = _streamController.stream;
- Generate one from Future objects.
Future<int> _delay(int seconds) async {
  await Future.delayed(Duration(seconds: seconds));
  return seconds;
}
List<Future> futures = [];
for (int i = 0; i < 10; i++) {
  futures.add(_delay(3));
}
Stream _futuresStream = Stream.fromFutures(futures);
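The creation routes listed above can be tried out in a plain Dart program. The following is a minimal sketch; all helper names (keepWhile, keepWhere, sum, delayed, completionOrder) are invented for illustration. It shows that takeWhile stops at the first value that fails the test while where keeps filtering, and that Stream.fromFutures emits results in the order the futures complete rather than the order they were listed.

```dart
import 'dart:async';

// Derived streams: takeWhile ends the stream at the first failing value.
Future<List<int>> keepWhile(List<int> input) =>
    Stream<int>.fromIterable(input).takeWhile((n) => n > 10).toList();

// Derived streams: where filters every value.
Future<List<int>> keepWhere(List<int> input) =>
    Stream<int>.fromIterable(input).where((n) => n > 10).toList();

// An async* generator.
Stream<int> countStream(int to) async* {
  for (var i = 1; i <= to; i++) {
    yield i;
  }
}

// Consumes a stream with await for.
Future<int> sum(Stream<int> stream) async {
  var total = 0;
  await for (final n in stream) {
    total += n;
  }
  return total;
}

// A future that completes with its own delay in milliseconds.
Future<int> delayed(int ms) =>
    Future<int>.delayed(Duration(milliseconds: ms), () => ms);

// Stream.fromFutures emits each result as soon as its future completes.
Future<List<int>> completionOrder() =>
    Stream<int>.fromFutures([delayed(30), delayed(10), delayed(20)]).toList();

Future<void> main() async {
  print(await keepWhile([12, 15, 3, 20])); // [12, 15]
  print(await keepWhere([12, 15, 3, 20])); // [12, 15, 20]
  print(await sum(countStream(10))); // 55
  print(await completionOrder()); // [10, 20, 30]
}
```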
Using Stream
Stream Counter
Rewrite Flutter's default counter project with a Stream.
import 'dart:async';
import 'package:flutter/material.dart';
class StreamCounter extends StatefulWidget {
  @override
  _StreamCounterState createState() => _StreamCounterState();
}
class _StreamCounterState extends State<StreamCounter> {
  // Create a StreamController
  StreamController<int> _counterStreamController = StreamController<int>(
    onCancel: () {
      print('cancel');
    },
    onListen: () {
      print('listen');
    },
  );
  int _counter = 0;
  Stream<int> _counterStream;
  StreamSink<int> _counterSink;
  // Use the StreamSink to send events to the Stream; once _counter exceeds 9, call close to close the stream.
  void _incrementCounter() {
    // Adding to a closed sink throws a StateError, so do nothing once the stream is closed
    if (_counterStreamController.isClosed) {
      return;
    }
    if (_counter > 9) {
      _counterSink.close();
      return;
    }
    _counter++;
    _counterSink.add(_counter);
  }
  // Close the stream manually
  void _closeStream() {
    _counterStreamController.close();
  }
  @override
  void initState() {
    super.initState();
    _counterSink = _counterStreamController.sink;
    _counterStream = _counterStreamController.stream;
  }
  @override
  void dispose() {
    // The sink belongs to the controller, so closing the controller is enough
    _counterStreamController.close();
    super.dispose();
  }
  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text('Stream Counter'),
      ),
      body: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: <Widget>[
            Text('You have pushed the button this many times:'),
            // Use StreamBuilder to display and update the UI
            StreamBuilder<int>(
              stream: _counterStream,
              initialData: _counter,
              builder: (context, snapshot) {
                if (snapshot.connectionState == ConnectionState.done) {
                  return Text(
                    'Done',
                    style: Theme.of(context).textTheme.bodyText2,
                  );
                }
                int number = snapshot.data;
                return Text(
                  '$number',
                  style: Theme.of(context).textTheme.bodyText2,
                );
              },
            ),
          ],
        ),
      ),
      floatingActionButton: Row(
        mainAxisAlignment: MainAxisAlignment.center,
        children: <Widget>[
          FloatingActionButton(
            onPressed: _incrementCounter,
            tooltip: 'Increment',
            child: Icon(Icons.add),
          ),
          SizedBox(width: 24.0),
          FloatingActionButton(
            onPressed: _closeStream,
            tooltip: 'Close',
            child: Icon(Icons.close),
          ),
        ],
      ),
    );
  }
}
NetWork Status
Listen for changes in the phone's network connection status. First, add the connectivity plugin:
dependencies:
  connectivity: ^0.4.8+2
import 'dart:async';
import 'package:connectivity/connectivity.dart';
import 'package:flutter/material.dart';
class NetWorkStatus extends StatefulWidget {
  @override
  _NetWorkStatusState createState() => _NetWorkStatusState();
}
class _NetWorkStatusState extends State<NetWorkStatus> {
  StreamController<ConnectivityResult> _streamController = StreamController();
  StreamSink<ConnectivityResult> _streamSink;
  Stream<ConnectivityResult> _stream;
  // Kept so the connectivity listener can be cancelled in dispose
  StreamSubscription<ConnectivityResult> _connectivitySubscription;
  String _result;
  // Query the current status once, because the change stream only reports later changes
  void _checkStatus() async {
    final ConnectivityResult result = await Connectivity().checkConnectivity();
    if (result == ConnectivityResult.mobile) {
      _result = 'mobile';
    } else if (result == ConnectivityResult.wifi) {
      _result = 'wifi';
    } else if (result == ConnectivityResult.none) {
      _result = 'none';
    }
    // The widget may have been disposed while waiting for the result
    if (!mounted) {
      return;
    }
    setState(() {});
  }
  @override
  void initState() {
    super.initState();
    _stream = _streamController.stream;
    _streamSink = _streamController.sink;
    _checkStatus();
    // Forward every connectivity change into our own stream
    _connectivitySubscription = Connectivity().onConnectivityChanged.listen(
      (ConnectivityResult result) {
        _streamSink.add(result);
      },
    );
  }
  @override
  void dispose() {
    _connectivitySubscription.cancel();
    _streamController.close();
    super.dispose();
  }
  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text('Network Status'),
      ),
      body: Center(
        child: StreamBuilder<ConnectivityResult>(
          stream: _stream,
          builder: (context, AsyncSnapshot snapshot) {
            if (snapshot.hasData) {
              if (snapshot.data == ConnectivityResult.mobile) {
                _result = 'mobile';
              } else if (snapshot.data == ConnectivityResult.wifi) {
                _result = 'wifi';
              } else if (snapshot.data == ConnectivityResult.none) {
                return Text('No network connection');
              }
            }
            if (_result == null) {
              return CircularProgressIndicator();
            }
            return ResultText(_result);
          },
        ),
      ),
    );
  }
}
class ResultText extends StatelessWidget {
  final String result;
  const ResultText(this.result);
  @override
  Widget build(BuildContext context) {
    return RichText(
      text: TextSpan(
        style: TextStyle(color: Colors.black),
        text: 'Connected via ',
        children: [
          TextSpan(
            text: '$result',
            style: TextStyle(
              color: Colors.red,
              fontSize: 20.0,
              fontWeight: FontWeight.bold,
            ),
          ),
          TextSpan(text: ' network'),
        ],
      ),
    );
  }
}
Random Article
Create a stream from network requests.
dependencies:
  dio: ^3.0.9
  flutter_html: ^0.11.1
import 'dart:async';
import 'package:dio/dio.dart';
import 'package:flutter/material.dart';
import 'package:flutter_html/flutter_html.dart';
class RandomArticle extends StatefulWidget {
  @override
  _RandomArticleState createState() => _RandomArticleState();
}
class _RandomArticleState extends State<RandomArticle> {
  static Dio _dio = Dio(
    BaseOptions(baseUrl: 'https://interface.meiriyiwen.com'),
  );
  // Fetch one random article and return the 'data' field of the response
  static Future<Map> _getArticle() async {
    Response response = await _dio.get(
      '/article/random',
      queryParameters: {"dev": 1},
    );
    final data = response.data['data'];
    return data;
  }
  Stream<Map> _futuresStream;
  @override
  void initState() {
    super.initState();
    List<Future<Map>> futures = [];
    for (int i = 0; i < 10; i++) {
      // Add a Future
      futures.add(_getArticle());
    }
    // Generate the Stream
    _futuresStream = Stream<Map>.fromFutures(futures);
  }
  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text('Random Article')),
      body: SingleChildScrollView(
        child: Center(
          child: StreamBuilder<Map>(
            stream: _futuresStream,
            builder: (context, AsyncSnapshot snapshot) {
              if (snapshot.hasData) {
                Map article = snapshot.data;
                return Container(
                  child: Column(
                    children: <Widget>[
                      SizedBox(height: 24.0),
                      Text(
                        article['title'],
                        style: TextStyle(fontSize: 24.0),
                      ),
                      Padding(
                        padding: const EdgeInsets.only(
                          top: 12.0,
                          left: 12.0,
                          right: 12.0,
                          bottom: 60.0,
                        ),
                        child: Html(
                          data: article['content'],
                        ),
                      ),
                    ],
                  ),
                );
              }
              return CircularProgressIndicator();
            },
          ),
        ),
      ),
    );
  }
}
Broadcast Stream
Using a broadcast stream.
import 'dart:async';
import 'package:flutter/material.dart';
class BroadcastStream extends StatefulWidget {
  @override
  _BroadcastStreamState createState() => _BroadcastStreamState();
}
class _BroadcastStreamState extends State<BroadcastStream> {
  StreamController<int> _streamController = StreamController<int>.broadcast();
  StreamSubscription<int> _subscription1;
  StreamSubscription<int> _subscription2;
  StreamSubscription<int> _subscription3;
  int _count = 0;
  int _s1 = 0;
  int _s2 = 0;
  int _s3 = 0;
  @override
  void initState() {
    super.initState();
    // Three independent listeners on the same broadcast stream
    _subscription1 = _streamController.stream.listen((n) {
      setState(() {
        _s1 += 1;
      });
    });
    _subscription2 = _streamController.stream.listen((n) {
      setState(() {
        _s2 += 2;
      });
    });
    _subscription3 = _streamController.stream.listen((n) {
      setState(() {
        _s3 -= 1;
      });
    });
  }
  void _add() {
    if (_count > 10) {
      // Once the count exceeds 10, cancel the first subscription
      _subscription1.cancel();
    }
    _count++;
    _streamController.add(_count);
  }
  @override
  void dispose() {
    _subscription1.cancel();
    _subscription2.cancel();
    _subscription3.cancel();
    _streamController.close();
    super.dispose();
  }
  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text('Broadcast Stream'),
      ),
      body: Container(
        width: double.infinity,
        height: MediaQuery.of(context).size.height,
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          crossAxisAlignment: CrossAxisAlignment.center,
          children: [
            Text('Count: $_count'),
            SizedBox(height: 12.0),
            Text('S1: $_s1'),
            SizedBox(height: 12.0),
            Text('S2: $_s2'),
            SizedBox(height: 12.0),
            Text('S3: $_s3'),
            SizedBox(height: 12.0),
            FloatingActionButton(
              onPressed: _add,
              child: Icon(Icons.plus_one),
            ),
          ],
        ),
      ),
    );
  }
}
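The same behaviour can be reproduced without any widgets. The sketch below is a simplified stand-in for the page above, not its actual logic: two listeners share one broadcast stream, and the first is cancelled part-way through while the second keeps receiving events. runBroadcast and the threshold of 3 are assumptions chosen to keep the example short.

```dart
import 'dart:async';

/// Feeds five events into a broadcast stream and cancels the first
/// listener once the count exceeds 3.
Future<Map<String, int>> runBroadcast() async {
  final controller = StreamController<int>.broadcast();
  var s1 = 0;
  var s2 = 0;
  final sub1 = controller.stream.listen((_) => s1 += 1);
  final sub2 = controller.stream.listen((_) => s2 += 2);
  for (var count = 1; count <= 5; count++) {
    if (count > 3) {
      // Cancelling an already cancelled subscription is harmless.
      await sub1.cancel();
    }
    controller.add(count);
    // Give the asynchronous controller a chance to deliver the event.
    await Future<void>.delayed(Duration.zero);
  }
  await sub2.cancel();
  await controller.close();
  return {'s1': s1, 's2': s2};
}

Future<void> main() async {
  print(await runBroadcast()); // {s1: 3, s2: 10}
}
```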
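Summary
Stream is one of the ways to handle asynchronous programming. It provides an asynchronous sequence of events and delivers them when you are ready to receive them. In Dart, streams are divided into synchronous and asynchronous streams, as well as single-subscription and broadcast streams, and there are several ways to create a Stream.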