diff --git a/ChangeLog.md b/ChangeLog.md index eb826dd..ae8c205 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -17,6 +17,74 @@ _No unreleased changes._ --- +## 26.13 — 2026-05-27 + +Change detection + alert sinks (P2-02). The agent now diffs the host +inventory pre- and post-cycle and emits `host.discovered` / +`host.vanished` events to a configurable HTTP webhook, syslog, or both. +Operators no longer have to grep logs to notice a new device appeared. + +### Added + +- **`internal/alerts` package** — `Event` (JSON-tagged for wire reuse), + `EventType` (`host.discovered`, `host.vanished`), `Emitter` + interface, `Multiplexer` that fans events out to N sinks in + parallel, `NoopEmitter` for the alerts-disabled deployment. +- **WebhookSink** — HTTP POST JSON. One retry on transient failures + (network error or 5xx); 4xx is final. Optional `Authorization` + header passed verbatim (so `Bearer …` and `Basic …` both work + without per-scheme code). +- **SyslogSink** — RFC 5424 over UDP/TCP. Hand-rolled because + stdlib `log/syslog` is Unix-only and the agent runs on Windows. + Message body is the same JSON as the webhook payload, so syslog + parsers (rsyslog mmjsonparse, syslog-ng, Splunk) get structured + fields for free. +- **`config.AlertsConfig`** — new optional top-level section: + ```json + "alerts": { + "webhook": { "url": "https://hooks.example/x", "auth_header": "Bearer …" }, + "syslog": { "addr": "udp://syslog.example:514", "tag": "inventory" } + } + ``` + Either or both sub-sections may be set; absence of both silently + disables alerting (no change for existing deployments). + +### Changed + +- **`agent.New` gained an `alerts.Emitter` parameter** (7th positional + arg). Pass `nil` for the noop emitter; the constructor substitutes + one automatically so tests stay terse. +- **`agent.runCycle` snapshots the host inventory** before scanning + and again after, then diffs the two by IP. 
Discovered hosts get the + fresh enrichment (Hostname/Vendor/DeviceType); vanished hosts get + the last-known pre-cycle row (since the post-cycle one is gone). + Cycle-failed runs intentionally skip the diff to avoid alert spam + on transient DB blips. +- **`mockHostStore.Upsert`** in the agent test suite now mirrors the + sqlite UPSERT (overwrite-on-conflict) so test stores stay parity + with production — same fix landed in the scanner mock during 26.11. + +### Tests + +- 11 new tests covering: multiplexer fan-out, sibling sinks surviving + a peer-sink error, webhook auth-header round-trip, 5xx retry, 4xx + no-retry, nil-on-empty-URL/addr guards, syslog RFC 5424 format + against a real UDP listener (PRI calculation, JSON MSG, MSGID = + event type), bad-scheme rejection, agent-level diff producing + `host.vanished` on prune and `host.discovered` on a mid-cycle insert. + +### Notes + +- Sinks deliver in goroutines. Multiplexer.Emit returns immediately; + failures show up in slog warnings, not back at the call site. This + matches operator expectations for alert pipelines and keeps the + scan-cycle hot path off the network. +- Port-level events (`port.appeared` / `port.vanished`) and watchdog + events are deliberately deferred — host-level coverage satisfies the + headline operator ask first. + +--- + ## 26.12 — 2026-05-27 Service / application discovery (P2-01). 
Turns "port 22 is open" into diff --git a/cmd/internal/runtime/runtime.go b/cmd/internal/runtime/runtime.go index 31a20a6..80eb56c 100644 --- a/cmd/internal/runtime/runtime.go +++ b/cmd/internal/runtime/runtime.go @@ -22,6 +22,7 @@ import ( "github.com/Ronin48/NetworkInventoryAgent/internal/admin" "github.com/Ronin48/NetworkInventoryAgent/internal/agent" + "github.com/Ronin48/NetworkInventoryAgent/internal/alerts" "github.com/Ronin48/NetworkInventoryAgent/internal/config" "github.com/Ronin48/NetworkInventoryAgent/internal/health" "github.com/Ronin48/NetworkInventoryAgent/internal/logging" @@ -131,7 +132,13 @@ func Run(opts Options) int { } slog.Info("health server started", "addr", healthSrv.Addr(), "tls", healthTLS != nil) - a := agent.New(opts.Name, cfg.Scanner, db.Hosts(), db.Ports(), db.Scans(), tracker) + alertSinks := buildAlertSinks(cfg.Alerts) + mux := alerts.NewMultiplexer(alertSinks...) + if len(alertSinks) > 0 { + slog.Info("alert sinks configured", "count", len(alertSinks)) + } + + a := agent.New(opts.Name, cfg.Scanner, db.Hosts(), db.Ports(), db.Scans(), tracker, mux) adminSrv, err := admin.NewServer( cfg.Admin.Addr, opts.Name, @@ -185,10 +192,31 @@ func Run(opts Options) int { if err := traceShutdown(shutdownCtx); err != nil { slog.Warn("tracer shutdown error", "err", err) } + mux.Close() slog.Info("agent stopped", "name", opts.Name) return 0 } +// buildAlertSinks instantiates the configured event sinks. A sink that +// fails to construct (bad syslog URL, etc.) is logged and skipped; the +// other sinks still ship. Returns nil when nothing is configured, which +// NewMultiplexer handles cleanly. 
+func buildAlertSinks(cfg config.AlertsConfig) []alerts.Sink {
+	// defaultSyslogFacility is local0, the default documented on
+	// config.SyslogConfig.Facility.
+	const defaultSyslogFacility = 16
+
+	var sinks []alerts.Sink
+	// NewWebhookSink returns a nil *WebhookSink for an empty URL. Checking
+	// the concrete pointer here keeps a typed nil out of the []Sink slice,
+	// where it would compare as a non-nil interface value.
+	if w := alerts.NewWebhookSink(cfg.Webhook.URL, cfg.Webhook.AuthHeader); w != nil {
+		sinks = append(sinks, w)
+	}
+	if cfg.Syslog.Addr == "" {
+		return sinks
+	}
+	// An omitted "facility" key decodes to 0. NewSyslogSink accepts 0 as
+	// in range, so the sink would log as facility 0 (kernel) rather than
+	// the documented default. Treat 0 as "unset" at the config boundary.
+	facility := cfg.Syslog.Facility
+	if facility == 0 {
+		facility = defaultSyslogFacility
+	}
+	s, err := alerts.NewSyslogSink(cfg.Syslog.Addr, cfg.Syslog.Tag, facility)
+	if err != nil {
+		// A broken syslog target must not take the webhook sink down with it.
+		slog.Warn("syslog sink disabled", "err", err)
+		return sinks
+	}
+	if s != nil {
+		sinks = append(sinks, s)
+	}
+	return sinks
+}
+
 // buildPeerClient assembles the watchdog's health.Client with a
 // tracing-instrumented transport. TLS is layered onto the transport before
 // otelhttp wraps it, so spans cover the full handshake-through-read path.
diff --git a/internal/agent/agent.go b/internal/agent/agent.go
index 5bb4590..9efc6d4 100644
--- a/internal/agent/agent.go
+++ b/internal/agent/agent.go
@@ -8,11 +8,13 @@ import (
 	"log/slog"
 	"time"
 
+	"github.com/Ronin48/NetworkInventoryAgent/internal/alerts"
 	"github.com/Ronin48/NetworkInventoryAgent/internal/config"
 	"github.com/Ronin48/NetworkInventoryAgent/internal/health"
 	"github.com/Ronin48/NetworkInventoryAgent/internal/metrics"
 	"github.com/Ronin48/NetworkInventoryAgent/internal/scanner"
 	"github.com/Ronin48/NetworkInventoryAgent/internal/store"
+	"github.com/Ronin48/NetworkInventoryAgent/models"
 )
 
 // Agent runs a periodic scan loop and publishes results to a health.Tracker.
@@ -22,6 +24,7 @@ type Agent struct {
 	hosts   store.HostStore
 	scanner *scanner.Scanner
 	tracker *health.Tracker
+	alerts  alerts.Emitter
 	now     func() time.Time
 
 	// trigger is a buffered channel that lets external callers
@@ -31,7 +34,10 @@ type Agent struct {
 }
 
 // New creates an Agent. The caller is responsible for starting the health
-// server and watchdog; this constructor only wires the scan loop.
+// server and watchdog; this constructor only wires the scan loop.
Pass +// alerts.NoopEmitter() when alerts are unconfigured (the constructor +// substitutes one if alertEmitter is nil to avoid an awkward call-site +// guard at every binding). func New( name string, cfg config.ScannerConfig, @@ -39,7 +45,11 @@ func New( ports store.PortStore, scans store.ScanStore, tracker *health.Tracker, + alertEmitter alerts.Emitter, ) *Agent { + if alertEmitter == nil { + alertEmitter = alerts.NoopEmitter() + } return &Agent{ name: name, cfg: cfg, @@ -58,6 +68,7 @@ func New( EnrichARP: cfg.EnrichARP, }), tracker: tracker, + alerts: alertEmitter, now: time.Now, trigger: make(chan struct{}, 1), } @@ -105,6 +116,15 @@ func (a *Agent) Run(ctx context.Context) { func (a *Agent) runCycle(ctx context.Context, log *slog.Logger) { log.Info("scan cycle started", "subnets", len(a.cfg.Subnets)) started := a.now() + + // Snapshot the pre-cycle host inventory so we can diff it against + // the post-cycle list and fire HostDiscovered / HostVanished events. + // Snapshotting before the scan (rather than tracking what the + // scanner returned) means the diff correctly reflects "ground truth + // changed", including hosts the operator added or removed + // externally. + prevHosts := snapshotByIP(ctx, a.hosts, log) + cycleHosts := 0 cycleHealthy := true for _, subnet := range a.cfg.Subnets { @@ -125,6 +145,13 @@ func (a *Agent) runCycle(ctx context.Context, log *slog.Logger) { log.Info("pruned stale hosts", "count", pruned) } + // Diff and fire events. Only meaningful when the cycle didn't + // itself fail mid-way — declaring hosts "vanished" because of a + // transient DB error would be alert spam. + if cycleHealthy { + a.emitChangeEvents(ctx, log, prevHosts, started) + } + // Use the actual DB count so the tracker reflects total accumulated // inventory, not just hosts found in this cycle. 
total, err := a.hosts.Count(ctx)
@@ -181,3 +208,71 @@ func (a *Agent) pruneStale(ctx context.Context, log *slog.Logger, now time.Time)
 	}
 	return pruned
 }
+
+// snapshotByIP lists the current host inventory and indexes it by IP
+// address. runCycle takes one snapshot before scanning and
+// emitChangeEvents takes another afterwards so the two can be diffed.
+//
+// A List failure is logged and reported as a nil map, whereas a
+// successful List of zero hosts yields a non-nil empty map. Callers must
+// treat nil as "inventory unknown" and skip the diff, not as "no hosts".
+func snapshotByIP(ctx context.Context, hs store.HostStore, log *slog.Logger) map[string]*models.Host {
+	hosts, err := hs.List(ctx)
+	if err != nil {
+		log.Warn("change-detect: pre-cycle snapshot failed", "err", err)
+		return nil
+	}
+	out := make(map[string]*models.Host, len(hosts))
+	for _, h := range hosts {
+		out[h.IPAddress] = h
+	}
+	return out
+}
+
+// emitChangeEvents compares the post-cycle host set with the pre-cycle
+// snapshot and fires one alert event per change. Discovered events use
+// the post-cycle enrichment; vanished events use the pre-cycle row
+// (since the post-cycle one is gone).
+//
+// prev is the pre-cycle snapshot from snapshotByIP and cycleStart is
+// stamped on every event as Event.Time. If either snapshot is nil (its
+// List call failed) no events are emitted: the inventory on that side is
+// unknown, so any diff against it would be fabricated.
+func (a *Agent) emitChangeEvents(ctx context.Context, log *slog.Logger, prev map[string]*models.Host, cycleStart time.Time) {
+	// Lookups on a nil map report every key as absent, so a failed
+	// pre-cycle snapshot would otherwise announce the whole inventory as
+	// host.discovered. Bail out before paying for the second List.
+	if prev == nil {
+		return
+	}
+	curr := snapshotByIP(ctx, a.hosts, log)
+	if curr == nil {
+		return
+	}
+	for ip, h := range curr {
+		if _, was := prev[ip]; !was {
+			a.alerts.Emit(ctx, a.hostEvent(alerts.HostDiscovered, ip, h, cycleStart))
+		}
+	}
+	for ip, h := range prev {
+		if _, still := curr[ip]; !still {
+			a.alerts.Emit(ctx, a.hostEvent(alerts.HostVanished, ip, h, cycleStart))
+		}
+	}
+}
+
+// hostEvent builds the alert payload for one host-level change, copying
+// the enrichment fields from h and tagging the event with this agent's
+// name.
+func (a *Agent) hostEvent(kind alerts.EventType, ip string, h *models.Host, at time.Time) alerts.Event {
+	return alerts.Event{
+		Type:       kind,
+		IP:         ip,
+		Hostname:   h.Hostname,
+		MACAddress: h.MACAddress,
+		Vendor:     h.Vendor,
+		DeviceType: h.DeviceType,
+		Time:       at,
+		Agent:      a.name,
+	}
+}
diff --git a/internal/agent/agent_test.go b/internal/agent/agent_test.go
index 8596eed..37ddea9 100644
--- a/internal/agent/agent_test.go
+++ b/internal/agent/agent_test.go
@@ -12,12 +12,33 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"github.com/Ronin48/NetworkInventoryAgent/internal/agent"
+	"github.com/Ronin48/NetworkInventoryAgent/internal/alerts"
 	"github.com/Ronin48/NetworkInventoryAgent/internal/config"
 	"github.com/Ronin48/NetworkInventoryAgent/internal/health"
 	"github.com/Ronin48/NetworkInventoryAgent/internal/store"
 	"github.com/Ronin48/NetworkInventoryAgent/models"
 )
 
+// recordingEmitter captures events for diff assertions.
+type recordingEmitter struct { + mu sync.Mutex + events []alerts.Event +} + +func (r *recordingEmitter) Emit(_ context.Context, ev alerts.Event) { + r.mu.Lock() + defer r.mu.Unlock() + r.events = append(r.events, ev) +} + +func (r *recordingEmitter) snapshot() []alerts.Event { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]alerts.Event, len(r.events)) + copy(out, r.events) + return out +} + // --- minimal mock stores --- type mockHostStore struct { @@ -27,6 +48,13 @@ type mockHostStore struct { listErr error countErr error deleteErr error + // onListInsert, when non-nil, is upserted into the store BEFORE the + // SECOND List() call returns — simulating "a new host appeared + // between the pre- and post-cycle snapshots" without needing a real + // scanner. Counted by listCalls so the hook fires exactly once, + // during the post-cycle list. + onListInsert *models.Host + listCalls int } func newMockHostStore() *mockHostStore { @@ -60,6 +88,14 @@ func (m *mockHostStore) List(_ context.Context) ([]*models.Host, error) { if m.listErr != nil { return nil, m.listErr } + m.listCalls++ + if m.onListInsert != nil && m.listCalls == 2 { + m.nextID++ + clone := *m.onListInsert + clone.ID = m.nextID + m.hosts[clone.ID] = &clone + m.onListInsert = nil + } out := make([]*models.Host, 0, len(m.hosts)) for _, h := range m.hosts { out = append(out, h) @@ -147,6 +183,7 @@ func TestAgent_TriggerCoalesces(t *testing.T) { mockPortStore{}, newMockScanStore(), health.NewTracker("test"), + nil, ) assert.True(t, a.Trigger(), "first Trigger() must enqueue") assert.False(t, a.Trigger(), "second Trigger() must coalesce, not enqueue") @@ -168,6 +205,7 @@ func TestAgent_CycleMarksHealthyOnCleanRun(t *testing.T) { mockPortStore{}, newMockScanStore(), tracker, + nil, ) ctx, cancel := context.WithTimeout(context.Background(), 75*time.Millisecond) @@ -196,6 +234,7 @@ func TestAgent_CycleMarksUnhealthyOnCountFailure(t *testing.T) { mockPortStore{}, newMockScanStore(), tracker, + nil, ) ctx, 
cancel := context.WithTimeout(context.Background(), 75*time.Millisecond) @@ -232,6 +271,7 @@ func TestAgent_PrunesStaleHosts(t *testing.T) { mockPortStore{}, newMockScanStore(), tracker, + nil, ) ctx, cancel := context.WithTimeout(context.Background(), 75*time.Millisecond) @@ -265,6 +305,7 @@ func TestAgent_PruneDisabledWithoutTTL(t *testing.T) { mockPortStore{}, newMockScanStore(), health.NewTracker("test"), + nil, ) ctx, cancel := context.WithTimeout(context.Background(), 75*time.Millisecond) @@ -275,3 +316,88 @@ func TestAgent_PruneDisabledWithoutTTL(t *testing.T) { require.NoError(t, err) assert.Len(t, remaining, 1, "with HostTTL=0 no pruning should happen") } + +// TestAgent_EmitsHostVanishedOnPrune verifies that a host pruned via +// the HostTTL path produces a host.vanished alert. The discovered path +// is exercised indirectly: a host present in the pre-cycle snapshot but +// gone from the post-cycle snapshot (due to prune) must alert. +func TestAgent_EmitsHostVanishedOnPrune(t *testing.T) { + hosts := newMockHostStore() + _, err := hosts.Upsert(context.Background(), &models.Host{ + IPAddress: "10.0.0.42", + LastSeen: time.Now().Add(-24 * time.Hour), + }) + require.NoError(t, err) + + rec := &recordingEmitter{} + a := agent.New( + "test", + config.ScannerConfig{ + Subnets: nil, + ScanInterval: config.Duration{Duration: 50 * time.Millisecond}, + HostTTL: config.Duration{Duration: time.Hour}, + }, + hosts, + mockPortStore{}, + newMockScanStore(), + health.NewTracker("test"), + rec, + ) + + ctx, cancel := context.WithTimeout(context.Background(), 75*time.Millisecond) + defer cancel() + a.Run(ctx) + + evs := rec.snapshot() + require.NotEmpty(t, evs, "prune should have fired at least one vanished event") + found := false + for _, ev := range evs { + if ev.Type == alerts.HostVanished && ev.IP == "10.0.0.42" { + found = true + assert.Equal(t, "test", ev.Agent) + } + } + assert.True(t, found, "expected host.vanished for 10.0.0.42; got %v", evs) +} + +// 
TestAgent_EmitsHostDiscoveredOnNewHost verifies that a host inserted +// mid-cycle (here, pre-seeded via the mock between snapshots) produces +// a host.discovered alert. +func TestAgent_EmitsHostDiscoveredOnNewHost(t *testing.T) { + hosts := newMockHostStore() + hosts.onListInsert = &models.Host{ + IPAddress: "10.0.0.99", + Hostname: "newbox.local", + DeviceType: "linux-host", + LastSeen: time.Now(), + } + + rec := &recordingEmitter{} + a := agent.New( + "test", + config.ScannerConfig{ + Subnets: nil, + ScanInterval: config.Duration{Duration: 50 * time.Millisecond}, + }, + hosts, + mockPortStore{}, + newMockScanStore(), + health.NewTracker("test"), + rec, + ) + + ctx, cancel := context.WithTimeout(context.Background(), 75*time.Millisecond) + defer cancel() + a.Run(ctx) + + evs := rec.snapshot() + found := false + for _, ev := range evs { + if ev.Type == alerts.HostDiscovered && ev.IP == "10.0.0.99" { + found = true + assert.Equal(t, "newbox.local", ev.Hostname) + assert.Equal(t, "linux-host", ev.DeviceType) + } + } + assert.True(t, found, "expected host.discovered for 10.0.0.99; got %v", evs) +} diff --git a/internal/alerts/alerts.go b/internal/alerts/alerts.go new file mode 100644 index 0000000..9d7d00d --- /dev/null +++ b/internal/alerts/alerts.go @@ -0,0 +1,130 @@ +// Package alerts dispatches host-level inventory change events to one or +// more configured sinks (HTTP webhook, syslog). It is intentionally fire- +// and-forget — sinks log their failures and the agent keeps scanning, +// because an unreachable webhook should not stop inventory collection. +// +// Event sources today: +// +// HostDiscovered — IP appeared in the post-cycle host list but not +// the pre-cycle one. Includes the host's IP plus +// any enrichment fields (Hostname, Vendor, +// DeviceType) that landed during the same cycle. +// HostVanished — IP was in the pre-cycle list but not after the +// cycle (TTL prune deleted it, or it was removed +// some other way). 
Includes the last-known +// enrichment so the alert payload is useful even +// though the host record is already gone. +// +// Port-level events (PortAppeared/PortVanished) and watchdog events are +// intentionally out of scope for the initial release — the host-level +// events satisfy the headline "something new appeared on my network" +// use case, which is what operators ask for first. +package alerts + +import ( + "context" + "log/slog" + "time" +) + +// EventType is a stable string identifier suitable for webhook routing. +// Values match the JSON wire form ("host.discovered", not "HostDiscovered") +// so downstream consumers can switch on them without case folding. +type EventType string + +const ( + HostDiscovered EventType = "host.discovered" + HostVanished EventType = "host.vanished" +) + +// Event is the wire shape sent to every sink. Fields are JSON-tagged so +// the same struct serialises directly into the webhook body and into the +// MSG portion of an RFC 5424 syslog line. +type Event struct { + Type EventType `json:"type"` + IP string `json:"ip"` + Hostname string `json:"hostname,omitempty"` + MACAddress string `json:"mac_address,omitempty"` + Vendor string `json:"vendor,omitempty"` + DeviceType string `json:"device_type,omitempty"` + Time time.Time `json:"time"` + Agent string `json:"agent"` +} + +// Emitter is the surface the agent depends on. Multiplexer is the +// production implementation; tests use a recording fake. +type Emitter interface { + Emit(ctx context.Context, ev Event) +} + +// Sink is one downstream destination. Implementations live next to this +// file (webhook.go, syslog.go); third-party additions just satisfy the +// interface. +type Sink interface { + // Emit delivers the event. A non-nil error gets logged but does not + // propagate further — the Multiplexer always tries the remaining + // sinks even when one fails. + Emit(ctx context.Context, ev Event) error + // Name identifies the sink in failure logs (e.g. 
"webhook", + // "syslog"). No need for uniqueness across instances; it's a + // debugging string, not a key. + Name() string + // Close is called when the agent shuts down. + Close() error +} + +// Multiplexer fans an event out to every configured sink. The zero value +// is valid and emits nowhere — useful for "alerts disabled" deployments. +type Multiplexer struct { + sinks []Sink +} + +// NewMultiplexer wraps a slice of sinks. nil entries are skipped so +// callers can pass `[]Sink{NewWebhookSink(...), NewSyslogSink(...)}` +// and either constructor returning nil silently drops it. +func NewMultiplexer(sinks ...Sink) *Multiplexer { + m := &Multiplexer{} + for _, s := range sinks { + if s != nil { + m.sinks = append(m.sinks, s) + } + } + return m +} + +// Emit delivers ev to every sink in parallel. Sink failures are logged +// but do not cancel sibling deliveries — a flapping webhook must not +// suppress a syslog audit trail. +func (m *Multiplexer) Emit(ctx context.Context, ev Event) { + for _, s := range m.sinks { + go func(sink Sink) { + if err := sink.Emit(ctx, ev); err != nil { + slog.Warn("alert sink error", + "sink", sink.Name(), + "event", string(ev.Type), + "ip", ev.IP, + "err", err) + } + }(s) + } +} + +// Close flushes every sink. Errors are logged but a failing close on one +// sink doesn't skip the others. +func (m *Multiplexer) Close() { + for _, s := range m.sinks { + if err := s.Close(); err != nil { + slog.Warn("alert sink close error", "sink", s.Name(), "err", err) + } + } +} + +// noopEmitter is what the agent gets when alerts are unconfigured. Avoids +// nil checks at every Emit call site. +type noopEmitter struct{} + +func (noopEmitter) Emit(context.Context, Event) {} + +// NoopEmitter returns an Emitter that drops every event. Use this rather +// than passing nil to the agent constructor. 
+func NoopEmitter() Emitter { return noopEmitter{} } diff --git a/internal/alerts/alerts_test.go b/internal/alerts/alerts_test.go new file mode 100644 index 0000000..82a89ff --- /dev/null +++ b/internal/alerts/alerts_test.go @@ -0,0 +1,186 @@ +package alerts_test + +import ( + "context" + "encoding/json" + "io" + "net" + "net/http" + "net/http/httptest" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/Ronin48/NetworkInventoryAgent/internal/alerts" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMultiplexer_FansOutToAllSinks(t *testing.T) { + a := &recordingSink{name: "a"} + b := &recordingSink{name: "b"} + mux := alerts.NewMultiplexer(a, b) + + ev := alerts.Event{Type: alerts.HostDiscovered, IP: "10.0.0.1", Time: time.Now()} + mux.Emit(context.Background(), ev) + + // Multiplexer emits in goroutines — give them a beat. + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) { + if a.count.Load() > 0 && b.count.Load() > 0 { + break + } + time.Sleep(5 * time.Millisecond) + } + assert.Equal(t, int32(1), a.count.Load()) + assert.Equal(t, int32(1), b.count.Load()) +} + +func TestMultiplexer_SinkErrorDoesNotStopOthers(t *testing.T) { + bad := &recordingSink{name: "bad", err: io.ErrUnexpectedEOF} + good := &recordingSink{name: "good"} + mux := alerts.NewMultiplexer(bad, good) + + mux.Emit(context.Background(), alerts.Event{Type: alerts.HostDiscovered, IP: "10.0.0.1"}) + + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) { + if good.count.Load() > 0 { + break + } + time.Sleep(5 * time.Millisecond) + } + assert.Equal(t, int32(1), good.count.Load(), "a sibling sink must still receive the event") +} + +func TestNoopEmitter_DoesNotPanic(t *testing.T) { + e := alerts.NoopEmitter() + e.Emit(context.Background(), alerts.Event{}) +} + +func TestWebhookSink_PostsEvent(t *testing.T) { + var ( + mu sync.Mutex + body []byte + auth string + ) + srv := 
httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() + defer mu.Unlock() + body, _ = io.ReadAll(r.Body) + auth = r.Header.Get("Authorization") + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + sink := alerts.NewWebhookSink(srv.URL, "Bearer s3cret") + require.NotNil(t, sink) + err := sink.Emit(context.Background(), alerts.Event{ + Type: alerts.HostDiscovered, + IP: "10.0.0.5", + Time: time.Date(2026, 5, 27, 12, 0, 0, 0, time.UTC), + }) + require.NoError(t, err) + + mu.Lock() + defer mu.Unlock() + assert.Equal(t, "Bearer s3cret", auth, "auth header must round-trip") + var got alerts.Event + require.NoError(t, json.Unmarshal(body, &got)) + assert.Equal(t, alerts.HostDiscovered, got.Type) + assert.Equal(t, "10.0.0.5", got.IP) +} + +func TestWebhookSink_RetriesOn500(t *testing.T) { + var hits atomic.Int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + if hits.Add(1) == 1 { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + sink := alerts.NewWebhookSink(srv.URL, "") + err := sink.Emit(context.Background(), alerts.Event{IP: "10.0.0.5"}) + require.NoError(t, err) + assert.Equal(t, int32(2), hits.Load(), "must retry once on 5xx") +} + +func TestWebhookSink_NoRetryOn400(t *testing.T) { + var hits atomic.Int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + hits.Add(1) + w.WriteHeader(http.StatusBadRequest) + })) + defer srv.Close() + + sink := alerts.NewWebhookSink(srv.URL, "") + err := sink.Emit(context.Background(), alerts.Event{IP: "10.0.0.5"}) + require.Error(t, err) + assert.Equal(t, int32(1), hits.Load(), "4xx must not be retried") +} + +func TestWebhookSink_NilOnEmptyURL(t *testing.T) { + assert.Nil(t, alerts.NewWebhookSink("", "")) +} + +func TestSyslogSink_FormatsRFC5424OverUDP(t *testing.T) { + pc, err := net.ListenPacket("udp", "127.0.0.1:0") + 
require.NoError(t, err) + defer pc.Close() + + addr := "udp://" + pc.LocalAddr().String() + sink, err := alerts.NewSyslogSink(addr, "test-app", 16) + require.NoError(t, err) + defer func() { _ = sink.Close() }() + + go func() { + _ = sink.Emit(context.Background(), alerts.Event{ + Type: alerts.HostDiscovered, + IP: "10.0.0.5", + Time: time.Date(2026, 5, 27, 12, 0, 0, 0, time.UTC), + }) + }() + + buf := make([]byte, 1500) + _ = pc.SetReadDeadline(time.Now().Add(2 * time.Second)) + n, _, err := pc.ReadFrom(buf) + require.NoError(t, err) + line := string(buf[:n]) + + assert.True(t, strings.HasPrefix(line, "<134>1 "), + "facility 16 + severity 6 = PRI 134; got %q", line[:min(len(line), 20)]) + assert.Contains(t, line, "test-app", "tag must appear in APP-NAME field") + assert.Contains(t, line, "host.discovered", "MSGID must be the event type") + assert.Contains(t, line, `"ip":"10.0.0.5"`, "MSG should carry the event JSON") +} + +func TestSyslogSink_RejectsBadScheme(t *testing.T) { + _, err := alerts.NewSyslogSink("file:///tmp/syslog", "tag", 16) + assert.Error(t, err) +} + +func TestSyslogSink_NilOnEmptyAddr(t *testing.T) { + s, err := alerts.NewSyslogSink("", "tag", 16) + require.NoError(t, err) + assert.Nil(t, s) +} + +// --- helpers --- + +type recordingSink struct { + name string + err error + count atomic.Int32 +} + +func (r *recordingSink) Emit(_ context.Context, _ alerts.Event) error { + r.count.Add(1) + return r.err +} +func (r *recordingSink) Name() string { return r.name } +func (*recordingSink) Close() error { return nil } diff --git a/internal/alerts/syslog.go b/internal/alerts/syslog.go new file mode 100644 index 0000000..22cfdc1 --- /dev/null +++ b/internal/alerts/syslog.go @@ -0,0 +1,156 @@ +package alerts + +import ( + "context" + "encoding/json" + "fmt" + "net" + "net/url" + "os" + "strings" + "sync" + "time" +) + +// SyslogSink sends RFC 5424 syslog lines over UDP or TCP. 
Stdlib +// `log/syslog` only exists on Unix, but the agent runs on Windows too, so +// the protocol is hand-rolled here (it's tiny — one printf-style line). +// +// One persistent connection per sink instance (re-dialled on write +// failure). Best-effort: events are dropped when the receiver is +// unreachable, with the failure surfacing in the Multiplexer's slog +// output. Syslog is fire-and-forget by design; we honour that. +type SyslogSink struct { + addr string + scheme string // "udp" or "tcp" + tag string + facility int + hostname string + + mu sync.Mutex + conn net.Conn +} + +// NewSyslogSink parses addr as a URL ("udp://syslog.example:514" or +// "tcp://..."), establishes the first connection, and returns the sink. +// Returns nil when addr is empty so wiring stays guard-free at the call +// site. +// +// facility is the RFC 5424 facility number (0–23). Default 16 (local0). +// tag is the APP-NAME field; default "network-inventory". +func NewSyslogSink(addr, tag string, facility int) (*SyslogSink, error) { + if addr == "" { + return nil, nil + } + u, err := url.Parse(addr) + if err != nil { + return nil, fmt.Errorf("parse syslog addr %q: %w", addr, err) + } + scheme := strings.ToLower(u.Scheme) + if scheme != "udp" && scheme != "tcp" { + return nil, fmt.Errorf("syslog scheme %q not supported; use udp:// or tcp://", scheme) + } + if u.Host == "" { + return nil, fmt.Errorf("syslog addr missing host:port: %q", addr) + } + if tag == "" { + tag = "network-inventory" + } + if facility < 0 || facility > 23 { + facility = 16 // local0 + } + hostname, _ := os.Hostname() + if hostname == "" { + hostname = "-" + } + s := &SyslogSink{ + addr: u.Host, + scheme: scheme, + tag: tag, + facility: facility, + hostname: hostname, + } + // Open the first connection eagerly so config errors surface at + // boot, not on the first event. 
+ if err := s.dial(); err != nil { + return nil, fmt.Errorf("dial syslog %s://%s: %w", scheme, u.Host, err) + } + return s, nil +} + +// Name implements Sink. +func (*SyslogSink) Name() string { return "syslog" } + +// Emit writes one RFC 5424 line. On a write failure the connection is +// closed and re-dialled once; if the redial succeeds the line is retried. +func (s *SyslogSink) Emit(ctx context.Context, ev Event) error { + line, err := s.format(ev) + if err != nil { + return err + } + s.mu.Lock() + defer s.mu.Unlock() + + // Honour ctx deadline if it's tighter than our default. + deadline, ok := ctx.Deadline() + if !ok { + deadline = time.Now().Add(2 * time.Second) + } + if s.conn != nil { + _ = s.conn.SetWriteDeadline(deadline) + if _, err := s.conn.Write([]byte(line)); err == nil { + return nil + } + _ = s.conn.Close() + s.conn = nil + } + if err := s.dial(); err != nil { + return err + } + _ = s.conn.SetWriteDeadline(deadline) + _, err = s.conn.Write([]byte(line)) + return err +} + +// Close releases the underlying socket. +func (s *SyslogSink) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + if s.conn != nil { + err := s.conn.Close() + s.conn = nil + return err + } + return nil +} + +func (s *SyslogSink) dial() error { + c, err := net.DialTimeout(s.scheme, s.addr, 2*time.Second) + if err != nil { + return err + } + s.conn = c + return nil +} + +// format builds an RFC 5424 line: +// +// VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID - MSG\n +// +// MSG is the JSON-encoded Event, which lets downstream syslog parsers +// (rsyslog mmjsonparse, syslog-ng's json-parser, Splunk INDEXED_EXTRACTIONS) +// pull the same fields the webhook receiver gets, no separate format. 
+func (s *SyslogSink) format(ev Event) (string, error) {
+	const severityInfo = 6 // Informational
+	// RFC 5424 allows at most six TIME-SECFRAC digits; RFC3339Nano can emit nine.
+	ts := ev.Time.UTC().Format("2006-01-02T15:04:05.999999Z07:00")
+	body, err := json.Marshal(ev)
+	if err != nil {
+		return "", fmt.Errorf("marshal event: %w", err)
+	}
+	msgid := string(ev.Type)
+	if msgid == "" {
+		msgid = "-" // NILVALUE: an empty MSGID would shift every later field by one
+	}
+	return fmt.Sprintf("<%d>1 %s %s %s %d %s - %s\n", s.facility*8+severityInfo, ts, s.hostname, s.tag, os.Getpid(), msgid, body), nil
+}
diff --git a/internal/alerts/webhook.go b/internal/alerts/webhook.go
new file mode 100644
index 0000000..c62a2a2
--- /dev/null
+++ b/internal/alerts/webhook.go
@@ -0,0 +1,87 @@
+package alerts
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"time"
+)
+
+// WebhookSink POSTs each event as JSON to a single configured URL.
+// One retry on transient network/5xx failures; anything else is on the
+// operator's webhook receiver to handle (idempotency via Event.Time +
+// Event.IP, for example).
+type WebhookSink struct {
+	url        string
+	authHeader string
+	client     *http.Client
+}
+
+// NewWebhookSink builds a sink. Returns nil when url is empty so callers
+// can wire it unconditionally from config without a guard.
+//
+// authHeader, when non-empty, is sent verbatim as the `Authorization`
+// header on every request — operators choose `Bearer ` or
+// `Basic ` depending on their receiver.
+func NewWebhookSink(url, authHeader string) *WebhookSink {
+	if url == "" {
+		return nil
+	}
+	return &WebhookSink{
+		url:        url,
+		authHeader: authHeader,
+		client:     &http.Client{Timeout: 5 * time.Second},
+	}
+}
+
+// Name implements Sink.
+func (*WebhookSink) Name() string { return "webhook" }
+
+// Emit POSTs the event. One retry on transient failure (network error or
+// 5xx). 4xx is not retried because the receiver has rejected the payload
+// and another identical try will not help.
+func (w *WebhookSink) Emit(ctx context.Context, ev Event) error { + body, err := json.Marshal(ev) + if err != nil { + return fmt.Errorf("marshal event: %w", err) + } + var lastErr error + for attempt := 0; attempt < 2; attempt++ { + if attempt > 0 { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(500 * time.Millisecond): + } + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, w.url, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("build request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", "NetworkInventoryAgent") + if w.authHeader != "" { + req.Header.Set("Authorization", w.authHeader) + } + resp, err := w.client.Do(req) + if err != nil { + lastErr = err + continue + } + _ = resp.Body.Close() + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return nil + } + if resp.StatusCode >= 500 { + lastErr = fmt.Errorf("webhook returned %d", resp.StatusCode) + continue + } + return fmt.Errorf("webhook returned %d", resp.StatusCode) + } + return lastErr +} + +// Close is a no-op — http.Client has no resources to free. +func (*WebhookSink) Close() error { return nil } diff --git a/internal/config/config.go b/internal/config/config.go index 3f84335..f351964 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -49,6 +49,34 @@ type Config struct { Admin AdminConfig `json:"admin"` Watchdog WatchdogConfig `json:"watchdog"` Tracing TracingConfig `json:"tracing,omitempty"` + Alerts AlertsConfig `json:"alerts,omitempty"` +} + +// AlertsConfig configures change-detection event sinks. Either or both +// sub-sections may be set; absence of both silently disables alerting. +type AlertsConfig struct { + Webhook WebhookConfig `json:"webhook,omitempty"` + Syslog SyslogConfig `json:"syslog,omitempty"` +} + +// WebhookConfig is the HTTP-POST-with-JSON sink. +type WebhookConfig struct { + // URL is the receiver endpoint. Empty disables this sink. 
+ URL string `json:"url,omitempty"` + // AuthHeader, if non-empty, is sent verbatim as the Authorization + // header (e.g. "Bearer abc123" or "Basic dXNlcjpwYXNz"). + AuthHeader string `json:"auth_header,omitempty"` +} + +// SyslogConfig is the RFC 5424 over UDP/TCP sink. +type SyslogConfig struct { + // Addr is a URL like "udp://syslog.example.com:514" or "tcp://...". + // Empty disables this sink. + Addr string `json:"addr,omitempty"` + // Tag is the APP-NAME field. Default "network-inventory". + Tag string `json:"tag,omitempty"` + // Facility is the RFC 5424 facility number 0..23. Default 16 (local0). + Facility int `json:"facility,omitempty"` } // TracingConfig controls the OpenTelemetry exporter. When Endpoint is empty