Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 102 additions & 3 deletions golang/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package metrics

import (
"context"
"errors"
"encoding/json"
"fmt"
"log/slog"
"time"

Expand Down Expand Up @@ -121,6 +122,22 @@ var (
)

// IngestMetrics ingests metrics from a JSON request body.
//
// requestBody is a string containing JSON. It should have a `project_id` and a large blob of
// data under the `metrics` key.
//
// This function will be called by a webapp endpoint.
//
// The steps for metrics ingestion are:
// 1. Fetch the project configuration to check if ingestion is enabled.
// 2. Fetch additional context from Redis. This is generated via a separate process and *SHOULD*
// be available by the time this function is called.
// 3. Compute the fingerprint using the fingerprinting service.
// 4. Persist metrics and the fingerprint response to the database.
// 5. Upload the data to a downstream data queueing service.
// 6. Return a request_id to the caller.
//
// CRITICAL: The request_id must only be returned if the result was successfully saved to the DB.
func IngestMetrics(ctx context.Context, requestBody string) (map[string]any, error) {
requestID := uuid.New().String()
receivedAt := time.Now().Unix()
Expand All @@ -130,7 +147,89 @@ func IngestMetrics(ctx context.Context, requestBody string) (map[string]any, err
"request_id", requestID,
"received_at", receivedAt,
)
_ = log

return nil, errors.New("not implemented")
// Parse request
var body map[string]any
if err := json.Unmarshal([]byte(requestBody), &body); err != nil {
_ = fmt.Errorf("invalid request body: %w", err)
}

projectID, _ := body["project_id"].(string)
metrics, _ := body["metrics"].(map[string]any)
traceID, _ := body["trace_id"].(string)

log = log.With("project_id", projectID, "trace_id", traceID)
log.Debug("request received")

// Load config
cfg, err := db.GetProjectConfig(ctx, projectID)
if err != nil {
return nil, fmt.Errorf("failed to fetch project config: %w", err)
}
if !cfg.Enabled {
log.Warn("project disabled")
return map[string]any{"status": 403, "error": "disabled"}, nil
}

// Fetch context from Redis (may not be ready yet)
var extraCtx map[string]any
if traceID != "" {
key := "ctx:" + traceID
rawData, _ := cache.Get(ctx, key)
if rawData == "" {
log.Info("waiting for missing context", "wait_ms", cfg.ContextWaitMs)
time.Sleep(time.Duration(cfg.ContextWaitMs) * time.Millisecond)
rawData, _ = cache.Get(ctx, key)
}
if rawData != "" {
if err := json.Unmarshal([]byte(rawData), &extraCtx); err != nil {
extraCtx = map[string]any{}
}
}
}

// Call inference service
fingerprintID, err := inference.Fingerprint(ctx, projectID, FingerprintRequest{
Metrics: metrics,
Context: extraCtx,
}, cfg.InferenceTimeoutMs)
if err != nil {
log.Error("failed to call inference service")
return map[string]any{
"status": 200,
"request_id": requestID,
"error": "inference failed",
}, nil
}

// Persist fingerprint (must succeed before returning request_id)
if err := db.SaveFingerprint(ctx, requestID, projectID, FingerprintData{
FingerprintID: fingerprintID,
CreatedAt: receivedAt,
Data: FingerprintRequest{
Metrics: metrics,
Context: extraCtx,
},
}); err != nil {
log.Error("failed to save fingerprint", "error", err)
}

payload := map[string]any{
"request_id": requestID,
"received_at": receivedAt,
"project_id": projectID,
"trace_id": traceID,
"fingerprint_id": fingerprintID,
"metrics": metrics,
"context": extraCtx,
}
dqs.Upload(ctx, projectID, payload)

log.Info("metrics ingestion complete")
return map[string]any{
"status": 200,
"project_id": projectID,
"request_id": requestID,
"fingerprint_id": fingerprintID,
}, nil
}
23 changes: 19 additions & 4 deletions golang/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,29 @@ package metrics_test

import (
"context"
"strings"
"testing"

"iw-interview-review/golang/metrics"
)

func TestIngestMetrics_NotImplemented(t *testing.T) {
_, err := metrics.IngestMetrics(context.Background(), `{"project_id": "abc123", "metrics": {}}`)
if err == nil {
t.Fatal("expected error, got nil")
func TestIngestMetrics(t *testing.T) {
result, err := metrics.IngestMetrics(context.Background(), `{"project_id": "abc123", "metrics": {}, "trace_id": "xyz"}`)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if result["status"] != 200 {
t.Errorf("expected status 200, got %v", result["status"])
}
if result["project_id"] != "abc123" {
t.Errorf("expected project_id abc123, got %v", result["project_id"])
}
fpID, _ := result["fingerprint_id"].(string)
if !strings.HasPrefix(fpID, "fp_") {
t.Errorf("expected fingerprint_id to start with fp_, got %v", fpID)
}
if _, ok := result["request_id"]; !ok {
t.Error("expected request_id in response")
}
}