Conversation

@yashmayya yashmayya commented Jan 22, 2026

@gortiz gortiz self-requested a review January 22, 2026 07:15
Comment on lines 254 to 260
pub fn deserialize_from_ipc(data: &[u8]) -> (SchemaRef, Vec<RecordBatch>) {
    let cursor = Cursor::new(data);
    let reader = StreamReader::try_new(cursor, None).unwrap();
    let schema = reader.schema();
    let batches: Vec<RecordBatch> = reader.map(|r| r.expect("Failed to read batch")).collect();
    (schema, batches)
}

Can we try zero-copy IPC as well? See https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs

Although that example uses memory-mapped files, it should work the same way with any vector of bytes.

The reason is that if we change the protocol to send Arrow blocks directly, we will buffer them in memory, and once they are buffered we should be able to use them directly without copying them into new RecordBatches.

We should also be able to do the same when writing, although I didn't find an example of how to do it.

Note that a real implementation would need to send and receive these bytes over the network, but we are also ignoring that cost in the Java version.
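
To make the distinction concrete, here is a minimal sketch of zero-copy versus copying reads over a buffered byte vector. It uses only the standard library and a hypothetical length-prefixed framing, not the Arrow IPC format or the arrow-rs API; the function names are illustrative assumptions.

```rust
/// Builds a buffer using a hypothetical framing (NOT the Arrow IPC format):
/// each block is a 4-byte little-endian length followed by its payload.
fn frame_blocks(blocks: &[&[u8]]) -> Vec<u8> {
    let mut out = Vec::new();
    for block in blocks {
        out.extend_from_slice(&(block.len() as u32).to_le_bytes());
        out.extend_from_slice(block);
    }
    out
}

/// Zero-copy read: every returned slice borrows from `data`,
/// so no payload bytes are duplicated.
fn read_blocks_zero_copy(data: &[u8]) -> Vec<&[u8]> {
    let mut blocks = Vec::new();
    let mut pos = 0;
    while pos + 4 <= data.len() {
        let mut len_bytes = [0u8; 4];
        len_bytes.copy_from_slice(&data[pos..pos + 4]);
        let len = u32::from_le_bytes(len_bytes) as usize;
        pos += 4;
        if pos + len > data.len() {
            break; // truncated block: stop rather than panic
        }
        blocks.push(&data[pos..pos + len]);
        pos += len;
    }
    blocks
}

/// Copying read: each block is duplicated into its own allocation,
/// which is the cost the zero-copy path avoids.
fn read_blocks_copying(data: &[u8]) -> Vec<Vec<u8>> {
    read_blocks_zero_copy(data)
        .into_iter()
        .map(|block| block.to_vec())
        .collect()
}
```

The zero-copy variant ties the lifetime of the blocks to the buffer, which is the same trade-off we would accept by keeping the received bytes alive for as long as the batches are in use.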

I've been investigating and it looks like we cannot serialize in a zero-copy fashion, so I created #2, which uses zero-copy deserialization.

Serialization is not that important, given that we will have to copy the structure anyway in order to send partitions when using that kind of exchange. If we end up committing to DataFusion, we will need to decide whether to use the streaming or the file IPC format.
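
As a rough illustration of why the copy is unavoidable on the write side: rows that belong to one partition are interleaved with the others in the input, so each partition's output has to be materialized into its own buffer. The sketch below assumes a single `i64` column and uses `key % num_partitions` as a stand-in for a real hash function; `partition_rows` is a hypothetical name, not something from this PR.

```rust
/// Splits `values` into `num_partitions` buckets according to `keys`.
/// Assumption: `key % num_partitions` stands in for a real hash function.
fn partition_rows(keys: &[u64], values: &[i64], num_partitions: usize) -> Vec<Vec<i64>> {
    assert_eq!(keys.len(), values.len(), "one key per value");
    assert!(num_partitions > 0, "need at least one partition");

    let mut partitions = vec![Vec::new(); num_partitions];
    for (key, value) in keys.iter().zip(values) {
        let target = (*key % num_partitions as u64) as usize;
        // Rows for the same partition are not contiguous in the input,
        // so each value is copied into that partition's buffer.
        partitions[target].push(*value);
    }
    partitions
}
```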

Comment on lines +118 to +120
/// 4. **full_pipeline**: Complete deser + filter + output serialization
/// - Real-world end-to-end latency including result serialization
/// - Relevant for scenarios where results are sent over network

Unless we implement zero-copy reads in bench_utils, this is an unfair comparison with Java, because the Java side uses zero-copy reads (SerializedDataBlock is a wrapper around a DataBlock that just points to the original ByteBuffer). It is true that later in the Java pipeline we always convert SerializedDataBlock into heap blocks, but that is one of the inefficiencies of the current MSE that we can remove with DataFusion.

We should either remove the deserialization time from this benchmark or implement zero-copy reads.
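
For the first option, one way to keep the deserialization cost visible but separable is to time each stage on its own. This is only a sketch with standard-library timers and hypothetical stand-in stages (`decode_values`, `filter_values`), not the code in bench_utils; in Criterion the same effect can be had by moving deserialization into the setup closure of `iter_batched`.

```rust
use std::time::{Duration, Instant};

/// Runs `stage` once and returns its result with the elapsed wall-clock time.
fn timed<T>(stage: impl FnOnce() -> T) -> (T, Duration) {
    let start = Instant::now();
    let out = stage();
    (out, start.elapsed())
}

/// Hypothetical stand-in for deserialization: decodes little-endian i64 values.
fn decode_values(data: &[u8]) -> Vec<i64> {
    data.chunks_exact(8)
        .map(|chunk| {
            let mut bytes = [0u8; 8];
            bytes.copy_from_slice(chunk);
            i64::from_le_bytes(bytes)
        })
        .collect()
}

/// Hypothetical stand-in for the filter stage: keeps values above `threshold`.
fn filter_values(values: &[i64], threshold: i64) -> Vec<i64> {
    values.iter().copied().filter(|v| *v > threshold).collect()
}

/// Per-stage timings, so deserialization can be reported or excluded separately.
struct StageTimes {
    deserialize: Duration,
    filter: Duration,
}

impl StageTimes {
    fn total(&self) -> Duration {
        self.deserialize + self.filter
    }
}

/// Runs both stages and records how long each one took.
fn run_pipeline(data: &[u8], threshold: i64) -> (Vec<i64>, StageTimes) {
    let (values, deserialize) = timed(|| decode_values(data));
    let (kept, filter) = timed(|| filter_values(&values, threshold));
    (kept, StageTimes { deserialize, filter })
}
```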

/// - Relevant for scenarios where results are sent over network
fn bench_filter(c: &mut Criterion) {
    // Create a Tokio runtime for async execution
    let rt = Runtime::new().unwrap();

This may be unfair because this runtime can use multiple threads, whereas Java uses only one.

This probably isn't important in practice, because the pipeline we create uses a single partition and therefore most likely a single thread, but I think it would be better to use only the original carrier thread.
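
With Tokio, the usual way to get this is a current-thread runtime (`tokio::runtime::Builder::new_current_thread()`) instead of `Runtime::new()`. To show the property we want without depending on Tokio, here is a minimal standard-library sketch of a `block_on` that polls a future on the calling thread only; `block_on`, `ThreadWaker`, and `filter_on_current_thread` are hypothetical names for illustration.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread, ThreadId};

/// Waker that unparks the thread which is blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drives `fut` to completion on the calling thread; no worker threads are spawned.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until the waker unparks this thread, then poll again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Stand-in for the benchmarked pipeline: filters values and reports
/// which thread actually executed the work.
async fn filter_on_current_thread(values: Vec<i64>, threshold: i64) -> (Vec<i64>, ThreadId) {
    let kept = values.into_iter().filter(|v| *v > threshold).collect();
    (kept, thread::current().id())
}
```

Running the pipeline this way makes the single-thread assumption explicit instead of relying on the plan having one partition.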

3 participants