Skip to content

CUDA Streams 与异步执行

CUDA Stream 是实现计算与数据传输重叠、多 Kernel 并发执行的核心机制,是榨干 GPU 性能的关键工具。

什么是 CUDA Stream

Stream 是 GPU 上的有序操作队列,同一 Stream 内的操作按顺序执行,不同 Stream 间的操作可以并发执行。

默认 Stream(Stream 0):所有操作串行
┌──────────┬──────────┬──────────┬──────────┐
│ H2D 拷贝 │  Kernel  │ D2H 拷贝 │  Kernel  │
└──────────┴──────────┴──────────┴──────────┘

多 Stream:计算与传输重叠
Stream 1: ┌──────────┐          ┌──────────┐
          │ H2D 拷贝 │          │ H2D 拷贝 │
          └──────────┘          └──────────┘
Stream 2:           ┌──────────┐          ┌──────────┐
                    │  Kernel  │          │  Kernel  │
                    └──────────┘          └──────────┘
Stream 3:                     ┌──────────┐
                              │ D2H 拷贝 │
                              └──────────┘

基本 API

cpp
// Create a stream
cudaStream_t stream;
cudaStreamCreate(&stream);

// Create a high-priority stream
// (NB: a numerically *lower* value means *higher* priority; the range query
// returns the valid bounds for this device)
int leastPriority, greatestPriority;
cudaDeviceGetStreamPriorityRange(&leastPriority, &greatestPriority);
cudaStreamCreateWithPriority(&stream, cudaStreamNonBlocking, greatestPriority);

// Enqueue work on the stream.
// Here `dst` is the device buffer and `src` the host buffer, so the
// device-to-host copy below must go dst -> src (the original listed
// (dst, src) for both directions, which copies the wrong way on D2H).
cudaMemcpyAsync(dst, src, size, cudaMemcpyHostToDevice, stream);
kernel<<<grid, block, 0, stream>>>(args);
cudaMemcpyAsync(src, dst, size, cudaMemcpyDeviceToHost, stream);

// Block the host until everything enqueued on the stream has finished
cudaStreamSynchronize(stream);

// Query completion without blocking the host
cudaError_t status = cudaStreamQuery(stream);
if (status == cudaSuccess) { /* done */ }

// Destroy the stream
cudaStreamDestroy(stream);

异步拷贝的前提

cudaMemcpyAsync 要求主机内存必须是固定内存(Pinned Memory),否则会退化为同步拷贝。

流水线重叠模式

双缓冲(Double Buffering)

cpp
// Double buffering: split the input into two halves and ping-pong between
// two streams so the copy of one chunk overlaps the kernel on the other.
// NOTE(review): assumes N is evenly divisible by CHUNK — confirm for odd N.
// NOTE(review): h_data must be pinned (cudaMallocHost) for cudaMemcpyAsync
// to actually be asynchronous, as the prose above states.
const int CHUNK = N / 2;
cudaStream_t stream[2];
cudaStreamCreate(&stream[0]);
cudaStreamCreate(&stream[1]);

float *d_buf[2];
cudaMalloc(&d_buf[0], CHUNK * sizeof(float));
cudaMalloc(&d_buf[1], CHUNK * sizeof(float));

// Pre-load the first chunk so iteration 0 has data to compute on
cudaMemcpyAsync(d_buf[0], h_data, CHUNK * sizeof(float),
    cudaMemcpyHostToDevice, stream[0]);

for (int i = 0; i < N / CHUNK; i++) {
    int cur = i % 2;
    int nxt = (i + 1) % 2;
    
    // Compute the current chunk on stream[cur]. The copy of this chunk was
    // issued on the same stream, so in-stream ordering guarantees the data
    // has arrived before the kernel starts.
    kernel<<<grid, block, 0, stream[cur]>>>(d_buf[cur], CHUNK);
    
    // Meanwhile, upload the next chunk on the other stream. d_buf[nxt] is
    // only ever touched by stream[nxt], so no cross-stream race here.
    if (i + 1 < N / CHUNK) {
        cudaMemcpyAsync(d_buf[nxt], h_data + (i+1) * CHUNK,
            CHUNK * sizeof(float), cudaMemcpyHostToDevice, stream[nxt]);
    }
}

cudaStreamSynchronize(stream[0]);
cudaStreamSynchronize(stream[1]);

三阶段流水线

cpp
// Three-stage pipeline: overlap H2D transfer + kernel compute + D2H transfer
// across three streams, with triple-buffered device slots (d_in/d_out[0..2]).
//
// Fixes over the naive skewed version:
//  * cross-stream dependencies are expressed with events — calling
//    cudaStreamSynchronize(s_d2h) alone does NOT order the H2D and compute
//    streams, so the kernel could read a buffer mid-upload;
//  * every chunk is uploaded, computed, and downloaded exactly once (the
//    skewed version never uploaded chunks 0-1 and copied chunk 0 back
//    before it was ever computed);
//  * no per-iteration host synchronization, which would serialize the
//    pipeline and destroy the overlap.
cudaStream_t s_h2d, s_compute, s_d2h;
cudaStreamCreate(&s_h2d);
cudaStreamCreate(&s_compute);
cudaStreamCreate(&s_d2h);

// One event per buffer slot for each producer -> consumer hand-off.
cudaEvent_t h2d_done[3], compute_done[3], d2h_done[3];
for (int j = 0; j < 3; j++) {
    cudaEventCreateWithFlags(&h2d_done[j], cudaEventDisableTiming);
    cudaEventCreateWithFlags(&compute_done[j], cudaEventDisableTiming);
    cudaEventCreateWithFlags(&d2h_done[j], cudaEventDisableTiming);
}

for (int i = 0; i < numChunks; i++) {
    int slot = i % 3;

    // H2D: before overwriting d_in[slot], the kernel that last read it
    // (chunk i-3) must have finished.
    if (i >= 3) cudaStreamWaitEvent(s_h2d, compute_done[slot], 0);
    cudaMemcpyAsync(d_in[slot], h_in + i*CHUNK, ..., s_h2d);
    cudaEventRecord(h2d_done[slot], s_h2d);

    // Compute: wait for this chunk's upload, and for the copy-out that
    // last read d_out[slot] (chunk i-3), before launching.
    cudaStreamWaitEvent(s_compute, h2d_done[slot], 0);
    if (i >= 3) cudaStreamWaitEvent(s_compute, d2h_done[slot], 0);
    kernel<<<grid, block, 0, s_compute>>>(d_in[slot], d_out[slot]);
    cudaEventRecord(compute_done[slot], s_compute);

    // D2H: retrieve chunk i once its kernel has finished.
    cudaStreamWaitEvent(s_d2h, compute_done[slot], 0);
    cudaMemcpyAsync(h_out + i*CHUNK, d_out[slot], ..., s_d2h);
    cudaEventRecord(d2h_done[slot], s_d2h);
}

// Drain the tail of the pipeline (D2H is the last stage for every chunk).
cudaStreamSynchronize(s_d2h);

CUDA Event

Event 用于精确计时与 Stream 间同步:

cpp
// Measure kernel execution time precisely with a pair of CUDA events.
cudaEvent_t evStart, evStop;
cudaEventCreate(&evStart);
cudaEventCreate(&evStop);

// Bracket the launch with event records enqueued on the same stream,
// so the elapsed time covers exactly the kernel.
cudaEventRecord(evStart, stream);
kernel<<<grid, block, 0, stream>>>(args);
cudaEventRecord(evStop, stream);

// Block the host until the stop event has been reached on the device.
cudaEventSynchronize(evStop);

float ms = 0.0f;
cudaEventElapsedTime(&ms, evStart, evStop);
printf("Kernel time: %.3f ms\n", ms);

cudaEventDestroy(evStart);
cudaEventDestroy(evStop);

Stream 间依赖(Event 同步)

cpp
// Make streamB depend on work previously enqueued in streamA,
// without ever blocking the host thread.
cudaEvent_t depEvent;
cudaEventCreate(&depEvent);

// Record a marker in streamA right after the producer kernel.
kernelA<<<grid, block, 0, streamA>>>(d_data);
cudaEventRecord(depEvent, streamA);

// streamB will not execute past this point until depEvent completes
// (the CPU is not blocked — this only inserts a device-side dependency).
cudaStreamWaitEvent(streamB, depEvent, 0);

// Consumer kernel: starts only after kernelA has finished.
kernelB<<<grid, block, 0, streamB>>>(d_data);

cudaEventDestroy(depEvent);

多 Kernel 并发执行

当 GPU 资源未被单个 Kernel 占满时,多个 Kernel 可以并发执行。

cpp
// Condition: each kernel's SM resource usage < total resources.
// E.g. two kernels each occupying ~50% of the SMs can run concurrently.

cudaStream_t streams[2];
for (int i = 0; i < 2; i++) {
    cudaStreamCreate(&streams[i]);
}

// Launch into distinct streams so the two kernels may overlap.
smallKernel<<<32, 256, 0, streams[0]>>>(d_A);
smallKernel<<<32, 256, 0, streams[1]>>>(d_B);

for (int i = 0; i < 2; i++) {
    cudaStreamSynchronize(streams[i]);
}

默认 Stream 的隐式同步

默认 Stream 是全局同步的

默认 Stream(Stream 0)上的操作会与所有其他 Stream 同步,破坏并发性。

cpp
// Problem: a launch on the (legacy) default stream implicitly synchronizes
// with other blocking streams, serializing work that could have overlapped.
kernel1<<<grid, block, 0, stream1>>>(args);
kernel2<<<grid, block>>>(args);          // default stream! waits for stream1 to finish
kernel3<<<grid, block, 0, stream2>>>(args);

// Fix: create non-blocking streams
cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking);
// A non-blocking stream does not synchronize with the default stream

图执行(CUDA Graph)

CUDA Graph 将一系列操作预先录制为图,减少 CPU 启动开销。

cpp
// Method 1: stream capture — record an operation sequence into a graph
// once, then replay it with minimal CPU launch overhead.
cudaGraph_t capturedGraph;
cudaGraphExec_t execGraph;

// Begin capturing everything subsequently enqueued on `stream`
cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal);

// The recorded sequence: upload, two kernels, download
cudaMemcpyAsync(d_data, h_data, size, cudaMemcpyHostToDevice, stream);
kernel1<<<grid, block, 0, stream>>>(d_data);
kernel2<<<grid, block, 0, stream>>>(d_data);
cudaMemcpyAsync(h_result, d_data, size, cudaMemcpyDeviceToHost, stream);

// Stop capturing; the recorded ops become a graph instead of executing
cudaStreamEndCapture(stream, &capturedGraph);

// Instantiate an executable instance of the graph
cudaGraphInstantiate(&execGraph, capturedGraph, nullptr, nullptr, 0);

// Replay many times — per-launch CPU cost is tiny
for (int iter = 0; iter < 1000; iter++) {
    cudaGraphLaunch(execGraph, stream);
    cudaStreamSynchronize(stream);
}

// Clean up
cudaGraphExecDestroy(execGraph);
cudaGraphDestroy(capturedGraph);

CUDA Graph 的性能优势

传统方式(每次迭代):
  CPU: 启动 Kernel1 → 启动 Kernel2 → 启动 Kernel3
  每次启动:~5-10 μs CPU 开销

CUDA Graph(每次迭代):
  CPU: 提交整个图 → ~1 μs
  
对于大量小 Kernel 的场景(如推理),Graph 可减少 50%+ 的 CPU 开销

实战:推理流水线

cpp
// Inference pipeline: preprocess -> multi-layer forward -> postprocess,
// each stage on its own stream, chained with events so stage N+1 only
// starts on-device after stage N finishes — without blocking the CPU
// between stages.
// NOTE(review): d_raw/d_input/d_output/d_result, grid/block, layers and
// outputSize are members/globals not shown in this excerpt.
class InferencePipeline {
    cudaStream_t preprocess_stream, compute_stream, postprocess_stream;
    cudaEvent_t preprocess_done, compute_done;
    
public:
    // Run one batch end-to-end; blocks the host until h_output is filled.
    // NOTE(review): h_input is never uploaded here — presumably d_raw should
    // be filled from h_input via cudaMemcpyAsync on preprocess_stream before
    // the preprocess kernel; confirm against the full class definition.
    void run(float* h_input, float* h_output, int batchSize) {
        // Preprocess (normalization, etc.)
        preprocessKernel<<<grid, block, 0, preprocess_stream>>>(
            d_raw, d_input, batchSize);
        cudaEventRecord(preprocess_done, preprocess_stream);
        
        // Inference starts only after preprocessing completes
        cudaStreamWaitEvent(compute_stream, preprocess_done);
        
        // Model inference (layer by layer on the compute stream)
        for (auto& layer : layers) {
            layer.forward(compute_stream);
        }
        cudaEventRecord(compute_done, compute_stream);
        
        // Postprocess only after inference completes
        cudaStreamWaitEvent(postprocess_stream, compute_done);
        postprocessKernel<<<grid, block, 0, postprocess_stream>>>(
            d_output, d_result, batchSize);
        
        // Copy the result back asynchronously on the same (last) stream
        cudaMemcpyAsync(h_output, d_result, outputSize,
            cudaMemcpyDeviceToHost, postprocess_stream);
        
        // Draining postprocess_stream drains the whole chain, since each
        // stage waits on the previous one via events
        cudaStreamSynchronize(postprocess_stream);
    }
};

下一篇:并行算法与优化策略 →

基于 NVIDIA CUDA 官方文档整理