现在咱们现已构建了block_on函数,是时分进一步将其转化为一个真实的履行器了。 咱们期望咱们的遗履行器不仅仅一次运转一个future,而是一起运转多个future!
这篇博文的灵感来自于juliex,一个最小的履行器,作者也是Rust中的async/await功能的开拓者之一。 今天咱们要从头开始写一个更现代、更明晰的juliex
版本。
咱们的履行器的目标是只运用简略和彻底安全的代码,可是性能能够与现有的最佳履行器匹敌。
咱们将用作依靠的crate包括crossbeam、async-task、once_cell、futures和num_cpus。
接口
履行器只要一个函数,便是运转一个future:
fn spawn<F, R>(future: F) -> JoinHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
todo!()
}
回来的JoinHandle是一种完结了Future的类型,在使命完结后能够取得其输出。
留意这个spawn()函数和std::thread::spawn()之间的相似之处——它们几乎是等价的,除了一个产生异步使命,另一个产生线程。
下面是一个简略的比方,生成一个使命并等候它的输出:
fn main() {
futures::executor::block_on(async {
let handle = spawn(async { 1 + 2 });
assert_eq!(handle.await, 3);
});
}
将输出传递给JoinHandle
既然 JoinHandle是一个完结 Future 的类型,那么让咱们暂先简略地将它界说为一个固定到堆上的future的别名:
type JoinHandle<R> = Pin<Box<dyn Future<Output = R> + Send>>;
这个办法目前可行,可是不要忧虑,稍后咱们会将它作为一个新的结构明晰地重写,并手动完结 Future。
产生的 future 的输出有必要以某种办法发送到 JoinHandle。 一种办法是创建一个 oneshot 通道,并在future完结时经过该通道发送输出。 那么 JoinHandle 便是一个等候来自通道的音讯的future:
use futures::channel::oneshot;
fn spawn<F, R>(future: F) -> JoinHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (s, r) = oneshot::channel();
let future = async move {
let _ = s.send(future.await);
};
todo!()
Box::pin(async { r.await.unwrap() })
}
下一步是在堆上分配future包装器,并将其推入某种大局使命行列,以便由履行程序处理。 咱们称这种分配的future为一项使命。
使命的分析
使命(task)包括future和它的状况。 咱们需求跟踪状况,以了解使命是否计划运转、是否当时正在运转、是否现已完结等等。
下面是咱们的Task类型的界说:
struct Task {
state: AtomicUsize,
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
}
咱们还没有确认状况到底是什么,但它将是某种能够从任何线程更新的 AtomicUsize。 咱们今后再说吧。
Future 的输出类型是()ーー这是因为 spawn ()函数将原始的 future 包装成一个将输出发送到 oneshot 通道,然后简略地回来()。
future被固定在堆上。 这是因为只要pin的future才干被轮询(poll)。 可是为什么它还被包装在Mutex中呢?
每个与使命相关联的 Waker 都会保存一个 Task 引证,这样它就能够经过将使命推入大局使命行列来唤醒使命。 问题就在这儿: 使命实例在线程之间同享,可是轮询future需求对它的可变拜访。 解决方案: 咱们将future封装到互斥对象中,以取得对它的可变拜访权。
假如这一切听起来让人困惑,不要忧虑,一旦咱们完结了整个履行器,理解起来就会简略得多!
让咱们来分配一个保存future和他的状况的Task来完结spawn函数:
fn spawn<F, R>(future: F) -> JoinHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (s, r) = oneshot::channel();
let future = async move {
let _ = s.send(future.await);
};
let task = Arc::new(Task {
state: AtomicUsize::new(0),
future: Mutex::new(Box::pin(future)),
});
QUEUE.send(task).unwrap();
Box::pin(async { r.await.unwrap() })
}
一旦使命被分配,咱们将其推入 QUEUE,这是一个包括可运转使命的大局行列。 Spawn ()函数现在现已完结,所以让咱们接下来界说 QUEUE..。
履行器线程
因为咱们正在构建一个履行器,所以有必要有一个后台线程池,它从行列中获取可运转的使命并运转它们,即轮询它们的future。
让咱们界说大局使命行列,而且在它第一次被初始化时产生履行线程池:
use crossbeam::channel;
use once_cell::sync::Lazy;
static QUEUE: Lazy<channel::Sender<Arc<Task>>> = Lazy::new(|| {
let (sender, receiver) = channel::unbounded::<Arc<Task>>();
for _ in 0..num_cpus::get().max(1) {
let receiver = receiver.clone();
thread::spawn(move || receiver.iter().for_each(|task| task.run()));
}
sender
});
十分简略——履行器线程实际上是一行代码! 使命行列是一个无界通道,而履行器线程则从这个通道接收使命并运转每个使命.
履行器线程的数量等于体系上的核心数量,该核心数量由nums_cpus
供给。
现在咱们现已有了使命行列和线程池,最后一个需求完结的部分是run()办法。
使命履行
运转一个使命仅仅意味着轮询它的future。 咱们现已从咱们完结block_on()
的前一篇博客文章中知道怎么轮询future。
Run()办法如下所示:
impl Task {
fn run(self: Arc<Task>) {
let waker = todo!();
let cx = &mut Context::from_waker(&waker);
self.future.try_lock().unwrap().as_mut().poll(cx);
}
}
请留意,咱们需求确定future,以取得可变拜访权并对其进行轮询。 根据规划,没有其他线程会一起持有锁,因而try_lock()有必要总是成功。
可是咱们怎么发明一个唤醒者呢? 咱们将像前次相同运用async_task::waker_fn()
,但唤醒函数应该做什么呢?
咱们不能就这样把一个Arc<Task>
放到QUEUE中,以下是咱们应该考虑的潜在竞赛冲突:
- 假如一个使命现已完结了,然后被唤醒了怎么办? Waker生命周期会超越他关联的Future,而且咱们也不想在行列中包括现已完结的使命.
- 假如一个使命在运转之前,连续被唤醒两次会怎么样? 咱们不期望在行列中同一个使命出现两次.
- 假如一个使命正在运转的时分被唤醒了怎么办? 假如这时分将其加入行列中,另一个履行线程或许试图运转它,这将导致一个使命一起在两个线程上运转.
假如咱们仔细想想,咱们会想出两个简略的规矩,高雅地解决一切这些问题:
- 假如还没有被唤醒而且当时没有正在运转,唤醒函数会安排此使命
- 假如一个使命正在运转时被唤醒,由当时履行器线程(当时正在运转这个future的那个线程)从头调度它.
让咱们勾勒出这些规矩:
impl Task {
fn run(self: Arc<Task>) {
let waker = async_task::waker_fn(|| {
todo!("schedule if the task is not woken already and is not running");
});
let cx = &mut Context::from_waker(&waker);
self.future.try_lock().unwrap().as_mut().poll(cx);
todo!("schedule if the task was woken while running");
}
}
还记得咱们在 Task 中界说的 AtomicUsize 类型的状况字段吗? 现在是时分在其间存储一些有用的数据了。 关于使命,有两条信息能够协助咱们完结唤醒: 1. 使命是否现已被唤醒 2. 使命是否正在运转
这两个值都是 true / false 值,咱们能够在 state 字段顶用两个位表示它们:
const WOKEN: usize = 0b01;
const RUNNING: usize = 0b10;
唤醒函数设置“WOKEN”位。 假如两个位从前都是0(即使命既没有被唤醒也没有运转) ,那么咱们经过将引证推入行列来调度使命:
let task = self.clone();
let waker = async_task::waker_fn(move || {
if task.state.fetch_or(WOKEN, Ordering::SeqCst) == 0 {
QUEUE.send(task.clone()).unwrap();
}
});
在轮询future之前,咱们撤销了WOKEN位的设置,并设置了RUNNING位:
self.state.store(RUNNING, Ordering::SeqCst);
let cx = &mut Context::from_waker(&waker);
let poll = self.future.try_lock().unwrap().as_mut().poll(cx);
在轮询future之后,咱们撤销RUNNING位的设置,并检查从前的状况是否现已设置了WOKEN 和 RUNNING 位(即使命在运转时被唤醒)。 假如是这样,咱们从头安排使命:
风趣的是,假如使命完结了(即它的future不再是pending) ,咱们就会让它永远处于RUNNING
状况。 这样future今后被唤醒后就不或许再次进入行列。
咱们现在有了一个真实的履行器ーー在v1.rs中看到完整的完结。
一点魔法
假如您发现处理Task
结构体及其状况转化很有挑战,我感同身受。 但也有好音讯,这些作业都不需求你亲自做,运用asyc-task
即可!
咱们只需求用async_task::Task()
替换Arc<Task>
,并用async-task::JoinHandle<()>
替换oneshot
通道。
这便是咱们怎么简化生成:
type Task = async_task::Task<()>;
fn spawn<F, R>(future: F) -> JoinHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (task, handle) = async_task::spawn(future, |t| QUEUE.send(t).unwrap(), ());
task.schedule();
Box::pin(async { handle.await.unwrap() })
}
async_task::spawn()
接受三个参数: 1. 待运转的future 2. 一个将使命放入行列的调度函数. 该函数或许被唤醒器履行,也或许被run()
在轮询future后履行. 3. 一个包括恣意信息的tag,这个tag信息会保存在task中. 这篇博客中咱们不考虑仅仅简略的保存()
,也便是疏忽它.
然后构造函数回来两个值: 1.async_task::Task<()>
,其间()
便是刚刚传入的tag. 2.async_task::JoinHandle<R, ()>
,这儿的()
仍是刚刚的tag. 这个JoinHandle是一个future,它完结的时分会回来一个Option<R>
. 当回来None的时分表示使命产生了panic或许被撤销了.
假如您想知道schedule()
办法,它只需调用使命上的 schedule 函数将其推入行列。 咱们也能够自己将使命推入QUEUE——终究成果是相同的。
综上所述,咱们终究得到了这个十分简略的履行器:
static QUEUE: Lazy<channel::Sender<Task>> = Lazy::new(|| {
let (sender, receiver) = channel::unbounded::<Task>();
for _ in 0..num_cpus::get().max(1) {
let receiver = receiver.clone();
thread::spawn(move || receiver.iter().for_each(|task| task.run()));
}
sender
});
type Task = async_task::Task<()>;
type JoinHandle<R> = Pin<Box<dyn Future<Output = R> + Send>>;
fn spawn<F, R>(future: F) -> JoinHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (task, handle) = async_task::spawn(future, |t| QUEUE.send(t).unwrap(), ());
task.schedule();
Box::pin(async { handle.await.unwrap() })
}
完整的代码能够在v2.rs中找到。
这儿运用async_task::spawn()
的好处不仅仅是简略。它也比咱们自己写的Task更有功率,也更健壮。 举一个健壮性的比方,async_task::Task
在完结后当即删去未来,而不是等候使命的一切引证都失效后才删去。
除此之外,async-task
还供给了一些有用的特性,比方tags和cancellation,可是咱们今天不讨论这些。 还值得一提的是,async-task 是一个#[no_std]
crate,乃至能够在没有标准库的情况下运用。
改进的JoinHandle
假如你仔细观察咱们最新的履行器,还有一个功率低下的实例——JoinHandle的冗余Box::pin()
分配。
假如咱们能够运用下面的类型别名就更好了,可是咱们不能,因为async_task::JoinHandle<R>
输出Option<R>
,而JoinHandle输出R:
type JoinHandle<R> = async_task::JoinHandle<R, ()>;
咱们只能将async_task::JoinHandle
封装到一个新的结构体中,假如使命产生panic或许被撤销,它也会panic: >这句话感觉说不通呢,需求看看async_task源码才行
struct JoinHandle<R>(async_task::JoinHandle<R, ()>);
impl<R> Future for JoinHandle<R> {
type Output = R;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(output) => Poll::Ready(output.expect("task failed")),
}
}
}
完整的履行器完结能够在v3.rs中找到。
处理惊惧(panic)
到目前为止,咱们还没有真实考虑过当使命感到惊惧时会产生什么,即调用poll()时会产生惊惧。 现在run ()
办法仅仅将惊惧传播到履行器中。 咱们应该考虑这是否是咱们真实想要的。
明智的做法是以某种办法处理这些惊惧。 例如,咱们能够简略地疏忽惊惧,持续运转。 这样它们仅仅屏幕上打印信息,但不会溃散整个进程ーー惊惧的线程的作业办法彻底相同。
为了疏忽惊惧,咱们将run()包装成catch_unwind():
use std::panic::catch_unwind;
static QUEUE: Lazy<channel::Sender<Task>> = Lazy::new(|| {
let (sender, receiver) = channel::unbounded::<Task>();
for _ in 0..num_cpus::get().max(1) {
let receiver = receiver.clone();
thread::spawn(move || {
receiver.iter().for_each(|task| {
let _ = catch_unwind(|| task.run());
})
});
}
sender
});
在v4.rs中能够找到疏忽惊惧的完整履行器代码。
有许多明智的应对惊惧的战略。下面是一些在async-task
库中供给的比方:
-
疏忽惊惧– 惊惧直接被疏忽,当
JoinHandle<R>
在await时也会产生惊惧 -
传播惊惧panic被从头放入在等候
JoinHandle<R>
成果的那个使命中. -
输出惊惧
JoinHandle<R>
输出std::thread::Result<R>
.
完结任何你想要的惊惧处理战略都是很简略的。 这彻底由你来决议哪一个是最好的!
履行器的功率
当时的代码简短、简略、安全,但它有多快呢?
async_task::spawn ()分配的使命仅仅一个分配,存储使命状况、future以及future完结后的输出。 没有其他躲藏成本了ーーspawn的速度实际上现已到了极限!
其他履行器,如 async-std 和 tokio,分配使命的办法彻底相同。 咱们的履行器的基础本质上是一个最优的完结,现在咱们离与盛行的履行器竞赛只要一步之遥: 使命盗取。
现在,一切的履行器线程同享相同的使命行列。 假如一切线程都在一起拜访行列,则因为争用,性能将受到影响。 使命盗取背后的主意是为每个履行器线程分配一个不同的行列。 这样履行器线程只需求在自己的行列为空时从其他行列中盗取使命,这意味着争用只会很少产生,而不是一向产生。
我将在另一篇博客文章中更多地议论使命盗取。
正确性
每个人都告诉咱们,并发是困难的。 Go言语供给了一个内置的竞赛检测器,tokio创建了自己的并发检查器 loom 来寻觅并发错误,而crossbeam在某些情况下乃至采用了方式证明。 听起来很可怕!
可是咱们能够坐下来,放松,不必忧虑。 竞赛检测器,消毒器,乃至miri(译者 Miri是一个实验性的 Rust MIR解释器。它能够运转Rust二进制文件,对其进行测验,能够检查出某些未界说的行为)或loom,都不能在咱们的遗言履行器上捕捉到bug。 原因是咱们只编写了安全代码,而安全代码是内存安全的,也便是说它不能包括数据竞赛。 Rust的类型体系现已证明咱们的履行器是正确的。
确保内存安全的担负彻底由依靠的crate承当,更确切地说是aysnc-task和crossbeam。 请定心,两者都十分重视正确性。async-task
有一个覆盖一切边际情况的广泛测验套件,crossbeam的通道有许多测验,乃至经过Go和std::sync::mpsc测验套件,作业盗取双向行列根据一个经过方式证明的完结,而根据epoch的垃圾收集器也有正确性证明。
适用于一切人的履行器
自从Alex和Aaron在2016年初次designed zero-cost futures以来,他们的计划便是每个spawn的future只进行一次内存分配:
每个“使命”需求一个分配,成果通常是每个连接需求一个分配。
然而,单次分配使命是一个好心的谎话ーー咱们花了好几年才真实得到它们。 比方tokio 0.1版本中spawn时需求分配一个future,然后分配使命状况,最后分配一个oneshot通道。 也便是每个spawn三个分配点!
然后,在2019年8月,async-task诞生了。 有史以来第一次,咱们成功地将future、使命状况和通道的分配压缩为单次分配。 之所以花费这么长期,是因为使命内部的手动分配和状况转化办理十分复杂。 可是现在现已完结了,你再也不必忧虑任何事情了。
此后不久,在2019年10月,tokio也采用了类似于 async-task 的完结办法。
现在,任何人都能够经过单次分配使命来构建一个高效的履行器。 从前的火箭科学现在现已不复存在了。
原文:stjepang.github.io/2020/01/31/…
翻译:stevenbai.top/rust/build_…
作者: stevenbai 博客:stevenbai.top/