并发执行#

你知道了如何使用 DeviceOperation 描述 GPU 工作,使用 and_thenzip! 组合多阶段管道,并让调度策略分配流。 现在到了回报部分:同时运行多个管道。 本章从头到尾讲解 async_mlp 示例,展示 Tokio 任务、轮询调度器和 Rust 的所有权模型如何 协同工作,在一个 GPU 上并发处理四个批次。

参见

CUDA Programming Guide -- Multi-Device System 了解 CUDA 在多 GPU 上下文、对等访问和跨设备内存方面的规则。

场景#

你正在运行一个三层 MLP 前向传播:GEMM(矩阵乘法)、 MatVec(矩阵-向量乘积)和 ReLU(激活函数)。你有两个 权重矩阵已加载到 GPU 上,并需要处理四个批次的输入数据。 每个批次是独立的——批次 0 不依赖于批次 1——但它们都 共享相同的权重。

async-programming/images/concurrent-batches.svg

四个 MLP 前向传播并发运行。顶部:共享权重一次性上传 为 Arc<DeviceBox>,廉价地克隆到每个批次中。中间:GPU 时间线——轮询将批次分布到四个流上,交错的管道重叠。 底部:顺序处理花费约一个批次的 4 倍时间;如果 GPU 有空闲 SM, 并发处理花费约 1.3 倍时间。#

顺序方法逐批次处理:

Batch 0:  ████ GEMM ████ MatVec ██ ReLU █ D2H █
                                                  Batch 1:  ████ GEMM ████ ...

并发方法在不同流上重叠它们:

Stream 0:  ████ GEMM ████ MatVec ██ ReLU █ D2H █
Stream 1:    ████ GEMM ████ MatVec ██ ReLU █ D2H █
Stream 2:      ████ GEMM ████ MatVec ██ ReLU █ D2H █
Stream 3:        ████ GEMM ████ MatVec ██ ReLU █ D2H █

如果 GPU 有足够的 SM 来同时运行多个内核, 重叠版本显著更快完成。而且因为每个管道是在一个流上的 单个 and_then 链,批次内的阶段仍然是严格排序的——不需要 跨流同步。

步骤 1:初始化运行时#

一切从 init_device_contexts 开始。这创建 CUDA 上下文、 设置调度策略(默认四流轮询),并使线程局部状态 对 .sync().await 可用:

use cuda_async::device_context::init_device_contexts;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    init_device_contexts(0, 1)?;
    let module = kernels::load_async(0)?;

嵌入式模块通过线程局部异步上下文加载。类型化的 模块句柄可以廉价地克隆到每个批次管道中。

步骤 2:上传共享权重#

模型有两个权重矩阵:W0(DIM x DIM)和 W1(DIM)。两者都需要 在所有四个前向传播期间驻留在设备上。这就是 zip!.arc() 发挥作用的地方:

    let w0_host: Vec<f32> = (0..DIM * DIM)
        .map(|i| ((i % 7) as f32 - 3.0) * 0.01)
        .collect();
    let w1_host: Vec<f32> = (0..DIM)
        .map(|i| ((i % 5) as f32 - 2.0) * 0.01)
        .collect();

    let (w0, w1): (Arc<DeviceBox<[f32]>>, Arc<DeviceBox<[f32]>>) = zip!(
        h2d(w0_host).arc(),
        h2d(w1_host).arc()
    ).await?;

zip! 将两个独立的 H2D 传输打包成一个操作。.arc() 将 每个结果包装在 Arc 中以便权重可以共享。.await 在 池中的一个流上调度组合操作并等待其完成。

在这行之后,w0w1Arc<DeviceBox<[f32]>>——廉价克隆、 安全共享,并固定在设备上。

步骤 3:构建并调度批次管道#

现在是最有趣的部分。对于每个批次,你构建一个延迟管道(尚未产生 GPU 工作)并将其交给 tokio::spawn

    let num_batches = 4;
    let mut handles = vec![];

    for batch_idx in 0..num_batches {
        let w0 = w0.clone();       // Arc 克隆:约 1 ns
        let w1 = w1.clone();
        let module = module.clone();

        let batch_data: Vec<f32> = (0..DIM * DIM)
            .map(|i| ((i + batch_idx * 37) % 13) as f32 * 0.1)
            .collect();

        let pipeline = zip!(h2d(batch_data), zeros(DIM * DIM), zeros(DIM))
            .and_then(move |(input, hidden, output)| {
                // 阶段 1:GEMM — hidden = input × W0
                // ... 构建 AsyncKernelLaunch,推送参数,用 and_then 链接 ...
            })
            .and_then(move |(hidden, output, w1, module)| {
                // 阶段 2:MatVec — output = hidden × W1
                // ...
            })
            .and_then(move |(output, module)| {
                // 阶段 3:ReLU — result = max(0, output)
                // ...
            })
            .and_then(d2h);  // 阶段 4:将结果复制到主机

        handles.push(tokio::spawn(pipeline.into_future()));
    }

让我们解析这里发生了什么:

  1. pipeline 是一个 DeviceOperation 它描述了整个前向传播 但不执行任何 GPU 工作。构建它纯粹是主机端的计算——分配闭包和结构体。

  2. .into_future() 将其转换为 DeviceFuture 这触发了 调度策略,该策略选择一个流。批次 0 获得流 0,批次 1 获得流 1, 以此类推——轮询分配。

  3. tokio::spawn 将 future 交给 Tokio 运行时。 运行时将在 执行器有容量时轮询它。在第一次轮询时,管道的 execute() 运行, 将所有 GPU 工作提交到分配的流。

  4. 运行时是空闲的。 第一次轮询后,任务被挂起。GPU 正在四个流上同时进行数字运算。没有主机线程在 等待。

  5. 当 GPU 完成某个管道的工作时,cuLaunchHostFunc 回调 触发,唤醒相应的 Tokio 任务。运行时重新轮询它, 交付 Vec<f32> 结果。

步骤 4:收集结果#

    for (i, handle) in handles.into_iter().enumerate() {
        let result: Vec<f32> = handle.await??;
        println!("Batch {}: {} elements, first 4 = {:?}",
            i, result.len(), &result[..4]);
    }

? 解包两层:外层的 JoinError(如果 Tokio 任务发生了 panic)和内层的 DeviceError(如果 GPU 工作失败)。在 生产系统中,你应该分别处理这些错误。

.await vs .sync() vs tokio::spawn#

看到三者都实际运行后,以下是何时使用每个的指南:

.sync() 阻塞调用线程直到 GPU 完成。在 脚本、测试以及任何没有异步运行时的地方使用它。简单,没有 仪式感,但也没有并发——主机线程被卡在等待中:

let result = pipeline.sync()?;

.await 在 GPU 工作时让出当前异步任务。同一 Tokio 线程上的其他任务可以取得进展。这比 .sync() 对 吞吐量更好,但在任务内部仍然是顺序的——.await 之后的代码在 操作完成之前不会运行:

let result = pipeline.await?;

tokio::spawn(op.into_future()) 将管道启动为完全 独立的任务。启动代码立即继续,结果 稍后通过 join 句柄到达。这是实现真正并发的方式 ——多个管道同时在不同流上运行:

let handle = tokio::spawn(pipeline.into_future());
// ... 启动更多管道,做其他工作 ...
let result = handle.await??;

小技巧

对于依赖操作链(如四阶段前向传播), 优先使用 and_then 而非顺序的 .awaitand_then 链完全在 一个流上运行,阶段之间调度开销为零。 顺序 .await 每次操作都经过调度策略, 可能落在不同流上并需要跨流同步。

所有权模式#

Rust 中的并发 GPU 编程意味着 Rust 的所有权规则在积极 为你工作——偶尔也会阻碍你。以下是反复出现的模式。

通过 and_then 闭包移动数据#

每个 and_then 闭包通过 move 捕获数据。内核启动产生 (),所以你需要显式地向前传递下一阶段需要的缓冲区:

launch_gemm(input, hidden, w0)
    .and_then(move |()| {
        // input 被内核消耗。hidden 和 module 存活
        // 因为它们被捕获但没有被消耗。
        value((hidden, output, w1, module))
    })

闭包返回一个包含下一阶段所需一切元组的 Value。 这个元组是在阶段之间传递的"接力棒"。

使用 Arc 共享不可变数据#

模型权重、查找表和其他跨管道共享的不可变数据应该 包装在 Arc 中。.arc() 组合器自动为 DeviceOperation 的输出执行此操作。对于你已有数据,Arc::new() 也可行:

let weights = h2d(weight_data).arc().await?;  // Arc<DeviceBox<[f32]>>
for batch in batches {
    let w = weights.clone();  // 廉价的引用计数增量
    tokio::spawn(forward_pass(batch, w).into_future());
}

保持设备内存存活#

DeviceBox 必须保持存活直到 GPU 完成使用它。在 and_then 链中这是自动的——闭包拥有 DeviceBox,它存活直到 下一阶段取走它。危险在于 with_context 闭包,你在其中执行 异步复制,而 DeviceBox 可能在复制完成之前被丢弃:

fn d2h(dev: DeviceBox<[f32]>) -> impl DeviceOperation<Output = Vec<f32>> {
    with_context(move |ctx| {
        let stream = ctx.get_cuda_stream();
        let mut host = vec![0.0f32; dev.len()];
        unsafe {
            memcpy_dtoh_async(
                host.as_mut_ptr(), dev.cu_deviceptr(),
                dev.len() * std::mem::size_of::<f32>(),
                stream.cu_stream(),
            ).unwrap();
        }
        // dev 被闭包捕获,并在闭包返回之前存活。
        // 流将在结果被消费之前同步,
        // 因此异步复制在 dev 被丢弃之前完成。
        value(host)
    })
}

关键:devmove 闭包捕获,在闭包返回之前不会被丢弃。 由于流在 DeviceFuture 交付结果之前同步, 异步复制在 dev 被释放之前完成。

错误处理#

GPU 工作产生的错误通过 Result 链传播,就像任何 Rust 代码一样。当运行并发批次时,你通常希望即使一个批次失败也继续 处理:

for (i, handle) in handles.into_iter().enumerate() {
    match handle.await {
        Ok(Ok(result)) => {
            println!("Batch {i}: {} elements", result.len());
        }
        Ok(Err(device_err)) => {
            eprintln!("Batch {i}: GPU error: {device_err}");
        }
        Err(join_err) => {
            eprintln!("Batch {i}: task panicked: {join_err}");
        }
    }
}

三个分支对应:成功、CUDA 驱动或调度错误 (例如内存不足、无效的启动配置)以及 Tokio 任务 panic。

多设备执行#

对于具有多个 GPU 的系统,将更高的设备计数传递给 init_device_contexts

init_device_contexts(0, 2)?;  // 默认设备 0,容量为 2 个设备

这将默认设备设置为 GPU 0,并准备好线程局部映射以支持 两个设备。每个设备的 CUDA 上下文、调度策略和流池 在首次使用时延迟创建。所有策略驱动的操作(.sync().await) 使用默认设备,除非你显式指定另一个设备。 ExecutionContext 携带设备 ID,因此 GPU 0 上的操作永远不会 干扰 GPU 1 上的流。

小技巧

多设备编程需要注意内存位置。在 GPU 0 上分配的 DeviceBox 在 GPU 1 上不可访问,除非启用对等 访问。在构建设备特定操作时,使用 with_context 检查 ctx.get_device_id()

性能调优#

流池大小#

默认的四流池对大多数工作负载运行良好,但如果你在 追求最后几个百分点的吞吐量,合适的池大小取决于你的 情况:

工作负载

建议池大小

原因

每次启动一个大内核

1--2

内核已经饱和 GPU

许多小内核

4--8

重叠流间的启动开销

混合内核 + memcpy

2--4

重叠计算与数据传输

延迟敏感的服务

每请求 1

避免队头阻塞

性能分析#

Nsight Systems 是可视化流占用情况的标准工具:

nsys profile --trace=cuda cargo oxide run async_mlp
nsys-ui report.nsys-rep

寻找同一流上内核间的间隙(启动开销)、池中的空闲 流(不均衡的工作分布)以及跨流的意外 串行化(缺失或多余的依赖)。

常见陷阱#

陷阱

现象

修复方法

在异步上下文中使用 .sync()

阻塞 Tokio 线程,使其上的所有任务停顿

改用 .await

DeviceBox 在流同步前被丢弃

释放后使用崩溃或结果损坏

and_then 闭包中保持所有权

流太多

调度开销超过重叠收益

分析,减少池大小

缺少 init_device_contexts

第一个操作时报 DeviceError::Context

在程序开始时调用一次

参见

Async MLP Pipeline 项目章节 包含完整的 async_mlp 源代码,附带构建说明和预期输出。