diff --git a/internal/server/server.go b/internal/server/server.go index 9262a6c..9ee0e41 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -1184,9 +1184,65 @@ func (s *Server) defaultForwardRequestWithBodyFunc(w http.ResponseWriter, ctx co return err } +// Liveness-detection knobs for the proxied WebSocket. Vars (not consts) so +// tests can shorten them. wsIdleTimeout is the maximum gap between any frames +// (data, ping, pong) received from the upstream before we declare the +// connection dead and tear it down. wsWriteTimeout caps any single forwarded +// data write so a slow downstream peer can't stall the proxy or starve the +// idle detector on the other leg. +var ( + wsIdleTimeout = 90 * time.Second + wsWriteTimeout = 30 * time.Second + wsControlWriteTimeout = 10 * time.Second +) + +// wsBackendIdleError marks a read-deadline timeout from the backend leg. +// This is the only error condition that unambiguously means "upstream went +// silent"; other timeouts (e.g. a write to a slow client) are downstream +// backpressure and must not be attributed to the upstream endpoint. +type wsBackendIdleError struct{ err error } + +func (e *wsBackendIdleError) Error() string { return e.err.Error() } +func (e *wsBackendIdleError) Unwrap() error { return e.err } + +// installControlForwarders installs ping/pong handlers on src that forward +// the frame to dst via WriteControl, instead of letting gorilla auto-pong +// inbound pings on src. This makes the proxy transparent for WS control +// frames: a client's ping reaches the upstream RPC node, and the upstream's +// pong reaches the client, proving the full chain. +// +// If bumpDeadline is true, every received control frame also extends src's +// read deadline by wsIdleTimeout — used for the upstream side so quiet +// pong/ping traffic from a healthy upstream resets the idle backstop. +// +// A WriteControl failure on dst is returned from the handler, which causes +// src.ReadMessage to return the same error and triggers the existing +// teardown path. +func installControlForwarders(src, dst *websocket.Conn, bumpDeadline bool) { + src.SetPingHandler(func(appData string) error { + if bumpDeadline { + _ = src.SetReadDeadline(time.Now().Add(wsIdleTimeout)) + } + return dst.WriteControl(websocket.PingMessage, + []byte(appData), time.Now().Add(wsControlWriteTimeout)) + }) + src.SetPongHandler(func(appData string) error { + if bumpDeadline { + _ = src.SetReadDeadline(time.Now().Add(wsIdleTimeout)) + } + return dst.WriteControl(websocket.PongMessage, + []byte(appData), time.Now().Add(wsControlWriteTimeout)) + }) +} + // proxyWebSocketCopy copies messages from src to dst, forwarding close frames // to the destination so both peers receive a proper WebSocket close handshake. -func proxyWebSocketCopy(src, dst *websocket.Conn) error { +// Each forwarded write is bounded by wsWriteTimeout so a slow peer can't +// stall the goroutine indefinitely. If bumpSrcDeadline is true, the src read +// deadline is reset to wsIdleTimeout AFTER every successful end-to-end +// forward, so a slow downstream write doesn't shorten the next read budget +// and cause a false "upstream idle" timeout. +func proxyWebSocketCopy(src, dst *websocket.Conn, bumpSrcDeadline bool) error { for { msgType, msg, err := src.ReadMessage() if err != nil { @@ -1202,11 +1258,26 @@ func proxyWebSocketCopy(src, dst *websocket.Conn) error { _ = dst.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(code, closeErr.Text)) } + // Tag a backend-leg read-deadline timeout as the genuine + // upstream-idle signal. Write timeouts (or read timeouts on + // the client leg, which has no deadline today) must not be + // misattributed to the upstream — they are caught by the + // caller's isExpectedWSClose path and don't mark unhealthy. + if bumpSrcDeadline { + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + return &wsBackendIdleError{err: err} + } + } return err } + _ = dst.SetWriteDeadline(time.Now().Add(wsWriteTimeout)) if err := dst.WriteMessage(msgType, msg); err != nil { return err } + if bumpSrcDeadline { + _ = src.SetReadDeadline(time.Now().Add(wsIdleTimeout)) + } } } @@ -1307,15 +1378,30 @@ func (s *Server) defaultProxyWebSocket(w http.ResponseWriter, r *http.Request, b backendConn.Close() return err } - // Proxy messages in both directions + + // Forward WS control frames in both directions instead of letting + // gorilla auto-pong inbound pings on each side. This makes liveness + // checks transparent end-to-end: a client ping reaches the upstream RPC + // node, and the upstream pong reaches the client. + // + // On the backend conn we also bump the read deadline on every received + // frame and seed it now. If the upstream goes silent for wsIdleTimeout + // (no data, ping, or pong), ReadMessage returns a Timeout error and the + // teardown below propagates a close frame to the client so it knows to + // reconnect. The client conn intentionally has no read deadline — silent + // clients are fine; only upstream silence is the failure mode. + installControlForwarders(clientConn, backendConn, false) + installControlForwarders(backendConn, clientConn, true) + _ = backendConn.SetReadDeadline(time.Now().Add(wsIdleTimeout)) + + // Proxy messages in both directions. The backend leg also resets the + // backend read deadline on each received data frame. errc := make(chan error, 2) go func() { - err := proxyWebSocketCopy(clientConn, backendConn) - errc <- err + errc <- proxyWebSocketCopy(clientConn, backendConn, false) }() go func() { - err := proxyWebSocketCopy(backendConn, clientConn) - errc <- err + errc <- proxyWebSocketCopy(backendConn, clientConn, true) }() // Wait for one direction to fail/close, then immediately close both // connections so the other goroutine unblocks and finishes cleanly. @@ -1324,8 +1410,24 @@ func (s *Server) defaultProxyWebSocket(w http.ResponseWriter, r *http.Request, b backendConn.Close() <-errc // wait for the second goroutine to finish - // Mark endpoint as unhealthy for WS if error is not a normal closure if err != nil { + // Backend leg's read deadline fired = upstream sent nothing for + // wsIdleTimeout. Only this specific error is attributed to the + // upstream. Other timeouts (downstream write blocking, control + // frame forwarding) are caught by isExpectedWSClose below and + // don't mark the endpoint unhealthy. + var idleErr *wsBackendIdleError + if errors.As(err, &idleErr) { + log.Warn(). + Err(err). + Str("endpoint", helpers.RedactAPIKey(backendURL)). + Dur("idle_timeout", wsIdleTimeout). + Msg("WebSocket upstream idle timeout, marking endpoint unhealthy") + if chain, endpointID, found := s.findChainAndEndpointByURL(backendURL); found { + s.markEndpointUnhealthyProtocol(chain, endpointID, "ws") + } + return err + } if isExpectedWSClose(err) { if closeErr, ok := err.(*websocket.CloseError); ok && closeErr.Code == websocket.CloseAbnormalClosure { log.Debug().Err(err).Str("endpoint", helpers.RedactAPIKey(backendURL)).Msg("WebSocket connection closed abnormally (1006), not counting as failure") diff --git a/internal/server/server_test.go b/internal/server/server_test.go index 0568233..1423432 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -6,6 +6,7 @@ import ( "io" "net/http" "net/http/httptest" + "strings" "testing" "time" @@ -499,6 +500,202 @@ func TestIsExpectedWSClose(t *testing.T) { } } +// TestProxyWebSocket_UpstreamIdleTimeout verifies that when the upstream +// stops sending any frames (data, ping, or pong) for longer than +// wsIdleTimeout, the proxy tears down the client connection and marks the +// endpoint unhealthy. This is the regression test for the silent-subscription +// bug where eth_subscribe stops delivering newHeads but the client connection +// stays open indefinitely. +func TestProxyWebSocket_UpstreamIdleTimeout(t *testing.T) { + origTimeout := wsIdleTimeout + wsIdleTimeout = 300 * time.Millisecond + t.Cleanup(func() { wsIdleTimeout = origTimeout }) + + // Silent upstream: complete the WS handshake, then do nothing — no + // reads (so no auto-pong on inbound pings), no writes, no close frame. + upgrader := websocket.Upgrader{CheckOrigin: func(*http.Request) bool { return true }} + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + <-r.Context().Done() + c.Close() + })) + t.Cleanup(upstream.Close) + + upstreamWSURL := "ws" + strings.TrimPrefix(upstream.URL, "http") + + cfg := &config.Config{ + Endpoints: map[string]config.ChainEndpoints{ + "chainX": { + "ep1": config.Endpoint{Provider: "ep1", WSURL: upstreamWSURL, Role: "primary", Type: "full"}, + }, + }, + } + valkeyClient := store.NewMockValkeyClient() + valkeyClient.PopulateStatuses(map[string]*store.EndpointStatus{ + "chainX:ep1": {HasWS: true, HealthyWS: true}, + }) + srv := NewServer(cfg, valkeyClient, createTestConfig()) + + proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _ = srv.defaultProxyWebSocket(w, r, upstreamWSURL) + })) + t.Cleanup(proxy.Close) + + proxyWSURL := "ws" + strings.TrimPrefix(proxy.URL, "http") + client, resp, err := websocket.DefaultDialer.Dial(proxyWSURL, nil) + if err != nil { + t.Fatalf("client dial failed: %v", err) + } + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } + defer client.Close() + + // Give the proxy a generous read deadline; we expect it to hang up well + // before this fires. + _ = client.SetReadDeadline(time.Now().Add(wsIdleTimeout + 2*time.Second)) + start := time.Now() + _, _, err = client.ReadMessage() + elapsed := time.Since(start) + + if err == nil { + t.Fatalf("expected client ReadMessage to fail after upstream idle timeout, got nil after %s", elapsed) + } + if elapsed < wsIdleTimeout { + t.Errorf("client torn down too early: elapsed=%s wsIdleTimeout=%s", elapsed, wsIdleTimeout) + } + if elapsed > wsIdleTimeout+2*time.Second { + t.Errorf("client torn down too late: elapsed=%s wsIdleTimeout=%s", elapsed, wsIdleTimeout) + } + + // The proxy should mark the upstream endpoint unhealthy. The status + // write happens just before defaultProxyWebSocket returns, after both + // goroutines have joined; allow a brief moment for it to land. + deadline := time.Now().Add(500 * time.Millisecond) + for { + status, _ := valkeyClient.GetEndpointStatus(context.Background(), "chainX", "ep1") + if !status.HealthyWS { + break + } + if time.Now().After(deadline) { + t.Errorf("expected HealthyWS=false after upstream idle timeout, still true") + break + } + time.Sleep(10 * time.Millisecond) + } +} + +// TestProxyWebSocket_PingPongForwarded verifies that a client ping is +// forwarded all the way to the upstream and the upstream's pong is forwarded +// back to the client (rather than aetherlay auto-ponging at the proxy +// layer). Payload bytes must round-trip unchanged. +func TestProxyWebSocket_PingPongForwarded(t *testing.T) { + origTimeout := wsIdleTimeout + wsIdleTimeout = 5 * time.Second + t.Cleanup(func() { wsIdleTimeout = origTimeout }) + + pingReceived := make(chan string, 1) + upgrader := websocket.Upgrader{CheckOrigin: func(*http.Request) bool { return true }} + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + // Override default ping handler so we can capture the payload AND + // reply with a pong carrying the same payload (default behavior, but + // explicit here). + c.SetPingHandler(func(appData string) error { + select { + case pingReceived <- appData: + default: + } + return c.WriteControl(websocket.PongMessage, + []byte(appData), time.Now().Add(time.Second)) + }) + // Drive the read loop so the ping handler actually fires. + for { + if _, _, err := c.ReadMessage(); err != nil { + return + } + } + })) + t.Cleanup(upstream.Close) + + upstreamWSURL := "ws" + strings.TrimPrefix(upstream.URL, "http") + + cfg := &config.Config{ + Endpoints: map[string]config.ChainEndpoints{ + "chainX": { + "ep1": config.Endpoint{Provider: "ep1", WSURL: upstreamWSURL, Role: "primary", Type: "full"}, + }, + }, + } + valkeyClient := store.NewMockValkeyClient() + valkeyClient.PopulateStatuses(map[string]*store.EndpointStatus{ + "chainX:ep1": {HasWS: true, HealthyWS: true}, + }) + srv := NewServer(cfg, valkeyClient, createTestConfig()) + + proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _ = srv.defaultProxyWebSocket(w, r, upstreamWSURL) + })) + t.Cleanup(proxy.Close) + + proxyWSURL := "ws" + strings.TrimPrefix(proxy.URL, "http") + client, resp, err := websocket.DefaultDialer.Dial(proxyWSURL, nil) + if err != nil { + t.Fatalf("client dial failed: %v", err) + } + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } + defer client.Close() + + pongReceived := make(chan string, 1) + client.SetPongHandler(func(appData string) error { + select { + case pongReceived <- appData: + default: + } + return nil + }) + // Read loop required to drive the pong handler. + go func() { + for { + if _, _, err := client.ReadMessage(); err != nil { + return + } + } + }() + + payload := "abc-correlation-123" + if err := client.WriteControl(websocket.PingMessage, + []byte(payload), time.Now().Add(time.Second)); err != nil { + t.Fatalf("client ping write failed: %v", err) + } + + select { + case got := <-pingReceived: + if got != payload { + t.Errorf("upstream got ping payload %q, want %q", got, payload) + } + case <-time.After(2 * time.Second): + t.Fatal("upstream never received the forwarded ping (proxy is auto-ponging instead of forwarding)") + } + + select { + case got := <-pongReceived: + if got != payload { + t.Errorf("client got pong payload %q, want %q", got, payload) + } + case <-time.After(2 * time.Second): + t.Fatal("client never received the forwarded pong from upstream") + } +} + // TestMarkEndpointUnhealthy_HTTP tests marking an endpoint unhealthy for HTTP. func TestMarkEndpointUnhealthy_HTTP(t *testing.T) { cfg := &config.Config{