-
Notifications
You must be signed in to change notification settings - Fork 41
feat: added replcator crate with basic protocol impl #1007
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: bmuddha/scheduler/dual-mode
Are you sure you want to change the base?
Changes from all commits
4f78bfa
66d162a
1ee08cd
ebc12ca
dd50387
8c2636d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| [package] | ||
| name = "magicblock-replicator" | ||
| version.workspace = true | ||
| authors.workspace = true | ||
| repository.workspace = true | ||
| homepage.workspace = true | ||
| license.workspace = true | ||
| edition.workspace = true | ||
|
|
||
| [dependencies] | ||
| bincode = { workspace = true } | ||
| bytes = { workspace = true } | ||
| futures = { workspace = true } | ||
| thiserror = { workspace = true } | ||
| tokio = { workspace = true, features = ["net", "rt", "macros"] } | ||
| tokio-stream = { workspace = true } | ||
| tokio-util = { workspace = true, features = ["codec"] } | ||
| serde = { workspace = true, features = ["derive"] } | ||
| solana-hash = { workspace = true, features = ["serde"] } | ||
| solana-keypair = { workspace = true } | ||
| solana-pubkey = { workspace = true, features = ["serde"] } | ||
| solana-signature = { workspace = true, features = ["serde"] } | ||
| solana-signer = { workspace = true } | ||
| solana-transaction = { workspace = true, features = ["serde"] } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,104 @@ | ||
| //! Codec and stream types for length-prefixed bincode framing. | ||
|
|
||
| use std::{ | ||
| pin::Pin, | ||
| task::{Context, Poll}, | ||
| }; | ||
|
|
||
| use bytes::{BufMut, BytesMut}; | ||
| use futures::{SinkExt, Stream, StreamExt}; | ||
| use tokio::io::{AsyncRead, AsyncWrite}; | ||
| use tokio_util::codec::{ | ||
| Encoder, FramedRead, FramedWrite, LengthDelimitedCodec, | ||
| }; | ||
|
|
||
| use crate::{ | ||
| error::{Error, Result}, | ||
| proto::Message, | ||
| }; | ||
|
|
||
| /// Encodes `Message` with 4-byte LE length prefix. | ||
| pub(crate) struct MessageEncoder; | ||
|
|
||
| pub(crate) type InputStream<IO> = FramedRead<IO, LengthDelimitedCodec>; | ||
| pub(crate) type OutputStream<IO> = FramedWrite<IO, MessageEncoder>; | ||
|
|
||
| const MAX_FRAME_SIZE: usize = 64 * 1024 * 1024; | ||
|
|
||
| impl Encoder<&Message> for MessageEncoder { | ||
| type Error = Error; | ||
|
|
||
| fn encode(&mut self, msg: &Message, dst: &mut BytesMut) -> Result<()> { | ||
| let start = dst.len(); | ||
| dst.put_u32_le(0); | ||
| bincode::serialize_into(dst.writer(), msg)?; | ||
| let len = dst.len() - start - 4; | ||
| if len > MAX_FRAME_SIZE { | ||
| dst.truncate(start); | ||
| return Err(Box::new(bincode::ErrorKind::SizeLimit))?; | ||
| } | ||
| dst[start..start + 4].copy_from_slice(&(len as u32).to_le_bytes()); | ||
| Ok(()) | ||
| } | ||
| } | ||
|
|
||
| /// Receives messages from an async stream (max frame: 64MB). | ||
| pub struct Receiver<IO> { | ||
| inner: InputStream<IO>, | ||
| } | ||
|
|
||
| impl<IO: AsyncRead + Unpin> Receiver<IO> { | ||
| pub fn new(io: IO) -> Self { | ||
| let inner = LengthDelimitedCodec::builder() | ||
| .little_endian() | ||
| .max_frame_length(MAX_FRAME_SIZE) | ||
| .length_field_type::<u32>() | ||
| .new_read(io); | ||
| Self { inner } | ||
| } | ||
|
|
||
| pub async fn recv(&mut self) -> Result<Message> { | ||
| let frame = | ||
| self.inner.next().await.ok_or(Error::ConnectionClosed)??; | ||
| bincode::deserialize(&frame).map_err(Into::into) | ||
| } | ||
| } | ||
|
|
||
| impl<IO: AsyncRead + Unpin> Stream for Receiver<IO> { | ||
| type Item = Result<Message>; | ||
|
|
||
| fn poll_next( | ||
| mut self: Pin<&mut Self>, | ||
| cx: &mut Context<'_>, | ||
| ) -> Poll<Option<Self::Item>> { | ||
| match self.inner.poll_next_unpin(cx) { | ||
| Poll::Ready(Some(Ok(item))) => { | ||
| let result = bincode::deserialize(&item).map_err(Into::into); | ||
| Poll::Ready(Some(result)) | ||
| } | ||
| Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err.into()))), | ||
| Poll::Ready(None) => { | ||
| Poll::Ready(Some(Err(Error::ConnectionClosed))) | ||
| } | ||
| Poll::Pending => Poll::Pending, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Sends messages to an async stream. | ||
| pub struct Sender<IO> { | ||
| inner: OutputStream<IO>, | ||
| } | ||
|
|
||
| impl<IO: AsyncWrite + Unpin> Sender<IO> { | ||
| pub fn new(io: IO) -> Self { | ||
| Self { | ||
| inner: FramedWrite::new(io, MessageEncoder), | ||
| } | ||
| } | ||
|
|
||
| pub async fn send(&mut self, msg: &Message) -> Result<()> { | ||
| self.inner.send(msg).await?; | ||
| Ok(()) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| //! Error types for the replication protocol. | ||
|
|
||
| /// Replication operation errors. | ||
| #[derive(thiserror::Error, Debug)] | ||
| pub enum Error { | ||
| #[error("connection closed")] | ||
| ConnectionClosed, | ||
| #[error("IO error: {0}")] | ||
| Io(#[from] std::io::Error), | ||
| #[error("serialization error: {0}")] | ||
| SerDe(#[from] bincode::Error), | ||
| } | ||
|
|
||
| pub type Result<T> = std::result::Result<T, Error>; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| //! State replication protocol for streaming transactions from primary to standby nodes. | ||
| //! | ||
| //! Messages are length-prefixed (4B LE) + bincode payload. | ||
|
|
||
| pub mod connection; | ||
| pub mod error; | ||
| pub mod proto; | ||
| pub mod tcp; | ||
|
|
||
| #[cfg(test)] | ||
| mod tests; | ||
|
|
||
| pub use error::{Error, Result}; | ||
| pub use proto::{Message, PROTOCOL_VERSION}; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,146 @@ | ||
| //! Protocol message types for replication. | ||
| //! | ||
| //! Wire format: 4-byte LE length prefix + bincode payload. | ||
| //! Bincode encodes enum variant index as implicit type tag. | ||
|
|
||
|
Comment on lines
+3
to
+5
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial 🧩 Analysis chain🏁 Script executed: cat -n magicblock-replicator/src/proto.rs | head -40Repository: magicblock-labs/magicblock-validator Length of output: 1450 🏁 Script executed: rg -A 5 "bincode|serialize|deserialize" magicblock-replicator/src/proto.rsRepository: magicblock-labs/magicblock-validator Length of output: 1766 🏁 Script executed: rg "Message::" magicblock-replicator/src/ -A 3 | head -50Repository: magicblock-labs/magicblock-validator Length of output: 3044 🏁 Script executed: find magicblock-replicator/src -name "*.rs" | xargs grep -l "bincode\|serialize" | head -10Repository: magicblock-labs/magicblock-validator Length of output: 257 🏁 Script executed: cat -n magicblock-replicator/src/connection.rsRepository: magicblock-labs/magicblock-validator Length of output: 3712 Consider adding explicit message type identifier to wire format instead of relying on bincode's implicit enum discriminants. The current implementation uses bincode's implicit enum encoding for the
This is an architectural improvement worth considering, though the current approach works reliably with bincode. 🤖 Prompt for AI Agents |
||
| use serde::{Deserialize, Serialize}; | ||
| use solana_hash::Hash; | ||
| use solana_keypair::Keypair; | ||
| use solana_pubkey::Pubkey; | ||
| use solana_signature::Signature; | ||
| use solana_signer::Signer; | ||
| use solana_transaction::versioned::VersionedTransaction; | ||
|
|
||
| use crate::error::Result; | ||
|
|
||
| pub type Slot = u64; | ||
| pub type TxIndex = u32; | ||
|
|
||
| pub const PROTOCOL_VERSION: u32 = 1; | ||
|
|
||
| /// Top-level replication message. | ||
| #[derive(Deserialize, Serialize, Clone, Debug)] | ||
| pub enum Message { | ||
| HandshakeReq(HandshakeRequest), | ||
| HandshakeResp(HandshakeResponse), | ||
| Transaction(Transaction), | ||
| Block(Block), | ||
| SuperBlock(SuperBlock), | ||
| Failover(FailoverSignal), | ||
| } | ||
|
Comment on lines
+23
to
+30
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The linked objective for issue 🤖 Prompt for AI Agents |
||
|
|
||
| /// Client -> Server: initiate replication session. | ||
| /// Authenticated via Ed25519 signature over `start_slot`. | ||
| #[derive(Deserialize, Serialize, Clone, Debug)] | ||
| pub struct HandshakeRequest { | ||
| pub version: u32, | ||
| pub start_slot: Slot, | ||
| pub identity: Pubkey, | ||
| signature: Signature, | ||
| } | ||
|
|
||
| /// Server -> Client: accept or reject session. | ||
| /// Signed over `slot` (success) or error message (failure). | ||
| #[derive(Deserialize, Serialize, Clone, Debug)] | ||
| pub struct HandshakeResponse { | ||
| pub result: std::result::Result<Slot, String>, | ||
| pub identity: Pubkey, | ||
| signature: Signature, | ||
| } | ||
|
|
||
| /// Slot boundary marker with blockhash. | ||
| #[derive(Deserialize, Serialize, Clone, Debug)] | ||
| pub struct Block { | ||
| pub slot: Slot, | ||
| pub hash: Hash, | ||
| } | ||
|
|
||
| /// Transaction with slot and ordinal position. | ||
| #[derive(Deserialize, Serialize, Clone, Debug)] | ||
| pub struct Transaction { | ||
| pub slot: Slot, | ||
| pub index: TxIndex, | ||
| /// Bincode-encoded `VersionedTransaction`. | ||
| pub payload: Vec<u8>, | ||
| } | ||
|
|
||
| /// Periodic checkpoint for state verification. | ||
| #[derive(Deserialize, Serialize, Clone, Debug)] | ||
| pub struct SuperBlock { | ||
| pub blocks: u64, | ||
| pub transactions: u64, | ||
| pub checksum: u64, | ||
| } | ||
|
|
||
| /// Primary -> Standby: signal controlled failover. | ||
| #[derive(Deserialize, Serialize, Clone, Debug)] | ||
| pub struct FailoverSignal { | ||
| pub slot: Slot, | ||
| signature: Signature, | ||
| } | ||
|
|
||
| impl HandshakeRequest { | ||
| pub fn new(start_slot: Slot, keypair: &Keypair) -> Self { | ||
| Self { | ||
| version: PROTOCOL_VERSION, | ||
| start_slot, | ||
| identity: keypair.pubkey(), | ||
| signature: keypair.sign_message(&start_slot.to_le_bytes()), | ||
| } | ||
| } | ||
|
|
||
| /// Verifies signature matches claimed identity. | ||
| pub fn verify(&self) -> bool { | ||
| self.signature | ||
| .verify(self.identity.as_array(), &self.start_slot.to_le_bytes()) | ||
| } | ||
| } | ||
|
|
||
| impl HandshakeResponse { | ||
| pub fn new(result: Result<Slot>, keypair: &Keypair) -> Self { | ||
| let result = result.map_err(|e| e.to_string()); | ||
| let signature = match &result { | ||
| Ok(slot) => keypair.sign_message(&slot.to_le_bytes()), | ||
| Err(err) => keypair.sign_message(err.as_bytes()), | ||
| }; | ||
| Self { | ||
| result, | ||
| identity: keypair.pubkey(), | ||
| signature, | ||
| } | ||
| } | ||
|
|
||
| /// Verifies signature matches server identity. | ||
| pub fn verify(&self) -> bool { | ||
| match &self.result { | ||
| Ok(slot) => self | ||
| .signature | ||
| .verify(self.identity.as_array(), &slot.to_le_bytes()), | ||
| Err(err) => self | ||
| .signature | ||
| .verify(self.identity.as_array(), err.as_bytes()), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl Transaction { | ||
| /// Deserializes the inner transaction. | ||
| pub fn decode(&self) -> bincode::Result<VersionedTransaction> { | ||
| bincode::deserialize(&self.payload) | ||
| } | ||
| } | ||
|
|
||
| impl FailoverSignal { | ||
| pub fn new(slot: Slot, keypair: &Keypair) -> Self { | ||
| Self { | ||
| slot, | ||
| signature: keypair.sign_message(&slot.to_le_bytes()), | ||
| } | ||
| } | ||
|
|
||
| /// Verifies signal against expected identity. | ||
| pub fn verify(&self, identity: Pubkey) -> bool { | ||
| self.signature | ||
| .verify(identity.as_array(), &self.slot.to_le_bytes()) | ||
| } | ||
bmuddha marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| //! TCP transport utilities. | ||
|
|
||
| use std::{io, net::SocketAddr}; | ||
|
|
||
| use tokio::net::{ | ||
| tcp::{OwnedReadHalf, OwnedWriteHalf}, | ||
| TcpStream, | ||
| }; | ||
|
|
||
| use crate::connection::{Receiver, Sender}; | ||
|
|
||
| pub type TcpReceiver = Receiver<OwnedReadHalf>; | ||
| pub type TcpSender = Sender<OwnedWriteHalf>; | ||
|
|
||
| /// Connects to a primary at `addr`, returning (sender, receiver). | ||
| pub async fn connect(addr: SocketAddr) -> io::Result<(TcpSender, TcpReceiver)> { | ||
| TcpStream::connect(addr).await.map(split) | ||
| } | ||
|
|
||
| /// Splits a TCP stream into sender and receiver halves. | ||
| pub fn split(stream: TcpStream) -> (TcpSender, TcpReceiver) { | ||
| let (rx, tx) = stream.into_split(); | ||
| (Sender::new(tx), Receiver::new(rx)) | ||
bmuddha marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: magicblock-labs/magicblock-validator
Length of output: 121
🏁 Script executed:
Repository: magicblock-labs/magicblock-validator
Length of output: 448
🏁 Script executed:
Repository: magicblock-labs/magicblock-validator
Length of output: 119
🏁 Script executed:
cat -n magicblock-replicator/src/connection.rs | head -100Repository: magicblock-labs/magicblock-validator
Length of output: 3624
🏁 Script executed:
rg "Receiver<" magicblock-replicator/src/ -A 3 -B 1Repository: magicblock-labs/magicblock-validator
Length of output: 1355
🏁 Script executed:
Repository: magicblock-labs/magicblock-validator
Length of output: 62
🏁 Script executed:
Repository: magicblock-labs/magicblock-validator
Length of output: 3722
🏁 Script executed:
rg "select\!|select|while.*stream|next\(\)" magicblock-replicator/src/ -A 2 -B 1Repository: magicblock-labs/magicblock-validator
Length of output: 190
🏁 Script executed:
Repository: magicblock-labs/magicblock-validator
Length of output: 2404
🏁 Script executed:
Repository: magicblock-labs/magicblock-validator
Length of output: 990
🏁 Script executed:
Repository: magicblock-labs/magicblock-validator
Length of output: 1035
🏁 Script executed:
Repository: magicblock-labs/magicblock-validator
Length of output: 3171
🏁 Script executed:
Repository: magicblock-labs/magicblock-validator
Length of output: 182
🏁 Script executed:
Repository: magicblock-labs/magicblock-validator
Length of output: 8406
🏁 Script executed:
Repository: magicblock-labs/magicblock-validator
Length of output: 7810
🏁 Script executed:
Repository: magicblock-labs/magicblock-validator
Length of output: 612
🏁 Script executed:
# Look at the error enum to understand ConnectionClosed cat -n magicblock-replicator/src/error.rsRepository: magicblock-labs/magicblock-validator
Length of output: 559
🏁 Script executed:
Repository: magicblock-labs/magicblock-validator
Length of output: 496
🏁 Script executed:
Repository: magicblock-labs/magicblock-validator
Length of output: 378
🌐 Web query:
Rust futures Stream trait poll_next EOF None contract semantics💡 Result:
In Rust’s
futures::Stream,poll_nextreturnsPoll<Option<Item>>, wherePoll::Ready(None)is the “end of stream” / exhausted signal (analogous to iterator EOF). (docs.rs)Contract after
Ready(None)poll_nextreturnsPoll::Ready(None), the stream is terminated andpoll_nextshould not be invoked again. (docs.rs)Streamtrait does not define what happens (it may panic, block forever, or even produce more items), but it still must not cause Rust UB. (docs.rs)How to get “EOF stays EOF” semantics
StreamExt::fuse()/Fuse, which guarantees that after the underlying stream ends, subsequent polls keep returningNoneforever (well-defined post-EOF behavior). (people.eecs.berkeley.edu)FusedStream(and you can checkis_terminated()), which is specifically about tracking when the stream should no longer be polled. (docs.rs)Important caveat
Some types implement
Streambut document thatNonecan mean “currently empty” rather than “permanently finished” (e.g.,FuturesOrderedcan returnReady(None)when it has no futures, then later produce items after youpushmore). In those cases, treatNoneaccording to that type’s docs, or wrap infuse()if you need strict EOF semantics. (docs.subsocial.network)Citations:
Receiverstream must returnNoneon EOF to comply with Stream trait contract.Returning
Some(Err(ConnectionClosed))when the inner stream ends violates the Stream trait contract, which requirespoll_nextto signal termination withReady(None). This prevents proper stream termination and can causewhile let Some(...)consumers to loop indefinitely on disconnect.♻️ Proposed fix
🤖 Prompt for AI Agents