Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -505,15 +505,21 @@ public enum CassandraRelevantProperties
/** Whether to allow the user to specify custom options to the hnsw index */
SAI_VECTOR_ALLOW_CUSTOM_PARAMETERS("cassandra.sai.vector.allow_custom_parameters", "false"),

/** Controls the maximum top-k limit for vector search */
SAI_VECTOR_SEARCH_MAX_TOP_K("cassandra.sai.vector_search.max_top_k", "1000"),

/**
* Controls the maximum number of PrimaryKeys that will be read into memory at one time when ordering/limiting
* the results of an ANN query constrained by non-ANN predicates.
* The maximum number of primary keys that a WHERE clause may materialize before the query planner switches
* from a search-then-sort execution strategy to an order-by-then-filter strategy. Increasing this limit allows
* more primary keys to be buffered in memory, enabling either (a) brute-force sorting or (b) graph traversal
* with a restrictive filter that admits only nodes whose primary keys matched the WHERE clause.
*
* Note also that the SAI_INTERSECTION_CLAUSE_LIMIT is applied to the WHERE clause before using a search to
* build a potential result set for search-then-sort query execution.
*/
SAI_VECTOR_SEARCH_ORDER_CHUNK_SIZE("cassandra.sai.vector_search.order_chunk_size", "100000"),
SAI_VECTOR_SEARCH_MAX_MATERIALIZE_KEYS("cassandra.sai.vector_search.max_materialized_keys", "16000"),

/** Controls the maximum top-k limit for vector search */
SAI_VECTOR_SEARCH_MAX_TOP_K("cassandra.sai.vector_search.max_top_k", "1000"),

SCHEMA_PULL_INTERVAL_MS("cassandra.schema_pull_interval_ms", "60000"),
SCHEMA_UPDATE_HANDLER_FACTORY_CLASS("cassandra.schema.update_handler_factory.class"),
SEARCH_CONCURRENCY_FACTOR("cassandra.search_concurrency_factor", "1"),

Expand Down
54 changes: 41 additions & 13 deletions src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.cassandra.db.partitions.PartitionIterators;
import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.BaseRowIterator;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Rows;
Expand Down Expand Up @@ -726,10 +727,26 @@ public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs, ReadExe
assert executionController != null && executionController.validForReadOn(cfs);
Tracing.trace("Executing single-partition query on {}", cfs.name);

return queryMemtableAndDiskInternal(cfs, executionController);
Tracing.trace("Acquiring sstable references");
ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
return queryMemtableAndDiskInternal(cfs, view, null, executionController);
}

/**
 * Queries the memtables and sstables of the given table for this command's partition, using a
 * caller-supplied view of the data sources and an optional per-source row transformation.
 * <p>
 * Unlike the two-argument overload, which selects the view itself via
 * {@code cfs.select(View.select(SSTableSet.LIVE, partitionKey()))} and passes a {@code null}
 * transformer, this variant reads from exactly the {@code view} it is handed.
 *
 * @param cfs the table to read from
 * @param view the memtables and sstables to read; supplied by the caller rather than selected here
 * @param rowTransformer if non-null, invoked once per source that contributes an iterator: with the
 *                       memtable itself for memtable sources and with {@code sstable.getId()} for
 *                       sstable sources; the returned {@link Transformation} is applied to that
 *                       source's iterator before it is merged. May be {@code null}, in which case
 *                       no transformation is applied.
 * @param executionController the controller for this read; must be non-null and valid for reads on
 *                            {@code cfs} (checked with an {@code assert}, so only when assertions
 *                            are enabled)
 * @return the iterator produced by {@code queryMemtableAndDiskInternal} for the given inputs
 */
public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs,
ColumnFamilyStore.ViewFragment view,
Function<Object, Transformation<BaseRowIterator<?>>> rowTransformer,
ReadExecutionController executionController)
{
// Same precondition and trace message as the two-argument overload.
assert executionController != null && executionController.validForReadOn(cfs);
Tracing.trace("Executing single-partition query on {}", cfs.name);

// The view is used as given: no sstable references are acquired (and no "Acquiring sstable references" trace is emitted) here.
return queryMemtableAndDiskInternal(cfs, view, rowTransformer, executionController);
}

private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, ReadExecutionController controller)
private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs,
ColumnFamilyStore.ViewFragment view,
Function<Object, Transformation<BaseRowIterator<?>>> rowTransformer,
ReadExecutionController controller)
{
/*
* We have 2 main strategies:
Expand All @@ -753,11 +770,9 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs
&& !queriesMulticellType()
&& !controller.isTrackingRepairedStatus())
{
return queryMemtableAndSSTablesInTimestampOrder(cfs, (ClusteringIndexNamesFilter)clusteringIndexFilter(), controller);
return queryMemtableAndSSTablesInTimestampOrder(cfs, view, rowTransformer, (ClusteringIndexNamesFilter)clusteringIndexFilter(), controller);
}

Tracing.trace("Acquiring sstable references");
ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
view.sstables.sort(SSTableReader.maxTimestampDescending);
ClusteringIndexFilter filter = clusteringIndexFilter();
long minTimestamp = Long.MAX_VALUE;
Expand All @@ -776,6 +791,9 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs
if (memtable.getMinTimestamp() != Memtable.NO_MIN_TIMESTAMP)
minTimestamp = Math.min(minTimestamp, memtable.getMinTimestamp());

if (rowTransformer != null)
iter = Transformation.apply(iter, rowTransformer.apply(memtable));

// Memtable data is always considered unrepaired
controller.updateMinOldestUnrepairedTombstone(memtable.getMinLocalDeletionTime());
inputCollector.addMemtableIterator(RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false));
Expand Down Expand Up @@ -835,6 +853,9 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs
UnfilteredRowIterator iter = intersects ? makeRowIteratorWithLowerBound(cfs, sstable, metricsCollector)
: makeRowIteratorWithSkippedNonStaticContent(cfs, sstable, metricsCollector);

if (rowTransformer != null)
iter = Transformation.apply(iter, rowTransformer.apply(sstable.getId()));

inputCollector.addSSTableIterator(sstable, iter);
mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
iter.partitionLevelDeletion().markedForDeleteAt());
Expand All @@ -857,6 +878,10 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs
{
if (!sstable.isRepaired())
controller.updateMinOldestUnrepairedTombstone(sstable.getMinLocalDeletionTime());

if (rowTransformer != null)
iter = Transformation.apply(iter, rowTransformer.apply(sstable.getId()));

inputCollector.addSSTableIterator(sstable, iter);
includedDueToTombstones++;
mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
Expand Down Expand Up @@ -996,11 +1021,8 @@ private boolean queriesMulticellType()
* no collection or counters are included).
* This method assumes the filter is a {@code ClusteringIndexNamesFilter}.
*/
private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, ClusteringIndexNamesFilter filter, ReadExecutionController controller)
private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, ColumnFamilyStore.ViewFragment view, Function<Object, Transformation<BaseRowIterator<?>>> rowTransformer, ClusteringIndexNamesFilter filter, ReadExecutionController controller)
{
Tracing.trace("Acquiring sstable references");
ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));

ImmutableBTreePartition result = null;
SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();

Expand All @@ -1012,7 +1034,9 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam
if (iter == null)
continue;

result = add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false),
UnfilteredRowIterator wrapped = rowTransformer != null ? Transformation.apply(iter, rowTransformer.apply(memtable))
: iter;
result = add(RTBoundValidator.validate(wrapped, RTBoundValidator.Stage.MEMTABLE, false),
result,
filter,
false,
Expand Down Expand Up @@ -1067,7 +1091,10 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam
}
else
{
result = add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false),
UnfilteredRowIterator wrapped = rowTransformer != null ? Transformation.apply(iter, rowTransformer.apply(sstable.getId()))
: iter;

result = add(RTBoundValidator.validate(wrapped, RTBoundValidator.Stage.SSTABLE, false),
result,
filter,
sstable.isRepaired(),
Expand All @@ -1082,8 +1109,9 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam
{
if (iter.isEmpty())
continue;

result = add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false),
UnfilteredRowIterator wrapped = rowTransformer != null ? Transformation.apply(iter, rowTransformer.apply(sstable.getId()))
: iter;
result = add(RTBoundValidator.validate(wrapped, RTBoundValidator.Stage.SSTABLE, false),
result,
filter,
sstable.isRepaired(),
Expand Down
6 changes: 3 additions & 3 deletions src/java/org/apache/cassandra/db/lifecycle/Tracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ public Memtable switchMemtable(boolean truncating, Memtable newMemtable)
if (truncating)
notifyRenewed(newMemtable);
else
notifySwitched(result.left.getCurrentMemtable());
notifySwitched(result.left.getCurrentMemtable(), result.right.getCurrentMemtable());

return result.left.getCurrentMemtable();
}
Expand Down Expand Up @@ -577,9 +577,9 @@ public void notifyRenewed(Memtable renewed)
notify(new MemtableRenewedNotification(renewed));
}

public void notifySwitched(Memtable previous)
public void notifySwitched(Memtable previous, Memtable next)
{
notify(new MemtableSwitchedNotification(previous));
notify(new MemtableSwitchedNotification(previous, next));
}

public void notifyDiscarded(Memtable discarded)
Expand Down
8 changes: 2 additions & 6 deletions src/java/org/apache/cassandra/index/sai/QueryContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ public class QueryContext
* */
public boolean hasUnrepairedMatches = false;

private VectorQueryContext vectorContext;

public QueryContext(ReadCommand readCommand, long executionQuotaMs)
{
this.readCommand = readCommand;
Expand All @@ -94,10 +92,8 @@ public void checkpoint()
}
}

public VectorQueryContext vectorContext()
public int limit()
{
if (vectorContext == null)
vectorContext = new VectorQueryContext(readCommand);
return vectorContext;
return readCommand.limits().count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.cassandra.notifications.INotificationConsumer;
import org.apache.cassandra.notifications.MemtableDiscardedNotification;
import org.apache.cassandra.notifications.MemtableRenewedNotification;
import org.apache.cassandra.notifications.MemtableSwitchedNotification;
import org.apache.cassandra.notifications.SSTableAddedNotification;
import org.apache.cassandra.notifications.SSTableListChangedNotification;
import org.apache.cassandra.schema.TableMetadata;
Expand Down Expand Up @@ -277,6 +278,10 @@ else if (notification instanceof MemtableRenewedNotification)
{
indexes.forEach(index -> index.memtableIndexManager().renewMemtable(((MemtableRenewedNotification) notification).renewed));
}
else if (notification instanceof MemtableSwitchedNotification)
{
indexes.forEach(index -> index.memtableIndexManager().maybeInitializeMemtableIndex(((MemtableSwitchedNotification) notification).next));
}
else if (notification instanceof MemtableDiscardedNotification)
{
indexes.forEach(index -> index.memtableIndexManager().discardMemtable(((MemtableDiscardedNotification) notification).memtable));
Expand Down
Loading