diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index cf514b7dfce1..01f3bf31afcd 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -531,6 +531,12 @@ public enum CassandraRelevantProperties SIZE_RECORDER_INTERVAL("cassandra.size_recorder_interval", "300"), SKIP_AUTH_SETUP("cassandra.skip_auth_setup", "false"), SKIP_GC_INSPECTOR("cassandra.skip_gc_inspector", "false"), + /** + * Skip the metadata serialization version check during node registration and startup. + * WARNING: Only use this if you are certain the cluster metadata log contains no entries + * that require a newer serialization version. Misuse can lead to unreadable metadata. + */ + SKIP_METADATA_SERIALIZATION_VERSION_CHECK("cassandra.skip_metadata_serialization_version_check", "false"), /** * Do not try to calculate optimal streaming candidates. This can take a lot of time in some configs specially diff --git a/src/java/org/apache/cassandra/tcm/transformations/Register.java b/src/java/org/apache/cassandra/tcm/transformations/Register.java index 0b3ebb375fab..4b17fbd37d56 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/Register.java +++ b/src/java/org/apache/cassandra/tcm/transformations/Register.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.io.util.DataInputPlus; @@ -74,6 +75,22 @@ public Kind kind() @Override public Result execute(ClusterMetadata prev) { + // Ensure the joining node can read existing cluster metadata. + // Skip check for empty directory (first node in a new cluster). + if (!prev.directory.isEmpty() && !CassandraRelevantProperties.SKIP_METADATA_SERIALIZATION_VERSION_CHECK.getBoolean()) + { + Version clusterVersion = prev.directory.commonSerializationVersion; + Version newNodeVersion = version.serializationVersion(); + if (newNodeVersion.isBefore(clusterVersion)) + { + return new Rejected(INVALID, + String.format("Cannot register node: this node's metadata serialization version %s " + + "is lower than the cluster's minimum required version %s. " + + "Node would not be able to read cluster metadata.", + newNodeVersion, clusterVersion)); + } + } + for (Map.Entry entry : prev.directory.addresses.entrySet()) { NodeAddresses existingAddresses = entry.getValue(); diff --git a/src/java/org/apache/cassandra/tcm/transformations/Startup.java b/src/java/org/apache/cassandra/tcm/transformations/Startup.java index a1abfdbff4ec..e050f680764b 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/Startup.java +++ b/src/java/org/apache/cassandra/tcm/transformations/Startup.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Objects; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.InetAddressAndPort; @@ -68,6 +69,22 @@ public Kind kind() @Override public Result execute(ClusterMetadata prev) { + // Prevent downgrade to a version that cannot read cluster metadata. + // This protects against restarting a node with an older binary. + if (!CassandraRelevantProperties.SKIP_METADATA_SERIALIZATION_VERSION_CHECK.getBoolean()) + { + Version clusterVersion = prev.directory.commonSerializationVersion; + Version newNodeVersion = nodeVersion.serializationVersion(); + if (newNodeVersion.isBefore(clusterVersion)) + { + return new Rejected(INVALID, + String.format("Cannot downgrade node: this node's metadata serialization version %s " + + "is lower than the cluster's minimum required version %s. " + + "Node would not be able to read cluster metadata.", + newNodeVersion, clusterVersion)); + } + } + ClusterMetadata.Transformer next = prev.transformer(); if (!prev.directory.addresses.get(nodeId).equals(addresses)) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java index 393e9efd7707..cce1e430290a 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java @@ -25,6 +25,7 @@ import org.junit.Test; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.IInstanceConfig; @@ -49,15 +50,15 @@ import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.tcm.transformations.PrepareLeave; import org.apache.cassandra.tcm.transformations.Register; -import org.apache.cassandra.tcm.transformations.Startup; import org.apache.cassandra.tcm.transformations.TriggerSnapshot; import org.apache.cassandra.tcm.transformations.Unregister; -import org.apache.cassandra.utils.CassandraVersion; import static org.junit.Assert.assertEquals; public class RegisterTest extends TestBaseImpl { + private static final Location TEST_LOCATION = new Location("datacenter1", "rack1"); + @Test public void testRegistrationIdempotence() throws Throwable { @@ -108,23 +109,20 @@ public void serializationVersionCeilingTest() throws Throwable cluster.get(1).runOnInstance(() -> { try { - // Register a ghost node with V0 to fake-force V0 serialization. In a real world cluster we will always be upgrading from a smaller version. - ClusterMetadataService.instance().commit(new Register(new NodeAddresses(InetAddressAndPort.getByName(firstNodeEndpoint)), - ClusterMetadata.current().directory.location(ClusterMetadata.current().myNodeId()), - new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V0))); + // Use skip property to register a node with V0 (simulates upgrading from older version) + CassandraRelevantProperties.SKIP_METADATA_SERIALIZATION_VERSION_CHECK.setBoolean(true); + try + { + ClusterMetadataService.instance().commit(new Register(new NodeAddresses(InetAddressAndPort.getByName(firstNodeEndpoint)), + TEST_LOCATION, + new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V0))); + } + finally + { + CassandraRelevantProperties.SKIP_METADATA_SERIALIZATION_VERSION_CHECK.setBoolean(false); + } NodeId oldNode = ClusterMetadata.current().directory.peerId(InetAddressAndPort.getByName(firstNodeEndpoint)); - // Fake an upgrade of this node and assert we continue to serialize so that the one which only - // supports V0 can deserialize. In a real cluster it wouldn't happen exactly in this way (here the - // min serialization version actually goes backwards from CURRENT to V0 when we upgrade, which would - // not happen in a real cluster as we would never register like oldNode, with the current C* version - // but an older metadata version - CassandraVersion currentVersion = NodeVersion.CURRENT.cassandraVersion; - NodeVersion upgraded = new NodeVersion(new CassandraVersion(String.format("%d.%d.%d", currentVersion.major + 1, 0, 0)), - NodeVersion.CURRENT_METADATA_VERSION); - ClusterMetadata metadata = ClusterMetadata.current(); - NodeId id = metadata.myNodeId(); - Startup startup = new Startup(id, metadata.directory.getNodeAddresses(id), upgraded); - ClusterMetadataService.instance().commit(startup); + // Doesn't matter which specific Transformation we use here, we're testing that the serializer uses // the correct lower bound Transformation t = new Register(NodeAddresses.current(), new Location("DC", "RACK"), NodeVersion.CURRENT); @@ -173,10 +171,18 @@ public void replayLocallyFromV0Snapshot() throws Throwable cluster.get(1).runOnInstance(() -> { try { - // Register a ghost node with V0 to fake-force V0 serialization. In a real world cluster we will always be upgrading from a smaller version. - ClusterMetadataService.instance().commit(new Register(new NodeAddresses(InetAddressAndPort.getByName("127.0.0.10")), - ClusterMetadata.current().directory.location(ClusterMetadata.current().myNodeId()), - new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V0))); + // Use skip property to register a node with V0 (simulates upgrading from older version) + CassandraRelevantProperties.SKIP_METADATA_SERIALIZATION_VERSION_CHECK.setBoolean(true); + try + { + ClusterMetadataService.instance().commit(new Register(new NodeAddresses(InetAddressAndPort.getByName("127.0.0.10")), + TEST_LOCATION, + new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V0))); + } + finally + { + CassandraRelevantProperties.SKIP_METADATA_SERIALIZATION_VERSION_CHECK.setBoolean(false); + } } catch (UnknownHostException e) { @@ -187,9 +193,6 @@ public void replayLocallyFromV0Snapshot() throws Throwable ClusterMetadata cm = new MetadataSnapshots.SystemKeyspaceMetadataSnapshots().getSnapshot(ClusterMetadata.current().epoch); cm.equals(ClusterMetadata.current()); }); - - } } - } diff --git a/test/unit/org/apache/cassandra/tcm/transformations/RegisterTest.java b/test/unit/org/apache/cassandra/tcm/transformations/RegisterTest.java new file mode 100644 index 000000000000..3b385b01b4bf --- /dev/null +++ b/test/unit/org/apache/cassandra/tcm/transformations/RegisterTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.transformations; + +import java.net.UnknownHostException; + +import org.junit.Test; + +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; +import org.apache.cassandra.exceptions.ExceptionCode; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeAddresses; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeState; +import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.tcm.serialization.Version; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class RegisterTest +{ + private static final Location LOCATION = new Location("dc", "rack"); + + /** + * Tests that registering a new node with a serialization version lower than the cluster's + * commonSerializationVersion is rejected. + */ + @Test + public void rejectsLowerSerializationVersion() throws UnknownHostException + { + NodeId existingNode = new NodeId(1); + + Directory directory = Directory.EMPTY + .unsafeWithNodeForTesting(existingNode, + new NodeAddresses(InetAddressAndPort.getByName("127.0.0.1")), + LOCATION, + NodeVersion.CURRENT) + .withNodeState(existingNode, NodeState.JOINED); + + ClusterMetadata metadata = ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance) + .transformer() + .with(directory) + .build().metadata; + + assertEquals("commonSerializationVersion should be CURRENT_METADATA_VERSION", NodeVersion.CURRENT_METADATA_VERSION, metadata.directory.commonSerializationVersion); + + // Try to register a new node with V3 (lower than cluster's current version) + NodeVersion lowerVersion = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3); + Register register = new Register( + new NodeAddresses(InetAddressAndPort.getByName("127.0.0.2")), + LOCATION, + lowerVersion + ); + + Transformation.Result result = register.execute(metadata); + + assertTrue("Registration should be rejected for node with lower serialization version", result.isRejected()); + assertEquals(ExceptionCode.INVALID, result.rejected().code); + } + + /** + * Tests that registering nodes with serialization version equal to or higher than + * the cluster's commonSerializationVersion is allowed. + */ + @Test + public void allowsEqualOrHigherSerializationVersion() throws UnknownHostException + { + NodeId existingNode = new NodeId(1); + NodeVersion v3 = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3); + + Directory directory = Directory.EMPTY + .unsafeWithNodeForTesting(existingNode, + new NodeAddresses(InetAddressAndPort.getByName("127.0.0.1")), + LOCATION, + v3) + .withNodeState(existingNode, NodeState.JOINED); + + ClusterMetadata metadata = ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance) + .transformer() + .with(directory) + .build().metadata; + + assertEquals("commonSerializationVersion should be V3", Version.V3, metadata.directory.commonSerializationVersion); + + // Register a node with higher version - should succeed + Register registerHigher = new Register( + new NodeAddresses(InetAddressAndPort.getByName("127.0.0.2")), + LOCATION, + NodeVersion.CURRENT + ); + + Transformation.Result resultHigher = registerHigher.execute(metadata); + assertTrue("Registration should succeed for node with higher serialization version", resultHigher.isSuccess()); + + // Register a node with equal version - should succeed + Register registerEqual = new Register( + new NodeAddresses(InetAddressAndPort.getByName("127.0.0.3")), + LOCATION, + v3 + ); + + Transformation.Result resultEqual = registerEqual.execute(metadata); + assertTrue("Registration should succeed for node with equal serialization version", resultEqual.isSuccess()); + } + + /** + * Tests that the first node in an empty cluster can register with any version + * (bypasses version check because directory is empty). + */ + @Test + public void allowsAnyVersionForFirstNode() throws UnknownHostException + { + ClusterMetadata metadata = ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance); + + assertTrue("Directory should be empty", metadata.directory.isEmpty()); + + // Register first node with V0 - should succeed because directory is empty + NodeVersion v0 = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V0); + Register register = new Register( + new NodeAddresses(InetAddressAndPort.getByName("127.0.0.1")), + LOCATION, + v0 + ); + + Transformation.Result result = register.execute(metadata); + assertTrue("First node registration should succeed with any version", result.isSuccess()); + } +} \ No newline at end of file diff --git a/test/unit/org/apache/cassandra/tcm/transformations/StartupTest.java b/test/unit/org/apache/cassandra/tcm/transformations/StartupTest.java new file mode 100644 index 000000000000..306322e97621 --- /dev/null +++ b/test/unit/org/apache/cassandra/tcm/transformations/StartupTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.transformations; + +import java.net.UnknownHostException; + +import org.junit.Test; + +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; +import org.apache.cassandra.exceptions.ExceptionCode; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeAddresses; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeState; +import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.tcm.serialization.Version; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class StartupTest +{ + private static final Location LOCATION = new Location("dc", "rack"); + + /** + * Tests that the Startup transformation rejects downgrading a node to a version + * that cannot read cluster metadata. + */ + @Test + public void rejectsDowngrade() throws UnknownHostException + { + NodeId nodeId = new NodeId(1); + NodeAddresses addresses = new NodeAddresses(InetAddressAndPort.getByName("127.0.0.1")); + + Directory directory = Directory.EMPTY + .unsafeWithNodeForTesting(nodeId, addresses, LOCATION, NodeVersion.CURRENT) + .withNodeState(nodeId, NodeState.JOINED); + + ClusterMetadata metadata = ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance) + .transformer() + .with(directory) + .build().metadata; + + assertEquals("commonSerializationVersion should be CURRENT_METADATA_VERSION", + NodeVersion.CURRENT_METADATA_VERSION, metadata.directory.commonSerializationVersion); + + // Try to "downgrade" the node to V3 (simulating restart with older binary) + NodeVersion downgradedVersion = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3); + Startup startup = new Startup(nodeId, addresses, downgradedVersion); + + Transformation.Result result = startup.execute(metadata); + + assertTrue("Startup should be rejected for downgrade to lower serialization version", result.isRejected()); + assertEquals(ExceptionCode.INVALID, result.rejected().code); + } + + /** + * Tests that the Startup transformation allows a node to restart with equal or higher + * serialization version. + */ + @Test + public void allowsEqualOrHigherSerializationVersion() throws UnknownHostException + { + NodeId nodeId = new NodeId(1); + NodeAddresses addresses = new NodeAddresses(InetAddressAndPort.getByName("127.0.0.1")); + NodeVersion v3 = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3); + + Directory directory = Directory.EMPTY + .unsafeWithNodeForTesting(nodeId, addresses, LOCATION, v3) + .withNodeState(nodeId, NodeState.JOINED); + + ClusterMetadata metadata = ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance) + .transformer() + .with(directory) + .build().metadata; + + assertEquals("commonSerializationVersion should be V3", Version.V3, metadata.directory.commonSerializationVersion); + + // Startup with higher version - should succeed + Startup startupHigher = new Startup(nodeId, addresses, NodeVersion.CURRENT); + + Transformation.Result resultHigher = startupHigher.execute(metadata); + assertTrue("Startup should succeed for higher serialization version", resultHigher.isSuccess()); + + // Startup with equal version - should succeed + Startup startupEqual = new Startup(nodeId, addresses, v3); + + Transformation.Result resultEqual = startupEqual.execute(metadata); + assertTrue("Startup should succeed for equal serialization version", resultEqual.isSuccess()); + } +} \ No newline at end of file