From 852d8456b77a6028c4fa5dd3826fe15f2d75713e Mon Sep 17 00:00:00 2001 From: Palash Chauhan Date: Tue, 23 Jun 2026 13:57:33 -0700 Subject: [PATCH 1/3] PHOENIX-7883 : More metrics for EC index consumer Co-authored-by: Cursor --- .../MetricsIndexCDCConsumerSource.java | 79 ++++++++ .../MetricsIndexCDCConsumerSourceImpl.java | 61 ++++++ .../phoenix/hbase/index/IndexCDCConsumer.java | 186 ++++++++++-------- 3 files changed, 245 insertions(+), 81 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java index cb39b5b38a8..51e58b464f9 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java @@ -50,6 +50,42 @@ public interface MetricsIndexCDCConsumerSource extends BaseSource { String CDC_BATCH_FAILURE_COUNT = "cdcBatchFailureCount"; String CDC_BATCH_FAILURE_COUNT_DESC = "The number of CDC batch processing failures"; + String CDC_EVENT_SKIPPED_COUNT = "cdcEventSkippedCount"; + String CDC_EVENT_SKIPPED_COUNT_DESC = + "The number of times the consumer permanently advanced past CDC events whose data table " + + "row state could not be read within phoenix.index.cdc.consumer.max.data.visibility.retries" + + " attempts. Each increment represents one or more CDC events that will never be applied " + + "to the eventually consistent index \u2014 typically caused by failed or aborted data " + + "table mutations upstream. Non-zero values indicate silent data divergence between the " + + "data table and its EC indexes."; + + String CDC_PARENT_REPLAY_ACTIVE_REGIONS = "cdcParentReplayActiveRegions"; + String CDC_PARENT_REPLAY_ACTIVE_REGIONS_DESC = + "Gauge of regions currently in the parent-region replay phase (post-split / post-merge " + + "catch-up before steady-state own-partition processing begins). Per-table value = number " + + "of regions of this table on this RegionServer currently replaying ancestor partitions. " + + "The lag histogram inflates during this phase by design (parent-replay timestamps do not " + + "advance the child's own-partition freshness watermark) \u2014 this gauge lets operators " + + "distinguish 'normal post-split catch-up' from 'consumer is broken'."; + + String CDC_PARENT_REPLAY_DURATION = "cdcParentReplayDuration"; + String CDC_PARENT_REPLAY_DURATION_DESC = + "Histogram (milliseconds) of how long it took to fully replay one ancestor partition during " + + "post-split / post-merge catch-up. One sample is emitted per parent partition when this " + + "consumer either marks it COMPLETE or observes another consumer marking it COMPLETE. " + + "Ancestors that were already COMPLETE when discovered emit no sample."; + + String CDC_CONSUMER_ACTIVE_REGIONS = "cdcConsumerActiveRegions"; + String CDC_CONSUMER_ACTIVE_REGIONS_DESC = + "Gauge of regions whose IndexCDCConsumer is currently in steady-state own-partition " + + "processing for this table on this RegionServer. Incremented immediately before entering " + + "the main poll loop (after startup wait, EC-index discovery, CDC_STREAM wait, tracker " + + "lookup, and any parent-region replay have all completed) and decremented when the loop " + + "exits. A gauge of 0 where >0 is expected indicates the consumer either never reached " + + "steady state (cold start, missing EC index, missing CDC_STREAM entry) or exited " + + "(stopped, crashed, or region moved away). Combine with cdcParentReplayActiveRegions to " + + "tell 'in catch-up' apart from 'in steady state' apart from 'not running at all'."; + String CDC_INDEX_UPDATE_LAG = "cdcIndexUpdateLag"; String CDC_INDEX_UPDATE_LAG_DESC = "Histogram of current time minus the consumer's effective freshness watermark, in " @@ -98,6 +134,49 @@ public interface MetricsIndexCDCConsumerSource extends BaseSource { */ void incrementCdcBatchFailureCount(String dataTableName); + /** + * Increments the count of CDC events permanently skipped after exhausting data-visibility + * retries. See {@link #CDC_EVENT_SKIPPED_COUNT_DESC}. + * @param dataTableName physical data table name + */ + void incrementCdcEventSkippedCount(String dataTableName); + + /** + * Increments the parent-region replay active gauge by 1. Must be paired with a corresponding + * {@link #decrementCdcParentReplayActiveRegions(String)} in a {@code finally} block. + * @param dataTableName physical data table name + */ + void incrementCdcParentReplayActiveRegions(String dataTableName); + + /** + * Decrements the parent-region replay active gauge by 1. Must be invoked in a {@code finally} + * block paired with {@link #incrementCdcParentReplayActiveRegions(String)}. + * @param dataTableName physical data table name + */ + void decrementCdcParentReplayActiveRegions(String dataTableName); + + /** + * Adds a sample to the parent-region replay duration histogram. Called once per ancestor + * partition after its replay reaches a terminal state. + * @param dataTableName physical data table name + * @param durationMs wall-clock time spent replaying this ancestor partition + */ + void updateCdcParentReplayDuration(String dataTableName, long durationMs); + + /** + * Increments the steady-state active-regions gauge by 1. Must be paired with a corresponding + * {@link #decrementCdcConsumerActiveRegions(String)} in a {@code finally} block. + * @param dataTableName physical data table name + */ + void incrementCdcConsumerActiveRegions(String dataTableName); + + /** + * Decrements the steady-state active-regions gauge by 1. Must be invoked in a {@code finally} + * block paired with {@link #incrementCdcConsumerActiveRegions(String)}. + * @param dataTableName physical data table name + */ + void decrementCdcConsumerActiveRegions(String dataTableName); + /** * Updates the CDC lag histogram. * @param dataTableName physical data table name diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java index 71c63826339..54dacb73ad7 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java @@ -20,6 +20,7 @@ import org.apache.hadoop.hbase.metrics.BaseSourceImpl; import org.apache.hadoop.metrics2.MetricHistogram; import org.apache.hadoop.metrics2.lib.MutableFastCounter; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; /** * Implementation for tracking IndexCDCConsumer metrics. @@ -33,6 +34,10 @@ public class MetricsIndexCDCConsumerSourceImpl extends BaseSourceImpl private final MutableFastCounter cdcBatchCounter; private final MutableFastCounter cdcMutationCounter; private final MutableFastCounter cdcBatchFailureCounter; + private final MutableFastCounter cdcEventSkippedCounter; + private final MutableGaugeLong cdcParentReplayActiveRegionsGauge; + private final MetricHistogram cdcParentReplayDurationHisto; + private final MutableGaugeLong cdcConsumerActiveRegionsGauge; private final MetricHistogram cdcIndexUpdateLagHisto; public MetricsIndexCDCConsumerSourceImpl() { @@ -54,6 +59,14 @@ public MetricsIndexCDCConsumerSourceImpl(String metricsName, String metricsDescr getMetricsRegistry().newCounter(CDC_MUTATION_COUNT, CDC_MUTATION_COUNT_DESC, 0L); cdcBatchFailureCounter = getMetricsRegistry().newCounter(CDC_BATCH_FAILURE_COUNT, CDC_BATCH_FAILURE_COUNT_DESC, 0L); + cdcEventSkippedCounter = + getMetricsRegistry().newCounter(CDC_EVENT_SKIPPED_COUNT, CDC_EVENT_SKIPPED_COUNT_DESC, 0L); + cdcParentReplayActiveRegionsGauge = getMetricsRegistry() + .newGauge(CDC_PARENT_REPLAY_ACTIVE_REGIONS, CDC_PARENT_REPLAY_ACTIVE_REGIONS_DESC, 0L); + cdcParentReplayDurationHisto = getMetricsRegistry().newHistogram(CDC_PARENT_REPLAY_DURATION, + CDC_PARENT_REPLAY_DURATION_DESC); + cdcConsumerActiveRegionsGauge = getMetricsRegistry().newGauge(CDC_CONSUMER_ACTIVE_REGIONS, + CDC_CONSUMER_ACTIVE_REGIONS_DESC, 0L); cdcIndexUpdateLagHisto = getMetricsRegistry().newHistogram(CDC_INDEX_UPDATE_LAG, CDC_INDEX_UPDATE_LAG_DESC); } @@ -96,6 +109,42 @@ public void incrementCdcBatchFailureCount(String dataTableName) { cdcBatchFailureCounter.incr(); } + @Override + public void incrementCdcEventSkippedCount(String dataTableName) { + incrementTableSpecificCounter(CDC_EVENT_SKIPPED_COUNT, dataTableName); + cdcEventSkippedCounter.incr(); + } + + @Override + public void incrementCdcParentReplayActiveRegions(String dataTableName) { + incrementTableSpecificGauge(CDC_PARENT_REPLAY_ACTIVE_REGIONS, dataTableName); + cdcParentReplayActiveRegionsGauge.incr(); + } + + @Override + public void decrementCdcParentReplayActiveRegions(String dataTableName) { + decrementTableSpecificGauge(CDC_PARENT_REPLAY_ACTIVE_REGIONS, dataTableName); + cdcParentReplayActiveRegionsGauge.decr(); + } + + @Override + public void updateCdcParentReplayDuration(String dataTableName, long durationMs) { + incrementTableSpecificHistogram(CDC_PARENT_REPLAY_DURATION, dataTableName, durationMs); + cdcParentReplayDurationHisto.add(durationMs); + } + + @Override + public void incrementCdcConsumerActiveRegions(String dataTableName) { + incrementTableSpecificGauge(CDC_CONSUMER_ACTIVE_REGIONS, dataTableName); + cdcConsumerActiveRegionsGauge.incr(); + } + + @Override + public void decrementCdcConsumerActiveRegions(String dataTableName) { + decrementTableSpecificGauge(CDC_CONSUMER_ACTIVE_REGIONS, dataTableName); + cdcConsumerActiveRegionsGauge.decr(); + } + @Override public void updateCdcLag(String dataTableName, long lag) { incrementTableSpecificHistogram(CDC_INDEX_UPDATE_LAG, dataTableName, lag); @@ -114,6 +163,18 @@ private void incrementTableSpecificHistogram(String baseName, String tableName, tableHistogram.add(t); } + private void incrementTableSpecificGauge(String baseName, String tableName) { + MutableGaugeLong tableGauge = + getMetricsRegistry().getGauge(getMetricName(baseName, tableName), 0); + tableGauge.incr(); + } + + private void decrementTableSpecificGauge(String baseName, String tableName) { + MutableGaugeLong tableGauge = + getMetricsRegistry().getGauge(getMetricName(baseName, tableName), 0); + tableGauge.decr(); + } + private String getMetricName(String baseName, String tableName) { return baseName + "." + tableName; } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java index 412e380b661..d2468ca21a1 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java @@ -477,42 +477,52 @@ public void run() { dataTableName, encodedRegionName, lastProcessedTimestamp); } else { if (hasParentPartitions) { - sleepWithLagSampling(timestampBufferMs + 1); - replayAndCompleteParentRegions(encodedRegionName); + metricSource.incrementCdcParentReplayActiveRegions(dataTableName); + try { + sleepWithLagSampling(timestampBufferMs + 1); + replayAndCompleteParentRegions(encodedRegionName); + } finally { + metricSource.decrementCdcParentReplayActiveRegions(dataTableName); + } } else { LOG.info("No parent partitions for table {} region {}, skipping parent replay", dataTableName, encodedRegionName); } } int retryCount = 0; - while (!stopped) { - try { - long previousTimestamp = lastProcessedTimestamp; - if (serializeCDCMutations) { - lastProcessedTimestamp = - processCDCBatch(encodedRegionName, encodedRegionName, lastProcessedTimestamp, false); - } else { - lastProcessedTimestamp = processCDCBatchGenerated(encodedRegionName, encodedRegionName, - lastProcessedTimestamp, false); - } - if (lastProcessedTimestamp == previousTimestamp) { - sleepWithLagSampling(ConnectionUtils.getPauseTime(pause, ++retryCount)); - } else { - retryCount = 0; - sleepWithLagSampling(pollIntervalMs); - } - } catch (Exception e) { - if (e instanceof InterruptedException) { - throw (InterruptedException) e; + metricSource.incrementCdcConsumerActiveRegions(dataTableName); + try { + while (!stopped) { + try { + long previousTimestamp = lastProcessedTimestamp; + if (serializeCDCMutations) { + lastProcessedTimestamp = processCDCBatch(encodedRegionName, encodedRegionName, + lastProcessedTimestamp, false); + } else { + lastProcessedTimestamp = processCDCBatchGenerated(encodedRegionName, + encodedRegionName, lastProcessedTimestamp, false); + } + if (lastProcessedTimestamp == previousTimestamp) { + sleepWithLagSampling(ConnectionUtils.getPauseTime(pause, ++retryCount)); + } else { + retryCount = 0; + sleepWithLagSampling(pollIntervalMs); + } + } catch (Exception e) { + if (e instanceof InterruptedException) { + throw (InterruptedException) e; + } + metricSource.incrementCdcBatchFailureCount(dataTableName); + long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount); + LOG.error( + "Error processing CDC mutations for table {} region {}. " + + "Retry #{}, sleeping {} ms before retrying...", + dataTableName, encodedRegionName, retryCount, sleepTime, e); + sleepWithLagSampling(sleepTime); } - metricSource.incrementCdcBatchFailureCount(dataTableName); - long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount); - LOG.error( - "Error processing CDC mutations for table {} region {}. " - + "Retry #{}, sleeping {} ms before retrying...", - dataTableName, encodedRegionName, retryCount, sleepTime, e); - sleepWithLagSampling(sleepTime); } + } finally { + metricSource.decrementCdcConsumerActiveRegions(dataTableName); } } catch (InterruptedException e) { if (!stopped) { @@ -834,66 +844,79 @@ private void processPartitionToCompletion(String partitionId, String ownerPartit long currentLastProcessedTimestamp = lastProcessedTimestamp; int retryCount = 0; int batchCount = 0; - while (!stopped) { - try { - if (batchCount > 0) { - if (isPartitionCompleted(partitionId)) { - return; + long replayStartTime = EnvironmentEdgeManager.currentTimeMillis(); + boolean reachedTerminalState = false; + try { + while (!stopped) { + try { + if (batchCount > 0) { + if (isPartitionCompleted(partitionId)) { + reachedTerminalState = true; + return; + } + long otherProgress = getParentProgress(partitionId); + if (otherProgress > currentLastProcessedTimestamp) { + long previousOtherProgress; + do { + previousOtherProgress = otherProgress; + sleepWithLagSampling(parentProgressPauseMs); + if (isPartitionCompleted(partitionId)) { + reachedTerminalState = true; + return; + } + otherProgress = getParentProgress(partitionId); + } while (!stopped && otherProgress > previousOtherProgress); + currentLastProcessedTimestamp = otherProgress; + } } - long otherProgress = getParentProgress(partitionId); - if (otherProgress > currentLastProcessedTimestamp) { - long previousOtherProgress; - do { - previousOtherProgress = otherProgress; - sleepWithLagSampling(parentProgressPauseMs); - if (isPartitionCompleted(partitionId)) { - return; - } - otherProgress = getParentProgress(partitionId); - } while (!stopped && otherProgress > previousOtherProgress); - currentLastProcessedTimestamp = otherProgress; + long newTimestamp; + if (serializeCDCMutations) { + newTimestamp = + processCDCBatch(partitionId, ownerPartitionId, currentLastProcessedTimestamp, true); + } else { + newTimestamp = processCDCBatchGenerated(partitionId, ownerPartitionId, + currentLastProcessedTimestamp, true); } - } - long newTimestamp; - if (serializeCDCMutations) { - newTimestamp = - processCDCBatch(partitionId, ownerPartitionId, currentLastProcessedTimestamp, true); - } else { - newTimestamp = processCDCBatchGenerated(partitionId, ownerPartitionId, - currentLastProcessedTimestamp, true); - } - batchCount++; - retryCount = 0; - if (newTimestamp == currentLastProcessedTimestamp) { - if (isPartitionCompleted(partitionId)) { - LOG.info( - "Partition {} for table {} was completed by another consumer before {} could mark it", - partitionId, dataTableName, ownerPartitionId); + batchCount++; + retryCount = 0; + if (newTimestamp == currentLastProcessedTimestamp) { + if (isPartitionCompleted(partitionId)) { + LOG.info( + "Partition {} for table {} was completed by another consumer before {} could mark it", + partitionId, dataTableName, ownerPartitionId); + reachedTerminalState = true; + return; + } + LOG.info("Partition {} owner {} for table {} fully processed, marking as COMPLETE", + partitionId, ownerPartitionId, dataTableName); + try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(env.getConfiguration()) + .unwrap(PhoenixConnection.class)) { + updateTrackerProgress(conn, partitionId, ownerPartitionId, + currentLastProcessedTimestamp, PhoenixDatabaseMetaData.TRACKER_STATUS_COMPLETE); + } + reachedTerminalState = true; return; } - LOG.info("Partition {} owner {} for table {} fully processed, marking as COMPLETE", - partitionId, ownerPartitionId, dataTableName); - try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(env.getConfiguration()) - .unwrap(PhoenixConnection.class)) { - updateTrackerProgress(conn, partitionId, ownerPartitionId, - currentLastProcessedTimestamp, PhoenixDatabaseMetaData.TRACKER_STATUS_COMPLETE); - } - return; + currentLastProcessedTimestamp = newTimestamp; + } catch (SQLException | IOException e) { + metricSource.incrementCdcBatchFailureCount(dataTableName); + long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount); + LOG.warn( + "Error processing CDC batch for partition {} owner {} table {} " + + "lastProcessedTimestamp {}. Retry #{}, sleeping {} ms", + partitionId, ownerPartitionId, dataTableName, currentLastProcessedTimestamp, retryCount, + sleepTime, e); + sleepWithLagSampling(sleepTime); } - currentLastProcessedTimestamp = newTimestamp; - } catch (SQLException | IOException e) { - metricSource.incrementCdcBatchFailureCount(dataTableName); - long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount); - LOG.warn( - "Error processing CDC batch for partition {} owner {} table {} " - + "lastProcessedTimestamp {}. Retry #{}, sleeping {} ms", - partitionId, ownerPartitionId, dataTableName, currentLastProcessedTimestamp, retryCount, - sleepTime, e); - sleepWithLagSampling(sleepTime); + } + LOG.info("Processing partition {} (owner {}) stopped before completion for table {}", + partitionId, ownerPartitionId, dataTableName); + } finally { + if (reachedTerminalState) { + metricSource.updateCdcParentReplayDuration(dataTableName, + EnvironmentEdgeManager.currentTimeMillis() - replayStartTime); } } - LOG.info("Processing partition {} (owner {}) stopped before completion for table {}", - partitionId, ownerPartitionId, dataTableName); } /** @@ -1112,6 +1135,7 @@ private long processCDCBatchGenerated(String partitionId, String ownerPartitionI "Skipping CDC events for table {} partition {} from timestamp {}" + " to {} after {} retries — data table mutations may have failed", dataTableName, partitionId, newLastTimestamp, lastScannedTimestamp[0], retryCount); + metricSource.incrementCdcEventSkippedCount(dataTableName); newLastTimestamp = lastScannedTimestamp[0]; // NOTE: durable tracker advances below (newLastTimestamp > lastProcessedTimestamp) // but progress.recordProcessed is skipped (batchStates.isEmpty()). The in-memory From a68e2596d445a24f14ef5f18a2040c2eddcac32c Mon Sep 17 00:00:00 2001 From: Palash Chauhan Date: Tue, 23 Jun 2026 14:22:57 -0700 Subject: [PATCH 2/3] PHOENIX-7883 : Lag metric improvements - Keep in-memory progress in sync with durable tracker on the processCDCBatchGenerated give-up path (max data-visibility retries exhausted): previously the durable tracker advanced via updateTrackerProgress but the in-memory watermark stayed stale, so cdcIndexUpdateLag over-reported until the next empty poll or successful batch. - In the serialized processCDCBatch inner loop, advance the in-memory watermark when rows exist but are all empty IndexMutations protos (no-op CDC entries). We have definitively scanned past newLastTimestamp; without this the watermark stayed fixed across the no-op burn-through. - Bump DEFAULT_LAG_SAMPLE_INTERVAL_MS from 1000ms to 5000ms to reduce background histogram-update load on RegionServers hosting many EC-indexed regions. Operators can dial it back down via phoenix.index.cdc.consumer.lag.sample.interval.ms. Co-authored-by: Cursor --- .../phoenix/hbase/index/IndexCDCConsumer.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java index d2468ca21a1..de58a9dd26e 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java @@ -146,7 +146,7 @@ public class IndexCDCConsumer implements Runnable { */ public static final String INDEX_CDC_CONSUMER_LAG_SAMPLE_INTERVAL_MS = "phoenix.index.cdc.consumer.lag.sample.interval.ms"; - private static final long DEFAULT_LAG_SAMPLE_INTERVAL_MS = 1000L; + private static final long DEFAULT_LAG_SAMPLE_INTERVAL_MS = 5000L; private static final long MIN_LAG_SAMPLE_INTERVAL_MS = 50L; private final RegionCoprocessorEnvironment env; @@ -1012,6 +1012,9 @@ private long processCDCBatch(String partitionId, String ownerPartitionId, if (hasMoreRows) { newLastTimestamp = result.getFirst(); if (batchMutations.isEmpty()) { + if (!isParentReplay) { + progress.recordProcessed(newLastTimestamp); + } sleepWithLagSampling(ConnectionUtils.getPauseTime(pause, ++retryCount)); } } @@ -1137,10 +1140,6 @@ private long processCDCBatchGenerated(String partitionId, String ownerPartitionI dataTableName, partitionId, newLastTimestamp, lastScannedTimestamp[0], retryCount); metricSource.incrementCdcEventSkippedCount(dataTableName); newLastTimestamp = lastScannedTimestamp[0]; - // NOTE: durable tracker advances below (newLastTimestamp > lastProcessedTimestamp) - // but progress.recordProcessed is skipped (batchStates.isEmpty()). The in-memory - // watermark lags durable state until the next empty poll heals it — over-reports - // lag temporarily (safe direction, self-healing). break; } else { // CDC index entries are written but the data is not yet visible. @@ -1184,11 +1183,11 @@ private long processCDCBatchGenerated(String partitionId, String ownerPartitionI metricSource.updateCdcBatchProcessTime(dataTableName, EnvironmentEdgeManager.currentTimeMillis() - batchStartTime); metricSource.incrementCdcBatchCount(dataTableName); + } + if (newLastTimestamp > lastProcessedTimestamp) { if (!isParentReplay) { progress.recordProcessed(newLastTimestamp); } - } - if (newLastTimestamp > lastProcessedTimestamp) { updateTrackerProgress(conn, partitionId, ownerPartitionId, newLastTimestamp, PhoenixDatabaseMetaData.TRACKER_STATUS_IN_PROGRESS); } From 6e0ec56281792db046eb8fcbf028b5ffd1ed7744 Mon Sep 17 00:00:00 2001 From: Palash Chauhan Date: Tue, 23 Jun 2026 16:21:52 -0700 Subject: [PATCH 3/3] PHOENIX-7883 : Carry skipped-event count; clarify parent-replay duration DESC - cdcEventSkippedCount now increments by the number of CDC events dropped in each give-up event (not by 1). getDataRowStatesAndTimestamp reports the scanned row count via a new long[] out-param, mirroring the existing lastScannedTimestamp idiom. The WARN log line also carries the count. - cdcParentReplayDuration DESC clarifies that the histogram measures this consumer's time inside processPartitionToCompletion, which may be less than end-to-end partition replay time when another consumer marks the partition COMPLETE first. --- .../MetricsIndexCDCConsumerSource.java | 22 ++++++++++--------- .../MetricsIndexCDCConsumerSourceImpl.java | 8 ++++--- .../phoenix/hbase/index/IndexCDCConsumer.java | 20 ++++++++++------- 3 files changed, 29 insertions(+), 21 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java index 51e58b464f9..ac647638e19 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java @@ -52,12 +52,11 @@ public interface MetricsIndexCDCConsumerSource extends BaseSource { String CDC_EVENT_SKIPPED_COUNT = "cdcEventSkippedCount"; String CDC_EVENT_SKIPPED_COUNT_DESC = - "The number of times the consumer permanently advanced past CDC events whose data table " + "The number of CDC events the consumer permanently advanced past because their data table " + "row state could not be read within phoenix.index.cdc.consumer.max.data.visibility.retries" - + " attempts. Each increment represents one or more CDC events that will never be applied " - + "to the eventually consistent index \u2014 typically caused by failed or aborted data " - + "table mutations upstream. Non-zero values indicate silent data divergence between the " - + "data table and its EC indexes."; + + " attempts. Each event counted will never be applied to the eventually consistent index " + + "\u2014 typically caused by failed or aborted data table mutations upstream. Non-zero " + + "values indicate silent data divergence between the data table and its EC indexes."; String CDC_PARENT_REPLAY_ACTIVE_REGIONS = "cdcParentReplayActiveRegions"; String CDC_PARENT_REPLAY_ACTIVE_REGIONS_DESC = @@ -70,10 +69,12 @@ public interface MetricsIndexCDCConsumerSource extends BaseSource { String CDC_PARENT_REPLAY_DURATION = "cdcParentReplayDuration"; String CDC_PARENT_REPLAY_DURATION_DESC = - "Histogram (milliseconds) of how long it took to fully replay one ancestor partition during " - + "post-split / post-merge catch-up. One sample is emitted per parent partition when this " - + "consumer either marks it COMPLETE or observes another consumer marking it COMPLETE. " - + "Ancestors that were already COMPLETE when discovered emit no sample."; + "Histogram (milliseconds) of how long this consumer spent replaying one ancestor partition " + + "during post-split / post-merge catch-up, measured from when this consumer joined the " + + "replay until it reached a terminal state. One sample is emitted per parent partition " + + "when this consumer either marks it COMPLETE or observes another consumer marking it " + + "COMPLETE \u2014 in the latter case the sample is shorter than the end-to-end partition " + + "replay time. Ancestors that were already COMPLETE when discovered emit no sample."; String CDC_CONSUMER_ACTIVE_REGIONS = "cdcConsumerActiveRegions"; String CDC_CONSUMER_ACTIVE_REGIONS_DESC = @@ -138,8 +139,9 @@ public interface MetricsIndexCDCConsumerSource extends BaseSource { * Increments the count of CDC events permanently skipped after exhausting data-visibility * retries. See {@link #CDC_EVENT_SKIPPED_COUNT_DESC}. * @param dataTableName physical data table name + * @param count number of CDC events skipped in this give-up event */ - void incrementCdcEventSkippedCount(String dataTableName); + void incrementCdcEventSkippedCount(String dataTableName, long count); /** * Increments the parent-region replay active gauge by 1. Must be paired with a corresponding diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java index 54dacb73ad7..5158f4054ba 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java @@ -110,9 +110,11 @@ public void incrementCdcBatchFailureCount(String dataTableName) { } @Override - public void incrementCdcEventSkippedCount(String dataTableName) { - incrementTableSpecificCounter(CDC_EVENT_SKIPPED_COUNT, dataTableName); - cdcEventSkippedCounter.incr(); + public void incrementCdcEventSkippedCount(String dataTableName, long count) { + MutableFastCounter tableCounter = + getMetricsRegistry().getCounter(getMetricName(CDC_EVENT_SKIPPED_COUNT, dataTableName), 0); + tableCounter.incr(count); + cdcEventSkippedCounter.incr(count); } @Override diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java index de58a9dd26e..25b0e18f268 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java @@ -1118,6 +1118,7 @@ private long processCDCBatchGenerated(String partitionId, String ownerPartitionI List> batchStates = new ArrayList<>(); long newLastTimestamp = lastProcessedTimestamp; long[] lastScannedTimestamp = { lastProcessedTimestamp }; + long[] scannedRowCount = { 0L }; boolean hasMoreRows = true; int retryCount = 0; // Captured immediately before each query so the empty-poll watermark cannot over-advance @@ -1127,18 +1128,19 @@ private long processCDCBatchGenerated(String partitionId, String ownerPartitionI lastQueryStartTime = EnvironmentEdgeManager.currentTimeMillis(); try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) { setStatementParams(scanInfo, partitionId, isParentReplay, newLastTimestamp, ps); - Pair result = - getDataRowStatesAndTimestamp(ps, newLastTimestamp, batchStates, lastScannedTimestamp); + Pair result = getDataRowStatesAndTimestamp(ps, newLastTimestamp, + batchStates, lastScannedTimestamp, scannedRowCount); hasMoreRows = result.getSecond(); if (hasMoreRows) { if (!batchStates.isEmpty()) { newLastTimestamp = result.getFirst(); } else if (retryCount >= maxDataVisibilityRetries) { LOG.warn( - "Skipping CDC events for table {} partition {} from timestamp {}" + "Skipping {} CDC events for table {} partition {} from timestamp {}" + " to {} after {} retries — data table mutations may have failed", - dataTableName, partitionId, newLastTimestamp, lastScannedTimestamp[0], retryCount); - metricSource.incrementCdcEventSkippedCount(dataTableName); + scannedRowCount[0], dataTableName, partitionId, newLastTimestamp, + lastScannedTimestamp[0], retryCount); + metricSource.incrementCdcEventSkippedCount(dataTableName, scannedRowCount[0]); newLastTimestamp = lastScannedTimestamp[0]; break; } else { @@ -1166,8 +1168,8 @@ private long processCDCBatchGenerated(String partitionId, String ownerPartitionI int idx = scanInfo.bindParams(ps, 1); ps.setString(idx++, partitionId); ps.setDate(idx, new Date(newLastTimestamp)); - Pair result = - getDataRowStatesAndTimestamp(ps, newLastTimestamp, batchStates, lastScannedTimestamp); + Pair result = getDataRowStatesAndTimestamp(ps, newLastTimestamp, + batchStates, lastScannedTimestamp, scannedRowCount); newLastTimestamp = result.getFirst(); if (batchStates.isEmpty()) { newLastTimestamp = timestampToRefetch; @@ -1211,13 +1213,15 @@ private void setStatementParams(TenantScanInfo scanInfo, String partitionId, private static Pair getDataRowStatesAndTimestamp(PreparedStatement ps, long initialLastTimestamp, List> batchStates, - long[] lastScannedTimestamp) throws SQLException, IOException { + long[] lastScannedTimestamp, long[] scannedRowCount) throws SQLException, IOException { boolean hasRows = false; long lastTimestamp = initialLastTimestamp; lastScannedTimestamp[0] = initialLastTimestamp; + scannedRowCount[0] = 0L; try (ResultSet rs = ps.executeQuery()) { while (rs.next()) { hasRows = true; + scannedRowCount[0]++; long rowTimestamp = rs.getDate(1).getTime(); lastScannedTimestamp[0] = rowTimestamp; String cdcValue = rs.getString(2);