Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,19 @@ jobs:
-run "TestHTTP1Parallel|TestH2CParallel|TestAutoProtocolDetection|TestH2CMultipleStreams|TestHTTP1LargeBodyParallel"
./test/spec/...

integration:
name: Integration
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- uses: actions/setup-go@v6
with:
go-version: "1.26"
- name: Integration tests
run: go test -v -race -count=1 -timeout=120s ./test/integration/...
- name: Integration tests (constrained)
run: taskset -c 0 go test -v -count=1 -timeout=120s -run TestAdaptiveAutoSingleWorker ./test/integration/...

build:
name: Build (${{ matrix.os }})
runs-on: ${{ matrix.os }}
Expand Down
95 changes: 51 additions & 44 deletions adaptive/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"log/slog"
"net"
"runtime"
"sync"
"sync/atomic"
"time"
Expand All @@ -28,56 +27,35 @@ type addrEngine interface {

// Engine is an adaptive meta-engine that switches between io_uring and epoll.
type Engine struct {
primary addrEngine // io_uring
secondary addrEngine // epoll
active atomic.Pointer[engine.Engine]
ctrl *controller
cfg resource.Config
handler stream.Handler
addr atomic.Pointer[net.Addr]
mu sync.Mutex
frozen atomic.Bool
logger *slog.Logger
}

// splitResources resolves the full resource config and halves shared resources
// so each sub-engine gets roughly half the total footprint. Per-connection
// settings (BufferSize, SocketRecv, SocketSend) are unchanged.
func splitResources(cfg resource.Config) (iouringCfg, epollCfg resource.Config) {
full := cfg.Resources.Resolve(runtime.NumCPU())
half := resource.Resources{
Preset: cfg.Resources.Preset,
Workers: max(full.Workers/2, 1),
SQERingSize: max(full.SQERingSize/2, 1024),
BufferPool: max(full.BufferPool/2, 256),
MaxEvents: max(full.MaxEvents/2, 128),
MaxConns: max(full.MaxConns/2, 256),
BufferSize: cfg.Resources.BufferSize,
SocketRecv: cfg.Resources.SocketRecv,
SocketSend: cfg.Resources.SocketSend,
}
iouringCfg = cfg
iouringCfg.Resources = half
epollCfg = cfg
epollCfg.Resources = half
return
primary addrEngine // io_uring
secondary addrEngine // epoll
active atomic.Pointer[engine.Engine]
ctrl *controller
cfg resource.Config
handler stream.Handler
addr atomic.Pointer[net.Addr]
mu sync.Mutex
switchMu sync.Mutex // protects evaluate + performSwitch coordination
frozen atomic.Bool
logger *slog.Logger
suppressFreeze func(time.Duration)
}

// New creates a new adaptive engine with io_uring as primary and epoll as secondary.
// Both sub-engines get the full resource config. This is safe because standby
// workers are fully suspended (zero CPU, zero connections, listen sockets closed).
func New(cfg resource.Config, handler stream.Handler) (*Engine, error) {
cfg = cfg.WithDefaults()
if errs := cfg.Validate(); len(errs) > 0 {
return nil, fmt.Errorf("config validation: %w", errs[0])
}

iouringCfg, epollCfg := splitResources(cfg)

primary, err := iouring.New(iouringCfg, handler)
primary, err := iouring.New(cfg, handler)
if err != nil {
return nil, fmt.Errorf("io_uring sub-engine: %w", err)
}

secondary, err := epoll.New(epollCfg, handler)
secondary, err := epoll.New(cfg, handler)
if err != nil {
return nil, fmt.Errorf("epoll sub-engine: %w", err)
}
Expand Down Expand Up @@ -162,7 +140,8 @@ func (e *Engine) Listen(ctx context.Context) error {
})

// Wait for both engines to bind their addresses.
deadline := time.Now().Add(5 * time.Second)
// io_uring may need multiple tier fallback attempts, so allow ample time.
deadline := time.Now().Add(20 * time.Second)
for time.Now().Before(deadline) {
if e.primary.Addr() != nil && e.secondary.Addr() != nil {
break
Expand Down Expand Up @@ -222,7 +201,10 @@ func (e *Engine) runEvalLoop(ctx context.Context) {
case <-ctx.Done():
return
case now := <-ticker.C:
if e.ctrl.evaluate(now, e.frozen.Load()) {
e.switchMu.Lock()
shouldSwitch := e.ctrl.evaluate(now, e.frozen.Load())
e.switchMu.Unlock()
if shouldSwitch {
e.performSwitch()
}
}
Expand All @@ -235,6 +217,7 @@ func (e *Engine) performSwitch() {

now := time.Now()

e.switchMu.Lock()
var newActive, newStandby addrEngine
if e.ctrl.state.activeIsPrimary {
// Switching: primary → secondary.
Expand All @@ -245,23 +228,34 @@ func (e *Engine) performSwitch() {
newActive = e.primary
newStandby = e.secondary
}
e.switchMu.Unlock()

// Pause standby (was active), resume new active.
if ac, ok := newStandby.(engine.AcceptController); ok {
_ = ac.PauseAccept()
}
// Resume new active BEFORE pausing old — this creates a brief overlap
// where both engines listen (via SO_REUSEPORT), which is correct. The
// alternative (pause first) creates a window where NEITHER listens,
// because io_uring ASYNC_CANCEL and epoll listen socket re-creation
// are asynchronous.
if ac, ok := newActive.(engine.AcceptController); ok {
_ = ac.ResumeAccept()
}
if ac, ok := newStandby.(engine.AcceptController); ok {
_ = ac.PauseAccept()
}

var eng engine.Engine = newActive
e.active.Store(&eng)
e.switchMu.Lock()
e.ctrl.recordSwitch(now)
e.switchMu.Unlock()

e.logger.Info("engine switch completed",
"now_active", newActive.Type().String(),
"now_standby", newStandby.Type().String(),
)

if e.suppressFreeze != nil {
e.suppressFreeze(5 * time.Second)
}
}

// Shutdown gracefully shuts down both sub-engines.
Expand Down Expand Up @@ -314,3 +308,16 @@ func (e *Engine) UnfreezeSwitching() {
func (e *Engine) ActiveEngine() engine.Engine {
return *e.active.Load()
}

// SetFreezeSuppressor installs fn as the hook invoked after an engine
// switch completes, asking the overload detector to hold off freezing
// for the supplied duration. The write is serialized through e.mu.
//
// NOTE(review): performSwitch appears to read e.suppressFreeze without
// holding e.mu — confirm the suppressor is registered before the
// engine starts switching, or this read/write pair races.
func (e *Engine) SetFreezeSuppressor(fn func(time.Duration)) {
	e.mu.Lock()
	e.suppressFreeze = fn
	e.mu.Unlock()
}

// ForceSwitch triggers an immediate engine switch (for testing).
// It invokes performSwitch directly, bypassing the evaluate decision
// made in runEvalLoop — so cooldown and freeze state are presumably
// not consulted here; confirm against performSwitch before relying on
// this outside tests.
func (e *Engine) ForceSwitch() {
	e.performSwitch()
}
81 changes: 37 additions & 44 deletions adaptive/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,50 +283,6 @@ func TestOverloadFreeze(t *testing.T) {
}
}

func TestResourceSplit(t *testing.T) {
tests := []struct {
workers int
wantWorkers int
}{
{1, 1},
{2, 1},
{4, 2},
{8, 4},
{16, 8},
}
for _, tt := range tests {
cfg := resource.Config{
Resources: resource.Resources{
Workers: tt.workers,
},
}
ioCfg, epCfg := splitResources(cfg)

if ioCfg.Resources.Workers != tt.wantWorkers {
t.Errorf("workers=%d: io_uring workers = %d, want %d", tt.workers, ioCfg.Resources.Workers, tt.wantWorkers)
}
if epCfg.Resources.Workers != tt.wantWorkers {
t.Errorf("workers=%d: epoll workers = %d, want %d", tt.workers, epCfg.Resources.Workers, tt.wantWorkers)
}

// Both sub-engine configs should be identical.
if ioCfg.Resources != epCfg.Resources {
t.Errorf("workers=%d: io_uring and epoll resources differ", tt.workers)
}

// Per-connection settings should be unchanged.
if ioCfg.Resources.BufferSize != cfg.Resources.BufferSize {
t.Errorf("workers=%d: BufferSize changed", tt.workers)
}
if ioCfg.Resources.SocketRecv != cfg.Resources.SocketRecv {
t.Errorf("workers=%d: SocketRecv changed", tt.workers)
}
if ioCfg.Resources.SocketSend != cfg.Resources.SocketSend {
t.Errorf("workers=%d: SocketSend changed", tt.workers)
}
}
}

func TestHistoricalScoreDecay(t *testing.T) {
primary := newMockEngine(engine.IOUring)
secondary := newMockEngine(engine.Epoll)
Expand Down Expand Up @@ -416,3 +372,40 @@ func TestSwitchAfterActiveDegrades(t *testing.T) {
t.Error("expected switch when active degrades below standby historical")
}
}

func TestSwitchTriggersFreezeSuppression(t *testing.T) {
primary := newMockEngine(engine.IOUring)
secondary := newMockEngine(engine.Epoll)
sampler := newSyntheticSampler()

cfg := resource.Config{Protocol: engine.HTTP1}
e := newFromEngines(primary, secondary, sampler, cfg)
e.ctrl.cooldown = 0
e.ctrl.minObserve = 0

var suppressDuration time.Duration
var suppressCalls int
e.SetFreezeSuppressor(func(d time.Duration) {
suppressDuration = d
suppressCalls++
})

// Pre-seed standby historical score.
now := time.Now()
e.ctrl.state.lastActiveScore[engine.Epoll] = 200
e.ctrl.state.lastActiveTime[engine.Epoll] = now

sampler.Set(engine.IOUring, TelemetrySnapshot{ThroughputRPS: 100})

if !e.ctrl.evaluate(now, false) {
t.Fatal("expected switch")
}
e.performSwitch()

if suppressCalls != 1 {
t.Errorf("expected 1 suppressFreeze call, got %d", suppressCalls)
}
if suppressDuration != 5*time.Second {
t.Errorf("expected 5s suppression, got %v", suppressDuration)
}
}
12 changes: 12 additions & 0 deletions engine/epoll/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,20 @@ func (e *Engine) PauseAccept() error {
}

// ResumeAccept starts accepting new connections again.
// Wakes any suspended loops so they re-create listen sockets.
//
// The paused flag is cleared first so that a loop woken below observes
// accepts as enabled. For each suspended loop the wake channel is
// closed — closing broadcasts to every waiter — and immediately
// replaced with a fresh channel so the next pause/resume cycle has a
// channel to wait on. Both steps happen under the loop's wakeMu so
// wakers and sleepers agree on which channel is live.
func (e *Engine) ResumeAccept() error {
	e.acceptPaused.Store(false)
	e.mu.Lock()
	defer e.mu.Unlock()
	for _, l := range e.loops {
		l.wakeMu.Lock()
		// Only suspended loops are woken, so calling ResumeAccept twice
		// (or while loops are running) is a harmless no-op per loop.
		if l.suspended.Load() {
			close(l.wake)
			l.wake = make(chan struct{})
			l.suspended.Store(false)
		}
		l.wakeMu.Unlock()
	}
	return nil
}

Expand Down
Loading