
[WIP][SPARK-56248][PYTHON][SS] Optimize python TWS stateful processor serialization calls #55039

Open

jiateoh wants to merge 1 commit into apache:master from jiateoh:tws_python_serialization_improvements
Conversation

@jiateoh (Contributor) commented Mar 26, 2026

What changes were proposed in this pull request?

Skip unnecessary list+Row construction, cache the serializer, and hoist the normalize helpers.

  1. Eliminate the per-call Row(**dict(zip(...))) construction in _serialize_to_bytes; pass normalized tuples directly to schema.toInternal, which handles them by index.
  2. Cache the serialize callable per schema id so closure creation and attribute lookups happen once per schema rather than once per row. Although this cache is unbounded, it scales with the number of state store schemas provided, and the surrounding client is instantiated once per batch/partition rather than surviving indefinitely.
  3. Hoist _normalize_value/_normalize_tuple to module level to avoid re-creating closures and re-importing numpy on every call.
  4. Add has_numpy to avoid a circular dependency, as the existing have_numpy depends on dataframe (hence the original deferred import). The previous have_numpy also lives in a testing file, whereas the new has_numpy is in a dedicated utils file.

Of the above changes, the first is the only one with significant logic changes (removing the Row/dict/list creation). The next two primarily move or cache function definitions, and the last adds a helper method to enable the new function locations.
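The per-schema caching in point 2 can be sketched in plain Python. This is a hypothetical illustration, not the PR's actual code: `ToySchema` stands in for pyspark's `StructType`, and `get_serializer` shows the id-keyed memoization pattern that moves closure creation and attribute lookups out of the per-row path.

```python
# Hypothetical sketch of the per-schema serializer cache (not the PR's code).
_serializer_cache = {}


class ToySchema:
    """Toy stand-in for pyspark's StructType, for illustration only."""

    def __init__(self, names):
        self.names = names

    def toInternal(self, obj):
        # StructType.toInternal-style dispatch: name lookup for dicts,
        # positional handling for tuples/lists.
        if isinstance(obj, dict):
            return tuple(obj[n] for n in self.names)
        return tuple(obj)


def get_serializer(schema):
    # Build the serialize callable once per schema object, keyed by id(),
    # so the closure is created once per schema rather than once per row.
    key = id(schema)
    serializer = _serializer_cache.get(key)
    if serializer is None:
        to_internal = schema.toInternal  # attribute lookup hoisted out of the hot path

        def serializer(row_tuple):
            return to_internal(row_tuple)

        _serializer_cache[key] = serializer
    return serializer


schema = ToySchema(["user", "count"])
assert get_serializer(schema) is get_serializer(schema)  # cached callable reused
assert get_serializer(schema)(("alice", 3)) == ("alice", 3)
```

The cache key is `id(schema)`, matching the PR description's "per schema id" wording; since the client holding the cache lives only for a batch/partition, the unbounded dict is acceptable.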

To better explain the removal of Row usage:

  • StructType.toInternal dispatches on input type: for a dict it looks up values by field name, while for a tuple/list it zips values by position. So functionally there is no need to convert the tuple to a list.
  • Row is a tuple subclass, so it always hits the positional branch.
  • Since 3.0.0 (per the types.py change notes), Row field names are insertion ordered. Python dictionaries (as of 3.7+) are also insertion ordered.
  • dict(zip(field_names, converted)) → Row(**...) adds extra hops to (1) fetch the field names, (2) zip them with the row values, (3) create an insertion-ordered dictionary of those field names, and (4) create an insertion-ordered Row (dropping the field names, which are no longer used). The end result is a Row (a tuple subclass) that hits the same positional branch of StructType.toInternal as the original input tuple would.
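The redundancy argued above can be demonstrated in plain Python (Row itself is omitted; a dict round-trip stands in for the dict → Row hop, since both preserve insertion order and are ultimately read positionally):

```python
# Plain-Python illustration of the removed hops; names are made up.
field_names = ["user", "count"]
converted = ("alice", 3)

# Old path: tuple -> zip -> insertion-ordered dict -> (Row) -> positional read.
as_dict = dict(zip(field_names, converted))
old_values = tuple(as_dict.values())  # dicts preserve insertion order (3.7+)

# New path: the normalized tuple already carries values in field order,
# so it can be handed to the positional branch directly.
new_values = tuple(converted)

assert old_values == new_values
```

Both paths yield the same positional sequence, so the field-name fetch, zip, dict, and Row construction are pure overhead.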

AI-assisted + human reviewed/updated

Why are the changes needed?

This is a code cleanup/performance optimization. The original code performs unnecessary operations for every row, including rebuilding closures, extracting field names, building intermediate lists and dicts, and constructing Row objects. These all add minor per-row overhead while having no effect on the result.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.6)

return serializer

field_names = [f.name for f in schema.fields]
row_value = Row(**dict(zip(field_names, converted)))
@jiateoh (Contributor, Author) commented:

Copying from the PR description for reference on why this field_name/Row creation is no longer necessary:

  • StructType.toInternal dispatches on input type: for a dict it looks up values by field name, while for a tuple/list it zips values by position. So functionally there is no need to convert the tuple to a list. (L521 deletion)
  • Row is a tuple subclass, so it always hits the positional branch.
  • Since 3.0.0 (per the types.py change notes), Row field names are insertion ordered. Python dictionaries (as of 3.7+) are also insertion ordered.
  • dict(zip(field_names, converted)) → Row(**...) adds extra hops to (1) fetch the field names, (2) zip them with the row values, (3) create an insertion-ordered dictionary of those field names, and (4) create an insertion-ordered Row (dropping the field names, which are no longer used). The end result is a Row (a tuple subclass) that hits the same positional branch of StructType.toInternal as the original input tuple would.

@jiateoh jiateoh changed the title [WIP] Optimize python TWS stateful processor serialization calls [SPARK-56248][PYTHON][SS] Optimize python TWS stateful processor serialization calls Mar 27, 2026
@jiateoh jiateoh changed the title [SPARK-56248][PYTHON][SS] Optimize python TWS stateful processor serialization calls [WIP][SPARK-56248][PYTHON][SS] Optimize python TWS stateful processor serialization calls Mar 27, 2026
…on, cache serializer, hoist normalize helpers.

- Eliminate per-call Row(**dict(zip(...))) construction in _serialize_to_bytes;
  pass normalized tuples directly to schema.toInternal which handles them by index
- Cache the serialize callable per schema id so field extraction and closure
  creation happen once per schema rather than per row
- Hoist _normalize_value/_normalize_tuple to module level to avoid re-creating
  closures and re-importing numpy on every call
- Add non-testing helper method has_numpy to avoid circular dependency.

AI-assisted + human reviewed/edited

Generated-by: Claude Code (Claude Opus 4.6)
@jiateoh jiateoh force-pushed the tws_python_serialization_improvements branch from 9d1665d to 243a49b on March 27, 2026 00:30