Rust多线程编程实例分析
第一个例子是共享通道,实现多个线程往一个通道中写数据的效果。这里是4个线程,每个线程往通道写10条数据,接收端接收全部数据的例子。
分析1.1处,因为线程需要取得通道发送端tx的所有权,而不能是引用,此处需要使用into_iter()函数,而不能使用iter(),这样线程闭包拿到的是有所有权tx,并在循环结束后tx生命周期结束。
分析1.2处,接收端为什么刚好收到4*10=40条数据?接收端rx如何判断发送端已经全部发送完毕? for received in rx 会持续从通道中接收数据,直到通道被关闭,通道关闭的条件是:所有关联的mpsc::Sender实例被销毁(Drop)。本例中,各tx的存续范围就是1.1处标记的大括号的范围,退出后,tx自动销毁。因此,4个tx在做完自己的10次循环后自动Drop,rx也就知道通道结束因此停止接收消息。
const THREADS_NUM: u32 = 4;
pub fn shared_channel() {
let (tx, rx) = mpsc::channel();
let mut txes = (1..THREADS_NUM)
.map(|_| tx.clone())
.collect::<Vec<_>>();
txes.push(tx);
let mut threads = vec![];
for (i, tx) in txes.into_iter().enumerate() { // 1.1
let handle = thread::spawn(move || {
for j in 0..10 {
tx.send(format!("From thread-{} number {}", i, j)).unwrap();
let sleeptime = rand::thread_rng().gen_range(50..500);
thread::sleep(Duration::from_millis(sleeptime));
}
});
threads.push(handle);
}
for received in rx { // 1.2
println!("{received}");
}
for t in threads {
t.join().unwrap();
}
}
第二个例子通过共享数据来保持同步。例子中有4个并发线程,每个线程不间断生成1到1000之间的随机数。如果某个线程恰好生成随机数666,那么所有线程都停止工作。并且例子中没有使用互斥锁而是使用原子操作+内存排序实现高效同步。
分析2.1处,此处使用原子类型的bool,原子类型是线程安全的,使用CPU指令级别的原子操作,适合单一类型的跨线程,比如本例中的bool类型;并且AtomicBool::new(bool)比Mutex::new(bool)效率高。但如果类型比较复杂,则仍需使用互斥锁Mutex::new(xyz)。
分析2.2处,Rust多线程禁止在不同线程之间共享所有权,因此需要使用引用计数,确保安全释放。此处对共享数据的引用做clone,引用数+1,离开作用域时引用数-1,使用Arc而不用Rc的原因就是多线程环境引用数的操作必须原子化的。
分析2.3处以及2.4处,现代处理器会重排指令以优化性能,2.3处Ordering::Acquire确保在load之后的操作不会被重排到load之前,2.4处是在操作成功或失败时内存排序保证。
flag_clone.compare_exchange(
false, true,
Ordering::AcqRel, // 成功时的内存排序
Ordering::Relaxed // 失败时的内存排序
)
因此上面这小段代码的含义是,在调用compare_exchange时候,我期待当前的值是false,我把它改成true,使用Ordering::AcqRel,确保数据一致性,这是Success场景,对应match的Ok分支。还有一种情况,我期待当前值是false,结果它是true,这个说明它被其他线程改掉了,这是失败场景,使用Ordering::Relaxed, 因为它什么也不用做,使用仅原子性就可以。
详细说明内存排序类型和性能等的关系:
排序类型 | 性能 | 保证强度 | 适用场景 |
---|---|---|---|
Relaxed | 最快 | 仅原子性 | 独立状态标志 |
Acquire | 中等 | 获取屏障 | 读取后访问共享数据 |
Release | 中等 | 释放屏障 | 写入后设置标志 |
AcqRel | 较慢 | 获取+释放 | 读-改-写操作 |
SeqCst | 最慢 | 全序一致 | 需要严格顺序的场景 |
pub fn shared_var() {
let stop_flag = Arc::new(AtomicBool::new(false)); // 2.1
let mut handles = vec![];
for thread_id in 0..THREADS_NUM {
let flag_clone = Arc::clone(&stop_flag); // 2.2
let handle = thread::spawn(move || {
let mut rng = rand::thread_rng();
loop {
if flag_clone.load(Ordering::Acquire) { // 2.3
break;
}
let num = rng.gen_range(1..=1000);
println!("Thread {} generated number: {}", thread_id, num);
if num == 666 {
match flag_clone.compare_exchange(false, true, // 2.4
Ordering::AcqRel, Ordering::Relaxed) {
Ok(_) => {
println!("Thread {} generated 666. Terminating all threads.", thread_id);
break;
}
Err(_) => break,
}
}
thread::sleep(Duration::from_millis(50));
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
}
第三个例子是多个线程并发读写一个u64;每个线程循环10次,每次生成一个1到10000之间的随机数,如果新数小于共享值,则更新该共享值。使用RwLock来实现读写控制,确保在读写操作时的同步情况。
分析3.1处,这里使用代码块,对数据加读锁后确定need_update的值立刻释放锁,避免加锁太长时间,也避免自己等待自己(自己加读锁,自己又尝试获取写锁)。另外这种写法也颇有scala风格。
分析3.2处,此时在确定需要写之后,获取写锁,并再次判断共享值是否被改变。这个判断是必需的,在其他类似场景中也有,比如Java单例模式中的再次判断。
读写锁并发加锁逻辑如下:
资源锁情况 | 尝试加读锁 | 尝试加写锁 |
---|---|---|
无锁状态 | 进入 | 进入 |
有读锁 | 进入 | 等待 |
有写锁 | 等待 | 等待 |
也就是常说的可以并发读。
pub fn rwlock() {
let result = Arc::new(RwLock::new(u64::MAX));
let mut handles = vec![];
for thread_id in 0..THREADS_NUM {
let result_clone = Arc::clone(&result);
let handle = thread::spawn(move || {
let mut rng = rand::thread_rng();
for _ in 0..10 {
let n = rng.gen_range(1..=10000);
let need_update = { // 3.1
let read_guard = result_clone.read().unwrap();
println!("Thread {} read value: {} and generated value: {}", thread_id, *read_guard, n);
n < *read_guard
};
if need_update {
let mut write_guard = result_clone.write().unwrap();
if n < *write_guard { // 3.2
*write_guard = n;
}
}
thread::sleep(Duration::from_millis(500));
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
let final_result = result.read().unwrap();
println!("Final result: {}", *final_result);
}
另外,观察下面的运行日志,第6行已经产生了更小的数字,但因为可以并发地读,Thread 2还没来得及写,所以第7/8行还是读到了之前的数字。
1 Thread 2 read value: 18446744073709551615 and generated value: 1873
2 Thread 0 read value: 18446744073709551615 and generated value: 8170
3 Thread 1 read value: 18446744073709551615 and generated value: 8528
4 Thread 3 read value: 18446744073709551615 and generated value: 2977
5 Thread 0 read value: 1873 and generated value: 8254
6 Thread 2 read value: 1873 and generated value: 148
7 Thread 1 read value: 1873 and generated value: 3472
8 Thread 3 read value: 1873 and generated value: 7293
9 Thread 3 read value: 148 and generated value: 8101
10 Thread 0 read value: 148 and generated value: 630
11 Thread 1 read value: 148 and generated value: 9032
12 Thread 2 read value: 148 and generated value: 6230
...
第四个例子是通道多接收端。从设计上,std::sync::mpsc就是多发送端+单接收端,比如在第一个例子中你可以clone sender但是不能clone receiver,但可以使用一部分技巧来实现多接收端。下面先看整体程序。
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Sender, Receiver};
use std::thread;
#[derive(Clone)]
pub struct SharedReceiver<T>(Arc<Mutex<Receiver<T>>>); // 4.1
impl<T> Iterator for SharedReceiver<T> {
type Item = T;
fn next(&mut self) -> Option<T> {
let guard = self.0.lock().unwrap(); // 4.2
guard.recv().ok() // 4.3
}
}
pub fn shared_channel<T>() -> (Sender<T>, SharedReceiver<T>) {
let (sender, receiver) = channel();
(sender, SharedReceiver(Arc::new(Mutex::new(receiver))))
}
pub fn test_shared_channel() {
let (sender, receiver) = shared_channel::<String>();
let mut threads = vec![];
let sender_thread = thread::spawn(move || {
for i in 0..20 {
sender.send(format!("number {}", i)).unwrap();
}
});
threads.push(sender_thread);
for i in 0..3 {
let mut rx_iter = receiver.clone(); // 4.4
let rec_thread = thread::spawn(move ||{
let mut count = 0;
while let Some(msg) = rx_iter.next() {
println!{"Thread {} received: {}", i, msg};
count += 1;
}
println!(">> Thread {} total received: {} <<", i, count);
}) ;
threads.push(rec_thread);
}
for t in threads {
t.join().unwrap();
}
}
详细分析这个代码。4.1处,就是对接收端的包装,Mutex互斥锁保证唯一访问,Arc原子引用计数保证多线程间的引用计数和回收。另外,这种struct中写元组的方式是下面写法的简化:
pub struct SharedReceiver<T>(Arc<Mutex<Receiver<T>>>);
// 等价于
pub struct SharedReceiver<T> {
0: Arc<Mutex<Receiver<T>>>
}
因此4.2处 self.0拿到的就是带锁recevier。
4.3处,ok()方法是把Result转为Option,因为guard.recv()返回的是Result而Iterator的next()要求返回Option.
4.4处clone,因为线程要求所有权,索引需要把receiver做clone,又因为它是Arc,所以本质就是把引用计数加了1. 另外需要注意是加mut,因为next方法需要&mut self。
所以,总体功能,1个sender线程发送20条消息到channel,而3个receiver线程处理这20条消息。因为系统每次对线程调度不同,所以每次结果也不尽相同。下面是某次运行结果:
Thread 0 received: number 0
Thread 0 received: number 2
Thread 2 received: number 3
Thread 2 received: number 5
Thread 2 received: number 6
Thread 2 received: number 7
Thread 2 received: number 8
Thread 1 received: number 1
Thread 1 received: number 10
Thread 1 received: number 11
Thread 0 received: number 4
Thread 0 received: number 13
Thread 0 received: number 14
Thread 0 received: number 15
Thread 0 received: number 16
Thread 2 received: number 9
Thread 1 received: number 12
Thread 1 received: number 19
Thread 2 received: number 18
>> Thread 1 total received: 5 <<
Thread 0 received: number 17
>> Thread 0 total received: 8 <<
>> Thread 2 total received: 7 <<