Future 界说
Future 是 Rust 异步编程的核心,Futuretrait 的界说:
#[must_use = "futures do nothing unless you `.await` or poll them"] #[lang = "future_trait"]
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
#[must_use = "this `Poll` may be a `Pending` variant, which should be handled"]
pub enum Poll<T> {
Ready(T),
Pending,
}
Future 有一个相关类型 Output;还有一个 poll()
办法,它回来Poll<Self::Output>
。Poll 是个枚举,有 Ready
和 Pending
两个状况。经过调用 poll()
办法能够推动 Future 的进一步履行,直到使命完结被切走停止。
在当时 poll 中,若 Future 完结了,则回来
Poll::Ready(result)
,即得到 Future 的值并回来;若Future 还没完结,则回来Poll::Pending()
,此时 Future 会被挂起,需求等某个事情将其唤醒(wake唤醒函数)
履行调度器 executor
executor 是一个 Future 的调度器。操作系统担任调度线程,但它不会去调度用户态的协程(比方 Future),所以任何运用了协程来处理并发的程序,都需求有一个 executor 来担任协程的调度。
Rust 的 Future 是慵懒的:只要在被 poll 轮询时才会运转。其中一个推动它的方法便是在 async 函数中运用.await
来调用另一个 async 函数,可是这个只能处理 async 内部的问题,那些最外层的 async 函数,需求靠履行器 executor 来推动 。
executor 运转时
Rust 尽管供给 Future 这样的协程,但它在言语层面并不供给 executor,当不需求运用协程时,不需求引进任何运转时;而需求运用协程时,能够在生态系统中选择最适宜的 executor。
Rust 有如下4中常见的 executor :
- futures:这个库自带了很简略的 executor
- tokio:供给 executor,当运用 #[tokio::main] 时,就隐含引进了 tokio 的 executor
- async-std:供给 executor,和 tokio 相似
- smol:供给 async-executor,主要供给了 block_on
wake 告知机制
executor 会管理一批 Future (最外层的 async 函数),然后经过不停地 poll
推动它们直到完结。 最开端,履行器会先 poll
一次 Future ,后边就不会自动去 poll
了,假如 poll
办法回来Poll::Pending
,就挂起 Future,直到收到某个事情后,经过 wake()
函数去唤醒被挂起 Future,Future 就能够去自动告知履行器,它才会持续去 poll
,履行器就能够履行该 Future。这种 wake 告知然后 poll
的方法会不断重复,直到 Future 完结。
Waker 供给了 wake()
办法:其作用是能够告知履行器,相关的使命能够被唤醒了,此时履行器就能够对相应的 Future 再次进行 poll
操作。
Context
是 Waker 的一个封装,先看下 poll
办法里的 Context
:
pub struct Context<'a> {
waker: &'a Waker,
_marker: PhantomData<fn(&'a ()) -> &'a ()>,
}
Waker 的界说和相关的代码十分抽象,内部运用了一个 vtable 来允许各式各样的 waker 的行为:
pub struct RawWakerVTable {
clone: unsafe fn(*const ()) -> RawWaker,
wake: unsafe fn(*const ()),
wake_by_ref: unsafe fn(*const ()),
drop: unsafe fn(*const ()),
}
Rust 本身不供给异步运转时,它只在规范库里规则了一些根本的接口,能够由各个运转时自行决定怎样完结。所以在规范库中,只能看到这些接口的界说,以及“高层”接口的完结,比方 Waker 下的 wake 办法,只是调用了 vtable 里的 wake() 而已 。
impl Waker {
/// Wake up the task associated with this `Waker`.
#[inline]
pub fn wake(self) {
// The actual wakeup call is delegated through a virtual function call
// to the implementation which is defined by the executor.
let wake = self.waker.vtable.wake;
let data = self.waker.data;
// Don't call `drop` -- the waker will be consumed by `wake`.
crate::mem::forget(self);
// SAFETY: This is safe because `Waker::from_raw` is the only way
// to initialize `wake` and `data` requiring the user to acknowledge
// that the contract of `RawWaker` is upheld.
unsafe { (wake)(data) };
}
...
}
vtable 具体的完结并不在规范库中,而是在第三方的异步运转时里,比方 futures 库的 waker vtable
界说。
构建一个计时器
用一个计时器例子,帮助了解 Future 调度机制,目标是: 在创立计时器时创立新线程,休眠特定时刻,然后过了时刻窗口时告知(signal) 计时器 future。
注:需求用到futures
包的ArcWake
特征,它能够供给一个方便的途径去构建一个Waker
。修改Cargo.toml
,添加下面依赖:
[dependencies]
futures = "0.3"
计时器 Future 完整代码:
// future_timer.rs
use futures;
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
thread,
time::Duration,
};
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
/// 在Future和等候的线程间同享状况
struct SharedState {
/// 定时(睡觉)是否完毕
completed: bool,
/// 当睡觉完毕后,线程能够用`waker`告知`TimerFuture`来唤醒使命
waker: Option<Waker>,
}
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// 经过检查同享状况,来确认定时器是否现已完结
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
println!("future ready. execute poll to return.");
Poll::Ready(())
} else {
println!("future not ready, tell the future task how to wakeup to executor");
// 设置`waker`,这样新线程在睡觉(计时)完毕后能够唤醒当时的使命,接着再次对`Future`进行`poll`操作,
// 下面的`clone`每次被`poll`时都会产生一次,实际上,应该是只`clone`一次更加合理。
// 选择每次都`clone`的原因是: `TimerFuture`能够在履行器的不同使命间移动,假如只克隆一次,
// 那么获取到的`waker`可能现已被篡改并指向了其它使命,终究导致履行器运转了过错的使命
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
impl TimerFuture {
/// 创立一个新的`TimerFuture`,在指定的时刻完毕后,该`Future`能够完结
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// 创立新线程
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
// 睡觉指定时刻完结计时功用
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
// 告知履行器定时器现已完结,能够持续`poll`对应的`Future`了
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
println!("detect future is ready, wakeup the future task to executor.");
waker.wake()
}
});
TimerFuture { shared_state }
}
}
fn main() {
// 咱们现在还没有完结调度器,所以要用一下futues库里的一个调度器。
futures::executor::block_on(TimerFuture::new(Duration::new(10, 0)));
}
履行成果如下:
future not ready, tell the future task how to wakeup to executor
detect future is ready, wakeup the future task to executor.
future ready. execute poll to return.
能够看到,刚开端的时分,定时10s事情还未完结,处在Pending
状况,这时要告知这个使命后边安排妥当后怎样唤醒去调度履行。等10s后,定时事情完结了,经过前面的设置的Waker
,唤醒这个Future
使命去调度履行。
构建一个履行器
上面的代码,咱们并没有完结调度器,而是运用的futures
库中供给的一个调度器去履行,下面自己完结一个调度器,看一下它的原理。而在Rust中,真正要用的话,还是要学习tokio
库,这儿咱们只是为了讲述一下完结原理,以便于了解异步是怎样一回事。关键代码如下:
// future_executor.rs
use {
futures::{
future::{BoxFuture, FutureExt},
task::{waker_ref, ArcWake},
},
std::{
future::Future,
sync::mpsc::{sync_channel, Receiver, SyncSender},
sync::{Arc, Mutex},
task::Context,
time::Duration,
},
};
mod future_timer;
// 引进之前完结的定时器模块
use future_timer::TimerFuture;
/// 使命履行器,担任从通道中接收使命然后履行
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}
/// `Spawner`担任创立新的`Future`然后将它发送到使命通道中
#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
/// 一个 Future,它能够调度自己(将自己放入使命通道中),然后等候履行器去`poll`
struct Task {
/// 进行中的Future,在未来的某个时刻点会被完结
///
/// 按理来说`Mutex`在这儿是剩余的,由于咱们只要一个线程来履行使命。可是由于
/// Rust并不聪明,它无法知道`Future`只会在一个线程内被修改,并不会被跨线程修改。因此
/// 咱们需求运用`Mutex`来满意这个笨笨的编译器对线程安全的执着。
///
/// 假如是出产级的履行器完结,不会运用`Mutex`,由于会带来性能上的开支,取而代之的是运用`UnsafeCell`
future: Mutex<Option<BoxFuture<'static, ()>>>,
/// 能够将该使命本身放回到使命通道中,等候履行器的poll
task_sender: SyncSender<Arc<Task>>,
}
fn new_executor_and_spawner() -> (Executor, Spawner) {
// 使命通道允许的最大缓冲数(使命行列的最大长度)
// 当时的完结只是是为了简略,在实际的履行中,并不会这么运用
const MAX_QUEUED_TASKS: usize = 10_000;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
(Executor { ready_queue }, Spawner { task_sender })
}
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
println!("first dispatch the future task to executor.");
self.task_sender.send(task).expect("too many tasks queued.");
}
}
/// 完结ArcWake,标明怎样去唤醒使命去调度履行。
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
// 经过发送使命到使命管道的方法来完结`wake`,这样`wake`后,使命就能被履行器`poll`
let cloned = arc_self.clone();
arc_self
.task_sender
.send(cloned)
.expect("too many tasks queued");
}
}
impl Executor {
// 实际运转具体的Future使命,不断的接收Future task履行。
fn run(&self) {
let mut count = 0;
while let Ok(task) = self.ready_queue.recv() {
count = count + 1;
println!("received task. {}", count);
// 获取一个future,若它还没有完结(仍然是Some,不是None),则对它进行一次poll并测验完结它
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
// 根据使命本身创立一个 `LocalWaker`
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
// `BoxFuture<T>`是`Pin<Box<dyn Future<Output = T> + Send + 'static>>`的类型别号
// 经过调用`as_mut`办法,能够将上面的类型转换成`Pin<&mut dyn Future + Send + 'static>`
if future.as_mut().poll(context).is_pending() {
println!("executor run the future task, but is not ready, create a future again.");
// Future还没履行完,因此将它放回使命中,等候下次被poll
*future_slot = Some(future);
} else {
println!("executor run the future task, is ready. the future task is done.");
}
}
}
}
}
fn main() {
let (executor, spawner) = new_executor_and_spawner();
// 将 TimerFuture 封装成一个使命,分发到调度器去履行
spawner.spawn(async {
println!("TimerFuture await");
// 创立定时器Future,并等候它完结
TimerFuture::new(Duration::new(10, 0)).await;
println!("TimerFuture Done");
});
// drop掉使命,这样履行器就知道使命现已完结,不会再有新的使命进来
drop(spawner);
// 运转履行器直到使命行列为空
// 使命运转后,会先打印`howdy!`, 暂停2秒,接着打印 `done!`
executor.run();
}
运转成果如下:
first dispatch the future task to executor.
received task. 1
TimerFuture await
future not ready, tell the future task how to wakeup to executor
executor run the future task, but is not ready, create a future again.
detect future is ready, wakeup the future task to executor.
received task. 2
future ready. execute poll to return.
TimerFuture Done
executor run the future task, is ready. the future task is done.
第一次调度的时分,由于还没有安排妥当,在Pending
状况,告知这个使命,后边安排妥当是怎样唤醒wake
该使命。然后当事情安排妥当的时分,由于前面告知了怎么唤醒,按办法唤醒了该使命去调度履行。
异步处理流程
Reactor Pattern 是构建高性能事情驱动系统的一个很典型形式,executor 和 reactor 是 Reactor Pattern 的组成部分。Reactor pattern 包含三部分:
- task:待处理的使命。使命能够被打断,并且把控制权交给 executor,等候之后的调度
- executor:一个调度器。保护等候运转的使命(ready queue),以及被堵塞的使命(wait queue)
- reactor:保护事情行列。当事情来暂时,告知 executor 唤醒某个使命等候运转
executor 会调度履行待处理的使命,当使命无法持续进行却又没有完结时,它会挂起使命,并设置好适宜的唤醒条件。之后,假如 reactor 得到了满意条件的事情,它会唤醒之前挂起的使命,然后 executor 就有机会持续履行这个使命。这样一直循环下去,直到使命履行完毕。
Rust 运用 Future 做异步处理便是一个典型的 Reactor Pattern 形式。
以 tokio 为例:async/await 供给语法层面的支撑,Future 是异步使命的数据结构,当 .await 时,executor 就会调度并履行它
下图为 tokio 上 Future 异步处理整个流程:
引证自《陈天 Rust 编程第一课》
注:tokio 的调度器会运转在多个线程上,运转线程上自己的 ready queue 上的使命(Future),假如没有,就去别的线程的调度器上偷一些过来运转(work-stealing 调度机制)。当某个使命无法再持续取得发展,此时 Future 运转的成果是
Poll::Pending
,那么调度器会挂起使命,并设置好适宜的唤醒条件(Waker),等候被 reactor 唤醒。而reactor 会利用操作系统供给的异步 I/O(如epoll / kqueue / IOCP),来监听操作系统供给的 IO 事情,当遇到满意条件的事情时,就会调用 Waker.wake() 唤醒被挂起的 Future,这个 Future 会回到 ready queue 等候履行。
总结
Future 是 Rust 异步编程的核心,代表一些将在未来完结的操作。 Rust 的Future 是慵懒的,需求履行器executor 调度履行,这种调度履行完结根据轮询,在当时轮询 poll
中,若 Future 完结了,则回来Poll::Ready(result)
,即得到 Future 的值并回来;若 Future 还没完结,则回来Poll::Pending()
,此时 Future 会被挂起,需求等某个事情产生 Waker 将其唤醒,Waker 供给wake()
办法来告知履行器哪个相关使命应该要唤醒。当wake()
函数被调用时, 履行器知道 Waker 相关的使命现已准备好持续了,该 future 会被再轮询一遍。这种 wake
告知然后 poll
的方法会不断重复,直到 Future 完结。
每个异步使命分成三个阶段:
-
轮询阶段(The Poll): 调度履行器(executor)触发一个
Future
被轮询后,开端履行,遇堵塞(Pending
)则挂起进入等候阶段。 -
等候阶段:事情源(一般称为reactor)注册
Waker
等候一个事情产生,当该事情准备好时唤醒 wake 相应的Future
,进入唤醒阶段。 -
唤醒阶段:事情产生,相应的
Future
被 Waker 唤醒。 履行器(executor)调度Future
再次被轮询,并向前走一步,直到它完结或达到一个Pending
点,不能再向前走, 如此往复,直到终究完结。
参考
-
Rust 圣经 – 异步编程
-
Rust 异步编程
-
200 行代码讲透 RUST FUTURES
-
Futures Explained in 200 Lines of Rust
-
陈天 Rust 编程第一课 – 异步处理