diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7d96375..cd944f9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -74,6 +74,19 @@ jobs: -run "TestHTTP1Parallel|TestH2CParallel|TestAutoProtocolDetection|TestH2CMultipleStreams|TestHTTP1LargeBodyParallel" ./test/spec/... + integration: + name: Integration + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-go@v6 + with: + go-version: "1.26" + - name: Integration tests + run: go test -v -race -count=1 -timeout=120s ./test/integration/... + - name: Integration tests (constrained) + run: taskset -c 0 go test -v -count=1 -timeout=120s -run TestAdaptiveAutoSingleWorker ./test/integration/... + build: name: Build (${{ matrix.os }}) runs-on: ${{ matrix.os }} diff --git a/adaptive/engine.go b/adaptive/engine.go index 3c78236..479c4c6 100644 --- a/adaptive/engine.go +++ b/adaptive/engine.go @@ -8,7 +8,6 @@ import ( "fmt" "log/slog" "net" - "runtime" "sync" "sync/atomic" "time" @@ -28,56 +27,35 @@ type addrEngine interface { // Engine is an adaptive meta-engine that switches between io_uring and epoll. type Engine struct { - primary addrEngine // io_uring - secondary addrEngine // epoll - active atomic.Pointer[engine.Engine] - ctrl *controller - cfg resource.Config - handler stream.Handler - addr atomic.Pointer[net.Addr] - mu sync.Mutex - frozen atomic.Bool - logger *slog.Logger -} - -// splitResources resolves the full resource config and halves shared resources -// so each sub-engine gets roughly half the total footprint. Per-connection -// settings (BufferSize, SocketRecv, SocketSend) are unchanged. -func splitResources(cfg resource.Config) (iouringCfg, epollCfg resource.Config) { - full := cfg.Resources.Resolve(runtime.NumCPU()) - half := resource.Resources{ - Preset: cfg.Resources.Preset, - Workers: max(full.Workers/2, 1), - SQERingSize: max(full.SQERingSize/2, 1024), - BufferPool: max(full.BufferPool/2, 256), - MaxEvents: max(full.MaxEvents/2, 128), - MaxConns: max(full.MaxConns/2, 256), - BufferSize: cfg.Resources.BufferSize, - SocketRecv: cfg.Resources.SocketRecv, - SocketSend: cfg.Resources.SocketSend, - } - iouringCfg = cfg - iouringCfg.Resources = half - epollCfg = cfg - epollCfg.Resources = half - return + primary addrEngine // io_uring + secondary addrEngine // epoll + active atomic.Pointer[engine.Engine] + ctrl *controller + cfg resource.Config + handler stream.Handler + addr atomic.Pointer[net.Addr] + mu sync.Mutex + switchMu sync.Mutex // protects evaluate + performSwitch coordination + frozen atomic.Bool + logger *slog.Logger + suppressFreeze func(time.Duration) } // New creates a new adaptive engine with io_uring as primary and epoll as secondary. +// Both sub-engines get the full resource config. This is safe because standby +// workers are fully suspended (zero CPU, zero connections, listen sockets closed). func New(cfg resource.Config, handler stream.Handler) (*Engine, error) { cfg = cfg.WithDefaults() if errs := cfg.Validate(); len(errs) > 0 { return nil, fmt.Errorf("config validation: %w", errs[0]) } - iouringCfg, epollCfg := splitResources(cfg) - - primary, err := iouring.New(iouringCfg, handler) + primary, err := iouring.New(cfg, handler) if err != nil { return nil, fmt.Errorf("io_uring sub-engine: %w", err) } - secondary, err := epoll.New(epollCfg, handler) + secondary, err := epoll.New(cfg, handler) if err != nil { return nil, fmt.Errorf("epoll sub-engine: %w", err) } @@ -162,7 +140,8 @@ func (e *Engine) Listen(ctx context.Context) error { }) // Wait for both engines to bind their addresses. - deadline := time.Now().Add(5 * time.Second) + // io_uring may need multiple tier fallback attempts, so allow ample time. + deadline := time.Now().Add(20 * time.Second) for time.Now().Before(deadline) { if e.primary.Addr() != nil && e.secondary.Addr() != nil { break @@ -222,7 +201,10 @@ func (e *Engine) runEvalLoop(ctx context.Context) { case <-ctx.Done(): return case now := <-ticker.C: - if e.ctrl.evaluate(now, e.frozen.Load()) { + e.switchMu.Lock() + shouldSwitch := e.ctrl.evaluate(now, e.frozen.Load()) + e.switchMu.Unlock() + if shouldSwitch { e.performSwitch() } } @@ -235,6 +217,7 @@ func (e *Engine) performSwitch() { now := time.Now() + e.switchMu.Lock() var newActive, newStandby addrEngine if e.ctrl.state.activeIsPrimary { // Switching: primary → secondary. @@ -245,23 +228,34 @@ func (e *Engine) performSwitch() { newActive = e.primary newStandby = e.secondary } + e.switchMu.Unlock() - // Pause standby (was active), resume new active. - if ac, ok := newStandby.(engine.AcceptController); ok { - _ = ac.PauseAccept() - } + // Resume new active BEFORE pausing old — this creates a brief overlap + // where both engines listen (via SO_REUSEPORT), which is correct. The + // alternative (pause first) creates a window where NEITHER listens, + // because io_uring ASYNC_CANCEL and epoll listen socket re-creation + // are asynchronous. if ac, ok := newActive.(engine.AcceptController); ok { _ = ac.ResumeAccept() } + if ac, ok := newStandby.(engine.AcceptController); ok { + _ = ac.PauseAccept() + } var eng engine.Engine = newActive e.active.Store(&eng) + e.switchMu.Lock() e.ctrl.recordSwitch(now) + e.switchMu.Unlock() e.logger.Info("engine switch completed", "now_active", newActive.Type().String(), "now_standby", newStandby.Type().String(), ) + + if e.suppressFreeze != nil { + e.suppressFreeze(5 * time.Second) + } } // Shutdown gracefully shuts down both sub-engines. @@ -314,3 +308,16 @@ func (e *Engine) UnfreezeSwitching() { func (e *Engine) ActiveEngine() engine.Engine { return *e.active.Load() } + +// SetFreezeSuppressor registers a callback to suppress overload freeze +// during engine switches. +func (e *Engine) SetFreezeSuppressor(fn func(time.Duration)) { + e.mu.Lock() + defer e.mu.Unlock() + e.suppressFreeze = fn +} + +// ForceSwitch triggers an immediate engine switch (for testing). +func (e *Engine) ForceSwitch() { + e.performSwitch() +} diff --git a/adaptive/engine_test.go b/adaptive/engine_test.go index e3662bb..e0d30c0 100644 --- a/adaptive/engine_test.go +++ b/adaptive/engine_test.go @@ -283,50 +283,6 @@ func TestOverloadFreeze(t *testing.T) { } } -func TestResourceSplit(t *testing.T) { - tests := []struct { - workers int - wantWorkers int - }{ - {1, 1}, - {2, 1}, - {4, 2}, - {8, 4}, - {16, 8}, - } - for _, tt := range tests { - cfg := resource.Config{ - Resources: resource.Resources{ - Workers: tt.workers, - }, - } - ioCfg, epCfg := splitResources(cfg) - - if ioCfg.Resources.Workers != tt.wantWorkers { - t.Errorf("workers=%d: io_uring workers = %d, want %d", tt.workers, ioCfg.Resources.Workers, tt.wantWorkers) - } - if epCfg.Resources.Workers != tt.wantWorkers { - t.Errorf("workers=%d: epoll workers = %d, want %d", tt.workers, epCfg.Resources.Workers, tt.wantWorkers) - } - - // Both sub-engine configs should be identical. - if ioCfg.Resources != epCfg.Resources { - t.Errorf("workers=%d: io_uring and epoll resources differ", tt.workers) - } - - // Per-connection settings should be unchanged. - if ioCfg.Resources.BufferSize != cfg.Resources.BufferSize { - t.Errorf("workers=%d: BufferSize changed", tt.workers) - } - if ioCfg.Resources.SocketRecv != cfg.Resources.SocketRecv { - t.Errorf("workers=%d: SocketRecv changed", tt.workers) - } - if ioCfg.Resources.SocketSend != cfg.Resources.SocketSend { - t.Errorf("workers=%d: SocketSend changed", tt.workers) - } - } -} - func TestHistoricalScoreDecay(t *testing.T) { primary := newMockEngine(engine.IOUring) secondary := newMockEngine(engine.Epoll) @@ -416,3 +372,40 @@ func TestSwitchAfterActiveDegrades(t *testing.T) { t.Error("expected switch when active degrades below standby historical") } } + +func TestSwitchTriggersFreezeSuppression(t *testing.T) { + primary := newMockEngine(engine.IOUring) + secondary := newMockEngine(engine.Epoll) + sampler := newSyntheticSampler() + + cfg := resource.Config{Protocol: engine.HTTP1} + e := newFromEngines(primary, secondary, sampler, cfg) + e.ctrl.cooldown = 0 + e.ctrl.minObserve = 0 + + var suppressDuration time.Duration + var suppressCalls int + e.SetFreezeSuppressor(func(d time.Duration) { + suppressDuration = d + suppressCalls++ + }) + + // Pre-seed standby historical score. + now := time.Now() + e.ctrl.state.lastActiveScore[engine.Epoll] = 200 + e.ctrl.state.lastActiveTime[engine.Epoll] = now + + sampler.Set(engine.IOUring, TelemetrySnapshot{ThroughputRPS: 100}) + + if !e.ctrl.evaluate(now, false) { + t.Fatal("expected switch") + } + e.performSwitch() + + if suppressCalls != 1 { + t.Errorf("expected 1 suppressFreeze call, got %d", suppressCalls) + } + if suppressDuration != 5*time.Second { + t.Errorf("expected 5s suppression, got %v", suppressDuration) + } +} diff --git a/engine/epoll/engine.go b/engine/epoll/engine.go index feca833..e109368 100644 --- a/engine/epoll/engine.go +++ b/engine/epoll/engine.go @@ -110,8 +110,20 @@ func (e *Engine) PauseAccept() error { } // ResumeAccept starts accepting new connections again. +// Wakes any suspended loops so they re-create listen sockets. func (e *Engine) ResumeAccept() error { e.acceptPaused.Store(false) + e.mu.Lock() + defer e.mu.Unlock() + for _, l := range e.loops { + l.wakeMu.Lock() + if l.suspended.Load() { + close(l.wake) + l.wake = make(chan struct{}) + l.suspended.Store(false) + } + l.wakeMu.Unlock() + } return nil } diff --git a/engine/epoll/loop.go b/engine/epoll/loop.go index 62b8f2a..ee0f1de 100644 --- a/engine/epoll/loop.go +++ b/engine/epoll/loop.go @@ -8,6 +8,7 @@ import ( "log/slog" "net" "runtime" + "sync" "sync/atomic" "golang.org/x/sys/unix" @@ -36,6 +37,9 @@ type Loop struct { cfg resource.Config logger *slog.Logger acceptPaused *atomic.Bool + wake chan struct{} + wakeMu sync.Mutex + suspended atomic.Bool reqCount *atomic.Uint64 activeConns *atomic.Int64 @@ -80,6 +84,7 @@ func newLoop(id, cpuID int, handler stream.Handler, cfg: cfg, logger: cfg.Logger, acceptPaused: acceptPaused, + wake: make(chan struct{}), sockOpts: sockopts.Options{ TCPNoDelay: objective.TCPNoDelay, TCPQuickAck: objective.TCPQuickAck, @@ -112,8 +117,35 @@ func (l *Loop) run(ctx context.Context) { default: } + // ACTIVE → DRAINING: close listen socket to leave SO_REUSEPORT group. + if l.listenFD >= 0 && l.acceptPaused.Load() { + _ = unix.EpollCtl(l.epollFD, unix.EPOLL_CTL_DEL, l.listenFD, nil) + _ = unix.Close(l.listenFD) + l.listenFD = -1 + } + + // SUSPENDED → ACTIVE: re-create listen socket after ResumeAccept. + if l.listenFD < 0 && !l.acceptPaused.Load() { + fd, err := createListenSocket(l.cfg.Addr) + if err != nil { + l.logger.Error("re-create listen socket", "loop", l.id, "err", err) + l.shutdown() + return + } + if err := unix.EpollCtl(l.epollFD, unix.EPOLL_CTL_ADD, fd, &unix.EpollEvent{ + Events: unix.EPOLLIN | unix.EPOLLET, + Fd: int32(fd), + }); err != nil { + l.logger.Error("epoll_ctl re-add listen", "loop", l.id, "err", err) + _ = unix.Close(fd) + l.shutdown() + return + } + l.listenFD = fd + } + timeoutMs := activeTimeoutMs - if l.acceptPaused.Load() && len(l.conns) == 0 { + if l.listenFD < 0 { timeoutMs = 500 } @@ -130,7 +162,7 @@ func (l *Loop) run(ctx context.Context) { ev := &l.events[i] fd := int(ev.Fd) - if fd == l.listenFD { + if fd == l.listenFD && l.listenFD >= 0 { l.acceptAll(ctx) continue } @@ -155,14 +187,30 @@ func (l *Loop) run(ctx context.Context) { cs.pendingBytes = 0 } } + + // DRAINING → SUSPENDED: no listen socket, no connections, events processed. + if l.listenFD < 0 && len(l.conns) == 0 && l.acceptPaused.Load() { + l.wakeMu.Lock() + if !l.acceptPaused.Load() { + l.wakeMu.Unlock() + continue + } + l.suspended.Store(true) + wake := l.wake + l.wakeMu.Unlock() + + select { + case <-wake: + case <-ctx.Done(): + l.shutdown() + return + } + continue + } } } func (l *Loop) acceptAll(ctx context.Context) { - if l.acceptPaused.Load() { - return - } - for { newFD, _, err := unix.Accept4(l.listenFD, unix.SOCK_NONBLOCK|unix.SOCK_CLOEXEC) if err != nil { @@ -318,7 +366,9 @@ func (l *Loop) shutdown() { cs.cancel() _ = unix.Close(fd) } - _ = unix.Close(l.listenFD) + if l.listenFD >= 0 { + _ = unix.Close(l.listenFD) + } _ = unix.Close(l.epollFD) } diff --git a/engine/iouring/consts.go b/engine/iouring/consts.go index 09693fe..c546641 100644 --- a/engine/iouring/consts.go +++ b/engine/iouring/consts.go @@ -31,6 +31,7 @@ const ( opACCEPT = 13 opRECV = 27 opSEND = 26 + opASYNCCANCEL = 14 opCLOSE = 19 opPROVIDEBUFFERS = 31 ) @@ -52,6 +53,12 @@ const ( acceptMultishot = 1 << 0 ) +// Async cancel flags. +const ( + cancelFD = 1 << 1 // IORING_ASYNC_CANCEL_FD + cancelAll = 1 << 0 // IORING_ASYNC_CANCEL_ALL +) + // Mmap offsets for ring buffers. const ( offSQRing = 0 diff --git a/engine/iouring/engine.go b/engine/iouring/engine.go index fe14d4c..dc75241 100644 --- a/engine/iouring/engine.go +++ b/engine/iouring/engine.go @@ -202,8 +202,20 @@ func (e *Engine) PauseAccept() error { } // ResumeAccept starts accepting new connections again. +// Wakes any suspended workers so they re-create listen sockets. func (e *Engine) ResumeAccept() error { e.acceptPaused.Store(false) + e.mu.Lock() + defer e.mu.Unlock() + for _, w := range e.workers { + w.wakeMu.Lock() + if w.suspended.Load() { + close(w.wake) + w.wake = make(chan struct{}) + w.suspended.Store(false) + } + w.wakeMu.Unlock() + } return nil } diff --git a/engine/iouring/sqe.go b/engine/iouring/sqe.go index 879597b..872f89f 100644 --- a/engine/iouring/sqe.go +++ b/engine/iouring/sqe.go @@ -53,6 +53,16 @@ func prepClose(sqePtr unsafe.Pointer, fd int) { *(*int32)(unsafe.Pointer(&sqe[4])) = int32(fd) } +// prepCancelFD cancels all pending io_uring operations on a file descriptor. +// This releases the kernel's io_uring reference to the underlying file, +// allowing listen sockets to leave the SO_REUSEPORT group immediately. +func prepCancelFD(sqePtr unsafe.Pointer, fd int) { + sqe := (*[sqeSize]byte)(sqePtr) + sqe[0] = opASYNCCANCEL + *(*int32)(unsafe.Pointer(&sqe[4])) = int32(fd) + *(*uint32)(unsafe.Pointer(&sqe[28])) = cancelFD | cancelAll +} + func prepProvideBuffers(sqePtr unsafe.Pointer, addr unsafe.Pointer, bufLen int, count int, groupID uint16, bufID uint16) { sqe := (*[sqeSize]byte)(sqePtr) sqe[0] = opPROVIDEBUFFERS diff --git a/engine/iouring/worker.go b/engine/iouring/worker.go index 371f777..24c2b32 100644 --- a/engine/iouring/worker.go +++ b/engine/iouring/worker.go @@ -8,6 +8,7 @@ import ( "log/slog" "net" "runtime" + "sync" "sync/atomic" "time" @@ -39,6 +40,9 @@ type Worker struct { cfg resource.Config ready chan error acceptPaused *atomic.Bool + wake chan struct{} + wakeMu sync.Mutex + suspended atomic.Bool reqCount *atomic.Uint64 activeConns *atomic.Int64 @@ -72,6 +76,7 @@ func newWorker(id, cpuID int, tier TierStrategy, handler stream.Handler, activeConns: activeConns, errCount: errCount, acceptPaused: acceptPaused, + wake: make(chan struct{}), ready: make(chan error, 1), sockOpts: sockopts.Options{ TCPNoDelay: objective.TCPNoDelay, @@ -118,10 +123,54 @@ func (w *Worker) run(ctx context.Context) { return } + // ACTIVE → DRAINING: cancel pending io_uring operations on the listen + // socket, then close it. The cancel releases the kernel's io_uring + // reference to the underlying file, allowing the socket to leave the + // SO_REUSEPORT group immediately. Without this, unix.Close alone + // leaves a phantom socket that intercepts connections. + if w.listenFD >= 0 && w.acceptPaused.Load() { + if sqe := w.ring.GetSQE(); sqe != nil { + prepCancelFD(sqe, w.listenFD) + setSQEUserData(sqe, 0) + // Submit and wait for the cancel to complete before closing. + _ = w.ring.SubmitAndWaitTimeout(50 * time.Millisecond) + // Process CQEs: skip cancel completions (userData=0), + // handle everything else normally to avoid breaking + // active connections. + for { + entry := w.ring.peekCQE() + if entry == nil { + break + } + if entry.UserData != 0 { + w.processCQE(ctx, entry) + } + w.ring.AdvanceCQ() + } + } + _ = unix.Close(w.listenFD) + w.listenFD = -1 + } + + // SUSPENDED → ACTIVE: re-create listen socket after ResumeAccept. + if w.listenFD < 0 && !w.acceptPaused.Load() { + fd, err := createListenSocket(w.cfg.Addr) + if err != nil { + w.logger.Error("re-create listen socket", "worker", w.id, "err", err) + w.shutdown() + return + } + w.listenFD = fd + w.tier.PrepareAccept(w.ring, w.listenFD) + if _, err := w.ring.Submit(); err != nil { + w.logger.Error("submit after listen re-create", "worker", w.id, "err", err) + } + } + // Non-blocking peek for CQEs, fall back to timed wait. if w.ring.peekCQE() == nil { waitTimeout := 100 * time.Millisecond - if w.acceptPaused.Load() && len(w.conns) == 0 { + if w.listenFD < 0 { waitTimeout = 1 * time.Second } if err := w.ring.SubmitAndWaitTimeout(waitTimeout); err != nil { @@ -151,6 +200,28 @@ func (w *Worker) run(ctx context.Context) { if _, err := w.ring.Submit(); err != nil { w.logger.Error("submit failed", "worker", w.id, "err", err) } + + // DRAINING → SUSPENDED: no listen socket, no connections, CQEs processed. + // Checked after CQE processing so accept CQEs for connections that + // completed before the listen socket close are served, not leaked. + if w.listenFD < 0 && len(w.conns) == 0 && w.acceptPaused.Load() { + w.wakeMu.Lock() + if !w.acceptPaused.Load() { + w.wakeMu.Unlock() + continue + } + w.suspended.Store(true) + wake := w.wake + w.wakeMu.Unlock() + + select { + case <-wake: + case <-ctx.Done(): + w.shutdown() + return + } + continue + } } } @@ -172,24 +243,20 @@ func (w *Worker) processCQE(ctx context.Context, c *completionEntry) { } } -func (w *Worker) handleAccept(ctx context.Context, c *completionEntry, listenFD int) { +func (w *Worker) handleAccept(ctx context.Context, c *completionEntry, _ int) { if c.Res < 0 { w.errCount.Add(1) - if !w.tier.SupportsMultishotAccept() { - w.tier.PrepareAccept(w.ring, listenFD) + if w.listenFD >= 0 && !w.tier.SupportsMultishotAccept() { + w.tier.PrepareAccept(w.ring, w.listenFD) } return } newFD := int(c.Res) - if w.acceptPaused.Load() { - _ = unix.Close(newFD) - if !cqeHasMore(c.Flags) && !w.tier.SupportsMultishotAccept() { - w.tier.PrepareAccept(w.ring, listenFD) - } - return - } + // Don't discard accepted connections even when paused — the TCP handshake + // already completed and the client expects a response. The listen socket + // will be closed within one event loop iteration to prevent further accepts. _ = sockopts.ApplyFD(newFD, w.sockOpts) @@ -206,8 +273,8 @@ func (w *Worker) handleAccept(ctx context.Context, c *completionEntry, listenFD // detect the protocol from the received data before processing it. w.tier.PrepareRecv(w.ring, newFD, cs.buf) - if !cqeHasMore(c.Flags) && !w.tier.SupportsMultishotAccept() { - w.tier.PrepareAccept(w.ring, listenFD) + if !cqeHasMore(c.Flags) && !w.tier.SupportsMultishotAccept() && w.listenFD >= 0 { + w.tier.PrepareAccept(w.ring, w.listenFD) } } @@ -357,8 +424,9 @@ func (w *Worker) handleSend(c *completionEntry, fd int) { } func (w *Worker) handleClose(fd int) { + // finishClose already removed from conns and decremented activeConns. + // This is a no-op for the normal path but kept as a safety guard. delete(w.conns, fd) - w.activeConns.Add(-1) } func (w *Worker) closeConn(fd int) { @@ -468,7 +536,9 @@ func (w *Worker) shutdown() { cs.cancel() _ = unix.Close(fd) } - _ = unix.Close(w.listenFD) + if w.listenFD >= 0 { + _ = unix.Close(w.listenFD) + } if w.ring != nil { _ = w.ring.Close() } diff --git a/overload/manager.go b/overload/manager.go index e85ba2a..a38c25c 100644 --- a/overload/manager.go +++ b/overload/manager.go @@ -22,6 +22,7 @@ type Manager struct { escalateAboveSince time.Time deescalateBelowSince time.Time cooldownUntil time.Time + freezeSuppressUntil time.Time baseWorkers int } @@ -47,6 +48,14 @@ func (m *Manager) SetFreezeHook(fn func(frozen bool)) { m.freezeHook = fn } +// SuppressFreeze defers freeze escalation for the given duration. +// Other escalation stages (Expand, Reap, Backpressure, Reject) fire normally. +func (m *Manager) SuppressFreeze(d time.Duration) { + m.mu.Lock() + defer m.mu.Unlock() + m.freezeSuppressUntil = time.Now().Add(d) +} + // Stage returns the current overload stage. func (m *Manager) Stage() Stage { m.mu.Lock() @@ -135,7 +144,7 @@ func (m *Manager) escalateTo(stage Stage, now time.Time) { m.hooks.ReapIdleConnections(30 * time.Second) case Reorder: m.hooks.SetSchedulingMode(true) - if m.freezeHook != nil { + if m.freezeHook != nil && !now.Before(m.freezeSuppressUntil) { m.freezeHook(true) } case Backpressure: diff --git a/overload/manager_test.go b/overload/manager_test.go index 23744a7..12bd722 100644 --- a/overload/manager_test.go +++ b/overload/manager_test.go @@ -140,7 +140,7 @@ func TestStageEscalation(t *testing.T) { for _, tc := range stages { mon.Set(tc.cpu) - runForDuration(t, mgr, 15*time.Millisecond) + runForDuration(t, mgr, 30*time.Millisecond) got := mgr.Stage() if got != tc.want { t.Errorf("cpu=%.2f: got stage %v, want %v", tc.cpu, got, tc.want) @@ -155,14 +155,14 @@ func TestDeescalationHysteresis(t *testing.T) { mgr := NewManager(cfg, mon, hooks, nil) // Escalate to Reorder (stage 3): 0.88 exceeds 0.85 threshold. - runForDuration(t, mgr, 30*time.Millisecond) + runForDuration(t, mgr, 80*time.Millisecond) if mgr.Stage() < Reorder { t.Fatalf("expected at least Reorder, got %v", mgr.Stage()) } // Drop CPU to 0.74 (below de-escalate threshold of 0.75 for Reorder). mon.Set(0.74) - runForDuration(t, mgr, 15*time.Millisecond) + runForDuration(t, mgr, 50*time.Millisecond) got := mgr.Stage() if got >= Reorder { @@ -176,7 +176,7 @@ func Test503AtStage5(t *testing.T) { cfg := fastConfig() mgr := NewManager(cfg, mon, hooks, nil) - runForDuration(t, mgr, 50*time.Millisecond) + runForDuration(t, mgr, 150*time.Millisecond) if mgr.Stage() != Reject { t.Fatalf("expected Reject, got %v", mgr.Stage()) @@ -195,7 +195,7 @@ func TestBackpressureAtStage4(t *testing.T) { cfg := fastConfig() mgr := NewManager(cfg, mon, hooks, nil) - runForDuration(t, mgr, 40*time.Millisecond) + runForDuration(t, mgr, 120*time.Millisecond) if mgr.Stage() < Backpressure { t.Fatalf("expected at least Backpressure, got %v", mgr.Stage()) @@ -215,14 +215,14 @@ func TestRecovery(t *testing.T) { mgr := NewManager(cfg, mon, hooks, nil) // Escalate to Reject. - runForDuration(t, mgr, 50*time.Millisecond) + runForDuration(t, mgr, 150*time.Millisecond) if mgr.Stage() != Reject { t.Fatalf("expected Reject, got %v", mgr.Stage()) } // Recover fully. mon.Set(0.20) - runForDuration(t, mgr, 100*time.Millisecond) + runForDuration(t, mgr, 200*time.Millisecond) if mgr.Stage() != Normal { t.Errorf("expected Normal after recovery, got %v", mgr.Stage()) @@ -292,3 +292,94 @@ func TestNoEscalationWithoutSustained(t *testing.T) { t.Errorf("expected Normal (not sustained), got %v", mgr.Stage()) } } + +func TestFreezeSuppression(t *testing.T) { + mon := cpumon.NewSynthetic(0.0) + hooks := newMockHooks() + cfg := fastConfig() + mgr := NewManager(cfg, mon, hooks, nil) + + var frozenMu sync.Mutex + frozen := false + mgr.SetFreezeHook(func(f bool) { + frozenMu.Lock() + frozen = f + frozenMu.Unlock() + }) + + getFrozen := func() bool { + frozenMu.Lock() + defer frozenMu.Unlock() + return frozen + } + + // Suppress freeze for a long duration. + mgr.SuppressFreeze(5 * time.Second) + + // Escalate to Reorder (stage 3). Freeze hook should NOT fire. + mon.Set(0.88) + runForDuration(t, mgr, 80*time.Millisecond) + + if mgr.Stage() < Reorder { + t.Fatalf("expected at least Reorder, got %v", mgr.Stage()) + } + if getFrozen() { + t.Error("freeze hook should be suppressed during suppression period") + } +} + +func TestFreezeFiringAfterSuppressionExpiry(t *testing.T) { + mon := cpumon.NewSynthetic(0.0) + hooks := newMockHooks() + cfg := fastConfig() + mgr := NewManager(cfg, mon, hooks, nil) + + var frozenMu sync.Mutex + frozen := false + mgr.SetFreezeHook(func(f bool) { + frozenMu.Lock() + frozen = f + frozenMu.Unlock() + }) + + getFrozen := func() bool { + frozenMu.Lock() + defer frozenMu.Unlock() + return frozen + } + + // Suppress for a very short duration (already expired). + mgr.SuppressFreeze(0) + + // Escalate to Reorder. Freeze hook SHOULD fire (suppression expired). + mon.Set(0.88) + runForDuration(t, mgr, 80*time.Millisecond) + + if mgr.Stage() < Reorder { + t.Fatalf("expected at least Reorder, got %v", mgr.Stage()) + } + if !getFrozen() { + t.Error("freeze hook should fire after suppression expiry") + } +} + +func TestSuppressDoesNotBlockOtherStages(t *testing.T) { + mon := cpumon.NewSynthetic(0.0) + hooks := newMockHooks() + cfg := fastConfig() + mgr := NewManager(cfg, mon, hooks, nil) + + // Suppress freeze for a long duration. + mgr.SuppressFreeze(5 * time.Second) + + // Escalate to Reject (stage 5). All stages should fire except freeze. + mon.Set(0.97) + runForDuration(t, mgr, 150*time.Millisecond) + + if mgr.Stage() != Reject { + t.Fatalf("expected Reject, got %v", mgr.Stage()) + } + if !hooks.getAcceptPaused() { + t.Error("SetAcceptPaused should still be called at Reject stage") + } +} diff --git a/resource/config_test.go b/resource/config_test.go index 345ae4f..ec6b54f 100644 --- a/resource/config_test.go +++ b/resource/config_test.go @@ -122,7 +122,7 @@ func TestValidateNegativeTimeouts(t *testing.T) { func TestValidateWorkersBelowMin(t *testing.T) { c := Config{ - Resources: Resources{Workers: -1}, + Resources: Resources{Workers: 1}, } errs := c.Validate() found := false @@ -132,7 +132,7 @@ func TestValidateWorkersBelowMin(t *testing.T) { } } if !found { - t.Error("expected Workers validation error for Workers=-1") + t.Error("expected Workers validation error for Workers=1") } } diff --git a/resource/preset.go b/resource/preset.go index 4d9cbdc..fe50758 100644 --- a/resource/preset.go +++ b/resource/preset.go @@ -2,7 +2,7 @@ package resource // Resource limit constants for validation and clamping. const ( - MinWorkers = 1 + MinWorkers = 2 MaxSQERing = 65536 MinBufferSize = 4096 MaxBufferSize = 262144 diff --git a/resource/preset_test.go b/resource/preset_test.go index bc80dec..47b941b 100644 --- a/resource/preset_test.go +++ b/resource/preset_test.go @@ -81,7 +81,7 @@ func TestResolvePresetMinimal(t *testing.T) { numCPU int workers int }{ - {1, 1}, + {1, MinWorkers}, {4, 2}, {8, 4}, {16, 8}, diff --git a/test/integration/adaptive_hybrid_test.go b/test/integration/adaptive_hybrid_test.go new file mode 100644 index 0000000..3305354 --- /dev/null +++ b/test/integration/adaptive_hybrid_test.go @@ -0,0 +1,371 @@ +//go:build linux + +package integration + +import ( + "context" + "crypto/tls" + "fmt" + "io" + "net" + "net/http" + "sync" + "testing" + "time" + + "github.com/goceleris/celeris/adaptive" + "github.com/goceleris/celeris/engine" + "github.com/goceleris/celeris/engine/epoll" + + "golang.org/x/net/http2" +) + +func TestAdaptiveAutoProtocol(t *testing.T) { + port := freePort(t) + cfg := defaultTestConfig(port, engine.Auto) + cfg.Engine = engine.Adaptive + cfg.Resources.Workers = 2 + + e, err := adaptive.New(cfg, &echoHandler{}) + if err != nil { + t.Skipf("adaptive engine not available: %v", err) + } + + startEngine(t, e) + + addr := e.Addr().String() + + // H1 requests. + h1Client := &http.Client{Timeout: 3 * time.Second} + for range 5 { + resp, err := h1Client.Get("http://" + addr + "/h1test") + if err != nil { + t.Fatalf("H1 request failed: %v", err) + } + body, _ := io.ReadAll(resp.Body) + _ = resp.Body.Close() + if resp.StatusCode != 200 { + t.Errorf("H1: got status %d, body: %s", resp.StatusCode, body) + } + } + + // H2C requests. + h2c := h2cClient(addr) + for range 5 { + resp, err := h2c.Get("http://" + addr + "/h2test") + if err != nil { + t.Fatalf("H2C request failed: %v", err) + } + body, _ := io.ReadAll(resp.Body) + _ = resp.Body.Close() + if resp.StatusCode != 200 { + t.Errorf("H2C: got status %d, body: %s", resp.StatusCode, body) + } + } + + // Mixed parallel. + var wg sync.WaitGroup + errs := make(chan error, 10) + for range 5 { + wg.Add(2) + go func() { + defer wg.Done() + resp, err := h1Client.Get("http://" + addr + "/mixed-h1") + if err != nil { + errs <- fmt.Errorf("H1 parallel: %w", err) + return + } + _ = resp.Body.Close() + }() + go func() { + defer wg.Done() + resp, err := h2c.Get("http://" + addr + "/mixed-h2") + if err != nil { + errs <- fmt.Errorf("H2C parallel: %w", err) + return + } + _ = resp.Body.Close() + }() + } + wg.Wait() + close(errs) + for err := range errs { + t.Error(err) + } +} + +func TestAdaptiveAutoSingleWorker(t *testing.T) { + port := freePort(t) + cfg := defaultTestConfig(port, engine.Auto) + cfg.Engine = engine.Adaptive + cfg.Resources.Workers = 0 // will default based on NumCPU + cfg.Resources.SQERingSize = 1024 + + e, err := adaptive.New(cfg, &echoHandler{}) + if err != nil { + t.Skipf("adaptive engine not available: %v", err) + } + + startEngine(t, e) + + addr := e.Addr().String() + + // H1 request. + h1Client := &http.Client{Timeout: 3 * time.Second} + resp, err := h1Client.Get("http://" + addr + "/single-h1") + if err != nil { + t.Fatalf("H1 request failed: %v", err) + } + _ = resp.Body.Close() + if resp.StatusCode != 200 { + t.Errorf("H1: got status %d", resp.StatusCode) + } + + // H2C request. + h2c := h2cClient(addr) + resp, err = h2c.Get("http://" + addr + "/single-h2") + if err != nil { + t.Fatalf("H2C request failed: %v", err) + } + _ = resp.Body.Close() + if resp.StatusCode != 200 { + t.Errorf("H2C: got status %d", resp.StatusCode) + } +} + +func TestAdaptiveSwitchUnderLoad(t *testing.T) { + port := freePort(t) + cfg := defaultTestConfig(port, engine.HTTP1) + cfg.Engine = engine.Adaptive + cfg.Resources.Workers = 2 + + e, err := adaptive.New(cfg, &echoHandler{}) + if err != nil { + t.Skipf("adaptive engine not available: %v", err) + } + + startEngine(t, e) + + addr := e.Addr().String() + + // Open persistent connections. + client := &http.Client{ + Timeout: 5 * time.Second, + Transport: &http.Transport{ + MaxIdleConnsPerHost: 10, + DisableKeepAlives: false, + }, + } + + // Send traffic to establish connections. + for range 10 { + resp, err := client.Get("http://" + addr + "/pre-switch") + if err != nil { + t.Fatalf("pre-switch request failed: %v", err) + } + _ = resp.Body.Close() + } + + initialType := e.ActiveEngine().Type() + + // Close old connections before switching — they're on the active engine's + // sockets and will never drain otherwise (keep-alive). + client.CloseIdleConnections() + time.Sleep(100 * time.Millisecond) + + // Force switch. + e.ForceSwitch() + + newType := e.ActiveEngine().Type() + if newType == initialType { + t.Error("expected engine type to change after ForceSwitch") + } + + // Use a fresh transport — old connections were on the now-standby engine. + // Our H1 response writer sends Connection: close, so every request opens + // a new TCP connection. Retry all requests to handle the brief transition + // window where listen sockets are being transferred between engines. + postClient := &http.Client{ + Timeout: 2 * time.Second, + Transport: &http.Transport{DisableKeepAlives: true}, + } + + successCount := 0 + deadline := time.Now().Add(10 * time.Second) + for successCount < 5 && time.Now().Before(deadline) { + resp, err := postClient.Get("http://" + addr + "/post-switch") + if err != nil { + time.Sleep(50 * time.Millisecond) + continue + } + _ = resp.Body.Close() + if resp.StatusCode != 200 { + t.Errorf("post-switch: got status %d", resp.StatusCode) + } + successCount++ + } + if successCount < 5 { + t.Fatalf("only %d/5 post-switch requests succeeded within deadline", successCount) + } +} + +func TestAdaptiveResourceCleanup(t *testing.T) { + port := freePort(t) + cfg := defaultTestConfig(port, engine.HTTP1) + cfg.Engine = engine.Adaptive + cfg.Resources.Workers = 2 + + ctx, cancel := context.WithCancel(t.Context()) + e, err := adaptive.New(cfg, &echoHandler{}) + if err != nil { + t.Skipf("adaptive engine not available: %v", err) + } + + errCh := make(chan error, 1) + go func() { errCh <- e.Listen(ctx) }() + + // Wait for engine to be ready. + deadline := time.Now().Add(20 * time.Second) + for e.Addr() == nil && time.Now().Before(deadline) { + select { + case err := <-errCh: + cancel() + if err != nil { + t.Skipf("engine failed to start: %v", err) + } + t.Skip("engine Listen returned nil without addr") + default: + } + time.Sleep(10 * time.Millisecond) + } + if e.Addr() == nil { + cancel() + select { + case err := <-errCh: + if err != nil { + t.Skipf("engine failed to start: %v", err) + } + case <-time.After(5 * time.Second): + } + t.Skip("engine did not start in time") + } + + // Readiness probe. + addr := e.Addr().String() + client := &http.Client{Timeout: 2 * time.Second} + resp, probeErr := client.Get("http://" + addr + "/probe") + if probeErr != nil { + cancel() + <-errCh + t.Skipf("engine not functional: %v", probeErr) + } + _ = resp.Body.Close() + + // Send some traffic. + for range 5 { + resp, err := client.Get("http://" + addr + "/cleanup-test") + if err != nil { + t.Fatalf("request failed: %v", err) + } + _ = resp.Body.Close() + } + + // Close transport to release connections. + client.CloseIdleConnections() + + // Allow connections to drain. + time.Sleep(100 * time.Millisecond) + + // Shutdown. + cancel() + <-errCh + + // Verify no negative active connections (which would indicate double-decrement). + m := e.Metrics() + if m.ActiveConnections < 0 { + t.Errorf("negative active connections after shutdown: %d", m.ActiveConnections) + } +} + +func TestAdaptiveConstrainedRing(t *testing.T) { + port := freePort(t) + cfg := defaultTestConfig(port, engine.HTTP1) + cfg.Engine = engine.Adaptive + cfg.Resources.Workers = 2 + cfg.Resources.SQERingSize = 1024 + + e, err := adaptive.New(cfg, &echoHandler{}) + if err != nil { + t.Skipf("adaptive engine not available: %v", err) + } + + startEngine(t, e) + + addr := e.Addr().String() + client := &http.Client{Timeout: 3 * time.Second} + resp, err := client.Get("http://" + addr + "/constrained") + if err != nil { + t.Fatalf("request failed: %v", err) + } + _ = resp.Body.Close() + if resp.StatusCode != 200 { + t.Errorf("got status %d", resp.StatusCode) + } +} + +func TestEpollPauseResume(t *testing.T) { + port := freePort(t) + cfg := defaultTestConfig(port, engine.HTTP1) + + e, err := epoll.New(cfg, &echoHandler{}) + if err != nil { + t.Fatalf("epoll engine: %v", err) + } + + startEngine(t, e) + addr := e.Addr().String() + + // Verify it works initially. + client := &http.Client{Timeout: 3 * time.Second} + resp, err := client.Get("http://" + addr + "/before-pause") + if err != nil { + t.Fatalf("before pause: %v", err) + } + _ = resp.Body.Close() + t.Log("before pause: OK") + + // Pause. + _ = e.PauseAccept() + client.CloseIdleConnections() + t.Log("paused, waiting for loops to suspend...") + time.Sleep(2 * time.Second) + + // Resume. + _ = e.ResumeAccept() + t.Log("resumed, waiting for listen sockets...") + time.Sleep(1 * time.Second) + + // Try again with fresh transport. + postClient := &http.Client{Timeout: 3 * time.Second, Transport: &http.Transport{}} + resp, err = postClient.Get("http://" + addr + "/after-resume") + if err != nil { + t.Fatalf("after resume: %v", err) + } + _ = resp.Body.Close() + if resp.StatusCode != 200 { + t.Errorf("after resume: got status %d", resp.StatusCode) + } + t.Log("after resume: OK") +} + +func h2cClient(addr string) *http.Client { + return &http.Client{ + Timeout: 3 * time.Second, + Transport: &http2.Transport{ + AllowHTTP: true, + DialTLSContext: func(ctx context.Context, network, _ string, _ *tls.Config) (net.Conn, error) { + return (&net.Dialer{}).DialContext(ctx, network, addr) + }, + }, + } +} diff --git a/test/integration/helpers_test.go b/test/integration/helpers_test.go index 773eefb..5c92e78 100644 --- a/test/integration/helpers_test.go +++ b/test/integration/helpers_test.go @@ -65,7 +65,7 @@ func startEngine(t *testing.T, e engine.Engine) { }() if ag, ok := e.(addrGetter); ok { - deadline := time.Now().Add(5 * time.Second) + deadline := time.Now().Add(25 * time.Second) for ag.Addr() == nil && time.Now().Before(deadline) { select { case err := <-errCh: @@ -80,6 +80,14 @@ func startEngine(t *testing.T, e engine.Engine) { } if ag.Addr() == nil { cancel() + // Wait briefly for an error from Listen before giving up. + select { + case err := <-errCh: + if err != nil { + t.Skipf("engine failed to start: %v", err) + } + case <-time.After(5 * time.Second): + } t.Fatal("engine did not start listening") } diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 72d2c60..8354f1b 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -117,6 +117,47 @@ func TestOverloadFreezesAdaptive(t *testing.T) { } } +func TestSwitchDuringHighCPU(t *testing.T) { + mon := cpumon.NewSynthetic(0.87) + hooks := &testHooks{workers: 4} + cfg := overload.DefaultConfig() + cfg.Interval = 1 * time.Millisecond + for i := range cfg.Stages { + cfg.Stages[i].EscalateSustained = 3 * time.Millisecond + cfg.Stages[i].DeescalateSustained = 3 * time.Millisecond + cfg.Stages[i].Cooldown = 0 + } + + mgr := overload.NewManager(cfg, mon, hooks, nil) + + var frozenMu sync.Mutex + frozen := false + mgr.SetFreezeHook(func(f bool) { + frozenMu.Lock() + frozen = f + frozenMu.Unlock() + }) + + getFrozen := func() bool { + frozenMu.Lock() + defer frozenMu.Unlock() + return frozen + } + + // Suppress freeze (simulating an adaptive engine switch). + mgr.SuppressFreeze(500 * time.Millisecond) + + // Run at 0.87 CPU — above Reorder threshold (0.85). + runManager(t, mgr, 30*time.Millisecond) + + if mgr.Stage() < overload.Reorder { + t.Fatalf("expected at least Reorder, got %v", mgr.Stage()) + } + if getFrozen() { + t.Error("freeze should be suppressed during adaptive switch grace period") + } +} + func TestStateCombinationMatrix(t *testing.T) { mon := cpumon.NewSynthetic(0.0) hooks := &testHooks{workers: 4}