diff --git a/CHANGES.txt b/CHANGES.txt index 016aa739f8cf..115f6b3240f7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Wire compaction_read_disk_access_mode through cursor-based compaction (CASSANDRA-21147) * Reduce memory allocation during transformation of BatchStatement to Mutation (CASSANDRA-21141) * Direct I/O support for compaction reads (CASSANDRA-19987) * Support custom StartupCheck implementations via SPI (CASSANDRA-21093) diff --git a/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java b/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java index 3b4819c15670..7d528b5e2d9d 100644 --- a/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java +++ b/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java @@ -27,11 +27,11 @@ import java.util.function.LongPredicate; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.UnmodifiableIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.Config.DiskAccessMode; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.AbstractCompactionController; import org.apache.cassandra.db.ClusteringComparator; @@ -70,6 +70,7 @@ import org.apache.cassandra.schema.CompactionParams; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.TimeUUID; import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_BOUND; @@ -294,15 +295,8 @@ private CursorCompactor(OperationType type, * {@link CompactionIterator#CompactionIterator(OperationType, List, AbstractCompactionController, long, TimeUUID, ActiveCompactionsTracker)} */ - // Convert Readers to Cursors - this.sstableCursors = new StatefulCursor[sstables.size()]; + this.sstableCursors = convertScannersToCursors(scanners, sstables, DatabaseDescriptor.getCompactionReadDiskAccessMode()); this.sstableCursorsEqualsNext = new boolean[sstables.size()]; - UnmodifiableIterator iterator = sstables.iterator(); - for (int i = 0; i < this.sstableCursors.length; i++) - { - SSTableReader ssTableReader = iterator.next(); - this.sstableCursors[i] = new StatefulCursor(ssTableReader); - } this.enforceStrictLiveness = controller.cfs.metadata.get().enforceStrictLiveness(); purger = new Purger(type, controller, nowInSec); @@ -1553,6 +1547,33 @@ private static String mergeHistogramToString(long[] histogram) return sb.toString(); } + /** + * Closes scanner-opened readers before opening cursor-specific readers with the configured disk access mode. + * In cursor compaction, scanners are only used for metadata; closing them avoids holding redundant file + * descriptors and prevents conflicts when scan and non-scan readers for the same file share thread-local + * buffer state on the same thread. + */ + private static StatefulCursor[] convertScannersToCursors(List scanners, ImmutableSet sstables, + DiskAccessMode diskAccessMode) + { + for (ISSTableScanner scanner : scanners) + scanner.close(); + + StatefulCursor[] cursors = new StatefulCursor[sstables.size()]; + int i = 0; + try + { + for (SSTableReader reader : sstables) + cursors[i++] = new StatefulCursor(reader, diskAccessMode); + return cursors; + } + catch (RuntimeException | Error e) + { + Throwables.closeNonNullAndAddSuppressed(e, cursors); + throw e; + } + } + public void close() { try diff --git a/src/java/org/apache/cassandra/db/compaction/StatefulCursor.java b/src/java/org/apache/cassandra/db/compaction/StatefulCursor.java index c28d218ac321..f0d81a26c182 100644 --- a/src/java/org/apache/cassandra/db/compaction/StatefulCursor.java +++ b/src/java/org/apache/cassandra/db/compaction/StatefulCursor.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction; import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.Config.DiskAccessMode; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.ReusableLivenessInfo; @@ -55,9 +56,9 @@ class StatefulCursor extends SSTableCursorReader private boolean isOpenRangeTombstonePresent = false; - public StatefulCursor(SSTableReader reader) + public StatefulCursor(SSTableReader reader, DiskAccessMode diskAccessMode) { - super(reader); + super(reader, diskAccessMode); currPartition = new PartitionDescriptor(reader.getPartitioner().createReusableKey(0)); prevPartition = new PartitionDescriptor(reader.getPartitioner().createReusableKey(0)); unfiltered = new UnfilteredDescriptor(reader.header.clusteringTypes().toArray(AbstractType[]::new)); diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java index ccf94ce6dac8..9d6a2b990faa 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; +import org.apache.cassandra.config.Config.DiskAccessMode; import org.apache.cassandra.db.ClusteringPrefix; import org.apache.cassandra.db.Columns; import org.apache.cassandra.db.DeletionTime; @@ -197,15 +198,20 @@ public static SSTableCursorReader fromDescriptor(Descriptor desc) throws IOExcep { TableMetadata metadata = Util.metadataFromSSTable(desc); SSTableReader reader = SSTableReader.openNoValidation(null, desc, TableMetadataRef.forOfflineTools(metadata)); - return new SSTableCursorReader(reader, metadata, reader.ref()); + return new SSTableCursorReader(reader, metadata, reader.ref(), null); } public SSTableCursorReader(SSTableReader reader) { - this(reader, reader.metadata(), null); + this(reader, reader.metadata(), null, null); } - private SSTableCursorReader(SSTableReader reader, TableMetadata metadata, Ref readerRef) + public SSTableCursorReader(SSTableReader reader, DiskAccessMode diskAccessMode) + { + this(reader, reader.metadata(), null, diskAccessMode); + } + + private SSTableCursorReader(SSTableReader reader, TableMetadata metadata, Ref readerRef, DiskAccessMode diskAccessMode) { ssTableReader = reader; ssTableReaderRef = readerRef; @@ -221,7 +227,7 @@ private SSTableCursorReader(SSTableReader reader, TableMetadata metadata, Ref diskAccessModes() + { + return Arrays.asList(new Object[]{ DiskAccessMode.standard }, + new Object[]{ DiskAccessMode.direct }); + } + + @Before + public void setCompactionReadDiskAccessMode() + { + DatabaseDescriptor.setCompactionReadDiskAccessMode(compactionReadDiskAccessMode); + } + + @After + public void restoreCompactionReadDiskAccessMode() + { + DatabaseDescriptor.setCompactionReadDiskAccessMode(DiskAccessMode.standard); + } + @AfterClass public static void teardown() throws IOException, InterruptedException, ExecutionException { diff --git a/test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java b/test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java index 84b60bc5c8da..46b3091f1167 100644 --- a/test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java +++ b/test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java @@ -17,8 +17,12 @@ */ package org.apache.cassandra.io.util; +import java.lang.management.BufferPoolMXBean; +import java.lang.management.ManagementFactory; import java.util.Arrays; +import java.util.List; +import org.junit.Assert; import org.junit.Test; import org.apache.cassandra.utils.Pair; @@ -61,4 +65,49 @@ private void testReads(InputData propertyInputs, int bufferSize, int blockSize) tlrab.close(); } } + + @Test + public void testDirectMemoryIsCleanedOnClose() + { + BufferPoolMXBean directPool = getDirectBufferPool(); + int blockSize = FileUtils.getFileBlockSize(files[0]); + int bufferSize = 64 * 1024 * 1024; // 64MB - large enough to reliably detect + + try (ChannelProxy channel = new ChannelProxy(files[0], ChannelProxy.IOMode.DIRECT)) + { + DirectThreadLocalReadAheadBuffer tlrab = + new DirectThreadLocalReadAheadBuffer(channel, bufferSize, blockSize); + + // Force buffer allocation + tlrab.allocateBuffer(); + + long memoryUsedBefore = directPool.getMemoryUsed(); + + // Close should clean the direct memory + tlrab.close(); + + long memoryUsedAfter = directPool.getMemoryUsed(); + + // Memory should decrease by approximately buffer size (+ alignment overhead) + long expectedDecrease = bufferSize; + long actualDecrease = memoryUsedBefore - memoryUsedAfter; + + Assert.assertTrue( + "Direct memory should decrease after close(). " + + "Before: " + memoryUsedBefore + ", After: " + memoryUsedAfter + + ", Expected decrease: ~" + expectedDecrease + ", Actual: " + actualDecrease, + actualDecrease >= expectedDecrease * 0.9); // 10% tolerance for alignment + } + } + + private static BufferPoolMXBean getDirectBufferPool() + { + List pools = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class); + for (BufferPoolMXBean pool : pools) + if (pool.getName().equals("direct")) + return pool; + + throw new IllegalStateException("Direct buffer pool not found"); + } + } \ No newline at end of file diff --git a/test/unit/org/apache/cassandra/transport/WriteBytesTest.java b/test/unit/org/apache/cassandra/transport/WriteBytesTest.java index 19a70e999fd6..987ec78c52aa 100644 --- a/test/unit/org/apache/cassandra/transport/WriteBytesTest.java +++ b/test/unit/org/apache/cassandra/transport/WriteBytesTest.java @@ -18,6 +18,8 @@ package org.apache.cassandra.transport; +import java.nio.ByteBuffer; + import org.assertj.core.api.Assertions; import org.junit.Test; @@ -26,6 +28,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import sun.nio.ch.DirectBuffer; import static org.quicktheories.QuickTheory.qt; @@ -52,7 +55,11 @@ public void test() Assertions.assertThat(buf.writerIndex()).isEqualTo(size); for (int i = 0; i < size; i++) Assertions.assertThat(buf.getByte(buf.readerIndex() + i)).describedAs("byte mismatch at index %d", i).isEqualTo(bb.get(bb.position() + i)); - MemoryUtil.clean(bb); + + if (bb.isDirect()) { + Object attachment = ((DirectBuffer) bb).attachment(); + MemoryUtil.clean(attachment == null ? bb : (ByteBuffer) attachment); + } }); } diff --git a/test/unit/org/apache/cassandra/utils/memory/MemoryUtilTest.java b/test/unit/org/apache/cassandra/utils/memory/MemoryUtilTest.java index 842823a2d54c..1b06274daeed 100644 --- a/test/unit/org/apache/cassandra/utils/memory/MemoryUtilTest.java +++ b/test/unit/org/apache/cassandra/utils/memory/MemoryUtilTest.java @@ -50,35 +50,31 @@ public void testClean() } @Test - public void testCleanViewDoesNotThrow() + public void testCleanSliceThrows() { - // Use a large buffer to likely get mmap'd memory from the OS. This ensures that if cleaning a view incorrectly - // unmaps the original buffer's memory, subsequent access to 'original' would more reliably fail. - // For context: glibc's mmap threshold is 32MB on 64-bit systems - ByteBuffer original = ByteBuffer.allocateDirect(64 * 1024 * 1024); - + ByteBuffer original = ByteBuffer.allocateDirect(1024); ByteBuffer slice = original.slice(); - MemoryUtil.clean(slice); - try - { - original.putInt(10); - } - catch (Exception exc) - { - Assertions.fail("Unable to write to original buffer after cleaning (slice). " + exc.getMessage(), exc); - } + Assertions.assertThatThrownBy(() -> MemoryUtil.clean(slice)) + .isInstanceOf(IllegalArgumentException.class); + + // original should still be usable after the rejected clean + original.putInt(10); + MemoryUtil.clean(original); + } + + @Test + public void testCleanDuplicateThrows() + { + ByteBuffer original = ByteBuffer.allocateDirect(1024); ByteBuffer duplicate = original.duplicate(); - MemoryUtil.clean(duplicate); - try - { - original.putInt(10); - } - catch (Exception exc) - { - Assertions.fail("Unable to write to original buffer after cleaning (duplicate). " + exc.getMessage(), exc); - } + Assertions.assertThatThrownBy(() -> MemoryUtil.clean(duplicate)) + .isInstanceOf(IllegalArgumentException.class); + + // original should still be usable after the rejected clean + original.putInt(10); + MemoryUtil.clean(original); } @Test