[Perf] Streams 3: Add qd.stream_parallel() context manager #409
hughperkins wants to merge 46 commits into hp/streams-quadrantsic-2-amdgpu-cpu from hp/streams-quadrantsic-3-stream-parallel
Conversation
Introduces stream_parallel() for running top-level for-loop blocks on separate GPU streams. The AST transformer maps 'with qd.stream_parallel()' blocks to stream-parallel group IDs, which propagate through IR lowering and offloading to the CUDA/AMDGPU kernel launchers. Each unique group ID gets its own stream at launch time. Includes validation that all top-level kernel statements must be stream_parallel blocks (no mixing), and offline cache key support.
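As a rough illustration of the mapping the description implies — each top-level `with qd.stream_parallel():` block becomes one stream-parallel group — the detection step can be sketched with stdlib `ast` alone (a simplification: the real transformer resolves `qd.stream_parallel` via `ASTResolver`, not by attribute name; `SRC` is a hypothetical kernel body):

```python
import ast

SRC = """
def k():
    with qd.stream_parallel():
        for i in range(16):
            a[i] = 1.0
    with qd.stream_parallel():
        for j in range(16):
            b[j] = 2.0
"""

def is_stream_parallel_with(stmt):
    # Simplified stand-in for the real ASTResolver-based check:
    # match `with qd.stream_parallel():` by attribute name alone.
    if not isinstance(stmt, ast.With) or len(stmt.items) != 1:
        return False
    ctx = stmt.items[0].context_expr
    return (isinstance(ctx, ast.Call)
            and isinstance(ctx.func, ast.Attribute)
            and ctx.func.attr == "stream_parallel")

body = ast.parse(SRC).body[0].body
group_ids = [i + 1 for i, stmt in enumerate(body) if is_stream_parallel_with(stmt)]
print(group_ids)  # → [1, 2]
```

Each matched block would then carry its own group ID through lowering and offloading, and hence get its own stream at launch time.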
…adrantsic-3-stream-parallel
# Conflicts:
#	python/quadrants/lang/stream.py
Prevents stale group IDs from leaking if insert_for is called after a path that set a non-zero stream_parallel_group_id, matching the reset pattern of all other ForLoopConfig fields.
Add an error check in begin_stream_parallel() to prevent nesting, which would produce undefined group ID semantics.
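The guard's semantics, as described, can be modeled in a few lines of Python (a toy stand-in; the real implementation is the C++ `ASTBuilder`):

```python
class StreamParallelState:
    """Toy model of the ASTBuilder counter semantics described above.

    Illustrative only: the real state lives in C++ (frontend_ir.h).
    """
    def __init__(self):
        self.counter = 0           # high-water mark of group IDs handed out
        self.current_group_id = 0  # 0 means "not inside a block"

    def begin_stream_parallel(self):
        if self.current_group_id != 0:
            raise RuntimeError("stream_parallel blocks cannot be nested")
        self.counter += 1
        self.current_group_id = self.counter

    def end_stream_parallel(self):
        self.current_group_id = 0

s = StreamParallelState()
s.begin_stream_parallel()      # group 1
try:
    s.begin_stream_parallel()  # nesting -> rejected
except RuntimeError as e:
    print(e)
s.end_stream_parallel()
s.begin_stream_parallel()      # group 2
print(s.current_group_id)      # → 2
```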
…context safety Add comments explaining that streams are created/destroyed per launch (stream pooling as future optimization), and that RuntimeContext sharing across concurrent streams is safe because kernels only read from it.
…adrantsic-3-stream-parallel
Review from Opus (predates the last 5 commits above):
| Priority | Item |
|---|---|
| High | Add config.stream_parallel_group_id = 0; to ForLoopDecoratorRecorder::reset() (#3) |
| High | Verify RuntimeContext is safe to share across concurrent streams (#5) |
| Medium | Reject nested stream_parallel blocks explicitly (#7) |
| Medium | Consider rejecting stream_parallel in @qd.func (#8) |
| Low | Extract shared stream-dispatch logic from CUDA/AMDGPU launchers (#2) |
| Low | Consider stream pooling for repeated kernel launches (#1) |
For the stream-pool concern, added a 4th PR (#410) to add a stream pool.
if len(stmt.items) != 1:
    return False
item = stmt.items[0]
What is items? Could you document here, or somewhere else, why the length can be 1 or more, and what it means in this context?
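For background on the question: `items` on `ast.With` is the stdlib list of context managers in the statement — Python permits `with a(), b():`, which is why its length can exceed 1. A quick stdlib demonstration:

```python
import ast

# ast.With.items holds one withitem per context manager in the statement.
single = ast.parse("with open('f'):\n    pass").body[0]
double = ast.parse("with open('f'), open('g'):\n    pass").body[0]
print(len(single.items), len(double.items))  # → 1 2
```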
    "When using qd.stream_parallel(), all top-level statements "
    "in the kernel must be 'with qd.stream_parallel():' blocks. "
    "Move non-parallel code to a separate kernel."
)
I still don't understand why you are moving to the next line before you have to. This is weird to me. But I don't care much.
has_sp = any(FunctionDefTransformer._is_stream_parallel_with(s, global_vars) for s in body)
if not has_sp:
I would rather do

# <Insert fancy comment explaining what this check is doing>
if not any(FunctionDefTransformer._is_stream_parallel_with(s, global_vars) for s in body):
    return

if len(node.items) != 1:
    raise QuadrantsSyntaxError("'with' in Quadrants kernels only supports a single context manager")
item = node.items[0]
Same. Not clear what items is.
All this "code duplication" (at least duplicated logic) is annoying, but if there is no better choice and it is what we have been doing so far, then it is OK.
…cpu' into hp/streams-quadrantsic-3-stream-parallel
Made-with: Cursor
# Conflicts:
#	quadrants/codegen/llvm/llvm_compiled_data.h
#	quadrants/ir/frontend_ir.cpp
#	quadrants/ir/frontend_ir.h
#	quadrants/ir/statements.cpp
#	quadrants/ir/statements.h
#	quadrants/runtime/amdgpu/kernel_launcher.cpp
#	quadrants/runtime/cuda/kernel_launcher.cpp
#	quadrants/transforms/lower_ast.cpp
#	quadrants/transforms/offload.cpp
Made-with: Cursor
…cpu' into hp/streams-quadrantsic-3-stream-parallel
Made-with: Cursor
# Conflicts:
#	quadrants/codegen/llvm/llvm_compiled_data.h
#	quadrants/runtime/amdgpu/kernel_launcher.cpp
#	quadrants/runtime/cuda/kernel_launcher.cpp
migrated to use a single PR on streams 4
…adrantsic-3-stream-parallel
Force-pushed bee7e65 to b3fbc39
…-3-stream-parallel
Resolve conflict in test_streams.py: keep both base branch tests (context manager, event, tape/graph rejection) and head branch tests (stream_parallel basic, timing, mixed rejection).
Co-authored-by: Cursor <cursoragent@cursor.com>
ctx.ast_builder.begin_stream_parallel()
build_stmts(ctx, node.body)
ctx.ast_builder.end_stream_parallel()
🔴 Non-for statements (e.g. a[0] = 1.0, qd.deactivate(snode, [k]), counter[None] = 0) placed directly inside with qd.stream_parallel(): silently race with sibling for-loops. The non-for statement is bundled into a serial OffloadedStmt with default stream_parallel_group_id=0 and dispatched on active_stream, while the sibling for-loop runs on a fresh CU_STREAM_NON_BLOCKING per-group stream that has no event handoff back to the active stream — so the for-loop can begin reading what the assignment is supposed to write before the assignment retires. Fix: in build_With, walk node.body and reject anything that is not a for-loop (mirroring the existing kernel-body validator), or stamp the active group_id onto every emitted root_block statement while inside a stream_parallel block.
Extended reasoning...
Bug

build_With (python/quadrants/lang/ast/ast_transformer.py:1546-1548) does not introspect node.body and does not open a new IR scope — begin_stream_parallel only flips a counter on the ASTBuilder. So statements inside the with body are inserted directly into the kernel root_block. The frontend group-id stamping the PR added lives only in the four begin_frontend_*_for methods (frontend_ir.cpp:1395/1409/1423/1439), which means non-for statements (FrontendAssignStmt, FrontendSNodeOpStmt, etc.) never carry a stream_parallel_group_id — that field does not exist on those statement types.

In Offloader::run (quadrants/transforms/offload.cpp:90-158), only RangeForStmt / StructForStmt / MeshForStmt become standalone OffloadedStmts; everything else falls into the else at lines 155-157 and is moved into pending_serial_statements — a serial OffloadedStmt constructed with default-initialized stream_parallel_group_id=0 (statements.h:1370). A sibling RangeForStmt at line 129 propagates group_id correctly. Codegen at codegen_cuda.cpp:641 / codegen_amdgpu.cpp:354 copies that 0 onto the OffloadedTask, and the launcher at runtime/cuda/kernel_launcher.cpp:55-95 (AMDGPU twin) takes the default-stream branch for group_id=0 and the per-group stream branch for the for-loop.

The two streams are independent: the per-group stream is created with CU_STREAM_NON_BLOCKING (line 80), which does NOT implicit-sync with the legacy NULL stream nor with arbitrary user-created streams. The launcher records no event from active_stream and inserts no stream_wait_event on the new stream — so the for-loop on s_K can begin executing before the serial assignment on active_stream finishes.

## Distinct from existing PR-timeline bugs

- Bug #11 (for-with-break): trigger requires a break inside the for-loop, which causes lower_ast to emit AllocaStmt+WhileStmt at root_block. No assignment-style trigger.
- Bug #6 (strictly_serialized): trigger is a top-level RangeForStmt with strictly_serialized=true that fails the !s->strictly_serialized predicate at offload.cpp:93. The cast succeeds; the predicate fails. Different upstream path.
- Bug #13 (non-static if/while wrapping for-loop): trigger is an IfStmt/WhileStmt at root_block whose body contains a for-loop. The cast at line 93 fails because of TYPE (IfStmt/WhileStmt), and the BUNDLE drops the inner for's group_id. Bug 13's proposed fix is to recursively scan the bundle for an inner for-loop and propagate that for-loop's group_id onto the bundle. That fix does not help here: the offending statement is itself the bundle entry (Assignment, SNodeOp, etc.) — there IS NO inner for-loop to read group_id from.
- Bug #16 (qd.deactivate gc tasks): trigger is qd.deactivate INSIDE a for-loop, producing gc auxiliary tasks via insert_gc. This bug is qd.deactivate (or any non-for statement) at the with-body level, NOT inside a for-loop.

The shared root cause across these bugs is that pending_serial_statements always defaults to stream_parallel_group_id=0, but the user-reachable trigger here (a plain non-for statement directly in the with-body) is not covered by any of those bugs' fix proposals.

## Step-by-step proof

@qd.kernel
def k():
    with qd.stream_parallel():
        a[0] = 1.0  # FrontendAssignmentStmt at root_block, NO group_id
        for i in range(N):
            b[i] = a[0] * 2  # range_for, group_id=1, reads a[0]

1. _validate_stream_parallel_exclusivity (function_def_transformer.py:472) walks node.body == [ast.With] — a single with qd.stream_parallel():, all top-level entries match. Validation passes.
2. build_With (ast_transformer.py:1533-1548) calls begin_stream_parallel() (counter→1), then build_stmts(ctx, node.body), which walks [ast.Assign, ast.For] at the SAME scope as the kernel root. build_Assign emits a FrontendAssignmentStmt directly into root_block — no group_id stamping. build_For reaches begin_frontend_range_for, which DOES stamp stream_parallel_group_id=1 onto the FrontendForStmt.
3. After lowering: root_block = [FrontendAssignment, RangeForStmt(group_id=1)]. The FrontendAssignment has no stream_parallel_group_id field at all.
4. Offloader::run iterates root_block. The FrontendAssignment fails every for-loop cast → falls into the else at offload.cpp:155-157 → moved into pending_serial_statements. RangeForStmt hits offload.cpp:93 → assemble_serial_statements flushes the serial OffloadedStmt (group_id=0) into root_block, then constructs a fresh range_for OffloadedStmt with group_id=1.
5. Final OffloadedTask list: [serial(group=0, [Assignment]), range_for(group=1, [for-body])].
6. Launcher walk:
   - i=0, group=0 → default-stream branch, launches serial on active_stream (async).
   - i=1, group=1 → enters else branch. Creates s_1 with CU_STREAM_NON_BLOCKING (line 80). Sets stream to s_1, launches range_for on s_1, syncs, destroys.
7. Race: s_1 has no implicit dependency on active_stream (NON_BLOCKING semantics), and the launcher inserts no event handoff. The range_for on s_1 can begin reading a[0] before the serial task on active_stream finishes writing 1.0.

## Reachable user patterns

- a[0] = some_value before a for-loop (initialize accumulator, then iterate)
- counter[None] = 0 (global atomic-store followed by parallel reduction)
- qd.deactivate(snode, [k]) (plain snode op without a wrapping if/while)
- qd.append(snode, [...], val)
- A function call like qd.sync() or any custom @qd.func that performs a global store

In each case the user is inside with qd.stream_parallel(): and reasonably expects safe sequencing. The validator's contract ("all top-level statements... must be stream_parallel blocks") is enforced at the kernel-body level but not at the with-body level. tests/python/test_streams.py only exercises with-bodies that contain only for-loops, so this regression is untested.

## Fix

**(a) Conservative, one-liner**: in build_With, after the stream_parallel call resolves, walk node.body and reject anything that is not an ast.For. Symmetric with the kernel-body validator already introduced in this PR; the docs already imply only for-loops are intended inside with qd.stream_parallel():.

**(b) Structural**: when emitting any root_block statement while current_stream_parallel_group_id_ != 0, propagate the group_id onto a new field on the statement (or split bundles in the offloader by surrounding group_id). More invasive, but preserves the legality of mixing assignments and for-loops in a single with block.
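Fix (a) can be sketched with stdlib `ast` alone (a hypothetical helper, not the project's actual code; a real version would sit in build_With next to the existing validator):

```python
import ast

def validate_stream_parallel_body(body):
    # Hypothetical sketch of fix (a): inside `with qd.stream_parallel():`,
    # allow only for-loops, mirroring the kernel-body validator.
    for stmt in body:
        if not isinstance(stmt, ast.For):
            raise SyntaxError(
                "only for-loops are allowed inside 'with qd.stream_parallel():'; "
                f"found {type(stmt).__name__}")

# A body of only for-loops passes silently.
validate_stream_parallel_body(ast.parse("for i in range(4):\n    pass").body)

# A stray assignment before the loop is rejected.
try:
    validate_stream_parallel_body(
        ast.parse("a[0] = 1.0\nfor i in range(4):\n    pass").body)
except SyntaxError as e:
    print("rejected:", e)
```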
…cpu' into hp/streams-quadrantsic-3-stream-parallel
for (size_t j = group_start; j < i; j++) {
  const auto &t = offloaded_tasks[j];
  executor->publish_adstack_metadata(t.ad_stack, resolve_num_threads(t, executor), &ctx, context_pointer);
  AMDGPUContext::get_instance().set_stream(stream_by_id[t.stream_parallel_group_id]);
  amdgpu_module->launch(t.name, t.grid_dim, t.block_dim, t.dynamic_shared_array_bytes, {(void *)&context_pointer},
                        {arg_size});
}
🟡 On AMDGPU with kernel_profiler=True, every with qd.stream_parallel(): block runs strictly sequentially: AMDGPUContext::launch calls profiler_->stop after each kernel dispatch, and KernelProfilerAMDGPU::stop (amdgpu_profiler.cpp:64, added on this branch by 3499bbc) does stream_synchronize(active_stream). Inside the new inner stream_parallel loop at amdgpu/kernel_launcher.cpp:86-92, active_stream IS the per-group stream just installed by set_stream(s_K), so the host blocks on s_K before the next iteration's set_stream(s_{K+1}) and launch — silently defeating the documented concurrency. Latent (profiler is not on by default) and perf-only (results remain correct), so likely fixable by deferring profiler->stop until after the per-group sync, switching to event_synchronize on the stop event, or documenting the incompatibility.
Extended reasoning...
What the bug is
This PR introduces a with qd.stream_parallel(): context manager that produces OffloadedTasks with non-zero stream_parallel_group_id. The new inner dispatch loop at quadrants/runtime/amdgpu/kernel_launcher.cpp:86-92 sequences each iteration as:
for (size_t j = group_start; j < i; j++) {
  const auto &t = offloaded_tasks[j];
  executor->publish_adstack_metadata(t.ad_stack, resolve_num_threads(t, executor), &ctx, context_pointer);
  AMDGPUContext::get_instance().set_stream(stream_by_id[t.stream_parallel_group_id]);
  amdgpu_module->launch(t.name, t.grid_dim, t.block_dim, ...);
}

Here amdgpu_module->launch routes (via jit_amdgpu.h:81) into AMDGPUContext::launch (amdgpu_context.cpp:171-206), which unconditionally calls profiler_->stop(task_handle) immediately after driver_.launch_kernel whenever profiler_ is set.
KernelProfilerAMDGPU::stop at quadrants/rhi/amdgpu/amdgpu_profiler.cpp:61-64 — added on this PR's merge chain by commit 3499bbc — reads active_stream = AMDGPUContext::get_instance().get_stream() and then calls AMDGPUDriver::get_instance().stream_synchronize(active_stream) so the subsequent event_elapsed_time read does not fault on a non-completed event.
Inside the inner stream_parallel loop the active stream at the moment profiler_->stop runs IS the per-group stream s_K just installed by set_stream(stream_by_id[t.stream_parallel_group_id]). So the host blocks on s_K before the loop's next iteration runs set_stream(s_{K+1}) and launch(...). The two per-group launches end up strictly serialized despite each being on its own stream — the documented concurrency stream_parallel exists to provide is silently lost.
Step-by-step proof
User program (AMDGPU + profiler + two-block stream_parallel):

qd.init(arch=qd.amdgpu, kernel_profiler=True)

@qd.kernel
def k():
    with qd.stream_parallel():
        for i in range(N): a[i] = compute_a(i)
    with qd.stream_parallel():
        for j in range(N): b[j] = compute_b(j)

- Two OffloadedTasks emerge with stream_parallel_group_id 1 and 2.
- Launcher creates s_1 and s_2 (HIP_STREAM_NON_BLOCKING) at lines 78-85.
- Iter j=group_start (task A, group=1): set_stream(s_1); amdgpu_module->launch → AMDGPUContext::launch queues kernel A on s_1, then profiler_->stop(handle_A) reads active_stream = s_1 and calls stream_synchronize(s_1). The host blocks until A on s_1 fully completes.
- Iter j=group_start+1 (task B, group=2): only NOW does set_stream(s_2) run; launch queues B on s_2. By the time B is enqueued, A has already drained — there is no overlap.
The kernel results are correct; the concurrency contract is silently violated. test_stream_parallel_timing's >1.5x assertion would fail under qd.init(arch=qd.amdgpu, kernel_profiler=True), but tests/python/test_streams.py runs with kernel_profiler off by default, so this regression is latent.
Why nothing else catches this
- tests/python/test_streams.py does not exercise kernel_profiler=True.
- The CUDA twin at cuda_profiler.cpp:127-128 records its stop event on nullptr (the legacy NULL stream) and calls stream_synchronize(nullptr), which on a CU_STREAM_NON_BLOCKING per-group stream produces ~0 ms timings rather than host-blocking — the same pre-3499bbc shape AMDGPU had before the fix landed; not the bug here, just an explanation of why CUDA does not exhibit the same regression.
- This is distinct from the previously-flagged active_stream entry handoff, the blocking-flag bug, the rand_states race, and the adstack metadata buffer race — none of those involve the profiler.
Impact
Narrow trigger (opt-in profiler + stream_parallel) and silent perf-only failure (output is correct, just non-concurrent), but the failure mode directly contradicts the documented purpose of the feature this PR introduces. Two of the three verifiers rated this nit for those reasons; one rated it normal because the trigger is reachable through a public-API combination.
Fix options
(a) Defer profiler_->stop until after all per-group launches in a stream_parallel batch, so that the per-group stream_synchronize(s_K) already happening at lines 95-100 of the launcher provides the completion guarantee before event_elapsed_time is read.
(b) Change KernelProfilerAMDGPU::stop to use event_synchronize on the stop event itself (events synchronize independently of the stream), which preserves per-task timing without host-syncing the stream.
(c) Document that kernel_profiler defeats concurrency for qd.stream_parallel() on AMDGPU as a known limitation.
(a) and (b) are real fixes that preserve both timing and concurrency; (c) is the minimum acceptable mitigation. Note (b) also incidentally improves the CUDA side, which currently produces ~0 ms timings for kernels launched on non-default streams.
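The effect of fix option (a) can be illustrated with a toy host-side timeline (plain Python; no real HIP/CUDA calls are made, and all names are illustrative):

```python
# Toy timeline model of fix option (a): defer the profiler's host-side
# stream_synchronize until after all per-group launches in a batch.
def dispatch(tasks, defer_profiler_stop):
    timeline = []
    for name, stream in tasks:
        timeline.append(f"launch {name} on {stream}")
        if not defer_profiler_stop:
            # profiler stop host-blocks on the just-installed stream,
            # serializing the groups
            timeline.append(f"host sync {stream}")
    if defer_profiler_stop:
        for _, stream in tasks:
            timeline.append(f"host sync {stream}")  # after ALL launches
    return timeline

tasks = [("A", "s1"), ("B", "s2")]
eager = dispatch(tasks, defer_profiler_stop=False)
deferred = dispatch(tasks, defer_profiler_stop=True)

# Eager stop: "host sync s1" precedes "launch B on s2" — no overlap possible.
print(eager.index("host sync s1") < eager.index("launch B on s2"))      # → True
# Deferred stop: both launches are enqueued before any host sync.
print(deferred.index("launch B on s2") < deferred.index("host sync s1"))  # → True
```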
@staticmethod
def _is_docstring(stmt: ast.stmt, index: int) -> bool:
    return index == 0 and isinstance(stmt, ast.Expr) and isinstance(stmt.value, (ast.Constant, ast.Str))

@staticmethod
def _validate_stream_parallel_exclusivity(body: list[ast.stmt], global_vars: dict[str, Any]) -> None:
    if not any(FunctionDefTransformer._is_stream_parallel_with(s, global_vars) for s in body):
        return
    for i, stmt in enumerate(body):
        if FunctionDefTransformer._is_docstring(stmt, i):
            continue
        if not FunctionDefTransformer._is_stream_parallel_with(stmt, global_vars):
            raise QuadrantsSyntaxError(
                "When using qd.stream_parallel(), all top-level statements "
                "in the kernel must be 'with qd.stream_parallel():' blocks. "
                "Move non-parallel code to a separate kernel."
            )
🟡 The new _validate_stream_parallel_exclusivity check (function_def_transformer.py:467-477) only carves out docstrings (index==0 ast.Expr(Constant)), but ast.Pass and qd.static_assert(...) / qd.static_print(...) at the kernel top-level — both compile-time directives that emit no IR — also trip the validator with the misleading "Move non-parallel code to a separate kernel" error. A kernel that writes qd.static_assert(N > 0) (the idiomatic pattern shown in tests/python/test_assert.py:138/150) followed by with qd.stream_parallel(): blocks fails compilation; the workaround is to delete or relocate the directive. Fix is a one-helper extension that also skips ast.Pass and ast.Expr(Call) whose call resolves to known compile-time directives.
Extended reasoning...
What the bug is
_validate_stream_parallel_exclusivity (function_def_transformer.py:467-477) iterates node.body and raises QuadrantsSyntaxError("...all top-level statements must be with qd.stream_parallel(): blocks. Move non-parallel code to a separate kernel.") for any statement that is neither _is_stream_parallel_with nor _is_docstring. The new _is_docstring carve-out only matches index == 0 and isinstance(stmt, ast.Expr) and isinstance(stmt.value, (ast.Constant, ast.Str)) — i.e. PEP 257 docstrings at body[0]. Three other harmless top-level constructs are not handled:
- ast.Pass — a pass placeholder, lowered by build_Pass (ast_transformer.py) to a no-op (return None). Emits no IR.
- qd.static_assert(...) at top level — a pure-Python compile-time check (impl.py:615-638) that uses Python assert against a static value. Emits no IR. Idiomatic kernel-top directive, exercised at tests/python/test_assert.py:138, 150, 161, 171 and tests/python/test_lexical_scope.py:13, 17. Parses to ast.Expr(value=ast.Call(func=Attribute(...static_assert))) — ast.Call, not ast.Constant, so _is_docstring returns False even at index 0.
- qd.static_print(...) at top level — same shape, same compile-time-only semantics, same incorrect rejection.
Note: the original synthesis also lists Python assert as a harmless construct, but build_Assert (ast_transformer.py:1475) does emit real runtime-checked IR that becomes part of the offloaded task graph. Rejecting it at top level is correct (it would race with sibling stream_parallel for-loops the same way any other store would). I am narrowing the bug to ast.Pass + qd.static_assert + qd.static_print and leaving assert out.
How the failure manifests
@qd.kernel
def k():
    qd.static_assert(N > 0)  # ast.Expr(Call(static_assert)) — no IR
    with qd.stream_parallel():
        for i in range(N): a[i] = 1.0
    with qd.stream_parallel():
        for j in range(N): b[j] = 2.0

Step-by-step trace:

1. build_FunctionDef calls _validate_stream_parallel_exclusivity(node.body, ctx.global_vars).
2. The body is [ast.Expr(Call(static_assert)), ast.With, ast.With]. _is_stream_parallel_with returns True for the two ast.With nodes → has_sp = True.
3. The walk iterates body. At i=0, stmt=ast.Expr(Call): _is_docstring(stmt, 0) checks isinstance(stmt.value, (ast.Constant, ast.Str)) — stmt.value is ast.Call, so it returns False. _is_stream_parallel_with(stmt) returns False (not ast.With). The validator raises QuadrantsSyntaxError("...Move non-parallel code to a separate kernel.").
The user sees an error telling them to "move non-parallel code", but there is no non-parallel runtime code to move — qd.static_assert only emits a Python-level assertion at kernel compile time. The workaround is to delete the invariant check or to relocate it inside one of the with bodies (where it still runs at compile time and so still works), but neither preserves the kernel-wide intent. Same story for pass and static_print.
Why no existing safeguard catches it
- _is_docstring only matches index 0 + ast.Expr + ast.Constant/ast.Str — ast.Call is neither.
- _is_stream_parallel_with only matches ast.With.
- The frontend group-id stamping is irrelevant here; the rejection happens during the AST walk before any IR is emitted.
Impact and severity
User-facing impact is a confusing compile-time error. No silent miscompilation, no perf regression, no data race. Workaround is trivial (delete or relocate the directive). All four verifiers independently confirmed at nit severity:
- Verifier 1: "narrow papercut, not a correctness issue, low frequency of occurrence."
- Verifier 2: explicitly excluded assert from the carve-out (see the narrowing above).
- Verifier 3: "Worth folding into the existing carve-out as a one-helper change."
- Verifier 4: "no correctness/perf impact and the workaround is trivial; the papercut is most acute for users following defensive-programming patterns or PEP 257-style annotations."
Fix
Extend the existing _is_docstring carve-out in function_def_transformer.py:
@staticmethod
def _is_skippable(stmt: ast.stmt, index: int, global_vars: dict[str, Any]) -> bool:
    if FunctionDefTransformer._is_docstring(stmt, index):
        return True
    if isinstance(stmt, ast.Pass):
        return True
    if isinstance(stmt, ast.Expr) and isinstance(stmt.value, ast.Call):
        for target in (static_assert, static_print):
            if ASTResolver.resolve_to(stmt.value.func, target, global_vars):
                return True
    return False

Then call _is_skippable in place of _is_docstring inside _validate_stream_parallel_exclusivity. The principled alternative noted in the original Bug #9 — only reject statements that actually emit offloaded tasks — would catch this class of constructs (and any future no-op directive) for free, but it is a larger refactor.
Coverage Report (

| File | Coverage | Missing |
|---|---|---|
| 🔴 python/quadrants/lang/ast/ast_transformer.py | 0% | 31,43,1533-1549 |
| 🔴 python/quadrants/lang/ast/ast_transformers/function_def_transformer.py | 58% | 29,35,456-457,461,464,467-468,471-472,477 |
| 🔴 python/quadrants/lang/stream.py | 0% | 2,131-132,138,141 |
| 🔴 tests/python/test_streams.py | 24% | 342-344,346-353,355-358,364-367,369-378,380-383,385-390,396,398-399,401-412,414-427,429,432-434,436-440,442-446,448-450,454,474-478 |

Diff coverage: 26% · Overall: 65% · 170 lines, 126 missing
The _is_stream_parallel_with validation uses ASTResolver.resolve_to which compares objects with `is`. On Linux build runners where quadrants is available from both the source tree and installed location, the stream_parallel function object may differ between import paths. Add a fallback that checks __name__ and __module__ when identity fails, and add ASTResolver.resolve_value for general AST-to-object resolution. Co-authored-by: Cursor <cursoragent@cursor.com>
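The fallback this commit describes might look roughly like the following (an illustrative sketch; the real ASTResolver.resolve_to operates on AST nodes rather than bare function objects, and `same_function`/`clone` are invented for the demo):

```python
import types

def same_function(candidate, target) -> bool:
    # Fast path: same object identity (the `is` comparison resolve_to uses).
    if candidate is target:
        return True
    # Dual-import-path fallback: compare qualified name instead of identity.
    return (getattr(candidate, "__name__", None) == target.__name__
            and getattr(candidate, "__module__", None) == target.__module__)

def stream_parallel():  # stand-in for the real context manager
    pass

# Simulate the same function reached via a second import path: a distinct
# function object with the same __name__ and __module__.
clone = types.FunctionType(stream_parallel.__code__, {}, "stream_parallel")
clone.__module__ = stream_parallel.__module__

print(same_function(stream_parallel, stream_parallel))  # → True (identity hit)
print(same_function(clone, stream_parallel))            # → True (name/module fallback)
print(same_function(len, stream_parallel))              # → False
```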
Additional findings (outside current diff — PR may have been updated during review):
- 🟡 python/quadrants/lang/ast/ast_transformer.py:31 — Unused import: from quadrants.lang.ast.symbol_resolver import ASTResolver was added at ast_transformer.py:31, but the symbol is never used in this file. The actual usage of ASTResolver lives in function_def_transformer.py via _is_stream_parallel_with, which build_With calls indirectly. Likely a leftover from an earlier draft where build_With called ASTResolver directly. Pure nit — remove the import line.

Extended reasoning...
What the bug is

This PR adds from quadrants.lang.ast.symbol_resolver import ASTResolver at python/quadrants/lang/ast/ast_transformer.py:31, but the imported symbol is never referenced anywhere else in this file.

Verification

Grepping for ASTResolver in python/quadrants/lang/ast/ast_transformer.py returns exactly one match — the import line itself at line 31. There are no further usages in the module. The build_With handler (around lines 1533-1547) that this import was presumably added to support delegates the stream_parallel detection to FunctionDefTransformer._is_stream_parallel_with (line 1541), which has its own ASTResolver import in function_def_transformer.py. So the actual user of ASTResolver is the other module, not this one.

Why it slipped through

This looks like a leftover from an earlier draft of build_With that called ASTResolver.resolve_to(...) inline before the resolution logic was refactored into FunctionDefTransformer._is_stream_parallel_with. The import was kept; the call site was removed.

Step-by-step proof

1. Open python/quadrants/lang/ast/ast_transformer.py and grep for ASTResolver — a single match on line 31 (the import).
2. Open python/quadrants/lang/ast/ast_transformers/function_def_transformer.py and grep for ASTResolver — multiple matches: the import plus actual usage in _is_stream_parallel_with (and resolve_value callers).
3. The build_With handler in ast_transformer.py (lines 1533-1547) only references FunctionDefTransformer._is_stream_parallel_with, never ASTResolver directly.

Impact

Zero behavioral impact — purely a dead import. Linters (ruff/flake8 with F401) will flag it, and a future grep for ASTResolver in this file would mislead a maintainer into thinking the symbol is used here.

Fix

Delete the from quadrants.lang.ast.symbol_resolver import ASTResolver line at ast_transformer.py:31. A one-line removal.
@contextmanager
def stream_parallel():
    """Run top-level for loops in this block on separate GPU streams.

    Used inside @qd.kernel. At Python runtime (outside kernels), this is a no-op. During kernel compilation, the AST
    transformer calls into the C++ ASTBuilder to tag loops with a stream-parallel group ID.
    """
🟡 The docstring on stream_parallel says 'Run top-level for loops in this block on separate GPU streams' (plural), but per the same PR's user_guide/streams.md ('Multiple for loops within a single block share a stream and run serially on it') and the actual implementation, all for-loops within ONE with qd.stream_parallel(): block share ONE stream — it is consecutive blocks that get separate streams. Suggest clarifying to e.g. 'Run this block on its own GPU stream (separate from sibling stream_parallel blocks). Multiple for-loops inside one block share that stream and execute serially on it.'
Extended reasoning...
What the docstring says vs. what the code does
python/quadrants/lang/stream.py:131-138:
@contextmanager
def stream_parallel():
    """Run top-level for loops in this block on separate GPU streams.
    Used inside @qd.kernel. ...
    """
    yield

The plural "separate GPU streams" for the for-loops within a single block reads naturally as "each for-loop here gets its own stream". That contradicts the actual semantics established by this same PR.
Why it is wrong
ASTBuilder::begin_stream_parallel (quadrants/ir/frontend_ir.h:1027-1029) increments stream_parallel_group_counter_ once per with-block and assigns the new value to current_stream_parallel_group_id_:
void begin_stream_parallel() {
  QD_ERROR_IF(current_stream_parallel_group_id_ != 0, ...);
  current_stream_parallel_group_id_ = ++stream_parallel_group_counter_;
}

Every for-loop inside the with-body then reads that single value. begin_frontend_range_for at frontend_ir.cpp:1395 stamps it onto for_loop_dec_.config, and the same pattern fires in begin_frontend_struct_for_on_snode (1409), begin_frontend_struct_for_on_external_tensor (1423), and begin_frontend_mesh_for (1439). So all for-loops in one block carry the same group id.
The launcher at quadrants/runtime/cuda/kernel_launcher.cpp:75-83 (and the byte-identical AMDGPU twin) creates one stream per unique group id — stream_by_id is keyed by stream_parallel_group_id, so a single block always maps to a single stream.
Why I'm certain — corroborated by the PR's own docs
The user-guide rewrite in this same PR (docs/source/user_guide/streams.md) spells out the correct behavior verbatim:
> Consecutive with qd.stream_parallel(): blocks run concurrently. Multiple for loops within a single block share a stream and run serially on it.
So the docstring directly contradicts the prose docs the same PR ships.
Step-by-step proof — two for-loops in one block
@qd.kernel
def k():
    with qd.stream_parallel():
        for i in range(N): a[i] = 1.0  # for-A
        for j in range(N): b[j] = 2.0  # for-B

1. build_With calls begin_stream_parallel() → the counter goes 0→1, current_stream_parallel_group_id_=1.
2. begin_frontend_range_for for for-A stamps stream_parallel_group_id=1 onto its FrontendForStmt.
3. begin_frontend_range_for for for-B reads the same current_stream_parallel_group_id_=1 and stamps 1 onto its FrontendForStmt — the counter is not incremented again.
4. end_stream_parallel resets to 0 (the counter retains the high-water mark).
5. After lowering and offloading, both tasks carry stream_parallel_group_id=1.
6. The launcher walks offloaded_tasks: it enters the stream-parallel branch and builds stream_by_id keyed by group id → exactly one entry for id=1, so exactly one stream is created. for-A and for-B are launched on that same stream sequentially.
So the user-visible behavior for the docstring's described scenario ('top-level for loops in this block') is one stream, not multiple. The plural is wrong.
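The launcher-side consequence in step 6 can be modeled in a few lines (illustrative Python; the real stream_by_id logic is C++ in kernel_launcher.cpp, and `Stream` here is a stand-in for a driver stream handle):

```python
# Toy model: one stream per UNIQUE stream_parallel_group_id, so two
# for-loops from the same with-block land on the same stream.
class Stream:
    count = 0
    def __init__(self):
        Stream.count += 1

tasks = [          # (task_name, stream_parallel_group_id)
    ("for_A", 1),  # two for-loops from ONE with-block share group 1
    ("for_B", 1),
    ("for_C", 2),  # a second with-block got group 2
]

stream_by_id = {}
launched_on = {}
for name, gid in tasks:
    if gid not in stream_by_id:
        stream_by_id[gid] = Stream()  # create a stream only for a new group id
    launched_on[name] = stream_by_id[gid]

print(Stream.count)                                  # → 2
print(launched_on["for_A"] is launched_on["for_B"])  # → True
```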
Fix
Replace the docstring with text that matches the implementation and the .md docs, e.g.:
@contextmanager
def stream_parallel():
    """Run this block on its own GPU stream, separate from sibling stream_parallel blocks.

    Used inside @qd.kernel. Consecutive `with qd.stream_parallel():` blocks execute concurrently on
    different streams; multiple for-loops inside one block share that block's stream and execute
    serially on it. At Python runtime (outside kernels), this is a no-op — the AST transformer
    interprets the with-statement at compile time and tags loops with a stream-parallel group ID.
    """
    yield

Severity

All four verifiers rated this nit: a pure documentation papercut with no behavioral or correctness impact; the help() output for stream_parallel is the only user-facing surface affected. Worth folding in because the contradiction is right next to the PR's own .md prose.
Comment is unchanged at python/quadrants/lang/stream.py:133-137 — still reads "Run top-level for loops in this block on separate GPU streams" (plural). The contradiction with user_guide/streams.md ("Multiple for loops within a single block share a stream and run serially on it") still stands.
Coverage Report

| File | Coverage | Missing |
|---|---|---|
| 🔴 python/quadrants/lang/ast/ast_transformer.py | 0% | 1531-1547 |
| 🔴 python/quadrants/lang/ast/ast_transformers/function_def_transformer.py | 57% | 29,35,456-457,461,464,468-469,475-476,479-480,485 |
| 🔴 python/quadrants/lang/ast/symbol_resolver.py | 0% | 59-60,66-67,69-70,72-79,81-84,86-89 |
| 🔴 python/quadrants/lang/stream.py | 0% | 2,131-132,138,141 |
| 🔴 tests/python/test_streams.py | 24% | 342-344,346-353,355-358,364-367,369-378,380-383,385-390,396,398-399,401-412,414-427,429,432-434,436-440,442-446,448-450,454,474-478 |

Diff coverage: 24% · Overall: 60% · 194 lines, 148 missing
When object resolution fails (dual import paths), fall back to checking the AST node name directly. Inside @qd.kernel the only valid with-context is qd.stream_parallel(), so checking the attribute name is sufficient. Co-authored-by: Cursor <cursoragent@cursor.com>
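The name-based fallback this commit describes could look roughly like the following stdlib-`ast` sketch; the helper name and exact matching rules are assumptions, not the project's actual code:

```python
import ast

def looks_like_stream_parallel(with_stmt: ast.With) -> bool:
    """Fallback check when object resolution fails: match the attribute
    name 'stream_parallel' directly on the AST node (illustrative only)."""
    if len(with_stmt.items) != 1:
        return False
    ctx = with_stmt.items[0].context_expr
    return (
        isinstance(ctx, ast.Call)
        and isinstance(ctx.func, ast.Attribute)
        and ctx.func.attr == "stream_parallel"
    )

stmt = ast.parse("with qd.stream_parallel():\n    pass").body[0]
print(looks_like_stream_parallel(stmt))  # True

other = ast.parse("with open('f'):\n    pass").body[0]
print(looks_like_stream_parallel(other))  # False (func is a Name, not an Attribute)
```

Because only `qd.stream_parallel()` is a valid with-context inside `@qd.kernel`, matching the attribute name alone is sufficient regardless of how `qd` was imported.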
Include the failing statement type, index, and body length to help debug the persistent Linux build x64 test failures. Co-authored-by: Cursor <cursoragent@cursor.com>
Keep both new limitation bullets from head and base branches. Co-authored-by: Cursor <cursoragent@cursor.com>
```python
raise QuadrantsSyntaxError(
    "When using qd.stream_parallel(), all top-level statements "
    "in the kernel must be 'with qd.stream_parallel():' blocks. "
    f"Move non-parallel code to a separate kernel. "
    f"[stmt {i}: {stmt_desc}, body_len={len(body)}]"
```
🟡 The error message at function_def_transformer.py:497-501 ends with a debug-instrumentation suffix [stmt {i}: {stmt_desc}, body_len={len(body)}] (added by commit a6c3852 to chase the persistent Linux build x64 test failures) that leaks raw ast.dump() output to end users. The user-facing portion ("all top-level statements... must be with qd.stream_parallel(): blocks. Move non-parallel code to a separate kernel.") is sufficient on its own; please drop the bracketed suffix once the test failure being debugged is resolved.
Extended reasoning...
What the bug is
The QuadrantsSyntaxError raised by _validate_stream_parallel_exclusivity at function_def_transformer.py:497-501 currently formats as:
When using qd.stream_parallel(), all top-level statements in the kernel must be 'with qd.stream_parallel():' blocks. Move non-parallel code to a separate kernel. [stmt {i}: {stmt_desc}, body_len={len(body)}]
The trailing bracket is debug instrumentation. The commit that added it (a6c3852 — Add diagnostic info to stream_parallel exclusivity error message) explicitly states the intent in its commit body: "Include the failing statement type, index, and body length to help debug the persistent Linux build x64 test failures." That is a temporary diagnostic for an in-flight investigation, not durable user-facing text.
Why the suffix is unsuitable for users
stmt_desc is built at function_def_transformer.py:493-496 by appending ast.dump(ctx_expr.func) whenever the offending statement is an ast.With whose context is a Call with an Attribute func. ast.dump produces raw Python AST repr strings like Attribute(value=Name(id='qd', ctx=Load()), attr='static_assert', ctx=Load()) — implementation-detail strings that would land verbatim in a user-visible SyntaxError. body_len (the count of top-level kernel-body statements) has no actionable value to a kernel author, and a bare integer index without source location adds little.
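For reference, the kind of string `ast.dump` emits for such an attribute chain can be reproduced with the stdlib alone:

```python
import ast

# The raw repr-style string ast.dump produces for the func of a
# 'qd.stream_parallel()' call expression.
call = ast.parse("qd.stream_parallel()", mode="eval").body
print(ast.dump(call.func))
# Attribute(value=Name(id='qd', ctx=Load()), attr='stream_parallel', ctx=Load())
```

Strings of this shape are implementation detail and have no place in a user-facing error message.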
Why this is safe to drop
The user-facing first sentence is already actionable on its own: it names the rule ("all top-level statements... must be with qd.stream_parallel(): blocks") and the remediation ("Move non-parallel code to a separate kernel"). Removing the diagnostic bracket has zero correctness or performance impact — it only improves the polish of the displayed message. The diagnostic served its purpose during the Linux x64 investigation, but durable error text should not carry chase-the-bug instrumentation into a release.
Step-by-step proof
- A user kernel mixes a `with qd.stream_parallel():` block with a non-stream_parallel statement (e.g. `with some_other_ctx():` followed by a `with qd.stream_parallel():` block).
- `_validate_stream_parallel_exclusivity` (function_def_transformer.py:483-501) walks `node.body`, finds `has_sp = True`, then encounters the offending `ast.With` whose call is `some_other_ctx()`.
- `_is_stream_parallel_with` returns `False`, so the validator constructs `stmt_desc = "With(with Attribute(value=Name(id='some_other_ctx', ctx=Load()), attr='__call__', ctx=Load()))"` (or a similar AST-dump string for the resolved attribute chain).
- The raised message reads: `...Move non-parallel code to a separate kernel. [stmt 0: With(with Attribute(...)), body_len=2]`.
- The trailing bracket is implementation noise unrelated to the user's fix.
Fix
Drop the stmt_desc and body_len formatting and the f-string at the tail of the error, leaving:
```python
raise QuadrantsSyntaxError(
    "When using qd.stream_parallel(), all top-level statements "
    "in the kernel must be 'with qd.stream_parallel():' blocks. "
    "Move non-parallel code to a separate kernel."
)
```

Once the underlying Linux build x64 test failure that motivated commit a6c3852 is resolved, the diagnostic bracket can be removed in the same change. If the diagnostic is still occasionally useful, consider gating it behind QD_LOG_LEVEL / logger.debug rather than embedding it in the user-visible exception text.
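A sketch of the logger.debug gating suggested above; the logger name is an assumption, and a plain SyntaxError stands in for QuadrantsSyntaxError:

```python
import logging

logger = logging.getLogger("quadrants.ast")  # logger name is an assumption

def reject_mixed_statements(i, stmt_desc, body_len):
    # Diagnostic detail goes to debug logging, not the user-visible error.
    logger.debug(
        "stream_parallel exclusivity failed at stmt %d: %s (body_len=%d)",
        i, stmt_desc, body_len,
    )
    # Stand-in for QuadrantsSyntaxError: the user sees only actionable text.
    raise SyntaxError(
        "When using qd.stream_parallel(), all top-level statements "
        "in the kernel must be 'with qd.stream_parallel():' blocks. "
        "Move non-parallel code to a separate kernel."
    )
```

The debug record survives for CI investigations (enable it with `logging.basicConfig(level=logging.DEBUG)`), while the exception text stays clean.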
Co-authored-by: Cursor <cursoragent@cursor.com>
Coverage Report

| File | Coverage | Missing |
|---|---|---|
| 🔴 python/quadrants/lang/ast/ast_transformer.py | 0% | 1531-1547 |
| 🔴 python/quadrants/lang/ast/ast_transformers/function_def_transformer.py | 57% | 29,35,191,382,394,446,507-508,512,515,519-521,524-528,530-531,538-539,549,553-554,557 |
| 🔴 python/quadrants/lang/ast/symbol_resolver.py | 0% | 59-60,66-67,69-70,72-79,81-84,86-89 |
| 🔴 python/quadrants/lang/stream.py | 0% | 2,131-132,138,141 |
| 🔴 tests/python/test_streams.py | 24% | 342-344,346-353,355-358,364-367,369-378,380-383,385-390,396,398-399,401-412,414-427,429,432-434,436-440,442-446,448-450,454,474-478 |

Diff coverage: 28% · Overall: 65% · 224 lines, 161 missing
…pr with stream_parallel Resolve conflicts in CUDA and AMDGPU kernel launchers by extracting a shared prepare_task lambda that handles adstack metadata, bound_expr reducers, and grid-dim capping. Both serial and stream-parallel dispatch paths use the same preparation logic. Co-authored-by: Cursor <cursoragent@cursor.com>
…adrantsic-3-stream-parallel
Coverage Report

| File | Coverage | Missing |
|---|---|---|
| 🔴 python/quadrants/lang/ast/ast_transformer.py | 0% | 1531-1547 |
| 🔴 python/quadrants/lang/ast/ast_transformers/function_def_transformer.py | 57% | 29,35,191,382,394,446,507-508,512,515,519-521,524-528,530-531,538-539,549,553-554,557 |
| 🔴 python/quadrants/lang/ast/symbol_resolver.py | 0% | 59-60,66-67,69-70,72-79,81-84,86-89 |
| 🔴 python/quadrants/lang/stream.py | 24% | 2,43,80-82,88,93,97,105,139-140,146,149 |
| 🔴 tests/python/test_streams.py | 24% | 357-359,361-368,370-373,379-382,384-393,395-398,400-405,411,413-414,416-427,429-442,444,447-449,451-455,457-461,463-465,469,489-493 |

Diff coverage: 28% · Overall: 65% · 236 lines, 169 missing
…leted comments The Linux build CI runs with QD_KERNEL_COVERAGE=1, which injects _qd_cov[probe_id] = 1 Assign nodes before each statement in the kernel body. _validate_stream_parallel_exclusivity was rejecting these probes as non-stream_parallel statements. Add _is_coverage_probe() to skip them. Also restores the 4 safety comments in CUDA kernel_launcher.cpp's prepare_task lambda that were flagged by the deleted-comments check, fixes clang-format line break, and reflows the symbol_resolver.py docstring to 120 characters. Co-authored-by: Cursor <cursoragent@cursor.com>
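The probe-skipping idea this commit describes can be sketched with the stdlib `ast` module alone; the predicate below is an assumption about the shape of `_is_coverage_probe()`, not the project's actual code:

```python
import ast

def is_coverage_probe(stmt: ast.stmt) -> bool:
    """Match injected '_qd_cov[<idx>] = 1' statements so the
    exclusivity validator can skip them (illustrative only)."""
    return (
        isinstance(stmt, ast.Assign)
        and len(stmt.targets) == 1
        and isinstance(stmt.targets[0], ast.Subscript)
        and isinstance(stmt.targets[0].value, ast.Name)
        and stmt.targets[0].value.id == "_qd_cov"
    )

probe = ast.parse("_qd_cov[7] = 1").body[0]
other = ast.parse("x = 1").body[0]
print(is_coverage_probe(probe), is_coverage_probe(other))  # True False
```

With this filter applied, the QD_KERNEL_COVERAGE=1 instrumentation no longer trips the "all top-level statements must be stream_parallel blocks" validation.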
…cpu' into hp/streams-quadrantsic-3-stream-parallel
Coverage Report

| File | Coverage | Missing |
|---|---|---|
| 🔴 python/quadrants/lang/ast/ast_transformer.py | 59% | 1531-1532,1534,1537,1539,1541,1543 |
| 🔴 python/quadrants/lang/ast/ast_transformers/function_def_transformer.py | 59% | 29,35,191,382,394,446,507-508,512,515,519-521,524-528,530-531,538-539,549-550,560,566-567,570 |
| 🔴 python/quadrants/lang/ast/symbol_resolver.py | 0% | 59-60,66-67,69-70,72-79,81-84,86-89 |
| 🔴 python/quadrants/lang/stream.py | 0% | 2,152-153,159,162 |
| 🟢 tests/python/test_streams.py | 96% | 489-493 |

Diff coverage: 71% · Overall: 74% · 232 lines, 67 missing
…adrantsic-3-stream-parallel
Coverage Report

| File | Coverage | Missing |
|---|---|---|
| 🔴 python/quadrants/lang/ast/ast_transformer.py | 59% | 1531-1532,1534,1537,1539,1541,1543 |
| 🔴 python/quadrants/lang/ast/ast_transformers/function_def_transformer.py | 59% | 29,35,191,382,394,446,507-508,512,515,519-521,524-528,530-531,538-539,549-550,560,566-567,570 |
| 🔴 python/quadrants/lang/ast/symbol_resolver.py | 0% | 59-60,66-67,69-70,72-79,81-84,86-89 |
| 🔴 python/quadrants/lang/stream.py | 22% | 2,53,123,152-153,159,162 |
| 🟢 tests/python/test_streams.py | 96% | 489-493 |

Diff coverage: 71% · Overall: 74% · 236 lines, 69 missing
…ucer ctx param Base branch added &ctx parameter to ensure_per_task_float_heap_post_reducer, moved cap_blocks outside allocas scope, and added safety comments in both CUDA and AMDGPU kernel launchers. Integrated into prepare_task lambda. Co-authored-by: Cursor <cursoragent@cursor.com>
Coverage Report

| File | Coverage | Missing |
|---|---|---|
| 🔴 python/quadrants/lang/ast/ast_transformer.py | 59% | 1531-1532,1534,1537,1539,1541,1543 |
| 🔴 python/quadrants/lang/ast/ast_transformers/function_def_transformer.py | 59% | 29,35,191,382,394,446,507-508,512,515,519-521,524-528,530-531,538-539,549-550,560,566-567,570 |
| 🔴 python/quadrants/lang/ast/symbol_resolver.py | 0% | 59-60,66-67,69-70,72-79,81-84,86-89 |
| 🔴 python/quadrants/lang/stream.py | 0% | 2,178-179,185,188 |
| 🟢 tests/python/test_streams.py | 96% | 489-493 |

Diff coverage: 71% · Overall: 74% · 232 lines, 67 missing
replaced with streams 1-4
Introduces stream_parallel() for running top-level for-loop blocks on separate GPU streams. The AST transformer maps 'with qd.stream_parallel()' blocks to stream-parallel group IDs, which propagate through IR lowering and offloading to the CUDA/AMDGPU kernel launchers. Each unique group ID gets its own stream at launch time. Includes validation that all top-level kernel statements must be stream_parallel blocks (no mixing), and offline cache key support.
lines added: +377 - 161 = +216
Issue: #
Brief Summary
copilot:summary
Walkthrough
copilot:walkthrough