你是否曾经想过futures中的block_on是怎么作业的呢?今日咱们就来完结你自己的 block_on 版别.
这篇博客创意应该来自两个 crate, 分别是 wakeful 和 extreme.wakeful
供给了一种简略的直接从一个函数创立Waker的办法,extreme
则供给了一种十分简练的block_on完结.
咱们的完结目标将与extreme
略有不同。 咱们不再寻求零依赖性和最少的代码行数,而是寻求一个安全、高效但仍然相当简略的完结。
咱们将运用的依赖项有pin-utils、crossbeam和async-task。
签名
block_on的签名如下所示。 咱们以一个 future 作为参数,在当时线程上运转它(当它pending时堵塞) ,然后回来它的输出:
fn block_on<F: Future>(future: F) -> F::Output {
todo!()
}
现在让咱们完结遗失的 todo! ()部分。
初度尝试
请注意,规范库的Future中的poll的参数是一个Pin<&mut Future>
。 所以咱们需求先把它固定(Pin)住。 虽然有一种办法能够安全地运用Box::pin()
来完结这一点,可是咱们更乐意把Future放在栈上而不是堆上(译者注 Box::Pin会把Future分配到堆上,然后Pin)。
不幸的是,安全地将Future固定在栈上的仅有办法是运用pin-utils
:
pin_utils::pin_mut!(future);
pin_mut这个宏会将一个类型为F
的future
转换为Pin<&mut F>
,并将其固定在栈上.
接下来咱们需求具体说明当这个future
被唤醒时会发生什么。 在这种情况下,唤醒应该仅仅免除运转future
的线程的堵塞。
结构一个唤醒器或许很丑恶ーー只要看一眼extreme
的完结就能够了。 这是手工构建Waker
最简略的办法! 处处都是原始指针和不安全的代码… … 让咱们暂时越过这一部分,今后再填空。
let waker = todo!();
终究,咱们从Waker
创立一个使命上下文,并在循环中轮询future
。 假如完结,回来输出。 假如它挂起,阻止当时线程:
let cx = &mut Context::from_waker(&waker);
loop {
match future.as_mut().poll(cx) {
Poll::Ready(output) => return output,
Poll::Pending => thread::park(),
}
}
请别对Contex类型感到困惑,它便是 Waker 的一个包装器ーー没有什么比这更好的了。 当在 Rust 中设计 async / await 时,咱们不确定除了传递 Waker给poll ()之外,传递其他任何东西是否有用,所以咱们想出了这个包装器,它或许在 Rust 的未来版别中包含更多的东西。
不管怎样… 咱们差不多完结了。让咱们回到方才的block_on,并将todo替换为上面的代码。
仔细想想,Waker 实际上是Arc<dyn Fn () + Send + Sync>
的精心优化版别,wake()调用这个函数。 换句话说,Waker 是一个回调函数,当将来能够继续履行时,它就会被调用。
因为 Waker 是如此难以结构,所以sagebind供给了waker_fn(),这是一种将任何函数转换为 Waker 的直接办法。 不幸的是,wakeful
好像此时被猛拉,所以我借用了wakerfn()
并将其放入我的async-task
中。
在咱们的代码块中,回调函数便是唤醒future所在线程:
let thread = thread::current();
let waker = async_task::waker_fn(move || thread.unpark());
简略多了! 比起处理RawWaker 和 RawWakerVTable 好多了。
在内部,waker_fn()结构函数实际上创立了Arc<impl Fn () + Send + Sync>
,然后用不安全的代码将其转换为 Waker,这些代码看起来与咱们在extreme中看到的类似。
下面是对block_on的完好完结:
fn block_on<F: Future>(future: F) -> F::Output {
pin_utils::pin_mut!(future);
let thread = thread::current();
let waker = async_task::waker_fn(move || thread.unpark());
let cx = &mut Context::from_waker(&waker);
loop {
match future.as_mut().poll(cx) {
Poll::Ready(output) => return output,
Poll::Pending => thread::park(),
}
}
}
完好代码见v1.rs
park问题
可是,现在还不是庆祝的时分。 有个问题。 假如future的用户代码也运用 park / unpark API,它或许会从回调函数中获取并“盗取” unpark 告诉。 阅读这个问题能够得到更具体的解说。
译者注 说白了便是因为都在用unpark,会让导致不该唤醒的时分被唤醒,或许收不到唤醒告诉.
一个或许的解决方案是运用一种不同于 std: : thread 模块中的线程的park/unpark办法。 这样,future的代码就不会搅扰唤醒。
在crossbeam中有一个十分类似的 park / unpark 机制,只不过它答应咱们创立恣意多的独立的 parkers,而不是每个线程都有一个。 让咱们为block_on的每一次调都独立创立一个parker:
fn block_on<F: Future>(future: F) -> F::Output {
pin_utils::pin_mut!(future);
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = async_task::waker_fn(move || unparker.unpark());
let cx = &mut Context::from_waker(&waker);
loop {
match future.as_mut().poll(cx) {
Poll::Ready(output) => return output,
Poll::Pending => parker.park(),
}
}
}
就这样! 问题解决了。 完好的代码在v2.rs,你能够去运转他.
一个优化
创立一个 Parker 和 Waker 并不是免费的ーー这两者都会引起内存分配, 咱们能改进吗?
为什么不在线程本地存储器中缓存它们,而不是在每次调用block_on时结构 Parker 和 Waker 呢? 这样,线程就能够在 block on ()的所有调用中重用相同的实例:
fn block_on<F: Future>(future: F) -> F::Output {
pin_utils::pin_mut!(future);
thread_local! {
static CACHE: (Parker, Waker) = {
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = async_task::waker_fn(move || unparker.unpark());
(parker, waker)
};
}
CACHE.with(|(parker, waker)| {
let cx = &mut Context::from_waker(&waker);
loop {
match future.as_mut().poll(cx) {
Poll::Ready(output) => return output,
Poll::Pending => parker.park(),
}
}
})
}
假如future能够快速履行,这个小小的更改将使 block on ()功率大大提升!
完好的代码在v3.rs中
怎么处理递归
是否还有其他问题?
假如future在block_on的内部块再次递归地调用block_on会怎样? 当然咱们能够答应也能够禁止递归。
假如咱们选择答应递归,那么咱们还需求确保 block on ()的递归调用不同享相同的 Parker 和 Waker 实例,否则就无法知道哪个 block on ()调用会被唤醒。
futures的block on 发生递归调用会panic。 对于答应递归仍是禁止递归,我没有激烈的意见—- 这两种行为都是明智的。 可是,已然咱们在仿照futures的版别,那么就不要运用递归。
为了检测递归调用,咱们能够引入另一个线程本地变量来指示咱们当时是否在 block on()中。 但这是一个很大的作业量。
这里有一个很帅的技巧,它只需对代码进行更少的更改。 让咱们把(Parker,Waker)包装到 RefCell 中,假如屡次发生可变的借用,程序就会panic:
fn block_on<F: Future>(future: F) -> F::Output {
pin_utils::pin_mut!(future);
thread_local! {
static CACHE: RefCell<(Parker, Waker)> = {
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = async_task::waker_fn(move || unparker.unpark());
RefCell::new((parker, waker))
};
}
CACHE.with(|cache| {
let (parker, waker) = &mut *cache.try_borrow_mut().ok()
.expect("recursive `block_on`");
let cx = &mut Context::from_waker(&waker);
loop {
match future.as_mut().poll(cx) {
Poll::Ready(output) => return output,
Poll::Pending => parker.park(),
}
}
})
}
终于。 现在咱们真的结束了,我保证! 终究的完结是正确的,强健的,高效的。 差不多吧。 :)
完好的代码见v4.rs
Benchmarks
为了测验block_on的功率,让咱们将它与futures的进行基准测验比较。
可是首要,咱们将编写一个 helper future 类型,它能够反复被轮询,直到完结:
struct Yields(u32);
impl Future for Yields {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.0 == 0 {
Poll::Ready(())
} else {
self.0 -= 1;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
举个例子,为了测验轮询十次的性能,能够这样写:
#[bench]
fn custom_block_on_10_yields(b: &mut Bencher) {
b.iter(|| block_on(Yields(10)));
}
让咱们设定一组三个基准,轮询次数分别为0、10和50。 咱们运用自定义的block_on,然后运用 futures 的。 您能够在yield.rs中找到完好的基准测验代码。
下面是我的机器上的结果:
test custom_block_on_0_yields ... bench: 3 ns/iter (+/- 0)
test custom_block_on_10_yields ... bench: 130 ns/iter (+/- 12)
test custom_block_on_50_yields ... bench: 638 ns/iter (+/- 20)
test futures_block_on_0_yields ... bench: 10 ns/iter (+/- 0)
test futures_block_on_10_yields ... bench: 236 ns/iter (+/- 10)
test futures_block_on_50_yields ... bench: 1,139 ns/iter (+/- 30)
结果显现,在这个特定的基准测验中,咱们的block_on比futures的大约快2到3倍,这一点都不差!
结论
Async Rust 之所以令人生畏,是因为它包含了太多的机制: Future trait、 pinning、 Context、 Waker 及其相关的RawWaker 和 RawWakerVTable、 Async 和 await 的语法糖背面的机制、不安全的代码、原始指针等等。
但问题是,许多丑恶的东西乃至不是那么重要ーー实际上它们仅仅无聊的样板文件,能够用像 pin-utils、async-task 和 crossbeam 这样的 crate 绕开。
实际上,今日咱们已经在几行安全代码中构建了一个高效的 block_on,而不需求理解大部分的样板文件。 在另一篇博文中,咱们将树立一个真正的履行器..。
原文:stjepang.github.io/2020/01/25/…
翻译:stevenbai.top/rust/build_…