下一站 - Ihcblog!

远方的风景与脚下的路 | 子站点:ihc.im

0%

Rust Runtime 设计与实现-组件篇

本系列文章主要介绍如何设计和实现一个基于 io-uring 的 Thread-per-core 模型的 Runtime。

我们的 Runtime 最终产品 Monoio 现已开源,你可以在 github.com/bytedance/monoio 找到它。

  1. Rust Runtime 设计与实现-科普篇
  2. Rust Runtime 设计与实现-设计篇-Part1
  3. Rust Runtime 设计与实现-设计篇-Part2
  4. Rust Runtime 设计与实现-组件篇

本文是系列的第四篇,前面该讲的设计基本讲完了,这里主要说说 channel 等组件。

Channel in Go and in Rust

在我们熟悉的 Golang 中,所有 Task 一定是 goroutine,通信基本是靠 channel 的。但是在 Rust 中,Future 和 Task 不一定是一个东西,多个 Future 可以通过 select 等嵌套,然后其本质上还是单个 Task,性能上也相比 Golang 的模型要好。

但是即便是如此,在 Rust 中我们还是有很强的使用 channel 的需求。在 thread-per-core 模型下,我们可以使用仅本线程的 channel 来提升性能。

这部分的内容已经开源为 local-sync crate,它并不依赖我们的 Runtime,你可以在合适的场景独立使用它。

https://github.com/monoio-rs/local-sync

MPSC Channel

MPSC = Multiple Producer Single Consumer。

我们的 mpsc channel 需要提供两种模式,一个是 bounded 一个是 unbounded。

在 bounded 模式下,我们的 send 可能会 await,因为可能容量不够写入,需要等消费。

在 unbounded 模式下,send 不需要等待。因为不限制存储空间,所以即便消费端卡住,我们在空间不够时也可以随意地开辟新的空间来存储。

当然,因为 channel 里可能没有数据,所以两种模式下的 recv 都是异步接口。

最终实现效果:

1
2
3
4
5
6
7
8
9
10
#[monoio::test]
async fn tets_unbounded_channel() {
let (tx, mut rx) = channel();
tx.send(1).unwrap();
assert_eq!(rx.recv().await.unwrap(), 1);


drop(tx);
assert_eq!(rx.recv().await, None);
}

存储数据结构

我们需要先设计底层存储数据结构。为了能够低成本地扩容,并且不产生大量内存申请和释放,我们设计出了两种方案可以选择。

链表 + 固定大小 Block

storage1

这种设计下每个 block 大小一致,block 之间使用单向链表维护。

读数据时从 head 的 begin 位置读,写数据时向 tail 的 end 位置写。当一块数据被度完时,它会被插入到 tail 之后继而可以循环使用,避免重复释放和分配。
这样当缓存数据量增多时会付出 数据峰值 / BLOCK_SIZE 次的内存分配。

一个额外的好处是,我们可以全局缓存 Block 继而进一步优化性能。

链表 + 指数扩大 Block

storage1

这个方案也是使用链表维护 blocks,理解起来可以认为平滑扩容,即扩容时仅写入新的 ring,旧 ring 在消费完时再释放。

每次 ring 扩容即可扩容为原有的两倍大小,这样可以在峰值流量下只需要付出 log(数据峰值) 次内存分配。

结合实现复杂度,我们最后决定采用方案1:链表 + 固定大小 Block。Tokio 的 mpsc 也是这种结构,不过它这么做更多的是为了无锁并发考虑。

多 Block 通过单向链表组成 Queue,Queue 提供 push 和 pop 能力(无大小限制)。

Queue 在 push 和 pop 时会保证指针指向内容的合法性,在适当的时机会移动 block 指针。

等待机制

我们的等待需要两种:

  1. Bounded 实现下,发送者需要等待空槽
  2. 接收者需要等待数据

第二种等待非常易与实现:因为 mpsc 本身就只有一个 consumer,当它需要等待数据时,只需要将自己对应 waker 设置在 channel 中,当有数据写入时,会额外检查这个 waker 并 wake 它。

第一种等待的实现略复杂,复杂度在于可以有一堆等待者。并且为了避免饥饿,还需要先到先得地唤醒。那么显然这个结构可以直接利用 VecDeque 或链表存储。当等待者被 drop 的时候,还需要将自己摘掉,这样一来就只有类似 Slab 或者链表可以使用了。

我们这里参考 Tokio 的做法,使用链表存储。将这个结构抽象为 Semaphore,既可以满足 mpsc 之用,又可以单独暴露给用户使用,还可以拿来实现其他同步结构。

分层抽象

layers

前面有了 Block,我们在 Block 的基础上构建了与异步无关的同步存储 Queue,它是无限容量的。

前面也提到有两种等待,但发送方的等待只有在 bounded 的情况下使用。

所以我们在 Queue 的基础上抽象出一个底层结构 Chan,提供大家的共用的逻辑,只实现第二种等待:

  1. 暴露 recv 接口,允许等待。
  2. 暴露同步 send 接口,上层调用者需要自己维护容量限制和 sender 等待。

Chan 的基础上我们要封装出 bounded 和 unbounded 两种实现。unbounded 下这个约等于原样转发,因为我们的 Chan 本身就可以理解为一个 unbounded channel。

对于 bounded channel,我们需要实现容量限制和发送方的唤醒机制。这部分就要引入前一小节提到的 semaphore。我们利用 semaphore 管理和等待空槽数量(同时还顺便拿来管理接收者状态)。

所以我们在 Chan 层面集成 semaphore。由于上层需求不同,所以这里 Semaphore 其实是个 trait。对于 bounded channel,我们直接使用 Semaphore 实现;对于 unbounded,我们可以实现一个空的 Semaphore。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
pub trait Semaphore {
fn add_permits(&self, n: usize);
fn close(&self);
fn is_closed(&self) -> bool;
}


impl Semaphore for crate::semaphore::Inner {
fn add_permits(&self, n: usize) {
self.release(n);
}


fn close(&self) {
crate::semaphore::Inner::close(self);
}


fn is_closed(&self) -> bool {
crate::semaphore::Inner::is_closed(self)
}
}


pub struct Unlimited {
closed: UnsafeCell<bool>,
}


impl Unlimited {
pub fn new() -> Self {
Self{
closed: UnsafeCell::new(false),
}
}
}


impl Semaphore for Unlimited {
fn add_permits(&self, _: usize) {}


fn close(&self) {
unsafe {
*self.closed.get() = true;
}
}


fn is_closed(&self) -> bool {
unsafe { *self.closed.get() }
}
}

Oneshot Channel

TODO

Once Cell

TODO

Semaphore

TODO

Q & A

Q:都 thread local 了为啥要搞同步能力?

A:并发 != 并行,所以即便是单线程,也会同时执行多个 task,而 task 之间需要通信,有依赖关系,所以需要一个组件来支持它们异步等待。

Q:为啥不用开源库?

A:目前并没有基本能用的同类开源组件。目前主流 Rust Runtime 是 Tokio,非 thread per core 模型,所以使用自带跨线程同步能力的数据结构。

有一个还算可用的是 local-channel 但是其实现上一来性能不佳(使用 Vec 存储,扩容时会产生数据拷贝),二来功能上有所欠缺(只有 unbounded mpsc 实现,无法做 back pressure 等),所以我们在考虑之后决定自己造轮子。

结语

本系列文章到此结束。文章里难免会有一些笔误或者我理解上的错误。如果你想与我讨论,可以直接邮件我(在关于页有联系方式);如果是 Monoio 相关相关的问题,也可以直接在 Github 上提 Issue 或 Discussion。

Monoio 目前仍处于非常不完善的阶段,期待你的贡献:)

另外,我们还搭建了一个国内的 crates.io 和 rustup 的镜像,欢迎使用 RsProxy

如果你想转载本博客的文章,麻烦注明出处谢谢。

欢迎关注我的其它发布渠道