Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ class BrokerServer(
authorizerPlugin = config.createNewAuthorizer(metrics, ProcessRole.BrokerRole.toString)

/* initializing the groupConfigManager */
groupConfigManager = new GroupConfigManager(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig), config.groupCoordinatorConfig, config.shareGroupConfig)
groupConfigManager = new GroupConfigManager(config.groupCoordinatorConfig, config.shareGroupConfig)

/* create share coordinator */
shareCoordinator = createShareCoordinator()
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2828,7 +2828,8 @@ class KafkaApis(val requestChannel: RequestChannel,
} else {
// Compute group-specific timeout for caching errors (2 * heartbeat interval)
val heartbeatIntervalMs = Option(groupConfigManager.groupConfig(streamsGroupHeartbeatRequest.data.groupId).orElse(null))
.map(_.streamsHeartbeatIntervalMs().toLong)
.flatMap(gc => OptionConverters.toScala(gc.streamsHeartbeatIntervalMs()))
.map(_.toLong)
.getOrElse(config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs().toLong)
val timeoutMs = heartbeatIntervalMs * 2

Expand Down Expand Up @@ -3460,7 +3461,7 @@ class KafkaApis(val requestChannel: RequestChannel,
shareFetchRequest.maxWait,
fetchMinBytes,
fetchMaxBytes,
FetchIsolation.of(FetchRequest.CONSUMER_REPLICA_ID, groupConfigManager.groupConfig(groupId).map(_.shareIsolationLevel()).orElse(GroupConfig.defaultShareIsolationLevel)),
FetchIsolation.of(FetchRequest.CONSUMER_REPLICA_ID, groupConfigManager.groupConfig(groupId).flatMap(_.shareIsolationLevel()).orElse(GroupConfig.defaultShareIsolationLevel)),
clientMetadata,
true
)
Expand Down
36 changes: 18 additions & 18 deletions core/src/test/java/kafka/server/share/SharePartitionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsEarliest() {
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST);
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(ShareGroupAutoOffsetResetStrategy.EARLIEST));

ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);

Expand Down Expand Up @@ -270,7 +270,7 @@ public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsLatest() {
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.LATEST);
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(ShareGroupAutoOffsetResetStrategy.LATEST));

ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);

Expand Down Expand Up @@ -330,7 +330,7 @@ public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsByDuration()
Mockito.when(resetStrategy.type()).thenReturn(ShareGroupAutoOffsetResetStrategy.StrategyType.BY_DURATION);
Mockito.when(resetStrategy.timestamp()).thenReturn(expectedTimestamp);

Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(resetStrategy);
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(resetStrategy));

ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);

Expand Down Expand Up @@ -480,7 +480,7 @@ public void testMaybeInitializeFetchOffsetForEarliestTimestampThrowsError() {
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST);
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(ShareGroupAutoOffsetResetStrategy.EARLIEST));

ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);

Expand Down Expand Up @@ -531,7 +531,7 @@ public void testMaybeInitializeFetchOffsetForByDurationThrowsError() {
// final ShareGroupAutoOffsetResetStrategy resetStrategy = ShareGroupAutoOffsetResetStrategy.fromString("by_duration:PT1H");
final ShareGroupAutoOffsetResetStrategy resetStrategy = Mockito.mock(ShareGroupAutoOffsetResetStrategy.class);
final long expectedTimestamp = MOCK_TIME.milliseconds() - TimeUnit.HOURS.toMillis(1);
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(resetStrategy);
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(resetStrategy));

Mockito.when(resetStrategy.type()).thenReturn(ShareGroupAutoOffsetResetStrategy.StrategyType.BY_DURATION);
Mockito.when(resetStrategy.timestamp()).thenReturn(expectedTimestamp);
Expand Down Expand Up @@ -7151,7 +7151,7 @@ public void testScheduleAcquisitionLockTimeoutValueFromGroupConfig() {
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
int expectedDurationMs = 500;
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
Mockito.when(groupConfig.shareRecordLockDurationMs()).thenReturn(expectedDurationMs);
Mockito.when(groupConfig.shareRecordLockDurationMs()).thenReturn(Optional.of(expectedDurationMs));

SharePartition sharePartition = SharePartitionBuilder.builder()
.withConfigProvider(new ShareGroupConfigProvider(groupConfigManager)).build();
Expand All @@ -7172,8 +7172,8 @@ public void testScheduleAcquisitionLockTimeoutValueUpdatesSuccessfully() {
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
// First invocation of shareRecordLockDurationMs() returns 500, and the second invocation returns 1000
Mockito.when(groupConfig.shareRecordLockDurationMs())
.thenReturn(expectedDurationMs1)
.thenReturn(expectedDurationMs2);
.thenReturn(Optional.of(expectedDurationMs1))
.thenReturn(Optional.of(expectedDurationMs2));

SharePartition sharePartition = SharePartitionBuilder.builder()
.withConfigProvider(new ShareGroupConfigProvider(groupConfigManager)).build();
Expand Down Expand Up @@ -12324,7 +12324,7 @@ public void mockPersisterReadStateMethod(Persister persister) {
public void testMaxDeliveryCountUsesGroupConfigWhenPresent() {
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
when(groupConfig.shareDeliveryCountLimit()).thenReturn(8);
when(groupConfig.shareDeliveryCountLimit()).thenReturn(Optional.of(8));
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));

SharePartition sharePartition = SharePartitionBuilder.builder()
Expand Down Expand Up @@ -12376,7 +12376,7 @@ public void testDynamicDeliveryCountDecreaseCausesArchival() {

// Dynamically decrease the limit to 2 via group config BEFORE releasing.
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
when(groupConfig.shareDeliveryCountLimit()).thenReturn(2);
when(groupConfig.shareDeliveryCountLimit()).thenReturn(Optional.of(2));
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));

// Release: archival check fires because deliveryCount(2) >= maxDeliveryCount(2),
Expand Down Expand Up @@ -12412,7 +12412,7 @@ public void testDynamicDeliveryCountIncreaseAllowsMoreDeliveries() {

// Now increase limit to 10 via group config before the second acquire.
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
when(groupConfig.shareDeliveryCountLimit()).thenReturn(10);
when(groupConfig.shareDeliveryCountLimit()).thenReturn(Optional.of(10));
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));

// Second acquire: deliveryCount = 2. With old limit (2) this would archive.
Expand All @@ -12432,7 +12432,7 @@ public void testDynamicDeliveryCountIncreaseAllowsMoreDeliveries() {
public void testMaxInFlightRecordsUsesGroupConfigWhenPresent() {
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(5000);
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(5000));
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));

SharePartition sharePartition = SharePartitionBuilder.builder()
Expand Down Expand Up @@ -12477,7 +12477,7 @@ public void testDynamicPartitionMaxRecordLocksDecrease() {

// Dynamically decrease the limit to 30 via group config.
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(30);
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(30));
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));

// The effective limit should now be 30.
Expand Down Expand Up @@ -12510,7 +12510,7 @@ public void testDynamicPartitionMaxRecordLocksIncrease() {

// Increase limit to 500 via group config.
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(500);
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(500));
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));

assertEquals(500, sharePartition.maxInFlightRecords());
Expand Down Expand Up @@ -12540,14 +12540,14 @@ public void testDynamicPartitionMaxRecordLocksExactBoundary() {

// Dynamically set limit to exactly the in-flight count via group config.
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(50);
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(50));
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));

// Still at boundary: 50 < 50 is false.
assertFalse(sharePartition.canAcquireRecords());

// Increase by 1 to cross the boundary.
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(51);
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(51));

// Now 50 < 51 is true.
assertTrue(sharePartition.canAcquireRecords());
Expand All @@ -12557,7 +12557,7 @@ public void testDynamicPartitionMaxRecordLocksExactBoundary() {
public void testDynamicPartitionMaxRecordLocksRemoveGroupConfig() {
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(500);
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(500));
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));

SharePartition sharePartition = SharePartitionBuilder.builder()
Expand Down Expand Up @@ -12591,7 +12591,7 @@ public void testDynamicPartitionMaxRecordLocksDecreaseBelowInFlightAffectsMaxRec

// Decrease limit to 20, well below the 50 in-flight.
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(20);
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(20));
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));

// maxInFlightRecords - inFlightRecordsCount = 20 - 50 = -30, so maxRecordsToAcquire <= 0.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.kafka.common.quota.ClientQuotaEntity.{CLIENT_ID, IP, USER}
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{TopicPartition, Uuid}
import java.util.Optional
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
import org.apache.kafka.metadata.MetadataCache
Expand Down Expand Up @@ -419,7 +420,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}

val groupConfig = brokerServers.head.groupCoordinator.groupConfig(consumerGroupId).get()
assertEquals(newSessionTimeoutMs, groupConfig.consumerSessionTimeoutMs())
assertEquals(Optional.of(newSessionTimeoutMs), groupConfig.consumerSessionTimeoutMs())
}

@Test
Expand All @@ -445,7 +446,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}

val groupConfig = brokerServers.head.groupCoordinator.groupConfig(shareGroupId).get()
assertEquals(newRecordLockDurationMs, groupConfig.shareRecordLockDurationMs)
assertEquals(Optional.of(newRecordLockDurationMs), groupConfig.shareRecordLockDurationMs)
}

@Test
Expand Down
Loading
Loading