CASSANDRA-21147 - Direct IO support for Cursor Compaction #4606

base: trunk
```diff
@@ -38,6 +38,8 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

+import javax.annotation.Nullable;
+
 import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 import com.clearspring.analytics.stream.cardinality.ICardinality;
 import com.google.common.annotations.VisibleForTesting;
```

```diff
@@ -1417,44 +1419,60 @@ public StatsMetadata getSSTableMetadata()
         return sstableMetadata;
     }

+    public RandomAccessReader openDataReader()
```
**Contributor:** Should there really be versions with no access-mode argument here? Shouldn't every caller know which access mode it wants, or explicitly state that it doesn't care? I'm not sure I'm right here, or that the verbosity is worth it. The concern is that people call the no-arg version when they should have passed configuration, but at some point it's not worth it.

**Contributor (author):** This one may not be worth it. We'd have to make sure the original `dfile` is available to callers so they can pass `dfile.diskAccessMode()` in (i.e. the default). There are currently more than 10 usages of `openDataReader`/`openDataReaderForScan` that we'd have to thread it through, and I can see a couple at a glance that do not have access to the `FileHandle` for the data file (`dfile`). We could improve the docs to state that the file's default disk access mode is used for these overloads.
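The defaulting behaviour discussed above can be sketched with a toy model. All names here are illustrative, not the real Cassandra API: the no-arg overload delegates with `null`, and the internal method falls back to the handle's configured mode.

```java
// Toy model of the overload-defaulting pattern; illustrative names only.
enum DiskAccessMode { standard, direct }

class FileHandleModel
{
    private final DiskAccessMode configured;

    FileHandleModel(DiskAccessMode configured) { this.configured = configured; }

    // No-arg overload: callers that don't care get the file's configured mode.
    DiskAccessMode effectiveMode() { return effectiveMode(null); }

    // Explicit overload: a non-null request wins; null falls back to the default.
    DiskAccessMode effectiveMode(DiskAccessMode requested)
    {
        return requested != null ? requested : configured;
    }
}

public class OverloadDemo
{
    public static void main(String[] args)
    {
        FileHandleModel handle = new FileHandleModel(DiskAccessMode.standard);
        System.out.println(handle.effectiveMode());                      // standard
        System.out.println(handle.effectiveMode(DiskAccessMode.direct)); // direct
    }
}
```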
```diff
+    {
+        return openDataReaderInternal(null, null, false);
+    }
+
     public RandomAccessReader openDataReader(RateLimiter limiter)
     {
         assert limiter != null;
-        return dfile.createReader(limiter);
+        return openDataReaderInternal(null, limiter, false);
     }

-    public RandomAccessReader openDataReader()
+    public RandomAccessReader openDataReader(DiskAccessMode diskAccessMode)
     {
-        return dfile.createReader();
+        return openDataReaderInternal(diskAccessMode, null, false);
     }

     public RandomAccessReader openDataReaderForScan()
     {
-        return openDataReaderForScan(dfile.diskAccessMode());
+        return openDataReaderInternal(null, null, true);
     }

     public RandomAccessReader openDataReaderForScan(DiskAccessMode diskAccessMode)
     {
-        boolean isSameDiskAccessMode = diskAccessMode == dfile.diskAccessMode();
-        boolean isDirectIONotSupported = diskAccessMode == DiskAccessMode.direct && !dfile.supportsDirectIO();
-
-        if (isSameDiskAccessMode || isDirectIONotSupported)
-            return dfile.createReaderForScan(OnReaderClose.RETAIN_FILE_OPEN);
+        return openDataReaderInternal(diskAccessMode, null, true);
+    }
+
+    private RandomAccessReader openDataReaderInternal(@Nullable DiskAccessMode diskAccessMode,
```
**Contributor:** The logic in this method (and similar logic from the introduction of direct I/O for scans) needs test coverage to make sure it reuses `dfile` when it is supposed to, falls back to creating a new `FileHandle` otherwise, and then closes that handle correctly. I haven't checked whether these tests exist yet; just pointing it out.
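As a sketch of the coverage being asked for, the reuse predicate can be exercised in isolation. The toy model below mirrors the `canReuseDfile` logic from this diff but uses illustrative stand-in names, not the Cassandra classes:

```java
// Model of the reuse decision: reuse the existing file handle when no mode is
// requested, the requested mode matches, or direct I/O is requested but
// unsupported; otherwise a new handle must be built (and later closed).
enum DiskAccessMode { standard, direct, mmap }

class ReusePolicy
{
    final DiskAccessMode fileMode;
    final boolean supportsDirectIO;

    ReusePolicy(DiskAccessMode fileMode, boolean supportsDirectIO)
    {
        this.fileMode = fileMode;
        this.supportsDirectIO = supportsDirectIO;
    }

    boolean canReuse(DiskAccessMode requested)
    {
        return requested == null
               || requested == fileMode
               || (requested == DiskAccessMode.direct && !supportsDirectIO);
    }
}

public class ReusePolicyTest
{
    public static void main(String[] args)
    {
        ReusePolicy uncompressed = new ReusePolicy(DiskAccessMode.standard, false);
        assert uncompressed.canReuse(null);                    // no preference: reuse
        assert uncompressed.canReuse(DiskAccessMode.standard); // same mode: reuse
        assert uncompressed.canReuse(DiskAccessMode.direct);   // direct unsupported: reuse as fallback

        ReusePolicy compressed = new ReusePolicy(DiskAccessMode.standard, true);
        assert !compressed.canReuse(DiskAccessMode.direct);    // must rebuild with direct I/O
        System.out.println("ok");
    }
}
```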
```diff
+                                                      @Nullable RateLimiter limiter,
+                                                      boolean forScan)
+    {
+        if (canReuseDfile(diskAccessMode))
+            return dfile.createReader(limiter, forScan, OnReaderClose.RETAIN_FILE_OPEN);
+
-        FileHandle dataFile = dfile.toBuilder()
-                                   .withDiskAccessMode(diskAccessMode)
-                                   .complete();
+        FileHandle handle = dfile.toBuilder()
+                                 .withDiskAccessMode(diskAccessMode)
+                                 .complete();
         try
         {
-            return dataFile.createReaderForScan(OnReaderClose.CLOSE_FILE);
+            return handle.createReader(limiter, forScan, OnReaderClose.CLOSE_FILE);
         }
         catch (Throwable t)
         {
-            dataFile.close();
+            handle.close();
             throw t;
         }
     }

+    private boolean canReuseDfile(@Nullable DiskAccessMode diskAccessMode)
+    {
+        return diskAccessMode == null
+               || diskAccessMode == dfile.diskAccessMode()
+               || (diskAccessMode == DiskAccessMode.direct && !dfile.supportsDirectIO());
```
**Contributor:** Do we at any point log that we weren't able to open the file with direct I/O as requested? Might be a use case for a rate-limited logger.

**Contributor (author):** We fail on start-up if any of the data file directories do not support direct I/O. The only case where `FileHandle::supportsDirectIO` would return false is when an SSTable is uncompressed, as we have only implemented support for the compressed case, so logging here may be deemed noise. Curious on your thoughts.
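If logging of the fallback were added, a minimal rate-limited logger could look like the sketch below. This is a self-contained illustration, not Cassandra's existing `NoSpamLogger` utility, which would be the natural choice in-tree:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of a rate-limited warning, e.g. for "direct I/O requested but
// unsupported, falling back to the file's configured access mode".
class RateLimitedLog
{
    private final long intervalNanos;
    private final AtomicLong nextAllowed;

    RateLimitedLog(long interval, TimeUnit unit)
    {
        this.intervalNanos = unit.toNanos(interval);
        this.nextAllowed = new AtomicLong(System.nanoTime());
    }

    // Returns true if the message was actually emitted this call.
    boolean warn(String message)
    {
        long now = System.nanoTime();
        long next = nextAllowed.get();
        // Only one caller per interval wins the CAS and logs; the rest are dropped.
        if (now - next >= 0 && nextAllowed.compareAndSet(next, now + intervalNanos))
        {
            System.err.println("WARN " + message);
            return true;
        }
        return false;
    }
}
```

A caller could then warn at most once per minute per reader site, keeping the uncompressed-SSTable case from flooding the log.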
```diff
+    }

     public void trySkipFileCacheBefore(DecoratedKey key)
     {
         long position = getPosition(key, SSTableReader.Operator.GE);
```
```diff
@@ -24,6 +24,9 @@
 import org.agrona.BufferUtil;

+import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.utils.memory.MemoryUtil;

+import sun.nio.ch.DirectBuffer;
+
 public final class DirectThreadLocalReadAheadBuffer extends ThreadLocalReadAheadBuffer
 {
```

```diff
@@ -46,4 +49,12 @@ protected void loadBlock(ByteBuffer blockBuffer, long blockPosition, int sizeToR
         if (channel.read(blockBuffer, blockPosition) < sizeToRead)
             throw new CorruptSSTableException(null, channel.filePath());
     }
+
+    @Override
+    protected void cleanBuffer(ByteBuffer buffer)
```
**Contributor:** Could this check be done in …

**Contributor (author):** Strongly agree it should throw; it would have detected the potential leak. Changes pushed.
```diff
+    {
+        // Aligned buffers from BufferUtil.allocateDirectAligned are slices; clean the backing buffer (attachment)
+        MemoryUtil.clean((ByteBuffer) ((DirectBuffer) buffer).attachment());
+    }
 }
```
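The comment in `cleanBuffer` above is the key detail: an aligned direct buffer is a slice of a larger over-allocated backing buffer, so freeing must target the backing allocation rather than the slice. The JDK-only sketch below illustrates the slicing using `ByteBuffer.alignedSlice` (rather than agrona's `BufferUtil.allocateDirectAligned`, which this PR actually uses):

```java
import java.nio.ByteBuffer;

public class AlignedSliceDemo
{
    public static void main(String[] args)
    {
        int align = 512; // a typical direct-I/O alignment requirement
        // Over-allocate so an aligned region of the requested size must exist.
        ByteBuffer backing = ByteBuffer.allocateDirect(4096 + align);
        // alignedSlice returns a *view* into backing whose start address is aligned;
        // the native memory still belongs to (and is freed via) the backing buffer.
        ByteBuffer aligned = backing.alignedSlice(align);
        System.out.println(aligned.alignmentOffset(0, align)); // 0: start is aligned
        System.out.println(aligned.isDirect());                // true: shares backing memory
    }
}
```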
**Contributor:** This has some smell to it. Should the key in `ThreadLocalReadAheadBuffer` include the disk access mode? Should these scanners have been opened with the correct disk access mode in the first place, or is it still an issue because we then need to re-open with the cursor-based approach?
**Contributor (author):** Agree there's a smell. The cursor compaction code has been tailored to fit the existing compaction interface, accepting scanners and pulling just metadata from them, whereas raw SSTables would have suited it better. The ideal refactor I see is to let each separate compaction pipeline open the readers it requires, but that may be better done in a follow-up.

The disk access mode is not actually the issue here; the issue is that two readers (one scan and one non-scan) exist for an SSTable at the same time on the same thread. On trunk this is a bug causing cursor compaction to use scan reads (with a read-ahead buffer) rather than the intended random-access reads, because `ScanCompressedReader`'s `allocated()` check is based on shared static state.

Closing the scanner before opening the cursor reader deallocates the block map, so `allocated()` returns false when the cursor readers are opened and the correct random-access reader is chosen.

This is perhaps slightly brittle from `CompressedChunkReader`'s `allocated()` perspective, but the precondition that a given file is not opened by two readers on the same thread concurrently does not seem unreasonable (as is currently the case in cursor compaction). I did look into guarding instantiation for the same file, but it caused a large number of test failures (I cannot remember the full details).
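The shared-state hazard described above can be reduced to a toy model (all names illustrative, not the Cassandra classes): a per-thread "read-ahead allocated" flag keyed only by file means a second reader opened on the same thread observes the first reader's state and takes the wrong read path, while closing the first reader before opening the second restores the expected behaviour.

```java
import java.util.HashMap;
import java.util.Map;

// Models the shared static, thread-local block map keyed by file path.
class ReadAheadState
{
    private static final ThreadLocal<Map<String, Boolean>> allocated =
            ThreadLocal.withInitial(HashMap::new);

    static void allocate(String file)       { allocated.get().put(file, true); }
    static void deallocate(String file)     { allocated.get().remove(file); }
    static boolean isAllocated(String file) { return allocated.get().containsKey(file); }
}

public class SharedStateDemo
{
    public static void main(String[] args)
    {
        String file = "na-1-big-Data.db"; // illustrative file name
        ReadAheadState.allocate(file);    // scanner opens, allocates read-ahead state

        // A second reader opened on the same thread while the scanner is still
        // open sees the scanner's state and would take the scan-read path:
        System.out.println(ReadAheadState.isAllocated(file)); // true

        ReadAheadState.deallocate(file);  // close the scanner first...
        // ...and the second reader now correctly takes the random-access path:
        System.out.println(ReadAheadState.isAllocated(file)); // false
    }
}
```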
This is perhaps slightly brittle from CompressedChunkReader's allocated() perspective, but the precondition that a given file is not opened by two readers from the same thread concurrently does not seem unreasonable (like it is currently in cursor compaction). I did look into guarding instantiation for the same file but it caused a large number of test failures (I cannot remember the full details).