组合器与组合#
单个内核启动很少是全部故事。一个真正的 GPU 工作负载可能需要 上传输入数据、乘两个矩阵、应用激活函数、并 将结果复制回主机——四个必须严格按照该顺序发生的独立操作。 本章展示了 cuda-oxide 的组合器系统如何让你 将这些片段拼接成一个管道,该管道在执行之前始终保持延迟和 流无关。
参见
DeviceOperation 模型了解组合器所构建的基础 trait 和执行方法。
问题:多步 GPU 工作#
假设你正在构建一个简单的神经网络前向传播。步骤如下:
上传输入批次从主机到设备。
GEMM——将输入乘以权重矩阵 W0。
ReLU——对结果应用激活函数。
下载输出回到主机。
每一步都依赖于前一步。在上传完成之前你不能运行 GEMM,
在 ReLU 完成之前你不能下载。在同步世界中,
你会编写四个独立的 .sync() 调用:
let input = h2d(batch_data).sync()?; // 上传
let hidden = launch_gemm(input, w0).sync()?; // GEMM
let output = launch_relu(hidden).sync()?; // ReLU
let result = d2h(output).sync()?; // 下载
这可以工作,但有一个问题。每个 .sync() 都经过调度
策略,而调度策略可能每次都分配一个不同的流。不同流之间
没有顺序保证——GEMM 可能在上传在另一个流上完成之前就开始。
即使策略恰好选择了同一个流,
每个 .sync() 都会阻塞主机线程、提交一个操作、再次阻塞、
提交下一个,如此反复。你要付出"把配方交给厨房,等待菜肴,
交出下一个配方"的往返代价,整整四次。
你真正想要的是将整个餐食计划写在一张卡片上,然后一次性 交给厨房。
and_then——"当这个完成时,做那个"#
and_then 组合器链接两个操作:运行第一个,将其输出传递
给一个闭包,闭包产生第二个操作。两者在同一流上运行,
所以 CUDA 的流内顺序保证意味着第二个操作总是看到
第一个操作的结果:
let pipeline = h2d(batch_data)
.and_then(|input| launch_gemm(input, w0))
.and_then(|hidden| launch_relu(hidden))
.and_then(|output| d2h(output));
let result: Vec<f32> = pipeline.sync()?;
整个链是一个单独的 DeviceOperation。编写它时没有 GPU 工作发生——
你只是在描述序列。当你调用 .sync() 时,调度
策略选择一个流,链在该流上从上到下运行。
一次厨房之旅,一次等待,四道菜。
数据如何流经链条#
每个 and_then 闭包接收上一阶段的 Output 作为其参数。
闭包必须返回一个新的 DeviceOperation,它成为链中的下一个链接:
h2d(batch) → DeviceBox<[f32]>
│
└─ and_then ─► launch_gemm(input, w0) → DeviceBox<[f32]>
│
└─ and_then ─► launch_relu(hidden) → DeviceBox<[f32]>
│
└─ and_then ─► d2h(output) → Vec<f32>
整个链的类型由编译器推断。你永远不需要命名 中间类型——它们通过闭包自动流转。
使用 value() 携带额外数据#
内核启动产生 ()——它们通过对设备内存的副作用工作,
而不是返回值。但下一阶段需要缓冲区句柄、模块
引用以及可能还有其他元数据。技巧是使用 value() 来打包
下一阶段需要的一切:
let pipeline = launch_gemm(input, hidden)
.and_then(move |()| {
// 内核产生了 () 但我们仍然有 `hidden` 和 `module`
// 来自外围作用域。将它们打包以便下一阶段使用。
value((hidden, module))
})
.and_then(move |(hidden, module)| {
launch_relu(hidden, module)
});
move 关键字很重要——每个闭包通过所有权捕获它需要的
数据。当闭包运行时,它消耗这些值并通过 value() 将它们
向前传递。这正是 Rust 的所有权系统设计的初衷:
确保每个缓冲区在任意时刻只被一个阶段使用,
没有悬垂引用。
and_then_with_context——当你需要流时#
有时阶段之间的闭包需要执行原始 CUDA 操作——
异步内存复制、事件记录或同步调用。这些
需要 CUstream 句柄,而在普通的 and_then
闭包中不可用。and_then_with_context 同时传递上一个结果和
ExecutionContext:
let pipeline = launch_kernel(input)
.and_then_with_context(|ctx, gpu_result| {
let stream = ctx.get_cuda_stream();
copy_result_to_staging(stream, gpu_result)
});
谨慎使用——大多数管道可以完全用 and_then 和
内部使用 with_context 的辅助函数(h2d、d2h、zeros)构建。
zip!——打包独立工作#
并非一切都是顺序的。在运行前向传播之前,你需要 分配三个缓冲区:输入、隐藏层的暂存缓冲区和 输出缓冲区。这些分配是独立的——没有一个依赖于其他。 但每个都返回你稍后需要的值。
如果你对所有三个都使用 and_then,你最终会笨拙地嵌套闭包
来携带所有结果向前:
// 不要这样做——可以工作但难以阅读
let pipeline = h2d(batch_data)
.and_then(|input| {
zeros(DIM * DIM).and_then(move |hidden| {
zeros(DIM).and_then(move |output| {
value((input, hidden, output))
})
})
});
zip! 通过将独立操作组合成一个返回其结果元组的操作
来解决这个问题:
use cuda_async::zip;
let pipeline = zip!(h2d(batch_data), zeros(DIM * DIM), zeros(DIM));
// pipeline: impl DeviceOperation<Output = (DeviceBox, DeviceBox, DeviceBox)>
let (input, hidden, output) = pipeline.sync()?;
清晰多了。zip! 接受两个或三个参数,并在同一流上按顺序
执行它们。结果被收集到一个元组中并一起返回。
小技巧
zip 这个名字来自数据组合模式——两个独立
结果打包成一个元组——而不是并行执行。所有支路在
同一流上按顺序运行。要实现跨流的真正并发执行,请参阅
并发执行。
结合 zip! 与 and_then#
真正的威力在你结合它们时显现。zip! 处理独立的
设置,and_then 处理依赖的管道:
let pipeline = zip!(h2d(batch_data), zeros(DIM * DIM), zeros(DIM))
.and_then(|(input, hidden, output)| launch_gemm(input, hidden, w0))
.and_then(|hidden| launch_relu(hidden))
.and_then(|result| d2h(result));
这读起来几乎像伪代码:"分配三个缓冲区,然后 GEMM,然后
ReLU,然后下载。"整个东西是一个 DeviceOperation,当你
.sync() 或 .await 它时在一个流上运行。
.arc()——跨管道共享结果#
在批处理场景中,你可能加载一次模型权重并将它们
在多个前向传播之间共享。每个批次管道都需要对
权重的引用,但 DeviceOperation 通过值消耗其输出。你不能
将相同的 DeviceBox 移动到四个不同的闭包中。
.arc() 将输出包装在 Arc<T> 中,使其可以廉价地克隆:
let (w0, w1) = zip!(
h2d(w0_host).arc(),
h2d(w1_host).arc()
).await?;
// w0: Arc<DeviceBox<[f32]>> -- 将其克隆到每个批次中
for batch in batches {
let w0 = w0.clone();
let w1 = w1.clone();
tokio::spawn(build_forward_pass(batch, w0, w1).into_future());
}
权重缓冲区驻留在设备上,通过 Arc 共享,每个批次管道
持有一个廉价的 Arc::clone。只要有任何批次仍在使用它们,
权重就保持存活,Rust 的引用计数自动处理清理。
unzip!——拆分配对结果#
有时你有一个产生元组的操作,但你想将每
个元素送入不同的下游链。unzip! 将一个产生元组的
操作拆分为两个共享执行的独立操作——源操作
最多运行一次:
use cuda_async::unzip;
let pair_op = zip!(allocate_a(), allocate_b());
let (op_a, op_b) = unzip!(pair_op);
let result_a = op_a.and_then(|a| process_a(a));
let result_b = op_b.and_then(|b| process_b(b));
在幕后,unzip! 创建一个共享记忆化节点。先执行的
分支触发源操作;第二个读取缓存的结果。这对于
将共享的设置步骤拆分为分叉的下游管道非常有用。
综合运用:MLP 前向传播#
使用组合器构建的 MLP 前向传播中的数据流。zip! 在顶部打包
三个独立分配。四个 and_then 阶段按顺序链接:
GEMM、MatVec、ReLU 和 D2H。类型通过 value() 元组在阶段之间流动。
整个链是一个 DeviceOperation——在 .sync() 或 .await 之前不会执行任何操作。#
这里是一个简化的 async_mlp 示例版本,展示了
每个组合器协同工作。此函数返回一个描述单个批次
完整前向传播的 DeviceOperation:
fn forward_pass(
batch: Vec<f32>,
w0: Arc<DeviceBox<[f32]>>,
w1: Arc<DeviceBox<[f32]>>,
module: Arc<CudaModule>,
) -> impl DeviceOperation<Output = Vec<f32>> {
// 设置:分配输入 + 暂存缓冲区(独立 → 用 zip 打包)
zip!(h2d(batch), zeros(DIM * DIM), zeros(DIM))
// 阶段 1:GEMM — hidden = input × W0
.and_then(move |(input, hidden, output)| {
let func = module.load_function("sgemm_naive").unwrap();
let mut launch = AsyncKernelLaunch::new(Arc::new(func));
launch
.push_args((DIM as u32, DIM as u32, DIM as u32,
1.0f32,
input.cu_deviceptr(), input.len() as u64,
w0.cu_deviceptr(), w0.len() as u64,
0.0f32,
hidden.cu_deviceptr(), hidden.len() as u64))
.set_launch_config(gemm_cfg);
// 内核返回 ()。携带我们仍然需要的缓冲区向前。
launch.and_then(move |()| value((hidden, output, w1, module)))
})
// 阶段 2:MatVec — output = hidden × W1
.and_then(move |(hidden, output, w1, module)| {
let func = module.load_function("matvec_naive").unwrap();
let mut launch = AsyncKernelLaunch::new(Arc::new(func));
launch
.push_args((DIM as u32, DIM as u32,
hidden.cu_deviceptr(), hidden.len() as u64,
w1.cu_deviceptr(), w1.len() as u64,
output.cu_deviceptr(), output.len() as u64))
.set_launch_config(matvec_cfg);
launch.and_then(move |()| value((output, module)))
})
// 阶段 3:ReLU — result = max(0, output)
.and_then(move |(output, module)| {
let func = module.load_function("relu").unwrap();
let mut launch = AsyncKernelLaunch::new(Arc::new(func));
launch
.push_args((output.cu_deviceptr(), output.len() as u64,
output.cu_deviceptr(), output.len() as u64))
.set_launch_config(relu_cfg);
launch.and_then(move |()| value(output))
})
// 阶段 4:将结果下载到主机
.and_then(d2h)
}
研究数据如何流经此管道:
zip!产生三个DeviceBox句柄。第一个
and_then消耗所有三个,启动 GEMM,并通过value()将其仍然需要的 缓冲区打包成元组。第二个
and_then解包元组,启动 MatVec,并仅将下一阶段 需要的内容向前传递。第三个
and_then原地启动 ReLU。最后的
and_then调用d2h将结果复制到主机。
整个函数返回 impl DeviceOperation<Output = Vec<f32>>。尚未
执行任何操作。你可以 .sync() 它、.await 它,或将其交给 tokio::spawn
并让调度策略决定使用哪个流。
快速参考#
组合器 |
功能 |
使用场景 |
|---|---|---|
|
运行 self,然后在同一流上运行 |
顺序依赖阶段 |
|
类似 |
闭包需要原始流时 |
|
|
在上下文中哪个读起来更好 |
|
运行两者,返回 |
独立设置步骤 |
|
运行全部三个,返回 |
同上,三个操作数 |
|
将输出包装在 |
跨管道共享结果 |
|
将产生元组的操作拆分为两个 |
分叉的下游链 |
|
将 |
将主机数据送入管道 |
|
将构造推迟到流已知时 |
包装原始 CUDA 驱动调用 |