Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogTestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ object LogTestUtils {
}

def createLogConfig(segmentMs: Long = LogConfig.DEFAULT_SEGMENT_MS,
segmentBytes: Int = LogConfig.DEFAULT_SEGMENT_BYTES,
segmentBytes: Long = LogConfig.DEFAULT_SEGMENT_BYTES,
retentionMs: Long = LogConfig.DEFAULT_RETENTION_MS,
localRetentionMs: Long = LogConfig.DEFAULT_LOCAL_RETENTION_MS,
retentionBytes: Long = ServerLogConfigs.LOG_RETENTION_BYTES_DEFAULT,
Expand All @@ -77,7 +77,7 @@ object LogTestUtils {
remoteLogDeleteOnDisable: Boolean = DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG): LogConfig = {
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_MS_CONFIG, segmentMs: java.lang.Long)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentBytes: Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentBytes: java.lang.Long)
logProps.put(TopicConfig.RETENTION_MS_CONFIG, retentionMs: java.lang.Long)
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs: java.lang.Long)
logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes: java.lang.Long)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,8 +498,8 @@ public int numPartitions() {
return getInt(ServerLogConfigs.NUM_PARTITIONS_CONFIG);
}

public Integer logSegmentBytes() {
return getInt(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG);
public Long logSegmentBytes() {
return getLong(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG);
}

public Long logFlushIntervalMessages() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ private void restoreBuffers() {
*
* @return A list of grouped segments
*/
public List<List<LogSegment>> groupSegmentsBySize(List<LogSegment> segments, int maxSize, int maxIndexSize, long firstUncleanableOffset) throws IOException {
public List<List<LogSegment>> groupSegmentsBySize(List<LogSegment> segments, long maxSize, int maxIndexSize, long firstUncleanableOffset) throws IOException {
List<List<LogSegment>> grouped = new ArrayList<>();

while (!segments.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public Optional<String> serverConfigName(String configName) {
// Visible for testing
public static final String SERVER_DEFAULT_HEADER_NAME = "Server Default Property";

public static final int DEFAULT_SEGMENT_BYTES = 1024 * 1024 * 1024;
public static final long DEFAULT_SEGMENT_BYTES = 1024 * 1024 * 1024;
public static final long DEFAULT_SEGMENT_MS = 24 * 7 * 60 * 60 * 1000L;
public static final long DEFAULT_SEGMENT_JITTER_MS = 0;
public static final long DEFAULT_RETENTION_MS = 24 * 7 * 60 * 60 * 1000L;
Expand All @@ -147,7 +147,7 @@ public Optional<String> serverConfigName(String configName) {
.define(ServerLogConfigs.LOG_DIR_CONFIG, LIST, ServerLogConfigs.LOG_DIR_DEFAULT, ConfigDef.ValidList.anyNonDuplicateValues(false, false), HIGH, ServerLogConfigs.LOG_DIR_DOC)
.define(ServerLogConfigs.LOG_DIRS_CONFIG, LIST, null, ConfigDef.ValidList.anyNonDuplicateValues(false, true), HIGH, ServerLogConfigs.LOG_DIRS_DOC)
.define(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, LIST, ServerLogConfigs.CORDONED_LOG_DIRS_DEFAULT, ConfigDef.ValidList.anyNonDuplicateValues(true, true), MEDIUM, ServerLogConfigs.CORDONED_LOG_DIRS_DOC)
.define(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, INT, DEFAULT_SEGMENT_BYTES, atLeast(1024 * 1024), HIGH, ServerLogConfigs.LOG_SEGMENT_BYTES_DOC)
.define(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, LONG, DEFAULT_SEGMENT_BYTES, atLeast(1024 * 1024), HIGH, ServerLogConfigs.LOG_SEGMENT_BYTES_DOC)

.define(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, LONG, null, HIGH, ServerLogConfigs.LOG_ROLL_TIME_MILLIS_DOC)
.define(ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG, INT, (int) TimeUnit.MILLISECONDS.toHours(DEFAULT_SEGMENT_MS), atLeast(1), HIGH, ServerLogConfigs.LOG_ROLL_TIME_HOURS_DOC)
Expand Down Expand Up @@ -185,7 +185,7 @@ public Optional<String> serverConfigName(String configName) {
private static final LogConfigDef CONFIG = new LogConfigDef();
static {
CONFIG.
define(TopicConfig.SEGMENT_BYTES_CONFIG, INT, DEFAULT_SEGMENT_BYTES, atLeast(1024 * 1024), MEDIUM,
define(TopicConfig.SEGMENT_BYTES_CONFIG, LONG, DEFAULT_SEGMENT_BYTES, atLeast(1024 * 1024), MEDIUM,
TopicConfig.SEGMENT_BYTES_DOC)
.define(TopicConfig.SEGMENT_MS_CONFIG, LONG, DEFAULT_SEGMENT_MS, atLeast(1), MEDIUM, TopicConfig.SEGMENT_MS_DOC)
.define(TopicConfig.SEGMENT_JITTER_MS_CONFIG, LONG, DEFAULT_SEGMENT_JITTER_MS, atLeast(0), MEDIUM,
Expand Down Expand Up @@ -248,7 +248,7 @@ public Optional<String> serverConfigName(String configName) {
TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
.define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC)
.define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC)
.defineInternal(INTERNAL_SEGMENT_BYTES_CONFIG, INT, null, null, MEDIUM, INTERNAL_SEGMENT_BYTES_DOC);
.defineInternal(INTERNAL_SEGMENT_BYTES_CONFIG, LONG, null, null, MEDIUM, INTERNAL_SEGMENT_BYTES_DOC);
}

public final Set<String> overriddenConfigs;
Expand All @@ -257,8 +257,8 @@ public Optional<String> serverConfigName(String configName) {
* Important note: Any configuration parameter that is passed along from KafkaConfig to LogConfig
* should also be in `KafkaConfig#extractLogConfigMap`.
*/
private final int segmentSize;
private final Integer internalSegmentSize;
private final long segmentSize;
private final Long internalSegmentSize;
public final long segmentMs;
public final long segmentJitterMs;
public final int maxIndexSize;
Expand Down Expand Up @@ -301,8 +301,8 @@ public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs) {
this.props = Collections.unmodifiableMap(props);
this.overriddenConfigs = Collections.unmodifiableSet(overriddenConfigs);

this.segmentSize = getInt(TopicConfig.SEGMENT_BYTES_CONFIG);
this.internalSegmentSize = getInt(INTERNAL_SEGMENT_BYTES_CONFIG);
this.segmentSize = getLong(TopicConfig.SEGMENT_BYTES_CONFIG);
this.internalSegmentSize = getLong(INTERNAL_SEGMENT_BYTES_CONFIG);
this.segmentMs = getLong(TopicConfig.SEGMENT_MS_CONFIG);
this.segmentJitterMs = getLong(TopicConfig.SEGMENT_JITTER_MS_CONFIG);
this.maxIndexSize = getInt(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG);
Expand Down Expand Up @@ -356,7 +356,7 @@ private Optional<Compression> getCompression() {
};
}

public int segmentSize() {
public long segmentSize() {
if (internalSegmentSize == null) return segmentSize;
return internalSegmentSize;
}
Expand All @@ -382,7 +382,7 @@ public long maxSegmentMs() {

public int initFileSize() {
if (preallocate)
return segmentSize();
return (int) Math.min(segmentSize(), Integer.MAX_VALUE);
else
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@
/**
* A class used to hold params required to decide to rotate a log segment or not.
*/
public record RollParams(long maxSegmentMs, int maxSegmentBytes, long maxTimestampInMessages, long maxOffsetInMessages,
public record RollParams(long maxSegmentMs, long maxSegmentBytes, long maxTimestampInMessages, long maxOffsetInMessages,
int messagesSize, long now) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ public LogConfigBuilder segmentMs(long segmentMs) {
return this;
}

public LogConfigBuilder segmentBytes(int segmentBytes) {
public LogConfigBuilder segmentBytes(long segmentBytes) {
configs.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentBytes);
return this;
}
Expand Down