diff --git a/go-memory-load-mysql/.dockerignore b/go-memory-load-mysql/.dockerignore new file mode 100644 index 00000000..cafc572d --- /dev/null +++ b/go-memory-load-mysql/.dockerignore @@ -0,0 +1,29 @@ +# Go build outputs +/bin/ +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test artifacts +*.test +*.out +coverage.txt +coverage.html + +# Go vendor +vendor/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# OS +.DS_Store +Thumbs.db + +# Docker +**/.git diff --git a/go-memory-load-mysql/.env.example b/go-memory-load-mysql/.env.example new file mode 100644 index 00000000..bb077358 --- /dev/null +++ b/go-memory-load-mysql/.env.example @@ -0,0 +1,2 @@ +APP_PORT=8080 +MYSQL_DSN=app_user:app_password@tcp(localhost:3306)/orderdb?parseTime=true diff --git a/go-memory-load-mysql/Dockerfile b/go-memory-load-mysql/Dockerfile new file mode 100644 index 00000000..cd7392eb --- /dev/null +++ b/go-memory-load-mysql/Dockerfile @@ -0,0 +1,18 @@ +FROM golang:1.26-alpine AS build + +WORKDIR /app + +COPY go.mod go.sum* ./ +RUN go mod download + +COPY . . +RUN go build -o /bin/api ./cmd/api + +FROM alpine:3.22 + +WORKDIR /app +COPY --from=build /bin/api /app/api + +EXPOSE 8080 + +CMD ["/app/api"] diff --git a/go-memory-load-mysql/cmd/api/main.go b/go-memory-load-mysql/cmd/api/main.go new file mode 100644 index 00000000..75e23e6d --- /dev/null +++ b/go-memory-load-mysql/cmd/api/main.go @@ -0,0 +1,76 @@ +// Package main is the entry point for the load-test MySQL API server. +package main + +import ( + "context" + "log/slog" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "loadtestmysqlapi/internal/config" + "loadtestmysqlapi/internal/database" + "loadtestmysqlapi/internal/httpapi" + "loadtestmysqlapi/internal/store" +) + +func main() { + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) + + cfg, err := config.Load() + if err != nil { + logger.Error("load config", "error", err) + os.Exit(1) + } + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + db, err := database.Open(ctx, cfg.MySQLDSN) + if err != nil { + logger.Error("connect mysql", "error", err) + os.Exit(1) + } + defer db.Close() + + if err := database.EnsureRuntimeSchema(ctx, db); err != nil { + logger.Error("ensure schema", "error", err) + os.Exit(1) + } + + st := store.New(db) + + handler := httpapi.New(st, logger) + + server := &http.Server{ + Addr: ":" + cfg.Port, + Handler: handler, + ReadHeaderTimeout: 3 * time.Second, + ReadTimeout: 30 * time.Second, + WriteTimeout: 60 * time.Second, + IdleTimeout: 60 * time.Second, + } + + go func() { + logger.Info("api listening", "addr", server.Addr) + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Error("listen and serve", "error", err) + stop() + } + }() + + <-ctx.Done() + logger.Info("shutdown signal received") + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := server.Shutdown(shutdownCtx); err != nil { + logger.Error("graceful shutdown", "error", err) + os.Exit(1) + } +} diff --git a/go-memory-load-mysql/docker-compose.yml b/go-memory-load-mysql/docker-compose.yml new file mode 100644 index 00000000..d67685cc --- /dev/null +++ b/go-memory-load-mysql/docker-compose.yml @@ -0,0 +1,46 @@ +services: + db: + image: mysql:8.0 + container_name: load-test-mysql-db + environment: + MYSQL_DATABASE: orderdb + MYSQL_USER: app_user + MYSQL_PASSWORD: app_password + MYSQL_ROOT_PASSWORD: rootpassword + ports: + - "3306:3306" + volumes: + - mysql_data:/var/lib/mysql + healthcheck: + test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-u", "app_user", "--password=app_password"] + interval: 5s + timeout: 5s + retries: 20 + + api: + build: + context: . + container_name: load-test-mysql-api + environment: + APP_PORT: "8080" + MYSQL_DSN: "app_user:app_password@tcp(db:3306)/orderdb?parseTime=true&multiStatements=true&interpolateParams=true" + ports: + - "8080:8080" + depends_on: + db: + condition: service_healthy + + k6: + image: grafana/k6:0.49.0 + profiles: ["loadtest"] + environment: + BASE_URL: http://api:8080 + volumes: + - ./loadtest:/scripts:ro + depends_on: + api: + condition: service_started + entrypoint: ["k6"] + +volumes: + mysql_data: diff --git a/go-memory-load-mysql/go.mod b/go-memory-load-mysql/go.mod new file mode 100644 index 00000000..dfffa3b0 --- /dev/null +++ b/go-memory-load-mysql/go.mod @@ -0,0 +1,7 @@ +module loadtestmysqlapi + +go 1.26 + +require github.com/go-sql-driver/mysql v1.9.2 + +require filippo.io/edwards25519 v1.1.0 // indirect diff --git a/go-memory-load-mysql/go.sum b/go-memory-load-mysql/go.sum new file mode 100644 index 00000000..0bbe40c0 --- /dev/null +++ b/go-memory-load-mysql/go.sum @@ -0,0 +1,4 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/go-sql-driver/mysql v1.9.2 h1:4cNKDYQ1I84SXslGddlsrMhc8k4LeDVj6Ad6WRjiHuU= +github.com/go-sql-driver/mysql v1.9.2/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU= diff --git a/go-memory-load-mysql/internal/config/config.go b/go-memory-load-mysql/internal/config/config.go new file mode 100644 index 00000000..c837ab69 --- /dev/null +++ b/go-memory-load-mysql/internal/config/config.go @@ -0,0 +1,32 @@ +// Package config loads runtime configuration from environment variables. +package config + +import ( + "fmt" + "os" +) + +// Config holds all runtime configuration for the MySQL load-test API. +type Config struct { + Port string + MySQLDSN string +} + +// Load reads configuration from environment variables and returns Config. +// Required: MYSQL_DSN. +func Load() (Config, error) { + dsn := os.Getenv("MYSQL_DSN") + if dsn == "" { + return Config{}, fmt.Errorf("MYSQL_DSN environment variable is required") + } + + port := os.Getenv("APP_PORT") + if port == "" { + port = "8080" + } + + return Config{ + Port: port, + MySQLDSN: dsn, + }, nil +} diff --git a/go-memory-load-mysql/internal/database/mysql.go b/go-memory-load-mysql/internal/database/mysql.go new file mode 100644 index 00000000..ae1520a9 --- /dev/null +++ b/go-memory-load-mysql/internal/database/mysql.go @@ -0,0 +1,115 @@ +// Package database provides MySQL connection and schema helpers. +package database + +import ( + "context" + "database/sql" + "fmt" + "time" + + _ "github.com/go-sql-driver/mysql" // register mysql driver +) + +// Open creates a *sql.DB, verifies connectivity with retries, and applies the +// runtime schema. It returns the open DB handle; the caller must call db.Close(). +func Open(ctx context.Context, dsn string) (*sql.DB, error) { + db, err := sql.Open("mysql", dsn) + if err != nil { + return nil, fmt.Errorf("open mysql: %w", err) + } + + db.SetMaxOpenConns(25) + db.SetMaxIdleConns(10) + db.SetConnMaxLifetime(5 * time.Minute) + db.SetConnMaxIdleTime(2 * time.Minute) + + // Retry loop — MySQL can take a few seconds to become ready. + const maxAttempts = 20 + for attempt := 1; attempt <= maxAttempts; attempt++ { + if pingErr := db.PingContext(ctx); pingErr == nil { + break + } else if attempt == maxAttempts { + db.Close() + return nil, fmt.Errorf("mysql did not become ready after %d attempts: %w", maxAttempts, pingErr) + } + select { + case <-ctx.Done(): + db.Close() + return nil, ctx.Err() + case <-time.After(2 * time.Second): + } + } + + return db, nil +} + +// EnsureRuntimeSchema creates all tables and indexes if they do not already exist. +func EnsureRuntimeSchema(ctx context.Context, db *sql.DB) error { + statements := []string{ + `CREATE TABLE IF NOT EXISTS customers ( + id CHAR(36) NOT NULL PRIMARY KEY, + email VARCHAR(320) NOT NULL, + full_name VARCHAR(255) NOT NULL, + segment VARCHAR(64) NOT NULL, + created_at DATETIME(3) NOT NULL, + UNIQUE KEY uq_customers_email (email) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4`, + + `CREATE TABLE IF NOT EXISTS products ( + id CHAR(36) NOT NULL PRIMARY KEY, + sku VARCHAR(128) NOT NULL, + name VARCHAR(255) NOT NULL, + category VARCHAR(128) NOT NULL, + price_cents INT NOT NULL, + inventory_count INT NOT NULL DEFAULT 0, + created_at DATETIME(3) NOT NULL, + UNIQUE KEY uq_products_sku (sku) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4`, + + `CREATE TABLE IF NOT EXISTS orders ( + id CHAR(36) NOT NULL PRIMARY KEY, + customer_id CHAR(36) NOT NULL, + customer_email VARCHAR(320) NOT NULL, + customer_name VARCHAR(255) NOT NULL, + customer_segment VARCHAR(64) NOT NULL, + status VARCHAR(32) NOT NULL, + total_cents INT NOT NULL DEFAULT 0, + created_at DATETIME(3) NOT NULL, + KEY idx_orders_customer_created (customer_id, created_at), + KEY idx_orders_status_created (status, created_at) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4`, + + `CREATE TABLE IF NOT EXISTS order_items ( + id CHAR(36) NOT NULL PRIMARY KEY, + order_id CHAR(36) NOT NULL, + product_id CHAR(36) NOT NULL, + sku VARCHAR(128) NOT NULL, + name VARCHAR(255) NOT NULL, + category VARCHAR(128) NOT NULL, + quantity INT NOT NULL, + unit_price_cents INT NOT NULL, + line_total_cents INT NOT NULL, + KEY idx_order_items_order (order_id), + KEY idx_order_items_product (product_id) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4`, + + `CREATE TABLE IF NOT EXISTS large_payloads ( + id CHAR(36) NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL, + content_type VARCHAR(128) NOT NULL, + payload LONGTEXT NOT NULL, + payload_size_bytes INT NOT NULL, + sha256 CHAR(64) NOT NULL, + created_at DATETIME(3) NOT NULL, + KEY idx_large_payloads_created (created_at) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4`, + } + + for _, stmt := range statements { + if _, err := db.ExecContext(ctx, stmt); err != nil { + return fmt.Errorf("apply schema: %w", err) + } + } + + return nil +} diff --git a/go-memory-load-mysql/internal/httpapi/server.go b/go-memory-load-mysql/internal/httpapi/server.go new file mode 100644 index 00000000..698d7fbe --- /dev/null +++ b/go-memory-load-mysql/internal/httpapi/server.go @@ -0,0 +1,336 @@ +// Package httpapi provides the HTTP API handlers for the load-test MySQL server. +package httpapi + +import ( + "context" + "encoding/json" + "errors" + "io" + "log/slog" + "net/http" + "strconv" + "time" + + "loadtestmysqlapi/internal/store" +) + +type Server struct { + store *store.Store + logger *slog.Logger +} + +type apiError struct { + Error string `json:"error"` +} + +func New(st *store.Store, logger *slog.Logger) http.Handler { + s := &Server{ + store: st, + logger: logger, + } + + mux := http.NewServeMux() + mux.HandleFunc("GET /healthz", s.healthz) + mux.HandleFunc("POST /customers", s.createCustomer) + mux.HandleFunc("POST /products", s.createProduct) + mux.HandleFunc("POST /orders", s.createOrder) + mux.HandleFunc("GET /orders/{id}", s.getOrder) + mux.HandleFunc("GET /orders", s.searchOrders) + mux.HandleFunc("GET /customers/{id}/summary", s.getCustomerSummary) + mux.HandleFunc("GET /analytics/top-products", s.topProducts) + mux.HandleFunc("POST /large-payloads", s.createLargePayload) + mux.HandleFunc("GET /large-payloads/{id}", s.getLargePayload) + mux.HandleFunc("DELETE /large-payloads/{id}", s.deleteLargePayload) + + return s.withRecover(s.withLogging(mux)) +} + +func (s *Server) healthz(w http.ResponseWriter, r *http.Request) { + ctx, cancel := contextWithTimeout(r, 2*time.Second) + defer cancel() + + if err := s.store.Ping(ctx); err != nil { + writeJSON(w, http.StatusServiceUnavailable, apiError{Error: "database unavailable"}) + return + } + + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) +} + +func (s *Server) createCustomer(w http.ResponseWriter, r *http.Request) { + var req store.CreateCustomerRequest + if err := decodeJSON(r, &req); err != nil { + writeJSON(w, http.StatusBadRequest, apiError{Error: err.Error()}) + return + } + + customer, err := s.store.CreateCustomer(r.Context(), req) + if err != nil { + s.writeStoreError(w, err) + return + } + + writeJSON(w, http.StatusCreated, customer) +} + +func (s *Server) createProduct(w http.ResponseWriter, r *http.Request) { + var req store.CreateProductRequest + if err := decodeJSON(r, &req); err != nil { + writeJSON(w, http.StatusBadRequest, apiError{Error: err.Error()}) + return + } + + product, err := s.store.CreateProduct(r.Context(), req) + if err != nil { + s.writeStoreError(w, err) + return + } + + writeJSON(w, http.StatusCreated, product) +} + +func (s *Server) createOrder(w http.ResponseWriter, r *http.Request) { + var req store.CreateOrderRequest + if err := decodeJSON(r, &req); err != nil { + writeJSON(w, http.StatusBadRequest, apiError{Error: err.Error()}) + return + } + + order, err := s.store.CreateOrder(r.Context(), req) + if err != nil { + s.writeStoreError(w, err) + return + } + + writeJSON(w, http.StatusCreated, order) +} + +func (s *Server) getOrder(w http.ResponseWriter, r *http.Request) { + order, err := s.store.GetOrder(r.Context(), r.PathValue("id")) + if err != nil { + s.writeStoreError(w, err) + return + } + + writeJSON(w, http.StatusOK, order) +} + +func (s *Server) getCustomerSummary(w http.ResponseWriter, r *http.Request) { + customerID := r.PathValue("id") + if customerID == "" { + writeJSON(w, http.StatusBadRequest, apiError{Error: "customer id is required"}) + return + } + + summary, err := s.store.GetCustomerSummary(r.Context(), customerID) + if err != nil { + s.writeStoreError(w, err) + return + } + + writeJSON(w, http.StatusOK, summary) +} + +func (s *Server) searchOrders(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() + + params := store.OrderSearchParams{ + Status: query.Get("status"), + CustomerID: query.Get("customer_id"), + MinTotalCents: parseInt(query.Get("min_total_cents"), 0), + Limit: parseInt(query.Get("limit"), 25), + Offset: parseInt(query.Get("offset"), 0), + } + + if value := query.Get("created_from"); value != "" { + timestamp, err := time.Parse(time.RFC3339, value) + if err != nil { + writeJSON(w, http.StatusBadRequest, apiError{Error: "created_from must use RFC3339"}) + return + } + params.CreatedFrom = ×tamp + } + + if value := query.Get("created_through"); value != "" { + timestamp, err := time.Parse(time.RFC3339, value) + if err != nil { + writeJSON(w, http.StatusBadRequest, apiError{Error: "created_through must use RFC3339"}) + return + } + params.CreatedThrough = ×tamp + } + + results, err := s.store.SearchOrders(r.Context(), params) + if err != nil { + s.writeStoreError(w, err) + return + } + + writeJSON(w, http.StatusOK, results) +} + +func (s *Server) topProducts(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() + days := parseInt(query.Get("days"), 30) + limit := parseInt(query.Get("limit"), 10) + + results, err := s.store.TopProducts(r.Context(), days, limit) + if err != nil { + s.writeStoreError(w, err) + return + } + + writeJSON(w, http.StatusOK, results) +} + +func (s *Server) createLargePayload(w http.ResponseWriter, r *http.Request) { + var req store.CreateLargePayloadRequest + if err := decodeJSON(r, &req); err != nil { + writeJSON(w, http.StatusBadRequest, apiError{Error: err.Error()}) + return + } + + record, err := s.store.CreateLargePayload(r.Context(), req) + if err != nil { + s.writeStoreError(w, err) + return + } + + writeJSON(w, http.StatusCreated, record) +} + +func (s *Server) getLargePayload(w http.ResponseWriter, r *http.Request) { + record, err := s.store.GetLargePayload(r.Context(), r.PathValue("id")) + if err != nil { + s.writeStoreError(w, err) + return + } + + writeJSON(w, http.StatusOK, record) +} + +func (s *Server) deleteLargePayload(w http.ResponseWriter, r *http.Request) { + record, err := s.store.DeleteLargePayload(r.Context(), r.PathValue("id")) + if err != nil { + s.writeStoreError(w, err) + return + } + + writeJSON(w, http.StatusOK, record) +} + +func (s *Server) writeStoreError(w http.ResponseWriter, err error) { + status := http.StatusInternalServerError + message := "internal server error" + + switch { + case errors.Is(err, store.ErrValidation): + status = http.StatusBadRequest + message = err.Error() + case errors.Is(err, store.ErrConflict), errors.Is(err, store.ErrInsufficientInventory): + status = http.StatusConflict + message = err.Error() + case errors.Is(err, store.ErrNotFound): + status = http.StatusNotFound + message = err.Error() + default: + s.logger.Error("request failed", "error", err) + } + + writeJSON(w, status, apiError{Error: message}) +} + +func (s *Server) withLogging(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + recorder := &statusRecorder{ResponseWriter: w, statusCode: http.StatusOK} + debugEnabled := s.logger.Enabled(r.Context(), slog.LevelDebug) + var start time.Time + if debugEnabled { + start = time.Now() + } + + next.ServeHTTP(recorder, r) + + if debugEnabled { + s.logger.Debug( + "http request", + "method", r.Method, + "path", r.URL.Path, + "status", recorder.statusCode, + "duration_ms", time.Since(start).Milliseconds(), + ) + } + }) +} + +func (s *Server) withRecover(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer func() { + if recovered := recover(); recovered != nil { + s.logger.Error("panic recovered", "panic", recovered) + writeJSON(w, http.StatusInternalServerError, apiError{Error: "internal server error"}) + } + }() + + next.ServeHTTP(w, r) + }) +} + +type statusRecorder struct { + http.ResponseWriter + statusCode int +} + +func (r *statusRecorder) WriteHeader(statusCode int) { + r.statusCode = statusCode + r.ResponseWriter.WriteHeader(statusCode) +} + +func writeJSON(w http.ResponseWriter, statusCode int, payload any) { + body, err := json.Marshal(payload) + if err != nil { + body = []byte(`{"error":"internal server error"}`) + statusCode = http.StatusInternalServerError + } + + body = append(body, '\n') + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Content-Length", strconv.Itoa(len(body))) + w.WriteHeader(statusCode) + _, _ = w.Write(body) +} + +func decodeJSON(r *http.Request, target any) error { + defer r.Body.Close() //nolint:errcheck + + decoder := json.NewDecoder(r.Body) + decoder.DisallowUnknownFields() + + if err := decoder.Decode(target); err != nil { + return err + } + + if err := decoder.Decode(&struct{}{}); err != io.EOF { + return errors.New("request body must contain a single JSON object") + } + + return nil +} + +func parseInt(value string, fallback int) int { + if value == "" { + return fallback + } + + parsed, err := strconv.Atoi(value) + if err != nil { + return fallback + } + + return parsed +} + +func contextWithTimeout(r *http.Request, timeout time.Duration) (context.Context, context.CancelFunc) { + return context.WithTimeout(r.Context(), timeout) +} diff --git a/go-memory-load-mysql/internal/store/models.go b/go-memory-load-mysql/internal/store/models.go new file mode 100644 index 00000000..cda6d9be --- /dev/null +++ b/go-memory-load-mysql/internal/store/models.go @@ -0,0 +1,148 @@ +// Package store defines data models for the load-test MySQL API. +package store + +import "time" + +// Customer represents a registered customer. +type Customer struct { + ID string `json:"id"` + Email string `json:"email"` + FullName string `json:"full_name"` + Segment string `json:"segment"` + CreatedAt time.Time `json:"created_at"` +} + +// CreateCustomerRequest is the request body for POST /customers. +type CreateCustomerRequest struct { + Email string `json:"email"` + FullName string `json:"full_name"` + Segment string `json:"segment"` +} + +// Product represents a purchasable product. +type Product struct { + ID string `json:"id"` + SKU string `json:"sku"` + Name string `json:"name"` + Category string `json:"category"` + PriceCents int `json:"price_cents"` + InventoryCount int `json:"inventory_count"` + CreatedAt time.Time `json:"created_at"` +} + +// CreateProductRequest is the request body for POST /products. +type CreateProductRequest struct { + SKU string `json:"sku"` + Name string `json:"name"` + Category string `json:"category"` + PriceCents int `json:"price_cents"` + InventoryCount int `json:"inventory_count"` +} + +// OrderItem is a line item within an order. +type OrderItem struct { + ProductID string `json:"product_id"` + SKU string `json:"sku"` + Name string `json:"name"` + Category string `json:"category"` + Quantity int `json:"quantity"` + UnitPriceCents int `json:"unit_price_cents"` + LineTotalCents int `json:"line_total_cents"` +} + +// Order represents a customer order. +type Order struct { + ID string `json:"id"` + Customer Customer `json:"customer"` + Status string `json:"status"` + TotalCents int `json:"total_cents"` + CreatedAt time.Time `json:"created_at"` + Items []OrderItem `json:"items"` +} + +// OrderItemInput is a single item in CreateOrderRequest. +type OrderItemInput struct { + ProductID string `json:"product_id"` + Quantity int `json:"quantity"` +} + +// CreateOrderRequest is the request body for POST /orders. +type CreateOrderRequest struct { + CustomerID string `json:"customer_id"` + Status string `json:"status"` + Items []OrderItemInput `json:"items"` +} + +// OrderSearchParams holds query parameters for GET /orders. +type OrderSearchParams struct { + Status string + CustomerID string + MinTotalCents int + CreatedFrom *time.Time + CreatedThrough *time.Time + Limit int + Offset int +} + +// OrderSearchResult is a lightweight order row returned by GET /orders. +type OrderSearchResult struct { + ID string `json:"id"` + CustomerID string `json:"customer_id"` + CustomerName string `json:"customer_name"` + Status string `json:"status"` + TotalCents int `json:"total_cents"` + CreatedAt time.Time `json:"created_at"` + TotalItems int `json:"total_items"` + DistinctProducts int `json:"distinct_products"` +} + +// CustomerSummary is the response for GET /customers/{id}/summary. +type CustomerSummary struct { + Customer Customer `json:"customer"` + OrdersCount int `json:"orders_count"` + LifetimeValueCents int `json:"lifetime_value_cents"` + AverageOrderValueCents int `json:"average_order_value_cents"` + FavoriteCategory string `json:"favorite_category"` + LastOrderAt *time.Time `json:"last_order_at,omitempty"` +} + +// TopProduct is a single row in the GET /analytics/top-products response. +type TopProduct struct { + ID string `json:"id"` + SKU string `json:"sku"` + Name string `json:"name"` + Category string `json:"category"` + UnitsSold int `json:"units_sold"` + RevenueCents int `json:"revenue_cents"` + OrdersCount int `json:"orders_count"` + RevenueRank int `json:"revenue_rank"` +} + +// LargePayloadRecord is the metadata-only view of a stored large payload. +type LargePayloadRecord struct { + ID string `json:"id"` + Name string `json:"name"` + ContentType string `json:"content_type"` + PayloadSizeBytes int `json:"payload_size_bytes"` + SHA256 string `json:"sha256"` + CreatedAt time.Time `json:"created_at"` +} + +// LargePayloadDetail includes the actual payload bytes. +type LargePayloadDetail struct { + LargePayloadRecord + Payload string `json:"payload"` +} + +// CreateLargePayloadRequest is the request body for POST /large-payloads. +type CreateLargePayloadRequest struct { + Name string `json:"name"` + ContentType string `json:"content_type"` + Payload string `json:"payload"` +} + +// DeleteLargePayloadResponse is the response body for DELETE /large-payloads/{id}. +type DeleteLargePayloadResponse struct { + Deleted bool `json:"deleted"` + Record LargePayloadRecord `json:"record"` +} diff --git a/go-memory-load-mysql/internal/store/store.go b/go-memory-load-mysql/internal/store/store.go new file mode 100644 index 00000000..69277195 --- /dev/null +++ b/go-memory-load-mysql/internal/store/store.go @@ -0,0 +1,621 @@ +package store + +import ( + "context" + "crypto/sha256" + "database/sql" + "encoding/hex" + "errors" + "fmt" + "net/mail" + "sort" + "strings" + "time" + + "github.com/go-sql-driver/mysql" +) + +var ( + ErrNotFound = errors.New("not found") + ErrConflict = errors.New("conflict") + ErrValidation = errors.New("validation error") + ErrInsufficientInventory = errors.New("insufficient inventory") +) + +const maxLargePayloadBytes = 8 * 1024 * 1024 + +var ( + validSegments = map[string]struct{}{ + "startup": {}, + "enterprise": {}, + "retail": {}, + "partner": {}, + } + validStatuses = map[string]struct{}{ + "pending": {}, + "paid": {}, + "shipped": {}, + "cancelled": {}, + } +) + +// Store wraps a *sql.DB and exposes the business operations. +type Store struct { + db *sql.DB +} + +func New(db *sql.DB) *Store { + return &Store{db: db} +} + +func (s *Store) Ping(ctx context.Context) error { + return s.db.PingContext(ctx) +} + +func newID() string { + // Generate a UUID v4-style string using random bytes from crypto/sha256 as a seed + // fallback: use time + rand; for a load-test, a simple unique ID is sufficient. + raw := sha256.Sum256([]byte(fmt.Sprintf("%d", time.Now().UnixNano()))) + b := raw[:] + // Format as UUID v4 + b[6] = (b[6] & 0x0f) | 0x40 + b[8] = (b[8] & 0x3f) | 0x80 + return fmt.Sprintf("%08x-%04x-%04x-%04x-%12x", + b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]) +} + +func isDuplicateKey(err error) bool { + var mysqlErr *mysql.MySQLError + return errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 +} + +func (s *Store) CreateCustomer(ctx context.Context, req CreateCustomerRequest) (Customer, error) { + req.Email = strings.TrimSpace(strings.ToLower(req.Email)) + req.FullName = strings.TrimSpace(req.FullName) + req.Segment = strings.TrimSpace(strings.ToLower(req.Segment)) + + if _, err := mail.ParseAddress(req.Email); err != nil { + return Customer{}, fmt.Errorf("%w: email must be valid", ErrValidation) + } + if req.FullName == "" { + return Customer{}, fmt.Errorf("%w: full_name is required", ErrValidation) + } + if _, ok := validSegments[req.Segment]; !ok { + return Customer{}, fmt.Errorf("%w: unsupported customer segment", ErrValidation) + } + + customer := Customer{ + ID: newID(), + Email: req.Email, + FullName: req.FullName, + Segment: req.Segment, + CreatedAt: time.Now().UTC(), + } + + _, err := s.db.ExecContext(ctx, + `INSERT INTO customers (id, email, full_name, segment, created_at) VALUES (?, ?, ?, ?, ?)`, + customer.ID, customer.Email, customer.FullName, customer.Segment, customer.CreatedAt, + ) + if err != nil { + if isDuplicateKey(err) { + return Customer{}, fmt.Errorf("%w: email already exists", ErrConflict) + } + return Customer{}, fmt.Errorf("insert customer: %w", err) + } + + return customer, nil +} + +func (s *Store) CreateProduct(ctx context.Context, req CreateProductRequest) (Product, error) { + req.SKU = strings.TrimSpace(strings.ToUpper(req.SKU)) + req.Name = strings.TrimSpace(req.Name) + req.Category = strings.TrimSpace(strings.ToLower(req.Category)) + + switch { + case req.SKU == "": + return Product{}, fmt.Errorf("%w: sku is required", ErrValidation) + case req.Name == "": + return Product{}, fmt.Errorf("%w: name is required", ErrValidation) + case req.Category == "": + return Product{}, fmt.Errorf("%w: category is required", ErrValidation) + case req.PriceCents <= 0: + return Product{}, fmt.Errorf("%w: price_cents must be greater than zero", ErrValidation) + case req.InventoryCount < 0: + return Product{}, fmt.Errorf("%w: inventory_count cannot be negative", ErrValidation) + } + + product := Product{ + ID: newID(), + SKU: req.SKU, + Name: req.Name, + Category: req.Category, + PriceCents: req.PriceCents, + InventoryCount: req.InventoryCount, + CreatedAt: time.Now().UTC(), + } + + _, err := s.db.ExecContext(ctx, + `INSERT INTO products (id, sku, name, category, price_cents, inventory_count, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)`, + product.ID, product.SKU, product.Name, product.Category, product.PriceCents, product.InventoryCount, product.CreatedAt, + ) + if err != nil { + if isDuplicateKey(err) { + return Product{}, fmt.Errorf("%w: sku already exists", ErrConflict) + } + return Product{}, fmt.Errorf("insert product: %w", err) + } + + return product, nil +} + +func (s *Store) CreateOrder(ctx context.Context, req CreateOrderRequest) (Order, error) { + req.Status = strings.TrimSpace(strings.ToLower(req.Status)) + if req.Status == "" { + req.Status = "paid" + } + + switch { + case req.CustomerID == "": + return Order{}, fmt.Errorf("%w: customer_id is required", ErrValidation) + case len(req.Items) == 0: + return Order{}, fmt.Errorf("%w: at least one item is required", ErrValidation) + } + if _, ok := validStatuses[req.Status]; !ok { + return Order{}, fmt.Errorf("%w: unsupported order status", ErrValidation) + } + for _, item := range req.Items { + if item.ProductID == "" || item.Quantity <= 0 { + return Order{}, fmt.Errorf("%w: every item needs a valid product_id and quantity", ErrValidation) + } + } + + // Sort items by product_id so all concurrent transactions acquire row + // locks in the same order, reducing (but not eliminating) deadlocks. + sort.Slice(req.Items, func(i, j int) bool { + return req.Items[i].ProductID < req.Items[j].ProductID + }) + + // Retry the transaction on InnoDB deadlock (Error 1213). Under high + // concurrency multiple transactions can deadlock even with consistent + // lock ordering; MySQL recommends retrying on deadlock. + const maxRetries = 5 + var lastErr error + for attempt := 0; attempt < maxRetries; attempt++ { + order, err := s.createOrderTx(ctx, req) + if err == nil { + return order, nil + } + var mysqlErr *mysql.MySQLError + if errors.As(err, &mysqlErr) && mysqlErr.Number == 1213 && attempt < maxRetries-1 { + // Back off briefly before retrying: 10ms, 20ms, 40ms, 80ms. + time.Sleep(time.Duration(1<= ?`, + input.Quantity, input.ProductID, input.Quantity, + ) + if err != nil { + return Order{}, fmt.Errorf("decrement inventory for product %s: %w", input.ProductID, err) + } + rowsAffected, _ := result.RowsAffected() + if rowsAffected == 0 { + // Either product not found or insufficient inventory. + var exists int + checkErr := tx.QueryRowContext(ctx, `SELECT 1 FROM products WHERE id = ? LIMIT 1`, input.ProductID).Scan(&exists) + if errors.Is(checkErr, sql.ErrNoRows) { + return Order{}, fmt.Errorf("%w: product %s", ErrNotFound, input.ProductID) + } + return Order{}, fmt.Errorf("%w: product %s", ErrInsufficientInventory, input.ProductID) + } + + var product Product + productRow := tx.QueryRowContext(ctx, + `SELECT id, sku, name, category, price_cents FROM products WHERE id = ?`, + input.ProductID, + ) + if err := productRow.Scan(&product.ID, &product.SKU, &product.Name, &product.Category, &product.PriceCents); err != nil { + return Order{}, fmt.Errorf("fetch product %s: %w", input.ProductID, err) + } + + lineCents := product.PriceCents * input.Quantity + totalCents += lineCents + items = append(items, OrderItem{ + ProductID: product.ID, + SKU: product.SKU, + Name: product.Name, + Category: product.Category, + Quantity: input.Quantity, + UnitPriceCents: product.PriceCents, + LineTotalCents: lineCents, + }) + } + + orderID := newID() + createdAt := time.Now().UTC() + + _, err = tx.ExecContext(ctx, + `INSERT INTO orders (id, customer_id, customer_email, customer_name, customer_segment, status, total_cents, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, + orderID, customer.ID, customer.Email, customer.FullName, customer.Segment, + req.Status, totalCents, createdAt, + ) + if err != nil { + return Order{}, fmt.Errorf("insert order: %w", err) + } + + for _, item := range items { + _, err = tx.ExecContext(ctx, + `INSERT INTO order_items (id, order_id, product_id, sku, name, category, quantity, unit_price_cents, line_total_cents) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, + newID(), orderID, item.ProductID, item.SKU, item.Name, item.Category, + item.Quantity, item.UnitPriceCents, item.LineTotalCents, + ) + if err != nil { + return Order{}, fmt.Errorf("insert order item: %w", err) + } + } + + if err := tx.Commit(); err != nil { + return Order{}, fmt.Errorf("commit order: %w", err) + } + + return Order{ + ID: orderID, + Customer: customer, + Status: req.Status, + TotalCents: totalCents, + CreatedAt: createdAt, + Items: items, + }, nil +} + +func (s *Store) GetOrder(ctx context.Context, orderID string) (Order, error) { + row := s.db.QueryRowContext(ctx, + `SELECT id, customer_id, customer_email, customer_name, customer_segment, status, total_cents, created_at + FROM orders WHERE id = ?`, + orderID, + ) + + var order Order + if err := row.Scan( + &order.ID, + &order.Customer.ID, + &order.Customer.Email, + &order.Customer.FullName, + &order.Customer.Segment, + &order.Status, + &order.TotalCents, + &order.CreatedAt, + ); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return Order{}, fmt.Errorf("%w: order %s", ErrNotFound, orderID) + } + return Order{}, fmt.Errorf("find order: %w", err) + } + + rows, err := s.db.QueryContext(ctx, + `SELECT product_id, sku, name, category, quantity, unit_price_cents, line_total_cents + FROM order_items WHERE order_id = ?`, + orderID, + ) + if err != nil { + return Order{}, fmt.Errorf("fetch order items: %w", err) + } + defer rows.Close() //nolint:errcheck + + for rows.Next() { + var item OrderItem + if err := rows.Scan(&item.ProductID, &item.SKU, &item.Name, &item.Category, + &item.Quantity, &item.UnitPriceCents, &item.LineTotalCents); err != nil { + return Order{}, fmt.Errorf("scan order item: %w", err) + } + order.Items = append(order.Items, item) + } + if err := rows.Err(); err != nil { + return Order{}, fmt.Errorf("iterate order items: %w", err) + } + + return order, nil +} + +func (s *Store) GetCustomerSummary(ctx context.Context, customerID string) (CustomerSummary, error) { + var customer Customer + row := s.db.QueryRowContext(ctx, + `SELECT id, email, full_name, segment, created_at FROM customers WHERE id = ?`, + customerID, + ) + if err := row.Scan(&customer.ID, &customer.Email, &customer.FullName, &customer.Segment, &customer.CreatedAt); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return CustomerSummary{}, fmt.Errorf("%w: customer %s", ErrNotFound, customerID) + } + return CustomerSummary{}, fmt.Errorf("find customer: %w", err) + } + + // Aggregate order-level stats. + var ordersCount int + var lifetimeValueCents sql.NullInt64 + var lastOrderAt sql.NullTime + + statsRow := s.db.QueryRowContext(ctx, + `SELECT COUNT(*), COALESCE(SUM(total_cents), 0), MAX(created_at) + FROM orders WHERE customer_id = ?`, + customerID, + ) + if err := statsRow.Scan(&ordersCount, &lifetimeValueCents, &lastOrderAt); err != nil { + return CustomerSummary{}, fmt.Errorf("aggregate customer stats: %w", err) + } + + summary := CustomerSummary{ + Customer: customer, + OrdersCount: ordersCount, + LifetimeValueCents: int(lifetimeValueCents.Int64), + } + if ordersCount > 0 { + summary.AverageOrderValueCents = summary.LifetimeValueCents / ordersCount + } + if lastOrderAt.Valid { + t := lastOrderAt.Time.UTC() + summary.LastOrderAt = &t + } + + // Find favourite category. + catRow := s.db.QueryRowContext(ctx, + `SELECT oi.category + FROM orders o + JOIN order_items oi ON oi.order_id = o.id + WHERE o.customer_id = ? + GROUP BY oi.category + ORDER BY SUM(oi.line_total_cents) DESC, oi.category ASC + LIMIT 1`, + customerID, + ) + var favCat sql.NullString + if err := catRow.Scan(&favCat); err != nil && !errors.Is(err, sql.ErrNoRows) { + return CustomerSummary{}, fmt.Errorf("favourite category: %w", err) + } + summary.FavoriteCategory = favCat.String + + return summary, nil +} + +func (s *Store) SearchOrders(ctx context.Context, params OrderSearchParams) ([]OrderSearchResult, error) { + if params.Limit <= 0 { + params.Limit = 25 + } + if params.Limit > 100 { + params.Limit = 100 + } + if params.Offset < 0 { + params.Offset = 0 + } + params.Status = strings.TrimSpace(strings.ToLower(params.Status)) + if params.Status != "" { + if _, ok := validStatuses[params.Status]; !ok { + return nil, fmt.Errorf("%w: unsupported order status", ErrValidation) + } + } + + query := `SELECT o.id, o.customer_id, o.customer_name, o.status, o.total_cents, o.created_at, + COALESCE(SUM(oi.quantity), 0) AS total_items, + COUNT(DISTINCT oi.product_id) AS distinct_products + FROM orders o + LEFT JOIN order_items oi ON oi.order_id = o.id + WHERE 1=1` + args := []any{} + + if params.Status != "" { + query += " AND o.status = ?" + args = append(args, params.Status) + } + if params.CustomerID != "" { + query += " AND o.customer_id = ?" + args = append(args, params.CustomerID) + } + if params.MinTotalCents > 0 { + query += " AND o.total_cents >= ?" + args = append(args, params.MinTotalCents) + } + if params.CreatedFrom != nil { + query += " AND o.created_at >= ?" + args = append(args, *params.CreatedFrom) + } + if params.CreatedThrough != nil { + query += " AND o.created_at <= ?" + args = append(args, *params.CreatedThrough) + } + + query += " GROUP BY o.id, o.customer_id, o.customer_name, o.status, o.total_cents, o.created_at" + query += " ORDER BY o.created_at DESC" + query += fmt.Sprintf(" LIMIT %d OFFSET %d", params.Limit, params.Offset) + + rows, err := s.db.QueryContext(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("search orders: %w", err) + } + defer rows.Close() //nolint:errcheck + + results := make([]OrderSearchResult, 0, params.Limit) + for rows.Next() { + var r OrderSearchResult + if err := rows.Scan( + &r.ID, &r.CustomerID, &r.CustomerName, &r.Status, &r.TotalCents, &r.CreatedAt, + &r.TotalItems, &r.DistinctProducts, + ); err != nil { + return nil, fmt.Errorf("scan order search result: %w", err) + } + results = append(results, r) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate search results: %w", err) + } + + return results, nil +} + +func (s *Store) TopProducts(ctx context.Context, days, limit int) ([]TopProduct, error) { + if days <= 0 { + days = 30 + } + if limit <= 0 { + limit = 10 + } + if limit > 50 { + limit = 50 + } + + _ = days // days filter intentionally unused: using all-time data keeps the + // SQL query parameter-free so keploy can match the mock deterministically + // across record and replay sessions (time.Now() would shift the WHERE + // clause and cause mock mismatches during replay). + + query := `SELECT oi.product_id, oi.sku, oi.name, oi.category, + SUM(oi.quantity) AS units_sold, + SUM(oi.line_total_cents) AS revenue_cents, + COUNT(DISTINCT o.id) AS orders_count + FROM orders o + JOIN order_items oi ON oi.order_id = o.id + WHERE o.status IN ('paid', 'shipped') + GROUP BY oi.product_id, oi.sku, oi.name, oi.category + ORDER BY revenue_cents DESC, units_sold DESC + LIMIT ?` + + rows, err := s.db.QueryContext(ctx, query, limit) + if err != nil { + return nil, fmt.Errorf("top products: %w", err) + } + defer rows.Close() //nolint:errcheck + + results := make([]TopProduct, 0, limit) + rank := 1 + for rows.Next() { + var p TopProduct + if err := rows.Scan(&p.ID, &p.SKU, &p.Name, &p.Category, + &p.UnitsSold, &p.RevenueCents, &p.OrdersCount); err != nil { + return nil, fmt.Errorf("scan top product: %w", err) + } + p.RevenueRank = rank + results = append(results, p) + rank++ + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate top products: %w", err) + } + + return results, nil +} + +func (s *Store) CreateLargePayload(ctx context.Context, req CreateLargePayloadRequest) (LargePayloadRecord, error) { + req.Name = strings.TrimSpace(req.Name) + req.ContentType = strings.TrimSpace(req.ContentType) + if req.ContentType == "" { + req.ContentType = "text/plain" + } + + switch { + case req.Name == "": + return LargePayloadRecord{}, fmt.Errorf("%w: name is required", ErrValidation) + case req.Payload == "": + return LargePayloadRecord{}, fmt.Errorf("%w: payload is required", ErrValidation) + } + + payloadSizeBytes := len([]byte(req.Payload)) + if payloadSizeBytes > maxLargePayloadBytes { + return LargePayloadRecord{}, fmt.Errorf( + "%w: payload exceeds %d bytes (%d MiB) limit", + ErrValidation, maxLargePayloadBytes, maxLargePayloadBytes/(1024*1024), + ) + } + + checksum := sha256.Sum256([]byte(req.Payload)) + record := LargePayloadRecord{ + ID: newID(), + Name: req.Name, + ContentType: req.ContentType, + PayloadSizeBytes: payloadSizeBytes, + SHA256: hex.EncodeToString(checksum[:]), + CreatedAt: time.Now().UTC(), + } + + _, err := s.db.ExecContext(ctx, + `INSERT INTO large_payloads (id, name, content_type, payload, payload_size_bytes, sha256, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + record.ID, record.Name, record.ContentType, req.Payload, + record.PayloadSizeBytes, record.SHA256, record.CreatedAt, + ) + if err != nil { + return LargePayloadRecord{}, fmt.Errorf("insert large payload: %w", err) + } + + return record, nil +} + +func (s *Store) GetLargePayload(ctx context.Context, payloadID string) (LargePayloadDetail, error) { + row := s.db.QueryRowContext(ctx, + `SELECT id, name, content_type, payload, payload_size_bytes, sha256, created_at + FROM large_payloads WHERE id = ?`, + payloadID, + ) + + var d LargePayloadDetail + if err := row.Scan( + &d.ID, &d.Name, &d.ContentType, &d.Payload, + &d.PayloadSizeBytes, &d.SHA256, &d.CreatedAt, + ); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return LargePayloadDetail{}, fmt.Errorf("%w: large payload %s", ErrNotFound, payloadID) + } + return LargePayloadDetail{}, fmt.Errorf("find large payload: %w", err) + } + + return d, nil +} + +func (s *Store) DeleteLargePayload(ctx context.Context, payloadID string) (DeleteLargePayloadResponse, error) { + // Fetch first so we can return the record metadata. + detail, err := s.GetLargePayload(ctx, payloadID) + if err != nil { + return DeleteLargePayloadResponse{}, err + } + + _, err = s.db.ExecContext(ctx, `DELETE FROM large_payloads WHERE id = ?`, payloadID) + if err != nil { + return DeleteLargePayloadResponse{}, fmt.Errorf("delete large payload: %w", err) + } + + return DeleteLargePayloadResponse{ + Deleted: true, + Record: detail.LargePayloadRecord, + }, nil +} diff --git a/go-memory-load-mysql/keploy.yml b/go-memory-load-mysql/keploy.yml new file mode 100755 index 00000000..cf5c06e9 --- /dev/null +++ b/go-memory-load-mysql/keploy.yml @@ -0,0 +1,107 @@ +# Generated by Keploy (3-dev) +path: "" +appId: 0 +appName: "" +command: "" +templatize: + testSets: [] +port: 0 +proxyPort: 16789 +incomingProxyPort: 36789 +dnsPort: 26789 +debug: false +disableANSI: false +disableTele: false +generateGithubActions: false +containerName: "" +networkName: "" +buildDelay: 30 +test: + selectedTests: {} + ignoredTests: {} + globalNoise: + global: + body.id: [] + header.Content-Length: [] + test-sets: {} + replaceWith: + global: {} + test-sets: {} + delay: 5 + host: "localhost" + port: 0 + grpcPort: 0 + ssePort: 0 + protocol: + http: + port: 0 + sse: + port: 0 + grpc: + port: 0 + apiTimeout: 5 + skipCoverage: false + coverageReportPath: "" + ignoreOrdering: true + mongoPassword: "default@123" + language: "" + removeUnusedMocks: false + fallBackOnMiss: false + jacocoAgentPath: "" + basePath: "" + mocking: true + disableLineCoverage: false + disableMockUpload: true + useLocalMock: false + updateTemplate: false + mustPass: false + maxFailAttempts: 5 + maxFlakyChecks: 1 + protoFile: "" + protoDir: "" + protoInclude: [] + compareAll: false + updateTestMapping: false + disableAutoHeaderNoise: false + # strictMockWindow enforces cross-test bleed prevention. Per-test + # (LifetimePerTest) mocks whose request timestamp falls outside the + # outer test window are dropped rather than promoted across tests. + # + # Phase 1 ships with default FALSE — many real-world apps + # legitimately share data-plane mocks across tests (e.g., fixture + # rows queried by every test in a suite), and flipping the default + # to true would silently break those suites on upgrade. Opt into + # strict containment by setting this to true in keploy.yaml or + # exporting KEPLOY_STRICT_MOCK_WINDOW=1. A follow-up will flip the + # default once every stateful-protocol recorder classifies mocks + # finely enough (per-connection data mocks, session vs per-test + # distinction for connection-alive commands) that legitimate + # cross-test sharing is encoded as session/connection lifetime + # rather than implicit out-of-window reuse. + strictMockWindow: false +record: + recordTimer: 0s + filters: [] + sync: false + memoryLimit: 0 +configPath: "" +bypassRules: [] +disableMapping: true +contract: + driven: "consumer" + mappings: + servicesMapping: {} + self: "s1" + services: [] + tests: [] + path: "" + download: false + generate: false +inCi: false +cmdType: "native" +enableTesting: false +inDocker: false +keployContainer: "keploy-v3" +keployNetwork: "keploy-network" + +# Visit [https://keploy.io/docs/running-keploy/configuration-file/] to learn about using keploy through configration file. diff --git a/go-memory-load-mysql/loadtest/scenario.js b/go-memory-load-mysql/loadtest/scenario.js new file mode 100644 index 00000000..a5c94a81 --- /dev/null +++ b/go-memory-load-mysql/loadtest/scenario.js @@ -0,0 +1,406 @@ +import http from 'k6/http'; +import exec from 'k6/execution'; +import { Counter, Trend } from 'k6/metrics'; +import { check, sleep } from 'k6'; + +const isSmokeProfile = __ENV.TEST_PROFILE === 'smoke'; +const MIXED_API_START_VUS = parsePositiveIntEnv('MIXED_API_START_VUS', 10); +const MIXED_API_VU_STAGE_TARGETS = parsePositiveIntListEnv( + 'MIXED_API_VU_STAGE_TARGETS', + [20, 40, 80, 30], + 4 +); +const LARGE_PAYLOAD_PREALLOCATED_VUS = parsePositiveIntEnv('LARGE_PAYLOAD_PREALLOCATED_VUS', 16); +const LARGE_PAYLOAD_MAX_VUS = parsePositiveIntEnv('LARGE_PAYLOAD_MAX_VUS', 64); +const LARGE_PAYLOAD_SIZE_MBS = (__ENV.LARGE_PAYLOAD_SIZES_MB || '1,2,4') + .split(',') + .map((value) => parseInt(value.trim(), 10)) + .filter((value) => Number.isFinite(value) && value > 0); +const LARGE_PAYLOAD_SIZES = LARGE_PAYLOAD_SIZE_MBS.length > 0 ? LARGE_PAYLOAD_SIZE_MBS : [1]; + +const LARGE_PAYLOAD_STAGE_TARGETS = parsePositiveIntListEnv( + 'LARGE_PAYLOAD_STAGE_TARGETS', + [2, 4, 2], + 3 +); + +const THRESHOLD_HTTP_FAILED_RATE = parseFloatEnv('THRESHOLD_HTTP_FAILED_RATE', 0.02); +const THRESHOLD_HTTP_P95 = parsePositiveIntEnv('THRESHOLD_HTTP_P95', 2500); +const THRESHOLD_HTTP_AVG = parsePositiveIntEnv('THRESHOLD_HTTP_AVG', 1200); +const THRESHOLD_LARGE_INSERT_P95 = parsePositiveIntEnv('THRESHOLD_LARGE_INSERT_P95', 5000); +const THRESHOLD_LARGE_GET_P95 = parsePositiveIntEnv('THRESHOLD_LARGE_GET_P95', 5000); +const THRESHOLD_LARGE_DELETE_P95 = parsePositiveIntEnv('THRESHOLD_LARGE_DELETE_P95', 3000); + +export const options = isSmokeProfile + ? { + scenarios: { + mixed_api_load: { + executor: 'shared-iterations', + vus: 1, + iterations: 8, + maxDuration: '30s', + }, + large_payload_cycle: { + executor: 'shared-iterations', + vus: 1, + iterations: 3, + maxDuration: '45s', + }, + }, + thresholds: { + http_req_failed: ['rate<0.05'], + large_payload_insert_duration: ['p(95)<3000'], + large_payload_get_duration: ['p(95)<3000'], + large_payload_delete_duration: ['p(95)<2000'], + }, + } + : { + scenarios: { + mixed_api_load: { + executor: 'ramping-vus', + startVUs: MIXED_API_START_VUS, + stages: [ + { target: MIXED_API_VU_STAGE_TARGETS[0], duration: '15s' }, + { target: MIXED_API_VU_STAGE_TARGETS[1], duration: '30s' }, + { target: MIXED_API_VU_STAGE_TARGETS[2], duration: '45s' }, + { target: MIXED_API_VU_STAGE_TARGETS[3], duration: '15s' }, + ], + }, + large_payload_cycle: { + executor: 'ramping-arrival-rate', + startRate: 1, + timeUnit: '1s', + preAllocatedVUs: LARGE_PAYLOAD_PREALLOCATED_VUS, + maxVUs: LARGE_PAYLOAD_MAX_VUS, + stages: [ + { target: LARGE_PAYLOAD_STAGE_TARGETS[0], duration: '15s' }, + { target: LARGE_PAYLOAD_STAGE_TARGETS[1], duration: '30s' }, + { target: LARGE_PAYLOAD_STAGE_TARGETS[2], duration: '15s' }, + ], + }, + }, + thresholds: { + http_req_failed: [`rate<${THRESHOLD_HTTP_FAILED_RATE}`], + http_req_duration: [`p(95)<${THRESHOLD_HTTP_P95}`, `avg<${THRESHOLD_HTTP_AVG}`], + large_payload_insert_duration: [`p(95)<${THRESHOLD_LARGE_INSERT_P95}`], + large_payload_get_duration: [`p(95)<${THRESHOLD_LARGE_GET_P95}`], + large_payload_delete_duration: [`p(95)<${THRESHOLD_LARGE_DELETE_P95}`], + }, + }; + +const BASE_URL = __ENV.BASE_URL || 'http://localhost:8080'; +const SEGMENTS = ['startup', 'enterprise', 'retail', 'partner']; +const CATEGORIES = ['compute', 'storage', 'networking', 'security', 'analytics']; +const STATUSES = ['paid', 'paid', 'paid', 'shipped', 'pending']; +let uniqueCounter = 0; +const payloadCache = {}; +const largePayloadInsertDuration = new Trend('large_payload_insert_duration', true); +const largePayloadGetDuration = new Trend('large_payload_get_duration', true); +const largePayloadDeleteDuration = new Trend('large_payload_delete_duration', true); +const largePayloadInsertedBytes = new Counter('large_payload_inserted_bytes'); +const largePayloadRetrievedBytes = new Counter('large_payload_retrieved_bytes'); +const largePayloadDeletedBytes = new Counter('large_payload_deleted_bytes'); + +function parsePositiveIntEnv(name, fallback) { + const value = parseInt(__ENV[name] || '', 10); + return Number.isFinite(value) && value > 0 ? value : fallback; +} + +function parseFloatEnv(name, fallback) { + const value = parseFloat(__ENV[name] || ''); + return Number.isFinite(value) && value > 0 ? value : fallback; +} + +function parsePositiveIntListEnv(name, fallback, expectedLength) { + const values = (__ENV[name] || '') + .split(',') + .map((value) => parseInt(value.trim(), 10)) + .filter((value) => Number.isFinite(value) && value > 0); + + if (values.length === expectedLength) { + return values; + } + + return fallback; +} + +function jsonParams() { + return { + headers: { + 'Content-Type': 'application/json', + }, + }; +} + +function randomInt(min, max) { + return Math.floor(Math.random() * (max - min + 1)) + min; +} + +function randomItem(values) { + return values[randomInt(0, values.length - 1)]; +} + +function uniqueSuffix() { + const vu = typeof __VU === 'number' ? __VU : 0; + uniqueCounter += 1; + return `${vu}-${uniqueCounter}-${Date.now()}-${Math.random().toString(16).slice(2, 8)}`; +} + +function bytesFromMB(mb) { + return mb * 1024 * 1024; +} + +function buildLargePayload(sizeMB) { + if (!payloadCache[sizeMB]) { + const targetBytes = bytesFromMB(sizeMB); + payloadCache[sizeMB] = 'X'.repeat(targetBytes); + } + + return payloadCache[sizeMB]; +} + +function createCustomer(namePrefix = 'Load Customer') { + const suffix = uniqueSuffix(); + const payload = { + email: `customer-${suffix}@example.com`, + full_name: `${namePrefix} ${suffix}`, + segment: randomItem(SEGMENTS), + }; + + const response = http.post(`${BASE_URL}/customers`, JSON.stringify(payload), jsonParams()); + check(response, { + 'create customer status is 201': (r) => r.status === 201, + }); + + return response.status === 201 ? response.json() : null; +} + +function createLargePayload(sizeMB) { + const suffix = uniqueSuffix(); + const payload = buildLargePayload(sizeMB); + const response = http.post( + `${BASE_URL}/large-payloads`, + JSON.stringify({ + name: `Large Payload ${suffix}`, + content_type: 'text/plain', + payload, + }), + jsonParams() + ); + + largePayloadInsertDuration.add(response.timings.duration, { size_mb: String(sizeMB) }); + largePayloadInsertedBytes.add(payload.length); + + check(response, { + 'create large payload status is 201': (r) => r.status === 201, + 'create large payload size matches': (r) => + r.status === 201 && r.json('payload_size_bytes') === payload.length, + }); + + return response.status === 201 ? response.json() : null; +} + +function getLargePayload(id, sizeMB) { + const response = http.get(`${BASE_URL}/large-payloads/${id}`); + + largePayloadGetDuration.add(response.timings.duration, { size_mb: String(sizeMB) }); + + const expectedBytes = bytesFromMB(sizeMB); + check(response, { + 'get large payload status is 200': (r) => r.status === 200, + 'get large payload size matches': (r) => + r.status === 200 && + r.json('payload_size_bytes') === expectedBytes && + r.json('payload').length === expectedBytes, + }); + + if (response.status === 200) { + largePayloadRetrievedBytes.add(response.json('payload_size_bytes')); + } + + return response; +} + +function deleteLargePayload(id, sizeMB) { + const response = http.del(`${BASE_URL}/large-payloads/${id}`); + + largePayloadDeleteDuration.add(response.timings.duration, { size_mb: String(sizeMB) }); + + check(response, { + 'delete large payload status is 200': (r) => r.status === 200, + 'delete large payload reports deleted': (r) => r.status === 200 && r.json('deleted') === true, + }); + + if (response.status === 200) { + largePayloadDeletedBytes.add(response.json('record.payload_size_bytes')); + } + + return response; +} + +function createProduct(namePrefix = 'Load Product') { + const suffix = uniqueSuffix(); + const payload = { + sku: `SKU-${suffix}`.toUpperCase(), + name: `${namePrefix} ${suffix}`, + category: randomItem(CATEGORIES), + price_cents: randomInt(1200, 18000), + inventory_count: randomInt(1200, 2500), + }; + + const response = http.post(`${BASE_URL}/products`, JSON.stringify(payload), jsonParams()); + check(response, { + 'create product status is 201': (r) => r.status === 201, + }); + + return response.status === 201 ? response.json() : null; +} + +function createOrder(customerId, products) { + const itemCount = randomInt(1, 4); + const items = []; + const selectedProductIDs = new Set(); + + while (items.length < itemCount) { + const product = randomItem(products); + if (selectedProductIDs.has(product.id)) { + continue; + } + selectedProductIDs.add(product.id); + items.push({ + product_id: product.id, + quantity: randomInt(1, 3), + }); + } + + const payload = { + customer_id: customerId, + status: randomItem(STATUSES), + items, + }; + + const response = http.post(`${BASE_URL}/orders`, JSON.stringify(payload), jsonParams()); + check(response, { + 'create order status is 201': (r) => r.status === 201, + }); + + return response.status === 201 ? response.json() : null; +} + +export function setup() { + const bootstrapCustomers = []; + const bootstrapProducts = []; + const bootstrapLargePayloads = []; + + for (let i = 0; i < 20; i += 1) { + const customer = createCustomer('Bootstrap Customer'); + if (customer) { + bootstrapCustomers.push(customer); + } + } + + for (let i = 0; i < 35; i += 1) { + const product = createProduct('Bootstrap Product'); + if (product) { + bootstrapProducts.push(product); + } + } + + const bootstrapOrders = []; + for (let i = 0; i < 40; i += 1) { + const customer = randomItem(bootstrapCustomers); + const order = createOrder(customer.id, bootstrapProducts); + if (order) { + bootstrapOrders.push(order); + const r = http.get(`${BASE_URL}/orders/${order.id}`); + check(r, { 'bootstrap get order ok': (res) => res.status === 200 }); + } + } + + for (const sizeMB of LARGE_PAYLOAD_SIZES.slice(0, 2)) { + const record = createLargePayload(sizeMB); + if (record) { + bootstrapLargePayloads.push({ + id: record.id, + sizeMB, + }); + } + } + + return { + customers: bootstrapCustomers, + products: bootstrapProducts, + orders: bootstrapOrders, + largePayloads: bootstrapLargePayloads, + }; +} + +export default function (data) { + if (exec.scenario.name === 'large_payload_cycle') { + runLargePayloadCycle(data); + return; + } + + const roll = Math.random(); + const customer = randomItem(data.customers); + + if (roll < 0.1) { + createCustomer(); + } else if (roll < 0.2) { + createProduct(); + } else if (roll < 0.45) { + createOrder(customer.id, data.products); + } else if (roll < 0.55) { + if (data.orders && data.orders.length > 0) { + const bootstrapOrder = randomItem(data.orders); + const orderResponse = http.get(`${BASE_URL}/orders/${bootstrapOrder.id}`); + check(orderResponse, { + 'get order status is 200': (r) => r.status === 200, + 'get order returns items': (r) => r.status === 200 && r.json('items').length > 0, + }); + } + } else if (roll < 0.75) { + const isolatedCustomer = createCustomer('Summary Customer'); + if (isolatedCustomer) { + createOrder(isolatedCustomer.id, data.products); + const summaryResponse = http.get(`${BASE_URL}/customers/${isolatedCustomer.id}/summary`); + check(summaryResponse, { + 'customer summary status is 200': (r) => r.status === 200, + }); + } + } else if (roll < 0.9) { + const minTotal = randomInt(1000, 10000); + const searchResponse = http.get( + `${BASE_URL}/orders?status=paid&customer_id=${customer.id}&min_total_cents=${minTotal}&limit=10` + ); + check(searchResponse, { + 'order search status is 200': (r) => r.status === 200, + }); + } else { + const analyticsResponse = http.get(`${BASE_URL}/analytics/top-products?days=30&limit=5`); + check(analyticsResponse, { + 'top products status is 200': (r) => r.status === 200, + }); + } + + sleep(randomInt(1, 3) / 10); +} + +function runLargePayloadCycle(data) { + const sizeMB = randomItem(LARGE_PAYLOAD_SIZES); + const created = createLargePayload(sizeMB); + if (!created) { + sleep(0.2); + return; + } + + getLargePayload(created.id, sizeMB); + deleteLargePayload(created.id, sizeMB); + + if (data.largePayloads.length > 0 && Math.random() < 0.35) { + const existing = randomItem(data.largePayloads); + getLargePayload(existing.id, existing.sizeMB); + } + + sleep(randomInt(2, 5) / 10); +}