1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
5.1
* Wire compaction_read_disk_access_mode through cursor-based compaction (CASSANDRA-21147)
* Reduce memory allocation during transformation of BatchStatement to Mutation (CASSANDRA-21141)
* Direct I/O support for compaction reads (CASSANDRA-19987)
* Support custom StartupCheck implementations via SPI (CASSANDRA-21093)
39 changes: 30 additions & 9 deletions src/java/org/apache/cassandra/db/compaction/CursorCompactor.java
@@ -27,11 +27,11 @@
import java.util.function.LongPredicate;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.UnmodifiableIterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.Config.DiskAccessMode;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.AbstractCompactionController;
import org.apache.cassandra.db.ClusteringComparator;
@@ -70,6 +70,7 @@
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.TimeUUID;

import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_BOUND;
@@ -294,15 +295,8 @@ private CursorCompactor(OperationType type,
* {@link CompactionIterator#CompactionIterator(OperationType, List, AbstractCompactionController, long, TimeUUID, ActiveCompactionsTracker)}
*/

// Convert Readers to Cursors
this.sstableCursors = new StatefulCursor[sstables.size()];
this.sstableCursors = convertScannersToCursors(scanners, sstables, DatabaseDescriptor.getCompactionReadDiskAccessMode());
this.sstableCursorsEqualsNext = new boolean[sstables.size()];
UnmodifiableIterator<SSTableReader> iterator = sstables.iterator();
for (int i = 0; i < this.sstableCursors.length; i++)
{
SSTableReader ssTableReader = iterator.next();
this.sstableCursors[i] = new StatefulCursor(ssTableReader);
}
this.enforceStrictLiveness = controller.cfs.metadata.get().enforceStrictLiveness();

purger = new Purger(type, controller, nowInSec);
@@ -1553,6 +1547,33 @@ private static String mergeHistogramToString(long[] histogram)
return sb.toString();
}

/**
* Closes scanner-opened readers before opening cursor-specific readers with the configured disk access mode.
* In cursor compaction, scanners are only used for metadata; closing them avoids holding redundant file
* descriptors and prevents conflicts when scan and non-scan readers for the same file share thread-local
* buffer state on the same thread.
*/
private static StatefulCursor[] convertScannersToCursors(List<ISSTableScanner> scanners, ImmutableSet<SSTableReader> sstables,
Contributor:

This has some smell to it. Should the key in ThreadLocalReadAheadBuffer include the disk access mode? Should these scanners have been opened with the correct disk access mode in the first place, or is it still an issue because we then need to re-open with the cursor-based approach?

Contributor Author (@samueldlightfoot, Feb 17, 2026):

Agree there's a smell - the cursor compaction code has been tailored to fit the existing compaction interface, accepting scanners and pulling just metadata from them, whereas raw SSTables would have suited better. The ideal refactor I see is to let each separate compaction pipeline open the readers it requires, but that may be better done in a follow-up.

The disk access mode is not actually the issue here; the issue is that two readers (one scan and one non-scan) exist for an SSTable at the same time on the same thread. On trunk this is a bug that causes cursor compaction to use scan reads (with a read-ahead buffer) rather than the intended random-access reads, because ScanCompressedReader's allocated() check is based on shared static state and looks like the following:

        @Override
        public boolean allocated()
        {
            // this checks the static block map, which is inherently shared between the two 
            // readers (scan + non-scan)
            return readAheadBuffer.hasBuffer(); 
        }

Closing the scanner before opening the cursor reader deallocates the block map, so allocated() returns false when the cursor readers are opened and the correct random-access reader ('reader') is chosen:

// How readChunk picks the reader (scan vs random)
CompressedReader readFrom = (scanReader != null && scanReader.allocated()) ? scanReader : reader;

This is perhaps slightly brittle from CompressedChunkReader's allocated() perspective, but the precondition that a given file is not opened by two readers on the same thread concurrently does not seem unreasonable (as is now the case in cursor compaction). I did look into guarding against instantiating two readers for the same file, but it caused a large number of test failures (I cannot remember the full details).

DiskAccessMode diskAccessMode)
{
for (ISSTableScanner scanner : scanners)
scanner.close();

StatefulCursor[] cursors = new StatefulCursor[sstables.size()];
int i = 0;
try
{
for (SSTableReader reader : sstables)
cursors[i++] = new StatefulCursor(reader, diskAccessMode);
return cursors;
}
catch (RuntimeException | Error e)
{
Throwables.closeNonNullAndAddSuppressed(e, cursors);
throw e;
}
}
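
To make the shared-state interaction described in the review thread above concrete, here is a stripped-down, self-contained illustration in plain Java. It is not Cassandra code: the static map merely stands in for ThreadLocalReadAheadBuffer's per-file block map, and the exact shape of that map is an assumption.

    import java.nio.ByteBuffer;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    final class SharedBufferStateIllustration
    {
        // Stands in for the static, per-file block map that all readers of a file share.
        private static final Map<String, ByteBuffer> BUFFERS = new ConcurrentHashMap<>();

        // What ScanCompressedReader#allocated effectively asks: "does any reader of this file
        // already hold a read-ahead buffer?" It cannot tell which reader owns the buffer.
        static boolean allocated(String file)
        {
            return BUFFERS.containsKey(file);
        }

        public static void main(String[] args)
        {
            String file = "nb-1-big-Data.db";
            BUFFERS.put(file, ByteBuffer.allocate(1 << 16)); // scanner opened: buffer allocated
            System.out.println(allocated(file));             // true: a cursor reader opened now takes the scan path
            BUFFERS.remove(file);                            // closing the scanner first clears the shared state
            System.out.println(allocated(file));             // false: the cursor reader picks the random-access path
        }
    }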

public void close()
{
try
@@ -19,6 +19,7 @@
package org.apache.cassandra.db.compaction;

import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.Config.DiskAccessMode;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.ReusableLivenessInfo;
@@ -55,9 +56,9 @@ class StatefulCursor extends SSTableCursorReader

private boolean isOpenRangeTombstonePresent = false;

public StatefulCursor(SSTableReader reader)
public StatefulCursor(SSTableReader reader, DiskAccessMode diskAccessMode)
{
super(reader);
super(reader, diskAccessMode);
currPartition = new PartitionDescriptor(reader.getPartitioner().createReusableKey(0));
prevPartition = new PartitionDescriptor(reader.getPartitioner().createReusableKey(0));
unfiltered = new UnfilteredDescriptor(reader.header.clusteringTypes().toArray(AbstractType[]::new));
14 changes: 10 additions & 4 deletions src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java
@@ -22,6 +22,7 @@

import com.google.common.collect.ImmutableList;

import org.apache.cassandra.config.Config.DiskAccessMode;
import org.apache.cassandra.db.ClusteringPrefix;
import org.apache.cassandra.db.Columns;
import org.apache.cassandra.db.DeletionTime;
@@ -197,15 +198,20 @@ public static SSTableCursorReader fromDescriptor(Descriptor desc) throws IOExcep
{
TableMetadata metadata = Util.metadataFromSSTable(desc);
SSTableReader reader = SSTableReader.openNoValidation(null, desc, TableMetadataRef.forOfflineTools(metadata));
return new SSTableCursorReader(reader, metadata, reader.ref());
return new SSTableCursorReader(reader, metadata, reader.ref(), null);
}

public SSTableCursorReader(SSTableReader reader)
{
this(reader, reader.metadata(), null);
this(reader, reader.metadata(), null, null);
}

private SSTableCursorReader(SSTableReader reader, TableMetadata metadata, Ref<SSTableReader> readerRef)
public SSTableCursorReader(SSTableReader reader, DiskAccessMode diskAccessMode)
{
this(reader, reader.metadata(), null, diskAccessMode);
}

private SSTableCursorReader(SSTableReader reader, TableMetadata metadata, Ref<SSTableReader> readerRef, DiskAccessMode diskAccessMode)
{
ssTableReader = reader;
ssTableReaderRef = readerRef;
@@ -221,7 +227,7 @@ private SSTableCursorReader(SSTableReader reader, TableMetadata metadata, Ref<SS
deserializationHelper = new DeserializationHelper(metadata, version.correspondingMessagingVersion(), DeserializationHelper.Flag.LOCAL, null);
serializationHeader = reader.header;

dataReader = reader.openDataReader();
dataReader = reader.openDataReader(diskAccessMode);
hasStaticColumns = metadata.hasStaticColumns();
}

44 changes: 31 additions & 13 deletions src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -38,6 +38,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import javax.annotation.Nullable;

import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.ICardinality;
import com.google.common.annotations.VisibleForTesting;
@@ -1417,44 +1419,60 @@ public StatsMetadata getSSTableMetadata()
return sstableMetadata;
}

public RandomAccessReader openDataReader()
Contributor:

Should there really be versions here with no access-mode argument? Shouldn't every caller know what access mode they want, or explicitly state that they don't care?

Not sure if I am right here or whether the verbosity is worth it. The concern is that people call the no-arg version when they should have passed configuration, but at some point it's not worth it.

Contributor Author:

This one may not be worth it. We'd have to make sure the original dFile is available to callers so they can pass dFile.diskAccessMode() in (i.e. the default).

There are currently >10 usages of openDataReader/openDataReaderForScan that we'd have to pass it through for, and I can see a couple at a glance that do not have access to the FileHandle for the data file (dFile).

We could improve the docs to indicate that the file's default disk access mode will be used for these overloads.

{
return openDataReaderInternal(null, null, false);
}
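
Picking up the documentation suggestion from the thread above, one possible javadoc for the overloads that take no disk access mode; the wording is a sketch and not part of the patch:

    /**
     * Opens a reader over the data file using the disk access mode the underlying data file
     * handle was originally opened with (i.e. this SSTable's default). Callers that need a
     * specific mode should use {@link #openDataReader(DiskAccessMode)} or
     * {@link #openDataReaderForScan(DiskAccessMode)} instead.
     */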

public RandomAccessReader openDataReader(RateLimiter limiter)
{
assert limiter != null;
return dfile.createReader(limiter);
return openDataReaderInternal(null, limiter, false);
}

public RandomAccessReader openDataReader()
public RandomAccessReader openDataReader(DiskAccessMode diskAccessMode)
{
return dfile.createReader();
return openDataReaderInternal(diskAccessMode, null, false);
}

public RandomAccessReader openDataReaderForScan()
{
return openDataReaderForScan(dfile.diskAccessMode());
return openDataReaderInternal(null, null, true);
}

public RandomAccessReader openDataReaderForScan(DiskAccessMode diskAccessMode)
{
boolean isSameDiskAccessMode = diskAccessMode == dfile.diskAccessMode();
boolean isDirectIONotSupported = diskAccessMode == DiskAccessMode.direct && !dfile.supportsDirectIO();
return openDataReaderInternal(diskAccessMode, null, true);
}

if (isSameDiskAccessMode || isDirectIONotSupported)
return dfile.createReaderForScan(OnReaderClose.RETAIN_FILE_OPEN);
private RandomAccessReader openDataReaderInternal(@Nullable DiskAccessMode diskAccessMode,
Contributor:

The logic in this method (and the similar logic from the introduction of direct I/O for scans) needs test coverage to make sure it re-uses dfile when it is supposed to, falls back to creating a new file handle when it cannot, and then closes that handle correctly.

I haven't checked whether these tests exist yet; just pointing it out.

@Nullable RateLimiter limiter,
boolean forScan)
{
if (canReuseDfile(diskAccessMode))
return dfile.createReader(limiter, forScan, OnReaderClose.RETAIN_FILE_OPEN);

FileHandle dataFile = dfile.toBuilder()
.withDiskAccessMode(diskAccessMode)
.complete();
FileHandle handle = dfile.toBuilder()
.withDiskAccessMode(diskAccessMode)
.complete();
try
{
return dataFile.createReaderForScan(OnReaderClose.CLOSE_FILE);
return handle.createReader(limiter, forScan, OnReaderClose.CLOSE_FILE);
}
catch (Throwable t)
{
dataFile.close();
handle.close();
throw t;
}
}

private boolean canReuseDfile(@Nullable DiskAccessMode diskAccessMode)
{
return diskAccessMode == null
|| diskAccessMode == dfile.diskAccessMode()
|| (diskAccessMode == DiskAccessMode.direct && !dfile.supportsDirectIO());
Contributor:

Do we at any point log that we weren't able to open the file with direct I/O as requested? Might be a use case for a rate-limited logger.

Contributor Author:

We fail on start-up if any of the data file directories do not support direct I/O.

The only case where FileHandle::supportsDirectIO would return false is when an SSTable is uncompressed, as we have only implemented direct I/O support for the compressed case; logging for that may be deemed noise. Curious on your thoughts.

}
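
On the logging question in the thread above, a sketch of where a fallback log could live. This is not part of the patch; it assumes SSTableReader's existing slf4j logger field and its getFilename() accessor are in scope, and a rate-limited logger (e.g. NoSpamLogger) could replace the plain call if noise is a concern:

    private boolean canReuseDfile(@Nullable DiskAccessMode diskAccessMode)
    {
        if (diskAccessMode == null || diskAccessMode == dfile.diskAccessMode())
            return true;

        if (diskAccessMode == DiskAccessMode.direct && !dfile.supportsDirectIO())
        {
            // Today this only happens for uncompressed SSTables, where direct I/O is not implemented.
            logger.debug("Direct I/O requested but not supported for {}; reusing existing {} reader",
                         getFilename(), dfile.diskAccessMode());
            return true;
        }

        return false;
    }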

public void trySkipFileCacheBefore(DecoratedKey key)
{
long position = getPosition(key, SSTableReader.Operator.GE);
@@ -70,10 +70,8 @@ public ByteBuffer getBuffer(int size)

private static void cleanBuffer(ByteBuffer buffer)
{
// Aligned buffers are slices; clean the backing buffer (attachment)
DirectBuffer db = (DirectBuffer) buffer;
ByteBuffer attachment = (ByteBuffer) db.attachment();
MemoryUtil.clean(attachment != null ? attachment : buffer);
// Aligned buffers from BufferUtil.allocateDirectAligned are slices; clean the backing buffer (attachment)
MemoryUtil.clean((ByteBuffer) ((DirectBuffer) buffer).attachment());
}

}
@@ -24,6 +24,9 @@
import org.agrona.BufferUtil;

import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.utils.memory.MemoryUtil;

import sun.nio.ch.DirectBuffer;

public final class DirectThreadLocalReadAheadBuffer extends ThreadLocalReadAheadBuffer
{
@@ -46,4 +49,12 @@ protected void loadBlock(ByteBuffer blockBuffer, long blockPosition, int sizeToR
if (channel.read(blockBuffer, blockPosition) < sizeToRead)
throw new CorruptSSTableException(null, channel.filePath());
}

@Override
protected void cleanBuffer(ByteBuffer buffer)
Contributor:

Could this check be done in MemoryUtil.clean or should MemoryUtil.clean have some kind of assertion added for cases where the buffer is a slice? I imagine if we try to clean a slice we get some error on the native side, but maybe it would be cleaner to error out in Java with a nice stack trace.

Contributor Author:

Strongly agree it should throw - it would have detected the potential leak. Changes pushed.

{
// Aligned buffers from BufferUtil.allocateDirectAligned are slices; clean the backing buffer (attachment)
MemoryUtil.clean((ByteBuffer) ((DirectBuffer) buffer).attachment());
}

}
5 changes: 0 additions & 5 deletions src/java/org/apache/cassandra/io/util/FileHandle.java
@@ -196,11 +196,6 @@ public RandomAccessReader createReader()
return createReader(null);
}

public RandomAccessReader createReaderForScan(OnReaderClose onReaderClose)
{
return createReader(null, true, onReaderClose);
}

/**
* Create {@link RandomAccessReader} with configured method of reading content of the file.
* Reading from file will be rate limited by given {@link RateLimiter}.
@@ -156,11 +156,16 @@ public void clear(boolean deallocate)
blockBuffer.clear();
if (deallocate)
{
MemoryUtil.clean(blockBuffer);
cleanBuffer(blockBuffer);
block.buffer = null;
}
}

protected void cleanBuffer(ByteBuffer buffer)
{
MemoryUtil.clean(buffer);
}

@Override
public void close()
{
4 changes: 4 additions & 0 deletions src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -991,7 +991,11 @@ void release()
{
Object attachment = MemoryUtil.getAttachment(buffer);
if (attachment instanceof Ref.DirectBufferRef)
{
// Attachment set in trace mode only
((Ref.DirectBufferRef) attachment).release();
MemoryUtil.setAttachment(buffer, null);
}
MemoryUtil.clean(buffer);
}

7 changes: 6 additions & 1 deletion src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
@@ -324,14 +324,19 @@ public static void getBytes(long sourceAddress, ByteBuffer targetBuffer, int len
getBytes(sourceAddress, targetBuffer, 0, length);
}

/*
* Clean a direct ByteBuffer that is a root allocation (not a slice, duplicate, or view).
*/
public static void clean(ByteBuffer buffer)
{
if (buffer == null || !buffer.isDirect())
return;

DirectBuffer db = (DirectBuffer) buffer;
if (db.attachment() != null)
return; // duplicate or slice
throw new IllegalArgumentException(
"Cannot clean a slice/duplicate/view buffer directly; " +
"resolve to the root allocation before calling clean()");

unsafe.invokeCleaner(buffer);
}
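
A small illustration of the new contract (standard JDK behavior: slice() and duplicate() on a direct buffer record the parent buffer as their attachment):

    ByteBuffer root = ByteBuffer.allocateDirect(8192);
    ByteBuffer slice = root.slice();                                    // a view; ((DirectBuffer) slice).attachment() == root
    // MemoryUtil.clean(slice) now throws IllegalArgumentException instead of returning silently.
    MemoryUtil.clean((ByteBuffer) ((DirectBuffer) slice).attachment()); // resolve to the root allocation and clean that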
@@ -19,18 +19,49 @@
package org.apache.cassandra.db.compaction.simple;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ExecutionException;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import org.apache.cassandra.config.Config.DiskAccessMode;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.utils.TestHelper;


@Ignore
@RunWith(Parameterized.class)
public abstract class SimpleCompactionTest extends CQLTester
{
@Parameterized.Parameter
public DiskAccessMode compactionReadDiskAccessMode;

@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> diskAccessModes()
{
return Arrays.asList(new Object[]{ DiskAccessMode.standard },
new Object[]{ DiskAccessMode.direct });
}

@Before
public void setCompactionReadDiskAccessMode()
{
DatabaseDescriptor.setCompactionReadDiskAccessMode(compactionReadDiskAccessMode);
}

@After
public void restoreCompactionReadDiskAccessMode()
{
DatabaseDescriptor.setCompactionReadDiskAccessMode(DiskAccessMode.standard);
}

@AfterClass
public static void teardown() throws IOException, InterruptedException, ExecutionException
{