Skip to content

Conversation

@featzhang
Copy link
Member

What is the purpose of the change

This PR adds comprehensive metrics support to AsyncBatchWaitOperator and OrderedAsyncBatchWaitOperator for monitoring AI/ML inference workloads.

The existing async batch operators lacked visibility into batch-level performance characteristics that are critical for:

  • Tuning batch sizes for optimal throughput
  • Monitoring inference latency
  • Identifying backpressure and concurrency issues
  • Detecting failures in async operations

Brief change log

  • Added metrics instrumentation to AsyncBatchWaitOperator:

    • batchSize - Histogram of batch sizes (records per batch)
    • batchLatencyMs - Histogram of batch latency (time from first element to flush)
    • asyncCallDurationMs - Histogram of async call duration (invocation to completion)
    • inflightBatches - Gauge showing current in-flight async operations
    • totalBatchesProcessed - Counter of total batches processed
    • totalRecordsProcessed - Counter of total records processed
    • asyncCallFailures - Counter of failed async calls
  • Added same metrics to OrderedAsyncBatchWaitOperator plus:

    • pendingOrderedBatches - Gauge showing batches waiting for in-order emission
  • Added comprehensive unit tests for all metrics functionality

Verifying this change

This change added tests and can be verified as follows:

  • AsyncBatchWaitOperatorTest#testBatchSizeMetric - verifies batch size histogram is recorded
  • AsyncBatchWaitOperatorTest#testBatchAndRecordCounters - verifies batch and record counters
  • AsyncBatchWaitOperatorTest#testAsyncCallDurationMetric - verifies duration tracking
  • AsyncBatchWaitOperatorTest#testAsyncCallFailureMetric - verifies failure counter
  • AsyncBatchWaitOperatorTest#testInflightBatchesTracking - verifies in-flight gauge
  • AsyncBatchWaitOperatorTest#testBatchLatencyMetric - verifies latency histogram
  • AsyncBatchWaitOperatorTest#testMetricsWithMultipleBatches - comprehensive multi-batch test

Does this pull request potentially affect one of the following parts

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes (minimal overhead)
  • Anything that affects deployment or recovery: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

@flinkbot
Copy link
Collaborator

flinkbot commented Dec 21, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants