@@ -20,13 +20,19 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.async.AsyncBatchFunction;
import org.apache.flink.streaming.api.functions.async.AsyncBatchRetryStrategy;
import org.apache.flink.streaming.api.functions.async.AsyncBatchTimeoutPolicy;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
import org.apache.flink.streaming.api.operators.async.AsyncBatchWaitOperatorFactory;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
import org.apache.flink.streaming.api.operators.async.OrderedAsyncBatchWaitOperatorFactory;
import org.apache.flink.util.Preconditions;
import org.apache.flink.api.java.Utils;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.streaming.util.retryable.AsyncRetryStrategies.NO_RETRY_STRATEGY;
@@ -319,4 +325,285 @@ public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWaitWithRetry(
OutputMode.ORDERED,
asyncRetryStrategy);
}

// ================================================================================
// Batch Async Operations
// ================================================================================

/**
* Adds an AsyncBatchWaitOperator to process elements in batches. The order of output stream
* records may be reordered (unordered mode).
*
     * <p>This method is particularly useful for high-latency workloads where batching can
     * significantly improve throughput, such as machine learning model inference.
*
* <p>The operator buffers incoming elements and triggers the async batch function when the
* buffer reaches {@code maxBatchSize}. Remaining elements are flushed when the input ends.
*
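     * <p>Example usage (a minimal sketch; {@code BatchInferenceFunction} is a user-defined
     * {@link AsyncBatchFunction}, such as the sample implementation shown in that interface's
     * documentation):
     *
     * <pre>{@code
     * DataStream<String> input = ...;
     * DataStream<String> result =
     *         AsyncDataStream.unorderedWaitBatch(input, new BatchInferenceFunction(), 32);
     * }</pre>
     *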
* @param in Input {@link DataStream}
* @param func {@link AsyncBatchFunction} to process batches of elements
* @param maxBatchSize Maximum number of elements to batch before triggering async invocation
* @param <IN> Type of input record
* @param <OUT> Type of output record
* @return A new {@link SingleOutputStreamOperator}
*/
public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWaitBatch(
DataStream<IN> in, AsyncBatchFunction<IN, OUT> func, int maxBatchSize) {
return unorderedWaitBatch(in, func, maxBatchSize, 0L);
}

/**
* Adds an AsyncBatchWaitOperator to process elements in batches with timeout support. The order
* of output stream records may be reordered (unordered mode).
*
     * <p>This method is particularly useful for high-latency workloads where batching can
     * significantly improve throughput, such as machine learning model inference.
*
* <p>The operator buffers incoming elements and triggers the async batch function when either:
*
* <ul>
* <li>The buffer reaches {@code maxBatchSize}
* <li>The {@code batchTimeoutMs} has elapsed since the first buffered element (if timeout is
* enabled)
* </ul>
*
* <p>Remaining elements are flushed when the input ends.
*
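     * <p>Example usage (a sketch; a partial batch is flushed after 50 ms):
     *
     * <pre>{@code
     * DataStream<String> result =
     *         AsyncDataStream.unorderedWaitBatch(input, new BatchInferenceFunction(), 32, 50L);
     * }</pre>
     *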
* @param in Input {@link DataStream}
* @param func {@link AsyncBatchFunction} to process batches of elements
* @param maxBatchSize Maximum number of elements to batch before triggering async invocation
     * @param batchTimeoutMs Batch timeout in milliseconds; {@code <= 0} means timeout is disabled
* @param <IN> Type of input record
* @param <OUT> Type of output record
* @return A new {@link SingleOutputStreamOperator}
*/
public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWaitBatch(
DataStream<IN> in,
AsyncBatchFunction<IN, OUT> func,
int maxBatchSize,
long batchTimeoutMs) {
Preconditions.checkArgument(maxBatchSize > 0, "maxBatchSize must be greater than 0");

TypeInformation<OUT> outTypeInfo =
TypeExtractor.getUnaryOperatorReturnType(
func,
AsyncBatchFunction.class,
0,
1,
new int[] {1, 0},
in.getType(),
Utils.getCallLocationName(),
true);

// create transform
AsyncBatchWaitOperatorFactory<IN, OUT> operatorFactory =
new AsyncBatchWaitOperatorFactory<>(
in.getExecutionEnvironment().clean(func), maxBatchSize, batchTimeoutMs);

return in.transform("async batch wait operator", outTypeInfo, operatorFactory);
}

/**
* Adds an AsyncBatchWaitOperator to process elements in batches with ordered output. The order
* of output stream records is guaranteed to be the same as input order.
*
     * <p>This method is particularly useful for high-latency workloads where batching can
     * significantly improve throughput while maintaining ordering guarantees, such as machine
     * learning model inference with order-sensitive downstream processing.
*
* <p>The operator buffers incoming elements and triggers the async batch function when either:
*
* <ul>
* <li>The buffer reaches {@code maxBatchSize}
     *     <li>The {@code maxWaitTime} has elapsed since the first buffered element (if the
     *         timeout is enabled)
* </ul>
*
* <p>Results are buffered and emitted in the original input order, regardless of async
* completion order.
*
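     * <p>Example usage (a sketch; results are emitted in input order, with partial batches
     * flushed after 50 ms):
     *
     * <pre>{@code
     * DataStream<String> result =
     *         AsyncDataStream.orderedWaitBatch(
     *                 input, new BatchInferenceFunction(), 32, Duration.ofMillis(50));
     * }</pre>
     *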
* @param in Input {@link DataStream}
* @param func {@link AsyncBatchFunction} to process batches of elements
* @param maxBatchSize Maximum number of elements to batch before triggering async invocation
     * @param maxWaitTime Maximum duration to wait before flushing a partial batch;
     *     {@code Duration.ZERO} or a negative duration disables the timeout
* @param <IN> Type of input record
* @param <OUT> Type of output record
* @return A new {@link SingleOutputStreamOperator}
*/
public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWaitBatch(
DataStream<IN> in,
AsyncBatchFunction<IN, OUT> func,
int maxBatchSize,
Duration maxWaitTime) {
Preconditions.checkArgument(maxBatchSize > 0, "maxBatchSize must be greater than 0");
Preconditions.checkNotNull(maxWaitTime, "maxWaitTime must not be null");

long batchTimeoutMs = maxWaitTime.toMillis();

TypeInformation<OUT> outTypeInfo =
TypeExtractor.getUnaryOperatorReturnType(
func,
AsyncBatchFunction.class,
0,
1,
new int[] {1, 0},
in.getType(),
Utils.getCallLocationName(),
true);

// create transform
OrderedAsyncBatchWaitOperatorFactory<IN, OUT> operatorFactory =
new OrderedAsyncBatchWaitOperatorFactory<>(
in.getExecutionEnvironment().clean(func), maxBatchSize, batchTimeoutMs);

return in.transform("ordered async batch wait operator", outTypeInfo, operatorFactory);
}

/**
* Adds an AsyncBatchWaitOperator to process elements in batches with ordered output. The order
* of output stream records is guaranteed to be the same as input order.
*
* <p>This overload disables timeout-based batching. Batches are only flushed when the buffer
* reaches {@code maxBatchSize} or when the input ends.
*
* @param in Input {@link DataStream}
* @param func {@link AsyncBatchFunction} to process batches of elements
* @param maxBatchSize Maximum number of elements to batch before triggering async invocation
* @param <IN> Type of input record
* @param <OUT> Type of output record
* @return A new {@link SingleOutputStreamOperator}
*/
public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWaitBatch(
DataStream<IN> in, AsyncBatchFunction<IN, OUT> func, int maxBatchSize) {
return orderedWaitBatch(in, func, maxBatchSize, Duration.ZERO);
}

// ================================================================================
// Batch Async Operations with Retry and Timeout Support
// ================================================================================

/**
* Adds an AsyncBatchWaitOperator to process elements in batches with retry and timeout support.
* The order of output stream records may be reordered (unordered mode).
*
     * <p>This method is particularly useful for high-latency workloads where:
*
* <ul>
* <li>Batching can significantly improve throughput (e.g., ML model inference)
* <li>Retry logic is needed for transient failures
* <li>Timeout handling is required to prevent indefinite waiting
* </ul>
*
* <p>The operator buffers incoming elements and triggers the async batch function when either:
*
* <ul>
* <li>The buffer reaches {@code maxBatchSize}
     *     <li>The {@code batchTimeoutMs} has elapsed since the first buffered element (if the
     *         timeout is enabled)
* </ul>
*
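     * <p>Example usage (a sketch; {@code NO_RETRY_STRATEGY} and {@code NO_TIMEOUT_POLICY} are
     * the no-op defaults and stand in here for a concrete retry strategy and timeout policy):
     *
     * <pre>{@code
     * DataStream<String> result =
     *         AsyncDataStream.unorderedWaitBatch(
     *                 input,
     *                 new BatchInferenceFunction(),
     *                 32,
     *                 50L,
     *                 AsyncBatchRetryStrategies.NO_RETRY_STRATEGY,
     *                 AsyncBatchTimeoutPolicy.NO_TIMEOUT_POLICY);
     * }</pre>
     *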
* @param in Input {@link DataStream}
* @param func {@link AsyncBatchFunction} to process batches of elements
* @param maxBatchSize Maximum number of elements to batch before triggering async invocation
     * @param batchTimeoutMs Batch timeout in milliseconds; {@code <= 0} means timeout is disabled
* @param retryStrategy Retry strategy for failed batch operations
* @param asyncTimeoutPolicy Timeout policy for async batch operations
* @param <IN> Type of input record
* @param <OUT> Type of output record
* @return A new {@link SingleOutputStreamOperator}
*/
public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWaitBatch(
DataStream<IN> in,
AsyncBatchFunction<IN, OUT> func,
int maxBatchSize,
long batchTimeoutMs,
AsyncBatchRetryStrategy<OUT> retryStrategy,
AsyncBatchTimeoutPolicy asyncTimeoutPolicy) {
Preconditions.checkArgument(maxBatchSize > 0, "maxBatchSize must be greater than 0");
Preconditions.checkNotNull(retryStrategy, "retryStrategy must not be null");
Preconditions.checkNotNull(asyncTimeoutPolicy, "asyncTimeoutPolicy must not be null");

TypeInformation<OUT> outTypeInfo =
TypeExtractor.getUnaryOperatorReturnType(
func,
AsyncBatchFunction.class,
0,
1,
new int[] {1, 0},
in.getType(),
Utils.getCallLocationName(),
true);

// create transform
AsyncBatchWaitOperatorFactory<IN, OUT> operatorFactory =
new AsyncBatchWaitOperatorFactory<>(
in.getExecutionEnvironment().clean(func),
maxBatchSize,
batchTimeoutMs,
retryStrategy,
asyncTimeoutPolicy);

return in.transform("async batch wait operator", outTypeInfo, operatorFactory);
}

/**
* Adds an AsyncBatchWaitOperator with retry support only. The order of output stream records
* may be reordered (unordered mode).
*
* @param in Input {@link DataStream}
* @param func {@link AsyncBatchFunction} to process batches of elements
* @param maxBatchSize Maximum number of elements to batch before triggering async invocation
     * @param batchTimeoutMs Batch timeout in milliseconds; {@code <= 0} means timeout is disabled
* @param retryStrategy Retry strategy for failed batch operations
* @param <IN> Type of input record
* @param <OUT> Type of output record
* @return A new {@link SingleOutputStreamOperator}
*/
public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWaitBatchWithRetry(
DataStream<IN> in,
AsyncBatchFunction<IN, OUT> func,
int maxBatchSize,
long batchTimeoutMs,
AsyncBatchRetryStrategy<OUT> retryStrategy) {
return unorderedWaitBatch(
in,
func,
maxBatchSize,
batchTimeoutMs,
retryStrategy,
AsyncBatchTimeoutPolicy.NO_TIMEOUT_POLICY);
}

/**
* Adds an AsyncBatchWaitOperator with timeout support only. The order of output stream records
* may be reordered (unordered mode).
*
* @param in Input {@link DataStream}
* @param func {@link AsyncBatchFunction} to process batches of elements
* @param maxBatchSize Maximum number of elements to batch before triggering async invocation
     * @param batchTimeoutMs Batch timeout in milliseconds; {@code <= 0} means timeout is disabled
* @param asyncTimeoutPolicy Timeout policy for async batch operations
* @param <IN> Type of input record
* @param <OUT> Type of output record
* @return A new {@link SingleOutputStreamOperator}
*/
@SuppressWarnings("unchecked")
public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWaitBatchWithTimeout(
DataStream<IN> in,
AsyncBatchFunction<IN, OUT> func,
int maxBatchSize,
long batchTimeoutMs,
AsyncBatchTimeoutPolicy asyncTimeoutPolicy) {
return unorderedWaitBatch(
in,
func,
maxBatchSize,
batchTimeoutMs,
(AsyncBatchRetryStrategy<OUT>)
org.apache.flink.streaming.util.retryable.AsyncBatchRetryStrategies
.NO_RETRY_STRATEGY,
asyncTimeoutPolicy);
}

// TODO: Add event-time based batching support in follow-up PR
// TODO: Add ordered batch operations with retry/timeout in follow-up PR
}
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.functions.async;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Function;

import java.io.Serializable;
import java.util.List;

/**
* A function to trigger Async I/O operations in batches.
*
* <p>For each batch of inputs, an async I/O operation can be triggered via {@link
 * #asyncInvokeBatch}, and once it has completed, the results can be collected by calling {@link
* ResultFuture#complete}. This is particularly useful for high-latency inference workloads where
* batching can significantly improve throughput.
*
 * <p>Unlike {@link AsyncFunction}, which processes one element at a time, this interface allows
* processing multiple elements together, which is beneficial for scenarios like:
*
* <ul>
* <li>Machine learning model inference where batching improves GPU utilization
* <li>External service calls that support batch APIs
* <li>Database queries that can be batched for efficiency
* </ul>
*
* <p>Example usage:
*
* <pre>{@code
 * public class BatchInferenceFunction implements AsyncBatchFunction<String, String> {
 *
 *     @Override
 *     public void asyncInvokeBatch(List<String> inputs, ResultFuture<String> resultFuture) {
 *         // Submit the whole batch as a single asynchronous inference request.
 *         CompletableFuture.supplyAsync(() -> modelService.batchInference(inputs))
 *                 .thenAccept(resultFuture::complete);
 *     }
 * }
* }</pre>
*
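 * <p>The function is then wired into a pipeline via {@code AsyncDataStream} (a sketch):
 *
 * <pre>{@code
 * DataStream<String> results =
 *         AsyncDataStream.orderedWaitBatch(input, new BatchInferenceFunction(), 32);
 * }</pre>
 *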
* @param <IN> The type of the input elements.
* @param <OUT> The type of the returned elements.
*/
@PublicEvolving
public interface AsyncBatchFunction<IN, OUT> extends Function, Serializable {

/**
* Trigger async operation for a batch of stream inputs.
*
* <p>The implementation should process all inputs in the batch and complete the result future
* with all corresponding outputs. The number of outputs does not need to match the number of
     * inputs; it depends on the specific use case.
*
* @param inputs a batch of elements coming from upstream tasks
* @param resultFuture to be completed with the result data for the entire batch
* @throws Exception in case of a user code error. An exception will make the task fail and
     *     trigger the fail-over process.
*/
void asyncInvokeBatch(List<IN> inputs, ResultFuture<OUT> resultFuture) throws Exception;

// TODO: Add timeout handling in follow-up PR
// TODO: Add open/close lifecycle methods in follow-up PR
}