Add benchmarks for different execution plans and Arrow IPC serde #1
base: main
Conversation
pub fn deserialize_from_ipc(data: &[u8]) -> (SchemaRef, Vec<RecordBatch>) {
    let cursor = Cursor::new(data);
    let reader = StreamReader::try_new(cursor, None).unwrap();
    let schema = reader.schema();
    let batches: Vec<RecordBatch> = reader.map(|r| r.expect("Failed to read batch")).collect();
    (schema, batches)
}
Can we try zero-copy IPC as well? See https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
Although that example uses mmapped files, it should work the same way with any vec of bytes.
The reason is that if we change the protocol to send Arrow blocks directly, we are going to buffer them in memory, and once we have them we should be able to use them directly without copying into new RecordBatches.
We should also be able to do the same when writing, although I didn't find an example of how to do it.
Note that a real implementation would need to receive/send these bytes over the network, but we are ignoring that cost in the Java version as well.
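Roughly, following that example but starting from an already-buffered `Vec<u8>`, the zero-copy path could look something like the sketch below. Note it assumes the *file* IPC format (the example's format) rather than the stream format used by `deserialize_from_ipc` above, the function name is just illustrative, and dictionary batches are skipped for brevity:

```rust
use std::sync::Arc;

use arrow::array::RecordBatch;
use arrow::buffer::Buffer;
use arrow::datatypes::SchemaRef;
use arrow::ipc::convert::fb_to_schema;
use arrow::ipc::reader::{read_footer_length, FileDecoder};
use arrow::ipc::root_as_footer;

/// Decode an Arrow IPC *file* held in `data` without copying the buffers:
/// the returned RecordBatches reference the original allocation.
pub fn deserialize_from_ipc_zero_copy(data: Vec<u8>) -> (SchemaRef, Vec<RecordBatch>) {
    // Wrapping the Vec in a Buffer is cheap; slices of it share the allocation.
    let buffer = Buffer::from_vec(data);

    // The last 10 bytes of the file encode the footer length.
    let trailer_start = buffer.len() - 10;
    let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
    let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

    let schema: SchemaRef = Arc::new(fb_to_schema(footer.schema().unwrap()));
    let decoder = FileDecoder::new(schema.clone(), footer.version());

    // Dictionary batches are ignored in this sketch.
    let mut batches = Vec::new();
    for block in footer.recordBatches().unwrap().iter() {
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        // Slice the original buffer for this block; no bytes are copied.
        let block_data = buffer.slice_with_length(block.offset() as usize, block_len);
        if let Some(batch) = decoder.read_record_batch(block, &block_data).unwrap() {
            batches.push(batch);
        }
    }
    (schema, batches)
}
```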
I've been investigating and it looks like we cannot serialize in a zero-copy fashion, so I created #2, which uses zero-copy deserialization.
Serialization is not that important, given that we will have to copy the structure anyway in order to send partitions when using that kind of exchange. If we end up committing to DataFusion, we will need to decide whether to use the streaming or the file IPC format.
/// 4. **full_pipeline**: Complete deser + filter + output serialization
///    - Real-world end-to-end latency including result serialization
///    - Relevant for scenarios where results are sent over network
Unless we implement zero-copy reads in bench_utils, this is an unfair comparison to Java: in Java we are using zero-copy (SerializedDataBlock is a wrapper on top of a DataBlock that just points to the original ByteBuffer). It is true that later in the Java pipeline we always convert SerializedDataBlock into heap blocks, but that is one of the inefficiencies the current MSE has that we can remove with DataFusion.
We should either remove the deserialization time from this benchmark or implement zero-copy reads (see the sketch below for the first option).
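A rough sketch of the first option: move the deserialization out of the measured closure so Criterion only times the filtering. It reuses the `deserialize_from_ipc` helper quoted above; the sample data and the row mask are placeholders for the benchmark's real input and plan:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, Int32Array, RecordBatch};
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::writer::StreamWriter;
use criterion::{black_box, criterion_group, criterion_main, Criterion};

/// Placeholder input so the sketch stands on its own; the real benchmark
/// would use its existing IPC test data instead.
fn sample_ipc_bytes() -> Vec<u8> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let col: ArrayRef = Arc::new(Int32Array::from_iter_values(0..1024));
    let batch = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
    let mut writer = StreamWriter::try_new(Vec::new(), &schema).unwrap();
    writer.write(&batch).unwrap();
    writer.finish().unwrap();
    writer.into_inner().unwrap()
}

fn bench_filter_only(c: &mut Criterion) {
    let data = sample_ipc_bytes();
    // Deserialize once, outside the measured closure, so only the filter
    // contributes to the reported time (closer to what Java measures).
    let (_schema, batches) = deserialize_from_ipc(&data);
    // Stand-in predicates (keep every other row), built once up front.
    let masks: Vec<BooleanArray> = batches
        .iter()
        .map(|b| (0..b.num_rows()).map(|i| Some(i % 2 == 0)).collect())
        .collect();
    c.bench_function("filter_only", |b| {
        b.iter(|| {
            for (batch, mask) in batches.iter().zip(&masks) {
                black_box(filter_record_batch(batch, mask).unwrap());
            }
        })
    });
}

criterion_group!(benches, bench_filter_only);
criterion_main!(benches);
```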
/// - Relevant for scenarios where results are sent over network
fn bench_filter(c: &mut Criterion) {
    // Create a Tokio runtime for async execution
    let rt = Runtime::new().unwrap();
This may be unfair because this runtime can use multiple threads, whereas Java uses only one.
It probably doesn't matter much in practice, because the pipeline we create uses a single partition and therefore probably a single thread, but I think it would be better to only use the original carrier thread.
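For instance, a current-thread runtime (a minimal sketch; the helper name is just illustrative) would keep everything on the calling thread instead of `Runtime::new()`'s multi-threaded scheduler:

```rust
use tokio::runtime::{Builder, Runtime};

// Minimal sketch: a Runtime that polls every task on the thread that calls
// `block_on`, so the Rust benchmark stays single-threaded like the Java one.
fn single_thread_runtime() -> Runtime {
    Builder::new_current_thread()
        .enable_all() // enable timers and IO in case any operator needs them
        .build()
        .expect("failed to build current-thread Tokio runtime")
}
```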
* Try to use zero copy deser
* Try string and byte views
* Actual zero copy deser/serde
* Use zero copy on benchmarks
* Remove incorrect docs and one unused fun
…startreedata/datafusion into serde-and-physical-plan-exec-benchmarks
cargo bench --bench filter_bench -p datafusion-physical-plan.