调度与流#

前几章建立了描述 GPU 工作的词汇—— DeviceOperationand_then 链、zip! 打包。但描述不是执行。 在某个时刻,配方必须到达厨房。本章关于 厨房:什么是 CUDA 流,cuda-oxide 的调度策略如何将工作分配 给它们,以及当 DeviceOperation 变成运行中的 DeviceFuture 时 幕后发生了什么。

参见

CUDA Programming Guide -- Streams 了解调度策略所构建的底层 CUDA 流语义。

结账通道#

将 CUDA 流想象成杂货店的结账通道。每个通道 按顺序处理顾客——排在第一个的人先被服务。但是 多个通道独立运行,所以通道 1 可以为一位顾客结账, 而通道 2 在为另一位顾客包装。商店获得更高的吞吐量, 因为不同通道上的工作在时间上重叠。

GPU 以同样的方式工作。一个是一个有序的操作队列。 在单个流内,一切按顺序执行——内核 A 在内核 B 开始之前完成。但如果你将内核 A 放在流 0 上,内核 B 放在 流 1 上,它们可以在硬件上重叠:

Stream 0:  ┌─── Kernel A ───┐
           └────────────────┘
Stream 1:       ┌─── Kernel B ───┐
                └────────────────┘

           ◄─── time ────────────────►

只有一个流时,一切都是串行的。有多个流时,独立工作 重叠,GPU 更忙碌。这是 GPU 上并发的 基本机制。

为什么你很少直接操作流#

在 CUDA C++ 中,程序员创建流,决定每个操作 在哪个流上运行,并在一个流上的工作依赖于另一个流的结果时 手动插入事件。这很强大但很繁琐,并且将每个函数 耦合到特定的并发策略。

cuda-oxide 插入了一层间接层:调度策略。你无需自己选择流, 而是将 DeviceOperation 交给策略,它为你选择流。 这意味着同一个管道可以在单流(用于调试)、 四流池(用于吞吐量)或你自己编写的自定义策略上运行—— 全部无需更改管道代码。

SchedulingPolicy trait#

调度策略回答一个问题:"给定这个操作,它应该在哪个流上运行?" 该 trait 有三个方法:

  • init——在启动时调用一次以创建 CUDA 流。

  • schedule——选择一个流并将操作包装到 DeviceFuture 中以供 .await

  • sync——选择一个流,执行操作,并阻塞直到它完成。

策略是 Sync 的,意味着单个实例在设备上的所有操作之间共享。 流选择必须是线程安全的。

StreamPoolRoundRobin——默认策略#

当你调用 init_device_contexts(0, 1) 时,cuda-oxide 创建一个 带有四个 CUDA 流的 StreamPoolRoundRobin。每次调度操作时, 一个原子计数器前进,池中的下一个流被选中:

Operation 1  →  Stream 0  ──► ████████
Operation 2  →  Stream 1  ──►    ████████         (与 1 重叠)
Operation 3  →  Stream 2  ──►       ████████      (与 1、2 重叠)
Operation 4  →  Stream 3  ──►          ████████
Operation 5  →  Stream 0  ──►                ████████  (等待 1)

选择是无锁的——在 AtomicUsize 上进行单次 fetch_add, 模上池大小。与 GPU 工作的成本相比,开销可以忽略不计。

四个流是一个好的默认值。它为 GPU 提供了足够的工作来 重叠内核执行与内存传输,而不会产生过多的上下文 切换开销。对于大多数工作负载,你永远不需要考虑它—— 策略就是能工作。

轮询的优势场景#

  • 批处理推理: 每个批次是一个独立的管道。轮询 将批次分布到流上,重叠计算。

  • 混合计算 + 传输: 当一个流运行内核时,另一个流在复制 数据。GPU 的复制引擎和计算单元同时工作。

  • 许多小内核: 重叠启动开销减少了内核之间的 间隙,使 GPU 更忙碌。

需要重新考虑的场景#

  • 依赖链繁重: 如果你构建单个 and_then 链(如 前一章的前向传播),该链无论如何都完全在一个流上运行。 轮询仅在你调度多个独立操作时才有意义。

  • 非常大的内核: 一个饱和 GPU 的单个内核对 多流调度毫无益处。多余的流处于空闲状态。

SingleStream——单通道,严格顺序#

备注

SingleStream 在调度内部已实现,但目前尚未接入 GlobalSchedulingPolicy 或通过 init_device_contexts 暴露。 默认设置始终使用 StreamPoolRoundRobin。 本节描述了未来 API 界面的设计意图。

对于调试,或者当你需要所有操作之间的保证排序时, SingleStream 将所有内容路由到一个流。每个操作看到 每个先前操作的结果,消除了任何与流相关的 并发错误的可能性:

Operation 1  →  Stream 0  ──► ████████
Operation 2  →  Stream 0  ──►          ████████   (等待 1)
Operation 3  →  Stream 0  ──►                  ████████

小技巧

如果你怀疑 GPU 管道中存在并发错误,切换到 SingleStream 是最快的检查方法。如果错误消失了,那是不同流上的操作之间 缺少依赖关系。如果错误仍然存在, 问题在别处。

设置运行时#

在任何异步操作可以运行之前,你需要初始化线程局部的设备 上下文:

use cuda_async::device_context::init_device_contexts;

init_device_contexts(0, 1)?;

第一个参数是默认 GPU 序号;第二个是要管理的设备数量。 在幕后,这注册了一个线程局部,该局部在首次使用时 为每个设备延迟创建一个 StreamPoolRoundRobin。池默认 持有四个流。

在程序开始时调用一次,在任何 .sync().await 之前。 在同一线程上调用两次会返回错误。

.await 时发生了什么#

async-programming/images/device-future-polling.svg

左:DeviceFuture 三状态机。在第一次轮询时,GPU 工作和一个 cuLaunchHostFunc 回调被提交——然后返回 Poll::Pending。 当 GPU 完成时,回调设置 AtomicBool 并唤醒任务。 在第二次轮询时,结果被交付。右:.sync()(线程全程阻塞)vs .await (GPU 工作时线程运行其他任务)的对比。#

以下是一个操作从构造到完成的完整旅程:

module.kernel_async(...)       ← 构建配方(无 GPU 工作)
        │
        ▼
  AsyncKernelLaunch            ← 一个 DeviceOperation,延迟且流无关
        │
        │  .await
        ▼
  IntoFuture::into_future()    ← 调度策略选择一个流
        │
        ▼
  DeviceFuture                 ← 绑定到流,准备轮询
        │
        │  第一次 poll()
        ▼
  execute() on stream 2        ← GPU 工作被提交
  cuLaunchHostFunc on stream 2 ← 主机回调在内核之后排队
  return Poll::Pending
        │
        │  ... GPU 在工作,主机线程空闲 ...
        │
        │  回调在 CUDA 驱动线程上触发
        │  → 设置 AtomicBool,唤醒 AtomicWaker
        │
        │  第二次 poll()
        ▼
  return Poll::Ready(Ok(()))   ← 结果交付给调用者

关键的洞察是,在第一次 poll() 和回调之间,没有主机 线程被占用。异步运行时挂起任务并运行其他任务。 GPU 在完成时通过 cuLaunchHostFunc 回调通知运行时。 这就是为什么 .await 对并发工作负载比 .sync() 更具扩展性—— 你可以有数十个飞行中的操作而不占用 每个操作一个线程。

手动流控制#

大多数情况下,调度策略为你处理流。但有些情况下 你需要直接控制——与期望特定流的 CUDA 库互操作、 计算和传输的细粒度重叠, 或单独分析单个内核。cuda-oxide 为这些情况暴露了完整的流 API。

创建流#

let ctx = CudaContext::new(0)?;
let default = ctx.default_stream();  // 每个上下文的默认(空)流
let custom  = ctx.new_stream()?;     // 一个新的非阻塞流

默认流在 CUDA 中具有特殊的同步语义(它隐式地 与大多数其他流串行化)。由 new_stream() 创建的 非阻塞流没有此约束,这就是调度策略 专门使用它们的原因。

Fork 和 Join#

一种常见模式是从父流 fork 一个子流,在子流上运行独立 工作,然后将结果 join 回来。fork 创建一个新流,该流与 父流的当前位置有隐式依赖——子流直到父流上所有 先前工作完成后才会开始。join 执行相反操作:父流 在继续之前等待子流完成。

let main = ctx.default_stream();

// 在 main 上上传数据
let buf_a = DeviceBuffer::from_host(&main, &data_a)?;
let buf_b = DeviceBuffer::from_host(&main, &data_b)?;

// Fork:子流能看到上传的数据
let child_1 = main.fork()?;
let child_2 = main.fork()?;

// 并行运行独立工作
module.process(&child_1, cfg, &mut buf_a)?;
module.process(&child_2, cfg, &mut buf_b)?;

// Join:main 等待两个子流完成
main.join(&child_1)?;
main.join(&child_2)?;

// 现在可以在 main 上安全使用 buf_a 和 buf_b
module.combine(&main, cfg, &buf_a, &buf_b)?;

此操作的 GPU 时间线如下:

main:      ██ upload_a ██ upload_b ██ ──fork──────────────── join ──► ██ combine ██
                                          |                    ^  ^
child_1:                                  └─► ██ process_a ██ ─┘  |
                                          |                       |
child_2:                                  └─► ██ process_b ██ ────┘

在底层,forkjoin 使用 CUDA 事件——一个流上的 cuEventRecord, 另一个流上的 cuStreamWaitEvent。事件是 GPU 端的同步 令牌;在 fork 或 join 期间没有主机线程被阻塞。

使用事件实现细粒度排序#

fork/join 太粗糙时,你可以直接使用事件在不同流中 的特定点之间建立排序:

// 在内核完成后在流 A 上记录一个事件
let event = stream_a.record_event(None)?;

// 流 B 在继续之前等待该特定点
stream_b.wait(&event)?;

CUDA 事件不属于流——它是一个独立的同步 令牌。record 在一个流上的特定点给它打上时间戳;wait 将 依赖插入另一个流。事件是汇合点:在流 B 上 wait 之后 的任何内容直到流 A 上的事件触发后才会运行。

事件也是测量 GPU 执行时间的标准方法。请注意, 计时需要创建不带 CU_EVENT_DISABLE_TIMING 标志的事件—— 显式传递标志以启用计时:

use cuda_bindings::CUevent_flags_enum::CU_EVENT_DEFAULT;

let start = stream.record_event(Some(CU_EVENT_DEFAULT))?;
module.my_kernel(&stream, config)?;
let end = stream.record_event(Some(CU_EVENT_DEFAULT))?;
end.synchronize()?;
println!("Kernel took {:.2} ms", start.elapsed_ms(&end)?);

这测量的是实际的 GPU 时间,而非主机端调度开销。record_event(None) 默认创建禁用计时的事件,这些事件不能用于 elapsed_ms

小技巧

对于日常异步管道,你永远不需要手动创建流、事件或 fork/join。调度策略和 and_then 链自动处理排序。 手动 API 用于互操作、分析和高级优化。

选择正确的方法#

情况

推荐方法

简单脚本,一个内核

module.kernel_async(...).sync()

多阶段管道(GEMM → ReLU → D2H)

and_then 链,策略选择一个流

独立批次并发运行

tokio::spawn 每个批次,轮询分发

调试疑似流排序错误

切换到 SingleStream

与现有 CUDA 库互操作

使用他们的流调用 .sync_on(&stream)

单独分析内核

在启动周围显式使用事件

最大吞吐量

使用 Nsight Systems 分析,调整池大小

参见

CUDA Programming Guide -- Events 了解 CUDA 事件的完整规范。 并发执行一章展示了这些调度 概念应用于真实的多批次工作负载。