tokio编程:在多任务之间操作同一片数据

你好,我是Mike。今天我们一起来学习如何在tokio的多个任务之间共享同一片数据。

并发任务之间如何共享数据是一个非常重要的课题,在所有语言中都会碰到。不同的语言提供的方案支持不尽相同,比如 Erlang 语言默认只提供消息模型,Golang 也推荐使用 channel 来在并发任务之间进行同步。

Rust语言考虑到其应用领域的广泛性和多样性,提供了多种机制来达到这一目的,需要我们根据不同的场景自行选择最合适的机制。所以相对来说,Rust在这方面要学的知识点要多一些,好处是它在几乎所有场景中都能做到最好。

任务目标

定义一个内存数据库db,在不同的子任务中,并发地向这个内存数据库更新数据。

潜在问题

为了简化问题,我们把 Vec<u32> 当作db。比如这个db中现在有10个数据。

let mut db: Vec<u32> = vec![1,2,3,4,5,6,7,8,9,10];

现在有两个任务 task_a 和 task_b,它们都想更新db里的第5个元素的数据 db[4]。

task_a 想把它更新成 50,task_b 想把它更新成 100。这两个任务之间是没有协同机制的,也就是互相不知道对方的存在,更不知道对方要干嘛。于是就可能出现这样的情况,两个任务几乎同时发起更新请求,假如 task_a 领先一点点时间,先把 db[4] 更新成 50 了,但是它得校验一下更新正确了没有,所以它得发起一个索引请求,把 db[4] 的数据取出来看看是不是 50。

但是在task_a去检查 db[4] 的值之前极小的一个时间片段里面,task_b 对db[4]更新操作也发生了,于是db[4] 被更新成了 100。然后 task_a 取回值之后,发现值是 100。很奇怪,并且判断自己没有更新成功这个数据,有可能会再更新一次,再次把 db[4] 置为 50。这又可能对 task_b 的校验机制造成干扰。于是整个系统就开始紊乱了。

这就是多个任务操作共享数据可能会发生的问题。下面我们看看在Rust中怎么去解决这个问题。

方案尝试

我们可以先从最简单的思路开始,设计如下方案。

方案一:全局变量

如果你有其他语言编程经验的话,应该很容易就能想到一个方案,就是利用全局变量来实现多个任务的数据共享。假如我们有一个全局变量 DB,为 Vec<u32> 类型,每个任务都可以访问到这个DB,并可以向里面push数据。

我们先来试试直接在main函数里对这个DB全局变量进行操作。

static DB: Vec<u32> = Vec::new();

fn main() {
    DB.push(10);
}

发现不行,编译会报错:

error[E0596]: cannot borrow immutable static item `DB` as mutable
 --> src/main.rs:4:5
  |
4 |     DB.push(10);
  |     ^^^^^^^^^^^ cannot borrow as mutable

可能是没加 mut?加上试一试。

static mut DB: Vec<u32> = Vec::new();

fn main() {
    DB.push(10);
}

还是出错:

error[E0133]: use of mutable static is unsafe and requires unsafe function or block
 --> src/main.rs:4:5
  |
4 |     DB.push(10);
  |     ^^^^^^^^^^^ use of mutable static
  |
  = note: mutable statics can be mutated by multiple threads: aliasing violations or data races will cause undefined behavior

Rust编译器报错说,要使用可变的静态全局变量是不安全的,需要用到unsafe功能。unsafe功能我们还没讲,而且不是这节课的重点,所以这里不展开。总之, Rust不推荐我们使用全局(静态)变量。因为全局变量不是一个好的方案,特别是对于多任务并发程序来说,应该尽可能避免。

那既然全局变量不能用了,有一个可行的选择是,在 main 函数中创建一个对象实例,把这个实例传给各个任务。因为这个实例是在main函数中创建的,它的生命期跟main函数一样长,所以也相当于全局变量了。

方案二:main函数中的对象

稍微改一下上面的代码,这下可以了。

fn main() {
    let mut db: Vec<u32> = vec![1,2,3,4,5,6,7,8,9,10];
    db[4] = 50;
}

下面我们尝试在main函数中创建一个tokio任务。

#[tokio::main]
async fn main() {
    let mut db: Vec<u32> = vec![1,2,3,4,5,6,7,8,9,10];

    let task_a = tokio::task::spawn(async {
        db[4] = 50;
    });
    _ = task_a.await.unwrap();

    println!("{:?}", db);
}

这是一段稀松平常的代码,目的就是起一个task,更新一下Vec里的元素,然后等待这个任务结束,打印这个Vec的值。但是,在Rust中,这段代码无法通过,Rust编译器会报错。

error[E0373]: async block may outlive the current function, but it borrows `db`, which is owned by the current function
 --> src/main.rs:5:37
  |
5 |       let task_a = tokio::task::spawn(async {
  |  _____________________________________^
6 | |         db[4] = 50;
  | |         -- `db` is borrowed here
7 | |     });
  | |_____^ may outlive borrowed value `db`
  |
  = note: async blocks are not executed immediately and must either take a reference or ownership of outside variables they use
help: to force the async block to take ownership of `db` (and any other referenced variables), use the `move` keyword
  |
5 |     let task_a = tokio::task::spawn(async move {
  |                                           ++++

我们来分析一下这个错误提示,错误提示第1行说,任何异步块都有可能超出当前函数的生存期,但是它里面依赖的db是在当前函数定义的,因此在异步块执行的时候,db指向的对象有可能会消失,从而出错。你可能已经猜到了,错误原因跟所有权相关。原因是这个task_a运行的生存时间段,有可能超过main task生存的时间段,所以task_a里的async块中直接借用main函数中的局部变量db会有所有权相关风险。

不过这里还有一点疑问,我们不是在main中手动 .await 了吗?会等待这个子task的返回结果,但是Rust并没有分析到这一点。它可能会觉得你在 task_a.await 这一句之前其实是有机会将db给弄消失的,比如手动 drop() 掉这个db,或者说调用了什么别的函数,把db的所有权在那里给消耗了。

它在错误信息第12行建议我们在 async 后加 move 修饰符,这样指明强制将 db 的所有权移动进task_里去。我们按照建议修改一下。

#[tokio::main]
async fn main() {
    let mut db: Vec<u32> = vec![1,2,3,4,5,6,7,8,9,10];

    let task_a = tokio::task::spawn(async move {
        db[4] = 50;
    });
    _ = task_a.await.unwrap();

    println!("{:?}", db);
}

仍然编译出错,报错信息换了。

error[E0382]: borrow of moved value: `db`
  --> src/main.rs:10:22
   |
3  |       let mut db: Vec<u32> = vec![1,2,3,4,5,6,7,8,9,10];
   |           ------ move occurs because `db` has type `Vec<u32>`, which does not implement the `Copy` trait
4  |
5  |       let task_a = tokio::task::spawn(async move {
   |  _____________________________________-
6  | |         db[4] = 50;
   | |         -- variable moved due to use in generator
7  | |     });
   | |_____- value moved here
...
10 |       println!("{:?}", db);
   |                        ^^ value borrowed here after move

它说,db已经被移动到了task_a里了,所以在main函数中访问不到。这种严苛性,对于从其他语言过来的新手来说,确实令人崩溃。不过反过来想想Rust确实把细节抠得很死,这也是它比其他语言安全的原因。

那么我们就听小助手的话,不在main函数中打印了。这样确实能编译通过。

#[tokio::main]
async fn main() {
    let mut db: Vec<u32> = vec![1,2,3,4,5,6,7,8,9,10];

    let task_a = tokio::task::spawn(async move {
        db[4] = 50;
    });
    _ = task_a.await.unwrap();
}

第一步走通了,下一步我们要测试多任务并发的情况,所以我们需要再增加一个任务。

use tokio::task;

#[tokio::main]
async fn main() {
    let mut db: Vec<u32> = vec![1,2,3,4,5,6,7,8,9,10];

    let task_a = task::spawn(async move {
        db[4] = 50;
    });
    let task_b = task::spawn(async move {
        db[4] = 100;
    });
    _ = task_a.await.unwrap();
    _ = task_b.await.unwrap();
}

这种写法明显会有问题,我们甚至不需要编译就可以知道,因为出现了两次 async move 块。如果你是一步步学过来的话,应该知道,db在第一个async move时已经被移动进 task_a 了,后面不可能再移动进 task_b。

编译验证后,确实如此。

error[E0382]: use of moved value: `db`
  --> src/main.rs:10:30
   |
5  |       let mut db: Vec<u32> = vec![1,2,3,4,5,6,7,8,9,10];
   |           ------ move occurs because `db` has type `Vec<u32>`, which does not implement the `Copy` trait
6  |
7  |       let task_a = task::spawn(async move {
   |  ______________________________-
8  | |         db[4] = 50;
   | |         -- variable moved due to use in generator
9  | |     });
   | |_____- value moved here
10 |       let task_b = task::spawn(async move {
   |  ______________________________^
11 | |         db[4] = 100;
   | |         -- use occurs due to use in generator
12 | |     });
   | |_____^ value used here after move

那应该怎么办呢?

方案三:利用 Arc

回想一下 第 12 讲 我们讲到过的Arc这个智能指针,它可以让多个持有者共享对同一资源的所有权。但是Arc也有一个巨大的限制,就是它无法修改被包裹的值。但不管怎样,我们还是碰碰运气,改动一下。

use std::sync::Arc;

#[tokio::main]
async fn main() {
    let mut db: Vec<u32> = vec![1,2,3,4,5,6,7,8,9,10];
    let arc_db = Arc::new(db);
    let arc_db2 = arc_db.clone();

    let task_a = tokio::task::spawn(async move {
        arc_db[4] = 50;
    });
    let task_b = tokio::task::spawn(async move {
        arc_db2[4] = 100;
    });
    _ = task_a.await.unwrap();
    _ = task_b.await.unwrap();

    // println!("{:?}", db);
}

不出所料,Rust编译不通过,这说明通过 Arc<T> 没办法修改里面的值。

error[E0596]: cannot borrow data in an `Arc` as mutable
  --> src/main.rs:10:9
   |
10 |         arc_db[4] = 50;
   |         ^^^^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<Vec<u32>>`

error[E0596]: cannot borrow data in an `Arc` as mutable
  --> src/main.rs:13:9
   |
13 |         arc_db2[4] = 100;
   |         ^^^^^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<Vec<u32>>`

虽然现在我们的代码还是没有编译通过,但是思路是没问题的:要在并发的多个任务中,访问同一个资源,那么必然涉及到多所有权,所以使用Arc是完全没有问题的。现在的问题是, 有没有办法更改被Arc包裹起来的值

答案是有的,利用 Mutex 就可以。

方案四:Arc+Mutex

一个多任务并发编程要修改同一个值,那必然要防止修改冲突,这就不得不用到计算机领域里一个常见的工具——锁。

Mutex是一种互斥锁,被Mutex包裹住的对象,同时只能存在一个reader或一个writer。使用的时候,要先获得Mutex锁,成功后,才能读或写这个锁里面的值。多个任务不能同时获得同一个Mutex锁,当一个任务持有Mutex锁时,其他任务会处于等待状态,直到那个任务用完了Mutex锁,并自动释放了它。

use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let db: Vec<u32> = vec![1,2,3,4,5,6,7,8,9,10];
    let arc_db = Arc::new(Mutex::new(db));  // 加锁
    let arc_db2 = arc_db.clone();
    let arc_db3 = arc_db.clone();

    let task_a = tokio::task::spawn(async move {
        let mut db = arc_db.lock().await;  // 获取锁
        db[4] = 50;
        assert_eq!(db[4], 50);             // 校验值
    });
    let task_b = tokio::task::spawn(async move {
        let mut db = arc_db2.lock().await;  // 获取锁
        db[4] = 100;
        assert_eq!(db[4], 100);            // 校验值
    });
    _ = task_a.await.unwrap();
    _ = task_b.await.unwrap();

    println!("{:?}", arc_db3.lock().await);  // 获取锁
}
// 输出
[1, 2, 3, 4, 100, 6, 7, 8, 9, 10]

加上Mutex,这个例子就能顺利编译并运行通过了。

这个例子里的第7行,我们使用 Arc::new(Mutex::new()) 组合把db包了两层,外层是Arc,里层是Mutex。然后,我们把arc_db克隆了两次,这种克隆只是增加Arc的引用计数,代价非常低。

然后在每次使用的时候,先通过 arc_db.lock().await 这种方式获得锁,再等待取出锁中对象的引用,这里也就是Vec的引用,然后通过这个引用去更新db的值。

利用Arc与Mutex的组合,我们还顺便解决了在main task不能打印这个db的问题。实际上,在Rust中, Arc<Mutex<T>> 是一对很常见的组合,利用它们的组合技术,基本上可以满足绝大部分的并发编程场景。

有了这两兄弟的加持,我们用Rust写业务代码就变得像Java一样高效、便捷。相对于Go、Python或JavaScript来说,Rust的异步并发编程代码稍微有些繁琐,但是它的模式是非常固定的,最后这个示例里的模式可以无脑使用。正因为如此,Arc、Mutex和clone() 一起,被社区叫做“Rust三板斧”,就是因为它们简单粗暴,方便好用。

到这里为止,我们已经解决了这节课开头提出的问题。

其他锁

除了Mutex,tokio里还提供了一些锁,我们来看一看。

tokio::sync::RwLock

RwLock是读写锁。它和Mutex的区别是,Mutex不论是读还是写,同时只有一个能拿到锁。比如,一个task在读,而另一个task也想读的时候,仍然需要等待第一个task先释放锁。所以在读比较多的情况下,Mutex的运行效率不是太理想。

而RwLock的设计是,当一个任务拿的是读锁时,其他任务也能再拿到读锁,多个读锁之间可以同时存在。当一个任务想拿写锁的时候,必须等待其他所有读锁或写锁释放后才能拿到。

当一个任务拿到了写锁时,其他任务只能等待它完成后才能继续操作,不管其他任务是要写还是读。因此对于写来讲,RwLock是排斥型访问;对于读来讲,RwLock提供了共享访问。这一点与不可变引用和可变引用的关系特别像。

我们来看下面的示例:

use tokio::sync::RwLock;
#[tokio::main]
async fn main() {
    let lock = RwLock::new(5);
    // 多个读锁可以同时存在
    {
        let r1 = lock.read().await;
        let r2 = lock.read().await;
        assert_eq!(*r1, 5);
        assert_eq!(*r2, 5);
    } // 在这一句结束时,两个读锁都释放掉了

    // 同时只能存在一个写锁
    {
        let mut w = lock.write().await;
        *w += 1;
        assert_eq!(*w, 6);
    } // 在这一句结束时,写锁释放掉了
}

可以看到,RwLock的使用非常简单,在读操作比写操作多很多的情况下,RwLock的性能会比Mutex好很多。

Rust标准库中还有一些用于简单类型的原子锁。

std::sync::atomic

如果共享数据的只是一些简单的类型,比如 bool、i32、u8、usize等等,就不需要使用Mutex或RwLock把这些类型包起来,比如像这样 Arc<Mutex<u32>>,可以直接用Rust标准库里提供的原子类型。 std::sync::atomic 这个模块下面提供了很多原子类型,比如AtomicBool、AtomicI8、AtomicI16等等。

Mutex<u32> 对应的原子类型是 std::sync::atomic::AtomicU32

像下面这样使用:

use std::sync::atomic::AtomicU32;

fn main() {
    // 创建
    let atomic_forty_two = AtomicU32::new(42);
    let arc_data = Arc::new(atomic_forty_two);

    let mut some_var = AtomicU32::new(10);
    // 更新
    *some_var.get_mut() = 5;
    assert_eq!(*some_var.get_mut(), 5);
}

其他类型按类似的方式使用就可以了。原子类型之所以会单独提出来,是因为它是锁的基础,其他的锁会建立在这些基础原子类型之上,这些原子类型也可以充分利用硬件提供的关于原子操作的支持,从而提高应用的性能。

注:在多核CPU中,常见的硬件支持原子操作的方法包括CPU中的缓存一致性协议、总线锁定等机制,可以使多个线程或进程同时对同一变量进行原子操作时,不会出现数据竞争和线程同步的问题。

锁(lock)和无锁(lock-free)是计算机科学领域一个非常大的课题,Rust有本书 《Rust Atomics and Locks》 专门讲这个,有兴趣的话你可以看一看。

小结

这节课我们通过一步步验证的方式,学习了在Rust和tokio中如何在多个任务中操作共享数据,经过多次被编译器拒绝的痛苦,最后我们得到了一个相当舒服的方案。这个方案可以供我们以后在做并发编程时使用,使用时的模式非常固定,没有什么心智负担。整个探索过程虽然比较辛苦,但是结果却是比较美好的。这也许就是Rust的学习过程吧,先苦后甜。

在整个探索过程中,我们也能深切体会Rust所有权模型在并发场景下发挥的重要作用。如果你想让程序编译通过,那么必须严格遵守Rust的所有权模型;一旦你在Rust的所有权指导下捣鼓出了并发代码,那么 你的并发代码就一定不会产生由于竞争条件而导致的概率性Bug

如果你有这方面的经验教训,你一定会特别憎恶这种概率性的Bug,因为有可能仅仅是重现Bug现场就要花一个月的时间。同时,你会爱上Rust,因为它从语言层面就帮我们杜绝了这种情况,让我们的线上服务特别稳定,晚上可以安心睡觉了。

思考题

这节课代码里下面这两句的意义是什么,第一行会阻塞第二句吗?

    _ = task_a.await.unwrap();
    _ = task_b.await.unwrap();

希望你可以开动脑筋,认真思考,把你的答案分享到评论区,也欢迎你把这节课的内容分享给其他朋友,邀他们一起学习Rust。好了,我们下节课再见吧!