From 7a5719bbc7e7d150302600f7c8ff889803030a43 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Fri, 27 Mar 2026 15:28:20 -0500 Subject: [PATCH] Add idle timeout to DMSG streams to prevent ephemeral port exhaustion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Streams that complete the handshake but never receive data would block in smux.waitRead indefinitely, holding their ephemeral port forever. Over time this exhausts all ~16K ports (49152-65535) on the Porter, causing "ephemeral port space exhausted" errors for new streams. Fix by adding a 2-minute idle timeout (StreamIdleTimeout) that is: - Set as a read deadline after the stream handshake completes - Refreshed on every successful read, so active streams are unaffected - Applied on both initiating (DialStream) and responding (acceptStream) Stale streams will time out, the caller gets an error, and the stream is closed — releasing its ephemeral port back to the pool. --- pkg/dmsg/client_session.go | 16 ++++++++++++---- pkg/dmsg/stream.go | 12 ++++++++++-- pkg/dmsg/types.go | 5 +++++ 3 files changed, 27 insertions(+), 6 deletions(-) diff --git a/pkg/dmsg/client_session.go b/pkg/dmsg/client_session.go index 3a4f2edad..bfe8f24d9 100644 --- a/pkg/dmsg/client_session.go +++ b/pkg/dmsg/client_session.go @@ -61,8 +61,12 @@ func (cs *ClientSession) DialStream(dst Addr) (dStr *Stream, err error) { return nil, err } - // Clear deadline. - if err = dStr.SetDeadline(time.Time{}); err != nil { + // Set idle timeout — refreshed on each successful read. + if err = dStr.SetReadDeadline(time.Now().Add(StreamIdleTimeout)); err != nil { + return nil, err + } + // Clear the write deadline so writes are not affected. + if err = dStr.SetWriteDeadline(time.Time{}); err != nil { return nil, err } @@ -169,8 +173,12 @@ func (cs *ClientSession) acceptStream() (dStr *Stream, err error) { return nil, err } - // Clear deadline. - if err = dStr.SetDeadline(time.Time{}); err != nil { + // Set idle timeout — refreshed on each successful read. + if err = dStr.SetReadDeadline(time.Now().Add(StreamIdleTimeout)); err != nil { + return nil, err + } + // Clear the write deadline so writes are not affected. + if err = dStr.SetWriteDeadline(time.Time{}); err != nil { return nil, err } diff --git a/pkg/dmsg/stream.go b/pkg/dmsg/stream.go index d6a2927ae..bb70240b3 100644 --- a/pkg/dmsg/stream.go +++ b/pkg/dmsg/stream.go @@ -325,9 +325,17 @@ func (s *Stream) StreamID() uint32 { return s.yStr.StreamID() } -// Read implements io.Reader +// Read implements io.Reader. +// It refreshes the idle timeout on each successful read so that active +// streams are never killed, while stale streams stuck in waitRead with +// no incoming data will time out and release their ephemeral port. func (s *Stream) Read(b []byte) (int, error) { - return s.nsConn.Read(b) + n, err := s.nsConn.Read(b) + if n > 0 { + // Reset the read deadline on successful read to keep the stream alive. + s.SetReadDeadline(time.Now().Add(StreamIdleTimeout)) //nolint:errcheck,gosec + } + return n, err } // Write implements io.Writer diff --git a/pkg/dmsg/types.go b/pkg/dmsg/types.go index d3839c8ac..f57a9337d 100644 --- a/pkg/dmsg/types.go +++ b/pkg/dmsg/types.go @@ -24,6 +24,11 @@ var ( // HandshakeTimeout defines the duration a stream handshake should take. HandshakeTimeout = time.Second * 20 + // StreamIdleTimeout defines how long a stream can be idle (no reads) + // before it is considered stale and closed. This prevents streams stuck + // in waitRead from holding ephemeral ports indefinitely. + StreamIdleTimeout = 2 * time.Minute + // AcceptBufferSize defines the size of the accepts buffer. AcceptBufferSize = 20 )