
KAFKA-20155: Implement lazy header deserialization for state stores#21996

Open
Shekharrajak wants to merge 4 commits into apache:trunk from Shekharrajak:KAFKA-20155-lazy-header-deserialization

Conversation

@Shekharrajak

Ref https://issues.apache.org/jira/browse/KAFKA-20155

When reading from header-aware state stores (KIP-1271), header bytes embedded in store values are eagerly deserialized into RecordHeaders even when the downstream value deserializer does not use them. This PR introduces LazyHeaders -- a lazy Headers implementation that defers parsing until first read access, avoiding unnecessary work in header-agnostic deserialization paths.

Changes:

- ValueTimestampHeadersDeserializer.deserialize() -- calls HeadersDeserializer.deserialize(rawHeaders) and passes the result to the value deserializer
- Utils.readHeaders() -- called by AggregationWithHeadersDeserializer for session store reads
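To illustrate the idea, here is a minimal, self-contained sketch of a lazy headers wrapper (hypothetical names and a made-up wire format, not the actual PR classes): the raw bytes are held as-is and parsed only on the first read access.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the LazyHeaders idea: keep the serialized header
// bytes and parse them only on first read access.
public class LazyHeadersSketch {

    // Minimal stand-in for org.apache.kafka.common.header.Header.
    record Header(String key, byte[] value) {}

    private final byte[] rawHeaders;   // serialized form, as read from the store
    private List<Header> parsed;       // null until first read

    public LazyHeadersSketch(final byte[] rawHeaders) {
        this.rawHeaders = rawHeaders;
    }

    public boolean isParsed() {
        return parsed != null;
    }

    // Parsing happens here, once, on first access.
    public List<Header> toList() {
        if (parsed == null) {
            parsed = parse(rawHeaders);
        }
        return parsed;
    }

    // Assumed wire format for this sketch: [count][keyLen][keyBytes][valLen][valBytes]...
    private static List<Header> parse(final byte[] bytes) {
        final ByteBuffer buf = ByteBuffer.wrap(bytes);
        final int count = buf.getInt();
        final List<Header> out = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            final byte[] key = new byte[buf.getInt()];
            buf.get(key);
            final byte[] val = new byte[buf.getInt()];
            buf.get(val);
            out.add(new Header(new String(key, StandardCharsets.UTF_8), val));
        }
        return out;
    }

    public static void main(final String[] args) {
        final ByteBuffer buf = ByteBuffer.allocate(17);
        buf.putInt(1);
        buf.putInt(3).put("app".getBytes(StandardCharsets.UTF_8));
        buf.putInt(2).put(new byte[] {1, 2});
        final LazyHeadersSketch lazy = new LazyHeadersSketch(buf.array());
        System.out.println("parsed before read: " + lazy.isParsed()); // false
        System.out.println("first header key: " + lazy.toList().get(0).key()); // app
        System.out.println("parsed after read: " + lazy.isParsed()); // true
    }
}
```

A deserializer that ignores headers never calls toList(), so the parse is skipped entirely on that path.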

@github-actions github-actions bot added triage PRs from the community streams labels Apr 8, 2026
Contributor

@UladzislauBlok UladzislauBlok left a comment


Sorry to say this (correct me if I'm wrong), but this looks like absolute vibe code.
I think this ticket was blocked by https://issues.apache.org/jira/browse/KAFKA-20179 . It doesn't make sense to just create this class without taking into account how it will be used.

Comment on lines +103 to +106
```java
private static boolean headersEqual(final Headers a, final Headers b) {
    if (a == b) return true;
    if (a == null || b == null) return false;
    return Arrays.equals(a.toArray(), b.toArray());
}
```
Contributor


Why do we need it?

Author


RecordHeaders.equals has a getClass() != o.getClass() check, so tests were failing on RecordHeaders.equals(LazyHeaders); we need to handle that.
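For context, here is a stand-alone sketch (hypothetical classes, not Kafka's actual RecordHeaders) of why a strict getClass() check makes equals fail against any other Headers implementation even when the contents match, and why a content-based comparison like headersEqual works:

```java
import java.util.Arrays;

public class EqualsCheckDemo {

    // Stand-in for an eager headers class whose equals uses a strict class
    // check, like getClass() != o.getClass() in RecordHeaders.
    static class EagerHeaders {
        final String[] keys;
        EagerHeaders(final String... keys) { this.keys = keys; }
        String[] toArray() { return keys; }
        @Override public boolean equals(final Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false; // rejects other impls
            return Arrays.equals(keys, ((EagerHeaders) o).keys);
        }
        @Override public int hashCode() { return Arrays.hashCode(keys); }
    }

    // Stand-in for a lazy headers class: different class, same logical content.
    static class LazyishHeaders extends EagerHeaders {
        LazyishHeaders(final String... keys) { super(keys); }
    }

    // Content-based comparison, mirroring the headersEqual helper in the PR.
    static boolean headersEqual(final EagerHeaders a, final EagerHeaders b) {
        if (a == b) return true;
        if (a == null || b == null) return false;
        return Arrays.equals(a.toArray(), b.toArray());
    }

    public static void main(final String[] args) {
        final EagerHeaders eager = new EagerHeaders("h1", "h2");
        final LazyishHeaders lazy = new LazyishHeaders("h1", "h2");
        System.out.println(eager.equals(lazy));        // false: class mismatch
        System.out.println(headersEqual(eager, lazy)); // true: same contents
    }
}
```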

@Shekharrajak
Author

@UladzislauBlok The lazy Headers idea is discussed in the ticket.

The logic behind this design is: do not parse (HeadersDeserializer.deserialize(rawHeader)) until someone reads the headers.

1. [ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.put(key, valueTimestampHeaders)](https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java#L50)
   -> wrapped().put(key, valueTimestampHeaders)              // writes to RocksDB
   -> headers = Utils.headers(valueTimestampHeaders)         // returns LazyHeaders. NO PARSE.

2. log(key, rawValue, timestamp, lazyHeaders)
   -> internalContext.logChange(name, key, rawValue, timestamp, lazyHeaders, position)

3. ProcessorContextImpl.logChange()                          // line 128
   -> addVectorClockToHeaders(lazyHeaders)                   // calls lazyHeaders.add(). STILL NO PARSE.
   -> collector.send(topic, key, rawValue, lazyHeaders, ...)

4. RecordCollectorImpl.send()                                // line 263
   -> new ProducerRecord(topic, partition, timestamp, keyBytes, valBytes, lazyHeaders)
      // just stores the reference. STILL NO PARSE.
   -> send(key, value, processorNodeId, context, serializedRecord)

5. StreamsProducer -> KafkaProducer.send(record)

6. KafkaProducer.doSend()                                    // line 1139
   -> Header[] headers = record.headers().toArray()          // [PARSE HAPPENS HERE](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1139)
   -> RecordAccumulator.append(... headers ...)              // headers already parsed as Header[]
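Step 3 above calls lazyHeaders.add() without triggering a parse. One way to support that (a hypothetical sketch with simplified types, not the actual PR code) is to buffer additions in a separate list and merge them with the parsed base only when the headers are finally read, as in step 6:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: add() stays parse-free by buffering new headers in a
// side list; the raw bytes are parsed and merged only on first read.
// Headers are modeled as plain Strings here for simplicity.
public class BufferedLazyHeaders {
    private final byte[] rawHeaders;                       // serialized base headers
    private final Function<byte[], List<String>> parser;   // injected parse step (stand-in)
    private final List<String> added = new ArrayList<>();  // headers added before any read
    private List<String> merged;                           // null until first read

    public BufferedLazyHeaders(final byte[] rawHeaders,
                               final Function<byte[], List<String>> parser) {
        this.rawHeaders = rawHeaders;
        this.parser = parser;
    }

    // Mirrors step 3: addVectorClockToHeaders() can add with no parse.
    public void add(final String header) {
        if (merged != null) {
            merged.add(header);   // already parsed; append directly
        } else {
            added.add(header);    // defer: no parse triggered
        }
    }

    // Mirrors step 6: the parse happens only when the contents are needed.
    public List<String> toList() {
        if (merged == null) {
            merged = new ArrayList<>(parser.apply(rawHeaders));
            merged.addAll(added);
        }
        return merged;
    }

    public boolean parsed() {
        return merged != null;
    }

    public static void main(final String[] args) {
        final BufferedLazyHeaders h =
            new BufferedLazyHeaders(new byte[0], bytes -> new ArrayList<>(List.of("base")));
        h.add("vector-clock");
        System.out.println("parsed before read: " + h.parsed()); // false
        System.out.println(h.toList());                          // [base, vector-clock]
    }
}
```

If the record's headers are never iterated, the parse never happens; once KafkaProducer.doSend() calls toArray(), the base bytes are parsed exactly once and merged with the buffered additions.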

@UladzislauBlok
Contributor

UladzislauBlok commented Apr 8, 2026

> @UladzislauBlok The lazy Headers idea is discussed in the ticket.

#21676

The problem with the producer is still here. You just moved deserialization to a later step.

@Shekharrajak
Author

> The problem with the producer is still here. You just moved deserialization to a later step.

Eliminating the producer-side parse entirely would require changing the producer API to accept raw header bytes instead of Header[], which is out of scope for KIP-1271 and would require a KIP of its own.

@Shekharrajak
Author

@muralibasani feel free to cherry-pick commits or commit to this branch, since you are already working on KAFKA-20179, which I missed seeing in the ticket.

@github-actions github-actions bot removed the triage PRs from the community label Apr 9, 2026
