Rust - Basic - 15 - Concurrency
Concurrency
Comprehension
Concurrency/parallelism can be implemented via message passing (message-passing) or shared state (shared-state)
Thread
Concurrent code is prone to the following problems:
Race conditions, where threads access data or resources in an inconsistent order
Deadlocks, where two threads each wait for a resource the other holds, preventing both from continuing
Bugs that happen only in certain situations and are hard to reproduce and fix reliably
A green thread is a thread provided at the programming-language level; it is not identical in functionality to the threads the operating system provides
```rust
use std::thread;
use std::time::Duration;

fn main() {
    // If the closure captures variables from the environment,
    // it must take ownership of them explicitly with `move`
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            // sleep gives the OS a chance to schedule other threads
            thread::sleep(Duration::from_millis(1));
        }
    });
    // join blocks until the spawned thread finishes; because it is called
    // here, the main-thread loop below only starts afterwards
    handle.join().unwrap();
    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}
```
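The `move` keyword mentioned above is required whenever the spawned closure captures a value from the enclosing scope. A minimal sketch (the vector name `v` is just an illustration):

```rust
use std::thread;

fn main() {
    let v = vec![1, 2, 3];
    // Without `move`, the closure would only borrow `v`, and the compiler
    // rejects that because the thread may outlive the current function.
    let handle = thread::spawn(move || {
        println!("vector from spawned thread: {:?}", v);
    });
    handle.join().unwrap();
}
```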
Message-passing
Basic channel usage
```rust
use std::sync::mpsc; // multiple producer, single consumer
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel(); // transmitter, receiver
    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap(); // ownership of val moves here
    });
    // recv blocks; try_recv would return immediately instead
    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}
```
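The non-blocking alternative `try_recv` returns immediately with `Ok(msg)` if a message is waiting, or `Err(TryRecvError::Empty)` otherwise, which lets a thread poll the channel while doing other work. A minimal polling sketch:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(10));
        tx.send(String::from("done")).unwrap();
    });
    loop {
        match rx.try_recv() {
            Ok(msg) => {
                println!("Got: {}", msg);
                break;
            }
            // No message yet: free to do other work before polling again
            Err(mpsc::TryRecvError::Empty) => thread::sleep(Duration::from_millis(1)),
            // All transmitters were dropped; no message will ever arrive
            Err(mpsc::TryRecvError::Disconnected) => break,
        }
    }
}
```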
Multiple producers/transmitters and receiving multiple messages
```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    // The first closure below moves ownership of its transmitter,
    // so we clone tx up front as tx1 to get a second producer
    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];
        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    // Unlike the previous example, iterate over rx with a for loop;
    // it ends once every transmitter has been dropped
    for received in rx {
        println!("Got: {}", received);
    }
}
```
Shared-state
Mutex (Mutual Exclusion): at any given moment, only one thread is allowed to access the protected data
A thread must acquire the lock before accessing the data inside a mutex. The lock is part of the mutex; it keeps track of who currently has exclusive access to the data. A mutex is therefore described as guarding the data it holds via the locking system.
Mutexes have a reputation for being difficult to use because you have to remember two rules:
You must attempt to acquire the lock before using the data.
When you’re done with the data that the mutex guards, you must unlock the data so other threads can acquire the lock.
Getting mutex management right is hard, which is why many people prefer channels. However, Rust’s type system and ownership rules guarantee that we cannot get locking and unlocking wrong.
Basic Mutex usage
```rust
use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);
    {
        let mut num = m.lock().unwrap(); // call lock to acquire the lock
        *num = 6;
    }
    println!("m = {:?}", m);
}
```
Mutex<T> is a smart pointer. More precisely, lock returns a LockResult, and the subsequent unwrap returns the smart pointer MutexGuard. MutexGuard implements the Deref trait to point at the inner data, and the Drop trait to release the lock automatically when the MutexGuard goes out of scope. As a result, we don’t risk forgetting to release the lock and blocking the mutex from other threads, because the lock release happens automatically.
Sharing a Mutex between multiple threads
Arc<T> is the Atomic Reference Counted type; it can be understood as Rc<T> with atomic operations
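The automatic unlock via Drop can be seen in a small single-threaded sketch: once the guard leaves its scope, the same thread can lock the mutex again (without the inner scope, the second lock would deadlock, since std::sync::Mutex is not reentrant):

```rust
use std::sync::Mutex;

fn main() {
    let m = Mutex::new(0);
    {
        let mut guard = m.lock().unwrap();
        *guard += 1;
    } // guard dropped here: Drop releases the lock automatically
    // A second lock in the same thread now succeeds
    let guard = m.lock().unwrap();
    println!("m = {}", *guard);
}
```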
```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Result: {}", *counter.lock().unwrap());
}
```
Send & Sync trait
A type that implements the Send trait can have its ownership transferred between threads
A type that implements the Sync trait can be referenced from multiple threads; more precisely, T is Sync if and only if &T is Send
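A sketch of the distinction in practice: Arc<T> is Send (when T is Send + Sync), so clones of it can move into spawned threads, whereas the equivalent code with Rc<T> would not compile because Rc is not Send.

```rust
use std::sync::Arc;
use std::thread;

fn main() {
    let data = Arc::new(vec![1, 2, 3]);
    let mut handles = vec![];
    for _ in 0..3 {
        let data = Arc::clone(&data);
        // Arc<Vec<i32>> is Send, so ownership of the clone can move into
        // the thread. Swapping Arc for Rc fails to compile with
        // "`Rc<Vec<i32>>` cannot be sent between threads safely".
        handles.push(thread::spawn(move || data.iter().sum::<i32>()));
    }
    for handle in handles {
        println!("sum: {}", handle.join().unwrap());
    }
}
```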
Origin
…
Atomic Reference Counting with Arc<T>
Fortunately, Arc<T> is a type like Rc<T> that is safe to use in concurrent situations. The a stands for atomic, meaning it’s an atomically reference counted type. Atomics are an additional kind of concurrency primitive that we won’t cover in detail here: see the standard library documentation for [std::sync::atomic](https://doc.rust-lang.org/std/sync/atomic/index.html) for more details. At this point, you just need to know that atomics work like primitive types but are safe to share across threads.
You might then wonder why all primitive types aren’t atomic and why standard library types aren’t implemented to use Arc<T> by default. The reason is that thread safety comes with a performance penalty that you only want to pay when you really need to. If you’re just performing operations on values within a single thread, your code can run faster if it doesn’t have to enforce the guarantees atomics provide.
Let’s return to our example: Arc<T> and Rc<T> have the same API, so we fix our program by changing the use line, the call to new, and the call to clone. The code in Listing 16-15 will finally compile and run:
…