下一站 - Ihcblog!

远方的风景与脚下的路 | 子站点:ihc.im

0%

Rust Runtime 设计与实现-IO兼容篇

本系列文章主要介绍如何设计和实现一个基于 io-uring 的 Thread-per-core 模型的 Runtime。

我们的 Runtime 最终产品 Monoio 现已开源,你可以在 github.com/bytedance/monoio 找到它。

  1. Rust Runtime 设计与实现-科普篇
  2. Rust Runtime 设计与实现-设计篇-Part1
  3. Rust Runtime 设计与实现-设计篇-Part2
  4. Rust Runtime 设计与实现-组件篇
  5. Rust Runtime 设计与实现-IO兼容篇

本文是系列的第五篇。本来写四篇已经结束的,最近增加了 epoll 支持(!73),干脆写一下这块的设计吧。

Monoio 之前仅支持 io_uring,但无论是公司外分发二进制应用,还是公司内大规模发布,都会对内核版本有较高的要求。字节内部的内核版本升级往往是一个很漫长的过程,而外部用户的内核就更难以保证了。

所以我们想要提供 legacy 的 IO driver 支持。不同于业界已有方案,我们计划将其作为无感知的支持方式:即用户或框架仍然使用类似 uring 模式操作 IO(即我们的 AsyncReadRent/AsyncWriteRent),Runtime 内部通过 syscall 感知当前环境的支持情况并做 fallback。

也就是说,用户只需要写一套代码而无需担心兼容性问题(目前其他开源方案都强绑定了 uring 和 epoll 之一,以及绑定了对应的用户编程模式和 API,无法做到无缝迁移和低版本环境无缝适配)。

epoll 的正确使用姿势

legacy io driver 在 linux 下当然是 epoll 了。

在本系列分享的第一篇里已经简要介绍过了,当时我们的例子中使用了 polling 这个 crate 提供的包装,这个包装在 epoll 实现中用了 oneshot 模式。和字面意思一样,oneshot 只会被触发一次,所以需要每次使用完继续等待时再调用 syscall add 上去。往往我们需要对同一个 fd 做多次操作,所以这种一次性的就绪通知明显不太经济。

Mio 作为使用更为广泛的 poll 包装库,使用了更为高效的 edge trigger 模式。这种模式下,fd 和对应 interest 仅需要被注册一次,epoll_wait 仅在 io 的就绪状态由未就绪切换为就绪态时得到通知。

当然,有得就有舍,这种模式下我们必须在用户态维护就绪状态。虽然这个话题可以说是校招面试必问八股文之一了,还是顺便提一嘴。比如我们仅依靠内核的通知,那么我们要么一次把 io 搞到空,否则在某次 io 操作后,可能 io 还是在就绪态,我们等 kernel 的通知就永远等不来了。所以内部需要记录一下,如果 io 已经就绪了,就不等待了,直接做对应 syscall。

Mio 基本算是跨平台 poll 的较 low level 封装,其内部不维护前面说到的 io 就绪状态,这个状态维护是需要其更上层的 runtime 做的。

我们以 Tokio 为例简要分析一下注册和操作 IO 两个过程。

tokio-epoll

IO 注册

在创建 IO 时,如 TcpListener accept 到一个 fd,那么这个 fd 会被包装为 TcpStream。TcpStream 内其实是一个 PollEvented 结构,很多面向用户的结构(如 UnixStream 等)都只是 PollEvented 的包装。

PollEvented 创建时即会做 fd 和 interest 的注册,drop 时会解注册。IO 的就绪状态和等待者是在一个统一的 Slab 中管理的(对应 Registration 这个概念)。PollEvented 内部包含 io注册信息 两部分;注册信息 又持有了 Driver Handle 和 **Ref<ScheduledIo>**。

在 PollEvented 创建时,会从 TLS 拿到当前 Driver 的 Handle,并将自己的 Interest 注册上去。注册的时候 Driver 内部会在 slab 中分配一块空间存放其状态信息(即 ScheduledIo),并在返回的注册信息中记录该状态在 slab 中的 index。

IO 操作

通过 PollEvented,tokio 可以为其他上层网络封装提供 poll_readpoll_write 的方法实现。

poll_read 实现的核心代码这么几行:

1
2
3
4
let n = ready!(self.registration.poll_read_io(cx, || {
let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
self.io.as_ref().unwrap().read(b)
}))?;

其将读方法作为闭包丢给 poll_read_io 处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
loop {
let ev = ready!(self.poll_ready(cx, direction))?;

match f() {
Ok(ret) => {
return Poll::Ready(Ok(ret));
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_readiness(ev);
}
Err(e) => return Poll::Ready(Err(e)),
}
}

poll_read_io 内部主要做两件事情:

  1. 等待 fd ready

  2. 做 io

    1. 如果 io 返回 WOULD_BLOCK 则将注册信息中 Ref<ScheduledIo> 清除就绪状态,并继续循环。

难点主要在于 poll_readypoll_ready 又会调用 poll_readiness

poll_readiness 这段代码比较复杂,复杂度主要在于解决跨线程下的同步问题。其 atomic 中包含了 Generation 和 Tick 信息,在修改时会校验并 CAS。其中 Generation 是全局递增的,用于标记线程;Tick 是线程级别的,用于标记不同轮次的 epoll wait。这些机制保证了在 fd 被跨线程操作时不会漏信号。

不过鉴于我们的 Runtime 本身设计上不支持这类操作,所以在此不详细展开。总结一下,tokio 对 io 的操作在 driver 内部只有感知 readiness 能力,io syscall 的执行是交给网络组件自己做的。

兼容方案

我们需要基于 epoll(mio) 的 driver 暴露和基于 io_uring driver 相同的接口。

模拟 uring 方案

因为 uring = 等待就绪+执行;而 epoll 只有等待就绪这个能力,所以一个很直接的想法是,我们把 syscall 执行帮忙做掉不就把能力补齐了吗?基于 Mio 自行“模拟”一个 uring,暴露 uring 形式的接口。

Tokio 为代表的基于 epoll 的 runtime 只需要感知到就绪事件后,设置 fd 就绪状态并唤醒对应 Interest 的 tasks;我们模拟 uring 的话在此基础上还要帮用户做 syscall。

每个 fd 对应一个附属结构,其中包含:

  1. 读写队列(分开,两个独立的队列):

    1. 当通过 epoll 感知到 fd 就绪时,设置 fd 的就绪状态,执行 syscall 并 wake 队列中的 task。
    2. 除了存储等待在其上的 waker 外,还需要存储任务本身,因为我们需要帮用户执行 syscall,syscall 成功执行后才 wake task。
  2. 就绪状态:新推入读写队列的请求可以根据这个状态来决定是否直接执行 syscall,或将自身放入读写队列中。

这个方案需要我们抽象 uring 的访问形式,而不是目前的裸接入 uring。

考虑到这部分其实有大量冗余封装和匹配开销,没有采用。

OpAble 方案

一个 Op 结构(如 Connect,其指定了 fd 和 OsSocketAddr)产生 io_uring Entry 是由其自己实现的,那么我们能不能将这个实现扩展为一个 Trait,让其自行实现如何对接 epoll 呢?

让我们思考一下,基于 io_uring,Connect 这类结构(下文将其称为 Op,因为它代表了一个 Operation)只需要造出一个 Entry,之后 driver 内部将其推入 SQ,然后从 CQ 消费到的是一个通用表示,包括 user_data、result 和 flag。driver 内部只需要用 user_data 从状态存储 Slab 定位到该任务,并设置状态、唤醒 waiter。

在 epoll 下,基本单位不再是“任务”,而是 io。io 包括了其对应的就绪状态、等待在其上的读 waiter、写 waiter。driver 通过 epoll_wait 感知到某个就绪事件后,需要拿着该事件的 user_data 从 Slab 中查找到该 io,更新其就绪状态,并根据该事件中的就绪状态决定是否唤醒读 waiter 和 写 waiter。

唤醒 waiter 之后,还需要执行 io syscall。这里其实有 3 种方案:

  1. 内部实现,内部执行 -> 对应前面的模拟 uring 方案
  2. 外部实现,内部执行 -> 本方案,外部 Op 以实现 OpAble 的形式提供具体的 syscall 执行函数
  3. 外部实现,外部执行 -> tokio 等 runtime 的方案,driver 对外暴露 poll_readiness,具体的执行者自己做 syscall,并在 syscall 返回后判断是否是 WOULD_BLOCK,如果是则需要设置 runtime 内的 readiness(可以参考 net/TcpStream 的实现)

对比方案 1,本方案给 Op 提供了更大的自由度(虽然其实也没啥用,但能省点封装),可以用来实现 async fn ready() 这种行为(不做 syscall 即可,这个是由 Op 自己实现决定的);另外也可以省掉一些 match 比较开销(这也是封装带来的代价)。

对比方案 3,将 syscall 在 driver 内部执行掉一来可以省掉 WOULD_BLOCK 判断(重复代码,每个组件都要写的),二来对组件屏蔽了 uring 和 epoll 的差异,因为在本方案中,epoll 和 uring 都是向组件直接返回 syscall 结果。

1
2
3
4
5
6
7
8
9
pub(crate) trait OpAble {
#[cfg(all(target_os = "linux", feature = "iouring"))]
fn uring_op(self: &mut std::pin::Pin<Box<Self>>) -> io_uring::squeue::Entry;

#[cfg(feature = "legacy")]
fn legacy_interest(&self) -> Option<(super::legacy::ready::Direction, usize)>;
#[cfg(feature = "legacy")]
fn legacy_call(self: &mut std::pin::Pin<Box<Self>>) -> io::Result<u32>;
}

在本方案中,组件(如 net/TcpStream)只感知 Op(不感知 uring/epoll 差别);Op 实现 uring 行为和 epoll 行为(作为提供透明兼容能力的 runtime,当然都要实现啦);Runtime 启动后选用的不同 driver 会调用不同的 Op 行为。

一个 Op 实现例子(这个例子中从 mio 里搬了一部分代码,它的实现本身并不涉及 epoll/uring 的判定):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/// Accept
pub(crate) struct Accept {
#[allow(unused)]
pub(crate) fd: SharedFd,
pub(crate) addr: MaybeUninit<libc::sockaddr_storage>,
pub(crate) addrlen: libc::socklen_t,
}

impl Op<Accept> {
/// Accept a connection
pub(crate) fn accept(fd: &SharedFd) -> io::Result<Self> {
Op::submit_with(Accept {
fd: fd.clone(),
addr: MaybeUninit::uninit(),
addrlen: size_of::<libc::sockaddr_storage>() as libc::socklen_t,
})
}
}

impl OpAble for Accept {
#[cfg(all(target_os = "linux", feature = "iouring"))]
fn uring_op(self: &mut std::pin::Pin<Box<Self>>) -> io_uring::squeue::Entry {
opcode::Accept::new(
types::Fd(self.fd.raw_fd()),
self.addr.as_mut_ptr() as *mut _,
&mut self.addrlen,
)
.build()
}

#[cfg(feature = "legacy")]
fn legacy_interest(&self) -> Option<(Direction, usize)> {
self.fd.registered_index().map(|idx| (Direction::Read, idx))
}

#[cfg(feature = "legacy")]
fn legacy_call(self: &mut std::pin::Pin<Box<Self>>) -> io::Result<u32> {
let fd = self.fd.as_raw_fd();
let addr = self.addr.as_mut_ptr() as *mut _;
let len = &mut self.addrlen;
// Here I use copied some code from mio because I don't want the convertion.

// On platforms that support it we can use `accept4(2)` to set `NONBLOCK`
// and `CLOEXEC` in the call to accept the connection.
#[cfg(any(
// Android x86's seccomp profile forbids calls to `accept4(2)`
// See https://github.com/tokio-rs/mio/issues/1445 for details
all(
not(target_arch="x86"),
target_os = "android"
),
target_os = "dragonfly",
target_os = "freebsd",
target_os = "illumos",
target_os = "linux",
target_os = "netbsd",
target_os = "openbsd"
))]
return syscall_u32!(accept4(
fd,
addr,
len,
libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK,
));

// But not all platforms have the `accept4(2)` call. Luckily BSD (derived)
// OSes inherit the non-blocking flag from the listener, so we just have to
// set `CLOEXEC`.
#[cfg(any(
all(target_arch = "x86", target_os = "android"),
target_os = "ios",
target_os = "macos",
target_os = "redox"
))]
return {
let stream_fd = syscall_u32!(accept(fd, addr, len))? as i32;
syscall_u32!(fcntl(stream_fd, libc::F_SETFD, libc::FD_CLOEXEC))
.and_then(|_| syscall_u32!(fcntl(stream_fd, libc::F_SETFL, libc::O_NONBLOCK)))
.map_err(|e| {
let _ = syscall_u32!(close(stream_fd));
e
})?;
Ok(stream_fd as _)
};
}
}

IO 注册问题

别忘了,使用 epoll 需要将 fd 和 interest 事先注册上去,并在内部管理就绪状态,不是 uring 那种把任务扔进去就行的。

网络组件持有 fd 实际上持有的是 SharedFd(它设计上的主要目的是在 uring entry 推入 kernel 后能够维持 fd 打开的状态),当创建 SharedFd 时,我们可以在其内部判断当前 io driver 类型并决定是否注册,同理 drop 时也会做摘掉的操作。

就绪状态和等待者的管理仍旧采用 Slab 便于 O1 查找、删除。

能力暴露

我们支持了 Legacy 和 Uring 两种 io driver,怎么暴露给用户使用呢?

首先我们通过暴露 feature 利用条件编译来消除用户不想要的分支(比如用户明确指定要 uring 或 legacy);其次是我们通过宏参数或者 builder 参数来允许用户在运行时指定 driver。

同时,我们也提供了一个 FusionDriver,用于自动探测当前平台支持性,并自动选用对应 driver。默认情况下,使用 #[monoio::main] 就会使用这个行为,方便不同平台不同内核版本的用户都能愉快启动。

欢迎关注我的其它发布渠道