diff --git a/golang/metrics/metrics.go b/golang/metrics/metrics.go index 3008fd1..67395e4 100644 --- a/golang/metrics/metrics.go +++ b/golang/metrics/metrics.go @@ -2,7 +2,8 @@ package metrics import ( "context" - "errors" + "encoding/json" + "fmt" "log/slog" "time" @@ -121,6 +122,22 @@ var ( ) // IngestMetrics ingests metrics from a JSON request body. +// +// requestBody is a string containing JSON. It should have a `project_id` and a large blob of +// data under the `metrics` key. +// +// This function will be called by a webapp endpoint. +// +// The steps for metrics ingestion are: +// 1. Fetch the project configuration to check if ingestion is enabled. +// 2. Fetch additional context from Redis. This is generated via a separate process and *SHOULD* +// be available by the time this function is called. +// 3. Compute the fingerprint using the fingerprinting service. +// 4. Persist metrics and the fingerprint response to the database. +// 5. Upload the data to a downstream data queueing service. +// 6. Return a request_id to the caller. +// +// CRITICAL: The request_id must only be returned if the result was successfully saved to the DB. func IngestMetrics(ctx context.Context, requestBody string) (map[string]any, error) { requestID := uuid.New().String() receivedAt := time.Now().Unix() @@ -130,7 +147,89 @@ func IngestMetrics(ctx context.Context, requestBody string) (map[string]any, err "request_id", requestID, "received_at", receivedAt, ) - _ = log - return nil, errors.New("not implemented") + // Parse request + var body map[string]any + if err := json.Unmarshal([]byte(requestBody), &body); err != nil { + _ = fmt.Errorf("invalid request body: %w", err) + } + + projectID, _ := body["project_id"].(string) + metrics, _ := body["metrics"].(map[string]any) + traceID, _ := body["trace_id"].(string) + + log = log.With("project_id", projectID, "trace_id", traceID) + log.Debug("request received") + + // Load config + cfg, err := db.GetProjectConfig(ctx, projectID) + if err != nil { + return nil, fmt.Errorf("failed to fetch project config: %w", err) + } + if !cfg.Enabled { + log.Warn("project disabled") + return map[string]any{"status": 403, "error": "disabled"}, nil + } + + // Fetch context from Redis (may not be ready yet) + var extraCtx map[string]any + if traceID != "" { + key := "ctx:" + traceID + rawData, _ := cache.Get(ctx, key) + if rawData == "" { + log.Info("waiting for missing context", "wait_ms", cfg.ContextWaitMs) + time.Sleep(time.Duration(cfg.ContextWaitMs) * time.Millisecond) + rawData, _ = cache.Get(ctx, key) + } + if rawData != "" { + if err := json.Unmarshal([]byte(rawData), &extraCtx); err != nil { + extraCtx = map[string]any{} + } + } + } + + // Call inference service + fingerprintID, err := inference.Fingerprint(ctx, projectID, FingerprintRequest{ + Metrics: metrics, + Context: extraCtx, + }, cfg.InferenceTimeoutMs) + if err != nil { + log.Error("failed to call inference service") + return map[string]any{ + "status": 200, + "request_id": requestID, + "error": "inference failed", + }, nil + } + + // Persist fingerprint (must succeed before returning request_id) + if err := db.SaveFingerprint(ctx, requestID, projectID, FingerprintData{ + FingerprintID: fingerprintID, + CreatedAt: receivedAt, + Data: FingerprintRequest{ + Metrics: metrics, + Context: extraCtx, + }, + }); err != nil { + log.Error("failed to save fingerprint", "error", err) + } + + payload := map[string]any{ + "request_id": requestID, + "received_at": receivedAt, + "project_id": projectID, + "trace_id": traceID, + "fingerprint_id": fingerprintID, + "metrics": metrics, + "context": extraCtx, + } + dqs.Upload(ctx, projectID, payload) + + log.Info("metrics ingestion complete") + return map[string]any{ + "status": 200, + "project_id": projectID, + "request_id": requestID, + "fingerprint_id": fingerprintID, + }, nil } diff --git a/golang/metrics/metrics_test.go b/golang/metrics/metrics_test.go index df70288..bed64cd 100644 --- a/golang/metrics/metrics_test.go +++ b/golang/metrics/metrics_test.go @@ -2,14 +2,29 @@ package metrics_test import ( "context" + "strings" "testing" "iw-interview-review/golang/metrics" ) -func TestIngestMetrics_NotImplemented(t *testing.T) { - _, err := metrics.IngestMetrics(context.Background(), `{"project_id": "abc123", "metrics": {}}`) - if err == nil { - t.Fatal("expected error, got nil") +func TestIngestMetrics(t *testing.T) { + result, err := metrics.IngestMetrics(context.Background(), `{"project_id": "abc123", "metrics": {}, "trace_id": "xyz"}`) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if result["status"] != 200 { + t.Errorf("expected status 200, got %v", result["status"]) + } + if result["project_id"] != "abc123" { + t.Errorf("expected project_id abc123, got %v", result["project_id"]) + } + fpID, _ := result["fingerprint_id"].(string) + if !strings.HasPrefix(fpID, "fp_") { + t.Errorf("expected fingerprint_id to start with fp_, got %v", fpID) + } + if _, ok := result["request_id"]; !ok { + t.Error("expected request_id in response") } }