From 799a021d75504d3c47981de6e2c5e56905969e59 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 4 Apr 2026 10:52:41 +0530 Subject: [PATCH 1/7] Add headers support to VersionedRecord, VersionedKeyValueStoreWithHeaders interface, and VersionedBytesStore default put --- .../streams/state/VersionedBytesStore.java | 9 ++ .../VersionedKeyValueStoreWithHeaders.java | 44 +++++++ .../kafka/streams/state/VersionedRecord.java | 48 ++++++- .../state/VersionedRecordWithHeadersTest.java | 123 ++++++++++++++++++ 4 files changed, 218 insertions(+), 6 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStoreWithHeaders.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/VersionedRecordWithHeadersTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStore.java index e840824577e07..544215711dda5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.utils.Bytes; /** @@ -29,6 +30,14 @@ public interface VersionedBytesStore extends KeyValueStore, Times */ long put(Bytes key, byte[] value, long timestamp); + /** + * The analog of {@link VersionedKeyValueStore#put(Object, Object, long)} with headers. + * Default implementation discards headers for backward compatibility. + */ + default long put(final Bytes key, final byte[] value, final long timestamp, final Headers headers) { + return put(key, value, timestamp); + } + /** * The analog of {@link VersionedKeyValueStore#get(Object, long)}. */ diff --git a/streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStoreWithHeaders.java new file mode 100644 index 0000000000000..d956f58f02d6c --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStoreWithHeaders.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state; + +import org.apache.kafka.common.header.Headers; + +/** + * A versioned key-value store that additionally preserves record headers. + *

+ * This interface extends {@link VersionedKeyValueStore} with a headers-aware + * {@link #put(Object, Object, long, Headers)} method. On reads, headers are + * available via {@link VersionedRecord#headers()}. + * + * @param The key type + * @param The value type + */ +public interface VersionedKeyValueStoreWithHeaders extends VersionedKeyValueStore { + + /** + * Add a new record version associated with the specified key, timestamp, and headers. + * + * @param key The key + * @param value The value, it can be {@code null}. {@code null} is interpreted as a delete. + * @param timestamp The timestamp for this record version + * @param headers The record headers + * @return The validTo timestamp of the newly put record, or special values as defined + * in {@link VersionedKeyValueStore#put(Object, Object, long)}. + */ + long put(K key, V value, long timestamp, Headers headers); +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java b/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java index 6df07562853a3..d242c9c2b0b0c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.streams.state; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; + import java.util.Objects; import java.util.Optional; @@ -29,6 +32,7 @@ public final class VersionedRecord { private final V value; private final long timestamp; private final Optional validTo; + private final Headers headers; /** * Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}. @@ -37,9 +41,7 @@ public final class VersionedRecord { * @param timestamp The type of the result returned by this query. */ public VersionedRecord(final V value, final long timestamp) { - this.value = Objects.requireNonNull(value, "value cannot be null."); - this.timestamp = timestamp; - this.validTo = Optional.empty(); + this(value, timestamp, Optional.empty(), new RecordHeaders()); } /** @@ -50,11 +52,38 @@ public VersionedRecord(final V value, final long timestamp) { * @param validTo The exclusive upper bound of the validity interval */ public VersionedRecord(final V value, final long timestamp, final long validTo) { - this.value = Objects.requireNonNull(value); - this.timestamp = timestamp; - this.validTo = Optional.of(validTo); + this(value, timestamp, Optional.of(validTo), new RecordHeaders()); + } + + /** + * Create a new {@link VersionedRecord} instance with headers. {@code value} cannot be {@code null}. + * + * @param value The value + * @param timestamp The timestamp + * @param headers The record headers + */ + public VersionedRecord(final V value, final long timestamp, final Headers headers) { + this(value, timestamp, Optional.empty(), headers); + } + + /** + * Create a new {@link VersionedRecord} instance with headers. {@code value} cannot be {@code null}. + * + * @param value The value + * @param timestamp The timestamp + * @param validTo The exclusive upper bound of the validity interval + * @param headers The record headers + */ + public VersionedRecord(final V value, final long timestamp, final long validTo, final Headers headers) { + this(value, timestamp, Optional.of(validTo), headers); } + private VersionedRecord(final V value, final long timestamp, final Optional validTo, final Headers headers) { + this.value = Objects.requireNonNull(value, "value cannot be null."); + this.timestamp = timestamp; + this.validTo = validTo; + this.headers = Objects.requireNonNull(headers, "headers cannot be null."); + } public V value() { return value; @@ -68,6 +97,13 @@ public Optional validTo() { return validTo; } + /** + * @return the record headers. Never {@code null} (returns empty headers if none were set). + */ + public Headers headers() { + return headers; + } + @Override public String toString() { return "<" + value + "," + timestamp + "," + validTo + ">"; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/VersionedRecordWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/VersionedRecordWithHeadersTest.java new file mode 100644 index 0000000000000..0c3df4e4f6602 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/VersionedRecordWithHeadersTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; + +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class VersionedRecordWithHeadersTest { + + @Test + public void shouldCreateRecordWithDefaultEmptyHeaders() { + final VersionedRecord record = new VersionedRecord<>("value", 42L); + + assertEquals("value", record.value()); + assertEquals(42L, record.timestamp()); + assertFalse(record.validTo().isPresent()); + assertNotNull(record.headers()); + assertFalse(record.headers().iterator().hasNext()); + } + + @Test + public void shouldCreateRecordWithValidToAndDefaultEmptyHeaders() { + final VersionedRecord record = new VersionedRecord<>("value", 42L, 100L); + + assertEquals("value", record.value()); + assertEquals(42L, record.timestamp()); + assertTrue(record.validTo().isPresent()); + assertEquals(100L, record.validTo().get()); + assertNotNull(record.headers()); + assertFalse(record.headers().iterator().hasNext()); + } + + @Test + public void shouldCreateRecordWithExplicitHeaders() { + final Headers headers = new RecordHeaders(new RecordHeader[]{ + new RecordHeader("key1", "val1".getBytes(StandardCharsets.UTF_8)) + }); + final VersionedRecord record = new VersionedRecord<>("value", 42L, headers); + + assertEquals("value", record.value()); + assertEquals(42L, record.timestamp()); + assertFalse(record.validTo().isPresent()); + assertEquals(headers, record.headers()); + assertEquals("val1", new String(record.headers().lastHeader("key1").value(), StandardCharsets.UTF_8)); + } + + @Test + public void shouldCreateRecordWithValidToAndExplicitHeaders() { + final Headers headers = new RecordHeaders(new RecordHeader[]{ + new RecordHeader("traceId", "abc".getBytes(StandardCharsets.UTF_8)) + }); + final VersionedRecord record = new VersionedRecord<>("value", 42L, 100L, headers); + + assertEquals("value", record.value()); + assertEquals(42L, record.timestamp()); + assertTrue(record.validTo().isPresent()); + assertEquals(100L, record.validTo().get()); + assertEquals(headers, record.headers()); + } + + @Test + public void shouldRejectNullValue() { + assertThrows(NullPointerException.class, () -> new VersionedRecord<>(null, 42L)); + } + + @Test + public void shouldRejectNullHeaders() { + assertThrows(NullPointerException.class, () -> new VersionedRecord<>("value", 42L, null)); + } + + @Test + public void shouldRejectNullHeadersWithValidTo() { + assertThrows(NullPointerException.class, () -> new VersionedRecord<>("value", 42L, 100L, null)); + } + + @Test + public void shouldPreserveEqualityForRecordsWithDifferentHeaders() { + // Equality is based on value, timestamp, and validTo only (not headers). + // This matches the existing contract. + final Headers h1 = new RecordHeaders(new RecordHeader[]{ + new RecordHeader("k", "v1".getBytes(StandardCharsets.UTF_8)) + }); + final Headers h2 = new RecordHeaders(new RecordHeader[]{ + new RecordHeader("k", "v2".getBytes(StandardCharsets.UTF_8)) + }); + final VersionedRecord r1 = new VersionedRecord<>("val", 10L, h1); + final VersionedRecord r2 = new VersionedRecord<>("val", 10L, h2); + + assertEquals(r1, r2); + assertEquals(r1.hashCode(), r2.hashCode()); + } + + @Test + public void shouldReturnCorrectToString() { + final VersionedRecord record = new VersionedRecord<>("value", 42L); + assertEquals("", record.toString()); + } +} From 68ce1b92a023c53b0ef9f9bc8986c312c87de533 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 4 Apr 2026 11:04:11 +0530 Subject: [PATCH 2/7] Add RocksDBVersionedStoreWithHeaders and headers-aware versioned store supplier --- .../RocksDBVersionedStoreWithHeaders.java | 267 ++++++++++++++++++ ...KeyValueBytesStoreWithHeadersSupplier.java | 85 ++++++ .../RocksDBVersionedStoreWithHeadersTest.java | 177 ++++++++++++ 3 files changed, 529 insertions(+) create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreWithHeaders.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreWithHeadersTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreWithHeaders.java new file mode 100644 index 0000000000000..e36c8a3a221a5 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreWithHeaders.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.state.VersionedKeyValueStoreWithHeaders; +import org.apache.kafka.streams.state.VersionedRecord; + +import java.nio.ByteBuffer; +import java.util.Objects; + +/** + * A persistent, versioned key-value store based on RocksDB that additionally + * preserves record headers. + *

+ * Headers are embedded into the value bytes using the format: + * {@code [headersSize(varint)][headersBytes][rawValue]} + * before delegating to the parent {@link RocksDBVersionedStore}. On reads, + * headers are extracted from the stored value bytes. + */ +public class RocksDBVersionedStoreWithHeaders + extends RocksDBVersionedStore + implements VersionedKeyValueStoreWithHeaders { + + RocksDBVersionedStoreWithHeaders(final String name, + final String metricsScope, + final long historyRetention, + final long segmentInterval) { + super(name, metricsScope, historyRetention, segmentInterval); + } + + @Override + public long put(final Bytes key, final byte[] value, final long timestamp, final Headers headers) { + Objects.requireNonNull(headers, "headers cannot be null"); + if (value == null) { + // tombstone: delegate directly, no headers to embed + return super.put(key, null, timestamp); + } + final byte[] encodedValue = HeadersUtils.encode(headers, value); + return super.put(key, encodedValue, timestamp); + } + + @Override + public long put(final Bytes key, final byte[] value, final long timestamp) { + // non-headers put: embed empty headers + return put(key, value, timestamp, new RecordHeaders()); + } + + @Override + public VersionedRecord get(final Bytes key) { + final VersionedRecord record = super.get(key); + return decodeRecord(record); + } + + @Override + public VersionedRecord get(final Bytes key, final long asOfTimestamp) { + final VersionedRecord record = super.get(key, asOfTimestamp); + return decodeRecord(record); + } + + @Override + public VersionedRecord delete(final Bytes key, final long timestamp) { + final VersionedRecord record = super.delete(key, timestamp); + return decodeRecord(record); + } + + private static VersionedRecord decodeRecord(final VersionedRecord record) { + if (record == null) { + return null; + } + final byte[] encodedValue = record.value(); + final Headers headers = HeadersUtils.extractHeaders(encodedValue); + final byte[] rawValue = HeadersUtils.extractValue(encodedValue); + if (record.validTo().isPresent()) { + return new VersionedRecord<>(rawValue, record.timestamp(), record.validTo().get(), headers); + } else { + return new VersionedRecord<>(rawValue, record.timestamp(), headers); + } + } + + /** + * Utility methods for encoding and decoding headers in value bytes. + *

+ * Format: {@code [headersSize(varint)][headersBytes][rawValue]} + */ + static final class HeadersUtils { + + private HeadersUtils() {} + + /** + * Encode headers and raw value into a single byte array. + */ + static byte[] encode(final Headers headers, final byte[] rawValue) { + final byte[] headersBytes = HeadersSerializer.serialize(headers); + final byte[] varintBytes = encodeVarint(headersBytes.length); + final ByteBuffer buffer = ByteBuffer.allocate( + varintBytes.length + headersBytes.length + rawValue.length); + buffer.put(varintBytes); + buffer.put(headersBytes); + buffer.put(rawValue); + return buffer.array(); + } + + /** + * Extract headers from encoded value bytes. + */ + static Headers extractHeaders(final byte[] encodedValue) { + final ByteBuffer buffer = ByteBuffer.wrap(encodedValue); + final int headersSize = decodeVarint(buffer); + if (headersSize == 0) { + return new RecordHeaders(); + } + final byte[] headersBytes = new byte[headersSize]; + buffer.get(headersBytes); + return HeadersSerializer.deserialize(headersBytes); + } + + /** + * Extract raw value from encoded value bytes (skipping headers). + */ + static byte[] extractValue(final byte[] encodedValue) { + final ByteBuffer buffer = ByteBuffer.wrap(encodedValue); + final int headersSize = decodeVarint(buffer); + buffer.position(buffer.position() + headersSize); + final byte[] rawValue = new byte[buffer.remaining()]; + buffer.get(rawValue); + return rawValue; + } + + static byte[] encodeVarint(int value) { + if (value == 0) { + return new byte[]{0x00}; + } + // Max 5 bytes for a 32-bit varint + final byte[] temp = new byte[5]; + int pos = 0; + while (value > 0x7F) { + temp[pos++] = (byte) ((value & 0x7F) | 0x80); + value >>>= 7; + } + temp[pos++] = (byte) value; + final byte[] result = new byte[pos]; + System.arraycopy(temp, 0, result, 0, pos); + return result; + } + + static int decodeVarint(final ByteBuffer buffer) { + int result = 0; + int shift = 0; + while (true) { + final byte b = buffer.get(); + result |= (b & 0x7F) << shift; + if ((b & 0x80) == 0) { + return result; + } + shift += 7; + } + } + } + + /** + * Serializer/deserializer for Kafka Headers to/from byte arrays. + */ + static final class HeadersSerializer { + + private HeadersSerializer() {} + + /** + * Serialize Headers to bytes. Format per header: + * {@code [keyLength(varint)][keyBytes][valueLength(varint)][valueBytes]} + */ + static byte[] serialize(final Headers headers) { + if (headers == null || !headers.iterator().hasNext()) { + return new byte[0]; + } + // Estimate size + int estimatedSize = 0; + for (final org.apache.kafka.common.header.Header h : headers) { + estimatedSize += 5 + h.key().length() + 5 + (h.value() != null ? h.value().length : 0); + } + final ByteBuffer buffer = ByteBuffer.allocate(estimatedSize); + for (final org.apache.kafka.common.header.Header h : headers) { + final byte[] keyBytes = h.key().getBytes(java.nio.charset.StandardCharsets.UTF_8); + putVarint(buffer, keyBytes.length); + buffer.put(keyBytes); + if (h.value() == null) { + putVarint(buffer, -1); + } else { + putVarint(buffer, h.value().length); + buffer.put(h.value()); + } + } + final byte[] result = new byte[buffer.position()]; + buffer.flip(); + buffer.get(result); + return result; + } + + /** + * Deserialize bytes to Headers. + */ + static Headers deserialize(final byte[] data) { + if (data == null || data.length == 0) { + return new RecordHeaders(); + } + final RecordHeaders headers = new RecordHeaders(); + final ByteBuffer buffer = ByteBuffer.wrap(data); + while (buffer.hasRemaining()) { + final int keyLen = getVarint(buffer); + final byte[] keyBytes = new byte[keyLen]; + buffer.get(keyBytes); + final String key = new String(keyBytes, java.nio.charset.StandardCharsets.UTF_8); + final int valueLen = getVarint(buffer); + final byte[] value; + if (valueLen < 0) { + value = null; + } else { + value = new byte[valueLen]; + buffer.get(value); + } + headers.add(new org.apache.kafka.common.header.internals.RecordHeader(key, value)); + } + return headers; + } + + private static void putVarint(final ByteBuffer buffer, int value) { + // Handle negative values (for null marker -1) using zigzag encoding + value = (value << 1) ^ (value >> 31); + while ((value & ~0x7F) != 0) { + buffer.put((byte) ((value & 0x7F) | 0x80)); + value >>>= 7; + } + buffer.put((byte) value); + } + + private static int getVarint(final ByteBuffer buffer) { + int result = 0; + int shift = 0; + while (true) { + final byte b = buffer.get(); + result |= (b & 0x7F) << shift; + if ((b & 0x80) == 0) { + break; + } + shift += 7; + } + // Zigzag decode + return (result >>> 1) ^ -(result & 1); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier.java new file mode 100644 index 0000000000000..8f938c92684e3 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.state.HeadersBytesStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; + +/** + * A {@link VersionedBytesStoreSupplier} that also implements {@link HeadersBytesStoreSupplier}, + * providing a versioned store with headers support backed by RocksDB. + */ +public class RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier + implements VersionedBytesStoreSupplier, HeadersBytesStoreSupplier { + + private final String name; + private final long historyRetentionMs; + private final long segmentIntervalMs; + + public RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier(final String name, + final long historyRetentionMs) { + this(name, historyRetentionMs, defaultSegmentInterval(historyRetentionMs)); + } + + public RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier(final String name, + final long historyRetentionMs, + final long segmentIntervalMs) { + this.name = name; + this.historyRetentionMs = historyRetentionMs; + this.segmentIntervalMs = segmentIntervalMs; + } + + @Override + public String name() { + return name; + } + + @Override + public long historyRetentionMs() { + return historyRetentionMs; + } + + public long segmentIntervalMs() { + return segmentIntervalMs; + } + + @Override + public KeyValueStore get() { + return new VersionedKeyValueToBytesStoreAdapter( + new RocksDBVersionedStoreWithHeaders(name, metricsScope(), historyRetentionMs, segmentIntervalMs) + ); + } + + @Override + public String metricsScope() { + return "rocksdb"; + } + + private static long defaultSegmentInterval(final long historyRetentionMs) { + if (historyRetentionMs <= 60_000L) { + return Math.max(historyRetentionMs / 3, 2_000L); + } else if (historyRetentionMs <= 300_000L) { + return Math.max(historyRetentionMs / 5, 20_000L); + } else if (historyRetentionMs <= 3600_000L) { + return Math.max(historyRetentionMs / 12, 60_000L); + } else { + return Math.max(historyRetentionMs / 24, 300_000L); + } + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreWithHeadersTest.java new file mode 100644 index 0000000000000..b186c96d4877e --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreWithHeadersTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; + +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class RocksDBVersionedStoreWithHeadersTest { + + // Tests for HeadersUtils encoding/decoding + + @Test + public void shouldEncodeAndDecodeEmptyHeaders() { + final Headers headers = new RecordHeaders(); + final byte[] rawValue = "hello".getBytes(StandardCharsets.UTF_8); + + final byte[] encoded = RocksDBVersionedStoreWithHeaders.HeadersUtils.encode(headers, rawValue); + assertNotNull(encoded); + + final Headers decoded = RocksDBVersionedStoreWithHeaders.HeadersUtils.extractHeaders(encoded); + assertNotNull(decoded); + assertFalse(decoded.iterator().hasNext()); + + final byte[] decodedValue = RocksDBVersionedStoreWithHeaders.HeadersUtils.extractValue(encoded); + assertArrayEquals(rawValue, decodedValue); + } + + @Test + public void shouldEncodeAndDecodeWithHeaders() { + final RecordHeaders headers = new RecordHeaders(); + headers.add(new RecordHeader("traceId", "abc-123".getBytes(StandardCharsets.UTF_8))); + headers.add(new RecordHeader("schemaId", new byte[]{0, 0, 0, 1})); + final byte[] rawValue = "world".getBytes(StandardCharsets.UTF_8); + + final byte[] encoded = RocksDBVersionedStoreWithHeaders.HeadersUtils.encode(headers, rawValue); + assertNotNull(encoded); + + final Headers decoded = RocksDBVersionedStoreWithHeaders.HeadersUtils.extractHeaders(encoded); + assertNotNull(decoded); + assertEquals("abc-123", + new String(decoded.lastHeader("traceId").value(), StandardCharsets.UTF_8)); + assertArrayEquals(new byte[]{0, 0, 0, 1}, decoded.lastHeader("schemaId").value()); + + final byte[] decodedValue = RocksDBVersionedStoreWithHeaders.HeadersUtils.extractValue(encoded); + assertArrayEquals(rawValue, decodedValue); + } + + @Test + public void shouldEncodeAndDecodeHeaderWithNullValue() { + final RecordHeaders headers = new RecordHeaders(); + headers.add(new RecordHeader("nullHeader", null)); + final byte[] rawValue = "test".getBytes(StandardCharsets.UTF_8); + + final byte[] encoded = RocksDBVersionedStoreWithHeaders.HeadersUtils.encode(headers, rawValue); + + final Headers decoded = RocksDBVersionedStoreWithHeaders.HeadersUtils.extractHeaders(encoded); + assertNotNull(decoded.lastHeader("nullHeader")); + assertEquals(null, decoded.lastHeader("nullHeader").value()); + + final byte[] decodedValue = RocksDBVersionedStoreWithHeaders.HeadersUtils.extractValue(encoded); + assertArrayEquals(rawValue, decodedValue); + } + + @Test + public void shouldEncodeAndDecodeMultipleHeaders() { + final RecordHeaders headers = new RecordHeaders(); + for (int i = 0; i < 10; i++) { + headers.add(new RecordHeader("key" + i, ("value" + i).getBytes(StandardCharsets.UTF_8))); + } + final byte[] rawValue = new byte[100]; + for (int i = 0; i < 100; i++) { + rawValue[i] = (byte) i; + } + + final byte[] encoded = RocksDBVersionedStoreWithHeaders.HeadersUtils.encode(headers, rawValue); + + final Headers decoded = RocksDBVersionedStoreWithHeaders.HeadersUtils.extractHeaders(encoded); + for (int i = 0; i < 10; i++) { + assertEquals("value" + i, + new String(decoded.lastHeader("key" + i).value(), StandardCharsets.UTF_8)); + } + + final byte[] decodedValue = RocksDBVersionedStoreWithHeaders.HeadersUtils.extractValue(encoded); + assertArrayEquals(rawValue, decodedValue); + } + + @Test + public void shouldHandleEmptyRawValue() { + final RecordHeaders headers = new RecordHeaders(); + headers.add(new RecordHeader("k", "v".getBytes(StandardCharsets.UTF_8))); + final byte[] rawValue = new byte[0]; + + final byte[] encoded = RocksDBVersionedStoreWithHeaders.HeadersUtils.encode(headers, rawValue); + + final byte[] decodedValue = RocksDBVersionedStoreWithHeaders.HeadersUtils.extractValue(encoded); + assertArrayEquals(rawValue, decodedValue); + + final Headers decoded = RocksDBVersionedStoreWithHeaders.HeadersUtils.extractHeaders(encoded); + assertEquals("v", + new String(decoded.lastHeader("k").value(), StandardCharsets.UTF_8)); + } + + // Tests for HeadersSerializer + + @Test + public void shouldSerializeAndDeserializeEmptyHeaders() { + final Headers headers = new RecordHeaders(); + final byte[] serialized = RocksDBVersionedStoreWithHeaders.HeadersSerializer.serialize(headers); + assertEquals(0, serialized.length); + final Headers deserialized = RocksDBVersionedStoreWithHeaders.HeadersSerializer.deserialize(serialized); + assertFalse(deserialized.iterator().hasNext()); + } + + @Test + public void shouldSerializeAndDeserializeHeaders() { + final RecordHeaders headers = new RecordHeaders(); + headers.add(new RecordHeader("a", "1".getBytes(StandardCharsets.UTF_8))); + headers.add(new RecordHeader("b", "22".getBytes(StandardCharsets.UTF_8))); + + final byte[] serialized = RocksDBVersionedStoreWithHeaders.HeadersSerializer.serialize(headers); + final Headers deserialized = RocksDBVersionedStoreWithHeaders.HeadersSerializer.deserialize(serialized); + + assertEquals("1", + new String(deserialized.lastHeader("a").value(), StandardCharsets.UTF_8)); + assertEquals("22", + new String(deserialized.lastHeader("b").value(), StandardCharsets.UTF_8)); + } + + // Tests for varint encoding + + @Test + public void shouldEncodeAndDecodeVarintZero() { + final byte[] encoded = RocksDBVersionedStoreWithHeaders.HeadersUtils.encodeVarint(0); + assertEquals(1, encoded.length); + assertEquals(0x00, encoded[0]); + } + + @Test + public void shouldEncodeAndDecodeVarintSmall() { + final byte[] encoded = RocksDBVersionedStoreWithHeaders.HeadersUtils.encodeVarint(127); + assertEquals(1, encoded.length); + final java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(encoded); + assertEquals(127, RocksDBVersionedStoreWithHeaders.HeadersUtils.decodeVarint(buf)); + } + + @Test + public void shouldEncodeAndDecodeVarintLarge() { + final byte[] encoded = RocksDBVersionedStoreWithHeaders.HeadersUtils.encodeVarint(300); + assertEquals(2, encoded.length); + final java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(encoded); + assertEquals(300, RocksDBVersionedStoreWithHeaders.HeadersUtils.decodeVarint(buf)); + } +} From c84a6e4e85839ffe9575a0b0cc935c2175764444 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 4 Apr 2026 11:05:56 +0530 Subject: [PATCH 3/7] Add VersionedKeyValueStoreBuilderWithHeaders and headers-aware changelog for versioned stores --- ...ngeLoggingVersionedKeyValueBytesStore.java | 7 ++ ...sionedKeyValueStoreBuilderWithHeaders.java | 85 +++++++++++++++++++ 2 files changed, 92 insertions(+) create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueStoreBuilderWithHeaders.java diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java index bd35648210d1c..f7d8e5096a4ca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java @@ -45,6 +45,13 @@ public long put(final Bytes key, final byte[] value, final long timestamp) { return validTo; } + @Override + public long put(final Bytes key, final byte[] value, final long timestamp, final Headers headers) { + final long validTo = inner.put(key, value, timestamp, headers); + log(key, value, timestamp, headers != null ? headers : new RecordHeaders()); + return validTo; + } + @Override public byte[] get(final Bytes key, final long asOfTimestamp) { return inner.get(key, asOfTimestamp); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueStoreBuilderWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueStoreBuilderWithHeaders.java new file mode 100644 index 0000000000000..58156d6ea6731 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueStoreBuilderWithHeaders.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.VersionedBytesStore; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; +import org.apache.kafka.streams.state.VersionedKeyValueStore; + +import java.util.Objects; + +/** + * Builder for versioned key-value stores with headers support. + * Follows the same pattern as {@link VersionedKeyValueStoreBuilder} but wraps + * with a headers-aware changelog store. + */ +public class VersionedKeyValueStoreBuilderWithHeaders + extends AbstractStoreBuilder> { + + private final VersionedBytesStoreSupplier storeSupplier; + + public VersionedKeyValueStoreBuilderWithHeaders(final VersionedBytesStoreSupplier storeSupplier, + final Serde keySerde, + final Serde valueSerde, + final Time time) { + super( + storeSupplier.name(), + keySerde, + valueSerde, + time); + Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); + Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's metricsScope can't be null"); + this.storeSupplier = storeSupplier; + } + + @Override + public VersionedKeyValueStore build() { + final KeyValueStore store = storeSupplier.get(); + if (!(store instanceof VersionedBytesStore)) { + throw new IllegalStateException( + "VersionedBytesStoreSupplier.get() must return an instance of VersionedBytesStore"); + } + + return new MeteredVersionedKeyValueStore<>( + maybeWrapLogging((VersionedBytesStore) store), + storeSupplier.metricsScope(), + time, + keySerde, + valueSerde); + } + + @Override + public StoreBuilder> withCachingEnabled() { + throw new IllegalStateException("Versioned stores do not support caching"); + } + + public long historyRetention() { + return storeSupplier.historyRetentionMs(); + } + + private VersionedBytesStore maybeWrapLogging(final VersionedBytesStore inner) { + if (!enableLogging) { + return inner; + } + return new ChangeLoggingVersionedKeyValueBytesStore(inner); + } +} From a1997775a9eade05b4c03c2e33a7056b46a729b9 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 4 Apr 2026 11:12:57 +0530 Subject: [PATCH 4/7] Wire versioned headers-aware stores into DSL via Stores, KeyValueStoreMaterializer, and KeyValueStoreWrapper --- .../internals/KeyValueStoreMaterializer.java | 10 ++- .../apache/kafka/streams/state/Stores.java | 39 +++++++++ .../state/internals/KeyValueStoreWrapper.java | 22 ++++- ...ValueStoreMaterializerWithHeadersTest.java | 42 ++++++++++ ...edKeyValueStoreBuilderWithHeadersTest.java | 80 +++++++++++++++++++ 5 files changed, 187 insertions(+), 6 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializerWithHeadersTest.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/VersionedKeyValueStoreBuilderWithHeadersTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java index 30b3b885c1018..9b19128e9320e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; import org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder; import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder; +import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilderWithHeaders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +52,12 @@ public StoreBuilder builder() { : (KeyValueBytesStoreSupplier) materialized.storeSupplier(); final StoreBuilder builder; - if (supplier instanceof VersionedBytesStoreSupplier) { + if (supplier instanceof VersionedBytesStoreSupplier && supplier instanceof HeadersBytesStoreSupplier) { + builder = Stores.versionedKeyValueStoreBuilderWithHeaders( + (VersionedBytesStoreSupplier) supplier, + materialized.keySerde(), + materialized.valueSerde()); + } else if (supplier instanceof VersionedBytesStoreSupplier) { builder = Stores.versionedKeyValueStoreBuilder( (VersionedBytesStoreSupplier) supplier, materialized.keySerde(), @@ -75,7 +81,7 @@ public StoreBuilder builder() { } if (materialized.cachingEnabled()) { - if (builder instanceof VersionedKeyValueStoreBuilder) { + if (builder instanceof VersionedKeyValueStoreBuilder || builder instanceof VersionedKeyValueStoreBuilderWithHeaders) { LOG.info("Not enabling caching for store '{}' as versioned stores do not support caching.", supplier.name()); } else { builder.withCachingEnabled(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index 14c37ed93b437..fd256f2e811fb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -29,6 +29,8 @@ import org.apache.kafka.streams.state.internals.RocksDbSessionBytesStoreSupplier; import org.apache.kafka.streams.state.internals.RocksDbSessionHeadersBytesStoreSupplier; import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier; +import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilderWithHeaders; import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier; import org.apache.kafka.streams.state.internals.SessionStoreBuilder; import org.apache.kafka.streams.state.internals.SessionStoreBuilderWithHeaders; @@ -683,4 +685,41 @@ public static StoreBuilder> tim Objects.requireNonNull(supplier, "supplier cannot be null"); return new TimestampedKeyValueStoreBuilderWithHeaders<>(supplier, keySerde, valueSerde, Time.SYSTEM); } + + /** + * Creates a {@link VersionedBytesStoreSupplier} that also supports headers, backed by RocksDB. + * + * @param name name of the store (cannot be {@code null}) + * @param historyRetention length of time that old record versions are available for query + * @return an instance of {@link VersionedBytesStoreSupplier} that also implements {@link HeadersBytesStoreSupplier} + */ + public static VersionedBytesStoreSupplier persistentVersionedKeyValueStoreWithHeaders(final String name, + final Duration historyRetention) { + Objects.requireNonNull(name, "name cannot be null"); + final String msgPrefix = prepareMillisCheckFailMsgPrefix(historyRetention, "historyRetention"); + final long historyRetentionMs = validateMillisecondDuration(historyRetention, msgPrefix); + if (historyRetentionMs < 0L) { + throw new IllegalArgumentException("historyRetention cannot be negative"); + } + return new RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier(name, historyRetentionMs); + } + + /** + * Creates a {@link StoreBuilder} that can be used to build a {@link VersionedKeyValueStore} + * with headers support. + * + * @param supplier a {@link VersionedBytesStoreSupplier} (cannot be {@code null}) + * @param keySerde the key serde to use + * @param valueSerde the value serde to use + * @param key type + * @param value type + * @return an instance of a {@link StoreBuilder} that can build a {@link VersionedKeyValueStore} + */ + public static StoreBuilder> versionedKeyValueStoreBuilderWithHeaders( + final VersionedBytesStoreSupplier supplier, + final Serde keySerde, + final Serde valueSerde) { + Objects.requireNonNull(supplier, "supplier cannot be null"); + return new VersionedKeyValueStoreBuilderWithHeaders<>(supplier, keySerde, valueSerde, Time.SYSTEM); + } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java index 62d939562ca70..0a54dd39c2679 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java @@ -32,6 +32,7 @@ import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders; import org.apache.kafka.streams.state.ValueTimestampHeaders; import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedKeyValueStoreWithHeaders; import org.apache.kafka.streams.state.VersionedRecord; import java.util.Map; @@ -93,9 +94,13 @@ public ValueTimestampHeaders get(final K key) { } if (versionedStore != null) { final VersionedRecord versionedRecord = versionedStore.get(key); - return versionedRecord == null - ? null - : ValueTimestampHeaders.make(versionedRecord.value(), versionedRecord.timestamp(), new RecordHeaders()); + if (versionedRecord == null) { + return null; + } + return ValueTimestampHeaders.make( + versionedRecord.value(), + versionedRecord.timestamp(), + versionedRecord.headers()); } throw new IllegalStateException("KeyValueStoreWrapper must be initialized with either headers or versioned store"); } @@ -105,7 +110,13 @@ public ValueTimestampHeaders get(final K key, final long asOfTimestamp) { throw new UnsupportedOperationException("get(key, timestamp) is only supported for versioned stores"); } final VersionedRecord versionedRecord = versionedStore.get(key, asOfTimestamp); - return versionedRecord == null ? null : ValueTimestampHeaders.make(versionedRecord.value(), versionedRecord.timestamp(), new RecordHeaders()); + if (versionedRecord == null) { + return null; + } + return ValueTimestampHeaders.make( + versionedRecord.value(), + versionedRecord.timestamp(), + versionedRecord.headers()); } /** @@ -119,6 +130,9 @@ public long put(final K key, final V value, final long timestamp, final Headers return PUT_RETURN_CODE_IS_LATEST; } if (versionedStore != null) { + if (versionedStore instanceof VersionedKeyValueStoreWithHeaders) { + return ((VersionedKeyValueStoreWithHeaders) versionedStore).put(key, value, timestamp, headers); + } return versionedStore.put(key, value, timestamp); } throw new IllegalStateException("KeyValueStoreWrapper must be initialized with either headers or versioned store"); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializerWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializerWithHeadersTest.java new file mode 100644 index 0000000000000..58f3d16721d9d --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializerWithHeadersTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.state.HeadersBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; +import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.assertInstanceOf; + +public class KeyValueStoreMaterializerWithHeadersTest { + + @Test + public void shouldReturnDualInterfaceSupplier() { + final VersionedBytesStoreSupplier supplier = + Stores.persistentVersionedKeyValueStoreWithHeaders("test-store", Duration.ofMinutes(5)); + + // Verify the supplier implements both interfaces + assertInstanceOf(VersionedBytesStoreSupplier.class, supplier); + assertInstanceOf(HeadersBytesStoreSupplier.class, supplier); + assertInstanceOf(RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier.class, supplier); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/VersionedKeyValueStoreBuilderWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/VersionedKeyValueStoreBuilderWithHeadersTest.java new file mode 100644 index 0000000000000..b4333c5016684 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/VersionedKeyValueStoreBuilderWithHeadersTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.state.HeadersBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class VersionedKeyValueStoreBuilderWithHeadersTest { + + @Test + public void shouldCreateSupplierWithDualInterface() { + final VersionedBytesStoreSupplier supplier = + Stores.persistentVersionedKeyValueStoreWithHeaders("test-store", Duration.ofMinutes(5)); + + assertNotNull(supplier); + assertInstanceOf(VersionedBytesStoreSupplier.class, supplier); + assertInstanceOf(HeadersBytesStoreSupplier.class, supplier); + assertInstanceOf(RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier.class, supplier); + } + + @Test + public void shouldRejectNullName() { + assertThrows(NullPointerException.class, + () -> Stores.persistentVersionedKeyValueStoreWithHeaders(null, Duration.ofMinutes(5))); + } + + @Test + public void shouldRejectNegativeHistoryRetention() { + assertThrows(IllegalArgumentException.class, + () -> Stores.persistentVersionedKeyValueStoreWithHeaders("test", Duration.ofMillis(-1))); + } + + @Test + public void shouldCreateBuilderWithHeaders() { + final var supplier = + Stores.persistentVersionedKeyValueStoreWithHeaders("test-store", Duration.ofMinutes(5)); + final var builder = Stores.versionedKeyValueStoreBuilderWithHeaders(supplier, null, null); + + assertNotNull(builder); + assertInstanceOf(VersionedKeyValueStoreBuilderWithHeaders.class, builder); + } + + @Test + public void shouldRejectNullSupplier() { + assertThrows(NullPointerException.class, + () -> Stores.versionedKeyValueStoreBuilderWithHeaders(null, null, null)); + } + + @Test + public void shouldRejectCachingOnBuilder() { + final var supplier = + Stores.persistentVersionedKeyValueStoreWithHeaders("test-store", Duration.ofMinutes(5)); + final var builder = Stores.versionedKeyValueStoreBuilderWithHeaders(supplier, null, null); + + assertThrows(IllegalStateException.class, builder::withCachingEnabled); + } +} From dc416dfb50787f60f6ebc68d486d1e8d2cd52dba Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 4 Apr 2026 11:30:24 +0530 Subject: [PATCH 5/7] Add TopologyTestDriver-based integration test for versioned store with headers DSL wiring --- .../apache/kafka/streams/state/Stores.java | 2 +- .../state/internals/KeyValueStoreWrapper.java | 1 - ...dKeyValueStoreWithHeadersTopologyTest.java | 250 ++++++++++++++++++ 3 files changed, 251 insertions(+), 2 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/VersionedKeyValueStoreWithHeadersTopologyTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index fd256f2e811fb..7e4ce3688be7b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -30,7 +30,6 @@ import org.apache.kafka.streams.state.internals.RocksDbSessionHeadersBytesStoreSupplier; import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreWithHeadersSupplier; -import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilderWithHeaders; import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier; import org.apache.kafka.streams.state.internals.SessionStoreBuilder; import org.apache.kafka.streams.state.internals.SessionStoreBuilderWithHeaders; @@ -39,6 +38,7 @@ import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder; import org.apache.kafka.streams.state.internals.TimestampedWindowStoreWithHeadersBuilder; import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder; +import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilderWithHeaders; import org.apache.kafka.streams.state.internals.WindowStoreBuilder; import java.time.Duration; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java index 0a54dd39c2679..cdda69c5a3084 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java @@ -18,7 +18,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/VersionedKeyValueStoreWithHeadersTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/VersionedKeyValueStoreWithHeadersTopologyTest.java new file mode 100644 index 0000000000000..b5f20f3068c61 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/VersionedKeyValueStoreWithHeadersTopologyTest.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.test.TestRecord; + +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +/** + * TopologyTestDriver-based tests that validate the full DSL wiring path for + * versioned key-value stores with headers support. + * + * These tests verify: + * 1. Materialized.as(supplier) correctly routes to the headers-aware builder + * 2. Headers are preserved through the put/get path in the store + * 3. Headers are forwarded to the changelog + * 4. The store implements VersionedKeyValueStoreWithHeaders + */ +public class VersionedKeyValueStoreWithHeadersTopologyTest { + + private static final String INPUT_TOPIC = "input-topic"; + private static final String OUTPUT_TOPIC = "output-topic"; + private static final String STORE_NAME = "versioned-headers-store"; + private static final Duration HISTORY_RETENTION = Duration.ofMinutes(10); + + private Properties props() { + final Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-versioned-headers"); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + return props; + } + + /** + * Verifies that a KTable materialized with persistentVersionedKeyValueStoreWithHeaders + * creates the correct store type and preserves record headers through put/get. + */ + @Test + public void shouldMaterializeVersionedStoreWithHeaders() { + final StreamsBuilder builder = new StreamsBuilder(); + + builder.table( + INPUT_TOPIC, + Consumed.with(Serdes.String(), Serdes.String()), + Materialized.as(Stores.persistentVersionedKeyValueStoreWithHeaders(STORE_NAME, HISTORY_RETENTION)) + ).toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String())); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props())) { + final TestInputTopic inputTopic = driver.createInputTopic( + INPUT_TOPIC, new StringSerializer(), new StringSerializer()); + + final TestOutputTopic outputTopic = driver.createOutputTopic( + OUTPUT_TOPIC, new StringDeserializer(), new StringDeserializer()); + + // Pipe a record with headers + final Headers headers = new RecordHeaders(); + headers.add(new RecordHeader("traceId", "trace-123".getBytes(StandardCharsets.UTF_8))); + headers.add(new RecordHeader("schemaId", new byte[]{0, 0, 0, 42})); + + inputTopic.pipeInput(new TestRecord<>("key1", "value1", headers, 1000L)); + + // Verify the output record is produced + final TestRecord outputRecord = outputTopic.readRecord(); + assertNotNull(outputRecord); + assertEquals("key1", outputRecord.key()); + assertEquals("value1", outputRecord.value()); + + // Verify another put updates the store correctly + final Headers headers2 = new RecordHeaders(); + headers2.add(new RecordHeader("traceId", "trace-456".getBytes(StandardCharsets.UTF_8))); + + inputTopic.pipeInput(new TestRecord<>("key1", "value2", headers2, 2000L)); + + final TestRecord outputRecord2 = outputTopic.readRecord(); + assertNotNull(outputRecord2); + assertEquals("key1", outputRecord2.key()); + assertEquals("value2", outputRecord2.value()); + } + } + + /** + * Verifies that the store supplier implements both required interfaces. + */ + @Test + public void shouldCreateDualInterfaceSupplier() { + final var supplier = Stores.persistentVersionedKeyValueStoreWithHeaders(STORE_NAME, HISTORY_RETENTION); + + assertInstanceOf(org.apache.kafka.streams.state.VersionedBytesStoreSupplier.class, supplier); + assertInstanceOf(org.apache.kafka.streams.state.HeadersBytesStoreSupplier.class, supplier); + } + + /** + * Verifies that a versioned store with headers correctly processes + * multiple records for the same key at different timestamps. + */ + @Test + public void shouldHandleMultipleVersionsWithHeaders() { + final StreamsBuilder builder = new StreamsBuilder(); + + builder.table( + INPUT_TOPIC, + Consumed.with(Serdes.String(), Serdes.String()), + Materialized.as(Stores.persistentVersionedKeyValueStoreWithHeaders(STORE_NAME, HISTORY_RETENTION)) + ).toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String())); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props())) { + final TestInputTopic inputTopic = driver.createInputTopic( + INPUT_TOPIC, new StringSerializer(), new StringSerializer()); + + final TestOutputTopic outputTopic = driver.createOutputTopic( + OUTPUT_TOPIC, new StringDeserializer(), new StringDeserializer()); + + // Insert version 1 + final Headers h1 = new RecordHeaders(); + h1.add(new RecordHeader("version", "v1".getBytes(StandardCharsets.UTF_8))); + inputTopic.pipeInput(new TestRecord<>("key1", "val-v1", h1, 1000L)); + + final TestRecord out1 = outputTopic.readRecord(); + assertEquals("val-v1", out1.value()); + + // Insert version 2 (newer timestamp) + final Headers h2 = new RecordHeaders(); + h2.add(new RecordHeader("version", "v2".getBytes(StandardCharsets.UTF_8))); + inputTopic.pipeInput(new TestRecord<>("key1", "val-v2", h2, 2000L)); + + final TestRecord out2 = outputTopic.readRecord(); + assertEquals("val-v2", out2.value()); + + // Insert version 3 (even newer) + final Headers h3 = new RecordHeaders(); + h3.add(new RecordHeader("version", "v3".getBytes(StandardCharsets.UTF_8))); + inputTopic.pipeInput(new TestRecord<>("key1", "val-v3", h3, 3000L)); + + final TestRecord out3 = outputTopic.readRecord(); + assertEquals("val-v3", out3.value()); + } + } + + /** + * Verifies that tombstone records (null values) work correctly with + * the versioned headers store. + */ + @Test + public void shouldHandleTombstonesWithHeaders() { + final StreamsBuilder builder = new StreamsBuilder(); + + builder.table( + INPUT_TOPIC, + Consumed.with(Serdes.String(), Serdes.String()), + Materialized.as(Stores.persistentVersionedKeyValueStoreWithHeaders(STORE_NAME, HISTORY_RETENTION)) + ).toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String())); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props())) { + final TestInputTopic inputTopic = driver.createInputTopic( + INPUT_TOPIC, new StringSerializer(), new StringSerializer()); + + final TestOutputTopic outputTopic = driver.createOutputTopic( + OUTPUT_TOPIC, new StringDeserializer(), new StringDeserializer()); + + // Put a value + final Headers headers = new RecordHeaders(); + headers.add(new RecordHeader("op", "insert".getBytes(StandardCharsets.UTF_8))); + inputTopic.pipeInput(new TestRecord<>("key1", "value1", headers, 1000L)); + + final TestRecord putOutput = outputTopic.readRecord(); + assertEquals("value1", putOutput.value()); + + // Delete the value (null) + inputTopic.pipeInput(new TestRecord<>("key1", (String) null, new RecordHeaders(), 2000L)); + + final TestRecord deleteOutput = outputTopic.readRecord(); + assertNull(deleteOutput.value()); + } + } + + /** + * Verifies that the changelog topic receives records with proper headers + * when using a versioned store with headers. + */ + @Test + public void shouldForwardHeadersToChangelog() { + final StreamsBuilder builder = new StreamsBuilder(); + + builder.table( + INPUT_TOPIC, + Consumed.with(Serdes.String(), Serdes.String()), + Materialized.as(Stores.persistentVersionedKeyValueStoreWithHeaders(STORE_NAME, HISTORY_RETENTION)) + ).toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String())); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props())) { + final TestInputTopic inputTopic = driver.createInputTopic( + INPUT_TOPIC, new StringSerializer(), new StringSerializer()); + + final String changelogTopic = "test-versioned-headers-" + STORE_NAME + "-changelog"; + + // Verify changelog topic exists by trying to create an output topic for it + final TestOutputTopic changelogOutputTopic = driver.createOutputTopic( + changelogTopic, new StringDeserializer(), new StringDeserializer()); + + // Pipe a record with headers + final Headers headers = new RecordHeaders(); + headers.add(new RecordHeader("traceId", "abc".getBytes(StandardCharsets.UTF_8))); + + inputTopic.pipeInput(new TestRecord<>("key1", "value1", headers, 1000L)); + + // Read the changelog record + final TestRecord changelogRecord = changelogOutputTopic.readRecord(); + assertNotNull(changelogRecord); + assertEquals("key1", changelogRecord.key()); + } + } +} From fb8116bfc44793a98058613a74da721c204ada0b Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 4 Apr 2026 22:03:40 +0530 Subject: [PATCH 6/7] Address review: include headers in toString, cache typed reference in KeyValueStoreWrapper, normalize null headers in ChangeLogging --- .../streams/state/VersionedKeyValueStoreWithHeaders.java | 3 ++- .../org/apache/kafka/streams/state/VersionedRecord.java | 2 +- .../ChangeLoggingVersionedKeyValueBytesStore.java | 5 +++-- .../streams/state/internals/KeyValueStoreWrapper.java | 8 ++++++-- .../streams/state/VersionedRecordWithHeadersTest.java | 4 +++- 5 files changed, 15 insertions(+), 7 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStoreWithHeaders.java index d956f58f02d6c..8b67fdc5e1567 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStoreWithHeaders.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStoreWithHeaders.java @@ -36,9 +36,10 @@ public interface VersionedKeyValueStoreWithHeaders extends VersionedKeyVal * @param key The key * @param value The value, it can be {@code null}. {@code null} is interpreted as a delete. * @param timestamp The timestamp for this record version - * @param headers The record headers + * @param headers The record headers; must not be {@code null} * @return The validTo timestamp of the newly put record, or special values as defined * in {@link VersionedKeyValueStore#put(Object, Object, long)}. + * @throws NullPointerException if {@code headers} is {@code null} */ long put(K key, V value, long timestamp, Headers headers); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java b/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java index d242c9c2b0b0c..9aaa4a1e27a87 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java @@ -106,7 +106,7 @@ public Headers headers() { @Override public String toString() { - return "<" + value + "," + timestamp + "," + validTo + ">"; + return "<" + value + "," + timestamp + "," + validTo + "," + headers + ">"; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java index f7d8e5096a4ca..4822170e478ca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java @@ -47,8 +47,9 @@ public long put(final Bytes key, final byte[] value, final long timestamp) { @Override public long put(final Bytes key, final byte[] value, final long timestamp, final Headers headers) { - final long validTo = inner.put(key, value, timestamp, headers); - log(key, value, timestamp, headers != null ? headers : new RecordHeaders()); + final Headers nonNullHeaders = headers != null ? headers : new RecordHeaders(); + final long validTo = inner.put(key, value, timestamp, nonNullHeaders); + log(key, value, timestamp, nonNullHeaders); return validTo; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java index cdda69c5a3084..e2926d3d7b086 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java @@ -51,6 +51,7 @@ public class KeyValueStoreWrapper implements StateStore { private TimestampedKeyValueStoreWithHeaders headersStore = null; private VersionedKeyValueStore versionedStore = null; + private VersionedKeyValueStoreWithHeaders versionedStoreWithHeaders = null; // same as either timestampedStore or versionedStore above. kept merely as a convenience // to simplify implementation for methods which do not depend on store type. @@ -79,6 +80,9 @@ public KeyValueStoreWrapper(final ProcessorContext context, final String s try { versionedStore = context.getStateStore(storeName); store = versionedStore; + if (versionedStore instanceof VersionedKeyValueStoreWithHeaders) { + versionedStoreWithHeaders = (VersionedKeyValueStoreWithHeaders) versionedStore; + } } catch (final ClassCastException e) { store = context.getStateStore(storeName); final String storeType = store == null ? "null" : store.getClass().getName(); @@ -129,8 +133,8 @@ public long put(final K key, final V value, final long timestamp, final Headers return PUT_RETURN_CODE_IS_LATEST; } if (versionedStore != null) { - if (versionedStore instanceof VersionedKeyValueStoreWithHeaders) { - return ((VersionedKeyValueStoreWithHeaders) versionedStore).put(key, value, timestamp, headers); + if (versionedStoreWithHeaders != null) { + return versionedStoreWithHeaders.put(key, value, timestamp, headers); } return versionedStore.put(key, value, timestamp); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/VersionedRecordWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/VersionedRecordWithHeadersTest.java index 0c3df4e4f6602..94fe896b26fbd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/VersionedRecordWithHeadersTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/VersionedRecordWithHeadersTest.java @@ -118,6 +118,8 @@ public void shouldPreserveEqualityForRecordsWithDifferentHeaders() { @Test public void shouldReturnCorrectToString() { final VersionedRecord record = new VersionedRecord<>("value", 42L); - assertEquals("", record.toString()); + final String str = record.toString(); + assertTrue(str.startsWith(" Date: Sun, 5 Apr 2026 22:41:03 +0530 Subject: [PATCH 7/7] Include headers in VersionedRecord equals and hashCode --- .../kafka/streams/state/VersionedRecord.java | 5 +++-- .../state/VersionedRecordWithHeadersTest.java | 19 ++++++++++++++++--- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java b/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java index 9aaa4a1e27a87..ec0578426ba52 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java @@ -119,11 +119,12 @@ public boolean equals(final Object o) { } final VersionedRecord that = (VersionedRecord) o; return timestamp == that.timestamp && validTo == that.validTo && - Objects.equals(value, that.value); + Objects.equals(value, that.value) && + Objects.equals(headers, that.headers); } @Override public int hashCode() { - return Objects.hash(value, timestamp, validTo); + return Objects.hash(value, timestamp, validTo, headers); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/VersionedRecordWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/VersionedRecordWithHeadersTest.java index 94fe896b26fbd..64530920ea6fd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/VersionedRecordWithHeadersTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/VersionedRecordWithHeadersTest.java @@ -25,6 +25,7 @@ import java.nio.charset.StandardCharsets; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -99,9 +100,7 @@ public void shouldRejectNullHeadersWithValidTo() { } @Test - public void shouldPreserveEqualityForRecordsWithDifferentHeaders() { - // Equality is based on value, timestamp, and validTo only (not headers). - // This matches the existing contract. + public void shouldNotBeEqualForRecordsWithDifferentHeaders() { final Headers h1 = new RecordHeaders(new RecordHeader[]{ new RecordHeader("k", "v1".getBytes(StandardCharsets.UTF_8)) }); @@ -111,6 +110,20 @@ public void shouldPreserveEqualityForRecordsWithDifferentHeaders() { final VersionedRecord r1 = new VersionedRecord<>("val", 10L, h1); final VersionedRecord r2 = new VersionedRecord<>("val", 10L, h2); + assertNotEquals(r1, r2); + } + + @Test + public void shouldBeEqualForRecordsWithSameHeaders() { + final Headers h1 = new RecordHeaders(new RecordHeader[]{ + new RecordHeader("k", "v1".getBytes(StandardCharsets.UTF_8)) + }); + final Headers h2 = new RecordHeaders(new RecordHeader[]{ + new RecordHeader("k", "v1".getBytes(StandardCharsets.UTF_8)) + }); + final VersionedRecord r1 = new VersionedRecord<>("val", 10L, h1); + final VersionedRecord r2 = new VersionedRecord<>("val", 10L, h2); + assertEquals(r1, r2); assertEquals(r1.hashCode(), r2.hashCode()); }