Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion config/config_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ algorithm: "round-robin"
timeouts:
dial: 10s
connect: 20s
shutdown: 30s
shutdown: 30s
health_check: 5s
7 changes: 4 additions & 3 deletions config/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ type BackendCfg struct {
}

type TimeoutsCfg struct {
Dial time.Duration `yaml:"dial"`
Connect time.Duration `yaml:"connect"`
Shutdown time.Duration `yaml:"shutdown"`
Dial time.Duration `yaml:"dial"`
Connect time.Duration `yaml:"connect"`
Shutdown time.Duration `yaml:"shutdown"`
HealthCheck time.Duration `yaml:"health_check"`
}

type Config struct {
Expand Down
58 changes: 58 additions & 0 deletions healthcheck/healthchecker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package healthcheck

import (
"context"
"load-balancer/backend"
"log"
"net"
"sync"
"time"
)

type BackendPoolIO interface {
GetPool() []*backend.Backend
}

type HealthChecker struct {
bp BackendPoolIO
}

func NewHealthChecker(bp BackendPoolIO) *HealthChecker {
if bp == nil {
panic("backend pool cannot be nil")
}
return &HealthChecker{bp: bp}
}

func (hc *HealthChecker) Run(ctx context.Context, duration time.Duration) {
ticker := time.NewTicker(duration)
defer ticker.Stop()
for {
select {
case <-ticker.C:
hc.RunOnce()
case <-ctx.Done():
return
}
}
}

func (hc *HealthChecker) RunOnce() {
bp := hc.bp.GetPool()
wg := sync.WaitGroup{}
for _, b := range bp {
wg.Add(1)
go func() {
defer wg.Done()
conn, err := net.DialTimeout("tcp", b.GetUrl(), 10*time.Second)
if err != nil {
b.SetHealth(false)
log.Printf("dial %s: %v", b.GetUrl(), err)
return
}
b.SetHealth(true)
conn.Close()
}()
}
wg.Wait()
}
95 changes: 95 additions & 0 deletions healthcheck/healthchecker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package healthcheck

import (
"context"
"load-balancer/backend"
"net"
"testing"
"time"
)

type stubPool struct {
backends []*backend.Backend
}

func (s *stubPool) GetPool() []*backend.Backend {
return s.backends
}

func startTCPServer(t *testing.T) string {
t.Helper()
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { ln.Close() })
go func() {
for {
conn, err := ln.Accept()
if err != nil {
return
}
conn.Close()
}
}()
return ln.Addr().String()
}

func TestRunOnceMarksBackendHealthy(t *testing.T) {
addr := startTCPServer(t)
b := backend.NewBackend(backend.BackendOptions{Url: addr})
hc := NewHealthChecker(&stubPool{backends: []*backend.Backend{b}})

hc.RunOnce()

if !b.IsHealthy() {
t.Error("expected backend to be marked healthy")
}
}

func TestRunOnceMarksBackendUnhealthy(t *testing.T) {
b := backend.NewBackend(backend.BackendOptions{Url: "127.0.0.1:1"})
hc := NewHealthChecker(&stubPool{backends: []*backend.Backend{b}})

hc.RunOnce()

if b.IsHealthy() {
t.Error("expected backend to be marked unhealthy")
}
}

func TestRunOnceChecksAllBackends(t *testing.T) {
healthyAddr := startTCPServer(t)
healthy := backend.NewBackend(backend.BackendOptions{Url: healthyAddr})
unhealthy := backend.NewBackend(backend.BackendOptions{Url: "127.0.0.1:1"})

hc := NewHealthChecker(&stubPool{backends: []*backend.Backend{healthy, unhealthy}})
hc.RunOnce()

if !healthy.IsHealthy() {
t.Error("expected healthy backend to be marked healthy")
}
if unhealthy.IsHealthy() {
t.Error("expected unreachable backend to be marked unhealthy")
}
}

func TestRunStopsOnContextCancellation(t *testing.T) {
b := backend.NewBackend(backend.BackendOptions{Url: "127.0.0.1:1"})
hc := NewHealthChecker(&stubPool{backends: []*backend.Backend{b}})

ctx, cancel := context.WithCancel(context.Background())
cancel()

done := make(chan struct{})
go func() {
hc.Run(ctx, time.Hour)
close(done)
}()

select {
case <-done:
case <-time.After(time.Second):
t.Error("Run did not stop after context cancellation")
}
}
14 changes: 9 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"load-balancer/backend"
"load-balancer/config"
"load-balancer/healthcheck"
"load-balancer/listener"
"load-balancer/proxy"
"load-balancer/router"
Expand All @@ -26,10 +27,9 @@ func main() {
}

backends := make([]*backend.Backend, 0, len(conf.Backends))
for i, b := range conf.Backends {
for _, b := range conf.Backends {
backends = append(backends, backend.NewBackend(backend.BackendOptions{
Url: b.Url}))
backends[i].SetHealth(true)
}
bp := backend.NewBackendPool(backends)

Expand All @@ -48,14 +48,18 @@ func main() {
})
l := listener.NewListener(p)

hc := healthcheck.NewHealthChecker(bp)
hc.RunOnce()

ctx, cancel := context.WithTimeout(context.Background(), conf.Timeouts.Shutdown)
defer cancel()

go hc.Run(ctx, conf.Timeouts.HealthCheck)
go l.Listen(ln)

sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
<-sigCh

ctx, cancel := context.WithTimeout(context.Background(), conf.Timeouts.Shutdown)
defer cancel()

l.GracefulShutdown(ctx)
}
Loading