diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index e97c39bc61911..5e63163f91c85 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -235,4 +235,10 @@ public class TopicConfig {
@Deprecated
public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "Down-conversion is not possible in Apache Kafka 4.0 and newer, " +
"hence this configuration is no-op and it is deprecated for removal in Apache Kafka 5.0.";
+
+ // Dead Letter Queue Configuration (KIP-1191)
+ public static final String ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG = "errors.deadletterqueue.group.enable";
+ public static final String ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_DOC = "Enable this topic to be used as a dead-letter queue for share groups. " +
+ "When set to true, share groups can write undeliverable records to this topic. When set to false (the default), " +
+ "attempts to use this topic as a DLQ will be rejected.";
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 90dffba4b4837..1f33a09afce2a 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1071,6 +1071,10 @@ class KafkaConfigTest {
case GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
case GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
case GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+ case ShareGroupConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
+ case ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG => // ignore string
+ case ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG => // ignore string
+ case ShareGroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
/** Streams groups configs */
case GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
index cc72a8b48e39a..a431d8f28f9be 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
@@ -27,6 +27,7 @@
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
@@ -82,6 +83,32 @@ public class ShareGroupConfig {
public static final String SHARE_GROUP_PERSISTER_CLASS_NAME_DOC = "The fully qualified name of a class which implements " +
"the org.apache.kafka.server.share.Persister interface.";
+ /** Dead Letter Queue Configurations (KIP-1191) **/
+
+ // Cluster-level DLQ configs
+ public static final String ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG = "errors.deadletterqueue.auto.create.topics.enable";
+ public static final boolean ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DEFAULT = false;
+ public static final String ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DOC = "Enable automatic creation of dead-letter queue topics when a share group " +
+ "is configured to use a DLQ topic that does not yet exist. When set to true, the broker will automatically create the DLQ topic " +
+ "with default configurations when records need to be written to it. When set to false (the default), DLQ topics must be created " +
+ "manually before use.";
+
+ public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG = "errors.deadletterqueue.topic.name.prefix";
+ public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DEFAULT = "dlq.";
+ public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DOC = "The required prefix for dead-letter queue topic names when automatic topic creation is enabled.";
+
+ // Group-level DLQ configs
+ public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG = "errors.deadletterqueue.topic.name";
+ public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DEFAULT = "";
+ public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DOC = "The name of the topic to be used as the dead-letter queue for records that " +
+ "cannot be successfully processed by this share group.";
+
+ public static final String ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG = "errors.deadletterqueue.copy.record.enable";
+ public static final boolean ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_DEFAULT = false;
+ public static final String ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_DOC = "Controls whether to copy the full original record to the dead-letter queue " +
+ "or just write metadata. When set to false (the default), only contextual information is written to the DLQ. When set to true, " +
+ "the complete original record is copied to the DLQ along with the contextual metadata.";
+
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT, SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), MEDIUM, SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC)
.define(SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG, INT, SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DEFAULT, between(5, 25), MEDIUM, SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DOC)
@@ -94,7 +121,12 @@ public class ShareGroupConfig {
.define(SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT, SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 2000), MEDIUM, SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_DOC)
.define(SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC)
.define(SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, INT, SHARE_GROUP_MAX_SHARE_SESSIONS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_SHARE_SESSIONS_DOC)
- .defineInternal(SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG, STRING, SHARE_GROUP_PERSISTER_CLASS_NAME_DEFAULT, null, MEDIUM, SHARE_GROUP_PERSISTER_CLASS_NAME_DOC);
+ .defineInternal(SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG, STRING, SHARE_GROUP_PERSISTER_CLASS_NAME_DEFAULT, null, MEDIUM, SHARE_GROUP_PERSISTER_CLASS_NAME_DOC)
+ // DLQ configurations (KIP-1191)
+ .define(ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG, BOOLEAN, ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DEFAULT, MEDIUM, ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DOC)
+ .define(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG, STRING, ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DEFAULT, MEDIUM, ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DOC)
+ .define(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, STRING, ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DEFAULT, MEDIUM, ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DOC)
+ .define(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG, BOOLEAN, ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_DEFAULT, MEDIUM, ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_DOC);
private final int shareGroupPartitionMaxRecordLocks;
private final int shareGroupMaxPartitionMaxRecordLocks;
@@ -108,6 +140,10 @@ public class ShareGroupConfig {
private final int shareFetchPurgatoryPurgeIntervalRequests;
private final int shareGroupMaxShareSessions;
private final String shareGroupPersisterClassName;
+ private final boolean errorsDLQAutoCreateTopicsEnable;
+ private final String errorsDLQTopicNamePrefix;
+ private final String errorsDLQTopicName;
+ private final boolean errorsDLQCopyRecordEnable;
private final AbstractConfig config;
public ShareGroupConfig(AbstractConfig config) {
@@ -124,6 +160,10 @@ public ShareGroupConfig(AbstractConfig config) {
shareFetchPurgatoryPurgeIntervalRequests = config.getInt(ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG);
shareGroupMaxShareSessions = config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG);
shareGroupPersisterClassName = config.getString(ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG);
+ errorsDLQAutoCreateTopicsEnable = config.getBoolean(ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG);
+ errorsDLQTopicNamePrefix = config.getString(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG);
+ errorsDLQTopicName = config.getString(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG);
+ errorsDLQCopyRecordEnable = config.getBoolean(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG);
validate();
}
@@ -186,6 +226,22 @@ public String shareGroupPersisterClassName() {
return shareGroupPersisterClassName;
}
+ public boolean errorsDLQAutoCreateTopicsEnable() {
+ return errorsDLQAutoCreateTopicsEnable;
+ }
+
+ public String errorsDLQTopicNamePrefix() {
+ return errorsDLQTopicNamePrefix;
+ }
+
+ public String errorsDLQTopicName() {
+ return errorsDLQTopicName;
+ }
+
+ public boolean errorsDLQCopyRecordEnable() {
+ return errorsDLQCopyRecordEnable;
+ }
+
private void validate() {
Utils.require(shareGroupMaxDeliveryCountLimit >= shareGroupDeliveryCountLimit,
String.format("%s must be greater than or equal to %s",
@@ -208,5 +264,23 @@ private void validate() {
Utils.require(shareGroupMaxShareSessions >= config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG),
String.format("%s must be greater than or equal to %s",
SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG));
+
+ // DLQ validation (KIP-1191)
+ // DLQ topic name must not start with "__" (reserved for internal topics)
+ if (errorsDLQTopicName != null && !errorsDLQTopicName.isEmpty() && errorsDLQTopicName.startsWith("__")) {
+ throw new org.apache.kafka.common.config.ConfigException(
+ ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG,
+ errorsDLQTopicName,
+ "DLQ topic name must not start with '__'");
+ }
+
+ // If auto-create is enabled and the topic name is specified, it must start with the configured prefix
+ if (errorsDLQAutoCreateTopicsEnable && errorsDLQTopicName != null &&
+ !errorsDLQTopicName.isEmpty() && !errorsDLQTopicName.startsWith(errorsDLQTopicNamePrefix)) {
+ throw new org.apache.kafka.common.config.ConfigException(
+ ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG,
+ errorsDLQTopicName,
+ String.format("DLQ topic name must start with prefix '%s' when auto-create is enabled", errorsDLQTopicNamePrefix));
+ }
}
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
index 4f70e3909aaf8..a8c2b578a5a7d 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
@@ -25,7 +25,9 @@
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class ShareGroupConfigTest {
@@ -133,6 +135,98 @@ public void testInvalidConfigs() {
assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage());
}
+ @Test
+ public void testDLQConfigDefaults() {
+ // Test default DLQ configuration values (KIP-1191)
+ Map configs = new HashMap<>();
+ ShareGroupConfig config = createConfig(configs);
+
+ assertFalse(config.errorsDLQAutoCreateTopicsEnable());
+ assertEquals("dlq.", config.errorsDLQTopicNamePrefix());
+ assertEquals("", config.errorsDLQTopicName());
+ assertFalse(config.errorsDLQCopyRecordEnable());
+ }
+
+ @Test
+ public void testDLQConfigCustomValues() {
+ // Test custom DLQ configuration values
+ Map configs = new HashMap<>();
+ configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG, true);
+ configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG, "my-dlq-");
+ configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "my-dlq-topic");
+ configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG, true);
+
+ ShareGroupConfig config = createConfig(configs);
+
+ assertTrue(config.errorsDLQAutoCreateTopicsEnable());
+ assertEquals("my-dlq-", config.errorsDLQTopicNamePrefix());
+ assertEquals("my-dlq-topic", config.errorsDLQTopicName());
+ assertTrue(config.errorsDLQCopyRecordEnable());
+ }
+
+ @Test
+ public void testDLQTopicNameCannotStartWithDoubleUnderscore() {
+ // DLQ topic name must not start with "__" (reserved for internal topics)
+ Map configs = new HashMap<>();
+ configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "__my-dlq");
+
+ ConfigException exception = assertThrows(ConfigException.class, () -> createConfig(configs));
+ assertEquals("Invalid value __my-dlq for configuration errors.deadletterqueue.topic.name: DLQ topic name must not start with '__'",
+ exception.getMessage());
+ }
+
+ @Test
+ public void testDLQTopicNameMustMatchPrefixWhenAutoCreateEnabled() {
+ // When auto-create is enabled, DLQ topic name must start with the configured prefix
+ Map configs = new HashMap<>();
+ configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG, true);
+ configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG, "dlq.");
+ configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "my-topic"); // Does not start with "dlq."
+
+ ConfigException exception = assertThrows(ConfigException.class, () -> createConfig(configs));
+ assertEquals("Invalid value my-topic for configuration errors.deadletterqueue.topic.name: DLQ topic name must start with prefix 'dlq.' when auto-create is enabled",
+ exception.getMessage());
+ }
+
+ @Test
+ public void testDLQTopicNamePrefixNotRequiredWhenAutoCreateDisabled() {
+ // When auto-create is disabled, DLQ topic name does not need to match prefix
+ Map configs = new HashMap<>();
+ configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG, false);
+ configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG, "dlq.");
+ configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "my-custom-topic");
+
+ ShareGroupConfig config = createConfig(configs);
+
+ assertFalse(config.errorsDLQAutoCreateTopicsEnable());
+ assertEquals("my-custom-topic", config.errorsDLQTopicName());
+ }
+
+ @Test
+ public void testDLQBlankTopicNameIsValid() {
+ // Blank DLQ topic name is valid (means DLQ is disabled for that group)
+ Map configs = new HashMap<>();
+ configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "");
+
+ ShareGroupConfig config = createConfig(configs);
+
+ assertEquals("", config.errorsDLQTopicName());
+ }
+
+ @Test
+ public void testDLQValidTopicNameWithAutoCreate() {
+ // Valid DLQ topic name with auto-create enabled
+ Map configs = new HashMap<>();
+ configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG, true);
+ configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG, "dlq.");
+ configs.put(ShareGroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "dlq.my-share-group");
+
+ ShareGroupConfig config = createConfig(configs);
+
+ assertTrue(config.errorsDLQAutoCreateTopicsEnable());
+ assertEquals("dlq.my-share-group", config.errorsDLQTopicName());
+ }
+
public static ShareGroupConfig createShareGroupConfig(
int shareGroupPartitionMaxRecordLocks,
int shareGroupMinPartitionMaxRecordLocks,