Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,4 +235,10 @@ public class TopicConfig {
@Deprecated
public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "Down-conversion is not possible in Apache Kafka 4.0 and newer, " +
"hence this configuration is no-op and it is deprecated for removal in Apache Kafka 5.0.";

// Dead Letter Queue Configuration (KIP-1191)
public static final String ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG = "errors.deadletterqueue.group.enable";
public static final String ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_DOC = "Enable this topic to be used as a dead-letter queue for share groups. " +
"When set to <code>true</code>, share groups can write undeliverable records to this topic. When set to <code>false</code> (the default), " +
"attempts to use this topic as a DLQ will be rejected.";
}
4 changes: 4 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,10 @@ class KafkaConfigTest {
case GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
case GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
case GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
case ShareGroupConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
case ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG => // ignore string
case ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG => // ignore string
case ShareGroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_boolean")

/** Streams groups configs */
case GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;

Expand Down Expand Up @@ -82,6 +83,32 @@ public class ShareGroupConfig {
public static final String SHARE_GROUP_PERSISTER_CLASS_NAME_DOC = "The fully qualified name of a class which implements " +
"the <code>org.apache.kafka.server.share.Persister</code> interface.";

/** Dead Letter Queue Configurations (KIP-1191) **/

// Cluster-level DLQ configs
public static final String ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG = "errors.deadletterqueue.auto.create.topics.enable";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All configs starts with the same prefix. Why don’t we follow the same pattern for those?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are intentionally not including the group.share. prefix on these configs to keep them generic enough so that in the future, we can re-use them as and when needed and maintain compatibility still.

public static final boolean ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DEFAULT = false;
public static final String ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DOC = "Enable automatic creation of dead-letter queue topics when a share group " +
"is configured to use a DLQ topic that does not yet exist. When set to <code>true</code>, the broker will automatically create the DLQ topic " +
"with default configurations when records need to be written to it. When set to <code>false</code> (the default), DLQ topics must be created " +
"manually before use.";

public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG = "errors.deadletterqueue.topic.name.prefix";
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DEFAULT = "dlq.";
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DOC = "The required prefix for dead-letter queue topic names when automatic topic creation is enabled.";

// Group-level DLQ configs
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG = "errors.deadletterqueue.topic.name";
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DEFAULT = "";
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DOC = "The name of the topic to be used as the dead-letter queue for records that " +
"cannot be successfully processed by this share group.";

public static final String ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG = "errors.deadletterqueue.copy.record.enable";
public static final boolean ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_DEFAULT = false;
public static final String ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_DOC = "Controls whether to copy the full original record to the dead-letter queue " +
"or just write metadata. When set to <code>false</code> (the default), only contextual information is written to the DLQ. When set to <code>true</code>, " +
"the complete original record is copied to the DLQ along with the contextual metadata.";

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT, SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), MEDIUM, SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC)
.define(SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG, INT, SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DEFAULT, between(5, 25), MEDIUM, SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DOC)
Expand All @@ -94,7 +121,12 @@ public class ShareGroupConfig {
.define(SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT, SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 2000), MEDIUM, SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_DOC)
.define(SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC)
.define(SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, INT, SHARE_GROUP_MAX_SHARE_SESSIONS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_SHARE_SESSIONS_DOC)
.defineInternal(SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG, STRING, SHARE_GROUP_PERSISTER_CLASS_NAME_DEFAULT, null, MEDIUM, SHARE_GROUP_PERSISTER_CLASS_NAME_DOC);
.defineInternal(SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG, STRING, SHARE_GROUP_PERSISTER_CLASS_NAME_DEFAULT, null, MEDIUM, SHARE_GROUP_PERSISTER_CLASS_NAME_DOC)
// DLQ configurations (KIP-1191)
.define(ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG, BOOLEAN, ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DEFAULT, MEDIUM, ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DOC)
.define(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG, STRING, ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DEFAULT, MEDIUM, ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DOC)
.define(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, STRING, ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DEFAULT, MEDIUM, ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DOC)
.define(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG, BOOLEAN, ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_DEFAULT, MEDIUM, ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_DOC);

private final int shareGroupPartitionMaxRecordLocks;
private final int shareGroupMaxPartitionMaxRecordLocks;
Expand All @@ -108,6 +140,10 @@ public class ShareGroupConfig {
private final int shareFetchPurgatoryPurgeIntervalRequests;
private final int shareGroupMaxShareSessions;
private final String shareGroupPersisterClassName;
private final boolean errorsDLQAutoCreateTopicsEnable;
private final String errorsDLQTopicNamePrefix;
private final String errorsDLQTopicName;
private final boolean errorsDLQCopyRecordEnable;
private final AbstractConfig config;

public ShareGroupConfig(AbstractConfig config) {
Expand All @@ -124,6 +160,10 @@ public ShareGroupConfig(AbstractConfig config) {
shareFetchPurgatoryPurgeIntervalRequests = config.getInt(ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG);
shareGroupMaxShareSessions = config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG);
shareGroupPersisterClassName = config.getString(ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG);
errorsDLQAutoCreateTopicsEnable = config.getBoolean(ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG);
errorsDLQTopicNamePrefix = config.getString(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG);
errorsDLQTopicName = config.getString(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG);
errorsDLQCopyRecordEnable = config.getBoolean(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG);
validate();
}

Expand Down Expand Up @@ -186,6 +226,22 @@ public String shareGroupPersisterClassName() {
return shareGroupPersisterClassName;
}

public boolean errorsDLQAutoCreateTopicsEnable() {
return errorsDLQAutoCreateTopicsEnable;
}

public String errorsDLQTopicNamePrefix() {
return errorsDLQTopicNamePrefix;
}

public String errorsDLQTopicName() {
return errorsDLQTopicName;
}

public boolean errorsDLQCopyRecordEnable() {
return errorsDLQCopyRecordEnable;
}

private void validate() {
Utils.require(shareGroupMaxDeliveryCountLimit >= shareGroupDeliveryCountLimit,
String.format("%s must be greater than or equal to %s",
Expand All @@ -208,5 +264,23 @@ private void validate() {
Utils.require(shareGroupMaxShareSessions >= config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG),
String.format("%s must be greater than or equal to %s",
SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG));

// DLQ validation (KIP-1191)
// DLQ topic name must not start with "__" (reserved for internal topics)
if (errorsDLQTopicName != null && !errorsDLQTopicName.isEmpty() && errorsDLQTopicName.startsWith("__")) {
throw new org.apache.kafka.common.config.ConfigException(
ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG,
errorsDLQTopicName,
"DLQ topic name must not start with '__'");
}

// If auto-create is enabled and the topic name is specified, it must start with the configured prefix
if (errorsDLQAutoCreateTopicsEnable && errorsDLQTopicName != null &&
!errorsDLQTopicName.isEmpty() && !errorsDLQTopicName.startsWith(errorsDLQTopicNamePrefix)) {
throw new org.apache.kafka.common.config.ConfigException(
ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG,
errorsDLQTopicName,
String.format("DLQ topic name must start with prefix '%s' when auto-create is enabled", errorsDLQTopicNamePrefix));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ShareGroupConfigTest {

Expand Down Expand Up @@ -133,6 +135,98 @@ public void testInvalidConfigs() {
assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage());
}

@Test
public void testDLQConfigDefaults() {
// Test default DLQ configuration values (KIP-1191)
Map<String, Object> configs = new HashMap<>();
ShareGroupConfig config = createConfig(configs);

assertFalse(config.errorsDLQAutoCreateTopicsEnable());
assertEquals("dlq.", config.errorsDLQTopicNamePrefix());
assertEquals("", config.errorsDLQTopicName());
assertFalse(config.errorsDLQCopyRecordEnable());
}

@Test
public void testDLQConfigCustomValues() {
// Test custom DLQ configuration values
Map<String, Object> configs = new HashMap<>();
configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG, true);
configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG, "my-dlq-");
configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "my-dlq-topic");
configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG, true);

ShareGroupConfig config = createConfig(configs);

assertTrue(config.errorsDLQAutoCreateTopicsEnable());
assertEquals("my-dlq-", config.errorsDLQTopicNamePrefix());
assertEquals("my-dlq-topic", config.errorsDLQTopicName());
assertTrue(config.errorsDLQCopyRecordEnable());
}

@Test
public void testDLQTopicNameCannotStartWithDoubleUnderscore() {
// DLQ topic name must not start with "__" (reserved for internal topics)
Map<String, Object> configs = new HashMap<>();
configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "__my-dlq");

ConfigException exception = assertThrows(ConfigException.class, () -> createConfig(configs));
assertEquals("Invalid value __my-dlq for configuration errors.deadletterqueue.topic.name: DLQ topic name must not start with '__'",
exception.getMessage());
}

@Test
public void testDLQTopicNameMustMatchPrefixWhenAutoCreateEnabled() {
// When auto-create is enabled, DLQ topic name must start with the configured prefix
Map<String, Object> configs = new HashMap<>();
configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG, true);
configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG, "dlq.");
configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "my-topic"); // Does not start with "dlq."

ConfigException exception = assertThrows(ConfigException.class, () -> createConfig(configs));
assertEquals("Invalid value my-topic for configuration errors.deadletterqueue.topic.name: DLQ topic name must start with prefix 'dlq.' when auto-create is enabled",
exception.getMessage());
}

@Test
public void testDLQTopicNamePrefixNotRequiredWhenAutoCreateDisabled() {
// When auto-create is disabled, DLQ topic name does not need to match prefix
Map<String, Object> configs = new HashMap<>();
configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG, false);
configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG, "dlq.");
configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "my-custom-topic");

ShareGroupConfig config = createConfig(configs);

assertFalse(config.errorsDLQAutoCreateTopicsEnable());
assertEquals("my-custom-topic", config.errorsDLQTopicName());
}

@Test
public void testDLQBlankTopicNameIsValid() {
// Blank DLQ topic name is valid (means DLQ is disabled for that group)
Map<String, Object> configs = new HashMap<>();
configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "");

ShareGroupConfig config = createConfig(configs);

assertEquals("", config.errorsDLQTopicName());
}

@Test
public void testDLQValidTopicNameWithAutoCreate() {
// Valid DLQ topic name with auto-create enabled
Map<String, Object> configs = new HashMap<>();
configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG, true);
configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG, "dlq.");
configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "dlq.my-share-group");

ShareGroupConfig config = createConfig(configs);

assertTrue(config.errorsDLQAutoCreateTopicsEnable());
assertEquals("dlq.my-share-group", config.errorsDLQTopicName());
}

public static ShareGroupConfig createShareGroupConfig(
int shareGroupPartitionMaxRecordLocks,
int shareGroupMinPartitionMaxRecordLocks,
Expand Down
Loading