diff --git a/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java b/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java index 3b4819c15670..1f67470c87b1 100644 --- a/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java +++ b/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java @@ -119,6 +119,13 @@ public static boolean isSupported(AbstractCompactionStrategy.ScannerList scanner TableMetadata metadata = controller.cfs.metadata(); if (unsupportedMetadata(metadata)) return false; + // TODO: Implement GarbageSkipper logic to support Materialized Views. + if (metadata.isView()) + { + if (LOGGER.isDebugEnabled()) logDebugReason(metadata, "Materialized Views are not supported."); + return false; + } + for (ISSTableScanner scanner : scanners.scanners) { // TODO: implement partial range reader @@ -553,7 +560,7 @@ private boolean mergeRows(int rowMergeLimit, DeletionTime partitionActiveDeletio mergedRowDeletion = DeletionTime.LIVE; } - if (rowActiveDeletion.deletes(mergedRowInfo) || purger.shouldPurge(mergedRowInfo, nowInSec)) + if (rowActiveDeletion.deletes(mergedRowInfo) || (mergedRowInfo.isExpiring() && !mergedRowInfo.isLive(nowInSec)) || purger.shouldPurge(mergedRowInfo, nowInSec)) { mergedRowInfo = LivenessInfo.EMPTY; } diff --git a/test/unit/org/apache/cassandra/cql3/ViewFlushTest.java b/test/unit/org/apache/cassandra/cql3/ViewFlushTest.java new file mode 100644 index 000000000000..9784f2a00e19 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/ViewFlushTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3; + +import java.util.Collections; + +import org.junit.Test; + +import org.apache.cassandra.Util; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; + +public class ViewFlushTest extends ViewAbstractTest +{ + + @Test + public void testView() throws Throwable + { + createTable("CREATE TABLE %s (k int, a int, b int, c int, primary key(k, a)) with default_time_to_live=6000"); + createView("CREATE MATERIALIZED VIEW %s AS SELECT k,a,b FROM %s WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (a, k)"); + + execute("UPDATE %s SET c=2 WHERE k=1 AND a=1"); + flushView(); + assertRows(execute("SELECT k,a,b,c FROM %s"), row(1, 1, null, 2)); + assertRows(executeView("SELECT k,a,b FROM %s"), row(1, 1, null)); + + // view mutation on update + // [cql_test_keyspace.mv_testview_01] key=1 partition_deletion=LIVE columns=[[] | [b]] + // Row[info=[ts=1770378054212000 ttl=6000, let=1770384054] ]: k=1 | + + // view mutation on delete + // [cql_test_keyspace.mv_testview_01] key=1 partition_deletion=LIVE columns=[[] | [b]] + // Row[info=[ts=1770377809808000 ttl=2147483647, let=1770377924] ]: k=1 | + + execute("DELETE c FROM %s WHERE k=1 AND a=1"); + flushView(); + + assertRows(execute("SELECT k,a,b,c FROM %s"), Collections.emptyList()); + assertRows(executeView("SELECT k,a,b FROM %s"), Collections.emptyList()); + + compact(keyspace(), currentView()); + + assertRows(execute("SELECT k,a,b,c FROM %s"), Collections.emptyList()); + assertRows(executeView("SELECT k,a,b FROM %s"), Collections.emptyList()); + } + + private void flushView() + { + ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(currentView()); + Util.flush(cfs); + } +} \ No newline at end of file diff --git a/test/unit/org/apache/cassandra/db/compaction/simple/CursorCompactionExpiringTest.java b/test/unit/org/apache/cassandra/db/compaction/simple/CursorCompactionExpiringTest.java new file mode 100644 index 000000000000..59c352fd2612 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/compaction/simple/CursorCompactionExpiringTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cassandra.db.compaction.simple; + +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; + +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; + +import static org.junit.Assert.assertEquals; + +public class CursorCompactionExpiringTest extends CQLTester +{ + private String originalCursorCompactionProperty; + + @Before + public void setup() + { + originalCursorCompactionProperty = System.getProperty("cassandra.cursor_compaction_enabled"); + System.setProperty("cassandra.cursor_compaction_enabled", "true"); + } + + @After + public void teardown() + { + if (originalCursorCompactionProperty != null) + System.setProperty("cassandra.cursor_compaction_enabled", originalCursorCompactionProperty); + else + System.clearProperty("cassandra.cursor_compaction_enabled"); + } + + @Test + public void testExpiredRowPurge() throws Throwable + { + Assume.assumeTrue("Skipping test because Cursor Compaction is disabled.", Boolean.getBoolean("cassandra.cursor_compaction_enabled")); + + createTable("CREATE TABLE %s (k int PRIMARY KEY, v int) WITH gc_grace_seconds=10000"); + execute("INSERT INTO %s (k) VALUES (1) USING TTL 1"); + + flush(); + + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable()); + assertEquals("Expected 1 SSTable after flush", 1, cfs.getLiveSSTables().size()); + + Thread.sleep(2000); + compact(); + + assertEquals("Expected 0 SSTables after compacting fully expired table, but found zombie data.", + 0, cfs.getLiveSSTables().size()); + } +} \ No newline at end of file