并发执行#
你知道了如何使用 DeviceOperation 描述 GPU 工作,使用 and_then 和 zip!
组合多阶段管道,并让调度策略分配流。
现在到了回报部分:同时运行多个管道。
本章从头到尾讲解 async_mlp 示例,展示
Tokio 任务、轮询调度器和 Rust 的所有权模型如何
协同工作,在一个 GPU 上并发处理四个批次。
参见
CUDA Programming Guide -- Multi-Device System 了解 CUDA 在多 GPU 上下文、对等访问和跨设备内存方面的规则。
场景#
你正在运行一个三层 MLP 前向传播:GEMM(矩阵乘法)、 MatVec(矩阵-向量乘积)和 ReLU(激活函数)。你有两个 权重矩阵已加载到 GPU 上,并需要处理四个批次的输入数据。 每个批次是独立的——批次 0 不依赖于批次 1——但它们都 共享相同的权重。
四个 MLP 前向传播并发运行。顶部:共享权重一次性上传
为 Arc<DeviceBox>,廉价地克隆到每个批次中。中间:GPU
时间线——轮询将批次分布到四个流上,交错的管道重叠。
底部:顺序处理花费约一个批次的 4 倍时间;如果 GPU 有空闲 SM,
并发处理花费约 1.3 倍时间。#
顺序方法逐批次处理:
Batch 0: ████ GEMM ████ MatVec ██ ReLU █ D2H █
Batch 1: ████ GEMM ████ ...
并发方法在不同流上重叠它们:
Stream 0: ████ GEMM ████ MatVec ██ ReLU █ D2H █
Stream 1: ████ GEMM ████ MatVec ██ ReLU █ D2H █
Stream 2: ████ GEMM ████ MatVec ██ ReLU █ D2H █
Stream 3: ████ GEMM ████ MatVec ██ ReLU █ D2H █
如果 GPU 有足够的 SM 来同时运行多个内核,
重叠版本显著更快完成。而且因为每个管道是在一个流上的
单个 and_then 链,批次内的阶段仍然是严格排序的——不需要
跨流同步。
步骤 1:初始化运行时#
一切从 init_device_contexts 开始。这创建 CUDA 上下文、
设置调度策略(默认四流轮询),并使线程局部状态
对 .sync() 和 .await 可用:
use cuda_async::device_context::init_device_contexts;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
init_device_contexts(0, 1)?;
let module = kernels::load_async(0)?;
嵌入式模块通过线程局部异步上下文加载。类型化的 模块句柄可以廉价地克隆到每个批次管道中。
步骤 2:上传共享权重#
模型有两个权重矩阵:W0(DIM x DIM)和 W1(DIM)。两者都需要
在所有四个前向传播期间驻留在设备上。这就是
zip! 和 .arc() 发挥作用的地方:
let w0_host: Vec<f32> = (0..DIM * DIM)
.map(|i| ((i % 7) as f32 - 3.0) * 0.01)
.collect();
let w1_host: Vec<f32> = (0..DIM)
.map(|i| ((i % 5) as f32 - 2.0) * 0.01)
.collect();
let (w0, w1): (Arc<DeviceBox<[f32]>>, Arc<DeviceBox<[f32]>>) = zip!(
h2d(w0_host).arc(),
h2d(w1_host).arc()
).await?;
zip! 将两个独立的 H2D 传输打包成一个操作。.arc() 将
每个结果包装在 Arc 中以便权重可以共享。.await 在
池中的一个流上调度组合操作并等待其完成。
在这行之后,w0 和 w1 是 Arc<DeviceBox<[f32]>>——廉价克隆、
安全共享,并固定在设备上。
步骤 3:构建并调度批次管道#
现在是最有趣的部分。对于每个批次,你构建一个延迟管道(尚未产生 GPU
工作)并将其交给 tokio::spawn:
let num_batches = 4;
let mut handles = vec![];
for batch_idx in 0..num_batches {
let w0 = w0.clone(); // Arc 克隆:约 1 ns
let w1 = w1.clone();
let module = module.clone();
let batch_data: Vec<f32> = (0..DIM * DIM)
.map(|i| ((i + batch_idx * 37) % 13) as f32 * 0.1)
.collect();
let pipeline = zip!(h2d(batch_data), zeros(DIM * DIM), zeros(DIM))
.and_then(move |(input, hidden, output)| {
// 阶段 1:GEMM — hidden = input × W0
// ... 构建 AsyncKernelLaunch,推送参数,用 and_then 链接 ...
})
.and_then(move |(hidden, output, w1, module)| {
// 阶段 2:MatVec — output = hidden × W1
// ...
})
.and_then(move |(output, module)| {
// 阶段 3:ReLU — result = max(0, output)
// ...
})
.and_then(d2h); // 阶段 4:将结果复制到主机
handles.push(tokio::spawn(pipeline.into_future()));
}
让我们解析这里发生了什么:
pipeline是一个DeviceOperation。 它描述了整个前向传播 但不执行任何 GPU 工作。构建它纯粹是主机端的计算——分配闭包和结构体。.into_future()将其转换为DeviceFuture。 这触发了 调度策略,该策略选择一个流。批次 0 获得流 0,批次 1 获得流 1, 以此类推——轮询分配。tokio::spawn将 future 交给 Tokio 运行时。 运行时将在 执行器有容量时轮询它。在第一次轮询时,管道的execute()运行, 将所有 GPU 工作提交到分配的流。运行时是空闲的。 第一次轮询后,任务被挂起。GPU 正在四个流上同时进行数字运算。没有主机线程在 等待。
当 GPU 完成某个管道的工作时,
cuLaunchHostFunc回调 触发,唤醒相应的 Tokio 任务。运行时重新轮询它, 交付Vec<f32>结果。
步骤 4:收集结果#
for (i, handle) in handles.into_iter().enumerate() {
let result: Vec<f32> = handle.await??;
println!("Batch {}: {} elements, first 4 = {:?}",
i, result.len(), &result[..4]);
}
双 ? 解包两层:外层的 JoinError(如果 Tokio
任务发生了 panic)和内层的 DeviceError(如果 GPU 工作失败)。在
生产系统中,你应该分别处理这些错误。
.await vs .sync() vs tokio::spawn#
看到三者都实际运行后,以下是何时使用每个的指南:
.sync() 阻塞调用线程直到 GPU 完成。在
脚本、测试以及任何没有异步运行时的地方使用它。简单,没有
仪式感,但也没有并发——主机线程被卡在等待中:
let result = pipeline.sync()?;
.await 在 GPU 工作时让出当前异步任务。同一
Tokio 线程上的其他任务可以取得进展。这比 .sync() 对
吞吐量更好,但在任务内部仍然是顺序的——.await 之后的代码在
操作完成之前不会运行:
let result = pipeline.await?;
tokio::spawn(op.into_future()) 将管道启动为完全
独立的任务。启动代码立即继续,结果
稍后通过 join 句柄到达。这是实现真正并发的方式
——多个管道同时在不同流上运行:
let handle = tokio::spawn(pipeline.into_future());
// ... 启动更多管道,做其他工作 ...
let result = handle.await??;
小技巧
对于依赖操作链(如四阶段前向传播),
优先使用 and_then 而非顺序的 .await。and_then 链完全在
一个流上运行,阶段之间调度开销为零。
顺序 .await 每次操作都经过调度策略,
可能落在不同流上并需要跨流同步。
所有权模式#
Rust 中的并发 GPU 编程意味着 Rust 的所有权规则在积极 为你工作——偶尔也会阻碍你。以下是反复出现的模式。
通过 and_then 闭包移动数据#
每个 and_then 闭包通过 move 捕获数据。内核启动产生
(),所以你需要显式地向前传递下一阶段需要的缓冲区:
launch_gemm(input, hidden, w0)
.and_then(move |()| {
// input 被内核消耗。hidden 和 module 存活
// 因为它们被捕获但没有被消耗。
value((hidden, output, w1, module))
})
闭包返回一个包含下一阶段所需一切元组的 Value。
这个元组是在阶段之间传递的"接力棒"。
使用 Arc 共享不可变数据#
模型权重、查找表和其他跨管道共享的不可变数据应该
包装在 Arc 中。.arc() 组合器自动为
DeviceOperation 的输出执行此操作。对于你已有数据,Arc::new() 也可行:
let weights = h2d(weight_data).arc().await?; // Arc<DeviceBox<[f32]>>
for batch in batches {
let w = weights.clone(); // 廉价的引用计数增量
tokio::spawn(forward_pass(batch, w).into_future());
}
保持设备内存存活#
DeviceBox 必须保持存活直到 GPU 完成使用它。在 and_then
链中这是自动的——闭包拥有 DeviceBox,它存活直到
下一阶段取走它。危险在于 with_context 闭包,你在其中执行
异步复制,而 DeviceBox 可能在复制完成之前被丢弃:
fn d2h(dev: DeviceBox<[f32]>) -> impl DeviceOperation<Output = Vec<f32>> {
with_context(move |ctx| {
let stream = ctx.get_cuda_stream();
let mut host = vec![0.0f32; dev.len()];
unsafe {
memcpy_dtoh_async(
host.as_mut_ptr(), dev.cu_deviceptr(),
dev.len() * std::mem::size_of::<f32>(),
stream.cu_stream(),
).unwrap();
}
// dev 被闭包捕获,并在闭包返回之前存活。
// 流将在结果被消费之前同步,
// 因此异步复制在 dev 被丢弃之前完成。
value(host)
})
}
关键:dev 被 move 闭包捕获,在闭包返回之前不会被丢弃。
由于流在 DeviceFuture 交付结果之前同步,
异步复制在 dev 被释放之前完成。
错误处理#
GPU 工作产生的错误通过 Result 链传播,就像任何 Rust
代码一样。当运行并发批次时,你通常希望即使一个批次失败也继续
处理:
for (i, handle) in handles.into_iter().enumerate() {
match handle.await {
Ok(Ok(result)) => {
println!("Batch {i}: {} elements", result.len());
}
Ok(Err(device_err)) => {
eprintln!("Batch {i}: GPU error: {device_err}");
}
Err(join_err) => {
eprintln!("Batch {i}: task panicked: {join_err}");
}
}
}
三个分支对应:成功、CUDA 驱动或调度错误 (例如内存不足、无效的启动配置)以及 Tokio 任务 panic。
多设备执行#
对于具有多个 GPU 的系统,将更高的设备计数传递给
init_device_contexts:
init_device_contexts(0, 2)?; // 默认设备 0,容量为 2 个设备
这将默认设备设置为 GPU 0,并准备好线程局部映射以支持
两个设备。每个设备的 CUDA 上下文、调度策略和流池
在首次使用时延迟创建。所有策略驱动的操作(.sync()、.await)
使用默认设备,除非你显式指定另一个设备。
ExecutionContext 携带设备 ID,因此 GPU 0 上的操作永远不会
干扰 GPU 1 上的流。
小技巧
多设备编程需要注意内存位置。在 GPU 0 上分配的
DeviceBox 在 GPU 1 上不可访问,除非启用对等
访问。在构建设备特定操作时,使用 with_context 检查
ctx.get_device_id()。
性能调优#
流池大小#
默认的四流池对大多数工作负载运行良好,但如果你在 追求最后几个百分点的吞吐量,合适的池大小取决于你的 情况:
工作负载 |
建议池大小 |
原因 |
|---|---|---|
每次启动一个大内核 |
1--2 |
内核已经饱和 GPU |
许多小内核 |
4--8 |
重叠流间的启动开销 |
混合内核 + memcpy |
2--4 |
重叠计算与数据传输 |
延迟敏感的服务 |
每请求 1 |
避免队头阻塞 |
性能分析#
Nsight Systems 是可视化流占用情况的标准工具:
nsys profile --trace=cuda cargo oxide run async_mlp
nsys-ui report.nsys-rep
寻找同一流上内核间的间隙(启动开销)、池中的空闲 流(不均衡的工作分布)以及跨流的意外 串行化(缺失或多余的依赖)。
常见陷阱#
陷阱 |
现象 |
修复方法 |
|---|---|---|
在异步上下文中使用 |
阻塞 Tokio 线程,使其上的所有任务停顿 |
改用 |
|
释放后使用崩溃或结果损坏 |
在 |
流太多 |
调度开销超过重叠收益 |
分析,减少池大小 |
缺少 |
第一个操作时报 |
在程序开始时调用一次 |
参见
Async MLP Pipeline 项目章节
包含完整的 async_mlp 源代码,附带构建说明和预期输出。