
Conversation

@featzhang

Motivation

Flink’s existing AsyncFunction processes records one by one, which is suboptimal for high-latency inference workloads such as machine learning model serving (e.g. GPU-backed or remote inference services), where batching requests can significantly improve throughput and resource utilization.

Currently, users have to implement batching logic outside of Flink or embed complex buffering logic inside a single-record AsyncFunction, which is error-prone and hard to evolve.

This PR introduces a minimal, batch-oriented async abstraction as a foundation for future AI / inference-related extensions.


What is proposed in this PR

This PR adds a small, additive, and backward-compatible set of building blocks:

  1. AsyncBatchFunction (PublicEvolving API)
    A new async function interface that processes a batch of input elements in a single async invocation (see the interface sketch after this list).

  2. AsyncBatchWaitOperator (runtime operator)
    A minimal unordered async batch operator that (illustrated by the buffering sketch after this list):

    • Buffers incoming records
    • Triggers the async invocation when maxBatchSize is reached
    • Emits results as soon as the async call completes
    • Flushes remaining elements on end-of-input
  3. Stream API entry point
    A new AsyncDataStream.unorderedWaitBatch(...) method to wire the operator into the DataStream API (see the usage example after this list).

  4. Unit tests
    Tests validating batch triggering, result emission, and exception propagation.
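
For illustration, here is a minimal sketch of what the batch-oriented interface could look like, assuming it mirrors the existing AsyncFunction contract but receives a List of inputs; the exact interface and method names in this PR may differ:

```java
import java.io.Serializable;
import java.util.List;

import org.apache.flink.streaming.api.functions.async.ResultFuture;

// Hypothetical sketch; the real AsyncBatchFunction added by this PR may use
// different names or additional arguments.
public interface AsyncBatchFunction<IN, OUT> extends Serializable {

    // Invoked with a buffered batch of inputs instead of a single record.
    // Results (or an exception) are reported through the ResultFuture once
    // the async call completes.
    void asyncInvokeBatch(List<IN> inputs, ResultFuture<OUT> resultFuture) throws Exception;
}
```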
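The core batching behavior of the operator can be illustrated with a framework-free sketch. This is not the actual AsyncBatchWaitOperator (which plugs into Flink's operator lifecycle and runtime), only the buffer/trigger/flush logic described above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified illustration of the batching state machine, detached from Flink's
// operator API. The asyncInvoker stands in for firing the async batch call.
class BatchingBuffer<IN> {

    private final int maxBatchSize;
    private final Consumer<List<IN>> asyncInvoker;
    private final List<IN> buffer = new ArrayList<>();

    BatchingBuffer(int maxBatchSize, Consumer<List<IN>> asyncInvoker) {
        this.maxBatchSize = maxBatchSize;
        this.asyncInvoker = asyncInvoker;
    }

    // Called for every incoming record: buffer it, fire once the batch is full.
    void onElement(IN element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            fire();
        }
    }

    // Called at end-of-input: flush whatever is still buffered.
    void onEndOfInput() {
        if (!buffer.isEmpty()) {
            fire();
        }
    }

    private void fire() {
        List<IN> batch = new ArrayList<>(buffer);
        buffer.clear();
        asyncInvoker.accept(batch);
    }
}
```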
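And a hypothetical end-to-end usage, assuming a signature along the lines of unorderedWaitBatch(input, batchFunction, maxBatchSize); the actual parameters added by this PR (for example an explicit capacity argument) may differ:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

public class BatchInferenceExample {

    // Hypothetical batch function standing in for a remote model-serving call.
    static class ModelServingBatchFunction implements AsyncBatchFunction<String, String> {
        @Override
        public void asyncInvokeBatch(List<String> inputs, ResultFuture<String> resultFuture) {
            CompletableFuture
                    .supplyAsync(() -> inputs.stream()
                            .map(in -> "scored:" + in)   // placeholder for real inference
                            .collect(Collectors.toList()))
                    .whenComplete((results, error) -> {
                        if (error != null) {
                            resultFuture.completeExceptionally(error);
                        } else {
                            resultFuture.complete(results);
                        }
                    });
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> requests = env.fromElements("a", "b", "c", "d");

        // Assumed entry point: trigger an async call every 32 buffered records.
        DataStream<String> responses =
                AsyncDataStream.unorderedWaitBatch(requests, new ModelServingBatchFunction(), 32);

        responses.print();
        env.execute("async-batch-inference-sketch");
    }
}
```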


Scope and intentional limitations

This PR is intentionally minimal and does not include:

  • Ordered output mode
  • Time-based batching or timers
  • Retry / timeout handling
  • Metrics
  • SQL / Table API integration
  • Python integration
  • Any changes to existing AsyncFunction or AsyncWaitOperator

These aspects are expected to be explored incrementally in follow-up PRs once the core abstraction is agreed upon.


Why this approach

  • Keeps the initial API surface small and reviewable
  • Avoids coupling with existing async internals
  • Makes future extensions (timers, retries, Python, SQL) explicit and incremental
  • Aligns with Flink’s philosophy of evolving APIs through small, focused changes

Follow-up work (out of scope for this PR)

Potential next steps include:

  • Ordered batch async operator
  • Time-based batch triggering
  • Async retry and timeout strategies
  • Metrics and backpressure awareness
  • Python / ML inference integrations
  • SQL/Table API exposure

Compatibility

  • Fully backward-compatible
  • Purely additive changes
  • No behavior changes to existing async operators


flinkbot commented Dec 21, 2025

CI report:

Bot commands: The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

