Skip to content

Conversation

@featzhang
Copy link
Member

What is the purpose of the change

This PR adds Python DataStream API integration for the existing Java AsyncBatchWaitOperator runtime capability, enabling Python-based AI/ML inference and external service calls to use batch-oriented async execution.

This is a pure integration PR - all batching, scheduling, and async execution logic is reused from the Java side.

Brief change log

New Python Classes

File Description
AsyncBatchFunction Python async batch function interface
AsyncBatchFunctionDescriptor Descriptor for serialization and configuration
AsyncBatchOperation Runtime operation for batch async execution
BatchResultDistributor Distributes batch results to individual elements

Modified Files

File Changes
async_data_stream.py Added unordered_wait_batch() and ordered_wait_batch() methods
functions.py Added AsyncBatchFunction and AsyncBatchFunctionDescriptor classes
__init__.py Exported AsyncBatchFunction
flink-fn-execution.proto Added ASYNC_BATCH function type

Test Files

File Description
test_async_batch_function.py Comprehensive tests for batch async functionality

API Design

AsyncBatchFunction

class AsyncBatchFunction(Function, Generic[IN, OUT]):
    """
    A function to trigger Async I/O operation with batch processing support.
    Designed for AI/ML inference scenarios where batching improves throughput.
    """

    @abstractmethod
    async def async_invoke_batch(self, inputs: List[IN]) -> List[OUT]:
        """
        Trigger async operation for a batch of stream inputs.
        Returns a list of results, one for each input element.
        """
        pass

    def timeout_batch(self, inputs: List[IN]) -> List[OUT]:
        """
        Called when async_invoke_batch times out.
        Override to provide custom timeout handling.
        """
        raise TimeoutError("Async batch function call has timed out")

AsyncDataStream Methods

# Unordered batch execution
AsyncDataStream.unordered_wait_batch(
    data_stream,
    async_batch_function,
    timeout,           # Overall timeout
    batch_size,        # Max elements per batch
    batch_timeout=None,# Optional batch flush timeout
    capacity=100,      # Max in-flight operations
    output_type=None   # Output type info
)

# Ordered batch execution (preserves input order)
AsyncDataStream.ordered_wait_batch(
    data_stream,
    async_batch_function,
    timeout,
    batch_size,
    batch_timeout=None,
    capacity=100,
    output_type=None
)

Example Usage

from pyflink.datastream import AsyncDataStream, AsyncBatchFunction
from pyflink.common import Time, Types, Row
from typing import List

class MLInferenceFunction(AsyncBatchFunction):
    """Batch ML model inference function."""
    
    async def async_invoke_batch(self, inputs: List[Row]) -> List[float]:
        # Batch inference call to ML model
        features = [self.extract_features(row) for row in inputs]
        predictions = await self.model.predict_batch(features)
        return predictions.tolist()

# Apply to data stream
result = AsyncDataStream.unordered_wait_batch(
    ds,
    MLInferenceFunction(),
    timeout=Time.seconds(30),
    batch_size=32,
    batch_timeout=Time.milliseconds(100),
    output_type=Types.FLOAT()
)

Testing

The PR includes comprehensive tests covering:

  1. Basic batch execution - Verify batching and results
  2. Batch size triggering - Verify batches trigger at configured size
  3. Batch timeout triggering - Verify partial batches flush on timeout
  4. Exception propagation - Verify errors fail the job
  5. Timeout handling - Verify timeout_batch is called
  6. Ordered execution - Verify output order matches input
  7. End-of-input flush - Verify remaining elements are flushed
  8. Validation errors - Verify parameter validation

Design Principles

  1. Reuse Java Runtime - All batching logic is in AsyncBatchWaitOperator
  2. Follow Existing Patterns - API mirrors AsyncFunction integration
  3. Explicit, Readable Code - No complex abstractions
  4. Backward Compatible - Existing APIs unchanged

Verifying this change

This change added tests and can be verified as follows:

cd flink-python
python -m pytest pyflink/datastream/tests/test_async_batch_function.py -v

Does this pull request potentially affect one of the following parts

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? Docstrings

averyzhang added 7 commits December 21, 2025 17:40
…erence

This commit introduces SQL/Table API support for batch async lookup joins,
enabling AI/ML inference scenarios where batching lookups improves throughput.

Key additions:
- AsyncBatchLookupFunction: Batch-oriented async lookup interface
- AsyncBatchLookupFunctionProvider: Provider with batch configuration
- AsyncBatchLookupJoinRunner: Runtime lookup join runner
- AsyncBatchLookupJoinFunctionAdapter: Adapter to streaming AsyncBatchFunction
- LookupJoinUtil: Batch async lookup detection and options extraction
- FunctionKind.ASYNC_BATCH_TABLE: New function kind enum

The implementation bridges the Table API layer to the existing
AsyncBatchWaitOperator runtime, ensuring consistent behavior with
size-based, time-based batching, retry, and timeout strategies.
…BatchFunction

This commit introduces Python support for batch-oriented async operations,
enabling AI/ML inference scenarios to use batch processing for improved
throughput.

Key additions:
- AsyncBatchFunction class for batch async operations
- AsyncDataStream.unordered_wait_batch() method
- AsyncDataStream.ordered_wait_batch() method
- AsyncBatchOperation runtime implementation
- Comprehensive unit tests

The implementation reuses the Java AsyncBatchWaitOperator for all batching
and scheduling logic, following existing PyFlink async function patterns.
@flinkbot
Copy link
Collaborator

flinkbot commented Dec 22, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants