Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions backend/internal/adapters/telemetry/fanout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package telemetry

import (
"context"
"errors"

"github.com/aoagents/agent-orchestrator/backend/internal/ports"
)

// FanoutSink emits each event to multiple sinks.
type FanoutSink struct {
sinks []ports.EventSink
}

// NewFanoutSink builds a sink that forwards each event to every non-nil sink.
func NewFanoutSink(sinks ...ports.EventSink) *FanoutSink {
filtered := make([]ports.EventSink, 0, len(sinks))
for _, sink := range sinks {
if sink != nil {
filtered = append(filtered, sink)
}
}
return &FanoutSink{sinks: filtered}
}

// Emit forwards the event to each configured sink.
func (s *FanoutSink) Emit(ctx context.Context, ev ports.TelemetryEvent) {
for _, sink := range s.sinks {
sink.Emit(ctx, ev)
}
}

// Close closes every configured sink and joins any returned errors.
func (s *FanoutSink) Close(ctx context.Context) error {
var errs []error
for _, sink := range s.sinks {
if err := sink.Close(ctx); err != nil {
errs = append(errs, err)
}
}
return errors.Join(errs...)
}
124 changes: 124 additions & 0 deletions backend/internal/adapters/telemetry/localsqlite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package telemetry

import (
"context"
"encoding/json"
"log/slog"
"sync"
"time"

"github.com/google/uuid"

"github.com/aoagents/agent-orchestrator/backend/internal/ports"
sqlitestore "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite/store"
)

const (
localBufferSize = 128
localRetention = 30 * 24 * time.Hour
localPruneEvery = time.Hour
localPruneBatchLimit = int64(1000)
)

type localStore interface {
CreateTelemetryEvent(ctx context.Context, rec sqlitestore.TelemetryEventRecord) error
PruneTelemetryEventsBefore(ctx context.Context, before time.Time, limit int64) (int64, error)
}

// LocalSQLiteSink persists telemetry events into the daemon's SQLite database
// behind a small buffered worker so event emission stays best-effort.
type LocalSQLiteSink struct {
store localStore
log *slog.Logger
ch chan ports.TelemetryEvent
wg sync.WaitGroup
closeOnce sync.Once
now func() time.Time
newID func() string

pruneMu sync.Mutex
lastPrune time.Time
}

// NewLocalSQLiteSink starts a buffered SQLite-backed telemetry sink.
func NewLocalSQLiteSink(store localStore, log *slog.Logger) *LocalSQLiteSink {
s := &LocalSQLiteSink{
store: store,
log: log,
ch: make(chan ports.TelemetryEvent, localBufferSize),
now: time.Now,
newID: func() string { return "tev_" + uuid.NewString() },
}
s.wg.Add(1)
go s.loop()
return s
}

// Emit enqueues an event for best-effort persistence.
func (s *LocalSQLiteSink) Emit(_ context.Context, ev ports.TelemetryEvent) {
select {
case s.ch <- ev:
default:
s.log.Warn("telemetry local sink buffer full; dropping event", "name", ev.Name, "source", ev.Source)
}
}

// Close drains the worker until completion or context cancellation.
func (s *LocalSQLiteSink) Close(ctx context.Context) error {
s.closeOnce.Do(func() { close(s.ch) })
done := make(chan struct{})
go func() {
defer close(done)
s.wg.Wait()
}()
select {
case <-ctx.Done():
return ctx.Err()
case <-done:
return nil
}
}

func (s *LocalSQLiteSink) loop() {
defer s.wg.Done()
for ev := range s.ch {
s.persist(ev)
}
}

func (s *LocalSQLiteSink) persist(ev ports.TelemetryEvent) {
payloadJSON, err := json.Marshal(ev.Payload)
if err != nil {
s.log.Warn("telemetry payload marshal failed", "name", ev.Name, "error", err)
return
}
rec := sqlitestore.TelemetryEventRecord{
ID: s.newID(),
OccurredAt: ev.OccurredAt.UTC(),
Name: ev.Name,
Source: ev.Source,
Level: string(ev.Level),
ProjectID: ev.ProjectID,
SessionID: ev.SessionID,
RequestID: ev.RequestID,
PayloadJSON: string(payloadJSON),
}
if err := s.store.CreateTelemetryEvent(context.Background(), rec); err != nil {
s.log.Warn("telemetry local sink write failed", "name", ev.Name, "error", err)
return
}
s.maybePrune()
}

func (s *LocalSQLiteSink) maybePrune() {
s.pruneMu.Lock()
defer s.pruneMu.Unlock()
now := s.now().UTC()
if !s.lastPrune.IsZero() && now.Sub(s.lastPrune) < localPruneEvery {
return
}
s.lastPrune = now
if _, err := s.store.PruneTelemetryEventsBefore(context.Background(), now.Add(-localRetention), localPruneBatchLimit); err != nil {
s.log.Warn("telemetry local sink prune failed", "error", err)
}
}
16 changes: 16 additions & 0 deletions backend/internal/adapters/telemetry/noop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package telemetry

import (
"context"

"github.com/aoagents/agent-orchestrator/backend/internal/ports"
)

// NoopSink discards every event.
type NoopSink struct{}

// Emit discards the event.
func (NoopSink) Emit(context.Context, ports.TelemetryEvent) {}

// Close is a no-op.
func (NoopSink) Close(context.Context) error { return nil }
180 changes: 180 additions & 0 deletions backend/internal/adapters/telemetry/posthog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package telemetry

import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/google/uuid"

"github.com/aoagents/agent-orchestrator/backend/internal/ports"
)

const postHogBufferSize = 128

type postHogClient interface {
Do(req *http.Request) (*http.Response, error)
}

// PostHogSink exports allowlisted telemetry events to PostHog.
type PostHogSink struct {
apiKey string
host string
distinctID string
client postHogClient
log *slog.Logger
ch chan ports.TelemetryEvent
wg sync.WaitGroup
closeOnce sync.Once
}

// NewPostHogSink starts a buffered PostHog exporter with a stable install ID.
func NewPostHogSink(dataDir, apiKey, host string, client postHogClient, log *slog.Logger) (*PostHogSink, error) {
if strings.TrimSpace(apiKey) == "" {
return nil, fmt.Errorf("posthog api key is required")
}
if strings.TrimSpace(host) == "" {
return nil, fmt.Errorf("posthog host is required")
}
if client == nil {
client = &http.Client{Timeout: 5 * time.Second}
}
distinctID, err := loadOrCreateInstallID(dataDir)
if err != nil {
return nil, err
}
s := &PostHogSink{
apiKey: apiKey,
host: strings.TrimRight(host, "/"),
distinctID: distinctID,
client: client,
log: telemetryLogger(log),
ch: make(chan ports.TelemetryEvent, postHogBufferSize),
}
s.wg.Add(1)
go s.loop()
return s, nil
}

// Emit enqueues an event for best-effort export.
func (s *PostHogSink) Emit(_ context.Context, ev ports.TelemetryEvent) {
select {
case s.ch <- ev:
default:
s.log.Warn("telemetry posthog sink buffer full; dropping event", "name", ev.Name, "source", ev.Source)
}
}

// Close drains the exporter until completion or context cancellation.
func (s *PostHogSink) Close(ctx context.Context) error {
s.closeOnce.Do(func() { close(s.ch) })
done := make(chan struct{})
go func() {
defer close(done)
s.wg.Wait()
}()
select {
case <-ctx.Done():
return ctx.Err()
case <-done:
return nil
}
}

func (s *PostHogSink) loop() {
defer s.wg.Done()
for ev := range s.ch {
s.send(ev)
}
}

func (s *PostHogSink) send(ev ports.TelemetryEvent) {
body := map[string]any{
"api_key": s.apiKey,
"event": ev.Name,
"distinct_id": s.distinctID,
"properties": s.properties(ev),
"timestamp": ev.OccurredAt.UTC().Format(time.RFC3339Nano),
}
payload, err := json.Marshal(body)
if err != nil {
s.log.Warn("telemetry posthog payload marshal failed", "name", ev.Name, "error", err)
return
}
req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, s.host+"/capture/", bytes.NewReader(payload))
if err != nil {
s.log.Warn("telemetry posthog request build failed", "name", ev.Name, "error", err)
return
}
req.Header.Set("Content-Type", "application/json")

resp, err := s.client.Do(req)
if err != nil {
s.log.Warn("telemetry posthog export failed", "name", ev.Name, "error", err)
return
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
b, _ := io.ReadAll(io.LimitReader(resp.Body, 2048))
s.log.Warn("telemetry posthog rejected event", "name", ev.Name, "status", resp.StatusCode, "body", strings.TrimSpace(string(b)))
}
}

func (s *PostHogSink) properties(ev ports.TelemetryEvent) map[string]any {
props := map[string]any{
"source": ev.Source,
"level": string(ev.Level),
}
if ev.RequestID != "" {
props["request_id"] = ev.RequestID
}
if ev.ProjectID != nil {
props["project_id_hash"] = sha256String(string(*ev.ProjectID))
}
if ev.SessionID != nil {
props["session_id_hash"] = sha256String(string(*ev.SessionID))
}
for k, v := range ev.Payload {
props[k] = v
}
return props
}

func loadOrCreateInstallID(dataDir string) (string, error) {
path := filepath.Join(dataDir, "telemetry_install_id")
if b, err := os.ReadFile(path); err == nil {
if id := strings.TrimSpace(string(b)); id != "" {
return id, nil
}
} else if !os.IsNotExist(err) {
return "", fmt.Errorf("read telemetry install id: %w", err)
}
id := "ins_" + uuid.NewString()
if err := os.WriteFile(path, []byte(id+"\n"), 0o600); err != nil {
return "", fmt.Errorf("write telemetry install id: %w", err)
}
return id, nil
}

func sha256String(raw string) string {
sum := sha256.Sum256([]byte(raw))
return hex.EncodeToString(sum[:])
}

func telemetryLogger(log *slog.Logger) *slog.Logger {
if log != nil {
return log
}
return slog.Default()
}
Loading
Loading