From 0d16207ff0769aae5567a2efda1bce7426b03165 Mon Sep 17 00:00:00 2001 From: samlightfoot Date: Sat, 31 Jan 2026 11:40:57 +0000 Subject: [PATCH 1/2] Wire compaction_read_disk_access_mode through cursor-based compaction This change wires DiskAccessMode through cursor-based compaction, enabling direct I/O reads for cursor compaction to match the existing support in the iterator-based compaction path. Key changes: - Thread DiskAccessMode from DatabaseDescriptor.getCompactionReadDiskAccessMode() through CursorCompactor, StatefulCursor, and SSTableCursorReader - Consolidate SSTableReader's openDataReader/openDataReaderForScan variants into a unified openDataReaderInternal with canReuseDfile guard - Fix DirectThreadLocalReadAheadBuffer.cleanBuffer() to clean the backing buffer rather than the aligned slice - Ensure SSTable scanners are closed before opening cursor readers (resolving readahead buffer interference and excessive fd usage) --- CHANGES.txt | 1 + .../db/compaction/CursorCompactor.java | 39 +++++++++++---- .../db/compaction/StatefulCursor.java | 5 +- .../io/sstable/SSTableCursorReader.java | 14 ++++-- .../io/sstable/format/SSTableReader.java | 44 ++++++++++++----- .../DirectThreadLocalReadAheadBuffer.java | 13 +++++ .../apache/cassandra/io/util/FileHandle.java | 5 -- .../io/util/ThreadLocalReadAheadBuffer.java | 7 ++- .../simple/SimpleCompactionTest.java | 31 ++++++++++++ .../DirectThreadLocalReadAheadBufferTest.java | 49 +++++++++++++++++++ 10 files changed, 174 insertions(+), 34 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 016aa739f8cf..115f6b3240f7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Wire compaction_read_disk_access_mode through cursor-based compaction (CASSANDRA-21147) * Reduce memory allocation during transformation of BatchStatement to Mutation (CASSANDRA-21141) * Direct I/O support for compaction reads (CASSANDRA-19987) * Support custom StartupCheck implementations via SPI (CASSANDRA-21093) diff --git a/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java b/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java index 3b4819c15670..7d528b5e2d9d 100644 --- a/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java +++ b/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java @@ -27,11 +27,11 @@ import java.util.function.LongPredicate; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.UnmodifiableIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.Config.DiskAccessMode; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.AbstractCompactionController; import org.apache.cassandra.db.ClusteringComparator; @@ -70,6 +70,7 @@ import org.apache.cassandra.schema.CompactionParams; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.TimeUUID; import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_BOUND; @@ -294,15 +295,8 @@ private CursorCompactor(OperationType type, * {@link CompactionIterator#CompactionIterator(OperationType, List, AbstractCompactionController, long, TimeUUID, ActiveCompactionsTracker)} */ - // Convert Readers to Cursors - this.sstableCursors = new StatefulCursor[sstables.size()]; + this.sstableCursors = convertScannersToCursors(scanners, sstables, DatabaseDescriptor.getCompactionReadDiskAccessMode()); this.sstableCursorsEqualsNext = new boolean[sstables.size()]; - UnmodifiableIterator iterator = sstables.iterator(); - for (int i = 0; i < this.sstableCursors.length; i++) - { - SSTableReader ssTableReader = iterator.next(); - this.sstableCursors[i] = new StatefulCursor(ssTableReader); - } this.enforceStrictLiveness = controller.cfs.metadata.get().enforceStrictLiveness(); purger = new Purger(type, controller, nowInSec); @@ -1553,6 +1547,33 @@ private static String mergeHistogramToString(long[] histogram) return sb.toString(); } + /** + * Closes scanner-opened readers before opening cursor-specific readers with the configured disk access mode. + * In cursor compaction, scanners are only used for metadata; closing them avoids holding redundant file + * descriptors and prevents conflicts when scan and non-scan readers for the same file share thread-local + * buffer state on the same thread. + */ + private static StatefulCursor[] convertScannersToCursors(List scanners, ImmutableSet sstables, + DiskAccessMode diskAccessMode) + { + for (ISSTableScanner scanner : scanners) + scanner.close(); + + StatefulCursor[] cursors = new StatefulCursor[sstables.size()]; + int i = 0; + try + { + for (SSTableReader reader : sstables) + cursors[i++] = new StatefulCursor(reader, diskAccessMode); + return cursors; + } + catch (RuntimeException | Error e) + { + Throwables.closeNonNullAndAddSuppressed(e, cursors); + throw e; + } + } + public void close() { try diff --git a/src/java/org/apache/cassandra/db/compaction/StatefulCursor.java b/src/java/org/apache/cassandra/db/compaction/StatefulCursor.java index c28d218ac321..f0d81a26c182 100644 --- a/src/java/org/apache/cassandra/db/compaction/StatefulCursor.java +++ b/src/java/org/apache/cassandra/db/compaction/StatefulCursor.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction; import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.Config.DiskAccessMode; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.ReusableLivenessInfo; @@ -55,9 +56,9 @@ class StatefulCursor extends SSTableCursorReader private boolean isOpenRangeTombstonePresent = false; - public StatefulCursor(SSTableReader reader) + public StatefulCursor(SSTableReader reader, DiskAccessMode diskAccessMode) { - super(reader); + super(reader, diskAccessMode); currPartition = new PartitionDescriptor(reader.getPartitioner().createReusableKey(0)); prevPartition = new PartitionDescriptor(reader.getPartitioner().createReusableKey(0)); unfiltered = new UnfilteredDescriptor(reader.header.clusteringTypes().toArray(AbstractType[]::new)); diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java index ccf94ce6dac8..9d6a2b990faa 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; +import org.apache.cassandra.config.Config.DiskAccessMode; import org.apache.cassandra.db.ClusteringPrefix; import org.apache.cassandra.db.Columns; import org.apache.cassandra.db.DeletionTime; @@ -197,15 +198,20 @@ public static SSTableCursorReader fromDescriptor(Descriptor desc) throws IOExcep { TableMetadata metadata = Util.metadataFromSSTable(desc); SSTableReader reader = SSTableReader.openNoValidation(null, desc, TableMetadataRef.forOfflineTools(metadata)); - return new SSTableCursorReader(reader, metadata, reader.ref()); + return new SSTableCursorReader(reader, metadata, reader.ref(), null); } public SSTableCursorReader(SSTableReader reader) { - this(reader, reader.metadata(), null); + this(reader, reader.metadata(), null, null); } - private SSTableCursorReader(SSTableReader reader, TableMetadata metadata, Ref readerRef) + public SSTableCursorReader(SSTableReader reader, DiskAccessMode diskAccessMode) + { + this(reader, reader.metadata(), null, diskAccessMode); + } + + private SSTableCursorReader(SSTableReader reader, TableMetadata metadata, Ref readerRef, DiskAccessMode diskAccessMode) { ssTableReader = reader; ssTableReaderRef = readerRef; @@ -221,7 +227,7 @@ private SSTableCursorReader(SSTableReader reader, TableMetadata metadata, Ref diskAccessModes() + { + return Arrays.asList(new Object[]{ DiskAccessMode.standard }, + new Object[]{ DiskAccessMode.direct }); + } + + @Before + public void setCompactionReadDiskAccessMode() + { + DatabaseDescriptor.setCompactionReadDiskAccessMode(compactionReadDiskAccessMode); + } + + @After + public void restoreCompactionReadDiskAccessMode() + { + DatabaseDescriptor.setCompactionReadDiskAccessMode(DiskAccessMode.standard); + } + @AfterClass public static void teardown() throws IOException, InterruptedException, ExecutionException { diff --git a/test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java b/test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java index 84b60bc5c8da..46b3091f1167 100644 --- a/test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java +++ b/test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java @@ -17,8 +17,12 @@ */ package org.apache.cassandra.io.util; +import java.lang.management.BufferPoolMXBean; +import java.lang.management.ManagementFactory; import java.util.Arrays; +import java.util.List; +import org.junit.Assert; import org.junit.Test; import org.apache.cassandra.utils.Pair; @@ -61,4 +65,49 @@ private void testReads(InputData propertyInputs, int bufferSize, int blockSize) tlrab.close(); } } + + @Test + public void testDirectMemoryIsCleanedOnClose() + { + BufferPoolMXBean directPool = getDirectBufferPool(); + int blockSize = FileUtils.getFileBlockSize(files[0]); + int bufferSize = 64 * 1024 * 1024; // 64MB - large enough to reliably detect + + try (ChannelProxy channel = new ChannelProxy(files[0], ChannelProxy.IOMode.DIRECT)) + { + DirectThreadLocalReadAheadBuffer tlrab = + new DirectThreadLocalReadAheadBuffer(channel, bufferSize, blockSize); + + // Force buffer allocation + tlrab.allocateBuffer(); + + long memoryUsedBefore = directPool.getMemoryUsed(); + + // Close should clean the direct memory + tlrab.close(); + + long memoryUsedAfter = directPool.getMemoryUsed(); + + // Memory should decrease by approximately buffer size (+ alignment overhead) + long expectedDecrease = bufferSize; + long actualDecrease = memoryUsedBefore - memoryUsedAfter; + + Assert.assertTrue( + "Direct memory should decrease after close(). " + + "Before: " + memoryUsedBefore + ", After: " + memoryUsedAfter + + ", Expected decrease: ~" + expectedDecrease + ", Actual: " + actualDecrease, + actualDecrease >= expectedDecrease * 0.9); // 10% tolerance for alignment + } + } + + private static BufferPoolMXBean getDirectBufferPool() + { + List pools = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class); + for (BufferPoolMXBean pool : pools) + if (pool.getName().equals("direct")) + return pool; + + throw new IllegalStateException("Direct buffer pool not found"); + } + } \ No newline at end of file From a572dee0c509511275a4b65df331fd3f64742fe1 Mon Sep 17 00:00:00 2001 From: samlightfoot Date: Wed, 18 Feb 2026 11:26:58 +0000 Subject: [PATCH 2/2] Make MemoryUtil.clean() throw for slices/duplicates and fix mmap cleanup clean() previously silently returned for any buffer with a non-null attachment. This masked two problems: callers passing slices/duplicates would silently leak memory, and MappedByteBuffers (which gained a non-null Unmapper attachment in JDK 14+) were never actually cleaned. Now throws IllegalArgumentException for slice/duplicate/view buffers (attachment instanceof ByteBuffer) and uses DirectBuffer.cleaner() directly instead of Unsafe.invokeCleaner() to bypass the JDK's overly broad attachment check that blocks MappedByteBuffers. --- .../DirectThreadLocalByteBufferHolder.java | 6 +-- .../DirectThreadLocalReadAheadBuffer.java | 4 +- .../cassandra/utils/memory/BufferPool.java | 6 ++- .../cassandra/utils/memory/MemoryUtil.java | 13 ++++-- .../cassandra/transport/WriteBytesTest.java | 9 +++- .../utils/memory/MemoryUtilTest.java | 44 +++++++++---------- 6 files changed, 45 insertions(+), 37 deletions(-) diff --git a/src/java/org/apache/cassandra/io/util/DirectThreadLocalByteBufferHolder.java b/src/java/org/apache/cassandra/io/util/DirectThreadLocalByteBufferHolder.java index ec4c9abb7765..eb4614966599 100644 --- a/src/java/org/apache/cassandra/io/util/DirectThreadLocalByteBufferHolder.java +++ b/src/java/org/apache/cassandra/io/util/DirectThreadLocalByteBufferHolder.java @@ -70,10 +70,8 @@ public ByteBuffer getBuffer(int size) private static void cleanBuffer(ByteBuffer buffer) { - // Aligned buffers are slices; clean the backing buffer (attachment) - DirectBuffer db = (DirectBuffer) buffer; - ByteBuffer attachment = (ByteBuffer) db.attachment(); - MemoryUtil.clean(attachment != null ? attachment : buffer); + // Aligned buffers from BufferUtil.allocateDirectAligned are slices; clean the backing buffer (attachment) + MemoryUtil.clean((ByteBuffer) ((DirectBuffer) buffer).attachment()); } } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBuffer.java b/src/java/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBuffer.java index 1611888b0369..09e09d6b9760 100644 --- a/src/java/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBuffer.java +++ b/src/java/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBuffer.java @@ -54,9 +54,7 @@ protected void loadBlock(ByteBuffer blockBuffer, long blockPosition, int sizeToR protected void cleanBuffer(ByteBuffer buffer) { // Aligned buffers from BufferUtil.allocateDirectAligned are slices; clean the backing buffer (attachment) - DirectBuffer db = (DirectBuffer) buffer; - ByteBuffer attachment = (ByteBuffer) db.attachment(); - MemoryUtil.clean(attachment != null ? attachment : buffer); + MemoryUtil.clean((ByteBuffer) ((DirectBuffer) buffer).attachment()); } } diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java index c85368c3a8b4..dfd9702d28c6 100644 --- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java +++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java @@ -1566,7 +1566,11 @@ void unsafeFree() if (parent != null) parent.free(slab); else - MemoryUtil.clean(slab); + { + // slab may be an aligned slice from allocateDirectAligned(); clean the root allocation + ByteBuffer attachment = (ByteBuffer) ((DirectBuffer) slab).attachment(); + MemoryUtil.clean(attachment != null ? attachment : slab); + } } static void unsafeRecycle(Chunk chunk) diff --git a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java index ec94da2c9385..d372b5a8fa7c 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java +++ b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java @@ -24,6 +24,7 @@ import com.sun.jna.Native; +import jdk.internal.ref.Cleaner; import sun.misc.Unsafe; import sun.nio.ch.DirectBuffer; @@ -330,9 +331,13 @@ public static void clean(ByteBuffer buffer) return; DirectBuffer db = (DirectBuffer) buffer; - if (db.attachment() != null) - return; // duplicate or slice - - unsafe.invokeCleaner(buffer); + if (db.attachment() instanceof ByteBuffer) + throw new IllegalArgumentException( + "Cannot clean a slice/duplicate/view buffer directly; " + + "resolve to the root allocation before calling clean()"); + + Cleaner cleaner = db.cleaner(); + if (cleaner != null) + cleaner.clean(); } } diff --git a/test/unit/org/apache/cassandra/transport/WriteBytesTest.java b/test/unit/org/apache/cassandra/transport/WriteBytesTest.java index 19a70e999fd6..987ec78c52aa 100644 --- a/test/unit/org/apache/cassandra/transport/WriteBytesTest.java +++ b/test/unit/org/apache/cassandra/transport/WriteBytesTest.java @@ -18,6 +18,8 @@ package org.apache.cassandra.transport; +import java.nio.ByteBuffer; + import org.assertj.core.api.Assertions; import org.junit.Test; @@ -26,6 +28,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import sun.nio.ch.DirectBuffer; import static org.quicktheories.QuickTheory.qt; @@ -52,7 +55,11 @@ public void test() Assertions.assertThat(buf.writerIndex()).isEqualTo(size); for (int i = 0; i < size; i++) Assertions.assertThat(buf.getByte(buf.readerIndex() + i)).describedAs("byte mismatch at index %d", i).isEqualTo(bb.get(bb.position() + i)); - MemoryUtil.clean(bb); + + if (bb.isDirect()) { + Object attachment = ((DirectBuffer) bb).attachment(); + MemoryUtil.clean(attachment == null ? bb : (ByteBuffer) attachment); + } }); } diff --git a/test/unit/org/apache/cassandra/utils/memory/MemoryUtilTest.java b/test/unit/org/apache/cassandra/utils/memory/MemoryUtilTest.java index 842823a2d54c..1b06274daeed 100644 --- a/test/unit/org/apache/cassandra/utils/memory/MemoryUtilTest.java +++ b/test/unit/org/apache/cassandra/utils/memory/MemoryUtilTest.java @@ -50,35 +50,31 @@ public void testClean() } @Test - public void testCleanViewDoesNotThrow() + public void testCleanSliceThrows() { - // Use a large buffer to likely get mmap'd memory from the OS. This ensures that if cleaning a view incorrectly - // unmaps the original buffer's memory, subsequent access to 'original' would more reliably fail. - // For context: glibc's mmap threshold is 32MB on 64-bit systems - ByteBuffer original = ByteBuffer.allocateDirect(64 * 1024 * 1024); - + ByteBuffer original = ByteBuffer.allocateDirect(1024); ByteBuffer slice = original.slice(); - MemoryUtil.clean(slice); - try - { - original.putInt(10); - } - catch (Exception exc) - { - Assertions.fail("Unable to write to original buffer after cleaning (slice). " + exc.getMessage(), exc); - } + Assertions.assertThatThrownBy(() -> MemoryUtil.clean(slice)) + .isInstanceOf(IllegalArgumentException.class); + + // original should still be usable after the rejected clean + original.putInt(10); + MemoryUtil.clean(original); + } + + @Test + public void testCleanDuplicateThrows() + { + ByteBuffer original = ByteBuffer.allocateDirect(1024); ByteBuffer duplicate = original.duplicate(); - MemoryUtil.clean(duplicate); - try - { - original.putInt(10); - } - catch (Exception exc) - { - Assertions.fail("Unable to write to original buffer after cleaning (duplicate). " + exc.getMessage(), exc); - } + Assertions.assertThatThrownBy(() -> MemoryUtil.clean(duplicate)) + .isInstanceOf(IllegalArgumentException.class); + + // original should still be usable after the rejected clean + original.putInt(10); + MemoryUtil.clean(original); } @Test