Skip to content

Fix late tuple stream serialization in windowed bolts#8790

Open
dpol1 wants to merge 3 commits into
apache:masterfrom
dpol1:fix/7782
Open

Fix late tuple stream serialization in windowed bolts#8790
dpol1 wants to merge 3 commits into
apache:masterfrom
dpol1:fix/7782

Conversation

@dpol1

@dpol1 dpol1 commented Jun 13, 2026

Copy link
Copy Markdown
Member

What is the purpose of the change

  • A windowed bolt configured with a late tuple stream (withLateTupleStream) fails to deliver late tuples across worker boundaries. The emit wraps the input Tuple as a value, and TupleImpl holds a GeneralTopologyContext that Kryo cannot serialize, so the late stream breaks as soon as it targets a task on another worker (works in local mode only).

  • This adds DetachedTuple, a serializable snapshot of a Tuple (source component/task/stream, fields, values, no context). WindowedBoltExecutor now emits a DetachedTuple on the late stream instead of the raw TupleImpl.

    Note: the value in LATE_TUPLE_FIELD is now a DetachedTuple. It is unanchored, uses value-based equality, and getContext() throws. Docs and withLateTupleStream javadoc updated accordingly.

  • Fix [STORM-4000] Processing late tuples from BaseWindowedBolt results in serialization exception #7782

How was the change tested

  • DetachedTupleTest: snapshot, field/value access, equals/hashCode, unanchored message id, getContext() throwing.
  • WindowedBoltExecutorTest: regression test round-tripping a late tuple through Kryo with topology.fall.back.on.java.serialization=false (the path that failed).
  • Updated PersistentWindowedBoltExecutorTest mocks for the new constructor reads.

dpol1 added 3 commits June 12, 2026 23:30
A TupleImpl references its topology context and cannot be serialized,
so a tuple emitted as a value inside another tuple breaks as soon as it
crosses a worker boundary. DetachedTuple snapshots the source
component, task, stream, output fields and values of a tuple into a
self-contained, Kryo-registered representation that survives
serialization.
The late tuple stream emitted the original TupleImpl as a value of the
outgoing tuple. TupleImpl references the topology context, which cannot
be serialized, so any consumer of the late tuple stream running in a
different worker failed with a KryoException. Emit a DetachedTuple copy
instead, which survives serialization while still exposing the Tuple
interface to consumers.
Document that the value of the late tuple field is a DetachedTuple, a
serializable copy of the original tuple detached from the topology
context, in the windowing guide and in the withLateTupleStream javadoc
of BaseWindowedBolt, TumblingWindows and SlidingWindows.
@reiabreu reiabreu requested a review from rzo1 June 14, 2026 13:50
@reiabreu reiabreu added the bug label Jun 14, 2026
@reiabreu reiabreu added this to the 3.0.0 milestone Jun 14, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we some extra minor tests?

  • DetachedTuple can serialize and deserialize successfully when the tuple contains null
  • include getShort , getByte , getBoolean in the tests
  • comparing a DetachedTuple to null or a non- DetachedTuple instance

@reiabreu reiabreu left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this! Left a minor comment, but I'm happy to merge it.

@rzo1

rzo1 commented Jun 14, 2026

Copy link
Copy Markdown
Contributor

Thanks @dpol1, this looks good.

Two small things before merge:

  • This changes behavior for anyone consuming the late stream in local mode today: the value type goes from Tuple to DetachedTuple and getContext() now throws. It's documented, but please also note it in the changelog/release notes.
  • The Kryo round-trip tests all serialize a fresh DetachedTuple where fields is still null, so the transient-skip path isn't actually exercised. A test that calls getFields() before the round-trip would lock that in.

Otherwise looks correct to me. Happy to approve once those are in, plus @reiabreu's test requests.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[STORM-4000] Processing late tuples from BaseWindowedBolt results in serialization exception

3 participants