
feat: dual mode transaction scheduler #1004

Open
bmuddha wants to merge 7 commits into bmuddha/transaction/original-bincode from bmuddha/scheduler/dual-mode

Conversation

Collaborator

@bmuddha bmuddha commented Feb 26, 2026

Summary

Adds two modes to the transaction scheduler: primary and replica. These
modes affect how state is persisted and which transaction sources
are allowed.

Compatibility

  • There are now two scheduling modes; primary is the default, preserving the previous behavior
  • Config change: the configuration now has an extra field to specify the replication mode
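As a sketch only, the new setting might look like the following (variant names follow the walkthrough; the real config type also derives serde's Deserialize/Serialize, omitted here to keep the example dependency-free):

```rust
// Hypothetical sketch of the ReplicationMode config option.
// The from_config helper is illustrative; the actual crate parses
// this via serde rather than a hand-written function.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ReplicationMode {
    #[default]
    Primary, // default: behaves exactly as before this PR
    Replica,
}

impl ReplicationMode {
    /// Parse a mode from a config string; unknown values yield None.
    pub fn from_config(s: &str) -> Option<Self> {
        match s.to_ascii_lowercase().as_str() {
            "primary" => Some(Self::Primary),
            "replica" => Some(Self::Replica),
            _ => None,
        }
    }
}

fn main() {
    // Omitting the field keeps the pre-PR (primary) behavior.
    assert_eq!(ReplicationMode::default(), ReplicationMode::Primary);
    assert_eq!(
        ReplicationMode::from_config("replica"),
        Some(ReplicationMode::Replica)
    );
}
```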

Testing

  • Added more tests for the replication mode in the scheduler

Checklist

Summary by CodeRabbit

  • New Features

    • Validators now support Primary and Replica roles with runtime mode switching; schedulers can transition modes and will reject transactions invalid for the current mode.
    • Replay operations now accept a persist flag to control whether replays produce side effects.
    • Configuration adds a ReplicationMode setting to choose Primary or Replica.
  • Tests

    • Added extensive replica-mode ordering and coordination tests and helpers, plus test utilities to start in or switch to Primary mode.

Contributor

coderabbitai bot commented Feb 26, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

The PR threads a tokio::sync::Notify-based mode_switcher from MagicValidator into TransactionSchedulerState and TransactionScheduler, and adds a ReplicationMode config option. TransactionProcessingMode::Replay now carries a ReplayContext with a persist flag. The PR also introduces coordinator coordination modes (Primary/Replica) with runtime switching, enforces per-mode transaction acceptance, updates executor and ledger APIs to use IndexedTransaction with explicit per-transaction indices, and adds replica-mode tests and test-kit helpers. Files across validator, processor, ledger, core, tests, and test-kit are updated to propagate these changes.
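The Replay change described above can be pictured with a minimal sketch (payloads other than the persist flag are elided; names follow the walkthrough, not the actual code):

```rust
// Simplified sketch of the reworked processing mode. In the real code
// Execution/Simulation also carry one-shot result senders, and Replay
// carries a fuller ReplayContext; only the persist flag is modeled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionProcessingMode {
    Execution,
    Simulation,
    Replay { persist: bool },
}

/// Whether processing under this mode should produce ledger writes.
pub fn persists_to_ledger(mode: TransactionProcessingMode) -> bool {
    match mode {
        TransactionProcessingMode::Execution => true,
        TransactionProcessingMode::Simulation => false,
        // The new persist flag lets replays apply state without
        // recording anything to the ledger.
        TransactionProcessingMode::Replay { persist } => persist,
    }
}

fn main() {
    assert!(persists_to_ledger(TransactionProcessingMode::Replay { persist: true }));
    assert!(!persists_to_ledger(TransactionProcessingMode::Replay { persist: false }));
}
```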

Assessment against linked issues

Objective | Addressed | Explanation
Scheduler mode enum and runtime mode switch method; runtime mode switching without restart [#958] | |
Replay mode enforces strict ordering [#958] | | Coordinator enforces a single-pending transaction in Replica but I do not see explicit index-based ordering checks comparing incoming transaction indices to a maintained expected sequence.
Non-replay transactions rejected with warning in replay mode [#958] | |
Execution mode behavior unchanged [#958] | |

Out-of-scope changes

Code Change | Explanation
Extensive Ledger API changes: removal of the per-slot transaction index cache and change of write_transaction / write_transaction_status signatures to accept an explicit index and return () (magicblock-ledger/src/store/api.rs and call sites) | Index-management refactor affects the ledger storage API and callers but is orthogonal to the scheduler mode/replay objective; not required by issue #958.
New large integration tests for replica ordering (magicblock-processor/tests/replica_ordering.rs) | Adds comprehensive replica stress tests beyond the minimal acceptance criteria; these are test additions rather than required scheduler implementation changes.

Suggested reviewers

  • GabrielePicco
  • Dodecahedr0x

Collaborator Author

bmuddha commented Feb 26, 2026

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.

This stack of pull requests is managed by Graphite. Learn more about stacking.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
magicblock-core/src/link/transactions.rs (1)

78-89: ⚠️ Potential issue | 🟡 Minor

Fix stale TransactionProcessingMode documentation.

The enum-level comment still says each variant carries a one-shot sender, but Replay(bool) no longer does. This is now misleading for API consumers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-core/src/link/transactions.rs` around lines 78 - 89, The
enum-level doc for TransactionProcessingMode is stale: it claims "each variant
also carries the one-shot sender" but Replay(bool) no longer carries a sender;
update the documentation to accurately state which variants carry result senders
(Simulation and Execution) and that Replay only carries a bool controlling
ledger persistence. Also adjust the Replay variant doc to clearly explain the
bool semantics (true = record to ledger, false = no recording) and remove any
reference to one-shot senders from the enum-level comment.
magicblock-processor/tests/replay.rs (1)

93-113: ⚠️ Potential issue | 🟡 Minor

Validate “no notifications” after replay application is confirmed.

Because Line 94 only confirms enqueue success, the emptiness checks can run too early and miss delayed notifications. Assert post-replay state first, then check channels stay empty over the timeout window.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-processor/tests/replay.rs` around lines 93 - 113, The test
currently checks that notifications channels are empty immediately after
enqueueing a replay (env.replay_transaction(false, txn)) which can race with
delayed notifications; instead, first verify post-replay state by checking
accounts via env.accountsdb.get_account for each pubkey (assert the data byte
equals 42) and only after confirming the replay applied, assert that
env.dispatch.transaction_status.recv_timeout(TIMEOUT) and
env.dispatch.account_update.try_recv() remain empty; reorder the checks so the
loop that inspects account state runs before the channel emptiness assertions
(references: replay_transaction, env.accountsdb.get_account,
env.dispatch.transaction_status, env.dispatch.account_update, TIMEOUT).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@magicblock-config/src/config/validator.rs`:
- Around line 22-29: Add equality derives to the ReplicationMode enum so it can
be compared in tests and config code: update the derive on ReplicationMode to
include PartialEq and Eq alongside Deserialize, Serialize, Debug, Clone; no
other API changes needed because Url already implements PartialEq/Eq, so
consumers can now use ==/!= with ReplicationMode.

In `@magicblock-core/src/link/transactions.rs`:
- Around line 269-275: The replay path is dropping any pre-encoded bytes by
calling txn.sanitize(true) and hardcoding ProcessableTransaction.encoded: None;
update the replay branch to call txn.sanitize_with_encoded(true) (or the
equivalent method that returns WithEncoded) and set
ProcessableTransaction.encoded to the encoded bytes returned by that call so
that TransactionProcessingMode::Replay preserves optional pre-encoded bytes;
adjust the assignment where ProcessableTransaction { transaction, mode, encoded
} is built to use the encoded value from sanitize_with_encoded.

In `@magicblock-processor/src/scheduler/coordinator.rs`:
- Around line 92-99: Replica readiness currently returns true if
pending.is_none(), which allows non-conflicting replays to run concurrently and
breaks primary ordering; change the Replica branch in is_ready (and the same
logic at the other occurrence) to require both pending.is_none() AND "no replay
in flight" (e.g., check m.in_flight_replay/is_empty or m.active_replay_count ==
0 or whatever field tracks current replay activity) so only a single replay can
be active at once; update Replica's readiness checks to reference the concrete
field names (pending and the in-flight/active replay tracker) in
ReplicaMode/Replica struct to enforce strict single‑flight replay ordering.
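A minimal sketch of the single-flight readiness check suggested above, with assumed field names (pending, in_flight_replays) standing in for the coordinator's actual replay tracker:

```rust
// Sketch of strict single-flight replica readiness. Field names are
// assumptions for illustration, not the actual coordinator fields.
struct ReplicaMode {
    pending: Option<u64>,     // queued replay awaiting dispatch, if any
    in_flight_replays: usize, // replays currently executing
}

impl ReplicaMode {
    // Ready only when nothing is queued AND nothing is executing, so
    // at most one replay can be active at a time, preserving the
    // primary's ordering.
    fn is_ready(&self) -> bool {
        self.pending.is_none() && self.in_flight_replays == 0
    }
}

fn main() {
    let busy = ReplicaMode { pending: None, in_flight_replays: 1 };
    assert!(!busy.is_ready()); // an in-flight replay must finish first
    let idle = ReplicaMode { pending: None, in_flight_replays: 0 };
    assert!(idle.is_ready());
}
```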
- Around line 217-219: The decrement of
CoordinationMode::Primary.p.blocked_txn_count using p.blocked_txn_count -=
txn.is_some() as usize can underflow after a Replica→Primary switch; change the
decrement to be safe by either checking p.blocked_txn_count > 0 before
subtracting or using a saturating/checked subtraction (e.g., saturating_sub)
whenever blocked_txn_count is modified (the same fix should be applied to the
other occurrences in the same block/branch around the 228–239 region) so that
blocked_txn_count never wraps below zero.
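The underflow-safe decrement can be sketched in isolation (the struct and method names here are illustrative only):

```rust
// Sketch of the underflow-safe decrement. blocked_txn_count is a
// usize, so a bare `-=` at zero panics in debug builds and wraps in
// release builds; saturating_sub clamps at zero instead.
struct PrimaryMode {
    blocked_txn_count: usize,
}

impl PrimaryMode {
    fn on_unblocked(&mut self, had_txn: bool) {
        self.blocked_txn_count =
            self.blocked_txn_count.saturating_sub(had_txn as usize);
    }
}

fn main() {
    // The counter can legitimately be zero right after a
    // Replica→Primary switch; the decrement must not wrap.
    let mut mode = PrimaryMode { blocked_txn_count: 0 };
    mode.on_unblocked(true);
    assert_eq!(mode.blocked_txn_count, 0);

    mode.blocked_txn_count = 2;
    mode.on_unblocked(true);
    assert_eq!(mode.blocked_txn_count, 1);
}
```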
- Around line 245-255: The match in is_transaction_allowed currently blocks
Execution in Replica mode but still allows Simulation; tighten the Replica
branch so non-replay modes are rejected by matching both Execution and
Simulation. Update the match arm for Replica to something like matching
(Replica(_), Execution(_) | Simulation(_)) => false so Replica(_) only permits
Replay modes, leaving the Primary/Replay rule unchanged; reference function
is_transaction_allowed, CoordinationMode::Replica, and
TransactionProcessingMode::{Execution, Simulation}.
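A sketch of the tightened acceptance matrix (payloads elided; the Primary-rejects-Replay arm is assumed from the comment above, not taken from the code):

```rust
// Sketch: Replica rejects both Execution and Simulation, accepting
// only Replay; the assumed Primary-rejects-Replay rule is kept.
#[derive(Clone, Copy)]
enum CoordinationMode { Primary, Replica }

#[derive(Clone, Copy)]
enum TransactionProcessingMode { Execution, Simulation, Replay }

fn is_transaction_allowed(
    coord: CoordinationMode,
    txn_mode: TransactionProcessingMode,
) -> bool {
    use CoordinationMode::*;
    use TransactionProcessingMode::*;
    match (coord, txn_mode) {
        // Replica only replays the primary's transaction stream.
        (Replica, Execution | Simulation) => false,
        // Primary never accepts replayed transactions.
        (Primary, Replay) => false,
        _ => true,
    }
}

fn main() {
    assert!(!is_transaction_allowed(
        CoordinationMode::Replica,
        TransactionProcessingMode::Simulation
    ));
    assert!(is_transaction_allowed(
        CoordinationMode::Replica,
        TransactionProcessingMode::Replay
    ));
}
```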

In `@magicblock-processor/tests/replica_ordering.rs`:
- Around line 120-140: The helper submit_all_and_start currently spawns
concurrent tasks that call scheduler.replay, making send order nondeterministic
and risking deadlock on bounded channels; change it to submit sequentially by
iterating txs and awaiting scheduler.replay(true, tx).await directly (no
tokio::spawn) so the order matches sigs, and if the scheduler consumes via
run_scheduler(), ensure run_scheduler() is started before or concurrently with
submission to avoid blocking—refer to submit_all_and_start and scheduler.replay
to locate and update the code.

---

Outside diff comments:
In `@magicblock-core/src/link/transactions.rs`:
- Around line 78-89: The enum-level doc for TransactionProcessingMode is stale:
it claims "each variant also carries the one-shot sender" but Replay(bool) no
longer carries a sender; update the documentation to accurately state which
variants carry result senders (Simulation and Execution) and that Replay only
carries a bool controlling ledger persistence. Also adjust the Replay variant
doc to clearly explain the bool semantics (true = record to ledger, false = no
recording) and remove any reference to one-shot senders from the enum-level
comment.

In `@magicblock-processor/tests/replay.rs`:
- Around line 93-113: The test currently checks that notifications channels are
empty immediately after enqueueing a replay (env.replay_transaction(false, txn))
which can race with delayed notifications; instead, first verify post-replay
state by checking accounts via env.accountsdb.get_account for each pubkey
(assert the data byte equals 42) and only after confirming the replay applied,
assert that env.dispatch.transaction_status.recv_timeout(TIMEOUT) and
env.dispatch.account_update.try_recv() remain empty; reorder the checks so the
loop that inspects account state runs before the channel emptiness assertions
(references: replay_transaction, env.accountsdb.get_account,
env.dispatch.transaction_status, env.dispatch.account_update, TIMEOUT).

ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9edc6c4 and 5237fb4.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (14)
  • magicblock-api/src/magic_validator.rs
  • magicblock-config/src/config/validator.rs
  • magicblock-core/src/link/transactions.rs
  • magicblock-ledger/src/blockstore_processor/mod.rs
  • magicblock-processor/src/executor/mod.rs
  • magicblock-processor/src/executor/processing.rs
  • magicblock-processor/src/scheduler/coordinator.rs
  • magicblock-processor/src/scheduler/mod.rs
  • magicblock-processor/src/scheduler/state.rs
  • magicblock-processor/src/scheduler/tests.rs
  • magicblock-processor/tests/replay.rs
  • magicblock-processor/tests/replica_ordering.rs
  • test-kit/Cargo.toml
  • test-kit/src/lib.rs

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (3)
magicblock-processor/tests/replica_ordering.rs (1)

120-136: ⚠️ Potential issue | 🟠 Major

Potential pre-start enqueue hang in submit_all_and_start.

On Line 127-Line 132, all replay() sends are awaited before run_scheduler() (Line 135). If replay uses a bounded queue and the consumer starts only after run_scheduler(), this can block the helper under stress.

Suggested adjustment
 async fn submit_all_and_start(
     env: &mut ExecutionTestEnv,
     txs: Vec<Transaction>,
 ) -> Vec<Signature> {
     let sigs: Vec<Signature> = txs.iter().map(|tx| tx.signatures[0]).collect();

-    // Submit all transactions sequentially to preserve order
+    // Start consumer first to avoid potential bounded-queue backpressure deadlock.
+    env.run_scheduler();
+    env.advance_slot();
+
+    // Submit sequentially to preserve deterministic order.
     for tx in txs {
         env.transaction_scheduler
             .replay(true, tx)
             .await
             .expect("Failed to submit transaction");
     }
-
-    // Now start the scheduler
-    env.run_scheduler();
-    env.advance_slot();

     sigs
 }
#!/bin/bash
set -euo pipefail

# Verify whether replay submission can block before scheduler startup.

# 1) Locate replay method definitions and inspect enqueue behavior.
ast-grep --pattern $'async fn replay($$$) -> $_ { $$$ }' || true
ast-grep --pattern $'fn replay($$$) -> $_ { $$$ }' || true

# 2) Find channel types/capacities used by scheduler/dispatch paths.
rg -n -C3 --type rust 'async_channel::bounded|async_channel::unbounded|tokio::sync::mpsc::channel|crossbeam_channel::bounded|crossbeam_channel::unbounded|flume::bounded|flume::unbounded'

# 3) Check where scheduler consumer is started relative to replay calls.
rg -n -C3 --type rust '\brun_scheduler\s*\('
rg -n -C3 --type rust '\.replay\s*\('

Expected verification outcome: if replay ultimately sends to a bounded channel and consumer activation occurs in/after run_scheduler(), this helper pattern remains vulnerable to blocking.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-processor/tests/replica_ordering.rs` around lines 120 - 136, The
helper submit_all_and_start awaits transaction_scheduler.replay for each tx
before calling env.run_scheduler, which can block if replay enqueues onto a
bounded queue; fix by starting the consumer before flooding the queue or making
enqueues non-blocking: either call env.run_scheduler() (and env.advance_slot()
if needed) prior to looping over txs, or submit replays without awaiting (spawn
each replay future or use a try_send-style non-blocking API) so that
transaction_scheduler.replay calls cannot backpressure the test helper; update
submit_all_and_start accordingly to use run_scheduler or non-blocking replay
submission.
magicblock-processor/src/scheduler/coordinator.rs (1)

234-238: ⚠️ Potential issue | 🟠 Major

Initialize blocked_txn_count from existing queues during Replica→Primary switch.

Line 235 sets blocked_txn_count to 0. If any blocked entries exist at switch time, Primary backpressure accounting starts undercounted.

Suggested fix
         let mode = PrimaryMode {
-            blocked_txn_count: 0,
+            blocked_txn_count: self
+                .blocked_transactions
+                .iter()
+                .map(|queue| queue.len())
+                .sum(),
             max_blocked_txn: self.blocked_transactions.len()
                 * BLOCKED_TXN_MULTIPLIER,
         };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-processor/src/scheduler/coordinator.rs` around lines 234 - 238,
When constructing PrimaryMode during the Replica→Primary transition, initialize
blocked_txn_count from the existing blocked queue instead of zero: set
PrimaryMode.blocked_txn_count to reflect the current number of blocked entries
(e.g. based on self.blocked_transactions.len() or the appropriate aggregated
count of entries) so backpressure accounting starts correctly; leave
max_blocked_txn computed with BLOCKED_TXN_MULTIPLIER as-is. Ensure you update
the PrimaryMode initialization where PrimaryMode { blocked_txn_count: ...,
max_blocked_txn: ... } is created.
magicblock-core/src/link/transactions.rs (1)

273-279: 🧹 Nitpick | 🔵 Trivial

Preserve pre-encoded bytes in replay() submissions.

Line 274 uses sanitize(true) and line 278 hardcodes encoded: None, so replay drops WithEncoded<T> bytes and forces redundant serialization later.

Suggested fix
     pub async fn replay(
         &self,
         persist: bool,
         txn: impl SanitizeableTransaction,
     ) -> TransactionResult {
         let mode = TransactionProcessingMode::Replay(persist);
-        let transaction = txn.sanitize(true)?;
+        let (transaction, encoded) = txn.sanitize_with_encoded(true)?;
         let txn = ProcessableTransaction {
             transaction,
             mode,
-            encoded: None,
+            encoded,
         };
         self.0
             .send(txn)
             .await
             .map_err(|_| TransactionError::ClusterMaintenance)
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-core/src/link/transactions.rs` around lines 273 - 279, The replay
branch currently calls txn.sanitize(true) which strips pre-encoded bytes and
then hardcodes encoded: None on the new ProcessableTransaction, causing replay
to drop WithEncoded<T> bytes; instead preserve any existing encoded bytes by
calling sanitize in a way that does not remove the encoded payload (e.g.,
txn.sanitize(false)) and set ProcessableTransaction.encoded from the original
txn's encoded field (or clone/retain it via the original WithEncoded<T>
accessor) when creating the ProcessableTransaction for
TransactionProcessingMode::Replay so redundant serialization is avoided.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@magicblock-processor/src/scheduler/coordinator.rs`:
- Around line 242-256: The doc comment for is_transaction_allowed incorrectly
states that Primary accepts only Execution and Replica accepts only Replay, but
the match implementation also permits Simulation in both modes; update either
the comment or the logic to match: either revise the comment above
is_transaction_allowed to explicitly mention that Simulation transactions are
allowed in both CoordinationMode::Primary and CoordinationMode::Replica, or
change the match arms in is_transaction_allowed (referencing
CoordinationMode::Primary, CoordinationMode::Replica and
TransactionProcessingMode::{Execution, Replay, Simulation}) to disallow
Simulation where appropriate so policy and code are consistent.

In `@magicblock-processor/tests/replica_ordering.rs`:
- Around line 237-246: The "Independent" branch reuses the same independent
account for both non-conflict transactions in each 4-item batch, causing
unintended conflicts; in the loop (for i in 0..count) where tx_write is called
for group_a/group_b/independent, change how you compute idx for independent so
the two independent branches in a batch select distinct accounts (e.g., derive
idx from the batch number rather than i directly: use batch = i/4 and an offset
for the first vs second independent slot, or multiply batch by 2 and add 0/1
based on (i % 4) to pick different independent[(batch*2 + offset) %
independent.len()]); update the selection logic used in the tx_write calls so
independent accesses are unique per non-conflict transaction.

---

Duplicate comments:
In `@magicblock-core/src/link/transactions.rs`:
- Around line 273-279: The replay branch currently calls txn.sanitize(true)
which strips pre-encoded bytes and then hardcodes encoded: None on the new
ProcessableTransaction, causing replay to drop WithEncoded<T> bytes; instead
preserve any existing encoded bytes by calling sanitize in a way that does not
remove the encoded payload (e.g., txn.sanitize(false)) and set
ProcessableTransaction.encoded from the original txn's encoded field (or
clone/retain it via the original WithEncoded<T> accessor) when creating the
ProcessableTransaction for TransactionProcessingMode::Replay so redundant
serialization is avoided.

In `@magicblock-processor/src/scheduler/coordinator.rs`:
- Around line 234-238: When constructing PrimaryMode during the Replica→Primary
transition, initialize blocked_txn_count from the existing blocked queue instead
of zero: set PrimaryMode.blocked_txn_count to reflect the current number of
blocked entries (e.g. based on self.blocked_transactions.len() or the
appropriate aggregated count of entries) so backpressure accounting starts
correctly; leave max_blocked_txn computed with BLOCKED_TXN_MULTIPLIER as-is.
Ensure you update the PrimaryMode initialization where PrimaryMode {
blocked_txn_count: ..., max_blocked_txn: ... } is created.

In `@magicblock-processor/tests/replica_ordering.rs`:
- Around line 120-136: The helper submit_all_and_start awaits
transaction_scheduler.replay for each tx before calling env.run_scheduler, which
can block if replay enqueues onto a bounded queue; fix by starting the consumer
before flooding the queue or making enqueues non-blocking: either call
env.run_scheduler() (and env.advance_slot() if needed) prior to looping over
txs, or submit replays without awaiting (spawn each replay future or use a
try_send-style non-blocking API) so that transaction_scheduler.replay calls
cannot backpressure the test helper; update submit_all_and_start accordingly to
use run_scheduler or non-blocking replay submission.

ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5237fb4 and ce34538.

📒 Files selected for processing (3)
  • magicblock-core/src/link/transactions.rs
  • magicblock-processor/src/scheduler/coordinator.rs
  • magicblock-processor/tests/replica_ordering.rs

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@magicblock-processor/src/scheduler/coordinator.rs`:
- Around line 234-235: The initialization of PrimaryMode with blocked_txn_count:
0 assumes all executor blocked queues are empty at the Replica→Primary
transition; update this to be defensive by either (a) computing
blocked_txn_count from the actual queue lengths across executors (use the
executor/queue APIs available in the coordinator to sum blocked counts) or (b)
adding a runtime assertion that verifies all blocked queues are empty before
constructing PrimaryMode, or (c) expanding the doc comment on the transition
function to explicitly state the empty-queue invariant; reference PrimaryMode
and blocked_txn_count and the coordinator's executor queue accessors when making
the change so the check or computed initialization is placed at the mode-switch
site right after ledger replay completes.

ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ce34538 and 1acdff2.

📒 Files selected for processing (1)
  • magicblock-processor/src/scheduler/coordinator.rs

@bmuddha bmuddha force-pushed the bmuddha/scheduler/dual-mode branch from 1acdff2 to 6e3a51e on March 1, 2026 17:18
@bmuddha bmuddha force-pushed the bmuddha/transaction/original-bincode branch from 9edc6c4 to 378e32d on March 1, 2026 17:18
@bmuddha bmuddha force-pushed the bmuddha/scheduler/dual-mode branch from 6e3a51e to 936c1c0 on March 2, 2026 10:41
@bmuddha bmuddha force-pushed the bmuddha/transaction/original-bincode branch from 378e32d to 618b99c on March 2, 2026 10:41
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
magicblock-processor/src/executor/processing.rs (1)

55-70: ⚠️ Potential issue | 🟠 Major

persist = Some(false) still triggers failure persistence side effects.

On error, execute() always routes to handle_failure(), and handle_failure() unconditionally records failure to ledger. That violates the stated replay-without-persist contract.

Suggested fix
-    fn handle_failure(
+    fn handle_failure(
         &self,
         txn: SanitizedTransaction,
         err: TransactionError,
         logs: Option<Vec<String>>,
         tx: TxnExecutionResultTx,
+        persist: Option<bool>,
     ) {
         FAILED_TRANSACTIONS_COUNT.inc();
-        self.record_failure(txn, Err(err.clone()), logs);
+        if persist.unwrap_or(true) {
+            self.record_failure(txn, Err(err.clone()), logs);
+        }

         // Even on failure, ensure stash is clear (though likely empty if load failed).
         ExecutionTlsStash::clear();

         if let Some(tx) = tx {
             let _ = tx.send(Err(err));
         }
     }
-                return self.handle_failure(txn, err, None, tx);
+                return self.handle_failure(txn, err, None, tx, persist);
...
-            return self.handle_failure(
+            return self.handle_failure(
                 txn,
                 TransactionError::CommitCancelled,
                 Some(vec![err.to_string()]),
                 tx,
+                persist,
             );

Also applies to: 163-172

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-processor/src/executor/processing.rs` around lines 55 - 70, The
commit error path currently always calls handle_failure which records the
failure to the ledger; change the logic in execute() around the commit_accounts
call to respect the persist flag: use the existing notify/persist check
(persist.is_none()) and if persist == Some(false) (i.e. notify == false) do NOT
invoke handle_failure that writes to ledger—return or propagate a non-persistent
error instead (or call a non-persistent failure helper) so no ledger write
occurs; apply the same change for the other similar block around lines 163-172.
Refer to commit_accounts, execute(), handle_failure, persist and notify to
locate and fix the branches.
magicblock-processor/tests/replay.rs (1)

105-106: ⚠️ Potential issue | 🟡 Minor

Use a timed receive for the negative account-update assertion.

At Line 105, try_recv() can pass before a delayed update arrives, which makes this check flaky under load. Use recv_timeout(TIMEOUT) like the status assertion.

Suggested patch
     assert!(
-        env.dispatch.account_update.try_recv().is_err(),
+        env.dispatch.account_update.recv_timeout(TIMEOUT).is_err(),
         "Replay should NOT broadcast account updates"
     );
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-processor/tests/replay.rs` around lines 105 - 106, The negative
assertion currently uses env.dispatch.account_update.try_recv() which can race
with delayed updates; replace it with a timed receive using
env.dispatch.account_update.recv_timeout(TIMEOUT) (the same TIMEOUT used for
status) and assert that this call returns a timeout (is_err/is_timeout) so the
test is no longer flaky under load; update the assertion near the existing
status assertion in replay.rs to use recv_timeout on the account_update
receiver.
magicblock-processor/src/scheduler/mod.rs (1)

131-137: ⚠️ Potential issue | 🔴 Critical

Pin the Notified future to avoid losing the Primary-mode switch signal.

At line 135, self.mode_switcher.notified() is recreated every loop iteration inside select!. With Tokio Notify, a wake-up from notify_one() can be lost if the Notified future is not yet polled when the notification occurs. Since the select! drops losing branches (including the notified() future) each iteration, a registration-timing window opens where notify_one() calls can pass unobserved, potentially leaving the scheduler stuck in Replica mode.

Suggested patch
 async fn run(mut self) {
     let mut block_produced = self.latest_block.subscribe();
+    let mut mode_switch_notified = self.mode_switcher.notified();
+    tokio::pin!(mode_switch_notified);
+    let mut switched_to_primary = false;
     loop {
         tokio::select! {
             biased;
             Ok(()) = block_produced.recv() => self.transition_to_new_slot(),
             Some(executor) = self.ready_rx.recv() => self.handle_ready_executor(executor),
-            _ = self.mode_switcher.notified() => {
+            _ = &mut mode_switch_notified, if !switched_to_primary => {
                 self.coordinator.switch_to_primary_mode();
+                switched_to_primary = true;
             }
             Some(txn) = self.transactions_rx.recv(), if self.coordinator.is_ready() => {
                 self.handle_new_transaction(txn);
             }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-processor/src/scheduler/mod.rs` around lines 131 - 137, The
Notified future is created inside the tokio::select! each iteration and can be
lost between polls; fix by creating and pinning a single Notified future outside
the loop (e.g., let mut mode_switch_fut =
Box::pin(self.mode_switcher.notified())) and use mode_switch_fut.as_mut() in the
select! branch instead of recreating self.mode_switcher.notified() each time,
then after handling the notification (where you call
self.coordinator.switch_to_primary_mode()) recreate/re-pin a fresh notified
future so future notifications are not missed; apply this change where the loop
invokes tokio::select! alongside self.transition_to_new_slot() and
self.handle_ready_executor().
♻️ Duplicate comments (2)
magicblock-core/src/link/transactions.rs (1)

273-279: 🧹 Nitpick | 🔵 Trivial

Replay path still discards optional encoded bytes.

replay() still sanitizes via sanitize(true) and hardcodes encoded: None, so WithEncoded<T> bytes are dropped on replay.

Proposed adjustment
     pub async fn replay(
         &self,
         persist: bool,
         txn: impl SanitizeableTransaction,
     ) -> TransactionResult {
         let mode = TransactionProcessingMode::Replay(persist);
-        let transaction = txn.sanitize(true)?;
+        let (transaction, encoded) = txn.sanitize_with_encoded(true)?;
         let txn = ProcessableTransaction {
             transaction,
             mode,
-            encoded: None,
+            encoded,
         };
         self.0
             .send(txn)
             .await
             .map_err(|_| TransactionError::ClusterMaintenance)
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-core/src/link/transactions.rs` around lines 273 - 279, The replay
path drops WithEncoded<T> bytes because it calls txn.sanitize(true) and always
sets ProcessableTransaction.encoded = None; instead, preserve and pass through
the optional encoded bytes: do not force removal in replay (call sanitize(false)
or otherwise avoid stripping encoded) and set ProcessableTransaction.encoded to
the original txn's encoded bytes (or capture txn.encoded before sanitization) in
the replay branch (the code constructing ProcessableTransaction in replay()).
Ensure changes reference replay(), sanitize(...), ProcessableTransaction, and
WithEncoded<T> so encoded data is retained.
magicblock-processor/tests/replica_ordering.rs (1)

243-245: ⚠️ Potential issue | 🟡 Minor

Independent branch still reuses the same account twice per 4-item batch.

let idx = (i / 4) % independent.len() maps both i % 4 == 2 and i % 4 == 3 to the same index, so those two transactions are not independent as intended.

Suggested patch
     let mut txs = Vec::with_capacity(count);
+    let mut independent_idx = 0usize;
     for i in 0..count {
         env.advance_slot();
         match i % 4 {
             0 => txs.push(tx_write(&mut env, group_a[0], i as u8)),
             1 => txs.push(tx_write(&mut env, group_b[0], i as u8)),
             _ => {
                 // Independent writes to unique accounts
-                let idx = (i / 4) % independent.len();
+                let idx = independent_idx % independent.len();
+                independent_idx += 1;
                 txs.push(tx_write(&mut env, independent[idx], i as u8));
             }
         }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-processor/tests/replica_ordering.rs` around lines 243 - 245, The
index calculation for independent writes is wrong: replace the current idx = (i
/ 4) % independent.len() with an expression that uses the position within the
4-item batch (e.g., let batch_pos = i % 4; let idx = batch_pos %
independent.len()) so each of the four transactions in the batch maps to
different entries in independent; update the code around txs.push(tx_write(&mut
env, independent[idx], i as u8)) accordingly to use the new idx computation.
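The counter-based fix in the suggested patch above can be checked in isolation. This sketch replays the loop's index logic with a running `independent_idx` (the variable name from the patch; `len` stands in for `independent.len()`) and verifies that slots 2 and 3 of each 4-item batch no longer land on the same account.

```rust
// Reproduce the patched index computation: a counter that advances on every
// "independent" slot, instead of the broken (i / 4) % len shared by both slots.
fn fixed_indices(count: usize, len: usize) -> Vec<(usize, usize)> {
    let mut independent_idx = 0usize;
    let mut out = Vec::new(); // (transaction number, account index)
    for i in 0..count {
        if i % 4 >= 2 {
            let idx = independent_idx % len;
            independent_idx += 1;
            out.push((i, idx));
        }
    }
    out
}

fn main() {
    let len = 8;
    let picks = fixed_indices(32, len);
    // Within each batch, the two independent slots hit distinct accounts.
    for pair in picks.chunks(2) {
        assert_ne!(pair[0].1, pair[1].1, "batch reused an independent account");
    }
    // The broken formula maps both slots of a batch to one index.
    let broken = |i: usize| (i / 4) % len;
    assert_eq!(broken(2), broken(3));
    println!("counter-based indices keep batch slots distinct");
}
```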
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@test-integration/test-tools/src/toml_to_args.rs`:
- Around line 106-107: The loop currently uses entries.flatten() which silently
drops Err values from read_dir; instead iterate raw entries (for entry in
entries) and pattern-match each Result so that Ok(entry) prints the file name
and Err(err) logs the per-entry error (including the path/context) via eprintln!
(or the existing logger) to surface filesystem failures; update the loop around
the entries variable in toml_to_args.rs to handle Result::Err cases rather than
flattening them away.

---

Outside diff comments:
In `@magicblock-processor/src/executor/processing.rs`:
- Around line 55-70: The commit error path currently always calls handle_failure
which records the failure to the ledger; change the logic in execute() around
the commit_accounts call to respect the persist flag: use the existing
notify/persist check (persist.is_none()) and if persist == Some(false) (i.e.
notify == false) do NOT invoke handle_failure that writes to ledger—return or
propagate a non-persistent error instead (or call a non-persistent failure
helper) so no ledger write occurs; apply the same change for the other similar
block around lines 163-172. Refer to commit_accounts, execute(), handle_failure,
persist and notify to locate and fix the branches.


In `@magicblock-processor/tests/replay.rs`:
- Around line 105-106: The negative assertion currently uses
env.dispatch.account_update.try_recv() which can race with delayed updates;
replace it with a timed receive using
env.dispatch.account_update.recv_timeout(TIMEOUT) (the same TIMEOUT used for
status) and assert that this call returns a timeout (is_err/is_timeout) so the
test is no longer flaky under load; update the assertion near the existing
status assertion in replay.rs to use recv_timeout on the account_update
receiver.
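The timed negative assertion can be sketched with a std `mpsc` channel standing in for `env.dispatch.account_update` (the real test may use a different channel type). A plain `try_recv()` can pass before a late update lands; waiting out the same `TIMEOUT` used for the status check gives stray messages a chance to surface before asserting.

```rust
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    const TIMEOUT: Duration = Duration::from_millis(50);
    let (update_tx, update_rx) = mpsc::channel::<u64>();

    // No update is ever sent for this transaction, so the receive must
    // time out rather than observe a message.
    assert!(update_rx.recv_timeout(TIMEOUT).is_err());

    drop(update_tx);
    println!("no account update within {TIMEOUT:?}");
}
```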


ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1acdff2 and 936c1c0.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • test-integration/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (15)
  • magicblock-api/src/magic_validator.rs
  • magicblock-config/src/config/validator.rs
  • magicblock-core/src/link/transactions.rs
  • magicblock-ledger/src/blockstore_processor/mod.rs
  • magicblock-processor/src/executor/mod.rs
  • magicblock-processor/src/executor/processing.rs
  • magicblock-processor/src/scheduler/coordinator.rs
  • magicblock-processor/src/scheduler/mod.rs
  • magicblock-processor/src/scheduler/state.rs
  • magicblock-processor/src/scheduler/tests.rs
  • magicblock-processor/tests/replay.rs
  • magicblock-processor/tests/replica_ordering.rs
  • test-integration/test-tools/src/toml_to_args.rs
  • test-kit/Cargo.toml
  • test-kit/src/lib.rs

@bmuddha bmuddha force-pushed the bmuddha/transaction/original-bincode branch from 618b99c to 26235a1 Compare March 3, 2026 18:28
@bmuddha bmuddha force-pushed the bmuddha/scheduler/dual-mode branch from 936c1c0 to 1642077 Compare March 3, 2026 18:28

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
magicblock-processor/src/executor/processing.rs (1)

303-303: ⚠️ Potential issue | 🟠 Major

Remove .unwrap() from program cache write lock path

Line 303 uses .unwrap() in production execution. A poisoned lock will panic and abort processing; this should be handled explicitly with error handling/recovery.

As per coding guidelines Treat any usage of .unwrap() or .expect() in production Rust code as a MAJOR issue.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-processor/src/executor/processing.rs` at line 303, Replace the
direct `.unwrap()` on the program cache write lock with explicit error handling:
where the code obtains the write lock for the program cache (the call that
currently ends with `.unwrap()`), match on the Result from the lock acquisition,
handle Ok by proceeding with the write, and handle Err(PoisonError) by either
recovering with `err.into_inner()` to continue or by returning/logging a
descriptive Executor error (e.g., an error variant from the executor's error
type) so the poison case does not panic; update the surrounding function to
propagate the error (or log and abort gracefully) instead of calling
`.unwrap()`.
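The recovery path this prompt asks for can be sketched with a std `RwLock`. `write_cache` is an illustrative stand-in for the scheduler's program-cache write, not the real API: instead of `.unwrap()`, it matches the lock result and recovers a poisoned guard via `PoisonError::into_inner()` after logging.

```rust
use std::sync::RwLock;
use std::thread;

// Write through the lock, recovering instead of panicking on poison.
fn write_cache(cache: &RwLock<Vec<u32>>, value: u32) {
    let mut guard = match cache.write() {
        Ok(guard) => guard,
        Err(poisoned) => {
            eprintln!("program_cache poisoned in scheduler, recovering");
            poisoned.into_inner()
        }
    };
    guard.push(value);
}

fn main() {
    let cache = RwLock::new(Vec::new());
    // Poison the lock by panicking while a writer holds the guard.
    let _ = thread::scope(|s| {
        s.spawn(|| {
            let _guard = cache.write().unwrap();
            panic!("simulated executor panic");
        })
        .join()
    });
    // A bare `.unwrap()` here would propagate the poison and abort the
    // scheduler thread; the explicit match keeps processing alive.
    write_cache(&cache, 42);
    let contents = cache.read().unwrap_or_else(|p| p.into_inner());
    assert_eq!(*contents, vec![42]);
    println!("recovered from poisoned lock: {:?}", *contents);
}
```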
♻️ Duplicate comments (1)
magicblock-processor/tests/replica_ordering.rs (1)

249-251: ⚠️ Potential issue | 🟡 Minor

Independent branch still introduces avoidable conflicts.

At Line 250, both “independent” transactions in each batch (i % 4 == 2 and i % 4 == 3) map to the same account index, so they are not actually independent.

♻️ Suggested fix
-                let idx = (i / 4) % independent.len();
+                let batch = i / 4;
+                let offset = if i % 4 == 2 { 0 } else { 1 };
+                let idx = (batch * 2 + offset) % independent.len();
                 txs.push(tx_write(&mut env, independent[idx], i as u8));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-processor/tests/replica_ordering.rs` around lines 249 - 251, The
two "independent" writes use the same index calculation causing collisions;
change the index computation around the txs.push(tx_write(...)) call so the two
independent cases map to different accounts (e.g. compute idx_a = (i/4) %
independent.len() for the first independent slot and idx_b = ((i/4) + 1) %
independent.len() for the second, or branch on i % 4 and assign different idx
values) and pass the appropriate idx to tx_write; update the code that
references independent, tx_write and the loop variable i accordingly so the i%4
== 2 and i%4 == 3 cases target distinct independent indices.
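The `batch * 2 + offset` variant from the suggested fix above can be verified in isolation; `len` stands in for `independent.len()`. Because the formula advances two accounts per batch, the two "independent" slots of a batch map to consecutive indices and never collide for any list longer than one entry.

```rust
// Patched index computation for the i % 4 == 2 and i % 4 == 3 slots.
fn independent_index(i: usize, len: usize) -> usize {
    let batch = i / 4;
    let offset = if i % 4 == 2 { 0 } else { 1 };
    (batch * 2 + offset) % len
}

fn main() {
    let len = 8; // stand-in for independent.len()
    for batch in 0..16 {
        let a = independent_index(batch * 4 + 2, len);
        let b = independent_index(batch * 4 + 3, len);
        assert_ne!(a, b, "batch {batch} reused the same independent account");
    }
    println!("fixed formula keeps batch slots distinct");
}
```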
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@magicblock-processor/src/executor/processing.rs`:
- Around line 93-95: The persist flag is inconsistently applied: currently
record_transaction is gated by persist but failure recording and status
notifications are always executed; update the code paths so every side-effect
respects persist.unwrap_or(true) — i.e., wrap or gate calls to
self.record_transaction(...), self.record_failure(...) (the failure-recording
call around lines ~190), and the transaction status notification emitter (the
emit/notify call around ~286) with the same persist check used at lines 93-95;
make the same change in the other two locations mentioned (around 174-191 and
258-287) so persist consistently suppresses ledger writes, failure records, and
notifications.
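The gating this prompt asks for can be sketched with hypothetical helper names (the real code lives in `executor/processing.rs`). The point is that ledger records and status notifications all consult the same `persist.unwrap_or(true)` default, so a non-persistent replay produces no side effects at all.

```rust
struct Executor {
    ledger: Vec<String>,
    notifications: Vec<String>,
}

impl Executor {
    // Commit a transaction outcome, honoring the persist flag uniformly.
    fn commit(&mut self, txn: &str, failed: bool, persist: Option<bool>) {
        let persist = persist.unwrap_or(true); // live traffic always persists
        if !persist {
            // Non-persistent replay: surface the outcome to the caller only,
            // with no ledger write and no status notification.
            return;
        }
        if failed {
            self.ledger.push(format!("failure: {txn}"));
        } else {
            self.ledger.push(format!("ok: {txn}"));
        }
        self.notifications.push(txn.to_string());
    }
}

fn main() {
    let mut exec = Executor { ledger: Vec::new(), notifications: Vec::new() };
    exec.commit("t1", false, None);        // live path: recorded + notified
    exec.commit("t2", true, Some(false));  // replay failure: no side effects
    exec.commit("t3", false, Some(false)); // replay success: no side effects
    assert_eq!(exec.ledger, vec!["ok: t1".to_string()]);
    assert_eq!(exec.notifications, vec!["t1".to_string()]);
    println!("ledger: {:?}", exec.ledger);
}
```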

In `@magicblock-processor/src/scheduler/mod.rs`:
- Line 231: The current line uses self.program_cache.write().unwrap(), which
will panic on a poisoned lock; change it to explicitly handle the PoisonError by
matching the result of self.program_cache.write() and on Err(poisoned) log the
poison event (e.g., using error!/warn!/trace!) and recover by calling
poisoned.into_inner() to obtain the guard so the scheduler thread does not
abort; update the binding currently named cache to come from the matched
Ok(guard) or poisoned.into_inner() path and ensure any logging includes context
like "program_cache poisoned in scheduler" and the error details.

In `@test-kit/src/lib.rs`:
- Around line 364-373: The helper replay_transaction currently ignores the
persist parameter by constructing ReplayContext with persist: false; update the
ReplayContext initialization in replay_transaction to use the incoming persist
argument (i.e., persist: persist) so callers can exercise the persisted replay
path; locate the ReplayContext creation in the replay_transaction function and
replace the hardcoded false with the passed-in persist value.


ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 936c1c0 and 1642077.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • test-integration/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (19)
  • magicblock-api/src/magic_validator.rs
  • magicblock-config/src/config/validator.rs
  • magicblock-core/src/link/transactions.rs
  • magicblock-ledger/src/blockstore_processor/mod.rs
  • magicblock-ledger/src/store/api.rs
  • magicblock-ledger/tests/common.rs
  • magicblock-ledger/tests/get_block.rs
  • magicblock-ledger/tests/test_ledger_truncator.rs
  • magicblock-processor/src/executor/mod.rs
  • magicblock-processor/src/executor/processing.rs
  • magicblock-processor/src/scheduler/coordinator.rs
  • magicblock-processor/src/scheduler/mod.rs
  • magicblock-processor/src/scheduler/state.rs
  • magicblock-processor/src/scheduler/tests.rs
  • magicblock-processor/tests/replay.rs
  • magicblock-processor/tests/replica_ordering.rs
  • test-integration/test-tools/src/toml_to_args.rs
  • test-kit/Cargo.toml
  • test-kit/src/lib.rs
💤 Files with no reviewable changes (1)
  • test-integration/test-tools/src/toml_to_args.rs

@bmuddha bmuddha force-pushed the bmuddha/scheduler/dual-mode branch from 1642077 to 69f7765 Compare March 4, 2026 12:00
@bmuddha bmuddha force-pushed the bmuddha/transaction/original-bincode branch from 26235a1 to b8d302a Compare March 4, 2026 12:00
@bmuddha bmuddha marked this pull request as ready for review March 5, 2026 10:04
@bmuddha bmuddha force-pushed the bmuddha/scheduler/dual-mode branch from 40fa0de to ee85613 Compare March 5, 2026 16:55
@bmuddha bmuddha force-pushed the bmuddha/transaction/original-bincode branch from b8d302a to 1d366ad Compare March 5, 2026 16:55