From 1e5529357d03f20aac8cb597cf742cd0f8dfa2e0 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Fri, 3 Apr 2026 09:34:03 +0200 Subject: [PATCH] KAFKA-20337: Make all GroupConfig fields Optional and clean up validation All GroupConfig fields are now Optional, storing only explicitly provided values. Broker-level defaults are resolved at access time via flatMap().orElse(brokerDefault), eliminating stale-capture issues when broker configs change dynamically. Key changes: - All 21 GroupConfig fields are private Optional, using optionalInt/Boolean/String helpers based on originals(). - GroupConfigManager no longer needs a defaultConfig; constructor simplified. - GroupCoordinatorConfig.extractGroupConfigMap(ShareGroupConfig) removed. - All consumers (GroupMetadataManager, ShareGroupConfigProvider, KafkaApis) use flatMap. - validate() uses CONFIG_DEF.parse() directly instead of constructing a GroupConfig, keeping validation independent from GroupConfig construction. - validateValues refactored with validateIntRange/Max/Min helpers operating on a single filtered parsed map. - Cross-field checks use broker defaults for missing values. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../scala/kafka/server/BrokerServer.scala | 2 +- .../main/scala/kafka/server/KafkaApis.scala | 5 +- .../server/share/SharePartitionTest.java | 36 +- .../api/PlaintextAdminIntegrationTest.scala | 4 +- .../server/DynamicConfigChangeTest.scala | 5 +- .../kafka/coordinator/group/GroupConfig.java | 749 ++++++++++-------- .../coordinator/group/GroupConfigManager.java | 9 +- .../group/GroupCoordinatorConfig.java | 51 -- .../group/GroupMetadataManager.java | 18 +- .../share/ShareGroupConfigProvider.java | 10 +- .../group/GroupConfigManagerTest.java | 4 +- .../coordinator/group/GroupConfigTest.java | 102 ++- .../group/GroupMetadataManagerTest.java | 1 - .../share/ShareGroupConfigProviderTest.java | 10 +- ...GroupCoordinatorShardLoadingBenchmark.java | 2 +- 15 files changed, 572 insertions(+), 436 deletions(-) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index cf43e9ef1ca5d..102981827cef8 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -375,7 +375,7 @@ class BrokerServer( authorizerPlugin = config.createNewAuthorizer(metrics, ProcessRole.BrokerRole.toString) /* initializing the groupConfigManager */ - groupConfigManager = new GroupConfigManager(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig), config.groupCoordinatorConfig, config.shareGroupConfig) + groupConfigManager = new GroupConfigManager(config.groupCoordinatorConfig, config.shareGroupConfig) /* create share coordinator */ shareCoordinator = createShareCoordinator() diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 144696be4341c..e3de751eff013 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2828,7 +2828,8 @@ class KafkaApis(val requestChannel: RequestChannel, } else { // Compute group-specific timeout for caching errors (2 * heartbeat interval) val heartbeatIntervalMs = Option(groupConfigManager.groupConfig(streamsGroupHeartbeatRequest.data.groupId).orElse(null)) - .map(_.streamsHeartbeatIntervalMs().toLong) + .flatMap(gc => OptionConverters.toScala(gc.streamsHeartbeatIntervalMs())) + .map(_.toLong) .getOrElse(config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs().toLong) val timeoutMs = heartbeatIntervalMs * 2 @@ -3460,7 +3461,7 @@ class KafkaApis(val requestChannel: RequestChannel, shareFetchRequest.maxWait, fetchMinBytes, fetchMaxBytes, - FetchIsolation.of(FetchRequest.CONSUMER_REPLICA_ID, groupConfigManager.groupConfig(groupId).map(_.shareIsolationLevel()).orElse(GroupConfig.defaultShareIsolationLevel)), + FetchIsolation.of(FetchRequest.CONSUMER_REPLICA_ID, groupConfigManager.groupConfig(groupId).flatMap(_.shareIsolationLevel()).orElse(GroupConfig.defaultShareIsolationLevel)), clientMetadata, true ) diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 285de56397f9c..24d37dc0700e3 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -219,7 +219,7 @@ public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsEarliest() { GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); GroupConfig groupConfig = Mockito.mock(GroupConfig.class); Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); - Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST); + Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(ShareGroupAutoOffsetResetStrategy.EARLIEST)); ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); @@ -270,7 +270,7 @@ public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsLatest() { GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); GroupConfig groupConfig = Mockito.mock(GroupConfig.class); Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); - Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.LATEST); + Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(ShareGroupAutoOffsetResetStrategy.LATEST)); ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); @@ -330,7 +330,7 @@ public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsByDuration() Mockito.when(resetStrategy.type()).thenReturn(ShareGroupAutoOffsetResetStrategy.StrategyType.BY_DURATION); Mockito.when(resetStrategy.timestamp()).thenReturn(expectedTimestamp); - Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(resetStrategy); + Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(resetStrategy)); ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); @@ -480,7 +480,7 @@ public void testMaybeInitializeFetchOffsetForEarliestTimestampThrowsError() { GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); GroupConfig groupConfig = Mockito.mock(GroupConfig.class); Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); - Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST); + Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(ShareGroupAutoOffsetResetStrategy.EARLIEST)); ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); @@ -531,7 +531,7 @@ public void testMaybeInitializeFetchOffsetForByDurationThrowsError() { // final ShareGroupAutoOffsetResetStrategy resetStrategy = ShareGroupAutoOffsetResetStrategy.fromString("by_duration:PT1H"); final ShareGroupAutoOffsetResetStrategy resetStrategy = Mockito.mock(ShareGroupAutoOffsetResetStrategy.class); final long expectedTimestamp = MOCK_TIME.milliseconds() - TimeUnit.HOURS.toMillis(1); - Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(resetStrategy); + Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(resetStrategy)); Mockito.when(resetStrategy.type()).thenReturn(ShareGroupAutoOffsetResetStrategy.StrategyType.BY_DURATION); Mockito.when(resetStrategy.timestamp()).thenReturn(expectedTimestamp); @@ -7151,7 +7151,7 @@ public void testScheduleAcquisitionLockTimeoutValueFromGroupConfig() { GroupConfig groupConfig = Mockito.mock(GroupConfig.class); int expectedDurationMs = 500; Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); - Mockito.when(groupConfig.shareRecordLockDurationMs()).thenReturn(expectedDurationMs); + Mockito.when(groupConfig.shareRecordLockDurationMs()).thenReturn(Optional.of(expectedDurationMs)); SharePartition sharePartition = SharePartitionBuilder.builder() .withConfigProvider(new ShareGroupConfigProvider(groupConfigManager)).build(); @@ -7172,8 +7172,8 @@ public void testScheduleAcquisitionLockTimeoutValueUpdatesSuccessfully() { Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); // First invocation of shareRecordLockDurationMs() returns 500, and the second invocation returns 1000 Mockito.when(groupConfig.shareRecordLockDurationMs()) - .thenReturn(expectedDurationMs1) - .thenReturn(expectedDurationMs2); + .thenReturn(Optional.of(expectedDurationMs1)) + .thenReturn(Optional.of(expectedDurationMs2)); SharePartition sharePartition = SharePartitionBuilder.builder() .withConfigProvider(new ShareGroupConfigProvider(groupConfigManager)).build(); @@ -12324,7 +12324,7 @@ public void mockPersisterReadStateMethod(Persister persister) { public void testMaxDeliveryCountUsesGroupConfigWhenPresent() { GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); GroupConfig groupConfig = Mockito.mock(GroupConfig.class); - when(groupConfig.shareDeliveryCountLimit()).thenReturn(8); + when(groupConfig.shareDeliveryCountLimit()).thenReturn(Optional.of(8)); when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); SharePartition sharePartition = SharePartitionBuilder.builder() @@ -12376,7 +12376,7 @@ public void testDynamicDeliveryCountDecreaseCausesArchival() { // Dynamically decrease the limit to 2 via group config BEFORE releasing. GroupConfig groupConfig = Mockito.mock(GroupConfig.class); - when(groupConfig.shareDeliveryCountLimit()).thenReturn(2); + when(groupConfig.shareDeliveryCountLimit()).thenReturn(Optional.of(2)); when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); // Release: archival check fires because deliveryCount(2) >= maxDeliveryCount(2), @@ -12412,7 +12412,7 @@ public void testDynamicDeliveryCountIncreaseAllowsMoreDeliveries() { // Now increase limit to 10 via group config before the second acquire. GroupConfig groupConfig = Mockito.mock(GroupConfig.class); - when(groupConfig.shareDeliveryCountLimit()).thenReturn(10); + when(groupConfig.shareDeliveryCountLimit()).thenReturn(Optional.of(10)); when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); // Second acquire: deliveryCount = 2. With old limit (2) this would archive. @@ -12432,7 +12432,7 @@ public void testDynamicDeliveryCountIncreaseAllowsMoreDeliveries() { public void testMaxInFlightRecordsUsesGroupConfigWhenPresent() { GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); GroupConfig groupConfig = Mockito.mock(GroupConfig.class); - when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(5000); + when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(5000)); when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); SharePartition sharePartition = SharePartitionBuilder.builder() @@ -12477,7 +12477,7 @@ public void testDynamicPartitionMaxRecordLocksDecrease() { // Dynamically decrease the limit to 30 via group config. GroupConfig groupConfig = Mockito.mock(GroupConfig.class); - when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(30); + when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(30)); when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); // The effective limit should now be 30. @@ -12510,7 +12510,7 @@ public void testDynamicPartitionMaxRecordLocksIncrease() { // Increase limit to 500 via group config. GroupConfig groupConfig = Mockito.mock(GroupConfig.class); - when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(500); + when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(500)); when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); assertEquals(500, sharePartition.maxInFlightRecords()); @@ -12540,14 +12540,14 @@ public void testDynamicPartitionMaxRecordLocksExactBoundary() { // Dynamically set limit to exactly the in-flight count via group config. GroupConfig groupConfig = Mockito.mock(GroupConfig.class); - when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(50); + when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(50)); when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); // Still at boundary: 50 < 50 is false. assertFalse(sharePartition.canAcquireRecords()); // Increase by 1 to cross the boundary. - when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(51); + when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(51)); // Now 50 < 51 is true. assertTrue(sharePartition.canAcquireRecords()); @@ -12557,7 +12557,7 @@ public void testDynamicPartitionMaxRecordLocksExactBoundary() { public void testDynamicPartitionMaxRecordLocksRemoveGroupConfig() { GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); GroupConfig groupConfig = Mockito.mock(GroupConfig.class); - when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(500); + when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(500)); when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); SharePartition sharePartition = SharePartitionBuilder.builder() @@ -12591,7 +12591,7 @@ public void testDynamicPartitionMaxRecordLocksDecreaseBelowInFlightAffectsMaxRec // Decrease limit to 20, well below the 50 in-flight. GroupConfig groupConfig = Mockito.mock(GroupConfig.class); - when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(20); + when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(20)); when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); // maxInFlightRecords - inFlightRecordsCount = 20 - 50 = -30, so maxRecordsToAcquire <= 0. diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index a9c23a0bc0865..97adc98cee47c 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -1176,7 +1176,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { var configs = describeResult.all.get(15, TimeUnit.SECONDS) assertEquals("55000", configs.get(groupResource).get(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG).value) // Before restart, 55000 is within [45000, 60000], so no adjustment needed - assertEquals(55000, brokerServers.head.groupConfigManager.groupConfig(groupId).get.consumerSessionTimeoutMs) + assertEquals(Optional.of(55000), brokerServers.head.groupConfigManager.groupConfig(groupId).get.consumerSessionTimeoutMs) // Kill all brokers client.close() @@ -1200,7 +1200,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { configs.get(groupResource).get(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG).source) // Verify effective value is adjusted (55000 evaluated to new max 50000) - assertEquals(50000, brokerServers.head.groupConfigManager.groupConfig(groupId).get.consumerSessionTimeoutMs) + assertEquals(Optional.of(50000), brokerServers.head.groupConfigManager.groupConfig(groupId).get.consumerSessionTimeoutMs) } @Test diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index d7231fbe7d7e4..0f510d1a06a8a 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -31,6 +31,7 @@ import org.apache.kafka.common.quota.ClientQuotaEntity.{CLIENT_ID, IP, USER} import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.{TopicPartition, Uuid} +import java.util.Optional import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} import org.apache.kafka.coordinator.share.ShareCoordinatorConfig import org.apache.kafka.metadata.MetadataCache @@ -419,7 +420,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { } val groupConfig = brokerServers.head.groupCoordinator.groupConfig(consumerGroupId).get() - assertEquals(newSessionTimeoutMs, groupConfig.consumerSessionTimeoutMs()) + assertEquals(Optional.of(newSessionTimeoutMs), groupConfig.consumerSessionTimeoutMs()) } @Test @@ -445,7 +446,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { } val groupConfig = brokerServers.head.groupCoordinator.groupConfig(shareGroupId).get() - assertEquals(newRecordLockDurationMs, groupConfig.shareRecordLockDurationMs) + assertEquals(Optional.of(newRecordLockDurationMs), groupConfig.shareRecordLockDurationMs) } @Test diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java index 0a6841ec32490..07247bf76ab46 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java @@ -27,7 +27,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.Optional; @@ -47,7 +46,7 @@ */ public final class GroupConfig extends AbstractConfig { - private static final Logger log = LoggerFactory.getLogger(GroupConfig.class); + private static final Logger LOG = LoggerFactory.getLogger(GroupConfig.class); public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = "consumer.session.timeout.ms"; @@ -106,56 +105,47 @@ public final class GroupConfig extends AbstractConfig { public static final String STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG = "streams.task.offset.interval.ms"; - public final int consumerSessionTimeoutMs; + private final Optional consumerSessionTimeoutMs; - public final int consumerHeartbeatIntervalMs; + private final Optional consumerHeartbeatIntervalMs; - // These have to be optionals because their default group coordinator configs are dynamic, - // so we must not capture the default value at the time of GroupConfig construction. - // KAFKA-20337 tracks the work to refactor GroupConfig into something more consistent. - public final Optional consumerAssignmentIntervalMs; + private final Optional consumerAssignmentIntervalMs; - public final Optional consumerAssignorOffloadEnable; + private final Optional consumerAssignorOffloadEnable; - public final int shareSessionTimeoutMs; + private final Optional shareSessionTimeoutMs; - public final int shareHeartbeatIntervalMs; + private final Optional shareHeartbeatIntervalMs; - public final int shareRecordLockDurationMs; + private final Optional shareRecordLockDurationMs; - public final int shareDeliveryCountLimit; + private final Optional shareDeliveryCountLimit; - public final int sharePartitionMaxRecordLocks; + private final Optional sharePartitionMaxRecordLocks; - public final String shareAutoOffsetReset; + private final Optional shareAutoOffsetReset; - // These have to be optionals because their default group coordinator configs are dynamic, - // so we must not capture the default value at the time of GroupConfig construction. - // KAFKA-20337 tracks the work to refactor GroupConfig into something more consistent. - public final Optional shareAssignmentIntervalMs; + private final Optional shareAssignmentIntervalMs; - public final Optional shareAssignorOffloadEnable; + private final Optional shareAssignorOffloadEnable; - public final int streamsSessionTimeoutMs; + private final Optional streamsSessionTimeoutMs; - public final int streamsHeartbeatIntervalMs; + private final Optional streamsHeartbeatIntervalMs; - public final int streamsNumStandbyReplicas; + private final Optional streamsNumStandbyReplicas; - public final int streamsInitialRebalanceDelayMs; + private final Optional streamsInitialRebalanceDelayMs; - // These have to be optionals because their default group coordinator configs are dynamic, - // so we must not capture the default value at the time of GroupConfig construction. - // KAFKA-20337 tracks the work to refactor GroupConfig into something more consistent. - public final Optional streamsAssignmentIntervalMs; + private final Optional streamsAssignmentIntervalMs; - public final Optional streamsAssignorOffloadEnable; + private final Optional streamsAssignorOffloadEnable; - public final int streamsTaskOffsetIntervalMs; + private final Optional streamsTaskOffsetIntervalMs; - public final String shareIsolationLevel; + private final Optional shareIsolationLevel; - public final boolean shareRenewAcknowledgeEnable; + private final Optional shareRenewAcknowledgeEnable; public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, @@ -286,13 +276,13 @@ public final class GroupConfig extends AbstractConfig { * {@code Optional.empty()} indicates that the config has no broker-level synonym. */ public static final Map> ALL_GROUP_CONFIG_SYNONYMS = Map.ofEntries( - // Consumer group configs + // Consumer group configs. Map.entry(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)), Map.entry(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)), Map.entry(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)), Map.entry(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)), - // Share group configs + // Share group configs. Map.entry(SHARE_SESSION_TIMEOUT_MS_CONFIG, Optional.of(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG)), Map.entry(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)), Map.entry(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, Optional.of(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG)), @@ -304,7 +294,7 @@ public final class GroupConfig extends AbstractConfig { Map.entry(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)), Map.entry(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)), - // Streams group configs + // Streams group configs. Map.entry(STREAMS_SESSION_TIMEOUT_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG)), Map.entry(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)), Map.entry(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG)), @@ -324,48 +314,41 @@ public static Optional brokerSynonym(String groupConfigName) { public GroupConfig(Map props) { super(CONFIG_DEF, props, false); - this.consumerSessionTimeoutMs = getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG); - this.consumerHeartbeatIntervalMs = getInt(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG); - // These have to be optionals because their default group coordinator configs are dynamic, - // so we must not capture the default value at the time of GroupConfig construction. - // KAFKA-20337 tracks the work to refactor GroupConfig into something more consistent. - this.consumerAssignmentIntervalMs = props.containsKey(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) ? - Optional.of(getInt(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG)) : - Optional.empty(); - this.consumerAssignorOffloadEnable = props.containsKey(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ? - Optional.of(getBoolean(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) : - Optional.empty(); - this.shareSessionTimeoutMs = getInt(SHARE_SESSION_TIMEOUT_MS_CONFIG); - this.shareHeartbeatIntervalMs = getInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG); - this.shareRecordLockDurationMs = getInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG); - this.shareDeliveryCountLimit = getInt(SHARE_DELIVERY_COUNT_LIMIT_CONFIG); - this.sharePartitionMaxRecordLocks = getInt(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG); - this.shareAutoOffsetReset = getString(SHARE_AUTO_OFFSET_RESET_CONFIG); - // These have to be optionals because their default group coordinator configs are dynamic, - // so we must not capture the default value at the time of GroupConfig construction. - // KAFKA-20337 tracks the work to refactor GroupConfig into something more consistent. - this.shareAssignmentIntervalMs = props.containsKey(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG) ? - Optional.of(getInt(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG)) : - Optional.empty(); - this.shareAssignorOffloadEnable = props.containsKey(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ? - Optional.of(getBoolean(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) : - Optional.empty(); - this.streamsSessionTimeoutMs = getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG); - this.streamsHeartbeatIntervalMs = getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG); - this.streamsNumStandbyReplicas = getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG); - this.streamsInitialRebalanceDelayMs = getInt(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG); - // These have to be optionals because their default group coordinator configs are dynamic, - // so we must not capture the default value at the time of GroupConfig construction. - // KAFKA-20337 tracks the work to refactor GroupConfig into something more consistent. - this.streamsAssignmentIntervalMs = props.containsKey(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG) ? - Optional.of(getInt(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG)) : - Optional.empty(); - this.streamsAssignorOffloadEnable = props.containsKey(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ? - Optional.of(getBoolean(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) : - Optional.empty(); - this.streamsTaskOffsetIntervalMs = getInt(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG); - this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG); - this.shareRenewAcknowledgeEnable = getBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG); + this.consumerSessionTimeoutMs = optionalInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG); + this.consumerHeartbeatIntervalMs = optionalInt(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG); + this.consumerAssignmentIntervalMs = optionalInt(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG); + this.consumerAssignorOffloadEnable = optionalBoolean(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG); + this.shareSessionTimeoutMs = optionalInt(SHARE_SESSION_TIMEOUT_MS_CONFIG); + this.shareHeartbeatIntervalMs = optionalInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG); + this.shareRecordLockDurationMs = optionalInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG); + this.shareDeliveryCountLimit = optionalInt(SHARE_DELIVERY_COUNT_LIMIT_CONFIG); + this.sharePartitionMaxRecordLocks = optionalInt(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG); + this.shareAutoOffsetReset = optionalString(SHARE_AUTO_OFFSET_RESET_CONFIG) + .map(ShareGroupAutoOffsetResetStrategy::fromString); + this.shareAssignmentIntervalMs = optionalInt(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG); + this.shareAssignorOffloadEnable = optionalBoolean(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG); + this.streamsSessionTimeoutMs = optionalInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG); + this.streamsHeartbeatIntervalMs = optionalInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG); + this.streamsNumStandbyReplicas = optionalInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG); + this.streamsInitialRebalanceDelayMs = optionalInt(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG); + this.streamsAssignmentIntervalMs = optionalInt(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG); + this.streamsAssignorOffloadEnable = optionalBoolean(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG); + this.streamsTaskOffsetIntervalMs = optionalInt(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG); + this.shareIsolationLevel = optionalString(SHARE_ISOLATION_LEVEL_CONFIG) + .map(s -> IsolationLevel.valueOf(s.toUpperCase(Locale.ROOT))); + this.shareRenewAcknowledgeEnable = optionalBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG); + } + + private Optional optionalInt(String key) { + return originals().containsKey(key) ? Optional.of(getInt(key)) : Optional.empty(); + } + + private Optional optionalBoolean(String key) { + return originals().containsKey(key) ? Optional.of(getBoolean(key)) : Optional.empty(); + } + + private Optional optionalString(String key) { + return originals().containsKey(key) ? Optional.of(getString(key)) : Optional.empty(); } public static Optional configType(String configName) { @@ -377,7 +360,7 @@ public static Set configNames() { } /** - * Check that property names are valid + * Check that property names are valid. */ public static void validateNames(Map props) { Set names = configNames(); @@ -389,172 +372,242 @@ public static void validateNames(Map props) { } /** - * Validates the values of the given properties. - */ - @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"}) - private static void validateValues(Map unparsedMap, GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) { - Map valueMaps = CONFIG_DEF.parse(unparsedMap); - int consumerHeartbeatInterval = (Integer) valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG); - int consumerSessionTimeout = (Integer) valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG); - int consumerAssignmentIntervalMs = (Integer) valueMaps.get(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG); - int shareHeartbeatInterval = (Integer) valueMaps.get(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG); - int shareSessionTimeout = (Integer) valueMaps.get(SHARE_SESSION_TIMEOUT_MS_CONFIG); - int shareRecordLockDurationMs = (Integer) valueMaps.get(SHARE_RECORD_LOCK_DURATION_MS_CONFIG); - int shareDeliveryCountLimit = (Integer) valueMaps.get(SHARE_DELIVERY_COUNT_LIMIT_CONFIG); - int sharePartitionMaxRecordLocks = (Integer) valueMaps.get(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG); - int shareAssignmentIntervalMs = (Integer) valueMaps.get(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG); - int streamsSessionTimeoutMs = (Integer) valueMaps.get(STREAMS_SESSION_TIMEOUT_MS_CONFIG); - int streamsHeartbeatIntervalMs = (Integer) valueMaps.get(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG); - int streamsNumStandbyReplicas = (Integer) valueMaps.get(STREAMS_NUM_STANDBY_REPLICAS_CONFIG); - int streamsAssignmentIntervalMs = (Integer) valueMaps.get(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG); - int streamsTaskOffsetIntervalMs = (Integer) valueMaps.get(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG); - if (consumerHeartbeatInterval < groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs()) { - throw new InvalidConfigurationException(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + " must be greater than or equal to " + - GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG); - } - if (consumerHeartbeatInterval > groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs()) { - throw new InvalidConfigurationException(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + " must be less than or equal to " + - GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG); - } - if (consumerSessionTimeout < groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs()) { - throw new InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be greater than or equal to " + - GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG); - } - if (consumerSessionTimeout > groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs()) { - throw new InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be less than or equal to " + - GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); - } - // If no group-level consumer assignment interval is configured, do not attempt to validate it. - // TODO: It's not clear if we can run the validation unconditionally. - // KAFKA-20337 tracks the work to clean this up. - if (unparsedMap.containsKey(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG)) { - if (consumerAssignmentIntervalMs < groupCoordinatorConfig.consumerGroupMinAssignmentIntervalMs()) { - throw new InvalidConfigurationException(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be greater than or equal to " + - GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG); - } - if (consumerAssignmentIntervalMs > groupCoordinatorConfig.consumerGroupMaxAssignmentIntervalMs()) { - throw new InvalidConfigurationException(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be less than or equal to " + - GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG); - } - } - if (shareHeartbeatInterval < groupCoordinatorConfig.shareGroupMinHeartbeatIntervalMs()) { - throw new InvalidConfigurationException(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG + " must be greater than or equal to " + - GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG); - } - if (shareHeartbeatInterval > groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs()) { - throw new InvalidConfigurationException(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG + " must be less than or equal to " + - GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG); - } - if (shareSessionTimeout < groupCoordinatorConfig.shareGroupMinSessionTimeoutMs()) { - throw new InvalidConfigurationException(SHARE_SESSION_TIMEOUT_MS_CONFIG + " must be greater than or equal to " + - GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG); - } - if (shareSessionTimeout > groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs()) { - throw new InvalidConfigurationException(SHARE_SESSION_TIMEOUT_MS_CONFIG + " must be less than or equal to " + - GroupCoordinatorConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); - } - if (shareRecordLockDurationMs < shareGroupConfig.shareGroupMinRecordLockDurationMs()) { - throw new InvalidConfigurationException(SHARE_RECORD_LOCK_DURATION_MS_CONFIG + " must be greater than or equal to " + - ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG); - } - if (shareRecordLockDurationMs > shareGroupConfig.shareGroupMaxRecordLockDurationMs()) { - throw new InvalidConfigurationException(SHARE_RECORD_LOCK_DURATION_MS_CONFIG + " must be less than or equal to " + - ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG); - } - if (shareDeliveryCountLimit < shareGroupConfig.shareGroupMinDeliveryCountLimit()) { - throw new InvalidConfigurationException(SHARE_DELIVERY_COUNT_LIMIT_CONFIG + " must be greater than or equal to " + - ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG); - } - if (shareDeliveryCountLimit > shareGroupConfig.shareGroupMaxDeliveryCountLimit()) { - throw new InvalidConfigurationException(SHARE_DELIVERY_COUNT_LIMIT_CONFIG + " must be less than or equal to " + - ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG); - } - if (sharePartitionMaxRecordLocks < shareGroupConfig.shareGroupMinPartitionMaxRecordLocks()) { - throw new InvalidConfigurationException(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG + " must be greater than or equal to " + - ShareGroupConfig.SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_CONFIG); - } - if (sharePartitionMaxRecordLocks > shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks()) { - throw new InvalidConfigurationException(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG + " must be less than or equal to " + - ShareGroupConfig.SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_CONFIG); - } - // If no group-level share assignment interval is configured, do not attempt to validate it. - // TODO: It's not clear if we can run the validation unconditionally. - // KAFKA-20337 tracks the work to clean this up. - if (unparsedMap.containsKey(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG)) { - if (shareAssignmentIntervalMs < groupCoordinatorConfig.shareGroupMinAssignmentIntervalMs()) { - throw new InvalidConfigurationException(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be greater than or equal to " + - GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG); - } - if (shareAssignmentIntervalMs > groupCoordinatorConfig.shareGroupMaxAssignmentIntervalMs()) { - throw new InvalidConfigurationException(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be less than or equal to " + - GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG); - } - } - if (streamsHeartbeatIntervalMs < groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs()) { - throw new InvalidConfigurationException(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG + " must be greater than or equal to " + - GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG); - } - if (streamsHeartbeatIntervalMs > groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs()) { - throw new InvalidConfigurationException(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG + " must be less than or equal to " + - GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG); - } - if (streamsSessionTimeoutMs < groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs()) { - throw new InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be greater than or equal to " + - GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG); - } - if (streamsSessionTimeoutMs > groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs()) { - throw new InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be less than or equal to " + - GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); - } - if (streamsNumStandbyReplicas > groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas()) { - throw new InvalidConfigurationException(STREAMS_NUM_STANDBY_REPLICAS_CONFIG + " must be less than or equal to " + - GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG); - } - // If no group-level streams assignment interval is configured, do not attempt to validate it. - // TODO: It's not clear if we can run the validation unconditionally. - // KAFKA-20337 tracks the work to clean this up. - if (unparsedMap.containsKey(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG)) { - if (streamsAssignmentIntervalMs < groupCoordinatorConfig.streamsGroupMinAssignmentIntervalMs()) { - throw new InvalidConfigurationException(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be greater than or equal to " + - GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG); - } - if (streamsAssignmentIntervalMs > groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs()) { - throw new InvalidConfigurationException(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be less than or equal to " + - GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG); - } - } - if (streamsTaskOffsetIntervalMs < groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs()) { - throw new InvalidConfigurationException(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG + " must be greater than or equal to " + - GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG); - } - if (consumerSessionTimeout <= consumerHeartbeatInterval) { - throw new InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be greater than " + - CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG); - } - if (shareSessionTimeout <= shareHeartbeatInterval) { - throw new InvalidConfigurationException(SHARE_SESSION_TIMEOUT_MS_CONFIG + " must be greater than " + - SHARE_HEARTBEAT_INTERVAL_MS_CONFIG); - } - if (streamsSessionTimeoutMs <= streamsHeartbeatIntervalMs) { - throw new InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be greater than " + - STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG); - } + * Check that the given properties contain only valid group config names and that + * all values can be parsed and are valid. + */ + public static void validate( + Map props, + GroupCoordinatorConfig groupCoordinatorConfig, + ShareGroupConfig shareGroupConfig + ) { + validateNames(props); + var parsed = CONFIG_DEF.parse(props); + parsed.keySet().retainAll(props.keySet()); + validateValues( + parsed, + groupCoordinatorConfig, + shareGroupConfig + ); } /** - * Check that the given properties contain only valid group config names and that - * all values can be parsed and are valid. The provided properties are merged with - * the broker-level defaults before validation. + * Validates the parsed values against broker-level bounds. + * Only configs explicitly present in the parsed map are validated. */ - public static void validate(Map props, GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) { - // TODO: We shouldn't be re-deriving the default config from GroupConfigManager here. - // KAFKA-20337 tracks the work to clean this up. - Map combinedConfigs = new HashMap<>(); - combinedConfigs.putAll(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig)); - combinedConfigs.putAll(props); + private static void validateValues( + Map parsed, + GroupCoordinatorConfig groupCoordinatorConfig, + ShareGroupConfig shareGroupConfig + ) { + // Consumer group configs. + validateIntRange( + parsed, + CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs(), + GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs(), + GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG + ); + validateIntRange( + parsed, + CONSUMER_SESSION_TIMEOUT_MS_CONFIG, + groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs(), + GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, + groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs(), + GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG + ); + validateIntRange( + parsed, + CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.consumerGroupMinAssignmentIntervalMs(), + GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.consumerGroupMaxAssignmentIntervalMs(), + GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG + ); + + // Share group configs. + validateIntRange( + parsed, + SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.shareGroupMinHeartbeatIntervalMs(), + GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs(), + GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG + ); + validateIntRange( + parsed, + SHARE_SESSION_TIMEOUT_MS_CONFIG, + groupCoordinatorConfig.shareGroupMinSessionTimeoutMs(), + GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, + groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs(), + GroupCoordinatorConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG + ); + validateIntRange( + parsed, + SHARE_RECORD_LOCK_DURATION_MS_CONFIG, + shareGroupConfig.shareGroupMinRecordLockDurationMs(), + ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, + shareGroupConfig.shareGroupMaxRecordLockDurationMs(), + ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG + ); + validateIntRange( + parsed, + SHARE_DELIVERY_COUNT_LIMIT_CONFIG, + shareGroupConfig.shareGroupMinDeliveryCountLimit(), + ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG, + shareGroupConfig.shareGroupMaxDeliveryCountLimit(), + ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG + ); + validateIntRange( + parsed, + SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, + shareGroupConfig.shareGroupMinPartitionMaxRecordLocks(), + ShareGroupConfig.SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_CONFIG, + shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks(), + ShareGroupConfig.SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_CONFIG + ); + validateIntRange( + parsed, + SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.shareGroupMinAssignmentIntervalMs(), + GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.shareGroupMaxAssignmentIntervalMs(), + GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG + ); + + // Streams group configs. + validateIntRange( + parsed, + STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs(), + GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs(), + GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG + ); + validateIntRange( + parsed, + STREAMS_SESSION_TIMEOUT_MS_CONFIG, + groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs(), + GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, + groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs(), + GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG + ); + validateIntMax( + parsed, + STREAMS_NUM_STANDBY_REPLICAS_CONFIG, + groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas(), + GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG + ); + validateIntRange( + parsed, + STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.streamsGroupMinAssignmentIntervalMs(), + GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs(), + GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG + ); + validateIntMin( + parsed, + STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs(), + GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG + ); + + // Cross-field validations: session timeout must be greater than heartbeat interval. + validateSessionExceedsHeartbeat( + parsed, + CONSUMER_SESSION_TIMEOUT_MS_CONFIG, + groupCoordinatorConfig.consumerGroupSessionTimeoutMs(), + CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs() + ); + validateSessionExceedsHeartbeat( + parsed, + SHARE_SESSION_TIMEOUT_MS_CONFIG, + groupCoordinatorConfig.shareGroupSessionTimeoutMs(), + SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.shareGroupHeartbeatIntervalMs() + ); + validateSessionExceedsHeartbeat( + parsed, + STREAMS_SESSION_TIMEOUT_MS_CONFIG, + groupCoordinatorConfig.streamsGroupSessionTimeoutMs(), + STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() + ); + } + + /** + * Validates that an integer config value falls within [min, max]. + * No-op when the key is absent from the parsed map. + */ + private static void validateIntRange( + Map parsed, + String key, + int min, + String minConfigName, + int max, + String maxConfigName + ) { + if (!parsed.containsKey(key)) return; + int value = (Integer) parsed.get(key); + if (value < min) + throw new InvalidConfigurationException(key + " must be greater than or equal to " + minConfigName); + if (value > max) + throw new InvalidConfigurationException(key + " must be less than or equal to " + maxConfigName); + } + + /** + * Validates that an integer config value does not exceed max. + * No-op when the key is absent from the parsed map. + */ + private static void validateIntMax( + Map parsed, + String key, + int max, + String maxConfigName + ) { + if (!parsed.containsKey(key)) return; + int value = (Integer) parsed.get(key); + if (value > max) + throw new InvalidConfigurationException(key + " must be less than or equal to " + maxConfigName); + } - validateNames(combinedConfigs); - validateValues(combinedConfigs, groupCoordinatorConfig, shareGroupConfig); + /** + * Validates that an integer config value is at least min. + * No-op when the key is absent from the parsed map. + */ + private static void validateIntMin( + Map parsed, + String key, + int min, + String minConfigName + ) { + if (!parsed.containsKey(key)) return; + int value = (Integer) parsed.get(key); + if (value < min) + throw new InvalidConfigurationException(key + " must be greater than or equal to " + minConfigName); + } + + /** + * Validates that the session timeout is greater than the heartbeat interval. + * Uses broker defaults for any config not present in the parsed map. + */ + private static void validateSessionExceedsHeartbeat( + Map parsed, + String sessionKey, + int defaultSession, + String heartbeatKey, + int defaultHeartbeat + ) { + if (parsed.containsKey(sessionKey) || parsed.containsKey(heartbeatKey)) { + int effectiveSession = parsed.containsKey(sessionKey) + ? (Integer) parsed.get(sessionKey) : defaultSession; + int effectiveHeartbeat = parsed.containsKey(heartbeatKey) + ? (Integer) parsed.get(heartbeatKey) : defaultHeartbeat; + if (effectiveSession <= effectiveHeartbeat) + throw new InvalidConfigurationException(sessionKey + " must be greater than " + heartbeatKey); + } } /** @@ -575,7 +628,12 @@ public static Properties evaluate( ) { Properties effective = new Properties(); effective.putAll(props); - evaluateValues(effective, groupId, groupCoordinatorConfig, shareGroupConfig); + evaluateValues( + effective, + groupId, + groupCoordinatorConfig, + shareGroupConfig + ); return effective; } @@ -585,62 +643,133 @@ private static void evaluateValues( GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig shareGroupConfig ) { - // Consumer group configs - clampToRange(props, groupId, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, + // Consumer group configs. + clampToRange( + props, + groupId, + CONSUMER_SESSION_TIMEOUT_MS_CONFIG, groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs(), - groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs()); - clampToRange(props, groupId, CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs() + ); + clampToRange( + props, + groupId, + CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs(), - groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs()); - clampToRange(props, groupId, CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs() + ); + clampToRange( + props, + groupId, + CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, groupCoordinatorConfig.consumerGroupMinAssignmentIntervalMs(), - groupCoordinatorConfig.consumerGroupMaxAssignmentIntervalMs()); - - // Share group configs - clampToRange(props, groupId, SHARE_SESSION_TIMEOUT_MS_CONFIG, + groupCoordinatorConfig.consumerGroupMaxAssignmentIntervalMs() + ); + + // Share group configs. + clampToRange( + props, + groupId, + SHARE_SESSION_TIMEOUT_MS_CONFIG, groupCoordinatorConfig.shareGroupMinSessionTimeoutMs(), - groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs()); - clampToRange(props, groupId, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs() + ); + clampToRange( + props, + groupId, + SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, groupCoordinatorConfig.shareGroupMinHeartbeatIntervalMs(), - groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs()); - clampToRange(props, groupId, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, + groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs() + ); + clampToRange( + props, + groupId, + SHARE_RECORD_LOCK_DURATION_MS_CONFIG, shareGroupConfig.shareGroupMinRecordLockDurationMs(), - shareGroupConfig.shareGroupMaxRecordLockDurationMs()); - clampToRange(props, groupId, SHARE_DELIVERY_COUNT_LIMIT_CONFIG, + shareGroupConfig.shareGroupMaxRecordLockDurationMs() + ); + clampToRange( + props, + groupId, + SHARE_DELIVERY_COUNT_LIMIT_CONFIG, shareGroupConfig.shareGroupMinDeliveryCountLimit(), - shareGroupConfig.shareGroupMaxDeliveryCountLimit()); - clampToRange(props, groupId, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, + shareGroupConfig.shareGroupMaxDeliveryCountLimit() + ); + clampToRange( + props, + groupId, + SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, shareGroupConfig.shareGroupMinPartitionMaxRecordLocks(), - shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks()); - clampToRange(props, groupId, SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, + shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks() + ); + clampToRange( + props, + groupId, + SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, groupCoordinatorConfig.shareGroupMinAssignmentIntervalMs(), - groupCoordinatorConfig.shareGroupMaxAssignmentIntervalMs()); - - // Streams group configs - clampToRange(props, groupId, STREAMS_SESSION_TIMEOUT_MS_CONFIG, + groupCoordinatorConfig.shareGroupMaxAssignmentIntervalMs() + ); + + // Streams group configs. + clampToRange( + props, + groupId, + STREAMS_SESSION_TIMEOUT_MS_CONFIG, groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs(), - groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs()); - clampToRange(props, groupId, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs() + ); + clampToRange( + props, + groupId, + STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs(), - groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs()); - clampToMax(props, groupId, STREAMS_NUM_STANDBY_REPLICAS_CONFIG, - groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas()); - clampToRange(props, groupId, STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs() + ); + clampToMax( + props, + groupId, + STREAMS_NUM_STANDBY_REPLICAS_CONFIG, + groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas() + ); + clampToRange( + props, + groupId, + STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, groupCoordinatorConfig.streamsGroupMinAssignmentIntervalMs(), - groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs()); - clampToMin(props, groupId, STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, - groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs()); + groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs() + ); + clampToMin( + props, + groupId, + STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs() + ); // Verify that clamping did not break the session > heartbeat invariant. - checkSessionExceedsHeartbeat(props, groupId, - CONSUMER_SESSION_TIMEOUT_MS_CONFIG, groupCoordinatorConfig.consumerGroupSessionTimeoutMs(), - CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs()); - checkSessionExceedsHeartbeat(props, groupId, - SHARE_SESSION_TIMEOUT_MS_CONFIG, groupCoordinatorConfig.shareGroupSessionTimeoutMs(), - SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, groupCoordinatorConfig.shareGroupHeartbeatIntervalMs()); - checkSessionExceedsHeartbeat(props, groupId, - STREAMS_SESSION_TIMEOUT_MS_CONFIG, groupCoordinatorConfig.streamsGroupSessionTimeoutMs(), - STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()); + checkSessionExceedsHeartbeat( + props, + groupId, + CONSUMER_SESSION_TIMEOUT_MS_CONFIG, + groupCoordinatorConfig.consumerGroupSessionTimeoutMs(), + CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs() + ); + checkSessionExceedsHeartbeat( + props, + groupId, + SHARE_SESSION_TIMEOUT_MS_CONFIG, + groupCoordinatorConfig.shareGroupSessionTimeoutMs(), + SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.shareGroupHeartbeatIntervalMs() + ); + checkSessionExceedsHeartbeat( + props, + groupId, + STREAMS_SESSION_TIMEOUT_MS_CONFIG, + groupCoordinatorConfig.streamsGroupSessionTimeoutMs(), + STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, + groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() + ); } /** @@ -662,7 +791,7 @@ private static void checkSessionExceedsHeartbeat( int session = rawSession != null ? Integer.parseInt(rawSession.toString()) : defaultSession; int heartbeat = rawHeartbeat != null ? Integer.parseInt(rawHeartbeat.toString()) : defaultHeartbeat; if (session <= heartbeat) { - log.warn("The effective {} ({}) for group '{}' is not greater than {} ({}). " + LOG.warn("The effective {} ({}) for group '{}' is not greater than {} ({}). " + "Check that the broker-level min/max bounds for session timeout " + "and heartbeat interval do not overlap.", sessionKey, session, groupId, heartbeatKey, heartbeat); @@ -691,12 +820,12 @@ private static void clampToRange( int value = Integer.parseInt(rawValue.toString()); if (value < min) { - log.warn("The group config '{}' for group '{}' has value {} which is below the broker's " + + LOG.warn("The group config '{}' for group '{}' has value {} which is below the broker's " + "allowed minimum {}. The effective value will be capped to {}.", key, groupId, value, min, min); props.put(key, min); } else if (value > max) { - log.warn("The group config '{}' for group '{}' has value {} which exceeds the broker's " + + LOG.warn("The group config '{}' for group '{}' has value {} which exceeds the broker's " + "allowed maximum {}. The effective value will be capped to {}.", key, groupId, value, max, max); props.put(key, max); @@ -723,7 +852,7 @@ private static void clampToMax( int value = Integer.parseInt(rawValue.toString()); if (value > max) { - log.warn("The group config '{}' for group '{}' has value {} which exceeds the broker's " + + LOG.warn("The group config '{}' for group '{}' has value {} which exceeds the broker's " + "allowed maximum {}. The effective value will be capped to {}.", key, groupId, value, max, max); props.put(key, max); @@ -750,7 +879,7 @@ private static void clampToMin( int value = Integer.parseInt(rawValue.toString()); if (value < min) { - log.warn("The group config '{}' for group '{}' has value {} which is below the broker's " + + LOG.warn("The group config '{}' for group '{}' has value {} which is below the broker's " + "allowed minimum {}. The effective value will be capped to {}.", key, groupId, value, min, min); props.put(key, min); @@ -760,7 +889,10 @@ private static void clampToMin( /** * Create a group config instance using the given properties and defaults. */ - public static GroupConfig fromProps(Map defaults, Properties overrides) { + public static GroupConfig fromProps( + Map defaults, + Properties overrides + ) { Properties props = new Properties(); props.putAll(defaults); props.putAll(overrides); @@ -784,14 +916,14 @@ public static IsolationLevel defaultShareIsolationLevel() { /** * The consumer group session timeout in milliseconds. */ - public int consumerSessionTimeoutMs() { + public Optional consumerSessionTimeoutMs() { return consumerSessionTimeoutMs; } /** * The consumer group heartbeat interval in milliseconds. */ - public int consumerHeartbeatIntervalMs() { + public Optional consumerHeartbeatIntervalMs() { return consumerHeartbeatIntervalMs; } @@ -812,43 +944,43 @@ public Optional consumerAssignorOffloadEnable() { /** * The share group session timeout in milliseconds. */ - public int shareSessionTimeoutMs() { + public Optional shareSessionTimeoutMs() { return shareSessionTimeoutMs; } /** * The share group heartbeat interval in milliseconds. */ - public int shareHeartbeatIntervalMs() { + public Optional shareHeartbeatIntervalMs() { return shareHeartbeatIntervalMs; } /** * The share group delivery count limit. */ - public int shareDeliveryCountLimit() { + public Optional shareDeliveryCountLimit() { return shareDeliveryCountLimit; } /** * The share group partition max record locks. */ - public int sharePartitionMaxRecordLocks() { + public Optional sharePartitionMaxRecordLocks() { return sharePartitionMaxRecordLocks; } /** * The share group record lock duration milliseconds. */ - public int shareRecordLockDurationMs() { + public Optional shareRecordLockDurationMs() { return shareRecordLockDurationMs; } /** * The share group auto offset reset strategy. */ - public ShareGroupAutoOffsetResetStrategy shareAutoOffsetReset() { - return ShareGroupAutoOffsetResetStrategy.fromString(shareAutoOffsetReset); + public Optional shareAutoOffsetReset() { + return shareAutoOffsetReset; } /** @@ -868,28 +1000,28 @@ public Optional shareAssignorOffloadEnable() { /** * The streams group session timeout in milliseconds. */ - public int streamsSessionTimeoutMs() { + public Optional streamsSessionTimeoutMs() { return streamsSessionTimeoutMs; } /** * The streams group heartbeat interval in milliseconds. */ - public int streamsHeartbeatIntervalMs() { + public Optional streamsHeartbeatIntervalMs() { return streamsHeartbeatIntervalMs; } /** * The number of streams standby replicas for each task. */ - public int streamsNumStandbyReplicas() { + public Optional streamsNumStandbyReplicas() { return streamsNumStandbyReplicas; } /** * The initial rebalance delay for streams groups. */ - public int streamsInitialRebalanceDelayMs() { + public Optional streamsInitialRebalanceDelayMs() { return streamsInitialRebalanceDelayMs; } @@ -910,28 +1042,21 @@ public Optional streamsAssignorOffloadEnable() { /** * The task offset reporting interval. */ - public int streamsTaskOffsetIntervalMs() { + public Optional streamsTaskOffsetIntervalMs() { return streamsTaskOffsetIntervalMs; } /** * The share group isolation level. */ - public IsolationLevel shareIsolationLevel() { - if (shareIsolationLevel == null) { - throw new IllegalArgumentException("Share isolation level is null"); - } - try { - return IsolationLevel.valueOf(shareIsolationLevel.toUpperCase(Locale.ROOT)); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Unknown Share isolation level: " + shareIsolationLevel); - } + public Optional shareIsolationLevel() { + return shareIsolationLevel; } /** * The share group renew acknowledge enable. */ - public boolean shareRenewAcknowledgeEnable() { + public Optional shareRenewAcknowledgeEnable() { return shareRenewAcknowledgeEnable; } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java index 9374d768107df..310d079205c35 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java @@ -32,8 +32,6 @@ */ public class GroupConfigManager implements AutoCloseable { - private final GroupConfig defaultConfig; - private final Map configMap; private final GroupCoordinatorConfig groupCoordinatorConfig; @@ -41,12 +39,10 @@ public class GroupConfigManager implements AutoCloseable { private final ShareGroupConfig shareGroupConfig; public GroupConfigManager( - Map defaultConfig, GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig shareGroupConfig ) { this.configMap = new ConcurrentHashMap<>(); - this.defaultConfig = new GroupConfig(defaultConfig); this.groupCoordinatorConfig = Objects.requireNonNull(groupCoordinatorConfig); this.shareGroupConfig = Objects.requireNonNull(shareGroupConfig); } @@ -75,10 +71,7 @@ public void updateGroupConfig(String groupId, Properties newGroupConfig) { Properties evaluatedProps = GroupConfig.evaluate( newGroupConfig, groupId, groupCoordinatorConfig, shareGroupConfig); - final GroupConfig newConfig = GroupConfig.fromProps( - defaultConfig.originals(), - evaluatedProps - ); + final GroupConfig newConfig = new GroupConfig(evaluatedProps); configMap.put(groupId, newConfig); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index 1da3ca214125c..b5303be53b1d3 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -28,14 +28,11 @@ import org.apache.kafka.coordinator.group.assignor.RangeAssignor; import org.apache.kafka.coordinator.group.assignor.SimpleAssignor; import org.apache.kafka.coordinator.group.assignor.UniformAssignor; -import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -853,54 +850,6 @@ protected List shareGroupAssignors( return assignors; } - /** - * Copy the subset of properties that are relevant to consumer group, share group and streams group. - */ - public Map extractGroupConfigMap(ShareGroupConfig shareGroupConfig) { - Map defaultConfigs = new HashMap<>(); - defaultConfigs.putAll(extractConsumerGroupConfigMap()); - defaultConfigs.putAll(extractShareGroupConfigMap(shareGroupConfig)); - defaultConfigs.putAll(extractStreamsGroupConfigMap()); - return Collections.unmodifiableMap(defaultConfigs); - } - - /** - * Copy the subset of properties that are relevant to consumer group. - */ - public Map extractConsumerGroupConfigMap() { - return Map.of( - GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, consumerGroupSessionTimeoutMs(), - GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, consumerGroupHeartbeatIntervalMs()); - } - - /** - * Copy the subset of properties that are relevant to share group. These configs include those which can be set - * statically (for all groups) or dynamically (for a specific group). In those cases, the default value for the - * group specific dynamic config (Ex. share.session.timeout.ms) should be the value set for the static config - * (Ex. group.share.session.timeout.ms). - */ - public Map extractShareGroupConfigMap(ShareGroupConfig shareGroupConfig) { - return Map.of( - GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, this.shareGroupSessionTimeoutMs(), - GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, this.shareGroupHeartbeatIntervalMs(), - GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, shareGroupConfig.shareGroupRecordLockDurationMs(), - GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, shareGroupConfig.shareGroupDeliveryCountLimit(), - GroupConfig.SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, shareGroupConfig.shareGroupPartitionMaxRecordLocks() - ); - } - - /** - * Copy the subset of properties that are relevant to streams group. - */ - public Map extractStreamsGroupConfigMap() { - return Map.of( - GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, streamsGroupSessionTimeoutMs(), - GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, streamsGroupHeartbeatIntervalMs(), - GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, streamsGroupNumStandbyReplicas(), - GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, streamsGroupInitialRebalanceDelayMs(), - GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, streamsGroupTaskOffsetIntervalMs()); - } - /** * The number of threads or event loops running. */ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 837caddcb0362..81a6e1b0c4ab5 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -8758,7 +8758,7 @@ Map topicHashCache() { */ private int consumerGroupSessionTimeoutMs(String groupId) { Optional groupConfig = groupConfigManager.groupConfig(groupId); - return groupConfig.map(GroupConfig::consumerSessionTimeoutMs) + return groupConfig.flatMap(GroupConfig::consumerSessionTimeoutMs) .orElse(config.consumerGroupSessionTimeoutMs()); } @@ -8767,7 +8767,7 @@ private int consumerGroupSessionTimeoutMs(String groupId) { */ private int consumerGroupHeartbeatIntervalMs(String groupId) { Optional groupConfig = groupConfigManager.groupConfig(groupId); - return groupConfig.map(GroupConfig::consumerHeartbeatIntervalMs) + return groupConfig.flatMap(GroupConfig::consumerHeartbeatIntervalMs) .orElse(config.consumerGroupHeartbeatIntervalMs()); } @@ -8796,7 +8796,7 @@ boolean consumerGroupAssignorOffloadEnable(String groupId) { */ private int shareGroupSessionTimeoutMs(String groupId) { Optional groupConfig = groupConfigManager.groupConfig(groupId); - return groupConfig.map(GroupConfig::shareSessionTimeoutMs) + return groupConfig.flatMap(GroupConfig::shareSessionTimeoutMs) .orElse(config.shareGroupSessionTimeoutMs()); } @@ -8805,7 +8805,7 @@ private int shareGroupSessionTimeoutMs(String groupId) { */ private int shareGroupHeartbeatIntervalMs(String groupId) { Optional groupConfig = groupConfigManager.groupConfig(groupId); - return groupConfig.map(GroupConfig::shareHeartbeatIntervalMs) + return groupConfig.flatMap(GroupConfig::shareHeartbeatIntervalMs) .orElse(config.shareGroupHeartbeatIntervalMs()); } @@ -8834,7 +8834,7 @@ boolean shareGroupAssignorOffloadEnable(String groupId) { */ private int streamsGroupSessionTimeoutMs(String groupId) { Optional groupConfig = groupConfigManager.groupConfig(groupId); - return groupConfig.map(GroupConfig::streamsSessionTimeoutMs) + return groupConfig.flatMap(GroupConfig::streamsSessionTimeoutMs) .orElse(config.streamsGroupSessionTimeoutMs()); } @@ -8843,7 +8843,7 @@ private int streamsGroupSessionTimeoutMs(String groupId) { */ private int streamsGroupHeartbeatIntervalMs(String groupId) { Optional groupConfig = groupConfigManager.groupConfig(groupId); - return groupConfig.map(GroupConfig::streamsHeartbeatIntervalMs) + return groupConfig.flatMap(GroupConfig::streamsHeartbeatIntervalMs) .orElse(config.streamsGroupHeartbeatIntervalMs()); } @@ -8872,7 +8872,7 @@ boolean streamsGroupAssignorOffloadEnable(String groupId) { */ private int streamsGroupTaskOffsetIntervalMs(String groupId) { Optional groupConfig = groupConfigManager.groupConfig(groupId); - return groupConfig.map(GroupConfig::streamsTaskOffsetIntervalMs) + return groupConfig.flatMap(GroupConfig::streamsTaskOffsetIntervalMs) .orElse(config.streamsGroupTaskOffsetIntervalMs()); } @@ -8881,7 +8881,7 @@ private int streamsGroupTaskOffsetIntervalMs(String groupId) { */ private int streamsGroupInitialRebalanceDelayMs(String groupId) { Optional groupConfig = groupConfigManager.groupConfig(groupId); - return groupConfig.map(GroupConfig::streamsInitialRebalanceDelayMs) + return groupConfig.flatMap(GroupConfig::streamsInitialRebalanceDelayMs) .orElse(config.streamsGroupInitialRebalanceDelayMs()); } @@ -8897,7 +8897,7 @@ private TaskAssignor streamsGroupAssignor(String groupId) { */ private Map streamsGroupAssignmentConfigs(String groupId) { Optional groupConfig = groupConfigManager.groupConfig(groupId); - final Integer numStandbyReplicas = groupConfig.map(GroupConfig::streamsNumStandbyReplicas) + final Integer numStandbyReplicas = groupConfig.flatMap(GroupConfig::streamsNumStandbyReplicas) .orElse(config.streamsGroupNumStandbyReplicas()); return new TreeMap<>(Map.of( "num.standby.replicas", numStandbyReplicas.toString() diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java index 447750fb611cf..868236f9c632c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java @@ -41,7 +41,7 @@ public ShareGroupConfigProvider(GroupConfigManager manager) { */ public int recordLockDurationMsOrDefault(String groupId, int defaultValue) { return manager.groupConfig(groupId) - .map(GroupConfig::shareRecordLockDurationMs) + .flatMap(GroupConfig::shareRecordLockDurationMs) .orElse(defaultValue); } @@ -55,7 +55,7 @@ public int recordLockDurationMsOrDefault(String groupId, int defaultValue) { */ public int deliveryCountLimitOrDefault(String groupId, int defaultValue) { return manager.groupConfig(groupId) - .map(GroupConfig::shareDeliveryCountLimit) + .flatMap(GroupConfig::shareDeliveryCountLimit) .orElse(defaultValue); } @@ -69,7 +69,7 @@ public int deliveryCountLimitOrDefault(String groupId, int defaultValue) { */ public int partitionMaxRecordLocksOrDefault(String groupId, int defaultValue) { return manager.groupConfig(groupId) - .map(GroupConfig::sharePartitionMaxRecordLocks) + .flatMap(GroupConfig::sharePartitionMaxRecordLocks) .orElse(defaultValue); } @@ -82,7 +82,7 @@ public int partitionMaxRecordLocksOrDefault(String groupId, int defaultValue) { */ public boolean isRenewAcknowledgeEnabled(String groupId) { return manager.groupConfig(groupId) - .map(GroupConfig::shareRenewAcknowledgeEnable) + .flatMap(GroupConfig::shareRenewAcknowledgeEnable) .orElse(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT); } @@ -95,7 +95,7 @@ public boolean isRenewAcknowledgeEnabled(String groupId) { */ public ShareGroupAutoOffsetResetStrategy autoOffsetReset(String groupId) { return manager.groupConfig(groupId) - .map(GroupConfig::shareAutoOffsetReset) + .flatMap(GroupConfig::shareAutoOffsetReset) .orElseGet(GroupConfig::defaultShareAutoOffsetReset); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java index 901ca0c3c1add..062b7f861b8dd 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java @@ -138,8 +138,6 @@ public static GroupConfigManager createConfigManager(Map overrid GroupCoordinatorConfig groupCoordinatorConfig = GroupCoordinatorConfig.fromProps(overrides); ShareGroupConfig shareGroupConfig = ShareGroupConfig.fromProps(overrides); - Map defaultConfig = new HashMap<>(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig)); - - return new GroupConfigManager(defaultConfig, groupCoordinatorConfig, shareGroupConfig); + return new GroupConfigManager(groupCoordinatorConfig, shareGroupConfig); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java index d1468b1eba1dc..96df728aa2cd1 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.coordinator.group; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig; @@ -394,18 +395,97 @@ public void testFromPropsWithDefaultValue() { } @Test - public void testAssignmentIntervalMsAbsentWhenNotConfigured() { - // When the assignment interval config is absent, the group-level value is empty. - Properties props = new Properties(); - GroupConfig config = GroupConfig.fromProps(Map.of(), props); + public void testAllFieldsAbsentWhenNotConfigured() { + // When no config is provided, all group-level values are empty. + GroupConfig config = new GroupConfig(Map.of()); + + // Consumer group configs + assertEquals(Optional.empty(), config.consumerSessionTimeoutMs()); + assertEquals(Optional.empty(), config.consumerHeartbeatIntervalMs()); assertEquals(Optional.empty(), config.consumerAssignmentIntervalMs()); + assertEquals(Optional.empty(), config.consumerAssignorOffloadEnable()); + + // Share group configs + assertEquals(Optional.empty(), config.shareSessionTimeoutMs()); + assertEquals(Optional.empty(), config.shareHeartbeatIntervalMs()); + assertEquals(Optional.empty(), config.shareRecordLockDurationMs()); + assertEquals(Optional.empty(), config.shareDeliveryCountLimit()); + assertEquals(Optional.empty(), config.sharePartitionMaxRecordLocks()); + assertEquals(Optional.empty(), config.shareAutoOffsetReset()); assertEquals(Optional.empty(), config.shareAssignmentIntervalMs()); + assertEquals(Optional.empty(), config.shareAssignorOffloadEnable()); + assertEquals(Optional.empty(), config.shareIsolationLevel()); + assertEquals(Optional.empty(), config.shareRenewAcknowledgeEnable()); + + // Streams group configs + assertEquals(Optional.empty(), config.streamsSessionTimeoutMs()); + assertEquals(Optional.empty(), config.streamsHeartbeatIntervalMs()); + assertEquals(Optional.empty(), config.streamsNumStandbyReplicas()); + assertEquals(Optional.empty(), config.streamsInitialRebalanceDelayMs()); assertEquals(Optional.empty(), config.streamsAssignmentIntervalMs()); + assertEquals(Optional.empty(), config.streamsAssignorOffloadEnable()); + assertEquals(Optional.empty(), config.streamsTaskOffsetIntervalMs()); } @Test - public void testAssignmentIntervalMsNotValidatedWhenNotConfigured() { - // When the assignment interval config is absent, validation should not use the default assignment interval. + public void testAllFieldsPresentWhenConfigured() { + // When all configs are provided, all group-level values are present. + Map props = new HashMap<>(); + props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "50000"); + props.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, "6000"); + props.put(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "5000"); + props.put(GroupConfig.CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "true"); + props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "50000"); + props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "6000"); + props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "20000"); + props.put(GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, "5"); + props.put(GroupConfig.SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, "1000"); + props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, "2500"); + props.put(GroupConfig.SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "true"); + props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_committed"); + props.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, "false"); + props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "50000"); + props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "6000"); + props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "2"); + props.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, "3000"); + props.put(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "1250"); + props.put(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false"); + props.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, "30000"); + + GroupConfig config = new GroupConfig(props); + + // Consumer group configs + assertEquals(Optional.of(50000), config.consumerSessionTimeoutMs()); + assertEquals(Optional.of(6000), config.consumerHeartbeatIntervalMs()); + assertEquals(Optional.of(5000), config.consumerAssignmentIntervalMs()); + assertEquals(Optional.of(true), config.consumerAssignorOffloadEnable()); + + // Share group configs + assertEquals(Optional.of(50000), config.shareSessionTimeoutMs()); + assertEquals(Optional.of(6000), config.shareHeartbeatIntervalMs()); + assertEquals(Optional.of(20000), config.shareRecordLockDurationMs()); + assertEquals(Optional.of(5), config.shareDeliveryCountLimit()); + assertEquals(Optional.of(1000), config.sharePartitionMaxRecordLocks()); + assertEquals(Optional.of(ShareGroupAutoOffsetResetStrategy.EARLIEST), config.shareAutoOffsetReset()); + assertEquals(Optional.of(2500), config.shareAssignmentIntervalMs()); + assertEquals(Optional.of(true), config.shareAssignorOffloadEnable()); + assertEquals(Optional.of(IsolationLevel.READ_COMMITTED), config.shareIsolationLevel()); + assertEquals(Optional.of(false), config.shareRenewAcknowledgeEnable()); + + // Streams group configs + assertEquals(Optional.of(50000), config.streamsSessionTimeoutMs()); + assertEquals(Optional.of(6000), config.streamsHeartbeatIntervalMs()); + assertEquals(Optional.of(2), config.streamsNumStandbyReplicas()); + assertEquals(Optional.of(3000), config.streamsInitialRebalanceDelayMs()); + assertEquals(Optional.of(1250), config.streamsAssignmentIntervalMs()); + assertEquals(Optional.of(false), config.streamsAssignorOffloadEnable()); + assertEquals(Optional.of(30000), config.streamsTaskOffsetIntervalMs()); + } + + @Test + public void testNotValidatedWhenNotConfigured() { + // When configs are absent, validation should not use their default values. GroupCoordinatorConfig groupCoordinatorConfig = GroupCoordinatorConfig.fromProps(Map.of( GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 2000, GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 2000, @@ -417,16 +497,6 @@ public void testAssignmentIntervalMsNotValidatedWhenNotConfigured() { assertDoesNotThrow(() -> GroupConfig.validate(Map.of(), groupCoordinatorConfig, createShareGroupConfig())); } - @Test - public void testAssignorOffloadEnableAbsentWhenNotConfigured() { - // When the offload enable config is absent, the group-level value is empty. - Properties props = new Properties(); - GroupConfig config = GroupConfig.fromProps(Map.of(), props); - assertEquals(Optional.empty(), config.consumerAssignorOffloadEnable()); - assertEquals(Optional.empty(), config.shareAssignorOffloadEnable()); - assertEquals(Optional.empty(), config.streamsAssignorOffloadEnable()); - } - @Test public void testInvalidConfigName() { Map props = new HashMap<>(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index ab1807a3cbc21..a4b8141ed5c5e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -21252,7 +21252,6 @@ void put(String key, Object value) { GroupCoordinatorConfig groupCoordinatorConfig = new GroupCoordinatorConfig(kafkaConfig); ShareGroupConfig shareGroupConfig = new ShareGroupConfig(kafkaConfig); GroupConfigManager groupConfigManager = new GroupConfigManager( - groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig), groupCoordinatorConfig, shareGroupConfig ); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java index 1eec4abc0f35b..a426b51554b8a 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java @@ -37,7 +37,7 @@ public class ShareGroupConfigProviderTest { void testRecordLockDurationMsOrDefaultWithGroupConfig() { GroupConfigManager groupConfigManager = mock(GroupConfigManager.class); GroupConfig groupConfig = mock(GroupConfig.class); - when(groupConfig.shareRecordLockDurationMs()).thenReturn(1000); + when(groupConfig.shareRecordLockDurationMs()).thenReturn(Optional.of(1000)); when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig)); provider = new ShareGroupConfigProvider(groupConfigManager); @@ -57,7 +57,7 @@ void testRecordLockDurationMsOrDefaultWithoutGroupConfig() { void testDeliveryCountLimitOrDefaultWithGroupConfig() { GroupConfigManager groupConfigManager = mock(GroupConfigManager.class); GroupConfig groupConfig = mock(GroupConfig.class); - when(groupConfig.shareDeliveryCountLimit()).thenReturn(8); + when(groupConfig.shareDeliveryCountLimit()).thenReturn(Optional.of(8)); when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig)); provider = new ShareGroupConfigProvider(groupConfigManager); @@ -77,7 +77,7 @@ void testDeliveryCountLimitOrDefaultWithoutGroupConfig() { void testPartitionMaxRecordLocksOrDefaultWithGroupConfig() { GroupConfigManager groupConfigManager = mock(GroupConfigManager.class); GroupConfig groupConfig = mock(GroupConfig.class); - when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(5000); + when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(5000)); when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig)); provider = new ShareGroupConfigProvider(groupConfigManager); @@ -97,7 +97,7 @@ void testPartitionMaxRecordLocksOrDefaultWithoutGroupConfig() { void testIsRenewAcknowledgeDisabledWithGroupConfig() { GroupConfigManager groupConfigManager = mock(GroupConfigManager.class); GroupConfig groupConfig = mock(GroupConfig.class); - when(groupConfig.shareRenewAcknowledgeEnable()).thenReturn(false); + when(groupConfig.shareRenewAcknowledgeEnable()).thenReturn(Optional.of(false)); when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig)); provider = new ShareGroupConfigProvider(groupConfigManager); @@ -117,7 +117,7 @@ void testIsRenewAcknowledgeEnabledWithoutGroupConfig() { void testAutoOffsetResetWithGroupConfig() { GroupConfigManager groupConfigManager = mock(GroupConfigManager.class); GroupConfig groupConfig = mock(GroupConfig.class); - when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST); + when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(ShareGroupAutoOffsetResetStrategy.EARLIEST)); when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig)); provider = new ShareGroupConfigProvider(groupConfigManager); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java index f2f3b835c50db..31798621f95b6 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java @@ -290,7 +290,7 @@ public void setupIteration(BenchmarkParams benchmarkParams, IterationParams iter @Setup(Level.Invocation) public void setupInvocation() { - GroupConfigManager configManager = new GroupConfigManager(new HashMap<>(), config, shareGroupConfig); + GroupConfigManager configManager = new GroupConfigManager(config, shareGroupConfig); LogContext logContext = new LogContext(); SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);