DeviceOperation 模型#
在编写 GPU 程序一章中,你看到
了类型化的同步启动会在显式的流上排队工作,而类型化的
异步启动则返回一个延迟句柄,该句柄将流的选择推迟。本章
深入探讨该延迟句柄背后的抽象——DeviceOperation
trait——并解释为何将 GPU 应该做什么与它在哪个流上运行解耦
是 cuda-oxide 中可组合异步 GPU 编程的基础。
参见
CUDA Programming Guide -- Asynchronous Concurrent Execution
了解 DeviceOperation 所构建的底层 CUDA 流和事件模型。
为什么要使用延迟操作?#
在 CUDA C++ 中,你通过创建多个 cudaStream_t 句柄
并在其上显式放置内核启动和内存复制来构建并发。程序员
在每个调用点决定使用哪个流。这将 GPU 工作的
定义与调度决策耦合在一起,使得事后难以组合
和重新排列工作。
cuda-oxide 采用了不同的方法。DeviceOperation 描述 GPU 工作
但不绑定到任何流。你可以使用组合器
(and_then、zip!)组合操作,跨函数边界传递它们,将它们存储在
集合中,只在最后一刻才决定如何调度它们。这与
Rust 的 Iterator 理念相同——延迟构建管道,在调用点
急切地执行它。
方式 |
流由谁选择 |
可组合? |
|---|---|---|
类型化同步启动 |
调用者 |
否,立即排队 |
类型化异步启动 |
调度策略 |
是,返回 |
DeviceOperation 的生命周期。阶段 1:类型化异步方法构建一个延迟配方
(没有 GPU 工作)。阶段 2:调度策略从池中选择一个流。
阶段 3:execute() 提交 GPU 工作和一个 cuLaunchHostFunc 回调。
阶段 4:回调触发,唤醒异步运行时,并交付结果。
底部:四种执行方法,从最简单(.sync())到最手动
(async_on)。#
配方与厨房#
将 DeviceOperation 想象成一张配方卡片。卡片描述了菜肴的每个步骤
——使用什么食材,在什么温度下,烹饪多久——
但它没有说明哪个厨房会烹饪它。你可以将卡片交给任何厨房,
复印它,将两张卡片装订在一起组成一顿多道菜的大餐,
或将其归档以备后用。只有当有人走进厨房并开始按照说明操作时,
菜肴才开始烹饪。
在 cuda-oxide 的模型中:
配方是一个
DeviceOperation——GPU 工作的延迟描述。厨房是一个 CUDA 流——工作实际运行的有序队列。
主厨是一个
SchedulingPolicy——决定哪个厨房处理每个配方的逻辑。餐食是
Output——当一切完成后你得到的结果。
这种分离使得系统具有可组合性。你可以编写一个函数, 返回一个"上传数据、运行 GEMM、应用 ReLU"的配方,而不关心 哪个流会执行它。调用者可以在配方上链接更多步骤, 在特定流上运行它,或将其交给调度策略然后走开。
你的第一个异步启动#
创建 DeviceOperation 最简单的方式是生成的 {kernel}_async
方法。它看起来像同步方法,但没有流参数,并且
返回一个配方而不是立即烹饪:
use cuda_async::device_context::init_device_contexts;
use cuda_core::LaunchConfig;
// 一次性设置:为调度创建流池
init_device_contexts(0, 1)?;
// 构建配方(尚未发生 GPU 工作)
let module = kernels::load_async(0)?;
let op = module.vecadd_async(LaunchConfig::for_num_elems(1024), &a_dev, &b_dev, &mut c_dev)?;
// 现在烹饪它:选一个流,启动,等待结果
op.sync()?;
在创建 op 的位置,GPU 上什么也没有发生。该方法
构建了一个 AsyncKernelLaunch 值,它记住了要调用哪个函数、传递什么
参数以及如何配置网格——但它不会触碰任何流。
它是放在台面上的一张配方卡片。
当你调用 .sync() 时,调度策略从池中选择一个流,
提交内核,并阻塞直到流空闲。这一行代码就是配方变成
烹饪好的餐食的地方。
什么构成了 DeviceOperation#
在幕后,DeviceOperation 是一个 trait。任何描述 GPU
工作的类型都可以实现它。该 trait 有一个必需的方法和一个关联类型:
pub trait DeviceOperation: Send + Sized + IntoFuture {
type Output: Send;
unsafe fn execute(
self,
context: &ExecutionContext,
) -> Result<Self::Output, DeviceError>;
}
Output 是操作完成后产生的 Rust 值。对于
内核启动,这是 ()——内核通过其对设备内存的副作用运行。
对于设备到主机的复制,它可能是 Vec<f32>。对于内存
分配,它可能是一个拥有指针的 DeviceBox<[f32]>。
execute 是实际 GPU 工作发生的地方。它接收一个
ExecutionContext——分配的厨房——并将工作提交到其中的流。
该方法是 unsafe 的,因为当它返回时 GPU 工作可能仍在进行中;
调用者有责任在读取结果之前进行同步。
Send 约束意味着操作可以在线程之间移动(对于
tokio::spawn 至关重要)。IntoFuture 约束使得 .await 能够工作——
稍后会详述。
你很少会自己实现 DeviceOperation。该 crate 提供了一组
实现它的类型,你可以使用组合器来组合它们:
AsyncKernelLaunch——由类型化异步启动方法产生。启动一个内核。Value<T>——包装一个主机端值。没有 GPU 工作。立即返回T。AndThen——链接两个操作:运行 A,将结果传给 B。Zip——运行两个操作并将两个结果作为元组返回。StreamOperation——将构造推迟到流已知时才进行。
这些是每个异步管道的构建块。 组合器与组合一章详细介绍了 每一个。
ExecutionContext——流所在之处#
当配方被执行时,它需要知道自己身处哪个厨房。
ExecutionContext 携带了该信息:
pub struct ExecutionContext {
device: usize, // 哪个 GPU
cuda_stream: Arc<CudaStream>, // 哪个流
cuda_context: Arc<CudaContext>, // 哪个 CUDA 上下文
}
操作永远不会自己创建流。调度策略(在
调度与流中介绍)创建
ExecutionContext 并将其传递给 execute。这是分离的
核心:操作描述做什么,上下文提供在哪里做。
在 execute 实现内部,你通过
ctx.get_cuda_stream() 访问流,通过 ctx.get_cuda_context() 访问 CUDA 上下文。
对于大多数操作来说,这就是你需要的全部——在流上排队一个内核或内存复制,
就完成了。
运行配方:四种执行方式#
一旦你有了 DeviceOperation,你需要触发它。cuda-oxide 提供了
四种路径,从"帮我完成一切"到"我自己来"。
.sync()——阻塞并等待#
最简单的选项。调度策略选择一个流,运行操作, 并阻塞调用线程直到流空闲:
let result: Vec<f32> = d2h_operation.sync()?;
这对于脚本、测试以及任何只想要立即得到答案的地方都很适用。 不需要 Tokio 运行时。
.await——让出并恢复#
在异步运行时内部,.await 执行相同的操作但不会阻塞
线程。它将操作转换为 DeviceFuture,提交 GPU 工作,
并让出当前任务。当 GPU 完成时,它唤醒任务并
交付结果:
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
init_device_contexts(0, 1)?;
let module = kernels::load_async(0)?;
module
.vecadd_async(LaunchConfig::for_num_elems(1024), &a_dev, &b_dev, &mut c_dev)?
.await?;
Ok(())
}
当 GPU 在工作时,Tokio 运行时可以自由地轮询其他任务——没有 线程闲置等待硬件。这是并发运行多个 GPU 管道的关键,我们将在 并发执行中探讨。
.sync_on(&stream)——你来选择流#
当你需要特定的流时——为了与现有 CUDA 库互操作,
或者保证与该流上其他工作的顺序——sync_on 允许你
直接提供流并阻塞直到完成:
let stream = ctx.new_stream()?;
operation.sync_on(&stream)?;
unsafe async_on(&stream)——发射后不管#
最手动的选项。它将工作提交到流并立即返回, 不进行同步。调用者必须确保在读取结果之前流已完成 同步。这对于将许多操作批量提交到一个流然后在结束时 进行一次同步非常有用:
let stream = ctx.new_stream()?;
unsafe { op_a.async_on(&stream)? };
unsafe { op_b.async_on(&stream)? };
stream.synchronize()?; // 现在两者都完成了
使用 value() 提升主机数据#
并非管道中的每一步都涉及 GPU。有时你需要将
一个主机端值——配置参数、一组维度、预加载的
权重向量——输入到设备操作链中。value() 函数
将任何 Send 类型包装成一个空操作的 DeviceOperation,立即返回它:
use cuda_async::device_operation::value;
let weights = vec![1.0f32; 1024];
let op = value(weights); // impl DeviceOperation<Output = Vec<f32>>
单独来看,value() 看起来毫无意义。它的威力体现在组合中。
如果你要将主机到设备的传输和一个配置结构体合并在一起,
value() 使配置适应管道:
let (device_buf, config) = zip!(
h2d(raw_data),
value(ModelConfig { dim: 64, layers: 3 })
).sync()?;
zip! 的两个支路都必须是 DeviceOperation。value() 是使
主机数据与设备工作良好配合的适配器。
使用 with_context 与流通信#
有些操作需要在执行时访问流本身。内存
分配(malloc_async)、异步复制(memcpy_htod_async)和
事件记录都需要原始的 CUstream 句柄。但请记住——一个
DeviceOperation 在创建时并不知道自己将在哪个流上运行。
流是由调度策略稍后分配的。
with_context 弥合了这一差距。它创建一个操作,其主体被推迟
到 ExecutionContext 可用时才执行:
use cuda_async::device_operation::{with_context, value};
use cuda_core::memory::{malloc_async, memcpy_htod_async};
fn h2d(host_data: Vec<f32>) -> impl DeviceOperation<Output = DeviceBox<[f32]>> {
with_context(move |ctx| {
let stream = ctx.get_cuda_stream();
let n = host_data.len();
let num_bytes = n * std::mem::size_of::<f32>();
unsafe {
let dptr = malloc_async(stream.cu_stream(), num_bytes).unwrap();
memcpy_htod_async(dptr, host_data.as_ptr(), num_bytes, stream.cu_stream())
.unwrap();
value(DeviceBox::from_raw_parts(dptr, n, ctx.get_device_id()))
}
})
}
闭包接收 ExecutionContext 并且必须返回另一个
DeviceOperation。这里它返回一个包装了新分配的
设备指针的 Value。内部操作在同一流上立即执行。
这种模式——with_context 包装原始驱动调用,在末尾返回 value()——
是你将任何低级 CUDA 操作变成可组合构建块的方式。
async_mlp 示例将其用于 h2d、d2h 和 zeros
辅助函数,这些函数可以干净地插入 and_then 链中。
小技巧
with_context 是需要 CUstream 的原始驱动调用的逃生舱口。
对于内核启动,优先使用生成的异步启动方法,因为
它们处理参数编组和缓冲区生命周期。
GPU 如何告诉 Rust 它已完成#
当你 .await 一个 DeviceOperation 时,幕后发生了一些有趣的事情。
该操作变成一个 DeviceFuture——一个实现了 Rust 的
std::future::Future 的类型——异步运行时轮询它。但基于轮询的
系统如何知道硬件何时完成了它的工作呢?
答案是 cuLaunchHostFunc,一个 CUDA 驱动 API,它将主机端
回调排队到流中。当该流上所有先前的 GPU 工作完成时,
驱动在驱动线程上调用该回调。cuda-oxide 使用它来构建
CUDA 与 Rust 异步模型之间的零忙等待桥梁。
DeviceFuture 是一个三状态机:
空闲 ───poll()───► 执行中 ───回调触发───► 完成
│ │
(提交 GPU 工作 (将结果返回
+ 排队回调) 给运行时)
在第一次轮询时,future:
在操作上调用
execute(),将 GPU 工作提交到流。在同一流上,紧接 GPU 工作之后,排队一个
cuLaunchHostFunc回调。 CUDA 保证流内顺序:此回调不会在内核完成之前触发。返回
Poll::Pending。异步运行时挂起任务并继续执行。
当GPU 完成内核时,CUDA 驱动在驱动线程上调用主机回调。
回调设置一个 AtomicBool 标志并唤醒任务的
AtomicWaker。异步运行时注意到唤醒并重新轮询 future。
在第二次轮询时,future 看到该标志并返回
Poll::Ready(Ok(result))。任务随值恢复。
关键特性:GPU 工作时没有主机线程自旋或休眠。
异步执行器可以自由运行其他任务——包括其他流上的其他 DeviceFuture。
这就是 cuda-oxide 在不给每个 GPU 操作分配专用线程的情况下
实现真正并发执行的方式。