项目:异步 MLP 流水线#
你已经学会了如何定义 kernel、传输数据、使用组合子(combinator)编排操作,以及跨 stream 调度任务。现在是时候将它们整合起来了。
本章将带你走读 async_mlp——cuda-oxide 仓库中最完整的示例——从第一个 use 语句到 stdout 上的 [ReLU OK]。到本章结束时,你将构建一个多 kernel 的 GPU 流水线,利用工具包中的所有异步模式,并发处理四个 batch。如果你正在按顺序阅读各章,可以把本章看作最终大 boss。
参见
并发执行一章深入介绍了并发流水线背后的概念。本章则聚焦于代码——一个完整、可构建、可运行、可修改、可 profiling 的项目。
我们要构建什么#
一个在 GPU 上运行的玩具级多层感知器(MLP)前向传播:
input [64×64] ──► GEMM(input, W0) ──► hidden [64×64]
│
MatVec(hidden, W1) ──► output [64]
│
ReLU(output) ──► result [64]
三个 kernel、四个阶段(包括最终的 device-to-host 拷贝)、四个 batch、四条 CUDA stream、一组共享的权重,以及零个空闲等待的 host 线程。总体架构如下:
四个 MLP 前向传播并发运行。共享权重作为 Arc
顺序方法一次处理一个 batch——四倍的工作量,四倍的等待时间。并发方法将它们重叠起来:如果 GPU 有空闲的 SM,你大约可以在单个 batch 的时间加上一个很小的错开时间内完成。对于几行 Arc::clone() 调用来说,这个回报相当不错。
项目结构#
该示例位于 crates/rustc-codegen-cuda/examples/async_mlp/ 目录下,是一个独立的 Cargo workspace 成员:
async_mlp/
├── Cargo.toml ← 依赖:cuda-device, cuda-core, cuda-async, tokio
└── src/
└── main.rs ← kernel + host 代码,单文件
一切——device kernel 和 host 编排——都放在一个文件中。#[kernel] 属性告诉编译器哪些函数会变成 PTX;其余部分按普通 Rust 编译。没有单独的 .cu 文件,不需要 header 体操,也没有构建系统的分裂人格。
依赖#
Cargo.toml 精确引入了我们需要的 crate:
[dependencies]
cuda-device = { path = "../../../cuda-device" } # #[kernel], DisjointSlice, thread::*
cuda-core = { path = "../../../cuda-core" } # CudaModule, LaunchConfig
cuda-async = { path = "../../../cuda-async" } # DeviceOperation, zip!, and_then, spawn
tokio = { version = "1", features = ["rt", "macros"] }
cuda-device 提供了 device 端 API(内联函数、安全的可变切片)。
cuda-core 处理模块加载和 launch 配置。
cuda-async 提供了 DeviceOperation 图、组合子以及 stream 池调度器。
tokio 是 host 端的异步运行时,负责轮询 future。
GPU kernel#
以下三个函数由 rustc-codegen-cuda 编译为 PTX。它们永远不会在 host 上执行——#[kernel] 属性将每个函数重命名为 cuda_oxide_kernel_<hash>_<name>,以便 codegen 后端能够识别并提取它们。
sgemm_naive —— 矩阵乘法#
use cuda_device::{kernel, thread, DisjointSlice};
use cuda_device::thread::Runtime2DIndex;
#[kernel]
pub fn sgemm_naive(
m: u32, n: u32, k: u32,
alpha: f32, a: &[f32], b: &[f32],
beta: f32, mut c: DisjointSlice<f32, Runtime2DIndex>,
) {
let n_sz = n as usize;
let row = thread::index_2d_row();
let col = thread::index_2d_col();
// SAFETY: every thread sees the same `n_sz` (same kernel arg).
if let Some(c_idx) = unsafe { thread::index_2d_runtime(n_sz) } {
// col < n_sz guaranteed by `Some` -- no manual check needed
if row < m as usize {
let k_sz = k as usize;
let mut sum = 0.0f32;
let mut i = 0usize;
while i < k_sz {
sum = sum + a[row * k_sz + i] * b[i * n_sz + col];
i = i + 1;
}
if let Some(c_elem) = c.get_mut(c_idx) {
*c_elem = alpha * sum + beta * (*c_elem);
}
}
}
}
每个线程计算输出矩阵的一个元素。2D 线程索引直接映射到 (row, col) 位置。DisjointSlice<f32> 是安全的可变视图——它在编译时保证每个线程写入不同的元素,因此没有数据竞争,也不需要 unsafe。
小技巧
这里有意使用朴素(naive) 的 GEMM——一个线程,一个元素,没有 shared memory 分块(tiling),没有合并访问(coalescing)技巧。生产级的 GEMM 会使用共享内存章节中介绍的技术。但对于演示异步组合来说,正确性比性能更重要。
matvec_naive —— 矩阵-向量乘法#
#[kernel]
pub fn matvec_naive(
_m: u32, n: u32,
mat: &[f32], vec_in: &[f32],
mut vec_out: DisjointSlice<f32>,
) {
let row = thread::index_1d();
if let Some(out_elem) = vec_out.get_mut(row) {
let n_sz = n as usize;
let mut sum = 0.0f32;
let mut j = 0usize;
while j < n_sz {
sum = sum + mat[row.get() * n_sz + j] * vec_in[j];
j = j + 1;
}
*out_elem = sum;
}
}
每个输出元素一个线程,一维索引。_m 参数虽然未使用,但保持了与 BLAS 风格接口一致的调用约定。
relu —— 激活函数#
#[kernel]
pub fn relu(input: &[f32], mut output: DisjointSlice<f32>) {
let idx = thread::index_1d();
if let Some(out_elem) = output.get_mut(idx) {
let val = input[idx.get()];
*out_elem = if val > 0.0f32 { val } else { 0.0f32 };
}
}
逐元素 max(0, x)。在流水线中,input 和 output 指向同一个缓冲区——这是一种完全合法的原地(in-place)模式,因为每个线程读取和写入的是相同的索引。
值得留意的模式#
以下是三个 kernel 中反复出现的几种模式:
模式 |
作用 |
|---|---|
|
从 block/grid 维度计算全局线程索引 |
|
安全的可变输出——编译器保证没有别名(aliasing) |
|
边界检查,使超出数据大小的线程静默退出 |
使用 |
风格选择——带范围的 |
连接 host 与 device#
三个辅助函数将原始的 CUDA 驱动调用包装成 DeviceOperation 值。它们使用 with_context 在执行时才接收调度器的 stream——而不是在调用点。这是关键洞察:你现在构建配方,调度器稍后选择 stream。
h2d —— host 到 device#
fn h2d(host_data: Vec<f32>) -> impl DeviceOperation<Output = DeviceBox<[f32]>> {
device_operation::with_context(move |ctx| {
let stream = ctx.get_cuda_stream();
let n = host_data.len();
let num_bytes = n * mem::size_of::<f32>();
unsafe {
let dptr = malloc_async(stream.cu_stream(), num_bytes).unwrap();
memcpy_htod_async(dptr, host_data.as_ptr(), num_bytes, stream.cu_stream())
.unwrap();
value(DeviceBox::from_raw_parts(dptr, n, ctx.get_device_id()))
}
})
}
host_data 向量被 move 捕获。闭包在调度器执行操作时运行——此时它已经拥有了一条真正的 CUDA stream。malloc_async 和 memcpy_htod_async 是 stream 有序的,因此分配和拷贝在调度器选择的 stream 上序列化执行。
zeros —— 零初始化的 device 缓冲区#
fn zeros(n: usize) -> impl DeviceOperation<Output = DeviceBox<[f32]>> {
device_operation::with_context(move |ctx| {
let stream = ctx.get_cuda_stream();
let num_bytes = n * mem::size_of::<f32>();
unsafe {
let dptr = malloc_async(stream.cu_stream(), num_bytes).unwrap();
memset_d8_async(dptr, 0, num_bytes, stream.cu_stream()).unwrap();
value(DeviceBox::from_raw_parts(dptr, n, ctx.get_device_id()))
}
})
}
与 h2d 模式相同,但使用 memset_d8_async 而非 memcpy。GEMM kernel 使用 beta = 0.0,所以初始内容无关紧要,但置零是一种防御性做法。
d2h —— device 到 host#
fn d2h(dev: DeviceBox<[f32]>) -> impl DeviceOperation<Output = Vec<f32>> {
device_operation::with_context(move |ctx| {
let stream = ctx.get_cuda_stream();
let n = dev.len();
let num_bytes = n * mem::size_of::<f32>();
let mut host = vec![0.0f32; n];
unsafe {
memcpy_dtoh_async(
host.as_mut_ptr(), dev.cu_deviceptr(),
num_bytes, stream.cu_stream(),
).unwrap();
}
let _ = &dev;
value(host)
})
}
let _ = &dev; 这一行看起来像是空操作,但它确保 dev 在闭包中存活到 stream 同步完成。如果没有它,dev 将在 memcpy_dtoh_async 调用之后、stream 完成拷贝之前被丢弃——这是 device 端经典的 use-after-free 问题。
组合流水线#
每个 batch 是一个由组合子构建的 DeviceOperation。当你在组装它时,不会发生任何 GPU 工作——它是对未来工作的惰性描述。以下是各个部分如何组合在一起的:
单个 batch 的组合子流水线。zip! 并行分配三个缓冲区。and_then 链依次串联 GEMM → MatVec → ReLU → D2H,通过 value() 在每个阶段之间传递 device 句柄。#
步骤 1:初始化运行时#
const DIM: usize = 64;
const BLOCK: u32 = 16;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
init_device_contexts(0, 1)?;
let module = kernels::load_async(0)?;
init_device_contexts(0, 1) 将 device 0 设为默认,并初始化 device context map(容量为 1)。轮询 stream 池在首次使用时惰性创建。由我们的 #[kernel] 函数编译而成的嵌入式模块被加载一次,并通过类型化模块句柄共享。
步骤 2:上传共享权重#
let w0_host: Vec<f32> = (0..DIM * DIM)
.map(|i| ((i % 7) as f32 - 3.0) * 0.01)
.collect();
let w1_host: Vec<f32> = (0..DIM)
.map(|i| ((i % 5) as f32 - 2.0) * 0.01)
.collect();
let (w0, w1): (Arc<DeviceBox<[f32]>>, Arc<DeviceBox<[f32]>>) =
zip!(h2d(w0_host).arc(), h2d(w1_host).arc()).await?;
两个独立的上传操作,通过 zip! 捆绑在一起,以便它们共享一条 stream。.arc() 将每个结果包装在 Arc 中——一个引用计数的指针,clone 的成本大约是一纳秒。四个 batch,四次 clone,零次 device 拷贝。
步骤 3:构建每个 batch 的流水线#
这是魔法发生的地方。对于每个 batch,我们构建一个四阶段的链:
let pipeline = zip!(h2d(batch_data), zeros(DIM * DIM), zeros(DIM))
.and_then(move |(input, hidden, output)| {
// Stage 1: GEMM — hidden = input @ W0
let func = module.load_function("sgemm_naive").unwrap();
let mut launch = AsyncKernelLaunch::new(Arc::new(func));
launch.push_args((
DIM as u32, DIM as u32, DIM as u32,
1.0f32,
input.cu_deviceptr(), input.len() as u64,
w0.cu_deviceptr(), w0.len() as u64,
0.0f32,
hidden.cu_deviceptr(), hidden.len() as u64,
)).set_launch_config(gemm_cfg);
launch.and_then(move |()| value((hidden, output, w1, module)))
})
.and_then(move |(hidden, output, w1, module)| {
// Stage 2: MatVec — output = hidden @ W1
let func = module.load_function("matvec_naive").unwrap();
let mut launch = AsyncKernelLaunch::new(Arc::new(func));
launch.push_args((
DIM as u32, DIM as u32,
hidden.cu_deviceptr(), hidden.len() as u64,
w1.cu_deviceptr(), w1.len() as u64,
output.cu_deviceptr(), output.len() as u64,
)).set_launch_config(matvec_cfg);
launch.and_then(move |()| value((output, module)))
})
.and_then(move |(output, module)| {
// Stage 3: ReLU — result = max(0, output)
let func = module.load_function("relu").unwrap();
let relu_out: DeviceBox<[f32]> = output;
let mut launch = AsyncKernelLaunch::new(Arc::new(func));
launch.push_args((
relu_out.cu_deviceptr(), relu_out.len() as u64,
relu_out.cu_deviceptr(), relu_out.len() as u64,
)).set_launch_config(relu_cfg);
launch.and_then(move |()| value(relu_out))
})
.and_then(d2h);
有几处值得放慢速度仔细看:
value() 接力棒。 每个 and_then 闭包消费前一阶段的输出并返回一个 DeviceOperation。Kernel 启动返回 (),因此你需要 value() 来将下一个阶段所需的 device 句柄传递下去。可以把它看作接力棒——kernel 运行,接力棒传递。
类型标注。 zip! + and_then 链产生的深度嵌套泛型超出了 Rust 的类型推断能力。你需要在闭包参数上添加显式标注:
.and_then(move |(hidden, output, w1, module): (
DeviceBox<[f32]>,
DeviceBox<[f32]>,
Arc<DeviceBox<[f32]>>,
Arc<CudaModule>,
)| { ... })
这是唯一一个体验上的粗糙边缘。此外,Zippable trait 导入也是 zip! 正常工作的必要条件。
原地 ReLU。 阶段 3 将 relu_out 同时作为 input 和 output 传递给 kernel。由于每个线程读取 input[idx] 并写入相同的 output[idx],这是安全的——没有线程读取另一个线程的写入。
步骤 4:Spawn 并收集#
handles.push(tokio::spawn(pipeline.into_future()));
.into_future() 将惰性的 DeviceOperation 转换为 DeviceFuture。此时调度策略选择一条 stream(batch 0 → stream 0,batch 1 → stream 1,轮询)。tokio::spawn 将 future 交给 Tokio 运行时。
在第一次 poll 时,流水线的 execute() 将所有 GPU 工作提交到分配的 stream 上,并注册一个 cuLaunchHostFunc 回调。future 返回 Poll::Pending。当 GPU 完成时,回调唤醒任务。没有 host 线程自旋等待。
for (i, handle) in handles.into_iter().enumerate() {
let result: Vec<f32> = handle.await.expect("Tokio task panicked")?;
let all_non_negative = result.iter().all(|&v| v >= 0.0);
println!("Batch {}: {} elements, first 8 = {:?}{}",
i, result.len(), &result[..8.min(result.len())],
if all_non_negative { " [ReLU OK]" } else { " [ReLU FAILED]" }
);
}
ReLU 完整性检查并非深度学习验证——它只是确认激活函数已经运行。每个输出都应该是非负的。如果你看到 [ReLU FAILED],说明上游出了严重问题。
构建、运行、验证#
cargo oxide run async_mlp
预期输出:
=== Async MLP Pipeline ===
Allocating model weights...
W0: 64x64 on device (Arc refcount=1)
W1: 64 on device (Arc refcount=1)
Launched 4 batches concurrently, awaiting results...
Batch 0: 64 elements, first 8 = [0.0020799995, 0.0, ...] [ReLU OK]
Batch 1: 64 elements, first 8 = [0.0, 0.0, ...] [ReLU OK]
Batch 2: 64 elements, first 8 = [0.0, 0.00108, ...] [ReLU OK]
Batch 3: 64 elements, first 8 = [0.00244, 0.0025, ...] [ReLU OK]
SUCCESS: All batches completed.
Arc 引用计数从 1 开始(w0 和 w1 各一个引用)。在 batch 处理期间,它们暂时上升到 5(原始引用 + 四个 clone),并在 batch 完成时回落。如果添加更多 batch,引用计数也会相应扩展——无需拷贝,无需重新分配。
使用 Nsight Systems 进行 profiling#
要查看 stream 重叠的实际效果:
nsys profile --trace=cuda cargo oxide run async_mlp
nsys-ui report.nsys-rep
在时间线视图中,寻找四行并行的 kernel 块——每条 stream 一行。如果 kernel 在一条 stream 上被串行化,说明轮询调度器没有被使用(可能 init_device_contexts 没有被调用,或者只配置了一条 stream)。
什么使这个示例“真实”#
这仍然是一个玩具——64×64 的矩阵、伪造的权重、三个 kernel。但它的结构与生产级 GPU 流水线相同:
生产级模式 |
async_mlp 等价物 |
|---|---|
模型权重加载一次,跨请求共享 |
|
每个请求的推理流水线 |
|
并发请求处理 |
|
基于 stream 的 GPU 调度 |
轮询 |
非阻塞 host |
对 join handle 进行 |
将矩阵从 64 扩展到 4096,将朴素 kernel 替换为分块的 shared memory 版本(参见共享内存),添加更多层,你就拥有了一台真实推理服务器的骨架。
扩展该示例#
以下是一些继续深入的想法:
添加 softmax 层。 编写一个 #[kernel] 来计算跨 64 元素输出向量的数值稳定的 softmax。在 ReLU 之后用另一个 .and_then 链接它。
在更大维度上 profiling。 将 DIM 改为 512 或 1024。观察 GEMM 如何主导时间线。然后用 SharedArray 的分块版本替换 sgemm_naive,观察加速效果。
添加错误恢复。 将 kernel 启动闭包中的 .unwrap() 替换为正确的 Result 传播。使用 并发执行 一章中的三分支 match 模式,独立处理 GPU 错误和任务 panic。
多 device。 传入 init_device_contexts(0, 2) 来管理两个 GPU。构建一个 batch 分配器,将偶数 batch 路由到 GPU 0,奇数 batch 路由到 GPU 1。
参见
DeviceOperation 模型 —— 惰性 GPU 图的工作原理
组合子与组合 ——
and_then、zip!、value()、.arc()详解调度与 Stream —— 轮询、stream 池、调度策略
并发执行 —— 本章所有内容背后的理论基础