From bc8a46ffa9b9036c57e9a11633cdacf3bd772d2b Mon Sep 17 00:00:00 2001 From: Aparna Naik Date: Fri, 13 Feb 2026 12:03:36 -0800 Subject: [PATCH 1/2] Prevent log-incompatible upgrades --- .../tcm/transformations/Register.java | 16 ++ .../tcm/transformations/Startup.java | 13 ++ .../distributed/test/log/RegisterTest.java | 161 ++++++++++++++++-- .../distributed/test/log/StartupTest.java | 155 +++++++++++++++++ 4 files changed, 331 insertions(+), 14 deletions(-) create mode 100644 test/distributed/org/apache/cassandra/distributed/test/log/StartupTest.java diff --git a/src/java/org/apache/cassandra/tcm/transformations/Register.java b/src/java/org/apache/cassandra/tcm/transformations/Register.java index 0b3ebb375fab..63a0fdf9e331 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/Register.java +++ b/src/java/org/apache/cassandra/tcm/transformations/Register.java @@ -74,6 +74,22 @@ public Kind kind() @Override public Result execute(ClusterMetadata prev) { + // Ensure the joining node can read existing cluster metadata. + // Skip check for empty directory (first node in a new cluster). + if (!prev.directory.isEmpty()) + { + Version clusterVersion = prev.directory.commonSerializationVersion; + Version newNodeVersion = version.serializationVersion(); + if (newNodeVersion.isBefore(clusterVersion)) + { + return new Rejected(INVALID, + String.format("Cannot register node: this node's metadata serialization version %s " + + "is lower than the cluster's minimum required version %s. " + + "Node would not be able to read cluster metadata.", + newNodeVersion, clusterVersion)); + } + } + for (Map.Entry entry : prev.directory.addresses.entrySet()) { NodeAddresses existingAddresses = entry.getValue(); diff --git a/src/java/org/apache/cassandra/tcm/transformations/Startup.java b/src/java/org/apache/cassandra/tcm/transformations/Startup.java index a1abfdbff4ec..8516e51fc671 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/Startup.java +++ b/src/java/org/apache/cassandra/tcm/transformations/Startup.java @@ -68,6 +68,19 @@ public Kind kind() @Override public Result execute(ClusterMetadata prev) { + // Prevent downgrade to a version that cannot read cluster metadata. + // This protects against restarting a node with an older binary. + Version clusterVersion = prev.directory.commonSerializationVersion; + Version newNodeVersion = nodeVersion.serializationVersion(); + if (newNodeVersion.isBefore(clusterVersion)) + { + return new Rejected(INVALID, + String.format("Cannot downgrade node: this node's metadata serialization version %s " + + "is lower than the cluster's minimum required version %s. " + + "Node would not be able to read cluster metadata.", + newNodeVersion, clusterVersion)); + } + ClusterMetadata.Transformer next = prev.transformer(); if (!prev.directory.addresses.get(nodeId).equals(addresses)) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java index 393e9efd7707..5cef9c35650a 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java @@ -23,8 +23,11 @@ import java.nio.ByteBuffer; import java.util.EnumSet; +import com.google.common.collect.Sets; + import org.junit.Test; +import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.IInstanceConfig; @@ -34,10 +37,15 @@ import org.apache.cassandra.distributed.test.TestBaseImpl; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.DistributedMetadataLogKeyspace; +import org.apache.cassandra.schema.DistributedSchema; +import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.MetadataSnapshots; import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.Transformation.Result; +import org.apache.cassandra.tcm.membership.Directory; import org.apache.cassandra.tcm.membership.Location; import org.apache.cassandra.tcm.membership.NodeAddresses; import org.apache.cassandra.tcm.membership.NodeId; @@ -49,15 +57,33 @@ import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.tcm.transformations.PrepareLeave; import org.apache.cassandra.tcm.transformations.Register; -import org.apache.cassandra.tcm.transformations.Startup; import org.apache.cassandra.tcm.transformations.TriggerSnapshot; import org.apache.cassandra.tcm.transformations.Unregister; import org.apache.cassandra.utils.CassandraVersion; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class RegisterTest extends TestBaseImpl { + private static final Location TEST_LOCATION = new Location("datacenter1", "rack1"); + + private static ClusterMetadata createEmptyMetadata() + { + Keyspaces keyspaces = Keyspaces.of(DistributedMetadataLogKeyspace.initialMetadata(Sets.newHashSet("datacenter1"))); + DistributedSchema schema = new DistributedSchema(keyspaces); + return new ClusterMetadata(Murmur3Partitioner.instance, Directory.EMPTY, schema); + } + + private static ClusterMetadata register(String endpoint, NodeVersion version, ClusterMetadata metadata) throws UnknownHostException + { + return new Register( + new NodeAddresses(InetAddressAndPort.getByName(endpoint)), + TEST_LOCATION, + version + ).execute(metadata).success().metadata; + } + @Test public void testRegistrationIdempotence() throws Throwable { @@ -108,23 +134,24 @@ public void serializationVersionCeilingTest() throws Throwable cluster.get(1).runOnInstance(() -> { try { - // Register a ghost node with V0 to fake-force V0 serialization. In a real world cluster we will always be upgrading from a smaller version. + // Unregister to make directory empty + ClusterMetadataService.instance().commit(new Unregister(ClusterMetadata.current().myNodeId(), + EnumSet.allOf(NodeState.class), + ClusterMetadataService.instance().placementProvider())); + + // Register a ghost node with V0 (bypasses version check because directory is now empty). + // In a real world cluster we will always be upgrading from a smaller version. ClusterMetadataService.instance().commit(new Register(new NodeAddresses(InetAddressAndPort.getByName(firstNodeEndpoint)), - ClusterMetadata.current().directory.location(ClusterMetadata.current().myNodeId()), + TEST_LOCATION, new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V0))); NodeId oldNode = ClusterMetadata.current().directory.peerId(InetAddressAndPort.getByName(firstNodeEndpoint)); - // Fake an upgrade of this node and assert we continue to serialize so that the one which only - // supports V0 can deserialize. In a real cluster it wouldn't happen exactly in this way (here the - // min serialization version actually goes backwards from CURRENT to V0 when we upgrade, which would - // not happen in a real cluster as we would never register like oldNode, with the current C* version - // but an older metadata version + + // Register a node with upgraded version CassandraVersion currentVersion = NodeVersion.CURRENT.cassandraVersion; NodeVersion upgraded = new NodeVersion(new CassandraVersion(String.format("%d.%d.%d", currentVersion.major + 1, 0, 0)), NodeVersion.CURRENT_METADATA_VERSION); - ClusterMetadata metadata = ClusterMetadata.current(); - NodeId id = metadata.myNodeId(); - Startup startup = new Startup(id, metadata.directory.getNodeAddresses(id), upgraded); - ClusterMetadataService.instance().commit(startup); + ClusterMetadataService.instance().commit(new Register(NodeAddresses.current(), TEST_LOCATION, upgraded)); + // Doesn't matter which specific Transformation we use here, we're testing that the serializer uses // the correct lower bound Transformation t = new Register(NodeAddresses.current(), new Location("DC", "RACK"), NodeVersion.CURRENT); @@ -173,9 +200,15 @@ public void replayLocallyFromV0Snapshot() throws Throwable cluster.get(1).runOnInstance(() -> { try { - // Register a ghost node with V0 to fake-force V0 serialization. In a real world cluster we will always be upgrading from a smaller version. + // Unregister to make directory empty + ClusterMetadataService.instance().commit(new Unregister(ClusterMetadata.current().myNodeId(), + EnumSet.allOf(NodeState.class), + ClusterMetadataService.instance().placementProvider())); + + // Register a ghost node with V0 (bypasses version check because directory is now empty). + // In a real world cluster we will always be upgrading from a smaller version. ClusterMetadataService.instance().commit(new Register(new NodeAddresses(InetAddressAndPort.getByName("127.0.0.10")), - ClusterMetadata.current().directory.location(ClusterMetadata.current().myNodeId()), + TEST_LOCATION, new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V0))); } catch (UnknownHostException e) @@ -187,8 +220,108 @@ public void replayLocallyFromV0Snapshot() throws Throwable ClusterMetadata cm = new MetadataSnapshots.SystemKeyspaceMetadataSnapshots().getSnapshot(ClusterMetadata.current().epoch); cm.equals(ClusterMetadata.current()); }); + } + } + + /** + * Tests that registering a new node with a serialization version lower than the cluster's + * commonSerializationVersion is rejected. + * + * Scenario: + * - Cluster has 1 node running at CURRENT_METADATA_VERSION (e.g., V8) + * - commonSerializationVersion = V8 + * - A NEW node tries to register with V3 + * - Should be REJECTED because V3 cannot read V8 metadata + */ + @Test + public void testRegisterRejectsLowerSerializationVersion() throws Throwable + { + try (Cluster cluster = builder().withNodes(1).createWithoutStarting()) + { + cluster.get(1).startup(); + cluster.get(1).runOnInstance(() -> { + try + { + // Cluster's node1 is running at CURRENT_METADATA_VERSION (e.g., V8) + // commonSerializationVersion = V8 + + // Try to register a NEW node with a lower version (V3) + NodeVersion lowerVersion = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3); + Register register = new Register( + new NodeAddresses(InetAddressAndPort.getByName("127.0.0.10")), + ClusterMetadata.current().directory.location(ClusterMetadata.current().myNodeId()), + lowerVersion + ); + + Transformation.Result result = register.execute(ClusterMetadata.current()); + + assertTrue("Registration should be rejected for node with lower serialization version", + result.isRejected()); + assertTrue("Rejection message should mention serialization version", + result.rejected().reason.contains("serialization version")); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + }); + } + } + /** + * Tests that registering nodes with serialization version equal to or higher than + * the cluster's commonSerializationVersion is allowed. + * + * Scenario: + * - Create empty metadata and register a V3 node first (bypasses version check) + * - commonSerializationVersion = V3 + * - Then register a V5 node - should succeed since V5 >= V3 + * - Then register a V3 node - should succeed since V3 >= V3 + */ + @Test + public void testRegisterAllowsEqualOrHigherSerializationVersion() throws Throwable + { + try (Cluster cluster = builder().withNodes(1).createWithoutStarting()) + { + cluster.get(1).startup(); + cluster.get(1).runOnInstance(() -> { + try + { + // Create empty metadata and register V3 node first (bypasses version check because directory is empty) + NodeVersion v3 = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3); + ClusterMetadata metadata = register("127.0.0.10", v3, createEmptyMetadata()); + + assertEquals("commonSerializationVersion should be V3", + Version.V3, metadata.directory.commonSerializationVersion); + + // Now register a V5 node - should succeed since V5 >= V3 + NodeVersion v5 = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V5); + Register registerV5 = new Register( + new NodeAddresses(InetAddressAndPort.getByName("127.0.0.11")), + TEST_LOCATION, + v5 + ); + + Result resultV5 = registerV5.execute(metadata); + assertTrue("Registration should succeed for V5 node when cluster is at V3", + resultV5.isSuccess()); + + // Register another V3 node - should succeed since V3 >= V3 + Register registerV3 = new Register( + new NodeAddresses(InetAddressAndPort.getByName("127.0.0.12")), + TEST_LOCATION, + v3 + ); + Result resultV3 = registerV3.execute(metadata); + assertTrue("Registration should succeed for V3 node when cluster is at V3", + resultV3.isSuccess()); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + }); } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/StartupTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/StartupTest.java new file mode 100644 index 000000000000..33b843230c89 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/log/StartupTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.log; + +import java.net.UnknownHostException; + +import com.google.common.collect.Sets; + +import org.junit.Test; + +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.DistributedMetadataLogKeyspace; +import org.apache.cassandra.schema.DistributedSchema; +import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.Transformation.Result; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeAddresses; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.tcm.transformations.Register; +import org.apache.cassandra.tcm.transformations.Startup; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class StartupTest extends TestBaseImpl +{ + private static final Location TEST_LOCATION = new Location("datacenter1", "rack1"); + + private static ClusterMetadata createEmptyMetadata() + { + Keyspaces keyspaces = Keyspaces.of(DistributedMetadataLogKeyspace.initialMetadata(Sets.newHashSet("datacenter1"))); + DistributedSchema schema = new DistributedSchema(keyspaces); + return new ClusterMetadata(Murmur3Partitioner.instance, Directory.EMPTY, schema); + } + + private static ClusterMetadata register(String endpoint, NodeVersion version, ClusterMetadata metadata) throws UnknownHostException + { + return new Register( + new NodeAddresses(InetAddressAndPort.getByName(endpoint)), + TEST_LOCATION, + version + ).execute(metadata).success().metadata; + } + + /** + * Tests that the Startup transformation rejects downgrading a node to a version + * that cannot read cluster metadata. + * + * Scenario: + * - Cluster has 1 node running at CURRENT_METADATA_VERSION + * - commonSerializationVersion = CURRENT_METADATA_VERSION + * - Node tries to "downgrade" by restarting with a lower version + * - Should be REJECTED because the lower version cannot read the current metadata + */ + @Test + public void testStartupRejectsDowngrade() throws Throwable + { + try (Cluster cluster = builder().withNodes(1).createWithoutStarting()) + { + cluster.get(1).startup(); + cluster.get(1).runOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + NodeId nodeId = metadata.myNodeId(); + + // Try to "downgrade" the node to V3 (simulating restart with older binary) + NodeVersion downgradedVersion = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3); + Startup startup = new Startup(nodeId, metadata.directory.getNodeAddresses(nodeId), downgradedVersion); + + Result result = startup.execute(metadata); + + assertTrue("Startup should be rejected for downgrade to lower serialization version", + result.isRejected()); + assertTrue("Rejection message should mention serialization version", + result.rejected().reason.contains("serialization version")); + }); + } + } + + /** + * Tests that the Startup transformation allows a node to restart with equal or higher + * serialization version. + * + * Scenario: + * - Create empty ClusterMetadata with Directory.EMPTY + * - Register a V3 node (succeeds because directory is empty - first node) + * - Register a second node to test Startup against + * - Test Startup with V5 (higher than V3) - should succeed + * - Test Startup with V3 (equal to V3) - should succeed + */ + @Test + public void testStartupAllowsEqualOrHigherSerializationVersion() throws Throwable + { + try (Cluster cluster = builder().withNodes(1).createWithoutStarting()) + { + cluster.get(1).startup(); + cluster.get(1).runOnInstance(() -> { + try + { + // Register first node with V3 - succeeds because directory is empty + NodeVersion v3 = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3); + ClusterMetadata metadataWithV3Node = register("127.0.0.10", v3, createEmptyMetadata()); + + assertEquals("commonSerializationVersion should be V3", + Version.V3, metadataWithV3Node.directory.commonSerializationVersion); + + // Register a second node to test Startup against + ClusterMetadata testMetadata = register("127.0.0.11", v3, metadataWithV3Node); + NodeId testNodeId = testMetadata.directory.peerId(InetAddressAndPort.getByName("127.0.0.11")); + + // Test Startup with V5 (higher than V3) - should succeed + NodeVersion v5 = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V5); + Startup startupV5 = new Startup(testNodeId, testMetadata.directory.getNodeAddresses(testNodeId), v5); + + Result resultV5 = startupV5.execute(testMetadata); + assertTrue("Startup should succeed for V5 when cluster is at V3", + resultV5.isSuccess()); + + // Test Startup with V3 (equal to cluster version) - should succeed + Startup startupV3 = new Startup(testNodeId, testMetadata.directory.getNodeAddresses(testNodeId), v3); + + Result resultV3 = startupV3.execute(testMetadata); + assertTrue("Startup should succeed for V3 when cluster is at V3", + resultV3.isSuccess()); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + }); + } + } +} From b651a23d457d79dbd0be70ad933283c71c142320 Mon Sep 17 00:00:00 2001 From: Aparna Naik Date: Thu, 19 Feb 2026 15:13:12 -0800 Subject: [PATCH 2/2] JIRA Feedback addressed --- ...tibleMetadataSerializationVersionTest.java | 111 +++++++++++++ .../distributed/test/log/RegisterTest.java | 105 ------------ .../distributed/test/log/StartupTest.java | 155 ------------------ .../tcm/transformations/RegisterTest.java | 150 +++++++++++++++++ .../tcm/transformations/StartupTest.java | 112 +++++++++++++ 5 files changed, 373 insertions(+), 260 deletions(-) create mode 100644 test/distributed/org/apache/cassandra/distributed/test/log/IncompatibleMetadataSerializationVersionTest.java delete mode 100644 test/distributed/org/apache/cassandra/distributed/test/log/StartupTest.java create mode 100644 test/unit/org/apache/cassandra/tcm/transformations/RegisterTest.java create mode 100644 test/unit/org/apache/cassandra/tcm/transformations/StartupTest.java diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/IncompatibleMetadataSerializationVersionTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/IncompatibleMetadataSerializationVersionTest.java new file mode 100644 index 000000000000..84e461b6e474 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/log/IncompatibleMetadataSerializationVersionTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.log; + +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.tcm.serialization.Version; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.cassandra.tcm.membership.NodeVersion.CURRENT_METADATA_VERSION; + +public class IncompatibleMetadataSerializationVersionTest extends TestBaseImpl +{ + @Test + public void incompatibleVersionsCauseStartupFailureTest() throws Throwable + { + try (Cluster cluster = builder().withNodes(2) + .withInstanceInitializer(BB::install) + .createWithoutStarting()) + { + cluster.get(1).startup(); + // node1 has joined as normal so any entries committed to the metadata log will be serialized with + // NodeVersion.CURRENT_METADATA_VERSION. We will join node2, but the BB class used as an instanceInitializer + // will force it not to recognise this version. This simulates a node running an older, incompatible version + // attempting to join the cluster and should fail as the metadata log and snapshots it receives at startup + // are unreadable to it. + // We'll also set up the uncaught exceptions filter so that errors reported by node2 do not automatically + // trigger a failure, so that we can assert that the specific error we're expecting is thrown and logged. + cluster.setUncaughtExceptionsFilter((i, t) -> i != 2); + try + { + cluster.get(2).startup(); + Assert.fail("Node2 startup should fail due to unsupported metadata versions"); + } + catch (Exception e) + { + String expectedError = String.format("Unsupported metadata version \\(%s\\)", CURRENT_METADATA_VERSION.asInt()); + Assert.assertFalse(cluster.get(2) + .logs() + .grep(expectedError) + .getResult() + .isEmpty()); + } + } + } + + public static class BB + { + static void install(ClassLoader cl, int node) + { + // only change behaviour of node2 + if (node == 2) + { + new ByteBuddy().rebase(Version.class) + .method(named("fromInt")) + .intercept(MethodDelegation.to(BB.class)) + .make() + .load(cl, ClassLoadingStrategy.Default.INJECTION); + + new ByteBuddy().rebase(NodeVersion.class) + .method(named("serializationVersion")) + .intercept(MethodDelegation.to(BB.class)) + .make() + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + } + + public static Version serializationVersion() + { + // This is called during node startup when initializing the LogState class and in particular its static + // defaultMessageSerializer field. We will emulate the behaviour of a node running an old version. + return Version.V0; + } + + public static Version fromInt(int i) + { + // Behave as if the supplied version is invalid, unless it is the V0 value we are returning from the other + // intercepted method. This will cause any other version encountered, such as when receiving versioned log + // entries from another node, to appear unreadable. + if (i == Version.V0.asInt()) + return Version.V0; + + throw new IllegalArgumentException("Unsupported metadata version (" + i + ")"); + } + } + +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java index 5cef9c35650a..6a4c858f5b81 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java @@ -44,7 +44,6 @@ import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.MetadataSnapshots; import org.apache.cassandra.tcm.Transformation; -import org.apache.cassandra.tcm.Transformation.Result; import org.apache.cassandra.tcm.membership.Directory; import org.apache.cassandra.tcm.membership.Location; import org.apache.cassandra.tcm.membership.NodeAddresses; @@ -62,7 +61,6 @@ import org.apache.cassandra.utils.CassandraVersion; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; public class RegisterTest extends TestBaseImpl { @@ -222,107 +220,4 @@ public void replayLocallyFromV0Snapshot() throws Throwable }); } } - - /** - * Tests that registering a new node with a serialization version lower than the cluster's - * commonSerializationVersion is rejected. - * - * Scenario: - * - Cluster has 1 node running at CURRENT_METADATA_VERSION (e.g., V8) - * - commonSerializationVersion = V8 - * - A NEW node tries to register with V3 - * - Should be REJECTED because V3 cannot read V8 metadata - */ - @Test - public void testRegisterRejectsLowerSerializationVersion() throws Throwable - { - try (Cluster cluster = builder().withNodes(1).createWithoutStarting()) - { - cluster.get(1).startup(); - cluster.get(1).runOnInstance(() -> { - try - { - // Cluster's node1 is running at CURRENT_METADATA_VERSION (e.g., V8) - // commonSerializationVersion = V8 - - // Try to register a NEW node with a lower version (V3) - NodeVersion lowerVersion = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3); - Register register = new Register( - new NodeAddresses(InetAddressAndPort.getByName("127.0.0.10")), - ClusterMetadata.current().directory.location(ClusterMetadata.current().myNodeId()), - lowerVersion - ); - - Transformation.Result result = register.execute(ClusterMetadata.current()); - - assertTrue("Registration should be rejected for node with lower serialization version", - result.isRejected()); - assertTrue("Rejection message should mention serialization version", - result.rejected().reason.contains("serialization version")); - } - catch (UnknownHostException e) - { - throw new RuntimeException(e); - } - }); - } - } - - /** - * Tests that registering nodes with serialization version equal to or higher than - * the cluster's commonSerializationVersion is allowed. - * - * Scenario: - * - Create empty metadata and register a V3 node first (bypasses version check) - * - commonSerializationVersion = V3 - * - Then register a V5 node - should succeed since V5 >= V3 - * - Then register a V3 node - should succeed since V3 >= V3 - */ - @Test - public void testRegisterAllowsEqualOrHigherSerializationVersion() throws Throwable - { - try (Cluster cluster = builder().withNodes(1).createWithoutStarting()) - { - cluster.get(1).startup(); - cluster.get(1).runOnInstance(() -> { - try - { - // Create empty metadata and register V3 node first (bypasses version check because directory is empty) - NodeVersion v3 = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3); - ClusterMetadata metadata = register("127.0.0.10", v3, createEmptyMetadata()); - - assertEquals("commonSerializationVersion should be V3", - Version.V3, metadata.directory.commonSerializationVersion); - - // Now register a V5 node - should succeed since V5 >= V3 - NodeVersion v5 = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V5); - Register registerV5 = new Register( - new NodeAddresses(InetAddressAndPort.getByName("127.0.0.11")), - TEST_LOCATION, - v5 - ); - - Result resultV5 = registerV5.execute(metadata); - assertTrue("Registration should succeed for V5 node when cluster is at V3", - resultV5.isSuccess()); - - // Register another V3 node - should succeed since V3 >= V3 - Register registerV3 = new Register( - new NodeAddresses(InetAddressAndPort.getByName("127.0.0.12")), - TEST_LOCATION, - v3 - ); - - Result resultV3 = registerV3.execute(metadata); - assertTrue("Registration should succeed for V3 node when cluster is at V3", - resultV3.isSuccess()); - } - catch (UnknownHostException e) - { - throw new RuntimeException(e); - } - }); - } - } - } diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/StartupTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/StartupTest.java deleted file mode 100644 index 33b843230c89..000000000000 --- a/test/distributed/org/apache/cassandra/distributed/test/log/StartupTest.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.distributed.test.log; - -import java.net.UnknownHostException; - -import com.google.common.collect.Sets; - -import org.junit.Test; - -import org.apache.cassandra.dht.Murmur3Partitioner; -import org.apache.cassandra.distributed.Cluster; -import org.apache.cassandra.distributed.test.TestBaseImpl; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.schema.DistributedMetadataLogKeyspace; -import org.apache.cassandra.schema.DistributedSchema; -import org.apache.cassandra.schema.Keyspaces; -import org.apache.cassandra.tcm.ClusterMetadata; -import org.apache.cassandra.tcm.Transformation.Result; -import org.apache.cassandra.tcm.membership.Directory; -import org.apache.cassandra.tcm.membership.Location; -import org.apache.cassandra.tcm.membership.NodeAddresses; -import org.apache.cassandra.tcm.membership.NodeId; -import org.apache.cassandra.tcm.membership.NodeVersion; -import org.apache.cassandra.tcm.serialization.Version; -import org.apache.cassandra.tcm.transformations.Register; -import org.apache.cassandra.tcm.transformations.Startup; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class StartupTest extends TestBaseImpl -{ - private static final Location TEST_LOCATION = new Location("datacenter1", "rack1"); - - private static ClusterMetadata createEmptyMetadata() - { - Keyspaces keyspaces = Keyspaces.of(DistributedMetadataLogKeyspace.initialMetadata(Sets.newHashSet("datacenter1"))); - DistributedSchema schema = new DistributedSchema(keyspaces); - return new ClusterMetadata(Murmur3Partitioner.instance, Directory.EMPTY, schema); - } - - private static ClusterMetadata register(String endpoint, NodeVersion version, ClusterMetadata metadata) throws UnknownHostException - { - return new Register( - new NodeAddresses(InetAddressAndPort.getByName(endpoint)), - TEST_LOCATION, - version - ).execute(metadata).success().metadata; - } - - /** - * Tests that the Startup transformation rejects downgrading a node to a version - * that cannot read cluster metadata. - * - * Scenario: - * - Cluster has 1 node running at CURRENT_METADATA_VERSION - * - commonSerializationVersion = CURRENT_METADATA_VERSION - * - Node tries to "downgrade" by restarting with a lower version - * - Should be REJECTED because the lower version cannot read the current metadata - */ - @Test - public void testStartupRejectsDowngrade() throws Throwable - { - try (Cluster cluster = builder().withNodes(1).createWithoutStarting()) - { - cluster.get(1).startup(); - cluster.get(1).runOnInstance(() -> { - ClusterMetadata metadata = ClusterMetadata.current(); - NodeId nodeId = metadata.myNodeId(); - - // Try to "downgrade" the node to V3 (simulating restart with older binary) - NodeVersion downgradedVersion = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3); - Startup startup = new Startup(nodeId, metadata.directory.getNodeAddresses(nodeId), downgradedVersion); - - Result result = startup.execute(metadata); - - assertTrue("Startup should be rejected for downgrade to lower serialization version", - result.isRejected()); - assertTrue("Rejection message should mention serialization version", - result.rejected().reason.contains("serialization version")); - }); - } - } - - /** - * Tests that the Startup transformation allows a node to restart with equal or higher - * serialization version. - * - * Scenario: - * - Create empty ClusterMetadata with Directory.EMPTY - * - Register a V3 node (succeeds because directory is empty - first node) - * - Register a second node to test Startup against - * - Test Startup with V5 (higher than V3) - should succeed - * - Test Startup with V3 (equal to V3) - should succeed - */ - @Test - public void testStartupAllowsEqualOrHigherSerializationVersion() throws Throwable - { - try (Cluster cluster = builder().withNodes(1).createWithoutStarting()) - { - cluster.get(1).startup(); - cluster.get(1).runOnInstance(() -> { - try - { - // Register first node with V3 - succeeds because directory is empty - NodeVersion v3 = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3); - ClusterMetadata metadataWithV3Node = register("127.0.0.10", v3, createEmptyMetadata()); - - assertEquals("commonSerializationVersion should be V3", - Version.V3, metadataWithV3Node.directory.commonSerializationVersion); - - // Register a second node to test Startup against - ClusterMetadata testMetadata = register("127.0.0.11", v3, metadataWithV3Node); - NodeId testNodeId = testMetadata.directory.peerId(InetAddressAndPort.getByName("127.0.0.11")); - - // Test Startup with V5 (higher than V3) - should succeed - NodeVersion v5 = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V5); - Startup startupV5 = new Startup(testNodeId, testMetadata.directory.getNodeAddresses(testNodeId), v5); - - Result resultV5 = startupV5.execute(testMetadata); - assertTrue("Startup should succeed for V5 when cluster is at V3", - resultV5.isSuccess()); - - // Test Startup with V3 (equal to cluster version) - should succeed - Startup startupV3 = new Startup(testNodeId, testMetadata.directory.getNodeAddresses(testNodeId), v3); - - Result resultV3 = startupV3.execute(testMetadata); - assertTrue("Startup should succeed for V3 when cluster is at V3", - resultV3.isSuccess()); - } - catch (UnknownHostException e) - { - throw new RuntimeException(e); - } - }); - } - } -} diff --git a/test/unit/org/apache/cassandra/tcm/transformations/RegisterTest.java b/test/unit/org/apache/cassandra/tcm/transformations/RegisterTest.java new file mode 100644 index 000000000000..3b385b01b4bf --- /dev/null +++ b/test/unit/org/apache/cassandra/tcm/transformations/RegisterTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.transformations; + +import java.net.UnknownHostException; + +import org.junit.Test; + +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; +import org.apache.cassandra.exceptions.ExceptionCode; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeAddresses; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeState; +import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.tcm.serialization.Version; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class RegisterTest +{ + private static final Location LOCATION = new Location("dc", "rack"); + + /** + * Tests that registering a new node with a serialization version lower than the cluster's + * commonSerializationVersion is rejected. + */ + @Test + public void rejectsLowerSerializationVersion() throws UnknownHostException + { + NodeId existingNode = new NodeId(1); + + Directory directory = Directory.EMPTY + .unsafeWithNodeForTesting(existingNode, + new NodeAddresses(InetAddressAndPort.getByName("127.0.0.1")), + LOCATION, + NodeVersion.CURRENT) + .withNodeState(existingNode, NodeState.JOINED); + + ClusterMetadata metadata = ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance) + .transformer() + .with(directory) + .build().metadata; + + assertEquals("commonSerializationVersion should be CURRENT_METADATA_VERSION", NodeVersion.CURRENT_METADATA_VERSION, metadata.directory.commonSerializationVersion); + + // Try to register a new node with V3 (lower than cluster's current version) + NodeVersion lowerVersion = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3); + Register register = new Register( + new NodeAddresses(InetAddressAndPort.getByName("127.0.0.2")), + LOCATION, + lowerVersion + ); + + Transformation.Result result = register.execute(metadata); + + assertTrue("Registration should be rejected for node with lower serialization version", result.isRejected()); + assertEquals(ExceptionCode.INVALID, result.rejected().code); + } + + /** + * Tests that registering nodes with serialization version equal to or higher than + * the cluster's commonSerializationVersion is allowed. + */ + @Test + public void allowsEqualOrHigherSerializationVersion() throws UnknownHostException + { + NodeId existingNode = new NodeId(1); + NodeVersion v3 = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3); + + Directory directory = Directory.EMPTY + .unsafeWithNodeForTesting(existingNode, + new NodeAddresses(InetAddressAndPort.getByName("127.0.0.1")), + LOCATION, + v3) + .withNodeState(existingNode, NodeState.JOINED); + + ClusterMetadata metadata = ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance) + .transformer() + .with(directory) + .build().metadata; + + assertEquals("commonSerializationVersion should be V3", Version.V3, metadata.directory.commonSerializationVersion); + + // Register a node with higher version - should succeed + Register registerHigher = new Register( + new NodeAddresses(InetAddressAndPort.getByName("127.0.0.2")), + LOCATION, + NodeVersion.CURRENT + ); + + Transformation.Result resultHigher = registerHigher.execute(metadata); + assertTrue("Registration should succeed for node with higher serialization version", resultHigher.isSuccess()); + + // Register a node with equal version - should succeed + Register registerEqual = new Register( + new NodeAddresses(InetAddressAndPort.getByName("127.0.0.3")), + LOCATION, + v3 + ); + + Transformation.Result resultEqual = registerEqual.execute(metadata); + assertTrue("Registration should succeed for node with equal serialization version", resultEqual.isSuccess()); + } + + /** + * Tests that the first node in an empty cluster can register with any version + * (bypasses version check because directory is empty). + */ + @Test + public void allowsAnyVersionForFirstNode() throws UnknownHostException + { + ClusterMetadata metadata = ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance); + + assertTrue("Directory should be empty", metadata.directory.isEmpty()); + + // Register first node with V0 - should succeed because directory is empty + NodeVersion v0 = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V0); + Register register = new Register( + new NodeAddresses(InetAddressAndPort.getByName("127.0.0.1")), + LOCATION, + v0 + ); + + Transformation.Result result = register.execute(metadata); + assertTrue("First node registration should succeed with any version", result.isSuccess()); + } +} \ No newline at end of file diff --git a/test/unit/org/apache/cassandra/tcm/transformations/StartupTest.java b/test/unit/org/apache/cassandra/tcm/transformations/StartupTest.java new file mode 100644 index 000000000000..8e0c165a22bc --- /dev/null +++ b/test/unit/org/apache/cassandra/tcm/transformations/StartupTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.transformations; + +import java.net.UnknownHostException; + +import org.junit.Test; + +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; +import org.apache.cassandra.exceptions.ExceptionCode; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeAddresses; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeState; +import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.tcm.serialization.Version; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class StartupTest +{ + private static final Location LOCATION = new Location("dc", "rack"); + + /** + * Tests that the Startup transformation rejects downgrading a node to a version + * that cannot read cluster metadata. + */ + @Test + public void rejectsDowngrade() throws UnknownHostException + { + NodeId nodeId = new NodeId(1); + NodeAddresses addresses = new NodeAddresses(InetAddressAndPort.getByName("127.0.0.1")); + + Directory directory = Directory.EMPTY + .unsafeWithNodeForTesting(nodeId, addresses, LOCATION, NodeVersion.CURRENT) + .withNodeState(nodeId, NodeState.JOINED); + + ClusterMetadata metadata = ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance) + .transformer() + .with(directory) + .build().metadata; + + assertEquals("commonSerializationVersion should be CURRENT_METADATA_VERSION", + NodeVersion.CURRENT_METADATA_VERSION, metadata.directory.commonSerializationVersion); + + // Try to "downgrade" the node to V3 (simulating restart with older binary) + NodeVersion downgradedVersion = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3); + Startup startup = new Startup(nodeId, addresses, downgradedVersion); + + Transformation.Result result = startup.execute(metadata); + + assertTrue("Startup should be rejected for downgrade to lower serialization version", result.isRejected()); + assertEquals(ExceptionCode.INVALID, result.rejected().code); + } + + /** + * Tests that the Startup transformation allows a node to restart with equal or higher + * serialization version. + */ + @Test + public void allowsEqualOrHigherSerializationVersion() throws UnknownHostException + { + NodeId nodeId = new NodeId(1); + NodeAddresses addresses = new NodeAddresses(InetAddressAndPort.getByName("127.0.0.1")); + NodeVersion v3 = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3); + + Directory directory = Directory.EMPTY + .unsafeWithNodeForTesting(nodeId, addresses, LOCATION, v3) + .withNodeState(nodeId, NodeState.JOINED); + + ClusterMetadata metadata = ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance) + .transformer() + .with(directory) + .build().metadata; + + assertEquals("commonSerializationVersion should be V3", Version.V3, metadata.directory.commonSerializationVersion); + + // Startup with higher version - should succeed + Startup startupHigher = new Startup(nodeId, addresses, NodeVersion.CURRENT); + + Transformation.Result resultHigher = startupHigher.execute(metadata); + assertTrue("Startup should succeed for higher serialization version", resultHigher.isSuccess()); + + // Startup with equal version - should succeed + Startup startupEqual = new Startup(nodeId, addresses, v3); + + Transformation.Result resultEqual = startupEqual.execute(metadata); + assertTrue("Startup should succeed for equal serialization version", resultEqual.isSuccess()); + } +}