Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,30 @@
@Internal
public class KinesisShardMetrics {
private static final Logger log = LoggerFactory.getLogger(KinesisShardMetrics.class);

/**
* Smoothing factor for the EWMA of records-per-millisecond used to translate {@code
* millisBehindLatest} into an estimated record count. Higher values track recent rate more
* aggressively; lower values are more stable. 0.2 was chosen empirically as a good
* compromise — long enough to ride out single-batch jitter, short enough to react to traffic
* shifts within ~10 batches.
*/
private static final double RATE_EWMA_ALPHA = 0.2;

private final MetricGroup metricGroup;
private final KinesisShardSplit shardInfo;
private volatile long millisBehindLatest = -1L;

/** Exponentially-weighted moving average of records-per-millisecond observed on this shard. */
private volatile double recordsPerMs = 0.0;

/**
* Wall-clock time (System.nanoTime, ns) at which the most recent record batch was observed.
* {@code 0} means we have not yet seen a batch on this shard. Used to compute the time delta
* passed to the EWMA in {@link #observeBatchSize(int)}.
*/
private volatile long lastBatchObservedAtNanos = 0L;

public KinesisShardMetrics(KinesisShardSplit shard, MetricGroup rootMetricGroup) {
this.shardInfo = shard;

Expand Down Expand Up @@ -75,6 +95,62 @@ public void setMillisBehindLatest(long millisBehindLatest) {
this.millisBehindLatest = millisBehindLatest;
}

/**
* Update the per-shard EWMA of records-per-millisecond. Called once per record batch by the
* source reader; empty batches carry no rate signal and are skipped.
*
* <p>This is the input to {@link #getEstimatedPendingRecords()}, which in turn feeds the
* operator-level {@code pendingRecords} gauge that the Flink autoscaler reads.
*/
public void observeBatchSize(int batchSize) {
if (batchSize <= 0) {
return;
}
long now = System.nanoTime();
long prev = lastBatchObservedAtNanos;
lastBatchObservedAtNanos = now;
if (prev == 0L) {
// First observation on this shard — no time delta yet. Seed the EWMA with a rough
// estimate assuming the batch was produced over 1 second; subsequent observations
// will correct it quickly.
recordsPerMs = batchSize / 1000.0;
return;
}
double dtMs = (now - prev) / 1_000_000.0;
if (dtMs <= 0.0) {
return;
}
double instantaneousRate = batchSize / dtMs;
// Standard EWMA: new = alpha * sample + (1 - alpha) * old.
recordsPerMs =
(RATE_EWMA_ALPHA * instantaneousRate)
+ ((1.0 - RATE_EWMA_ALPHA) * recordsPerMs);
}

/**
* Estimate the number of records still queued upstream on this shard, derived from {@link
* #millisBehindLatest} and the EWMA in {@link #recordsPerMs}.
*
* <p>Returns {@code 0} when the shard is fully caught up or when we have not yet observed
* enough traffic to estimate a rate. When we have only seen {@code millisBehindLatest} but
* no batches yet, falls back to using {@code millisBehindLatest} itself as a coarse record
* count — the autoscaler then at least sees a non-zero lag signal.
*/
public long getEstimatedPendingRecords() {
long mbl = millisBehindLatest;
if (mbl <= 0L) {
return 0L;
}
double rate = recordsPerMs;
if (rate <= 0.0) {
// No rate observed yet — return msBehind as records (1 ms ≈ 1 record fallback).
// This is intentionally an over-estimate; the autoscaler reacting earlier with stale
// data is preferable to it sleeping through real lag.
return mbl;
}
return Math.round(mbl * rate);
}

public void unregister() {
((AbstractMetricGroup) metricGroup).close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,15 @@ public class MetricConstants {
public static final String ACCOUNT_ID_METRIC_GROUP = "accountId";

public static final String MILLIS_BEHIND_LATEST = "millisBehindLatest";

/**
* Number of records the source still has to read before it has caught up with the stream tip.
*
* <p>This is the metric the Flink Kubernetes Operator autoscaler reads to decide whether a
* source vertex needs to be scaled up. Upstream Flink Kinesis source 5.1.0 does NOT register
* this gauge; this connector publishes it as part of an Atlassian patch. The value is an
* estimate computed per shard from {@code millisBehindLatest * recentRecordsPerMs} and then
* summed across shards at the operator-level metric group.
*/
public static final String PENDING_RECORDS = "pendingRecords";
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,12 @@ public RecordsWithSplitIds<Record> fetch() throws IOException {
assignedSplits.add(splitState);
}

shardMetricGroupMap
.get(splitState.getShardId())
.setMillisBehindLatest(recordBatch.getMillisBehindLatest());
KinesisShardMetrics shardMetrics = shardMetricGroupMap.get(splitState.getShardId());
shardMetrics.setMillisBehindLatest(recordBatch.getMillisBehindLatest());
// Feed the per-shard EWMA used to estimate `pendingRecords` for the
// Flink Kubernetes Operator autoscaler. Empty batches carry no rate information and are
// skipped inside the helper.
shardMetrics.observeBatchSize(recordBatch.getRecords().size());

if (recordBatch.getRecords().isEmpty()) {
if (recordBatch.isCompleted()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.kinesis.source.event.SplitsFinishedEvent;
import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
import org.apache.flink.connector.kinesis.source.metrics.MetricConstants;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;

Expand Down Expand Up @@ -67,6 +68,27 @@ public KinesisStreamsSourceReader(
this.shardMetricGroupMap = shardMetricGroupMap;
this.finishedSplits = new TreeMap<>();
this.currentCheckpointId = Long.MIN_VALUE;

// Register an operator-level `pendingRecords` gauge that aggregates the
// per-shard estimates published by KinesisShardMetrics. The Flink Kubernetes Operator
// autoscaler reads this gauge on the source operator's MetricGroup to decide whether to
// scale the vertex up. Without it, the autoscaler logs
// "pendingRecords metric for <vertexId> could not be found.
// Either a legacy source or an idle source. Assuming no pending records."
// and never accounts for source lag, even when there is a real backlog.
context.metricGroup()
.gauge(
MetricConstants.PENDING_RECORDS,
() -> {
long total = 0L;
// shardMetricGroupMap is mutated from the source-fetcher thread;
// iterate the values snapshot defensively to avoid CME. Sum is
// monotonic and order-independent so we do not need a lock.
for (KinesisShardMetrics metrics : shardMetricGroupMap.values()) {
total += metrics.getEstimatedPendingRecords();
}
return total;
});
}

@Override
Expand Down