调度与流#
前几章建立了描述 GPU 工作的词汇——
DeviceOperation、and_then 链、zip! 打包。但描述不是执行。
在某个时刻,配方必须到达厨房。本章关于
厨房:什么是 CUDA 流,cuda-oxide 的调度策略如何将工作分配
给它们,以及当 DeviceOperation 变成运行中的 DeviceFuture 时
幕后发生了什么。
参见
CUDA Programming Guide -- Streams 了解调度策略所构建的底层 CUDA 流语义。
结账通道#
将 CUDA 流想象成杂货店的结账通道。每个通道 按顺序处理顾客——排在第一个的人先被服务。但是 多个通道独立运行,所以通道 1 可以为一位顾客结账, 而通道 2 在为另一位顾客包装。商店获得更高的吞吐量, 因为不同通道上的工作在时间上重叠。
GPU 以同样的方式工作。一个流是一个有序的操作队列。 在单个流内,一切按顺序执行——内核 A 在内核 B 开始之前完成。但如果你将内核 A 放在流 0 上,内核 B 放在 流 1 上,它们可以在硬件上重叠:
Stream 0: ┌─── Kernel A ───┐
└────────────────┘
Stream 1: ┌─── Kernel B ───┐
└────────────────┘
◄─── time ────────────────►
只有一个流时,一切都是串行的。有多个流时,独立工作 重叠,GPU 更忙碌。这是 GPU 上并发的 基本机制。
为什么你很少直接操作流#
在 CUDA C++ 中,程序员创建流,决定每个操作 在哪个流上运行,并在一个流上的工作依赖于另一个流的结果时 手动插入事件。这很强大但很繁琐,并且将每个函数 耦合到特定的并发策略。
cuda-oxide 插入了一层间接层:调度策略。你无需自己选择流,
而是将 DeviceOperation 交给策略,它为你选择流。
这意味着同一个管道可以在单流(用于调试)、
四流池(用于吞吐量)或你自己编写的自定义策略上运行——
全部无需更改管道代码。
SchedulingPolicy trait#
调度策略回答一个问题:"给定这个操作,它应该在哪个流上运行?" 该 trait 有三个方法:
init——在启动时调用一次以创建 CUDA 流。schedule——选择一个流并将操作包装到DeviceFuture中以供.await。sync——选择一个流,执行操作,并阻塞直到它完成。
策略是 Sync 的,意味着单个实例在设备上的所有操作之间共享。
流选择必须是线程安全的。
StreamPoolRoundRobin——默认策略#
当你调用 init_device_contexts(0, 1) 时,cuda-oxide 创建一个
带有四个 CUDA 流的 StreamPoolRoundRobin。每次调度操作时,
一个原子计数器前进,池中的下一个流被选中:
Operation 1 → Stream 0 ──► ████████
Operation 2 → Stream 1 ──► ████████ (与 1 重叠)
Operation 3 → Stream 2 ──► ████████ (与 1、2 重叠)
Operation 4 → Stream 3 ──► ████████
Operation 5 → Stream 0 ──► ████████ (等待 1)
选择是无锁的——在 AtomicUsize 上进行单次 fetch_add,
模上池大小。与 GPU 工作的成本相比,开销可以忽略不计。
四个流是一个好的默认值。它为 GPU 提供了足够的工作来 重叠内核执行与内存传输,而不会产生过多的上下文 切换开销。对于大多数工作负载,你永远不需要考虑它—— 策略就是能工作。
轮询的优势场景#
批处理推理: 每个批次是一个独立的管道。轮询 将批次分布到流上,重叠计算。
混合计算 + 传输: 当一个流运行内核时,另一个流在复制 数据。GPU 的复制引擎和计算单元同时工作。
许多小内核: 重叠启动开销减少了内核之间的 间隙,使 GPU 更忙碌。
需要重新考虑的场景#
依赖链繁重: 如果你构建单个
and_then链(如 前一章的前向传播),该链无论如何都完全在一个流上运行。 轮询仅在你调度多个独立操作时才有意义。非常大的内核: 一个饱和 GPU 的单个内核对 多流调度毫无益处。多余的流处于空闲状态。
SingleStream——单通道,严格顺序#
备注
SingleStream 在调度内部已实现,但目前尚未接入
GlobalSchedulingPolicy 或通过 init_device_contexts 暴露。
默认设置始终使用 StreamPoolRoundRobin。
本节描述了未来 API 界面的设计意图。
对于调试,或者当你需要所有操作之间的保证排序时,
SingleStream 将所有内容路由到一个流。每个操作看到
每个先前操作的结果,消除了任何与流相关的
并发错误的可能性:
Operation 1 → Stream 0 ──► ████████
Operation 2 → Stream 0 ──► ████████ (等待 1)
Operation 3 → Stream 0 ──► ████████
小技巧
如果你怀疑 GPU 管道中存在并发错误,切换到
SingleStream 是最快的检查方法。如果错误消失了,那是不同流上的操作之间
缺少依赖关系。如果错误仍然存在,
问题在别处。
设置运行时#
在任何异步操作可以运行之前,你需要初始化线程局部的设备 上下文:
use cuda_async::device_context::init_device_contexts;
init_device_contexts(0, 1)?;
第一个参数是默认 GPU 序号;第二个是要管理的设备数量。
在幕后,这注册了一个线程局部,该局部在首次使用时
为每个设备延迟创建一个 StreamPoolRoundRobin。池默认
持有四个流。
在程序开始时调用一次,在任何 .sync() 或 .await 之前。
在同一线程上调用两次会返回错误。
.await 时发生了什么#
左:DeviceFuture 三状态机。在第一次轮询时,GPU 工作和一个
cuLaunchHostFunc 回调被提交——然后返回 Poll::Pending。
当 GPU 完成时,回调设置 AtomicBool 并唤醒任务。
在第二次轮询时,结果被交付。右:.sync()(线程全程阻塞)vs .await
(GPU 工作时线程运行其他任务)的对比。#
以下是一个操作从构造到完成的完整旅程:
module.kernel_async(...) ← 构建配方(无 GPU 工作)
│
▼
AsyncKernelLaunch ← 一个 DeviceOperation,延迟且流无关
│
│ .await
▼
IntoFuture::into_future() ← 调度策略选择一个流
│
▼
DeviceFuture ← 绑定到流,准备轮询
│
│ 第一次 poll()
▼
execute() on stream 2 ← GPU 工作被提交
cuLaunchHostFunc on stream 2 ← 主机回调在内核之后排队
return Poll::Pending
│
│ ... GPU 在工作,主机线程空闲 ...
│
│ 回调在 CUDA 驱动线程上触发
│ → 设置 AtomicBool,唤醒 AtomicWaker
│
│ 第二次 poll()
▼
return Poll::Ready(Ok(())) ← 结果交付给调用者
关键的洞察是,在第一次 poll() 和回调之间,没有主机
线程被占用。异步运行时挂起任务并运行其他任务。
GPU 在完成时通过 cuLaunchHostFunc 回调通知运行时。
这就是为什么 .await 对并发工作负载比 .sync() 更具扩展性——
你可以有数十个飞行中的操作而不占用
每个操作一个线程。
手动流控制#
大多数情况下,调度策略为你处理流。但有些情况下 你需要直接控制——与期望特定流的 CUDA 库互操作、 计算和传输的细粒度重叠, 或单独分析单个内核。cuda-oxide 为这些情况暴露了完整的流 API。
创建流#
let ctx = CudaContext::new(0)?;
let default = ctx.default_stream(); // 每个上下文的默认(空)流
let custom = ctx.new_stream()?; // 一个新的非阻塞流
默认流在 CUDA 中具有特殊的同步语义(它隐式地
与大多数其他流串行化)。由 new_stream() 创建的
非阻塞流没有此约束,这就是调度策略
专门使用它们的原因。
Fork 和 Join#
一种常见模式是从父流 fork 一个子流,在子流上运行独立
工作,然后将结果 join 回来。fork 创建一个新流,该流与
父流的当前位置有隐式依赖——子流直到父流上所有
先前工作完成后才会开始。join 执行相反操作:父流
在继续之前等待子流完成。
let main = ctx.default_stream();
// 在 main 上上传数据
let buf_a = DeviceBuffer::from_host(&main, &data_a)?;
let buf_b = DeviceBuffer::from_host(&main, &data_b)?;
// Fork:子流能看到上传的数据
let child_1 = main.fork()?;
let child_2 = main.fork()?;
// 并行运行独立工作
module.process(&child_1, cfg, &mut buf_a)?;
module.process(&child_2, cfg, &mut buf_b)?;
// Join:main 等待两个子流完成
main.join(&child_1)?;
main.join(&child_2)?;
// 现在可以在 main 上安全使用 buf_a 和 buf_b
module.combine(&main, cfg, &buf_a, &buf_b)?;
此操作的 GPU 时间线如下:
main: ██ upload_a ██ upload_b ██ ──fork──────────────── join ──► ██ combine ██
| ^ ^
child_1: └─► ██ process_a ██ ─┘ |
| |
child_2: └─► ██ process_b ██ ────┘
在底层,fork 和 join 使用 CUDA 事件——一个流上的 cuEventRecord,
另一个流上的 cuStreamWaitEvent。事件是 GPU 端的同步
令牌;在 fork 或 join 期间没有主机线程被阻塞。
使用事件实现细粒度排序#
当 fork/join 太粗糙时,你可以直接使用事件在不同流中
的特定点之间建立排序:
// 在内核完成后在流 A 上记录一个事件
let event = stream_a.record_event(None)?;
// 流 B 在继续之前等待该特定点
stream_b.wait(&event)?;
CUDA 事件不属于流——它是一个独立的同步
令牌。record 在一个流上的特定点给它打上时间戳;wait 将
依赖插入另一个流。事件是汇合点:在流 B 上 wait 之后
的任何内容直到流 A 上的事件触发后才会运行。
事件也是测量 GPU 执行时间的标准方法。请注意,
计时需要创建不带 CU_EVENT_DISABLE_TIMING 标志的事件——
显式传递标志以启用计时:
use cuda_bindings::CUevent_flags_enum::CU_EVENT_DEFAULT;
let start = stream.record_event(Some(CU_EVENT_DEFAULT))?;
module.my_kernel(&stream, config)?;
let end = stream.record_event(Some(CU_EVENT_DEFAULT))?;
end.synchronize()?;
println!("Kernel took {:.2} ms", start.elapsed_ms(&end)?);
这测量的是实际的 GPU 时间,而非主机端调度开销。record_event(None) 默认创建禁用计时的事件,这些事件不能用于 elapsed_ms。
小技巧
对于日常异步管道,你永远不需要手动创建流、事件或
fork/join。调度策略和 and_then 链自动处理排序。
手动 API 用于互操作、分析和高级优化。
选择正确的方法#
情况 |
推荐方法 |
|---|---|
简单脚本,一个内核 |
|
多阶段管道(GEMM → ReLU → D2H) |
|
独立批次并发运行 |
|
调试疑似流排序错误 |
切换到 |
与现有 CUDA 库互操作 |
使用他们的流调用 |
单独分析内核 |
在启动周围显式使用事件 |
最大吞吐量 |
使用 Nsight Systems 分析,调整池大小 |
参见
CUDA Programming Guide -- Events 了解 CUDA 事件的完整规范。 并发执行一章展示了这些调度 概念应用于真实的多批次工作负载。