Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
ca9a067
feat: pubsub
nugaon Apr 14, 2026
218cd08
refactor: mode id enum type
nugaon Apr 16, 2026
2f1f4dc
refactor: rename participant to publisher
nugaon Apr 16, 2026
4a01a68
docs: openapi
nugaon Apr 16, 2026
f1710db
fix: eth address instead of public key on api
nugaon Apr 17, 2026
0f05391
fix: pubsub header parsing
bosi95 Apr 20, 2026
c3921c4
fix: config cli option for broker mode
nugaon Apr 20, 2026
837730f
fix: http hijacked write error
nugaon Apr 21, 2026
96983da
chore: debugging
nugaon Apr 21, 2026
4f83e7f
fix: libp2p backoff clear
bosi95 Apr 21, 2026
cba55c8
fix: http hijack handler
bosi95 Apr 21, 2026
4c08e72
fix: hijacked websocket error
nugaon Apr 21, 2026
54a62a8
chore: logs debugging
nugaon Apr 21, 2026
92cf330
fix: pingpong with subscriber
nugaon Apr 21, 2026
24accc9
fix: upgrade ws no compress
bosi95 Apr 21, 2026
a30f1dc
fix: readdeadline on pong
nugaon Apr 21, 2026
b59e6e2
fix(broker): eof return
nugaon Apr 21, 2026
b108e69
chore: logs
nugaon Apr 21, 2026
7a31252
fix: wrong connection mapping and many refactors
nugaon Apr 22, 2026
525f6d8
fix: eof in websocket
nugaon Apr 22, 2026
b7783ce
fix: p2p ping
nugaon Apr 22, 2026
92ba5ed
chore: debugging on broker side
nugaon Apr 22, 2026
d2910b9
chore: debug
nugaon Apr 22, 2026
2670176
chore: add logs for debugging
bosi95 Apr 22, 2026
ac843ad
chore: debug instead of info log
nugaon Apr 22, 2026
af506a2
chore: message check
nugaon Apr 22, 2026
dd4172d
fix: soc sig validation
bosi95 Apr 22, 2026
5633c63
fix: chunk span calc
bosi95 Apr 22, 2026
f14697b
fix: span size 8
bosi95 Apr 22, 2026
991ce65
fix: swap pubsub ws upgrade and connect order
bosi95 Apr 23, 2026
0272fff
fix: revert ws and svc connect order and cancel subscriberConn if pre…
bosi95 Apr 23, 2026
70de89c
fix: unregister delete overlay
bosi95 Apr 23, 2026
775bcb0
feat: allow light node mode
nugaon Apr 23, 2026
0a7f7b7
fix: remove and create sub conn race conditions
bosi95 Apr 23, 2026
2242743
fix: multiple ws on the same topic
nugaon Apr 23, 2026
3998eb8
refactor: remove unnecessary param in runMux
nugaon Apr 29, 2026
da3bb7e
refactor: move connection related struct defs and methods to service
nugaon Apr 29, 2026
510d9a6
refactor: subtract the pinging service from mode
nugaon Apr 29, 2026
d0b43fe
docs: correction of size in the span
nugaon Apr 29, 2026
699ef1e
fix: removing duplicated stream close, leave only cancel
nugaon Apr 29, 2026
bdb7453
refactor: log out dropping messages when ws buffer is full
nugaon Apr 29, 2026
387337f
fix: clear subscriberConn in runMux defer to prevent stale reference …
nugaon Apr 29, 2026
4ac575c
fix: msg type in mode
nugaon Apr 30, 2026
9420ee1
fix: remove devmode
nugaon May 12, 2026
c503fd9
refactor: remove api header and use only query
nugaon May 12, 2026
6bfe79f
refactor: remove pubsub max connections
nugaon May 15, 2026
dda3237
fix: ci linting
nugaon May 15, 2026
1b032bc
refactor: move pubsub schemas from headers to components section
nugaon May 15, 2026
cfa2fb1
fix: subscriberConn null set concurrency
nugaon May 15, 2026
2fe4c32
test: init
nugaon May 21, 2026
d78bccf
Merge branch 'master' into feat/pubsub
nugaon May 21, 2026
c3e2d28
fix: linting
nugaon May 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/bee/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ const (
optionAutoTLSDomain = "autotls-domain"
optionAutoTLSRegistrationEndpoint = "autotls-registration-endpoint"
optionAutoTLSCAEndpoint = "autotls-ca-endpoint"
optionNamePubsubBrokerMode = "pubsub-broker-mode"
optionUseSIMD = "use-simd-hashing"

// blockchain-rpc
Expand Down Expand Up @@ -335,6 +336,7 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().String(optionAutoTLSDomain, p2pforge.DefaultForgeDomain, "autotls domain")
cmd.Flags().String(optionAutoTLSRegistrationEndpoint, p2pforge.DefaultForgeEndpoint, "autotls registration endpoint")
cmd.Flags().String(optionAutoTLSCAEndpoint, p2pforge.DefaultCAEndpoint, "autotls certificate authority endpoint")
cmd.Flags().Bool(optionNamePubsubBrokerMode, true, "enable pubsub broker mode")
cmd.Flags().Bool(optionUseSIMD, false, "use SIMD BMT hasher (available only on linux amd64 platforms)")
}

Expand Down
1 change: 1 addition & 0 deletions cmd/bee/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ func buildBeeNode(ctx context.Context, c *command, cmd *cobra.Command, logger lo
WarmupTime: c.config.GetDuration(optionWarmUpTime),
WelcomeMessage: c.config.GetString(optionWelcomeMessage),
WhitelistedWithdrawalAddress: c.config.GetStringSlice(optionNameWhitelistedWithdrawalAddress),
PubsubBrokerMode: c.config.GetBool(optionNamePubsubBrokerMode),
})

return b, err
Expand Down
54 changes: 54 additions & 0 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2541,3 +2541,57 @@ paths:
$ref: "SwarmCommon.yaml#/components/responses/400"
default:
description: Default response.

"/pubsub/{topic}":
get:
summary: Connect to a pubsub topic via WebSocket
description: |
Opens a WebSocket connection to a pubsub topic. The connection acts as either a publisher (read+write)
or subscriber (read-only) depending on the presence of GSOC query params.

**WebSocket protocol:**
- Inbound (client → node, publisher only): raw SOC payload `[sig:65B][span:8B][payload:N B]`
- Outbound (node → client): raw SOC payload `[sig:65B][span:8B][payload:N B]`
tags:
- Pubsub
parameters:
- in: path
name: topic
schema:
type: string
required: true
description: Topic identifier (hex-encoded address or arbitrary string to be hashed)
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubPeer"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubGsocEthAddress"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubGsocTopic"
- in: header
name: swarm-keep-alive
schema:
type: integer
required: false
description: "WebSocket ping period in seconds (default: 60)"
responses:
"101":
description: WebSocket upgrade successful
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"

"/pubsub/":
get:
summary: List all pubsub topics
description: Returns a list of all active pubsub topics this node is participating in (as broker or subscriber)
tags:
- Pubsub
responses:
"200":
description: List of pubsub topics
content:
application/json:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/PubsubTopicListResponse"
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
52 changes: 51 additions & 1 deletion openapi/SwarmCommon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,33 @@ components:
properties:
transactionHash:
$ref: "#/components/schemas/TransactionHash"

PubsubTopicInfo:
type: object
properties:
topicAddress:
type: string
description: "Hex-encoded topic address"
mode:
type: integer
description: "Pubsub mode identifier"
role:
type: string
description: "Role of this node: 'broker' or 'subscriber'"
connections:
type: array
items:
type: string
description: "List of connected peer overlays"

PubsubTopicListResponse:
type: object
properties:
topics:
type: array
items:
$ref: "#/components/schemas/PubsubTopicInfo"

headers:
SwarmTag:
description: "Tag UID"
Expand Down Expand Up @@ -1095,7 +1122,6 @@ components:
required: false
description: "Indicates which feed version was resolved (v1 or v2)"


parameters:
GasPriceParameter:
in: header
Expand Down Expand Up @@ -1308,6 +1334,30 @@ components:
required: false
description: "ACT history Unix timestamp"

SwarmPubsubPeer:
in: query
name: peer
schema:
type: string
required: true
description: "Multiaddress of the broker peer to connect to for pubsub"

SwarmPubsubGsocEthAddress:
in: query
name: gsoc-eth-address
schema:
$ref: "#/components/schemas/HexString"
required: false
description: "GSOC owner Ethereum address (20 bytes, hex-encoded) for publisher role. Required together with gsoc-topic to upgrade to publisher."

SwarmPubsubGsocTopic:
in: query
name: gsoc-topic
schema:
$ref: "#/components/schemas/HexString"
required: false
description: "GSOC topic identifier (hex) for publisher role. Required together with gsoc-eth-address to upgrade to publisher."

responses:
"200":
description: Success
Expand Down
13 changes: 8 additions & 5 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/postage/postagecontract"
"github.com/ethersphere/bee/v2/pkg/pss"
"github.com/ethersphere/bee/v2/pkg/pubsub"
"github.com/ethersphere/bee/v2/pkg/resolver"
"github.com/ethersphere/bee/v2/pkg/resolver/client/ens"
"github.com/ethersphere/bee/v2/pkg/resolver/multiresolver"
Expand Down Expand Up @@ -93,11 +94,10 @@ const (
SwarmActTimestampHeader = "Swarm-Act-Timestamp"
SwarmActPublisherHeader = "Swarm-Act-Publisher"
SwarmActHistoryAddressHeader = "Swarm-Act-History-Address"

ImmutableHeader = "Immutable"
GasPriceHeader = "Gas-Price"
GasLimitHeader = "Gas-Limit"
ETagHeader = "ETag"
ImmutableHeader = "Immutable"
GasPriceHeader = "Gas-Price"
GasLimitHeader = "Gas-Limit"
ETagHeader = "ETag"

AuthorizationHeader = "Authorization"
AcceptEncodingHeader = "Accept-Encoding"
Expand Down Expand Up @@ -185,6 +185,7 @@ type Service struct {

topologyDriver topology.Driver
p2p p2p.DebugService
pubsubSvc *pubsub.Service
accounting accounting.Interface
chequebook chequebook.Service
pseudosettle settlement.Interface
Expand Down Expand Up @@ -268,6 +269,7 @@ type ExtraOptions struct {
SyncStatus func() (bool, error)
NodeStatus *status.Service
PinIntegrity PinIntegrity
PubsubService *pubsub.Service
}

func New(
Expand Down Expand Up @@ -355,6 +357,7 @@ func (s *Service) Configure(signer crypto.Signer, tracer *tracing.Tracer, o Opti
s.lightNodes = e.LightNodes
s.pseudosettle = e.Pseudosettle
s.blockTime = e.BlockTime
s.pubsubSvc = e.PubsubService

s.statusSem = semaphore.NewWeighted(1)
s.postageSem = semaphore.NewWeighted(1)
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/postage/postagecontract"
contractMock "github.com/ethersphere/bee/v2/pkg/postage/postagecontract/mock"
"github.com/ethersphere/bee/v2/pkg/pss"
"github.com/ethersphere/bee/v2/pkg/pubsub"
"github.com/ethersphere/bee/v2/pkg/pusher"
"github.com/ethersphere/bee/v2/pkg/resolver"
resolverMock "github.com/ethersphere/bee/v2/pkg/resolver/mock"
Expand Down Expand Up @@ -135,6 +136,7 @@ type testServerOptions struct {
FullAPIDisabled bool
ChequebookDisabled bool
SwapDisabled bool
PubsubService *pubsub.Service
}

func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.Conn, string, *chanStorer) {
Expand Down Expand Up @@ -205,6 +207,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.
Staking: o.StakingContract,
NodeStatus: o.NodeStatus,
PinIntegrity: o.PinIntegrity,
PubsubService: o.PubsubService,
}

// By default bee mode is set to full mode.
Expand Down
137 changes: 137 additions & 0 deletions pkg/api/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2026 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package api

import (
"context"
"encoding/hex"
"net/http"
"time"

"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/pubsub"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
ma "github.com/multiformats/go-multiaddr"
)

func (s *Service) pubsubWsHandler(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("pubsub").Build()

paths := struct {
Topic string `map:"topic" validate:"required"`
}{}
if response := s.mapStructure(mux.Vars(r), &paths); response != nil {
response("invalid path params", logger, w)
return
}

var topicAddr [32]byte
if decoded, err := hex.DecodeString(paths.Topic); err == nil && len(decoded) == swarm.HashSize {
copy(topicAddr[:], decoded)
} else {
h := swarm.NewHasher()
_, _ = h.Write([]byte(paths.Topic))
copy(topicAddr[:], h.Sum(nil))
}

peerMultiaddr := r.URL.Query().Get("peer")
if peerMultiaddr == "" {
jsonhttp.BadRequest(w, "missing peer query param")
return
}
underlay, err := ma.NewMultiaddr(peerMultiaddr)
if err != nil {
logger.Info("invalid peer multiaddr", "value", peerMultiaddr, "error", err)
jsonhttp.BadRequest(w, "invalid peer query param")
return
}

var connectOpts pubsub.ConnectOptions

gsocEthAddrHex := r.URL.Query().Get("gsoc-eth-address")
gsocTopicHex := r.URL.Query().Get("gsoc-topic")
if gsocEthAddrHex != "" && gsocTopicHex != "" {
gsocOwner, err := hex.DecodeString(gsocEthAddrHex)
if err != nil {
jsonhttp.BadRequest(w, "invalid gsoc-eth-address query param")
return
}
gsocID, err := hex.DecodeString(gsocTopicHex)
if err != nil {
jsonhttp.BadRequest(w, "invalid gsoc-topic query param")
return
}
connectOpts.GsocOwner = gsocOwner
connectOpts.GsocID = gsocID
connectOpts.ReadWrite = true
}

headers := struct {
KeepAlive int `map:"Swarm-Keep-Alive"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
return
}

// Connect to broker peer
ctx, cancel := context.WithCancel(context.Background())
mode, err := s.pubsubSvc.Connect(ctx, underlay, topicAddr, pubsub.ModeGSOCEphemeral, connectOpts)
if err != nil {
cancel()
logger.Info("pubsub connect failed", "error", err)
jsonhttp.InternalServerError(w, "pubsub connect failed")
return
}
// Upgrade to WebSocket
upgrader := websocket.Upgrader{
ReadBufferSize: swarm.ChunkWithSpanSize,
WriteBufferSize: swarm.ChunkWithSpanSize,
CheckOrigin: s.checkOrigin,
}

logger.Info("upgrading to websocket")
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
// Upgrade() hijacks the connection before returning an error,
// so do NOT write an HTTP response here.
cancel()
logger.Info("websocket upgrade failed", "error", err)
logger.Error(nil, "websocket upgrade failed")
return
}
logger.Info("websocket upgrade successful")

pingPeriod := time.Duration(headers.KeepAlive) * time.Second
if pingPeriod == 0 {
pingPeriod = time.Minute
}

isPublisher := connectOpts.ReadWrite

s.wsWg.Add(1)
go func() {
pubsub.ListeningWs(ctx, conn, pubsub.WsOptions{PingPeriod: pingPeriod, Cancel: cancel}, logger, mode, isPublisher)
cancel()
_ = conn.Close()
s.wsWg.Done()
}()
}

func (s *Service) pubsubListHandler(w http.ResponseWriter, r *http.Request) {
if s.pubsubSvc == nil {
jsonhttp.NotFound(w, "pubsub service not available")
return
}

topics := s.pubsubSvc.Topics()
jsonhttp.OK(w, struct {
Topics []pubsub.TopicInfo `json:"topics"`
}{
Topics: topics,
})
}
Loading
Loading