[fix,serialization] Fix FieldMeta status update and remove unnecessary copy and use recv_multipart(copy=False) by default #46

Merged: 0oshowero0 merged 9 commits into Ascend:main from 0oshowero0:zerocopy on Mar 19, 2026

@0oshowero0 0oshowero0 commented Mar 16, 2026

  1. Use recv_multipart(copy=False) by default, which returns zmq.Frame objects backed by writable memory.
  2. Remove a redundant memory copy.
  3. Fix bugs introduced in #45 ([fix,feat] Support MooncakeStore easy init) when dynamically updating the FieldMeta status.

Script to validate:

import zmq
import torch
import numpy as np
import multiprocessing
import time

# Import the serialization and deserialization interfaces
# (encode/decode live in transfer_queue.utils.serial_utils)
from transfer_queue.utils.serial_utils import encode, decode

def sender():
    """Sender process"""
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://127.0.0.1:5557")
    
    # 1. Create data to send (containing a numpy array and a torch tensor)
    np_arr = np.ones((5, 5), dtype=np.float32)
    pt_tensor = torch.ones((5, 5), dtype=torch.float32)
    
    print(f"[Sender] Created NumPy Array, shape: {np_arr.shape}")
    print(f"[Sender] Created PyTorch Tensor, shape: {pt_tensor.shape}")
    
    # 2. Assemble into a complex nested structure for testing
    payload = {
        "metadata": {"version": "1.0", "description": "test zero-copy"},
        "data_np": np_arr,
        "data_pt": pt_tensor
    }
    
    # 3. Call the encode interface
    # encode returns a list of frames: the first frame is the msgpack header,
    # and the remaining frames are views of the underlying memory
    frames = encode(payload)
    print(f"[Sender] Serialization complete, generated {len(frames)} frames.")
    
    # 4. Send using multipart + zero-copy
    socket.send_multipart(frames, copy=False)
    print("[Sender] Data sending complete.")
    
    time.sleep(1) # Wait for receiver to process
    socket.close()
    context.term()

def receiver():
    """Receiver process"""
    context = zmq.Context()
    socket = context.socket(zmq.PULL)
    socket.connect("tcp://127.0.0.1:5557")
    
    # 1. Receive multiple frames with zero-copy
    # copy=False will make the returned result a list[zmq.Frame]
    frames = socket.recv_multipart(copy=False)
    print(f"\n[Receiver] Received {len(frames)} frames.")
    print(f"[Receiver] Frame types: {[type(f) for f in frames]}")
    
    # 2. Call your decode interface to deserialize
    # At this point, the passed frames are a list of zmq.Frame
    payload = decode(frames)
    
    recv_np = payload["data_np"]
    recv_pt = payload["data_pt"]
    
    print("\n--- Verify NumPy ---")
    print(f"[Receiver] NumPy object type: {type(recv_np)}, dtype: {recv_np.dtype}")
    print(f"[Receiver] Is NumPy memory writeable: {recv_np.flags.writeable}")
    try:
        recv_np[0, 0] = 99.0
        print(f"[Receiver] ✅ NumPy write successful! recv_np[0, 0] = {recv_np[0, 0]}")
    except Exception as e:
        print(f"[Receiver] ❌ NumPy write failed: {e}")
        
    print("\n--- Verify PyTorch Tensor ---")
    print(f"[Receiver] Tensor object type: {type(recv_pt)}, dtype: {recv_pt.dtype}")
    try:
        recv_pt[0, 0] = 88.0
        print(f"[Receiver] ✅ Tensor write successful! recv_pt[0, 0] = {recv_pt[0, 0].item()}")
    except Exception as e:
        print(f"[Receiver] ❌ Tensor write failed: {e}")

    socket.close()
    context.term()

if __name__ == '__main__':
    p_recv = multiprocessing.Process(target=receiver)
    p_send = multiprocessing.Process(target=sender)
    
    p_recv.start()
    time.sleep(0.5) # Give the receiver time to start and connect before the sender starts
    p_send.start()
    
    p_send.join()
    p_recv.join()
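
The frame layout the script relies on (one header frame followed by raw buffer frames) can be sketched without ZMQ or msgpack. This is only an illustration under stated assumptions: `encode_frames` and `decode_frames` are hypothetical names, JSON stands in for the msgpack header, and `array.array` stands in for NumPy/PyTorch data. It is not the implementation in transfer_queue.utils.serial_utils.

```python
import json
from array import array


def encode_frames(payload: dict) -> list:
    """Split a dict into [header, buf0, buf1, ...] without copying the buffers."""
    header = {}
    buffers = []
    for key, value in payload.items():
        if isinstance(value, array):
            # Record which frame holds the raw bytes instead of embedding them.
            header[key] = {"frame": len(buffers) + 1, "typecode": value.typecode}
            buffers.append(memoryview(value))
        else:
            header[key] = {"value": value}
    return [json.dumps(header).encode("utf-8"), *buffers]


def decode_frames(frames: list) -> dict:
    """Rebuild the dict; buffer entries become views over the received frames."""
    header = json.loads(bytes(frames[0]))
    out = {}
    for key, entry in header.items():
        if "frame" in entry:
            raw = memoryview(frames[entry["frame"]])
            # cast() reinterprets the bytes in place; no copy is made.
            out[key] = raw.cast("B").cast(entry["typecode"])
        else:
            out[key] = entry["value"]
    return out


src = array("f", [1.0, 2.0, 3.0])
frames = encode_frames({"version": "1.0", "data": src})
decoded = decode_frames(frames)
decoded["data"][0] = 99.0
print(len(frames), src[0])  # the write is visible through the original array
```

Because the decoded entry is a view rather than a copy, the write lands in the sender-side array in this single-process sketch; across a real socket the view would instead point at the received frame's memory.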

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Copilot AI review requested due to automatic review settings March 16, 2026 09:06
@ascend-robot

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

Copilot AI left a comment

Pull request overview

This PR updates TransferQueue’s ZMQ receive path to use recv_multipart(copy=False) more consistently and removes a redundant detaching copy for numpy data, with the goal of enabling true zero-copy deserialization (yielding writable buffers) and reducing memory overhead.

Changes:

  • Switch several ZMQ receive sites to recv_multipart(copy=False) so deserialization operates on zmq.Frame buffers.
  • Remove the numpy-specific .copy() detachment in AsyncSimpleStorageManager._pack_field_values and drop the now-unused numpy import.
  • Simplify AsyncSimpleStorageManager._get_from_single_storage_unit return shape and update affected tests/mocks.
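
Why a writable buffer removes the need for a detaching copy can be illustrated with the standard library alone. The sketch below is an assumption-laden stand-in, not pyzmq or TransferQueue code: `bytes` plays the role of an immutable received message (what recv_multipart returns with copy=True), `bytearray` plays the role of a buffer the receiver may write to, and `view_as_floats` is a hypothetical helper.

```python
def view_as_floats(buf):
    """Reinterpret a bytes-like object as float32 values without copying."""
    return memoryview(buf).cast("B").cast("f")


immutable = bytes(8)      # immutable message payload
mutable = bytearray(8)    # stand-in for a writable receive buffer

ro = view_as_floats(immutable)
rw = view_as_floats(mutable)
print(ro.readonly, rw.readonly)  # True False

rw[0] = 1.5               # writes straight into `mutable`, no copy involved
try:
    ro[0] = 1.5           # a read-only view would force a copy before writing
except TypeError as exc:
    print("read-only view:", exc)
```

When the view is read-only, a consumer that wants to modify the data has to copy it first; with a writable view that copy becomes redundant, which is the motivation the overview gives for dropping `.copy()`.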

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| transfer_queue/storage/managers/simple_backend_manager.py | Removes numpy detachment copy in batching logic; adjusts get_data result scattering and recv_multipart(copy=False) usage. |
| transfer_queue/storage/managers/base.py | Uses recv_multipart(copy=False) during controller handshake deserialization. |
| transfer_queue/controller.py | Switches controller ROUTER sockets to recv_multipart(copy=False) for handshake/request/status loops. |
| tests/test_simple_storage_unit.py | Updates test client to deserialize from recv_multipart(copy=False) frames. |
| tests/test_client.py | Updates mock controller/storage sockets to receive frames with copy=False. |
| tests/test_async_simple_storage_manager.py | Updates mocks/fixtures to match the simplified _get_from_single_storage_unit return tuple. |

# Use per-element .copy() instead of np.stack because string-dtype
# arrays may have heterogeneous shapes.
return NonTensorStack(*[v.copy() for v in values])
return NonTensorStack(*values)
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

@0oshowero0 0oshowero0 changed the title [serialization] Remove unnecessary copy and use recv_multipart(copy=False) by default [fix,serialization] Remove unnecessary copy and use recv_multipart(copy=False) by default Mar 17, 2026
@0oshowero0 0oshowero0 requested a review from Copilot March 17, 2026 12:38
Copilot AI left a comment

Pull request overview

This PR aims to improve ZMQ zero-copy behavior across the TransferQueue stack by switching to recv_multipart(copy=False) more broadly, removing redundant memory copies, and fixing dynamic FieldMeta update behavior introduced in #45.

Changes:

  • Switch multiple ZMQ receive paths to recv_multipart(copy=False) (including controller, managers, and tests).
  • Simplify/adjust Simple backend data retrieval + packing and remove a NumPy-specific copy path.
  • Refactor controller-side FieldMeta tracking to include global_indexes and update nested/per-sample shape handling.
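
The regular-to-nested transition described above can be sketched roughly as follows. `FieldMetaSketch` is a hypothetical, simplified stand-in written for illustration; the attribute names follow the review excerpts in this conversation, but the handling of the non-nested branch is an assumption and the real class in transfer_queue/controller.py covers more cases (for example, conflicting regular shapes and non-tensor fields).

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class FieldMetaSketch:
    """Hypothetical stand-in for the controller's FieldMeta."""

    shape: Optional[tuple] = None
    is_nested: Optional[bool] = None
    global_indexes: set = field(default_factory=set)
    per_sample_shapes: dict = field(default_factory=dict)

    def update(self, incoming: dict, global_indexes: list) -> None:
        if incoming.get("is_nested"):
            new_shapes = incoming.get("per_sample_shapes")
            if new_shapes is None:
                raise ValueError("nested field without 'per_sample_shapes'")
            if self.is_nested is False and self.shape is not None:
                # Regular -> nested: back-fill the old uniform shape for
                # every sample that was already tracked.
                for gi in self.global_indexes:
                    self.per_sample_shapes[gi] = self.shape
            self.is_nested = True
            self.shape = None
            self.per_sample_shapes.update(new_shapes)
        else:
            new_shape = incoming.get("shape")
            if self.is_nested:
                # Assumption: once nested, regular input is stored per sample.
                for gi in global_indexes:
                    self.per_sample_shapes[gi] = new_shape
            else:
                self.is_nested = False
                self.shape = new_shape
        self.global_indexes.update(global_indexes)


meta = FieldMetaSketch()
meta.update({"is_nested": False, "shape": (4,)}, [0, 1])
meta.update({"is_nested": True, "per_sample_shapes": {2: (7,)}}, [2])
print(meta.is_nested, meta.shape, sorted(meta.per_sample_shapes.items()))
```

Tracking `global_indexes` is what makes the back-fill possible: without it, the controller would not know which samples the old uniform shape applied to.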

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| transfer_queue/storage/managers/simple_backend_manager.py | Removes NumPy copy path and updates multipart receive + get-path return structure. |
| transfer_queue/storage/managers/base.py | Uses recv_multipart(copy=False) for handshake and mutates field_schema per-sample shapes before notifying controller. |
| transfer_queue/storage/clients/mooncake_client.py | Fixes initialization by calling base KV client constructor. |
| transfer_queue/metadata.py | Updates schema extraction logic (incl. 1D tensors) and adds empty-batch warnings. |
| transfer_queue/controller.py | Refactors FieldMeta update/remove logic and updates controller receive paths to copy=False. |
| tests/test_simple_storage_unit.py | Updates test sockets to receive with copy=False. |
| tests/test_metadata.py | Removes an empty-batch is_non_tensor is None expectation. |
| tests/test_controller_data_partitions.py | Updates tests for new per-sample-shapes representation and new FieldMeta behaviors. |
| tests/test_client.py | Updates mock server receive paths to copy=False. |
| tests/test_async_simple_storage_manager.py | Updates mocks/return shapes to match manager API changes. |

elif self.shape is not None and self.shape != new_shape:
new_is_nested = incoming.get("is_nested")
new_is_non_tensor = incoming.get("is_non_tensor")

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

fix

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

fix

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

Copilot AI left a comment

Pull request overview

This PR updates TransferQueue’s ZMQ serialization/data-path to default to recv_multipart(copy=False) for fewer copies, and refactors controller-side FieldMeta handling to better support dynamic field/nested-shape updates.

Changes:

  • Switch multiple ZMQ receive sites to recv_multipart(copy=False) and adjust related call sites/tests.
  • Refactor controller FieldMeta tracking (including per-sample shape handling) and add/expand integration tests.
  • Adjust metadata extraction and simple-backend batching behavior to align with the new zero-copy path.

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 6 comments.

| File | Description |
| --- | --- |
| transfer_queue/storage/managers/simple_backend_manager.py | Removes NumPy special-casing/copy in packing and updates get/clear response handling; adopts recv_multipart(copy=False) in clear. |
| transfer_queue/storage/managers/base.py | Uses recv_multipart(copy=False) for handshake and adds per-sample-shape normalization before notifying controller. |
| transfer_queue/storage/clients/mooncake_client.py | Ensures base client initialization via super().__init__(config). |
| transfer_queue/metadata.py | Updates schema extraction (empty-batch behavior, stricter batch-dimension checks, 1D tensor handling) and no-op on empty add_fields. |
| transfer_queue/controller.py | Refactors FieldMeta (global index tracking, update/remove logic) and switches controller sockets to recv_multipart(copy=False). |
| tests/test_simple_storage_unit.py | Updates tests to receive ZMQ frames with copy=False. |
| tests/test_metadata.py | Removes test case related to empty-batch is_non_tensor “unknown” behavior. |
| tests/test_controller_data_partitions.py | Updates per-sample-shape schema format expectations and adds extensive FieldMeta integration coverage. |
| tests/test_client.py | Updates mock controller recv to copy=False. |
| tests/test_async_simple_storage_manager.py | Aligns mocked return signatures with updated manager behavior. |

Comment on lines +172 to +173
logger.warning(f"Receiving 1D tensor for field '{field_name}'. Unsqueeze the last dimension.")
value = value.unsqueeze(-1)
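
For readers unfamiliar with the step quoted above: unsqueezing the last dimension turns a 1D batch of B scalars into a batch of B one-element samples. A dependency-free sketch of that effect, using hypothetical helper names and plain lists in place of tensors:

```python
def unsqueeze_last(batch: list) -> list:
    """Turn a flat batch [a, b, c] into [[a], [b], [c]]."""
    return [[item] for item in batch]


def shape_after_unsqueeze(shape: tuple) -> tuple:
    """Shape-level view of the same step: (B,) becomes (B, 1)."""
    return shape + (1,) if len(shape) == 1 else shape


print(unsqueeze_last([0.1, 0.2, 0.3]))   # [[0.1], [0.2], [0.3]]
print(shape_after_unsqueeze((3,)))       # (3, 1)
```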
Comment on lines +237 to +258
new_is_nested = incoming.get("is_nested")
new_is_non_tensor = incoming.get("is_non_tensor")

if new_is_nested:
new_per_sample_shapes = incoming.get("per_sample_shapes", None)
if new_per_sample_shapes is None:
raise ValueError("Receiving a nested field without 'per_sample_shapes'!")
if self.is_nested is not None and not self.is_nested:
# new input is nested, but original is regular tensor.
# We need to write old shape into per_sample_shapes
assert self.shape is not None
for gi in self.global_indexes:
self.per_sample_shapes[gi] = self.shape
self.is_nested = True
self.shape = None

# explicit is_nested flag overrides inference
if incoming.get("is_nested"):
self.is_nested = True
self.shape = None
# Update newly provided per_sample_shapes
self.per_sample_shapes.update(new_per_sample_shapes)

else:
if not new_is_non_tensor:
# newly input is regular tensor
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

@0oshowero0 0oshowero0 changed the title [fix,serialization] Remove unnecessary copy and use recv_multipart(copy=False) by default [fix,serialization] Fix FieldMeta status update and remove unnecessary copy and use recv_multipart(copy=False) by default Mar 18, 2026
@0oshowero0 0oshowero0 merged commit 44c683b into Ascend:main Mar 19, 2026
5 checks passed
@0oshowero0 0oshowero0 deleted the zerocopy branch April 2, 2026 07:22