[fix,serialization] Fix FieldMeta status update and remove unnecessary copy and use recv_multipart(copy=False) by default #46
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
CLA Signature Pass: 0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍
Pull request overview
This PR updates TransferQueue’s ZMQ receive path to use recv_multipart(copy=False) more consistently and removes a redundant detaching copy for numpy data, with the goal of enabling true zero-copy deserialization (yielding writable buffers) and reducing memory overhead.
Changes:
- Switch several ZMQ receive sites to `recv_multipart(copy=False)` so deserialization operates on `zmq.Frame` buffers.
- Remove the numpy-specific `.copy()` detachment in `AsyncSimpleStorageManager._pack_field_values` and drop the now-unused numpy import.
- Simplify the `AsyncSimpleStorageManager._get_from_single_storage_unit` return shape and update affected tests/mocks.
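
As a rough illustration of the first change, the sketch below receives a multipart message with `copy=False` and reads the payload through the frame's buffer rather than a `bytes` copy. It is not taken from the TransferQueue code: the PUSH/PULL socket pair, the `inproc://zero-copy-demo` endpoint and the payload are assumptions chosen only to keep the demo self-contained.

```python
import zmq

ctx = zmq.Context.instance()
pull = ctx.socket(zmq.PULL)
push = ctx.socket(zmq.PUSH)
pull.setsockopt(zmq.RCVTIMEO, 2000)      # raise instead of hanging if nothing arrives
pull.bind("inproc://zero-copy-demo")     # hypothetical endpoint for this demo
push.connect("inproc://zero-copy-demo")

payload = bytes(range(256)) * 64
push.send_multipart([b"meta", payload])

# With copy=False each part comes back as a zmq.Frame instead of bytes.
frames = pull.recv_multipart(copy=False)
header = frames[0].bytes                 # small frame: an explicit bytes copy is fine
view = frames[1].buffer                  # memoryview over the received message
payload_len = len(view)
first_bytes = bytes(view[:3])

push.close(linger=0)
pull.close(linger=0)
```

A deserializer can then be handed `view` directly, which is the point of operating on `zmq.Frame` buffers instead of `bytes`.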
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| transfer_queue/storage/managers/simple_backend_manager.py | Removes numpy detachment copy in batching logic; adjusts get_data result scattering and recv_multipart(copy=False) usage. |
| transfer_queue/storage/managers/base.py | Uses recv_multipart(copy=False) during controller handshake deserialization. |
| transfer_queue/controller.py | Switches controller ROUTER sockets to recv_multipart(copy=False) for handshake/request/status loops. |
| tests/test_simple_storage_unit.py | Updates test client to deserialize from recv_multipart(copy=False) frames. |
| tests/test_client.py | Updates mock controller/storage sockets to receive frames with copy=False. |
| tests/test_async_simple_storage_manager.py | Updates mocks/fixtures to match the simplified _get_from_single_storage_unit return tuple. |
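
For the ROUTER-side change, one practical consequence is that the routing identity also arrives as a `zmq.Frame`. The sketch below shows one way a handshake loop might handle that. It is an assumption-laden demo, not the controller's code: the DEALER peer, the `client-1` identity, the `HANDSHAKE`/`ACK` bodies and the endpoint name are all hypothetical.

```python
import zmq

ctx = zmq.Context.instance()
router = ctx.socket(zmq.ROUTER)
dealer = ctx.socket(zmq.DEALER)
for sock in (router, dealer):
    sock.setsockopt(zmq.RCVTIMEO, 2000)          # fail fast instead of blocking forever
router.bind("inproc://controller-demo")          # hypothetical endpoint
dealer.setsockopt(zmq.IDENTITY, b"client-1")     # hypothetical peer identity
dealer.connect("inproc://controller-demo")

dealer.send_multipart([b"HANDSHAKE"])

# ROUTER prepends the sender identity; with copy=False both parts are Frames.
identity_frame, body_frame = router.recv_multipart(copy=False)
identity = identity_frame.bytes                  # plain bytes, usable for routing or as a dict key
body = body_frame.bytes

router.send_multipart([identity, b"ACK"])
reply = dealer.recv_multipart()

dealer.close(linger=0)
router.close(linger=0)
```

Copying the short identity frame to `bytes` costs little, while large payload frames can still be read through `.buffer`.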
    # Use per-element .copy() instead of np.stack because string-dtype
    # arrays may have heterogeneous shapes.
    return NonTensorStack(*[v.copy() for v in values])
    return NonTensorStack(*values)
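
To make the effect of the removed `.copy()` concrete, the following sketch contrasts a NumPy array that views a buffer with one that has been detached from it. The `bytearray` is an assumption standing in for a writable receive buffer; nothing here is taken from `_pack_field_values`.

```python
import numpy as np

# Stand-in for a writable receive buffer (an assumption for this demo).
frame_like = bytearray(b"\x01\x02\x03\x04")

arr = np.frombuffer(frame_like, dtype=np.uint8)   # zero-copy view over the buffer
detached = arr.copy()                             # what a per-element .copy() produces

arr[0] = 42                                       # writes through to the underlying buffer

shares_buffer = frame_like[0] == 42               # the view and the buffer are the same memory
detached_unchanged = int(detached[0]) == 1        # the copy kept the old value
overlap = np.shares_memory(arr, detached)         # no shared memory after .copy()
```

Dropping the copy keeps every array tied to its source buffer, which saves memory but means the buffer must stay alive for as long as the array is in use.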
Pull request overview
This PR aims to improve ZMQ zero-copy behavior across the TransferQueue stack by switching to recv_multipart(copy=False) more broadly, removing redundant memory copies, and fixing dynamic FieldMeta update behavior introduced in #45.
Changes:
- Switch multiple ZMQ receive paths to `recv_multipart(copy=False)` (including controller, managers, and tests).
- Simplify/adjust Simple backend data retrieval + packing and remove a NumPy-specific copy path.
- Refactor controller-side `FieldMeta` tracking to include `global_indexes` and update nested/per-sample shape handling.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| transfer_queue/storage/managers/simple_backend_manager.py | Removes NumPy copy path and updates multipart receive + get-path return structure. |
| transfer_queue/storage/managers/base.py | Uses recv_multipart(copy=False) for handshake and mutates field_schema per-sample shapes before notifying controller. |
| transfer_queue/storage/clients/mooncake_client.py | Fixes initialization by calling base KV client constructor. |
| transfer_queue/metadata.py | Updates schema extraction logic (incl. 1D tensors) and adds empty-batch warnings. |
| transfer_queue/controller.py | Refactors FieldMeta update/remove logic and updates controller receive paths to copy=False. |
| tests/test_simple_storage_unit.py | Updates test sockets to receive with copy=False. |
| tests/test_metadata.py | Removes an empty-batch is_non_tensor is None expectation. |
| tests/test_controller_data_partitions.py | Updates tests for new per-sample-shapes representation and new FieldMeta behaviors. |
| tests/test_client.py | Updates mock server receive paths to copy=False. |
| tests/test_async_simple_storage_manager.py | Updates mocks/return shapes to match manager API changes. |
    elif self.shape is not None and self.shape != new_shape:
    new_is_nested = incoming.get("is_nested")
    new_is_non_tensor = incoming.get("is_non_tensor")
Pull request overview
This PR updates TransferQueue’s ZMQ serialization/data-path to default to recv_multipart(copy=False) for fewer copies, and refactors controller-side FieldMeta handling to better support dynamic field/nested-shape updates.
Changes:
- Switch multiple ZMQ receive sites to `recv_multipart(copy=False)` and adjust related call sites/tests.
- Refactor controller `FieldMeta` tracking (including per-sample shape handling) and add/expand integration tests.
- Adjust metadata extraction and simple-backend batching behavior to align with the new zero-copy path.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| transfer_queue/storage/managers/simple_backend_manager.py | Removes NumPy special-casing/copy in packing and updates get/clear response handling; adopts recv_multipart(copy=False) in clear. |
| transfer_queue/storage/managers/base.py | Uses recv_multipart(copy=False) for handshake and adds per-sample-shape normalization before notifying controller. |
| transfer_queue/storage/clients/mooncake_client.py | Ensures base client initialization via super().__init__(config). |
| transfer_queue/metadata.py | Updates schema extraction (empty-batch behavior, stricter batch-dimension checks, 1D tensor handling) and no-op on empty add_fields. |
| transfer_queue/controller.py | Refactors FieldMeta (global index tracking, update/remove logic) and switches controller sockets to recv_multipart(copy=False). |
| tests/test_simple_storage_unit.py | Updates tests to receive ZMQ frames with copy=False. |
| tests/test_metadata.py | Removes test case related to empty-batch is_non_tensor “unknown” behavior. |
| tests/test_controller_data_partitions.py | Updates per-sample-shape schema format expectations and adds extensive FieldMeta integration coverage. |
| tests/test_client.py | Updates mock controller recv to copy=False. |
| tests/test_async_simple_storage_manager.py | Aligns mocked return signatures with updated manager behavior. |
    logger.warning(f"Receiving 1D tensor for field '{field_name}'. Unsqueeze the last dimension.")
    value = value.unsqueeze(-1)

    new_is_nested = incoming.get("is_nested")
    new_is_non_tensor = incoming.get("is_non_tensor")

    if new_is_nested:
        new_per_sample_shapes = incoming.get("per_sample_shapes", None)
        if new_per_sample_shapes is None:
            raise ValueError("Receiving a nested field without 'per_sample_shapes'!")
        if self.is_nested is not None and not self.is_nested:
            # new input is nested, but original is regular tensor.
            # We need to write old shape into per_sample_shapes
            assert self.shape is not None
            for gi in self.global_indexes:
                self.per_sample_shapes[gi] = self.shape
        self.is_nested = True
        self.shape = None

    # explicit is_nested flag overrides inference
    if incoming.get("is_nested"):
        self.is_nested = True
        self.shape = None
    # Update newly provided per_sample_shapes
    self.per_sample_shapes.update(new_per_sample_shapes)

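
The regular-to-nested promotion discussed in the hunks above can be sketched in isolation. `FieldMetaSketch` below is a hypothetical stand-in, not the controller's `FieldMeta`: the `update` signature, the `"shape"` key and the regular-tensor branch are assumptions, and only the nested branch mirrors the quoted lines.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class FieldMetaSketch:
    """Hypothetical, simplified stand-in for the controller-side FieldMeta."""

    shape: Optional[tuple] = None
    is_nested: Optional[bool] = None
    global_indexes: set = field(default_factory=set)
    per_sample_shapes: dict = field(default_factory=dict)

    def update(self, incoming: dict, new_indexes: list) -> None:
        if incoming.get("is_nested"):
            new_shapes = incoming.get("per_sample_shapes")
            if new_shapes is None:
                raise ValueError("Receiving a nested field without 'per_sample_shapes'!")
            if self.is_nested is not None and not self.is_nested:
                # Regular -> nested: backfill the shared shape for every known sample.
                assert self.shape is not None
                for gi in self.global_indexes:
                    self.per_sample_shapes[gi] = self.shape
            self.is_nested = True
            self.shape = None
            self.per_sample_shapes.update(new_shapes)
        elif not self.is_nested:
            # Assumed handling of a regular tensor; the quoted hunks do not show it.
            self.is_nested = False
            self.shape = incoming.get("shape")
        self.global_indexes.update(new_indexes)


meta = FieldMetaSketch()
meta.update({"is_nested": False, "shape": (4,)}, [0, 1])
meta.update({"is_nested": True, "per_sample_shapes": {2: (3,), 3: (5,)}}, [2, 3])
```

After the second call the earlier samples 0 and 1 carry their old shared shape in `per_sample_shapes`, so no per-sample information is lost when the field becomes nested.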
    else:
        if not new_is_non_tensor:
            # newly input is regular tensor
- Use `recv_multipart(copy=False)` by default, which returns a writable memory object in `zmq.Frame`.
- Fix a bug introduced in "MooncakeStore easy init" #45 when dynamically updating the `FieldMeta` status.

Scripts to validate: