在学习 Rust 异步编程过程中,不管是自动还是被迫,都会接触到不少运用 futures 库的场景,要深化学习 Rust 异步编程,需求从总体上的探究下 futures 的全貌,以便更好的把握 Rust 异步编程。

futures-rs 是 rust 官方供给的一个类库(即:futures 项目库),它是 Rust 异步编程的根底,供给异步运转时(Runtime)。包括要害 trait 的定义如 Stream,以及宏如join!select! 以及各种 future 组合子用来操控异步流程。

注:futures-rs 中定义的 future 类型是规范库中 future 的原始完结。Rust 将中心的 Future trait 移入规范库中并改为std::future::Future以完结 async/await 语法。在某种意义上讲,std::future::Future 能够看作是futures::future::Future的最小子集。

futures-rs库供给了许多功用,把握它们对异步编程很有协助:

  • 供给了许多并发东西:join!select!
  • 供给了TryFutureFusedFutureStreamtrait
  • 供给了许多FutureTryFutureStream的扩展办法
  • 供给了异步 IO 的支撑
  • 供给了Sink
  • 供给了线程池、异步履行器

官方文档:docs.rs/futures/lat…

future-rs包括一系列的 crate,比方:

crate 名称 描述
futures 仅仅从头导出了下面 crate ,并供给对外调用的 API
futures-core 包括了中心的 trait 和类型
futures-task 包括了用于处理使命(task)的东西
futures-channel 包括了用于异步通讯的各种 channel
futures-executor 基于 future-rs 库的异步使命履行器
futures-io 包括了 IO 相关笼统的 trait
futures-sink 包括了 Sink trait
futures-macro 包括了 join!、try_join!、select! 等宏完结
futures-util 包括了一些常用的东西并扩展了一些 trait

代码结构

官方代码:github.com/rust-lang/f…

  • ├── ci // 包括一个 sh 文件
  • ├── examples // 包括两个demo
  • ├── futures-test // 用于测试 futures-rs 的通用东西类库(非测试用例)
  • ├── futures // 供给从头导出了下面 crate 对外调用的 API,但没有导出悉数
  • ├── futures-channel // 异步通讯 channel完结,包括 onshot 和 mpsc
  • ├── futures-core // futures 库的中心 trait 和 type
  • ├── futures-executor // futures 库的异步使命履行器
  • ├── futures-io // 异步 IO 相关笼统的 trait,比方 AsyncRead、AsyncWrite、AsyncSeek、AsyncBufRead 等
  • ├── futures-macro // 供给join!、try_join!、select!、pin_mut!等宏完结
  • ├── futures-sink // Sink trait
  • ├── futures-task // 处理 task 的东西,如 FutureObj/LocalFutureObj struct、 Spawn/LocalSpawn ArcWake trait、
  • ├── futures-util // 通用东西类库、扩展 trait(如AsyncReadExt、SinkExt、SpawnExt)

运用futures 库

运用futures能够引进 futures-rs 下的悉数功用,也能够只引进需求的功用,能够无需规范库即可作业,引进只需在 cargo.toml 添加依靠,比方:

[dependencies]
// 计划1 :引进futures库的悉数 crate
futures = "0.3"
// 计划2 ::只引进线程池 crate
futures = { version = "0.3", features = ["thread-pool"] }
// 计划3 :要在`[no_std]`环境中运用`futures`,请运用:
futures = { version = "0.3", default-features = false }

示例代码:

// main.rs 
use futures::channel::mpsc;   // 消息通道
use futures::executor;  // future 履行器
use futures::executor::ThreadPool; // 线程池
use futures::StreamExt; // Stream 流扩展(比方一些流操作组合)
fn main() {
    let pool = ThreadPool::new().expect("Failed to build pool");
    let (tx, rx) = mpsc::unbounded::<i32>();
    // 运用 async 块创建一个 future,回来一个 Future 的完结
    // 此时尚未供给 executor 给这个 future,因而它不会运转
    let fut_values = async {
        // 创建另外一个 async 块,同样会生成 Future 的完结,
        // 它在父async块里边,因而会在父 async 块履行后,供给 executor 给子 async 块履行
        // executor 连接是由 Future::poll的第二个参数 std::task::Context 完结,
        // 它代表了咱们的 executor ,子 async 块生成的 Future 完结它能被 polled (运用父 async 块的 executor)
        let fut_tx_result = async move {
            (0..100).for_each(|v| {
                tx.unbounded_send(v).expect("Failed to send");
            })
        };
        // 运用 thread pool 的 spawn 办法传输生成的 future
        pool.spawn_ok(fut_tx_result);
        // 操作组合因子
        let fut_values = rx
            .map(|v| v * 2)
            .collect();
        //运用 async 块供给的 executue 去等待 fut_values 完结
        fut_values.await
    };
    // 真实的去调用上面的 fut_values future,履行它的 poll 办法和 Future 里的子 future 的 poll 办法,终究驱动 fut_values被驱动完结,回来成果
    let values: Vec<i32> = executor::block_on(fut_values);
    println!("Values={:?}", values);
}

futures Crate

futures crate 仅仅是从头导出了futures-rs 下的其他 crate ,并供给对外调用的 API,但没有导出悉数,而是供给了常用的一部分模块,trait,函数和宏的导出。

注:futures-rs 对外供给的 api 种类繁复,咱们实际用到的或许仅仅一小部分,stjepang将其间的一部分抽取出来,形成了一个轻量级的 futures 版别,详见 github.com/stjepang/fu…

比方供给从头导出(Re-exports) 异步相关的模块、trait 、宏等:

// trait 导出
pub use futures_core::future::Future;
pub use futures_core::future::TryFuture;
pub use futures_util::future::FutureExt;
pub use futures_util::future::TryFutureExt;
pub use futures_core::stream::Stream;
pub use futures_core::stream::TryStream;
pub use futures_util::stream::StreamExt;
pub use futures_util::stream::TryStreamExt;
pub use futures_sink::Sink;
pub use futures_util::sink::SinkExt;
pub use futures_io::AsyncBufRead;
pub use futures_io::AsyncRead;
pub use futures_io::AsyncSeek;
pub use futures_io::AsyncWrite;
pub use futures_util::AsyncBufReadExt;
pub use futures_util::AsyncReadExt;
pub use futures_util::AsyncSeekExt;
pub use futures_util::AsyncWriteExt;
// 模块导出
pub use futures_util::{future, sink, stream, task};
// 宏导出
pub use futures_util::{join, try_join, select, select_biased, pin_mut};

futures/src/lib.rs 源码如下:

//! Abstractions for asynchronous programming.
//! This crate provides a number of core abstractions for writing asynchronous code:
// futures crate 供给一些列异步编程相关的中心笼统,作用仅仅是从头导出了futures-rs 下的其他 crate,并供给对外调用的 API
#[doc(no_inline)]
pub use futures_core::future::{Future, TryFuture}; // 导出Future和TryFuture
#[doc(no_inline)]
pub use futures_util::future::{FutureExt, TryFutureExt}; // 导出FutureExt和FutureExt
#[doc(no_inline)]
pub use futures_core::stream::{Stream, TryStream}; // 导出Stream和TryStream
#[doc(no_inline)]
pub use futures_util::stream::{StreamExt, TryStreamExt};// 导出StreamExt和TryStreamExt
#[doc(no_inline)]
pub use futures_sink::Sink; // 导出Sink
#[doc(no_inline)]
pub use futures_util::sink::SinkExt; // 导出SinkExt
#[cfg(feature = "std")]
#[doc(no_inline)]
// 导出 AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite
pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
#[cfg(feature = "std")]
#[doc(no_inline)]
// 导出 AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt
pub use futures_util::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
// Macro reexports
pub use futures_core::ready; // 导出ready宏
pub use futures_util::pin_mut; // 导出pin_mut宏
#[cfg(feature = "std")]
#[cfg(feature = "async-await")]
pub use futures_util::select; // 导出select宏
#[cfg(feature = "async-await")]
// 导出一些列宏,比方:join, pending, poll, select_biased, try_join
pub use futures_util::{join, pending, poll, select_biased, try_join}; // Async-await
// Module reexports
#[doc(inline)]
// 导出一些列模块,比方:future, sink, stream, task
pub use futures_util::{future, sink, stream, task};
#[cfg(feature = "std")]
#[cfg(feature = "async-await")]
pub use futures_util::stream_select; // 导出stream_select宏
#[cfg(feature = "alloc")]
#[doc(inline)]
pub use futures_channel as channel;  // 导出futures_channel
#[cfg(feature = "alloc")]
#[doc(inline)]
pub use futures_util::lock; // 导出lock 东西
#[cfg(feature = "std")]
#[doc(inline)]
pub use futures_util::io; // 导出io东西
#[cfg(feature = "executor")]
#[cfg_attr(docsrs, doc(cfg(feature = "executor")))]
pub mod executor {
    //! Built-in executors and related tools.
    //! This module is only available when the `executor` feature of this library is activated.
    // 导出履行器下的一些列东西,比方block_on, block_on_stream等
    pub use futures_executor::{
        block_on, block_on_stream, enter, BlockingStream, Enter, EnterError, LocalPool,
        LocalSpawner,
    };
    #[cfg(feature = "thread-pool")]
    #[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
    // 导出履行器下的线程池:ThreadPool和ThreadPoolBuilder
    pub use futures_executor::{ThreadPool, ThreadPoolBuilder};
}
#[cfg(feature = "compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
pub mod compat {
    //! Interop between `futures` 0.1 and 0.3.
    //! This module is only available when the `compat` feature of this library is activated.
    // 导出 futures_util 库下 feature 兼容相关一些东西
    pub use futures_util::compat::{
        Compat, Compat01As03, Compat01As03Sink, CompatSink, Executor01As03, Executor01CompatExt,
        Executor01Future, Future01CompatExt, Sink01CompatExt, Stream01CompatExt,
    };
    #[cfg(feature = "io-compat")]
    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
     // 导出 futures_util 库下 IO 相关的一些的兼容东西
    pub use futures_util::compat::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
}
/// 导出 futures crate一些预导入常用项,需求手动导入:use futures::prelude::*;
pub mod prelude {
    //! A "prelude" for crates using the `futures` crate.
    pub use crate::future::{self, Future, TryFuture};
    pub use crate::sink::{self, Sink};
    pub use crate::stream::{self, Stream, TryStream};
    #[doc(no_inline)]
    #[allow(unreachable_pub)]
    pub use crate::future::{FutureExt as _, TryFutureExt as _};
    #[doc(no_inline)]
    pub use crate::sink::SinkExt as _;
    #[doc(no_inline)]
    #[allow(unreachable_pub)]
    pub use crate::stream::{StreamExt as _, TryStreamExt as _};
    #[cfg(feature = "std")]
    pub use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
    #[cfg(feature = "std")]
    #[doc(no_inline)]
    #[allow(unreachable_pub)]
    pub use crate::io::{
        AsyncBufReadExt as _, AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _,
    };
}

官方文档:

  • futures-rs: github.com/rust-lang/f…
  • futures crates:crates.io/crates/futu…
  • futures doc:docs.rs/futures/lat…
  • futures-future:docs.rs/futures/lat…
  • Future trait:docs.rs/futures/lat…
  • FutureExt tait:docs.rs/futures/lat…
  • TryFutureExt tait:docs.rs/futures/lat…