From cbc1cb13e73657d3fa3914bfe64b181bef3a0249 Mon Sep 17 00:00:00 2001 From: Haifeng Chen Date: Mon, 6 Apr 2026 14:29:29 -0700 Subject: [PATCH] KAFKA-18309: Change log.segment.bytes config type from INT to LONG Change the log.segment.bytes (and its topic-level synonym segment.bytes) config type from INT to LONG to allow segment sizes larger than ~2GB (Integer.MAX_VALUE). The INT type limited the maximum configurable segment size to 2,147,483,647 bytes (~1.99 GB). This change lifts that restriction by using LONG type throughout the config definition, storage, and propagation layers. Changes: - LogConfig: Change DEFAULT_SEGMENT_BYTES to long, config definitions from INT to LONG, segmentSize field/accessor from int to long - AbstractKafkaConfig: logSegmentBytes() returns Long via getLong() - RollParams: maxSegmentBytes field from int to long - Cleaner: groupSegmentsBySize() maxSize param from int to long - initFileSize() safely caps at Integer.MAX_VALUE for FileRecords compatibility Backward/forward compatibility: - Backward compatible: existing INT values (stored as strings) parse correctly as LONG - Forward compatible: values <= Integer.MAX_VALUE work on older brokers; values > Integer.MAX_VALUE require the new version - No wire protocol impact (broker/topic config only) --- .../scala/unit/kafka/log/LogTestUtils.scala | 4 ++-- .../server/config/AbstractKafkaConfig.java | 4 ++-- .../kafka/storage/internals/log/Cleaner.java | 2 +- .../storage/internals/log/LogConfig.java | 20 +++++++++---------- .../storage/internals/log/RollParams.java | 2 +- .../storage/internals/log/LogTestUtils.java | 2 +- 6 files changed, 17 insertions(+), 17 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala index 9875a3b2ce441..5bc1a5b1b2c6e 100644 --- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala +++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala @@ -61,7 +61,7 @@ object LogTestUtils { } def createLogConfig(segmentMs: Long = LogConfig.DEFAULT_SEGMENT_MS, - segmentBytes: Int = LogConfig.DEFAULT_SEGMENT_BYTES, + segmentBytes: Long = LogConfig.DEFAULT_SEGMENT_BYTES, retentionMs: Long = LogConfig.DEFAULT_RETENTION_MS, localRetentionMs: Long = LogConfig.DEFAULT_LOCAL_RETENTION_MS, retentionBytes: Long = ServerLogConfigs.LOG_RETENTION_BYTES_DEFAULT, @@ -77,7 +77,7 @@ object LogTestUtils { remoteLogDeleteOnDisable: Boolean = DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG): LogConfig = { val logProps = new Properties() logProps.put(TopicConfig.SEGMENT_MS_CONFIG, segmentMs: java.lang.Long) - logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentBytes: Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentBytes: java.lang.Long) logProps.put(TopicConfig.RETENTION_MS_CONFIG, retentionMs: java.lang.Long) logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs: java.lang.Long) logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes: java.lang.Long) diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java index d129fdcd65b08..470bcbe82153f 100644 --- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java @@ -498,8 +498,8 @@ public int numPartitions() { return getInt(ServerLogConfigs.NUM_PARTITIONS_CONFIG); } - public Integer logSegmentBytes() { - return getInt(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG); + public Long logSegmentBytes() { + return getLong(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG); } public Long logFlushIntervalMessages() { diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java index c926b5a540920..8f95d105742be 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java @@ -637,7 +637,7 @@ private void restoreBuffers() { * * @return A list of grouped segments */ - public List> groupSegmentsBySize(List segments, int maxSize, int maxIndexSize, long firstUncleanableOffset) throws IOException { + public List> groupSegmentsBySize(List segments, long maxSize, int maxIndexSize, long firstUncleanableOffset) throws IOException { List> grouped = new ArrayList<>(); while (!segments.isEmpty()) { diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index 1b1811ca5e775..ca32e9c0e2ddd 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -122,7 +122,7 @@ public Optional serverConfigName(String configName) { // Visible for testing public static final String SERVER_DEFAULT_HEADER_NAME = "Server Default Property"; - public static final int DEFAULT_SEGMENT_BYTES = 1024 * 1024 * 1024; + public static final long DEFAULT_SEGMENT_BYTES = 1024 * 1024 * 1024; public static final long DEFAULT_SEGMENT_MS = 24 * 7 * 60 * 60 * 1000L; public static final long DEFAULT_SEGMENT_JITTER_MS = 0; public static final long DEFAULT_RETENTION_MS = 24 * 7 * 60 * 60 * 1000L; @@ -147,7 +147,7 @@ public Optional serverConfigName(String configName) { .define(ServerLogConfigs.LOG_DIR_CONFIG, LIST, ServerLogConfigs.LOG_DIR_DEFAULT, ConfigDef.ValidList.anyNonDuplicateValues(false, false), HIGH, ServerLogConfigs.LOG_DIR_DOC) .define(ServerLogConfigs.LOG_DIRS_CONFIG, LIST, null, ConfigDef.ValidList.anyNonDuplicateValues(false, true), HIGH, ServerLogConfigs.LOG_DIRS_DOC) .define(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, LIST, ServerLogConfigs.CORDONED_LOG_DIRS_DEFAULT, ConfigDef.ValidList.anyNonDuplicateValues(true, true), MEDIUM, ServerLogConfigs.CORDONED_LOG_DIRS_DOC) - .define(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, INT, DEFAULT_SEGMENT_BYTES, atLeast(1024 * 1024), HIGH, ServerLogConfigs.LOG_SEGMENT_BYTES_DOC) + .define(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, LONG, DEFAULT_SEGMENT_BYTES, atLeast(1024 * 1024), HIGH, ServerLogConfigs.LOG_SEGMENT_BYTES_DOC) .define(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, LONG, null, HIGH, ServerLogConfigs.LOG_ROLL_TIME_MILLIS_DOC) .define(ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG, INT, (int) TimeUnit.MILLISECONDS.toHours(DEFAULT_SEGMENT_MS), atLeast(1), HIGH, ServerLogConfigs.LOG_ROLL_TIME_HOURS_DOC) @@ -185,7 +185,7 @@ public Optional serverConfigName(String configName) { private static final LogConfigDef CONFIG = new LogConfigDef(); static { CONFIG. - define(TopicConfig.SEGMENT_BYTES_CONFIG, INT, DEFAULT_SEGMENT_BYTES, atLeast(1024 * 1024), MEDIUM, + define(TopicConfig.SEGMENT_BYTES_CONFIG, LONG, DEFAULT_SEGMENT_BYTES, atLeast(1024 * 1024), MEDIUM, TopicConfig.SEGMENT_BYTES_DOC) .define(TopicConfig.SEGMENT_MS_CONFIG, LONG, DEFAULT_SEGMENT_MS, atLeast(1), MEDIUM, TopicConfig.SEGMENT_MS_DOC) .define(TopicConfig.SEGMENT_JITTER_MS_CONFIG, LONG, DEFAULT_SEGMENT_JITTER_MS, atLeast(0), MEDIUM, @@ -248,7 +248,7 @@ public Optional serverConfigName(String configName) { TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC) .define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC) .define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC) - .defineInternal(INTERNAL_SEGMENT_BYTES_CONFIG, INT, null, null, MEDIUM, INTERNAL_SEGMENT_BYTES_DOC); + .defineInternal(INTERNAL_SEGMENT_BYTES_CONFIG, LONG, null, null, MEDIUM, INTERNAL_SEGMENT_BYTES_DOC); } public final Set overriddenConfigs; @@ -257,8 +257,8 @@ public Optional serverConfigName(String configName) { * Important note: Any configuration parameter that is passed along from KafkaConfig to LogConfig * should also be in `KafkaConfig#extractLogConfigMap`. */ - private final int segmentSize; - private final Integer internalSegmentSize; + private final long segmentSize; + private final Long internalSegmentSize; public final long segmentMs; public final long segmentJitterMs; public final int maxIndexSize; @@ -301,8 +301,8 @@ public LogConfig(Map props, Set overriddenConfigs) { this.props = Collections.unmodifiableMap(props); this.overriddenConfigs = Collections.unmodifiableSet(overriddenConfigs); - this.segmentSize = getInt(TopicConfig.SEGMENT_BYTES_CONFIG); - this.internalSegmentSize = getInt(INTERNAL_SEGMENT_BYTES_CONFIG); + this.segmentSize = getLong(TopicConfig.SEGMENT_BYTES_CONFIG); + this.internalSegmentSize = getLong(INTERNAL_SEGMENT_BYTES_CONFIG); this.segmentMs = getLong(TopicConfig.SEGMENT_MS_CONFIG); this.segmentJitterMs = getLong(TopicConfig.SEGMENT_JITTER_MS_CONFIG); this.maxIndexSize = getInt(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG); @@ -356,7 +356,7 @@ private Optional getCompression() { }; } - public int segmentSize() { + public long segmentSize() { if (internalSegmentSize == null) return segmentSize; return internalSegmentSize; } @@ -382,7 +382,7 @@ public long maxSegmentMs() { public int initFileSize() { if (preallocate) - return segmentSize(); + return (int) Math.min(segmentSize(), Integer.MAX_VALUE); else return 0; } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RollParams.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RollParams.java index ca5874b61b914..f166582d3bc6c 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RollParams.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RollParams.java @@ -19,6 +19,6 @@ /** * A class used to hold params required to decide to rotate a log segment or not. */ -public record RollParams(long maxSegmentMs, int maxSegmentBytes, long maxTimestampInMessages, long maxOffsetInMessages, +public record RollParams(long maxSegmentMs, long maxSegmentBytes, long maxTimestampInMessages, long maxOffsetInMessages, int messagesSize, long now) { } diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java index 67f8cb61e50b6..b08aa2bdb49fe 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java @@ -344,7 +344,7 @@ public LogConfigBuilder segmentMs(long segmentMs) { return this; } - public LogConfigBuilder segmentBytes(int segmentBytes) { + public LogConfigBuilder segmentBytes(long segmentBytes) { configs.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentBytes); return this; }