feat: added replcator crate with basic protocol impl#1007
feat: added replcator crate with basic protocol impl#1007bmuddha wants to merge 6 commits intobmuddha/scheduler/dual-modefrom
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughThe PR adds a new workspace crate Assessment against linked issues
Out-of-scope changes
Suggested reviewers
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite. Learn more about stacking. |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@magicblock-replicator/src/connection.rs`:
- Around line 19-30: MessageEncoder::encode can produce frames larger than the
Receiver's 64KB limit, causing the remote side to reject them; update the
Encoder (MessageEncoder::encode) to compute the serialized message size (e.g.,
via bincode::serialized_size or by serializing into a temporary buffer), compare
it against the same 64 * 1024 frame limit used by the Receiver, and return an
Err(Error) if it exceeds that limit instead of writing an oversize frame; keep
the existing write path when size is within the limit so the existing
dst.put_u32_le / serialize_into logic is unchanged.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (7)
Cargo.tomlmagicblock-replicator/Cargo.tomlmagicblock-replicator/src/connection.rsmagicblock-replicator/src/error.rsmagicblock-replicator/src/lib.rsmagicblock-replicator/src/proto.rsmagicblock-replicator/src/tcp.rs
1acdff2 to
6e3a51e
Compare
23c811a to
be09338
Compare
6e3a51e to
936c1c0
Compare
be09338 to
31292d0
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@magicblock-replicator/src/proto.rs`:
- Around line 83-145: The signatures are created over raw bytes that overlap
across message types and omit the protocol version; update all signing and
verification to domain-separate and bind version by including an explicit
context string and the protocol version in the signed payload. Concretely,
change each signer call (the constructors that call keypair.sign_message in the
implementations for the first message type (the HandshakeRequest-like impl),
HandshakeResponse::new, and FailoverSignal::new) to sign a concatenation like
b"<MSG_TYPE>|v{PROTOCOL_VERSION}|"+payload (use a distinct MSG_TYPE per struct,
e.g., "HandshakeRequest", "HandshakeResponse", "FailoverSignal"), and update the
corresponding verify methods (the verify implementations for those types) to
reconstruct the same domain-separated bytes (including version) before calling
signature.verify; do the same for error branches in
HandshakeResponse::new/verify so both Ok and Err paths use the same
domain-separated scheme. Ensure VersionedTransaction/Transaction::decode is
unaffected but do not sign raw payloads elsewhere.
In `@magicblock-replicator/src/tcp.rs`:
- Around line 15-23: The connect/split helpers currently hard-code Tokio
TcpStream and so cannot support the WebSocket framing required by the objective;
replace or refactor TcpSender/TcpReceiver creation so the transport is
abstracted and supports WebSocket-frame wrapping (type prefix + bincode
payload). Update the public functions connect and split (and any constructors
like Sender::new / Receiver::new) to work against a Transport trait (or separate
WsStream implementation) that implements send/receive with WebSocket framing, or
provide alternate connect_ws/split_ws that wrap a WebSocket stream into
TcpSender/TcpReceiver equivalents using the required frame format; ensure the
new implementation still returns the same TcpSender/TcpReceiver types or
introduces clearly named WsSender/WsReceiver to avoid breaking callers.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (8)
Cargo.tomlmagicblock-replicator/Cargo.tomlmagicblock-replicator/src/connection.rsmagicblock-replicator/src/error.rsmagicblock-replicator/src/lib.rsmagicblock-replicator/src/proto.rsmagicblock-replicator/src/tcp.rsmagicblock-replicator/src/tests.rs
936c1c0 to
1642077
Compare
31292d0 to
15c008a
Compare
1642077 to
69f7765
Compare
15c008a to
66c7f58
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
magicblock-replicator/src/proto.rs (1)
83-145:⚠️ Potential issue | 🟠 MajorSignatures are not domain-separated and do not bind protocol version.
These constructors/validators sign and verify raw payload bytes only. This permits cross-context replay and does not cryptographically protect
versionfrom tampering.🔐 Suggested hardening pattern
+fn signature_preimage(domain: &'static [u8], version: u32, body: &[u8]) -> Vec<u8> { + let mut out = Vec::with_capacity(domain.len() + 1 + 4 + body.len()); + out.extend_from_slice(domain); + out.push(0); + out.extend_from_slice(&version.to_le_bytes()); + out.extend_from_slice(body); + out +} + impl HandshakeRequest { pub fn new(start_slot: Slot, keypair: &Keypair) -> Self { + let version = PROTOCOL_VERSION; + let msg = signature_preimage(b"HandshakeRequest", version, &start_slot.to_le_bytes()); Self { - version: PROTOCOL_VERSION, + version, start_slot, identity: keypair.pubkey(), - signature: keypair.sign_message(&start_slot.to_le_bytes()), + signature: keypair.sign_message(&msg), } } pub fn verify(&self) -> bool { - self.signature - .verify(self.identity.as_array(), &self.start_slot.to_le_bytes()) + let msg = signature_preimage(b"HandshakeRequest", self.version, &self.start_slot.to_le_bytes()); + self.signature.verify(self.identity.as_array(), &msg) } }Apply the same domain+version preimage approach to
HandshakeResponseandFailoverSignal.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-replicator/src/proto.rs` around lines 83 - 145, Construct and verify signatures over a domain-separated preimage that includes a static domain tag and the PROTOCOL_VERSION to prevent cross-context replay and bind the version; update Handshake::new/verify, HandshakeResponse::new/verify, and FailoverSignal::new/verify so their sign_message input is a concatenation of a fixed domain string (e.g. "magicblock-v1"), PROTOCOL_VERSION (as bytes), and the existing payload bytes (start_slot/slot or result bytes), and make the corresponding verify methods expect that same prefixed preimage when calling signature.verify.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@magicblock-replicator/src/connection.rs`:
- Around line 80-82: The Receiver stream currently maps the inner stream's
Poll::Ready(None) to Poll::Ready(Some(Err(Error::ConnectionClosed))), violating
Stream's contract; update the poll_next implementation in the Receiver (the impl
Stream for Receiver / poll_next method) so that when the inner stream returns
Poll::Ready(None) it returns Poll::Ready(None) (i.e., signal EOF) instead of
emitting a ConnectionClosed error, preserving proper termination semantics for
consumers.
---
Duplicate comments:
In `@magicblock-replicator/src/proto.rs`:
- Around line 83-145: Construct and verify signatures over a domain-separated
preimage that includes a static domain tag and the PROTOCOL_VERSION to prevent
cross-context replay and bind the version; update Handshake::new/verify,
HandshakeResponse::new/verify, and FailoverSignal::new/verify so their
sign_message input is a concatenation of a fixed domain string (e.g.
"magicblock-v1"), PROTOCOL_VERSION (as bytes), and the existing payload bytes
(start_slot/slot or result bytes), and make the corresponding verify methods
expect that same prefixed preimage when calling signature.verify.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: fcc6e96e-ecb4-42d7-857e-e455fb474253
⛔ Files ignored due to path filters (2)
Cargo.lockis excluded by!**/*.locktest-integration/Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (8)
Cargo.tomlmagicblock-replicator/Cargo.tomlmagicblock-replicator/src/connection.rsmagicblock-replicator/src/error.rsmagicblock-replicator/src/lib.rsmagicblock-replicator/src/proto.rsmagicblock-replicator/src/tcp.rsmagicblock-replicator/src/tests.rs
| Poll::Ready(None) => { | ||
| Poll::Ready(Some(Err(Error::ConnectionClosed))) | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
find . -name "connection.rs" -path "*/magicblock-replicator/*" | head -5Repository: magicblock-labs/magicblock-validator
Length of output: 121
🏁 Script executed:
git ls-files | grep -E "(magicblock-replicator|connection\.rs)" | head -20Repository: magicblock-labs/magicblock-validator
Length of output: 448
🏁 Script executed:
# Get the full path and check if it exists
fd -t f "connection.rs" | grep replicatorRepository: magicblock-labs/magicblock-validator
Length of output: 119
🏁 Script executed:
cat -n magicblock-replicator/src/connection.rs | head -100Repository: magicblock-labs/magicblock-validator
Length of output: 3624
🏁 Script executed:
rg "Receiver<" magicblock-replicator/src/ -A 3 -B 1Repository: magicblock-labs/magicblock-validator
Length of output: 1355
🏁 Script executed:
# Search for consumers of the Stream implementation
rg "while let Some" magicblock-replicator/src/ -A 2 -B 1Repository: magicblock-labs/magicblock-validator
Length of output: 62
🏁 Script executed:
# Check tests to see expected behavior
cat -n magicblock-replicator/src/tests.rs | head -100Repository: magicblock-labs/magicblock-validator
Length of output: 3722
🏁 Script executed:
rg "select\!|select|while.*stream|next\(\)" magicblock-replicator/src/ -A 2 -B 1Repository: magicblock-labs/magicblock-validator
Length of output: 190
🏁 Script executed:
# Search for actual usage of Receiver in business logic
rg "Receiver" magicblock-replicator/src/ --type rust -A 5 -B 1 | head -60Repository: magicblock-labs/magicblock-validator
Length of output: 2404
🏁 Script executed:
# Look for patterns that might use this stream
rg "poll_next|StreamExt" magicblock-replicator/src/ -A 2 -B 1Repository: magicblock-labs/magicblock-validator
Length of output: 990
🏁 Script executed:
# Search for where TcpReceiver is used
rg "TcpReceiver|receiver" magicblock-replicator/src/ -A 3 -B 1 | grep -v "Binary"Repository: magicblock-labs/magicblock-validator
Length of output: 1035
🏁 Script executed:
# Look for actual message consumption patterns
find magicblock-replicator -name "*.rs" -exec cat -n {} \; | grep -A 10 "recv\|next"Repository: magicblock-labs/magicblock-validator
Length of output: 3171
🏁 Script executed:
# Search across entire codebase for Stream consumer patterns with Receiver
rg "\.next\(\)|select\!" --type rust -A 2 -B 1 | head -100Repository: magicblock-labs/magicblock-validator
Length of output: 182
🏁 Script executed:
# Search for Stream usage patterns - simpler regex
rg "\.next\(\)" --type rust -A 2 -B 1 | head -100Repository: magicblock-labs/magicblock-validator
Length of output: 8406
🏁 Script executed:
# Search for select! macro usage
rg "select!" --type rust -A 3 -B 1 | head -100Repository: magicblock-labs/magicblock-validator
Length of output: 7810
🏁 Script executed:
# Check if the Stream trait is actually used anywhere (not just recv())
rg "for.*in.*receiver|while.*receiver|stream\." magicblock-replicator/src/ -A 2 -B 1Repository: magicblock-labs/magicblock-validator
Length of output: 612
🏁 Script executed:
# Look at the error enum to understand ConnectionClosed
cat -n magicblock-replicator/src/error.rsRepository: magicblock-labs/magicblock-validator
Length of output: 559
🏁 Script executed:
# Check if there are any actual consumers of the Stream trait on Receiver
rg "stream\(" magicblock-replicator/src/ -A 3 -B 1Repository: magicblock-labs/magicblock-validator
Length of output: 496
🏁 Script executed:
# Look for any use of StreamExt methods on Receiver types
rg "StreamExt|\.take\(|\.filter|\.map.*receiver" magicblock-replicator/src/ -A 2 -B 1Repository: magicblock-labs/magicblock-validator
Length of output: 378
🌐 Web query:
Rust futures Stream trait poll_next EOF None contract semantics
💡 Result:
In Rust’s futures::Stream, poll_next returns Poll<Option<Item>>, where Poll::Ready(None) is the “end of stream” / exhausted signal (analogous to iterator EOF). (docs.rs)
Contract after Ready(None)
- The trait contract says: once
poll_nextreturnsPoll::Ready(None), the stream is terminated andpoll_nextshould not be invoked again. (docs.rs) - If you do poll again anyway, the
Streamtrait does not define what happens (it may panic, block forever, or even produce more items), but it still must not cause Rust UB. (docs.rs)
How to get “EOF stays EOF” semantics
- Use
StreamExt::fuse()/Fuse, which guarantees that after the underlying stream ends, subsequent polls keep returningNoneforever (well-defined post-EOF behavior). (people.eecs.berkeley.edu) - A “fused” stream is often described/marked via
FusedStream(and you can checkis_terminated()), which is specifically about tracking when the stream should no longer be polled. (docs.rs)
Important caveat
Some types implement Stream but document that None can mean “currently empty” rather than “permanently finished” (e.g., FuturesOrdered can return Ready(None) when it has no futures, then later produce items after you push more). In those cases, treat None according to that type’s docs, or wrap in fuse() if you need strict EOF semantics. (docs.subsocial.network)
Citations:
- 1: https://docs.rs/futures-util/latest/futures_util/stream/trait.Stream.html?utm_source=openai
- 2: https://docs.rs/futures-util/latest/futures_util/stream/trait.Stream.html?utm_source=openai
- 3: https://docs.rs/futures-util/latest/futures_util/stream/trait.Stream.html?utm_source=openai
- 4: https://people.eecs.berkeley.edu/~pschafhalter/pub/erdos/doc/futures_util/stream/trait.StreamExt.html?utm_source=openai
- 5: https://docs.rs/futures-util/latest/futures_util/stream/trait.FusedStream.html?utm_source=openai
- 6: https://docs.subsocial.network/rust-docs/latest/futures_util/stream/struct.FuturesOrdered.html?utm_source=openai
Receiver stream must return None on EOF to comply with Stream trait contract.
Returning Some(Err(ConnectionClosed)) when the inner stream ends violates the Stream trait contract, which requires poll_next to signal termination with Ready(None). This prevents proper stream termination and can cause while let Some(...) consumers to loop indefinitely on disconnect.
♻️ Proposed fix
- Poll::Ready(None) => {
- Poll::Ready(Some(Err(Error::ConnectionClosed)))
- }
+ Poll::Ready(None) => Poll::Ready(None),🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@magicblock-replicator/src/connection.rs` around lines 80 - 82, The Receiver
stream currently maps the inner stream's Poll::Ready(None) to
Poll::Ready(Some(Err(Error::ConnectionClosed))), violating Stream's contract;
update the poll_next implementation in the Receiver (the impl Stream for
Receiver / poll_next method) so that when the inner stream returns
Poll::Ready(None) it returns Poll::Ready(None) (i.e., signal EOF) instead of
emitting a ConnectionClosed error, preserving proper termination semantics for
consumers.
66c7f58 to
7a1d2ff
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (1)
magicblock-replicator/src/connection.rs (1)
80-82:⚠️ Potential issue | 🟠 MajorReturn
Poll::Ready(None)on EOF inStream::poll_next.Mapping EOF to
Some(Err(ConnectionClosed))prevents proper stream termination and can keep stream consumers looping indefinitely after disconnect.Suggested fix
- Poll::Ready(None) => { - Poll::Ready(Some(Err(Error::ConnectionClosed))) - } + Poll::Ready(None) => Poll::Ready(None),🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-replicator/src/connection.rs` around lines 80 - 82, In the Stream::poll_next implementation, change the EOF handling so it returns Poll::Ready(None) instead of Poll::Ready(Some(Err(Error::ConnectionClosed))); locate the match arm that currently maps Poll::Ready(None) => Poll::Ready(Some(Err(Error::ConnectionClosed))) and replace it with Poll::Ready(None) so the stream terminates cleanly on EOF while preserving other error paths that should still return Some(Err(...)).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@magicblock-replicator/src/proto.rs`:
- Around line 23-30: The Message enum is missing the required Heartbeat variant
and currently contains Failover which diverges from the protocol; update the
enum definition (Message) to include Heartbeat(Heartbeat) and remove the
Failover(FailoverSignal) variant unless Failover is part of the spec, and ensure
the corresponding Heartbeat type/name is imported/defined so the protocol
matches the required contract.
- Around line 3-5: The protocol currently relies on bincode's implicit enum
discriminants for Message; change the wire format to prefix an explicit u8
message-type byte before the bincode payload and keep the existing 4-byte LE
length prefix logic. Update serde/serialization code paths that read/write
messages (the encoder/decoder that handle the 4-byte length and
bincode(Message)) to: write length = 1 + payload_len, then write a single u8
type tag mapped from the Message enum variants, followed by the bincode payload;
and on read, use the length to read the frame, read the u8 type tag first and
dispatch/deserialize the remainder into the corresponding Message variant.
Ensure the Message-to-u8 mapping and the inverse dispatch logic are defined
(e.g., a small match on Message variants and a match to construct/deserialize
variants) so the wire format no longer depends on bincode's internal
discriminant ordering.
---
Duplicate comments:
In `@magicblock-replicator/src/connection.rs`:
- Around line 80-82: In the Stream::poll_next implementation, change the EOF
handling so it returns Poll::Ready(None) instead of
Poll::Ready(Some(Err(Error::ConnectionClosed))); locate the match arm that
currently maps Poll::Ready(None) =>
Poll::Ready(Some(Err(Error::ConnectionClosed))) and replace it with
Poll::Ready(None) so the stream terminates cleanly on EOF while preserving other
error paths that should still return Some(Err(...)).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: b72f2e54-f6b2-4403-9775-3af27b6b5f55
⛔ Files ignored due to path filters (2)
Cargo.lockis excluded by!**/*.locktest-integration/Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (8)
Cargo.tomlmagicblock-replicator/Cargo.tomlmagicblock-replicator/src/connection.rsmagicblock-replicator/src/error.rsmagicblock-replicator/src/lib.rsmagicblock-replicator/src/proto.rsmagicblock-replicator/src/tcp.rsmagicblock-replicator/src/tests.rs
| //! Wire format: 4-byte LE length prefix + bincode payload. | ||
| //! Bincode encodes enum variant index as implicit type tag. | ||
|
|
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
🧩 Analysis chain
🏁 Script executed:
cat -n magicblock-replicator/src/proto.rs | head -40Repository: magicblock-labs/magicblock-validator
Length of output: 1450
🏁 Script executed:
rg -A 5 "bincode|serialize|deserialize" magicblock-replicator/src/proto.rsRepository: magicblock-labs/magicblock-validator
Length of output: 1766
🏁 Script executed:
rg "Message::" magicblock-replicator/src/ -A 3 | head -50Repository: magicblock-labs/magicblock-validator
Length of output: 3044
🏁 Script executed:
find magicblock-replicator/src -name "*.rs" | xargs grep -l "bincode\|serialize" | head -10Repository: magicblock-labs/magicblock-validator
Length of output: 257
🏁 Script executed:
cat -n magicblock-replicator/src/connection.rsRepository: magicblock-labs/magicblock-validator
Length of output: 3712
Consider adding explicit message type identifier to wire format instead of relying on bincode's implicit enum discriminants.
The current implementation uses bincode's implicit enum encoding for the Message type. Adding an explicit u8 type byte before the serialized payload would:
- Decouple the protocol from bincode's internal enum representation
- Enable clearer message type identification during debugging and protocol analysis
- Improve forward compatibility if serializer implementation changes
This is an architectural improvement worth considering, though the current approach works reliably with bincode.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@magicblock-replicator/src/proto.rs` around lines 3 - 5, The protocol
currently relies on bincode's implicit enum discriminants for Message; change
the wire format to prefix an explicit u8 message-type byte before the bincode
payload and keep the existing 4-byte LE length prefix logic. Update
serde/serialization code paths that read/write messages (the encoder/decoder
that handle the 4-byte length and bincode(Message)) to: write length = 1 +
payload_len, then write a single u8 type tag mapped from the Message enum
variants, followed by the bincode payload; and on read, use the length to read
the frame, read the u8 type tag first and dispatch/deserialize the remainder
into the corresponding Message variant. Ensure the Message-to-u8 mapping and the
inverse dispatch logic are defined (e.g., a small match on Message variants and
a match to construct/deserialize variants) so the wire format no longer depends
on bincode's internal discriminant ordering.
| pub enum Message { | ||
| HandshakeReq(HandshakeRequest), | ||
| HandshakeResp(HandshakeResponse), | ||
| Transaction(Transaction), | ||
| Block(Block), | ||
| SuperBlock(SuperBlock), | ||
| Failover(FailoverSignal), | ||
| } |
There was a problem hiding this comment.
Heartbeat is missing from the protocol message set.
The linked objective for issue #951 requires Heartbeat as a defined message type. The current enum omits it and introduces Failover instead, which makes the implementation diverge from the stated protocol contract.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@magicblock-replicator/src/proto.rs` around lines 23 - 30, The Message enum is
missing the required Heartbeat variant and currently contains Failover which
diverges from the protocol; update the enum definition (Message) to include
Heartbeat(Heartbeat) and remove the Failover(FailoverSignal) variant unless
Failover is part of the spec, and ensure the corresponding Heartbeat type/name
is imported/defined so the protocol matches the required contract.
7a1d2ff to
0aca470
Compare
There was a problem hiding this comment.
♻️ Duplicate comments (3)
magicblock-replicator/src/connection.rs (1)
80-82:⚠️ Potential issue | 🟠 MajorReturn
Poll::Ready(None)on EOF inStreamimplementation.This branch should terminate the stream instead of yielding
ConnectionClosed; otherwisewhile let Some(...)consumers may never exit on disconnect.♻️ Suggested fix
- Poll::Ready(None) => { - Poll::Ready(Some(Err(Error::ConnectionClosed))) - } + Poll::Ready(None) => Poll::Ready(None),#!/bin/bash set -euo pipefail echo "Inspecting EOF branch in Receiver::poll_next..." nl -ba magicblock-replicator/src/connection.rs | sed -n '70,86p' echo echo "Searching for EOF mapping to ConnectionClosed..." rg -n 'Poll::Ready\(None\)|ConnectionClosed' magicblock-replicator/src/connection.rs -C 2Expected verification result: EOF path should be
Poll::Ready(None)and notPoll::Ready(Some(Err(Error::ConnectionClosed))).🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-replicator/src/connection.rs` around lines 80 - 82, In the Stream implementation for Receiver (Receiver::poll_next) the EOF branch currently maps Poll::Ready(None) to Poll::Ready(Some(Err(Error::ConnectionClosed))); change this so the EOF case returns Poll::Ready(None) to properly terminate the stream instead of yielding a ConnectionClosed error; locate the match/branch in connection.rs that checks for Poll::Ready(None) and replace the wrapped Some(Err(Error::ConnectionClosed)) return with a plain Poll::Ready(None).magicblock-replicator/src/proto.rs (2)
23-30:⚠️ Potential issue | 🟠 Major
Heartbeatmessage type is missing from protocol enum.The issue objective lists
Heartbeatas a required wire message. Please add it (or update the objective/spec reference if requirements changed) before merging.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-replicator/src/proto.rs` around lines 23 - 30, The Message enum is missing the Heartbeat variant required by the protocol; add a Heartbeat(Heartbeat) variant to the Message enum (the enum named Message in proto.rs) and ensure the Heartbeat type (struct/alias) is defined or imported into this module so serialization/deserialization and matches over Message compile; update any match statements or serialization logic that enumerate Message variants (e.g., handlers for HandshakeReq/Resp, Transaction, Block, SuperBlock, Failover) to handle the new Heartbeat variant accordingly.
3-5:⚠️ Potential issue | 🟠 MajorWire format still depends on bincode’s implicit enum discriminants, not explicit byte type IDs.
The linked protocol objective calls for byte-identified message types with a type prefix. Encoding
Messagedirectly via bincode keeps the type tag implicit and serializer-coupled.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-replicator/src/proto.rs` around lines 3 - 5, The Message enum is currently serialized via bincode which uses implicit, serializer-specific discriminants; instead implement an explicit byte type-ID prefix to satisfy the protocol: assign each Message variant a stable u8 type id, update the Message (de)serialization paths to write/read one leading byte (the variant id) followed by the bincode-encoded payload of that variant (or no payload) and use that byte to dispatch decoding to the correct variant; update the corresponding serialize/deserialize helpers (e.g., the Message::encode / Message::decode or the read_message/write_message routines in proto.rs) to perform this two-step process and remove reliance on bincode’s implicit enum tag.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@magicblock-replicator/src/connection.rs`:
- Around line 80-82: In the Stream implementation for Receiver
(Receiver::poll_next) the EOF branch currently maps Poll::Ready(None) to
Poll::Ready(Some(Err(Error::ConnectionClosed))); change this so the EOF case
returns Poll::Ready(None) to properly terminate the stream instead of yielding a
ConnectionClosed error; locate the match/branch in connection.rs that checks for
Poll::Ready(None) and replace the wrapped Some(Err(Error::ConnectionClosed))
return with a plain Poll::Ready(None).
In `@magicblock-replicator/src/proto.rs`:
- Around line 23-30: The Message enum is missing the Heartbeat variant required
by the protocol; add a Heartbeat(Heartbeat) variant to the Message enum (the
enum named Message in proto.rs) and ensure the Heartbeat type (struct/alias) is
defined or imported into this module so serialization/deserialization and
matches over Message compile; update any match statements or serialization logic
that enumerate Message variants (e.g., handlers for HandshakeReq/Resp,
Transaction, Block, SuperBlock, Failover) to handle the new Heartbeat variant
accordingly.
- Around line 3-5: The Message enum is currently serialized via bincode which
uses implicit, serializer-specific discriminants; instead implement an explicit
byte type-ID prefix to satisfy the protocol: assign each Message variant a
stable u8 type id, update the Message (de)serialization paths to write/read one
leading byte (the variant id) followed by the bincode-encoded payload of that
variant (or no payload) and use that byte to dispatch decoding to the correct
variant; update the corresponding serialize/deserialize helpers (e.g., the
Message::encode / Message::decode or the read_message/write_message routines in
proto.rs) to perform this two-step process and remove reliance on bincode’s
implicit enum tag.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 44576bcc-6961-4eb6-85ce-bc7f45a82c6b
⛔ Files ignored due to path filters (2)
Cargo.lockis excluded by!**/*.locktest-integration/Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (8)
Cargo.tomlmagicblock-replicator/Cargo.tomlmagicblock-replicator/src/connection.rsmagicblock-replicator/src/error.rsmagicblock-replicator/src/lib.rsmagicblock-replicator/src/proto.rsmagicblock-replicator/src/tcp.rsmagicblock-replicator/src/tests.rs
0aca470 to
f025746
Compare
40fa0de to
ee85613
Compare

Summary
Adds a new magicblock-replicator crate, with minimal
replication protocol design. This should lay the groundwork
for the future replication service implementation.
Compatibility
Checklist
Summary by CodeRabbit
New Features
Tests
Chores