From d2fca805187b3b35f8cdff3c950f259c9f8940ff Mon Sep 17 00:00:00 2001 From: David Wang Date: Mon, 4 May 2026 17:14:46 +1000 Subject: [PATCH] Emit pendingRecords metric in Kinesis connector. --- .../source/metrics/KinesisShardMetrics.java | 76 +++++++++++++++++++ .../source/metrics/MetricConstants.java | 11 +++ .../reader/KinesisShardSplitReaderBase.java | 9 ++- .../reader/KinesisStreamsSourceReader.java | 22 ++++++ 4 files changed, 115 insertions(+), 3 deletions(-) diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/metrics/KinesisShardMetrics.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/metrics/KinesisShardMetrics.java index c46d5394..d5916d16 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/metrics/KinesisShardMetrics.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/metrics/KinesisShardMetrics.java @@ -31,10 +31,30 @@ @Internal public class KinesisShardMetrics { private static final Logger log = LoggerFactory.getLogger(KinesisShardMetrics.class); + + /** + * Smoothing factor for the EWMA of records-per-millisecond used to translate {@code + * millisBehindLatest} into an estimated record count. Higher values track recent rate more + * aggressively; lower values are more stable. 0.2 was chosen empirically as a good + * compromise — long enough to ride out single-batch jitter, short enough to react to traffic + * shifts within ~10 batches. + */ + private static final double RATE_EWMA_ALPHA = 0.2; + private final MetricGroup metricGroup; private final KinesisShardSplit shardInfo; private volatile long millisBehindLatest = -1L; + /** Exponentially-weighted moving average of records-per-millisecond observed on this shard. 
*/ + private volatile double recordsPerMs = 0.0; + + /** + * Monotonic time (System.nanoTime, ns) at which the most recent record batch was observed. + * {@code 0} means we have not yet seen a batch on this shard. Used to compute the time delta + * passed to the EWMA in {@link #observeBatchSize(int)}. + */ + private volatile long lastBatchObservedAtNanos = 0L; + public KinesisShardMetrics(KinesisShardSplit shard, MetricGroup rootMetricGroup) { this.shardInfo = shard; @@ -75,6 +95,62 @@ public void setMillisBehindLatest(long millisBehindLatest) { this.millisBehindLatest = millisBehindLatest; } + /** + * Update the per-shard EWMA of records-per-millisecond. Called once per record batch by the + * split reader; empty batches carry no rate signal and are skipped. + *
<p>
This is the input to {@link #getEstimatedPendingRecords()}, which in turn feeds the + * operator-level {@code pendingRecords} gauge that the Flink autoscaler reads. + */ + public void observeBatchSize(int batchSize) { + if (batchSize <= 0) { + return; + } + long now = System.nanoTime(); + long prev = lastBatchObservedAtNanos; + lastBatchObservedAtNanos = now; + if (prev == 0L) { + // First observation on this shard — no time delta yet. Seed the EWMA with a rough + // estimate assuming the batch was produced over 1 second; subsequent observations + // will correct it quickly. + recordsPerMs = batchSize / 1000.0; + return; + } + double dtMs = (now - prev) / 1_000_000.0; + if (dtMs <= 0.0) { + return; + } + double instantaneousRate = batchSize / dtMs; + // EWMA of the fetch rate: new = alpha * sample + (1 - alpha) * old. + recordsPerMs = + (RATE_EWMA_ALPHA * instantaneousRate) + + ((1.0 - RATE_EWMA_ALPHA) * recordsPerMs); + } + + /** + * Estimate the number of records still queued upstream on this shard, derived from {@link + * #millisBehindLatest} and the EWMA in {@link #recordsPerMs}. + *
<p>
Returns {@code 0} when the shard is fully caught up or its lag has not been reported + * yet. When we have only seen {@code millisBehindLatest} but + * no batches yet, falls back to using {@code millisBehindLatest} itself as a coarse record + * count — the autoscaler then at least sees a non-zero lag signal. + */ + public long getEstimatedPendingRecords() { + long mbl = millisBehindLatest; + if (mbl <= 0L) { + return 0L; + } + double rate = recordsPerMs; + if (rate <= 0.0) { + // No rate observed yet — return msBehind as records (1 ms ≈ 1 record fallback). + // This is intentionally an over-estimate; the autoscaler reacting earlier with stale + // data is preferable to it sleeping through real lag. + return mbl; + } + return Math.round(mbl * rate); + } + public void unregister() { ((AbstractMetricGroup) metricGroup).close(); } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/metrics/MetricConstants.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/metrics/MetricConstants.java index 683946e1..4d9b9172 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/metrics/MetricConstants.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/metrics/MetricConstants.java @@ -31,4 +31,15 @@ public class MetricConstants { public static final String ACCOUNT_ID_METRIC_GROUP = "accountId"; public static final String MILLIS_BEHIND_LATEST = "millisBehindLatest"; + + /** + * Number of records the source still has to read before it has caught up with the stream tip. + *
<p>
This is the metric the Flink Kubernetes Operator autoscaler reads to decide whether a + * source vertex needs to be scaled up. Upstream Flink Kinesis source 5.1.0 does NOT register + * this gauge; this connector publishes it as part of an Atlassian patch. The value is an + * estimate computed per shard from {@code millisBehindLatest * recentRecordsPerMs} and then + * summed across shards at the operator-level metric group. + */ + public static final String PENDING_RECORDS = "pendingRecords"; } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java index e96a9104..9bdbaf4c 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java @@ -118,9 +118,12 @@ public RecordsWithSplitIds fetch() throws IOException { assignedSplits.add(splitState); } - shardMetricGroupMap - .get(splitState.getShardId()) - .setMillisBehindLatest(recordBatch.getMillisBehindLatest()); + KinesisShardMetrics shardMetrics = shardMetricGroupMap.get(splitState.getShardId()); + shardMetrics.setMillisBehindLatest(recordBatch.getMillisBehindLatest()); + // Feed the per-shard EWMA used to estimate `pendingRecords` for the + // Flink Kubernetes Operator autoscaler. Empty batches carry no rate information and are + // skipped inside the helper. 
+ shardMetrics.observeBatchSize(recordBatch.getRecords().size()); if (recordBatch.getRecords().isEmpty()) { if (recordBatch.isCompleted()) { diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java index ccadba91..d05e5139 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java @@ -26,6 +26,7 @@ import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; import org.apache.flink.connector.kinesis.source.event.SplitsFinishedEvent; import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; +import org.apache.flink.connector.kinesis.source.metrics.MetricConstants; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState; @@ -67,6 +68,27 @@ public KinesisStreamsSourceReader( this.shardMetricGroupMap = shardMetricGroupMap; this.finishedSplits = new TreeMap<>(); this.currentCheckpointId = Long.MIN_VALUE; + + // Register an operator-level `pendingRecords` gauge that aggregates the + // per-shard estimates published by KinesisShardMetrics. The Flink Kubernetes Operator + // autoscaler reads this gauge on the source operator's MetricGroup to decide whether to + // scale the vertex up. Without it, the autoscaler logs + // "pendingRecords metric for <vertexId> could not be found. + // Either a legacy source or an idle source. Assuming no pending records." + // and never accounts for source lag, even when there is a real backlog. 
+ context.metricGroup() + .gauge( + MetricConstants.PENDING_RECORDS, + () -> { + long total = 0L; + // NOTE(review): values() is a live view, not a snapshot, so this + // loop is only CME-safe if shardMetricGroupMap is a concurrent map + // (weakly consistent iteration) — confirm where the map is built. + for (KinesisShardMetrics metrics : shardMetricGroupMap.values()) { + total += metrics.getEstimatedPendingRecords(); + } + return total; + }); } @Override