KAFKA-20400: Enable VersionedKeyValueStoreWithHeaders in DSL#21960
KAFKA-20400: Enable VersionedKeyValueStoreWithHeaders in DSL#21960Shekharrajak wants to merge 7 commits intoapache:trunkfrom
Conversation
| private final V value; | ||
| private final long timestamp; | ||
| private final Optional<Long> validTo; | ||
| private final Headers headers; |
There was a problem hiding this comment.
Shouldn't the equals() method also check headers equality?
There was a problem hiding this comment.
Thanks for review.
This was an intentional design choice to preserve backward compatibility. VersionedRecord.equals() is a pre-existing contract -- adding headers to it would change comparison semantics for existing code that relies on the current behavior. Headers are metadata, not part of the record version identity (two records with the same value/timestamp but different tracing headers are the same logical version). That said, I can see the argument for consistency with ValueTimestampHeaders.equals(). If the consensus is to include headers in equality, I can make that change -- though it would need a note in the doc about the behavioral change. What do you think?
There was a problem hiding this comment.
IMO headers should be part of equals and hashCode for consistency.
For backwards compatibility, new RecordHeaders() will be empty I guess, so behaviour of equals method and hashcode is still ok.
There was a problem hiding this comment.
You are right. For consistency with ValueTimestampHeaders.equals() and the general Java convention that all significant fields participate in equality, I will include headers in both equals() and hashCode(). The backward compatibility risk is low since existing code constructs VersionedRecord with the no-headers constructors (which default to empty RecordHeaders), and non-headers stores also return records with empty headers. I will update this in the next push.
There was a problem hiding this comment.
For consistency with ValueTimestampHeaders.equals() and the general Java convention that all significant fields participate in equality
Does it really written by you?
streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStore.java
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java
Outdated
Show resolved
Hide resolved
…ders interface, and VersionedBytesStore default put
…log for versioned stores
…eMaterializer, and KeyValueStoreWrapper
…h headers DSL wiring
… KeyValueStoreWrapper, normalize null headers in ChangeLogging
0ecf489 to
6260c3c
Compare
muralibasani
left a comment
There was a problem hiding this comment.
LGTM, thank you for addressing the comments.
|
@mjsax @aliehsaeedii Please have a look. |
|
@Shekharrajak why do we bring the same code twice? #21995 |
Already mentioned about it here https://issues.apache.org/jira/browse/KAFKA-20400?focusedCommentId=18071886&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-18071886 Please let me know if both the tickets should be addressed in single PR/branch. My understanding was KAFKA-20399 is for the store-level implementation ( store, retrieve - versioned key-value data along with record headers in RocksDB) |
Ref https://issues.apache.org/jira/browse/KAFKA-20400
This PR extends the KIP-1285 headers-aware state store infrastructure to versioned key-value stores. Prior to this change, KIP-1285 was implemented for timestamped, windowed, and session stores, but versioned stores silently dropped record headers on both the write and read paths.
Record headers carry metadata (tracing IDs, schema references, auth tokens) that downstream processors and interactive query clients may need. Versioned stores (RocksDBVersionedStore) were the only remaining store type that discarded headers. This PR closes that gap.
API changes:
VersionedRecord -- Added Headers headers field with new constructors and a headers() accessor. Existing constructors default to empty headers (backward compatible).
Stores.persistentVersionedKeyValueStoreWithHeaders(String, Duration) -- Factory method returning a dual-interface supplier.
Stores.versionedKeyValueStoreBuilderWithHeaders(VersionedBytesStoreSupplier, Serde, Serde) -- Builder factory method.
RocksDBVersionedStoreWithHeaders -- Extends RocksDBVersionedStore, encodes headers into value bytes using [headersSize(varint)][headersBytes][rawValue] format before delegating to the parent store. Decodes on read.
ChangeLoggingVersionedKeyValueBytesStore -- Added put(key, value, timestamp, headers) override to forward real headers to the changelog instead of empty headers.
KeyValueStoreWrapper -- get() now returns VersionedRecord.headers() instead of empty headers; put() forwards headers to VersionedKeyValueStoreWithHeaders when available.