Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1590,6 +1590,19 @@ public class ConfigOptions {
+ "The RateLimiter is always enabled. The default value is Long.MAX_VALUE (effectively unlimited). "
+ "Set to a lower value (e.g., 100MB) to limit the rate.");

public static final ConfigOption<Boolean> KV_STATISTICS_ENABLED =
key("kv.rocksdb.statistics.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to enable RocksDB statistics collection for metrics. "
+ "When enabled, RocksDB will collect various statistics like bytes read/written, "
+ "compaction time, flush time, etc., which can be exposed through Fluss metrics. "
+ "Enabling statistics has a small performance overhead (typically < 5%). "
+ "If you experience performance issues or don't need RocksDB-level metrics, "
+ "you can disable this option to reduce overhead. "
+ "The default value is `true`.");

// --------------------------------------------------------------------------
// Provided configurable ColumnFamilyOptions within Fluss
// --------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,11 @@ public static KvTablet create(
private static RocksDBKv buildRocksDBKv(
Configuration configuration, File kvDir, RateLimiter sharedRateLimiter)
throws IOException {
// Enable statistics to support RocksDB statistics collection
// Check if statistics collection is enabled from configuration
boolean enableStatistics = configuration.get(ConfigOptions.KV_STATISTICS_ENABLED);
RocksDBResourceContainer rocksDBResourceContainer =
new RocksDBResourceContainer(configuration, kvDir, true, sharedRateLimiter);
new RocksDBResourceContainer(
configuration, kvDir, enableStatistics, sharedRateLimiter);
RocksDBKvBuilder rocksDBKvBuilder =
new RocksDBKvBuilder(
kvDir,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public long getWriteLatencyMicros() {
* @return number of L0 files, or 0 if not available
*/
public long getNumFilesAtLevel0() {
return getPropertyValue(defaultColumnFamilyHandle, "rocksdb.num-files-at-level0");
return getPropertyLongValue(defaultColumnFamilyHandle, "rocksdb.num-files-at-level0");
}

/**
Expand All @@ -176,7 +176,7 @@ public long getNumFilesAtLevel0() {
* @return 1 if flush is pending, 0 otherwise
*/
public long getFlushPending() {
return getPropertyValue("rocksdb.mem-table-flush-pending");
return getPropertyLongValue("rocksdb.mem-table-flush-pending");
}

/**
Expand All @@ -185,7 +185,7 @@ public long getFlushPending() {
* @return 1 if compaction is pending, 0 otherwise
*/
public long getCompactionPending() {
return getPropertyValue("rocksdb.compaction-pending");
return getPropertyLongValue("rocksdb.compaction-pending");
}

/**
Expand Down Expand Up @@ -308,60 +308,44 @@ private long getHistogramValue(HistogramType histogramType) {
}

/**
* Get property value from RocksDB with resource guard protection.
* Get long property value from RocksDB with resource guard protection.
*
* <p>This method uses getLongProperty() instead of getProperty() to avoid string allocation in
* the JNI layer.
*
* @param propertyName the property name to query
* @return the property value as long, or 0 if not available or RocksDB is closed
*/
private long getPropertyValue(String propertyName) {
try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) {
String value = db.getProperty(propertyName);
if (value != null && !value.isEmpty()) {
return Long.parseLong(value);
}
} catch (RocksDBException e) {
LOG.debug(
"Failed to get property {} from RocksDB (possibly closed or unavailable)",
propertyName,
e);
} catch (NumberFormatException e) {
LOG.debug("Failed to parse property {} value as long", propertyName, e);
} catch (Exception e) {
// ResourceGuard may throw exception if RocksDB is closed
LOG.debug(
"Failed to access RocksDB for property {} (possibly closed)", propertyName, e);
}
return 0L;
private long getPropertyLongValue(String propertyName) {
return getPropertyLongValue(null, propertyName);
}

/**
* Get property value from RocksDB for a specific column family with resource guard protection.
* Get long property value from RocksDB for a specific column family with resource guard
* protection.
*
* <p>Some RocksDB properties are column family specific and must be accessed through the column
* family handle.
* family handle. This method uses getLongProperty() instead of getProperty() to avoid string
* allocation in the JNI layer.
*
* @param columnFamilyHandle the column family handle
* @param columnFamilyHandle the column family handle, null for DB-level properties
* @param propertyName the property name to query
* @return the property value as long, or 0 if not available or RocksDB is closed
*/
private long getPropertyValue(ColumnFamilyHandle columnFamilyHandle, String propertyName) {
private long getPropertyLongValue(
@Nullable ColumnFamilyHandle columnFamilyHandle, String propertyName) {
try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) {
if (columnFamilyHandle == null) {
return 0L;
}
String value = db.getProperty(columnFamilyHandle, propertyName);
if (value != null && !value.isEmpty()) {
return Long.parseLong(value);
if (columnFamilyHandle != null) {
return db.getLongProperty(columnFamilyHandle, propertyName);
} else {
return db.getLongProperty(propertyName);
}
} catch (RocksDBException e) {
LOG.debug(
"Failed to get property {} from RocksDB column family (possibly closed or unavailable)",
"Failed to get property {} from RocksDB (possibly closed or unavailable)",
propertyName,
e);
} catch (NumberFormatException e) {
LOG.debug("Failed to parse property {} value as long", propertyName, e);
} catch (Exception e) {
// ResourceGuard may throw exception if RocksDB is closed
LOG.debug(
"Failed to access RocksDB for property {} (possibly closed)", propertyName, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.fluss.server.kv;

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.TableConfig;
import org.apache.fluss.exception.InvalidTargetColumnException;
Expand Down Expand Up @@ -1419,6 +1420,9 @@ private Value valueOf(BinaryRow row) {

@Test
void testRocksDBMetrics() throws Exception {
// Enable RocksDB statistics for this test
conf.set(ConfigOptions.KV_STATISTICS_ENABLED, true);

// Initialize tablet with schema
initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>());

Expand Down
1 change: 1 addition & 0 deletions website/docs/maintenance/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ during the Fluss cluster working.
| kv.rocksdb.bloom-filter.bits-per-key | Double | 10.0 | Bits per key that bloom filter will use, this only take effect when bloom filter is used. The default value is 10.0. |
| kv.rocksdb.bloom-filter.block-based-mode | Boolean | false | If true, RocksDB will use block-based filter instead of full filter, this only take effect when bloom filter is used. The default value is `false`. |
| kv.rocksdb.shared-rate-limiter-bytes-per-sec | MemorySize | Long.MAX_VALUE | The bytes per second rate limit for RocksDB flush and compaction operations shared across all RocksDB instances on the TabletServer. The rate limiter is always enabled. The default value is Long.MAX_VALUE (effectively unlimited). Set to a lower value (e.g., 100MB) to limit the rate. This configuration can be updated dynamically without server restart. See [Updating Configs](operations/updating-configs.md) for more details. |
| kv.rocksdb.statistics.enabled | Boolean | true | Whether to enable RocksDB statistics collection for metrics. When enabled, RocksDB will collect various statistics like bytes read/written, compaction time, flush time, etc., which can be exposed through Fluss metrics. Enabling statistics has a small performance overhead (typically < 5%). If you experience performance issues or don't need RocksDB-level metrics, you can disable this option to reduce overhead. |
| kv.recover.log-record-batch.max-size | MemorySize | 16mb | The max fetch size for fetching log to apply to kv during recovering kv. |

## Metrics
Expand Down
Loading