From 8e9f4cb859957dae28298122151c206838d013c0 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Thu, 26 Mar 2026 13:10:27 -0500 Subject: [PATCH 1/4] Fix server CPU exhaustion under high stream load - Enforce maxSessions limit: reject new TCP connections when at capacity instead of accepting and logging a debug message - Add per-session concurrent stream limit (2048) using a semaphore to prevent a single session (e.g. setup-node) from spawning unbounded goroutines that starve the CPU - Add backoff delay (50ms) on non-fatal stream accept errors to prevent tight CPU spin loops when persistent errors occur - Streams that exceed the concurrency limit are immediately closed rather than queued, providing backpressure to the client --- pkg/dmsg/server.go | 8 +++++--- pkg/dmsg/server_session.go | 41 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/pkg/dmsg/server.go b/pkg/dmsg/server.go index eef196ba..c474d2b0 100644 --- a/pkg/dmsg/server.go +++ b/pkg/dmsg/server.go @@ -150,12 +150,14 @@ func (s *Server) Serve(lis net.Listener, addr string) error { return err } - // TODO(evanlinjin): Implement proper load-balancing. - if s.SessionCount() >= s.maxSessions { + if s.SessionCount() >= s.maxSessions { s.log. WithField("max_sessions", s.maxSessions). WithField("remote_tcp", conn.RemoteAddr()). 
- Debug("Max sessions is reached, but still accepting so clients who delegated us can still listen.") + Warn("Max sessions reached, rejecting connection.") + conn.Close() //nolint:errcheck,gosec + time.Sleep(10 * time.Millisecond) + continue } s.wg.Add(1) diff --git a/pkg/dmsg/server_session.go b/pkg/dmsg/server_session.go index b2565578..8c70b9ff 100644 --- a/pkg/dmsg/server_session.go +++ b/pkg/dmsg/server_session.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net" + "time" "github.com/hashicorp/yamux" "github.com/sirupsen/logrus" @@ -15,6 +16,16 @@ import ( "github.com/skycoin/dmsg/pkg/noise" ) +const ( + // maxConcurrentStreams limits how many streams can be served concurrently + // per session to prevent a single session from exhausting server resources. + maxConcurrentStreams = 2048 + + // streamErrorBackoff is the delay after a non-fatal stream accept error + // to prevent CPU spin on persistent errors. + streamErrorBackoff = 50 * time.Millisecond +) + // ServerSession represents a session from the perspective of a dmsg server. type ServerSession struct { *SessionCommon @@ -45,6 +56,10 @@ func (ss *ServerSession) Close() error { func (ss *ServerSession) Serve() { ss.m.RecordSession(metrics.DeltaConnect) // record successful connection defer ss.m.RecordSession(metrics.DeltaDisconnect) // record disconnection + + // Semaphore to limit concurrent streams per session. + sem := make(chan struct{}, maxConcurrentStreams) + if ss.sm.smux != nil { for { sStr, err := ss.sm.smux.AcceptStream() @@ -54,13 +69,24 @@ func (ss *ServerSession) Serve() { return } ss.log.WithError(err).Warn("Failed to accept smux stream, continuing...") + time.Sleep(streamErrorBackoff) continue } log := ss.log.WithField("smux_id", sStr.ID()) - log.Info("Initiating stream.") + // Acquire semaphore slot; if full, reject the stream. 
+ select { + case sem <- struct{}{}: + default: + log.Warn("Max concurrent streams reached, rejecting stream.") + sStr.Close() //nolint:errcheck,gosec + continue + } + + log.Info("Initiating stream.") go func(sStr *smux.Stream) { + defer func() { <-sem }() defer func() { if r := recover(); r != nil { log.WithField("panic", r).Error("Recovered from panic in serveStream") @@ -79,13 +105,24 @@ func (ss *ServerSession) Serve() { return } ss.log.WithError(err).Warn("Failed to accept yamux stream, continuing...") + time.Sleep(streamErrorBackoff) continue } log := ss.log.WithField("yamux_id", yStr.StreamID()) - log.Info("Initiating stream.") + // Acquire semaphore slot; if full, reject the stream. + select { + case sem <- struct{}{}: + default: + log.Warn("Max concurrent streams reached, rejecting stream.") + yStr.Close() //nolint:errcheck,gosec + continue + } + + log.Info("Initiating stream.") go func(yStr *yamux.Stream) { + defer func() { <-sem }() defer func() { if r := recover(); r != nil { log.WithField("panic", r).Error("Recovered from panic in serveStream") From 72166675a55f9470621b1946012d2d5aa0cdf382 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Thu, 26 Mar 2026 13:14:21 -0500 Subject: [PATCH 2/4] Revert maxSessions rejection to original behavior maxSessions only controls discovery advertisement, not connection acceptance. Services and visors connect to all servers regardless of advertised load, so rejecting sessions would break connectivity. --- pkg/dmsg/server.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/dmsg/server.go b/pkg/dmsg/server.go index c474d2b0..46c3eb1b 100644 --- a/pkg/dmsg/server.go +++ b/pkg/dmsg/server.go @@ -154,10 +154,7 @@ func (s *Server) Serve(lis net.Listener, addr string) error { s.log. WithField("max_sessions", s.maxSessions). WithField("remote_tcp", conn.RemoteAddr()). 
- Warn("Max sessions reached, rejecting connection.") - conn.Close() //nolint:errcheck,gosec - time.Sleep(10 * time.Millisecond) - continue + Debug("Max sessions is reached, but still accepting so clients who delegated us can still listen.") } s.wg.Add(1) From 1325c948651d178af07399fcdba9b88e503fd518 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Thu, 26 Mar 2026 13:21:32 -0500 Subject: [PATCH 3/4] Add stream read deadline and fix indentation - Add read deadline (HandshakeTimeout) on initial stream request read so slow or malicious clients cannot hold goroutines and semaphore slots indefinitely. Deadline is cleared before the long-lived bidirectional copy loop. - Remove stale TODO comment in server accept loop - Fix indentation from previous revert --- pkg/dmsg/server.go | 2 +- pkg/dmsg/server_session.go | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/pkg/dmsg/server.go b/pkg/dmsg/server.go index 46c3eb1b..8cb22513 100644 --- a/pkg/dmsg/server.go +++ b/pkg/dmsg/server.go @@ -150,7 +150,7 @@ func (s *Server) Serve(lis net.Listener, addr string) error { return err } - if s.SessionCount() >= s.maxSessions { + if s.SessionCount() >= s.maxSessions { s.log. WithField("max_sessions", s.maxSessions). WithField("remote_tcp", conn.RemoteAddr()). diff --git a/pkg/dmsg/server_session.go b/pkg/dmsg/server_session.go index 8c70b9ff..800e8cdf 100644 --- a/pkg/dmsg/server_session.go +++ b/pkg/dmsg/server_session.go @@ -138,6 +138,14 @@ func (ss *ServerSession) Serve() { // struct func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr io.ReadWriteCloser, addr net.Addr) error { + // Set a deadline for the initial stream request read so a slow or + // malicious client cannot hold a goroutine and semaphore slot indefinitely. 
+	if conn, ok := yStr.(net.Conn); ok { + if err := conn.SetReadDeadline(time.Now().Add(HandshakeTimeout)); err != nil { + return fmt.Errorf("set read deadline: %w", err) + } + } + readRequest := func() (StreamRequest, error) { obj, err := ss.readObject(yStr) if err != nil { @@ -220,6 +228,11 @@ func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr io.ReadWriteCl } log.Debug("Forwarded stream response.") + // Clear the read deadline before the long-lived bidirectional copy. + if conn, ok := yStr.(net.Conn); ok { + conn.SetReadDeadline(time.Time{}) //nolint:errcheck,gosec + } + // Serve stream. log.Info("Serving stream.") ss.m.RecordStream(metrics.DeltaConnect) // record successful stream From 8a56e9f749a76dde0167783c6dc787ad4e8ab806 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Thu, 26 Mar 2026 13:23:32 -0500 Subject: [PATCH 4/4] Ensure pprof HTTP server remains responsive under high load Run the pprof HTTP server on a dedicated OS thread via runtime.LockOSThread() and bump GOMAXPROCS by 1 to reserve a thread for it. This helps keep the pprof server responsive even when the Go runtime is saturated with thousands of stream-handling goroutines, which is exactly when pprof is needed most to diagnose the problem. Note that LockOSThread pins only the accept-loop goroutine (per-connection handler goroutines still run on ordinary, unlocked threads), and a locked goroutine still requires a scheduler slot to run, so this improves responsiveness but cannot strictly guarantee it.
--- pkg/cmdutil/pprof.go | 85 ++++++++++++++++++++++++-------------------- 1 file changed, 46 insertions(+), 39 deletions(-) diff --git a/pkg/cmdutil/pprof.go b/pkg/cmdutil/pprof.go index 99095e0e..f08999ed 100644 --- a/pkg/cmdutil/pprof.go +++ b/pkg/cmdutil/pprof.go @@ -21,30 +21,7 @@ func InitPProf(log *logging.Logger, mode string, addr string) func() { //nolint: switch mode { case "http": - go func() { - mux := http.NewServeMux() - mux.HandleFunc("/debug/pprof/", pprof.Index) - mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) - mux.HandleFunc("/debug/pprof/profile", pprof.Profile) - mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) - mux.HandleFunc("/debug/pprof/trace", pprof.Trace) - - for _, profile := range []string{"heap", "goroutine", "threadcreate", "block", "mutex", "allocs"} { - mux.Handle("/debug/pprof/"+profile, pprof.Handler(profile)) - } - - srv := &http.Server{ - Addr: addr, - Handler: mux, - ReadHeaderTimeout: 5 * time.Second, - WriteTimeout: 30 * time.Second, - } - log.Infof("Serving pprof on http://%s", addr) - if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { - log.Errorf("pprof http server failed: %v", err) - } - }() - + startPProfHTTP(log, addr, false) time.Sleep(100 * time.Millisecond) return noop @@ -122,21 +99,7 @@ func InitPProf(log *logging.Logger, mode string, addr string) func() { //nolint: } case "trace": - go func() { - mux := http.NewServeMux() - mux.HandleFunc("/debug/pprof/trace", pprof.Trace) - srv := &http.Server{ - Addr: addr, - Handler: mux, - ReadHeaderTimeout: 5 * time.Second, - WriteTimeout: 60 * time.Second, - } - log.Infof("Serving trace endpoint on http://%s/debug/pprof/trace", addr) - if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { - log.Errorf("pprof trace server failed: %v", err) - } - }() - + startPProfHTTP(log, addr, true) time.Sleep(100 * time.Millisecond) return noop @@ -149,3 +112,47 @@ func InitPProf(log *logging.Logger, mode string, addr string) func() 
{ //nolint: return noop } + +// startPProfHTTP starts a pprof HTTP server on a dedicated OS thread. +// Locking the goroutine to its own thread ensures the kernel scheduler +// gives it CPU time even when the Go runtime is saturated with goroutines, +// which is exactly when pprof is needed most. +func startPProfHTTP(log *logging.Logger, addr string, traceOnly bool) { + // Reserve an extra OS thread for the pprof server so it doesn't + // compete with application goroutines for GOMAXPROCS slots. + runtime.GOMAXPROCS(runtime.GOMAXPROCS(0) + 1) + + go func() { + // Pin this goroutine to a dedicated OS thread so the kernel + // scheduler guarantees it CPU time independent of Go's + // cooperative goroutine scheduler. + runtime.LockOSThread() + + mux := http.NewServeMux() + if traceOnly { + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + log.Infof("Serving trace endpoint on http://%s/debug/pprof/trace (dedicated thread)", addr) + } else { + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + + for _, profile := range []string{"heap", "goroutine", "threadcreate", "block", "mutex", "allocs"} { + mux.Handle("/debug/pprof/"+profile, pprof.Handler(profile)) + } + log.Infof("Serving pprof on http://%s (dedicated thread)", addr) + } + + srv := &http.Server{ + Addr: addr, + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + WriteTimeout: 30 * time.Second, + } + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Errorf("pprof http server failed: %v", err) + } + }() +}