当只履行一个 Future 时,在异步函数async fn或许异步代码块async {}内部能够直接运用.await,当需要并发的履行多个 Future 时,直接运用 .await会堵塞并发使命,直到特定的Future完结(串行履行)。futures包中供给了许多实用的能够并发履行 Future 的东西,比方join!宏和select!宏。

注:futures::future模块下供给了一系列能够操作 Future 的函数(比操作宏丰厚的多),详见:
docs.rs/futures/lat…
docs.rs/futures/lat…

join! 宏

join!宏允许一起等候多个不同 Future 的完结,且能够并发地运转这些 Future。

先来看两个运用运用.await的过错版别:

struct Book;
struct Music;
async fn enjoy_book() -> Book { /* ... */ Book }
async fn enjoy_music() -> Music { /* ... */ Music}
// 过错版别1: 异步内部是依次串行履行,而不是一起运转它们
async fn enjoy1_book_and_music() -> (Book, Music) {
    // 实践在异步函数内部是是串行履行
    let book = enjoy_book().await; // await 触发堵塞式履行
    let music = enjoy_music().await; // await 触发堵塞式履行
    (book, music)
}
// 过错版别2: 异步内部是依次串行履行,而不是一起运转它们
async fn enjoy2_book_and_music() -> (Book, Music) {
    // 实践在异步函数内部是是串行履行
    let book_future = enjoy_book(); // async 函数是慵懒的,并没有当即履行
    let music_future = enjoy_music(); // async 函数是慵懒的,并没有当即履行
    (book_future.await, music_future.await)
}

上面两个例子看似都能够顺利异步运转,但是实践上是有必要先看完书后,才干听音乐即异步函数内部的使命是依次(依次)串行履行,而不是一起并发的运转

由于在 Rust 中 Future是慵懒的,直到调用.await时,才会开端运转。而那两个await由于在代码中有先后次序,因此它们是次序运转的。

为了正确的并发运转两个Future, 咱们来试试futures::join!宏:

use futures::join;
// 运用`join!`会回来一个元组,里面的值是对应的`Future`履行完毕后输出的值。
async fn enjoy_book_and_music() -> (Book, Music) {
    let book_fut = enjoy_book();
    let music_fut = enjoy_music();
    // join! 宏有必要等候它办理的一切 Future 完结后才干完结
    join!(book_fut, music_fut)
}
fn main() {
    futures::executor::block_on(enjoy_book_and_music());
}

假设期望一起运转一个数组里的多个异步使命,能够运用futures::future::join_all办法

try_join! 宏

由于join!有必要等候它办理的一切Future完结后才干完结,假设你期望在某一个Future报错后就当即停止一切Future 的履行,能够运用try_join!,特别是当Future回来Result时。

注:传给try_join!的一切Future都有必要具有相同的过错类型。假设过错类型不同,能够考虑运用来自futures::future::TryFutureExt模块的map_errerr_info办法将过错进行转化:

use futures::{
    future::TryFutureExt,
    try_join,
};
struct Book;
struct Music;
async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }
/**
 * 传给 try_join! 的一切 Future 都有必要具有相同的过错类型。
 * 假设过错类型不同,能够考虑运用来自 futures::future::TryFutureExt 模块的 map_err和 err_info办法将过错进行转化:
 */
async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
    let music_fut = get_music();
    // 某一个 Future 报错后就当即停止一切 Future 的履行,能够运用 try_join!
    try_join!(book_fut, music_fut)
}
async fn get_into_book_and_music() -> (Book, Music) {
    get_book_and_music().await.unwrap()
}
fn main() {
    futures::executor::block_on(get_into_book_and_music());
}

select! 宏

join!宏只需等一切Future完毕后,才干会集处理结果,select! 宏则代表等候多个Future,只需任何一个Future完结,都能够当即被处理:

use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};
async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }
/**
 * 赛跑形式:一起并发地运转 t1 和 t2, 无论两者哪个先完结, 函数完毕且不会等候另一个使命的完结
 */
async fn race_tasks() {
    // .fuse() 办法能够让 Future 完结 FusedFuture 特征,
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();
    // pin_mut 宏会为 Future 完结 Unpin特征
    pin_mut!(t1, t2);
    // 一起等候多个 Future ,且任何一个 Future 完毕后,都能够当即被处理,能够考虑运用 futures::select!:
    select! {
        () = t1 => println!("使命1首要完结"),
        () = t2 => println!("使命2首要完结"),
    }
}

上面的代码会一起并发地运转t1t2, 无论两者哪个先完结,都会调用对应的println!打印相应的输出,然后函数完毕且不会等候另一个使命的完结。

注意:运用select宏一切必要的满足: FusedFuture + Unpin, 经过 fuse 办法和 pin_mut 宏完结

首要,.fuse()办法能够让Future完结FusedFuture trait, 而pin_mut!宏会为Future完结Unpintrait:

注:select!宏一切必要的满足: FusedStream + Unpin 两个 Trait 束缚:

  • Unpin:由于select不会过拿走一切权的办法运用 Future,而是经过可变引证的办法去运用,这样当select完毕后,该Future 若没有被完结,它的一切权还能够继续被其它代码运用。
  • FusedFuture:当Future一旦完结后,那select就不能再对其进行轮询运用。Fuse意味着熔断,相当于Future一旦完结,再次调用poll会直接回来Poll::Pending

只需完结了FusedFutureselect才干合作loop一起运用。假设没有完结,就算一个Future现已完结了,它仍然会被select不断的轮询履行。

Stream稍有不同,它们运用的特征是FusedStream。 经过.fuse()(也能够手动完结)完结了该特征的Stream,对其调用.next().try_next()办法能够获取完结了FusedFuture特征的Future:

use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};
async fn add_two_streams() -> u8 {
    // mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    // mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
    // .fuse() 办法能够让 Stream 完结 FusedStream 特征,
    let s1 = futures::stream::once(async { 10 }).fuse();
    let s2 = futures::stream::once(async { 20 }).fuse();
    // pin_mut 宏会为 Stream 完结 Unpin 特征
    pin_mut!(s1, s2);
    let mut total = 0;
    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
            default => panic!(), // 该分支永远不会运转,由于`Future`会先运转,然后是`complete`
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }
    println!("add_two_streams,total = {total}");
    total
}
fn main() {
    executor::block_on(add_two_streams());
}

注:select!宏还支持defaultcomplete分支:

  • complete分支:当一切的FutureStream完结后才会被履行,它往往合作loop运用,loop用于循环完结一切的Future
  • default分支:若没有任何FutureStream处于Ready状况, 则该分支会被当即履行

在运用select宏过程中,推荐两个很实用的函数和类型:

  • Fuse::terminated()函数:能够在select循环构建一个空的Future(已完结FusedFuture),后面按需填充新 future
  • FuturesUnordered类型:能够使某个Future有多个复制且都能够一起并发运转
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};
async fn future_in_select() {
    // 创立一个空 Future,而且现已完结了 FusedFuture 
    let fut = Fuse::terminated();
    // 创立一个 FuturesUnordered 类型,能够多次复制
    let mut async_tasks: FuturesUnordered<Pin<Box<dyn Future<Output = i32>>>> = FuturesUnordered::new();
    async_tasks.push(Box::pin(async { 1 }));
    pin_mut!(fut);
    let mut total = 0;
    loop {
        select! {
            // select_next_some 函数能够用在`select`上,而且只运转从 stream 回来的`Some(_)`值而忽略`None`
            num = async_tasks.select_next_some() => {
                println!("first num is {num} and total is {total}");
                total += num;
                println!("total is {total}");
                if total >= 10 { break; }
                // 判断是否现已终止
                if fut.is_terminated() {
                    // 按需填充新 future
                    fut.set(async { 1 }.fuse());
                }
            },
            num = fut => {
                println!("second num is {num} and total is {total}");
                total += num;
                println!("now total is {total}");
                async_tasks.push(Box::pin(async { 1 }));
            },
            complete => break,
            default => panic!(),
        };
    }
    println!("total finally is {total}");
}
fn main() {
    executor::block_on(future_in_select());
}

总结

futures包中供给了许多实用的并发履行Future的东西,比方:

  1. join!宏:并发的运转多个不同 Future,且等候有必要等候一切 Future 悉数完结,才算完毕,能够理解为有必要悉数完结的使命并发形式
  2. try_join!宏:并发的运转多个不同 Future,当有某一个 Future 报错后就当即停止一切 Future 的履行,Future 回来是Result,能够提前完毕的使命并发形式
  3. select!宏:并发的运转多个不同 Future,且只需任何一个 Future 完结,都能够当即被处理,能够理解成使命赛跑形式
  4. 运用select!宏一切必要的满足: FusedFuture + Unpin, 经过fuse办法和pin_mut宏完结

参考

  • course.rs/advance/asy…
  • huangjj27.github.io/async-book/…
  • docs.rs/futures/lat…
  • docs.rs/futures/lat…