组合器与组合#

单个内核启动很少是全部故事。一个真正的 GPU 工作负载可能需要 上传输入数据、乘两个矩阵、应用激活函数、并 将结果复制回主机——四个必须严格按照该顺序发生的独立操作。 本章展示了 cuda-oxide 的组合器系统如何让你 将这些片段拼接成一个管道,该管道在执行之前始终保持延迟和 流无关。

参见

DeviceOperation 模型了解组合器所构建的基础 trait 和执行方法。

问题:多步 GPU 工作#

假设你正在构建一个简单的神经网络前向传播。步骤如下:

  1. 上传输入批次从主机到设备。

  2. GEMM——将输入乘以权重矩阵 W0。

  3. ReLU——对结果应用激活函数。

  4. 下载输出回到主机。

每一步都依赖于前一步。在上传完成之前你不能运行 GEMM, 在 ReLU 完成之前你不能下载。在同步世界中, 你会编写四个独立的 .sync() 调用:

let input  = h2d(batch_data).sync()?;       // 上传
let hidden = launch_gemm(input, w0).sync()?; // GEMM
let output = launch_relu(hidden).sync()?;    // ReLU
let result = d2h(output).sync()?;            // 下载

这可以工作,但有一个问题。每个 .sync() 都经过调度 策略,而调度策略可能每次都分配一个不同的流。不同流之间 没有顺序保证——GEMM 可能在上传在另一个流上完成之前就开始。 即使策略恰好选择了同一个流, 每个 .sync() 都会阻塞主机线程、提交一个操作、再次阻塞、 提交下一个,如此反复。你要付出"把配方交给厨房,等待菜肴, 交出下一个配方"的往返代价,整整四次。

你真正想要的是将整个餐食计划写在一张卡片上,然后一次性 交给厨房。

and_then——"当这个完成时,做那个"#

and_then 组合器链接两个操作:运行第一个,将其输出传递 给一个闭包,闭包产生第二个操作。两者在同一流上运行, 所以 CUDA 的流内顺序保证意味着第二个操作总是看到 第一个操作的结果:

let pipeline = h2d(batch_data)
    .and_then(|input| launch_gemm(input, w0))
    .and_then(|hidden| launch_relu(hidden))
    .and_then(|output| d2h(output));

let result: Vec<f32> = pipeline.sync()?;

整个链是一个单独的 DeviceOperation。编写它时没有 GPU 工作发生—— 你只是在描述序列。当你调用 .sync() 时,调度 策略选择一个流,链在该流上从上到下运行。 一次厨房之旅,一次等待,四道菜。

数据如何流经链条#

每个 and_then 闭包接收上一阶段的 Output 作为其参数。 闭包必须返回一个新的 DeviceOperation,它成为链中的下一个链接:

h2d(batch)         → DeviceBox<[f32]>
    │
    └─ and_then ─► launch_gemm(input, w0)   → DeviceBox<[f32]>
                        │
                        └─ and_then ─► launch_relu(hidden)   → DeviceBox<[f32]>
                                            │
                                            └─ and_then ─► d2h(output)   → Vec<f32>

整个链的类型由编译器推断。你永远不需要命名 中间类型——它们通过闭包自动流转。

使用 value() 携带额外数据#

内核启动产生 ()——它们通过对设备内存的副作用工作, 而不是返回值。但下一阶段需要缓冲区句柄、模块 引用以及可能还有其他元数据。技巧是使用 value() 来打包 下一阶段需要的一切:

let pipeline = launch_gemm(input, hidden)
    .and_then(move |()| {
        // 内核产生了 () 但我们仍然有 `hidden` 和 `module`
        // 来自外围作用域。将它们打包以便下一阶段使用。
        value((hidden, module))
    })
    .and_then(move |(hidden, module)| {
        launch_relu(hidden, module)
    });

move 关键字很重要——每个闭包通过所有权捕获它需要的 数据。当闭包运行时,它消耗这些值并通过 value() 将它们 向前传递。这正是 Rust 的所有权系统设计的初衷: 确保每个缓冲区在任意时刻只被一个阶段使用, 没有悬垂引用。

and_then_with_context——当你需要流时#

有时阶段之间的闭包需要执行原始 CUDA 操作—— 异步内存复制、事件记录或同步调用。这些 需要 CUstream 句柄,而在普通的 and_then 闭包中不可用。and_then_with_context 同时传递上一个结果和 ExecutionContext

let pipeline = launch_kernel(input)
    .and_then_with_context(|ctx, gpu_result| {
        let stream = ctx.get_cuda_stream();
        copy_result_to_staging(stream, gpu_result)
    });

谨慎使用——大多数管道可以完全用 and_then 和 内部使用 with_context 的辅助函数(h2dd2hzeros)构建。

zip!——打包独立工作#

并非一切都是顺序的。在运行前向传播之前,你需要 分配三个缓冲区:输入、隐藏层的暂存缓冲区和 输出缓冲区。这些分配是独立的——没有一个依赖于其他。 但每个都返回你稍后需要的值。

如果你对所有三个都使用 and_then,你最终会笨拙地嵌套闭包 来携带所有结果向前:

// 不要这样做——可以工作但难以阅读
let pipeline = h2d(batch_data)
    .and_then(|input| {
        zeros(DIM * DIM).and_then(move |hidden| {
            zeros(DIM).and_then(move |output| {
                value((input, hidden, output))
            })
        })
    });

zip! 通过将独立操作组合成一个返回其结果元组的操作 来解决这个问题:

use cuda_async::zip;

let pipeline = zip!(h2d(batch_data), zeros(DIM * DIM), zeros(DIM));
// pipeline: impl DeviceOperation<Output = (DeviceBox, DeviceBox, DeviceBox)>

let (input, hidden, output) = pipeline.sync()?;

清晰多了。zip! 接受两个或三个参数,并在同一流上按顺序 执行它们。结果被收集到一个元组中并一起返回。

小技巧

zip 这个名字来自数据组合模式——两个独立 结果打包成一个元组——而不是并行执行。所有支路在 同一流上按顺序运行。要实现跨流的真正并发执行,请参阅 并发执行

结合 zip!and_then#

真正的威力在你结合它们时显现。zip! 处理独立的 设置,and_then 处理依赖的管道:

let pipeline = zip!(h2d(batch_data), zeros(DIM * DIM), zeros(DIM))
    .and_then(|(input, hidden, output)| launch_gemm(input, hidden, w0))
    .and_then(|hidden| launch_relu(hidden))
    .and_then(|result| d2h(result));

这读起来几乎像伪代码:"分配三个缓冲区,然后 GEMM,然后 ReLU,然后下载。"整个东西是一个 DeviceOperation,当你 .sync().await 它时在一个流上运行。

.arc()——跨管道共享结果#

在批处理场景中,你可能加载一次模型权重并将它们 在多个前向传播之间共享。每个批次管道都需要对 权重的引用,但 DeviceOperation 通过值消耗其输出。你不能 将相同的 DeviceBox 移动到四个不同的闭包中。

.arc() 将输出包装在 Arc<T> 中,使其可以廉价地克隆:

let (w0, w1) = zip!(
    h2d(w0_host).arc(),
    h2d(w1_host).arc()
).await?;

// w0: Arc<DeviceBox<[f32]>>  -- 将其克隆到每个批次中
for batch in batches {
    let w0 = w0.clone();
    let w1 = w1.clone();
    tokio::spawn(build_forward_pass(batch, w0, w1).into_future());
}

权重缓冲区驻留在设备上,通过 Arc 共享,每个批次管道 持有一个廉价的 Arc::clone。只要有任何批次仍在使用它们, 权重就保持存活,Rust 的引用计数自动处理清理。

unzip!——拆分配对结果#

有时你有一个产生元组的操作,但你想将每 个元素送入不同的下游链。unzip! 将一个产生元组的 操作拆分为两个共享执行的独立操作——源操作 最多运行一次:

use cuda_async::unzip;

let pair_op = zip!(allocate_a(), allocate_b());
let (op_a, op_b) = unzip!(pair_op);

let result_a = op_a.and_then(|a| process_a(a));
let result_b = op_b.and_then(|b| process_b(b));

在幕后,unzip! 创建一个共享记忆化节点。先执行的 分支触发源操作;第二个读取缓存的结果。这对于 将共享的设置步骤拆分为分叉的下游管道非常有用。

综合运用:MLP 前向传播#

async-programming/images/combinator-dataflow.svg

使用组合器构建的 MLP 前向传播中的数据流。zip! 在顶部打包 三个独立分配。四个 and_then 阶段按顺序链接: GEMM、MatVec、ReLU 和 D2H。类型通过 value() 元组在阶段之间流动。 整个链是一个 DeviceOperation——在 .sync().await 之前不会执行任何操作。#

这里是一个简化的 async_mlp 示例版本,展示了 每个组合器协同工作。此函数返回一个描述单个批次 完整前向传播的 DeviceOperation

fn forward_pass(
    batch: Vec<f32>,
    w0: Arc<DeviceBox<[f32]>>,
    w1: Arc<DeviceBox<[f32]>>,
    module: Arc<CudaModule>,
) -> impl DeviceOperation<Output = Vec<f32>> {
    // 设置:分配输入 + 暂存缓冲区(独立 → 用 zip 打包)
    zip!(h2d(batch), zeros(DIM * DIM), zeros(DIM))
        // 阶段 1:GEMM — hidden = input × W0
        .and_then(move |(input, hidden, output)| {
            let func = module.load_function("sgemm_naive").unwrap();
            let mut launch = AsyncKernelLaunch::new(Arc::new(func));
            launch
                .push_args((DIM as u32, DIM as u32, DIM as u32,
                            1.0f32,
                            input.cu_deviceptr(), input.len() as u64,
                            w0.cu_deviceptr(), w0.len() as u64,
                            0.0f32,
                            hidden.cu_deviceptr(), hidden.len() as u64))
                .set_launch_config(gemm_cfg);
            // 内核返回 ()。携带我们仍然需要的缓冲区向前。
            launch.and_then(move |()| value((hidden, output, w1, module)))
        })
        // 阶段 2:MatVec — output = hidden × W1
        .and_then(move |(hidden, output, w1, module)| {
            let func = module.load_function("matvec_naive").unwrap();
            let mut launch = AsyncKernelLaunch::new(Arc::new(func));
            launch
                .push_args((DIM as u32, DIM as u32,
                            hidden.cu_deviceptr(), hidden.len() as u64,
                            w1.cu_deviceptr(), w1.len() as u64,
                            output.cu_deviceptr(), output.len() as u64))
                .set_launch_config(matvec_cfg);
            launch.and_then(move |()| value((output, module)))
        })
        // 阶段 3:ReLU — result = max(0, output)
        .and_then(move |(output, module)| {
            let func = module.load_function("relu").unwrap();
            let mut launch = AsyncKernelLaunch::new(Arc::new(func));
            launch
                .push_args((output.cu_deviceptr(), output.len() as u64,
                            output.cu_deviceptr(), output.len() as u64))
                .set_launch_config(relu_cfg);
            launch.and_then(move |()| value(output))
        })
        // 阶段 4:将结果下载到主机
        .and_then(d2h)
}

研究数据如何流经此管道:

  1. zip! 产生三个 DeviceBox 句柄。

  2. 第一个 and_then 消耗所有三个,启动 GEMM,并通过 value() 将其仍然需要的 缓冲区打包成元组。

  3. 第二个 and_then 解包元组,启动 MatVec,并仅将下一阶段 需要的内容向前传递。

  4. 第三个 and_then 原地启动 ReLU。

  5. 最后的 and_then 调用 d2h 将结果复制到主机。

整个函数返回 impl DeviceOperation<Output = Vec<f32>>。尚未 执行任何操作。你可以 .sync() 它、.await 它,或将其交给 tokio::spawn 并让调度策略决定使用哪个流。

快速参考#

组合器

功能

使用场景

.and_then(f)

运行 self,然后在同一流上运行 f(result)

顺序依赖阶段

.and_then_with_context(f)

类似 and_then,但 f 也获取 ExecutionContext

闭包需要原始流时

.apply(f)

and_then 的别名

在上下文中哪个读起来更好

zip!(a, b)

运行两者,返回 (A, B)

独立设置步骤

zip!(a, b, c)

运行全部三个,返回 (A, B, C)

同上,三个操作数

.arc()

将输出包装在 Arc<T>

跨管道共享结果

unzip!(op)

将产生元组的操作拆分为两个

分叉的下游链

value(x)

x 包装为空操作

将主机数据送入管道

with_context(f)

将构造推迟到流已知时

包装原始 CUDA 驱动调用

参见

并发执行一章展示了如何使用 tokio::spawn 并发运行多个管道, 调度与流一章解释了调度策略如何 将管道分发到 CUDA 流上。