From f7f49d3fee80eaf18cffe1bbb92c2d401b389caf Mon Sep 17 00:00:00 2001 From: Hemanth Krishna Date: Tue, 3 Feb 2026 02:27:10 -0500 Subject: [PATCH] Add tpstatsjson nodetool subcommand for deterministic JSON thread-pool stats MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Provides a machine-parseable alternative to "tpstats -F json" with alphabetically sorted keys at every nesting level, making the output safe to diff in CI without post-processing. Two-tier error handling (section-level + per-metric) ensures a single bad MBean never silences the rest of the output; the "N/A" sentinel from unregistered MBeans is normalised to JSON null rather than treated as an error. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../tools/nodetool/NodetoolCommand.java | 1 + .../cassandra/tools/nodetool/TpStatsJson.java | 198 +++++++++++++ test/resources/nodetool/help/tpstatsjson | 28 ++ .../tools/nodetool/TpStatsJsonTest.java | 261 ++++++++++++++++++ 4 files changed, 488 insertions(+) create mode 100644 src/java/org/apache/cassandra/tools/nodetool/TpStatsJson.java create mode 100644 test/resources/nodetool/help/tpstatsjson create mode 100644 test/unit/org/apache/cassandra/tools/nodetool/TpStatsJsonTest.java diff --git a/src/java/org/apache/cassandra/tools/nodetool/NodetoolCommand.java b/src/java/org/apache/cassandra/tools/nodetool/NodetoolCommand.java index 33292b24831c..ba41fc2f371d 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/NodetoolCommand.java +++ b/src/java/org/apache/cassandra/tools/nodetool/NodetoolCommand.java @@ -208,6 +208,7 @@ TableStats.class, TopPartitions.class, TpStats.class, + TpStatsJson.class, CompressionDictionaryCommandGroup.class, TruncateHints.class, UpdateCIDRGroup.class, diff --git a/src/java/org/apache/cassandra/tools/nodetool/TpStatsJson.java b/src/java/org/apache/cassandra/tools/nodetool/TpStatsJson.java new file mode 100644 index 000000000000..755453a9d646 
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/TpStatsJson.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools.nodetool;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.utils.JsonUtils;
+
+import picocli.CommandLine.Command;
+
+/**
+ * Prints thread-pool and dropped-message statistics as a single JSON object
+ * to standard output. Every key at every nesting level is sorted
+ * alphabetically, making the output safe to {@code diff} in CI without
+ * post-processing.
+ *
+ * <h3>Output format — pretty-printed, not compact</h3>
+ * <p>
+ * Pretty-printing is chosen deliberately. The primary consumer of this
+ * command is CI regression checks in which the output of two runs is diffed.
+ * With pretty JSON each object field lives on its own line; a single counter
+ * changing from {@code 5} to {@code 6} produces a one-line diff. Compact
+ * (single-line) JSON collapses the entire document into one line, turning
+ * every change into a full-document replacement and making diffs
+ * uninformative. Jackson's {@code DefaultPrettyPrinter} gives a stable
+ * layout: consistent two-space indentation and no trailing whitespace.
+ * Line endings follow the platform line separator, so normalise them before
+ * diffing output captured on different operating systems. Users who need
+ * compact output for transport or storage can pipe through {@code jq -c}.
+ * </p>
+ *
+ * <h3>Key ordering</h3>
+ * <p>
+ * {@link SerializationFeature#ORDER_MAP_ENTRIES_BY_KEYS} is enabled on a
+ * command-local {@link ObjectWriter}. It recursively sorts every
+ * {@code Map<String, …>} during serialization. The shared
+ * {@link JsonUtils#JSON_OBJECT_MAPPER} is never mutated.
+ * </p>
+ *
+ * <h3>Error handling — two tiers, nothing silent</h3>
+ * <p>
+ * The two data sections ({@code thread_pools} and {@code dropped_messages})
+ * are fetched independently so that a failure in one does not suppress the
+ * other. Within {@code thread_pools} each individual metric is fetched
+ * inside its own try-block so that one bad gauge does not kill the whole
+ * pool entry. Any failure is recorded — never silently swallowed — in a
+ * top-level {@code errors} object that maps section names to a list of
+ * human-readable messages. The {@code errors} key appears in the output
+ * only when at least one failure actually occurred; a clean run produces no
+ * trace of error-handling machinery.
+ * </p>
+ *
+ * <h3>N/A normalisation</h3>
+ * <p>
+ * {@link NodeProbe#getThreadPoolMetric} returns the string {@code "N/A"}
+ * when the backing MBean is not registered. This sentinel is converted to
+ * JSON {@code null} so that downstream parsers see a typed absence rather
+ * than a magic string. This is data normalisation, not an error; it does
+ * not populate the {@code errors} map.
+ * </p>
+ */
+@Command(name = "tpstatsjson",
+         description = "Print thread pool and dropped message statistics as deterministic JSON")
+public class TpStatsJson extends AbstractCommand
+{
+    // Pretty-print + sorted keys. Rationale for pretty-printing is in the
+    // class javadoc above. ORDER_MAP_ENTRIES_BY_KEYS recurses into nested
+    // maps automatically; no TreeMap is required anywhere in the code.
+    // Package-private so that the test suite can serialise controlled maps
+    // through the exact same pipeline the command uses.
+    static final ObjectWriter WRITER = JsonUtils.JSON_OBJECT_MAPPER
+                                       .writerWithDefaultPrettyPrinter()
+                                       .with(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
+
+    // Metric definitions: each row is { json_field_name, JMX_metric_name }.
+    // The array order here is kept logical for source readers; the output
+    // key order is entirely determined by the ObjectWriter and is always
+    // alphabetical regardless of iteration order.
+    private static final String[][] POOL_METRICS =
+    {
+        { "active", "ActiveTasks" },
+        { "pending", "PendingTasks" },
+        { "completed", "CompletedTasks" },
+        { "blocked", "CurrentlyBlockedTasks" },
+        { "all_time_blocked", "TotalBlockedTasks" }
+    };
+
+    @Override
+    public void execute(NodeProbe probe)
+    {
+        Map<String, Object> root = new HashMap<>();
+        Map<String, List<String>> errors = new HashMap<>();
+
+        // ── thread_pools ──────────────────────────────────────────────────
+        // Outer try: enumerating pools via JMX.
+        // Inner try (per metric): reading a single gauge/counter.
+        // Both levels record failures explicitly; neither swallows them.
+        Map<String, Map<String, Object>> pools = new HashMap<>();
+        try
+        {
+            for (Map.Entry<String, String> tp : probe.getThreadPools().entries())
+            {
+                String poolName = tp.getValue();
+                Map<String, Object> pool = new HashMap<>();
+
+                for (String[] m : POOL_METRICS)
+                {
+                    try
+                    {
+                        pool.put(m[0], normalizeMetricValue(
+                            probe.getThreadPoolMetric(tp.getKey(), poolName, m[1])));
+                    }
+                    catch (Exception e)
+                    {
+                        // Value set to null so the pool entry stays structurally
+                        // complete (five keys, always). The failure is recorded via
+                        // toString(), which stays informative when the message is null.
+                        pool.put(m[0], null);
+                        errors.computeIfAbsent("thread_pools", k -> new ArrayList<>())
+                              .add(poolName + "." + m[0] + ": " + e);
+                    }
+                }
+                pools.put(poolName, pool);
+            }
+        }
+        catch (Exception e)
+        {
+            // getThreadPools() itself failed — no pool data is available.
+            errors.computeIfAbsent("thread_pools", k -> new ArrayList<>())
+                  .add("failed to enumerate pools: " + e);
+        }
+        root.put("thread_pools", pools);
+
+        // ── dropped_messages ──────────────────────────────────────────────
+        try
+        {
+            root.put("dropped_messages", probe.getDroppedMessages());
+        }
+        catch (Exception e)
+        {
+            root.put("dropped_messages", Collections.emptyMap());
+            errors.computeIfAbsent("dropped_messages", k -> new ArrayList<>())
+                  .add(e.toString());
+        }
+
+        // ── errors (present only when at least one failure occurred) ──────
+        if (!errors.isEmpty())
+            root.put("errors", errors);
+
+        // ── serialise ─────────────────────────────────────────────────────
+        try
+        {
+            probe.output().out.println(WRITER.writeValueAsString(root));
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("failed to serialise tpstatsjson output", e);
+        }
+    }
+
+    /**
+     * Normalises a raw value returned by {@link NodeProbe#getThreadPoolMetric}.
+     * Package-private so the test suite can exercise the normalisation contract
+     * directly without spinning up a JMX server.
+     *
+     * @param raw metric value as reported over JMX; may be the sentinel {@code "N/A"}
+     * @return {@code null} when {@code raw} equals {@code "N/A"} (backing MBean not
+     *         registered); otherwise {@code raw} itself, unchanged
+     */
+    static Object normalizeMetricValue(Object raw)
+    {
+        return "N/A".equals(raw) ? null : raw;
+    }
+}
diff --git a/test/resources/nodetool/help/tpstatsjson b/test/resources/nodetool/help/tpstatsjson
new file mode 100644
index 000000000000..bbe6b1dd728e
--- /dev/null
+++ b/test/resources/nodetool/help/tpstatsjson
@@ -0,0 +1,28 @@
+NAME
+        nodetool tpstatsjson - Print thread pool and dropped message statistics
+        as deterministic JSON
+
+SYNOPSIS
+        nodetool [(-h <host> | --host <host>)] [(-p <port> | --port <port>)]
+                [(-pp | --print-port)] [(-pw <password> | --password <password>)]
+                [(-pwf <passwordFilePath> | --password-file <passwordFilePath>)]
+                [(-u <username> | --username <username>)] tpstatsjson
+
+OPTIONS
+        -h <host>, --host <host>
+            Node hostname or ip address
+
+        -p <port>, --port <port>
+            Remote jmx agent port number
+
+        -pp, --print-port
+            Operate in 4.0 mode with hosts disambiguated by port number
+
+        -pw <password>, --password <password>
+            Remote jmx agent password
+
+        -pwf <passwordFilePath>, --password-file <passwordFilePath>
+            Path to the JMX password file
+
+        -u <username>, --username <username>
+            Remote jmx agent username
diff --git a/test/unit/org/apache/cassandra/tools/nodetool/TpStatsJsonTest.java b/test/unit/org/apache/cassandra/tools/nodetool/TpStatsJsonTest.java
new file mode 100644
index 000000000000..f2f4c67c5977
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tools/nodetool/TpStatsJsonTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools.nodetool;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.tools.ToolRunner;
+import org.apache.cassandra.utils.JsonUtils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TpStatsJsonTest extends CQLTester
+{
+    @BeforeClass
+    public static void setup() throws Exception
+    {
+        requireNetwork();
+        startJMXServer();
+    }
+
+    // ── 1. command exits cleanly and stdout is parseable JSON ────────────
+    @Test
+    public void testOutputIsValidJson()
+    {
+        ToolRunner.ToolResult tool = ToolRunner.invokeNodetool("tpstatsjson");
+        tool.assertOnCleanExit();
+        assertThat(parseRoot(tool.getStdout())).isNotNull();
+    }
+
+    // ── 2. top-level shape and per-pool metric fields ────────────────────
+    @Test
+    public void testStructure()
+    {
+        ObjectNode root = runAndParse();
+
+        // A healthy node produces exactly these two keys; no errors key.
+        assertThat(collectFieldNames(root))
+            .containsExactlyInAnyOrder("thread_pools", "dropped_messages");
+
+        ObjectNode pools = (ObjectNode) root.get("thread_pools");
+        assertThat(pools.size()).isGreaterThan(0);
+
+        // Every pool must expose exactly the five required metric fields.
+        pools.fields().forEachRemaining(entry ->
+        {
+            List<String> poolKeys = collectFieldNames((ObjectNode) entry.getValue());
+            assertThat(poolKeys)
+                .containsExactlyInAnyOrder("active", "pending", "completed",
+                                           "blocked", "all_time_blocked");
+        });
+
+        // dropped_messages is present (may be empty on a fresh node, that is fine).
+        assertThat(root.has("dropped_messages")).isTrue();
+    }
+
+    // ── 3. every object-node in the tree has alphabetically sorted keys ──
+    @Test
+    public void testKeyOrderingIsDeterministic()
+    {
+        assertKeysSorted(runAndParse());
+    }
+
+    // ── 4. a write + flush visibly advances at least one completed counter ─
+    @Test
+    public void testDataChangesAreVisible() throws Throwable
+    {
+        ObjectNode before = runAndParse();
+
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, c int)");
+        execute("INSERT INTO %s (pk, c) VALUES (?, ?)", 1, 1);
+        flush();
+
+        ObjectNode after = runAndParse();
+
+        // At least one thread pool's counters must have advanced.
+        assertThat(after.get("thread_pools")).isNotEqualTo(before.get("thread_pools"));
+    }
+
+    // ── 5. N/A normalisation — the main contract for partially missing data ─
+    /**
+     * Exercises the {@code "N/A"} -&gt; {@code null} normalisation path
+     * end-to-end through the exact same {@link TpStatsJson#WRITER} the
+     * command uses, with fully controlled input.
+     * <p>
+     * Setup: two pools. {@code ReadStage} has {@code "N/A"} on {@code blocked}
+     * and realistic numbers everywhere else. {@code MutationStage} is
+     * entirely populated. A single {@code dropped_messages} entry is included.
+     * <p>
+     * Assertions
+     * <ol>
+     *   <li>The {@code blocked} field in {@code ReadStage} is JSON {@code null}.</li>
+     *   <li>All four sibling fields in {@code ReadStage} carry their expected values.</li>
+     *   <li>{@code MutationStage} is completely intact — nothing bled across pools.</li>
+     *   <li>{@code dropped_messages} serialised correctly.</li>
+     *   <li>No {@code errors} key is present — {@code "N/A"} is normalisation,
+     *       not a failure, so it must not pollute the error map.</li>
+     *   <li>Every object-node in the tree has its keys in alphabetical order.</li>
+     * </ol>
+     */
+    @Test
+    public void testPartiallyMissingMetricsProduceNullWithRestIntact() throws IOException
+    {
+        // ── build the map exactly as execute() would ─────────────────────
+        Map<String, Object> readStage = new HashMap<>();
+        readStage.put("active", TpStatsJson.normalizeMetricValue(42));
+        readStage.put("pending", TpStatsJson.normalizeMetricValue(0));
+        readStage.put("completed", TpStatsJson.normalizeMetricValue(1000L));
+        readStage.put("blocked", TpStatsJson.normalizeMetricValue("N/A")); // ← missing MBean
+        readStage.put("all_time_blocked", TpStatsJson.normalizeMetricValue(7L));
+
+        Map<String, Object> mutationStage = new HashMap<>();
+        mutationStage.put("active", TpStatsJson.normalizeMetricValue(1));
+        mutationStage.put("pending", TpStatsJson.normalizeMetricValue(0));
+        mutationStage.put("completed", TpStatsJson.normalizeMetricValue(500L));
+        mutationStage.put("blocked", TpStatsJson.normalizeMetricValue(0));
+        mutationStage.put("all_time_blocked", TpStatsJson.normalizeMetricValue(2L));
+
+        Map<String, Map<String, Object>> pools = new HashMap<>();
+        pools.put("ReadStage", readStage);
+        pools.put("MutationStage", mutationStage);
+
+        Map<String, Object> root = new HashMap<>();
+        root.put("thread_pools", pools);
+        root.put("dropped_messages", Collections.singletonMap("MUTATION_REQ", 3));
+        // Deliberately no "errors" key — N/A is not an error.
+
+        // ── serialise with the command's ObjectWriter ─────────────────────
+        ObjectNode parsed = parseRoot(TpStatsJson.WRITER.writeValueAsString(root));
+
+        // (6) key ordering holds at every level
+        assertKeysSorted(parsed);
+
+        // (5) no errors key
+        assertThat(parsed.has("errors")).isFalse();
+
+        // (1) the N/A field is null
+        ObjectNode rs = (ObjectNode) parsed.get("thread_pools").get("ReadStage");
+        assertThat(rs.get("blocked").isNull()).isTrue();
+
+        // (2) siblings in ReadStage are present and correct
+        assertThat(rs.get("active").asInt()).isEqualTo(42);
+        assertThat(rs.get("pending").asInt()).isEqualTo(0);
+        assertThat(rs.get("completed").asLong()).isEqualTo(1000L);
+        assertThat(rs.get("all_time_blocked").asLong()).isEqualTo(7L);
+
+        // (3) MutationStage is completely intact
+        ObjectNode ms = (ObjectNode) parsed.get("thread_pools").get("MutationStage");
+        assertThat(ms.get("active").asInt()).isEqualTo(1);
+        assertThat(ms.get("pending").asInt()).isEqualTo(0);
+        assertThat(ms.get("completed").asLong()).isEqualTo(500L);
+        assertThat(ms.get("blocked").asInt()).isEqualTo(0);
+        assertThat(ms.get("all_time_blocked").asLong()).isEqualTo(2L);
+
+        // (4) dropped_messages came through
+        assertThat(parsed.get("dropped_messages").get("MUTATION_REQ").asInt()).isEqualTo(3);
+    }
+
+    // ── 6. errors key serialises correctly and sorts with the rest ────────
+    /**
+     * When {@code execute()} catches a section-level failure it adds an entry
+     * to the {@code errors} map. This test constructs exactly that shape and
+     * verifies: the key is present, its content is intact, and it sorts
+     * alphabetically between {@code dropped_messages} and {@code thread_pools}
+     * as required by the determinism contract.
+     */
+    @Test
+    public void testErrorsFieldAppearsAndIsSortedWhenSectionFails() throws IOException
+    {
+        Map<String, Object> root = new HashMap<>();
+        root.put("thread_pools", Collections.emptyMap());
+        root.put("dropped_messages", Collections.emptyMap());
+        root.put("errors", Collections.singletonMap("thread_pools",
+            Collections.singletonList("failed to enumerate pools: connection refused")));
+
+        ObjectNode parsed = parseRoot(TpStatsJson.WRITER.writeValueAsString(root));
+
+        // errors sorts between dropped_messages and thread_pools alphabetically.
+        List<String> topKeys = collectFieldNames(parsed);
+        assertThat(topKeys).isEqualTo(List.of("dropped_messages", "errors", "thread_pools"));
+
+        // Content is reachable and correct.
+        assertThat(parsed.get("errors").get("thread_pools").get(0).asText())
+            .contains("connection refused");
+    }
+
+    // ── helpers ───────────────────────────────────────────────────────────
+
+    /** Invoke the command, assert clean exit, and return the parsed root node. */
+    private static ObjectNode runAndParse()
+    {
+        ToolRunner.ToolResult result = ToolRunner.invokeNodetool("tpstatsjson");
+        result.assertOnCleanExit();
+        return parseRoot(result.getStdout());
+    }
+
+    private static ObjectNode parseRoot(String json)
+    {
+        try
+        {
+            return (ObjectNode) JsonUtils.JSON_OBJECT_MAPPER.readTree(json);
+        }
+        catch (IOException e)
+        {
+            throw new AssertionError("tpstatsjson output is not valid JSON", e);
+        }
+    }
+
+    private static List<String> collectFieldNames(ObjectNode node)
+    {
+        List<String> keys = new ArrayList<>();
+        node.fieldNames().forEachRemaining(keys::add);
+        return keys;
+    }
+
+    /**
+     * Recursively asserts that every {@link ObjectNode} in the tree has its
+     * field names in natural (alphabetical) order. This is the precise
+     * definition of "deterministic output" for this command.
+     */
+    private static void assertKeysSorted(ObjectNode node)
+    {
+        List<String> keys = collectFieldNames(node);
+        List<String> sorted = new ArrayList<>(keys);
+        Collections.sort(sorted);
+        assertThat(keys).as("object field names must be in natural order")
+                        .isEqualTo(sorted);
+
+        // Recurse into every child that is itself an object.
+        node.fields().forEachRemaining(entry ->
+        {
+            if (entry.getValue().isObject())
+                assertKeysSorted((ObjectNode) entry.getValue());
+        });
+    }
+}