Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ members = [
"magicblock-ledger",
"magicblock-metrics",
"magicblock-processor",
"magicblock-replicator",
"magicblock-rpc-client",
"magicblock-table-mania",
"magicblock-task-scheduler",
Expand Down Expand Up @@ -54,6 +55,7 @@ assert_matches = "1.5.0"
async-trait = "0.1.77"
base64 = "0.21.7"
bincode = "1.3.3"
bytes = "1.0"
borsh = { version = "1.5.1", features = ["derive", "unstable__schema"] }
bs58 = "0.5.1"
byteorder = "1.5.0"
Expand Down
24 changes: 24 additions & 0 deletions magicblock-replicator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "magicblock-replicator"
version.workspace = true
authors.workspace = true
repository.workspace = true
homepage.workspace = true
license.workspace = true
edition.workspace = true

[dependencies]
bincode = { workspace = true }
bytes = { workspace = true }
futures = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["net", "rt", "macros"] }
tokio-stream = { workspace = true }
tokio-util = { workspace = true, features = ["codec"] }
serde = { workspace = true, features = ["derive"] }
solana-hash = { workspace = true, features = ["serde"] }
solana-keypair = { workspace = true }
solana-pubkey = { workspace = true, features = ["serde"] }
solana-signature = { workspace = true, features = ["serde"] }
solana-signer = { workspace = true }
solana-transaction = { workspace = true, features = ["serde"] }
104 changes: 104 additions & 0 deletions magicblock-replicator/src/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//! Codec and stream types for length-prefixed bincode framing.

use std::{
pin::Pin,
task::{Context, Poll},
};

use bytes::{BufMut, BytesMut};
use futures::{SinkExt, Stream, StreamExt};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::{
Encoder, FramedRead, FramedWrite, LengthDelimitedCodec,
};

use crate::{
error::{Error, Result},
proto::Message,
};

/// Encodes `Message` with 4-byte LE length prefix.
pub(crate) struct MessageEncoder;

pub(crate) type InputStream<IO> = FramedRead<IO, LengthDelimitedCodec>;
pub(crate) type OutputStream<IO> = FramedWrite<IO, MessageEncoder>;

const MAX_FRAME_SIZE: usize = 64 * 1024 * 1024;

impl Encoder<&Message> for MessageEncoder {
type Error = Error;

fn encode(&mut self, msg: &Message, dst: &mut BytesMut) -> Result<()> {
let start = dst.len();
dst.put_u32_le(0);
bincode::serialize_into(dst.writer(), msg)?;
let len = dst.len() - start - 4;
if len > MAX_FRAME_SIZE {
dst.truncate(start);
return Err(Box::new(bincode::ErrorKind::SizeLimit))?;
}
dst[start..start + 4].copy_from_slice(&(len as u32).to_le_bytes());
Ok(())
}
}

/// Receives messages from an async stream (max frame: 64MB).
pub struct Receiver<IO> {
inner: InputStream<IO>,
}

impl<IO: AsyncRead + Unpin> Receiver<IO> {
pub fn new(io: IO) -> Self {
let inner = LengthDelimitedCodec::builder()
.little_endian()
.max_frame_length(MAX_FRAME_SIZE)
.length_field_type::<u32>()
.new_read(io);
Self { inner }
}

pub async fn recv(&mut self) -> Result<Message> {
let frame =
self.inner.next().await.ok_or(Error::ConnectionClosed)??;
bincode::deserialize(&frame).map_err(Into::into)
}
}

impl<IO: AsyncRead + Unpin> Stream for Receiver<IO> {
type Item = Result<Message>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match self.inner.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(item))) => {
let result = bincode::deserialize(&item).map_err(Into::into);
Poll::Ready(Some(result))
}
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err.into()))),
Poll::Ready(None) => {
Poll::Ready(Some(Err(Error::ConnectionClosed)))
}
Comment on lines +80 to +82
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -name "connection.rs" -path "*/magicblock-replicator/*" | head -5

Repository: magicblock-labs/magicblock-validator

Length of output: 121


🏁 Script executed:

git ls-files | grep -E "(magicblock-replicator|connection\.rs)" | head -20

Repository: magicblock-labs/magicblock-validator

Length of output: 448


🏁 Script executed:

# Get the full path and check if it exists
fd -t f "connection.rs" | grep replicator

Repository: magicblock-labs/magicblock-validator

Length of output: 119


🏁 Script executed:

cat -n magicblock-replicator/src/connection.rs | head -100

Repository: magicblock-labs/magicblock-validator

Length of output: 3624


🏁 Script executed:

rg "Receiver<" magicblock-replicator/src/ -A 3 -B 1

Repository: magicblock-labs/magicblock-validator

Length of output: 1355


🏁 Script executed:

# Search for consumers of the Stream implementation
rg "while let Some" magicblock-replicator/src/ -A 2 -B 1

Repository: magicblock-labs/magicblock-validator

Length of output: 62


🏁 Script executed:

# Check tests to see expected behavior
cat -n magicblock-replicator/src/tests.rs | head -100

Repository: magicblock-labs/magicblock-validator

Length of output: 3722


🏁 Script executed:

rg "select\!|select|while.*stream|next\(\)" magicblock-replicator/src/ -A 2 -B 1

Repository: magicblock-labs/magicblock-validator

Length of output: 190


🏁 Script executed:

# Search for actual usage of Receiver in business logic
rg "Receiver" magicblock-replicator/src/ --type rust -A 5 -B 1 | head -60

Repository: magicblock-labs/magicblock-validator

Length of output: 2404


🏁 Script executed:

# Look for patterns that might use this stream
rg "poll_next|StreamExt" magicblock-replicator/src/ -A 2 -B 1

Repository: magicblock-labs/magicblock-validator

Length of output: 990


🏁 Script executed:

# Search for where TcpReceiver is used
rg "TcpReceiver|receiver" magicblock-replicator/src/ -A 3 -B 1 | grep -v "Binary"

Repository: magicblock-labs/magicblock-validator

Length of output: 1035


🏁 Script executed:

# Look for actual message consumption patterns
find magicblock-replicator -name "*.rs" -exec cat -n {} \; | grep -A 10 "recv\|next"

Repository: magicblock-labs/magicblock-validator

Length of output: 3171


🏁 Script executed:

# Search across entire codebase for Stream consumer patterns with Receiver
rg "\.next\(\)|select\!" --type rust -A 2 -B 1 | head -100

Repository: magicblock-labs/magicblock-validator

Length of output: 182


🏁 Script executed:

# Search for Stream usage patterns - simpler regex
rg "\.next\(\)" --type rust -A 2 -B 1 | head -100

Repository: magicblock-labs/magicblock-validator

Length of output: 8406


🏁 Script executed:

# Search for select! macro usage
rg "select!" --type rust -A 3 -B 1 | head -100

Repository: magicblock-labs/magicblock-validator

Length of output: 7810


🏁 Script executed:

# Check if the Stream trait is actually used anywhere (not just recv())
rg "for.*in.*receiver|while.*receiver|stream\." magicblock-replicator/src/ -A 2 -B 1

Repository: magicblock-labs/magicblock-validator

Length of output: 612


🏁 Script executed:

# Look at the error enum to understand ConnectionClosed
cat -n magicblock-replicator/src/error.rs

Repository: magicblock-labs/magicblock-validator

Length of output: 559


🏁 Script executed:

# Check if there are any actual consumers of the Stream trait on Receiver
rg "stream\(" magicblock-replicator/src/ -A 3 -B 1

Repository: magicblock-labs/magicblock-validator

Length of output: 496


🏁 Script executed:

# Look for any use of StreamExt methods on Receiver types
rg "StreamExt|\.take\(|\.filter|\.map.*receiver" magicblock-replicator/src/ -A 2 -B 1

Repository: magicblock-labs/magicblock-validator

Length of output: 378


🌐 Web query:

Rust futures Stream trait poll_next EOF None contract semantics

💡 Result:

In Rust’s futures::Stream, poll_next returns Poll<Option<Item>>, where Poll::Ready(None) is the “end of stream” / exhausted signal (analogous to iterator EOF). (docs.rs)

Contract after Ready(None)

  • The trait contract says: once poll_next returns Poll::Ready(None), the stream is terminated and poll_next should not be invoked again. (docs.rs)
  • If you do poll again anyway, the Stream trait does not define what happens (it may panic, block forever, or even produce more items), but it still must not cause Rust UB. (docs.rs)

How to get “EOF stays EOF” semantics

  • Use StreamExt::fuse() / Fuse, which guarantees that after the underlying stream ends, subsequent polls keep returning None forever (well-defined post-EOF behavior). (people.eecs.berkeley.edu)
  • A “fused” stream is often described/marked via FusedStream (and you can check is_terminated()), which is specifically about tracking when the stream should no longer be polled. (docs.rs)

Important caveat

Some types implement Stream but document that None can mean “currently empty” rather than “permanently finished” (e.g., FuturesOrdered can return Ready(None) when it has no futures, then later produce items after you push more). In those cases, treat None according to that type’s docs, or wrap in fuse() if you need strict EOF semantics. (docs.subsocial.network)

Citations:


Receiver stream must return None on EOF to comply with Stream trait contract.

Returning Some(Err(ConnectionClosed)) when the inner stream ends violates the Stream trait contract, which requires poll_next to signal termination with Ready(None). This prevents proper stream termination and can cause while let Some(...) consumers to loop indefinitely on disconnect.

♻️ Proposed fix
-            Poll::Ready(None) => {
-                Poll::Ready(Some(Err(Error::ConnectionClosed)))
-            }
+            Poll::Ready(None) => Poll::Ready(None),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-replicator/src/connection.rs` around lines 80 - 82, The Receiver
stream currently maps the inner stream's Poll::Ready(None) to
Poll::Ready(Some(Err(Error::ConnectionClosed))), violating Stream's contract;
update the poll_next implementation in the Receiver (the impl Stream for
Receiver / poll_next method) so that when the inner stream returns
Poll::Ready(None) it returns Poll::Ready(None) (i.e., signal EOF) instead of
emitting a ConnectionClosed error, preserving proper termination semantics for
consumers.

Poll::Pending => Poll::Pending,
}
}
}

/// Sends messages to an async stream.
pub struct Sender<IO> {
inner: OutputStream<IO>,
}

impl<IO: AsyncWrite + Unpin> Sender<IO> {
pub fn new(io: IO) -> Self {
Self {
inner: FramedWrite::new(io, MessageEncoder),
}
}

pub async fn send(&mut self, msg: &Message) -> Result<()> {
self.inner.send(msg).await?;
Ok(())
}
}
14 changes: 14 additions & 0 deletions magicblock-replicator/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//! Error types for the replication protocol.

/// Replication operation errors.
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("connection closed")]
ConnectionClosed,
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("serialization error: {0}")]
SerDe(#[from] bincode::Error),
}

pub type Result<T> = std::result::Result<T, Error>;
14 changes: 14 additions & 0 deletions magicblock-replicator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//! State replication protocol for streaming transactions from primary to standby nodes.
//!
//! Messages are length-prefixed (4B LE) + bincode payload.

pub mod connection;
pub mod error;
pub mod proto;
pub mod tcp;

#[cfg(test)]
mod tests;

pub use error::{Error, Result};
pub use proto::{Message, PROTOCOL_VERSION};
146 changes: 146 additions & 0 deletions magicblock-replicator/src/proto.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
//! Protocol message types for replication.
//!
//! Wire format: 4-byte LE length prefix + bincode payload.
//! Bincode encodes enum variant index as implicit type tag.

Comment on lines +3 to +5
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

🧩 Analysis chain

🏁 Script executed:

cat -n magicblock-replicator/src/proto.rs | head -40

Repository: magicblock-labs/magicblock-validator

Length of output: 1450


🏁 Script executed:

rg -A 5 "bincode|serialize|deserialize" magicblock-replicator/src/proto.rs

Repository: magicblock-labs/magicblock-validator

Length of output: 1766


🏁 Script executed:

rg "Message::" magicblock-replicator/src/ -A 3 | head -50

Repository: magicblock-labs/magicblock-validator

Length of output: 3044


🏁 Script executed:

find magicblock-replicator/src -name "*.rs" | xargs grep -l "bincode\|serialize" | head -10

Repository: magicblock-labs/magicblock-validator

Length of output: 257


🏁 Script executed:

cat -n magicblock-replicator/src/connection.rs

Repository: magicblock-labs/magicblock-validator

Length of output: 3712


Consider adding explicit message type identifier to wire format instead of relying on bincode's implicit enum discriminants.

The current implementation uses bincode's implicit enum encoding for the Message type. Adding an explicit u8 type byte before the serialized payload would:

  • Decouple the protocol from bincode's internal enum representation
  • Enable clearer message type identification during debugging and protocol analysis
  • Improve forward compatibility if serializer implementation changes

This is an architectural improvement worth considering, though the current approach works reliably with bincode.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-replicator/src/proto.rs` around lines 3 - 5, The protocol
currently relies on bincode's implicit enum discriminants for Message; change
the wire format to prefix an explicit u8 message-type byte before the bincode
payload and keep the existing 4-byte LE length prefix logic. Update
serde/serialization code paths that read/write messages (the encoder/decoder
that handle the 4-byte length and bincode(Message)) to: write length = 1 +
payload_len, then write a single u8 type tag mapped from the Message enum
variants, followed by the bincode payload; and on read, use the length to read
the frame, read the u8 type tag first and dispatch/deserialize the remainder
into the corresponding Message variant. Ensure the Message-to-u8 mapping and the
inverse dispatch logic are defined (e.g., a small match on Message variants and
a match to construct/deserialize variants) so the wire format no longer depends
on bincode's internal discriminant ordering.

use serde::{Deserialize, Serialize};
use solana_hash::Hash;
use solana_keypair::Keypair;
use solana_pubkey::Pubkey;
use solana_signature::Signature;
use solana_signer::Signer;
use solana_transaction::versioned::VersionedTransaction;

use crate::error::Result;

pub type Slot = u64;
pub type TxIndex = u32;

pub const PROTOCOL_VERSION: u32 = 1;

/// Top-level replication message.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub enum Message {
HandshakeReq(HandshakeRequest),
HandshakeResp(HandshakeResponse),
Transaction(Transaction),
Block(Block),
SuperBlock(SuperBlock),
Failover(FailoverSignal),
}
Comment on lines +23 to +30
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Heartbeat is missing from the protocol message set.

The linked objective for issue #951 requires Heartbeat as a defined message type. The current enum omits it and introduces Failover instead, which makes the implementation diverge from the stated protocol contract.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-replicator/src/proto.rs` around lines 23 - 30, The Message enum is
missing the required Heartbeat variant and currently contains Failover which
diverges from the protocol; update the enum definition (Message) to include
Heartbeat(Heartbeat) and remove the Failover(FailoverSignal) variant unless
Failover is part of the spec, and ensure the corresponding Heartbeat type/name
is imported/defined so the protocol matches the required contract.


/// Client -> Server: initiate replication session.
/// Authenticated via Ed25519 signature over `start_slot`.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct HandshakeRequest {
pub version: u32,
pub start_slot: Slot,
pub identity: Pubkey,
signature: Signature,
}

/// Server -> Client: accept or reject session.
/// Signed over `slot` (success) or error message (failure).
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct HandshakeResponse {
pub result: std::result::Result<Slot, String>,
pub identity: Pubkey,
signature: Signature,
}

/// Slot boundary marker with blockhash.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct Block {
pub slot: Slot,
pub hash: Hash,
}

/// Transaction with slot and ordinal position.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct Transaction {
pub slot: Slot,
pub index: TxIndex,
/// Bincode-encoded `VersionedTransaction`.
pub payload: Vec<u8>,
}

/// Periodic checkpoint for state verification.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct SuperBlock {
pub blocks: u64,
pub transactions: u64,
pub checksum: u64,
}

/// Primary -> Standby: signal controlled failover.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct FailoverSignal {
pub slot: Slot,
signature: Signature,
}

impl HandshakeRequest {
pub fn new(start_slot: Slot, keypair: &Keypair) -> Self {
Self {
version: PROTOCOL_VERSION,
start_slot,
identity: keypair.pubkey(),
signature: keypair.sign_message(&start_slot.to_le_bytes()),
}
}

/// Verifies signature matches claimed identity.
pub fn verify(&self) -> bool {
self.signature
.verify(self.identity.as_array(), &self.start_slot.to_le_bytes())
}
}

impl HandshakeResponse {
pub fn new(result: Result<Slot>, keypair: &Keypair) -> Self {
let result = result.map_err(|e| e.to_string());
let signature = match &result {
Ok(slot) => keypair.sign_message(&slot.to_le_bytes()),
Err(err) => keypair.sign_message(err.as_bytes()),
};
Self {
result,
identity: keypair.pubkey(),
signature,
}
}

/// Verifies signature matches server identity.
pub fn verify(&self) -> bool {
match &self.result {
Ok(slot) => self
.signature
.verify(self.identity.as_array(), &slot.to_le_bytes()),
Err(err) => self
.signature
.verify(self.identity.as_array(), err.as_bytes()),
}
}
}

impl Transaction {
/// Deserializes the inner transaction.
pub fn decode(&self) -> bincode::Result<VersionedTransaction> {
bincode::deserialize(&self.payload)
}
}

impl FailoverSignal {
pub fn new(slot: Slot, keypair: &Keypair) -> Self {
Self {
slot,
signature: keypair.sign_message(&slot.to_le_bytes()),
}
}

/// Verifies signal against expected identity.
pub fn verify(&self, identity: Pubkey) -> bool {
self.signature
.verify(identity.as_array(), &self.slot.to_le_bytes())
}
}
24 changes: 24 additions & 0 deletions magicblock-replicator/src/tcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//! TCP transport utilities.

use std::{io, net::SocketAddr};

use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpStream,
};

use crate::connection::{Receiver, Sender};

pub type TcpReceiver = Receiver<OwnedReadHalf>;
pub type TcpSender = Sender<OwnedWriteHalf>;

/// Connects to a primary at `addr`, returning (sender, receiver).
pub async fn connect(addr: SocketAddr) -> io::Result<(TcpSender, TcpReceiver)> {
TcpStream::connect(addr).await.map(split)
}

/// Splits a TCP stream into sender and receiver halves.
pub fn split(stream: TcpStream) -> (TcpSender, TcpReceiver) {
let (rx, tx) = stream.into_split();
(Sender::new(tx), Receiver::new(rx))
}
Loading
Loading