@featzhang
Member

What is the purpose of the change

This PR adds SQL/Table API integration for the existing AsyncBatchWaitOperator runtime capability, enabling batch-oriented async lookup joins for AI/ML inference scenarios.

This PR builds on previous PRs that introduced:

  • AsyncBatchFunction and AsyncBatchWaitOperator (size/time-based batching)
  • Retry and timeout strategies
  • Comprehensive metrics

This PR bridges the gap between the streaming runtime and the SQL/Table API layer.
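
As a rough illustration of the size/time-based batching mentioned above, the following sketch flushes a buffer either when it reaches a maximum size or when its oldest element has waited longer than a timeout. It is not the actual AsyncBatchWaitOperator code: the class name `BatchBufferSketch`, its methods, and the explicit `nowMillis` clock parameter are all assumptions made to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the batching rule; not the real operator. */
final class BatchBufferSketch<T> {
    private final int maxBatchSize;
    private final long timeoutMillis;
    private final List<T> buffer = new ArrayList<>();
    private long firstElementTime = -1L;

    BatchBufferSketch(int maxBatchSize, long timeoutMillis) {
        if (maxBatchSize <= 0 || timeoutMillis <= 0) {
            throw new IllegalArgumentException("maxBatchSize and timeoutMillis must be positive");
        }
        this.maxBatchSize = maxBatchSize;
        this.timeoutMillis = timeoutMillis;
    }

    /** Adds an element; returns the full batch once the size threshold is hit, else an empty list. */
    List<T> add(T element, long nowMillis) {
        if (buffer.isEmpty()) {
            firstElementTime = nowMillis; // the timeout is measured from the oldest buffered element
        }
        buffer.add(element);
        return buffer.size() >= maxBatchSize ? drain() : new ArrayList<>();
    }

    /** Timer callback; returns the partial batch if the oldest element has waited long enough. */
    List<T> onTimer(long nowMillis) {
        boolean expired = !buffer.isEmpty() && nowMillis - firstElementTime >= timeoutMillis;
        return expired ? drain() : new ArrayList<>();
    }

    private List<T> drain() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        firstElementTime = -1L;
        return batch;
    }
}
```

Passing the clock in explicitly is only a convenience for the sketch; a real operator would presumably register processing-time timers instead.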

Brief change log

New API Classes (flink-table-common)

File                                   Description
AsyncBatchLookupFunction.java          New abstract base class for batch async lookup operations
AsyncBatchLookupFunctionProvider.java  Provider for creating batch lookup functions with configuration

New Runtime Classes (flink-table-runtime)

File                                      Description
AsyncBatchLookupJoinRunner.java           Batch-oriented async lookup join runner
AsyncBatchLookupJoinFunctionAdapter.java  Adapter bridging Table API to streaming AsyncBatchFunction

Modified Classes

File                 Description
FunctionKind.java    Added ASYNC_BATCH_TABLE enum value
LookupJoinUtil.java  Added batch async lookup detection and options extraction
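
One plausible shape for the "detection and options extraction" step is a type check on the runtime provider followed by reading its batch settings. The sketch below uses stand-in interfaces rather than the Flink types, and every name in it (`LookupProviderSketch`, `BatchLookupProviderSketch`, `DetectedBatchOptions`, `LookupDetectionSketch`) is hypothetical; the actual logic in LookupJoinUtil may differ.

```java
import java.time.Duration;
import java.util.Optional;

/** Stand-in for LookupTableSource.LookupRuntimeProvider (hypothetical). */
interface LookupProviderSketch {}

/** Stand-in for a provider that carries batch settings (hypothetical). */
interface BatchLookupProviderSketch extends LookupProviderSketch {
    int getMaxBatchSize();
    Duration getBatchTimeout();
}

/** Plain holder for the extracted settings. */
final class DetectedBatchOptions {
    final int maxBatchSize;
    final Duration batchTimeout;

    DetectedBatchOptions(int maxBatchSize, Duration batchTimeout) {
        this.maxBatchSize = maxBatchSize;
        this.batchTimeout = batchTimeout;
    }
}

final class LookupDetectionSketch {
    private LookupDetectionSketch() {}

    /** Returns batch options when the provider supports batch async lookup, otherwise empty. */
    static Optional<DetectedBatchOptions> extract(LookupProviderSketch provider) {
        if (provider instanceof BatchLookupProviderSketch) {
            BatchLookupProviderSketch batch = (BatchLookupProviderSketch) provider;
            return Optional.of(
                    new DetectedBatchOptions(batch.getMaxBatchSize(), batch.getBatchTimeout()));
        }
        return Optional.empty();
    }
}
```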

API Design

AsyncBatchLookupFunction

@PublicEvolving
public abstract class AsyncBatchLookupFunction extends AsyncTableFunction<RowData> {

    // Primary batch lookup method: receives all key rows of one batch
    public abstract CompletableFuture<Collection<RowData>> asyncLookupBatch(List<RowData> keyRows);

    // Single-key lookup delegates to the batch method with a one-element batch
    public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
        return asyncLookupBatch(Collections.singletonList(keyRow));
    }
}

AsyncBatchLookupFunctionProvider

@PublicEvolving
public interface AsyncBatchLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    // Static factory methods with optional configuration (bodies omitted in this summary)
    static AsyncBatchLookupFunctionProvider of(AsyncBatchLookupFunction func);
    static AsyncBatchLookupFunctionProvider of(AsyncBatchLookupFunction func, int maxBatchSize);
    static AsyncBatchLookupFunctionProvider of(AsyncBatchLookupFunction func, int maxBatchSize, Duration timeout);

    // Function factory and configuration getters
    AsyncBatchLookupFunction createAsyncBatchLookupFunction();
    int getMaxBatchSize();
    Duration getBatchTimeout();
}
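
To make the overloaded `of(...)` factories concrete, here is a minimal sketch of how such a provider could chain its overloads and validate its arguments. It is generic over the function type so that it compiles without Flink on the classpath. The class name `BatchProviderSketch`, the default values (32 and 100 ms, borrowed from the usage example in this description), and the validation rules are assumptions, not the behavior of the actual AsyncBatchLookupFunctionProvider.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical provider sketch; F stands in for AsyncBatchLookupFunction. */
final class BatchProviderSketch<F> {
    // Assumed defaults, chosen only for illustration.
    static final int DEFAULT_MAX_BATCH_SIZE = 32;
    static final Duration DEFAULT_BATCH_TIMEOUT = Duration.ofMillis(100);

    private final F function;
    private final int maxBatchSize;
    private final Duration batchTimeout;

    private BatchProviderSketch(F function, int maxBatchSize, Duration batchTimeout) {
        this.function = Objects.requireNonNull(function, "function");
        Objects.requireNonNull(batchTimeout, "batchTimeout");
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        if (batchTimeout.isNegative() || batchTimeout.isZero()) {
            throw new IllegalArgumentException("batchTimeout must be positive");
        }
        this.maxBatchSize = maxBatchSize;
        this.batchTimeout = batchTimeout;
    }

    static <F> BatchProviderSketch<F> of(F function) {
        return of(function, DEFAULT_MAX_BATCH_SIZE, DEFAULT_BATCH_TIMEOUT);
    }

    static <F> BatchProviderSketch<F> of(F function, int maxBatchSize) {
        return of(function, maxBatchSize, DEFAULT_BATCH_TIMEOUT);
    }

    static <F> BatchProviderSketch<F> of(F function, int maxBatchSize, Duration batchTimeout) {
        return new BatchProviderSketch<>(function, maxBatchSize, batchTimeout);
    }

    F createFunction() {
        return function;
    }

    int getMaxBatchSize() {
        return maxBatchSize;
    }

    Duration getBatchTimeout() {
        return batchTimeout;
    }
}
```

Funnelling every overload through the three-argument factory keeps validation in one place.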

Example Usage

Implementing a Batch Lookup Source

public class MyBatchLookupTableSource implements LookupTableSource {

    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
        return AsyncBatchLookupFunctionProvider.of(
            new MyBatchInferenceFunction(),
            32,  // maxBatchSize
            Duration.ofMillis(100)  // batchTimeout
        );
    }

    // copy() and asSummaryString(), also required by the interface, are omitted for brevity
}

public class MyBatchInferenceFunction extends AsyncBatchLookupFunction {

    // modelService, extractFeatures, and createResultRow are user-provided and not shown here

    @Override
    public CompletableFuture<Collection<RowData>> asyncLookupBatch(List<RowData> keyRows) {
        return CompletableFuture.supplyAsync(() -> {
            // Extract one feature vector per key row
            List<float[]> features = keyRows.stream()
                .map(this::extractFeatures)
                .collect(Collectors.toList());

            // Single batched ML inference call for the whole batch
            List<float[]> predictions = modelService.batchPredict(features);

            // Pair each key row with the prediction at the same index
            return IntStream.range(0, keyRows.size())
                .mapToObj(i -> createResultRow(keyRows.get(i), predictions.get(i)))
                .collect(Collectors.toList());
        });
    }
}
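
The example above pairs keys and predictions by index, so it relies on the model service returning exactly one prediction per input, in input order. The following self-contained sketch shows that pattern with plain Java types and an explicit size check. The class `BatchInferenceSketch`, the stub model (which just sums each feature vector), and the string result format are invented for illustration and are not part of this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class BatchInferenceSketch {
    private BatchInferenceSketch() {}

    /** Hypothetical model stub: scores a whole batch in one call (here, the sum of each vector). */
    static List<Float> batchPredict(List<float[]> features) {
        List<Float> scores = new ArrayList<>();
        for (float[] vector : features) {
            float sum = 0f;
            for (float value : vector) {
                sum += value;
            }
            scores.add(sum);
        }
        return scores;
    }

    /** One model call per batch; result i is built from key i and prediction i. */
    static CompletableFuture<List<String>> asyncLookupBatch(List<float[]> keyRows) {
        return CompletableFuture.supplyAsync(() -> {
            List<Float> predictions = batchPredict(keyRows);
            // Guard the positional pairing: a mismatch would silently join wrong rows.
            if (predictions.size() != keyRows.size()) {
                throw new IllegalStateException(
                        "Expected " + keyRows.size() + " predictions, got " + predictions.size());
            }
            return IntStream.range(0, keyRows.size())
                    .mapToObj(i -> "row" + i + "=" + predictions.get(i))
                    .collect(Collectors.toList());
        });
    }
}
```

If a model service can drop or reorder results, returning an identifier with each prediction and joining on it would be a safer design than relying on position.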

Using in SQL

-- Create a lookup table with batch async support
CREATE TABLE model_predictions (
    feature_id INT,
    prediction FLOAT
) WITH (
    'connector' = 'my-batch-inference-connector',
    'async.batch.size' = '32',
    'async.batch.timeout' = '100ms'
);

-- Temporal join using batch async lookup
SELECT o.*, p.prediction
FROM orders AS o
JOIN model_predictions FOR SYSTEM_TIME AS OF o.proctime AS p
ON o.feature_id = p.feature_id;
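
A connector factory would need to turn the `WITH` options above into the values passed to the provider. The sketch below parses the two option keys from the example into an `int` and a `Duration`. The class `BatchOptionParserSketch` and its simplified duration parser (only `ms` and `s` suffixes) are assumptions for illustration; a real connector would more likely declare these options through Flink's configuration API.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.Map;

/** Hypothetical option parser for the keys used in the SQL example. */
final class BatchOptionParserSketch {
    static final String SIZE_KEY = "async.batch.size";
    static final String TIMEOUT_KEY = "async.batch.timeout";

    private BatchOptionParserSketch() {}

    /** Reads the batch size, falling back to the default when the key is absent. */
    static int batchSize(Map<String, String> options, int defaultValue) {
        String raw = options.get(SIZE_KEY);
        if (raw == null) {
            return defaultValue;
        }
        int size = Integer.parseInt(raw.trim());
        if (size <= 0) {
            throw new IllegalArgumentException(SIZE_KEY + " must be positive, got " + raw);
        }
        return size;
    }

    /** Reads the batch timeout; supports only "<n>ms" and "<n>s" in this simplified sketch. */
    static Duration batchTimeout(Map<String, String> options, Duration defaultValue) {
        String raw = options.get(TIMEOUT_KEY);
        if (raw == null) {
            return defaultValue;
        }
        String text = raw.trim().toLowerCase(Locale.ROOT);
        if (text.endsWith("ms")) { // check "ms" first, since it also ends with "s"
            return Duration.ofMillis(Long.parseLong(text.substring(0, text.length() - 2).trim()));
        }
        if (text.endsWith("s")) {
            return Duration.ofSeconds(Long.parseLong(text.substring(0, text.length() - 1).trim()));
        }
        throw new IllegalArgumentException("Unsupported duration format: " + raw);
    }
}
```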

Design Principles

  1. Backward Compatible - Does not modify existing AsyncLookupFunction or AsyncWaitOperator
  2. Reuses Existing Runtime - Bridges to AsyncBatchWaitOperator for execution
  3. SQL Layer Only Describes - SQL describes "what", runtime decides "how"
  4. Extensible - Clear entry points for future Python/ML Connector support

Verifying this change

This change added tests and can be verified as follows:

Unit Tests (flink-table-common)

  • AsyncBatchLookupFunctionTest - Tests batch lookup semantics and delegation
  • AsyncBatchLookupFunctionProviderTest - Tests provider configuration and validation

Does this pull request potentially affect one of the following parts

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no (new code path only)
  • Anything that affects deployment or recovery: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

Future Work (TODOs in code)

  • Planner integration for automatic batch async lookup detection
  • SQL hint support for batch async parameters
  • Event-time based batching
  • Python API integration
  • ML Connector integration

averyzhang added 6 commits December 21, 2025 17:40
…erence

This commit introduces SQL/Table API support for batch async lookup joins,
enabling AI/ML inference scenarios where batching lookups improves throughput.

Key additions:
- AsyncBatchLookupFunction: Batch-oriented async lookup interface
- AsyncBatchLookupFunctionProvider: Provider with batch configuration
- AsyncBatchLookupJoinRunner: Runtime lookup join runner
- AsyncBatchLookupJoinFunctionAdapter: Adapter to streaming AsyncBatchFunction
- LookupJoinUtil: Batch async lookup detection and options extraction
- FunctionKind.ASYNC_BATCH_TABLE: New function kind enum

The implementation bridges the Table API layer to the existing
AsyncBatchWaitOperator runtime, keeping behavior consistent with its
size-based and time-based batching, retry, and timeout strategies.
@flinkbot
Collaborator

flinkbot commented Dec 22, 2025

CI report:

