diff --git a/Cargo.lock b/Cargo.lock index 2531e8e12..e90160ff8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3434,6 +3434,26 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "magicblock-replicator" +version = "0.8.2" +dependencies = [ + "bincode", + "bytes", + "futures", + "serde", + "solana-hash", + "solana-keypair", + "solana-pubkey", + "solana-signature", + "solana-signer", + "solana-transaction", + "thiserror 1.0.69", + "tokio", + "tokio-stream", + "tokio-util", +] + [[package]] name = "magicblock-rpc-client" version = "0.8.2" diff --git a/Cargo.toml b/Cargo.toml index b8f6b13b7..6d6c75f9b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ members = [ "magicblock-ledger", "magicblock-metrics", "magicblock-processor", + "magicblock-replicator", "magicblock-rpc-client", "magicblock-table-mania", "magicblock-task-scheduler", @@ -54,6 +55,7 @@ assert_matches = "1.5.0" async-trait = "0.1.77" base64 = "0.21.7" bincode = "1.3.3" +bytes = "1.0" borsh = { version = "1.5.1", features = ["derive", "unstable__schema"] } bs58 = "0.5.1" byteorder = "1.5.0" diff --git a/magicblock-replicator/Cargo.toml b/magicblock-replicator/Cargo.toml new file mode 100644 index 000000000..62ceb4e1e --- /dev/null +++ b/magicblock-replicator/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "magicblock-replicator" +version.workspace = true +authors.workspace = true +repository.workspace = true +homepage.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +bincode = { workspace = true } +bytes = { workspace = true } +futures = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true, features = ["net", "rt", "macros"] } +tokio-stream = { workspace = true } +tokio-util = { workspace = true, features = ["codec"] } +serde = { workspace = true, features = ["derive"] } +solana-hash = { workspace = true, features = ["serde"] } +solana-keypair = { workspace = true } +solana-pubkey = { workspace = true, features = ["serde"] } +solana-signature = { workspace = true, features = ["serde"] } +solana-signer = { workspace = true } +solana-transaction = { workspace = true, features = ["serde"] } diff --git a/magicblock-replicator/src/connection.rs b/magicblock-replicator/src/connection.rs new file mode 100644 index 000000000..a6651c0df --- /dev/null +++ b/magicblock-replicator/src/connection.rs @@ -0,0 +1,104 @@ +//! Codec and stream types for length-prefixed bincode framing. + +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use bytes::{BufMut, BytesMut}; +use futures::{SinkExt, Stream, StreamExt}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_util::codec::{ + Encoder, FramedRead, FramedWrite, LengthDelimitedCodec, +}; + +use crate::{ + error::{Error, Result}, + proto::Message, +}; + +/// Encodes `Message` with 4-byte LE length prefix. +pub(crate) struct MessageEncoder; + +pub(crate) type InputStream = FramedRead; +pub(crate) type OutputStream = FramedWrite; + +const MAX_FRAME_SIZE: usize = 64 * 1024 * 1024; + +impl Encoder<&Message> for MessageEncoder { + type Error = Error; + + fn encode(&mut self, msg: &Message, dst: &mut BytesMut) -> Result<()> { + let start = dst.len(); + dst.put_u32_le(0); + bincode::serialize_into(dst.writer(), msg)?; + let len = dst.len() - start - 4; + if len > MAX_FRAME_SIZE { + dst.truncate(start); + return Err(Box::new(bincode::ErrorKind::SizeLimit))?; + } + dst[start..start + 4].copy_from_slice(&(len as u32).to_le_bytes()); + Ok(()) + } +} + +/// Receives messages from an async stream (max frame: 64MB). +pub struct Receiver { + inner: InputStream, +} + +impl Receiver { + pub fn new(io: IO) -> Self { + let inner = LengthDelimitedCodec::builder() + .little_endian() + .max_frame_length(MAX_FRAME_SIZE) + .length_field_type::() + .new_read(io); + Self { inner } + } + + pub async fn recv(&mut self) -> Result { + let frame = + self.inner.next().await.ok_or(Error::ConnectionClosed)??; + bincode::deserialize(&frame).map_err(Into::into) + } +} + +impl Stream for Receiver { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match self.inner.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(item))) => { + let result = bincode::deserialize(&item).map_err(Into::into); + Poll::Ready(Some(result)) + } + Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err.into()))), + Poll::Ready(None) => { + Poll::Ready(Some(Err(Error::ConnectionClosed))) + } + Poll::Pending => Poll::Pending, + } + } +} + +/// Sends messages to an async stream. +pub struct Sender { + inner: OutputStream, +} + +impl Sender { + pub fn new(io: IO) -> Self { + Self { + inner: FramedWrite::new(io, MessageEncoder), + } + } + + pub async fn send(&mut self, msg: &Message) -> Result<()> { + self.inner.send(msg).await?; + Ok(()) + } +} diff --git a/magicblock-replicator/src/error.rs b/magicblock-replicator/src/error.rs new file mode 100644 index 000000000..9e66f70bb --- /dev/null +++ b/magicblock-replicator/src/error.rs @@ -0,0 +1,14 @@ +//! Error types for the replication protocol. + +/// Replication operation errors. +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("connection closed")] + ConnectionClosed, + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + #[error("serialization error: {0}")] + SerDe(#[from] bincode::Error), +} + +pub type Result = std::result::Result; diff --git a/magicblock-replicator/src/lib.rs b/magicblock-replicator/src/lib.rs new file mode 100644 index 000000000..c4ee0513e --- /dev/null +++ b/magicblock-replicator/src/lib.rs @@ -0,0 +1,14 @@ +//! State replication protocol for streaming transactions from primary to standby nodes. +//! +//! Messages are length-prefixed (4B LE) + bincode payload. + +pub mod connection; +pub mod error; +pub mod proto; +pub mod tcp; + +#[cfg(test)] +mod tests; + +pub use error::{Error, Result}; +pub use proto::{Message, PROTOCOL_VERSION}; diff --git a/magicblock-replicator/src/proto.rs b/magicblock-replicator/src/proto.rs new file mode 100644 index 000000000..1becc33ce --- /dev/null +++ b/magicblock-replicator/src/proto.rs @@ -0,0 +1,146 @@ +//! Protocol message types for replication. +//! +//! Wire format: 4-byte LE length prefix + bincode payload. +//! Bincode encodes enum variant index as implicit type tag. + +use serde::{Deserialize, Serialize}; +use solana_hash::Hash; +use solana_keypair::Keypair; +use solana_pubkey::Pubkey; +use solana_signature::Signature; +use solana_signer::Signer; +use solana_transaction::versioned::VersionedTransaction; + +use crate::error::Result; + +pub type Slot = u64; +pub type TxIndex = u32; + +pub const PROTOCOL_VERSION: u32 = 1; + +/// Top-level replication message. +#[derive(Deserialize, Serialize, Clone, Debug)] +pub enum Message { + HandshakeReq(HandshakeRequest), + HandshakeResp(HandshakeResponse), + Transaction(Transaction), + Block(Block), + SuperBlock(SuperBlock), + Failover(FailoverSignal), +} + +/// Client -> Server: initiate replication session. +/// Authenticated via Ed25519 signature over `start_slot`. +#[derive(Deserialize, Serialize, Clone, Debug)] +pub struct HandshakeRequest { + pub version: u32, + pub start_slot: Slot, + pub identity: Pubkey, + signature: Signature, +} + +/// Server -> Client: accept or reject session. +/// Signed over `slot` (success) or error message (failure). +#[derive(Deserialize, Serialize, Clone, Debug)] +pub struct HandshakeResponse { + pub result: std::result::Result, + pub identity: Pubkey, + signature: Signature, +} + +/// Slot boundary marker with blockhash. +#[derive(Deserialize, Serialize, Clone, Debug)] +pub struct Block { + pub slot: Slot, + pub hash: Hash, +} + +/// Transaction with slot and ordinal position. +#[derive(Deserialize, Serialize, Clone, Debug)] +pub struct Transaction { + pub slot: Slot, + pub index: TxIndex, + /// Bincode-encoded `VersionedTransaction`. + pub payload: Vec, +} + +/// Periodic checkpoint for state verification. +#[derive(Deserialize, Serialize, Clone, Debug)] +pub struct SuperBlock { + pub blocks: u64, + pub transactions: u64, + pub checksum: u64, +} + +/// Primary -> Standby: signal controlled failover. +#[derive(Deserialize, Serialize, Clone, Debug)] +pub struct FailoverSignal { + pub slot: Slot, + signature: Signature, +} + +impl HandshakeRequest { + pub fn new(start_slot: Slot, keypair: &Keypair) -> Self { + Self { + version: PROTOCOL_VERSION, + start_slot, + identity: keypair.pubkey(), + signature: keypair.sign_message(&start_slot.to_le_bytes()), + } + } + + /// Verifies signature matches claimed identity. + pub fn verify(&self) -> bool { + self.signature + .verify(self.identity.as_array(), &self.start_slot.to_le_bytes()) + } +} + +impl HandshakeResponse { + pub fn new(result: Result, keypair: &Keypair) -> Self { + let result = result.map_err(|e| e.to_string()); + let signature = match &result { + Ok(slot) => keypair.sign_message(&slot.to_le_bytes()), + Err(err) => keypair.sign_message(err.as_bytes()), + }; + Self { + result, + identity: keypair.pubkey(), + signature, + } + } + + /// Verifies signature matches server identity. + pub fn verify(&self) -> bool { + match &self.result { + Ok(slot) => self + .signature + .verify(self.identity.as_array(), &slot.to_le_bytes()), + Err(err) => self + .signature + .verify(self.identity.as_array(), err.as_bytes()), + } + } +} + +impl Transaction { + /// Deserializes the inner transaction. + pub fn decode(&self) -> bincode::Result { + bincode::deserialize(&self.payload) + } +} + +impl FailoverSignal { + pub fn new(slot: Slot, keypair: &Keypair) -> Self { + Self { + slot, + signature: keypair.sign_message(&slot.to_le_bytes()), + } + } + + /// Verifies signal against expected identity. + pub fn verify(&self, identity: Pubkey) -> bool { + self.signature + .verify(identity.as_array(), &self.slot.to_le_bytes()) + } +} diff --git a/magicblock-replicator/src/tcp.rs b/magicblock-replicator/src/tcp.rs new file mode 100644 index 000000000..10b3d9a6a --- /dev/null +++ b/magicblock-replicator/src/tcp.rs @@ -0,0 +1,24 @@ +//! TCP transport utilities. + +use std::{io, net::SocketAddr}; + +use tokio::net::{ + tcp::{OwnedReadHalf, OwnedWriteHalf}, + TcpStream, +}; + +use crate::connection::{Receiver, Sender}; + +pub type TcpReceiver = Receiver; +pub type TcpSender = Sender; + +/// Connects to a primary at `addr`, returning (sender, receiver). +pub async fn connect(addr: SocketAddr) -> io::Result<(TcpSender, TcpReceiver)> { + TcpStream::connect(addr).await.map(split) +} + +/// Splits a TCP stream into sender and receiver halves. +pub fn split(stream: TcpStream) -> (TcpSender, TcpReceiver) { + let (rx, tx) = stream.into_split(); + (Sender::new(tx), Receiver::new(rx)) +} diff --git a/magicblock-replicator/src/tests.rs b/magicblock-replicator/src/tests.rs new file mode 100644 index 000000000..ea7499cd2 --- /dev/null +++ b/magicblock-replicator/src/tests.rs @@ -0,0 +1,309 @@ +//! Tests suite for replication protocol. + +use solana_keypair::Keypair; +use solana_pubkey::Pubkey; +use solana_signer::Signer; +use tokio::net::{TcpListener, TcpStream}; + +use crate::{ + proto::{ + Block, FailoverSignal, HandshakeRequest, HandshakeResponse, Message, + SuperBlock, Transaction, + }, + tcp::split, +}; + +// ============================================================================= +// Wire Format Tests - catch serialization/protocol changes +// ============================================================================= + +#[test] +fn variant_order_stability() { + // Bincode encodes enum discriminant as variant index. + // Reordering enum variants silently breaks wire compatibility. + let cases: [(Message, u32); 6] = [ + ( + Message::HandshakeReq(HandshakeRequest::new(0, &Keypair::new())), + 0, + ), + ( + Message::HandshakeResp(HandshakeResponse::new( + Ok(0), + &Keypair::new(), + )), + 1, + ), + ( + Message::Transaction(Transaction { + slot: 0, + index: 0, + payload: vec![], + }), + 2, + ), + ( + Message::Block(Block { + slot: 0, + hash: solana_hash::Hash::default(), + }), + 3, + ), + ( + Message::SuperBlock(SuperBlock { + blocks: 0, + transactions: 0, + checksum: 0, + }), + 4, + ), + ( + Message::Failover(FailoverSignal::new(0, &Keypair::new())), + 5, + ), + ]; + + for (msg, expected_idx) in cases { + let encoded = bincode::serialize(&msg).unwrap(); + let actual_idx = u32::from_le_bytes([ + encoded[0], encoded[1], encoded[2], encoded[3], + ]); + assert_eq!( + actual_idx, expected_idx, + "variant index changed - this breaks wire compatibility!" + ); + } +} + +#[test] +fn signed_message_roundtrip() { + // Signed messages (handshake, failover) have complex serialization. + // Unsigned messages are trivial and covered by variant_order_stability. + let kp = Keypair::new(); + + let cases = vec![ + Message::HandshakeReq(HandshakeRequest::new(12345, &kp)), + Message::HandshakeResp(HandshakeResponse::new(Ok(99999), &kp)), + Message::HandshakeResp(HandshakeResponse::new( + Err(crate::error::Error::ConnectionClosed), + &kp, + )), + Message::Failover(FailoverSignal::new(77777, &kp)), + Message::Transaction(Transaction { + slot: 54321, + index: 42, + payload: (0..255).collect(), + }), + ]; + + for msg in cases { + let encoded = bincode::serialize(&msg).unwrap(); + let decoded: Message = bincode::deserialize(&encoded).unwrap(); + assert_eq!(bincode::serialize(&decoded).unwrap(), encoded); + } +} + +// ============================================================================= +// Signature Verification Tests - catch crypto/auth bugs +// ============================================================================= + +#[test] +fn handshake_tampering_detected() { + let kp = Keypair::new(); + let req = HandshakeRequest::new(12345, &kp); + + // Valid signature works + assert!(req.verify()); + + // Tampered identity fails + let mut tampered = req.clone(); + tampered.identity = Pubkey::new_unique(); + assert!(!tampered.verify(), "tampered identity should fail"); + + // Tampered slot fails (slot is at offset 4 after version u32) + let mut bytes = bincode::serialize(&req).unwrap(); + bytes[4..12].copy_from_slice(&99999u64.to_le_bytes()); + let decoded: HandshakeRequest = bincode::deserialize(&bytes).unwrap(); + assert!(!decoded.verify(), "tampered slot should fail"); +} + +#[test] +fn handshake_response_signing() { + // Success and error paths sign different data - both must verify. + let kp = Keypair::new(); + + let success = HandshakeResponse::new(Ok(5000), &kp); + assert!(success.verify(), "success response should verify"); + + let error = + HandshakeResponse::new(Err(crate::error::Error::ConnectionClosed), &kp); + assert!(error.verify(), "error response should verify"); +} + +#[test] +fn failover_signal_verification() { + let kp = Keypair::new(); + let signal = FailoverSignal::new(99999, &kp); + + // Correct identity verifies + assert!(signal.verify(kp.pubkey())); + + // Wrong identity fails + assert!( + !signal.verify(Pubkey::new_unique()), + "wrong identity should fail" + ); +} + +// ============================================================================= +// TCP Transport Tests - catch framing/connection bugs +// ============================================================================= + +#[tokio::test] +async fn bidirectional_handshake() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let client = TcpStream::connect(addr).await.unwrap(); + let (server, _) = listener.accept().await.unwrap(); + + let (mut client_tx, mut client_rx) = split(client); + let (mut server_tx, mut server_rx) = split(server); + + // Client -> Server: handshake request + let kp = Keypair::new(); + client_tx + .send(&Message::HandshakeReq(HandshakeRequest::new(1000, &kp))) + .await + .unwrap(); + + let req = match server_rx.recv().await.unwrap() { + Message::HandshakeReq(r) => r, + _ => panic!("expected HandshakeReq"), + }; + assert!(req.verify()); + assert_eq!(req.start_slot, 1000); + + // Server -> Client: handshake response + server_tx + .send(&Message::HandshakeResp(HandshakeResponse::new( + Ok(1000), + &Keypair::new(), + ))) + .await + .unwrap(); + + let resp = match client_rx.recv().await.unwrap() { + Message::HandshakeResp(r) => r, + _ => panic!("expected HandshakeResp"), + }; + assert!(resp.verify()); + assert_eq!(resp.result, Ok(1000u64)); +} + +#[tokio::test] +async fn message_ordering_over_stream() { + // Tests that TCP framing preserves message boundaries and order. + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let client = TcpStream::connect(addr).await.unwrap(); + let (server, _) = listener.accept().await.unwrap(); + + let (mut tx, _) = split(client); + let (_, mut rx) = split(server); + + // Send mixed message types + for i in 0..10 { + tx.send(&Message::Block(Block { + slot: i, + hash: solana_hash::Hash::new_unique(), + })) + .await + .unwrap(); + } + + // Verify order is preserved + for expected in 0..10 { + match rx.recv().await.unwrap() { + Message::Block(b) => assert_eq!(b.slot, expected), + _ => panic!("expected Block"), + } + } +} + +#[tokio::test] +async fn large_payload() { + // Tests frame handling for messages larger than TCP buffer. + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let client = TcpStream::connect(addr).await.unwrap(); + let (server, _) = listener.accept().await.unwrap(); + + let (mut tx, _) = split(client); + let (_, mut rx) = split(server); + + let payload = vec![0xAB; 1024 * 1024]; // 1MB + tx.send(&Message::Transaction(Transaction { + slot: 0, + index: 0, + payload: payload.clone(), + })) + .await + .unwrap(); + + match rx.recv().await.unwrap() { + Message::Transaction(t) => { + assert_eq!(t.payload, payload); + } + _ => panic!("expected Transaction"), + } +} + +#[tokio::test] +async fn all_message_types_over_wire() { + // Tests encoder→TCP→decoder path for all 6 message types. + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let client = TcpStream::connect(addr).await.unwrap(); + let (server, _) = listener.accept().await.unwrap(); + + let (mut tx, _) = split(client); + let (_, mut rx) = split(server); + + let kp = Keypair::new(); + let messages: Vec = vec![ + Message::HandshakeReq(HandshakeRequest::new(12345, &kp)), + Message::HandshakeResp(HandshakeResponse::new(Ok(67890), &kp)), + Message::Transaction(Transaction { + slot: 100, + index: 5, + payload: vec![0xDE, 0xAD, 0xBE, 0xEF], + }), + Message::Block(Block { + slot: 200, + hash: solana_hash::Hash::new_unique(), + }), + Message::SuperBlock(SuperBlock { + blocks: 1000, + transactions: 50000, + checksum: 0xCAFEBABE, + }), + Message::Failover(FailoverSignal::new(99999, &kp)), + ]; + + for msg in &messages { + tx.send(msg).await.unwrap(); + } + + for expected in &messages { + let received = rx.recv().await.unwrap(); + // Compare serialized form to catch any encoding differences + assert_eq!( + bincode::serialize(&received).unwrap(), + bincode::serialize(expected).unwrap(), + "wire roundtrip mismatch" + ); + } +} diff --git a/test-integration/Cargo.lock b/test-integration/Cargo.lock index 7c461cac7..2937cd779 100644 --- a/test-integration/Cargo.lock +++ b/test-integration/Cargo.lock @@ -10084,6 +10084,7 @@ dependencies = [ "solana-transaction", "solana-transaction-status-client-types", "tempfile", + "tokio", "tracing", "tracing-log", "tracing-subscriber",