From 7e59ce45136915f46d835e66c9ac9332fcd6abd9 Mon Sep 17 00:00:00 2001 From: Jacob Sussmilch Date: Fri, 15 May 2026 10:04:32 +1000 Subject: [PATCH 1/2] fix: Detect upstream WebSocket silence on proxied subscriptions Long-lived WebSocket subscriptions (eth_subscribe newHeads) could stop delivering messages while the proxied connection stayed open, leaving clients with a valid socket and no signal to reconnect. The previous proxy used a transparent byte-pipe with no read deadline and let gorilla auto-pong inbound pings, so a half-open upstream TCP or a wedged provider subscription dispatcher never surfaced as an error on either leg. This change: - Forwards WS ping/pong frames end-to-end in both directions instead of auto-ponging at the proxy. A client ping reaches the upstream RPC node and the upstream's pong reaches the client, with payload bytes preserved for any client-side correlation. - Sets a 90s read deadline on the upstream conn that resets on every received frame (data, ping, pong). If upstream stops sending anything for that long, ReadMessage returns a timeout, both legs are torn down, and the client receives a close so its existing reconnect logic can run. - Marks the endpoint unhealthy on idle-timeout teardown, since the read deadline only fires on the backend conn so the timeout is unambiguously an upstream failure. Tests added for the upstream-silence teardown path and for end-to-end ping/pong forwarding (asserts payload round-trips, proving the proxy is not auto-replying locally). --- internal/server/server.go | 88 +++++++++++++-- internal/server/server_test.go | 191 +++++++++++++++++++++++++++++++++ 2 files changed, 272 insertions(+), 7 deletions(-) diff --git a/internal/server/server.go b/internal/server/server.go index 9262a6c..b1962cc 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -1184,9 +1184,51 @@ func (s *Server) defaultForwardRequestWithBodyFunc(w http.ResponseWriter, ctx co return err } +// Liveness-detection knobs for the proxied WebSocket. Vars (not consts) so +// tests can shorten them. wsIdleTimeout is the maximum gap between any frames +// (data, ping, pong) received from the upstream before we declare the +// connection dead and tear it down. +var ( + wsIdleTimeout = 90 * time.Second + wsControlWriteTimeout = 10 * time.Second +) + +// installControlForwarders installs ping/pong handlers on src that forward +// the frame to dst via WriteControl, instead of letting gorilla auto-pong +// inbound pings on src. This makes the proxy transparent for WS control +// frames: a client's ping reaches the upstream RPC node, and the upstream's +// pong reaches the client, proving the full chain. +// +// If bumpDeadline is true, every received control frame also extends src's +// read deadline by wsIdleTimeout — used for the upstream side so quiet +// pong/ping traffic from a healthy upstream resets the idle backstop. +// +// A WriteControl failure on dst is returned from the handler, which causes +// src.ReadMessage to return the same error and triggers the existing +// teardown path. +func installControlForwarders(src, dst *websocket.Conn, bumpDeadline bool) { + src.SetPingHandler(func(appData string) error { + if bumpDeadline { + _ = src.SetReadDeadline(time.Now().Add(wsIdleTimeout)) + } + return dst.WriteControl(websocket.PingMessage, + []byte(appData), time.Now().Add(wsControlWriteTimeout)) + }) + src.SetPongHandler(func(appData string) error { + if bumpDeadline { + _ = src.SetReadDeadline(time.Now().Add(wsIdleTimeout)) + } + return dst.WriteControl(websocket.PongMessage, + []byte(appData), time.Now().Add(wsControlWriteTimeout)) + }) +} + // proxyWebSocketCopy copies messages from src to dst, forwarding close frames // to the destination so both peers receive a proper WebSocket close handshake. -func proxyWebSocketCopy(src, dst *websocket.Conn) error { +// If bumpSrcDeadline is true, the src read deadline is reset to +// wsIdleTimeout from now on every successful read, so that data traffic from +// a healthy upstream keeps the idle backstop from firing. +func proxyWebSocketCopy(src, dst *websocket.Conn, bumpSrcDeadline bool) error { for { msgType, msg, err := src.ReadMessage() if err != nil { @@ -1204,6 +1246,9 @@ func proxyWebSocketCopy(src, dst *websocket.Conn) error { } return err } + if bumpSrcDeadline { + _ = src.SetReadDeadline(time.Now().Add(wsIdleTimeout)) + } if err := dst.WriteMessage(msgType, msg); err != nil { return err } @@ -1307,15 +1352,30 @@ func (s *Server) defaultProxyWebSocket(w http.ResponseWriter, r *http.Request, b backendConn.Close() return err } - // Proxy messages in both directions + + // Forward WS control frames in both directions instead of letting + // gorilla auto-pong inbound pings on each side. This makes liveness + // checks transparent end-to-end: a client ping reaches the upstream RPC + // node, and the upstream pong reaches the client. + // + // On the backend conn we also bump the read deadline on every received + // frame and seed it now. If the upstream goes silent for wsIdleTimeout + // (no data, ping, or pong), ReadMessage returns a Timeout error and the + // teardown below propagates a close frame to the client so it knows to + // reconnect. The client conn intentionally has no read deadline — silent + // clients are fine; only upstream silence is the failure mode. + installControlForwarders(clientConn, backendConn, false) + installControlForwarders(backendConn, clientConn, true) + _ = backendConn.SetReadDeadline(time.Now().Add(wsIdleTimeout)) + + // Proxy messages in both directions. The backend leg also resets the + // backend read deadline on each received data frame. errc := make(chan error, 2) go func() { - err := proxyWebSocketCopy(clientConn, backendConn) - errc <- err + errc <- proxyWebSocketCopy(clientConn, backendConn, false) }() go func() { - err := proxyWebSocketCopy(backendConn, clientConn) - errc <- err + errc <- proxyWebSocketCopy(backendConn, clientConn, true) }() // Wait for one direction to fail/close, then immediately close both // connections so the other goroutine unblocks and finishes cleanly. @@ -1324,8 +1384,22 @@ func (s *Server) defaultProxyWebSocket(w http.ResponseWriter, r *http.Request, b backendConn.Close() <-errc // wait for the second goroutine to finish - // Mark endpoint as unhealthy for WS if error is not a normal closure if err != nil { + // Read deadline only fires on the backend conn, so any timeout error + // here means upstream went silent. Mark the endpoint unhealthy so + // the next connection attempt picks a different one. + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + log.Warn(). + Err(err). + Str("endpoint", helpers.RedactAPIKey(backendURL)). + Dur("idle_timeout", wsIdleTimeout). + Msg("WebSocket upstream idle timeout, marking endpoint unhealthy") + if chain, endpointID, found := s.findChainAndEndpointByURL(backendURL); found { + s.markEndpointUnhealthyProtocol(chain, endpointID, "ws") + } + return err + } if isExpectedWSClose(err) { if closeErr, ok := err.(*websocket.CloseError); ok && closeErr.Code == websocket.CloseAbnormalClosure { log.Debug().Err(err).Str("endpoint", helpers.RedactAPIKey(backendURL)).Msg("WebSocket connection closed abnormally (1006), not counting as failure") diff --git a/internal/server/server_test.go b/internal/server/server_test.go index 0568233..1f2bfd5 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -6,6 +6,7 @@ import ( "io" "net/http" "net/http/httptest" + "strings" "testing" "time" @@ -499,6 +500,196 @@ func TestIsExpectedWSClose(t *testing.T) { } } +// TestProxyWebSocket_UpstreamIdleTimeout verifies that when the upstream +// stops sending any frames (data, ping, or pong) for longer than +// wsIdleTimeout, the proxy tears down the client connection and marks the +// endpoint unhealthy. This is the regression test for the silent-subscription +// bug where eth_subscribe stops delivering newHeads but the client connection +// stays open indefinitely. +func TestProxyWebSocket_UpstreamIdleTimeout(t *testing.T) { + origTimeout := wsIdleTimeout + wsIdleTimeout = 300 * time.Millisecond + t.Cleanup(func() { wsIdleTimeout = origTimeout }) + + // Silent upstream: complete the WS handshake, then do nothing — no + // reads (so no auto-pong on inbound pings), no writes, no close frame. + upgrader := websocket.Upgrader{CheckOrigin: func(*http.Request) bool { return true }} + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + <-r.Context().Done() + c.Close() + })) + t.Cleanup(upstream.Close) + + upstreamWSURL := "ws" + strings.TrimPrefix(upstream.URL, "http") + + cfg := &config.Config{ + Endpoints: map[string]config.ChainEndpoints{ + "chainX": { + "ep1": config.Endpoint{Provider: "ep1", WSURL: upstreamWSURL, Role: "primary", Type: "full"}, + }, + }, + } + valkeyClient := store.NewMockValkeyClient() + valkeyClient.PopulateStatuses(map[string]*store.EndpointStatus{ + "chainX:ep1": {HasWS: true, HealthyWS: true}, + }) + srv := NewServer(cfg, valkeyClient, createTestConfig()) + + proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _ = srv.defaultProxyWebSocket(w, r, upstreamWSURL) + })) + t.Cleanup(proxy.Close) + + proxyWSURL := "ws" + strings.TrimPrefix(proxy.URL, "http") + client, _, err := websocket.DefaultDialer.Dial(proxyWSURL, nil) + if err != nil { + t.Fatalf("client dial failed: %v", err) + } + defer client.Close() + + // Give the proxy a generous read deadline; we expect it to hang up well + // before this fires. + _ = client.SetReadDeadline(time.Now().Add(wsIdleTimeout + 2*time.Second)) + start := time.Now() + _, _, err = client.ReadMessage() + elapsed := time.Since(start) + + if err == nil { + t.Fatalf("expected client ReadMessage to fail after upstream idle timeout, got nil after %s", elapsed) + } + if elapsed < wsIdleTimeout { + t.Errorf("client torn down too early: elapsed=%s wsIdleTimeout=%s", elapsed, wsIdleTimeout) + } + if elapsed > wsIdleTimeout+2*time.Second { + t.Errorf("client torn down too late: elapsed=%s wsIdleTimeout=%s", elapsed, wsIdleTimeout) + } + + // The proxy should mark the upstream endpoint unhealthy. The status + // write happens just before defaultProxyWebSocket returns, after both + // goroutines have joined; allow a brief moment for it to land. + deadline := time.Now().Add(500 * time.Millisecond) + for { + status, _ := valkeyClient.GetEndpointStatus(context.Background(), "chainX", "ep1") + if !status.HealthyWS { + break + } + if time.Now().After(deadline) { + t.Errorf("expected HealthyWS=false after upstream idle timeout, still true") + break + } + time.Sleep(10 * time.Millisecond) + } +} + +// TestProxyWebSocket_PingPongForwarded verifies that a client ping is +// forwarded all the way to the upstream and the upstream's pong is forwarded +// back to the client (rather than aetherlay auto-ponging at the proxy +// layer). Payload bytes must round-trip unchanged. +func TestProxyWebSocket_PingPongForwarded(t *testing.T) { + origTimeout := wsIdleTimeout + wsIdleTimeout = 5 * time.Second + t.Cleanup(func() { wsIdleTimeout = origTimeout }) + + pingReceived := make(chan string, 1) + upgrader := websocket.Upgrader{CheckOrigin: func(*http.Request) bool { return true }} + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + // Override default ping handler so we can capture the payload AND + // reply with a pong carrying the same payload (default behavior, but + // explicit here). + c.SetPingHandler(func(appData string) error { + select { + case pingReceived <- appData: + default: + } + return c.WriteControl(websocket.PongMessage, + []byte(appData), time.Now().Add(time.Second)) + }) + // Drive the read loop so the ping handler actually fires. + for { + if _, _, err := c.ReadMessage(); err != nil { + return + } + } + })) + t.Cleanup(upstream.Close) + + upstreamWSURL := "ws" + strings.TrimPrefix(upstream.URL, "http") + + cfg := &config.Config{ + Endpoints: map[string]config.ChainEndpoints{ + "chainX": { + "ep1": config.Endpoint{Provider: "ep1", WSURL: upstreamWSURL, Role: "primary", Type: "full"}, + }, + }, + } + valkeyClient := store.NewMockValkeyClient() + valkeyClient.PopulateStatuses(map[string]*store.EndpointStatus{ + "chainX:ep1": {HasWS: true, HealthyWS: true}, + }) + srv := NewServer(cfg, valkeyClient, createTestConfig()) + + proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _ = srv.defaultProxyWebSocket(w, r, upstreamWSURL) + })) + t.Cleanup(proxy.Close) + + proxyWSURL := "ws" + strings.TrimPrefix(proxy.URL, "http") + client, _, err := websocket.DefaultDialer.Dial(proxyWSURL, nil) + if err != nil { + t.Fatalf("client dial failed: %v", err) + } + defer client.Close() + + pongReceived := make(chan string, 1) + client.SetPongHandler(func(appData string) error { + select { + case pongReceived <- appData: + default: + } + return nil + }) + // Read loop required to drive the pong handler. + go func() { + for { + if _, _, err := client.ReadMessage(); err != nil { + return + } + } + }() + + payload := "abc-correlation-123" + if err := client.WriteControl(websocket.PingMessage, + []byte(payload), time.Now().Add(time.Second)); err != nil { + t.Fatalf("client ping write failed: %v", err) + } + + select { + case got := <-pingReceived: + if got != payload { + t.Errorf("upstream got ping payload %q, want %q", got, payload) + } + case <-time.After(2 * time.Second): + t.Fatal("upstream never received the forwarded ping (proxy is auto-ponging instead of forwarding)") + } + + select { + case got := <-pongReceived: + if got != payload { + t.Errorf("client got pong payload %q, want %q", got, payload) + } + case <-time.After(2 * time.Second): + t.Fatal("client never received the forwarded pong from upstream") + } +} + // TestMarkEndpointUnhealthy_HTTP tests marking an endpoint unhealthy for HTTP. func TestMarkEndpointUnhealthy_HTTP(t *testing.T) { cfg := &config.Config{ From 2ed16f83587e15ef4f9cfb89feee4d06f745121e Mon Sep 17 00:00:00 2001 From: Jacob Sussmilch Date: Fri, 15 May 2026 11:28:35 +1000 Subject: [PATCH 2/2] fix: Don't misattribute downstream backpressure to upstream silence Addresses CodeRabbit feedback on PR #35. Without a write deadline, dst.WriteMessage on the backend leg could block arbitrarily long when the client was slow draining its socket. Bumping the src read deadline before the write meant the next ReadMessage would time out instantly after a slow write completed, marking a healthy upstream unhealthy. Three changes in proxyWebSocketCopy / defaultProxyWebSocket: - Cap each forwarded write with wsWriteTimeout (30s) via SetWriteDeadline, so a slow peer can't stall the proxy. - Move the src read-deadline bump to AFTER the write succeeds. The semantic becomes "no upstream activity since the last successful end-to-end forward," which removes the false positive when downstream is slow. - Tag backend-leg read-deadline timeouts with a wsBackendIdleError sentinel. Only this error marks the upstream endpoint unhealthy. Other timeouts (write deadlines, control-frame forwarding failures, client leg read errors) fall through to the existing isExpectedWSClose path and don't cause endpoint health changes. Also closed *http.Response bodies returned from websocket.DefaultDialer.Dial in the two new tests (golangci-lint bodyclose). --- internal/server/server.go | 52 ++++++++++++++++++++++++++-------- internal/server/server_test.go | 10 +++++-- 2 files changed, 48 insertions(+), 14 deletions(-) diff --git a/internal/server/server.go b/internal/server/server.go index b1962cc..9ee0e41 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -1187,12 +1187,24 @@ func (s *Server) defaultForwardRequestWithBodyFunc(w http.ResponseWriter, ctx co // Liveness-detection knobs for the proxied WebSocket. Vars (not consts) so // tests can shorten them. wsIdleTimeout is the maximum gap between any frames // (data, ping, pong) received from the upstream before we declare the -// connection dead and tear it down. +// connection dead and tear it down. wsWriteTimeout caps any single forwarded +// data write so a slow downstream peer can't stall the proxy or starve the +// idle detector on the other leg. var ( wsIdleTimeout = 90 * time.Second + wsWriteTimeout = 30 * time.Second wsControlWriteTimeout = 10 * time.Second ) +// wsBackendIdleError marks a read-deadline timeout from the backend leg. +// This is the only error condition that unambiguously means "upstream went +// silent"; other timeouts (e.g. a write to a slow client) are downstream +// backpressure and must not be attributed to the upstream endpoint. +type wsBackendIdleError struct{ err error } + +func (e *wsBackendIdleError) Error() string { return e.err.Error() } +func (e *wsBackendIdleError) Unwrap() error { return e.err } + // installControlForwarders installs ping/pong handlers on src that forward // the frame to dst via WriteControl, instead of letting gorilla auto-pong // inbound pings on src. This makes the proxy transparent for WS control @@ -1225,9 +1237,11 @@ func installControlForwarders(src, dst *websocket.Conn, bumpDeadline bool) { // proxyWebSocketCopy copies messages from src to dst, forwarding close frames // to the destination so both peers receive a proper WebSocket close handshake. -// If bumpSrcDeadline is true, the src read deadline is reset to -// wsIdleTimeout from now on every successful read, so that data traffic from -// a healthy upstream keeps the idle backstop from firing. +// Each forwarded write is bounded by wsWriteTimeout so a slow peer can't +// stall the goroutine indefinitely. If bumpSrcDeadline is true, the src read +// deadline is reset to wsIdleTimeout AFTER every successful end-to-end +// forward, so a slow downstream write doesn't shorten the next read budget +// and cause a false "upstream idle" timeout. func proxyWebSocketCopy(src, dst *websocket.Conn, bumpSrcDeadline bool) error { for { msgType, msg, err := src.ReadMessage() @@ -1244,14 +1258,26 @@ func proxyWebSocketCopy(src, dst *websocket.Conn, bumpSrcDeadline bool) error { _ = dst.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(code, closeErr.Text)) } + // Tag a backend-leg read-deadline timeout as the genuine + // upstream-idle signal. Write timeouts (or read timeouts on + // the client leg, which has no deadline today) must not be + // misattributed to the upstream — they are caught by the + // caller's isExpectedWSClose path and don't mark unhealthy. + if bumpSrcDeadline { + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + return &wsBackendIdleError{err: err} + } + } return err } - if bumpSrcDeadline { - _ = src.SetReadDeadline(time.Now().Add(wsIdleTimeout)) - } + _ = dst.SetWriteDeadline(time.Now().Add(wsWriteTimeout)) if err := dst.WriteMessage(msgType, msg); err != nil { return err } + if bumpSrcDeadline { + _ = src.SetReadDeadline(time.Now().Add(wsIdleTimeout)) + } } } @@ -1385,11 +1411,13 @@ func (s *Server) defaultProxyWebSocket(w http.ResponseWriter, r *http.Request, b <-errc // wait for the second goroutine to finish if err != nil { - // Read deadline only fires on the backend conn, so any timeout error - // here means upstream went silent. Mark the endpoint unhealthy so - // the next connection attempt picks a different one. - var netErr net.Error - if errors.As(err, &netErr) && netErr.Timeout() { + // Backend leg's read deadline fired = upstream sent nothing for + // wsIdleTimeout. Only this specific error is attributed to the + // upstream. Other timeouts (downstream write blocking, control + // frame forwarding) are caught by isExpectedWSClose below and + // don't mark the endpoint unhealthy. + var idleErr *wsBackendIdleError + if errors.As(err, &idleErr) { log.Warn(). Err(err). Str("endpoint", helpers.RedactAPIKey(backendURL)). diff --git a/internal/server/server_test.go b/internal/server/server_test.go index 1f2bfd5..1423432 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -545,10 +545,13 @@ func TestProxyWebSocket_UpstreamIdleTimeout(t *testing.T) { t.Cleanup(proxy.Close) proxyWSURL := "ws" + strings.TrimPrefix(proxy.URL, "http") - client, _, err := websocket.DefaultDialer.Dial(proxyWSURL, nil) + client, resp, err := websocket.DefaultDialer.Dial(proxyWSURL, nil) if err != nil { t.Fatalf("client dial failed: %v", err) } + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } defer client.Close() // Give the proxy a generous read deadline; we expect it to hang up well @@ -642,10 +645,13 @@ func TestProxyWebSocket_PingPongForwarded(t *testing.T) { t.Cleanup(proxy.Close) proxyWSURL := "ws" + strings.TrimPrefix(proxy.URL, "http") - client, _, err := websocket.DefaultDialer.Dial(proxyWSURL, nil) + client, resp, err := websocket.DefaultDialer.Dial(proxyWSURL, nil) if err != nil { t.Fatalf("client dial failed: %v", err) } + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } defer client.Close() pongReceived := make(chan string, 1)