当只履行一个 Future 时,在异步函数async fn
或许异步代码块async {}
内部能够直接运用.await
,当需要并发的履行多个 Future 时,直接运用 .await
会堵塞并发使命,直到特定的Future完结(串行履行)。futures
包中供给了许多实用的能够并发履行 Future 的东西,比方join!
宏和select!
宏。
注:futures::future模块下供给了一系列能够操作 Future 的函数(比操作宏丰厚的多),详见:
docs.rs/futures/lat…
docs.rs/futures/lat…
join! 宏
join!
宏允许一起等候多个不同 Future 的完结,且能够并发地运转这些 Future。
先来看两个运用运用.await
的过错版别:
struct Book;
struct Music;
async fn enjoy_book() -> Book { /* ... */ Book }
async fn enjoy_music() -> Music { /* ... */ Music}
// 过错版别1: 异步内部是依次串行履行,而不是一起运转它们
async fn enjoy1_book_and_music() -> (Book, Music) {
// 实践在异步函数内部是是串行履行
let book = enjoy_book().await; // await 触发堵塞式履行
let music = enjoy_music().await; // await 触发堵塞式履行
(book, music)
}
// 过错版别2: 异步内部是依次串行履行,而不是一起运转它们
async fn enjoy2_book_and_music() -> (Book, Music) {
// 实践在异步函数内部是是串行履行
let book_future = enjoy_book(); // async 函数是慵懒的,并没有当即履行
let music_future = enjoy_music(); // async 函数是慵懒的,并没有当即履行
(book_future.await, music_future.await)
}
上面两个例子看似都能够顺利异步运转,但是实践上是有必要先看完书后,才干听音乐,即异步函数内部的使命是依次(依次)串行履行,而不是一起并发的运转。
由于在 Rust 中 Future是慵懒的,直到调用.await
时,才会开端运转。而那两个await
由于在代码中有先后次序,因此它们是次序运转的。
为了正确的并发运转两个Future, 咱们来试试futures::join!
宏:
use futures::join;
// 运用`join!`会回来一个元组,里面的值是对应的`Future`履行完毕后输出的值。
async fn enjoy_book_and_music() -> (Book, Music) {
let book_fut = enjoy_book();
let music_fut = enjoy_music();
// join! 宏有必要等候它办理的一切 Future 完结后才干完结
join!(book_fut, music_fut)
}
fn main() {
futures::executor::block_on(enjoy_book_and_music());
}
假设期望一起运转一个数组里的多个异步使命,能够运用
futures::future::join_all
办法
try_join! 宏
由于join!
有必要等候它办理的一切Future完结后才干完结,假设你期望在某一个Future报错后就当即停止一切Future 的履行,能够运用try_join!
,特别是当Future回来Result
时。
注:传给try_join!
的一切Future都有必要具有相同的过错类型。假设过错类型不同,能够考虑运用来自futures::future::TryFutureExt
模块的map_err
和err_info
办法将过错进行转化:
use futures::{
future::TryFutureExt,
try_join,
};
struct Book;
struct Music;
async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }
/**
* 传给 try_join! 的一切 Future 都有必要具有相同的过错类型。
* 假设过错类型不同,能够考虑运用来自 futures::future::TryFutureExt 模块的 map_err和 err_info办法将过错进行转化:
*/
async fn get_book_and_music() -> Result<(Book, Music), String> {
let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
let music_fut = get_music();
// 某一个 Future 报错后就当即停止一切 Future 的履行,能够运用 try_join!
try_join!(book_fut, music_fut)
}
async fn get_into_book_and_music() -> (Book, Music) {
get_book_and_music().await.unwrap()
}
fn main() {
futures::executor::block_on(get_into_book_and_music());
}
select! 宏
join!
宏只需等一切Future完毕后,才干会集处理结果,select!
宏则代表等候多个Future,只需任何一个Future完结,都能够当即被处理:
use futures::{
future::FutureExt, // for `.fuse()`
pin_mut,
select,
};
async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }
/**
* 赛跑形式:一起并发地运转 t1 和 t2, 无论两者哪个先完结, 函数完毕且不会等候另一个使命的完结
*/
async fn race_tasks() {
// .fuse() 办法能够让 Future 完结 FusedFuture 特征,
let t1 = task_one().fuse();
let t2 = task_two().fuse();
// pin_mut 宏会为 Future 完结 Unpin特征
pin_mut!(t1, t2);
// 一起等候多个 Future ,且任何一个 Future 完毕后,都能够当即被处理,能够考虑运用 futures::select!:
select! {
() = t1 => println!("使命1首要完结"),
() = t2 => println!("使命2首要完结"),
}
}
上面的代码会一起并发地运转t1
和t2
, 无论两者哪个先完结,都会调用对应的println!
打印相应的输出,然后函数完毕且不会等候另一个使命的完结。
注意:运用select
宏一切必要的满足: FusedFuture + Unpin, 经过 fuse 办法和 pin_mut 宏完结
首要,.fuse()
办法能够让Future
完结FusedFuture
trait, 而pin_mut!
宏会为Future
完结Unpin
trait:
注:
select!
宏一切必要的满足:FusedStream
+Unpin
两个 Trait 束缚:
Unpin
:由于select
不会过拿走一切权的办法运用 Future,而是经过可变引证的办法去运用,这样当select
完毕后,该Future 若没有被完结,它的一切权还能够继续被其它代码运用。FusedFuture
:当Future一旦完结后,那select
就不能再对其进行轮询运用。Fuse
意味着熔断,相当于Future一旦完结,再次调用poll
会直接回来Poll::Pending
。
只需完结了FusedFuture
,select
才干合作loop
一起运用。假设没有完结,就算一个Future
现已完结了,它仍然会被select
不断的轮询履行。
Stream
稍有不同,它们运用的特征是FusedStream
。 经过.fuse()
(也能够手动完结)完结了该特征的Stream
,对其调用.next()
或.try_next()
办法能够获取完结了FusedFuture
特征的Future
:
use futures::{
stream::{Stream, StreamExt, FusedStream},
select,
};
async fn add_two_streams() -> u8 {
// mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
// mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
// .fuse() 办法能够让 Stream 完结 FusedStream 特征,
let s1 = futures::stream::once(async { 10 }).fuse();
let s2 = futures::stream::once(async { 20 }).fuse();
// pin_mut 宏会为 Stream 完结 Unpin 特征
pin_mut!(s1, s2);
let mut total = 0;
loop {
let item = select! {
x = s1.next() => x,
x = s2.next() => x,
complete => break,
default => panic!(), // 该分支永远不会运转,由于`Future`会先运转,然后是`complete`
};
if let Some(next_num) = item {
total += next_num;
}
}
println!("add_two_streams,total = {total}");
total
}
fn main() {
executor::block_on(add_two_streams());
}
注:
select!
宏还支持default
和complete
分支:
complete
分支:当一切的Future
和Stream
完结后才会被履行,它往往合作loop
运用,loop
用于循环完结一切的Future
default
分支:若没有任何Future
或Stream
处于Ready
状况, 则该分支会被当即履行
在运用select
宏过程中,推荐两个很实用的函数和类型:
-
Fuse::terminated()
函数:能够在select
循环构建一个空的Future(已完结FusedFuture
),后面按需填充新 future -
FuturesUnordered
类型:能够使某个Future
有多个复制且都能够一起并发运转
use futures::{
future::{Fuse, FusedFuture, FutureExt},
stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
pin_mut,
select,
};
async fn future_in_select() {
// 创立一个空 Future,而且现已完结了 FusedFuture
let fut = Fuse::terminated();
// 创立一个 FuturesUnordered 类型,能够多次复制
let mut async_tasks: FuturesUnordered<Pin<Box<dyn Future<Output = i32>>>> = FuturesUnordered::new();
async_tasks.push(Box::pin(async { 1 }));
pin_mut!(fut);
let mut total = 0;
loop {
select! {
// select_next_some 函数能够用在`select`上,而且只运转从 stream 回来的`Some(_)`值而忽略`None`
num = async_tasks.select_next_some() => {
println!("first num is {num} and total is {total}");
total += num;
println!("total is {total}");
if total >= 10 { break; }
// 判断是否现已终止
if fut.is_terminated() {
// 按需填充新 future
fut.set(async { 1 }.fuse());
}
},
num = fut => {
println!("second num is {num} and total is {total}");
total += num;
println!("now total is {total}");
async_tasks.push(Box::pin(async { 1 }));
},
complete => break,
default => panic!(),
};
}
println!("total finally is {total}");
}
fn main() {
executor::block_on(future_in_select());
}
总结
futures
包中供给了许多实用的并发履行Future的东西,比方:
-
join!
宏:并发的运转多个不同 Future,且等候有必要等候一切 Future 悉数完结,才算完毕,能够理解为有必要悉数完结的使命并发形式 -
try_join!
宏:并发的运转多个不同 Future,当有某一个 Future 报错后就当即停止一切 Future 的履行,Future 回来是Result
,能够提前完毕的使命并发形式 -
select!
宏:并发的运转多个不同 Future,且只需任何一个 Future 完结,都能够当即被处理,能够理解成使命赛跑形式 - 运用
select!
宏一切必要的满足:FusedFuture
+Unpin
, 经过fuse
办法和pin_mut
宏完结
参考
- course.rs/advance/asy…
- huangjj27.github.io/async-book/…
- docs.rs/futures/lat…
- docs.rs/futures/lat…