diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreWithHeadersIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreWithHeadersIntegrationTest.java new file mode 100644 index 0000000000000..3b0bfa0f9054f --- /dev/null +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreWithHeadersIntegrationTest.java @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.VersionedKeyValueStoreWithHeaders; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("integration") +public class 
VersionedKeyValueStoreWithHeadersIntegrationTest { + + private static final String STORE_NAME = "versioned-headers-store"; + private static final Duration HISTORY_RETENTION = Duration.ofMinutes(10); + + private String inputStream; + private String outputStream; + private long baseTimestamp; + + private KafkaStreams kafkaStreams; + + private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + + private static final Headers HEADERS1 = new RecordHeaders() + .add("source", "test".getBytes()) + .add("version", "1.0".getBytes()); + + private static final Headers HEADERS2 = new RecordHeaders() + .add("source", "test".getBytes()) + .add("version", "2.0".getBytes()); + + private static final Headers EMPTY_HEADERS = new RecordHeaders(); + + public TestInfo testInfo; + + @BeforeAll + public static void before() throws IOException { + CLUSTER.start(); + } + + @AfterAll + public static void after() { + CLUSTER.stop(); + } + + @BeforeEach + public void setUp(final TestInfo testInfo) throws Exception { + this.testInfo = testInfo; + final String testName = safeUniqueTestName(testInfo); + inputStream = "input-" + testName; + outputStream = "output-" + testName; + baseTimestamp = System.currentTimeMillis() - Duration.ofMinutes(5).toMillis(); + CLUSTER.createTopic(inputStream, 1, 1); + CLUSTER.createTopic(outputStream, 1, 1); + } + + @AfterEach + public void tearDown() { + if (kafkaStreams != null) { + kafkaStreams.close(Duration.ofSeconds(30)); + } + } + + @Test + public void shouldPutAndGetWithHeaders() throws Exception { + // Processor that puts records with headers and verifies store contents + final VersionedStoreWithHeadersCheckerProcessor processor = + new VersionedStoreWithHeadersCheckerProcessor(true); + + final Topology topology = buildTopology(() -> processor); + kafkaStreams = new KafkaStreams(topology, streamsConfiguration()); + kafkaStreams.start(); + + // Produce records with HEADERS1 + produceDataToTopicWithHeaders(inputStream, baseTimestamp, 
HEADERS1, + new KeyValue<>(1, "value1")); + + // Wait for output (processor forwards failedChecks count) + final List> results = + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + consumerConfig(), outputStream, 1, Duration.ofSeconds(30).toMillis()); + + assertEquals(0, results.get(0).value, "Store content check failed"); + } + + @Test + public void shouldPutAndGetWithEmptyHeaders() throws Exception { + final VersionedStoreWithHeadersCheckerProcessor processor = + new VersionedStoreWithHeadersCheckerProcessor(true); + + final Topology topology = buildTopology(() -> processor); + kafkaStreams = new KafkaStreams(topology, streamsConfiguration()); + kafkaStreams.start(); + + // Produce records with empty headers + produceDataToTopicWithHeaders(inputStream, baseTimestamp, EMPTY_HEADERS, + new KeyValue<>(1, "value1")); + + final List> results = + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + consumerConfig(), outputStream, 1, Duration.ofSeconds(30).toMillis()); + + assertEquals(0, results.get(0).value, "Store content check failed for empty headers"); + } + + @Test + public void shouldPreserveHeadersAcrossMultipleVersions() throws Exception { + final VersionedStoreWithHeadersCheckerProcessor processor = + new VersionedStoreWithHeadersCheckerProcessor(true); + + final Topology topology = buildTopology(() -> processor); + kafkaStreams = new KafkaStreams(topology, streamsConfiguration()); + kafkaStreams.start(); + + // Produce first version with HEADERS1 + produceDataToTopicWithHeaders(inputStream, baseTimestamp, HEADERS1, + new KeyValue<>(1, "version1")); + + final List> results1 = + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + consumerConfig(), outputStream, 1, Duration.ofSeconds(30).toMillis()); + assertEquals(0, results1.get(0).value, "First version check failed"); + + // Produce second version with HEADERS2 at a later timestamp + produceDataToTopicWithHeaders(inputStream, baseTimestamp + 1000, HEADERS2, + new KeyValue<>(1, 
"version2")); + + final List> results2 = + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + consumerConfig(), outputStream, 2, Duration.ofSeconds(30).toMillis()); + assertEquals(0, results2.get(1).value, "Second version check failed"); + } + + @Test + public void shouldRestoreHeadersFromChangelog() throws Exception { + final Properties config = streamsConfiguration(); + // Use a fixed state dir so we can restart and restore + config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); + + // First run: write data with headers + final VersionedStoreWithHeadersCheckerProcessor processor1 = + new VersionedStoreWithHeadersCheckerProcessor(true); + + Topology topology = buildTopology(() -> processor1); + kafkaStreams = new KafkaStreams(topology, config); + kafkaStreams.start(); + + produceDataToTopicWithHeaders(inputStream, baseTimestamp, HEADERS1, + new KeyValue<>(1, "restored-value")); + + final List> firstRunResults = + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + consumerConfig(), outputStream, 1, Duration.ofSeconds(30).toMillis()); + assertEquals(0, firstRunResults.get(0).value, "First run store check failed"); + + // Stop the streams app + kafkaStreams.close(Duration.ofSeconds(30)); + + // Second run: read from store without writing (verifies restoration) + final VersionedStoreWithHeadersCheckerProcessor processor2 = + new VersionedStoreWithHeadersCheckerProcessor(false); + // Seed the processor's expected data from the first run + processor2.seedExpectedData(1, "restored-value", baseTimestamp, HEADERS1); + + topology = buildTopology(() -> processor2); + kafkaStreams = new KafkaStreams(topology, config); + kafkaStreams.start(); + + // Produce a dummy record to trigger processing + produceDataToTopicWithHeaders(inputStream, baseTimestamp + 5000, EMPTY_HEADERS, + new KeyValue<>(99, "trigger")); + + final List> secondRunResults = + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + consumerConfig(), 
outputStream, 1, Duration.ofSeconds(30).toMillis()); + assertEquals(0, secondRunResults.get(0).value, "Restoration check failed: headers not preserved"); + } + + private Topology buildTopology( + final ProcessorSupplier processorSupplier) { + final var supplier = Stores.persistentVersionedKeyValueStoreWithHeaders( + STORE_NAME, HISTORY_RETENTION); + + final Topology topology = new Topology(); + topology.addSource("source", new IntegerDeserializer(), + Serdes.String().deserializer(), inputStream); + topology.addProcessor("processor", processorSupplier, "source"); + topology.addStateStore( + Stores.versionedKeyValueStoreBuilderWithHeaders( + supplier, Serdes.Integer(), Serdes.String()), + "processor"); + topology.addSink("sink", outputStream, new IntegerSerializer(), + new IntegerSerializer(), "processor"); + return topology; + } + + private Properties streamsConfiguration() { + final Properties config = new Properties(); + config.put(StreamsConfig.APPLICATION_ID_CONFIG, + "versioned-headers-it-" + safeUniqueTestName(testInfo)); + config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class); + config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); + return config; + } + + private Properties consumerConfig() { + final Properties config = new Properties(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + config.put(ConsumerConfig.GROUP_ID_CONFIG, "versioned-headers-it-consumer"); + config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); + return config; + } + + 
@SuppressWarnings("varargs") + @SafeVarargs + private final int produceDataToTopicWithHeaders(final String topic, + final long timestamp, + final Headers headers, + final KeyValue... keyValues) { + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + topic, + Arrays.asList(keyValues), + TestUtils.producerConfig(CLUSTER.bootstrapServers(), + IntegerSerializer.class, + StringSerializer.class), + headers, + timestamp, + false); + return keyValues.length; + } + + /** + * Processor that stores records in a VersionedKeyValueStoreWithHeaders and validates + * that the store contents match expectations. Forwards the number of failed checks + * as the output value. + */ + private static class VersionedStoreWithHeadersCheckerProcessor + implements Processor { + + private ProcessorContext context; + private VersionedKeyValueStoreWithHeaders store; + + private final boolean writeToStore; + private final Map> data; + + VersionedStoreWithHeadersCheckerProcessor(final boolean writeToStore) { + this.writeToStore = writeToStore; + this.data = new HashMap<>(); + } + + void seedExpectedData(final int key, final String value, + final long timestamp, final Headers headers) { + data.put(key, Optional.of(new VersionedRecordExpectation(value, timestamp, headers))); + } + + @Override + public void init(final ProcessorContext context) { + this.context = context; + store = context.getStateStore(STORE_NAME); + } + + @Override + public void process(final Record record) { + if (writeToStore) { + store.put(record.key(), record.value(), record.timestamp(), record.headers()); + data.put(record.key(), Optional.of( + new VersionedRecordExpectation(record.value(), record.timestamp(), record.headers()))); + } + + final int failedChecks = checkStoreContents(); + context.forward(record.withValue(failedChecks)); + } + + private int checkStoreContents() { + int failedChecks = 0; + for (final Map.Entry> entry : data.entrySet()) { + final Integer key = entry.getKey(); + final 
VersionedRecordExpectation expected = entry.getValue().orElse(null); + if (expected == null) { + continue; + } + + final VersionedRecord actual = store.get(key); + if (actual == null) { + failedChecks++; + continue; + } + + if (!Objects.equals(actual.value(), expected.value) + || actual.timestamp() != expected.timestamp) { + failedChecks++; + continue; + } + + // Verify headers match + final Headers actualHeaders = actual.headers(); + final Headers expectedHeaders = expected.headers; + if (!headersEqual(actualHeaders, expectedHeaders)) { + failedChecks++; + } + } + return failedChecks; + } + + private static boolean headersEqual(final Headers a, final Headers b) { + if (a == b) return true; + if (a == null || b == null) return false; + + final var iterA = a.iterator(); + final var iterB = b.iterator(); + while (iterA.hasNext() && iterB.hasNext()) { + final var hA = iterA.next(); + final var hB = iterB.next(); + if (!Objects.equals(hA.key(), hB.key())) return false; + if (!java.util.Arrays.equals(hA.value(), hB.value())) return false; + } + return !iterA.hasNext() && !iterB.hasNext(); + } + } + + private static class VersionedRecordExpectation { + final String value; + final long timestamp; + final Headers headers; + + VersionedRecordExpectation(final String value, final long timestamp, final Headers headers) { + this.value = value; + this.timestamp = timestamp; + this.headers = headers; + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index d567d525dc39a..fd477e3f7bf15 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -29,6 +29,7 @@ import org.apache.kafka.streams.state.internals.RocksDbSessionBytesStoreSupplier; import org.apache.kafka.streams.state.internals.RocksDbSessionHeadersBytesStoreSupplier; import 
org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier; import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier; import org.apache.kafka.streams.state.internals.SessionStoreBuilder; import org.apache.kafka.streams.state.internals.SessionStoreWithHeadersBuilder; @@ -37,6 +38,7 @@ import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder; import org.apache.kafka.streams.state.internals.TimestampedWindowStoreWithHeadersBuilder; import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder; +import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilderWithHeaders; import org.apache.kafka.streams.state.internals.WindowStoreBuilder; import java.time.Duration; @@ -720,4 +722,41 @@ public static StoreBuilder> sessionStoreWit return new SessionStoreWithHeadersBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM); } + + /** + * Creates a {@link VersionedBytesStoreSupplier} that also supports headers, backed by RocksDB. 
+ * + * @param name name of the store (cannot be {@code null}) + * @param historyRetention length of time that old record versions are available for query + * @return an instance of {@link VersionedBytesStoreSupplier} that also implements {@link HeadersBytesStoreSupplier} + */ + public static VersionedBytesStoreSupplier persistentVersionedKeyValueStoreWithHeaders(final String name, + final Duration historyRetention) { + Objects.requireNonNull(name, "name cannot be null"); + final String msgPrefix = prepareMillisCheckFailMsgPrefix(historyRetention, "historyRetention"); + final long historyRetentionMs = validateMillisecondDuration(historyRetention, msgPrefix); + if (historyRetentionMs < 0L) { + throw new IllegalArgumentException("historyRetention cannot be negative"); + } + return new RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier(name, historyRetentionMs); + } + + /** + * Creates a {@link StoreBuilder} that can be used to build a {@link VersionedKeyValueStore} + * with headers support. 
+ * + * @param supplier a {@link VersionedBytesStoreSupplier} (cannot be {@code null}) + * @param keySerde the key serde to use + * @param valueSerde the value serde to use + * @param key type + * @param value type + * @return an instance of a {@link StoreBuilder} that can build a {@link VersionedKeyValueStore} + */ + public static StoreBuilder> versionedKeyValueStoreBuilderWithHeaders( + final VersionedBytesStoreSupplier supplier, + final Serde keySerde, + final Serde valueSerde) { + Objects.requireNonNull(supplier, "supplier cannot be null"); + return new VersionedKeyValueStoreBuilderWithHeaders<>(supplier, keySerde, valueSerde, Time.SYSTEM); + } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStore.java index e840824577e07..544215711dda5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.utils.Bytes; /** @@ -29,6 +30,14 @@ public interface VersionedBytesStore extends KeyValueStore, Times */ long put(Bytes key, byte[] value, long timestamp); + /** + * The analog of {@link VersionedKeyValueStore#put(Object, Object, long)} with headers. + * Default implementation discards headers for backward compatibility. + */ + default long put(final Bytes key, final byte[] value, final long timestamp, final Headers headers) { + return put(key, value, timestamp); + } + /** * The analog of {@link VersionedKeyValueStore#get(Object, long)}. 
*/ diff --git a/streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStoreWithHeaders.java new file mode 100644 index 0000000000000..8b67fdc5e1567 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStoreWithHeaders.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state; + +import org.apache.kafka.common.header.Headers; + +/** + * A versioned key-value store that additionally preserves record headers. + *

+ * This interface extends {@link VersionedKeyValueStore} with a headers-aware + * {@link #put(Object, Object, long, Headers)} method. On reads, headers are + * available via {@link VersionedRecord#headers()}. + * + * @param The key type + * @param The value type + */ +public interface VersionedKeyValueStoreWithHeaders extends VersionedKeyValueStore { + + /** + * Add a new record version associated with the specified key, timestamp, and headers. + * + * @param key The key + * @param value The value, it can be {@code null}. {@code null} is interpreted as a delete. + * @param timestamp The timestamp for this record version + * @param headers The record headers; must not be {@code null} + * @return The validTo timestamp of the newly put record, or special values as defined + * in {@link VersionedKeyValueStore#put(Object, Object, long)}. + * @throws NullPointerException if {@code headers} is {@code null} + */ + long put(K key, V value, long timestamp, Headers headers); +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java b/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java index 6df07562853a3..ec0578426ba52 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.streams.state; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; + import java.util.Objects; import java.util.Optional; @@ -29,6 +32,7 @@ public final class VersionedRecord { private final V value; private final long timestamp; private final Optional validTo; + private final Headers headers; /** * Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}. @@ -37,9 +41,7 @@ public final class VersionedRecord { * @param timestamp The type of the result returned by this query. 
*/ public VersionedRecord(final V value, final long timestamp) { - this.value = Objects.requireNonNull(value, "value cannot be null."); - this.timestamp = timestamp; - this.validTo = Optional.empty(); + this(value, timestamp, Optional.empty(), new RecordHeaders()); } /** @@ -50,11 +52,38 @@ public VersionedRecord(final V value, final long timestamp) { * @param validTo The exclusive upper bound of the validity interval */ public VersionedRecord(final V value, final long timestamp, final long validTo) { - this.value = Objects.requireNonNull(value); - this.timestamp = timestamp; - this.validTo = Optional.of(validTo); + this(value, timestamp, Optional.of(validTo), new RecordHeaders()); + } + + /** + * Create a new {@link VersionedRecord} instance with headers. {@code value} cannot be {@code null}. + * + * @param value The value + * @param timestamp The timestamp + * @param headers The record headers + */ + public VersionedRecord(final V value, final long timestamp, final Headers headers) { + this(value, timestamp, Optional.empty(), headers); + } + + /** + * Create a new {@link VersionedRecord} instance with headers. {@code value} cannot be {@code null}. + * + * @param value The value + * @param timestamp The timestamp + * @param validTo The exclusive upper bound of the validity interval + * @param headers The record headers + */ + public VersionedRecord(final V value, final long timestamp, final long validTo, final Headers headers) { + this(value, timestamp, Optional.of(validTo), headers); } + private VersionedRecord(final V value, final long timestamp, final Optional validTo, final Headers headers) { + this.value = Objects.requireNonNull(value, "value cannot be null."); + this.timestamp = timestamp; + this.validTo = validTo; + this.headers = Objects.requireNonNull(headers, "headers cannot be null."); + } public V value() { return value; @@ -68,9 +97,16 @@ public Optional validTo() { return validTo; } + /** + * @return the record headers. 
Never {@code null} (returns empty headers if none were set). + */ + public Headers headers() { + return headers; + } + @Override public String toString() { - return "<" + value + "," + timestamp + "," + validTo + ">"; + return "<" + value + "," + timestamp + "," + validTo + "," + headers + ">"; } @Override @@ -83,11 +119,12 @@ public boolean equals(final Object o) { } final VersionedRecord that = (VersionedRecord) o; return timestamp == that.timestamp && validTo == that.validTo && - Objects.equals(value, that.value); + Objects.equals(value, that.value) && + Objects.equals(headers, that.headers); } @Override public int hashCode() { - return Objects.hash(value, timestamp, validTo); + return Objects.hash(value, timestamp, validTo, headers); } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java index bd35648210d1c..4822170e478ca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java @@ -45,6 +45,14 @@ public long put(final Bytes key, final byte[] value, final long timestamp) { return validTo; } + @Override + public long put(final Bytes key, final byte[] value, final long timestamp, final Headers headers) { + final Headers nonNullHeaders = headers != null ? 
headers : new RecordHeaders(); + final long validTo = inner.put(key, value, timestamp, nonNullHeaders); + log(key, value, timestamp, nonNullHeaders); + return validTo; + } + @Override public byte[] get(final Bytes key, final long asOfTimestamp) { return inner.get(key, asOfTimestamp); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreWithHeaders.java new file mode 100644 index 0000000000000..e8ebcde58e957 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreWithHeaders.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.internals.ByteUtils; +import org.apache.kafka.streams.state.HeadersBytesStore; +import org.apache.kafka.streams.state.VersionedKeyValueStoreWithHeaders; +import org.apache.kafka.streams.state.VersionedRecord; + +import java.nio.ByteBuffer; +import java.util.Objects; + +/** + * A persistent, versioned key-value store based on RocksDB that additionally + * preserves record headers. + *

+ * Headers are embedded into the value bytes using the format: + * {@code [headersSize(varint)][headersBytes][rawValue]} + * before delegating to the parent {@link RocksDBVersionedStore}. On reads, + * headers are extracted from the stored value bytes. + */ +public class RocksDBVersionedStoreWithHeaders + extends RocksDBVersionedStore + implements VersionedKeyValueStoreWithHeaders { + + RocksDBVersionedStoreWithHeaders(final String name, + final String metricsScope, + final long historyRetention, + final long segmentInterval) { + super(name, metricsScope, historyRetention, segmentInterval); + } + + @Override + public long put(final Bytes key, final byte[] value, final long timestamp, final Headers headers) { + Objects.requireNonNull(headers, "headers cannot be null"); + if (value == null) { + // tombstone: delegate directly, no headers to embed + return super.put(key, null, timestamp); + } + + // Check if headers are empty and use fast path + if (!headers.iterator().hasNext()) { + final byte[] encodedValue = HeadersBytesStore.convertToHeaderFormat(value); + return super.put(key, encodedValue, timestamp); + } + + // Use shared HeadersSerializer infrastructure + final HeadersSerializer.PreSerializedHeaders prep = HeadersSerializer.prepareSerialization(headers); + final int payloadSize = prep.requiredBufferSizeForHeaders + value.length; + final ByteBuffer buffer = ByteBuffer.allocate(ByteUtils.sizeOfVarint(prep.requiredBufferSizeForHeaders) + payloadSize); + ByteUtils.writeVarint(prep.requiredBufferSizeForHeaders, buffer); + HeadersSerializer.serialize(prep, buffer); + buffer.put(value); + return super.put(key, buffer.array(), timestamp); + } + + @Override + public long put(final Bytes key, final byte[] value, final long timestamp) { + // non-headers put: embed empty headers + return put(key, value, timestamp, new RecordHeaders()); + } + + @Override + public VersionedRecord get(final Bytes key) { + final VersionedRecord record = super.get(key); + return 
decodeRecord(record); + } + + @Override + public VersionedRecord get(final Bytes key, final long asOfTimestamp) { + final VersionedRecord record = super.get(key, asOfTimestamp); + return decodeRecord(record); + } + + @Override + public VersionedRecord delete(final Bytes key, final long timestamp) { + final VersionedRecord record = super.delete(key, timestamp); + return decodeRecord(record); + } + + private static VersionedRecord decodeRecord(final VersionedRecord record) { + if (record == null) { + return null; + } + final byte[] encodedValue = record.value(); + final Headers headers = Utils.headers(encodedValue); + final byte[] rawValue = extractRawValue(encodedValue); + if (record.validTo().isPresent()) { + return new VersionedRecord<>(rawValue, record.timestamp(), record.validTo().get(), headers); + } else { + return new VersionedRecord<>(rawValue, record.timestamp(), headers); + } + } + + /** + * Extract raw value from encoded value bytes, stripping the headers prefix. + * Format: [headersSize(varint)][headersBytes][rawValue] + */ + private static byte[] extractRawValue(final byte[] encodedValue) { + if (encodedValue == null) { + return null; + } + final ByteBuffer buffer = ByteBuffer.wrap(encodedValue); + final int headersSize = ByteUtils.readVarint(buffer); + buffer.position(buffer.position() + headersSize); + final byte[] rawValue = new byte[buffer.remaining()]; + buffer.get(rawValue); + return rawValue; + } + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier.java new file mode 100644 index 0000000000000..8f938c92684e3 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.state.HeadersBytesStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; + +/** + * A {@link VersionedBytesStoreSupplier} that also implements {@link HeadersBytesStoreSupplier}, + * providing a versioned store with headers support backed by RocksDB. 
+ */ +public class RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier + implements VersionedBytesStoreSupplier, HeadersBytesStoreSupplier { + + private final String name; + private final long historyRetentionMs; + private final long segmentIntervalMs; + + public RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier(final String name, + final long historyRetentionMs) { + this(name, historyRetentionMs, defaultSegmentInterval(historyRetentionMs)); + } + + public RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier(final String name, + final long historyRetentionMs, + final long segmentIntervalMs) { + this.name = name; + this.historyRetentionMs = historyRetentionMs; + this.segmentIntervalMs = segmentIntervalMs; + } + + @Override + public String name() { + return name; + } + + @Override + public long historyRetentionMs() { + return historyRetentionMs; + } + + public long segmentIntervalMs() { + return segmentIntervalMs; + } + + @Override + public KeyValueStore get() { + return new VersionedKeyValueToBytesStoreAdapter( + new RocksDBVersionedStoreWithHeaders(name, metricsScope(), historyRetentionMs, segmentIntervalMs) + ); + } + + @Override + public String metricsScope() { + return "rocksdb"; + } + + private static long defaultSegmentInterval(final long historyRetentionMs) { + if (historyRetentionMs <= 60_000L) { + return Math.max(historyRetentionMs / 3, 2_000L); + } else if (historyRetentionMs <= 300_000L) { + return Math.max(historyRetentionMs / 5, 20_000L); + } else if (historyRetentionMs <= 3600_000L) { + return Math.max(historyRetentionMs / 12, 60_000L); + } else { + return Math.max(historyRetentionMs / 24, 300_000L); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueStoreBuilderWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueStoreBuilderWithHeaders.java new file mode 100644 index 0000000000000..58156d6ea6731 --- /dev/null +++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueStoreBuilderWithHeaders.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.VersionedBytesStore; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; +import org.apache.kafka.streams.state.VersionedKeyValueStore; + +import java.util.Objects; + +/** + * Builder for versioned key-value stores with headers support. + * Follows the same pattern as {@link VersionedKeyValueStoreBuilder} but wraps + * with a headers-aware changelog store. 
+ */ +public class VersionedKeyValueStoreBuilderWithHeaders + extends AbstractStoreBuilder> { + + private final VersionedBytesStoreSupplier storeSupplier; + + public VersionedKeyValueStoreBuilderWithHeaders(final VersionedBytesStoreSupplier storeSupplier, + final Serde keySerde, + final Serde valueSerde, + final Time time) { + super( + storeSupplier.name(), + keySerde, + valueSerde, + time); + Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); + Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's metricsScope can't be null"); + this.storeSupplier = storeSupplier; + } + + @Override + public VersionedKeyValueStore build() { + final KeyValueStore store = storeSupplier.get(); + if (!(store instanceof VersionedBytesStore)) { + throw new IllegalStateException( + "VersionedBytesStoreSupplier.get() must return an instance of VersionedBytesStore"); + } + + return new MeteredVersionedKeyValueStore<>( + maybeWrapLogging((VersionedBytesStore) store), + storeSupplier.metricsScope(), + time, + keySerde, + valueSerde); + } + + @Override + public StoreBuilder> withCachingEnabled() { + throw new IllegalStateException("Versioned stores do not support caching"); + } + + public long historyRetention() { + return storeSupplier.historyRetentionMs(); + } + + private VersionedBytesStore maybeWrapLogging(final VersionedBytesStore inner) { + if (!enableLogging) { + return inner; + } + return new ChangeLoggingVersionedKeyValueBytesStore(inner); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/VersionedRecordWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/VersionedRecordWithHeadersTest.java new file mode 100644 index 0000000000000..64530920ea6fd --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/VersionedRecordWithHeadersTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; + +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class VersionedRecordWithHeadersTest { + + @Test + public void shouldCreateRecordWithDefaultEmptyHeaders() { + final VersionedRecord record = new VersionedRecord<>("value", 42L); + + assertEquals("value", record.value()); + assertEquals(42L, record.timestamp()); + assertFalse(record.validTo().isPresent()); + assertNotNull(record.headers()); + assertFalse(record.headers().iterator().hasNext()); + } + + @Test + public void shouldCreateRecordWithValidToAndDefaultEmptyHeaders() { + final VersionedRecord record = new VersionedRecord<>("value", 42L, 100L); + + assertEquals("value", record.value()); + 
assertEquals(42L, record.timestamp()); + assertTrue(record.validTo().isPresent()); + assertEquals(100L, record.validTo().get()); + assertNotNull(record.headers()); + assertFalse(record.headers().iterator().hasNext()); + } + + @Test + public void shouldCreateRecordWithExplicitHeaders() { + final Headers headers = new RecordHeaders(new RecordHeader[]{ + new RecordHeader("key1", "val1".getBytes(StandardCharsets.UTF_8)) + }); + final VersionedRecord record = new VersionedRecord<>("value", 42L, headers); + + assertEquals("value", record.value()); + assertEquals(42L, record.timestamp()); + assertFalse(record.validTo().isPresent()); + assertEquals(headers, record.headers()); + assertEquals("val1", new String(record.headers().lastHeader("key1").value(), StandardCharsets.UTF_8)); + } + + @Test + public void shouldCreateRecordWithValidToAndExplicitHeaders() { + final Headers headers = new RecordHeaders(new RecordHeader[]{ + new RecordHeader("traceId", "abc".getBytes(StandardCharsets.UTF_8)) + }); + final VersionedRecord record = new VersionedRecord<>("value", 42L, 100L, headers); + + assertEquals("value", record.value()); + assertEquals(42L, record.timestamp()); + assertTrue(record.validTo().isPresent()); + assertEquals(100L, record.validTo().get()); + assertEquals(headers, record.headers()); + } + + @Test + public void shouldRejectNullValue() { + assertThrows(NullPointerException.class, () -> new VersionedRecord<>(null, 42L)); + } + + @Test + public void shouldRejectNullHeaders() { + assertThrows(NullPointerException.class, () -> new VersionedRecord<>("value", 42L, null)); + } + + @Test + public void shouldRejectNullHeadersWithValidTo() { + assertThrows(NullPointerException.class, () -> new VersionedRecord<>("value", 42L, 100L, null)); + } + + @Test + public void shouldNotBeEqualForRecordsWithDifferentHeaders() { + final Headers h1 = new RecordHeaders(new RecordHeader[]{ + new RecordHeader("k", "v1".getBytes(StandardCharsets.UTF_8)) + }); + final Headers h2 = new 
RecordHeaders(new RecordHeader[]{ + new RecordHeader("k", "v2".getBytes(StandardCharsets.UTF_8)) + }); + final VersionedRecord r1 = new VersionedRecord<>("val", 10L, h1); + final VersionedRecord r2 = new VersionedRecord<>("val", 10L, h2); + + assertNotEquals(r1, r2); + } + + @Test + public void shouldBeEqualForRecordsWithSameHeaders() { + final Headers h1 = new RecordHeaders(new RecordHeader[]{ + new RecordHeader("k", "v1".getBytes(StandardCharsets.UTF_8)) + }); + final Headers h2 = new RecordHeaders(new RecordHeader[]{ + new RecordHeader("k", "v1".getBytes(StandardCharsets.UTF_8)) + }); + final VersionedRecord r1 = new VersionedRecord<>("val", 10L, h1); + final VersionedRecord r2 = new VersionedRecord<>("val", 10L, h2); + + assertEquals(r1, r2); + assertEquals(r1.hashCode(), r2.hashCode()); + } + + @Test + public void shouldReturnCorrectToString() { + final VersionedRecord record = new VersionedRecord<>("value", 42L); + final String str = record.toString(); + assertTrue(str.startsWith(" record = store.get(key); + assertNotNull(record); + assertArrayEquals(val, record.value()); + assertEquals(timestamp, record.timestamp()); + + final Headers returnedHeaders = record.headers(); + assertNotNull(returnedHeaders); + assertEquals("trace-123", + new String(returnedHeaders.lastHeader("traceId").value(), StandardCharsets.UTF_8)); + assertEquals("user-456", + new String(returnedHeaders.lastHeader("userId").value(), StandardCharsets.UTF_8)); + } + + @Test + public void shouldPutAndGetWithEmptyHeaders() { + final Bytes key = key("emptyHeadersKey"); + final byte[] val = value("emptyHeadersValue"); + final long timestamp = 2000L; + final RecordHeaders headers = new RecordHeaders(); + + store.put(key, val, timestamp, headers); + + final VersionedRecord record = store.get(key); + assertNotNull(record); + assertArrayEquals(val, record.value()); + assertEquals(timestamp, record.timestamp()); + + final Headers returnedHeaders = record.headers(); + assertNotNull(returnedHeaders); + 
assertFalse(returnedHeaders.iterator().hasNext()); + } + + @Test + public void shouldGetByTimestampWithHeaders() { + final Bytes key = key("versionedKey"); + final long ts1 = 1000L; + final long ts2 = 2000L; + + final RecordHeaders headers1 = new RecordHeaders(); + headers1.add(new RecordHeader("version", "1".getBytes(StandardCharsets.UTF_8))); + + final RecordHeaders headers2 = new RecordHeaders(); + headers2.add(new RecordHeader("version", "2".getBytes(StandardCharsets.UTF_8))); + + store.put(key, value("value1"), ts1, headers1); + store.put(key, value("value2"), ts2, headers2); + + // Get as of ts2 + final VersionedRecord record = store.get(key, ts2); + assertNotNull(record); + assertArrayEquals(value("value2"), record.value()); + assertEquals(ts2, record.timestamp()); + assertEquals("2", + new String(record.headers().lastHeader("version").value(), StandardCharsets.UTF_8)); + + // Get as of ts1 (should return version 1) + final VersionedRecord record1 = store.get(key, ts1); + assertNotNull(record1); + assertArrayEquals(value("value1"), record1.value()); + assertEquals(ts1, record1.timestamp()); + assertEquals("1", + new String(record1.headers().lastHeader("version").value(), StandardCharsets.UTF_8)); + } + + @Test + public void shouldDeleteRecordWithHeaders() { + final Bytes key = key("deleteKey"); + final byte[] val = value("deleteValue"); + final long timestamp = 1000L; + final RecordHeaders headers = new RecordHeaders(); + headers.add(new RecordHeader("operation", "put".getBytes(StandardCharsets.UTF_8))); + + store.put(key, val, timestamp, headers); + + // Verify the record exists before deletion + final VersionedRecord record = store.get(key); + assertNotNull(record); + assertArrayEquals(val, record.value()); + assertEquals(timestamp, record.timestamp()); + assertEquals("put", + new String(record.headers().lastHeader("operation").value(), StandardCharsets.UTF_8)); + + // Delete the record by putting a tombstone at a later timestamp + store.put(key, null, 
2000L, new RecordHeaders()); + + // Verify the record is now deleted (returns null for get) + final VersionedRecord afterDelete = store.get(key); + assertNull(afterDelete); + } + + @Test + public void shouldHandleTombstoneViaHeadersPut() { + final Bytes key = key("tombstoneKey"); + final long timestamp = 1000L; + final RecordHeaders headers = new RecordHeaders(); + headers.add(new RecordHeader("tombstoneMarker", "true".getBytes(StandardCharsets.UTF_8))); + + // Put null value with headers (tombstone) + store.put(key, null, timestamp, headers); + + // Get should return null for the value + final VersionedRecord record = store.get(key); + assertNull(record); + } + + @Test + public void shouldPutWithoutHeadersDefaultsToEmptyHeaders() { + final Bytes key = key("noHeadersKey"); + final byte[] val = value("noHeadersValue"); + final long timestamp = 3000L; + + // Use put without headers argument + store.put(key, val, timestamp); + + final VersionedRecord record = store.get(key); + assertNotNull(record); + assertArrayEquals(val, record.value()); + assertEquals(timestamp, record.timestamp()); + + final Headers returnedHeaders = record.headers(); + assertNotNull(returnedHeaders); + assertFalse(returnedHeaders.iterator().hasNext()); + } + + @Test + public void shouldPreserveHeadersWithNullHeaderValues() { + final Bytes key = key("nullHeaderValueKey"); + final byte[] val = value("testValue"); + final long timestamp = 4000L; + final RecordHeaders headers = new RecordHeaders(); + headers.add(new RecordHeader("nullValue", null)); + headers.add(new RecordHeader("normalValue", "normal".getBytes(StandardCharsets.UTF_8))); + + store.put(key, val, timestamp, headers); + + final VersionedRecord record = store.get(key); + assertNotNull(record); + assertArrayEquals(val, record.value()); + + final Headers returnedHeaders = record.headers(); + assertNotNull(returnedHeaders); + assertNull(returnedHeaders.lastHeader("nullValue").value()); + assertEquals("normal", + new 
String(returnedHeaders.lastHeader("normalValue").value(), StandardCharsets.UTF_8)); + } + + @Test + public void shouldPreserveMultipleHeadersSameKey() { + final Bytes key = key("multiHeaderKey"); + final byte[] val = value("multiHeaderValue"); + final long timestamp = 5000L; + final RecordHeaders headers = new RecordHeaders(); + headers.add(new RecordHeader("tag", "tag1".getBytes(StandardCharsets.UTF_8))); + headers.add(new RecordHeader("tag", "tag2".getBytes(StandardCharsets.UTF_8))); + headers.add(new RecordHeader("tag", "tag3".getBytes(StandardCharsets.UTF_8))); + + store.put(key, val, timestamp, headers); + + final VersionedRecord record = store.get(key); + assertNotNull(record); + assertArrayEquals(val, record.value()); + + final Headers returnedHeaders = record.headers(); + assertNotNull(returnedHeaders); + + // Verify all headers with same key are preserved + final java.util.List

tagHeaders = new java.util.ArrayList<>(); + for (final Header h : returnedHeaders) { + if (h.key().equals("tag")) { + tagHeaders.add(h); + } + } + assertEquals(3, tagHeaders.size()); + assertEquals("tag1", new String(tagHeaders.get(0).value(), StandardCharsets.UTF_8)); + assertEquals("tag2", new String(tagHeaders.get(1).value(), StandardCharsets.UTF_8)); + assertEquals("tag3", new String(tagHeaders.get(2).value(), StandardCharsets.UTF_8)); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/VersionedKeyValueStoreBuilderWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/VersionedKeyValueStoreBuilderWithHeadersTest.java new file mode 100644 index 0000000000000..b4333c5016684 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/VersionedKeyValueStoreBuilderWithHeadersTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;

import org.junit.jupiter.api.Test;

import java.time.Duration;

import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

/**
 * Tests for the headers-aware versioned store supplier and builder factory
 * methods exposed on {@link Stores}.
 */
public class VersionedKeyValueStoreBuilderWithHeadersTest {

    private static final Duration RETENTION = Duration.ofMinutes(5);

    // Shorthand for the supplier under test.
    private static VersionedBytesStoreSupplier headersSupplier() {
        return Stores.persistentVersionedKeyValueStoreWithHeaders("test-store", RETENTION);
    }

    @Test
    public void shouldCreateSupplierWithDualInterface() {
        final VersionedBytesStoreSupplier supplier = headersSupplier();

        assertNotNull(supplier);
        assertInstanceOf(VersionedBytesStoreSupplier.class, supplier);
        assertInstanceOf(HeadersBytesStoreSupplier.class, supplier);
        assertInstanceOf(RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier.class, supplier);
    }

    @Test
    public void shouldRejectNullName() {
        assertThrows(NullPointerException.class,
            () -> Stores.persistentVersionedKeyValueStoreWithHeaders(null, RETENTION));
    }

    @Test
    public void shouldRejectNegativeHistoryRetention() {
        assertThrows(IllegalArgumentException.class,
            () -> Stores.persistentVersionedKeyValueStoreWithHeaders("test", Duration.ofMillis(-1)));
    }

    @Test
    public void shouldCreateBuilderWithHeaders() {
        final var builder = Stores.versionedKeyValueStoreBuilderWithHeaders(headersSupplier(), null, null);

        assertNotNull(builder);
        assertInstanceOf(VersionedKeyValueStoreBuilderWithHeaders.class, builder);
    }

    @Test
    public void shouldRejectNullSupplier() {
        assertThrows(NullPointerException.class,
            () -> Stores.versionedKeyValueStoreBuilderWithHeaders(null, null, null));
    }

    @Test
    public void shouldRejectCachingOnBuilder() {
        final var builder = Stores.versionedKeyValueStoreBuilderWithHeaders(headersSupplier(), null, null);

        assertThrows(IllegalStateException.class, builder::withCachingEnabled);
    }
}