Rust 并发编程-多线程无畏并发

并发程序是指运转多个使命的程序(或看上去是多使命),即两个及以上的使命在重叠的时间跨度内替换运转。这些使命由线程——最小的处理单元履行。在其背面,并不完全是多使命(并行)处理,而是线程之间以普通人无法感知的速度进行上下文快速切换。许多现代应用程序都依赖于这种幻觉,比如服务器能够在处理恳求的一起等候其他恳求。当线程间同享数据时或许会出许多问题,最常见的两种是:竞态条件和死锁。

Rust 的一切权体系以及类型安全体系是一系列处理内存安全以及并发问题的强有力东西。经过一切权以及类型查看,大多数过错发生在编译期,而非运转时过错。因而,你能够在开发时修正代码,而不是在布置到出产环境后修正代码。一旦代码能够编译了,咱们就能够深信这些代码能够正确的运转于多线程环境,而不会呈现其他言语中经常呈现的那些难以追踪的 bug,这便是 Rust 的无畏并发fearless concurrency

多线程模型

多线程编程的危险

在大部分现代操作体系中,履行中的程序代码运转于一个 进程(process)中,操作体系则负责办理多个进程。 在程序内部,也能够具有多个一起运转的独立部分,这些独立部分的功用被称为 线程(threads)。

将程序中的计算拆分进多个线程能够改进功能,由于程序能够一起进行多个使命,不过这也会增加复杂性。由于线程是一起运转的,所以无法预先确保不同线程中的代码的履行次序。这会导致比如此类的问题:

1、竞赛状况(Race conditions),多个线程以不一致的次序拜访数据或资源
2、死锁(Deadlocks),两个线程彼此等候对方停止运用其所具有的资源,这会阻止它们持续运转
3、或许会发生在特定情况且难以安稳重现和修正的 bug

编程言语有一些不同的办法来完成线程。许多操作体系供给了创立新线程的 API。这种由编程言语调用操作体系 API 创立线程的模模型有时被称为 1:1,一个 OS 线程对应一个言语线程。

Rust 规范库只供给了 1:1 线程模型完成。

运用 spawn 创立新线程

use std::thread;
use std::time::Duration;
fn main() {
    // 调用 thread::spawn函数并传递一个闭包,创立一个新线程
    let thread = thread::spawn(|| {
        for i in 1..10 {
            println!("this is thread {}", i);
            thread::sleep(Duration::from_millis(1));
        }
    });
    for k in 1..5 {
        println!("this is main {}", k);
        thread::sleep(Duration::from_millis(1));
    }
}
输出:
this is main 1
this is thread 1
this is main 2
this is thread 2
this is main 3
this is thread 3
this is main 4
this is thread 4
this is thread 5

咱们看到主线程履行了5次循环后退出,一起,新线程尽管创立了10次循环,但也履行了5次就退出了,当主线程完毕时,新线程也会完毕,而不论其是否履行完毕。

假如想让新线程履行完毕再履行主线线程,能够运用JoinHandle


use std::thread;
use std::time::Duration;
fn main() {
    // handler 是一个具有一切权的值
    let handler = thread::spawn(||{  
        for i in 1..10 {
            println!("this is thread {}", i);
            thread::sleep(Duration::from_millis(1));
        }
    });
    for k in 1..5 {
        println!("this is main {}", k);
        thread::sleep(Duration::from_millis(1));
    }
    handler.join().unwrap(); // 堵塞主线程退出,直到新线程履行完毕
}
输出:
this is main 1
this is thread 1
this is main 2
this is thread 2
this is main 3
this is thread 3
this is main 4
this is thread 4
this is thread 5
this is thread 6
this is thread 7
this is thread 8
this is thread 9

thread::spawn 的回来值类型是 JoinHandle。JoinHandle 是一个具有一切权的值,当对其调用 join 办法时,它会等候其线程完毕。

经过调用 handle 的 join 会堵塞当时线程直到 handle 所代表的线程完毕。堵塞(Blocking) 线程意味着阻止该线程履行工作或退出。

线程和 move 闭包

经过运用move闭包来完成,把主线程的变量一切权转移到闭包里

use std::thread;
fn main() {
    let v = vec![2,4,5];
    // move 把变量 v 的一切权转移到闭包里
    let thread = thread::spawn( move || {
        println!("v is {:?}", v);
    });
}
输出:
v is [2, 4, 5]

Rust 将变量 v 的一切权移动到新建线程,这样,咱们在主线程就不能再运用变量 v了(比如把变量 v drop掉),Rust 就能够确保变量v在新线程是安全的。

假如没有运用move关键字,编译会报错:

$ cargo run
error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function
 --> src/main.rs:6:32
  |
6 |     let handle = thread::spawn(|| {
  |                                ^^ may outlive borrowed value `v`
7 |         println!("Here's a vector: {:?}", v);
  |                                           - `v` is borrowed here
  |
note: function requires argument type to outlive `'static`

Rust 的一切权规则又一次帮助了咱们!

音讯传递

Rust 中一个完成音讯传递并发的首要东西是 通道(channel),一个 Rust 规范库供给了其完成的编程概念。你能够将其想象为一个水流的通道,比如河流或小溪。假如你将比如橡皮鸭或小舟之类的东西放入其间,它们会顺流而下到达下游。

通道有两部分组成,一个发送者(transmitter)和一个接纳者(receiver),当发送者或接纳者任一被丢弃(dropped) 时能够认为通道被封闭(closed)。

经过规范库std::sync::mpsc来完成,mpsc 是 多个出产者,单个消费者(multiple producer, single consumer)的缩写。

注:根据 Channel 读和写的数量,Channel 能够分为:(图片引证自陈天教师《Rust编程第一课》)

Rust 并发编程-多线程无畏并发

在线程之间传递音讯


use std::thread;
use std::sync::mpsc;
fn main() {
    // 由于历史原因,`tx`和`rx`一般作为发送者(transmitter)和接纳者(receiver)的缩写,
    // 所以这便是咱们将用来绑定这两头变量的姓名。
    // 这儿运用了一个`let`语句和形式来解构了此元组;
    let (tx, rx) = mpsc::channel();
    // 运用`move`将`tx`移动到闭包中这样新建线程就具有`tx`了
    thread::spawn(move || {
        // `send`办法回来一个`Result<T, E>`类型
        tx.send("hello").unwrap();
    });
    // recv()这个办法会堵塞主线程履行直到从信道中接纳一个值
    let msg = rx.recv().unwrap();
    println!("message is {}", msg);
}
输出:
message is hello

通道的接纳端有两个有用的办法:recv 和 try_recv。这儿,咱们运用了 recv,它是 receive 的缩写。这个办法会堵塞主线程履行直到从通道中接纳一个值。一旦发送了一个值,recv 会在一个 Result<T, E> 中回来它。当通道发送端封闭,recv 会回来一个过错表明不会再有新的值到来了。

try_recv 不会堵塞,相反它立刻回来一个 Result<T, E>:Ok 值包含可用的信息,而 Err 值代表此刻没有任何音讯。

由于 try_recv 不会堵塞主线程履行,新线程假如没有履行完毕就无法接纳到音讯,会报编译过错:

use std::thread;
use std::sync::mpsc;
fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        tx.send("hello").unwrap();
    });
    // try_recv 办法不会堵塞主线程履行,无法接纳到音讯,就会报编译过错
    let msg = rx.try_recv().unwrap();
    println!("message is {}", msg);
}
报错:
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Empty', src/libcore/result.rs:1188:5

发送多个值并观察接纳者的等候

use std::thread;
use std::sync::mpsc;
use std::time::Duration;
fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];
        for val in vals {
            // 线程中每次循环独自的发送每一个字符串
            tx.send(val).unwrap();
            // 线程中每次循环调用`thread::sleep`函数来暂停一秒
            thread::sleep(Duration::from_secs(1));
        }
    });
    // 主线程中的`for`循环里并没有任何暂停或等候的代码,所以能够说主线程是在等候从新建线程中接纳值
    for received in rx { // 不再显式调用 rx.recv() 函数:而是将 rx 当作一个迭代器
        println!("Got: {}", received);
    }
}
如下输出,每一行都会暂停一秒:
Got: hi
Got: from
Got: the
Got: thread

经过克隆发送者来创立多个出产者

  use std::thread;
  use std::sync::mpsc; 
  use std::time::Duration;
  fn main() {
    let (tx, rx) = mpsc::channel();
    // 对发送者调用了`clone`办法克隆一个发送者
    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];
        for val in vals {
            // 线程1的发送端句柄运用克隆的tx1,向同一接纳者rx发送值
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];
        for val in vals {
            // 线程2的发送端句柄运用原始的tx,向同一接纳者rx发送值
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    // 两个发送者 tx 和 tx1 发送的值,都由同一个接纳者 rx 接纳
    for received in rx {
        println!("Got: {}", received);
    }
 }
或许会看到这样的输出:(每次会发生不同的输出次序,依赖于你的体系)
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

同享状况

同享状况或数据,意味着有多个线程一起拜访相同的内存方位,Rust 经过互斥器(锁),来完成同享内存并发原语。

互斥器一次只允许一个线程拜访数据

互斥器(mutex)是 「mutual exclusion」 的缩写,也便是说,恣意时刻,其只允许一个线程拜访某些数据。为了拜访互斥器中的数据,线程首要需求经过获取互斥器的锁(lock)来表明其希望拜访数据。锁是一个作为互斥器一部分的数据结构,它记录谁有数据的排他拜访权。

运用规范库std::sync::Mutex运用互斥器:

use std::sync::Mutex;
fn main() {
    let m = Mutex::new(5);
    {
        let mut num = m.lock().unwrap();
        *num = 10; // 从头赋值
        println!("num is {}",num);
    }
    println!("m is {:?}",m);
}
输出:
num is 10
m is Mutex { data: 10 }

运用 lock 办法获取锁,以拜访互斥器中的数据。这个调用会堵塞当时线程,直到咱们具有锁为止。

Mutex 是一个智能指针。更准确的说,lock 调用 回来 一个叫做 MutexGuard 的智能指针。这个智能指针完成了Deref来指向其内部数据;其也供给了一个Drop完成当MutexGuard脱离效果域时自动释放锁。

在线程间同享 Mutex

在多线程之间同享信息,存在多个一切者一起具有一切权,能够运用Arc智能指针来寄存Mutex,Arc 是线程安全的,Rc 是非线程安全的,所以,不能是用 Rc ,Rc 被完成为用于单线程场景。

下面是运用 Arc 包装一个 Mutex 能够完成在多线程之间同享一切权的例子:

use std::sync::{Mutex, Arc};
use std::thread;
fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        // let counter = Rc::clone(&counter) // 这儿运用 Rc 会编译报错,由于 Rc 没有完成 Send 和 Sync trait,不能安全的在线程间传递和同享
        let counter = Arc::clone(&counter); // 将一切权移入线程之前克隆了 Arc 
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Result: {}", *counter.lock().unwrap());
}
输出:
Result: 10

简略总结下:一般Rc<T>/RefCell<T>结合用于单线程内部可变性,Arc<T>/Mutex<T>结合用于多线程内部可变性。

Mutex 也有造成 死锁(deadlock) 的危险。这发生于当一个操作需求锁住两个资源而两个线程各持一个锁,这会造成它们永久彼此等候。

基于 Send 和 Sync 的线程安全

Rust 中与并发相关的内容都属于规范库,而不是言语自身的内容,可是有两个并发概念是内嵌于言语中的:std::marker中的SyncSendtrait。

Send 和 Sync 效果

SendSync是 Rust 安全并发的重中之重,可是实际上它们仅仅符号特征(marker trait,该类型 trait 未定义任何行为,因而非常适合用于符号), 来看看它们的效果:

  • 完成Send的类型能够在线程间安全的传递其一切权
  • 完成Sync的类型能够在线程间安全的同享(经过引证,当且仅当&TSend时,TSync的)

由上可知,若类型 T 的引证&TSend,则TSync

完成SendSync的类型

在 Rust 中,简直一切类型都默许完成了SendSync,意味着一个复合类型(例如结构体), 只需它内部的一切成员都完成了Send或许Sync,那么它就自动完成了SendSync的。,只需有一个成员不是SendSync,该复合类型就不是SendSync的。

简略总结下:

  1. 完成Send的类型能够在线程间安全的传递其一切权, 完成Sync的类型能够在线程间安全的同享(经过引证)
  2. 绝大部分类型都完成了SendSync,常见的未完成的有:裸指针、CellRefCellRc
  3. 能够为自定义类型完成SendSync,可是需求unsafe代码块,有必要小心保护。
  4. 能够为部分 Rust 中的类型完成SendSync,可是需求运用newtype

注:CellRefCell没完成Sync(由于UnsafeCell不是Sync),Rc两者都没完成(由于内部的引证计数器不是线程安全的),裸指针两者都没完成(由于它自身就没有任何安全确保)

注:手动完成SendSync是不安全(unsafe)的,一般并不需求手动完成 Send 和 Sync trait,完成者需求运用unsafe小心保护并发安全确保。

总结

Rust 一起供给了 async/await多线程两种并发模型,要熟练运用多线程并发模型首要要把握 Rust 的线程等相关常识,比如线程创立,线程同步,线程安全等。线程间音讯传递的通道 channel,线程间同享状况的智能指针 Mutex 和 Arc。类型体系和借用查看器会确保这些场景中的代码,不会呈现数据竞赛和无效的引证。一旦代码能够编译了,咱们就能够深信这些代码能够正确的运转于多线程环境,而不会呈现其他言语中经常呈现的那些难以追踪的 bug。SyncSendtrait 的为多线程中数据传递和同享供给安全确保。

  1. 线程模型:多线程并发编程中,需求处理竞态条件,死锁以及一些难以安稳重现和修正的 bug等问题。
  2. 音讯传递(Message passing)并发,其间通道(channel)被用来在线程间传递音讯。
  3. 同享状况(Shared state)并发,结合运用 Mutex 和 Arc,能够让多个线程拜访同一片数据。
  4. 线程安全:Sync 和 Send trait,能够为多线程中传递或同享的数据供给安全保障。

参考

  • kaisery.github.io/trpl-zh-cn/…
  • course.rs/advance/con…
  • nomicon.purewhite.io/send-and-sy…
  • blog.rust-lang.org/2015/04/10/…
  • huonw.github.io/blog/2015/0…
  • www.zhihu.com/question/30…
  • zhuanlan.zhihu.com/p/266562042