Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions cmd/elastickv-list-routes/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// elastickv-list-routes prints the cluster's current route catalog as
// JSON. Used by the Composed-1 M5a Jepsen setup-hook verification to
// assert the launch script's --shardRanges actually placed the M5
// table-route keys on the expected Raft groups before any workload
// op runs.
//
// Per the design doc (docs/design/2026_06_02_proposed_composed1_m5_jepsen_route_shuffle.md
// §3.3), the Jepsen client's setup! shells out to this tool rather
// than re-implementing the gRPC client in Clojure: a JSON contract is
// stable across versions and a future ListRoutes schema change shows
// up as an unmarshal failure rather than as a silent mis-routing
// during a Jepsen run.
//
// Usage:
//
// elastickv-list-routes --address 127.0.0.1:50051
//
// Output (stdout, one JSON object):
//
// {
// "catalog_version": 7,
// "routes": [
// {"route_id": 100, "raft_group_id": 1, "start": "...", "end": "...", "state": "ROUTE_STATE_ACTIVE"},
// ...
// ]
// }
//
// `start` and `end` are base64-encoded raw bytes so any byte
// sequence (including unprintables in the routing keyspace) survives
// the JSON round-trip without quoting issues. Non-zero exit on any
// error so the Jepsen setup-hook sees the failure verbatim.
package main

import (
"context"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"io"
"os"
"time"

pb "github.com/bootjp/elastickv/proto"
"github.com/cockroachdb/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const rpcTimeout = 10 * time.Second

var address = flag.String("address", "127.0.0.1:50051", "gRPC address of an elastickv server (any node works — the Distribution service is replicated)")

// routeJSON is the on-the-wire shape this tool emits. Field names
// are snake_case to match the proto's wire format; Start/End are
// base64-encoded so arbitrary byte ranges survive JSON.
type routeJSON struct {
RouteID uint64 `json:"route_id"`
RaftGroupID uint64 `json:"raft_group_id"`
Start string `json:"start"` // base64(raw bytes)
End string `json:"end"` // base64(raw bytes); empty == +infinity
State string `json:"state"`
}

type responseJSON struct {
CatalogVersion uint64 `json:"catalog_version"`
Routes []routeJSON `json:"routes"`
}

func main() {
flag.Parse()
if err := run(); err != nil {
fmt.Fprintf(os.Stderr, "elastickv-list-routes: %v\n", err)
os.Exit(1)
}
}

func run() error {
conn, err := grpc.NewClient(*address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return errors.Wrapf(err, "dial %s", *address)
}
defer func() {
// Mirror cmd/elastickv-split's close pattern: surface close
// errors on stderr without failing the process — by this
// point the response has been printed.
if cerr := conn.Close(); cerr != nil {
fmt.Fprintf(os.Stderr, "elastickv-list-routes: close: %v\n", cerr)
}
}()

client := pb.NewDistributionClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
defer cancel()

resp, err := client.ListRoutes(ctx, &pb.ListRoutesRequest{})
if err != nil {
return errors.Wrap(err, "ListRoutes")
}
return emit(resp, os.Stdout)
}

// emit serialises resp as JSON to w. Extracted from run() so the
// encoding is testable with bytes.Buffer rather than a real
// temp file (gemini medium on PR #925). io.Writer is the
// idiomatic Go return shape for "I write structured output to
// something."
func emit(resp *pb.ListRoutesResponse, w io.Writer) error {
out := responseJSON{
CatalogVersion: resp.GetCatalogVersion(),
Routes: make([]routeJSON, 0, len(resp.GetRoutes())),
}
for _, r := range resp.GetRoutes() {
out.Routes = append(out.Routes, routeJSON{
RouteID: r.GetRouteId(),
RaftGroupID: r.GetRaftGroupId(),
Start: base64.StdEncoding.EncodeToString(r.GetStart()),
End: base64.StdEncoding.EncodeToString(r.GetEnd()),
State: r.GetState().String(),
})
}
enc := json.NewEncoder(w)
enc.SetIndent("", " ")
if err := enc.Encode(out); err != nil {
return errors.Wrap(err, "encode")
}
return nil
}
92 changes: 92 additions & 0 deletions cmd/elastickv-list-routes/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package main

import (
"bytes"
"encoding/base64"
"encoding/json"
"testing"

pb "github.com/bootjp/elastickv/proto"
"github.com/stretchr/testify/require"
)

// TestEmit_Empty checks the JSON shape when no routes exist.
// catalog_version is still emitted, routes is an empty array
// (not nil) so Clojure callers don't have to special-case
// nil-vs-empty.
func TestEmit_Empty(t *testing.T) {
var buf bytes.Buffer
require.NoError(t, emit(&pb.ListRoutesResponse{CatalogVersion: 7}, &buf))

var out struct {
CatalogVersion uint64 `json:"catalog_version"`
Routes []any `json:"routes"`
}
require.NoError(t, json.Unmarshal(buf.Bytes(), &out))
require.Equal(t, uint64(7), out.CatalogVersion)
require.NotNil(t, out.Routes)
require.Empty(t, out.Routes)
}

// TestEmit_RoundTripsRouteBytes pins the on-the-wire shape against
// the JSON Clojure will parse. Start/End are base64-encoded so any
// byte sequence survives — verified by round-tripping a routing-key
// shape that contains '|' (ASCII 124, outside the base64 alphabet).
func TestEmit_RoundTripsRouteBytes(t *testing.T) {
startBytes := []byte("!ddb|route|table|amVwc2VuX2FwcGVuZF90MQ")
endBytes := []byte("!ddb|route|table|amVwc2VuX2FwcGVuZF90Mw")

var buf bytes.Buffer
require.NoError(t, emit(&pb.ListRoutesResponse{
CatalogVersion: 3,
Routes: []*pb.RouteDescriptor{
{
RouteId: 100,
RaftGroupId: 1,
Start: startBytes,
End: endBytes,
State: pb.RouteState_ROUTE_STATE_ACTIVE,
},
},
}, &buf))

var out responseJSON
require.NoError(t, json.Unmarshal(buf.Bytes(), &out))

require.Equal(t, uint64(3), out.CatalogVersion)
require.Len(t, out.Routes, 1)
require.Equal(t, uint64(100), out.Routes[0].RouteID)
require.Equal(t, uint64(1), out.Routes[0].RaftGroupID)

// Round-trip the base64-encoded bytes — the load-bearing claim is
// that the Clojure caller decodes back to the exact bytes the
// server holds.
decodedStart, err := base64.StdEncoding.DecodeString(out.Routes[0].Start)
require.NoError(t, err)
require.Equal(t, startBytes, decodedStart)
decodedEnd, err := base64.StdEncoding.DecodeString(out.Routes[0].End)
require.NoError(t, err)
require.Equal(t, endBytes, decodedEnd)

// State serialises via proto's String() — verify the on-the-wire
// shape so a future enum-name change (e.g. stripped prefix to
// "ACTIVE") is caught here rather than silently parsed by the
// Clojure regex (claude[bot] low on PR #925).
require.Equal(t, "ROUTE_STATE_ACTIVE", out.Routes[0].State)
}

// TestEmit_EmptyEndDistinguishable verifies that an unset End
// (the +infinity sentinel) round-trips as an empty string rather
// than as a missing field — the Clojure setup-hook relies on this
// to detect the rightmost route in the catalog.
func TestEmit_EmptyEndDistinguishable(t *testing.T) {
var buf bytes.Buffer
require.NoError(t, emit(&pb.ListRoutesResponse{
Routes: []*pb.RouteDescriptor{
{RouteId: 1, RaftGroupId: 2, Start: []byte("x"), End: nil},
},
}, &buf))

require.Contains(t, buf.String(), `"end": ""`,
"empty End must serialise as an explicit empty string so Clojure can detect the +infinity boundary")
}
116 changes: 113 additions & 3 deletions jepsen/src/elastickv/dynamodb_multi_table_workload.clj
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
maintained API and obscure the side-by-side comparability that is
the point of running both."
(:gen-class)
(:require [clojure.string :as str]
(:require [clojure.java.shell :as shell]
[clojure.string :as str]
[cognitect.aws.client.api :as aws]
[cognitect.aws.credentials :as creds]
[elastickv.cli :as cli]
Expand Down Expand Up @@ -279,6 +280,90 @@
(ddb-invoke! ddb :TransactWriteItems
{:TransactItems (into checks updates)})))

;; ---------------------------------------------------------------------------
;; M5a setup-hook verification (design doc §3.3)
;; ---------------------------------------------------------------------------

(def ^:private default-list-routes-bin
"Default path to the cmd/elastickv-list-routes Go helper. Tunable
via (:list-routes-bin opts) so the binary can sit anywhere on disk
when the launch script doesn't put it on PATH."
"elastickv-list-routes")

(def ^:private default-grpc-port
"Default elastickv server gRPC port. Combined with the test's
first node hostname (or 127.0.0.1 when --local) to form the
--address argument."
50051)

(defn- default-grpc-host-port-for
"Returns the default --address for elastickv-list-routes when the
test does NOT pass an explicit :grpc-host-port. Resolves to the
first node's hostname + default port — works in both local mode
(first node is typically 127.0.0.1 or 'n1') and distributed Jepsen
runs where database nodes live on separate hosts (gemini medium
on PR #925). Falls back to 127.0.0.1:50051 only when :nodes is
missing entirely."
[test]
(let [node (first (:nodes test))]
(if node
(str (name node) ":" default-grpc-port)
(str "127.0.0.1:" default-grpc-port))))

(defn- distinct-group-ids
"Parses elastickv-list-routes' JSON output and returns the set of
distinct raft_group_id values present. Uses a regex rather than
pulling in a JSON dependency: the CLI emits a stable JSON shape
tested in cmd/elastickv-list-routes/main_test.go, and this hook
only needs a coarse-grained 'how many groups own routes' check.
If a future ListRoutes change introduces a different field name,
the regex returns the empty set and the assertion below fails
loudly — strictly better than silently passing on an unexpected
shape."
[json-str]
(->> (re-seq #"\"raft_group_id\"\s*:\s*(\d+)" json-str)
(map (comp #(Long/parseLong %) second))
set))

(defn- verify-multi-group-routing!
"Asserts the cluster reports >=2 distinct Raft groups in its route
catalog. Shells out to cmd/elastickv-list-routes (the JSON
contract is stable; design doc §3.3). Throws ex-info on any
failure so the Jepsen setup-hook fails fast with a clear error
pointing the operator at the launch-script flag the cluster is
missing.

opts (read from the test map):
:list-routes-bin — absolute path to the CLI (default \"elastickv-list-routes\";
assumes PATH or matching launch-script PWD).
:grpc-host-port — --address arg to the CLI; defaults to the
first node's hostname + 50051 so distributed
Jepsen runs work without flag plumbing
(gemini medium on PR #925)."
[test]
(let [bin (or (:list-routes-bin test) default-list-routes-bin)
addr (or (:grpc-host-port test) (default-grpc-host-port-for test))
result (shell/sh bin "--address" addr)]
Comment on lines +344 to +346
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Hardcoding 127.0.0.1:50051 as the default gRPC address works fine for local testing, but will fail in a distributed Jepsen environment where the database nodes run on separate hosts. Consider dynamically resolving the default address using the first node in the test configuration.

  (let [bin    (or (:list-routes-bin test) default-list-routes-bin)
        addr   (or (:grpc-host-port test)
                   (if-let [node (first (:nodes test))]
                     (str (name node) ":50051")
                     default-grpc-host-port))
        result (shell/sh bin "--address" addr)]

(when-not (zero? (:exit result))
(throw (ex-info (str bin " --address " addr " failed: " (:err result))
{:exit (:exit result)
:stdout (:out result)
:stderr (:err result)})))
(let [groups (distinct-group-ids (:out result))]
(when (< (count groups) 2)
(throw (ex-info
(str "M5a multi-group routing precondition failed: only "
(count groups) " distinct Raft group(s) observed in the catalog "
"(expected >=2). Re-launch the cluster with both --raftGroups "
"AND --shardRanges (see scripts/run-jepsen-m5-local.sh) — "
"without both flags --shardRanges collapses every range into "
"the default group 1 and dispatchMultiShardTxn never fires.")
{:groups groups
:bin bin
:address addr
:raw-out (:out result)})))
groups)))

;; ---------------------------------------------------------------------------
;; Jepsen client
;; ---------------------------------------------------------------------------
Expand All @@ -291,7 +376,19 @@
host (or (:dynamo-host test) (name node))]
(assoc this :ddb (make-ddb-client host port))))

(setup! [this _test]
(setup! [_this test]
;; M5a setup-hook verification per design doc §3.3. Asserts the
;; launch script's --raftGroups / --shardRanges actually placed
;; the M5 table-route keys on >=2 distinct Raft groups before any
;; workload op runs. Fails fast with a clear error so the operator
;; knows the cluster needs to be relaunched — strictly better than
;; silently running the workload on a single-shard layout and
;; reporting "zero G1c" without ever exercising dispatchMultiShardTxn.
;;
;; jepsen.client/setup! is invoked exactly once per test (not
;; per-node like jepsen.db/setup!), so no first-node gating is
;; required.
(verify-multi-group-routing! test)
;; No DescribeTable poll loop is needed: elastickv's adapter
;; returns TableStatus=ACTIVE synchronously in the CreateTable
;; response (adapter/dynamodb.go:779-783), so the table is
Expand Down Expand Up @@ -442,6 +539,15 @@
{:name (or (:name opts) "elastickv-dynamodb-append-multi-table")
:nodes nodes
:db db
;; Setup-hook verification keys — read by
;; verify-multi-group-routing! at workload setup! time.
;; Threaded into the test map (not the workload map)
;; because jepsen.client/setup! receives the test, not
;; opts; the M5a launch script passes these via the
;; --list-routes-bin and --grpc-host-port CLI flags
;; defined in dynamo-cli-opts below.
:list-routes-bin (:list-routes-bin opts)
:grpc-host-port (:grpc-host-port opts)
:dynamo-host (:dynamo-host opts)
:os (if local? os/noop debian/os)
:net (if local? net/noop net/iptables)
Expand Down Expand Up @@ -481,7 +587,11 @@
:parse-fn #(Integer/parseInt %)]
[nil "--max-txn-length N" "Maximum number of micro-ops per transaction."
:default nil
:parse-fn #(Integer/parseInt %)]])
:parse-fn #(Integer/parseInt %)]
[nil "--list-routes-bin PATH" "Path to the cmd/elastickv-list-routes Go helper used by the workload's setup-hook verification. Defaults to bare-name 'elastickv-list-routes' (assumes PATH lookup); pass an absolute path when invoking from a launch script that builds the binary into a tmp dir."
:default nil]
[nil "--grpc-host-port HOST:PORT" "gRPC --address argument passed to elastickv-list-routes by the setup-hook verification. Default 127.0.0.1:50051 matches scripts/run-jepsen-m5-local.sh's PROC_ADDR."
:default nil]])

(defn- prepare-dynamo-opts
"Transform parsed CLI options into the map expected by
Expand Down
Loading
Loading