现在咱们现已构建了block_on函数,是时分进一步将其转化为一个真实的履行器了。 咱们期望咱们的遗履行器不仅仅一次运转一个future,而是一起运转多个future!

这篇博文的灵感来自于juliex,一个最小的履行器,作者也是Rust中的async/await功能的开拓者之一。 今天咱们要从头开始写一个更现代、更明晰的juliex版本。

咱们的履行器的目标是只运用简略和彻底安全的代码,可是性能能够与现有的最佳履行器匹敌。

咱们将用作依靠的crate包括crossbeam、async-task、once_cell、futures和num_cpus。

接口

履行器只要一个函数,便是运转一个future:

fn spawn<F, R>(future: F) -> JoinHandle<R>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    todo!()
}

回来的JoinHandle是一种完结了Future的类型,在使命完结后能够取得其输出。

留意这个spawn()函数和std::thread::spawn()之间的相似之处——它们几乎是等价的,除了一个产生异步使命,另一个产生线程。

下面是一个简略的比方,生成一个使命并等候它的输出:

fn main() {
    futures::executor::block_on(async {
        let handle = spawn(async { 1 + 2 });
        assert_eq!(handle.await, 3);
    });
}

将输出传递给JoinHandle

既然 JoinHandle是一个完结 Future 的类型,那么让咱们暂先简略地将它界说为一个固定到堆上的future的别名:

type JoinHandle<R> = Pin<Box<dyn Future<Output = R> + Send>>;

这个办法目前可行,可是不要忧虑,稍后咱们会将它作为一个新的结构明晰地重写,并手动完结 Future。

产生的 future 的输出有必要以某种办法发送到 JoinHandle。 一种办法是创建一个 oneshot 通道,并在future完结时经过该通道发送输出。 那么 JoinHandle 便是一个等候来自通道的音讯的future:

use futures::channel::oneshot;
fn spawn<F, R>(future: F) -> JoinHandle<R>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let (s, r) = oneshot::channel();
    let future = async move {
        let _ = s.send(future.await);
    };
    todo!()
    Box::pin(async { r.await.unwrap() })
}

下一步是在堆上分配future包装器,并将其推入某种大局使命行列,以便由履行程序处理。 咱们称这种分配的future为一项使命。

使命的分析

使命(task)包括future和它的状况。 咱们需求跟踪状况,以了解使命是否计划运转、是否当时正在运转、是否现已完结等等。

下面是咱们的Task类型的界说:

struct Task {
    state: AtomicUsize,
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
}

咱们还没有确认状况到底是什么,但它将是某种能够从任何线程更新的 AtomicUsize。 咱们今后再说吧。

Future 的输出类型是()ーー这是因为 spawn ()函数将原始的 future 包装成一个将输出发送到 oneshot 通道,然后简略地回来()。

future被固定在堆上。 这是因为只要pin的future才干被轮询(poll)。 可是为什么它还被包装在Mutex中呢?

每个与使命相关联的 Waker 都会保存一个 Task 引证,这样它就能够经过将使命推入大局使命行列来唤醒使命。 问题就在这儿: 使命实例在线程之间同享,可是轮询future需求对它的可变拜访。 解决方案: 咱们将future封装到互斥对象中,以取得对它的可变拜访权。

假如这一切听起来让人困惑,不要忧虑,一旦咱们完结了整个履行器,理解起来就会简略得多!

让咱们来分配一个保存future和他的状况的Task来完结spawn函数:

fn spawn<F, R>(future: F) -> JoinHandle<R>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let (s, r) = oneshot::channel();
    let future = async move {
        let _ = s.send(future.await);
    };
    let task = Arc::new(Task {
        state: AtomicUsize::new(0),
        future: Mutex::new(Box::pin(future)),
    });
    QUEUE.send(task).unwrap();
    Box::pin(async { r.await.unwrap() })
}

一旦使命被分配,咱们将其推入 QUEUE,这是一个包括可运转使命的大局行列。 Spawn ()函数现在现已完结,所以让咱们接下来界说 QUEUE..。

履行器线程

因为咱们正在构建一个履行器,所以有必要有一个后台线程池,它从行列中获取可运转的使命并运转它们,即轮询它们的future。

让咱们界说大局使命行列,而且在它第一次被初始化时产生履行线程池:

use crossbeam::channel;
use once_cell::sync::Lazy;
static QUEUE: Lazy<channel::Sender<Arc<Task>>> = Lazy::new(|| {
    let (sender, receiver) = channel::unbounded::<Arc<Task>>();
    for _ in 0..num_cpus::get().max(1) {
        let receiver = receiver.clone();
        thread::spawn(move || receiver.iter().for_each(|task| task.run()));
    }
    sender
});

十分简略——履行器线程实际上是一行代码! 使命行列是一个无界通道,而履行器线程则从这个通道接收使命并运转每个使命.

履行器线程的数量等于体系上的核心数量,该核心数量由nums_cpus供给。

现在咱们现已有了使命行列和线程池,最后一个需求完结的部分是run()办法。

使命履行

运转一个使命仅仅意味着轮询它的future。 咱们现已从咱们完结block_on()的前一篇博客文章中知道怎么轮询future。

Run()办法如下所示:

impl Task {
    fn run(self: Arc<Task>) {
        let waker = todo!();
        let cx = &mut Context::from_waker(&waker);
        self.future.try_lock().unwrap().as_mut().poll(cx);
    }
}

请留意,咱们需求确定future,以取得可变拜访权并对其进行轮询。 根据规划,没有其他线程会一起持有锁,因而try_lock()有必要总是成功。

可是咱们怎么发明一个唤醒者呢? 咱们将像前次相同运用async_task::waker_fn(),但唤醒函数应该做什么呢?

咱们不能就这样把一个Arc<Task>放到QUEUE中,以下是咱们应该考虑的潜在竞赛冲突:

  • 假如一个使命现已完结了,然后被唤醒了怎么办? Waker生命周期会超越他关联的Future,而且咱们也不想在行列中包括现已完结的使命.
  • 假如一个使命在运转之前,连续被唤醒两次会怎么样? 咱们不期望在行列中同一个使命出现两次.
  • 假如一个使命正在运转的时分被唤醒了怎么办? 假如这时分将其加入行列中,另一个履行线程或许试图运转它,这将导致一个使命一起在两个线程上运转.

假如咱们仔细想想,咱们会想出两个简略的规矩,高雅地解决一切这些问题:

  1. 假如还没有被唤醒而且当时没有正在运转,唤醒函数会安排此使命
  2. 假如一个使命正在运转时被唤醒,由当时履行器线程(当时正在运转这个future的那个线程)从头调度它.

让咱们勾勒出这些规矩:

impl Task {
    fn run(self: Arc<Task>) {
        let waker = async_task::waker_fn(|| {
            todo!("schedule if the task is not woken already and is not running");
        });
        let cx = &mut Context::from_waker(&waker);
        self.future.try_lock().unwrap().as_mut().poll(cx);
        todo!("schedule if the task was woken while running");
    }
}

还记得咱们在 Task 中界说的 AtomicUsize 类型的状况字段吗? 现在是时分在其间存储一些有用的数据了。 关于使命,有两条信息能够协助咱们完结唤醒: 1. 使命是否现已被唤醒 2. 使命是否正在运转

这两个值都是 true / false 值,咱们能够在 state 字段顶用两个位表示它们:

const WOKEN: usize = 0b01;
const RUNNING: usize = 0b10;

唤醒函数设置“WOKEN”位。 假如两个位从前都是0(即使命既没有被唤醒也没有运转) ,那么咱们经过将引证推入行列来调度使命:

let task = self.clone();
let waker = async_task::waker_fn(move || {
    if task.state.fetch_or(WOKEN, Ordering::SeqCst) == 0 {
        QUEUE.send(task.clone()).unwrap();
    }
});

在轮询future之前,咱们撤销了WOKEN位的设置,并设置了RUNNING位:

self.state.store(RUNNING, Ordering::SeqCst);
let cx = &mut Context::from_waker(&waker);
let poll = self.future.try_lock().unwrap().as_mut().poll(cx);

在轮询future之后,咱们撤销RUNNING位的设置,并检查从前的状况是否现已设置了WOKEN 和 RUNNING 位(即使命在运转时被唤醒)。 假如是这样,咱们从头安排使命:

风趣的是,假如使命完结了(即它的future不再是pending) ,咱们就会让它永远处于RUNNING状况。 这样future今后被唤醒后就不或许再次进入行列。

咱们现在有了一个真实的履行器ーー在v1.rs中看到完整的完结。

一点魔法

假如您发现处理Task结构体及其状况转化很有挑战,我感同身受。 但也有好音讯,这些作业都不需求你亲自做,运用asyc-task即可!

咱们只需求用async_task::Task()替换Arc<Task>,并用async-task::JoinHandle<()>替换oneshot通道。

这便是咱们怎么简化生成:

type Task = async_task::Task<()>;
fn spawn<F, R>(future: F) -> JoinHandle<R>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let (task, handle) = async_task::spawn(future, |t| QUEUE.send(t).unwrap(), ());
    task.schedule();
    Box::pin(async { handle.await.unwrap() })
}

async_task::spawn()接受三个参数: 1. 待运转的future 2. 一个将使命放入行列的调度函数. 该函数或许被唤醒器履行,也或许被run()在轮询future后履行. 3. 一个包括恣意信息的tag,这个tag信息会保存在task中. 这篇博客中咱们不考虑仅仅简略的保存(),也便是疏忽它.

然后构造函数回来两个值: 1.async_task::Task<()>,其间()便是刚刚传入的tag. 2.async_task::JoinHandle<R, ()>,这儿的()仍是刚刚的tag. 这个JoinHandle是一个future,它完结的时分会回来一个Option<R>. 当回来None的时分表示使命产生了panic或许被撤销了.

假如您想知道schedule()办法,它只需调用使命上的 schedule 函数将其推入行列。 咱们也能够自己将使命推入QUEUE——终究成果是相同的。

综上所述,咱们终究得到了这个十分简略的履行器:

static QUEUE: Lazy<channel::Sender<Task>> = Lazy::new(|| {
    let (sender, receiver) = channel::unbounded::<Task>();
    for _ in 0..num_cpus::get().max(1) {
        let receiver = receiver.clone();
        thread::spawn(move || receiver.iter().for_each(|task| task.run()));
    }
    sender
});
type Task = async_task::Task<()>;
type JoinHandle<R> = Pin<Box<dyn Future<Output = R> + Send>>;
fn spawn<F, R>(future: F) -> JoinHandle<R>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let (task, handle) = async_task::spawn(future, |t| QUEUE.send(t).unwrap(), ());
    task.schedule();
    Box::pin(async { handle.await.unwrap() })
}

完整的代码能够在v2.rs中找到。

这儿运用async_task::spawn()的好处不仅仅是简略。它也比咱们自己写的Task更有功率,也更健壮。 举一个健壮性的比方,async_task::Task在完结后当即删去未来,而不是等候使命的一切引证都失效后才删去。

除此之外,async-task还供给了一些有用的特性,比方tags和cancellation,可是咱们今天不讨论这些。 还值得一提的是,async-task 是一个#[no_std]crate,乃至能够在没有标准库的情况下运用。

改进的JoinHandle

假如你仔细观察咱们最新的履行器,还有一个功率低下的实例——JoinHandle的冗余Box::pin()分配。

假如咱们能够运用下面的类型别名就更好了,可是咱们不能,因为async_task::JoinHandle<R>输出Option<R>,而JoinHandle输出R:

type JoinHandle<R> = async_task::JoinHandle<R, ()>;

咱们只能将async_task::JoinHandle封装到一个新的结构体中,假如使命产生panic或许被撤销,它也会panic: >这句话感觉说不通呢,需求看看async_task源码才行

struct JoinHandle<R>(async_task::JoinHandle<R, ()>);
impl<R> Future for JoinHandle<R> {
    type Output = R;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match Pin::new(&mut self.0).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(output) => Poll::Ready(output.expect("task failed")),
        }
    }
}

完整的履行器完结能够在v3.rs中找到。

处理惊惧(panic)

到目前为止,咱们还没有真实考虑过当使命感到惊惧时会产生什么,即调用poll()时会产生惊惧。 现在run ()办法仅仅将惊惧传播到履行器中。 咱们应该考虑这是否是咱们真实想要的。

明智的做法是以某种办法处理这些惊惧。 例如,咱们能够简略地疏忽惊惧,持续运转。 这样它们仅仅屏幕上打印信息,但不会溃散整个进程ーー惊惧的线程的作业办法彻底相同。

为了疏忽惊惧,咱们将run()包装成catch_unwind():

use std::panic::catch_unwind;
static QUEUE: Lazy<channel::Sender<Task>> = Lazy::new(|| {
    let (sender, receiver) = channel::unbounded::<Task>();
    for _ in 0..num_cpus::get().max(1) {
        let receiver = receiver.clone();
        thread::spawn(move || {
            receiver.iter().for_each(|task| {
                let _ = catch_unwind(|| task.run());
            })
        });
    }
    sender
});

在v4.rs中能够找到疏忽惊惧的完整履行器代码。

有许多明智的应对惊惧的战略。下面是一些在async-task库中供给的比方:

  • 疏忽惊惧– 惊惧直接被疏忽,当JoinHandle<R>在await时也会产生惊惧
  • 传播惊惧panic被从头放入在等候JoinHandle<R>成果的那个使命中.
  • 输出惊惧JoinHandle<R>输出std::thread::Result<R>.

完结任何你想要的惊惧处理战略都是很简略的。 这彻底由你来决议哪一个是最好的!

履行器的功率

当时的代码简短、简略、安全,但它有多快呢?

async_task::spawn ()分配的使命仅仅一个分配,存储使命状况、future以及future完结后的输出。 没有其他躲藏成本了ーーspawn的速度实际上现已到了极限!

其他履行器,如 async-std 和 tokio,分配使命的办法彻底相同。 咱们的履行器的基础本质上是一个最优的完结,现在咱们离与盛行的履行器竞赛只要一步之遥: 使命盗取。

现在,一切的履行器线程同享相同的使命行列。 假如一切线程都在一起拜访行列,则因为争用,性能将受到影响。 使命盗取背后的主意是为每个履行器线程分配一个不同的行列。 这样履行器线程只需求在自己的行列为空时从其他行列中盗取使命,这意味着争用只会很少产生,而不是一向产生。

我将在另一篇博客文章中更多地议论使命盗取。

正确性

每个人都告诉咱们,并发是困难的。 Go言语供给了一个内置的竞赛检测器,tokio创建了自己的并发检查器 loom 来寻觅并发错误,而crossbeam在某些情况下乃至采用了方式证明。 听起来很可怕!

可是咱们能够坐下来,放松,不必忧虑。 竞赛检测器,消毒器,乃至miri(译者 Miri是一个实验性的 Rust MIR解释器。它能够运转Rust二进制文件,对其进行测验,能够检查出某些未界说的行为)或loom,都不能在咱们的遗言履行器上捕捉到bug。 原因是咱们只编写了安全代码,而安全代码是内存安全的,也便是说它不能包括数据竞赛。 Rust的类型体系现已证明咱们的履行器是正确的。

确保内存安全的担负彻底由依靠的crate承当,更确切地说是aysnc-task和crossbeam。 请定心,两者都十分重视正确性。async-task有一个覆盖一切边际情况的广泛测验套件,crossbeam的通道有许多测验,乃至经过Go和std::sync::mpsc测验套件,作业盗取双向行列根据一个经过方式证明的完结,而根据epoch的垃圾收集器也有正确性证明。

适用于一切人的履行器

自从Alex和Aaron在2016年初次designed zero-cost futures以来,他们的计划便是每个spawn的future只进行一次内存分配:

每个“使命”需求一个分配,成果通常是每个连接需求一个分配。

然而,单次分配使命是一个好心的谎话ーー咱们花了好几年才真实得到它们。 比方tokio 0.1版本中spawn时需求分配一个future,然后分配使命状况,最后分配一个oneshot通道。 也便是每个spawn三个分配点!

然后,在2019年8月,async-task诞生了。 有史以来第一次,咱们成功地将future、使命状况和通道的分配压缩为单次分配。 之所以花费这么长期,是因为使命内部的手动分配和状况转化办理十分复杂。 可是现在现已完结了,你再也不必忧虑任何事情了。

此后不久,在2019年10月,tokio也采用了类似于 async-task 的完结办法。

现在,任何人都能够经过单次分配使命来构建一个高效的履行器。 从前的火箭科学现在现已不复存在了。

原文:stjepang.github.io/2020/01/31/…

翻译:stevenbai.top/rust/build_…

作者: stevenbai 博客:stevenbai.top/