From a40ed4ccd03a1162cf40a5f4fa35ee6ee7979abc Mon Sep 17 00:00:00 2001 From: Hugh Perkins Date: Wed, 11 Mar 2026 16:47:44 -0700 Subject: [PATCH 01/20] Add qd.stream_parallel() context manager for implicit stream parallelism Introduces stream_parallel() for running top-level for-loop blocks on separate GPU streams. The AST transformer maps 'with qd.stream_parallel()' blocks to stream-parallel group IDs, which propagate through IR lowering and offloading to the CUDA/AMDGPU kernel launchers. Each unique group ID gets its own stream at launch time. Includes validation that all top-level kernel statements must be stream_parallel blocks (no mixing), and offline cache key support. --- python/quadrants/lang/ast/ast_transformer.py | 32 +++- .../function_def_transformer.py | 29 +++ python/quadrants/lang/stream.py | 15 +- quadrants/analysis/gen_offline_cache_key.cpp | 1 + quadrants/codegen/amdgpu/codegen_amdgpu.cpp | 1 + quadrants/codegen/cuda/codegen_cuda.cpp | 1 + quadrants/codegen/llvm/llvm_compiled_data.h | 13 +- quadrants/ir/frontend_ir.cpp | 12 +- quadrants/ir/frontend_ir.h | 12 ++ quadrants/ir/statements.cpp | 3 + quadrants/ir/statements.h | 3 + quadrants/python/export_lang.cpp | 4 +- quadrants/runtime/amdgpu/kernel_launcher.cpp | 52 ++++- quadrants/runtime/cuda/kernel_launcher.cpp | 52 ++++- quadrants/transforms/lower_ast.cpp | 3 + quadrants/transforms/offload.cpp | 3 + tests/python/test_api.py | 1 + tests/python/test_streams.py | 178 ++++++++++++++++-- 18 files changed, 377 insertions(+), 38 deletions(-) diff --git a/python/quadrants/lang/ast/ast_transformer.py b/python/quadrants/lang/ast/ast_transformer.py index 1b13ead0f9..f5cfbeef1a 100644 --- a/python/quadrants/lang/ast/ast_transformer.py +++ b/python/quadrants/lang/ast/ast_transformer.py @@ -28,6 +28,7 @@ from quadrants.lang.ast.ast_transformers.function_def_transformer import ( FunctionDefTransformer, ) +from quadrants.lang.ast.symbol_resolver import ASTResolver from quadrants.lang.exception import ( QuadrantsIndexError, QuadrantsRuntimeTypeError, @@ -39,6 +40,7 @@ from quadrants.lang.field import Field from quadrants.lang.matrix import Matrix, MatrixType from quadrants.lang.snode import append, deactivate, length +from quadrants.lang.stream import stream_parallel from quadrants.lang.struct import Struct, StructType from quadrants.types import primitive_types from quadrants.types.utils import is_integral @@ -108,7 +110,11 @@ def build_AnnAssign(ctx: ASTTransformerFuncContext, node: ast.AnnAssign): @staticmethod def build_assign_annotated( - ctx: ASTTransformerFuncContext, target: ast.Name, value, is_static_assign: bool, annotation: Type + ctx: ASTTransformerFuncContext, + target: ast.Name, + value, + is_static_assign: bool, + annotation: Type, ): """Build an annotated assignment like this: target: annotation = value. @@ -156,7 +162,10 @@ def build_Assign(ctx: ASTTransformerFuncContext, node: ast.Assign) -> None: @staticmethod def build_assign_unpack( - ctx: ASTTransformerFuncContext, node_target: list | ast.Tuple, values, is_static_assign: bool + ctx: ASTTransformerFuncContext, + node_target: list | ast.Tuple, + values, + is_static_assign: bool, ): """Build the unpack assignments like this: (target1, target2) = (value1, value2). The function should be called only if the node target is a tuple. @@ -538,7 +547,8 @@ def build_Return(ctx: ASTTransformerFuncContext, node: ast.Return) -> None: else: raise QuadrantsSyntaxError("The return type is not supported now!") ctx.ast_builder.create_kernel_exprgroup_return( - expr.make_expr_group(return_exprs), _qd_core.DebugInfo(ctx.get_pos_info(node)) + expr.make_expr_group(return_exprs), + _qd_core.DebugInfo(ctx.get_pos_info(node)), ) else: ctx.return_data = node.value.ptr @@ -1381,6 +1391,22 @@ def build_Continue(ctx: ASTTransformerFuncContext, node: ast.Continue) -> None: ctx.ast_builder.insert_continue_stmt(_qd_core.DebugInfo(ctx.get_pos_info(node))) return None + @staticmethod + def build_With(ctx: ASTTransformerFuncContext, node: ast.With) -> None: + if len(node.items) != 1: + raise QuadrantsSyntaxError("'with' in Quadrants kernels only supports a single context manager") + item = node.items[0] + if item.optional_vars is not None: + raise QuadrantsSyntaxError("'with ... as ...' is not supported in Quadrants kernels") + if not isinstance(item.context_expr, ast.Call): + raise QuadrantsSyntaxError("'with' in Quadrants kernels requires a call expression") + if not ASTResolver.resolve_to(item.context_expr.func, stream_parallel, ctx.global_vars): + raise QuadrantsSyntaxError("'with' in Quadrants kernels only supports qd.stream_parallel()") + ctx.ast_builder.begin_stream_parallel() + build_stmts(ctx, node.body) + ctx.ast_builder.end_stream_parallel() + return None + @staticmethod def build_Pass(ctx: ASTTransformerFuncContext, node: ast.Pass) -> None: return None diff --git a/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py b/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py index 6d000b69f5..dacbac4c96 100644 --- a/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py +++ b/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py @@ -21,10 +21,12 @@ from quadrants.lang.ast.ast_transformer_utils import ( ASTTransformerFuncContext, ) +from quadrants.lang.ast.symbol_resolver import ASTResolver from quadrants.lang.exception import ( QuadrantsSyntaxError, ) from quadrants.lang.matrix import MatrixType +from quadrants.lang.stream import stream_parallel from quadrants.lang.struct import StructType from quadrants.lang.util import to_quadrants_type from quadrants.types import annotations, ndarray_type, primitive_types @@ -295,7 +297,34 @@ def build_FunctionDef( else: FunctionDefTransformer._transform_as_func(ctx, node, args) + if ctx.is_kernel: + FunctionDefTransformer._validate_stream_parallel_exclusivity(node.body, ctx.global_vars) + with ctx.variable_scope_guard(): build_stmts(ctx, node.body) return None + + @staticmethod + def _is_stream_parallel_with(stmt: ast.stmt, global_vars: dict[str, Any]) -> bool: + if not isinstance(stmt, ast.With): + return False + if len(stmt.items) != 1: + return False + item = stmt.items[0] + if not isinstance(item.context_expr, ast.Call): + return False + return ASTResolver.resolve_to(item.context_expr.func, stream_parallel, global_vars) + + @staticmethod + def _validate_stream_parallel_exclusivity(body: list[ast.stmt], global_vars: dict[str, Any]) -> None: + has_sp = any(FunctionDefTransformer._is_stream_parallel_with(s, global_vars) for s in body) + if not has_sp: + return + for stmt in body: + if not FunctionDefTransformer._is_stream_parallel_with(stmt, global_vars): + raise QuadrantsSyntaxError( + "When using qd.stream_parallel(), all top-level statements " + "in the kernel must be 'with qd.stream_parallel():' blocks. " + "Move non-parallel code to a separate kernel." + ) diff --git a/python/quadrants/lang/stream.py b/python/quadrants/lang/stream.py index 8530982455..77979184d4 100644 --- a/python/quadrants/lang/stream.py +++ b/python/quadrants/lang/stream.py @@ -1,3 +1,5 @@ +from contextlib import contextmanager + from quadrants.lang import impl @@ -93,4 +95,15 @@ def create_event() -> Event: return Event(handle) -__all__ = ["Stream", "Event", "create_stream", "create_event"] +@contextmanager +def stream_parallel(): + """Run top-level for loops in this block on separate GPU streams. + + Used inside @qd.kernel. At Python runtime (outside kernels), this is a + no-op. During kernel compilation, the AST transformer calls into the C++ + ASTBuilder to tag loops with a stream-parallel group ID. + """ + yield + + +__all__ = ["Stream", "Event", "create_stream", "create_event", "stream_parallel"] diff --git a/quadrants/analysis/gen_offline_cache_key.cpp b/quadrants/analysis/gen_offline_cache_key.cpp index f9eb5dc324..9a38eb9ac2 100644 --- a/quadrants/analysis/gen_offline_cache_key.cpp +++ b/quadrants/analysis/gen_offline_cache_key.cpp @@ -382,6 +382,7 @@ class ASTSerializer : public IRVisitor, public ExpressionVisitor { emit(stmt->strictly_serialized); emit(stmt->mem_access_opt); emit(stmt->block_dim); + emit(stmt->stream_parallel_group_id); emit(stmt->body.get()); } diff --git a/quadrants/codegen/amdgpu/codegen_amdgpu.cpp b/quadrants/codegen/amdgpu/codegen_amdgpu.cpp index bba1c87f20..e0fcca575e 100644 --- a/quadrants/codegen/amdgpu/codegen_amdgpu.cpp +++ b/quadrants/codegen/amdgpu/codegen_amdgpu.cpp @@ -396,6 +396,7 @@ class TaskCodeGenAMDGPU : public TaskCodeGenLLVM { current_task->grid_dim = num_SMs * query_max_block_per_sm; } current_task->block_dim = stmt->block_dim; + current_task->stream_parallel_group_id = stmt->stream_parallel_group_id; QD_ASSERT(current_task->grid_dim != 0); QD_ASSERT(current_task->block_dim != 0); offloaded_tasks.push_back(*current_task); diff --git a/quadrants/codegen/cuda/codegen_cuda.cpp b/quadrants/codegen/cuda/codegen_cuda.cpp index 8395f7adca..4795db23d2 100644 --- a/quadrants/codegen/cuda/codegen_cuda.cpp +++ b/quadrants/codegen/cuda/codegen_cuda.cpp @@ -692,6 +692,7 @@ class TaskCodeGenCUDA : public TaskCodeGenLLVM { } current_task->block_dim = stmt->block_dim; current_task->dynamic_shared_array_bytes = dynamic_shared_array_bytes; + current_task->stream_parallel_group_id = stmt->stream_parallel_group_id; QD_ASSERT(current_task->grid_dim != 0); QD_ASSERT(current_task->block_dim != 0); offloaded_tasks.push_back(*current_task); diff --git a/quadrants/codegen/llvm/llvm_compiled_data.h b/quadrants/codegen/llvm/llvm_compiled_data.h index 16d4978bd4..f496e6fa3c 100644 --- a/quadrants/codegen/llvm/llvm_compiled_data.h +++ b/quadrants/codegen/llvm/llvm_compiled_data.h @@ -14,16 +14,23 @@ class OffloadedTask { int block_dim{0}; int grid_dim{0}; int dynamic_shared_array_bytes{0}; + int stream_parallel_group_id{0}; explicit OffloadedTask(const std::string &name = "", int block_dim = 0, int grid_dim = 0, - int dynamic_shared_array_bytes = 0) + int dynamic_shared_array_bytes = 0, + int stream_parallel_group_id = 0) : name(name), block_dim(block_dim), grid_dim(grid_dim), - dynamic_shared_array_bytes(dynamic_shared_array_bytes) {}; - QD_IO_DEF(name, block_dim, grid_dim, dynamic_shared_array_bytes); + dynamic_shared_array_bytes(dynamic_shared_array_bytes), + stream_parallel_group_id(stream_parallel_group_id) {}; + QD_IO_DEF(name, + block_dim, + grid_dim, + dynamic_shared_array_bytes, + stream_parallel_group_id); }; struct LLVMCompiledTask { diff --git a/quadrants/ir/frontend_ir.cpp b/quadrants/ir/frontend_ir.cpp index ae2e3ebe7c..6cf3087643 100644 --- a/quadrants/ir/frontend_ir.cpp +++ b/quadrants/ir/frontend_ir.cpp @@ -119,7 +119,8 @@ FrontendForStmt::FrontendForStmt(const FrontendForStmt &o) num_cpu_threads(o.num_cpu_threads), strictly_serialized(o.strictly_serialized), mem_access_opt(o.mem_access_opt), - block_dim(o.block_dim) { + block_dim(o.block_dim), + stream_parallel_group_id(o.stream_parallel_group_id) { } void FrontendForStmt::init_config(Arch arch, const ForLoopConfig &config) { @@ -127,6 +128,7 @@ void FrontendForStmt::init_config(Arch arch, const ForLoopConfig &config) { strictly_serialized = config.strictly_serialized; mem_access_opt = config.mem_access_opt; block_dim = config.block_dim; + stream_parallel_group_id = config.stream_parallel_group_id; if (arch == Arch::cuda || arch == Arch::amdgpu) { num_cpu_threads = 1; QD_ASSERT(block_dim <= quadrants_max_gpu_block_dim); @@ -1542,6 +1544,8 @@ void ASTBuilder::begin_frontend_range_for(const Expr &i, const Expr &s, const Expr &e, const DebugInfo &dbg_info) { + for_loop_dec_.config.stream_parallel_group_id = + current_stream_parallel_group_id_; auto stmt_unique = std::make_unique( i, s, e, arch_, for_loop_dec_.config, dbg_info); auto stmt = stmt_unique.get(); @@ -1558,6 +1562,8 @@ void ASTBuilder::begin_frontend_struct_for_on_snode(const ExprGroup &loop_vars, for_loop_dec_.config.strictly_serialized, "ti.loop_config(serialize=True) does not have effect on the struct for. " "The execution order is not guaranteed."); + for_loop_dec_.config.stream_parallel_group_id = + current_stream_parallel_group_id_; auto stmt_unique = std::make_unique( loop_vars, snode, arch_, for_loop_dec_.config, dbg_info); for_loop_dec_.reset(); @@ -1574,6 +1580,8 @@ void ASTBuilder::begin_frontend_struct_for_on_external_tensor( for_loop_dec_.config.strictly_serialized, "ti.loop_config(serialize=True) does not have effect on the struct for. " "The execution order is not guaranteed."); + for_loop_dec_.config.stream_parallel_group_id = + current_stream_parallel_group_id_; auto stmt_unique = std::make_unique( loop_vars, external_tensor, arch_, for_loop_dec_.config, dbg_info); for_loop_dec_.reset(); @@ -1591,6 +1599,8 @@ void ASTBuilder::begin_frontend_mesh_for( for_loop_dec_.config.strictly_serialized, "ti.loop_config(serialize=True) does not have effect on the mesh for. " "The execution order is not guaranteed."); + for_loop_dec_.config.stream_parallel_group_id = + current_stream_parallel_group_id_; auto stmt_unique = std::make_unique(ExprGroup(i), mesh_ptr, element_type, arch_, for_loop_dec_.config, dbg_info); diff --git a/quadrants/ir/frontend_ir.h b/quadrants/ir/frontend_ir.h index bce009f9e7..693a7f461f 100644 --- a/quadrants/ir/frontend_ir.h +++ b/quadrants/ir/frontend_ir.h @@ -23,6 +23,7 @@ struct ForLoopConfig { MemoryAccessOptions mem_access_opt; int block_dim{0}; bool uniform{false}; + int stream_parallel_group_id{0}; }; #define QD_DEFINE_CLONE_FOR_FRONTEND_IR \ @@ -207,6 +208,7 @@ class FrontendForStmt : public Stmt { bool strictly_serialized; MemoryAccessOptions mem_access_opt; int block_dim; + int stream_parallel_group_id{0}; FrontendForStmt(const ExprGroup &loop_vars, SNode *snode, @@ -961,6 +963,8 @@ class ASTBuilder { Arch arch_; ForLoopDecoratorRecorder for_loop_dec_; int id_counter_{0}; + int stream_parallel_group_counter_{0}; + int current_stream_parallel_group_id_{0}; public: ASTBuilder(Block *initial, Arch arch, bool is_kernel) @@ -1107,6 +1111,14 @@ class ASTBuilder { for_loop_dec_.reset(); } + void begin_stream_parallel() { + current_stream_parallel_group_id_ = ++stream_parallel_group_counter_; + } + + void end_stream_parallel() { + current_stream_parallel_group_id_ = 0; + } + Identifier get_next_id(const std::string &name = "") { return Identifier(id_counter_++, name); } diff --git a/quadrants/ir/statements.cpp b/quadrants/ir/statements.cpp index 14c55be85e..79b469a22a 100644 --- a/quadrants/ir/statements.cpp +++ b/quadrants/ir/statements.cpp @@ -244,6 +244,7 @@ std::unique_ptr RangeForStmt::clone() const { begin, end, body->clone(), is_bit_vectorized, num_cpu_threads, block_dim, strictly_serialized); new_stmt->reversed = reversed; + new_stmt->stream_parallel_group_id = stream_parallel_group_id; return new_stmt; } @@ -265,6 +266,7 @@ std::unique_ptr StructForStmt::clone() const { auto new_stmt = std::make_unique( snode, body->clone(), is_bit_vectorized, num_cpu_threads, block_dim); new_stmt->mem_access_opt = mem_access_opt; + new_stmt->stream_parallel_group_id = stream_parallel_group_id; return new_stmt; } @@ -439,6 +441,7 @@ std::unique_ptr OffloadedStmt::clone() const { new_stmt->tls_size = tls_size; new_stmt->bls_size = bls_size; new_stmt->mem_access_opt = mem_access_opt; + new_stmt->stream_parallel_group_id = stream_parallel_group_id; return new_stmt; } diff --git a/quadrants/ir/statements.h b/quadrants/ir/statements.h index e06bb6d4df..3f440fe4e2 100644 --- a/quadrants/ir/statements.h +++ b/quadrants/ir/statements.h @@ -1016,6 +1016,7 @@ class RangeForStmt : public Stmt { int block_dim; bool strictly_serialized; std::string range_hint; + int stream_parallel_group_id{0}; RangeForStmt(Stmt *begin, Stmt *end, @@ -1061,6 +1062,7 @@ class StructForStmt : public Stmt { int num_cpu_threads; int block_dim; MemoryAccessOptions mem_access_opt; + int stream_parallel_group_id{0}; StructForStmt(SNode *snode, std::unique_ptr &&body, @@ -1443,6 +1445,7 @@ class OffloadedStmt : public Stmt { std::size_t tls_size{1}; // avoid allocating dynamic memory with 0 byte std::size_t bls_size{0}; MemoryAccessOptions mem_access_opt; + int stream_parallel_group_id{0}; OffloadedStmt(TaskType task_type, Arch arch, Kernel *kernel); diff --git a/quadrants/python/export_lang.cpp b/quadrants/python/export_lang.cpp index 2f5da8b1b4..d134464d49 100644 --- a/quadrants/python/export_lang.cpp +++ b/quadrants/python/export_lang.cpp @@ -357,7 +357,9 @@ void export_lang(py::module &m) { .def("strictly_serialize", &ASTBuilder::strictly_serialize) .def("block_dim", &ASTBuilder::block_dim) .def("insert_snode_access_flag", &ASTBuilder::insert_snode_access_flag) - .def("reset_snode_access_flag", &ASTBuilder::reset_snode_access_flag); + .def("reset_snode_access_flag", &ASTBuilder::reset_snode_access_flag) + .def("begin_stream_parallel", &ASTBuilder::begin_stream_parallel) + .def("end_stream_parallel", &ASTBuilder::end_stream_parallel); auto device_capability_config = py::class_(m, "DeviceCapabilityConfig") diff --git a/quadrants/runtime/amdgpu/kernel_launcher.cpp b/quadrants/runtime/amdgpu/kernel_launcher.cpp index 1d8430d35e..1b82b33459 100644 --- a/quadrants/runtime/amdgpu/kernel_launcher.cpp +++ b/quadrants/runtime/amdgpu/kernel_launcher.cpp @@ -1,3 +1,5 @@ +#include + #include "quadrants/runtime/amdgpu/kernel_launcher.h" #include "quadrants/rhi/amdgpu/amdgpu_context.h" #include "quadrants/rhi/amdgpu/amdgpu_driver.h" @@ -108,12 +110,50 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, AMDGPUContext::get_instance().push_back_kernel_arg_pointer(context_pointer); - for (auto &task : offloaded_tasks) { - QD_TRACE("Launching kernel {}<<<{}, {}>>>", task.name, task.grid_dim, - task.block_dim); - amdgpu_module->launch(task.name, task.grid_dim, task.block_dim, - task.dynamic_shared_array_bytes, - {(void *)&context_pointer}, {arg_size}); + for (size_t i = 0; i < offloaded_tasks.size();) { + auto &task = offloaded_tasks[i]; + if (task.stream_parallel_group_id == 0) { + QD_TRACE("Launching kernel {}<<<{}, {}>>>", task.name, task.grid_dim, + task.block_dim); + amdgpu_module->launch(task.name, task.grid_dim, task.block_dim, + task.dynamic_shared_array_bytes, + {(void *)&context_pointer}, {arg_size}); + i++; + } else { + size_t group_start = i; + while (i < offloaded_tasks.size() && + offloaded_tasks[i].stream_parallel_group_id != 0) { + i++; + } + + std::map stream_by_id; + for (size_t j = group_start; j < i; j++) { + int sid = offloaded_tasks[j].stream_parallel_group_id; + if (stream_by_id.find(sid) == stream_by_id.end()) { + void *s = nullptr; + AMDGPUDriver::get_instance().stream_create(&s, 0); + stream_by_id[sid] = s; + } + } + + for (size_t j = group_start; j < i; j++) { + auto &t = offloaded_tasks[j]; + AMDGPUContext::get_instance().set_stream( + stream_by_id[t.stream_parallel_group_id]); + amdgpu_module->launch(t.name, t.grid_dim, t.block_dim, + t.dynamic_shared_array_bytes, + {(void *)&context_pointer}, {arg_size}); + } + + for (auto &[sid, s] : stream_by_id) { + AMDGPUDriver::get_instance().stream_synchronize(s); + } + for (auto &[sid, s] : stream_by_id) { + AMDGPUDriver::get_instance().stream_destroy(s); + } + + AMDGPUContext::get_instance().set_stream(active_stream); + } } QD_TRACE("Launching kernel"); if (ctx.arg_buffer_size > 0) { diff --git a/quadrants/runtime/cuda/kernel_launcher.cpp b/quadrants/runtime/cuda/kernel_launcher.cpp index 13845d5a9b..94aa786b56 100644 --- a/quadrants/runtime/cuda/kernel_launcher.cpp +++ b/quadrants/runtime/cuda/kernel_launcher.cpp @@ -1,3 +1,5 @@ +#include + #include "quadrants/runtime/cuda/kernel_launcher.h" #include "quadrants/rhi/cuda/cuda_context.h" #include "quadrants/rhi/cuda/cuda_driver.h" @@ -139,12 +141,50 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, ctx.get_context().arg_buffer = device_arg_buffer; } - for (auto task : offloaded_tasks) { - QD_TRACE("Launching kernel {}<<<{}, {}>>>", task.name, task.grid_dim, - task.block_dim); - cuda_module->launch(task.name, task.grid_dim, task.block_dim, - task.dynamic_shared_array_bytes, {&ctx.get_context()}, - {}); + for (size_t i = 0; i < offloaded_tasks.size();) { + auto &task = offloaded_tasks[i]; + if (task.stream_parallel_group_id == 0) { + QD_TRACE("Launching kernel {}<<<{}, {}>>>", task.name, task.grid_dim, + task.block_dim); + cuda_module->launch(task.name, task.grid_dim, task.block_dim, + task.dynamic_shared_array_bytes, {&ctx.get_context()}, + {}); + i++; + } else { + size_t group_start = i; + while (i < offloaded_tasks.size() && + offloaded_tasks[i].stream_parallel_group_id != 0) { + i++; + } + + std::map stream_by_id; + for (size_t j = group_start; j < i; j++) { + int sid = offloaded_tasks[j].stream_parallel_group_id; + if (stream_by_id.find(sid) == stream_by_id.end()) { + void *s = nullptr; + CUDADriver::get_instance().stream_create(&s, 0); + stream_by_id[sid] = s; + } + } + + for (size_t j = group_start; j < i; j++) { + auto &t = offloaded_tasks[j]; + CUDAContext::get_instance().set_stream( + stream_by_id[t.stream_parallel_group_id]); + cuda_module->launch(t.name, t.grid_dim, t.block_dim, + t.dynamic_shared_array_bytes, {&ctx.get_context()}, + {}); + } + + for (auto &[sid, s] : stream_by_id) { + CUDADriver::get_instance().stream_synchronize(s); + } + for (auto &[sid, s] : stream_by_id) { + CUDADriver::get_instance().stream_destroy(s); + } + + CUDAContext::get_instance().set_stream(active_stream); + } } if (ctx.arg_buffer_size > 0) { CUDADriver::get_instance().mem_free_async(device_arg_buffer, active_stream); diff --git a/quadrants/transforms/lower_ast.cpp b/quadrants/transforms/lower_ast.cpp index 74b698a9e6..ef1bb6f06a 100644 --- a/quadrants/transforms/lower_ast.cpp +++ b/quadrants/transforms/lower_ast.cpp @@ -232,6 +232,7 @@ class LowerAST : public IRVisitor { snode, std::move(stmt->body), stmt->is_bit_vectorized, stmt->num_cpu_threads, stmt->block_dim); new_for->index_offsets = offsets; + new_for->stream_parallel_group_id = stmt->stream_parallel_group_id; VecStatement new_statements; for (int i = 0; i < (int)stmt->loop_var_ids.size(); i++) { Stmt *loop_index = new_statements.push_back( @@ -270,6 +271,7 @@ class LowerAST : public IRVisitor { begin, end, std::move(stmt->body), stmt->is_bit_vectorized, stmt->num_cpu_threads, stmt->block_dim, stmt->strictly_serialized, /*range_hint=*/fmt::format("arg ({})", fmt::join(arg_id, ", "))); + new_for->stream_parallel_group_id = stmt->stream_parallel_group_id; VecStatement new_statements; Stmt *loop_index = new_statements.push_back(new_for.get(), 0); @@ -311,6 +313,7 @@ class LowerAST : public IRVisitor { begin_stmt, end_stmt, std::move(stmt->body), stmt->is_bit_vectorized, stmt->num_cpu_threads, stmt->block_dim, stmt->strictly_serialized); + new_for->stream_parallel_group_id = stmt->stream_parallel_group_id; new_for->body->insert(std::make_unique(new_for.get(), 0), 0); new_for->body->local_var_to_stmt[stmt->loop_var_ids[0]] = diff --git a/quadrants/transforms/offload.cpp b/quadrants/transforms/offload.cpp index 2f20247364..f3e254a889 100644 --- a/quadrants/transforms/offload.cpp +++ b/quadrants/transforms/offload.cpp @@ -134,6 +134,7 @@ class Offloader { offloaded->body->insert(std::move(s->body->statements[j])); } offloaded->range_hint = s->range_hint; + offloaded->stream_parallel_group_id = s->stream_parallel_group_id; root_block->insert(std::move(offloaded)); } else if (auto st = stmt->cast()) { assemble_serial_statements(); @@ -257,6 +258,8 @@ class Offloader { offloaded_struct_for->num_cpu_threads = std::min(for_stmt->num_cpu_threads, config.cpu_max_num_threads); offloaded_struct_for->mem_access_opt = mem_access_opt; + offloaded_struct_for->stream_parallel_group_id = + for_stmt->stream_parallel_group_id; root_block->insert(std::move(offloaded_struct_for)); } diff --git a/tests/python/test_api.py b/tests/python/test_api.py index 002014c960..241f3143de 100644 --- a/tests/python/test_api.py +++ b/tests/python/test_api.py @@ -218,6 +218,7 @@ def _get_expected_matrix_apis(): "static_assert", "static_print", "stop_grad", + "stream_parallel", "svd", "sym_eig", "sync", diff --git a/tests/python/test_streams.py b/tests/python/test_streams.py index 073d383c2e..4c28b6f581 100644 --- a/tests/python/test_streams.py +++ b/tests/python/test_streams.py @@ -180,23 +180,6 @@ def fill(): e.destroy() -@test_utils.test() -def test_stream_with_ndarray(): - N = 1024 - - @qd.kernel - def fill(arr: qd.types.ndarray(dtype=qd.f32, ndim=1)): - for i in range(N): - arr[i] = 99.0 - - arr = qd.ndarray(qd.f32, shape=(N,)) - s = qd.create_stream() - fill(arr, qd_stream=s) - s.synchronize() - assert np.allclose(arr.to_numpy(), 99.0) - s.destroy() - - @test_utils.test() def test_concurrent_streams_with_events(): """Two slow kernels on separate streams run concurrently (~1s on GPU), @@ -275,3 +258,164 @@ def add_first_two(a: qd.types.ndarray(dtype=qd.f32, ndim=1)): s2.destroy() e1.destroy() e2.destroy() + + +@test_utils.test() +def test_stream_parallel_basic(): + """Each with qd.stream_parallel() block runs on its own stream (serial fallback on CPU/Metal).""" + N = 1024 + a = qd.field(qd.f32, shape=(N,)) + b = qd.field(qd.f32, shape=(N,)) + + @qd.kernel + def fill_parallel(): + with qd.stream_parallel(): + for i in range(N): + a[i] = 1.0 + with qd.stream_parallel(): + for j in range(N): + b[j] = 2.0 + + fill_parallel() + qd.sync() + assert np.allclose(a.to_numpy(), 1.0) + assert np.allclose(b.to_numpy(), 2.0) + + +@test_utils.test() +def test_stream_parallel_multiple_loops_per_stream(): + """Multiple for loops inside one stream_parallel block share a stream (serial fallback on CPU/Metal).""" + N = 1024 + a = qd.field(qd.f32, shape=(N,)) + b = qd.field(qd.f32, shape=(N,)) + c = qd.field(qd.f32, shape=(N,)) + + @qd.kernel + def parallel_phase(): + with qd.stream_parallel(): + for i in range(N): + a[i] = 1.0 + for i in range(N): + a[i] = a[i] + 1.0 + with qd.stream_parallel(): + for j in range(N): + b[j] = 10.0 + + @qd.kernel + def combine(): + for i in range(N): + c[i] = a[i] + b[i] + + parallel_phase() + combine() + qd.sync() + assert np.allclose(a.to_numpy(), 2.0) + assert np.allclose(b.to_numpy(), 10.0) + assert np.allclose(c.to_numpy(), 12.0) + + +@test_utils.test() +def test_stream_parallel_timing(): + """stream_parallel achieves speedup on GPU, serial fallback elsewhere.""" + SPIN_ITERS = 5_000_000 + + a = qd.field(qd.i32, shape=(2,)) + b = qd.field(qd.i32, shape=(2,)) + + @qd.kernel + def serial_spin(): + for _ in range(1): + x = a[0] + for _j in range(SPIN_ITERS): + x = (1664525 * x + 1013904223) % 2147483647 + a[0] = x + for _ in range(1): + x = a[1] + for _j in range(SPIN_ITERS): + x = (1664525 * x + 1013904223) % 2147483647 + a[1] = x + + @qd.kernel + def parallel_spin(): + with qd.stream_parallel(): + for _ in range(1): + x = b[0] + for _j in range(SPIN_ITERS): + x = (1664525 * x + 1013904223) % 2147483647 + b[0] = x + with qd.stream_parallel(): + for _ in range(1): + x = b[1] + for _j in range(SPIN_ITERS): + x = (1664525 * x + 1013904223) % 2147483647 + b[1] = x + + import time + + # Warm up + serial_spin() + parallel_spin() + qd.sync() + + qd.sync() + t0 = time.perf_counter() + serial_spin() + qd.sync() + serial_time = time.perf_counter() - t0 + + qd.sync() + t0 = time.perf_counter() + parallel_spin() + qd.sync() + stream_time = time.perf_counter() - t0 + + speedup = serial_time / stream_time + if qd.lang.impl.current_cfg().arch in (qd.cuda, qd.amdgpu): + assert speedup > 1.5, ( + f"Expected >1.5x speedup, got {speedup:.2f}x " f"(serial={serial_time:.3f}s, stream={stream_time:.3f}s)" + ) + else: + assert speedup > 0.75, ( + f"Expected >=0.75x (serial fallback), got {speedup:.2f}x " + f"(serial={serial_time:.3f}s, stream={stream_time:.3f}s)" + ) + + +@test_utils.test() +def test_stream_parallel_rejects_mixed_top_level(): + """Mixing stream_parallel and non-stream_parallel at top level is an error.""" + import pytest # noqa: I001 + + from quadrants.lang.exception import QuadrantsSyntaxError + + N = 64 + a = qd.field(qd.f32, shape=(N,)) + + with pytest.raises(QuadrantsSyntaxError, match="all top-level statements"): + + @qd.kernel + def bad_kernel(): + with qd.stream_parallel(): + for i in range(N): + a[i] = 1.0 + for i in range(N): + a[i] = 2.0 + + bad_kernel() + + +@test_utils.test() +def test_stream_with_ndarray(): + N = 1024 + + @qd.kernel + def fill(arr: qd.types.ndarray(dtype=qd.f32, ndim=1)): + for i in range(N): + arr[i] = 99.0 + + arr = qd.ndarray(qd.f32, shape=(N,)) + s = qd.create_stream() + fill(arr, qd_stream=s) + s.synchronize() + assert np.allclose(arr.to_numpy(), 99.0) + s.destroy() From be7ad924c333a589f13bbbe34f2d9583649007f5 Mon Sep 17 00:00:00 2001 From: Hugh Perkins Date: Wed, 11 Mar 2026 17:29:24 -0700 Subject: [PATCH 02/20] Clear stream_parallel_group_id in ForLoopDecoratorRecorder::reset() Prevents stale group IDs from leaking if insert_for is called after a path that set a non-zero stream_parallel_group_id, matching the reset pattern of all other ForLoopConfig fields. --- quadrants/ir/frontend_ir.h | 1 + 1 file changed, 1 insertion(+) diff --git a/quadrants/ir/frontend_ir.h b/quadrants/ir/frontend_ir.h index 693a7f461f..38226ca1b3 100644 --- a/quadrants/ir/frontend_ir.h +++ b/quadrants/ir/frontend_ir.h @@ -954,6 +954,7 @@ class ASTBuilder { config.mem_access_opt.clear(); config.block_dim = 0; config.strictly_serialized = false; + config.stream_parallel_group_id = 0; } }; From ce8328102ae0b18f0b29d661b4dc4026edf3c4a8 Mon Sep 17 00:00:00 2001 From: Hugh Perkins Date: Wed, 11 Mar 2026 17:29:36 -0700 Subject: [PATCH 03/20] Reject nested stream_parallel blocks Add an error check in begin_stream_parallel() to prevent nesting, which would produce undefined group ID semantics. --- quadrants/ir/frontend_ir.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/quadrants/ir/frontend_ir.h b/quadrants/ir/frontend_ir.h index 38226ca1b3..46d7a3ec7a 100644 --- a/quadrants/ir/frontend_ir.h +++ b/quadrants/ir/frontend_ir.h @@ -1113,6 +1113,8 @@ class ASTBuilder { } void begin_stream_parallel() { + QD_ERROR_IF(current_stream_parallel_group_id_ != 0, + "Nested stream_parallel blocks are not supported"); current_stream_parallel_group_id_ = ++stream_parallel_group_counter_; } From 880abc7e74cc8be0979d54747ff753929f00221d Mon Sep 17 00:00:00 2001 From: Hugh Perkins Date: Wed, 11 Mar 2026 17:30:08 -0700 Subject: [PATCH 04/20] Document stream_parallel launcher design: per-launch streams, shared context safety Add comments explaining that streams are created/destroyed per launch (stream pooling as future optimization), and that RuntimeContext sharing across concurrent streams is safe because kernels only read from it. --- quadrants/runtime/amdgpu/kernel_launcher.cpp | 5 +++++ quadrants/runtime/cuda/kernel_launcher.cpp | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/quadrants/runtime/amdgpu/kernel_launcher.cpp b/quadrants/runtime/amdgpu/kernel_launcher.cpp index f859bb116c..6abd0778ed 100644 --- a/quadrants/runtime/amdgpu/kernel_launcher.cpp +++ b/quadrants/runtime/amdgpu/kernel_launcher.cpp @@ -127,6 +127,8 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, i++; } + // Create one stream per unique group ID. Streams are created/destroyed + // per launch; a stream pool could reduce overhead for hot loops. std::map stream_by_id; for (size_t j = group_start; j < i; j++) { int sid = offloaded_tasks[j].stream_parallel_group_id; @@ -137,6 +139,9 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, } } + // Launch tasks concurrently on their respective streams. The shared + // RuntimeContext is safe here: kernels only read from it (args/runtime + // pointers); result_buffer writes are to disjoint offsets per task. for (size_t j = group_start; j < i; j++) { auto &t = offloaded_tasks[j]; AMDGPUContext::get_instance().set_stream( diff --git a/quadrants/runtime/cuda/kernel_launcher.cpp b/quadrants/runtime/cuda/kernel_launcher.cpp index 2e10226a13..9cf24915ab 100644 --- a/quadrants/runtime/cuda/kernel_launcher.cpp +++ b/quadrants/runtime/cuda/kernel_launcher.cpp @@ -159,6 +159,8 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, i++; } + // Create one stream per unique group ID. Streams are created/destroyed + // per launch; a stream pool could reduce overhead for hot loops. std::map stream_by_id; for (size_t j = group_start; j < i; j++) { int sid = offloaded_tasks[j].stream_parallel_group_id; @@ -169,6 +171,9 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, } } + // Launch tasks concurrently on their respective streams. The shared + // RuntimeContext is safe here: kernels only read from it (args/runtime + // pointers); result_buffer writes are to disjoint offsets per task. for (size_t j = group_start; j < i; j++) { auto &t = offloaded_tasks[j]; CUDAContext::get_instance().set_stream( From e9ce144a2302c55b097f61148bae2385808e8d5c Mon Sep 17 00:00:00 2001 From: Hugh Perkins Date: Sun, 19 Apr 2026 20:33:32 -0700 Subject: [PATCH 05/20] Apply clang-format Made-with: Cursor --- quadrants/codegen/llvm/llvm_compiled_data.h | 6 +----- quadrants/ir/frontend_ir.h | 3 +-- quadrants/runtime/amdgpu/kernel_launcher.cpp | 4 ++-- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/quadrants/codegen/llvm/llvm_compiled_data.h b/quadrants/codegen/llvm/llvm_compiled_data.h index 4ed2e69abc..ba7b74e674 100644 --- a/quadrants/codegen/llvm/llvm_compiled_data.h +++ b/quadrants/codegen/llvm/llvm_compiled_data.h @@ -26,11 +26,7 @@ class OffloadedTask { grid_dim(grid_dim), dynamic_shared_array_bytes(dynamic_shared_array_bytes), stream_parallel_group_id(stream_parallel_group_id) {}; - QD_IO_DEF(name, - block_dim, - grid_dim, - dynamic_shared_array_bytes, - stream_parallel_group_id); + QD_IO_DEF(name, block_dim, grid_dim, dynamic_shared_array_bytes, stream_parallel_group_id); }; struct LLVMCompiledTask { diff --git a/quadrants/ir/frontend_ir.h b/quadrants/ir/frontend_ir.h index 0ceed57772..b4ad04a9b5 100644 --- a/quadrants/ir/frontend_ir.h +++ b/quadrants/ir/frontend_ir.h @@ -1028,8 +1028,7 @@ class ASTBuilder { } void begin_stream_parallel() { - QD_ERROR_IF(current_stream_parallel_group_id_ != 0, - "Nested stream_parallel blocks are not supported"); + QD_ERROR_IF(current_stream_parallel_group_id_ != 0, "Nested stream_parallel blocks are not supported"); current_stream_parallel_group_id_ = ++stream_parallel_group_counter_; } diff --git a/quadrants/runtime/amdgpu/kernel_launcher.cpp b/quadrants/runtime/amdgpu/kernel_launcher.cpp index 83df04490f..57659a5cfa 100644 --- a/quadrants/runtime/amdgpu/kernel_launcher.cpp +++ b/quadrants/runtime/amdgpu/kernel_launcher.cpp @@ -44,8 +44,8 @@ void KernelLauncher::launch_offloaded_tasks(JITModule *amdgpu_module, for (size_t j = group_start; j < i; j++) { const auto &t = offloaded_tasks[j]; AMDGPUContext::get_instance().set_stream(stream_by_id[t.stream_parallel_group_id]); - amdgpu_module->launch(t.name, t.grid_dim, t.block_dim, t.dynamic_shared_array_bytes, - {(void *)&context_pointer}, {arg_size}); + amdgpu_module->launch(t.name, t.grid_dim, t.block_dim, t.dynamic_shared_array_bytes, {(void *)&context_pointer}, + {arg_size}); } for (auto &[sid, s] : stream_by_id) { From 8cd793c888fec5815aa5b7d04361aad251da5268 Mon Sep 17 00:00:00 2001 From: Hugh Perkins Date: Tue, 28 Apr 2026 08:51:10 -0700 Subject: [PATCH 06/20] [Doc] Add stream_parallel() section to streams user guide --- docs/source/user_guide/streams.md | 78 +++++++++++++++++++++---------- 1 file changed, 54 insertions(+), 24 deletions(-) diff --git a/docs/source/user_guide/streams.md b/docs/source/user_guide/streams.md index cd26e01d20..b9a2f5798e 100644 --- a/docs/source/user_guide/streams.md +++ b/docs/source/user_guide/streams.md @@ -1,23 +1,26 @@ # Streams Streams allow concurrent execution of GPU operations. By default, all Quadrants kernels launch on the default -stream, which serializes everything. By creating explicit streams, you can run independent kernels concurrently -and control synchronization with events. +stream, which serializes everything. With streams, you can run multiple top-level for loops in parallel. ## Supported platforms -| Backend | Streams | Events | Notes | -|---------|---------|--------|-------| -| CUDA | Yes | Yes | Full concurrent execution | -| AMDGPU | Yes | Yes | Full concurrent execution (requires ROCm >= 5.4) | -| CPU | No-op | No-op | `qd_stream` is silently ignored, kernels run serially | -| Metal | No-op | No-op | `qd_stream` is silently ignored, kernels run serially | -| Vulkan | No-op | No-op | `qd_stream` is silently ignored, kernels run serially | +| Backend | Supported | +|---------|-----------| +| CUDA | Yes | +| AMDGPU | Yes | +| CPU | No-op | +| Metal | No-op | +| Vulkan | No-op | -On backends without native stream support, `create_stream()` and `create_event()` return objects with handle -`0`. All stream/event operations become no-ops and kernels run serially. Code written with streams is portable across all backends in the sense that it will run without modifications, but serially. +On backends without native stream support, stream operations are no-ops and for loops run serially. Code using +streams is portable across all backends — it will run without modifications, but serially. -## Creating and using streams +## Stream parallelism + +Inside a `@qd.kernel`, each `with qd.stream_parallel():` block runs on its own GPU stream. The runtime +creates temporary streams, launches the for loops, and synchronizes automatically before the next +non-parallel statement. ```python import quadrants as qd @@ -27,17 +30,43 @@ qd.init(arch=qd.cuda) N = 1024 a = qd.field(qd.f32, shape=(N,)) b = qd.field(qd.f32, shape=(N,)) +c = qd.field(qd.f32, shape=(N,)) @qd.kernel -def fill_a(): - for i in range(N): - a[i] = 1.0 +def compute_ab(): + with qd.stream_parallel(): + for i in range(N): + a[i] = compute_a(i) + with qd.stream_parallel(): + for j in range(N): + b[j] = compute_b(j) @qd.kernel -def fill_b(): +def combine(): for i in range(N): - b[i] = 2.0 + c[i] = a[i] + b[i] + +compute_ab() # the two stream_parallel blocks run concurrently +combine() # runs after compute_ab() returns — a[] and b[] are ready +``` + +Consecutive `with qd.stream_parallel():` blocks run concurrently. Multiple for loops within a single block +share a stream and run serially on it. All streams are synchronized before the kernel returns. + +### Restrictions +- All top-level statements in a kernel must be either all `stream_parallel` blocks or all regular statements. + Mixing the two at the top level is a compile-time error. +- Nesting `stream_parallel` blocks is not supported. + +## Explicit streams + +For cases that require manual control — such as launching separate kernels on different streams or +interoperating with PyTorch — you can create and manage streams directly. + +### Creating and using streams + +```python s1 = qd.create_stream() s2 = qd.create_stream() @@ -54,7 +83,7 @@ s2.destroy() Pass `qd_stream=` to any kernel call to launch it on that stream. Kernels on different streams may execute concurrently. Call `synchronize()` to block until all work on a stream completes. -## Events +### Events Events let you express dependencies between streams without full synchronization. @@ -89,7 +118,7 @@ s2.destroy() `e.record(stream)` captures the point in `stream`'s execution. `e.wait(qd_stream=stream)` makes `stream` wait until the recorded point is reached. If `qd_stream` is omitted, the default stream waits. -## Context managers +### Context managers Streams and events support `with` blocks for automatic cleanup: @@ -100,13 +129,13 @@ with qd.create_stream() as s: # s.destroy() called automatically ``` -## PyTorch interop (CUDA) +### PyTorch interop (CUDA) When mixing Quadrants kernels with PyTorch operations on CUDA, both frameworks must use the same stream to avoid race conditions. Without explicit stream management, Quadrants and PyTorch may launch work on different streams with no ordering guarantees, leading to intermittent data corruption. -### Running Quadrants kernels on PyTorch's stream +#### Running Quadrants kernels on PyTorch's stream ```python import torch @@ -123,7 +152,7 @@ apply_actions_kernel(qd_stream=stream) Wrap PyTorch's raw `CUstream` pointer in a Quadrants `Stream` object. Do **not** call `destroy()` on this wrapper — PyTorch owns the underlying stream. -### Running PyTorch operations on a Quadrants stream +#### Running PyTorch operations on a Quadrants stream ```python qd_stream = qd.create_stream() @@ -142,5 +171,6 @@ qd_stream.destroy() ## Limitations - **Not compatible with graphs.** Do not pass `qd_stream` to a kernel decorated with `graph=True`. -- **No automatic synchronization.** You are responsible for inserting events or `synchronize()` calls when one - stream's output is another stream's input. +- **No automatic synchronization with explicit streams.** When using explicit streams, you are responsible for + inserting events or `synchronize()` calls when one stream's output is another stream's input. + `stream_parallel` handles this automatically. From 3b0ba294ace8518f70f8cb0516787bc9651ed644 Mon Sep 17 00:00:00 2001 From: Hugh Perkins Date: Fri, 1 May 2026 04:18:35 -0700 Subject: [PATCH 07/20] Restore deleted comments, fix docstring wrapping, fix per-task adstack publish in stream-parallel loop - Restore the deleted comments explaining why device_context_ptr is passed to publish_adstack_metadata (CUDA_ERROR_ILLEGAL_ADDRESS / hipErrorIllegalAddress on non-HMM GPUs). - Reflow stream.py docstring to 120-char wrap. - Move publish_adstack_metadata into the inner per-task loop for stream-parallel dispatch so each task gets its own adstack metadata published before launch (fixes latent bug for autodiff kernels). --- python/quadrants/lang/stream.py | 5 ++--- quadrants/runtime/amdgpu/kernel_launcher.cpp | 11 ++++++++--- quadrants/runtime/cuda/kernel_launcher.cpp | 11 +++++++++-- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/python/quadrants/lang/stream.py b/python/quadrants/lang/stream.py index 721d989109..395cc9d25c 100644 --- a/python/quadrants/lang/stream.py +++ b/python/quadrants/lang/stream.py @@ -132,9 +132,8 @@ def create_event() -> Event: def stream_parallel(): """Run top-level for loops in this block on separate GPU streams. - Used inside @qd.kernel. At Python runtime (outside kernels), this is a - no-op. During kernel compilation, the AST transformer calls into the C++ - ASTBuilder to tag loops with a stream-parallel group ID. + Used inside @qd.kernel. At Python runtime (outside kernels), this is a no-op. During kernel compilation, the AST + transformer calls into the C++ ASTBuilder to tag loops with a stream-parallel group ID. """ yield diff --git a/quadrants/runtime/amdgpu/kernel_launcher.cpp b/quadrants/runtime/amdgpu/kernel_launcher.cpp index 0c71f8fa85..fa053b74b5 100644 --- a/quadrants/runtime/amdgpu/kernel_launcher.cpp +++ b/quadrants/runtime/amdgpu/kernel_launcher.cpp @@ -55,8 +55,12 @@ void KernelLauncher::launch_offloaded_tasks(LaunchContextBuilder &ctx, auto *active_stream = AMDGPUContext::get_instance().get_stream(); for (size_t i = 0; i < offloaded_tasks.size();) { const auto &task = offloaded_tasks[i]; - executor->publish_adstack_metadata(task.ad_stack, resolve_num_threads(task, executor), &ctx, context_pointer); if (task.stream_parallel_group_id == 0) { + // Pass the device-side `RuntimeContext` pointer through to the adstack sizer kernel. Without this the + // sizer launches with a host pointer and the next DtoH sync trips + // `hipErrorIllegalAddress ... memcpy_device_to_host` because HIP has no UVA fallback for the host + // `RuntimeContext` struct. + executor->publish_adstack_metadata(task.ad_stack, resolve_num_threads(task, executor), &ctx, context_pointer); QD_TRACE("Launching kernel {}<<<{}, {}>>>", task.name, task.grid_dim, task.block_dim); amdgpu_module->launch(task.name, task.grid_dim, task.block_dim, task.dynamic_shared_array_bytes, {(void *)&context_pointer}, {arg_size}); @@ -79,9 +83,10 @@ void KernelLauncher::launch_offloaded_tasks(LaunchContextBuilder &ctx, for (size_t j = group_start; j < i; j++) { const auto &t = offloaded_tasks[j]; + executor->publish_adstack_metadata(t.ad_stack, resolve_num_threads(t, executor), &ctx, context_pointer); AMDGPUContext::get_instance().set_stream(stream_by_id[t.stream_parallel_group_id]); - amdgpu_module->launch(t.name, t.grid_dim, t.block_dim, t.dynamic_shared_array_bytes, {(void *)&context_pointer}, - {arg_size}); + amdgpu_module->launch(t.name, t.grid_dim, t.block_dim, t.dynamic_shared_array_bytes, + {(void *)&context_pointer}, {arg_size}); } for (auto &[sid, s] : stream_by_id) { diff --git a/quadrants/runtime/cuda/kernel_launcher.cpp b/quadrants/runtime/cuda/kernel_launcher.cpp index a3e97e3a26..ac0ccc8896 100644 --- a/quadrants/runtime/cuda/kernel_launcher.cpp +++ b/quadrants/runtime/cuda/kernel_launcher.cpp @@ -54,9 +54,14 @@ void KernelLauncher::launch_offloaded_tasks(LaunchContextBuilder &ctx, auto *active_stream = CUDAContext::get_instance().get_stream(); for (size_t i = 0; i < offloaded_tasks.size();) { const auto &task = offloaded_tasks[i]; - std::size_t n = resolve_num_threads(task.ad_stack, executor); - executor->publish_adstack_metadata(task.ad_stack, n, &ctx, device_context_ptr); if (task.stream_parallel_group_id == 0) { + std::size_t n = resolve_num_threads(task.ad_stack, executor); + // Pass the device-side `RuntimeContext` pointer through to the adstack sizer kernel. Without it the sizer + // launches with a host pointer and the next DtoH sync trips `CUDA_ERROR_ILLEGAL_ADDRESS ... + // memcpy_device_to_host` on GPUs whose driver + kernel cannot coherently access pageable host memory (the HMM + // capability gated below in `launch_llvm_kernel`). `nullptr` on HMM-capable setups keeps + // `publish_adstack_metadata`'s host-pointer fast path. + executor->publish_adstack_metadata(task.ad_stack, n, &ctx, device_context_ptr); QD_TRACE("Launching kernel {}<<<{}, {}>>>", task.name, task.grid_dim, task.block_dim); cuda_module->launch(task.name, task.grid_dim, task.block_dim, task.dynamic_shared_array_bytes, {&ctx.get_context()}, {}); @@ -79,6 +84,8 @@ void KernelLauncher::launch_offloaded_tasks(LaunchContextBuilder &ctx, for (size_t j = group_start; j < i; j++) { const auto &t = offloaded_tasks[j]; + std::size_t n_t = resolve_num_threads(t.ad_stack, executor); + executor->publish_adstack_metadata(t.ad_stack, n_t, &ctx, device_context_ptr); CUDAContext::get_instance().set_stream(stream_by_id[t.stream_parallel_group_id]); cuda_module->launch(t.name, t.grid_dim, t.block_dim, t.dynamic_shared_array_bytes, {&ctx.get_context()}, {}); } From 1c62eaecb93bca645a2f80cf40a3ee0ff849dead Mon Sep 17 00:00:00 2001 From: Hugh Perkins Date: Fri, 1 May 2026 04:22:05 -0700 Subject: [PATCH 08/20] Fix clang-format line break in AMDGPU kernel launcher --- quadrants/runtime/amdgpu/kernel_launcher.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/quadrants/runtime/amdgpu/kernel_launcher.cpp b/quadrants/runtime/amdgpu/kernel_launcher.cpp index fa053b74b5..43664a68da 100644 --- a/quadrants/runtime/amdgpu/kernel_launcher.cpp +++ b/quadrants/runtime/amdgpu/kernel_launcher.cpp @@ -85,8 +85,8 @@ void KernelLauncher::launch_offloaded_tasks(LaunchContextBuilder &ctx, const auto &t = offloaded_tasks[j]; executor->publish_adstack_metadata(t.ad_stack, resolve_num_threads(t, executor), &ctx, context_pointer); AMDGPUContext::get_instance().set_stream(stream_by_id[t.stream_parallel_group_id]); - amdgpu_module->launch(t.name, t.grid_dim, t.block_dim, t.dynamic_shared_array_bytes, - {(void *)&context_pointer}, {arg_size}); + amdgpu_module->launch(t.name, t.grid_dim, t.block_dim, t.dynamic_shared_array_bytes, {(void *)&context_pointer}, + {arg_size}); } for (auto &[sid, s] : stream_by_id) { From 216f7d53d91af16c033410a73be767332cf5625b Mon Sep 17 00:00:00 2001 From: Hugh Perkins Date: Fri, 1 May 2026 04:42:54 -0700 Subject: [PATCH 09/20] Address Claude review: reject stream_parallel in @qd.func, use non-blocking streams - Reject qd.stream_parallel() inside @qd.func with a clear error; it's only valid in @qd.kernel. - Use CU_STREAM_NON_BLOCKING (0x1) for internal stream-parallel streams, matching the convention in Program::stream_create. Blocking streams (flag 0) serialize with the legacy NULL stream, defeating the purpose of parallel dispatch. --- python/quadrants/lang/ast/ast_transformer.py | 2 ++ quadrants/runtime/amdgpu/kernel_launcher.cpp | 2 +- quadrants/runtime/cuda/kernel_launcher.cpp | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/python/quadrants/lang/ast/ast_transformer.py b/python/quadrants/lang/ast/ast_transformer.py index 99e10bc4f9..b5b78455c6 100644 --- a/python/quadrants/lang/ast/ast_transformer.py +++ b/python/quadrants/lang/ast/ast_transformer.py @@ -1541,6 +1541,8 @@ def build_With(ctx: ASTTransformerFuncContext, node: ast.With) -> None: raise QuadrantsSyntaxError("'with' in Quadrants kernels requires a call expression") if not ASTResolver.resolve_to(item.context_expr.func, stream_parallel, ctx.global_vars): raise QuadrantsSyntaxError("'with' in Quadrants kernels only supports qd.stream_parallel()") + if not ctx.is_kernel: + raise QuadrantsSyntaxError("qd.stream_parallel() can only be used inside @qd.kernel, not @qd.func") ctx.ast_builder.begin_stream_parallel() build_stmts(ctx, node.body) ctx.ast_builder.end_stream_parallel() diff --git a/quadrants/runtime/amdgpu/kernel_launcher.cpp b/quadrants/runtime/amdgpu/kernel_launcher.cpp index 43664a68da..1da2ec5b0a 100644 --- a/quadrants/runtime/amdgpu/kernel_launcher.cpp +++ b/quadrants/runtime/amdgpu/kernel_launcher.cpp @@ -76,7 +76,7 @@ void KernelLauncher::launch_offloaded_tasks(LaunchContextBuilder &ctx, int sid = offloaded_tasks[j].stream_parallel_group_id; if (stream_by_id.find(sid) == stream_by_id.end()) { void *s = nullptr; - AMDGPUDriver::get_instance().stream_create(&s, 0); + AMDGPUDriver::get_instance().stream_create(&s, 0x1 /*CU_STREAM_NON_BLOCKING*/); stream_by_id[sid] = s; } } diff --git a/quadrants/runtime/cuda/kernel_launcher.cpp b/quadrants/runtime/cuda/kernel_launcher.cpp index ac0ccc8896..b11d3a334b 100644 --- a/quadrants/runtime/cuda/kernel_launcher.cpp +++ b/quadrants/runtime/cuda/kernel_launcher.cpp @@ -77,7 +77,7 @@ void KernelLauncher::launch_offloaded_tasks(LaunchContextBuilder &ctx, int sid = offloaded_tasks[j].stream_parallel_group_id; if (stream_by_id.find(sid) == stream_by_id.end()) { void *s = nullptr; - CUDADriver::get_instance().stream_create(&s, 0); + CUDADriver::get_instance().stream_create(&s, 0x1 /*CU_STREAM_NON_BLOCKING*/); stream_by_id[sid] = s; } } From 74604f2753aa778a200d65fe1d882f9c32f6f096 Mon Sep 17 00:00:00 2001 From: Hugh Perkins Date: Fri, 1 May 2026 05:03:57 -0700 Subject: [PATCH 10/20] Allow docstrings in stream_parallel kernels, merge base branch updates The stream_parallel exclusivity validation now skips docstrings (bare string expressions at body[0]), so kernels with docstrings don't get falsely rejected. Also applied style cleanup from earlier review (use `if not any(...)` pattern). --- .../ast/ast_transformers/function_def_transformer.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py b/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py index 135b702d6f..d6b64b5080 100644 --- a/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py +++ b/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py @@ -464,12 +464,17 @@ def _is_stream_parallel_with(stmt: ast.stmt, global_vars: dict[str, Any]) -> boo return False return ASTResolver.resolve_to(item.context_expr.func, stream_parallel, global_vars) + @staticmethod + def _is_docstring(stmt: ast.stmt, index: int) -> bool: + return index == 0 and isinstance(stmt, ast.Expr) and isinstance(stmt.value, (ast.Constant, ast.Str)) + @staticmethod def _validate_stream_parallel_exclusivity(body: list[ast.stmt], global_vars: dict[str, Any]) -> None: - has_sp = any(FunctionDefTransformer._is_stream_parallel_with(s, global_vars) for s in body) - if not has_sp: + if not any(FunctionDefTransformer._is_stream_parallel_with(s, global_vars) for s in body): return - for stmt in body: + for i, stmt in enumerate(body): + if FunctionDefTransformer._is_docstring(stmt, i): + continue if not FunctionDefTransformer._is_stream_parallel_with(stmt, global_vars): raise QuadrantsSyntaxError( "When using qd.stream_parallel(), all top-level statements " From 1f471b37ab7728777cb0cb339ba16c0b3164301e Mon Sep 17 00:00:00 2001 From: Hugh Perkins Date: Fri, 1 May 2026 06:06:11 -0700 Subject: [PATCH 11/20] Fix AMDGPU stream flag comment: HIP_STREAM_NON_BLOCKING not CU_STREAM_NON_BLOCKING --- quadrants/runtime/amdgpu/kernel_launcher.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quadrants/runtime/amdgpu/kernel_launcher.cpp b/quadrants/runtime/amdgpu/kernel_launcher.cpp index 50b80294ce..e57c8675d7 100644 --- a/quadrants/runtime/amdgpu/kernel_launcher.cpp +++ b/quadrants/runtime/amdgpu/kernel_launcher.cpp @@ -78,7 +78,7 @@ void KernelLauncher::launch_offloaded_tasks(LaunchContextBuilder &ctx, int sid = offloaded_tasks[j].stream_parallel_group_id; if (stream_by_id.find(sid) == stream_by_id.end()) { void *s = nullptr; - AMDGPUDriver::get_instance().stream_create(&s, 0x1 /*CU_STREAM_NON_BLOCKING*/); + AMDGPUDriver::get_instance().stream_create(&s, 0x1 /*HIP_STREAM_NON_BLOCKING*/); stream_by_id[sid] = s; } } From 88f1bf7ef578e1043fc7df0b8fe575df7dde5bc7 Mon Sep 17 00:00:00 2001 From: Hugh Perkins Date: Fri, 1 May 2026 06:47:47 -0700 Subject: [PATCH 12/20] Add stream_parallel_group_id to QD_STMT_DEF_FIELDS for cache key correctness Without this, the offline cache considers two kernels that differ only in stream_parallel_group_id assignments as identical, potentially serving a cached version with wrong group IDs. --- quadrants/ir/statements.h | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/quadrants/ir/statements.h b/quadrants/ir/statements.h index 503a1ed183..c9b0d79841 100644 --- a/quadrants/ir/statements.h +++ b/quadrants/ir/statements.h @@ -978,7 +978,7 @@ class RangeForStmt : public Stmt { std::unique_ptr clone() const override; - QD_STMT_DEF_FIELDS(begin, end, reversed, is_bit_vectorized, num_cpu_threads, block_dim, strictly_serialized); + QD_STMT_DEF_FIELDS(begin, end, reversed, is_bit_vectorized, num_cpu_threads, block_dim, strictly_serialized, stream_parallel_group_id); QD_DEFINE_ACCEPT }; @@ -1012,7 +1012,7 @@ class StructForStmt : public Stmt { std::unique_ptr clone() const override; - QD_STMT_DEF_FIELDS(snode, index_offsets, is_bit_vectorized, num_cpu_threads, block_dim, mem_access_opt); + QD_STMT_DEF_FIELDS(snode, index_offsets, is_bit_vectorized, num_cpu_threads, block_dim, mem_access_opt, stream_parallel_group_id); QD_DEFINE_ACCEPT }; @@ -1393,7 +1393,8 @@ class OffloadedStmt : public Stmt { reversed, num_cpu_threads, index_offsets, - mem_access_opt); + mem_access_opt, + stream_parallel_group_id); QD_DEFINE_ACCEPT }; From ca560b64d6e1f20ec4bcfc68d8081d87c466de10 Mon Sep 17 00:00:00 2001 From: Hugh Perkins Date: Fri, 1 May 2026 06:58:28 -0700 Subject: [PATCH 13/20] Fix clang-format: multi-line QD_STMT_DEF_FIELDS for RangeForStmt and StructForStmt --- quadrants/ir/statements.h | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/quadrants/ir/statements.h b/quadrants/ir/statements.h index c9b0d79841..c29c648995 100644 --- a/quadrants/ir/statements.h +++ b/quadrants/ir/statements.h @@ -978,7 +978,14 @@ class RangeForStmt : public Stmt { std::unique_ptr clone() const override; - QD_STMT_DEF_FIELDS(begin, end, reversed, is_bit_vectorized, num_cpu_threads, block_dim, strictly_serialized, stream_parallel_group_id); + QD_STMT_DEF_FIELDS(begin, + end, + reversed, + is_bit_vectorized, + num_cpu_threads, + block_dim, + strictly_serialized, + stream_parallel_group_id); QD_DEFINE_ACCEPT }; @@ -1012,7 +1019,13 @@ class StructForStmt : public Stmt { std::unique_ptr clone() const override; - QD_STMT_DEF_FIELDS(snode, index_offsets, is_bit_vectorized, num_cpu_threads, block_dim, mem_access_opt, stream_parallel_group_id); + QD_STMT_DEF_FIELDS(snode, + index_offsets, + is_bit_vectorized, + num_cpu_threads, + block_dim, + mem_access_opt, + stream_parallel_group_id); QD_DEFINE_ACCEPT }; From df0b03a6d1505a47b583677b7d6af4bdf040388a Mon Sep 17 00:00:00 2001 From: Hugh Perkins Date: Fri, 1 May 2026 11:12:05 -0700 Subject: [PATCH 14/20] Fix stream_parallel identity check failing on dual-import-path builds The _is_stream_parallel_with validation uses ASTResolver.resolve_to which compares objects with `is`. On Linux build runners where quadrants is available from both the source tree and installed location, the stream_parallel function object may differ between import paths. Add a fallback that checks __name__ and __module__ when identity fails, and add ASTResolver.resolve_value for general AST-to-object resolution. Co-authored-by: Cursor --- python/quadrants/lang/ast/ast_transformer.py | 4 +-- .../function_def_transformer.py | 10 +++++- python/quadrants/lang/ast/symbol_resolver.py | 32 +++++++++++++++++++ 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/python/quadrants/lang/ast/ast_transformer.py b/python/quadrants/lang/ast/ast_transformer.py index b5b78455c6..a0048ccb61 100644 --- a/python/quadrants/lang/ast/ast_transformer.py +++ b/python/quadrants/lang/ast/ast_transformer.py @@ -40,7 +40,7 @@ from quadrants.lang.field import Field from quadrants.lang.matrix import Matrix, MatrixType from quadrants.lang.snode import append, deactivate, length -from quadrants.lang.stream import stream_parallel + from quadrants.lang.struct import Struct, StructType from quadrants.lang.util import ( is_from_quadrants_module as _is_from_quadrants_module, @@ -1539,7 +1539,7 @@ def build_With(ctx: ASTTransformerFuncContext, node: ast.With) -> None: raise QuadrantsSyntaxError("'with ... as ...' is not supported in Quadrants kernels") if not isinstance(item.context_expr, ast.Call): raise QuadrantsSyntaxError("'with' in Quadrants kernels requires a call expression") - if not ASTResolver.resolve_to(item.context_expr.func, stream_parallel, ctx.global_vars): + if not FunctionDefTransformer._is_stream_parallel_with(node, ctx.global_vars): raise QuadrantsSyntaxError("'with' in Quadrants kernels only supports qd.stream_parallel()") if not ctx.is_kernel: raise QuadrantsSyntaxError("qd.stream_parallel() can only be used inside @qd.kernel, not @qd.func") diff --git a/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py b/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py index d6b64b5080..12997eba80 100644 --- a/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py +++ b/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py @@ -462,7 +462,15 @@ def _is_stream_parallel_with(stmt: ast.stmt, global_vars: dict[str, Any]) -> boo item = stmt.items[0] if not isinstance(item.context_expr, ast.Call): return False - return ASTResolver.resolve_to(item.context_expr.func, stream_parallel, global_vars) + func_node = item.context_expr.func + if ASTResolver.resolve_to(func_node, stream_parallel, global_vars): + return True + resolved = ASTResolver.resolve_value(func_node, global_vars) + return ( + resolved is not None + and getattr(resolved, "__name__", None) == "stream_parallel" + and getattr(resolved, "__module__", None) == "quadrants.lang.stream" + ) @staticmethod def _is_docstring(stmt: ast.stmt, index: int) -> bool: diff --git a/python/quadrants/lang/ast/symbol_resolver.py b/python/quadrants/lang/ast/symbol_resolver.py index 81296fcefb..f95373a463 100644 --- a/python/quadrants/lang/ast/symbol_resolver.py +++ b/python/quadrants/lang/ast/symbol_resolver.py @@ -55,3 +55,35 @@ def resolve_to(node, wanted, scope): return False # The name ``scope`` here could be a bit confusing return scope is wanted + + @staticmethod + def resolve_value(node, scope): + """Resolve an AST Name/Attribute node to a Python object. + + Same traversal as resolve_to but returns the resolved object (or None) + instead of comparing against a wanted value. + """ + if isinstance(node, ast.Name): + return scope.get(node.id) if isinstance(scope, dict) else None + + if not isinstance(node, ast.Attribute): + return None + + v = node.value + chain = [node.attr] + while isinstance(v, ast.Attribute): + chain.append(v.attr) + v = v.value + if not isinstance(v, ast.Name): + return None + chain.append(v.id) + + for attr in reversed(chain): + try: + if isinstance(scope, dict): + scope = scope[attr] + else: + scope = getattr(scope, attr) + except (KeyError, AttributeError): + return None + return scope From acff351a403af45f3cf0b27660ae2033c2544401 Mon Sep 17 00:00:00 2001 From: Hugh Perkins Date: Fri, 1 May 2026 11:22:39 -0700 Subject: [PATCH 15/20] Remove unused ASTResolver import from ast_transformer.py Co-authored-by: Cursor --- python/quadrants/lang/ast/ast_transformer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/quadrants/lang/ast/ast_transformer.py b/python/quadrants/lang/ast/ast_transformer.py index a0048ccb61..152a952044 100644 --- a/python/quadrants/lang/ast/ast_transformer.py +++ b/python/quadrants/lang/ast/ast_transformer.py @@ -28,7 +28,6 @@ from quadrants.lang.ast.ast_transformers.function_def_transformer import ( FunctionDefTransformer, ) -from quadrants.lang.ast.symbol_resolver import ASTResolver from quadrants.lang.exception import ( QuadrantsIndexError, QuadrantsRuntimeTypeError, From 70eb471521763e251588c48e3e607248f8152c64 Mon Sep 17 00:00:00 2001 From: Hugh Perkins Date: Fri, 1 May 2026 11:33:14 -0700 Subject: [PATCH 16/20] Fix import sorting in ast_transformer.py Co-authored-by: Cursor --- python/quadrants/lang/ast/ast_transformer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/quadrants/lang/ast/ast_transformer.py b/python/quadrants/lang/ast/ast_transformer.py index 152a952044..263a4a11a3 100644 --- a/python/quadrants/lang/ast/ast_transformer.py +++ b/python/quadrants/lang/ast/ast_transformer.py @@ -39,7 +39,6 @@ from quadrants.lang.field import Field from quadrants.lang.matrix import Matrix, MatrixType from quadrants.lang.snode import append, deactivate, length - from quadrants.lang.struct import Struct, StructType from quadrants.lang.util import ( is_from_quadrants_module as _is_from_quadrants_module, From ebd5e119cf5019e8539e1de5f3a75d1d8c936e22 Mon Sep 17 00:00:00 2001 From: Hugh Perkins Date: Fri, 1 May 2026 12:36:40 -0700 Subject: [PATCH 17/20] Add AST-level fallback for stream_parallel detection When object resolution fails (dual import paths), fall back to checking the AST node name directly. Inside @qd.kernel the only valid with-context is qd.stream_parallel(), so checking the attribute name is sufficient. Co-authored-by: Cursor --- .../ast_transformers/function_def_transformer.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py b/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py index 12997eba80..7a42dfff87 100644 --- a/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py +++ b/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py @@ -466,11 +466,16 @@ def _is_stream_parallel_with(stmt: ast.stmt, global_vars: dict[str, Any]) -> boo if ASTResolver.resolve_to(func_node, stream_parallel, global_vars): return True resolved = ASTResolver.resolve_value(func_node, global_vars) - return ( - resolved is not None - and getattr(resolved, "__name__", None) == "stream_parallel" - and getattr(resolved, "__module__", None) == "quadrants.lang.stream" - ) + if resolved is not None: + return ( + getattr(resolved, "__name__", None) == "stream_parallel" + and getattr(resolved, "__module__", "").startswith("quadrants") + ) + if isinstance(func_node, ast.Attribute) and func_node.attr == "stream_parallel": + return True + if isinstance(func_node, ast.Name) and func_node.id == "stream_parallel": + return True + return False @staticmethod def _is_docstring(stmt: ast.stmt, index: int) -> bool: From a6c385200add940d1f7182041e756c7b6748e744 Mon Sep 17 00:00:00 2001 From: Hugh Perkins Date: Fri, 1 May 2026 12:37:08 -0700 Subject: [PATCH 18/20] Add diagnostic info to stream_parallel exclusivity error message Include the failing statement type, index, and body length to help debug the persistent Linux build x64 test failures. Co-authored-by: Cursor --- .../lang/ast/ast_transformers/function_def_transformer.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py b/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py index 7a42dfff87..4ffee5fc2e 100644 --- a/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py +++ b/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py @@ -489,8 +489,14 @@ def _validate_stream_parallel_exclusivity(body: list[ast.stmt], global_vars: dic if FunctionDefTransformer._is_docstring(stmt, i): continue if not FunctionDefTransformer._is_stream_parallel_with(stmt, global_vars): + stmt_desc = f"{type(stmt).__name__}" + if isinstance(stmt, ast.With) and stmt.items: + ctx_expr = stmt.items[0].context_expr + if isinstance(ctx_expr, ast.Call) and isinstance(ctx_expr.func, ast.Attribute): + stmt_desc += f"(with {ast.dump(ctx_expr.func)})" raise QuadrantsSyntaxError( "When using qd.stream_parallel(), all top-level statements " "in the kernel must be 'with qd.stream_parallel():' blocks. " - "Move non-parallel code to a separate kernel." + f"Move non-parallel code to a separate kernel. " + f"[stmt {i}: {stmt_desc}, body_len={len(body)}]" ) From 3af5bc8607784720664d4ef18051da1712b60e1f Mon Sep 17 00:00:00 2001 From: Hugh Perkins Date: Fri, 1 May 2026 13:49:18 -0700 Subject: [PATCH 19/20] Apply black formatting to function_def_transformer.py Co-authored-by: Cursor --- .../function_def_transformer.py | 121 +++++++++++++----- 1 file changed, 91 insertions(+), 30 deletions(-) diff --git a/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py b/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py index 4ffee5fc2e..123767be55 100644 --- a/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py +++ b/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py @@ -56,7 +56,9 @@ def _decl_and_create_variable( assert this_arg_features is not None marker = this_arg_features[0] if marker == _TENSOR_T_NDARRAY_MARKER: - raw_element_type, ndim, needs_grad, boundary, layout = this_arg_features[1:] + raw_element_type, ndim, needs_grad, boundary, layout = ( + this_arg_features[1:] + ) return False, ( kernel_arguments.decl_ndarray_arg, ( @@ -75,7 +77,9 @@ def _decl_and_create_variable( assert ctx.global_vars is not None return True, ctx.global_vars.get(name) raise AssertionError(f"unknown qd.Tensor marker: {marker!r}") - if annotation == annotations.template or isinstance(annotation, annotations.template): + if annotation == annotations.template or isinstance( + annotation, annotations.template + ): if name in ctx.template_vars: return True, ctx.template_vars[name] assert ctx.global_vars is not None @@ -98,8 +102,12 @@ def _decl_and_create_variable( needs_grad, BoundaryMode(boundary), ) - offset = kernel_arguments.decl_scalar_arg(primitive_types.i32, full_name + "_offset") - size = kernel_arguments.decl_scalar_arg(primitive_types.i32, full_name + "_size") + offset = kernel_arguments.decl_scalar_arg( + primitive_types.i32, full_name + "_offset" + ) + size = kernel_arguments.decl_scalar_arg( + primitive_types.i32, full_name + "_size" + ) return True, BufferView(arr, offset, size) if isinstance(annotation, ndarray_type.NdarrayType): assert this_arg_features is not None @@ -139,7 +147,10 @@ def _transform_kernel_arg( ctx.create_variable(argument_name, argument_type) for field_idx, field in enumerate(dataclasses.fields(argument_type)): flat_name = create_flat_name(argument_name, field.name) - if pruning.enforcing and flat_name not in pruning.used_vars_by_func_id[func_id]: + if ( + pruning.enforcing + and flat_name not in pruning.used_vars_by_func_id[func_id] + ): continue # if a field is a dataclass, then feed back into process_kernel_arg recursively if dataclasses.is_dataclass(field.type): @@ -177,7 +188,9 @@ def _transform_kernel_arg( ctx.create_variable(argument_name, obj) @staticmethod - def _transform_as_kernel(ctx: ASTTransformerFuncContext, node: ast.FunctionDef, args: ast.arguments) -> None: + def _transform_as_kernel( + ctx: ASTTransformerFuncContext, node: ast.FunctionDef, args: ast.arguments + ) -> None: assert ctx.func is not None assert ctx.arg_features is not None if node.returns is not None: @@ -226,7 +239,9 @@ def _walk_obj(obj, arg_idx, path): child = child._unwrap() if isinstance(child, _ndarray.Ndarray): _register_ndarray(child, arg_idx, (*path, field.name)) - elif dataclasses.is_dataclass(child) and not isinstance(child, type): + elif dataclasses.is_dataclass(child) and not isinstance( + child, type + ): _walk_obj(child, arg_idx, (*path, field.name)) else: for attr_name, attr_val in vars(obj).items(): @@ -250,7 +265,9 @@ def _register_ndarray(nd, arg_idx, attr_chain): element_type, ndim, name, needs_grad ) arr = any_array.AnyArray( - _qd_core.make_external_tensor_expr(element_type, ndim, arg_id_vec, needs_grad, BoundaryMode.UNSAFE), + _qd_core.make_external_tensor_expr( + element_type, ndim, arg_id_vec, needs_grad, BoundaryMode.UNSAFE + ), _qd_layout=layout, ) cache[key] = arr @@ -259,7 +276,9 @@ def _register_ndarray(nd, arg_idx, attr_chain): assert ctx.py_args is not None for i, arg_meta in enumerate(ctx.func.arg_metas): anno = arg_meta.annotation - is_template = anno is annotations.template or isinstance(anno, annotations.template) + is_template = anno is annotations.template or isinstance( + anno, annotations.template + ) is_tensor_anno = anno is _TensorClass if not (is_template or is_tensor_anno): continue @@ -297,15 +316,21 @@ def _transform_func_arg( # directly — ndarray and field impls are both valid pass-by-reference arguments. if argument_type is _TensorClass: data = FunctionDefTransformer._unwrap_tensor(data) - _cache = getattr(getattr(ctx, "global_context", None), "ndarray_to_any_array", None) + _cache = getattr( + getattr(ctx, "global_context", None), "ndarray_to_any_array", None + ) promoted = _cache.get(id(data)) if _cache else None - ctx.create_variable(argument_name, promoted if promoted is not None else data) + ctx.create_variable( + argument_name, promoted if promoted is not None else data + ) return None if dataclasses.is_dataclass(argument_type): for field in dataclasses.fields(argument_type): flat_name = create_flat_name(argument_name, field.name) - data_child = FunctionDefTransformer._unwrap_tensor(getattr(data, field.name)) + data_child = FunctionDefTransformer._unwrap_tensor( + getattr(data, field.name) + ) if isinstance( data_child, ( @@ -317,11 +342,19 @@ def _transform_func_arg( ): # qd.Tensor struct fields skip check_matched (the Tensor class has no such method — it is # polymorphic). - if field.type is not _TensorClass and hasattr(field.type, "check_matched"): + if field.type is not _TensorClass and hasattr( + field.type, "check_matched" + ): field.type.check_matched(data_child.get_type(), field.name) - _cache = getattr(getattr(ctx, "global_context", None), "ndarray_to_any_array", None) + _cache = getattr( + getattr(ctx, "global_context", None), + "ndarray_to_any_array", + None, + ) promoted = _cache.get(id(data_child)) if _cache else None - ctx.create_variable(flat_name, promoted if promoted is not None else data_child) + ctx.create_variable( + flat_name, promoted if promoted is not None else data_child + ) elif dataclasses.is_dataclass(data_child): FunctionDefTransformer._transform_func_arg( ctx, @@ -338,9 +371,17 @@ def _transform_func_arg( # Ndarray arguments are passed by reference. if isinstance(argument_type, (ndarray_type.NdarrayType)): if not isinstance( - data, (_ndarray.ScalarNdarray, matrix.VectorNdarray, matrix.MatrixNdarray, any_array.AnyArray) + data, + ( + _ndarray.ScalarNdarray, + matrix.VectorNdarray, + matrix.MatrixNdarray, + any_array.AnyArray, + ), ): - raise QuadrantsSyntaxError(f"Argument {argument_name} of type {argument_type} is not recognized.") + raise QuadrantsSyntaxError( + f"Argument {argument_name} of type {argument_type} is not recognized." + ) argument_type.check_matched(data.get_type(), argument_name) ctx.create_variable(argument_name, data) return None @@ -350,7 +391,9 @@ def _transform_func_arg( # not here — data.arr is an Expr node during func compilation, not a real Ndarray. if isinstance(argument_type, buffer_view_type.BufferViewType): if not isinstance(data, BufferView): - raise QuadrantsSyntaxError(f"Argument {argument_name} expects a BufferView, got {type(data).__name__}") + raise QuadrantsSyntaxError( + f"Argument {argument_name} expects a BufferView, got {type(data).__name__}" + ) ctx.create_variable(argument_name, data) return None @@ -389,7 +432,9 @@ def _transform_func_arg( return None if id(argument_type) in primitive_types.type_ids: - ctx.create_variable(argument_name, impl.expr_init_func(qd_ops.cast(data, argument_type))) + ctx.create_variable( + argument_name, impl.expr_init_func(qd_ops.cast(data, argument_type)) + ) return None # Create a copy for non-template arguments, # so that they are passed by value. @@ -398,7 +443,9 @@ def _transform_func_arg( return None @staticmethod - def _transform_as_func(ctx: ASTTransformerFuncContext, node: ast.FunctionDef, args: ast.arguments) -> None: + def _transform_as_func( + ctx: ASTTransformerFuncContext, node: ast.FunctionDef, args: ast.arguments + ) -> None: # pylint: disable=import-outside-toplevel from quadrants.lang.kernel_impl import Func @@ -406,7 +453,9 @@ def _transform_as_func(ctx: ASTTransformerFuncContext, node: ast.FunctionDef, ar assert ctx.py_args is not None for py_arg_i, py_arg in enumerate(ctx.py_args): argument = ctx.func.arg_metas_expanded[py_arg_i] - FunctionDefTransformer._transform_func_arg(ctx, argument.name, argument.annotation, py_arg) + FunctionDefTransformer._transform_func_arg( + ctx, argument.name, argument.annotation, py_arg + ) # deal with dataclasses for v in ctx.func.orig_arguments: @@ -446,7 +495,9 @@ def build_FunctionDef( FunctionDefTransformer._transform_as_func(ctx, node, args) if ctx.is_kernel: - FunctionDefTransformer._validate_stream_parallel_exclusivity(node.body, ctx.global_vars) + FunctionDefTransformer._validate_stream_parallel_exclusivity( + node.body, ctx.global_vars + ) with ctx.variable_scope_guard(): build_stmts(ctx, node.body) @@ -467,10 +518,9 @@ def _is_stream_parallel_with(stmt: ast.stmt, global_vars: dict[str, Any]) -> boo return True resolved = ASTResolver.resolve_value(func_node, global_vars) if resolved is not None: - return ( - getattr(resolved, "__name__", None) == "stream_parallel" - and getattr(resolved, "__module__", "").startswith("quadrants") - ) + return getattr(resolved, "__name__", None) == "stream_parallel" and getattr( + resolved, "__module__", "" + ).startswith("quadrants") if isinstance(func_node, ast.Attribute) and func_node.attr == "stream_parallel": return True if isinstance(func_node, ast.Name) and func_node.id == "stream_parallel": @@ -479,11 +529,20 @@ def _is_stream_parallel_with(stmt: ast.stmt, global_vars: dict[str, Any]) -> boo @staticmethod def _is_docstring(stmt: ast.stmt, index: int) -> bool: - return index == 0 and isinstance(stmt, ast.Expr) and isinstance(stmt.value, (ast.Constant, ast.Str)) + return ( + index == 0 + and isinstance(stmt, ast.Expr) + and isinstance(stmt.value, (ast.Constant, ast.Str)) + ) @staticmethod - def _validate_stream_parallel_exclusivity(body: list[ast.stmt], global_vars: dict[str, Any]) -> None: - if not any(FunctionDefTransformer._is_stream_parallel_with(s, global_vars) for s in body): + def _validate_stream_parallel_exclusivity( + body: list[ast.stmt], global_vars: dict[str, Any] + ) -> None: + if not any( + FunctionDefTransformer._is_stream_parallel_with(s, global_vars) + for s in body + ): return for i, stmt in enumerate(body): if FunctionDefTransformer._is_docstring(stmt, i): @@ -492,7 +551,9 @@ def _validate_stream_parallel_exclusivity(body: list[ast.stmt], global_vars: dic stmt_desc = f"{type(stmt).__name__}" if isinstance(stmt, ast.With) and stmt.items: ctx_expr = stmt.items[0].context_expr - if isinstance(ctx_expr, ast.Call) and isinstance(ctx_expr.func, ast.Attribute): + if isinstance(ctx_expr, ast.Call) and isinstance( + ctx_expr.func, ast.Attribute + ): stmt_desc += f"(with {ast.dump(ctx_expr.func)})" raise QuadrantsSyntaxError( "When using qd.stream_parallel(), all top-level statements " From af4a30615f6e625fd47ef4cb30cb2259993e5df4 Mon Sep 17 00:00:00 2001 From: Hugh Perkins Date: Sat, 2 May 2026 10:29:18 -0700 Subject: [PATCH 20/20] Skip coverage probes in stream_parallel exclusivity check; restore deleted comments The Linux build CI runs with QD_KERNEL_COVERAGE=1, which injects _qd_cov[probe_id] = 1 Assign nodes before each statement in the kernel body. _validate_stream_parallel_exclusivity was rejecting these probes as non-stream_parallel statements. Add _is_coverage_probe() to skip them. Also restores the 4 safety comments in CUDA kernel_launcher.cpp's prepare_task lambda that were flagged by the deleted-comments check, fixes clang-format line break, and reflows the symbol_resolver.py docstring to 120 characters. Co-authored-by: Cursor --- .../function_def_transformer.py | 13 ++++++++++ python/quadrants/lang/ast/symbol_resolver.py | 4 +-- quadrants/runtime/cuda/kernel_launcher.cpp | 25 +++++++++++++++++-- 3 files changed, 38 insertions(+), 4 deletions(-) diff --git a/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py b/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py index 123767be55..142694091f 100644 --- a/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py +++ b/python/quadrants/lang/ast/ast_transformers/function_def_transformer.py @@ -535,6 +535,17 @@ def _is_docstring(stmt: ast.stmt, index: int) -> bool: and isinstance(stmt.value, (ast.Constant, ast.Str)) ) + @staticmethod + def _is_coverage_probe(stmt: ast.stmt) -> bool: + if not isinstance(stmt, ast.Assign) or len(stmt.targets) != 1: + return False + target = stmt.targets[0] + return ( + isinstance(target, ast.Subscript) + and isinstance(target.value, ast.Name) + and target.value.id.startswith("_qd_cov") + ) + @staticmethod def _validate_stream_parallel_exclusivity( body: list[ast.stmt], global_vars: dict[str, Any] @@ -547,6 +558,8 @@ def _validate_stream_parallel_exclusivity( for i, stmt in enumerate(body): if FunctionDefTransformer._is_docstring(stmt, i): continue + if FunctionDefTransformer._is_coverage_probe(stmt): + continue if not FunctionDefTransformer._is_stream_parallel_with(stmt, global_vars): stmt_desc = f"{type(stmt).__name__}" if isinstance(stmt, ast.With) and stmt.items: diff --git a/python/quadrants/lang/ast/symbol_resolver.py b/python/quadrants/lang/ast/symbol_resolver.py index f95373a463..c2b4fcaffe 100644 --- a/python/quadrants/lang/ast/symbol_resolver.py +++ b/python/quadrants/lang/ast/symbol_resolver.py @@ -60,8 +60,8 @@ def resolve_to(node, wanted, scope): def resolve_value(node, scope): """Resolve an AST Name/Attribute node to a Python object. - Same traversal as resolve_to but returns the resolved object (or None) - instead of comparing against a wanted value. + Same traversal as resolve_to but returns the resolved object (or None) instead of comparing against a wanted + value. """ if isinstance(node, ast.Name): return scope.get(node.id) if isinstance(scope, dict) else None diff --git a/quadrants/runtime/cuda/kernel_launcher.cpp b/quadrants/runtime/cuda/kernel_launcher.cpp index 658b2089d9..b08af6733e 100644 --- a/quadrants/runtime/cuda/kernel_launcher.cpp +++ b/quadrants/runtime/cuda/kernel_launcher.cpp @@ -81,8 +81,19 @@ void KernelLauncher::launch_offloaded_tasks(LaunchContextBuilder &ctx, int effective_grid_dim = task.grid_dim; if (!task.ad_stack.allocas.empty()) { std::size_t n = resolve_num_threads(task.ad_stack, executor); + // Pass the device-side `RuntimeContext` pointer through to the adstack sizer kernel. Without it the sizer + // launches with a host pointer and the next DtoH sync trips `CUDA_ERROR_ILLEGAL_ADDRESS ... + // memcpy_device_to_host` on GPUs whose driver + kernel cannot coherently access pageable host memory (the HMM + // capability gated below in `launch_llvm_kernel`). `nullptr` on HMM-capable setups keeps + // `publish_adstack_metadata`'s host-pointer fast path. executor->publish_adstack_metadata(task.ad_stack, n, &ctx, device_context_ptr); if (task.ad_stack.bound_expr.has_value()) { + // Reducer length is the gating ndarray's full flat element count, not `n`: the lazy row-claim atomic-rmw + // fires once per LCA execution, and `gpu_parallel_struct_for` / `gpu_parallel_range_for` grid-stride (`i += + // grid_dim()`) so a single dispatched thread can hit the LCA many times across one launch when the logical + // loop span exceeds the (capped) concurrent thread count. Walking the reducer over the full ndarray length + // keeps `bound_row_capacities[task_index]` consistent with the total claim count, which the codegen-emitted + // bounds clamp reads. Mirrors the CPU launcher's `bound_count_length` derivation. std::size_t bound_count_length = n; if (task.ad_stack.bound_expr->field_source_kind == StaticAdStackBoundExpr::FieldSourceKind::NdArray && !task.ad_stack.bound_expr->ndarray_arg_id.empty() && task.ad_stack.bound_expr->ndarray_ndim > 0 && @@ -92,6 +103,11 @@ void KernelLauncher::launch_offloaded_tasks(LaunchContextBuilder &ctx, std::vector indices = task.ad_stack.bound_expr->ndarray_arg_id; indices.push_back(TypeFactory::SHAPE_POS_IN_NDARRAY); indices.push_back(axis); + // get_struct_arg_host (NOT get_struct_arg): `launch_llvm_kernel` above has already swapped + // `ctx_->arg_buffer` to a device pointer, so a plain `get_struct_arg` here would dereference device + // memory from the host - SIGSEGV / CUDA_ERROR_ILLEGAL_ADDRESS on drivers without HMM, garbage + // `flat_len` on HMM-capable setups. The host backing buffer (`arg_buffer_`) stays host-resident across + // the swap and holds the same shape entries, so the host-safe variant is byte-equivalent here. flat_len *= int64_t(ctx.get_struct_arg_host(indices)); } bound_count_length = static_cast(std::max(0, flat_len)); @@ -100,6 +116,11 @@ void KernelLauncher::launch_offloaded_tasks(LaunchContextBuilder &ctx, device_context_ptr); executor->ensure_per_task_float_heap_post_reducer(task_index, task.ad_stack, n); } + // Floor division (not ceiling): the heap-row count `n` resolved by `resolve_num_threads` floors at + // `kAdStackMaxConcurrentThreads`, so dispatching `cap_blocks * block_dim` threads must not exceed that count. + // Ceiling division would over-dispatch by `block_dim - 1` threads when `block_dim` does not divide + // `kAdStackMaxConcurrentThreads` evenly (e.g. `block_dim=192`: `ceil(65536/192)*192 = 65664`), and threads + // with `linear_thread_idx >= 65536` would index past the heap end. if (task.block_dim > 0) { const std::size_t cap_blocks = std::max(1u, kAdStackMaxConcurrentThreads / static_cast(task.block_dim)); @@ -143,8 +164,8 @@ void KernelLauncher::launch_offloaded_tasks(LaunchContextBuilder &ctx, int effective_grid_dim = prepare_task(j, t); CUDAContext::get_instance().set_stream(stream_by_id[t.stream_parallel_group_id]); QD_TRACE("Launching kernel {}<<<{}, {}>>>", t.name, effective_grid_dim, t.block_dim); - cuda_module->launch(t.name, effective_grid_dim, t.block_dim, t.dynamic_shared_array_bytes, - {&ctx.get_context()}, {}); + cuda_module->launch(t.name, effective_grid_dim, t.block_dim, t.dynamic_shared_array_bytes, {&ctx.get_context()}, + {}); } for (auto &[sid, s] : stream_by_id) {