下一站 - Ihcblog!

远方的风景与脚下的路 | 子站点:ihc.im

0%

Rust Runtime 设计与实现-设计篇 Part2

本系列文章主要介绍如何设计和实现一个基于 io-uring 的 Thread-per-core 模型的 Runtime。

我们的 Runtime 最终产品 Monoio 现已开源,你可以在 github.com/bytedance/monoio 找到它。

  1. Rust Runtime 设计与实现-科普篇
  2. Rust Runtime 设计与实现-设计篇-Part1
  3. Rust Runtime 设计与实现-设计篇-Part2
  4. Rust Runtime 设计与实现-组件篇
  5. Rust Runtime 设计与实现-IO兼容篇

本文是系列的第三篇,继续讲一些 Runtime 上的设计取舍。

Time Driver 设计

很多场景需要计时器,如超时,需要 select 两个 future,其中一个是 timeout。作为 Runtime 必须支持异步 sleep。

计时器管理与唤醒

Glommio 内部这部分实现较为简单,直接使用了 BTreeMap 维护 Instant -> Waker 的映射,每次拿当前时间去 split_off 拿到所有过期的 timer 唤醒并计算下次 wake 间隔时间,然后在 driver park 的时候作为参数传入。Tokio 中类似,也有自己的时间轮实现,更复杂,但效率也更高(精确度上不如 Glommio 的实现方案)。

考虑到我们性能优先的实现初衷,我们选择类似 Tokio 的时间轮方案。

和 Driver 集成

在 epoll 下,在我们做 wait 前,需要检查当前最近的计时器。如果有,那么必须将它的超时事件作为 wait 的参数传入,否则如果没有任何 IO 在这段时间里就绪,我们就会错过这次计时器的预期唤醒时间,如用户要 timeout 100ms,结果可能 200ms 了才唤醒,这已经失去意义了。

Tokio 内部基于 EPOLL 和 时间轮 做了这件事。EPOLL 作为 IO Driver,并在这个之上封装了 Timer Driver。在 Timer Driver 陷入 syscall 之前,计算时间轮中最近的事件超时间隔,并作为 epoll wait 的参数。

为啥不直接使用 TimerFd 直接利用 epoll 能力?因为这么搞有点重:timerfd 的创建、使用 epoll_ctl 添加和移除都是 syscall,而且不能做粗粒度的合并(时间轮的可以)。

然而,io-uring 的 enter 并不支持传入一个超时时间。我们只能向 SQ 推 TimeoutOp 来做到这件事。

方案1

在插入 element 到时间轮空格子的时候,推 TimeoutOp;并在该格子取消至数量为 0 时推 TimeoutRemoveOp(取消这部分也可以不推,只是要额外付出一次误唤醒的 cost)。

例如,我们会创建 5 个 10ms 的超时,它们会被插入到时间轮的同一个格子。在这个格子中数量从 0 变 1 的时机,我们向 SQ 推一个 10ms 的 TimeoutOp。

方案 2

每次 wait 前计算最近超时时间,推入 SQ 然后 wait;TimeoutOp 中设置 offset = 1。

这里解释一下 offset 参数的含义,简单来说就是当有 $offset 个 CQ 完成时,或超时发生时会完成。

This command will register a timeout operation. The addr field must contain a pointer to a struct timespec64 structure, len must contain 1 to signify one timespec64 structure, timeout_flags may contain IORING_TIMEOUT_ABS for an absolute timeout value, or 0 for a relative timeout. off may contain a completion event count. A timeout will trigger a wakeup event on the completion ring for anyone waiting for events. A timeout condition is met when either the specified timeout expires, or the specified number of events have completed. Either condition will trigger the event. If set to 0, completed events are not counted, which effectively acts like a timer. io_uring timeouts use the CLOCK_MONOTONIC clock source. The request will complete with -ETIME if the timeout got completed through expiration of the timer, or 0 if the timeout got completed through requests completing on their own. If the timeout was cancelled before it expired, the request will complete with -ECANCELED. Available since 5.4.

这样需要在每次 wait 前推 SQ 进去,好处是不需要 remove(因为每次返回时就已经被消费掉了),没有误唤醒问题;并且实现简单,不需要维护 Op 的 user_data 字段用来推 TimeoutRemoveOp。
wait

方案 3

类似方案 2,只不过 TimeoutOp 中的 offset 设置为 0。

这样实现起来较为麻烦:因为 offset = 0 表示它是一个纯粹的计时器,与 CQ 完成个数无关,它只会在实际超时时完成。这样就意味着,我们需要推 TimeoutRemoveOp 或者承担误唤醒开销(Glommio 实现类似这种方案,它的 cost 选择了后者)。
wait

讨论

在插入 TimeoutOp 时,我们应当尽可能晚地插入,因为它可能会被 cancel。所以方案 1 会在 wait 前 0->1->0->1 变化时插入 2 次 TimeoutOp 和 2 次TiemoutRemoveOp,而这是不必要的,方案 1 基本不可取。

方案 2 和 3 在执行时机上和 EPOLL 场景下的 Tokio 以及 Glommio 是一样的。细节上的差别是:

  • 方案 2 下,任何一个 CQ 完成时顺便把 TimeoutOp 给完成掉,这样就不需要 Remove,也就是说不需要维护 user_data,实现上会非常简单,也省了推 TimeoutRemoveOp 以及内核处理的开销。
  • 方案 3 相对 2 的好处是,当 wait 次数很多时,方案 2 每次都要推一个 TimeoutOp 进去,而方案 3 可以检查 TimeoutOp 是否被消耗掉,省一些推入次数;当然,对比方案 2 也有缺点,就是当超时取消时得推 TimeoutRemove 进去。

在我们的实际业务场景中,时间事件绝大多数都是作为超时,少部分是定时轮询用。
超时场景中往往是注册超时并移除超时,真正的超时并非热路径:所以我们这里初步决定是使用方案 2。同时,方案 2 实现简单,后续即便是要优化也成本不大。

跨线程通信

虽然是 thread per core 的 runtime,但是如果没有跨线程通信能力,很多事情是做不了的。比如常见的 case:单个线程拉取配置并分发到每个线程的 thread local 上。

如果只是期望有跨线程通信能力,那么不需要任何 runtime 支持。无论是使用无锁的数据结构,或是跨线程锁,都可以做到。

但我们希望可以在 runtime 层面集成。举例来说,A 线程有一个 channel rx,B 线程有一个 tx,我们通过 B 发送数据,A 可以 await 在 rx 上。这里的实现难点在于,A 线程上的 reactor 可能已经陷入内核进入等待 uring CQ 状态了,我们需要在任务被唤醒时额外唤醒其所在 thread。

Unpark 能力

所以我们需要在 Driver trait 上额外添加一个 Unpark 接口用于跨线程主动唤醒。

在 epoll 下,tokio 内部实现是注册上去一个 eventfd。因为 tokio 本身的调度模型就依赖于跨线程唤醒,所以无论你是否使用 tokio 提供的一些 sync 数据结构,它都会在 epoll 上挂上这么一个 eventfd;而我们的实现主体是不依赖这个的,只有在用到我们实现的 channel 的时候才会依赖,所以我们这里通过条件编译,开启 “sync” feature 才插入相关代码,尽可能做到 zero cost。

在 iouring 下怎么插入 eventfd 呢?同 time driver 中我们实现 park_timeout 做的事情差不多,可以直接推一个 ReadOp 进去读 8 byte,fd 就是 eventfd 的 fd。eventfd 读写都是 8 byte(u64)。

注:文档里提到了两个 syscall flag(IORING_REGISTER_EVENTFD, IORING_REGISTER_EVENTFD_ASYNC),不是做这个事的。
Ref: https://unixism.net/loti/ref-iouring/io_uring_register.html

在什么时机我们需要推入 eventfd 呢?我们可以在内部维护一个状态标记当前 ring 里是否已存在 eventfd。在 sleep 前,如果已存在则直接 sleep,不存在则推一个进去并标记已存在。

当消费 CQ 的时候,遇到 eventfd 对应的 userdata,则标记为不存在,这样下次 sleep 前会重新插入。

当我们需要 unpark 线程时,只需要拿到它对应的 eventfd,并向其中写入 1u64,则这个 fd 就会可读,触发 ring 从 syscall 返回。

我们将 UnparkHandle 维护在一个全局 Map 中便于每个线程都能够唤醒其他线程。在线程创建时,我们向全局注册自己的 UnparkHandle 的 Weak 引用。

当需要跨线程唤醒时,我们只需要从全局 Map 里拿到这个 UnparkHandle 并尝试 upgrade,然后写入数据即可。为了减少对全部 Map 的访问,我们在每个线程中缓存这个映射。

参考 Eventfd 的实现,kernel 内部一来有锁,二来会保证这 8 byte 的 u64 是一口气写完的,不存在乱序问题。所以目前实现改为了直接走 libc::write。(wow so unsafe!)

集成 Waker

在纯本线程下,我们的唤醒逻辑是这样的:

  1. 我们想等待一个在本线程执行 future
  2. 因为事件源是 uring,所以我们在 future 被 poll 时将 task 的 waker 存储在 op 关联的存储区
  3. Uring 产生了事件,唤醒 waker
  4. Waker 执行时将任务重新塞回本线程的执行队列

在 uring driver 下,我们的事件源是 uring,所以 uring 负责存储并唤醒 waker;在 time driver 下,我们的事件源是 time wheel,所以也由其负责存储和唤醒 waker。

现在我们的事件源是其他线程。以 oneshot channel 为例,当 rx poll 时,需要将 waker 存储在 channel 的共享存储区内;在 tx send 后,需要从 channel 共享存储区拿到 waker 并唤醒。waker 的唤醒逻辑不再是无脑把任务加到本地队列,而是需要调度到其所在线程的队列中。

所以这样我们需要为每个 Executor 添加一个 shared_queue 用于共享地存储远程推入的 waker。当非本地 waker 被 wake 时,会将自己添加到目标线程的 queue 中。

Glommio 中的另一种参考实现:

前面说的方案是跨线程传递 waker,可以通用支持 channel、mutex 等数据结构。

还可以不传递 waker,poll 的时候将 waker 加入本线程的数据结构,然后发送端有数据后并不是直接唤醒接收端的 waker,而是直接唤醒它所在的线程,由对端线程轮询所有存在 waker 的 channel。

这种轮询的方式在某些场景下不够高效,且方案并不通用。

Executor 设计

Executor 在 thread per core 下按理说应该非常简单:

  1. 直接做一个 Queue 然后从一端推任务,从另一端消费任务
  2. 在没任务可做时,陷入 syscall 并等待至少一个任务完成
  3. 拿到完成任务后逐个处理,将 syscall 结果应用到 buffer 上,并 wake 对应任务。

在 epoll 下可能确实是这个逻辑;但是在 io-uring 下可能可以做一些额外的优化。

低延迟 or 减少 syscall

在我们推入 SQE 之后,我们可以当场 submit() 以尽快完成 syscall,降低延迟;也可以先等等,等到无事可做时再 submit_and_wait(1)。考虑到尽可能高性能,我们选择第二种方案(Glommio 和 Tokio-uring)——测试数据反映事实上延迟并不高,相比 Glommio 延迟有时反而因为 CPU 利用率降低而更低。在负载相对较低时,也可以采用一些动态的方式决定是否更激进地 submit

饥饿问题

在这个 case 下,饥饿问题往往是用户代码写出了问题导致的。考虑下面几个场景:

  1. 用户的 Future 每次 poll 都会 spawn 一个新任务,然后返回 Ready。
  2. 用户的 Future 每次都会立刻 wake。
  3. 用户的 Future 中的状态转换太多了,或者状态转换出现了死循环。

如果我们选择在没有任务时再处理 IO(包括提交、等待和收割),那么这几个场景下,依赖 IO 的任务都无法得到处理,因为任务队列永远不会为空,或者任务永远执行不完。

对于问题 1 和 2,我们提出了一个做法,与其执行完所有任务,我们设置一个执行上限,当达到上限时强制做一次提交和收割。

对于问题 3,可以类似 Tokio 做一个 coop 机制,限制递归 poll 执行的次数来做到限制状态转换的目的。

欢迎关注我的其它发布渠道