Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ public static boolean isSupported(AbstractCompactionStrategy.ScannerList scanner
TableMetadata metadata = controller.cfs.metadata();
if (unsupportedMetadata(metadata)) return false;

// TODO: Implement GarbageSkipper logic to support Materialized Views.
if (metadata.isView())
{
if (LOGGER.isDebugEnabled()) logDebugReason(metadata, "Materialized Views are not supported.");
return false;
}

for (ISSTableScanner scanner : scanners.scanners)
{
// TODO: implement partial range reader
Expand Down Expand Up @@ -553,7 +560,7 @@ private boolean mergeRows(int rowMergeLimit, DeletionTime partitionActiveDeletio
mergedRowDeletion = DeletionTime.LIVE;
}

if (rowActiveDeletion.deletes(mergedRowInfo) || purger.shouldPurge(mergedRowInfo, nowInSec))
if (rowActiveDeletion.deletes(mergedRowInfo) || (mergedRowInfo.isExpiring() && !mergedRowInfo.isLive(nowInSec)) || purger.shouldPurge(mergedRowInfo, nowInSec))
{
mergedRowInfo = LivenessInfo.EMPTY;
}
Expand Down
67 changes: 67 additions & 0 deletions test/unit/org/apache/cassandra/cql3/ViewFlushTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.cql3;

import java.util.Collections;

import org.junit.Test;

import org.apache.cassandra.Util;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;

public class ViewFlushTest extends ViewAbstractTest
{

@Test
public void testView() throws Throwable
{
createTable("CREATE TABLE %s (k int, a int, b int, c int, primary key(k, a)) with default_time_to_live=6000");
createView("CREATE MATERIALIZED VIEW %s AS SELECT k,a,b FROM %s WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (a, k)");

execute("UPDATE %s SET c=2 WHERE k=1 AND a=1");
flushView();
assertRows(execute("SELECT k,a,b,c FROM %s"), row(1, 1, null, 2));
assertRows(executeView("SELECT k,a,b FROM %s"), row(1, 1, null));

// view mutation on update
// [cql_test_keyspace.mv_testview_01] key=1 partition_deletion=LIVE columns=[[] | [b]]
// Row[info=[ts=1770378054212000 ttl=6000, let=1770384054] ]: k=1 |

// view mutation on delete
// [cql_test_keyspace.mv_testview_01] key=1 partition_deletion=LIVE columns=[[] | [b]]
// Row[info=[ts=1770377809808000 ttl=2147483647, let=1770377924] ]: k=1 |

execute("DELETE c FROM %s WHERE k=1 AND a=1");
flushView();

assertRows(execute("SELECT k,a,b,c FROM %s"), Collections.emptyList());
assertRows(executeView("SELECT k,a,b FROM %s"), Collections.emptyList());

compact(keyspace(), currentView());

assertRows(execute("SELECT k,a,b,c FROM %s"), Collections.emptyList());
assertRows(executeView("SELECT k,a,b FROM %s"), Collections.emptyList());
}

private void flushView()
{
ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(currentView());
Util.flush(cfs);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.db.compaction.simple;

import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;

import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;

import static org.junit.Assert.assertEquals;

public class CursorCompactionExpiringTest extends CQLTester
{
private String originalCursorCompactionProperty;

@Before
public void setup()
{
originalCursorCompactionProperty = System.getProperty("cassandra.cursor_compaction_enabled");
System.setProperty("cassandra.cursor_compaction_enabled", "true");
}

@After
public void teardown()
{
if (originalCursorCompactionProperty != null)
System.setProperty("cassandra.cursor_compaction_enabled", originalCursorCompactionProperty);
else
System.clearProperty("cassandra.cursor_compaction_enabled");
}

@Test
public void testExpiredRowPurge() throws Throwable
{
Assume.assumeTrue("Skipping test because Cursor Compaction is disabled.", Boolean.getBoolean("cassandra.cursor_compaction_enabled"));

createTable("CREATE TABLE %s (k int PRIMARY KEY, v int) WITH gc_grace_seconds=10000");
execute("INSERT INTO %s (k) VALUES (1) USING TTL 1");

flush();

ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable());
assertEquals("Expected 1 SSTable after flush", 1, cfs.getLiveSSTables().size());

Thread.sleep(2000);
compact();

assertEquals("Expected 0 SSTables after compacting fully expired table, but found zombie data.",
0, cfs.getLiveSSTables().size());
}
}