feat: Bitswap improvements for Kubo compatibility#1321
Conversation
…trieval process for compatibility wth kubo
…compatibility with ipfs kubo - Added support for signed records in the DHT by introducing `make_signed_put_record` function. - Updated `ValueStore` to create signed records when storing values. - Enhanced `Envelope` class to handle raw payload types for peer records. - Introduced utility functions for signing and verifying DHT records. - Updated protobuf definitions to include author and signature fields in records. - Improved logging and debug messages for better traceability.
…ce DAG-PB encoding Co-authored-by: Copilot <copilot@github.com>
… layout Co-authored-by: Copilot <copilot@github.com>
… in MerkleDag Co-authored-by: Copilot <copilot@github.com>
…s and implement add_stream method in MerkleDag for handling io.IOBase streams Co-authored-by: Copilot <copilot@github.com>
…improve chunk_stream documentation, and add Wantlist functionality Co-authored-by: Copilot <copilot@github.com>
- Introduced `test_block_service.py` to validate BlockService behavior including local hits, network fetches, auto-caching, and block storage. - Created `test_filesystem_blockstore.py` to manually test FilesystemBlockStore for basic operations, persistence, and directory structure. - Added `test_io_stream.py` to verify io.IOBase input support with chunk_stream and MerkleDag.add_stream functionalities. - Implemented `test_unixfs_encoding.py` to ensure add_file and add_bytes produce dag-pb leaf blocks and validate balanced layout tree structures. - Developed `test_wantlist.py` to test Wantlist and Message dataclasses, including backward compatibility and public API exports.
- Updated type hints in `make_service` function to allow for None. - Specified type hints for lists of bytes in block retrieval tests. - Added assertions to check for non-null `unixfs` in various tests to ensure proper decoding of DAG PB blocks. - Enhanced type hints for observer and subscriber peers in Gossipsub tests. - Improved type hints for candidate lists in opportunistic grafting tests. - Added type ignore comments for factory Meta classes to suppress type checker warnings. - Updated import statements for ID to include type ignore comments in interop utilities.
475086a to
6acceb2
Compare
…handling, and update tests for dag-pb leaf blocks Co-authored-by: Copilot <copilot@github.com>
…ndling in DAG fetching Co-authored-by: Copilot <copilot@github.com>
… and MerkleDag implementations
…leDag Co-authored-by: Copilot <copilot@github.com>
…aching in Bitswap Co-authored-by: Copilot <copilot@github.com>
…st files and factories
Co-authored-by: Copilot <copilot@github.com>
Co-authored-by: Copilot <copilot@github.com>
Co-authored-by: Copilot <copilot@github.com>
There was a problem hiding this comment.
@sumanjeet0012 Strong PR overall — the encoding root-causes are correct and well-tested (verified locally: 238/238 bitswap tests pass, wire format confirmed Links-before-Data). The PR description is exemplary.
Leaving a few comments inline. Three I'd flag as blocking before merge:
- Breaking API change in
add_filedefault (wrap_with_directory=True) — old callers get a directory CID instead of a file CID for the same call. Either revert the default or call this out loudly in the newsfragment. verify_recordonly handles Ed25519 — silently fails for RSA/Secp256k1 peers, breaking DHT interop with non-Ed25519 nodes.DEFAULT_CHUNK_SIZE = 63 KiB - 32doesn't match Kubo's 256 KiB default — files added by py-libp2p won't have the same root CID asipfs add file.binunless Kubo is told--chunker=size-65504. The PR header claims "Kubo CID compatibility"; this caveat needs to be documented.
Medium-priority items (perf and hygiene) inline. Everything else is comfortable post-merge cleanup.
Nice work on the manual DAG-PB outer envelope — limiting hand-rolling to 0x12 <varlen> <linkbytes> then 0x0a <varlen> <unixfs> while still using protobuf for the inner messages is the minimal, correct approach.
| @@ -187,7 +272,7 @@ async def add_file( | |||
|
|
|||
| dir_data = create_directory_node([(filename, cid, file_size)]) | |||
| dir_cid = compute_cid_v1(dir_data, codec=CODEC_DAG_PB) | |||
There was a problem hiding this comment.
Breaking API change (re: the wrap_with_directory=True default a few lines up at the parameter decl): defaulting this to True silently changes the behavior of add_file(path) for every existing caller — they'll now get a directory CID where they previously got a file CID, and fetch_file returns a (bytes, filename) tuple where it returned bytes before. This is invisible in the new tests because they all pass wrap_with_directory=False.
Suggest defaulting to False for back-compat, or version-bump and add a clear migration note to newsfragments/1321.feature.rst (currently doesn't flag this as breaking).
|
|
||
| """ | ||
| try: | ||
| public_key = Ed25519PublicKey.from_bytes(author_public_key) |
There was a problem hiding this comment.
Interop bug for non-Ed25519 peers. This hard-codes Ed25519 deserialization, so DHT records signed by RSA or Secp256k1 peers will silently fail verification (caught by the try/except and returned as False, indistinguishable from a genuinely invalid signature).
Compare with libp2p/peer/envelope.py:198-216 (pub_key_from_protobuf) which dispatches on KeyType for all three types. Either dispatch the same way here, or accept a PublicKey object and let the caller deserialize.
Kubo defaults to Ed25519 today, so this won't surface in basic interop tests, but it's a real correctness gap.
| DEFAULT_CHUNK_SIZE = 63 * 1024 | ||
| # 63 KB minus 32 bytes to leave room for the dag-pb leaf envelope overhead, | ||
| # ensuring wrapped blocks never exceed MAX_BLOCK_SIZE (63 * 1024). | ||
| DEFAULT_CHUNK_SIZE = 63 * 1024 - 32 |
There was a problem hiding this comment.
Mismatch with Kubo's default chunk size. Kubo's ipfs add uses size-262144 (256 KiB) by default. With this 63 KiB default, py-libp2p will produce a different root CID than ipfs add file.bin for the same content — contradicting the PR's "Kubo CID compatibility" headline.
Leaf CIDs match Kubo (because of RawLeaves=false + dag-pb wrapping), but the root over a multi-chunk file won't. Either:
- Document this clearly in
newsfragments/1321.feature.rst("CIDs matchipfs add --chunker=size-65504"), or - Match Kubo's 256 KiB default and split large messages at the wire layer instead of capping the chunk size.
| chunk = leaf_raw | ||
| logger.debug(f"[DAG] Leaf {idx + 1}: raw block {len(chunk)} bytes") | ||
|
|
||
| file_data += chunk |
There was a problem hiding this comment.
O(n²) bytes concat. file_data += chunk over potentially thousands of leaves means each += allocates a new bytes object and copies all previous data. For a 100 MB file at 63 KB chunks (~1626 leaves), this allocates roughly 80 GB of intermediate strings.
The fix is one line — accumulate into a bytearray (or list + b"".join(parts) at the end). This is the single biggest perf win available in the PR.
Same pattern in _read_message (client.py:1017-1047) and encode_dag_pb (dag_pb.py:125-149).
| await trio.to_thread.run_sync( | ||
| lambda: path.parent.mkdir(parents=True, exist_ok=True) | ||
| ) | ||
| await trio.to_thread.run_sync(path.write_bytes, data) |
There was a problem hiding this comment.
Non-atomic writes. path.write_bytes(data) writes in place. If the process crashes mid-write, the next startup finds a truncated file at a CID path. get_block will then return corrupted bytes that fail verification only if the caller checks.
Standard fix: write to path.with_suffix('.tmp'), then os.replace(tmp, path) — atomic on POSIX, durable on most filesystems.
| # Ensure the result is a plain dict (not a coroutine from a mock) | ||
| if isinstance(result, dict): | ||
| return result | ||
| except Exception: |
There was a problem hiding this comment.
Test infrastructure leaking into production code. The getattr probe + isinstance(result, dict) check + bare except Exception: pass is a workaround for MagicMock returning a coroutine object. This silently masks real failures in production.
Suggest defining a Protocol for the batch interface, typing self.bitswap with it, and removing the runtime probing entirely. Tests can then mock the protocol explicitly.
|
|
||
| # Send all CIDs in a single wantlist to the peer | ||
| if peer_id: | ||
| await self._send_wantlist_to_peer(peer_id, batch) |
There was a problem hiding this comment.
Swallowed exception in _send_wantlist_to_peer causes hangs. That helper (further down in this file) catches all exceptions, logs "Failed to send wantlist to peer", and returns. When called from this batch path, if host.new_stream or _write_message fails for one CID in the batch, the corresponding _pending_requests[cid].wait() below will block until trio.fail_after(timeout) cancels it — adding the full timeout (default 30s) to every per-batch failure.
Fix: propagate the failure from _send_wantlist_to_peer, or event.set() with a sentinel so the waiter can fail fast.
| # We manually construct the wire format to enforce the correct ordering. | ||
|
|
||
| # Add links | ||
| result = b"" |
There was a problem hiding this comment.
Minor: same O(n²) concat pattern as fetch_file reassembly. With 174 links on an internal node and large CIDs, each result += ... allocates a fresh bytes. Trivial fix: build into a bytearray and convert to bytes at return.
| yield chunk | ||
|
|
||
|
|
||
| def chunk_stream( |
There was a problem hiding this comment.
Nit: this new chunk_stream should also be added to the module's __all__ (further down in this file). Doesn't affect direct imports, but it breaks from chunker import * and confuses some IDE auto-import tools.
| ordered_leaf_cids: list[bytes] = [] | ||
|
|
||
| def _collect_leaves_local(cid_bytes: bytes, depth: int = 1) -> None: | ||
| """Traverse locally-fetched blocks to collect leaf CIDs.""" |
There was a problem hiding this comment.
Minor: _collect_leaves_local is unbounded recursion. For a balanced 174-fanout DAG over a 100 MB file the depth is ~2, so this never trips Python's default 1000-frame limit in practice. But a maliciously crafted DAG (e.g. a chain of single-link nodes) would crash with RecursionError. Convert to an explicit stack if you want to harden this.
PR #1321 AI Review1. Summary of Changes
2. Strengths
3. Issues FoundCritical
Major
Minor
4. Security Review
5. Documentation and Examples
6. Newsfragment Requirement
7. Tests and Validation
8. Recommendations for Improvement
9. Questions for the Author
10. Overall Assessment
|
… lookups and always send signed records. Co-authored-by: Copilot <copilot@github.com>
…rove unmarshal_public_key functionality
- Added PaymentGatedDecisionEngine to handle payment-required logic for block serving. - Introduced PaymentTerms, PaymentAuthorization, PaymentReceipt, and PaymentRejection messages in the new bitswap_1_3_0.proto. - Enhanced existing MerkleDag class to store internal nodes with a callback for payment gating. - Created BitswapPaymentClient_1_3 to manage client-side payment authorizations and receipts. - Updated balanced_layout function to support payment gating and internal node storage. - Added necessary protobuf definitions and generated Python files for Bitswap 1.3.0.
|
Thanks @sumanjeet0012, full review here: AI PR Review: #1321 — feat: Bitswap improvements for Kubo compatibilityPR: #1321 1. Summary of ChangesThis PR is a large feature change centered on Bitswap / Merkle DAG interoperability with Kubo (go-ipfs) and supporting infrastructure:
Related issue: #1321 (same title/body as the PR; tracks Kubo compatibility work). The PR body does not include an explicit Breaking / behavior changes (user-facing):
Modules touched: 2. Branch Sync Status and Merge ConflictsBranch sync status
Merge conflict analysis✅ No merge conflicts detected. Test merge of 3. Strengths
4. Issues FoundCriticalNone identified that would corrupt protocol state or introduce obvious remote code execution. CI docs job is failing (see §8) — treat as merge blocker until fixed. Major
async def add_file(
self,
file_path: str,
chunk_size: int | None = None,
progress_callback: Callable[[int, int, str], None] | None = None,
wrap_with_directory: bool = True,
) -> bytes:
# Default chunk size: 63 KB (py-libp2p accepts less than 64 KB)
# 63 KB minus 32 bytes to leave room for the dag-pb leaf envelope overhead,
# ensuring wrapped blocks never exceed MAX_BLOCK_SIZE (63 * 1024).
DEFAULT_CHUNK_SIZE = 63 * 1024 - 32
Minor
else:
chunk = leaf_raw
logger.debug(f"[DAG] Leaf {idx + 1}: raw block {len(chunk)} bytes")
file_data += chunk
bytes_fetched += len(chunk)
except Exception as e:
logger.error(f"Failed to send wantlist to peer {peer_id}: {e}")
Maintainer feedback status
5. Security Review
Overall security impact: Low (incremental hardening on DHT signing; no new high-risk surfaces identified). 6. Documentation and ExamplesPresent:
Gaps:
7. Newsfragment Requirement
Not a blocker on newsfragment presence alone; blocker if breaking default is shipped without 8. Tests and ValidationLinting (
|
| Check | Status |
|---|---|
tox (3.10–3.13, core) |
✅ pass |
tox (*, lint) |
✅ pass |
tox (3.10, docs) |
❌ fail |
| Read the Docs | ❌ fail |
| Windows core | ✅ pass |
9. Recommendations for Improvement
- Unblock CI: Fix
verify_recorddocstring RST formatting; re-run docs tox. - Resolve reviewer blockers: Revert
wrap_with_directorydefault toFalseor ship.breaking.rst+ migration text; document chunk-size vs Kubo in newsfragment andchunker.pymodule docstring. - Performance (quick wins):
bytearrayinfetch_fileandencode_dag_pb; atomic writes inFilesystemBlockStore. - Reliability: Propagate or surface errors from
_send_wantlist_to_peer; remove MagicMock-oriented fallback in_get_blocks_batch. - Tests: Add explicit canonical wire-order test (
0x12before0x0a); optional integration test with Kubo/ipfs addfor same chunker. - Process: Add
Fixes #1321to PR body; squash/clean 26 commits if maintainers require linear history.
10. Questions for the Author
- Is
wrap_with_directory=Trueas default intentional for IPFS parity, and are you willing to call it breaking in release notes? - Should the project standardize on 63 KiB chunks permanently, or align with Kubo’s 256 KiB default over time?
- For
ProviderQueryManager, isprovider_store.find_providers(network DHT walk) sufficient, or should callers useKadDHT.find_providersat the host API level for consistency? - Do you plan Kubo interop integration tests (dockerized
ipfs add/ipfs cat) in a follow-up PR? - Will you fix the Sphinx docstring in this PR or a fast follow-up?
11. Overall Assessment
| Metric | Rating |
|---|---|
| Quality | Good — strong technical core; remaining issues are mostly API contract, docs CI, and performance hygiene |
| Security impact | Low |
| Merge readiness | Needs fixes — docs CI failure + unresolved blocking review on wrap_with_directory default and CID/chunk-size messaging |
| Confidence | High (full local test run + code review on PR branch; CI docs failure reproduced locally) |
Summary: This PR materially fixes real Kubo Bitswap/DHT interoperability gaps and adds valuable infrastructure (BlockService, persistent store, batch wants). Earlier major review findings (signed PUT propagation, network provider lookup, multi-key verification) appear addressed. Before merge: fix docs build, resolve wrap_with_directory default / breaking disclosure, and correct newsfragment claims about chunk size and root CID parity with default Kubo ipfs add.
- Added `payment_client_1_3.py` for handling in-band payment messages, including: - Processing PaymentTerms and sending PaymentAuthorization. - Handling PaymentReceipts and PaymentRejections. - Validating payment amounts and signing EIP-3009 transactions. - Introduced `payment_ledger.py` for tracking payments at the root CID level: - Supports registration of DAG structures for child blocks. - Implements nonce deduplication to prevent replay attacks. - Provides methods to check payment status and record payments. - Updated protobuf definitions in `bitswap_1_3_0_pb2.pyi` to reflect new payment structures: - Added PaymentTerms, PaymentAuthorization, and PaymentReceipt messages. - Introduced TxReceipt for transaction details. - Created `pricing_engine.py` to compute block pricing based on configurable strategies: - Supports free, fixed, size-based, and custom pricing strategies. - Allows marking specific CIDs as free and setting per-CID prices.
- Added PaymentExtension class to handle payment-related protobuf fields and wantlists in Bitswap 1.3.0. - Integrated payment terms, receipts, and rejections processing for client-side and server-side. - Enhanced PaymentLedger to track root CID payments and manage payment records. - Updated pricing engine to support configurable pricing strategies. - Refactored tests to accommodate changes in block storage and encoding, ensuring raw blocks are used for leaves. - Improved type hints and documentation across the codebase for better clarity and maintainability.
f16afc1 to
be24050
Compare
What was wrong?
Issue: py-libp2p's Bitswap file operations were not compatible with Kubo (Go-IPFS)
and the broader IPFS network. Files added by py-libp2p could not be fetched by Kubo
nodes, and vice versa. Additionally, there was no support for the Bitswap 1.3.0 Payment Extension, making the node incapable of participating in paid data retrieval, and the DHT records lacked cryptographic verification required by Kubo peers.
Several root causes were identified:
1. Wrong leaf block encoding (
CODEC_RAWinstead ofdag-pb+ UnixFS)MerkleDag.add_file()andadd_bytes()stored leaf chunks as raw blocks. Kubo wraps every leaf inUnixFS Data(type=File, data=chunk)inside adag-pbPBNode. UsingCODEC_RAWproduced completely different CIDs for the same content.2. Flat DAG structure (no balanced layout)
create_file_node()put all chunks as direct links on the root node — a flat 1-level tree. Kubo uses a balanced tree with a maximum of 174 links per node.3. Wrong DAG-PB wire encoding (field ordering)
encode_dag_pb()usedPBNode.SerializeToString()which emitsDatabeforeLinks. The DAG-PB spec requires Links before Data for canonical encoding.4. No persistent block storage
MemoryBlockStorewas the onlyBlockStoreimplementation, meaning all fetched blocks were lost when the process exited.5. No transparent caching layer
MerkleDagcalledbitswap.get_block()/bitswap.add_block()directly with no service layer, meaning fetched blocks were not auto-cached locally.6. No stream input support
add_file()only accepted a file path string.7. Magic integers in Bitswap message construction
create_wantlist_entry(cid, want_type=1)used raw integers with no type safety.8. DHT record signing/verification not compatible with Kubo
DHT value records lacked proper signing and verification, causing incompatibility with Kubo's DHT implementation.
9. Missing Bitswap 1.3.0 Payment Support
There was no support for payment-gated file retrieval, block pricing engines, or payment ledgers.
How was it fixed?
Bitswap: Kubo CID compatibility
New
create_leaf_node(data)indag_pb.py:Wraps each chunk in
UnixFS Data(type=File)+PBNode— matches Kubo'sRawLeaves=falsedefault.New
balanced_layout(leaves)indag_pb.py:Groups leaves into batches of 174, builds a tree level by level.
Fixed
encode_dag_pb()indag_pb.py:Manually constructs wire bytes with Links before Data.
Bitswap 1.3.0 Payment Extension
New Payment Architecture:
Fully implemented
BitswapPaymentClient,PaymentLedger, andPaymentGatedDecisionEnginefor tracking spent payments and managing paid data. AddedBlockPricingEngineto allow dynamic block pricing strategies. Extracted payment logic intoPaymentExtensionusing theIBitswapExtensioninterface.Bitswap: batch fetching
Enhanced
get_blocks_batch()inclient.py:Sends all CIDs in a single wantlist message per batch, waits for all responses on the same stream.
New: FilesystemBlockStore
New
FilesystemBlockStoreinblock_store.py:Stores each block as a file at
<base>/<cid[:2]>/<cid[2:]>. Drop-in replacement forMemoryBlockStore.New: BlockService
New
block_service.py:Transparent local→network fallback layer between
MerkleDagandBitswapClient. Auto-caches network-fetched blocks locally.New:
add_stream()+chunk_stream()New
chunk_stream(stream: io.IOBase)inchunker.pyandadd_stream()inMerkleDag:Reads one chunk at a time from any
io.IOBasestream.New: Wantlist / Message dataclasses
New
wantlist.pywith 6 typed dataclasses (WantType,BlockPresenceType,WantlistEntry,Wantlist,BlockPresence,BitswapMessage).DHT: record signing and verification & ProviderQueryManager
Enhanced DHT record handling with proper signing and verification for compatibility with Kubo's DHT implementation. Added
ProviderQueryManagerfor robust DHT-based provider discovery and caching in Bitswap.Files changed
libp2p/bitswap/dag_pb.pycreate_leaf_node(),balanced_layout(), fixedencode_dag_pb()canonical orderinglibp2p/bitswap/dag.pyadd_file(),add_bytes(), newadd_stream(),BlockServiceroutinglibp2p/bitswap/client.pyget_blocks_batch(), modularized withIBitswapExtensionlibp2p/bitswap/chunker.pychunk_stream(stream: io.IOBase)libp2p/bitswap/block_store.pyFilesystemBlockStorelibp2p/bitswap/block_service.pyBlockServicelibp2p/bitswap/wantlist.pylibp2p/bitswap/payment_extension.pylibp2p/bitswap/messages.pycreate_wantlist_entry()acceptsWantType | intlibp2p/bitswap/__init__.pylibp2p/kad_dht/ProviderQueryManagerlibp2p/records/record.py,utils.pylibp2p/peer/envelope.py,peer_record.pyexamples/bitswap/bitswap.pyexamples/bitswap_payment_example.pyTest files added
test_unixfs_encoding.pydag-pbleaf encoding + balanced DAG layouttest_canonical_dag_pb.pytest_filesystem_blockstore.pyFilesystemBlockStorepersistence + round-triptest_block_service.pyBlockServicelocal hit / miss / auto-cache / announcetest_io_stream.pychunk_stream()+add_stream()with BytesIO, GzipFile, file handlestest_wantlist.pyWantType,Wantlist,BitswapMessage,to_proto()/from_proto()To-Do