feat: dual mode transaction scheduler #1004
bmuddha wants to merge 7 commits into bmuddha/transaction/original-bincode from
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
📝 Walkthrough
The PR threads a tokio::sync::Notify-based mode_switcher from MagicValidator into TransactionSchedulerState and TransactionScheduler, adds a ReplicationMode config option, changes TransactionProcessingMode::Replay to carry a ReplayContext with a persist flag, introduces coordinator coordination modes (Primary/Replica) with runtime switching, enforces per-mode transaction acceptance, updates executor and ledger APIs to use IndexedTransaction and explicit per-transaction indices, and adds replica-mode tests and test-kit helpers. Files across validator, processor, ledger, core, tests, and test-kit are updated to propagate these changes.
Assessment against linked issues
Out-of-scope changes
Suggested reviewers
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite. Learn more about stacking.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
magicblock-core/src/link/transactions.rs (1)
78-89: ⚠️ Potential issue | 🟡 Minor — Fix stale `TransactionProcessingMode` documentation.
The enum-level comment still says each variant carries a one-shot sender, but `Replay(bool)` no longer does. This is now misleading for API consumers.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-core/src/link/transactions.rs` around lines 78 - 89, The enum-level doc for TransactionProcessingMode is stale: it claims "each variant also carries the one-shot sender" but Replay(bool) no longer carries a sender; update the documentation to accurately state which variants carry result senders (Simulation and Execution) and that Replay only carries a bool controlling ledger persistence. Also adjust the Replay variant doc to clearly explain the bool semantics (true = record to ledger, false = no recording) and remove any reference to one-shot senders from the enum-level comment.
magicblock-processor/tests/replay.rs (1)
93-113: ⚠️ Potential issue | 🟡 Minor — Validate “no notifications” after replay application is confirmed.
Because Line 94 only confirms enqueue success, the emptiness checks can run too early and miss delayed notifications. Assert post-replay state first, then check channels stay empty over the timeout window.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-processor/tests/replay.rs` around lines 93 - 113, The test currently checks that notifications channels are empty immediately after enqueueing a replay (env.replay_transaction(false, txn)) which can race with delayed notifications; instead, first verify post-replay state by checking accounts via env.accountsdb.get_account for each pubkey (assert the data byte equals 42) and only after confirming the replay applied, assert that env.dispatch.transaction_status.recv_timeout(TIMEOUT) and env.dispatch.account_update.try_recv() remain empty; reorder the checks so the loop that inspects account state runs before the channel emptiness assertions (references: replay_transaction, env.accountsdb.get_account, env.dispatch.transaction_status, env.dispatch.account_update, TIMEOUT).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@magicblock-config/src/config/validator.rs`:
- Around line 22-29: Add equality derives to the ReplicationMode enum so it can
be compared in tests and config code: update the derive on ReplicationMode to
include PartialEq and Eq alongside Deserialize, Serialize, Debug, Clone; no
other API changes needed because Url already implements PartialEq/Eq, so
consumers can now use ==/!= with ReplicationMode.
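The derive change described above can be sketched in isolation. This is a minimal stand-in, not the real config code: the variant names and the `String` payload (standing in for the `Url` that already implements `PartialEq`/`Eq`) are illustrative assumptions.

```rust
// Hypothetical ReplicationMode with the equality derives the review asks for.
// Variant shapes are assumptions; only the derive list mirrors the suggestion.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReplicationMode {
    Primary,
    Replica(String), // stand-in for the Url payload
}

fn main() {
    let a = ReplicationMode::Replica("ws://localhost:8900".to_string());
    let b = a.clone();
    // With PartialEq/Eq derived, tests and config code can compare modes directly.
    assert_eq!(a, b);
    assert_ne!(a, ReplicationMode::Primary);
    println!("ok");
}
```

Without the derives, any `==` comparison in tests would fail to compile, which is the friction the review is removing.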
In `@magicblock-core/src/link/transactions.rs`:
- Around line 269-275: The replay path is dropping any pre-encoded bytes by
calling txn.sanitize(true) and hardcoding ProcessableTransaction.encoded: None;
update the replay branch to call txn.sanitize_with_encoded(true) (or the
equivalent method that returns WithEncoded) and set
ProcessableTransaction.encoded to the encoded bytes returned by that call so
that TransactionProcessingMode::Replay preserves optional pre-encoded bytes;
adjust the assignment where ProcessableTransaction { transaction, mode, encoded
} is built to use the encoded value from sanitize_with_encoded.
In `@magicblock-processor/src/scheduler/coordinator.rs`:
- Around line 92-99: Replica readiness currently returns true if
pending.is_none(), which allows non-conflicting replays to run concurrently and
breaks primary ordering; change the Replica branch in is_ready (and the same
logic at the other occurrence) to require both pending.is_none() AND "no replay
in flight" (e.g., check m.in_flight_replay/is_empty or m.active_replay_count ==
0 or whatever field tracks current replay activity) so only a single replay can
be active at once; update Replica's readiness checks to reference the concrete
field names (pending and the in-flight/active replay tracker) in
ReplicaMode/Replica struct to enforce strict single‑flight replay ordering.
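The single-flight readiness rule described above can be illustrated with a simplified struct. All names here (`ReplicaMode`, `pending`, `in_flight_replay`, `is_ready`) follow the review's wording and are assumptions about the real coordinator, not its actual fields.

```rust
// Illustrative sketch of the stricter Replica readiness check.
pub struct ReplicaMode {
    pub pending: Option<u64>,   // a queued-but-unscheduled replay, if any
    pub in_flight_replay: bool, // whether a replay is currently executing
}

pub fn is_ready(m: &ReplicaMode) -> bool {
    // Require BOTH no pending entry AND no replay in flight, so at most one
    // replay is ever active and the primary's ordering is preserved.
    m.pending.is_none() && !m.in_flight_replay
}

fn main() {
    assert!(is_ready(&ReplicaMode { pending: None, in_flight_replay: false }));
    // A replay in flight blocks scheduling even when nothing is pending,
    // which is exactly the case pending.is_none() alone would miss.
    assert!(!is_ready(&ReplicaMode { pending: None, in_flight_replay: true }));
    println!("ok");
}
```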
- Around line 217-219: The decrement of
CoordinationMode::Primary.p.blocked_txn_count using p.blocked_txn_count -=
txn.is_some() as usize can underflow after a Replica→Primary switch; change the
decrement to be safe by either checking p.blocked_txn_count > 0 before
subtracting or using a saturating/checked subtraction (e.g., saturating_sub)
whenever blocked_txn_count is modified (the same fix should be applied to the
other occurrences in the same block/branch around the 228–239 region) so that
blocked_txn_count never wraps below zero.
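The underflow hazard and the saturating fix can be shown with a two-line helper. `blocked_txn_count` here mirrors the field named in the review; the helper itself is illustrative.

```rust
// Demonstrates the safe decrement: a plain `-= 1` on a usize at 0 panics in
// debug builds and wraps in release, e.g. right after a Replica→Primary
// switch where the counter starts at zero.
fn decrement(blocked_txn_count: usize, had_txn: bool) -> usize {
    blocked_txn_count.saturating_sub(had_txn as usize)
}

fn main() {
    assert_eq!(decrement(3, true), 2);
    assert_eq!(decrement(0, true), 0); // clamped instead of underflowing
    assert_eq!(decrement(0, false), 0);
    println!("ok");
}
```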
- Around line 245-255: The match in is_transaction_allowed currently blocks
Execution in Replica mode but still allows Simulation; tighten the Replica
branch so non-replay modes are rejected by matching both Execution and
Simulation. Update the match arm for Replica to something like matching
(Replica(_), Execution(_) | Simulation(_)) => false so Replica(_) only permits
Replay modes, leaving the Primary/Replay rule unchanged; reference function
is_transaction_allowed, CoordinationMode::Replica, and
TransactionProcessingMode::{Execution, Simulation}.
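The tightened match can be sketched with simplified enums. The variant payloads are stripped here for brevity, so the shapes are stand-ins for the real `CoordinationMode` and `TransactionProcessingMode`; only the acceptance policy mirrors the suggestion.

```rust
// Simplified acceptance policy: Primary rejects replays, Replica rejects
// everything except replays (including Simulation, per the review).
enum CoordinationMode { Primary, Replica }
enum TransactionProcessingMode { Execution, Simulation, Replay }

fn is_transaction_allowed(
    mode: &CoordinationMode,
    txn: &TransactionProcessingMode,
) -> bool {
    use CoordinationMode::*;
    use TransactionProcessingMode::*;
    match (mode, txn) {
        (Primary, Replay) => false,
        // Or-pattern covers both non-replay modes in one arm.
        (Replica, Execution | Simulation) => false,
        _ => true,
    }
}

fn main() {
    assert!(!is_transaction_allowed(&CoordinationMode::Replica, &TransactionProcessingMode::Simulation));
    assert!(is_transaction_allowed(&CoordinationMode::Replica, &TransactionProcessingMode::Replay));
    assert!(is_transaction_allowed(&CoordinationMode::Primary, &TransactionProcessingMode::Execution));
    println!("ok");
}
```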
In `@magicblock-processor/tests/replica_ordering.rs`:
- Around line 120-140: The helper submit_all_and_start currently spawns
concurrent tasks that call scheduler.replay, making send order nondeterministic
and risking deadlock on bounded channels; change it to submit sequentially by
iterating txs and awaiting scheduler.replay(true, tx).await directly (no
tokio::spawn) so the order matches sigs, and if the scheduler consumes via
run_scheduler(), ensure run_scheduler() is started before or concurrently with
submission to avoid blocking—refer to submit_all_and_start and scheduler.replay
to locate and update the code.
---
Outside diff comments:
In `@magicblock-core/src/link/transactions.rs`:
- Around line 78-89: The enum-level doc for TransactionProcessingMode is stale:
it claims "each variant also carries the one-shot sender" but Replay(bool) no
longer carries a sender; update the documentation to accurately state which
variants carry result senders (Simulation and Execution) and that Replay only
carries a bool controlling ledger persistence. Also adjust the Replay variant
doc to clearly explain the bool semantics (true = record to ledger, false = no
recording) and remove any reference to one-shot senders from the enum-level
comment.
In `@magicblock-processor/tests/replay.rs`:
- Around line 93-113: The test currently checks that notifications channels are
empty immediately after enqueueing a replay (env.replay_transaction(false, txn))
which can race with delayed notifications; instead, first verify post-replay
state by checking accounts via env.accountsdb.get_account for each pubkey
(assert the data byte equals 42) and only after confirming the replay applied,
assert that env.dispatch.transaction_status.recv_timeout(TIMEOUT) and
env.dispatch.account_update.try_recv() remain empty; reorder the checks so the
loop that inspects account state runs before the channel emptiness assertions
(references: replay_transaction, env.accountsdb.get_account,
env.dispatch.transaction_status, env.dispatch.account_update, TIMEOUT).
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (14)
magicblock-api/src/magic_validator.rs
magicblock-config/src/config/validator.rs
magicblock-core/src/link/transactions.rs
magicblock-ledger/src/blockstore_processor/mod.rs
magicblock-processor/src/executor/mod.rs
magicblock-processor/src/executor/processing.rs
magicblock-processor/src/scheduler/coordinator.rs
magicblock-processor/src/scheduler/mod.rs
magicblock-processor/src/scheduler/state.rs
magicblock-processor/src/scheduler/tests.rs
magicblock-processor/tests/replay.rs
magicblock-processor/tests/replica_ordering.rs
test-kit/Cargo.toml
test-kit/src/lib.rs
Actionable comments posted: 2
♻️ Duplicate comments (3)
magicblock-processor/tests/replica_ordering.rs (1)
120-136: ⚠️ Potential issue | 🟠 Major — Potential pre-start enqueue hang in `submit_all_and_start`.
On Line 127-Line 132, all `replay()` sends are awaited before `run_scheduler()` (Line 135). If replay uses a bounded queue and the consumer starts only after `run_scheduler()`, this can block the helper under stress.
Suggested adjustment

```diff
 async fn submit_all_and_start(
     env: &mut ExecutionTestEnv,
     txs: Vec<Transaction>,
 ) -> Vec<Signature> {
     let sigs: Vec<Signature> = txs.iter().map(|tx| tx.signatures[0]).collect();
-    // Submit all transactions sequentially to preserve order
+    // Start consumer first to avoid potential bounded-queue backpressure deadlock.
+    env.run_scheduler();
+    env.advance_slot();
+
+    // Submit sequentially to preserve deterministic order.
     for tx in txs {
         env.transaction_scheduler
             .replay(true, tx)
             .await
             .expect("Failed to submit transaction");
     }
-
-    // Now start the scheduler
-    env.run_scheduler();
-    env.advance_slot();
     sigs
 }
```

```bash
#!/bin/bash
set -euo pipefail
# Verify whether replay submission can block before scheduler startup.
# 1) Locate replay method definitions and inspect enqueue behavior.
ast-grep --pattern $'async fn replay($$$) -> $_ { $$$ }' || true
ast-grep --pattern $'fn replay($$$) -> $_ { $$$ }' || true
# 2) Find channel types/capacities used by scheduler/dispatch paths.
rg -n -C3 --type rust 'async_channel::bounded|async_channel::unbounded|tokio::sync::mpsc::channel|crossbeam_channel::bounded|crossbeam_channel::unbounded|flume::bounded|flume::unbounded'
# 3) Check where scheduler consumer is started relative to replay calls.
rg -n -C3 --type rust '\brun_scheduler\s*\('
rg -n -C3 --type rust '\.replay\s*\('
```

Expected verification outcome: if replay ultimately sends to a bounded channel and consumer activation occurs in/after `run_scheduler()`, this helper pattern remains vulnerable to blocking.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-processor/tests/replica_ordering.rs` around lines 120 - 136, The helper submit_all_and_start awaits transaction_scheduler.replay for each tx before calling env.run_scheduler, which can block if replay enqueues onto a bounded queue; fix by starting the consumer before flooding the queue or making enqueues non-blocking: either call env.run_scheduler() (and env.advance_slot() if needed) prior to looping over txs, or submit replays without awaiting (spawn each replay future or use a try_send-style non-blocking API) so that transaction_scheduler.replay calls cannot backpressure the test helper; update submit_all_and_start accordingly to use run_scheduler or non-blocking replay submission.
magicblock-processor/src/scheduler/coordinator.rs (1)
234-238: ⚠️ Potential issue | 🟠 Major — Initialize `blocked_txn_count` from existing queues during Replica→Primary switch.
Line 235 sets `blocked_txn_count` to `0`. If any blocked entries exist at switch time, Primary backpressure accounting starts undercounted.
Suggested fix

```diff
 let mode = PrimaryMode {
-    blocked_txn_count: 0,
+    blocked_txn_count: self
+        .blocked_transactions
+        .iter()
+        .map(|queue| queue.len())
+        .sum(),
     max_blocked_txn: self.blocked_transactions.len() * BLOCKED_TXN_MULTIPLIER,
 };
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-processor/src/scheduler/coordinator.rs` around lines 234 - 238, When constructing PrimaryMode during the Replica→Primary transition, initialize blocked_txn_count from the existing blocked queue instead of zero: set PrimaryMode.blocked_txn_count to reflect the current number of blocked entries (e.g. based on self.blocked_transactions.len() or the appropriate aggregated count of entries) so backpressure accounting starts correctly; leave max_blocked_txn computed with BLOCKED_TXN_MULTIPLIER as-is. Ensure you update the PrimaryMode initialization where PrimaryMode { blocked_txn_count: ..., max_blocked_txn: ... } is created.
magicblock-core/src/link/transactions.rs (1)
273-279: 🧹 Nitpick | 🔵 Trivial — Preserve pre-encoded bytes in `replay()` submissions.
Line 274 uses `sanitize(true)` and Line 278 hardcodes `encoded: None`, so replay drops `WithEncoded<T>` bytes and forces redundant serialization later.
Suggested fix

```diff
 pub async fn replay(
     &self,
     persist: bool,
     txn: impl SanitizeableTransaction,
 ) -> TransactionResult {
     let mode = TransactionProcessingMode::Replay(persist);
-    let transaction = txn.sanitize(true)?;
+    let (transaction, encoded) = txn.sanitize_with_encoded(true)?;
     let txn = ProcessableTransaction {
         transaction,
         mode,
-        encoded: None,
+        encoded,
     };
     self.0
         .send(txn)
         .await
         .map_err(|_| TransactionError::ClusterMaintenance)
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-core/src/link/transactions.rs` around lines 273 - 279, The replay branch currently calls txn.sanitize(true) which strips pre-encoded bytes and then hardcodes encoded: None on the new ProcessableTransaction, causing replay to drop WithEncoded<T> bytes; instead preserve any existing encoded bytes by calling sanitize in a way that does not remove the encoded payload (e.g., txn.sanitize(false)) and set ProcessableTransaction.encoded from the original txn's encoded field (or clone/retain it via the original WithEncoded<T> accessor) when creating the ProcessableTransaction for TransactionProcessingMode::Replay so redundant serialization is avoided.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@magicblock-processor/src/scheduler/coordinator.rs`:
- Around line 242-256: The doc comment for is_transaction_allowed incorrectly
states that Primary accepts only Execution and Replica accepts only Replay, but
the match implementation also permits Simulation in both modes; update either
the comment or the logic to match: either revise the comment above
is_transaction_allowed to explicitly mention that Simulation transactions are
allowed in both CoordinationMode::Primary and CoordinationMode::Replica, or
change the match arms in is_transaction_allowed (referencing
CoordinationMode::Primary, CoordinationMode::Replica and
TransactionProcessingMode::{Execution, Replay, Simulation}) to disallow
Simulation where appropriate so policy and code are consistent.
In `@magicblock-processor/tests/replica_ordering.rs`:
- Around line 237-246: The "Independent" branch reuses the same independent
account for both non-conflict transactions in each 4-item batch, causing
unintended conflicts; in the loop (for i in 0..count) where tx_write is called
for group_a/group_b/independent, change how you compute idx for independent so
the two independent branches in a batch select distinct accounts (e.g., derive
idx from the batch number rather than i directly: use batch = i/4 and an offset
for the first vs second independent slot, or multiply batch by 2 and add 0/1
based on (i % 4) to pick different independent[(batch*2 + offset) %
independent.len()]); update the selection logic used in the tx_write calls so
independent accesses are unique per non-conflict transaction.
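The index arithmetic suggested above can be checked in isolation. The helper below is hypothetical (the real test indexes an `independent` account array); it implements the `batch*2 + offset` scheme from the prompt and verifies that the two independent slots of each 4-item batch land on different indices.

```rust
// For the independent slots (i % 4 == 2 and i % 4 == 3 only), derive the
// account index from the batch number plus a per-slot offset, so the two
// independent transactions in a batch never share an account.
fn independent_index(i: usize, len: usize) -> usize {
    let batch = i / 4;
    let offset = (i % 4) - 2; // 0 for the first independent slot, 1 for the second
    (batch * 2 + offset) % len
}

fn main() {
    let len = 8;
    for batch in 0..4 {
        let first = independent_index(batch * 4 + 2, len);
        let second = independent_index(batch * 4 + 3, len);
        assert_ne!(first, second, "batch {batch} reused an account");
    }
    println!("ok");
}
```

By contrast, the original `(i / 4) % len` yields the same value for both slots of a batch, which is the bug being flagged.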
---
Duplicate comments:
In `@magicblock-core/src/link/transactions.rs`:
- Around line 273-279: The replay branch currently calls txn.sanitize(true)
which strips pre-encoded bytes and then hardcodes encoded: None on the new
ProcessableTransaction, causing replay to drop WithEncoded<T> bytes; instead
preserve any existing encoded bytes by calling sanitize in a way that does not
remove the encoded payload (e.g., txn.sanitize(false)) and set
ProcessableTransaction.encoded from the original txn's encoded field (or
clone/retain it via the original WithEncoded<T> accessor) when creating the
ProcessableTransaction for TransactionProcessingMode::Replay so redundant
serialization is avoided.
In `@magicblock-processor/src/scheduler/coordinator.rs`:
- Around line 234-238: When constructing PrimaryMode during the Replica→Primary
transition, initialize blocked_txn_count from the existing blocked queue instead
of zero: set PrimaryMode.blocked_txn_count to reflect the current number of
blocked entries (e.g. based on self.blocked_transactions.len() or the
appropriate aggregated count of entries) so backpressure accounting starts
correctly; leave max_blocked_txn computed with BLOCKED_TXN_MULTIPLIER as-is.
Ensure you update the PrimaryMode initialization where PrimaryMode {
blocked_txn_count: ..., max_blocked_txn: ... } is created.
In `@magicblock-processor/tests/replica_ordering.rs`:
- Around line 120-136: The helper submit_all_and_start awaits
transaction_scheduler.replay for each tx before calling env.run_scheduler, which
can block if replay enqueues onto a bounded queue; fix by starting the consumer
before flooding the queue or making enqueues non-blocking: either call
env.run_scheduler() (and env.advance_slot() if needed) prior to looping over
txs, or submit replays without awaiting (spawn each replay future or use a
try_send-style non-blocking API) so that transaction_scheduler.replay calls
cannot backpressure the test helper; update submit_all_and_start accordingly to
use run_scheduler or non-blocking replay submission.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (3)
magicblock-core/src/link/transactions.rs
magicblock-processor/src/scheduler/coordinator.rs
magicblock-processor/tests/replica_ordering.rs
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@magicblock-processor/src/scheduler/coordinator.rs`:
- Around line 234-235: The initialization of PrimaryMode with blocked_txn_count:
0 assumes all executor blocked queues are empty at the Replica→Primary
transition; update this to be defensive by either (a) computing
blocked_txn_count from the actual queue lengths across executors (use the
executor/queue APIs available in the coordinator to sum blocked counts) or (b)
adding a runtime assertion that verifies all blocked queues are empty before
constructing PrimaryMode, or (c) expanding the doc comment on the transition
function to explicitly state the empty-queue invariant; reference PrimaryMode
and blocked_txn_count and the coordinator's executor queue accessors when making
the change so the check or computed initialization is placed at the mode-switch
site right after ledger replay completes.
Compare: 1acdff2 to 6e3a51e
Compare: 9edc6c4 to 378e32d
Compare: 6e3a51e to 936c1c0
Compare: 378e32d to 618b99c
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
magicblock-processor/src/executor/processing.rs (1)
55-70: ⚠️ Potential issue | 🟠 Major — `persist = Some(false)` still triggers failure persistence side effects.
On error, `execute()` always routes to `handle_failure()`, and `handle_failure()` unconditionally records failure to ledger. That violates the stated replay-without-persist contract.
Suggested fix

```diff
-    fn handle_failure(
+    fn handle_failure(
         &self,
         txn: SanitizedTransaction,
         err: TransactionError,
         logs: Option<Vec<String>>,
         tx: TxnExecutionResultTx,
+        persist: Option<bool>,
     ) {
         FAILED_TRANSACTIONS_COUNT.inc();
-        self.record_failure(txn, Err(err.clone()), logs);
+        if persist.unwrap_or(true) {
+            self.record_failure(txn, Err(err.clone()), logs);
+        }
         // Even on failure, ensure stash is clear (though likely empty if load failed).
         ExecutionTlsStash::clear();
         if let Some(tx) = tx {
             let _ = tx.send(Err(err));
         }
     }
```

```diff
-        return self.handle_failure(txn, err, None, tx);
+        return self.handle_failure(txn, err, None, tx, persist);
 ...
-        return self.handle_failure(
+        return self.handle_failure(
             txn,
             TransactionError::CommitCancelled,
             Some(vec![err.to_string()]),
             tx,
+            persist,
         );
```

Also applies to: 163-172
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-processor/src/executor/processing.rs` around lines 55 - 70, The commit error path currently always calls handle_failure which records the failure to the ledger; change the logic in execute() around the commit_accounts call to respect the persist flag: use the existing notify/persist check (persist.is_none()) and if persist == Some(false) (i.e. notify == false) do NOT invoke handle_failure that writes to ledger—return or propagate a non-persistent error instead (or call a non-persistent failure helper) so no ledger write occurs; apply the same change for the other similar block around lines 163-172. Refer to commit_accounts, execute(), handle_failure, persist and notify to locate and fix the branches.
magicblock-processor/tests/replay.rs (1)
105-106: ⚠️ Potential issue | 🟡 Minor — Use a timed receive for the negative account-update assertion.
At Line 105, `try_recv()` can pass before a delayed update arrives, which makes this check flaky under load. Use `recv_timeout(TIMEOUT)` like the status assertion.
Suggested patch

```diff
 assert!(
-    env.dispatch.account_update.try_recv().is_err(),
+    env.dispatch.account_update.recv_timeout(TIMEOUT).is_err(),
     "Replay should NOT broadcast account updates"
 );
```
🤖 Prompt for AI Agents
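The race described in this finding can be reproduced standalone with `std::sync::mpsc` in place of the test dispatch channels (those names are from the project; everything below is an illustrative assumption): an update sent slightly later slips past an immediate `try_recv()`, while `recv_timeout` over the same window observes it.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    // Simulate a delayed account-update notification.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(200));
        tx.send("late update").unwrap();
    });

    // Immediate poll: the channel looks empty, so a try_recv-based
    // "no notifications" assertion passes even though one is in flight.
    assert!(rx.try_recv().is_err());

    // A timed receive over the timeout window catches the late message,
    // so the negative assertion would correctly fail here.
    assert!(rx.recv_timeout(Duration::from_secs(2)).is_ok());
    println!("ok");
}
```

This is why the patch swaps `try_recv()` for `recv_timeout(TIMEOUT)`: a negative assertion is only meaningful if it waits at least as long as a delayed notification could take.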
Verify each finding against the current code and only fix it if needed. In `@magicblock-processor/tests/replay.rs` around lines 105 - 106, The negative assertion currently uses env.dispatch.account_update.try_recv() which can race with delayed updates; replace it with a timed receive using env.dispatch.account_update.recv_timeout(TIMEOUT) (the same TIMEOUT used for status) and assert that this call returns a timeout (is_err/is_timeout) so the test is no longer flaky under load; update the assertion near the existing status assertion in replay.rs to use recv_timeout on the account_update receiver.
magicblock-processor/src/scheduler/mod.rs (1)
131-137: ⚠️ Potential issue | 🔴 Critical — Pin the `Notified` future to avoid losing the Primary-mode switch signal.
At line 135, `self.mode_switcher.notified()` is recreated every loop iteration inside `select!`. With Tokio `Notify`, a wake-up from `notify_one()` can be lost if the `Notified` future is not yet polled when the notification occurs. Since the select! drops losing branches (including the `notified()` future) each iteration, a registration-timing window opens where `notify_one()` calls can pass unobserved, potentially leaving the scheduler stuck in Replica mode.
Suggested patch

```diff
 async fn run(mut self) {
     let mut block_produced = self.latest_block.subscribe();
+    let mut mode_switch_notified = self.mode_switcher.notified();
+    tokio::pin!(mode_switch_notified);
+    let mut switched_to_primary = false;
     loop {
         tokio::select! {
             biased;
             Ok(()) = block_produced.recv() => self.transition_to_new_slot(),
             Some(executor) = self.ready_rx.recv() => self.handle_ready_executor(executor),
-            _ = self.mode_switcher.notified() => {
+            _ = &mut mode_switch_notified, if !switched_to_primary => {
                 self.coordinator.switch_to_primary_mode();
+                switched_to_primary = true;
             }
             Some(txn) = self.transactions_rx.recv(), if self.coordinator.is_ready() => {
                 self.handle_new_transaction(txn);
             }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-processor/src/scheduler/mod.rs` around lines 131 - 137, The Notified future is created inside the tokio::select! each iteration and can be lost between polls; fix by creating and pinning a single Notified future outside the loop (e.g., let mut mode_switch_fut = Box::pin(self.mode_switcher.notified())) and use mode_switch_fut.as_mut() in the select! branch instead of recreating self.mode_switcher.notified() each time, then after handling the notification (where you call self.coordinator.switch_to_primary_mode()) recreate/re-pin a fresh notified future so future notifications are not missed; apply this change where the loop invokes tokio::select! alongside self.transition_to_new_slot() and self.handle_ready_executor().
♻️ Duplicate comments (2)
magicblock-core/src/link/transactions.rs (1)
273-279: 🧹 Nitpick | 🔵 Trivial — Replay path still discards optional encoded bytes.
`replay()` still sanitizes via `sanitize(true)` and hardcodes `encoded: None`, so `WithEncoded<T>` bytes are dropped on replay.
Proposed adjustment

```diff
 pub async fn replay(
     &self,
     persist: bool,
     txn: impl SanitizeableTransaction,
 ) -> TransactionResult {
     let mode = TransactionProcessingMode::Replay(persist);
-    let transaction = txn.sanitize(true)?;
+    let (transaction, encoded) = txn.sanitize_with_encoded(true)?;
     let txn = ProcessableTransaction {
         transaction,
         mode,
-        encoded: None,
+        encoded,
     };
     self.0
         .send(txn)
         .await
         .map_err(|_| TransactionError::ClusterMaintenance)
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-core/src/link/transactions.rs` around lines 273 - 279, The replay path drops WithEncoded<T> bytes because it calls txn.sanitize(true) and always sets ProcessableTransaction.encoded = None; instead, preserve and pass through the optional encoded bytes: do not force-removal in replay (call sanitize(false) or otherwise avoid stripping encoded) and set ProcessableTransaction.encoded to the original txn's encoded bytes (or capture txn.encoded before sanitization) in the replay branch (the code constructing ProcessableTransaction in replay()). Ensure changes reference replay(), sanitize(...), ProcessableTransaction, and WithEncoded<T> so encoded data is retained.
magicblock-processor/tests/replica_ordering.rs (1)
243-245: ⚠️ Potential issue | 🟡 Minor — Independent branch still reuses the same account twice per 4-item batch.
`let idx = (i / 4) % independent.len()` maps both `i % 4 == 2` and `i % 4 == 3` to the same index, so those two transactions are not independent as intended.
Suggested patch

```diff
 let mut txs = Vec::with_capacity(count);
+let mut independent_idx = 0usize;
 for i in 0..count {
     env.advance_slot();
     match i % 4 {
         0 => txs.push(tx_write(&mut env, group_a[0], i as u8)),
         1 => txs.push(tx_write(&mut env, group_b[0], i as u8)),
         _ => {
             // Independent writes to unique accounts
-            let idx = (i / 4) % independent.len();
+            let idx = independent_idx % independent.len();
+            independent_idx += 1;
             txs.push(tx_write(&mut env, independent[idx], i as u8));
         }
     }
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-processor/tests/replica_ordering.rs` around lines 243 - 245, The index calculation for independent writes is wrong: replace the current idx = (i / 4) % independent.len() with an expression that uses the position within the 4-item batch (e.g., let batch_pos = i % 4; let idx = batch_pos % independent.len()) so each of the four transactions in the batch maps to different entries in independent; update the code around txs.push(tx_write(&mut env, independent[idx], i as u8)) accordingly to use the new idx computation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@test-integration/test-tools/src/toml_to_args.rs`:
- Around line 106-107: The loop currently uses entries.flatten() which silently
drops Err values from read_dir; instead iterate raw entries (for entry in
entries) and pattern-match each Result so that Ok(entry) prints the file name
and Err(err) logs the per-entry error (including the path/context) via eprintln!
(or the existing logger) to surface filesystem failures; update the loop around
the entries variable in toml_to_args.rs to handle Result::Err cases rather than
flattening them away.
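The pattern asked for above, matching each `read_dir` entry instead of flattening errors away, can be sketched with std alone. The helper name and the temp-dir setup are illustrative, not taken from `toml_to_args.rs`.

```rust
use std::fs;
use std::path::Path;

// List file names, surfacing per-entry errors instead of silently dropping
// them the way `entries.flatten()` would.
fn list_dir(path: &Path) -> Vec<String> {
    let mut names = Vec::new();
    match fs::read_dir(path) {
        Ok(entries) => {
            for entry in entries {
                match entry {
                    Ok(entry) => {
                        names.push(entry.file_name().to_string_lossy().into_owned())
                    }
                    // An Err mid-iteration (e.g. a permission failure) is
                    // logged with its path context rather than swallowed.
                    Err(err) => eprintln!("failed to read entry in {path:?}: {err}"),
                }
            }
        }
        Err(err) => eprintln!("failed to read dir {path:?}: {err}"),
    }
    names
}

fn main() {
    let dir = std::env::temp_dir().join("read_dir_demo");
    fs::create_dir_all(&dir).unwrap();
    fs::write(dir.join("config.toml"), "x = 1").unwrap();
    let names = list_dir(&dir);
    assert!(names.iter().any(|n| n == "config.toml"));
    println!("ok");
}
```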
---
Outside diff comments:
In `@magicblock-processor/src/executor/processing.rs`:
- Around line 55-70: The commit error path currently always calls handle_failure
which records the failure to the ledger; change the logic in execute() around
the commit_accounts call to respect the persist flag: use the existing
notify/persist check (persist.is_none()) and if persist == Some(false) (i.e.
notify == false) do NOT invoke handle_failure that writes to ledger—return or
propagate a non-persistent error instead (or call a non-persistent failure
helper) so no ledger write occurs; apply the same change for the other similar
block around lines 163-172. Refer to commit_accounts, execute(), handle_failure,
persist and notify to locate and fix the branches.
In `@magicblock-processor/src/scheduler/mod.rs`:
- Around line 131-137: The Notified future is created inside the tokio::select!
each iteration and can be lost between polls; fix by creating and pinning a
single Notified future outside the loop (e.g., let mut mode_switch_fut =
Box::pin(self.mode_switcher.notified())) and use mode_switch_fut.as_mut() in the
select! branch instead of recreating self.mode_switcher.notified() each time,
then after handling the notification (where you call
self.coordinator.switch_to_primary_mode()) recreate/re-pin a fresh notified
future so future notifications are not missed; apply this change where the loop
invokes tokio::select! alongside self.transition_to_new_slot() and
self.handle_ready_executor().
In `@magicblock-processor/tests/replay.rs`:
- Around line 105-106: The negative assertion currently uses
env.dispatch.account_update.try_recv() which can race with delayed updates;
replace it with a timed receive using
env.dispatch.account_update.recv_timeout(TIMEOUT) (the same TIMEOUT used for
status) and assert that this call returns a timeout (is_err/is_timeout) so the
test is no longer flaky under load; update the assertion near the existing
status assertion in replay.rs to use recv_timeout on the account_update
receiver.
---
Duplicate comments:
In `@magicblock-core/src/link/transactions.rs`:
- Around line 273-279: The replay path drops WithEncoded<T> bytes because it
calls txn.sanitize(true) and always sets ProcessableTransaction.encoded = None;
instead, preserve and pass through the optional encoded bytes: do not
force-removal in replay (call sanitize(false) or otherwise avoid stripping
encoded) and set ProcessableTransaction.encoded to the original txn's encoded
bytes (or capture txn.encoded before sanitization) in the replay branch (the
code constructing ProcessableTransaction in replay()). Ensure changes reference
replay(), sanitize(...), ProcessableTransaction, and WithEncoded<T> so encoded
data is retained.
In `@magicblock-processor/tests/replica_ordering.rs`:
- Around line 243-245: The index calculation for independent writes is wrong:
replace the current idx = (i / 4) % independent.len() with an expression that
uses the position within the 4-item batch (e.g., let batch_pos = i % 4; let idx
= batch_pos % independent.len()) so each of the four transactions in the batch
maps to different entries in independent; update the code around
txs.push(tx_write(&mut env, independent[idx], i as u8)) accordingly to use the
new idx computation.
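The corrected index arithmetic is small enough to check on its own; this standalone helper mirrors the suggested `batch_pos` computation (the function name is illustrative, not the test's actual code):

```rust
// The position of transaction i within its 4-item batch decides which
// "independent" account it writes, so batch slots 2 and 3 map to
// different entries whenever there are at least two accounts.
fn independent_index(i: usize, independent_len: usize) -> usize {
    let batch_pos = i % 4;
    batch_pos % independent_len
}
```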
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (2)
- `Cargo.lock` is excluded by `!**/*.lock`
- `test-integration/Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (15)
- magicblock-api/src/magic_validator.rs
- magicblock-config/src/config/validator.rs
- magicblock-core/src/link/transactions.rs
- magicblock-ledger/src/blockstore_processor/mod.rs
- magicblock-processor/src/executor/mod.rs
- magicblock-processor/src/executor/processing.rs
- magicblock-processor/src/scheduler/coordinator.rs
- magicblock-processor/src/scheduler/mod.rs
- magicblock-processor/src/scheduler/state.rs
- magicblock-processor/src/scheduler/tests.rs
- magicblock-processor/tests/replay.rs
- magicblock-processor/tests/replica_ordering.rs
- test-integration/test-tools/src/toml_to_args.rs
- test-kit/Cargo.toml
- test-kit/src/lib.rs
Force-pushed from 618b99c to 26235a1 (Compare)
Force-pushed from 936c1c0 to 1642077 (Compare)
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
magicblock-processor/src/executor/processing.rs (1)
Line 303: ⚠️ Potential issue | 🟠 Major
Remove `.unwrap()` from program cache write lock path.
Line 303 uses `.unwrap()` in production execution. A poisoned lock will panic and abort processing; this should be handled explicitly with error handling/recovery.
As per coding guidelines: treat any usage of `.unwrap()` or `.expect()` in production Rust code as a MAJOR issue.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-processor/src/executor/processing.rs` at line 303, Replace the direct `.unwrap()` on the program cache write lock with explicit error handling: where the code obtains the write lock for the program cache (the call that currently ends with `.unwrap()`), match on the Result from the lock acquisition, handle Ok by proceeding with the write, and handle Err(PoisonError) by either recovering with `err.into_inner()` to continue or by returning/logging a descriptive Executor error (e.g., an error variant from the executor's error type) so the poison case does not panic; update the surrounding function to propagate the error (or log and abort gracefully) instead of calling `.unwrap()`.
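One way to satisfy this without panicking is to recover the guard from the `PoisonError` via `into_inner()`; a minimal sketch, where the helper name and logging are illustrative rather than the crate's actual API:

```rust
use std::sync::RwLock;

// Acquire the write guard even if a previous holder panicked:
// PoisonError::into_inner() hands back the guard, so the executor
// can log the event and keep processing instead of aborting.
fn with_cache_mut<T, R>(cache: &RwLock<T>, f: impl FnOnce(&mut T) -> R) -> R {
    let mut guard = match cache.write() {
        Ok(guard) => guard,
        Err(poisoned) => {
            eprintln!("program cache lock poisoned; recovering");
            poisoned.into_inner()
        }
    };
    f(&mut guard)
}
```

Recovering with `into_inner()` is reasonable here because the cache is a plain data structure whose invariants do not depend on the panicking writer having finished; if they did, propagating an error would be the safer choice.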
♻️ Duplicate comments (1)
magicblock-processor/tests/replica_ordering.rs (1)
Lines 249-251: ⚠️ Potential issue | 🟡 Minor
Independent branch still introduces avoidable conflicts.
At line 250, both "independent" transactions in each batch (`i % 4 == 2` and `i % 4 == 3`) map to the same account index, so they are not actually independent.
♻️ Suggested fix
```diff
- let idx = (i / 4) % independent.len();
+ let batch = i / 4;
+ let offset = if i % 4 == 2 { 0 } else { 1 };
+ let idx = (batch * 2 + offset) % independent.len();
  txs.push(tx_write(&mut env, independent[idx], i as u8));
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-processor/tests/replica_ordering.rs` around lines 249 - 251, The two "independent" writes use the same index calculation causing collisions; change the index computation around the txs.push(tx_write(...)) call so the two independent cases map to different accounts (e.g. compute idx_a = (i/4) % independent.len() for the first independent slot and idx_b = ((i/4) + 1) % independent.len() for the second, or branch on i % 4 and assign different idx values) and pass the appropriate idx to tx_write; update the code that references independent, tx_write and the loop variable i accordingly so the i%4 == 2 and i%4 == 3 cases target distinct independent indices.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@magicblock-processor/src/executor/processing.rs`:
- Around line 93-95: The persist flag is inconsistently applied: currently
record_transaction is gated by persist but failure recording and status
notifications are always executed; update the code paths so every side-effect
respects persist.unwrap_or(true) — i.e., wrap or gate calls to
self.record_transaction(...), self.record_failure(...) (the failure-recording
call around lines ~190), and the transaction status notification emitter (the
emit/notify call around ~286) with the same persist check used at lines 93-95;
make the same change in the other two locations mentioned (around 174-191 and
258-287) so persist consistently suppresses ledger writes, failure records, and
notifications.
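The gating this prompt asks for reduces to resolving the flag once and branching every side-effect on it. A toy model follows; `SideEffects` and `finish_transaction` are invented for illustration and stand in for the real ledger/notification calls:

```rust
// Counters stand in for the real ledger writes, failure records,
// and status notifications that must all honor the persist flag.
#[derive(Default)]
struct SideEffects {
    ledger_writes: u32,
    notifications: u32,
}

fn finish_transaction(persist: Option<bool>, fx: &mut SideEffects) {
    // Resolve once; normal execution (None) defaults to persisting.
    if persist.unwrap_or(true) {
        fx.ledger_writes += 1; // record_transaction / record_failure
        fx.notifications += 1; // transaction status notification
    }
    // persist == Some(false): no ledger write, no notification at all
}
```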
In `@magicblock-processor/src/scheduler/mod.rs`:
- Line 231: The current line uses self.program_cache.write().unwrap(), which
will panic on a poisoned lock; change it to explicitly handle the PoisonError by
matching the result of self.program_cache.write() and on Err(poisoned) log the
poison event (e.g., using error!/warn!/trace!) and recover by calling
poisoned.into_inner() to obtain the guard so the scheduler thread does not
abort; update the binding currently named cache to come from the matched
Ok(guard) or poisoned.into_inner() path and ensure any logging includes context
like "program_cache poisoned in scheduler" and the error details.
In `@test-kit/src/lib.rs`:
- Around line 364-373: The helper replay_transaction currently ignores the
persist parameter by constructing ReplayContext with persist: false; update the
ReplayContext initialization in replay_transaction to use the incoming persist
argument (i.e., persist: persist) so callers can exercise the persisted replay
path; locate the ReplayContext creation in the replay_transaction function and
replace the hardcoded false with the passed-in persist value.
---
Outside diff comments:
In `@magicblock-processor/src/executor/processing.rs`:
- Line 303: Replace the direct `.unwrap()` on the program cache write lock with
explicit error handling: where the code obtains the write lock for the program
cache (the call that currently ends with `.unwrap()`), match on the Result from
the lock acquisition, handle Ok by proceeding with the write, and handle
Err(PoisonError) by either recovering with `err.into_inner()` to continue or by
returning/logging a descriptive Executor error (e.g., an error variant from the
executor's error type) so the poison case does not panic; update the surrounding
function to propagate the error (or log and abort gracefully) instead of calling
`.unwrap()`.
---
Duplicate comments:
In `@magicblock-processor/tests/replica_ordering.rs`:
- Around line 249-251: The two "independent" writes use the same index
calculation causing collisions; change the index computation around the
txs.push(tx_write(...)) call so the two independent cases map to different
accounts (e.g. compute idx_a = (i/4) % independent.len() for the first
independent slot and idx_b = ((i/4) + 1) % independent.len() for the second, or
branch on i % 4 and assign different idx values) and pass the appropriate idx to
tx_write; update the code that references independent, tx_write and the loop
variable i accordingly so the i%4 == 2 and i%4 == 3 cases target distinct
independent indices.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (2)
- `Cargo.lock` is excluded by `!**/*.lock`
- `test-integration/Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (19)
- magicblock-api/src/magic_validator.rs
- magicblock-config/src/config/validator.rs
- magicblock-core/src/link/transactions.rs
- magicblock-ledger/src/blockstore_processor/mod.rs
- magicblock-ledger/src/store/api.rs
- magicblock-ledger/tests/common.rs
- magicblock-ledger/tests/get_block.rs
- magicblock-ledger/tests/test_ledger_truncator.rs
- magicblock-processor/src/executor/mod.rs
- magicblock-processor/src/executor/processing.rs
- magicblock-processor/src/scheduler/coordinator.rs
- magicblock-processor/src/scheduler/mod.rs
- magicblock-processor/src/scheduler/state.rs
- magicblock-processor/src/scheduler/tests.rs
- magicblock-processor/tests/replay.rs
- magicblock-processor/tests/replica_ordering.rs
- test-integration/test-tools/src/toml_to_args.rs
- test-kit/Cargo.toml
- test-kit/src/lib.rs
💤 Files with no reviewable changes (1)
- test-integration/test-tools/src/toml_to_args.rs
Force-pushed from 1642077 to 69f7765 (Compare)
Force-pushed from 26235a1 to b8d302a (Compare)
Co-authored-by: Amp <amp@ampcode.com>
Force-pushed from 40fa0de to ee85613 (Compare)
Force-pushed from b8d302a to 1d366ad (Compare)

Summary
Adds two modes to the transaction scheduler: primary and replica.
These modes affect how state is persisted and which transaction
sources are allowed.
Compatibility
Testing
Checklist
Summary by CodeRabbit
New Features
Tests