diff --git a/cmd/bee/cmd/cmd.go b/cmd/bee/cmd/cmd.go index 5e0677065fe..1cd66e705be 100644 --- a/cmd/bee/cmd/cmd.go +++ b/cmd/bee/cmd/cmd.go @@ -88,6 +88,7 @@ const ( optionAutoTLSDomain = "autotls-domain" optionAutoTLSRegistrationEndpoint = "autotls-registration-endpoint" optionAutoTLSCAEndpoint = "autotls-ca-endpoint" + optionNamePubsubBrokerMode = "pubsub-broker-mode" optionUseSIMD = "use-simd-hashing" // blockchain-rpc @@ -335,6 +336,7 @@ func (c *command) setAllFlags(cmd *cobra.Command) { cmd.Flags().String(optionAutoTLSDomain, p2pforge.DefaultForgeDomain, "autotls domain") cmd.Flags().String(optionAutoTLSRegistrationEndpoint, p2pforge.DefaultForgeEndpoint, "autotls registration endpoint") cmd.Flags().String(optionAutoTLSCAEndpoint, p2pforge.DefaultCAEndpoint, "autotls certificate authority endpoint") + cmd.Flags().Bool(optionNamePubsubBrokerMode, true, "enable pubsub broker mode") cmd.Flags().Bool(optionUseSIMD, false, "use SIMD BMT hasher (available only on linux amd64 platforms)") } diff --git a/cmd/bee/cmd/start.go b/cmd/bee/cmd/start.go index bf6fe5404c7..7ded9e051c9 100644 --- a/cmd/bee/cmd/start.go +++ b/cmd/bee/cmd/start.go @@ -351,6 +351,7 @@ func buildBeeNode(ctx context.Context, c *command, cmd *cobra.Command, logger lo WarmupTime: c.config.GetDuration(optionWarmUpTime), WelcomeMessage: c.config.GetString(optionWelcomeMessage), WhitelistedWithdrawalAddress: c.config.GetStringSlice(optionNameWhitelistedWithdrawalAddress), + PubsubBrokerMode: c.config.GetBool(optionNamePubsubBrokerMode), }) return b, err diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index 70f9ffaf4be..cd9f80f616c 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -2541,3 +2541,57 @@ paths: $ref: "SwarmCommon.yaml#/components/responses/400" default: description: Default response. + + "/pubsub/{topic}": + get: + summary: Connect to a pubsub topic via WebSocket + description: | + Opens a WebSocket connection to a pubsub topic. The connection acts as either a publisher (read+write) + or subscriber (read-only) depending on the presence of GSOC query params. + + **WebSocket protocol:** + - Inbound (client → node, publisher only): raw SOC payload `[sig:65B][span:8B][payload:N B]` + - Outbound (node → client): raw SOC payload `[sig:65B][span:8B][payload:N B]` + tags: + - Pubsub + parameters: + - in: path + name: topic + schema: + type: string + required: true + description: Topic identifier (hex-encoded address or arbitrary string to be hashed) + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubPeer" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubGsocEthAddress" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubGsocTopic" + - in: header + name: swarm-keep-alive + schema: + type: integer + required: false + description: "WebSocket ping period in seconds (default: 60)" + responses: + "101": + description: WebSocket upgrade successful + "400": + $ref: "SwarmCommon.yaml#/components/responses/400" + "500": + $ref: "SwarmCommon.yaml#/components/responses/500" + + "/pubsub/": + get: + summary: List all pubsub topics + description: Returns a list of all active pubsub topics this node is participating in (as broker or subscriber) + tags: + - Pubsub + responses: + "200": + description: List of pubsub topics + content: + application/json: + schema: + $ref: "SwarmCommon.yaml#/components/schemas/PubsubTopicListResponse" + "400": + $ref: "SwarmCommon.yaml#/components/responses/400" + "500": + $ref: "SwarmCommon.yaml#/components/responses/500" diff --git a/openapi/SwarmCommon.yaml b/openapi/SwarmCommon.yaml index 303245501f9..1d50bf8f5ba 100644 --- a/openapi/SwarmCommon.yaml +++ b/openapi/SwarmCommon.yaml @@ -1052,6 +1052,33 @@ components: properties: transactionHash: $ref: "#/components/schemas/TransactionHash" + + PubsubTopicInfo: + type: object + properties: + topicAddress: + type: string + description: "Hex-encoded topic address" + mode: + type: integer + description: "Pubsub mode identifier" + role: + type: string + description: "Role of this node: 'broker' or 'subscriber'" + connections: + type: array + items: + type: string + description: "List of connected peer overlays" + + PubsubTopicListResponse: + type: object + properties: + topics: + type: array + items: + $ref: "#/components/schemas/PubsubTopicInfo" + headers: SwarmTag: description: "Tag UID" @@ -1095,7 +1122,6 @@ components: required: false description: "Indicates which feed version was resolved (v1 or v2)" - parameters: GasPriceParameter: in: header @@ -1308,6 +1334,30 @@ components: required: false description: "ACT history Unix timestamp" + SwarmPubsubPeer: + in: query + name: peer + schema: + type: string + required: true + description: "Multiaddress of the broker peer to connect to for pubsub" + + SwarmPubsubGsocEthAddress: + in: query + name: gsoc-eth-address + schema: + $ref: "#/components/schemas/HexString" + required: false + description: "GSOC owner Ethereum address (20 bytes, hex-encoded) for publisher role. Required together with gsoc-topic to upgrade to publisher." + + SwarmPubsubGsocTopic: + in: query + name: gsoc-topic + schema: + $ref: "#/components/schemas/HexString" + required: false + description: "GSOC topic identifier (hex) for publisher role. Required together with gsoc-eth-address to upgrade to publisher." + responses: "200": description: Success diff --git a/pkg/api/api.go b/pkg/api/api.go index acd838a3ff6..a519eccbf08 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -40,6 +40,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/postage/postagecontract" "github.com/ethersphere/bee/v2/pkg/pss" + "github.com/ethersphere/bee/v2/pkg/pubsub" "github.com/ethersphere/bee/v2/pkg/resolver" "github.com/ethersphere/bee/v2/pkg/resolver/client/ens" "github.com/ethersphere/bee/v2/pkg/resolver/multiresolver" @@ -93,11 +94,10 @@ const ( SwarmActTimestampHeader = "Swarm-Act-Timestamp" SwarmActPublisherHeader = "Swarm-Act-Publisher" SwarmActHistoryAddressHeader = "Swarm-Act-History-Address" - - ImmutableHeader = "Immutable" - GasPriceHeader = "Gas-Price" - GasLimitHeader = "Gas-Limit" - ETagHeader = "ETag" + ImmutableHeader = "Immutable" + GasPriceHeader = "Gas-Price" + GasLimitHeader = "Gas-Limit" + ETagHeader = "ETag" AuthorizationHeader = "Authorization" AcceptEncodingHeader = "Accept-Encoding" @@ -185,6 +185,7 @@ type Service struct { topologyDriver topology.Driver p2p p2p.DebugService + pubsubSvc *pubsub.Service accounting accounting.Interface chequebook chequebook.Service pseudosettle settlement.Interface @@ -268,6 +269,7 @@ type ExtraOptions struct { SyncStatus func() (bool, error) NodeStatus *status.Service PinIntegrity PinIntegrity + PubsubService *pubsub.Service } func New( @@ -355,6 +357,7 @@ func (s *Service) Configure(signer crypto.Signer, tracer *tracing.Tracer, o Opti s.lightNodes = e.LightNodes s.pseudosettle = e.Pseudosettle s.blockTime = e.BlockTime + s.pubsubSvc = e.PubsubService s.statusSem = semaphore.NewWeighted(1) s.postageSem = semaphore.NewWeighted(1) diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index 4dd7aa1d3f7..4c46af1f3cd 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -42,6 +42,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/postage/postagecontract" contractMock "github.com/ethersphere/bee/v2/pkg/postage/postagecontract/mock" "github.com/ethersphere/bee/v2/pkg/pss" + "github.com/ethersphere/bee/v2/pkg/pubsub" "github.com/ethersphere/bee/v2/pkg/pusher" "github.com/ethersphere/bee/v2/pkg/resolver" resolverMock "github.com/ethersphere/bee/v2/pkg/resolver/mock" @@ -135,6 +136,7 @@ type testServerOptions struct { FullAPIDisabled bool ChequebookDisabled bool SwapDisabled bool + PubsubService *pubsub.Service } func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.Conn, string, *chanStorer) { @@ -205,6 +207,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket. Staking: o.StakingContract, NodeStatus: o.NodeStatus, PinIntegrity: o.PinIntegrity, + PubsubService: o.PubsubService, } // By default bee mode is set to full mode. diff --git a/pkg/api/pubsub.go b/pkg/api/pubsub.go new file mode 100644 index 00000000000..bb0b3dd33de --- /dev/null +++ b/pkg/api/pubsub.go @@ -0,0 +1,137 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package api + +import ( + "context" + "encoding/hex" + "net/http" + "time" + + "github.com/ethersphere/bee/v2/pkg/jsonhttp" + "github.com/ethersphere/bee/v2/pkg/pubsub" + "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/gorilla/mux" + "github.com/gorilla/websocket" + ma "github.com/multiformats/go-multiaddr" +) + +func (s *Service) pubsubWsHandler(w http.ResponseWriter, r *http.Request) { + logger := s.logger.WithName("pubsub").Build() + + paths := struct { + Topic string `map:"topic" validate:"required"` + }{} + if response := s.mapStructure(mux.Vars(r), &paths); response != nil { + response("invalid path params", logger, w) + return + } + + var topicAddr [32]byte + if decoded, err := hex.DecodeString(paths.Topic); err == nil && len(decoded) == swarm.HashSize { + copy(topicAddr[:], decoded) + } else { + h := swarm.NewHasher() + _, _ = h.Write([]byte(paths.Topic)) + copy(topicAddr[:], h.Sum(nil)) + } + + peerMultiaddr := r.URL.Query().Get("peer") + if peerMultiaddr == "" { + jsonhttp.BadRequest(w, "missing peer query param") + return + } + underlay, err := ma.NewMultiaddr(peerMultiaddr) + if err != nil { + logger.Info("invalid peer multiaddr", "value", peerMultiaddr, "error", err) + jsonhttp.BadRequest(w, "invalid peer query param") + return + } + + var connectOpts pubsub.ConnectOptions + + gsocEthAddrHex := r.URL.Query().Get("gsoc-eth-address") + gsocTopicHex := r.URL.Query().Get("gsoc-topic") + if gsocEthAddrHex != "" && gsocTopicHex != "" { + gsocOwner, err := hex.DecodeString(gsocEthAddrHex) + if err != nil { + jsonhttp.BadRequest(w, "invalid gsoc-eth-address query param") + return + } + gsocID, err := hex.DecodeString(gsocTopicHex) + if err != nil { + jsonhttp.BadRequest(w, "invalid gsoc-topic query param") + return + } + connectOpts.GsocOwner = gsocOwner + connectOpts.GsocID = gsocID + connectOpts.ReadWrite = true + } + + headers := struct { + KeepAlive int `map:"Swarm-Keep-Alive"` + }{} + if response := s.mapStructure(r.Header, &headers); response != nil { + response("invalid header params", logger, w) + return + } + + // Connect to broker peer + ctx, cancel := context.WithCancel(context.Background()) + mode, err := s.pubsubSvc.Connect(ctx, underlay, topicAddr, pubsub.ModeGSOCEphemeral, connectOpts) + if err != nil { + cancel() + logger.Info("pubsub connect failed", "error", err) + jsonhttp.InternalServerError(w, "pubsub connect failed") + return + } + // Upgrade to WebSocket + upgrader := websocket.Upgrader{ + ReadBufferSize: swarm.ChunkWithSpanSize, + WriteBufferSize: swarm.ChunkWithSpanSize, + CheckOrigin: s.checkOrigin, + } + + logger.Info("upgrading to websocket") + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + // Upgrade() hijacks the connection before returning an error, + // so do NOT write an HTTP response here. + cancel() + logger.Info("websocket upgrade failed", "error", err) + logger.Error(nil, "websocket upgrade failed") + return + } + logger.Info("websocket upgrade successful") + + pingPeriod := time.Duration(headers.KeepAlive) * time.Second + if pingPeriod == 0 { + pingPeriod = time.Minute + } + + isPublisher := connectOpts.ReadWrite + + s.wsWg.Add(1) + go func() { + pubsub.ListeningWs(ctx, conn, pubsub.WsOptions{PingPeriod: pingPeriod, Cancel: cancel}, logger, mode, isPublisher) + cancel() + _ = conn.Close() + s.wsWg.Done() + }() +} + +func (s *Service) pubsubListHandler(w http.ResponseWriter, r *http.Request) { + if s.pubsubSvc == nil { + jsonhttp.NotFound(w, "pubsub service not available") + return + } + + topics := s.pubsubSvc.Topics() + jsonhttp.OK(w, struct { + Topics []pubsub.TopicInfo `json:"topics"` + }{ + Topics: topics, + }) +} diff --git a/pkg/api/pubsub_test.go b/pkg/api/pubsub_test.go new file mode 100644 index 00000000000..5c1600927f9 --- /dev/null +++ b/pkg/api/pubsub_test.go @@ -0,0 +1,410 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package api_test + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/url" + "sync" + "testing" + "time" + + "github.com/ethersphere/bee/v2/pkg/bzz" + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/p2p" + "github.com/ethersphere/bee/v2/pkg/pubsub" + "github.com/ethersphere/bee/v2/pkg/spinlock" + "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/gorilla/websocket" + ma "github.com/multiformats/go-multiaddr" +) + +func dialWs(t *testing.T, listener, topic string) *websocket.Conn { + t.Helper() + u := url.URL{ + Scheme: "ws", + Host: listener, + Path: "/pubsub/" + topic, + RawQuery: "peer=/ip4/127.0.0.1/tcp/9000", + } + conn, resp, err := websocket.DefaultDialer.Dial(u.String(), nil) + if err != nil { + if resp != nil { + t.Fatalf("websocket dial failed status=%d: %v", resp.StatusCode, err) + } + t.Fatalf("websocket dial failed: %v", err) + } + return conn +} + +// pipeStreamAPI is a bidirectional p2p.Stream backed by an io.Pipe pair. +type pipeStreamAPI struct { + once sync.Once + pr *io.PipeReader + pw *io.PipeWriter +} + +func newPipeStreamAPI() *pipeStreamAPI { + pr, pw := io.Pipe() + return &pipeStreamAPI{pr: pr, pw: pw} +} + +func (s *pipeStreamAPI) Read(p []byte) (int, error) { return s.pr.Read(p) } +func (s *pipeStreamAPI) Write(p []byte) (int, error) { return s.pw.Write(p) } +func (s *pipeStreamAPI) ResponseHeaders() p2p.Headers { return nil } +func (s *pipeStreamAPI) Headers() p2p.Headers { return nil } +func (s *pipeStreamAPI) FullClose() error { return s.Close() } +func (s *pipeStreamAPI) Close() error { + s.once.Do(func() { s.pr.Close(); s.pw.Close() }) + return nil +} +func (s *pipeStreamAPI) Reset() error { return s.Close() } + +// pubsubMockP2P implements pubsub.P2P for testing. +// Only ConnectAllowLight and NewStream do real work; all other methods are stubs. +type pubsubMockP2P struct { + connectAllowLight func(ctx context.Context, addrs []ma.Multiaddr) (*bzz.Address, error) + newStream func(ctx context.Context, address swarm.Address, h p2p.Headers, protocol, version, stream string) (p2p.Stream, error) +} + +func (m *pubsubMockP2P) ConnectAllowLight(ctx context.Context, addrs []ma.Multiaddr) (*bzz.Address, error) { + return m.connectAllowLight(ctx, addrs) +} +func (m *pubsubMockP2P) NewStream(ctx context.Context, address swarm.Address, h p2p.Headers, protocol, version, stream string) (p2p.Stream, error) { + return m.newStream(ctx, address, h, protocol, version, stream) +} +func (m *pubsubMockP2P) AddProtocol(p2p.ProtocolSpec) error { return nil } +func (m *pubsubMockP2P) Connect(context.Context, []ma.Multiaddr) (*bzz.Address, error) { + return nil, nil +} +func (m *pubsubMockP2P) Disconnect(swarm.Address, string) error { return nil } +func (m *pubsubMockP2P) Blocklist(swarm.Address, time.Duration, string) error { return nil } +func (m *pubsubMockP2P) NetworkStatus() p2p.NetworkStatus { return p2p.NetworkStatusAvailable } +func (m *pubsubMockP2P) Peers() []p2p.Peer { return nil } +func (m *pubsubMockP2P) Blocklisted(swarm.Address) (bool, error) { return false, nil } +func (m *pubsubMockP2P) BlocklistedPeers() ([]p2p.BlockListedPeer, error) { return nil, nil } +func (m *pubsubMockP2P) Addresses() ([]ma.Multiaddr, error) { return nil, nil } +func (m *pubsubMockP2P) SetPickyNotifier(p2p.PickyNotifier) {} +func (m *pubsubMockP2P) Halt() {} + +// nopStream is a p2p.Stream that never returns data and ignores all writes. +// It blocks reads until closed, simulating a long-lived idle broker connection. +type nopStream struct { + once sync.Once + done chan struct{} +} + +func newNopStream() *nopStream { return &nopStream{done: make(chan struct{})} } + +func (s *nopStream) Read(p []byte) (int, error) { + <-s.done + return 0, io.EOF +} +func (s *nopStream) Write(p []byte) (int, error) { return len(p), nil } +func (s *nopStream) Close() error { s.once.Do(func() { close(s.done) }); return nil } +func (s *nopStream) ResponseHeaders() p2p.Headers { return nil } +func (s *nopStream) Headers() p2p.Headers { return nil } +func (s *nopStream) FullClose() error { return s.Close() } +func (s *nopStream) Reset() error { return s.Close() } + +func newPubsubService(t *testing.T) (*pubsub.Service, *nopStream) { + t.Helper() + ns := newNopStream() + t.Cleanup(func() { _ = ns.Close() }) + + mockP2P := &pubsubMockP2P{ + connectAllowLight: func(_ context.Context, _ []ma.Multiaddr) (*bzz.Address, error) { + return &bzz.Address{Overlay: swarm.NewAddress(make([]byte, 32))}, nil + }, + newStream: func(_ context.Context, _ swarm.Address, _ p2p.Headers, _, _, _ string) (p2p.Stream, error) { + return ns, nil + }, + } + return pubsub.New(mockP2P, log.Noop, false), ns +} + +// newPubsubServiceMultiStream returns a pubsub.Service whose mock P2P creates a +// fresh pipe stream for every NewStream call and collects them for test control. +func newPubsubServiceMultiStream(t *testing.T) (*pubsub.Service, func() []*pipeStreamAPI) { + t.Helper() + var mu sync.Mutex + var streams []*pipeStreamAPI + + mockP2P := &pubsubMockP2P{ + connectAllowLight: func(_ context.Context, _ []ma.Multiaddr) (*bzz.Address, error) { + return &bzz.Address{Overlay: swarm.NewAddress(make([]byte, 32))}, nil + }, + newStream: func(_ context.Context, _ swarm.Address, _ p2p.Headers, _, _, _ string) (p2p.Stream, error) { + ps := newPipeStreamAPI() + mu.Lock() + streams = append(streams, ps) + mu.Unlock() + return ps, nil + }, + } + svc := pubsub.New(mockP2P, log.Noop, false) + return svc, func() []*pipeStreamAPI { + mu.Lock() + defer mu.Unlock() + return streams + } +} + +func TestPubsubList_NilService(t *testing.T) { + t.Parallel() + + client, _, _, _ := newTestServer(t, testServerOptions{}) + + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/pubsub/", nil) + if err != nil { + t.Fatal(err) + } + resp, err := client.Do(req) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusNotFound { + t.Fatalf("expected 404, got %d", resp.StatusCode) + } +} + +func TestPubsubList_Empty(t *testing.T) { + t.Parallel() + + svc := pubsub.New(nil, log.Noop, true) + client, _, _, _ := newTestServer(t, testServerOptions{PubsubService: svc}) + + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/pubsub/", nil) + if err != nil { + t.Fatal(err) + } + resp, err := client.Do(req) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("expected 200, got %d", resp.StatusCode) + } +} + +func TestPubsubWs_MissingPeer(t *testing.T) { + t.Parallel() + + svc := pubsub.New(nil, log.Noop, false) + client, _, listener, _ := newTestServer(t, testServerOptions{PubsubService: svc}) + _ = listener + + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/pubsub/testtopic", nil) + if err != nil { + t.Fatal(err) + } + resp, err := client.Do(req) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusBadRequest { + t.Fatalf("expected 400 for missing peer, got %d", resp.StatusCode) + } +} + +func TestPubsubWs_InvalidMultiaddr(t *testing.T) { + t.Parallel() + + svc := pubsub.New(nil, log.Noop, false) + client, _, _, _ := newTestServer(t, testServerOptions{PubsubService: svc}) + + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/pubsub/testtopic?peer=notamultiaddr", nil) + if err != nil { + t.Fatal(err) + } + resp, err := client.Do(req) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusBadRequest { + t.Fatalf("expected 400 for invalid multiaddr, got %d", resp.StatusCode) + } +} + +func TestPubsubWs_InvalidGsocEthAddress(t *testing.T) { + t.Parallel() + + svc := pubsub.New(nil, log.Noop, false) + client, _, _, _ := newTestServer(t, testServerOptions{PubsubService: svc}) + + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/pubsub/testtopic?peer=/ip4/127.0.0.1/tcp/9000&gsoc-eth-address=ZZZZ&gsoc-topic=aabb", nil) + if err != nil { + t.Fatal(err) + } + resp, err := client.Do(req) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusBadRequest { + t.Fatalf("expected 400 for invalid gsoc-eth-address, got %d", resp.StatusCode) + } +} + +func TestPubsubWs_InvalidGsocTopic(t *testing.T) { + t.Parallel() + + svc := pubsub.New(nil, log.Noop, false) + client, _, _, _ := newTestServer(t, testServerOptions{PubsubService: svc}) + + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/pubsub/testtopic?peer=/ip4/127.0.0.1/tcp/9000&gsoc-eth-address=aabbccddeeff001122334455667788990011223344&gsoc-topic=ZZZZ", nil) + if err != nil { + t.Fatal(err) + } + resp, err := client.Do(req) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusBadRequest { + t.Fatalf("expected 400 for invalid gsoc-topic, got %d", resp.StatusCode) + } +} + +func TestPubsubWs_SubscriberConnect(t *testing.T) { + t.Parallel() + + svc, nopSt := newPubsubService(t) + _ = nopSt + + _, _, listener, _ := newTestServer(t, testServerOptions{ + PubsubService: svc, + Logger: log.Noop, + }) + + u := url.URL{ + Scheme: "ws", + Host: listener, + Path: "/pubsub/testtopic", + RawQuery: "peer=/ip4/127.0.0.1/tcp/9000", + } + + conn, resp, err := websocket.DefaultDialer.Dial(u.String(), nil) + if err != nil { + if resp != nil { + t.Fatalf("websocket dial failed with status %d: %v", resp.StatusCode, err) + } + t.Fatalf("websocket dial failed: %v", err) + } + defer conn.Close() + + // Connection should be alive; send a close frame and verify clean teardown. + err = conn.WriteMessage(websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + if err != nil { + t.Fatalf("write close: %v", err) + } + + _ = conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + for { + _, _, err := conn.ReadMessage() + if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + break + } + if err != nil { + break + } + } +} + +func TestPubsubWs_TwoTopicsListAndMessages(t *testing.T) { + t.Parallel() + + svc, getStreams := newPubsubServiceMultiStream(t) + client, _, listener, _ := newTestServer(t, testServerOptions{ + PubsubService: svc, + Logger: log.Noop, + }) + + // Open two WS connections on different topics. + conn1 := dialWs(t, listener, "topic-alpha") + defer conn1.Close() + conn2 := dialWs(t, listener, "topic-beta") + defer conn2.Close() + + // Wait until both topics appear as active subscribers. + err := spinlock.Wait(3*time.Second, func() bool { + return len(svc.Topics()) == 2 + }) + if err != nil { + t.Fatalf("timed out waiting for 2 active topics, got %d", len(svc.Topics())) + } + + // Broker sends a ping (0x01) on each stream to exercise message exchange. + // A ping is the simplest valid broker message; runMux eats it and keeps running. + streams := getStreams() + if len(streams) != 2 { + t.Fatalf("expected 2 streams, got %d", len(streams)) + } + for _, st := range streams { + if _, err := st.pw.Write([]byte{pubsub.MsgTypePing}); err != nil { + t.Fatalf("write ping to stream: %v", err) + } + } + + // Query the list endpoint — both topics must be present. + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/pubsub/", nil) + if err != nil { + t.Fatal(err) + } + resp, err := client.Do(req) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("expected 200, got %d", resp.StatusCode) + } + + var body struct { + Topics []pubsub.TopicInfo `json:"topics"` + } + if err := json.NewDecoder(resp.Body).Decode(&body); err != nil { + t.Fatalf("decode response: %v", err) + } + if len(body.Topics) != 2 { + t.Fatalf("expected 2 topics, got %d", len(body.Topics)) + } + + // Close conn1 and verify the topic list shrinks to 1. + _ = conn1.WriteMessage(websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + conn1.Close() + + if err := spinlock.Wait(3*time.Second, func() bool { + return len(svc.Topics()) == 1 + }); err != nil { + t.Fatalf("timed out waiting for topic count to drop to 1, got %d", len(svc.Topics())) + } + + // Close conn2 and verify no topics remain. + _ = conn2.WriteMessage(websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + conn2.Close() + + if err := spinlock.Wait(3*time.Second, func() bool { + return len(svc.Topics()) == 0 + }); err != nil { + t.Fatalf("timed out waiting for topic count to drop to 0, got %d", len(svc.Topics())) + } +} diff --git a/pkg/api/router.go b/pkg/api/router.go index 3d61fc98c00..b897ccb0fe6 100644 --- a/pkg/api/router.go +++ b/pkg/api/router.go @@ -44,7 +44,15 @@ func (s *Service) Mount() { s.Handler = web.ChainHandlers( httpaccess.NewHTTPAccessLogHandler(s.logger, s.tracer, "api access"), - handlers.CompressHandler, + func(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if strings.EqualFold(r.Header.Get("Upgrade"), "websocket") { + h.ServeHTTP(w, r) + return + } + handlers.CompressHandler(h).ServeHTTP(w, r) + }) + }, s.corsHandler, web.NoCacheHeadersHandler, web.FinalHandler(router), @@ -74,6 +82,16 @@ func (s *Service) EnableFullAPI() { } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Skip compression for WebSocket upgrade requests. + // CompressHandler wraps the ResponseWriter with a gzip writer; when + // the WebSocket upgrader hijacks the connection and the handler returns, + // the gzip writer tries to flush/close and writes to the hijacked + // connection, causing "response.Write on hijacked connection" errors. + if strings.EqualFold(r.Header.Get("Upgrade"), "websocket") { + h.ServeHTTP(w, r) + return + } + // Skip compression for GET requests on download endpoints. // This is done in order to preserve Content-Length header in response, // because CompressHandler is always removing it. @@ -366,6 +384,14 @@ func (s *Service) mountAPI() { ), }) + handle("/pubsub/{topic}", http.HandlerFunc(s.pubsubWsHandler)) + + handle("/pubsub/", web.ChainHandlers( + web.FinalHandler(jsonhttp.MethodHandler{ + "GET": http.HandlerFunc(s.pubsubListHandler), + }), + )) + handle("/pss/subscribe/{topic}", jsonhttp.MethodHandler{ "GET": http.HandlerFunc(s.pssWsHandler), }) diff --git a/pkg/log/httpaccess/http_access.go b/pkg/log/httpaccess/http_access.go index fbc350dba60..b30803e78b3 100644 --- a/pkg/log/httpaccess/http_access.go +++ b/pkg/log/httpaccess/http_access.go @@ -92,12 +92,16 @@ type responseRecorder struct { http.ResponseWriter // Metrics. - status int - size int + status int + size int + hijacked bool } // Write implements http.ResponseWriter. func (rr *responseRecorder) Write(b []byte) (int, error) { + if rr.hijacked { + return 0, http.ErrHijacked + } size, err := rr.ResponseWriter.Write(b) rr.size += size return size, err @@ -105,6 +109,9 @@ func (rr *responseRecorder) Write(b []byte) (int, error) { // WriteHeader implements http.ResponseWriter. func (rr *responseRecorder) WriteHeader(s int) { + if rr.hijacked { + return + } rr.ResponseWriter.WriteHeader(s) if rr.status == 0 { rr.status = s @@ -118,13 +125,27 @@ func (rr *responseRecorder) CloseNotify() <-chan bool { return rr.ResponseWriter.(http.CloseNotifier).CloseNotify() } -// Hijack implements http.Hijacker. +// Hijack implements http.Hijacker so that WebSocket upgrades pass through +// correctly. Without this, the underlying connection is hijacked by the +// upgrader but the recorder still holds a reference to the (now-hijacked) +// ResponseWriter, causing "response.Write on hijacked connection" errors. func (rr *responseRecorder) Hijack() (net.Conn, *bufio.ReadWriter, error) { - return rr.ResponseWriter.(http.Hijacker).Hijack() + h, ok := rr.ResponseWriter.(http.Hijacker) + if !ok { + return nil, nil, http.ErrNotSupported + } + conn, brw, err := h.Hijack() + if err == nil { + rr.hijacked = true + } + return conn, brw, err } // Flush implements http.Flusher. func (rr *responseRecorder) Flush() { + if rr.hijacked { + return + } rr.ResponseWriter.(http.Flusher).Flush() } diff --git a/pkg/node/node.go b/pkg/node/node.go index 53d96ec5419..4330ac1acd7 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -48,6 +48,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/pricer" "github.com/ethersphere/bee/v2/pkg/pricing" "github.com/ethersphere/bee/v2/pkg/pss" + "github.com/ethersphere/bee/v2/pkg/pubsub" "github.com/ethersphere/bee/v2/pkg/puller" "github.com/ethersphere/bee/v2/pkg/pullsync" "github.com/ethersphere/bee/v2/pkg/pusher" @@ -193,6 +194,7 @@ type Options struct { WarmupTime time.Duration WelcomeMessage string WhitelistedWithdrawalAddress []string + PubsubBrokerMode bool } const ( @@ -741,6 +743,11 @@ func NewBee( return nil, fmt.Errorf("init batch service: %w", err) } + pubsubSvc := pubsub.New(p2ps, logger, o.PubsubBrokerMode) + if err = p2ps.AddProtocol(pubsubSvc.Protocol()); err != nil { + return nil, fmt.Errorf("pubsub protocol: %w", err) + } + // Construct protocols. pingPong := pingpong.New(p2ps, logger, tracer) @@ -1310,6 +1317,7 @@ func NewBee( SyncStatus: syncStatusFn, NodeStatus: nodeStatus, PinIntegrity: localStore.PinIntegrity(), + PubsubService: pubsubSvc, } if o.APIAddr != "" { diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index 316cbd6171f..802fbd0264d 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -208,7 +208,6 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay return nil, err } - // Tweak certain settings cfg := rcmgr.PartialLimitConfig{ System: rcmgr.ResourceLimits{ Streams: IncomingStreamCountLimit + OutgoingStreamCountLimit, @@ -587,7 +586,7 @@ func (s *Service) handleIncoming(stream network.Stream) { peerAddrs, err := s.peerMultiaddrs(s.ctx, peerID) if err != nil { s.logger.Debug("stream handler: handshake: build remote multiaddrs", "peer_id", peerID, "error", err) - s.logger.Error(nil, "stream handler: handshake: build remote multiaddrs", "peer_id", peerID) + s.logger.Error(err, "stream handler: handshake: build remote multiaddrs", "peer_id", peerID) _ = handshakeStream.Reset() _ = s.host.Network().ClosePeer(peerID) return @@ -602,7 +601,7 @@ func (s *Service) handleIncoming(stream network.Stream) { observedAddrs, err = buildFullMAs([]ma.Multiaddr{stream.Conn().RemoteMultiaddr()}, peerID) if err != nil { s.logger.Debug("stream handler: handshake: build remote multiaddrs fallback", "peer_id", peerID, "error", err) - s.logger.Error(nil, "stream handler: handshake: build remote multiaddrs fallback", "peer_id", peerID) + s.logger.Error(err, "stream handler: handshake: build remote multiaddrs fallback", "peer_id", peerID) _ = handshakeStream.Reset() _ = s.host.Network().ClosePeer(peerID) return @@ -619,7 +618,7 @@ func (s *Service) handleIncoming(stream network.Stream) { ) if err != nil { s.logger.Debug("stream handler: handshake: handle failed", "peer_id", peerID, "error", err) - s.logger.Error(nil, "stream handler: handshake: handle failed", "peer_id", peerID) + s.logger.Error(err, "stream handler: handshake: handle failed", "peer_id", peerID) _ = handshakeStream.Reset() _ = s.host.Network().ClosePeer(peerID) return @@ -630,14 +629,14 @@ func (s *Service) handleIncoming(stream network.Stream) { blocked, err := s.blocklist.Exists(overlay) if err != nil { s.logger.Debug("stream handler: blocklisting: exists failed", "peer_address", overlay, "error", err) - s.logger.Error(nil, "stream handler: internal error while connecting with peer", "peer_address", overlay) + s.logger.Error(err, "stream handler: internal error while connecting with peer", "peer_address", overlay) _ = handshakeStream.Reset() _ = stream.Conn().Close() return } if blocked { - s.logger.Error(nil, "stream handler: blocked connection from blocklisted peer", "peer_address", overlay) + s.logger.Error(err, "stream handler: blocked connection from blocklisted peer", "peer_address", overlay) _ = handshakeStream.Reset() _ = s.host.Network().ClosePeer(peerID) return @@ -647,7 +646,7 @@ func (s *Service) handleIncoming(stream network.Stream) { s.logger.Debug("stream handler: peer already exists", "peer_address", overlay) if err = handshakeStream.FullClose(); err != nil { s.logger.Debug("stream handler: could not close stream", "peer_address", overlay, "error", err) - s.logger.Error(nil, "stream handler: unable to handshake with peer", "peer_address", overlay) + s.logger.Error(err, "stream handler: unable to handshake with peer", "peer_address", overlay) _ = stream.Conn().Close() } return @@ -655,7 +654,7 @@ func (s *Service) handleIncoming(stream network.Stream) { if err = handshakeStream.FullClose(); err != nil { s.logger.Debug("stream handler: could not close stream", "peer_address", overlay, "error", err) - s.logger.Error(nil, "stream handler: unable to handshake with peer", "peer_address", overlay) + s.logger.Error(err, "stream handler: unable to handshake with peer", "peer_address", overlay) _ = s.Disconnect(overlay, "could not fully close stream on handshake") return } @@ -665,7 +664,7 @@ func (s *Service) handleIncoming(stream network.Stream) { err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress) if err != nil { s.logger.Debug("stream handler: addressbook put error", "peer_id", peerID, "error", err) - s.logger.Error(nil, "stream handler: unable to persist peer", "peer_id", peerID) + s.logger.Error(err, "stream handler: unable to persist peer", "peer_id", peerID) _ = s.Disconnect(i.BzzAddress.Overlay, "unable to persist peer in addressbook") return } @@ -868,7 +867,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) { _ = stream.Reset() if err := s.Blocklist(overlay, bpe.Duration(), bpe.Error()); err != nil { logger.Debug("blocklist: could not blocklist peer", "peer_id", peerID, "error", err) - logger.Error(nil, "unable to blocklist peer", "peer_id", peerID) + logger.Error(err, "unable to blocklist peer", "peer_id", peerID) } loggerV1.Debug("handler: peer blocklisted", "protocol", p.Name, "peer_address", overlay) } @@ -959,7 +958,18 @@ func buildHostAddress(peerID libp2ppeer.ID) (ma.Multiaddr, error) { return ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", peerID.String())) } -func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *bzz.Address, err error) { +func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (*bzz.Address, error) { + return s.connect(ctx, addrs, false) +} + +// ConnectAllowLight behaves like Connect but does not reject the peer +// if it identifies itself as a light node. Intended for protocols such +// as pubsub where broker peers may operate in light-node mode. +func (s *Service) ConnectAllowLight(ctx context.Context, addrs []ma.Multiaddr) (*bzz.Address, error) { + return s.connect(ctx, addrs, true) +} + +func (s *Service) connect(ctx context.Context, addrs []ma.Multiaddr, allowLight bool) (address *bzz.Address, err error) { loggerV1 := s.logger.V(1).Register() defer func() { @@ -1011,6 +1021,13 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b } connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second) + // Clear any stale libp2p swarm dial backoff for this peer so that + // an explicit Connect call always attempts a real TCP dial rather + // than failing immediately with a cached backoff error (which would + // still count against the Bee connection breaker). + if sw, ok := s.host.Network().(*lp2pswarm.Swarm); ok { + sw.Backoff().Clear(info.ID) + } err = s.connectionBreaker.Execute(func() error { return s.host.Connect(connectCtx, *info) }) cancel() @@ -1086,7 +1103,7 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b return nil, fmt.Errorf("handshake: %w", err) } - if !i.FullNode { + if !i.FullNode && !allowLight { _ = handshakeStream.Reset() _ = s.host.Network().ClosePeer(info.ID) return nil, p2p.ErrDialLightNode @@ -1097,14 +1114,14 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b blocked, err := s.blocklist.Exists(overlay) if err != nil { s.logger.Debug("blocklisting: exists failed", "peer_id", info.ID, "error", err) - s.logger.Error(nil, "internal error while connecting with peer", "peer_id", info.ID) + s.logger.Error(err, "internal error while connecting with peer", "peer_id", info.ID) _ = handshakeStream.Reset() _ = s.host.Network().ClosePeer(info.ID) return nil, err } if blocked { - s.logger.Error(nil, "blocked connection to blocklisted peer", "peer_id", info.ID) + s.logger.Error(err, "blocked connection to blocklisted peer", "peer_id", info.ID) _ = handshakeStream.Reset() _ = s.host.Network().ClosePeer(info.ID) return nil, p2p.ErrPeerBlocklisted diff --git a/pkg/pubsub/export_test.go b/pkg/pubsub/export_test.go new file mode 100644 index 00000000000..7eb91d37f7e --- /dev/null +++ b/pkg/pubsub/export_test.go @@ -0,0 +1,40 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pubsub + +import "github.com/ethersphere/bee/v2/pkg/log" + +func (m *GSOCEphemeralMode) FormatBroadcast(sub *brokerSubscriber, rawMsg []byte) []byte { + return m.formatBroadcast(sub, rawMsg) +} + +func (m *GSOCEphemeralMode) SetGsocParams(gsocOwner, gsocID []byte) { + m.setGsocParams(gsocOwner, gsocID) +} + +func (m *GSOCEphemeralMode) GsocOwner() []byte { + m.mu.RLock() + defer m.mu.RUnlock() + return m.gsocOwner +} + +func NewBrokerSubscriber() *brokerSubscriber { + return &brokerSubscriber{} +} + +func (b *brokerSubscriber) SetHandshakeDone() { + b.handshakeHappened = true +} + +func NewTestSubscriberConn(logger log.Logger) *SubscriberConn { + return &SubscriberConn{ + subs: make(map[uint64]chan []byte), + logger: logger, + } +} + +func (sc *SubscriberConn) FanOut(msg []byte) { + sc.fanOut(msg) +} diff --git a/pkg/pubsub/mode.go b/pkg/pubsub/mode.go new file mode 100644 index 00000000000..73831a6152d --- /dev/null +++ b/pkg/pubsub/mode.go @@ -0,0 +1,440 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pubsub + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + "io" + "sync" + + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/p2p" + "github.com/ethersphere/bee/v2/pkg/soc" + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +var ErrInvalidSignature = errors.New("pubsub: invalid SOC signature") + +const ( + // P2P headers + HeaderGsocOwner = "pubsub-gsoc-owner" + HeaderGsocID = "pubsub-gsoc-id" +) + +// ModeID identifies a pubsub mode. +type ModeID uint8 + +// Mode defines mode-specific behavior for the pubsub protocol. +// Each mode determines its own roles, wire format, and message handling. +type Mode interface { + ID() ModeID + TopicAddress() swarm.Address + + // Subscriber side - outbound connection to broker + Connect(ctx context.Context, p p2p.Streamer, overlay swarm.Address, opts ConnectOptions) (p2p.Stream, error) + CreateSubscriberConn(stream p2p.Stream, overlay swarm.Address) *SubscriberConn + GetSubscriberConn() *SubscriberConn + RemoveSubscriberConn(conn *SubscriberConn) + ReadBrokerMessage(stream p2p.Stream) ([]byte, error) + + // Broker side - handles incoming streams (publisher and subscriber) + HandleBroker(ctx context.Context, peer p2p.Peer, stream p2p.Stream, headers p2p.Headers) error + SubscriberCount() int + SubscriberOverlays() []string +} + +// --- GSOC Ephemeral Mode (mode 1) --- + +const ( + // Mode-specific message types (Broker → Subscriber); 0x01 is reserved for service-level ping. + MsgTypeHandshake byte = 0x02 + MsgTypeData byte = 0x03 +) + +// GSOCEphemeralMode implements Mode for GSOC ephemeral messaging. +type GSOCEphemeralMode struct { + mu sync.RWMutex + topicAddress swarm.Address + gsocOwner []byte + gsocID []byte + logger log.Logger + subscribers map[string]*brokerSubscriber + subscriberConn *SubscriberConn +} + +var _ Mode = (*GSOCEphemeralMode)(nil) + +func NewGSOCEphemeralMode(topicAddress []byte, logger log.Logger) *GSOCEphemeralMode { + return &GSOCEphemeralMode{ + topicAddress: swarm.NewAddress(topicAddress), + logger: logger, + subscribers: make(map[string]*brokerSubscriber), + } +} + +func (m *GSOCEphemeralMode) ID() ModeID { return ModeGSOCEphemeral } + +func (m *GSOCEphemeralMode) TopicAddress() swarm.Address { return m.topicAddress.Clone() } + +func (m *GSOCEphemeralMode) Connect(ctx context.Context, p p2p.Streamer, overlay swarm.Address, opts ConnectOptions) (p2p.Stream, error) { + var rw byte + if opts.ReadWrite { + rw = 1 + } + headers := p2p.Headers{ + HeaderTopicAddress: m.topicAddress.Bytes(), + HeaderMode: {byte(m.ID())}, + HeaderReadWrite: {rw}, + } + if len(opts.GsocOwner) > 0 { + headers[HeaderGsocOwner] = opts.GsocOwner + } + if len(opts.GsocID) > 0 { + headers[HeaderGsocID] = opts.GsocID + } + return p.NewStream(ctx, overlay, headers, protocolName, protocolVersion, streamName) +} + +// validatePublisher sets SOC parameters on the broker side so it can validate the messages. +func (m *GSOCEphemeralMode) validatePublisher(headers p2p.Headers) error { + gsocOwner := headers[HeaderGsocOwner] + gsocID := headers[HeaderGsocID] + + m.mu.Lock() + m.setGsocParams(gsocOwner, gsocID) + set := m.gsocID != nil + m.mu.Unlock() + + if !set { + return ErrWrongHeaders + } + return nil +} + +// FormatBroadcast formats a raw publisher message for delivery to a subscriber. +// First delivery to each subscriber includes a handshake with SOC identity; subsequent are data-only. +func (m *GSOCEphemeralMode) formatBroadcast(sub *brokerSubscriber, rawMsg []byte) []byte { + if !sub.handshakeHappened { + // Handshake: [1B type=0x02][32B SOC ID][20B owner][65B sig][8B span][NB payload] + msg := make([]byte, 1+IDSize+OwnerSize+len(rawMsg)) + msg[0] = MsgTypeHandshake + copy(msg[1:1+IDSize], m.gsocID) + copy(msg[1+IDSize:1+IDSize+OwnerSize], m.gsocOwner) + copy(msg[1+IDSize+OwnerSize:], rawMsg) + sub.handshakeHappened = true + return msg + } + + // Data: [1B type=0x03][65B sig][8B span][NB payload] + msg := make([]byte, 1+len(rawMsg)) + msg[0] = MsgTypeData + copy(msg[1:], rawMsg) + return msg +} + +// ReadPublisherMessage reads [65B sig][8B span][NB payload (max 4KB)] from the stream, +// constructs and validates the SOC chunk and returns that. +func (m *GSOCEphemeralMode) ReadPublisherMessage(stream p2p.Stream) ([]byte, error) { + sig := make([]byte, SigSize) + if _, err := io.ReadFull(stream, sig); err != nil { + return nil, err + } + spanBytes := make([]byte, SpanSize) + if _, err := io.ReadFull(stream, spanBytes); err != nil { + return nil, err + } + span := min(binary.LittleEndian.Uint64(spanBytes), MaxPayload) + + payload := make([]byte, span) + if _, err := io.ReadFull(stream, payload); err != nil { + return nil, err + } + + // Construct SOC chunk: [ID (32B)][sig (65B)][span (8B)][payload] + socData := make([]byte, IDSize+SigSize+SpanSize+int(span)) + copy(socData, m.gsocID) + copy(socData[IDSize:], sig) + copy(socData[IDSize+SigSize:], spanBytes) + copy(socData[IDSize+SigSize+SpanSize:], payload) + + socAddr, err := soc.CreateAddress(m.gsocID, m.gsocOwner) + if err != nil { + return nil, fmt.Errorf("pubsub: compute SOC address: %w", err) + } + + if !soc.Valid(swarm.NewChunk(socAddr, socData)) { + return nil, ErrInvalidSignature + } + + return socData[IDSize:], nil +} + +// ReadBrokerMessage reads one broker→subscriber message and verifies it +func (m *GSOCEphemeralMode) ReadBrokerMessage(stream p2p.Stream) ([]byte, error) { + typeBuf := make([]byte, 1) + if _, err := io.ReadFull(stream, typeBuf); err != nil { + return nil, err + } + + if handled, err := readServiceMessage(typeBuf[0]); err != nil { + return nil, err + } else if handled { + return nil, nil + } + + switch typeBuf[0] { + case MsgTypeHandshake: + socID := make([]byte, IDSize) + if _, err := io.ReadFull(stream, socID); err != nil { + return nil, fmt.Errorf("read SOC ID: %w", err) + } + ownerAddr := make([]byte, OwnerSize) + if _, err := io.ReadFull(stream, ownerAddr); err != nil { + return nil, fmt.Errorf("read owner addr: %w", err) + } + m.setGsocParams(ownerAddr, socID) + + return m.ReadPublisherMessage(stream) // same as publisher message at this point + + case MsgTypeData: + if m.gsocID == nil { + return nil, fmt.Errorf("pubsub: data message before handshake") + } + return m.ReadPublisherMessage(stream) + + default: + return nil, fmt.Errorf("pubsub: unknown message type: 0x%02x", typeBuf[0]) + } +} + +// SubscriberCount returns the number of active subscribers. +func (m *GSOCEphemeralMode) SubscriberCount() int { + m.mu.RLock() + defer m.mu.RUnlock() + return len(m.subscribers) +} + +// SubscriberOverlays returns the overlay addresses of all active subscribers. +func (m *GSOCEphemeralMode) SubscriberOverlays() []string { + m.mu.RLock() + defer m.mu.RUnlock() + overlays := make([]string, 0, len(m.subscribers)) + for _, sub := range m.subscribers { + overlays = append(overlays, sub.overlay.String()) + } + return overlays +} + +// broadcast sends a message to all subscribers. +func (m *GSOCEphemeralMode) broadcast(rawMsg []byte) { + m.mu.Lock() + defer m.mu.Unlock() + + m.logger.Info("broadcasting to subscribers", "count", len(m.subscribers), "size", len(rawMsg)) + for _, sub := range m.subscribers { + msg := m.formatBroadcast(sub, rawMsg) + + select { + case sub.outCh <- msg: + m.logger.Info("message enqueued for subscriber", "peer", sub.overlay, "size", len(msg)) + default: + m.logger.Warning("subscriber message queue full, dropping message", "peer", sub.overlay) + } + } +} + +// HandleBroker handles an incoming broker-side stream, dispatching to publisher or subscriber handling. +func (m *GSOCEphemeralMode) HandleBroker(ctx context.Context, peer p2p.Peer, stream p2p.Stream, headers p2p.Headers) error { + rwBytes := headers[HeaderReadWrite] + m.logger.Info("broker stream opened", "peer", peer.Address, "topic", m.TopicAddress(), "rw", rwBytes) + if len(rwBytes) != 1 { + _ = stream.Reset() + return ErrWrongHeaders + } + if rwBytes[0] == 1 { + return m.handlePublisher(ctx, peer, stream, headers) + } + m.logger.Info("handling as subscriber", "peer", peer.Address) + return m.handleSubscriber(ctx, peer, stream) +} + +func (m *GSOCEphemeralMode) handleSubscriber(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error { + subCtx, cancel, unregister := m.registerSubscriber(ctx, peer.Address, stream) + defer cancel() + defer unregister() + + m.logger.Info("subscriber connected", "peer", peer.Address, "topic", m.TopicAddress()) + + <-subCtx.Done() + if errors.Is(subCtx.Err(), context.Canceled) { + return nil + } + return subCtx.Err() +} + +func (m *GSOCEphemeralMode) handlePublisher(ctx context.Context, peer p2p.Peer, stream p2p.Stream, headers p2p.Headers) error { + m.logger.Info("publisher handler entered", "peer", peer.Address, "topic", m.TopicAddress()) + + if err := m.validatePublisher(headers); err != nil { + m.logger.Info("publisher validation failed", "peer", peer.Address, "error", err) + _ = stream.Reset() + return err + } + m.logger.Info("publisher validated", "peer", peer.Address, "gsoc_id", fmt.Sprintf("%x", m.gsocID), "gsoc_owner", fmt.Sprintf("%x", m.gsocOwner)) + + partCtx, cancel, unregister := m.registerSubscriber(ctx, peer.Address, stream) + defer cancel() + defer unregister() + + m.logger.Info("publisher connected, starting read loop", "peer", peer.Address, "topic", m.TopicAddress(), "subscribers", m.SubscriberCount()) + + for { + select { + case <-partCtx.Done(): + if errors.Is(partCtx.Err(), context.Canceled) { + return nil + } + return partCtx.Err() + default: + } + + m.logger.Info("waiting for publisher message", "peer", peer.Address) + rawMsg, err := m.ReadPublisherMessage(stream) + if err != nil { + if errors.Is(err, io.EOF) { + m.logger.Info("publisher stream EOF", "peer", peer.Address) + return nil + } + m.logger.Info("publisher read error", "peer", peer.Address, "error", err) + return fmt.Errorf("read publisher message: %w", err) + } + + m.logger.Info("publisher message received", "peer", peer.Address, "size", len(rawMsg)) + m.broadcast(rawMsg) + } +} + +// registerSubscriber adds a peer as a subscriber and starts a write goroutine for it. +func (m *GSOCEphemeralMode) registerSubscriber(ctx context.Context, overlay swarm.Address, stream p2p.Stream) (context.Context, context.CancelFunc, func()) { + connCtx, cancel := context.WithCancel(ctx) + + sub := &brokerSubscriber{ + overlay: overlay, + stream: stream, + outCh: make(chan []byte, 256), + cancel: cancel, + } + + overlayKey := overlay.String() + m.mu.Lock() + m.subscribers[overlayKey] = sub + m.mu.Unlock() + + startBrokerWriter(connCtx, cancel, stream, sub.outCh, overlay, m.logger) + + unregister := func() { + m.mu.Lock() + if m.subscribers[overlayKey] == sub { + delete(m.subscribers, overlayKey) + } + m.mu.Unlock() + } + + return connCtx, cancel, unregister +} + +// CreateSubscriberConn returns the existing SubscriberConn for this topic if one is active, +// incrementing its ref count so the shared stream stays open. When no conn exists yet, +// a new one is created and a single mux goroutine is started to fan out broker messages. +func (m *GSOCEphemeralMode) CreateSubscriberConn(stream p2p.Stream, overlay swarm.Address) *SubscriberConn { + m.mu.Lock() + defer m.mu.Unlock() + + if m.subscriberConn != nil { + m.subscriberConn.refs++ + return m.subscriberConn + } + + sc := &SubscriberConn{ + Stream: stream, + Overlay: overlay, + refs: 1, + subs: make(map[uint64]chan []byte), + logger: m.logger, + } + m.subscriberConn = sc + go m.runMux(stream) + return sc +} + +// runMux reads broker messages from the shared p2p stream and broadcasts each to all +// registered WS sessions. It exits when the stream closes or returns an error. +// On exit it immediately clears m.subscriberConn so new Connect calls open a fresh stream. +func (m *GSOCEphemeralMode) runMux(stream p2p.Stream) { + // Capture once; RemoveSubscriberConn may set m.subscriberConn=nil concurrently. + sc := m.subscriberConn + defer func() { + sc.closeAll() + m.mu.Lock() + if m.subscriberConn == sc { + m.subscriberConn = nil + } + m.mu.Unlock() + }() + for { + msg, err := m.ReadBrokerMessage(stream) + if err != nil { + m.logger.Debug("pubsub mux: stream error, stopping", "error", err) + return + } + if msg == nil { + continue + } + sc.fanOut(msg) + } +} + +// GetSubscriberConn returns the subscriber-side connection, or nil. +func (m *GSOCEphemeralMode) GetSubscriberConn() *SubscriberConn { + m.mu.RLock() + defer m.mu.RUnlock() + return m.subscriberConn +} + +// RemoveSubscriberConn decrements the ref count for conn. +// When the last WS session exits it closes the stream, stopping the mux goroutine. +// If the mux already died and cleared m.subscriberConn, refs are still tracked on conn +// so the stream is closed exactly once when refs reach zero. +func (m *GSOCEphemeralMode) RemoveSubscriberConn(conn *SubscriberConn) { + m.mu.Lock() + defer m.mu.Unlock() + conn.refs-- + if conn.refs <= 0 { + m.subscriberConn = nil + _ = conn.Stream.FullClose() + } +} + +// setGsocParams sets the GSOC recurring parameters so that messages don't need to include them. +func (m *GSOCEphemeralMode) setGsocParams(gsocOwner, gsocID []byte) { + if m.gsocOwner != nil { + return + } + // Verify got socId and address match with topicaddress + addr, err := soc.CreateAddress(gsocID, gsocOwner) + if err != nil || !bytes.Equal(addr.Bytes(), m.topicAddress.Bytes()) { + m.logger.Debug("gsoc params verification failed", "err", err, "addr", addr, "topicAddress", m.topicAddress) + return + } + + m.gsocOwner = make([]byte, OwnerSize) + copy(m.gsocOwner, gsocOwner) + m.gsocID = make([]byte, IDSize) + copy(m.gsocID, gsocID) +} diff --git a/pkg/pubsub/mode_1_test.go b/pkg/pubsub/mode_1_test.go new file mode 100644 index 00000000000..d4ad201c332 --- /dev/null +++ b/pkg/pubsub/mode_1_test.go @@ -0,0 +1,381 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Testing out Mode 1 (GSoC Ephemeral) implementation + +package pubsub_test + +import ( + "bytes" + "encoding/binary" + "io" + "testing" + + "github.com/ethersphere/bee/v2/pkg/cac" + "github.com/ethersphere/bee/v2/pkg/crypto" + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/p2p" + "github.com/ethersphere/bee/v2/pkg/pubsub" + "github.com/ethersphere/bee/v2/pkg/soc" + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +// readerStream is a p2p.Stream whose Read side is backed by a bytes.Reader. +// Write calls succeed but are discarded. Close/Reset are no-ops. +type readerStream struct { + io.Reader + headers p2p.Headers +} + +func (r *readerStream) Write(p []byte) (int, error) { return len(p), nil } +func (r *readerStream) Close() error { return nil } +func (r *readerStream) ResponseHeaders() p2p.Headers { return nil } +func (r *readerStream) Headers() p2p.Headers { return r.headers } +func (r *readerStream) FullClose() error { return nil } +func (r *readerStream) Reset() error { return nil } + +func newReaderStream(data []byte, headers p2p.Headers) *readerStream { + return &readerStream{Reader: bytes.NewReader(data), headers: headers} +} + +// socTestCtx holds key material for building valid SOC publisher messages. +type socTestCtx struct { + signer crypto.Signer + owner []byte + gsocID []byte + topicAddr [32]byte +} + +func newSocTestCtx(t *testing.T) *socTestCtx { + t.Helper() + + privKey, err := crypto.GenerateSecp256k1Key() + if err != nil { + t.Fatal(err) + } + signer := crypto.NewDefaultSigner(privKey) + ownerAddr, err := signer.EthereumAddress() + if err != nil { + t.Fatal(err) + } + + gsocID := make([]byte, swarm.HashSize) + + topicAddress, err := soc.CreateAddress(gsocID, ownerAddr.Bytes()) + if err != nil { + t.Fatal(err) + } + + var topicArr [32]byte + copy(topicArr[:], topicAddress.Bytes()) + + return &socTestCtx{ + signer: signer, + owner: ownerAddr.Bytes(), + gsocID: gsocID, + topicAddr: topicArr, + } +} + +// buildPublisherMsg returns a valid pubsub publisher wire message: sig(65B)+span(8B)+payload. +func buildPublisherMsg(t *testing.T, tc *socTestCtx, payload []byte) []byte { + t.Helper() + + spanBytes := make([]byte, pubsub.SpanSize) + binary.LittleEndian.PutUint64(spanBytes, uint64(len(payload))) + + cacData := make([]byte, 0, pubsub.SpanSize+len(payload)) + cacData = append(cacData, spanBytes...) + cacData = append(cacData, payload...) + ch, err := cac.NewWithDataSpan(cacData) + if err != nil { + t.Fatal(err) + } + + h := swarm.NewHasher() + _, _ = h.Write(tc.gsocID) + _, _ = h.Write(ch.Address().Bytes()) + toSign := h.Sum(nil) + + sig, err := tc.signer.Sign(toSign) + if err != nil { + t.Fatal(err) + } + + msg := make([]byte, 0, len(sig)+pubsub.SpanSize+len(payload)) + msg = append(msg, sig...) + msg = append(msg, spanBytes[:pubsub.SpanSize]...) + msg = append(msg, payload...) + return msg +} + +func TestFormatBroadcast_Handshake(t *testing.T) { + t.Parallel() + + tc := newSocTestCtx(t) + mode := pubsub.NewGSOCEphemeralMode(tc.topicAddr[:], log.Noop) + mode.SetGsocParams(tc.owner, tc.gsocID) + + rawMsg := []byte("sig_span_payload_placeholder_data_65_8_N") + sub := pubsub.NewBrokerSubscriber() + + out := mode.FormatBroadcast(sub, rawMsg) + + if out[0] != pubsub.MsgTypeHandshake { + t.Fatalf("expected type 0x%02x, got 0x%02x", pubsub.MsgTypeHandshake, out[0]) + } + + gotID := out[1 : 1+pubsub.IDSize] + if !bytes.Equal(gotID, tc.gsocID) { + t.Fatalf("gsocID mismatch: got %x want %x", gotID, tc.gsocID) + } + + gotOwner := out[1+pubsub.IDSize : 1+pubsub.IDSize+pubsub.OwnerSize] + if !bytes.Equal(gotOwner, tc.owner) { + t.Fatalf("owner mismatch: got %x want %x", gotOwner, tc.owner) + } + + gotPayload := out[1+pubsub.IDSize+pubsub.OwnerSize:] + if !bytes.Equal(gotPayload, rawMsg) { + t.Fatalf("payload mismatch: got %x want %x", gotPayload, rawMsg) + } +} + +func TestFormatBroadcast_HandshakeOnce(t *testing.T) { + t.Parallel() + + tc := newSocTestCtx(t) + mode := pubsub.NewGSOCEphemeralMode(tc.topicAddr[:], log.Noop) + mode.SetGsocParams(tc.owner, tc.gsocID) + + rawMsg := []byte("data") + sub := pubsub.NewBrokerSubscriber() + + // First call: handshake + first := mode.FormatBroadcast(sub, rawMsg) + if first[0] != pubsub.MsgTypeHandshake { + t.Fatalf("expected handshake type on first call") + } + + // Second call: data only + second := mode.FormatBroadcast(sub, rawMsg) + if second[0] != pubsub.MsgTypeData { + t.Fatalf("expected data type on second call, got 0x%02x", second[0]) + } + if !bytes.Equal(second[1:], rawMsg) { + t.Fatalf("data payload mismatch") + } +} + +func TestFormatBroadcast_DataAfterHandshake(t *testing.T) { + t.Parallel() + + tc := newSocTestCtx(t) + mode := pubsub.NewGSOCEphemeralMode(tc.topicAddr[:], log.Noop) + mode.SetGsocParams(tc.owner, tc.gsocID) + + rawMsg := []byte("payload") + sub := pubsub.NewBrokerSubscriber() + sub.SetHandshakeDone() + + out := mode.FormatBroadcast(sub, rawMsg) + + if out[0] != pubsub.MsgTypeData { + t.Fatalf("expected type 0x%02x, got 0x%02x", pubsub.MsgTypeData, out[0]) + } + if !bytes.Equal(out[1:], rawMsg) { + t.Fatalf("data payload mismatch") + } +} + +func TestReadBrokerMessage_Ping(t *testing.T) { + t.Parallel() + + tc := newSocTestCtx(t) + mode := pubsub.NewGSOCEphemeralMode(tc.topicAddr[:], log.Noop) + + stream := newReaderStream([]byte{pubsub.MsgTypePing}, nil) + msg, err := mode.ReadBrokerMessage(stream) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if msg != nil { + t.Fatalf("expected nil msg for ping, got %v", msg) + } +} + +func TestReadBrokerMessage_Handshake(t *testing.T) { + t.Parallel() + + tc := newSocTestCtx(t) + mode := pubsub.NewGSOCEphemeralMode(tc.topicAddr[:], log.Noop) + + payload := []byte("sziasztok!") + publisherFrame := buildPublisherMsg(t, tc, payload) + + // Assemble handshake message: [0x02][gsocID(32B)][owner(20B)][sig(65B)][span(8B)][payload] + streamData := make([]byte, 0, 1+len(tc.gsocID)+len(tc.owner)+len(publisherFrame)) + streamData = append(streamData, pubsub.MsgTypeHandshake) + streamData = append(streamData, tc.gsocID...) + streamData = append(streamData, tc.owner...) + streamData = append(streamData, publisherFrame...) + + stream := newReaderStream(streamData, nil) + msg, err := mode.ReadBrokerMessage(stream) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if msg == nil { + t.Fatal("expected non-nil message from handshake") + } + + // Returned msg is [sig(65B)][span(8B)][payload]; verify it matches publisherFrame + if !bytes.Equal(msg, publisherFrame) { + t.Fatalf("returned message mismatch") + } +} + +func TestReadBrokerMessage_Data(t *testing.T) { + t.Parallel() + + tc := newSocTestCtx(t) + mode := pubsub.NewGSOCEphemeralMode(tc.topicAddr[:], log.Noop) + mode.SetGsocParams(tc.owner, tc.gsocID) + + payload := []byte("data message") + publisherFrame := buildPublisherMsg(t, tc, payload) + + // Assemble data message: [0x03][sig(65B)][span(8B)][payload] + streamData := make([]byte, 0, 1+len(publisherFrame)) + streamData = append(streamData, pubsub.MsgTypeData) + streamData = append(streamData, publisherFrame...) + + stream := newReaderStream(streamData, nil) + msg, err := mode.ReadBrokerMessage(stream) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !bytes.Equal(msg, publisherFrame) { + t.Fatalf("data message mismatch: got %x want %x", msg, publisherFrame) + } +} + +func TestReadBrokerMessage_InvalidSig(t *testing.T) { + t.Parallel() + + tc := newSocTestCtx(t) + mode := pubsub.NewGSOCEphemeralMode(tc.topicAddr[:], log.Noop) + mode.SetGsocParams(tc.owner, tc.gsocID) + + payload := []byte("bad sig") + spanBytes := make([]byte, pubsub.SpanSize) + binary.LittleEndian.PutUint64(spanBytes, uint64(len(payload))) + badSig := make([]byte, pubsub.SigSize) // all zeros — invalid signature + + streamData := make([]byte, 0, 1+len(badSig)+len(spanBytes)+len(payload)) + streamData = append(streamData, pubsub.MsgTypeData) + streamData = append(streamData, badSig...) + streamData = append(streamData, spanBytes...) + streamData = append(streamData, payload...) + + stream := newReaderStream(streamData, nil) + _, err := mode.ReadBrokerMessage(stream) + if err == nil { + t.Fatal("expected error for invalid signature, got nil") + } +} + +func TestSetGsocParams_AddressMismatch(t *testing.T) { + t.Parallel() + + tc := newSocTestCtx(t) + mode := pubsub.NewGSOCEphemeralMode(tc.topicAddr[:], log.Noop) + + // Use a different ID that does not hash to topicAddr with tc.owner + wrongID := make([]byte, swarm.HashSize) + wrongID[0] = 0xff + + mode.SetGsocParams(tc.owner, wrongID) + + if mode.GsocOwner() != nil { + t.Fatal("expected gsocOwner to remain nil after address mismatch") + } +} + +func TestSetGsocParams_ValidParams(t *testing.T) { + t.Parallel() + + tc := newSocTestCtx(t) + mode := pubsub.NewGSOCEphemeralMode(tc.topicAddr[:], log.Noop) + + mode.SetGsocParams(tc.owner, tc.gsocID) + + if mode.GsocOwner() == nil { + t.Fatal("expected gsocOwner to be set after valid params") + } + if !bytes.Equal(mode.GsocOwner(), tc.owner) { + t.Fatalf("gsocOwner mismatch: got %x want %x", mode.GsocOwner(), tc.owner) + } +} + +func TestSubscriberConn_FanOut(t *testing.T) { + t.Parallel() + + sc := pubsub.NewTestSubscriberConn(log.Noop) + + id1, ch1 := sc.Subscribe() + id2, ch2 := sc.Subscribe() + defer sc.Unsubscribe(id1) + defer sc.Unsubscribe(id2) + + msg := []byte("broadcast") + sc.FanOut(msg) + + for i, ch := range []<-chan []byte{ch1, ch2} { + select { + case got := <-ch: + if !bytes.Equal(got, msg) { + t.Fatalf("subscriber %d: got %x want %x", i, got, msg) + } + default: + t.Fatalf("subscriber %d: no message received", i) + } + } +} + +func TestSubscriberConn_Unsubscribe(t *testing.T) { + t.Parallel() + + sc := pubsub.NewTestSubscriberConn(log.Noop) + + id, ch := sc.Subscribe() + sc.Unsubscribe(id) + + // Channel should be closed after unsubscribe. + select { + case _, ok := <-ch: + if ok { + t.Fatal("expected channel to be closed") + } + default: + t.Fatal("expected channel to be closed (not blocking)") + } +} + +func TestSubscriberConn_FanOut_SkipsFullChannel(t *testing.T) { + t.Parallel() + + sc := pubsub.NewTestSubscriberConn(log.Noop) + + id, ch := sc.Subscribe() + defer sc.Unsubscribe(id) + + // Fill the channel to capacity (16 slots). + for i := range cap(ch) { + sc.FanOut([]byte{byte(i)}) + } + + // This should not block even though the channel is full. + sc.FanOut([]byte("overflow")) +} diff --git a/pkg/pubsub/pubsub.go b/pkg/pubsub/pubsub.go new file mode 100644 index 00000000000..dea26128da3 --- /dev/null +++ b/pkg/pubsub/pubsub.go @@ -0,0 +1,373 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pubsub + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/ethersphere/bee/v2/pkg/bzz" + "github.com/ethersphere/bee/v2/pkg/crypto" + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/p2p" + "github.com/ethersphere/bee/v2/pkg/swarm" + ma "github.com/multiformats/go-multiaddr" +) + +const ( + loggerName = "pubsub" + protocolName = "pubsub" + protocolVersion = "1.0.0" + streamName = "msg" + + // p2p stream header keys + HeaderTopicAddress = "pubsub-topic-address" + HeaderMode = "pubsub-mode" + HeaderReadWrite = "pubsub-readwrite" // 1 = read+write (publisher), 0 = read-only (subscriber) + + // Mode constants + ModeGSOCEphemeral ModeID = 1 + + // Service-level broker message types. + // 0x01 is reserved for ping across all modes; mode-specific types start at 0x02. + MsgTypePing byte = 0x01 + + // streamPingInterval is how often the broker sends a keepalive ping to each subscriber. + streamPingInterval = 30 * time.Second + + // Wire format sizes + SpanSize = swarm.SpanSize // pubsub span: 8-byte little-endian uint64 (matches bee-js Span.LENGTH) + MaxPayload = swarm.ChunkSize + SigSize = swarm.SocSignatureSize + IDSize = swarm.HashSize + OwnerSize = crypto.AddressSize +) + +var ( + ErrBrokerDisabled = errors.New("pubsub: broker mode is disabled") + ErrInvalidHandshake = errors.New("pubsub: handshake verification failed") + ErrWrongHeaders = errors.New("pubsub: wrong required headers") + ErrTopicMismatch = errors.New("pubsub: topic address mismatch") +) + +// startBrokerWriter starts a goroutine that drains outCh to stream and sends +// keepalive pings on every streamPingInterval tick. +func startBrokerWriter(ctx context.Context, cancel context.CancelFunc, stream p2p.Stream, outCh <-chan []byte, overlay swarm.Address, logger log.Logger) { + go func() { + ticker := time.NewTicker(streamPingInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case msg := <-outCh: + if err := writeRaw(stream, msg); err != nil { + logger.Info("broker write to subscriber failed", "peer", overlay, "error", err) + cancel() + return + } + logger.Info("broker wrote to subscriber", "peer", overlay, "size", len(msg)) + case <-ticker.C: + if err := writeRaw(stream, []byte{MsgTypePing}); err != nil { + cancel() + return + } + } + } + }() +} + +// readServiceMessage handles broker wire messages that are common to all modes. +// Returns (handled, err). The caller should return (nil, err) when handled is true. +func readServiceMessage(typeBuf byte) (handled bool, err error) { + switch typeBuf { + case MsgTypePing: + return true, nil + } + return false, nil +} + +func newMode(topicAddr [32]byte, modeID ModeID, logger log.Logger) (Mode, error) { + switch modeID { + case ModeGSOCEphemeral: + return NewGSOCEphemeralMode(topicAddr[:], logger), nil + default: + return nil, fmt.Errorf("pubsub: unknown mode: %d", modeID) + } +} + +// ConnectOptions carries optional mode-specific parameters for Connect. +type ConnectOptions struct { + ReadWrite bool // true = publisher (read+write), false = subscriber (read-only) + GsocOwner []byte + GsocID []byte +} + +// TopicInfo describes a topic for the list endpoint. +type TopicInfo struct { + TopicAddress string `json:"topicAddress"` + Mode ModeID `json:"mode"` + Role string `json:"role"` + Connections []string `json:"connections"` +} + +// TopicModeKey is a composite key for identifying a mode instance per topic. +type TopicModeKey struct { + TopicAddr [32]byte + ModeID ModeID +} + +// P2P groups the p2p capabilities needed by the pubsub service. +type P2P interface { + p2p.Service + p2p.Streamer + // ConnectAllowLight dials a peer, accepting it even if it identifies + // itself as a light node (broker peers may run in light-node mode). + ConnectAllowLight(ctx context.Context, addrs []ma.Multiaddr) (*bzz.Address, error) +} + +// Service is the pubsub protocol service. +type Service struct { + mu sync.RWMutex + p2p P2P + logger log.Logger + brokerMode bool + modes map[TopicModeKey]Mode // (topic, mode) -> mode instance +} + +func New(p2p P2P, logger log.Logger, brokerMode bool) *Service { + s := &Service{ + p2p: p2p, + logger: logger.WithName(loggerName).Register(), + brokerMode: brokerMode, + modes: make(map[TopicModeKey]Mode), + } + return s +} + +// Protocol returns the p2p protocol spec. +func (s *Service) Protocol() p2p.ProtocolSpec { + return p2p.ProtocolSpec{ + Name: protocolName, + Version: protocolVersion, + StreamSpecs: []p2p.StreamSpec{ + { + Name: streamName, + Handler: s.brokerHandler, + }, + }, + } +} + +// Connect establishes a subscriber connection to a broker peer. +func (s *Service) Connect(ctx context.Context, underlay ma.Multiaddr, topicAddr [32]byte, modeID ModeID, opts ConnectOptions) (Mode, error) { + key := TopicModeKey{TopicAddr: topicAddr, ModeID: modeID} + m, err := s.getOrCreateMode(key) + if err != nil { + return nil, err + } + + s.logger.Info("connecting to broker peer", "underlay", underlay) + bzzAddr, err := s.p2p.ConnectAllowLight(ctx, []ma.Multiaddr{underlay}) + if err != nil && !errors.Is(err, p2p.ErrAlreadyConnected) { + return nil, fmt.Errorf("connect to peer: %w", err) + } + s.logger.Info("connected to broker peer", "overlay", bzzAddr.Overlay) + + var sc *SubscriberConn + if existing := m.GetSubscriberConn(); existing != nil { + // Reuse the existing p2p stream — no new broker-side stream, just bump the ref count. + sc = m.CreateSubscriberConn(existing.Stream, bzzAddr.Overlay) + } else { + stream, err := m.Connect(ctx, s.p2p, bzzAddr.Overlay, opts) + if err != nil { + s.logger.Error(err, "open stream failed") + return nil, fmt.Errorf("open stream: %w", err) + } + sc = m.CreateSubscriberConn(stream, bzzAddr.Overlay) + if sc.Stream != stream { + // Race: another goroutine created the conn between our check and create. + _ = stream.FullClose() + } + } + + go func() { + <-ctx.Done() + m.RemoveSubscriberConn(sc) + }() + + return m, nil +} + +// Topics returns info about all active topics. +func (s *Service) Topics() []TopicInfo { + s.mu.RLock() + defer s.mu.RUnlock() + + topics := make([]TopicInfo, 0, len(s.modes)) + + for key, m := range s.modes { + info := TopicInfo{ + TopicAddress: fmt.Sprintf("%x", key.TopicAddr), + Mode: m.ID(), + Connections: m.SubscriberOverlays(), + } + sc := m.GetSubscriberConn() + switch { + case m.SubscriberCount() > 0 && sc != nil: + info.Role = "broker+subscriber" + info.Connections = append(info.Connections, sc.Overlay.String()) + case m.SubscriberCount() > 0: + info.Role = "broker" + case sc != nil: + info.Role = "subscriber" + info.Connections = []string{sc.Overlay.String()} + default: + continue + } + topics = append(topics, info) + } + + return topics +} + +// brokerHandler handles incoming streams on the broker side. +func (s *Service) brokerHandler(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error { + s.logger.Info("broker handler invoked", "peer", peer.Address) + if !s.brokerMode { + _ = stream.Reset() + return ErrBrokerDisabled + } + + headers := stream.Headers() + + topicAddrBytes := headers[HeaderTopicAddress] + if len(topicAddrBytes) != IDSize { + _ = stream.Reset() + return ErrWrongHeaders + } + var topicAddr [32]byte + copy(topicAddr[:], topicAddrBytes) + + modeBytes := headers[HeaderMode] + if len(modeBytes) != 1 { + _ = stream.Reset() + return ErrWrongHeaders + } + key := TopicModeKey{TopicAddr: topicAddr, ModeID: ModeID(modeBytes[0])} + m, err := s.getOrCreateMode(key) + if err != nil { + _ = stream.Reset() + return err + } + + return m.HandleBroker(ctx, peer, stream, headers) +} + +func (s *Service) getOrCreateMode(key TopicModeKey) (Mode, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if m, ok := s.modes[key]; ok { + return m, nil + } + + m, err := newMode(key.TopicAddr, key.ModeID, s.logger) + if err != nil { + return nil, err + } + + s.modes[key] = m + return m, nil +} + +// writeRaw writes raw bytes to the stream. +func writeRaw(stream p2p.Stream, data []byte) error { + c := 0 + for c < len(data) { + n, err := stream.Write(data[c:]) + if err != nil { + return err + } + c += n + } + return nil +} + +// brokerSubscriber holds a subscriber's stream and outgoing message channel. +type brokerSubscriber struct { + overlay swarm.Address + stream p2p.Stream + outCh chan []byte + cancel context.CancelFunc + handshakeHappened bool +} + +// SubscriberConn represents the shared subscriber-side p2p stream to a broker. +// Multiple WebSocket sessions can attach to one SubscriberConn via the mux. +type SubscriberConn struct { + Stream p2p.Stream + Overlay swarm.Address + + refs int // number of active WS sessions; protected by the owning mode's mu + writeMu sync.Mutex + subsMu sync.Mutex + subs map[uint64]chan []byte + nextID uint64 + logger log.Logger +} + +// Subscribe registers a new WS session and returns its per-session message channel. +func (sc *SubscriberConn) Subscribe() (uint64, <-chan []byte) { + sc.subsMu.Lock() + defer sc.subsMu.Unlock() + id := sc.nextID + sc.nextID++ + ch := make(chan []byte, 16) + sc.subs[id] = ch + return id, ch +} + +// Unsubscribe removes the WS session channel and closes it. +func (sc *SubscriberConn) Unsubscribe(id uint64) { + sc.subsMu.Lock() + defer sc.subsMu.Unlock() + if ch, ok := sc.subs[id]; ok { + close(ch) + delete(sc.subs, id) + } +} + +// fanOut broadcasts a message to all registered WS session channels. +func (sc *SubscriberConn) fanOut(msg []byte) { + sc.subsMu.Lock() + defer sc.subsMu.Unlock() + for _, ch := range sc.subs { + select { + case ch <- msg: + default: + sc.logger.Warning("pubsub: subscriber ws channel full, dropping message") + } + } +} + +func (sc *SubscriberConn) closeAll() { + sc.subsMu.Lock() + defer sc.subsMu.Unlock() + for id, ch := range sc.subs { + close(ch) + delete(sc.subs, id) + } +} + +// WriteToStream serializes concurrent writes from multiple WS sessions. +func (sc *SubscriberConn) WriteToStream(data []byte) error { + sc.writeMu.Lock() + defer sc.writeMu.Unlock() + return writeRaw(sc.Stream, data) +} diff --git a/pkg/pubsub/pubsub_test.go b/pkg/pubsub/pubsub_test.go new file mode 100644 index 00000000000..8de22d31ff5 --- /dev/null +++ b/pkg/pubsub/pubsub_test.go @@ -0,0 +1,180 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pubsub_test + +import ( + "context" + "errors" + "io" + "testing" + "time" + + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/p2p" + "github.com/ethersphere/bee/v2/pkg/pubsub" + "github.com/ethersphere/bee/v2/pkg/spinlock" + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +// pipeStream is a p2p.Stream backed by an io.Pipe pair for bidirectional use. +type pipeStream struct { + pr *io.PipeReader + pw *io.PipeWriter + headers p2p.Headers +} + +func (p *pipeStream) Read(b []byte) (int, error) { return p.pr.Read(b) } +func (p *pipeStream) Write(b []byte) (int, error) { return p.pw.Write(b) } +func (p *pipeStream) Close() error { p.pr.Close(); p.pw.Close(); return nil } +func (p *pipeStream) ResponseHeaders() p2p.Headers { return nil } +func (p *pipeStream) Headers() p2p.Headers { return p.headers } +func (p *pipeStream) FullClose() error { return p.Close() } +func (p *pipeStream) Reset() error { + p.pr.CloseWithError(io.ErrUnexpectedEOF) + p.pw.CloseWithError(io.ErrUnexpectedEOF) + return nil +} + +// newService creates a pubsub.Service with a nil P2P backend. +// This is valid for tests that never reach any P2P method call. +func newService(t *testing.T, brokerMode bool) *pubsub.Service { + t.Helper() + return pubsub.New(nil, log.Noop, brokerMode) +} + +func TestBrokerHandler_Disabled(t *testing.T) { + t.Parallel() + + svc := newService(t, false) + handler := svc.Protocol().StreamSpecs[0].Handler + + var topicAddr [32]byte + headers := p2p.Headers{ + pubsub.HeaderTopicAddress: topicAddr[:], + pubsub.HeaderMode: {byte(pubsub.ModeGSOCEphemeral)}, + pubsub.HeaderReadWrite: {0}, + } + stream := newReaderStream(nil, headers) + peer := p2p.Peer{Address: swarm.NewAddress(make([]byte, 32))} + + err := handler(context.Background(), peer, stream) + if !errors.Is(err, pubsub.ErrBrokerDisabled) { + t.Fatalf("expected ErrBrokerDisabled, got %v", err) + } +} + +func TestBrokerHandler_MissingTopicAddress(t *testing.T) { + t.Parallel() + + svc := newService(t, true) + handler := svc.Protocol().StreamSpecs[0].Handler + + headers := p2p.Headers{ + pubsub.HeaderMode: {byte(pubsub.ModeGSOCEphemeral)}, + } + stream := newReaderStream(nil, headers) + peer := p2p.Peer{Address: swarm.NewAddress(make([]byte, 32))} + + err := handler(context.Background(), peer, stream) + if !errors.Is(err, pubsub.ErrWrongHeaders) { + t.Fatalf("expected ErrWrongHeaders, got %v", err) + } +} + +func TestBrokerHandler_MissingMode(t *testing.T) { + t.Parallel() + + svc := newService(t, true) + handler := svc.Protocol().StreamSpecs[0].Handler + + var topicAddr [32]byte + headers := p2p.Headers{ + pubsub.HeaderTopicAddress: topicAddr[:], + } + stream := newReaderStream(nil, headers) + peer := p2p.Peer{Address: swarm.NewAddress(make([]byte, 32))} + + err := handler(context.Background(), peer, stream) + if !errors.Is(err, pubsub.ErrWrongHeaders) { + t.Fatalf("expected ErrWrongHeaders, got %v", err) + } +} + +func TestBrokerHandler_UnknownMode(t *testing.T) { + t.Parallel() + + svc := newService(t, true) + handler := svc.Protocol().StreamSpecs[0].Handler + + var topicAddr [32]byte + headers := p2p.Headers{ + pubsub.HeaderTopicAddress: topicAddr[:], + pubsub.HeaderMode: {0xff}, + } + stream := newReaderStream(nil, headers) + peer := p2p.Peer{Address: swarm.NewAddress(make([]byte, 32))} + + err := handler(context.Background(), peer, stream) + if err == nil { + t.Fatal("expected error for unknown mode, got nil") + } + if errors.Is(err, pubsub.ErrBrokerDisabled) || errors.Is(err, pubsub.ErrWrongHeaders) { + t.Fatalf("unexpected sentinel error for unknown mode: %v", err) + } +} + +func TestService_Topics_Empty(t *testing.T) { + t.Parallel() + + svc := newService(t, true) + if topics := svc.Topics(); len(topics) != 0 { + t.Fatalf("expected empty topics, got %d", len(topics)) + } +} + +func TestService_Topics_BrokerRole(t *testing.T) { + t.Parallel() + + svc := newService(t, true) + handler := svc.Protocol().StreamSpecs[0].Handler + + tc := newSocTestCtx(t) + headers := p2p.Headers{ + pubsub.HeaderTopicAddress: tc.topicAddr[:], + pubsub.HeaderMode: {byte(pubsub.ModeGSOCEphemeral)}, + pubsub.HeaderReadWrite: {0}, // subscriber on broker side + } + + pr, pw := io.Pipe() + stream := &pipeStream{pr: pr, pw: pw, headers: headers} + peer := p2p.Peer{Address: swarm.NewAddress(make([]byte, 32))} + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(func() { + cancel() + _ = stream.Close() + }) + + handlerErrCh := make(chan error, 1) + go func() { + handlerErrCh <- handler(ctx, peer, stream) + }() + + err := spinlock.Wait(time.Second, func() bool { + for _, topic := range svc.Topics() { + if topic.Role == "broker" { + return true + } + } + return false + }) + if err != nil { + t.Fatal("timed out waiting for broker topic to appear") + } + + cancel() + _ = stream.Close() + <-handlerErrCh +} diff --git a/pkg/pubsub/ws.go b/pkg/pubsub/ws.go new file mode 100644 index 00000000000..1cc65793ac4 --- /dev/null +++ b/pkg/pubsub/ws.go @@ -0,0 +1,126 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pubsub + +import ( + "context" + "time" + + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/gorilla/websocket" +) + +type WsOptions struct { + PingPeriod time.Duration + Cancel context.CancelFunc +} + +// ListeningWs bridges a subscriber's p2p stream to a WebSocket connection. +// The Mode handles all wire-format details: reading broker messages, +// verifying them, and returning the payload to forward to the WebSocket. +// If the subscriber is a Publisher, it also reads from the WebSocket +// and writes raw messages to the p2p stream. +func ListeningWs(ctx context.Context, conn *websocket.Conn, options WsOptions, logger log.Logger, mode Mode, isPublisher bool) { + sc := mode.GetSubscriberConn() + subID, msgCh := sc.Subscribe() + var ( + ticker = time.NewTicker(options.PingPeriod) + writeDeadline = options.PingPeriod + time.Second + readDeadline = options.PingPeriod + time.Second + ) + + logger.Info("pubsub ws: starting", "topic", mode.TopicAddress(), "isPublisher", isPublisher, "pingPeriod", options.PingPeriod) + + conn.SetCloseHandler(func(code int, text string) error { + logger.Info("pubsub ws: client gone", "topic", mode.TopicAddress(), "code", code, "message", text) + options.Cancel() + return nil + }) + + // Reset read deadline on every pong so idle subscribers don't time out. + conn.SetPongHandler(func(appData string) error { + if err := conn.SetReadDeadline(time.Now().Add(readDeadline)); err != nil { + return err + } + return conn.SetWriteDeadline(time.Now().Add(writeDeadline)) + }) + + // A read loop is always required so gorilla can process pong responses + // and close frames from the client. + go func() { + for { + if err := conn.SetReadDeadline(time.Now().Add(readDeadline)); err != nil { + logger.Info("pubsub ws: set read deadline failed", "error", err) + break + } + msgType, p, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { + logger.Info("pubsub ws: read error", "error", err) + } else { + logger.Info("pubsub ws: read loop ended", "error", err) + } + break + } + + if isPublisher { + logger.Info("pubsub ws: publisher message from ws", "type", msgType, "size", len(p)) + if err := sc.WriteToStream(p); err != nil { + logger.Info("pubsub ws: write to p2p stream failed", "error", err) + break + } + } + } + options.Cancel() + }() + + // Forward mux-delivered broker messages to this WebSocket session. + go func() { + defer sc.Unsubscribe(subID) + for { + select { + case <-ctx.Done(): + logger.Info("pubsub ws: p2p reader context done") + return + case msg, ok := <-msgCh: + if !ok { + logger.Info("pubsub ws: mux channel closed") + options.Cancel() + return + } + logger.Info("pubsub ws: forwarding broker message to ws", "size", len(msg)) + if err := conn.WriteMessage(websocket.BinaryMessage, msg); err != nil { + logger.Info("pubsub ws: write to ws failed", "error", err) + options.Cancel() + return + } + } + } + }() + + defer func() { + ticker.Stop() + _ = conn.Close() + logger.Info("pubsub ws: closed", "topic", mode.TopicAddress()) + }() + + for { + if err := conn.SetWriteDeadline(time.Now().Add(writeDeadline)); err != nil { + logger.Info("pubsub ws: set write deadline failed", "error", err) + return + } + select { + case <-ctx.Done(): + logger.Info("pubsub ws: context cancelled, closing") + _ = conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + case <-ticker.C: + if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { + logger.Info("pubsub ws: ping failed, closing", "error", err) + return + } + } + } +}