From 16246b05dceaf6f5f449a38f91d0908454d6d9c7 Mon Sep 17 00:00:00 2001 From: nugaon Date: Thu, 25 Sep 2025 19:34:55 +0200 Subject: [PATCH 01/16] fix: strategy channels of redundancy getter --- pkg/file/redundancy/getter/getter.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/pkg/file/redundancy/getter/getter.go b/pkg/file/redundancy/getter/getter.go index 8244142ad9b..6027e63da15 100644 --- a/pkg/file/redundancy/getter/getter.go +++ b/pkg/file/redundancy/getter/getter.go @@ -240,7 +240,14 @@ func (g *decoder) runStrategy(s Strategy) error { c := make(chan error, len(m)) ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + completed := 0 + defer func() { + cancel() + remaining := len(m) - completed + for i := 0; i < remaining; i++ { + <-c + } + }() for _, i := range m { go func(i int) { @@ -248,7 +255,9 @@ func (g *decoder) runStrategy(s Strategy) error { }(i) } - for range c { + for completed < len(m) { + <-c + completed++ if g.fetchedCnt.Load() >= int32(g.shardCnt) { return nil } From 0e25a21ad76ec4f9de63cf676a4e091d0ca2a514 Mon Sep 17 00:00:00 2001 From: nugaon Date: Fri, 27 Feb 2026 13:18:53 +0100 Subject: [PATCH 02/16] fix: default redundancy levels --- pkg/api/accesscontrol.go | 71 ++++++++++++++++++++++++++------------ pkg/api/bytes.go | 2 +- pkg/api/bzz.go | 2 +- pkg/api/chunk.go | 18 ++++++---- pkg/api/dirs.go | 2 +- pkg/api/feed.go | 17 +++++---- pkg/api/pin.go | 16 ++++++++- pkg/api/soc.go | 16 ++++++--- pkg/api/stewardship.go | 26 ++++++++++++-- pkg/steward/steward.go | 16 ++++----- pkg/traversal/traversal.go | 15 ++++---- 11 files changed, 139 insertions(+), 62 deletions(-) diff --git a/pkg/api/accesscontrol.go b/pkg/api/accesscontrol.go index 1bc4b03a6c3..8af08f0c333 100644 --- a/pkg/api/accesscontrol.go +++ b/pkg/api/accesscontrol.go @@ -100,10 +100,11 @@ func (s *Service) actDecryptionHandler() func(h http.Handler) http.Handler { } headers := struct { - Timestamp *int64 `map:"Swarm-Act-Timestamp"` - Publisher *ecdsa.PublicKey `map:"Swarm-Act-Publisher"` - HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address"` - Cache *bool `map:"Swarm-Cache"` + Timestamp *int64 `map:"Swarm-Act-Timestamp"` + Publisher *ecdsa.PublicKey `map:"Swarm-Act-Publisher"` + HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address"` + Cache *bool `map:"Swarm-Cache"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -125,8 +126,14 @@ func (s *Service) actDecryptionHandler() func(h http.Handler) http.Handler { if headers.Cache != nil { cache = *headers.Cache } + + rLevel := redundancy.PARANOID + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + ctx := r.Context() - ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), redundancy.DefaultLevel) + ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), rLevel) reference, err := s.accesscontrol.DownloadHandler(ctx, ls, paths.Address, headers.Publisher, *headers.HistoryAddress, timestamp) if err != nil { logger.Debug("access control download failed", "error", err) @@ -157,9 +164,10 @@ func (s *Service) actEncryptionHandler( putter storer.PutterSession, reference swarm.Address, historyRootHash swarm.Address, + rLevel redundancy.Level, ) (swarm.Address, swarm.Address, error) { publisherPublicKey := &s.publicKey - ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), redundancy.DefaultLevel) + ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), rLevel) storageReference, historyReference, encryptedReference, err := s.accesscontrol.UploadHandler(ctx, ls, reference, publisherPublicKey, historyRootHash) if err != nil { return swarm.ZeroAddress, swarm.ZeroAddress, err @@ -193,7 +201,8 @@ func (s *Service) actListGranteesHandler(w http.ResponseWriter, r *http.Request) } headers := struct { - Cache *bool `map:"Swarm-Cache"` + Cache *bool `map:"Swarm-Cache"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -203,8 +212,14 @@ func (s *Service) actListGranteesHandler(w http.ResponseWriter, r *http.Request) if headers.Cache != nil { cache = *headers.Cache } + + rLevel := redundancy.PARANOID + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + publisher := &s.publicKey - ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), redundancy.DefaultLevel) + ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), rLevel) grantees, err := s.accesscontrol.Get(r.Context(), ls, publisher, paths.GranteesAddress) if err != nil { logger.Debug("could not get grantees", "error", err) @@ -239,11 +254,12 @@ func (s *Service) actGrantRevokeHandler(w http.ResponseWriter, r *http.Request) } headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` - SwarmTag uint64 `map:"Swarm-Tag"` - Pin bool `map:"Swarm-Pin"` - Deferred *bool `map:"Swarm-Deferred-Upload"` - HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address" validate:"required"` + BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + SwarmTag uint64 `map:"Swarm-Tag"` + Pin bool `map:"Swarm-Pin"` + Deferred *bool `map:"Swarm-Deferred-Upload"` + HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address" validate:"required"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -255,6 +271,11 @@ func (s *Service) actGrantRevokeHandler(w http.ResponseWriter, r *http.Request) historyAddress = *headers.HistoryAddress } + rLevel := redundancy.PARANOID + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + var ( tag uint64 err error @@ -342,8 +363,8 @@ func (s *Service) actGrantRevokeHandler(w http.ResponseWriter, r *http.Request) granteeref := paths.GranteesAddress publisher := &s.publicKey - ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), redundancy.DefaultLevel) - gls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, granteeListEncrypt, redundancy.NONE), redundancy.DefaultLevel) + ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), rLevel) + gls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, granteeListEncrypt, redundancy.NONE), rLevel) granteeref, encryptedglref, historyref, actref, err := s.accesscontrol.UpdateHandler(ctx, ls, gls, granteeref, historyAddress, publisher, grantees.Addlist, grantees.Revokelist) if err != nil { logger.Debug("failed to update grantee list", "error", err) @@ -403,11 +424,12 @@ func (s *Service) actCreateGranteesHandler(w http.ResponseWriter, r *http.Reques } headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` - SwarmTag uint64 `map:"Swarm-Tag"` - Pin bool `map:"Swarm-Pin"` - Deferred *bool `map:"Swarm-Deferred-Upload"` - HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address"` + BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + SwarmTag uint64 `map:"Swarm-Tag"` + Pin bool `map:"Swarm-Pin"` + Deferred *bool `map:"Swarm-Deferred-Upload"` + HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -419,6 +441,11 @@ func (s *Service) actCreateGranteesHandler(w http.ResponseWriter, r *http.Reques historyAddress = *headers.HistoryAddress } + rLevel := redundancy.PARANOID + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + var ( tag uint64 err error @@ -494,8 +521,8 @@ func (s *Service) actCreateGranteesHandler(w http.ResponseWriter, r *http.Reques } publisher := &s.publicKey - ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), redundancy.DefaultLevel) - gls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, granteeListEncrypt, redundancy.NONE), redundancy.DefaultLevel) + ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), rLevel) + gls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, granteeListEncrypt, redundancy.NONE), rLevel) granteeref, encryptedglref, historyref, actref, err := s.accesscontrol.UpdateHandler(ctx, ls, gls, swarm.ZeroAddress, historyAddress, publisher, list, nil) if err != nil { logger.Debug("failed to create grantee list", "error", err) diff --git a/pkg/api/bytes.go b/pkg/api/bytes.go index 314e39fcb56..a039e07bdb8 100644 --- a/pkg/api/bytes.go +++ b/pkg/api/bytes.go @@ -121,7 +121,7 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) { encryptedReference := reference historyReference := swarm.ZeroAddress if headers.Act { - encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress) + encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress, headers.RLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") diff --git a/pkg/api/bzz.go b/pkg/api/bzz.go index a365212a4ec..f03fec08b8e 100644 --- a/pkg/api/bzz.go +++ b/pkg/api/bzz.go @@ -303,7 +303,7 @@ func (s *Service) fileUploadHandler( reference := manifestReference historyReference := swarm.ZeroAddress if act { - reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, historyAddress) + reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, historyAddress, rLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") diff --git a/pkg/api/chunk.go b/pkg/api/chunk.go index 6ddb2a50b10..e8f0537dba8 100644 --- a/pkg/api/chunk.go +++ b/pkg/api/chunk.go @@ -14,6 +14,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/accesscontrol" "github.com/ethersphere/bee/v2/pkg/cac" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/soc" "github.com/ethersphere/bee/v2/pkg/storer" @@ -32,11 +33,12 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { logger := s.logger.WithName("post_chunk").Build() headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id"` - StampSig []byte `map:"Swarm-Postage-Stamp"` - SwarmTag uint64 `map:"Swarm-Tag"` - Act bool `map:"Swarm-Act"` - HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + BatchID []byte `map:"Swarm-Postage-Batch-Id"` + StampSig []byte `map:"Swarm-Postage-Stamp"` + SwarmTag uint64 `map:"Swarm-Tag"` + Act bool `map:"Swarm-Act"` + HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -185,7 +187,11 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { reference := chunk.Address() historyReference := swarm.ZeroAddress if headers.Act { - reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress) + rLevel := redundancy.PARANOID + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress, rLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") diff --git a/pkg/api/dirs.go b/pkg/api/dirs.go index ae0f15d23c0..d3fa10ab1d3 100644 --- a/pkg/api/dirs.go +++ b/pkg/api/dirs.go @@ -103,7 +103,7 @@ func (s *Service) dirUploadHandler( encryptedReference := reference historyReference := swarm.ZeroAddress if act { - encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, historyAddress) + encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, historyAddress, rLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") diff --git a/pkg/api/feed.go b/pkg/api/feed.go index 1b9f1367acf..a9712894777 100644 --- a/pkg/api/feed.go +++ b/pkg/api/feed.go @@ -168,11 +168,12 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) { } headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` - Pin bool `map:"Swarm-Pin"` - Deferred *bool `map:"Swarm-Deferred-Upload"` - Act bool `map:"Swarm-Act"` - HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + Pin bool `map:"Swarm-Pin"` + Deferred *bool `map:"Swarm-Deferred-Upload"` + Act bool `map:"Swarm-Act"` + HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -279,7 +280,11 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) { encryptedReference := ref historyReference := swarm.ZeroAddress if headers.Act { - encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, ref, headers.HistoryAddress) + rLevel := redundancy.PARANOID + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, ref, headers.HistoryAddress, rLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") diff --git a/pkg/api/pin.go b/pkg/api/pin.go index 696f5185150..b1e43704098 100644 --- a/pkg/api/pin.go +++ b/pkg/api/pin.go @@ -32,6 +32,19 @@ func (s *Service) pinRootHash(w http.ResponseWriter, r *http.Request) { return } + headers := struct { + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` + }{} + if response := s.mapStructure(r.Header, &headers); response != nil { + response("invalid header params", logger, w) + return + } + + rLevel := redundancy.PARANOID + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + has, err := s.storer.HasPin(paths.Reference) if err != nil { logger.Debug("pin root hash: has pin failed", "chunk_address", paths.Reference, "error", err) @@ -53,7 +66,7 @@ func (s *Service) pinRootHash(w http.ResponseWriter, r *http.Request) { } getter := s.storer.Download(true) - traverser := traversal.New(getter, s.storer.Cache(), redundancy.DefaultLevel) + traverser := traversal.New(getter, s.storer.Cache()) sem := semaphore.NewWeighted(100) var errTraverse error @@ -93,6 +106,7 @@ func (s *Service) pinRootHash(w http.ResponseWriter, r *http.Request) { }() return nil }, + rLevel, ) wg.Wait() diff --git a/pkg/api/soc.go b/pkg/api/soc.go index 07623e3ff52..ce098a54f17 100644 --- a/pkg/api/soc.go +++ b/pkg/api/soc.go @@ -14,6 +14,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/accesscontrol" "github.com/ethersphere/bee/v2/pkg/cac" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/jsonhttp" "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/soc" @@ -47,10 +48,11 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { } headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id"` - StampSig []byte `map:"Swarm-Postage-Stamp"` - Act bool `map:"Swarm-Act"` - HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + BatchID []byte `map:"Swarm-Postage-Batch-Id"` + StampSig []byte `map:"Swarm-Postage-Stamp"` + Act bool `map:"Swarm-Act"` + HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -181,7 +183,11 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { reference := sch.Address() historyReference := swarm.ZeroAddress if headers.Act { - reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress) + rLevel := redundancy.PARANOID + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress, rLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") diff --git a/pkg/api/stewardship.go b/pkg/api/stewardship.go index b11b5ea5a6c..962ba41991f 100644 --- a/pkg/api/stewardship.go +++ b/pkg/api/stewardship.go @@ -8,6 +8,7 @@ import ( "errors" "net/http" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/postage" storage "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/swarm" @@ -29,13 +30,19 @@ func (s *Service) stewardshipPutHandler(w http.ResponseWriter, r *http.Request) } headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) return } + rLevel := redundancy.PARANOID + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + var ( batchID []byte err error @@ -57,7 +64,7 @@ func (s *Service) stewardshipPutHandler(w http.ResponseWriter, r *http.Request) return } - err = s.steward.Reupload(r.Context(), paths.Address, stamper) + err = s.steward.Reupload(r.Context(), paths.Address, stamper, rLevel) if err != nil { logger.Debug("re-upload failed", "chunk_address", paths.Address, "error", err) logger.Error(nil, "re-upload failed") @@ -91,7 +98,20 @@ func (s *Service) stewardshipGetHandler(w http.ResponseWriter, r *http.Request) return } - res, err := s.steward.IsRetrievable(r.Context(), paths.Address) + headers := struct { + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` + }{} + if response := s.mapStructure(r.Header, &headers); response != nil { + response("invalid header params", logger, w) + return + } + + rLevel := redundancy.PARANOID + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + + res, err := s.steward.IsRetrievable(r.Context(), paths.Address, rLevel) if err != nil { logger.Debug("is retrievable check failed", "chunk_address", paths.Address, "error", err) logger.Error(nil, "is retrievable") diff --git a/pkg/steward/steward.go b/pkg/steward/steward.go index 4272095f4ee..463d389814e 100644 --- a/pkg/steward/steward.go +++ b/pkg/steward/steward.go @@ -24,11 +24,11 @@ import ( type Interface interface { // Reupload root hash and all of its underlying // associated chunks to the network. - Reupload(context.Context, swarm.Address, postage.Stamper) error + Reupload(context.Context, swarm.Address, postage.Stamper, redundancy.Level) error // IsRetrievable checks whether the content // on the given address is retrievable. - IsRetrievable(context.Context, swarm.Address) (bool, error) + IsRetrievable(context.Context, swarm.Address, redundancy.Level) (bool, error) } type steward struct { @@ -41,8 +41,8 @@ type steward struct { func New(ns storer.NetStore, r retrieval.Interface, joinerPutter storage.Putter) Interface { return &steward{ netStore: ns, - traverser: traversal.New(ns.Download(true), joinerPutter, redundancy.DefaultLevel), - netTraverser: traversal.New(&netGetter{r}, joinerPutter, redundancy.DefaultLevel), + traverser: traversal.New(ns.Download(true), joinerPutter), + netTraverser: traversal.New(&netGetter{r}, joinerPutter), netGetter: r, } } @@ -52,7 +52,7 @@ func New(ns storer.NetStore, r retrieval.Interface, joinerPutter storage.Putter) // addresses and push every chunk individually to the network. // It assumes all chunks are available locally. It is therefore // advisable to pin the content locally before trying to reupload it. -func (s *steward) Reupload(ctx context.Context, root swarm.Address, stamper postage.Stamper) error { +func (s *steward) Reupload(ctx context.Context, root swarm.Address, stamper postage.Stamper, rLevel redundancy.Level) error { uploaderSession := s.netStore.DirectUpload() getter := s.netStore.Download(false) @@ -70,7 +70,7 @@ func (s *steward) Reupload(ctx context.Context, root swarm.Address, stamper post return uploaderSession.Put(ctx, c.WithStamp(stamp)) } - if err := s.traverser.Traverse(ctx, root, fn); err != nil { + if err := s.traverser.Traverse(ctx, root, fn, rLevel); err != nil { return errors.Join( fmt.Errorf("traversal of %s failed: %w", root.String(), err), uploaderSession.Cleanup(), @@ -81,12 +81,12 @@ func (s *steward) Reupload(ctx context.Context, root swarm.Address, stamper post } // IsRetrievable implements Interface.IsRetrievable method. -func (s *steward) IsRetrievable(ctx context.Context, root swarm.Address) (bool, error) { +func (s *steward) IsRetrievable(ctx context.Context, root swarm.Address, rLevel redundancy.Level) (bool, error) { fn := func(a swarm.Address) error { _, err := s.netGetter.RetrieveChunk(ctx, a, swarm.ZeroAddress) return err } - switch err := s.netTraverser.Traverse(ctx, root, fn); { + switch err := s.netTraverser.Traverse(ctx, root, fn, rLevel); { case errors.Is(err, storage.ErrNotFound): return false, nil case errors.Is(err, topology.ErrNotFound): diff --git a/pkg/traversal/traversal.go b/pkg/traversal/traversal.go index 7bb3ade6ce9..2516be5301a 100644 --- a/pkg/traversal/traversal.go +++ b/pkg/traversal/traversal.go @@ -26,25 +26,24 @@ import ( // Traverser represents service which traverse through address dependent chunks. type Traverser interface { // Traverse iterates through each address related to the supplied one, if possible. - Traverse(context.Context, swarm.Address, swarm.AddressIterFunc) error + Traverse(context.Context, swarm.Address, swarm.AddressIterFunc, redundancy.Level) error } // New constructs for a new Traverser. -func New(getter storage.Getter, putter storage.Putter, rLevel redundancy.Level) Traverser { - return &service{getter: getter, putter: putter, rLevel: rLevel} +func New(getter storage.Getter, putter storage.Putter) Traverser { + return &service{getter: getter, putter: putter} } // service is implementation of Traverser using storage.Storer as its storage. type service struct { getter storage.Getter putter storage.Putter - rLevel redundancy.Level } // Traverse implements Traverser.Traverse method. -func (s *service) Traverse(ctx context.Context, addr swarm.Address, iterFn swarm.AddressIterFunc) error { +func (s *service) Traverse(ctx context.Context, addr swarm.Address, iterFn swarm.AddressIterFunc, rLevel redundancy.Level) error { processBytes := func(ref swarm.Address) error { - j, _, err := joiner.New(ctx, s.getter, s.putter, ref, s.rLevel) + j, _, err := joiner.New(ctx, s.getter, s.putter, ref, rLevel) if err != nil { return fmt.Errorf("traversal: joiner error on %q: %w", ref, err) } @@ -67,7 +66,7 @@ func (s *service) Traverse(ctx context.Context, addr swarm.Address, iterFn swarm } } - j, _, err := joiner.New(ctx, s.getter, s.putter, addr, s.rLevel) + j, _, err := joiner.New(ctx, s.getter, s.putter, addr, rLevel) if err != nil { return err } @@ -77,7 +76,7 @@ func (s *service) Traverse(ctx context.Context, addr swarm.Address, iterFn swarm // then the reference is likely a manifest reference. This is because manifest holds metadata // that points to the actual data file, and this metadata is assumed to be small - Less than or equal to swarm.ChunkSize. if j.Size() <= swarm.ChunkSize { - ls := loadsave.NewReadonly(s.getter, s.putter, s.rLevel) + ls := loadsave.NewReadonly(s.getter, s.putter, rLevel) switch mf, err := manifest.NewDefaultManifestReference(addr, ls); { case errors.Is(err, manifest.ErrInvalidManifestType): break From ed1a926b68b923fb5b188f75dc80672fa576fd2b Mon Sep 17 00:00:00 2001 From: nugaon Date: Fri, 27 Feb 2026 13:19:01 +0100 Subject: [PATCH 03/16] docs: openapi --- openapi/Swarm.yaml | 49 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index 8028998e970..76b866819a3 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -59,6 +59,11 @@ paths: $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" name: swarm-act-history-address required: false + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false requestBody: required: true content: @@ -89,6 +94,11 @@ paths: $ref: "SwarmCommon.yaml#/components/schemas/SwarmEncryptedReference" required: true description: Grantee list reference + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false responses: "200": description: OK @@ -139,6 +149,11 @@ paths: $ref: "SwarmCommon.yaml#/components/parameters/SwarmDeferredUpload" name: swarm-deferred-upload required: false + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false requestBody: required: true content: @@ -281,6 +296,12 @@ paths: - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false + description: Redundancy level for ACT encryption only requestBody: description: Chunk binary data containing at least 8 bytes. content: @@ -659,6 +680,12 @@ paths: summary: Pin a root hash by reference tags: - Pinning + parameters: + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false responses: "200": description: Pin already exists @@ -867,6 +894,12 @@ paths: - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false + description: Redundancy level for ACT encryption only requestBody: required: true description: The SOC binary data, composed of the span (8 bytes) and up to 4KB of payload. @@ -965,6 +998,12 @@ paths: - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false + description: Redundancy level for ACT encryption only responses: "201": description: Created @@ -1064,6 +1103,11 @@ paths: $ref: "SwarmCommon.yaml#/components/schemas/SwarmReference" required: true description: "Root hash of content (can be of any type: collection, file, chunk)" + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false responses: "200": description: Returns if the content is retrievable @@ -1093,6 +1137,11 @@ paths: $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId" name: swarm-postage-batch-id description: Postage batch to use for re-upload. If none is provided and the file was uploaded on the same node before, it will reuse the same batch. If not found, it will return error. If a new batch is provided, the chunks are stamped again with the new batch. + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false responses: "200": description: OK From 8c4bbb0306b66bb51d9e5b3bfa687a71bcde049a Mon Sep 17 00:00:00 2001 From: nugaon Date: Fri, 27 Feb 2026 15:31:50 +0100 Subject: [PATCH 04/16] test: mock interface changes --- pkg/steward/mock/steward.go | 5 +++-- pkg/steward/steward_test.go | 4 ++-- pkg/traversal/traversal_test.go | 8 ++++---- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/pkg/steward/mock/steward.go b/pkg/steward/mock/steward.go index b8cee8b3588..737ddf8b3d4 100644 --- a/pkg/steward/mock/steward.go +++ b/pkg/steward/mock/steward.go @@ -7,6 +7,7 @@ package mock import ( "context" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/swarm" ) @@ -18,14 +19,14 @@ type Steward struct { // Reupload implements steward.Interface Reupload method. // The given address is recorded. -func (s *Steward) Reupload(_ context.Context, addr swarm.Address, _ postage.Stamper) error { +func (s *Steward) Reupload(_ context.Context, addr swarm.Address, _ postage.Stamper, _ redundancy.Level) error { s.addr = addr return nil } // IsRetrievable implements steward.Interface IsRetrievable method. // The method always returns true. -func (s *Steward) IsRetrievable(_ context.Context, addr swarm.Address) (bool, error) { +func (s *Steward) IsRetrievable(_ context.Context, addr swarm.Address, _ redundancy.Level) (bool, error) { return addr.Equal(s.addr), nil } diff --git a/pkg/steward/steward_test.go b/pkg/steward/steward_test.go index 7f80614e6d6..336729d9b12 100644 --- a/pkg/steward/steward_test.go +++ b/pkg/steward/steward_test.go @@ -87,7 +87,7 @@ func TestSteward(t *testing.T) { } }() - err = s.Reupload(ctx, addr, stamper) + err = s.Reupload(ctx, addr, stamper, redundancy.PARANOID) if err != nil { t.Fatal(err) } @@ -104,7 +104,7 @@ func TestSteward(t *testing.T) { default: } - isRetrievable, err := s.IsRetrievable(ctx, addr) + isRetrievable, err := s.IsRetrievable(ctx, addr, redundancy.PARANOID) if err != nil { t.Fatal(err) } diff --git a/pkg/traversal/traversal_test.go b/pkg/traversal/traversal_test.go index 2853e7961f6..07373a67a09 100644 --- a/pkg/traversal/traversal_test.go +++ b/pkg/traversal/traversal_test.go @@ -167,7 +167,7 @@ func TestTraversalBytes(t *testing.T) { t.Fatal(err) } - err = traversal.New(storerMock, storerMock, redundancy.DefaultLevel).Traverse(ctx, address, iter.Next) + err = traversal.New(storerMock, storerMock).Traverse(ctx, address, iter.Next, redundancy.DefaultLevel) if err != nil { t.Fatal(err) } @@ -293,7 +293,7 @@ func TestTraversalFiles(t *testing.T) { t.Fatal(err) } - err = traversal.New(storerMock, storerMock, redundancy.DefaultLevel).Traverse(ctx, address, iter.Next) + err = traversal.New(storerMock, storerMock).Traverse(ctx, address, iter.Next, redundancy.DefaultLevel) if err != nil { t.Fatal(err) } @@ -450,7 +450,7 @@ func TestTraversalManifest(t *testing.T) { t.Fatal(err) } - err = traversal.New(storerMock, storerMock, redundancy.DefaultLevel).Traverse(ctx, address, iter.Next) + err = traversal.New(storerMock, storerMock).Traverse(ctx, address, iter.Next, redundancy.DefaultLevel) if err != nil { t.Fatal(err) } @@ -488,7 +488,7 @@ func TestTraversalSOC(t *testing.T) { t.Fatal(err) } - err = traversal.New(store, store, 0).Traverse(ctx, sch.Address(), iter.Next) + err = traversal.New(store, store).Traverse(ctx, sch.Address(), iter.Next, redundancy.NONE) if err != nil { t.Fatal(err) } From 9216d3385d9773063d979385476cccbbdeacf85c Mon Sep 17 00:00:00 2001 From: nugaon Date: Mon, 4 May 2026 14:11:07 +0200 Subject: [PATCH 05/16] feat: defaultDownloadLevel and defaultUploadLevel --- pkg/accesscontrol/controller_test.go | 4 +-- pkg/accesscontrol/grantee_test.go | 2 +- pkg/accesscontrol/history_test.go | 6 ++--- pkg/accesscontrol/kvs/kvs_test.go | 2 +- pkg/accesscontrol/mock/controller.go | 2 +- pkg/api/accesscontrol.go | 8 +++--- pkg/api/accesscontrol_test.go | 2 +- pkg/api/bzz.go | 4 +-- pkg/api/bzz_test.go | 2 +- pkg/api/dirs_test.go | 2 +- pkg/api/feed.go | 2 +- pkg/api/feed_test.go | 2 +- pkg/file/addresses/addresses_getter_test.go | 2 +- pkg/file/file_test.go | 2 +- pkg/file/joiner/joiner_test.go | 30 ++++++++++----------- pkg/file/loadsave/loadsave_test.go | 4 +-- pkg/file/pipeline/hashtrie/hashtrie_test.go | 2 +- pkg/file/redundancy/level.go | 8 ++++-- pkg/traversal/traversal_test.go | 10 +++---- 19 files changed, 50 insertions(+), 46 deletions(-) diff --git a/pkg/accesscontrol/controller_test.go b/pkg/accesscontrol/controller_test.go index c282ef825fb..4d3e0a2a529 100644 --- a/pkg/accesscontrol/controller_test.go +++ b/pkg/accesscontrol/controller_test.go @@ -183,7 +183,7 @@ func TestController_UpdateHandler(t *testing.T) { assertNoError(t, "Session key", err) refCipher := encryption.New(keys[0], 0, 0, sha3.NewLegacyKeccak256) ls := createLs() - gls := loadsave.New(mockStorer.ChunkStore(), mockStorer.Cache(), requestPipelineFactory(context.Background(), mockStorer.Cache(), true, redundancy.NONE), redundancy.DefaultLevel) + gls := loadsave.New(mockStorer.ChunkStore(), mockStorer.Cache(), requestPipelineFactory(context.Background(), mockStorer.Cache(), true, redundancy.NONE), redundancy.DefaultDownloadLevel) c := accesscontrol.NewController(al) href, err := getHistoryFixture(t, ctx, ls, al, &publisher.PublicKey) assertNoError(t, "history fixture create", err) @@ -310,7 +310,7 @@ func TestController_Get(t *testing.T) { al1 := accesscontrol.NewLogic(diffieHellman1) al2 := accesscontrol.NewLogic(diffieHellman2) ls := createLs() - gls := loadsave.New(mockStorer.ChunkStore(), mockStorer.Cache(), requestPipelineFactory(context.Background(), mockStorer.Cache(), true, redundancy.NONE), redundancy.DefaultLevel) + gls := loadsave.New(mockStorer.ChunkStore(), mockStorer.Cache(), requestPipelineFactory(context.Background(), mockStorer.Cache(), true, redundancy.NONE), redundancy.DefaultDownloadLevel) c1 := accesscontrol.NewController(al1) c2 := accesscontrol.NewController(al2) diff --git a/pkg/accesscontrol/grantee_test.go b/pkg/accesscontrol/grantee_test.go index e11f2c5a555..33f4285d409 100644 --- a/pkg/accesscontrol/grantee_test.go +++ b/pkg/accesscontrol/grantee_test.go @@ -32,7 +32,7 @@ func requestPipelineFactory(ctx context.Context, s storage.Putter, encrypt bool, } func createLs() file.LoadSaver { - return loadsave.New(mockStorer.ChunkStore(), mockStorer.Cache(), requestPipelineFactory(context.Background(), mockStorer.Cache(), false, redundancy.NONE), redundancy.DefaultLevel) + return loadsave.New(mockStorer.ChunkStore(), mockStorer.Cache(), requestPipelineFactory(context.Background(), mockStorer.Cache(), false, redundancy.NONE), redundancy.DefaultDownloadLevel) } func generateKeyListFixture() ([]*ecdsa.PublicKey, error) { diff --git a/pkg/accesscontrol/history_test.go b/pkg/accesscontrol/history_test.go index ab976270205..d4218491f47 100644 --- a/pkg/accesscontrol/history_test.go +++ b/pkg/accesscontrol/history_test.go @@ -38,7 +38,7 @@ func TestSingleNodeHistoryLookup(t *testing.T) { t.Parallel() storer := mockstorer.New() ctx := context.Background() - ls := loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false), redundancy.DefaultLevel) + ls := loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false), redundancy.DefaultDownloadLevel) h, err := accesscontrol.NewHistory(ls) assertNoError(t, "create history", err) @@ -62,7 +62,7 @@ func TestMultiNodeHistoryLookup(t *testing.T) { t.Parallel() storer := mockstorer.New() ctx := context.Background() - ls := loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false), redundancy.DefaultLevel) + ls := loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false), redundancy.DefaultDownloadLevel) h, err := accesscontrol.NewHistory(ls) assertNoError(t, "create history", err) @@ -134,7 +134,7 @@ func TestHistoryStore(t *testing.T) { t.Parallel() storer := mockstorer.New() ctx := context.Background() - ls := loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false), redundancy.DefaultLevel) + ls := loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false), redundancy.DefaultDownloadLevel) h1, err := accesscontrol.NewHistory(ls) assertNoError(t, "create history", err) diff --git a/pkg/accesscontrol/kvs/kvs_test.go b/pkg/accesscontrol/kvs/kvs_test.go index d61c9b9b9cd..65caf50a641 100644 --- a/pkg/accesscontrol/kvs/kvs_test.go +++ b/pkg/accesscontrol/kvs/kvs_test.go @@ -31,7 +31,7 @@ func requestPipelineFactory(ctx context.Context, s storage.Putter, encrypt bool, } func createLs() file.LoadSaver { - return loadsave.New(mockStorer.ChunkStore(), mockStorer.Cache(), requestPipelineFactory(context.Background(), mockStorer.Cache(), false, redundancy.NONE), redundancy.DefaultLevel) + return loadsave.New(mockStorer.ChunkStore(), mockStorer.Cache(), requestPipelineFactory(context.Background(), mockStorer.Cache(), false, redundancy.NONE), redundancy.DefaultDownloadLevel) } func keyValuePair(t *testing.T) ([]byte, []byte) { diff --git a/pkg/accesscontrol/mock/controller.go b/pkg/accesscontrol/mock/controller.go index 51df0fce8dc..457dd6a0cf4 100644 --- a/pkg/accesscontrol/mock/controller.go +++ b/pkg/accesscontrol/mock/controller.go @@ -55,7 +55,7 @@ func New(o ...Option) accesscontrol.Controller { refMap: make(map[string]swarm.Address), publisher: "", encrypter: encryption.New(encryption.Key("b6ee086390c280eeb9824c331a4427596f0c8510d5564bc1b6168d0059a46e2b"), 0, 0, sha3.NewLegacyKeccak256), - ls: loadsave.New(storer.ChunkStore(), storer.Cache(), requestPipelineFactory(context.Background(), storer.Cache(), false, redundancy.NONE), redundancy.DefaultLevel), + ls: loadsave.New(storer.ChunkStore(), storer.Cache(), requestPipelineFactory(context.Background(), storer.Cache(), false, redundancy.NONE), redundancy.DefaultDownloadLevel), } for _, v := range o { v.apply(m) diff --git a/pkg/api/accesscontrol.go b/pkg/api/accesscontrol.go index 8af08f0c333..db84cc207c8 100644 --- a/pkg/api/accesscontrol.go +++ b/pkg/api/accesscontrol.go @@ -127,7 +127,7 @@ func (s *Service) actDecryptionHandler() func(h http.Handler) http.Handler { cache = *headers.Cache } - rLevel := redundancy.PARANOID + rLevel := redundancy.DefaultDownloadLevel if headers.RLevel != nil { rLevel = *headers.RLevel } @@ -213,7 +213,7 @@ func (s *Service) actListGranteesHandler(w http.ResponseWriter, r *http.Request) cache = *headers.Cache } - rLevel := redundancy.PARANOID + rLevel := redundancy.DefaultDownloadLevel if headers.RLevel != nil { rLevel = *headers.RLevel } @@ -271,7 +271,7 @@ func (s *Service) actGrantRevokeHandler(w http.ResponseWriter, r *http.Request) historyAddress = *headers.HistoryAddress } - rLevel := redundancy.PARANOID + rLevel := redundancy.DefaultUploadLevel if headers.RLevel != nil { rLevel = *headers.RLevel } @@ -441,7 +441,7 @@ func (s *Service) actCreateGranteesHandler(w http.ResponseWriter, r *http.Reques historyAddress = *headers.HistoryAddress } - rLevel := redundancy.PARANOID + rLevel := redundancy.DefaultUploadLevel if headers.RLevel != nil { rLevel = *headers.RLevel } diff --git a/pkg/api/accesscontrol_test.go b/pkg/api/accesscontrol_test.go index 737a47ebc11..0af51222554 100644 --- a/pkg/api/accesscontrol_test.go +++ b/pkg/api/accesscontrol_test.go @@ -35,7 +35,7 @@ import ( //nolint:ireturn func prepareHistoryFixture(storer api.Storer) (accesscontrol.History, swarm.Address) { ctx := context.Background() - ls := loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false, redundancy.NONE), redundancy.DefaultLevel) + ls := loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false, redundancy.NONE), redundancy.DefaultDownloadLevel) h, _ := accesscontrol.NewHistory(ls) diff --git a/pkg/api/bzz.go b/pkg/api/bzz.go index f03fec08b8e..a3be8d00be8 100644 --- a/pkg/api/bzz.go +++ b/pkg/api/bzz.go @@ -525,7 +525,7 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV cache = *headers.Cache } - rLevel := redundancy.DefaultLevel + rLevel := redundancy.DefaultDownloadLevel if headers.RLevel != nil { rLevel = *headers.RLevel } @@ -748,7 +748,7 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h jsonhttp.BadRequest(w, "could not parse headers") return } - rLevel := redundancy.DefaultLevel + rLevel := redundancy.DefaultDownloadLevel if headers.RLevel != nil { rLevel = *headers.RLevel } diff --git a/pkg/api/bzz_test.go b/pkg/api/bzz_test.go index 589222c3f77..3a122c9d0ed 100644 --- a/pkg/api/bzz_test.go +++ b/pkg/api/bzz_test.go @@ -877,7 +877,7 @@ func TestFeedIndirection(t *testing.T) { } m, err := manifest.NewDefaultManifest( - loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false, 0), redundancy.DefaultLevel), + loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false, 0), redundancy.DefaultDownloadLevel), false, ) if err != nil { diff --git a/pkg/api/dirs_test.go b/pkg/api/dirs_test.go index bcf4f1ea1c5..3f7ac099ba5 100644 --- a/pkg/api/dirs_test.go +++ b/pkg/api/dirs_test.go @@ -283,7 +283,7 @@ func TestDirs(t *testing.T) { // verify manifest content verifyManifest, err := manifest.NewDefaultManifestReference( resp.Reference, - loadsave.NewReadonly(storer.ChunkStore(), storer.Cache(), redundancy.DefaultLevel), + loadsave.NewReadonly(storer.ChunkStore(), storer.Cache(), redundancy.DefaultDownloadLevel), ) if err != nil { t.Fatal(err) diff --git a/pkg/api/feed.go b/pkg/api/feed.go index a9712894777..6b00035becd 100644 --- a/pkg/api/feed.go +++ b/pkg/api/feed.go @@ -228,7 +228,7 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) { logger: logger, } - l := loadsave.New(s.storer.ChunkStore(), s.storer.Cache(), requestPipelineFactory(r.Context(), putter, false, 0), redundancy.DefaultLevel) + l := loadsave.New(s.storer.ChunkStore(), s.storer.Cache(), requestPipelineFactory(r.Context(), putter, false, 0), redundancy.DefaultDownloadLevel) feedManifest, err := manifest.NewDefaultManifest(l, false) if err != nil { logger.Debug("create manifest failed", "error", err) diff --git a/pkg/api/feed_test.go b/pkg/api/feed_test.go index 3f08e063b85..4429d1dc7c2 100644 --- a/pkg/api/feed_test.go +++ b/pkg/api/feed_test.go @@ -280,7 +280,7 @@ func TestFeed_Post(t *testing.T) { }), ) - ls := loadsave.NewReadonly(mockStorer.ChunkStore(), mockStorer.Cache(), redundancy.DefaultLevel) + ls := loadsave.NewReadonly(mockStorer.ChunkStore(), mockStorer.Cache(), redundancy.DefaultDownloadLevel) i, err := manifest.NewMantarayManifestReference(expReference, ls) if err != nil { t.Fatal(err) diff --git a/pkg/file/addresses/addresses_getter_test.go b/pkg/file/addresses/addresses_getter_test.go index 3ad1c8237e7..5f6f0678882 100644 --- a/pkg/file/addresses/addresses_getter_test.go +++ b/pkg/file/addresses/addresses_getter_test.go @@ -64,7 +64,7 @@ func TestAddressesGetterIterateChunkAddresses(t *testing.T) { addressesGetter := addresses.NewGetter(store, addressIterFunc) - j, _, err := joiner.New(ctx, addressesGetter, store, rootChunk.Address(), redundancy.DefaultLevel) + j, _, err := joiner.New(ctx, addressesGetter, store, rootChunk.Address(), redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } diff --git a/pkg/file/file_test.go b/pkg/file/file_test.go index 2a5cc038631..810aa284740 100644 --- a/pkg/file/file_test.go +++ b/pkg/file/file_test.go @@ -63,7 +63,7 @@ func testSplitThenJoin(t *testing.T) { } // then join - r, l, err := joiner.New(ctx, store, store, resultAddress, redundancy.DefaultLevel) + r, l, err := joiner.New(ctx, store, store, resultAddress, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } diff --git a/pkg/file/joiner/joiner_test.go b/pkg/file/joiner/joiner_test.go index bff2b3d2aa6..dd1a7e4228e 100644 --- a/pkg/file/joiner/joiner_test.go +++ b/pkg/file/joiner/joiner_test.go @@ -43,7 +43,7 @@ func TestJoiner_ErrReferenceLength(t *testing.T) { t.Parallel() store := inmemchunkstore.New() - _, _, err := joiner.New(context.Background(), store, store, swarm.ZeroAddress, redundancy.DefaultLevel) + _, _, err := joiner.New(context.Background(), store, store, swarm.ZeroAddress, redundancy.DefaultDownloadLevel) if !errors.Is(err, storage.ErrReferenceLength) { t.Fatalf("expected ErrReferenceLength %x but got %v", swarm.ZeroAddress, err) @@ -72,7 +72,7 @@ func TestJoinerSingleChunk(t *testing.T) { } // read back data and compare - joinReader, l, err := joiner.New(ctx, store, store, mockAddr, redundancy.DefaultLevel) + joinReader, l, err := joiner.New(ctx, store, store, mockAddr, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -110,7 +110,7 @@ func TestJoinerDecryptingStore_NormalChunk(t *testing.T) { } // read back data and compare - joinReader, l, err := joiner.New(ctx, st, st, mockAddr, redundancy.DefaultLevel) + joinReader, l, err := joiner.New(ctx, st, st, mockAddr, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -157,7 +157,7 @@ func TestJoinerWithReference(t *testing.T) { } // read back data and compare - joinReader, l, err := joiner.New(ctx, st, st, rootChunk.Address(), redundancy.DefaultLevel) + joinReader, l, err := joiner.New(ctx, st, st, rootChunk.Address(), redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -212,7 +212,7 @@ func TestJoinerMalformed(t *testing.T) { t.Fatal(err) } - joinReader, _, err := joiner.New(ctx, store, store, rootChunk.Address(), redundancy.DefaultLevel) + joinReader, _, err := joiner.New(ctx, store, store, rootChunk.Address(), redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -257,7 +257,7 @@ func TestEncryptDecrypt(t *testing.T) { if err != nil { t.Fatal(err) } - reader, l, err := joiner.New(context.Background(), store, store, resultAddress, redundancy.DefaultLevel) + reader, l, err := joiner.New(context.Background(), store, store, resultAddress, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -343,7 +343,7 @@ func TestSeek(t *testing.T) { t.Fatal(err) } - j, _, err := joiner.New(ctx, store, store, addr, redundancy.DefaultLevel) + j, _, err := joiner.New(ctx, store, store, addr, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -619,7 +619,7 @@ func TestPrefetch(t *testing.T) { t.Fatal(err) } - j, _, err := joiner.New(ctx, store, store, addr, redundancy.DefaultLevel) + j, _, err := joiner.New(ctx, store, store, addr, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -667,7 +667,7 @@ func TestJoinerReadAt(t *testing.T) { t.Fatal(err) } - j, _, err := joiner.New(ctx, store, store, rootChunk.Address(), redundancy.DefaultLevel) + j, _, err := joiner.New(ctx, store, store, rootChunk.Address(), redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -715,7 +715,7 @@ func TestJoinerOneLevel(t *testing.T) { t.Fatal(err) } - j, _, err := joiner.New(ctx, store, store, rootChunk.Address(), redundancy.DefaultLevel) + j, _, err := joiner.New(ctx, store, store, rootChunk.Address(), redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -809,7 +809,7 @@ func TestJoinerTwoLevelsAcrossChunk(t *testing.T) { t.Fatal(err) } - j, _, err := joiner.New(ctx, store, store, rootChunk.Address(), redundancy.DefaultLevel) + j, _, err := joiner.New(ctx, store, store, rootChunk.Address(), redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -865,7 +865,7 @@ func TestJoinerIterateChunkAddresses(t *testing.T) { createdAddresses := []swarm.Address{rootChunk.Address(), firstAddress, secondAddress} - j, _, err := joiner.New(ctx, store, store, rootChunk.Address(), redundancy.DefaultLevel) + j, _, err := joiner.New(ctx, store, store, rootChunk.Address(), redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -919,7 +919,7 @@ func TestJoinerIterateChunkAddresses_Encrypted(t *testing.T) { if err != nil { t.Fatal(err) } - j, l, err := joiner.New(context.Background(), store, store, resultAddress, redundancy.DefaultLevel) + j, l, err := joiner.New(context.Background(), store, store, resultAddress, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -1123,7 +1123,7 @@ func TestJoinerRedundancy(t *testing.T) { t.Fatal(err) } - joinReader, rootSpan, err := joiner.New(ctx, store, store, swarmAddr, redundancy.DefaultLevel) + joinReader, rootSpan, err := joiner.New(ctx, store, store, swarmAddr, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -1318,7 +1318,7 @@ func runRedundancyTest(t *testing.T, rLevel redundancy.Level, encrypt bool, size t.Fatal(err) } - j, _, err := joiner.New(ctx, store, store, addr, redundancy.DefaultLevel) + j, _, err := joiner.New(ctx, store, store, addr, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } diff --git a/pkg/file/loadsave/loadsave_test.go b/pkg/file/loadsave/loadsave_test.go index a5828402d04..892f29ed32a 100644 --- a/pkg/file/loadsave/loadsave_test.go +++ b/pkg/file/loadsave/loadsave_test.go @@ -29,7 +29,7 @@ func TestLoadSave(t *testing.T) { t.Parallel() store := inmemchunkstore.New() - ls := loadsave.New(store, store, pipelineFn(store), redundancy.DefaultLevel) + ls := loadsave.New(store, store, pipelineFn(store), redundancy.DefaultDownloadLevel) ref, err := ls.Save(context.Background(), data) if err != nil { t.Fatal(err) @@ -51,7 +51,7 @@ func TestReadonlyLoadSave(t *testing.T) { store := inmemchunkstore.New() factory := pipelineFn(store) - ls := loadsave.NewReadonly(store, store, redundancy.DefaultLevel) + ls := loadsave.NewReadonly(store, store, redundancy.DefaultDownloadLevel) _, err := ls.Save(context.Background(), data) if !errors.Is(err, loadsave.ErrReadonlyLoadSave) { t.Fatal("expected error but got none") diff --git a/pkg/file/pipeline/hashtrie/hashtrie_test.go b/pkg/file/pipeline/hashtrie/hashtrie_test.go index 9f502f3fd5d..7df56b8f0ad 100644 --- a/pkg/file/pipeline/hashtrie/hashtrie_test.go +++ b/pkg/file/pipeline/hashtrie/hashtrie_test.go @@ -194,7 +194,7 @@ func TestLevels_TrieFull(t *testing.T) { Params: *r, } - ht = hashtrie.NewHashTrieWriter(ctx, hashSize, rMock, pf, s, redundancy.DefaultLevel) + ht = hashtrie.NewHashTrieWriter(ctx, hashSize, rMock, pf, s, redundancy.DefaultDownloadLevel) ) // to create a level wrap we need to do branching^(level-1) writes diff --git a/pkg/file/redundancy/level.go b/pkg/file/redundancy/level.go index 8a3b30a82ff..118033df4f7 100644 --- a/pkg/file/redundancy/level.go +++ b/pkg/file/redundancy/level.go @@ -173,5 +173,9 @@ func GetReplicaCounts() [5]int { // we use an approximation as the successive powers of 2 var replicaCounts = [5]int{0, 2, 4, 8, 16} -// DefaultLevel is the default redundancy level -const DefaultLevel = PARANOID +// DefaultDownloadLevel is the default redundancy level for downloading chunks +// expected to exist in the network (non-feed chunks) +const DefaultDownloadLevel = PARANOID + +// DefaultUploadLevel is the default redundancy level for uploading chunks +const DefaultUploadLevel = MEDIUM diff --git a/pkg/traversal/traversal_test.go b/pkg/traversal/traversal_test.go index 07373a67a09..70a3c50f37d 100644 --- a/pkg/traversal/traversal_test.go +++ b/pkg/traversal/traversal_test.go @@ -167,7 +167,7 @@ func TestTraversalBytes(t *testing.T) { t.Fatal(err) } - err = traversal.New(storerMock, storerMock).Traverse(ctx, address, iter.Next, redundancy.DefaultLevel) + err = traversal.New(storerMock, storerMock).Traverse(ctx, address, iter.Next, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -261,7 +261,7 @@ func TestTraversalFiles(t *testing.T) { t.Fatal(err) } - ls := loadsave.New(storerMock, storerMock, pipelineFactory(storerMock, false), redundancy.DefaultLevel) + ls := loadsave.New(storerMock, storerMock, pipelineFactory(storerMock, false), redundancy.DefaultDownloadLevel) fManifest, err := manifest.NewDefaultManifest(ls, false) if err != nil { t.Fatal(err) @@ -293,7 +293,7 @@ func TestTraversalFiles(t *testing.T) { t.Fatal(err) } - err = traversal.New(storerMock, storerMock).Traverse(ctx, address, iter.Next, redundancy.DefaultLevel) + err = traversal.New(storerMock, storerMock).Traverse(ctx, address, iter.Next, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -419,7 +419,7 @@ func TestTraversalManifest(t *testing.T) { } wantHashes = append(wantHashes, tc.manifestHashes...) - ls := loadsave.New(storerMock, storerMock, pipelineFactory(storerMock, false), redundancy.DefaultLevel) + ls := loadsave.New(storerMock, storerMock, pipelineFactory(storerMock, false), redundancy.DefaultDownloadLevel) dirManifest, err := manifest.NewMantarayManifest(ls, false) if err != nil { t.Fatal(err) @@ -450,7 +450,7 @@ func TestTraversalManifest(t *testing.T) { t.Fatal(err) } - err = traversal.New(storerMock, storerMock).Traverse(ctx, address, iter.Next, redundancy.DefaultLevel) + err = traversal.New(storerMock, storerMock).Traverse(ctx, address, iter.Next, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } From b46d5779bcb5aead62a8ea9f23138662c5eded42 Mon Sep 17 00:00:00 2001 From: nugaon Date: Mon, 4 May 2026 14:13:47 +0200 Subject: [PATCH 06/16] fix: bytes upload default --- pkg/api/bytes.go | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/pkg/api/bytes.go b/pkg/api/bytes.go index a039e07bdb8..7ba80efdc06 100644 --- a/pkg/api/bytes.go +++ b/pkg/api/bytes.go @@ -35,20 +35,25 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) { defer span.Finish() headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` - SwarmTag uint64 `map:"Swarm-Tag"` - Pin bool `map:"Swarm-Pin"` - Deferred *bool `map:"Swarm-Deferred-Upload"` - Encrypt bool `map:"Swarm-Encrypt"` - RLevel redundancy.Level `map:"Swarm-Redundancy-Level" validate:"rLevel"` - Act bool `map:"Swarm-Act"` - HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + SwarmTag uint64 `map:"Swarm-Tag"` + Pin bool `map:"Swarm-Pin"` + Deferred *bool `map:"Swarm-Deferred-Upload"` + Encrypt bool `map:"Swarm-Encrypt"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` + Act bool `map:"Swarm-Act"` + HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) return } + rLevel := redundancy.DefaultUploadLevel + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + var ( tag uint64 err error @@ -103,7 +108,7 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) { logger: logger, } - p := requestPipelineFn(putter, headers.Encrypt, headers.RLevel) + p := requestPipelineFn(putter, headers.Encrypt, rLevel) reference, err := p(ctx, r.Body) if err != nil { logger.Debug("split write all failed", "error", err) @@ -121,7 +126,7 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) { encryptedReference := reference historyReference := swarm.ZeroAddress if headers.Act { - encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress, headers.RLevel) + encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress, rLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") From 2bba2a26c1cb25bb83c734f400ff45f0cce9fb74 Mon Sep 17 00:00:00 2001 From: nugaon Date: Mon, 4 May 2026 14:17:08 +0200 Subject: [PATCH 07/16] fix: rlevel validations in the header --- pkg/api/accesscontrol.go | 8 ++++---- pkg/api/chunk.go | 2 +- pkg/api/feed.go | 2 +- pkg/api/pin.go | 2 +- pkg/api/soc.go | 2 +- pkg/api/stewardship.go | 4 ++-- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/api/accesscontrol.go b/pkg/api/accesscontrol.go index db84cc207c8..d893f1b0673 100644 --- a/pkg/api/accesscontrol.go +++ b/pkg/api/accesscontrol.go @@ -104,7 +104,7 @@ func (s *Service) actDecryptionHandler() func(h http.Handler) http.Handler { Publisher *ecdsa.PublicKey `map:"Swarm-Act-Publisher"` HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address"` Cache *bool `map:"Swarm-Cache"` - RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -202,7 +202,7 @@ func (s *Service) actListGranteesHandler(w http.ResponseWriter, r *http.Request) headers := struct { Cache *bool `map:"Swarm-Cache"` - RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -259,7 +259,7 @@ func (s *Service) actGrantRevokeHandler(w http.ResponseWriter, r *http.Request) Pin bool `map:"Swarm-Pin"` Deferred *bool `map:"Swarm-Deferred-Upload"` HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address" validate:"required"` - RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -429,7 +429,7 @@ func (s *Service) actCreateGranteesHandler(w http.ResponseWriter, r *http.Reques Pin bool `map:"Swarm-Pin"` Deferred *bool `map:"Swarm-Deferred-Upload"` HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address"` - RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) diff --git a/pkg/api/chunk.go b/pkg/api/chunk.go index e8f0537dba8..8e7d6f17109 100644 --- a/pkg/api/chunk.go +++ b/pkg/api/chunk.go @@ -38,7 +38,7 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { SwarmTag uint64 `map:"Swarm-Tag"` Act bool `map:"Swarm-Act"` HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` - RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) diff --git a/pkg/api/feed.go b/pkg/api/feed.go index 6b00035becd..fa5df0cef14 100644 --- a/pkg/api/feed.go +++ b/pkg/api/feed.go @@ -173,7 +173,7 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) { Deferred *bool `map:"Swarm-Deferred-Upload"` Act bool `map:"Swarm-Act"` HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` - RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) diff --git a/pkg/api/pin.go b/pkg/api/pin.go index b1e43704098..b901fd5f0af 100644 --- a/pkg/api/pin.go +++ b/pkg/api/pin.go @@ -33,7 +33,7 @@ func (s *Service) pinRootHash(w http.ResponseWriter, r *http.Request) { } headers := struct { - RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) diff --git a/pkg/api/soc.go b/pkg/api/soc.go index ce098a54f17..62ab7cdfec4 100644 --- a/pkg/api/soc.go +++ b/pkg/api/soc.go @@ -52,7 +52,7 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { StampSig []byte `map:"Swarm-Postage-Stamp"` Act bool `map:"Swarm-Act"` HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` - RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) diff --git a/pkg/api/stewardship.go b/pkg/api/stewardship.go index 962ba41991f..7f6fb4b3149 100644 --- a/pkg/api/stewardship.go +++ b/pkg/api/stewardship.go @@ -31,7 +31,7 @@ func (s *Service) stewardshipPutHandler(w http.ResponseWriter, r *http.Request) headers := struct { BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` - RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -99,7 +99,7 @@ func (s *Service) stewardshipGetHandler(w http.ResponseWriter, r *http.Request) } headers := struct { - RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) From 9e94c28baa4f7ce1984d60aee7da09e252f2183a Mon Sep 17 00:00:00 2001 From: nugaon Date: Mon, 4 May 2026 14:44:38 +0200 Subject: [PATCH 08/16] feat: use header redundancy level for feed upload pipeline Apply the redundancy level from request headers to the feed upload pipeline instead of using the default download level. Move redundancy level extraction earlier to use it for both the manifest creation and ACT encryption. --- pkg/api/feed.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/api/feed.go b/pkg/api/feed.go index fa5df0cef14..12c87736896 100644 --- a/pkg/api/feed.go +++ b/pkg/api/feed.go @@ -180,6 +180,11 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) { return } + rLevel := redundancy.DefaultUploadLevel + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + var ( tag storer.SessionInfo err error @@ -228,7 +233,7 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) { logger: logger, } - l := loadsave.New(s.storer.ChunkStore(), s.storer.Cache(), requestPipelineFactory(r.Context(), putter, false, 0), redundancy.DefaultDownloadLevel) + l := loadsave.New(s.storer.ChunkStore(), s.storer.Cache(), requestPipelineFactory(r.Context(), putter, false, rLevel), rLevel) feedManifest, err := manifest.NewDefaultManifest(l, false) if err != nil { logger.Debug("create manifest failed", "error", err) @@ -280,10 +285,6 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) { encryptedReference := ref historyReference := swarm.ZeroAddress if headers.Act { - rLevel := redundancy.PARANOID - if headers.RLevel != nil { - rLevel = *headers.RLevel - } encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, ref, headers.HistoryAddress, rLevel) if err != nil { logger.Debug("access control upload failed", "error", err) From 7e524b426efd4d3c483db239230562fd3f451f55 Mon Sep 17 00:00:00 2001 From: nugaon Date: Mon, 4 May 2026 14:47:53 +0200 Subject: [PATCH 09/16] refactor: pin post rLevel is defaultDownloadLevel --- pkg/api/pin.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/api/pin.go b/pkg/api/pin.go index b901fd5f0af..9ce1bf053bd 100644 --- a/pkg/api/pin.go +++ b/pkg/api/pin.go @@ -40,7 +40,7 @@ func (s *Service) pinRootHash(w http.ResponseWriter, r *http.Request) { return } - rLevel := redundancy.PARANOID + rLevel := redundancy.DefaultDownloadLevel if headers.RLevel != nil { rLevel = *headers.RLevel } From b0573f510c27393ef8b46fbc9f40e0444bff578f Mon Sep 17 00:00:00 2001 From: nugaon Date: Mon, 4 May 2026 14:52:21 +0200 Subject: [PATCH 10/16] refactor: use defaultUploadLevel on chunk ACT upload --- pkg/api/chunk.go | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/pkg/api/chunk.go b/pkg/api/chunk.go index 8e7d6f17109..80130df2d31 100644 --- a/pkg/api/chunk.go +++ b/pkg/api/chunk.go @@ -33,12 +33,11 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { logger := s.logger.WithName("post_chunk").Build() headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id"` - StampSig []byte `map:"Swarm-Postage-Stamp"` - SwarmTag uint64 `map:"Swarm-Tag"` - Act bool `map:"Swarm-Act"` - HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` - RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` + BatchID []byte `map:"Swarm-Postage-Batch-Id"` + StampSig []byte `map:"Swarm-Postage-Stamp"` + SwarmTag uint64 `map:"Swarm-Tag"` + Act bool `map:"Swarm-Act"` + HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -187,11 +186,7 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { reference := chunk.Address() historyReference := swarm.ZeroAddress if headers.Act { - rLevel := redundancy.PARANOID - if headers.RLevel != nil { - rLevel = *headers.RLevel - } - reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress, rLevel) + reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress, redundancy.DefaultUploadLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") From a21abc86e9088ccd9be0a8294b7064d7749fcb29 Mon Sep 17 00:00:00 2001 From: nugaon Date: Tue, 5 May 2026 11:44:17 +0200 Subject: [PATCH 11/16] revert: redundancy getter fix --- pkg/file/redundancy/getter/getter.go | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/pkg/file/redundancy/getter/getter.go b/pkg/file/redundancy/getter/getter.go index 6027e63da15..8244142ad9b 100644 --- a/pkg/file/redundancy/getter/getter.go +++ b/pkg/file/redundancy/getter/getter.go @@ -240,14 +240,7 @@ func (g *decoder) runStrategy(s Strategy) error { c := make(chan error, len(m)) ctx, cancel := context.WithCancel(context.Background()) - completed := 0 - defer func() { - cancel() - remaining := len(m) - completed - for i := 0; i < remaining; i++ { - <-c - } - }() + defer cancel() for _, i := range m { go func(i int) { @@ -255,9 +248,7 @@ func (g *decoder) runStrategy(s Strategy) error { }(i) } - for completed < len(m) { - <-c - completed++ + for range c { if g.fetchedCnt.Load() >= int32(g.shardCnt) { return nil } From 4c05fd5693d2a79b2602d239850ce1d73c0aa2e8 Mon Sep 17 00:00:00 2001 From: nugaon Date: Wed, 6 May 2026 13:38:01 +0200 Subject: [PATCH 12/16] refactor: remaining hard-coded levels --- pkg/api/soc.go | 2 +- pkg/api/stewardship.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/api/soc.go b/pkg/api/soc.go index 62ab7cdfec4..00309ea3043 100644 --- a/pkg/api/soc.go +++ b/pkg/api/soc.go @@ -183,7 +183,7 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { reference := sch.Address() historyReference := swarm.ZeroAddress if headers.Act { - rLevel := redundancy.PARANOID + rLevel := redundancy.DefaultUploadLevel if headers.RLevel != nil { rLevel = *headers.RLevel } diff --git a/pkg/api/stewardship.go b/pkg/api/stewardship.go index 7f6fb4b3149..9767334baca 100644 --- a/pkg/api/stewardship.go +++ b/pkg/api/stewardship.go @@ -38,7 +38,7 @@ func (s *Service) stewardshipPutHandler(w http.ResponseWriter, r *http.Request) return } - rLevel := redundancy.PARANOID + rLevel := redundancy.DefaultUploadLevel if headers.RLevel != nil { rLevel = *headers.RLevel } @@ -106,7 +106,7 @@ func (s *Service) stewardshipGetHandler(w http.ResponseWriter, r *http.Request) return } - rLevel := redundancy.PARANOID + rLevel := redundancy.DefaultDownloadLevel if headers.RLevel != nil { rLevel = *headers.RLevel } From 4fd61f1c65429ac7a99d4d4768fa6b926ea7165b Mon Sep 17 00:00:00 2001 From: nugaon Date: Wed, 6 May 2026 14:07:59 +0200 Subject: [PATCH 13/16] test: use old redundancy levels at hardcoded hashes --- pkg/api/accesscontrol_test.go | 1 + pkg/api/bytes_test.go | 1 + 2 files changed, 2 insertions(+) diff --git a/pkg/api/accesscontrol_test.go b/pkg/api/accesscontrol_test.go index 0af51222554..1fdfd25cc44 100644 --- a/pkg/api/accesscontrol_test.go +++ b/pkg/api/accesscontrol_test.go @@ -170,6 +170,7 @@ func TestAccessLogicEachEndpointWithAct(t *testing.T) { jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr), jsonhttptest.WithRequestHeader(api.SwarmPinHeader, "true"), jsonhttptest.WithRequestHeader(api.SwarmTagHeader, fmt.Sprintf("%d", tag.TagID)), + jsonhttptest.WithRequestHeader(api.SwarmRedundancyLevelHeader, "0"), jsonhttptest.WithRequestBody(v.data), jsonhttptest.WithExpectedJSONResponse(v.resp), jsonhttptest.WithRequestHeader(api.ContentTypeHeader, v.contenttype), diff --git a/pkg/api/bytes_test.go b/pkg/api/bytes_test.go index 9351dc511d0..5a15694ea01 100644 --- a/pkg/api/bytes_test.go +++ b/pkg/api/bytes_test.go @@ -57,6 +57,7 @@ func TestBytes(t *testing.T) { jsonhttptest.Request(t, client, http.MethodPost, resource, http.StatusCreated, jsonhttptest.WithRequestHeader(api.SwarmDeferredUploadHeader, "true"), jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr), + jsonhttptest.WithRequestHeader(api.SwarmRedundancyLevelHeader, "0"), jsonhttptest.WithRequestBody(bytes.NewReader(content)), jsonhttptest.WithExpectedJSONResponse(api.BytesPostResponse{ Reference: chunkAddr, From faaf897951e6da27e7d78ae1b5e3a37f9b97b411 Mon Sep 17 00:00:00 2001 From: nugaon Date: Wed, 20 May 2026 11:57:21 +0200 Subject: [PATCH 14/16] fix: bzz with rlevel pointer --- pkg/api/bzz.go | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/pkg/api/bzz.go b/pkg/api/bzz.go index a3be8d00be8..1bd3854d4d9 100644 --- a/pkg/api/bzz.go +++ b/pkg/api/bzz.go @@ -70,16 +70,16 @@ func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) { defer span.Finish() headers := struct { - ContentType string `map:"Content-Type"` - BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` - SwarmTag uint64 `map:"Swarm-Tag"` - Pin bool `map:"Swarm-Pin"` - Deferred *bool `map:"Swarm-Deferred-Upload"` - Encrypt bool `map:"Swarm-Encrypt"` - IsDir bool `map:"Swarm-Collection"` - RLevel redundancy.Level `map:"Swarm-Redundancy-Level" validate:"rLevel"` - Act bool `map:"Swarm-Act"` - HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + ContentType string `map:"Content-Type"` + BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + SwarmTag uint64 `map:"Swarm-Tag"` + Pin bool `map:"Swarm-Pin"` + Deferred *bool `map:"Swarm-Deferred-Upload"` + Encrypt bool `map:"Swarm-Encrypt"` + IsDir bool `map:"Swarm-Collection"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` + Act bool `map:"Swarm-Act"` + HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -145,9 +145,14 @@ func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) { mt, _, errParseCT := mime.ParseMediaType(contentTypeHdr) isMultipart := errParseCT == nil && mt == multiPartFormData + rLevel := redundancy.DefaultUploadLevel + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + isDirUpload := headers.IsDir || isMultipart if !isDirUpload { - s.fileUploadHandler(ctx, logger, span, ow, r, putter, headers.Encrypt, tag, headers.RLevel, headers.Act, headers.HistoryAddress) + s.fileUploadHandler(ctx, logger, span, ow, r, putter, headers.Encrypt, tag, rLevel, headers.Act, headers.HistoryAddress) return } @@ -157,7 +162,7 @@ func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) { return } - s.dirUploadHandler(ctx, logger, span, ow, r, putter, headers.Encrypt, tag, headers.RLevel, headers.Act, headers.HistoryAddress) + s.dirUploadHandler(ctx, logger, span, ow, r, putter, headers.Encrypt, tag, rLevel, headers.Act, headers.HistoryAddress) } // bzzUploadResponse is returned when an HTTP request to upload a file is successful From bb952b4845625d0d22c055b48302c8c9ae71bfdc Mon Sep 17 00:00:00 2001 From: nugaon Date: Wed, 20 May 2026 12:22:09 +0200 Subject: [PATCH 15/16] refactor: remove rlevel from soc and chunk endpoints --- openapi/Swarm.yaml | 12 ------------ pkg/api/chunk.go | 1 + pkg/api/soc.go | 16 ++++++---------- 3 files changed, 7 insertions(+), 22 deletions(-) diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index 76b866819a3..15219a62ee3 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -296,12 +296,6 @@ paths: - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" - - in: header - schema: - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" - name: swarm-redundancy-level - required: false - description: Redundancy level for ACT encryption only requestBody: description: Chunk binary data containing at least 8 bytes. content: @@ -894,12 +888,6 @@ paths: - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" - - in: header - schema: - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" - name: swarm-redundancy-level - required: false - description: Redundancy level for ACT encryption only requestBody: required: true description: The SOC binary data, composed of the span (8 bytes) and up to 4KB of payload. diff --git a/pkg/api/chunk.go b/pkg/api/chunk.go index 80130df2d31..80583731aac 100644 --- a/pkg/api/chunk.go +++ b/pkg/api/chunk.go @@ -186,6 +186,7 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { reference := chunk.Address() historyReference := swarm.ZeroAddress if headers.Act { + // Redundancy level is hardcoded; ACT on chunk endpoints is semantically broken and will be removed, see https://github.com/ethersphere/bee/issues/5469. reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress, redundancy.DefaultUploadLevel) if err != nil { logger.Debug("access control upload failed", "error", err) diff --git a/pkg/api/soc.go b/pkg/api/soc.go index 00309ea3043..27a2c659f6c 100644 --- a/pkg/api/soc.go +++ b/pkg/api/soc.go @@ -48,11 +48,10 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { } headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id"` - StampSig []byte `map:"Swarm-Postage-Stamp"` - Act bool `map:"Swarm-Act"` - HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` - RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` + BatchID []byte `map:"Swarm-Postage-Batch-Id"` + StampSig []byte `map:"Swarm-Postage-Stamp"` + Act bool `map:"Swarm-Act"` + HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -183,11 +182,8 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { reference := sch.Address() historyReference := swarm.ZeroAddress if headers.Act { - rLevel := redundancy.DefaultUploadLevel - if headers.RLevel != nil { - rLevel = *headers.RLevel - } - reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress, rLevel) + // Redundancy level is hardcoded; ACT on SOC is semantically broken and will be removed, see https://github.com/ethersphere/bee/issues/5469. + reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress, redundancy.DefaultUploadLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") From c78113b5dffd5b575d544255e6c9b392e73b6bdc Mon Sep 17 00:00:00 2001 From: nugaon Date: Wed, 20 May 2026 12:35:32 +0200 Subject: [PATCH 16/16] docs: update feed upload redundancy level parameter description --- openapi/Swarm.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index 15219a62ee3..02f77bbd82d 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -991,7 +991,7 @@ paths: $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" name: swarm-redundancy-level required: false - description: Redundancy level for ACT encryption only + description: Redundancy level for the feed manifest upload pipeline and ACT encryption responses: "201": description: Created