diff --git a/pkg/dmsg/client_session.go b/pkg/dmsg/client_session.go index 3a4f2eda..bfe8f24d 100644 --- a/pkg/dmsg/client_session.go +++ b/pkg/dmsg/client_session.go @@ -61,8 +61,12 @@ func (cs *ClientSession) DialStream(dst Addr) (dStr *Stream, err error) { return nil, err } - // Clear deadline. - if err = dStr.SetDeadline(time.Time{}); err != nil { + // Set idle timeout — refreshed on each successful read. + if err = dStr.SetReadDeadline(time.Now().Add(StreamIdleTimeout)); err != nil { + return nil, err + } + // Clear the write deadline so writes are not affected. + if err = dStr.SetWriteDeadline(time.Time{}); err != nil { return nil, err } @@ -169,8 +173,12 @@ func (cs *ClientSession) acceptStream() (dStr *Stream, err error) { return nil, err } - // Clear deadline. - if err = dStr.SetDeadline(time.Time{}); err != nil { + // Set idle timeout — refreshed on each successful read. + if err = dStr.SetReadDeadline(time.Now().Add(StreamIdleTimeout)); err != nil { + return nil, err + } + // Clear the write deadline so writes are not affected. + if err = dStr.SetWriteDeadline(time.Time{}); err != nil { return nil, err } diff --git a/pkg/dmsg/stream.go b/pkg/dmsg/stream.go index d6a2927a..bb70240b 100644 --- a/pkg/dmsg/stream.go +++ b/pkg/dmsg/stream.go @@ -325,9 +325,17 @@ func (s *Stream) StreamID() uint32 { return s.yStr.StreamID() } -// Read implements io.Reader +// Read implements io.Reader. +// It refreshes the idle timeout on each successful read so that active +// streams are never killed, while stale streams stuck in waitRead with +// no incoming data will time out and release their ephemeral port. func (s *Stream) Read(b []byte) (int, error) { - return s.nsConn.Read(b) + n, err := s.nsConn.Read(b) + if n > 0 { + // Reset the read deadline on successful read to keep the stream alive. + s.SetReadDeadline(time.Now().Add(StreamIdleTimeout)) //nolint:errcheck,gosec + } + return n, err } // Write implements io.Writer diff --git a/pkg/dmsg/types.go b/pkg/dmsg/types.go index d3839c8a..f57a9337 100644 --- a/pkg/dmsg/types.go +++ b/pkg/dmsg/types.go @@ -24,6 +24,11 @@ var ( // HandshakeTimeout defines the duration a stream handshake should take. HandshakeTimeout = time.Second * 20 + // StreamIdleTimeout defines how long a stream can be idle (no reads) + // before it is considered stale and closed. This prevents streams stuck + // in waitRead from holding ephemeral ports indefinitely. + StreamIdleTimeout = 2 * time.Minute + // AcceptBufferSize defines the size of the accepts buffer. AcceptBufferSize = 20 )