From 2cfa604086aea3263c013fd49a5bbc8fbf6d471b Mon Sep 17 00:00:00 2001 From: Stefan Miklosovic Date: Mon, 15 Dec 2025 18:26:51 +0100 Subject: [PATCH 1/6] CASSANDRA-21078 move training params to CQL --- conf/cassandra.yaml | 12 --- conf/cassandra_latest.yaml | 12 --- .../pages/managing/operating/compression.adoc | 38 +++++++-- .../org/apache/cassandra/config/Config.java | 2 - .../cassandra/config/DatabaseDescriptor.java | 10 --- .../CompressionDictionaryManager.java | 78 +++++++++++++++++-- .../CompressionDictionaryManagerMBean.java | 24 +++++- .../io/compress/IDictionaryCompressor.java | 39 +++++++++- .../io/compress/ZstdDictionaryCompressor.java | 12 ++- .../org/apache/cassandra/tools/NodeProbe.java | 18 +++-- .../CompressionDictionaryCommandGroup.java | 60 +++++++++++++- .../nodetool/help/compressiondictionary | 15 +++- .../nodetool/help/compressiondictionary$train | 18 ++++- .../CompressionDictionaryIntegrationTest.java | 70 ++++++++++++----- .../CompressionDictionarySchedulerTest.java | 8 +- 15 files changed, 326 insertions(+), 90 deletions(-) diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 5736890b5144..7f8812973234 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -2923,18 +2923,6 @@ compression_dictionary_cache_size: 10 # Min unit: s compression_dictionary_cache_expire: 24h -# Dictionary training configuration (advanced settings) -# These settings control how compression dictionaries are trained from sample data. - -# Maximum size of a trained compression dictionary. -# Larger dictionaries may provide better compression but use more memory. -compression_dictionary_training_max_dictionary_size: 64KiB - -# Maximum total size of sample data to collect for dictionary training. -# More sample data generally produces better dictionaries but takes longer to train. -# The recommended sample size is 100x the dictionary size. 
-compression_dictionary_training_max_total_sample_size: 10MiB - # Enable automatic dictionary training based on sampling of write operations. # When enabled, the system will automatically collect samples and train new dictionaries. # Manual training via nodetool is always available regardless of this setting. diff --git a/conf/cassandra_latest.yaml b/conf/cassandra_latest.yaml index b43f6cf45cb8..8adefe51a090 100644 --- a/conf/cassandra_latest.yaml +++ b/conf/cassandra_latest.yaml @@ -2662,18 +2662,6 @@ compression_dictionary_cache_size: 10 # Min unit: s compression_dictionary_cache_expire: 24h -# Dictionary training configuration (advanced settings) -# These settings control how compression dictionaries are trained from sample data. - -# Maximum size of a trained compression dictionary. -# Larger dictionaries may provide better compression but use more memory. -compression_dictionary_training_max_dictionary_size: 64KiB - -# Maximum total size of sample data to collect for dictionary training. -# More sample data generally produces better dictionaries but takes longer to train. -# The recommended sample size is 100x the dictionary size. -compression_dictionary_training_max_total_sample_size: 10MiB - # Enable automatic dictionary training based on sampling of write operations. # When enabled, the system will automatically collect samples and train new dictionaries. # Manual training via nodetool is always available regardless of this setting. diff --git a/doc/modules/cassandra/pages/managing/operating/compression.adoc b/doc/modules/cassandra/pages/managing/operating/compression.adoc index 22214e046bce..2e232b63744f 100644 --- a/doc/modules/cassandra/pages/managing/operating/compression.adoc +++ b/doc/modules/cassandra/pages/managing/operating/compression.adoc @@ -298,12 +298,6 @@ next access. === Training Configuration -* `compression_dictionary_training_max_dictionary_size` (default: `65536`): -Maximum size of trained dictionaries in bytes. 
Larger dictionaries can -capture more patterns but increase memory overhead. -* `compression_dictionary_training_max_total_sample_size` (default: -`10485760`): Maximum total size of sample data to collect for training, -approximately 10MB. * `compression_dictionary_training_auto_train_enabled` (default: `false`): Enable automatic background dictionary training. When enabled, Cassandra samples writes and trains dictionaries automatically. @@ -324,10 +318,33 @@ compression_dictionary_cache_expire: 3600 # Automatic training compression_dictionary_training_auto_train_enabled: false compression_dictionary_training_sampling_rate: 100 -compression_dictionary_training_max_dictionary_size: 65536 -compression_dictionary_training_max_total_sample_size: 10485760 ---- +=== CQL training parameters: + +These parameters are meant to be configured via CQL for each respective table if defaults are not appropriate. + +* `training_max_total_sample_size` (default: `10MiB`): Maximum total size of sample data to collect for training, approximately 10MB. This is a parameter of `ZstdDictionaryCompressor` +of a table, in `compression` section. +* `training_max_dictionary_size` (default: `64KiB`): Maximum size of trained dictionaries in bytes. Larger dictionaries can capture more patterns but increase memory overhead. This is a parameter of `ZstdDictionaryCompressor` of a table, in `compression` section. + +Example: + +[source,cql] +---- +ALTER TABLE keyspace.table + WITH compression = { + 'class': 'ZstdDictionaryCompressor', + 'compression_level': '3', + 'training_max_total_sample_size': '20MiB', + 'training_max_dictionary_size': '128KiB' + }; +---- + +It is possible to override these training parameters by `nodetool compressiondictionary train` command as +explained in the section further down below. If the `train` subcommand does not override them, CQL parameters are +taken into account. 
+ == Other options * `crc_check_chance` (default: `1.0`): determines how likely Cassandra @@ -417,6 +434,11 @@ There are these four commands for now related to compression dictionaries: by a specific id, to a file. * import - a user can import a compression dictionary, exported by above command, from a file to a cluster. +For `train` subcommand, it is possible to specify: + +* `--max-dict-size` - overrides `training_max_dictionary_size` in CQL `compression` configuration. +* `--max-total-sample-size` - overrides `training_max_total_sample_size` in CQL `compression` configuration. + Importing a dictionary to a table from a file should happen only against one node at a time as dictionary will be eventually stored in `system_distributed.compression_dictionaries` table and reused cluster-wide. When imports happen from multiple nodes, the highest-version dictionary will be used. diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 2d517fc12795..492bb441abcb 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -524,8 +524,6 @@ public static class SSTableConfig public volatile DurationSpec.IntSecondsBound compression_dictionary_cache_expire = new DurationSpec.IntSecondsBound("24h"); // Dictionary training settings - public volatile DataStorageSpec.IntKibibytesBound compression_dictionary_training_max_dictionary_size = new DataStorageSpec.IntKibibytesBound("64KiB"); - public volatile DataStorageSpec.IntKibibytesBound compression_dictionary_training_max_total_sample_size = new DataStorageSpec.IntKibibytesBound("10MiB"); public volatile boolean compression_dictionary_training_auto_train_enabled = false; public volatile float compression_dictionary_training_sampling_rate = 0.01f; // samples 1% diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 
322fbf5ff48f..f2e8b5a64c8b 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -4423,16 +4423,6 @@ public static int getCompressionDictionaryCacheExpireSeconds() return conf.compression_dictionary_cache_expire.toSeconds(); } - public static int getCompressionDictionaryTrainingMaxDictionarySize() - { - return conf.compression_dictionary_training_max_dictionary_size.toBytes(); - } - - public static int getCompressionDictionaryTrainingMaxTotalSampleSize() - { - return conf.compression_dictionary_training_max_total_sample_size.toBytes(); - } - public static boolean getCompressionDictionaryTrainingAutoTrainEnabled() { return conf.compression_dictionary_training_auto_train_enabled; diff --git a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java index d640fe35e380..da36d1817daf 100644 --- a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java +++ b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; import java.util.List; +import java.util.Map; import java.util.Set; import javax.annotation.Nullable; import javax.management.openmbean.CompositeData; @@ -30,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DataStorageSpec; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary; @@ -41,6 +43,10 @@ import org.apache.cassandra.utils.MBeanWrapper.OnException; import static java.lang.String.format; +import static org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE; +import static 
org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE; +import static org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME; +import static org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME; public class CompressionDictionaryManager implements CompressionDictionaryManagerMBean, ICompressionDictionaryCache, @@ -172,7 +178,7 @@ public void addSample(ByteBuffer sample) dictionaryTrainer.addSample(sample); } } - + @Nullable @Override public CompressionDictionary getCurrent() @@ -211,7 +217,7 @@ public void onNewDictionaryAvailable(CompressionDictionary.DictId dictionaryId) } @Override - public synchronized void train(boolean force) + public synchronized void train(boolean force, Map parameters) { // Validate table supports dictionary compression if (!isEnabled) @@ -224,12 +230,15 @@ public synchronized void train(boolean force) throw new IllegalStateException("Dictionary trainer is not available for table " + keyspaceName + '.' + tableName); } + // resolve training config and fail fast when invalid, so we do not reach logic which would e.g. flush unnecessarily. 
+ CompressionDictionaryTrainingConfig trainingConfig = createTrainingConfig(parameters); + // SSTable-based training: sample from existing SSTables Set sstables = columnFamilyStore.getLiveSSTables(); if (sstables.isEmpty()) { logger.info("No SSTables available for training in table {}.{}, flushing memtable first", - keyspaceName, tableName); + keyspaceName, tableName); columnFamilyStore.forceBlockingFlush(ColumnFamilyStore.FlushReason.USER_FORCED); sstables = columnFamilyStore.getLiveSSTables(); @@ -240,10 +249,10 @@ public synchronized void train(boolean force) } logger.info("Starting SSTable-based training for {}.{} with {} SSTables", - keyspaceName, tableName, sstables.size()); + keyspaceName, tableName, sstables.size()); trainer.start(true); - scheduler.scheduleSSTableBasedTraining(trainer, sstables, createTrainingConfig(), force); + scheduler.scheduleSSTableBasedTraining(trainer, sstables, trainingConfig, force); } @Override @@ -356,18 +365,71 @@ private void handleNewDictionary(CompressionDictionary dictionary) onNewDictionaryTrained(dictionary.dictId()); } + /** + * @return training configuration with max dictionary size and total sample size from CQL table compression params. + */ private CompressionDictionaryTrainingConfig createTrainingConfig() + { + return createTrainingConfig(Map.of()); + } + + /** + * Returns configuration for training where max dictionary size and total sample size can be supplied by a + * user, e.g. upon the invocation of training method via JMX. + * + * @param parameters user-supplied parameters from training, when not specified, CQL compression parameters + * for a given table will be used + * @return training configuration with max dictionary size and total sample size of supplied arguments. 
+ */ + private CompressionDictionaryTrainingConfig createTrainingConfig(Map parameters) { CompressionParams compressionParams = columnFamilyStore.metadata().params.compression; return CompressionDictionaryTrainingConfig .builder() - .maxDictionarySize(DatabaseDescriptor.getCompressionDictionaryTrainingMaxDictionarySize()) - .maxTotalSampleSize(DatabaseDescriptor.getCompressionDictionaryTrainingMaxTotalSampleSize()) + .maxDictionarySize(getCompressionDictionaryTrainingMaxDictionarySize(compressionParams, parameters)) + .maxTotalSampleSize(getCompressionDictionaryTrainingMaxTotalSampleSize(compressionParams, parameters)) .samplingRate(DatabaseDescriptor.getCompressionDictionaryTrainingSamplingRate()) .chunkSize(compressionParams.chunkLength()) .build(); } + private int getCompressionDictionaryTrainingMaxDictionarySize(CompressionParams compressionParams, Map parameters) + { + return internalTrainingParameterResolution(compressionParams, + parameters.get(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME), + TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME, + DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE); + } + + private int getCompressionDictionaryTrainingMaxTotalSampleSize(CompressionParams compressionParams, Map parameters) + { + return internalTrainingParameterResolution(compressionParams, + parameters.get(TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME), + TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME, + DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE); + } + + private int internalTrainingParameterResolution(CompressionParams compressionParams, + String userSuppliedValue, + String parameterName, + String defaultParameterValue) + { + String resolvedValue = null; + try + { + if (userSuppliedValue != null) + resolvedValue = userSuppliedValue; + else + resolvedValue = compressionParams.getOtherOptions().getOrDefault(parameterName, defaultParameterValue); + + return new DataStorageSpec.IntKibibytesBound(resolvedValue).toBytes(); + } + catch (Throwable t) + { + throw 
new IllegalArgumentException(String.format("Invalid value for %s: %s", parameterName, resolvedValue)); + } + } + private void storeDictionary(CompressionDictionary dictionary) { if (!isEnabled) @@ -383,7 +445,7 @@ private void storeDictionary(CompressionDictionary dictionary) * Determines if a new trainer should be created based on compression parameter changes. * A new trainer is needed when no existing trainer exists or when the existing trainer * is not compatible with the new compression parameters. - * + *

* The method is (and should be) only invoked inside {@link #maybeReloadFromSchema(CompressionParams)}, * which is guarded by synchronized. * diff --git a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManagerMBean.java b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManagerMBean.java index 3dc349d115ce..04e805bd9930 100644 --- a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManagerMBean.java +++ b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManagerMBean.java @@ -18,6 +18,7 @@ package org.apache.cassandra.db.compression; +import java.util.Map; import javax.annotation.Nullable; import javax.management.openmbean.CompositeData; import javax.management.openmbean.TabularData; @@ -32,12 +33,31 @@ public interface CompressionDictionaryManagerMBean * If no SSTables are available, automatically flushes the memtable first. * This operation runs synchronously and blocks until training completes. * + * @param force force the dictionary training even if there are not enough samples; + * otherwise, dictionary training won't start if the trainer is not ready + * @param parameters parameters of training process + * @throws UnsupportedOperationException if table doesn't support dictionary compression + * @throws IllegalStateException if no SSTables available after flush + */ + void train(boolean force, Map parameters); + + /** + * Starts training from existing SSTables for this table. + * Samples chunks from all live SSTables and trains a compression dictionary. + * If no SSTables are available, automatically flushes the memtable first. + * This operation runs synchronously and blocks until training completes. + *

+ * Training parameters will be taken from CQL's compression section of a given table training is conducted on. + * * @param force force the dictionary training even if there are not enough samples; * otherwise, dictionary training won't start if the trainer is not ready * @throws UnsupportedOperationException if table doesn't support dictionary compression * @throws IllegalStateException if no SSTables available after flush */ - void train(boolean force); + default void train(boolean force) + { + train(force, Map.of()); + } /** * Gets the current training state for this table. @@ -77,7 +97,7 @@ public interface CompressionDictionaryManagerMBean * * @param dictionary compression dictionary to import * @throws IllegalArgumentException when dictionary to import is older (based on dictionary id) than - * the latest compression dictionary for given table, or when dictionary data are invalid + * the latest compression dictionary for given table, or when dictionary data are invalid * @throws IllegalStateException if underlying table does not support dictionary compression or * kind of dictionary to import does not match kind of dictionary table * is configured for diff --git a/src/java/org/apache/cassandra/io/compress/IDictionaryCompressor.java b/src/java/org/apache/cassandra/io/compress/IDictionaryCompressor.java index f6f7681f74ab..fd4ce62ea3a7 100644 --- a/src/java/org/apache/cassandra/io/compress/IDictionaryCompressor.java +++ b/src/java/org/apache/cassandra/io/compress/IDictionaryCompressor.java @@ -18,7 +18,11 @@ package org.apache.cassandra.io.compress; +import org.apache.cassandra.config.DataStorageSpec; import org.apache.cassandra.db.compression.CompressionDictionary; +import org.apache.cassandra.exceptions.ConfigurationException; + +import static java.lang.String.format; /** * Interface for compressors that support dictionary-based compression. 
@@ -26,18 +30,45 @@ * Dictionary compressors can use pre-trained compression dictionaries to achieve * better compression ratios, especially for small data chunks that are similar * to the training data used to create the dictionary. - * + * * @param the specific type of compression dictionary this compressor supports */ public interface IDictionaryCompressor { + String TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME = "training_max_dictionary_size"; + String DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE = "64KiB"; + + String TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME = "training_max_total_sample_size"; + String DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE = "10MiB"; + + /** + * Validates value of a parameter for training purposes. The value to validate should + * be accepted by {@link DataStorageSpec.IntKibibytesBound}. This method is used upon validation + * of input parameters in the implementations of dictionary compressor. + * + * @param parameterName name of a parameter to validate + * @param resolvedValue value to validate + */ + static void validateTrainingParameter(String parameterName, String resolvedValue) + { + try + { + new DataStorageSpec.IntKibibytesBound(resolvedValue).toBytes(); + } + catch (Throwable t) + { + throw new ConfigurationException(format("Unable to set value to parameter %s: %s. Reason: %s", + parameterName, resolvedValue, t.getMessage())); + } + } + /** * Returns a compressor instance configured with the specified compression dictionary. *
* This method may return the same instance if it already uses the given dictionary, * or create a new instance configured with the dictionary. The implementation should * be efficient and avoid unnecessary object creation when possible. - * + * * @param compressionDictionary the dictionary to use for compression/decompression * @return a compressor instance that will use the specified dictionary */ @@ -49,7 +80,7 @@ public interface IDictionaryCompressor * This is used to validate dictionary compatibility before attempting to use * a dictionary with this compressor. Only dictionaries of the returned kind * should be passed to {@link #getOrCopyWithDictionary(CompressionDictionary)}. - * + * * @return the compression dictionary kind supported by this compressor */ CompressionDictionary.Kind acceptableDictionaryKind(); @@ -60,7 +91,7 @@ public interface IDictionaryCompressor * The default implementation compares the dictionary's kind with the kind * returned by {@link #acceptableDictionaryKind()}. Compressor implementations * may override this method to provide more sophisticated compatibility checks. 
- * + * * @param dictionary the compression dictionary to check for compatibility * @return true if this compressor can use the dictionary, false otherwise */ diff --git a/src/java/org/apache/cassandra/io/compress/ZstdDictionaryCompressor.java b/src/java/org/apache/cassandra/io/compress/ZstdDictionaryCompressor.java index 59cba54b2003..d51285d69597 100644 --- a/src/java/org/apache/cassandra/io/compress/ZstdDictionaryCompressor.java +++ b/src/java/org/apache/cassandra/io/compress/ZstdDictionaryCompressor.java @@ -40,6 +40,8 @@ import javax.annotation.Nullable; +import static org.apache.cassandra.io.compress.IDictionaryCompressor.validateTrainingParameter; + public class ZstdDictionaryCompressor extends ZstdCompressorBase implements ICompressor, IDictionaryCompressor { private static final ConcurrentHashMap instancesPerLevel = new ConcurrentHashMap<>(); @@ -76,6 +78,12 @@ public static ZstdDictionaryCompressor create(Map options) { int level = getOrDefaultCompressionLevel(options); validateCompressionLevel(level); + validateTrainingParameter(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME, + options.getOrDefault(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME, + DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE)); + validateTrainingParameter(TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME, + options.getOrDefault(TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME, + DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE)); return getOrCreate(level, null); } @@ -110,7 +118,9 @@ private ZstdDictionaryCompressor(int level) private ZstdDictionaryCompressor(int level, ZstdCompressionDictionary dictionary, Ref dictionaryRef) { - super(level, Set.of(COMPRESSION_LEVEL_OPTION_NAME)); + super(level, Set.of(COMPRESSION_LEVEL_OPTION_NAME, + TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME, + TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME)); this.dictionary = dictionary; this.dictionaryRef = dictionaryRef; } diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java index 897f49acfc9d..075ae8b73b7e 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -2692,15 +2692,19 @@ public void setMixedMajorVersionRepairEnabled(boolean enabled) * Triggers compression dictionary training for the specified table. * Samples chunks from existing SSTables and trains a dictionary. * - * @param keyspace the keyspace name - * @param table the table name - * @param force force the dictionary training even if there are not enough samples - * @throws IOException if there's an error accessing the MBean - * @throws IllegalArgumentException if table doesn't support dictionary compression + * @param keyspace the keyspace name + * @param table the table name + * @param force force the dictionary training even if there are not enough samples + * @param parameters training parameters, if empty, training parameters will be taken from CQL's + * compression section of a given table training is conducted on. 
+ * @throws IOException if there's an error accessing the MBean + * @throws IllegalArgumentException if table doesn't support dictionary compression */ - public void trainCompressionDictionary(String keyspace, String table, boolean force) throws IOException + public void trainCompressionDictionary(String keyspace, String table, + boolean force, + Map parameters) throws IOException { - doWithCompressionDictionaryManagerMBean(proxy -> { proxy.train(force); return null; }, keyspace, table); + doWithCompressionDictionaryManagerMBean(proxy -> {proxy.train(force, parameters); return null; }, keyspace, table); } /** diff --git a/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java b/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java index 02901cd37529..4849291ac266 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java +++ b/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java @@ -19,7 +19,9 @@ import java.io.PrintStream; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import javax.management.openmbean.CompositeData; import javax.management.openmbean.TabularData; @@ -27,6 +29,7 @@ import com.google.common.util.concurrent.Uninterruptibles; import com.fasterxml.jackson.databind.exc.ValueInstantiationException; +import org.apache.cassandra.config.DataStorageSpec; import org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData; import org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData.CompressionDictionaryDataObject; import org.apache.cassandra.db.compression.ICompressionDictionaryTrainer.TrainingStatus; @@ -46,6 +49,10 @@ import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; import static java.nio.file.StandardOpenOption.WRITE; import static java.util.stream.Collectors.joining; +import static 
org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE; +import static org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE; +import static org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME; +import static org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME; @Command(name = "compressiondictionary", description = "Manage compression dictionaries", @@ -68,18 +75,40 @@ public static class TrainDictionary extends AbstractCommand @Option(names = { "-f", "--force" }, description = "Force the dictionary training even if there are not enough samples") private boolean force = false; + @Option(names = {"--max-dict-size"}, description = "Maximum size of a trained compression dictionary. " + + "Larger dictionaries may provide better compression but use more memory. When not set, " + + "the value from compression configuration from CQL for a given table is used. " + + "The default value is " + DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE + '.') + private String trainingMaxDictionarySize; + + @Option(names = "--max-total-sample-size", description = "Maximum total size of sample data to collect for dictionary training. " + + "More sample data generally produces better dictionaries but takes longer to train. " + + "The recommended sample size is 100x the dictionary size. When not set, " + + "the value from compression configuration from CQL for a give table is used. 
" + + "The default value is " + DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE + '.') + private String trainingMaxTotalSampleSize; + @Override public void execute(NodeProbe probe) { PrintStream out = probe.output().out; PrintStream err = probe.output().err; + validateParameters(err, trainingMaxDictionarySize, trainingMaxTotalSampleSize); + try { out.printf("Starting compression dictionary training for %s.%s...%n", keyspace, table); out.printf("Training from existing SSTables (flushing first if needed)%n"); - probe.trainCompressionDictionary(keyspace, table, force); + Map parameters = new HashMap<>(); + if (trainingMaxTotalSampleSize != null) + parameters.put(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME, trainingMaxDictionarySize); + + if (trainingMaxTotalSampleSize != null) + parameters.put(TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME, trainingMaxTotalSampleSize); + + probe.trainCompressionDictionary(keyspace, table, force, parameters); // Wait for training completion (10 minutes timeout for SSTable-based training) out.println("Sampling from existing SSTables and training."); @@ -138,6 +167,35 @@ private static void displayProgress(TrainingState trainingState, long startTime, out.printf("\rStatus: %s | Samples: %d | Size: %.2f MiB | Elapsed: %ds", status, sampleCount, sampleSizeMB, elapsedSeconds); } + + private static void validateParameters(PrintStream err, String trainingMaxDictionarySize, String trainingMaxTotalSampleSize) + { + if (trainingMaxDictionarySize != null) + { + try + { + new DataStorageSpec.IntKibibytesBound(trainingMaxDictionarySize).toBytes(); + } + catch (Throwable t) + { + err.println("Invalid value for --max-dict-size: " + t.getMessage()); + System.exit(1); + } + } + + if (trainingMaxTotalSampleSize != null) + { + try + { + new DataStorageSpec.IntKibibytesBound(trainingMaxTotalSampleSize).toBytes(); + } + catch (Throwable t) + { + err.println("Invalid value for --max-total-sample-size: " + t.getMessage()); + System.exit(1); + } + } + } 
} @Command(name = "list", diff --git a/test/resources/nodetool/help/compressiondictionary b/test/resources/nodetool/help/compressiondictionary index 5a65cae36dd3..92b9ecbe5d06 100644 --- a/test/resources/nodetool/help/compressiondictionary +++ b/test/resources/nodetool/help/compressiondictionary @@ -12,7 +12,9 @@ SYNOPSIS [(-pp | --print-port)] [(-pw | --password )] [(-pwf | --password-file )] [(-u | --username )] compressiondictionary train - [(-f | --force)] [--] + [(-f | --force)] [--max-dict-size ] + [--max-total-sample-size ] [--] +
nodetool [(-h | --host )] [(-p | --port )] [(-pp | --print-port)] [(-pw | --password )] @@ -60,6 +62,17 @@ COMMANDS With --force option, Force the dictionary training even if there are not enough samples + + With --max-dict-size option, Maximum size of a trained compression + dictionary. Larger dictionaries may provide better compression but use more + memory. When not set, the value from compression configuration from CQL for + a given table is used. The default value is 64KiB. + + With --max-total-sample-size option, Maximum total size of sample data to + collect for dictionary training. More sample data generally produces better + dictionaries but takes longer to train. The recommended sample size is 100x + the dictionary size. When not set, the value from compression configuration + from CQL for a give table is used. The default value is 10MiB. list List available dictionaries of specific keyspace and table. export diff --git a/test/resources/nodetool/help/compressiondictionary$train b/test/resources/nodetool/help/compressiondictionary$train index ce36f648f8ac..206a6d2b0be6 100644 --- a/test/resources/nodetool/help/compressiondictionary$train +++ b/test/resources/nodetool/help/compressiondictionary$train @@ -8,7 +8,9 @@ SYNOPSIS [(-pp | --print-port)] [(-pw | --password )] [(-pwf | --password-file )] [(-u | --username )] compressiondictionary train - [(-f | --force)] [--]
+ [(-f | --force)] [--max-dict-size ] + [--max-total-sample-size ] [--] +
OPTIONS -f, --force @@ -17,6 +19,20 @@ OPTIONS -h , --host Node hostname or ip address + --max-dict-size + Maximum size of a trained compression dictionary. Larger + dictionaries may provide better compression but use more memory. + When not set, the value from compression configuration from CQL for + a given table is used. The default value is 64KiB. + + --max-total-sample-size + Maximum total size of sample data to collect for dictionary + training. More sample data generally produces better dictionaries + but takes longer to train. The recommended sample size is 100x the + dictionary size. When not set, the value from compression + configuration from CQL for a give table is used. The default value + is 10MiB. + -p , --port Remote jmx agent port number diff --git a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryIntegrationTest.java b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryIntegrationTest.java index 0d5a99746455..b192b6bd2d82 100644 --- a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryIntegrationTest.java +++ b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryIntegrationTest.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db.compression; import java.util.Collections; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.junit.Before; @@ -26,7 +27,6 @@ import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.DataStorageSpec; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; @@ -37,6 +37,10 @@ import org.apache.cassandra.utils.Clock; import static org.apache.cassandra.Util.spinUntilTrue; +import static org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE; +import static 
org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE; +import static org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME; +import static org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -45,13 +49,14 @@ public class CompressionDictionaryIntegrationTest extends CQLTester { private static final String REPEATED_DATA = "The quick brown fox jumps over the lazy dog. This text repeats for better compression. "; + private final static String TRAINING_MAX_TOTAL_SAMPLE_SIZE = "128KiB"; + private final static String TRAINING_MAX_DICTIONARY_SIZE = "10KiB"; + @Before public void configureDatabaseDescriptor() { Config config = DatabaseDescriptor.getRawConfig(); config.compression_dictionary_training_sampling_rate = 1.0f; - config.compression_dictionary_training_max_total_sample_size = new DataStorageSpec.IntKibibytesBound("128KiB"); - config.compression_dictionary_training_max_dictionary_size = new DataStorageSpec.IntKibibytesBound("10KiB"); config.flush_compression = Config.FlushCompression.table; DatabaseDescriptor.setConfig(config); } @@ -59,51 +64,59 @@ public void configureDatabaseDescriptor() @Test public void testEnableDisableDictionaryCompression() { - String table = createTable("CREATE TABLE %s (id int PRIMARY KEY, data text) WITH compression = {'class': 'ZstdDictionaryCompressor'}"); + String table = createTable(getTableCql()); ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(table); CompressionDictionaryManager manager = cfs.compressionDictionaryManager(); // Insert data and flush to create SSTables for (int i = 0; i < 100; i++) { - execute("INSERT INTO %s (id, data) VALUES (?, ?)", i, REPEATED_DATA 
+ " " + i); + execute("INSERT INTO %s (pk, data) VALUES (?, ?)", Integer.toString(i), REPEATED_DATA + " " + i); } flush(); assertThatNoException() .as("Should allow manual training") - .isThrownBy(() -> manager.train(false)); + .isThrownBy(() -> manager.train(false, + Map.of(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME, TRAINING_MAX_DICTIONARY_SIZE, + TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME, TRAINING_MAX_TOTAL_SAMPLE_SIZE))); // Disable dictionary compression CompressionParams nonDictParams = CompressionParams.lz4(); manager.maybeReloadFromSchema(nonDictParams); - assertThatThrownBy(() -> manager.train(false)) + assertThatThrownBy(() -> manager.train(false, + Map.of(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME, TRAINING_MAX_DICTIONARY_SIZE, + TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME, TRAINING_MAX_TOTAL_SAMPLE_SIZE))) .as("Should disallow manual training when using lz4") .isInstanceOf(UnsupportedOperationException.class) .hasMessageContaining("does not support dictionary compression"); // Re-enable dictionary compression CompressionParams dictParams = CompressionParams.zstd(CompressionParams.DEFAULT_CHUNK_LENGTH, true, - Collections.singletonMap("compression_level", "3")); + Map.of("compression_level", "3", + TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME, TRAINING_MAX_TOTAL_SAMPLE_SIZE, + TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME, TRAINING_MAX_DICTIONARY_SIZE)); manager.maybeReloadFromSchema(dictParams); // Insert more data for the re-enabled compression for (int i = 100; i < 200; i++) { - execute("INSERT INTO %s (id, data) VALUES (?, ?)", i, REPEATED_DATA + " " + i); + execute("INSERT INTO %s (pk, data) VALUES (?, ?)", Integer.toString(i), REPEATED_DATA + " " + i); } flush(); assertThatNoException() .as("Should allow manual training after switching back to dictionary compression") - .isThrownBy(() -> manager.train(false)); + .isThrownBy(() -> manager.train(false, + Map.of(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME, TRAINING_MAX_DICTIONARY_SIZE, + 
TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME, TRAINING_MAX_TOTAL_SAMPLE_SIZE))); } @Test public void testCompressionParameterChanges() { - String table = createTable("CREATE TABLE %s (id int PRIMARY KEY, data text) WITH compression = {'class': 'ZstdDictionaryCompressor'}"); + String table = createTable(getTableCql()); ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(table); CompressionDictionaryManager manager = cfs.compressionDictionaryManager(); ICompressionDictionaryTrainer trainer = manager.trainer(); @@ -124,7 +137,7 @@ public void testCompressionParameterChanges() @Test public void testResourceCleanupOnClose() throws Exception { - createTable("CREATE TABLE %s (id int PRIMARY KEY, data text) WITH compression = {'class': 'ZstdDictionaryCompressor'}"); + createTable(getTableCql()); ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); CompressionDictionaryManager manager = cfs.compressionDictionaryManager(); @@ -162,8 +175,7 @@ private static ZstdCompressionDictionary createTestDictionary() public void testSSTableBasedTraining() { DatabaseDescriptor.setFlushCompression(Config.FlushCompression.table); - String table = createTable("CREATE TABLE %s (pk text PRIMARY KEY, data text) " + - "WITH compression = {'class': 'ZstdDictionaryCompressor', 'chunk_length_in_kb' : 4}"); + String table = createTable(getTableCqlWithChunkLength()); ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(table); CompressionDictionaryManager manager = cfs.compressionDictionaryManager(); @@ -184,7 +196,8 @@ public void testSSTableBasedTraining() .hasSizeGreaterThan(0); // Train from existing SSTables - manager.train(true); + manager.train(true, Map.of(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME, TRAINING_MAX_DICTIONARY_SIZE, + TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME, TRAINING_MAX_TOTAL_SAMPLE_SIZE)); // Training should complete quickly since we're reading from existing SSTables spinUntilTrue(() -> 
TrainingState.fromCompositeData(manager.getTrainingState()).status == TrainingStatus.COMPLETED, 10); @@ -210,15 +223,36 @@ public void testSSTableBasedTraining() @Test public void testSSTableBasedTrainingWithoutSSTables() { - String table = createTable("CREATE TABLE %s (pk text PRIMARY KEY, data text) " + - "WITH compression = {'class': 'ZstdDictionaryCompressor'}"); + String table = createTable(getTableCql()); ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(table); CompressionDictionaryManager manager = cfs.compressionDictionaryManager(); // Try to train without any SSTables - assertThatThrownBy(() -> manager.train(false)) + assertThatThrownBy(() -> manager.train(false, Map.of(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME, TRAINING_MAX_DICTIONARY_SIZE, + TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME, TRAINING_MAX_TOTAL_SAMPLE_SIZE))) .as("Should fail when no SSTables are available") .isInstanceOf(IllegalStateException.class) .hasMessageContaining("No SSTables available for training"); } + + private String getTableCqlWithChunkLength() + { + return "CREATE TABLE %s (pk text PRIMARY KEY, data text) " + + "WITH compression = {" + + "'class': 'ZstdDictionaryCompressor'," + + "'chunk_length_in_kb' : 4, " + + '\'' + TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME + "': '" + DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE + "'," + + '\'' + TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME + "': '" + DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE + '\'' + + '}'; + } + + private String getTableCql() + { + return "CREATE TABLE %s (pk text PRIMARY KEY, data text) " + + "WITH compression = {" + + "'class': 'ZstdDictionaryCompressor'," + + '\'' + TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME + "': '" + DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE + "'," + + '\'' + TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME + "': '" + DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE + '\'' + + '}'; + } } diff --git 
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionarySchedulerTest.java b/test/unit/org/apache/cassandra/db/compression/CompressionDictionarySchedulerTest.java index 98a0110e6f7a..ff4b88775fc2 100644 --- a/test/unit/org/apache/cassandra/db/compression/CompressionDictionarySchedulerTest.java +++ b/test/unit/org/apache/cassandra/db/compression/CompressionDictionarySchedulerTest.java @@ -25,13 +25,15 @@ import org.junit.Before; import org.junit.Test; -import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.DataStorageSpec; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.io.sstable.format.SSTableReader; import static org.apache.cassandra.Util.spinUntilTrue; +import static org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE; +import static org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE; import static org.assertj.core.api.Assertions.assertThat; public class CompressionDictionarySchedulerTest extends CQLTester @@ -118,8 +120,8 @@ private void createSSTables() private static CompressionDictionaryTrainingConfig createSampleAllTrainingConfig(ColumnFamilyStore cfs) { return CompressionDictionaryTrainingConfig .builder() - .maxDictionarySize(DatabaseDescriptor.getCompressionDictionaryTrainingMaxDictionarySize()) - .maxTotalSampleSize(DatabaseDescriptor.getCompressionDictionaryTrainingMaxTotalSampleSize()) + .maxDictionarySize(new DataStorageSpec.IntKibibytesBound(DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE).toBytes()) + .maxTotalSampleSize(new DataStorageSpec.IntKibibytesBound(DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE).toBytes()) .samplingRate(1.0f) .chunkSize(cfs.metadata().params.compression.chunkLength()) .build(); From 41dd07949534acbf0253bb58f9b71dd6a0ec1a73 Mon Sep 17 
00:00:00 2001 From: Stefan Miklosovic Date: Wed, 17 Dec 2025 12:12:07 +0100 Subject: [PATCH 2/6] make trainer to be configured per each start --- .../db/compression/CompressionDictionary.java | 5 +- .../CompressionDictionaryManager.java | 11 ++- .../ICompressionDictionaryTrainer.java | 13 ++-- .../db/compression/ZstdDictionaryTrainer.java | 29 ++++--- .../CompressionDictionarySchedulerTest.java | 2 +- .../ZstdDictionaryTrainerTest.java | 76 ++++++++++--------- .../TrainCompressionDictionaryTest.java | 51 +++++++++++++ .../utils/CompressionDictionaryHelper.java | 4 +- 8 files changed, 126 insertions(+), 65 deletions(-) diff --git a/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java b/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java index a6c78da4978a..5ca0bf7112a7 100644 --- a/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java +++ b/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java @@ -344,13 +344,12 @@ public ICompressor createCompressor(CompressionDictionary dictionary) @Override public ICompressionDictionaryTrainer createTrainer(String keyspaceName, String tableName, - CompressionDictionaryTrainingConfig config, ICompressor compressor) { Preconditions.checkArgument(compressor instanceof ZstdDictionaryCompressor, "Expected compressor to be ZstdDictionaryCompressor; actual: %s", compressor.getClass().getSimpleName()); - return new ZstdDictionaryTrainer(keyspaceName, tableName, config, ((ZstdDictionaryCompressor) compressor).compressionLevel()); + return new ZstdDictionaryTrainer(keyspaceName, tableName, ((ZstdDictionaryCompressor) compressor).compressionLevel()); } }; @@ -377,13 +376,11 @@ public ICompressionDictionaryTrainer createTrainer(String keyspaceName, * * @param keyspaceName the keyspace name * @param tableName the table name - * @param config the training configuration * @param compressor the compressor to use for training * @return a dictionary trainer instance */ 
public abstract ICompressionDictionaryTrainer createTrainer(String keyspaceName, String tableName, - CompressionDictionaryTrainingConfig config, ICompressor compressor); } diff --git a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java index da36d1817daf..f2aa6575e230 100644 --- a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java +++ b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java @@ -81,13 +81,12 @@ public CompressionDictionaryManager(ColumnFamilyStore columnFamilyStore, boolean { // Initialize components this.trainer = ICompressionDictionaryTrainer.create(keyspaceName, tableName, - columnFamilyStore.metadata().params.compression, - createTrainingConfig()); + columnFamilyStore.metadata().params.compression); trainer.setDictionaryTrainedListener(this::handleNewDictionary); scheduler.scheduleRefreshTask(); - trainer.start(false); + trainer.start(false, createTrainingConfig()); } if (registerBookkeeping && isEnabled) @@ -138,7 +137,7 @@ public synchronized void maybeReloadFromSchema(CompressionParams newParams) } } - trainer = ICompressionDictionaryTrainer.create(keyspaceName, tableName, newParams, createTrainingConfig()); + trainer = ICompressionDictionaryTrainer.create(keyspaceName, tableName, newParams); trainer.setDictionaryTrainedListener(this::handleNewDictionary); } @@ -147,7 +146,7 @@ public synchronized void maybeReloadFromSchema(CompressionParams newParams) // Start trainer if it exists if (trainer != null) { - trainer.start(false); + trainer.start(false, createTrainingConfig()); } return; } @@ -251,7 +250,7 @@ public synchronized void train(boolean force, Map parameters) logger.info("Starting SSTable-based training for {}.{} with {} SSTables", keyspaceName, tableName, sstables.size()); - trainer.start(true); + trainer.start(true, trainingConfig); 
scheduler.scheduleSSTableBasedTraining(trainer, sstables, trainingConfig, force); } diff --git a/src/java/org/apache/cassandra/db/compression/ICompressionDictionaryTrainer.java b/src/java/org/apache/cassandra/db/compression/ICompressionDictionaryTrainer.java index 64810711ff73..4e048c6f0cd1 100644 --- a/src/java/org/apache/cassandra/db/compression/ICompressionDictionaryTrainer.java +++ b/src/java/org/apache/cassandra/db/compression/ICompressionDictionaryTrainer.java @@ -42,11 +42,12 @@ public interface ICompressionDictionaryTrainer extends AutoCloseable * Starts the trainer for collecting samples. * * @param manualTraining true if this is manual training, false for automatic + * @param trainingConfig training configuration to use * @return true if the trainer is started; otherwise false. The trainer is started * in any of those conditions: 1. trainer closed; 2. not requested for * either manual or auto training; 3. failed to start */ - boolean start(boolean manualTraining); + boolean start(boolean manualTraining, CompressionDictionaryTrainingConfig trainingConfig); /** * @return true if the trainer is ready to take a new sample; otherwise, false @@ -87,8 +88,10 @@ default Future trainDictionaryAsync(boolean force) /** * Clears all collected samples and resets trainer state. + * + * @param trainingConfig configuration to use upon resetting */ - void reset(); + void reset(CompressionDictionaryTrainingConfig trainingConfig); /** * Gets the current training state including status, progress, and failure details. 
@@ -133,14 +136,12 @@ default Future trainDictionaryAsync(boolean force) * @param keyspaceName the keyspace name for logging * @param tableName the table name for logging * @param params the compression parameters - * @param config the training configuration * @return a dictionary trainer for the specified compression algorithm * @throws IllegalArgumentException if no dictionary trainer is available for the compression algorithm */ static ICompressionDictionaryTrainer create(String keyspaceName, String tableName, - CompressionParams params, - CompressionDictionaryTrainingConfig config) + CompressionParams params) { ICompressor compressor = params.getSstableCompressor(); if (!(compressor instanceof IDictionaryCompressor)) @@ -149,7 +150,7 @@ static ICompressionDictionaryTrainer create(String keyspaceName, } IDictionaryCompressor dictionaryCompressor = (IDictionaryCompressor) compressor; - return dictionaryCompressor.acceptableDictionaryKind().createTrainer(keyspaceName, tableName, config, compressor); + return dictionaryCompressor.acceptableDictionaryKind().createTrainer(keyspaceName, tableName, compressor); } enum TrainingStatus diff --git a/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java b/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java index 81e99c22bc57..6ffb1d4e28b6 100644 --- a/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java +++ b/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java @@ -50,7 +50,7 @@ public class ZstdDictionaryTrainer implements ICompressionDictionaryTrainer private final String keyspaceName; private final String tableName; - private final CompressionDictionaryTrainingConfig config; + private CompressionDictionaryTrainingConfig config; private final AtomicLong totalSampleSize; private final AtomicLong sampleCount; private final int compressionLevel; // optimal if using the same level for training as when compressing. 
@@ -68,17 +68,23 @@ public class ZstdDictionaryTrainer implements ICompressionDictionaryTrainer private volatile TrainingStatus currentTrainingStatus; private volatile String failureMessage; - public ZstdDictionaryTrainer(String keyspaceName, String tableName, - CompressionDictionaryTrainingConfig config, - int compressionLevel) + public ZstdDictionaryTrainer(String keyspaceName, String tableName, int compressionLevel) + { + this(keyspaceName, + tableName, + compressionLevel, + Math.round(1 / DatabaseDescriptor.getCompressionDictionaryTrainingSamplingRate())); + } + + @VisibleForTesting + public ZstdDictionaryTrainer(String keyspaceName, String tableName, int compressionLevel, int samplingRate) { this.keyspaceName = keyspaceName; this.tableName = tableName; - this.config = config; this.totalSampleSize = new AtomicLong(0); this.sampleCount = new AtomicLong(0); this.compressionLevel = compressionLevel; - this.samplingRate = config.samplingRate; + this.samplingRate = samplingRate; this.currentTrainingStatus = TrainingStatus.NOT_STARTED; } @@ -262,6 +268,7 @@ public boolean isReady() return currentTrainingStatus != TrainingStatus.TRAINING && !closed && zstdTrainer != null + && config != null && totalSampleSize.get() >= config.acceptableTotalSampleSize && sampleCount.get() >= MIN_SAMPLES_REQUIRED; } @@ -290,7 +297,7 @@ public TrainingState getTrainingState() } @Override - public boolean start(boolean manualTraining) + public boolean start(boolean manualTraining, CompressionDictionaryTrainingConfig trainingConfig) { if (closed || !(manualTraining || shouldAutoStartTraining())) return false; @@ -298,7 +305,8 @@ public boolean start(boolean manualTraining) try { // reset on starting; a new zstdTrainer instance is created during reset - reset(); + reset(trainingConfig); + config = trainingConfig; logger.info("Started dictionary training for {}.{}", keyspaceName, tableName); currentTrainingStatus = TrainingStatus.SAMPLING; failureMessage = null; // Clear any previous failure 
message @@ -322,7 +330,7 @@ private boolean shouldAutoStartTraining() } @Override - public void reset() + public void reset(CompressionDictionaryTrainingConfig trainingConfig) { if (closed) { @@ -334,7 +342,8 @@ public void reset() { totalSampleSize.set(0); sampleCount.set(0); - zstdTrainer = new ZstdDictTrainer(config.maxTotalSampleSize, config.maxDictionarySize, compressionLevel); + zstdTrainer = new ZstdDictTrainer(trainingConfig.maxTotalSampleSize, trainingConfig.maxDictionarySize, compressionLevel); + config = null; } } diff --git a/test/unit/org/apache/cassandra/db/compression/CompressionDictionarySchedulerTest.java b/test/unit/org/apache/cassandra/db/compression/CompressionDictionarySchedulerTest.java index ff4b88775fc2..42f91a82e0eb 100644 --- a/test/unit/org/apache/cassandra/db/compression/CompressionDictionarySchedulerTest.java +++ b/test/unit/org/apache/cassandra/db/compression/CompressionDictionarySchedulerTest.java @@ -92,7 +92,7 @@ public void testScheduleSSTableBasedTrainingWithSSTables() assertThat(sstables).isNotEmpty(); CompressionDictionaryTrainingConfig config = createSampleAllTrainingConfig(cfs); - manager.trainer().start(true); + manager.trainer().start(true, config); assertThat(manager.getCurrent()).as("There should be no dictionary at this step").isNull(); scheduler.scheduleSSTableBasedTraining(manager.trainer(), sstables, config, true); diff --git a/test/unit/org/apache/cassandra/db/compression/ZstdDictionaryTrainerTest.java b/test/unit/org/apache/cassandra/db/compression/ZstdDictionaryTrainerTest.java index d3ddf257ddf4..bcb3884a9208 100644 --- a/test/unit/org/apache/cassandra/db/compression/ZstdDictionaryTrainerTest.java +++ b/test/unit/org/apache/cassandra/db/compression/ZstdDictionaryTrainerTest.java @@ -71,7 +71,7 @@ public void setUp() callbackResult = new AtomicReference<>(); mockCallback = callbackResult::set; - trainer = new ZstdDictionaryTrainer(TEST_KEYSPACE, TEST_TABLE, testConfig, COMPRESSION_LEVEL); + trainer = new 
ZstdDictionaryTrainer(TEST_KEYSPACE, TEST_TABLE, COMPRESSION_LEVEL, testConfig.samplingRate); trainer.setDictionaryTrainedListener(mockCallback); } @@ -110,7 +110,7 @@ public void testTrainerInitialState() public void testTrainerStart() { // Auto start depends on configuration - test both scenarios - boolean started = trainer.start(false); + boolean started = trainer.start(false, testConfig); if (started) { assertThat(trainer.getTrainingState().getStatus()) @@ -128,7 +128,7 @@ public void testTrainerStart() @Test public void testTrainerStartManual() { - assertThat(trainer.start(true)) + assertThat(trainer.start(true, testConfig)) .as("Manual training should start successfully") .isTrue(); assertThat(trainer.getTrainingState().getStatus()) @@ -142,17 +142,17 @@ public void testTrainerStartManual() @Test public void testTrainerStartMultipleTimes() { - assertThat(trainer.start(true)) + assertThat(trainer.start(true, testConfig)) .as("First start (manual training) should succeed") .isTrue(); Object firstTrainer = trainer.trainer(); assertThat(firstTrainer).isNotNull(); - assertThat(trainer.start(true)) + assertThat(trainer.start(true, testConfig)) .as("Second start (manual training) should suceed and reset") .isTrue(); Object secondTrainer = trainer.trainer(); assertThat(secondTrainer).isNotNull().isNotSameAs(firstTrainer); - assertThat(trainer.start(false)) + assertThat(trainer.start(false, testConfig)) .as("Third start (not manual training) should fail") .isFalse(); } @@ -160,7 +160,7 @@ public void testTrainerStartMultipleTimes() @Test public void testTrainerCloseIdempotent() { - trainer.start(true); + trainer.start(true, testConfig); trainer.close(); trainer.close(); // Should not throw trainer.close(); // Should not throw @@ -173,14 +173,14 @@ public void testTrainerCloseIdempotent() @Test public void testTrainerReset() { - trainer.start(true); + trainer.start(true, testConfig); addSampleData(1000); // Add some samples 
assertThat(trainer.getTrainingState().getSampleCount()) .as("Should have samples before reset") .isGreaterThan(0); - trainer.reset(); + trainer.reset(testConfig); assertThat(trainer.getTrainingState().getStatus()) .as("Status should be NOT_STARTED after reset") .isEqualTo(TrainingStatus.NOT_STARTED); @@ -195,10 +195,10 @@ public void testTrainerReset() @Test public void testStartAfterClose() { - trainer.start(true); + trainer.start(true, testConfig); trainer.close(); - assertThat(trainer.start(true)) + assertThat(trainer.start(true, testConfig)) .as("Should not start after close") .isFalse(); assertThat(trainer.getTrainingState().getStatus()) @@ -209,7 +209,7 @@ public void testStartAfterClose() @Test public void testShouldSample() { - trainer.start(true); + trainer.start(true, testConfig); // With sampling rate 1 (100%), should always return true for (int i = 0; i < 10; i++) { @@ -230,8 +230,7 @@ public void testShouldSampleWithLowRate() .samplingRate(0.001f) // 0.1% sampling .build(); - try (ZstdDictionaryTrainer lowSamplingTrainer = new ZstdDictionaryTrainer(TEST_KEYSPACE, TEST_TABLE, - lowSamplingConfig, COMPRESSION_LEVEL)) + try (ZstdDictionaryTrainer lowSamplingTrainer = new ZstdDictionaryTrainer(TEST_KEYSPACE, TEST_TABLE, COMPRESSION_LEVEL, lowSamplingConfig.samplingRate)) { lowSamplingTrainer.setDictionaryTrainedListener(mockCallback); // With very low sampling rate, should mostly return false @@ -255,7 +254,7 @@ public void testShouldSampleWithLowRate() @Test public void testAddSample() { - trainer.start(true); + trainer.start(true, testConfig); assertThat(trainer.getTrainingState().getSampleCount()) .as("Initial sample count should be 0") @@ -293,7 +292,7 @@ public void testAddSampleBeforeStart() @Test public void testAddSampleAfterClose() { - trainer.start(true); + trainer.start(true, testConfig); trainer.close(); ByteBuffer sample = ByteBuffer.wrap(SAMPLE_DATA.getBytes()); @@ -310,7 +309,7 @@ public void testAddSampleAfterClose() @Test public void 
testAddNullSample() { - trainer.start(true); + trainer.start(true, testConfig); trainer.addSample(null); // Should not throw assertThat(trainer.getTrainingState().getStatus()) @@ -324,7 +323,7 @@ public void testAddNullSample() @Test public void testAddEmptySample() { - trainer.start(true); + trainer.start(true, testConfig); ByteBuffer empty = ByteBuffer.allocate(0); trainer.addSample(empty); // Should not throw @@ -339,7 +338,7 @@ public void testAddEmptySample() @Test public void testIsReady() { - trainer.start(true); + trainer.start(true, testConfig); assertThat(trainer.isReady()) .as("Should not be ready initially") .isFalse(); @@ -364,7 +363,7 @@ public void testIsReady() @Test public void testTrainDictionaryWithInsufficientSampleCount() { - trainer.start(true); + trainer.start(true, testConfig); // Add sufficient data size but only 5 samples (less than minimum 11) for (int i = 0; i < 5; i++) @@ -397,7 +396,7 @@ public void testTrainDictionaryWithInsufficientSampleCount() @Test public void testTrainDictionaryWithSufficientSampleCount() { - trainer.start(true); + trainer.start(true, testConfig); // Add 15 samples with sufficient total size for (int i = 0; i < 15; i++) @@ -418,7 +417,7 @@ public void testTrainDictionaryWithSufficientSampleCount() @Test public void testTrainDictionaryAsync() throws Exception { - Future future = startTraining(true, false, testConfig.acceptableTotalSampleSize); + Future future = startTraining(true, false, testConfig); CompressionDictionary dictionary = future.get(5, TimeUnit.SECONDS); assertThat(dictionary).as("Dictionary should not be null").isNotNull(); @@ -433,7 +432,7 @@ public void testTrainDictionaryAsync() throws Exception public void testTrainDictionaryAsyncForce() throws Exception { // Don't add enough samples - Future future = startTraining(true, true, 512); + Future future = startTraining(true, true, testConfig, 512); CompressionDictionary dictionary = future.get(1, TimeUnit.SECONDS); assertThat(dictionary) .as("Forced 
async training should produce dictionary") @@ -444,7 +443,7 @@ public void testTrainDictionaryAsyncForce() throws Exception public void testTrainDictionaryAsyncForceFailsWithNoData() throws Exception { AtomicReference dictRef = new AtomicReference<>(); - Future result = startTraining(true, true, 0) + Future result = startTraining(true, true, testConfig, 0) .addCallback((dict, t) -> dictRef.set(dict)); assertThat(result.isDone() && result.cause() != null) @@ -461,7 +460,7 @@ public void testTrainDictionaryAsyncForceFailsWithNoData() throws Exception @Test public void testDictionaryTrainedListener() { - trainer.start(true); + trainer.start(true, testConfig); addSampleData(testConfig.acceptableTotalSampleSize); // Train dictionary synchronously - callback should be called @@ -528,7 +527,7 @@ public void testIsCompatibleWith() @Test public void testUpdateSamplingRate() { - trainer.start(true); + trainer.start(true, testConfig); // Test updating to different valid sampling rates trainer.updateSamplingRate(10); @@ -565,7 +564,7 @@ public void testUpdateSamplingRate() @Test public void testUpdateSamplingRateValidation() { - trainer.start(true); + trainer.start(true, testConfig); // Test invalid sampling rates assertThatThrownBy(() -> trainer.updateSamplingRate(0)) @@ -587,7 +586,7 @@ public void testUpdateSamplingRateBeforeStart() // Should be able to update sampling rate even before start trainer.updateSamplingRate(5); - trainer.start(true); + trainer.start(true, testConfig); // Verify the updated rate is used after start int sampleCount = 0; @@ -621,7 +620,7 @@ public void testTrainDictionaryNotInitialized() @Test public void testTrainDictionaryClosed() { - trainer.start(true); + trainer.start(true, testConfig); addSampleData(testConfig.acceptableTotalSampleSize); trainer.close(); @@ -635,7 +634,7 @@ public void testTrainDictionaryClosed() @Test public void testTrainDictionaryInsufficientSampleSize() { - trainer.start(true); + trainer.start(true, testConfig); // Add 
enough samples (15) but with insufficient total size for (int i = 0; i < 15; i++) @@ -666,7 +665,7 @@ public void testTrainDictionaryInsufficientSampleSize() @Test public void testTrainDictionaryInsufficientBothSampleCountAndSize() { - trainer.start(true); + trainer.start(true, testConfig); // Add only 3 samples with small size for (int i = 0; i < 3; i++) @@ -691,9 +690,9 @@ public void testTrainDictionaryInsufficientBothSampleCountAndSize() .hasMessageContaining("Use --force to train anyway"); } - private Future startTraining(boolean manualTraining, boolean forceTrain, int sampleSize) throws Exception + private Future startTraining(boolean manualTraining, boolean forceTrain, CompressionDictionaryTrainingConfig config, int sampleSize) throws Exception { - trainer.start(manualTraining); + trainer.start(manualTraining, config); if (sampleSize > 0) { addSampleData(sampleSize); @@ -708,13 +707,18 @@ private Future startTraining(boolean manualTraining, bool CountDownLatch latch = new CountDownLatch(1); Future future = trainer.trainDictionaryAsync(forceTrain) - .addCallback((dict, throwable) -> latch.countDown()); + .addCallback((dict, throwable) -> latch.countDown()); assertThat(latch.await(10, TimeUnit.SECONDS)) .as("Training should complete within timeout") .isTrue(); return future; } + private Future startTraining(boolean manualTraining, boolean forceTrain, CompressionDictionaryTrainingConfig config) throws Exception + { + return startTraining(manualTraining, forceTrain, config, config.acceptableTotalSampleSize); + } + private void addSampleData(int totalSize) { byte[] sampleBytes = SAMPLE_DATA.getBytes(); @@ -739,7 +743,7 @@ public void testStatisticsMethods() .isEqualTo(0); // Start training - trainer.start(true); + trainer.start(true, testConfig); // Add some samples byte[] sampleBytes = SAMPLE_DATA.getBytes(); @@ -759,7 +763,7 @@ public void testStatisticsMethods() .as("Total sample size should match number of samples times sample size") .isEqualTo((long) 
numSamples * sampleSize); - trainer.reset(); + trainer.reset(testConfig); assertThat(trainer.getTrainingState().getSampleCount()) .as("Sample count should be 0 after reset") diff --git a/test/unit/org/apache/cassandra/tools/nodetool/TrainCompressionDictionaryTest.java b/test/unit/org/apache/cassandra/tools/nodetool/TrainCompressionDictionaryTest.java index 0d50ab08e214..1ed1fcffa641 100644 --- a/test/unit/org/apache/cassandra/tools/nodetool/TrainCompressionDictionaryTest.java +++ b/test/unit/org/apache/cassandra/tools/nodetool/TrainCompressionDictionaryTest.java @@ -22,6 +22,7 @@ import org.junit.Test; import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.io.compress.IDictionaryCompressor; import org.apache.cassandra.tools.ToolRunner; import static org.apache.cassandra.tools.ToolRunner.invokeNodetool; @@ -55,6 +56,56 @@ public void testTrainCommandSuccess() .contains(table); } + @Test + public void testTrainingParameterOverride() + { + // Create a table with dictionary compression enabled + String table = createTable("CREATE TABLE %s (id int PRIMARY KEY, data text) WITH compression = {'class': 'ZstdDictionaryCompressor'}"); + + disableCompaction(keyspace(), table); + + createSSTables(true); + + // Test training command without --force since we have limited test data will fail + ToolRunner.ToolResult result = invokeNodetool("compressiondictionary", "train", keyspace(), table); + result.asserts().failure(); + assertThat(result.getStderr()) + .as("Should indicate training completed not completed") + .contains("Trainer is not ready: insufficient sample size") + .contains("/8 MiB") // 10MiB / 10 * 8 + .contains(keyspace()) + .contains(table); + + ToolRunner.ToolResult resultWithOverrides = invokeNodetool("compressiondictionary", + "train", + "--max-total-sample-size", "5MiB", + keyspace(), table); + + assertThat(resultWithOverrides.getStderr()) + .as("Should indicate training completed not completed") + .contains("Trainer is not ready: insufficient 
sample size") + .contains("/4 MiB") // 5MiB / 10 * 8 + .contains(keyspace()) + .contains(table); + + execute(String.format("ALTER TABLE %s.%s WITH " + + "compression = {'class': 'ZstdDictionaryCompressor', '%s': '6MiB'}", + keyspace(), + table, + IDictionaryCompressor.TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME)); + + // we are not overriding, but we have changed training_max_total_sample_size to 6MiB via CQL, so it sticks + + ToolRunner.ToolResult resultWithoutOverrides = invokeNodetool("compressiondictionary", "train", keyspace(), table); + + assertThat(resultWithoutOverrides.getStderr()) + .as("Should indicate training completed not completed") + .contains("Trainer is not ready: insufficient sample size") + .contains("/4.8 MiB") // 6MiB / 10 * 8 + .contains(keyspace()) + .contains(table); + } + @Test public void testTrainCommandWithDataButNoSSTables() { diff --git a/test/unit/org/apache/cassandra/utils/CompressionDictionaryHelper.java b/test/unit/org/apache/cassandra/utils/CompressionDictionaryHelper.java index d20c02f847e5..031d79f3b684 100644 --- a/test/unit/org/apache/cassandra/utils/CompressionDictionaryHelper.java +++ b/test/unit/org/apache/cassandra/utils/CompressionDictionaryHelper.java @@ -56,9 +56,9 @@ public CompressionDictionary trainDictionary(String keyspace, String table) .maxTotalSampleSize(1024 * 1024) // 1MB total .build(); - try (ZstdDictionaryTrainer trainer = new ZstdDictionaryTrainer(keyspace, table, config, 3)) + try (ZstdDictionaryTrainer trainer = new ZstdDictionaryTrainer(keyspace, table, 3, 100)) { - trainer.start(true); + trainer.start(true, config); for (int i = 0; i < 25000; i++) { trainer.addSample(UTF8Type.instance.fromString(CompressionDictionaryHelper.INSTANCE.getRandomSample())); From 6345c3617c7c6e072cd42fe96fa816a3992e4d25 Mon Sep 17 00:00:00 2001 From: Stefan Miklosovic Date: Sat, 20 Dec 2025 23:31:19 +0100 Subject: [PATCH 3/6] fixes --- conf/cassandra.yaml | 4 ++-- conf/cassandra_latest.yaml | 4 ++-- 
.../pages/managing/operating/compression.adoc | 10 +++++----- .../db/compression/CompressionDictionaryManager.java | 6 +++--- .../db/compression/ZstdDictionaryTrainer.java | 2 +- .../tools/nodetool/TrainCompressionDictionaryTest.java | 6 +++--- 6 files changed, 16 insertions(+), 16 deletions(-) diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 7f8812973234..b207ff289b9b 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -2928,7 +2928,7 @@ compression_dictionary_cache_expire: 24h # Manual training via nodetool is always available regardless of this setting. compression_dictionary_training_auto_train_enabled: false -# Sampling rate for automatic dictionary training (1-10000). -# Value of 100 means 1% of writes are sampled. Lower values reduce overhead but may +# Sampling rate for automatic dictionary training (0.01-1). +# Value of 0.01 means 1% of writes are sampled. Lower values reduce overhead but may # result in less representative sample data for dictionary training. compression_dictionary_training_sampling_rate: 0.01 diff --git a/conf/cassandra_latest.yaml b/conf/cassandra_latest.yaml index 8adefe51a090..2e5c8fee6bba 100644 --- a/conf/cassandra_latest.yaml +++ b/conf/cassandra_latest.yaml @@ -2667,7 +2667,7 @@ compression_dictionary_cache_expire: 24h # Manual training via nodetool is always available regardless of this setting. compression_dictionary_training_auto_train_enabled: false -# Sampling rate for automatic dictionary training (1-10000). -# Value of 100 means 1% of writes are sampled. Lower values reduce overhead but may +# Sampling rate for automatic dictionary training (0.01-1). +# Value of 0.01 means 1% of writes are sampled. Lower values reduce overhead but may # result in less representative sample data for dictionary training. 
compression_dictionary_training_sampling_rate: 0.01 diff --git a/doc/modules/cassandra/pages/managing/operating/compression.adoc b/doc/modules/cassandra/pages/managing/operating/compression.adoc index 2e232b63744f..2fb43b1cdb93 100644 --- a/doc/modules/cassandra/pages/managing/operating/compression.adoc +++ b/doc/modules/cassandra/pages/managing/operating/compression.adoc @@ -148,12 +148,12 @@ Enable automatic training in `cassandra.yaml`: [source,yaml] ---- compression_dictionary_training_auto_train_enabled: true -compression_dictionary_training_sampling_rate: 100 # 1% of writes +compression_dictionary_training_sampling_rate: 0.01 # 1% of writes ---- When enabled, Cassandra automatically samples write operations and trains dictionaries in the background based on the configured sampling -rate (range: 1-10000, where 100 = 1% of writes). +rate (range: 0.01-1, where 0.01 = 1% of writes). === Dictionary Storage and Distribution @@ -301,8 +301,8 @@ next access. * `compression_dictionary_training_auto_train_enabled` (default: `false`): Enable automatic background dictionary training. When enabled, Cassandra samples writes and trains dictionaries automatically. -* `compression_dictionary_training_sampling_rate` (default: `100`): -Sampling rate for automatic training, range 1-10000 where 100 = 1% of +* `compression_dictionary_training_sampling_rate` (default: `0.01`): +Sampling rate for automatic training, range 0.01-1 where 0.01 = 1% of writes. Lower values reduce training overhead but may miss data patterns. 
Example configuration: @@ -317,7 +317,7 @@ compression_dictionary_cache_expire: 3600 # Automatic training compression_dictionary_training_auto_train_enabled: false -compression_dictionary_training_sampling_rate: 100 +compression_dictionary_training_sampling_rate: 0.01 ---- === CQL training parameters: diff --git a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java index f2aa6575e230..ffe6605e13ce 100644 --- a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java +++ b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java @@ -416,10 +416,10 @@ private int internalTrainingParameterResolution(CompressionParams compressionPar String resolvedValue = null; try { - if (userSuppliedValue != null) - resolvedValue = userSuppliedValue; - else + if (userSuppliedValue == null) resolvedValue = compressionParams.getOtherOptions().getOrDefault(parameterName, defaultParameterValue); + else + resolvedValue = userSuppliedValue; return new DataStorageSpec.IntKibibytesBound(resolvedValue).toBytes(); } diff --git a/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java b/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java index 6ffb1d4e28b6..9d125f6e15f5 100644 --- a/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java +++ b/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java @@ -50,7 +50,7 @@ public class ZstdDictionaryTrainer implements ICompressionDictionaryTrainer private final String keyspaceName; private final String tableName; - private CompressionDictionaryTrainingConfig config; + private volatile CompressionDictionaryTrainingConfig config; private final AtomicLong totalSampleSize; private final AtomicLong sampleCount; private final int compressionLevel; // optimal if using the same level for training as when compressing. 
diff --git a/test/unit/org/apache/cassandra/tools/nodetool/TrainCompressionDictionaryTest.java b/test/unit/org/apache/cassandra/tools/nodetool/TrainCompressionDictionaryTest.java index 1ed1fcffa641..cbe9bd41cb7d 100644 --- a/test/unit/org/apache/cassandra/tools/nodetool/TrainCompressionDictionaryTest.java +++ b/test/unit/org/apache/cassandra/tools/nodetool/TrainCompressionDictionaryTest.java @@ -70,7 +70,7 @@ public void testTrainingParameterOverride() ToolRunner.ToolResult result = invokeNodetool("compressiondictionary", "train", keyspace(), table); result.asserts().failure(); assertThat(result.getStderr()) - .as("Should indicate training completed not completed") + .as("Should indicate training not completed") .contains("Trainer is not ready: insufficient sample size") .contains("/8 MiB") // 10MiB / 10 * 8 .contains(keyspace()) @@ -82,7 +82,7 @@ public void testTrainingParameterOverride() keyspace(), table); assertThat(resultWithOverrides.getStderr()) - .as("Should indicate training completed not completed") + .as("Should indicate training not completed") .contains("Trainer is not ready: insufficient sample size") .contains("/4 MiB") // 5MiB / 10 * 8 .contains(keyspace()) @@ -99,7 +99,7 @@ public void testTrainingParameterOverride() ToolRunner.ToolResult resultWithoutOverrides = invokeNodetool("compressiondictionary", "train", keyspace(), table); assertThat(resultWithoutOverrides.getStderr()) - .as("Should indicate training completed not completed") + .as("Should indicate training not completed") .contains("Trainer is not ready: insufficient sample size") .contains("/4.8 MiB") // 6MiB / 10 * 8 .contains(keyspace()) From 64240ecb202220ffd17ed98d3bb1b36b20d9115d Mon Sep 17 00:00:00 2001 From: Stefan Miklosovic Date: Sun, 21 Dec 2025 01:09:41 +0100 Subject: [PATCH 4/6] validate sample rate, rewrite shouldSample method Previously, if you had a sample rate of 0.01, then Math.round(1 / 0.01) = 100 and shouldSample method was doing 
ThreadLocalRandom.current().nextInt(samplingRate) == 0 which picked a number from 0 (inclusive) to 100 (exclusive). However, if we set sample rate to, for example, 0.75 to say that 75% should be sampled, then 1 / 0.75 = 1.33 and Math.round of that is 1. ThreadLocalRandom.current().nextInt(1) == 0 will always be true. That means that, in effect, any sampling rate which rounds to 1 loses the probabilistic behavior entirely. The current approach works with floats and it is rewritten to ThreadLocalRandom.current().nextFloat() < samplingRate. nextFloat() gives values between zero (inclusive) and one (exclusive). If we set sampling rate to 1 then it will always be true. If we set it to 0.01 that will be 1%. If we set it to 0.75 that will be 75%, without losing any accuracy. --- .../cassandra/config/DatabaseDescriptor.java | 3 +++ .../CompressionDictionaryTrainingConfig.java | 9 ++++++--- .../ICompressionDictionaryTrainer.java | 6 +++--- .../db/compression/ZstdDictionaryTrainer.java | 17 ++++++++--------- ...ompressionDictionaryTrainingConfigTest.java | 6 +++--- .../compression/ZstdDictionaryTrainerTest.java | 18 +++++++++--------- 6 files changed, 32 insertions(+), 27 deletions(-) diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index f2e8b5a64c8b..84d3038926a4 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1236,6 +1236,9 @@ else if (conf.max_value_size.toMebibytes() >= 2048) { throw new ConfigurationException(ex.getMessage()); } + + if (conf.compression_dictionary_training_sampling_rate <= 0.0f || conf.compression_dictionary_training_sampling_rate > 1.0f) + throw new ConfigurationException("Sampling rate has to be between (0.0;1], it is " + conf.compression_dictionary_training_sampling_rate); } @VisibleForTesting diff --git 
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryTrainingConfig.java b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryTrainingConfig.java index f580b0acfa1b..794807974b99 100644 --- a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryTrainingConfig.java +++ b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryTrainingConfig.java @@ -28,7 +28,7 @@ public class CompressionDictionaryTrainingConfig public final int maxDictionarySize; public final int maxTotalSampleSize; public final int acceptableTotalSampleSize; - public final int samplingRate; + public final float samplingRate; public final int chunkSize; private CompressionDictionaryTrainingConfig(Builder builder) @@ -49,7 +49,7 @@ public static class Builder { private int maxDictionarySize = 65536; // 64KB default private int maxTotalSampleSize = 10 * 1024 * 1024; // 10MB total - private int samplingRate = 100; // Sampling 1% + private float samplingRate = 0.01f; // Sampling 1% private int chunkSize = 64 * 1024; // 64KB default public Builder maxDictionarySize(int size) @@ -66,7 +66,10 @@ public Builder maxTotalSampleSize(int size) public Builder samplingRate(float samplingRate) { - this.samplingRate = Math.round(1 / samplingRate); + if (samplingRate <= 0.0f || samplingRate > 1.0f) + throw new IllegalArgumentException("Sampling rate has to be between (0.0;1], it is " + samplingRate); + + this.samplingRate = samplingRate; return this; } diff --git a/src/java/org/apache/cassandra/db/compression/ICompressionDictionaryTrainer.java b/src/java/org/apache/cassandra/db/compression/ICompressionDictionaryTrainer.java index 4e048c6f0cd1..3fb37d4e0174 100644 --- a/src/java/org/apache/cassandra/db/compression/ICompressionDictionaryTrainer.java +++ b/src/java/org/apache/cassandra/db/compression/ICompressionDictionaryTrainer.java @@ -125,10 +125,10 @@ default Future trainDictionaryAsync(boolean force) /** * Updates the sampling rate for this trainer. 
* - * @param newSamplingRate the new sampling rate. For exmaple, 1 = sample every time (100%), - * 2 = expect sample 1/2 of data (50%), n = expect sample 1/n of data + * @param newSamplingRate the new sampling rate. For example, 0.01 - sample 1% of data, + * 1 = sample every time (100%), 0.5 - sample 50% of data. */ - void updateSamplingRate(int newSamplingRate); + void updateSamplingRate(float newSamplingRate); /** * Factory method to create appropriate trainer based on compression parameters. diff --git a/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java b/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java index 9d125f6e15f5..baf39d330ca0 100644 --- a/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java +++ b/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java @@ -56,7 +56,7 @@ public class ZstdDictionaryTrainer implements ICompressionDictionaryTrainer private final int compressionLevel; // optimal if using the same level for training as when compressing. 
// Sampling rate can be updated during training - private volatile int samplingRate; + private volatile float samplingRate; // Minimum number of samples required by ZSTD library private static final int MIN_SAMPLES_REQUIRED = 11; @@ -73,11 +73,11 @@ public ZstdDictionaryTrainer(String keyspaceName, String tableName, int compress this(keyspaceName, tableName, compressionLevel, - Math.round(1 / DatabaseDescriptor.getCompressionDictionaryTrainingSamplingRate())); + DatabaseDescriptor.getCompressionDictionaryTrainingSamplingRate()); } @VisibleForTesting - public ZstdDictionaryTrainer(String keyspaceName, String tableName, int compressionLevel, int samplingRate) + public ZstdDictionaryTrainer(String keyspaceName, String tableName, int compressionLevel, float samplingRate) { this.keyspaceName = keyspaceName; this.tableName = tableName; @@ -91,7 +91,7 @@ public ZstdDictionaryTrainer(String keyspaceName, String tableName, int compress @Override public boolean shouldSample() { - return zstdTrainer != null && ThreadLocalRandom.current().nextInt(samplingRate) == 0; + return zstdTrainer != null && ThreadLocalRandom.current().nextFloat() < samplingRate; } @Override @@ -360,12 +360,11 @@ public void setDictionaryTrainedListener(Consumer listene } @Override - public void updateSamplingRate(int newSamplingRate) + public void updateSamplingRate(float newSamplingRate) { - if (newSamplingRate <= 0) - { - throw new IllegalArgumentException("Sampling rate must be positive, got: " + newSamplingRate); - } + if (newSamplingRate <= 0.0f || newSamplingRate > 1.0f) + throw new IllegalArgumentException("Sampling rate has to be between (0.0;1], it is " + newSamplingRate); + this.samplingRate = newSamplingRate; logger.debug("Updated sampling rate to {} for {}.{}", newSamplingRate, keyspaceName, tableName); } diff --git a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryTrainingConfigTest.java 
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryTrainingConfigTest.java index 8f3d746d035b..a066f629b982 100644 --- a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryTrainingConfigTest.java +++ b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryTrainingConfigTest.java @@ -36,8 +36,8 @@ public void testBuilderDefaults() .as("Default max total sample size should be 10MB") .isEqualTo(10 * 1024 * 1024); assertThat(config.samplingRate) - .as("Default sampling rate should be 100 (1%)") - .isEqualTo(100); + .as("Default sampling rate should be 0.01 (1%)") + .isEqualTo(0.01f); } @Test @@ -57,7 +57,7 @@ public void testCalculatedThresholds() assertThat(config.maxDictionarySize).isEqualTo(dictSize); assertThat(config.maxTotalSampleSize).isEqualTo(sampleSize); assertThat(config.acceptableTotalSampleSize).isEqualTo(sampleSize / 10 * 8); - assertThat(config.samplingRate).isEqualTo(Math.round(1 / samplingRate)); + assertThat(config.samplingRate).isEqualTo(0.005f); // Verify relationship between max and acceptable sample sizes assertThat(config.acceptableTotalSampleSize) diff --git a/test/unit/org/apache/cassandra/db/compression/ZstdDictionaryTrainerTest.java b/test/unit/org/apache/cassandra/db/compression/ZstdDictionaryTrainerTest.java index bcb3884a9208..ab9073ebe665 100644 --- a/test/unit/org/apache/cassandra/db/compression/ZstdDictionaryTrainerTest.java +++ b/test/unit/org/apache/cassandra/db/compression/ZstdDictionaryTrainerTest.java @@ -530,7 +530,7 @@ public void testUpdateSamplingRate() trainer.start(true, testConfig); // Test updating to different valid sampling rates - trainer.updateSamplingRate(10); + trainer.updateSamplingRate(0.1f); // With sampling rate 10 (10%), should mostly return false int sampleCount = 0; @@ -550,7 +550,7 @@ public void testUpdateSamplingRate() .isLessThan(iterations / 5); // at most 20% // Test updating to 100% sampling - trainer.updateSamplingRate(1); + 
trainer.updateSamplingRate(1.0f); // Should always sample now for (int i = 0; i < 10; i++) @@ -567,24 +567,24 @@ public void testUpdateSamplingRateValidation() trainer.start(true, testConfig); // Test invalid sampling rates - assertThatThrownBy(() -> trainer.updateSamplingRate(0)) + assertThatThrownBy(() -> trainer.updateSamplingRate(0f)) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Sampling rate must be positive"); + .hasMessageContaining("Sampling rate has to be between (0.0;1], it is 0.0"); - assertThatThrownBy(() -> trainer.updateSamplingRate(-1)) + assertThatThrownBy(() -> trainer.updateSamplingRate(-1f)) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Sampling rate must be positive"); + .hasMessageContaining("Sampling rate has to be between (0.0;1], it is -1.0"); - assertThatThrownBy(() -> trainer.updateSamplingRate(-100)) + assertThatThrownBy(() -> trainer.updateSamplingRate(-100f)) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Sampling rate must be positive"); + .hasMessageContaining("Sampling rate has to be between (0.0;1], it is -100.0"); } @Test public void testUpdateSamplingRateBeforeStart() { // Should be able to update sampling rate even before start - trainer.updateSamplingRate(5); + trainer.updateSamplingRate(0.2f); trainer.start(true, testConfig); From c305e898bef344415f6d25726e2f63e4faeab61c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0tefan=20Miklo=C5=A1ovi=C4=8D?= Date: Mon, 22 Dec 2025 09:23:26 +0100 Subject: [PATCH 5/6] Update doc/modules/cassandra/pages/managing/operating/compression.adoc Co-authored-by: Yifan Cai --- .../cassandra/pages/managing/operating/compression.adoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/modules/cassandra/pages/managing/operating/compression.adoc b/doc/modules/cassandra/pages/managing/operating/compression.adoc index 2fb43b1cdb93..ca028aff9534 100644 --- 
a/doc/modules/cassandra/pages/managing/operating/compression.adoc +++ b/doc/modules/cassandra/pages/managing/operating/compression.adoc @@ -324,8 +324,8 @@ compression_dictionary_training_sampling_rate: 0.01 These parameters are meant to be configured via CQL for each respective table if defaults are not appropriate. -* `training_max_total_sample_size` (default: `10MiB`): Maximum total size of sample data to collect for training, approximately 10MB. This is a parameter of `ZstdDictionaryCompressor` -of a table, in `compression` section. +* `training_max_total_sample_size` (default: `10MiB`): Maximum total size of sample data to collect for training, approximately 10MB. This parameter is configured in the +table's compression options for `ZstdDictionaryCompressor`. * `training_max_dictionary_size` (default: `64KiB`): Maximum size of trained dictionaries in bytes. Larger dictionaries can capture more patterns but increase memory overhead. This is a parameter of `ZstdDictionaryCompressor` of a table, in `compression` section. Example: From 80c34f1321015dc96d427e2679dd3dc260a966ac Mon Sep 17 00:00:00 2001 From: Stefan Miklosovic Date: Mon, 22 Dec 2025 09:55:41 +0100 Subject: [PATCH 6/6] fixes --- .../db/compression/ZstdDictionaryTrainer.java | 11 ++++++++--- .../nodetool/CompressionDictionaryCommandGroup.java | 2 +- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java b/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java index baf39d330ca0..fdb1f5983b05 100644 --- a/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java +++ b/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java @@ -63,7 +63,7 @@ public class ZstdDictionaryTrainer implements ICompressionDictionaryTrainer private volatile Consumer dictionaryTrainedListener; // TODO: manage the samples in this class for auto-train (follow-up). 
The ZstdDictTrainer cannot be re-used for multiple training runs. - private ZstdDictTrainer zstdTrainer; + private volatile ZstdDictTrainer zstdTrainer; private volatile boolean closed = false; private volatile TrainingStatus currentTrainingStatus; private volatile String failureMessage; @@ -233,6 +233,12 @@ private String buildNotReadyMessage() return message.toString(); } + if (config == null) + { + message.append(": configuration not initialized (call start() first)"); + return message.toString(); + } + long currentSampleCount = sampleCount.get(); long currentTotalSampleSize = totalSampleSize.get(); @@ -306,7 +312,6 @@ public boolean start(boolean manualTraining, CompressionDictionaryTrainingConfig { // reset on starting; a new zstdTrainer instance is created during reset reset(trainingConfig); - config = trainingConfig; logger.info("Started dictionary training for {}.{}", keyspaceName, tableName); currentTrainingStatus = TrainingStatus.SAMPLING; failureMessage = null; // Clear any previous failure message @@ -343,7 +348,7 @@ public void reset(CompressionDictionaryTrainingConfig trainingConfig) totalSampleSize.set(0); sampleCount.set(0); zstdTrainer = new ZstdDictTrainer(trainingConfig.maxTotalSampleSize, trainingConfig.maxDictionarySize, compressionLevel); - config = null; + config = trainingConfig; } } diff --git a/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java b/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java index 4849291ac266..8ee7015b67c7 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java +++ b/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java @@ -102,7 +102,7 @@ public void execute(NodeProbe probe) out.printf("Training from existing SSTables (flushing first if needed)%n"); Map parameters = new HashMap<>(); - if (trainingMaxTotalSampleSize != null) + if (trainingMaxDictionarySize != null) 
parameters.put(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME, trainingMaxDictionarySize); if (trainingMaxTotalSampleSize != null)