Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
5.1
* Introduce created_at column to system_distributed.compression_dictionaries (CASSANDRA-21178)
* Be able to detect and remove orphaned compression dictionaries (CASSANDRA-21157)
* Fix BigTableVerifier to only read a data file during extended verification (CASSANDRA-21150)
* Reduce memory allocation during transformation of BatchStatement to Mutation (CASSANDRA-21141)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
import java.time.Instant;
import java.util.Objects;

import javax.annotation.Nullable;
Expand All @@ -34,6 +35,7 @@
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.compress.ZstdDictionaryCompressor;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.Ref;

/**
Expand Down Expand Up @@ -119,6 +121,14 @@ default Kind kind()
return dictId().kind;
}

/**
* Returns the creation time of this dictionary.
* <p>
* The value is the one supplied when the dictionary object was constructed. For a
* dictionary built by {@code createFromRow}, it is the value read from the
* {@code created_at} column of the row. The {@code Kind#createDictionary} overload
* that takes no creation time passes {@code FBUtilities.now()} instead.
*
* @return the creation time of this dictionary
*/
Instant createdAt();

/**
* Returns a reference from lazily initialized reference counter.
*
Expand Down Expand Up @@ -262,6 +272,7 @@ static LightweightCompressionDictionary createFromRowLightweight(UntypedResultSe
String keyspaceName = row.getString("keyspace_name");
String tableName = row.getString("table_name");
String tableId = row.getString("table_id");
Instant createdAt = row.getTimestamp("created_at").toInstant();

try
{
Expand All @@ -270,7 +281,8 @@ static LightweightCompressionDictionary createFromRowLightweight(UntypedResultSe
tableId,
new DictId(CompressionDictionary.Kind.valueOf(kindStr), dictId),
checksum,
size);
size,
createdAt);
}
catch (IllegalArgumentException ex)
{
Expand All @@ -285,6 +297,7 @@ static CompressionDictionary createFromRow(UntypedResultSet.Row row)
byte[] dict = row.getByteArray("dict");
int storedLength = row.getInt("dict_length");
int storedChecksum = row.getInt("dict_checksum");
Instant createdAt = row.getTimestamp("created_at").toInstant();

try
{
Expand All @@ -305,7 +318,7 @@ static CompressionDictionary createFromRow(UntypedResultSet.Row row)
kindStr, dictId, storedChecksum, calculatedChecksum));
}

return kind.createDictionary(new DictId(kind, dictId), row.getByteArray("dict"), storedChecksum);
return kind.createDictionary(new DictId(kind, dictId), row.getByteArray("dict"), storedChecksum, createdAt);
}
catch (IllegalArgumentException ex)
{
Expand All @@ -330,9 +343,9 @@ enum Kind
ZSTD
{
@Override
public CompressionDictionary createDictionary(DictId dictId, byte[] dict, int checksum)
public CompressionDictionary createDictionary(DictId dictId, byte[] dict, int checksum, Instant createdAt)
{
return new ZstdCompressionDictionary(dictId, dict, checksum);
return new ZstdCompressionDictionary(dictId, dict, checksum, createdAt);
}

@Override
Expand Down Expand Up @@ -364,7 +377,21 @@ public ICompressionDictionaryTrainer createTrainer(String keyspaceName,
* @param checksum checksum of this dictionary
* @return a compression dictionary instance
*/
public abstract CompressionDictionary createDictionary(CompressionDictionary.DictId dictId, byte[] dict, int checksum);
public CompressionDictionary createDictionary(CompressionDictionary.DictId dictId, byte[] dict, int checksum)
{
    // No creation time was supplied by the caller, so take it from FBUtilities.now()
    // and hand over to the overload that accepts an explicit creation time.
    final Instant creationTime = FBUtilities.now();
    return createDictionary(dictId, dict, checksum, creationTime);
}

/**
* Creates a compression dictionary instance for this kind with an explicit creation time.
*
* @param dictId the dictionary identifier
* @param dict the raw dictionary bytes
* @param checksum checksum of this dictionary
* @param createdAt creation time to record on the constructed dictionary
* @return a compression dictionary instance
*/
public abstract CompressionDictionary createDictionary(CompressionDictionary.DictId dictId, byte[] dict, int checksum, Instant createdAt);

/**
* Creates a dictionary compressor for this kind
Expand Down Expand Up @@ -436,20 +463,23 @@ class LightweightCompressionDictionary
public final DictId dictId;
public final int checksum;
public final int size;
public final Instant createdAt;

public LightweightCompressionDictionary(String keyspaceName,
String tableName,
String tableId,
DictId dictId,
int checksum,
int size)
int size,
Instant createdAt)
{
this.keyspaceName = keyspaceName;
this.tableName = tableName;
this.tableId = tableId;
this.dictId = dictId;
this.checksum = checksum;
this.size = size;
this.createdAt = createdAt;
}

@Override
Expand All @@ -462,6 +492,7 @@ public String toString()
", dictId=" + dictId +
", checksum=" + checksum +
", size=" + size +
", createdAt=" + createdAt +
'}';
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.cassandra.db.compression;

import java.time.Instant;
import java.util.Arrays;

import javax.management.openmbean.ArrayType;
Expand All @@ -33,7 +34,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;

import org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary;
import org.apache.cassandra.io.util.FileUtils;

import static java.lang.String.format;

Expand Down Expand Up @@ -63,7 +63,7 @@ public class CompressionDictionaryDetailsTabularData
public static final String KIND_NAME = "Kind";
public static final String CHECKSUM_NAME = "Checksum";
public static final String SIZE_NAME = "Size";

public static final String CREATED_AT_NAME = "CreatedAt";

private static final String[] ITEM_NAMES = new String[]{ KEYSPACE_NAME,
TABLE_NAME,
Expand All @@ -72,7 +72,8 @@ public class CompressionDictionaryDetailsTabularData
DICT_NAME,
KIND_NAME,
CHECKSUM_NAME,
SIZE_NAME };
SIZE_NAME,
CREATED_AT_NAME };

private static final String[] ITEM_DESCS = new String[]{ "keyspace",
"table",
Expand All @@ -81,7 +82,8 @@ public class CompressionDictionaryDetailsTabularData
"dictionary_bytes",
"kind",
"checksum",
"size" };
"size",
"created_at" };

private static final String TYPE_NAME = "DictionaryDetails";
private static final String ROW_DESC = "DictionaryDetails";
Expand All @@ -100,7 +102,8 @@ public class CompressionDictionaryDetailsTabularData
new ArrayType<String[]>(SimpleType.BYTE, true), // dict bytes
SimpleType.STRING, // kind
SimpleType.INTEGER, // checksum
SimpleType.INTEGER }; // size of dict bytes
SimpleType.INTEGER, // size of dict bytes
SimpleType.STRING }; // created_at

COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES, ITEM_DESCS, ITEM_TYPES);
TABULAR_TYPE = new TabularType(TYPE_NAME, ROW_DESC, COMPOSITE_TYPE, ITEM_NAMES);
Expand Down Expand Up @@ -133,6 +136,7 @@ public static CompositeData fromLightweightCompressionDictionary(LightweightComp
dictionary.dictId.kind.name(),
dictionary.checksum,
dictionary.size,
dictionary.createdAt.toString()
});
}
catch (OpenDataException e)
Expand Down Expand Up @@ -166,6 +170,7 @@ public static CompositeData fromCompressionDictionary(String keyspace, String ta
dictionary.kind().name(),
dictionary.checksum(),
dictionary.rawDictionary().length,
dictionary.createdAt().toString(),
});
}
catch (OpenDataException e)
Expand Down Expand Up @@ -196,7 +201,8 @@ public static CompositeData fromCompressionDictionaryDataObject(CompressionDicti
dataObject.dict,
dataObject.kind,
dataObject.dictChecksum,
dataObject.dictLength
dataObject.dictLength,
dataObject.createdAt.toString()
});
}
catch (OpenDataException e)
Expand All @@ -221,7 +227,8 @@ public static CompressionDictionaryDataObject fromCompositeData(CompositeData co
(byte[]) compositeData.get(CompressionDictionaryDetailsTabularData.DICT_NAME),
(String) compositeData.get(CompressionDictionaryDetailsTabularData.KIND_NAME),
(Integer) compositeData.get(CompressionDictionaryDetailsTabularData.CHECKSUM_NAME),
(Integer) compositeData.get(CompressionDictionaryDetailsTabularData.SIZE_NAME));
(Integer) compositeData.get(CompressionDictionaryDetailsTabularData.SIZE_NAME),
Instant.parse((String) compositeData.get(CompressionDictionaryDetailsTabularData.CREATED_AT_NAME)));
}

public static class CompressionDictionaryDataObject
Expand All @@ -234,6 +241,7 @@ public static class CompressionDictionaryDataObject
public final String kind;
public final int dictChecksum;
public final int dictLength;
public final Instant createdAt;

@JsonCreator
public CompressionDictionaryDataObject(@JsonProperty("keyspace") String keyspace,
Expand All @@ -243,7 +251,8 @@ public CompressionDictionaryDataObject(@JsonProperty("keyspace") String keyspace
@JsonProperty("dict") byte[] dict,
@JsonProperty("kind") String kind,
@JsonProperty("dictChecksum") int dictChecksum,
@JsonProperty("dictLength") int dictLength)
@JsonProperty("dictLength") int dictLength,
@JsonProperty("createdAt") Instant createdAt)
{
this.keyspace = keyspace;
this.table = table;
Expand All @@ -253,6 +262,7 @@ public CompressionDictionaryDataObject(@JsonProperty("keyspace") String keyspace
this.kind = kind;
this.dictChecksum = dictChecksum;
this.dictLength = dictLength;
this.createdAt = createdAt;

validate();
}
Expand All @@ -269,6 +279,7 @@ public CompressionDictionaryDataObject(@JsonProperty("keyspace") String keyspace
* <li>dictLength is bigger than 0</li>
* <li>dictLength has to be equal to dict's length</li>
* <li>dictChecksum has to be equal to checksum computed as part of this method</li>
* <li>creation date is not null</li>
* </ul>
*/
private void validate()
Expand All @@ -278,15 +289,11 @@ private void validate()
if (table == null)
throw new IllegalArgumentException("Table not specified.");
if (tableId == null)
throw new IllegalArgumentException("Table id not specified");
throw new IllegalArgumentException("Table id not specified.");
if (dictId <= 0)
throw new IllegalArgumentException("Provided dictionary id must be positive but it is '" + dictId + "'.");
if (dict == null || dict.length == 0)
throw new IllegalArgumentException("Provided dictionary byte array is null or empty.");
if (dict.length > FileUtils.ONE_MIB)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this check because, while testing import / export with created_at, I realized that we cannot import a dictionary bigger than 1 MiB, BUT WE CAN TRAIN ONE.

So we train > 1MiB but we can not import after export.

It is possible to override the configuration via nodetool or CQL; there we do not check the maximum size. We check it only on import.

I can revert this change and treat it more robustly in a completely different ticket that hardens size checks on all levels (CQL, nodetool, ...); that can go in even after 6.0-alpha1. If I remove it here, we will at least not see the discrepancy I described above.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the background! It helps to understand.

Dictionaries are attached to every SSTable, and the size limit was added with that context in mind. Dictionaries are typically 64~100 KiB in size. That said, the underlying zstd trainer does allow training larger dictionaries. The question is: do we want to train dictionaries larger than 1 MiB? The added dictionary size might outweigh the compression gains (64 KiB vs. 1 MiB dictionaries).

throw new IllegalArgumentException("Imported dictionary can not be larger than " +
FileUtils.ONE_MIB + " bytes, but it is " +
dict.length + " bytes.");
if (kind == null)
throw new IllegalArgumentException("Provided kind is null.");

Expand All @@ -305,6 +312,8 @@ private void validate()
throw new IllegalArgumentException("Size has to be strictly positive number, it is '" + dictLength + "'.");
if (dict.length != dictLength)
throw new IllegalArgumentException("The length of the provided dictionary array (" + dict.length + ") is not equal to provided length value (" + dictLength + ").");
if (createdAt == null)
throw new IllegalArgumentException("The creation date not specified.");

int checksumOfDictionaryToImport = CompressionDictionary.calculateChecksum((byte) dictionaryKind.ordinal(), dictId, dict);
if (checksumOfDictionaryToImport != dictChecksum)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.cassandra.db.compression;

import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -30,6 +31,7 @@
import org.slf4j.LoggerFactory;

import org.apache.cassandra.io.compress.ZstdCompressorBase;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.concurrent.RefCounted;
import org.apache.cassandra.utils.concurrent.SelfRefCounted;
Expand All @@ -45,21 +47,24 @@ public class ZstdCompressionDictionary implements CompressionDictionary, SelfRef
private final ConcurrentHashMap<Integer, ZstdDictCompress> zstdDictCompressPerLevel = new ConcurrentHashMap<>();
private final AtomicReference<ZstdDictDecompress> dictDecompress = new AtomicReference<>();
private volatile Ref<ZstdCompressionDictionary> selfRef;
private final Instant createdAt;

@VisibleForTesting
public ZstdCompressionDictionary(DictId dictId, byte[] rawDictionary)
{
this(dictId,
rawDictionary,
CompressionDictionary.calculateChecksum((byte) dictId.kind.ordinal(), dictId.id, rawDictionary));
CompressionDictionary.calculateChecksum((byte) dictId.kind.ordinal(), dictId.id, rawDictionary),
FBUtilities.now());
}

public ZstdCompressionDictionary(DictId dictId, byte[] rawDictionary, int checksum)
/**
 * Creates a dictionary from its identifier, raw bytes, checksum and creation time.
 * The arguments are stored as given; the self reference is left {@code null} here.
 *
 * @param dictId the dictionary identifier
 * @param rawDictionary the raw dictionary bytes
 * @param checksum checksum of this dictionary
 * @param createdAt creation time of this dictionary
 */
public ZstdCompressionDictionary(DictId dictId, byte[] rawDictionary, int checksum, Instant createdAt)
{
    // Identity and payload of the dictionary.
    this.dictId = dictId;
    this.rawDictionary = rawDictionary;
    this.checksum = checksum;

    // No self reference exists yet at construction time.
    this.selfRef = null;

    this.createdAt = createdAt;
}

@Override
Expand All @@ -86,6 +91,12 @@ public int checksum()
return checksum;
}

@Override
public Instant createdAt()
{
    // The field is final and assigned in the constructor.
    return this.createdAt;
}

@Override
public int estimatedOccupiedMemoryBytes()
{
Expand Down
Loading