Rust - Basic - 15 - Concurrency

Concurrency

Comprehension

Concurrency/parallelism can be implemented with either message passing (Message-passing) or shared state (Shared-state)

Thread

Concurrent code must watch out for the following issues

  • Race conditions

    Race conditions, where threads are accessing data or resources in an inconsistent order

  • Deadlocks

    Deadlocks, where two threads are waiting for each other to finish using a resource the other thread has, preventing both threads from continuing

  • Hard-to-reproduce bugs

    Bugs that happen only in certain situations and are hard to reproduce and fix reliably

Green threads are language-level threads whose capabilities are not identical to the threads provided by the operating system. (Rust's standard library uses a 1:1 model: each spawned thread maps to one OS thread.)

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {  // if the closure captures variables from the environment, it must be declared with `move`
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));  // gives the OS a chance to schedule another thread
        }
    });

    handle.join().unwrap();  // join blocks until the spawned thread finishes; without it, main may exit first and cut the spawned thread short

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}
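
The `move` keyword mentioned in the comment above can be sketched in a minimal example (the helper name `sum_on_thread` is invented for illustration): ownership of the vector is transferred into the spawned thread's closure.

```rust
use std::thread;

// Hypothetical helper: sum a vector on another thread.
// `move` transfers ownership of `v` into the closure; without it, the
// closure would only borrow `v`, which the compiler rejects because the
// spawned thread may outlive the caller's stack frame.
fn sum_on_thread(v: Vec<i32>) -> i32 {
    let handle = thread::spawn(move || v.iter().sum());
    handle.join().unwrap()
}

fn main() {
    println!("{}", sum_on_thread(vec![1, 2, 3]));  // prints 6
}
```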

Message-passing

  • Basic channel usage

    use std::sync::mpsc;  // multiple producer, single consumer
    use std::thread;
    
    fn main() {
        let (tx, rx) = mpsc::channel();  // transmitter, receiver
    
        thread::spawn(move || {
            let val = String::from("hi");
            tx.send(val).unwrap();  // ownership of val moves into the channel here
        });
    
        let received = rx.recv().unwrap();  // recv blocks; try_recv would return immediately
        println!("Got: {}", received);
    }
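
The non-blocking variant mentioned in the comment above can be sketched like this (the helper name `poll_until_message` is invented for illustration): `try_recv` returns immediately with `Ok`, `Empty`, or `Disconnected`, so the thread can do other work between polls.

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

// Hypothetical helper: poll the channel without blocking.
fn poll_until_message(rx: &mpsc::Receiver<String>) -> String {
    loop {
        match rx.try_recv() {
            Ok(msg) => return msg,  // a message was waiting
            Err(TryRecvError::Empty) => {
                // Nothing yet: a real program would do other work here.
                thread::sleep(Duration::from_millis(1));
            }
            Err(TryRecvError::Disconnected) => panic!("all senders dropped"),
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(10));
        tx.send(String::from("hi")).unwrap();
    });
    println!("Got: {}", poll_until_message(&rx));
}
```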
    
  • Multiple producers/transmitters and receiving multiple messages

    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;
    
    fn main() {
        let (tx, rx) = mpsc::channel();
    
        // The closures below take ownership of their transmitter,
        // so clone tx up front to get a second producer.
        let tx1 = tx.clone();
        thread::spawn(move || {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("thread"),
            ];
    
            for val in vals {
                tx1.send(val).unwrap();
                thread::sleep(Duration::from_secs(1));
            }
        });
    
        thread::spawn(move || {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];
    
            for val in vals {
                tx.send(val).unwrap();
                thread::sleep(Duration::from_secs(1));
            }
        });
    
        // Unlike the previous example, iterate over rx directly;
        // the loop ends once every transmitter has been dropped.
        for received in rx {
            println!("Got: {}", received);
        }
    }
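
The pattern above — clone the transmitter once per producer, then iterate over the receiver — condenses into a short sketch (the function name `gather_squares` is made up for this example). The key point is that the loop over `rx` only terminates once every transmitter, including the original, has been dropped.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: fan work out to n threads and collect the results.
fn gather_squares(n: u64) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    for i in 0..n {
        let tx = tx.clone();  // one producer per thread
        thread::spawn(move || tx.send(i * i).unwrap());
    }
    drop(tx);  // drop the original transmitter, or the iterator below never ends
    let mut results: Vec<u64> = rx.into_iter().collect();
    results.sort();  // arrival order is nondeterministic
    results
}

fn main() {
    println!("{:?}", gather_squares(4));  // [0, 1, 4, 9]
}
```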
    

Shared-state

A mutex (mutual exclusion) allows only one thread to access a given piece of data at any one time.

A thread must acquire the mutex's lock before it can access the data inside. The lock is part of the mutex and keeps track of who currently has exclusive access to the data; hence a mutex is described as guarding its data via the locking system.

Mutexes have a reputation for being difficult to use because you have to remember two rules:

  • You must attempt to acquire the lock before using the data.

  • When you’re done with the data that the mutex guards, you must unlock the data so other threads can acquire the lock.

Mutex management is hard to get right, which is why many people prefer channels. Rust's type system and ownership rules, however, guarantee that we cannot acquire or release a lock incorrectly.

  • Basic Mutex usage

    use std::sync::Mutex;
    
    fn main() {
        let m = Mutex::new(5);
    
        {
        let mut num = m.lock().unwrap();  // call lock to acquire the lock (blocks until it is available)
            *num = 6;
        }
    
        println!("m = {:?}", m);
    }
    

    Mutex<T> provides a smart pointer; more precisely, lock returns a LockResult, and unwrapping it yields the smart pointer MutexGuard. MutexGuard implements the Deref trait to point at the inner data, and the Drop trait to release the lock automatically when the guard goes out of scope. As a result, we don't risk forgetting to release the lock and blocking the mutex from other threads, because the release happens automatically.
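
The automatic unlock described above can be seen in a small sketch (the function name `bump` is hypothetical): the guard releases the lock the moment it goes out of scope.

```rust
use std::sync::Mutex;

// Hypothetical helper: increment the value behind a mutex.
fn bump(m: &Mutex<i32>) {
    let mut guard = m.lock().unwrap();  // guard: MutexGuard<i32>
    *guard += 1;
}  // guard goes out of scope here -> Drop runs -> lock is released

fn main() {
    let m = Mutex::new(0);
    bump(&m);
    bump(&m);  // would deadlock if the first call had not released the lock
    assert_eq!(*m.lock().unwrap(), 2);
}
```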

  • Sharing a Mutex between multiple threads

    Arc<T> (Atomically Reference Counted) can be thought of as an Rc<T> whose reference counting is atomic.

    use std::sync::{Arc, Mutex};
    use std::thread;
    
    fn main() {
        let counter = Arc::new(Mutex::new(0));
        let mut handles = vec![];
    
        for _ in 0..10 {
        let counter = Arc::clone(&counter);  // each thread gets its own Arc handle; the count is bumped atomically
            let handle = thread::spawn(move || {
                let mut num = counter.lock().unwrap();
                *num += 1;
            });
            handles.push(handle);
        }
    
        for handle in handles {
            handle.join().unwrap();
        }
    
        println!("Result: {}", *counter.lock().unwrap());
    }
    

Send & Sync trait

A type that implements the Send trait can have its ownership transferred between threads.

A type that implements the Sync trait can be referenced from multiple threads simultaneously.

More precisely, the two are related: T is Sync if and only if &T is Send, i.e. sharing T across threads amounts to sending references to it.
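
These are marker traits, so they can be checked at compile time with empty generic functions, a common trick (the function names here are ad hoc):

```rust
use std::sync::{Arc, Mutex};

fn assert_send<T: Send>() {}  // compiles only if T: Send
fn assert_sync<T: Sync>() {}  // compiles only if T: Sync

fn main() {
    assert_send::<Arc<i32>>();
    assert_sync::<Mutex<i32>>();
    // assert_send::<std::rc::Rc<i32>>();     // would NOT compile: Rc<T> is not Send
    // assert_sync::<std::cell::Cell<i32>>(); // would NOT compile: Cell<T> is not Sync
    println!("all checks passed");
}
```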

Origin

https://doc.rust-lang.org/book/ch16-00-concurrency.html

Atomic Reference Counting with Arc<T>

Fortunately, Arc<T> is a type like Rc<T> that is safe to use in concurrent situations. The a stands for atomic, meaning it’s an atomically reference counted type. Atomics are an additional kind of concurrency primitive that we won’t cover in detail here: see the standard library documentation for [std::sync::atomic](https://doc.rust-lang.org/std/sync/atomic/index.html) for more details. At this point, you just need to know that atomics work like primitive types but are safe to share across threads.

You might then wonder why all primitive types aren’t atomic and why standard library types aren’t implemented to use Arc<T> by default. The reason is that thread safety comes with a performance penalty that you only want to pay when you really need to. If you’re just performing operations on values within a single thread, your code can run faster if it doesn’t have to enforce the guarantees atomics provide.
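
As a sketch of what the excerpt means by atomics working like primitive types: the counter from the Arc<Mutex> example above can also be written with std::sync::atomic::AtomicUsize, avoiding the lock entirely (the function name `parallel_count` is invented for this example).

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical helper: increment a shared atomic counter from n threads.
fn parallel_count(n_threads: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];
    for _ in 0..n_threads {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            counter.fetch_add(1, Ordering::SeqCst);  // atomic read-modify-write
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    counter.load(Ordering::SeqCst)
}

fn main() {
    println!("Result: {}", parallel_count(10));  // Result: 10
}
```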

Let’s return to our example: Arc<T> and Rc<T> have the same API, so we fix our program by changing the use line, the call to new, and the call to clone. The code in Listing 16-15 will finally compile and run; it is the Arc<T>/Mutex<T> counter example shown above.