16 changes: 2 additions & 14 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
@@ -2923,24 +2923,12 @@ compression_dictionary_cache_size: 10
# Min unit: s
compression_dictionary_cache_expire: 24h

# Dictionary training configuration (advanced settings)
# These settings control how compression dictionaries are trained from sample data.

# Maximum size of a trained compression dictionary.
# Larger dictionaries may provide better compression but use more memory.
compression_dictionary_training_max_dictionary_size: 64KiB

# Maximum total size of sample data to collect for dictionary training.
# More sample data generally produces better dictionaries but takes longer to train.
# The recommended sample size is 100x the dictionary size.
compression_dictionary_training_max_total_sample_size: 10MiB

# Enable automatic dictionary training based on sampling of write operations.
# When enabled, the system will automatically collect samples and train new dictionaries.
# Manual training via nodetool is always available regardless of this setting.
compression_dictionary_training_auto_train_enabled: false

# Sampling rate for automatic dictionary training (1-10000).
# Value of 100 means 1% of writes are sampled. Lower values reduce overhead but may
# Sampling rate for automatic dictionary training (0.01-1).
# Value of 0.01 means 1% of writes are sampled. Lower values reduce overhead but may
# result in less representative sample data for dictionary training.
compression_dictionary_training_sampling_rate: 0.01
16 changes: 2 additions & 14 deletions conf/cassandra_latest.yaml
@@ -2662,24 +2662,12 @@ compression_dictionary_cache_size: 10
# Min unit: s
compression_dictionary_cache_expire: 24h

# Dictionary training configuration (advanced settings)
# These settings control how compression dictionaries are trained from sample data.

# Maximum size of a trained compression dictionary.
# Larger dictionaries may provide better compression but use more memory.
compression_dictionary_training_max_dictionary_size: 64KiB

# Maximum total size of sample data to collect for dictionary training.
# More sample data generally produces better dictionaries but takes longer to train.
# The recommended sample size is 100x the dictionary size.
compression_dictionary_training_max_total_sample_size: 10MiB

# Enable automatic dictionary training based on sampling of write operations.
# When enabled, the system will automatically collect samples and train new dictionaries.
# Manual training via nodetool is always available regardless of this setting.
compression_dictionary_training_auto_train_enabled: false

# Sampling rate for automatic dictionary training (1-10000).
# Value of 100 means 1% of writes are sampled. Lower values reduce overhead but may
# Sampling rate for automatic dictionary training (0.01-1).
# Value of 0.01 means 1% of writes are sampled. Lower values reduce overhead but may
# result in less representative sample data for dictionary training.
compression_dictionary_training_sampling_rate: 0.01
48 changes: 35 additions & 13 deletions doc/modules/cassandra/pages/managing/operating/compression.adoc
@@ -148,12 +148,12 @@ Enable automatic training in `cassandra.yaml`:
[source,yaml]
----
compression_dictionary_training_auto_train_enabled: true
compression_dictionary_training_sampling_rate: 100 # 1% of writes
compression_dictionary_training_sampling_rate: 0.01 # 1% of writes
----

When enabled, Cassandra automatically samples write operations and
trains dictionaries in the background based on the configured sampling
rate (range: 1-10000, where 100 = 1% of writes).
rate (range: 0.01-1, where 0.01 = 1% of writes).
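As an illustrative sketch (assumed behavior, not Cassandra's actual code path; the class and method names below are hypothetical), a rate of 0.01 can be read as "each write has a 1% chance of contributing a sample":

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustrative only: how a fractional sampling rate gates which writes
// contribute samples to dictionary training.
public class SamplingRateSketch
{
    // A write is sampled when a uniform draw in [0, 1) falls below the rate.
    static boolean shouldSample(float samplingRate)
    {
        return ThreadLocalRandom.current().nextFloat() < samplingRate;
    }

    public static void main(String[] args)
    {
        float rate = 0.01f; // 1% of writes
        int writes = 1_000_000;
        int sampled = 0;
        for (int i = 0; i < writes; i++)
            if (shouldSample(rate))
                sampled++;
        // Expect roughly 10,000 sampled writes out of 1,000,000.
        System.out.println("sampled " + sampled + " of " + writes);
    }
}
```

Lower rates keep the sampling overhead on the write path negligible at the cost of a less representative sample set.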

=== Dictionary Storage and Distribution

@@ -298,17 +298,11 @@ next access.

=== Training Configuration

* `compression_dictionary_training_max_dictionary_size` (default: `65536`):
Maximum size of trained dictionaries in bytes. Larger dictionaries can
capture more patterns but increase memory overhead.
* `compression_dictionary_training_max_total_sample_size` (default:
`10485760`): Maximum total size of sample data to collect for training,
approximately 10MB.
* `compression_dictionary_training_auto_train_enabled` (default: `false`):
Enable automatic background dictionary training. When enabled, Cassandra
samples writes and trains dictionaries automatically.
* `compression_dictionary_training_sampling_rate` (default: `100`):
Sampling rate for automatic training, range 1-10000 where 100 = 1% of
* `compression_dictionary_training_sampling_rate` (default: `0.01`):
Sampling rate for automatic training, range 0.01-1 where 0.01 = 1% of
writes. Lower values reduce training overhead but may miss data patterns.

Example configuration:
@@ -323,11 +317,34 @@ compression_dictionary_cache_expire: 3600

# Automatic training
compression_dictionary_training_auto_train_enabled: false
compression_dictionary_training_sampling_rate: 100
compression_dictionary_training_max_dictionary_size: 65536
compression_dictionary_training_max_total_sample_size: 10485760
compression_dictionary_training_sampling_rate: 0.01
----

=== CQL Training Parameters

These parameters are configured per table via CQL when the defaults are not appropriate.

* `training_max_total_sample_size` (default: `10MiB`): Maximum total size of sample data to collect for
training. More sample data generally produces better dictionaries but takes longer to train. Configured in the
table's `compression` options for `ZstdDictionaryCompressor`.
* `training_max_dictionary_size` (default: `64KiB`): Maximum size of a trained dictionary. Larger dictionaries
can capture more patterns but increase memory overhead. Also configured in the table's `compression` options
for `ZstdDictionaryCompressor`.

Example:

[source,cql]
----
ALTER TABLE keyspace.table
WITH compression = {
'class': 'ZstdDictionaryCompressor',
'compression_level': '3',
'training_max_total_sample_size': '20MiB',
'training_max_dictionary_size': '128KiB'
};
----
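The size options above take unit-suffixed strings. As an illustrative sketch (a hypothetical parser, not Cassandra's actual `DataStorageSpec` implementation), such values resolve to byte counts like this:

```java
// Hypothetical helper, shown only to illustrate how unit-suffixed size
// strings such as "64KiB" or "20MiB" map to byte counts.
public class SizeStringSketch
{
    static long toBytes(String value)
    {
        String v = value.trim();
        // Check the longer suffixes first: "KiB" and "MiB" also end with "B".
        if (v.endsWith("MiB"))
            return Long.parseLong(v.substring(0, v.length() - 3).trim()) * 1024L * 1024L;
        if (v.endsWith("KiB"))
            return Long.parseLong(v.substring(0, v.length() - 3).trim()) * 1024L;
        if (v.endsWith("B"))
            return Long.parseLong(v.substring(0, v.length() - 1).trim());
        throw new IllegalArgumentException("Unsupported size: " + value);
    }

    public static void main(String[] args)
    {
        System.out.println(toBytes("64KiB"));  // 65536
        System.out.println(toBytes("20MiB"));  // 20971520
    }
}
```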

These training parameters can be overridden with the `nodetool compressiondictionary train` command, as
explained further down. When the `train` subcommand does not override them, the CQL parameters are
used.
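The precedence just described (a value supplied to the training command wins over the table's CQL option, which wins over the built-in default) can be sketched as follows; the helper below is illustrative, not Cassandra's actual implementation:

```java
import java.util.Map;

// Illustrative precedence sketch: user-supplied value > table's CQL
// compression option > built-in default.
public class TrainingParamResolutionSketch
{
    static String resolve(String userSupplied, Map<String, String> cqlOptions,
                          String parameterName, String defaultValue)
    {
        if (userSupplied != null)
            return userSupplied;
        return cqlOptions.getOrDefault(parameterName, defaultValue);
    }

    public static void main(String[] args)
    {
        Map<String, String> cqlOptions = Map.of("training_max_dictionary_size", "128KiB");

        // CQL option set, nothing supplied by the user -> CQL wins.
        System.out.println(resolve(null, cqlOptions, "training_max_dictionary_size", "64KiB"));
        // User override supplied -> it wins over the CQL option.
        System.out.println(resolve("256KiB", cqlOptions, "training_max_dictionary_size", "64KiB"));
        // Neither supplied nor set in CQL -> default.
        System.out.println(resolve(null, cqlOptions, "training_max_total_sample_size", "10MiB"));
    }
}
```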

== Other options

* `crc_check_chance` (default: `1.0`): determines how likely Cassandra
@@ -417,6 +434,11 @@ There are these four commands for now related to compression dictionaries:
by a specific id, to a file.
* import - a user can import a compression dictionary, exported by above command, from a file to a cluster.

For the `train` subcommand, it is possible to specify:

* `--max-dict-size` - overrides `training_max_dictionary_size` in CQL `compression` configuration.
* `--max-total-sample-size` - overrides `training_max_total_sample_size` in CQL `compression` configuration.

Importing a dictionary to a table from a file should happen against only one node at a time, as the
dictionary will eventually be stored in the `system_distributed.compression_dictionaries` table and reused
cluster-wide. When imports happen from multiple nodes, the highest-versioned dictionary will be used.
2 changes: 0 additions & 2 deletions src/java/org/apache/cassandra/config/Config.java
@@ -524,8 +524,6 @@ public static class SSTableConfig
public volatile DurationSpec.IntSecondsBound compression_dictionary_cache_expire = new DurationSpec.IntSecondsBound("24h");

// Dictionary training settings
public volatile DataStorageSpec.IntKibibytesBound compression_dictionary_training_max_dictionary_size = new DataStorageSpec.IntKibibytesBound("64KiB");
public volatile DataStorageSpec.IntKibibytesBound compression_dictionary_training_max_total_sample_size = new DataStorageSpec.IntKibibytesBound("10MiB");
public volatile boolean compression_dictionary_training_auto_train_enabled = false;
public volatile float compression_dictionary_training_sampling_rate = 0.01f; // samples 1%

13 changes: 3 additions & 10 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1236,6 +1236,9 @@ else if (conf.max_value_size.toMebibytes() >= 2048)
{
throw new ConfigurationException(ex.getMessage());
}

if (conf.compression_dictionary_training_sampling_rate <= 0.0f || conf.compression_dictionary_training_sampling_rate > 1.0f)
throw new ConfigurationException("compression_dictionary_training_sampling_rate must be in the range (0.0, 1.0], but was " + conf.compression_dictionary_training_sampling_rate);
}

@VisibleForTesting
@@ -4423,16 +4426,6 @@ public static int getCompressionDictionaryCacheExpireSeconds()
return conf.compression_dictionary_cache_expire.toSeconds();
}

public static int getCompressionDictionaryTrainingMaxDictionarySize()
{
return conf.compression_dictionary_training_max_dictionary_size.toBytes();
}

public static int getCompressionDictionaryTrainingMaxTotalSampleSize()
{
return conf.compression_dictionary_training_max_total_sample_size.toBytes();
}

public static boolean getCompressionDictionaryTrainingAutoTrainEnabled()
{
return conf.compression_dictionary_training_auto_train_enabled;
@@ -344,13 +344,12 @@ public ICompressor createCompressor(CompressionDictionary dictionary)
@Override
public ICompressionDictionaryTrainer createTrainer(String keyspaceName,
String tableName,
CompressionDictionaryTrainingConfig config,
ICompressor compressor)
{
Preconditions.checkArgument(compressor instanceof ZstdDictionaryCompressor,
"Expected compressor to be ZstdDictionaryCompressor; actual: %s",
compressor.getClass().getSimpleName());
return new ZstdDictionaryTrainer(keyspaceName, tableName, config, ((ZstdDictionaryCompressor) compressor).compressionLevel());
return new ZstdDictionaryTrainer(keyspaceName, tableName, ((ZstdDictionaryCompressor) compressor).compressionLevel());
}
};

@@ -377,13 +376,11 @@ public ICompressionDictionaryTrainer createTrainer(String keyspaceName,
*
* @param keyspaceName the keyspace name
* @param tableName the table name
* @param config the training configuration
* @param compressor the compressor to use for training
* @return a dictionary trainer instance
*/
public abstract ICompressionDictionaryTrainer createTrainer(String keyspaceName,
String tableName,
CompressionDictionaryTrainingConfig config,
ICompressor compressor);
}

@@ -20,6 +20,7 @@

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import javax.management.openmbean.CompositeData;
@@ -30,6 +31,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DataStorageSpec;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary;
@@ -41,6 +43,10 @@
import org.apache.cassandra.utils.MBeanWrapper.OnException;

import static java.lang.String.format;
import static org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE;
import static org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE;
import static org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME;
import static org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME;

public class CompressionDictionaryManager implements CompressionDictionaryManagerMBean,
ICompressionDictionaryCache,
@@ -75,13 +81,12 @@ public CompressionDictionaryManager(ColumnFamilyStore columnFamilyStore, boolean
{
// Initialize components
this.trainer = ICompressionDictionaryTrainer.create(keyspaceName, tableName,
columnFamilyStore.metadata().params.compression,
createTrainingConfig());
columnFamilyStore.metadata().params.compression);
trainer.setDictionaryTrainedListener(this::handleNewDictionary);

scheduler.scheduleRefreshTask();

trainer.start(false);
trainer.start(false, createTrainingConfig());
}

if (registerBookkeeping && isEnabled)
@@ -132,7 +137,7 @@ public synchronized void maybeReloadFromSchema(CompressionParams newParams)
}
}

trainer = ICompressionDictionaryTrainer.create(keyspaceName, tableName, newParams, createTrainingConfig());
trainer = ICompressionDictionaryTrainer.create(keyspaceName, tableName, newParams);
trainer.setDictionaryTrainedListener(this::handleNewDictionary);
}

@@ -141,7 +146,7 @@ public synchronized void maybeReloadFromSchema(CompressionParams newParams)
// Start trainer if it exists
if (trainer != null)
{
trainer.start(false);
trainer.start(false, createTrainingConfig());
}
return;
}
@@ -172,7 +177,7 @@ public void addSample(ByteBuffer sample)
dictionaryTrainer.addSample(sample);
}
}

@Nullable
@Override
public CompressionDictionary getCurrent()
@@ -211,7 +216,7 @@ public void onNewDictionaryAvailable(CompressionDictionary.DictId dictionaryId)
}

@Override
public synchronized void train(boolean force)
public synchronized void train(boolean force, Map<String, String> parameters)
{
// Validate table supports dictionary compression
if (!isEnabled)
@@ -224,12 +229,15 @@ public synchronized void train(boolean force)
throw new IllegalStateException("Dictionary trainer is not available for table " + keyspaceName + '.' + tableName);
}

// Resolve the training config and fail fast when it is invalid, so we do not reach logic that would, for example, flush unnecessarily.
CompressionDictionaryTrainingConfig trainingConfig = createTrainingConfig(parameters);

// SSTable-based training: sample from existing SSTables
Set<SSTableReader> sstables = columnFamilyStore.getLiveSSTables();
if (sstables.isEmpty())
{
logger.info("No SSTables available for training in table {}.{}, flushing memtable first",
keyspaceName, tableName);
keyspaceName, tableName);
columnFamilyStore.forceBlockingFlush(ColumnFamilyStore.FlushReason.USER_FORCED);
sstables = columnFamilyStore.getLiveSSTables();

@@ -240,10 +248,10 @@ }
}

logger.info("Starting SSTable-based training for {}.{} with {} SSTables",
keyspaceName, tableName, sstables.size());
keyspaceName, tableName, sstables.size());

trainer.start(true);
scheduler.scheduleSSTableBasedTraining(trainer, sstables, createTrainingConfig(), force);
trainer.start(true, trainingConfig);
scheduler.scheduleSSTableBasedTraining(trainer, sstables, trainingConfig, force);
}

@Override
@@ -356,18 +364,71 @@ private void handleNewDictionary(CompressionDictionary dictionary)
onNewDictionaryTrained(dictionary.dictId());
}

/**
* @return training configuration with max dictionary size and total sample size from CQL table compression params.
*/
private CompressionDictionaryTrainingConfig createTrainingConfig()
{
return createTrainingConfig(Map.of());
}

/**
* Returns the training configuration, where the max dictionary size and total sample size can be supplied by a
* user, e.g. upon invocation of the training method via JMX.
*
* @param parameters user-supplied training parameters; when not specified, the CQL compression parameters
* for the given table are used
* @return training configuration with the resolved max dictionary size and total sample size
*/
private CompressionDictionaryTrainingConfig createTrainingConfig(Map<String, String> parameters)
{
CompressionParams compressionParams = columnFamilyStore.metadata().params.compression;
return CompressionDictionaryTrainingConfig
.builder()
.maxDictionarySize(DatabaseDescriptor.getCompressionDictionaryTrainingMaxDictionarySize())
.maxTotalSampleSize(DatabaseDescriptor.getCompressionDictionaryTrainingMaxTotalSampleSize())
.maxDictionarySize(getCompressionDictionaryTrainingMaxDictionarySize(compressionParams, parameters))
.maxTotalSampleSize(getCompressionDictionaryTrainingMaxTotalSampleSize(compressionParams, parameters))
.samplingRate(DatabaseDescriptor.getCompressionDictionaryTrainingSamplingRate())
.chunkSize(compressionParams.chunkLength())
.build();
}

private int getCompressionDictionaryTrainingMaxDictionarySize(CompressionParams compressionParams, Map<String, String> parameters)
{
return internalTrainingParameterResolution(compressionParams,
parameters.get(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME),
TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE);
}

private int getCompressionDictionaryTrainingMaxTotalSampleSize(CompressionParams compressionParams, Map<String, String> parameters)
{
return internalTrainingParameterResolution(compressionParams,
parameters.get(TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME),
TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME,
DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE);
}

private int internalTrainingParameterResolution(CompressionParams compressionParams,
String userSuppliedValue,
String parameterName,
String defaultParameterValue)
{
String resolvedValue = null;
try
{
if (userSuppliedValue == null)
resolvedValue = compressionParams.getOtherOptions().getOrDefault(parameterName, defaultParameterValue);
else
resolvedValue = userSuppliedValue;

return new DataStorageSpec.IntKibibytesBound(resolvedValue).toBytes();
}
catch (Throwable t)
{
throw new IllegalArgumentException(String.format("Invalid value for %s: %s", parameterName, resolvedValue), t);
}
}

private void storeDictionary(CompressionDictionary dictionary)
{
if (!isEnabled)
@@ -383,7 +444,7 @@ private void storeDictionary(CompressionDictionary dictionary)
* Determines if a new trainer should be created based on compression parameter changes.
* A new trainer is needed when no existing trainer exists or when the existing trainer
* is not compatible with the new compression parameters.
*
* <p>
* The method is (and should be) only invoked inside {@link #maybeReloadFromSchema(CompressionParams)},
* which is guarded by synchronized.
*