/* Patch summary: adds a skipped-event counter, a parent-replay active-regions gauge, a parent-replay duration histogram and a steady-state active-regions gauge to MetricsIndexCDCConsumerSource and its Impl, wires them into IndexCDCConsumer, moves progress.recordProcessed under the "timestamp advanced" check in processCDCBatchGenerated, and changes DEFAULT_LAG_SAMPLE_INTERVAL_MS from 1000L to 5000L. Comments below are added only on '+' lines so hunk line counts are unchanged. */
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java index cb39b5b38a8..ac647638e19 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java @@ -50,6 +50,43 @@ public interface MetricsIndexCDCConsumerSource extends BaseSource { String CDC_BATCH_FAILURE_COUNT = "cdcBatchFailureCount"; String CDC_BATCH_FAILURE_COUNT_DESC = "The number of CDC batch processing failures"; + String CDC_EVENT_SKIPPED_COUNT = "cdcEventSkippedCount"; + String CDC_EVENT_SKIPPED_COUNT_DESC = + "The number of CDC events the consumer permanently advanced past because their data table " + + "row state could not be read within phoenix.index.cdc.consumer.max.data.visibility.retries" + + " attempts. Each event counted will never be applied to the eventually consistent index " + + "\u2014 typically caused by failed or aborted data table mutations upstream. Non-zero " + + "values indicate silent data divergence between the data table and its EC indexes."; + + String CDC_PARENT_REPLAY_ACTIVE_REGIONS = "cdcParentReplayActiveRegions"; + String CDC_PARENT_REPLAY_ACTIVE_REGIONS_DESC = + "Gauge of regions currently in the parent-region replay phase (post-split / post-merge " + + "catch-up before steady-state own-partition processing begins). Per-table value = number " + + "of regions of this table on this RegionServer currently replaying ancestor partitions. 
" + "The lag histogram inflates during this phase by design (parent-replay timestamps do not " + + "advance the child's own-partition freshness watermark) \u2014 this gauge lets operators " + + "distinguish 'normal post-split catch-up' from 'consumer is broken'."; + + String CDC_PARENT_REPLAY_DURATION = "cdcParentReplayDuration"; + String CDC_PARENT_REPLAY_DURATION_DESC = + "Histogram (milliseconds) of how long this consumer spent replaying one ancestor partition " + + "during post-split / post-merge catch-up, measured from when this consumer joined the " + + "replay until it reached a terminal state. One sample is emitted per parent partition " + + "when this consumer either marks it COMPLETE or observes another consumer marking it " + + "COMPLETE \u2014 in the latter case the sample is shorter than the end-to-end partition " + + "replay time. Ancestors that were already COMPLETE when discovered emit no sample."; + + String CDC_CONSUMER_ACTIVE_REGIONS = "cdcConsumerActiveRegions"; + String CDC_CONSUMER_ACTIVE_REGIONS_DESC = + "Gauge of regions whose IndexCDCConsumer is currently in steady-state own-partition " + + "processing for this table on this RegionServer. Incremented immediately before entering " + + "the main poll loop (after startup wait, EC-index discovery, CDC_STREAM wait, tracker " + + "lookup, and any parent-region replay have all completed) and decremented when the loop " + + "exits. A gauge of 0 where >0 is expected indicates the consumer either never reached " + + "steady state (cold start, missing EC index, missing CDC_STREAM entry) or exited " + + "(stopped, crashed, or region moved away). 
Combine with cdcParentReplayActiveRegions to " + + "tell 'in catch-up' apart from 'in steady state' apart from 'not running at all'."; + String CDC_INDEX_UPDATE_LAG = "cdcIndexUpdateLag"; String CDC_INDEX_UPDATE_LAG_DESC = "Histogram of current time minus the consumer's effective freshness watermark, in " @@ -98,6 +135,50 @@ public interface MetricsIndexCDCConsumerSource extends BaseSource { */ void incrementCdcBatchFailureCount(String dataTableName); + /** + * Increments the count of CDC events permanently skipped after exhausting data-visibility + * retries. See {@link #CDC_EVENT_SKIPPED_COUNT_DESC}. + * @param dataTableName physical data table name + * @param count number of CDC events skipped in this give-up event + */ + void incrementCdcEventSkippedCount(String dataTableName, long count); + + /** + * Increments the parent-region replay active gauge by 1. Must be paired with a corresponding + * {@link #decrementCdcParentReplayActiveRegions(String)} in a {@code finally} block. + * @param dataTableName physical data table name + */ + void incrementCdcParentReplayActiveRegions(String dataTableName); + + /** + * Decrements the parent-region replay active gauge by 1. Must be invoked in a {@code finally} + * block paired with {@link #incrementCdcParentReplayActiveRegions(String)}. + * @param dataTableName physical data table name + */ + void decrementCdcParentReplayActiveRegions(String dataTableName); + + /** + * Adds a sample to the parent-region replay duration histogram. Called once per ancestor + * partition after its replay reaches a terminal state. + * @param dataTableName physical data table name + * @param durationMs wall-clock time spent replaying this ancestor partition + */ + void updateCdcParentReplayDuration(String dataTableName, long durationMs); + + /** + * Increments the steady-state active-regions gauge by 1. Must be paired with a corresponding + * {@link #decrementCdcConsumerActiveRegions(String)} in a {@code finally} block. 
+ * @param dataTableName physical data table name + */ + void incrementCdcConsumerActiveRegions(String dataTableName); + + /** + * Decrements the steady-state active-regions gauge by 1. Must be invoked in a {@code finally} + * block paired with {@link #incrementCdcConsumerActiveRegions(String)}. + * @param dataTableName physical data table name + */ + void decrementCdcConsumerActiveRegions(String dataTableName); + /** * Updates the CDC lag histogram. * @param dataTableName physical data table name diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java index 71c63826339..5158f4054ba 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java @@ -20,6 +20,7 @@ import org.apache.hadoop.hbase.metrics.BaseSourceImpl; import org.apache.hadoop.metrics2.MetricHistogram; import org.apache.hadoop.metrics2.lib.MutableFastCounter; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; /** * Implementation for tracking IndexCDCConsumer metrics. 
@@ -33,6 +34,10 @@ public class MetricsIndexCDCConsumerSourceImpl extends BaseSourceImpl private final MutableFastCounter cdcBatchCounter; private final MutableFastCounter cdcMutationCounter; private final MutableFastCounter cdcBatchFailureCounter; + private final MutableFastCounter cdcEventSkippedCounter; + private final MutableGaugeLong cdcParentReplayActiveRegionsGauge; + private final MetricHistogram cdcParentReplayDurationHisto; + private final MutableGaugeLong cdcConsumerActiveRegionsGauge; private final MetricHistogram cdcIndexUpdateLagHisto; public MetricsIndexCDCConsumerSourceImpl() { @@ -54,6 +59,14 @@ public MetricsIndexCDCConsumerSourceImpl(String metricsName, String metricsDescr getMetricsRegistry().newCounter(CDC_MUTATION_COUNT, CDC_MUTATION_COUNT_DESC, 0L); cdcBatchFailureCounter = getMetricsRegistry().newCounter(CDC_BATCH_FAILURE_COUNT, CDC_BATCH_FAILURE_COUNT_DESC, 0L); + cdcEventSkippedCounter = + getMetricsRegistry().newCounter(CDC_EVENT_SKIPPED_COUNT, CDC_EVENT_SKIPPED_COUNT_DESC, 0L); + cdcParentReplayActiveRegionsGauge = getMetricsRegistry() + .newGauge(CDC_PARENT_REPLAY_ACTIVE_REGIONS, CDC_PARENT_REPLAY_ACTIVE_REGIONS_DESC, 0L); + cdcParentReplayDurationHisto = getMetricsRegistry().newHistogram(CDC_PARENT_REPLAY_DURATION, + CDC_PARENT_REPLAY_DURATION_DESC); + cdcConsumerActiveRegionsGauge = getMetricsRegistry().newGauge(CDC_CONSUMER_ACTIVE_REGIONS, + CDC_CONSUMER_ACTIVE_REGIONS_DESC, 0L); cdcIndexUpdateLagHisto = getMetricsRegistry().newHistogram(CDC_INDEX_UPDATE_LAG, CDC_INDEX_UPDATE_LAG_DESC); } @@ -96,6 +109,44 @@ public void incrementCdcBatchFailureCount(String dataTableName) { cdcBatchFailureCounter.incr(); } + @Override + public void incrementCdcEventSkippedCount(String dataTableName, long count) { + MutableFastCounter tableCounter = + getMetricsRegistry().getCounter(getMetricName(CDC_EVENT_SKIPPED_COUNT, dataTableName), 0); + tableCounter.incr(count); /* table-qualified counter, looked up under getMetricName(CDC_EVENT_SKIPPED_COUNT, 
dataTableName) */ + cdcEventSkippedCounter.incr(count); /* counter created in the constructor under the bare CDC_EVENT_SKIPPED_COUNT name */ + } + + @Override + public void 
incrementCdcParentReplayActiveRegions(String dataTableName) { + incrementTableSpecificGauge(CDC_PARENT_REPLAY_ACTIVE_REGIONS, dataTableName); + cdcParentReplayActiveRegionsGauge.incr(); + } + + @Override + public void decrementCdcParentReplayActiveRegions(String dataTableName) { + decrementTableSpecificGauge(CDC_PARENT_REPLAY_ACTIVE_REGIONS, dataTableName); + cdcParentReplayActiveRegionsGauge.decr(); + } + + @Override + public void updateCdcParentReplayDuration(String dataTableName, long durationMs) { + incrementTableSpecificHistogram(CDC_PARENT_REPLAY_DURATION, dataTableName, durationMs); + cdcParentReplayDurationHisto.add(durationMs); + } + + @Override + public void incrementCdcConsumerActiveRegions(String dataTableName) { + incrementTableSpecificGauge(CDC_CONSUMER_ACTIVE_REGIONS, dataTableName); + cdcConsumerActiveRegionsGauge.incr(); + } + + @Override + public void decrementCdcConsumerActiveRegions(String dataTableName) { + decrementTableSpecificGauge(CDC_CONSUMER_ACTIVE_REGIONS, dataTableName); + cdcConsumerActiveRegionsGauge.decr(); + } + @Override public void updateCdcLag(String dataTableName, long lag) { incrementTableSpecificHistogram(CDC_INDEX_UPDATE_LAG, dataTableName, lag); @@ -114,6 +165,18 @@ private void incrementTableSpecificHistogram(String baseName, String tableName, tableHistogram.add(t); } + private void incrementTableSpecificGauge(String baseName, String tableName) { /* adds 1 to the gauge obtained via getGauge(getMetricName(baseName, tableName), 0); presumably the registry creates it at 0 on first use - TODO confirm */ + MutableGaugeLong tableGauge = + getMetricsRegistry().getGauge(getMetricName(baseName, tableName), 0); + tableGauge.incr(); + } + + private void decrementTableSpecificGauge(String baseName, String tableName) { /* mirror of incrementTableSpecificGauge: subtracts 1 from the same table-qualified gauge */ + MutableGaugeLong tableGauge = + getMetricsRegistry().getGauge(getMetricName(baseName, tableName), 0); + 
tableGauge.decr(); + } + private String getMetricName(String baseName, String tableName) { return baseName + "." 
+ tableName; } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java index 412e380b661..25b0e18f268 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java @@ -146,7 +146,7 @@ public class IndexCDCConsumer implements Runnable { */ public static final String INDEX_CDC_CONSUMER_LAG_SAMPLE_INTERVAL_MS = "phoenix.index.cdc.consumer.lag.sample.interval.ms"; - private static final long DEFAULT_LAG_SAMPLE_INTERVAL_MS = 1000L; + private static final long DEFAULT_LAG_SAMPLE_INTERVAL_MS = 5000L; private static final long MIN_LAG_SAMPLE_INTERVAL_MS = 50L; private final RegionCoprocessorEnvironment env; @@ -477,42 +477,52 @@ public void run() { dataTableName, encodedRegionName, lastProcessedTimestamp); } else { if (hasParentPartitions) { - sleepWithLagSampling(timestampBufferMs + 1); - replayAndCompleteParentRegions(encodedRegionName); + metricSource.incrementCdcParentReplayActiveRegions(dataTableName); /* gauge spans both the buffer sleep and the replay call inside the try; decremented in its finally */ + try { + sleepWithLagSampling(timestampBufferMs + 1); + replayAndCompleteParentRegions(encodedRegionName); + } finally { + metricSource.decrementCdcParentReplayActiveRegions(dataTableName); + } } else { LOG.info("No parent partitions for table {} region {}, skipping parent replay", dataTableName, encodedRegionName); } } int retryCount = 0; - while (!stopped) { - try { - long previousTimestamp = lastProcessedTimestamp; - if (serializeCDCMutations) { - lastProcessedTimestamp = - processCDCBatch(encodedRegionName, encodedRegionName, lastProcessedTimestamp, false); - } else { - lastProcessedTimestamp = processCDCBatchGenerated(encodedRegionName, encodedRegionName, - lastProcessedTimestamp, false); - } - if 
(lastProcessedTimestamp == previousTimestamp) { - sleepWithLagSampling(ConnectionUtils.getPauseTime(pause, 
++retryCount)); - } else { - retryCount = 0; - sleepWithLagSampling(pollIntervalMs); - } - } catch (Exception e) { - if (e instanceof InterruptedException) { - throw (InterruptedException) e; + metricSource.incrementCdcConsumerActiveRegions(dataTableName); /* decremented in the finally that wraps the poll loop, so it also runs when InterruptedException is rethrown from inside the loop */ + try { + while (!stopped) { + try { + long previousTimestamp = lastProcessedTimestamp; + if (serializeCDCMutations) { + lastProcessedTimestamp = processCDCBatch(encodedRegionName, encodedRegionName, + lastProcessedTimestamp, false); + } else { + lastProcessedTimestamp = processCDCBatchGenerated(encodedRegionName, + encodedRegionName, lastProcessedTimestamp, false); + } + if (lastProcessedTimestamp == previousTimestamp) { + sleepWithLagSampling(ConnectionUtils.getPauseTime(pause, ++retryCount)); + } else { + retryCount = 0; + sleepWithLagSampling(pollIntervalMs); + } + } catch (Exception e) { + if (e instanceof InterruptedException) { + throw (InterruptedException) e; + } + metricSource.incrementCdcBatchFailureCount(dataTableName); + long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount); + LOG.error( + "Error processing CDC mutations for table {} region {}. " + + "Retry #{}, sleeping {} ms before retrying...", + dataTableName, encodedRegionName, retryCount, sleepTime, e); + sleepWithLagSampling(sleepTime); } - metricSource.incrementCdcBatchFailureCount(dataTableName); - long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount); - LOG.error( - "Error processing CDC mutations for table {} region {}. 
" - + "Retry #{}, sleeping {} ms before retrying...", - dataTableName, encodedRegionName, retryCount, sleepTime, e); - sleepWithLagSampling(sleepTime); } + } finally { + metricSource.decrementCdcConsumerActiveRegions(dataTableName); } } catch (InterruptedException e) { if (!stopped) { @@ -834,66 +844,79 @@ private void processPartitionToCompletion(String partitionId, String ownerPartit long currentLastProcessedTimestamp = lastProcessedTimestamp; int retryCount = 0; int batchCount = 0; - while (!stopped) { - try { - if (batchCount > 0) { - if (isPartitionCompleted(partitionId)) { - return; + long replayStartTime = EnvironmentEdgeManager.currentTimeMillis(); /* start of the interval later passed to updateCdcParentReplayDuration */ + boolean reachedTerminalState = false; /* set to true immediately before each return below, i.e. when the partition is seen or marked COMPLETE; gates the duration sample in the outer finally */ + try { + while (!stopped) { + try { + if (batchCount > 0) { + if (isPartitionCompleted(partitionId)) { + reachedTerminalState = true; + return; + } + long otherProgress = getParentProgress(partitionId); + if (otherProgress > currentLastProcessedTimestamp) { + long previousOtherProgress; + do { + previousOtherProgress = otherProgress; + sleepWithLagSampling(parentProgressPauseMs); + if (isPartitionCompleted(partitionId)) { + reachedTerminalState = true; + return; + } + otherProgress = getParentProgress(partitionId); + } while (!stopped && otherProgress > previousOtherProgress); + currentLastProcessedTimestamp = otherProgress; + } } - long otherProgress = getParentProgress(partitionId); - if (otherProgress > currentLastProcessedTimestamp) { - long previousOtherProgress; - do { - previousOtherProgress = otherProgress; - sleepWithLagSampling(parentProgressPauseMs); - if (isPartitionCompleted(partitionId)) { - return; - } - otherProgress = getParentProgress(partitionId); - } while (!stopped && otherProgress > previousOtherProgress); - 
currentLastProcessedTimestamp = otherProgress; + long newTimestamp; + if (serializeCDCMutations) { + newTimestamp = + processCDCBatch(partitionId, ownerPartitionId, currentLastProcessedTimestamp, true); + } else { + newTimestamp = 
processCDCBatchGenerated(partitionId, ownerPartitionId, + currentLastProcessedTimestamp, true); } - } - long newTimestamp; - if (serializeCDCMutations) { - newTimestamp = - processCDCBatch(partitionId, ownerPartitionId, currentLastProcessedTimestamp, true); - } else { - newTimestamp = processCDCBatchGenerated(partitionId, ownerPartitionId, - currentLastProcessedTimestamp, true); - } - batchCount++; - retryCount = 0; - if (newTimestamp == currentLastProcessedTimestamp) { - if (isPartitionCompleted(partitionId)) { - LOG.info( - "Partition {} for table {} was completed by another consumer before {} could mark it", - partitionId, dataTableName, ownerPartitionId); + batchCount++; + retryCount = 0; + if (newTimestamp == currentLastProcessedTimestamp) { + if (isPartitionCompleted(partitionId)) { + LOG.info( + "Partition {} for table {} was completed by another consumer before {} could mark it", + partitionId, dataTableName, ownerPartitionId); + reachedTerminalState = true; + return; + } + LOG.info("Partition {} owner {} for table {} fully processed, marking as COMPLETE", + partitionId, ownerPartitionId, dataTableName); + try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(env.getConfiguration()) + .unwrap(PhoenixConnection.class)) { + updateTrackerProgress(conn, partitionId, ownerPartitionId, + currentLastProcessedTimestamp, PhoenixDatabaseMetaData.TRACKER_STATUS_COMPLETE); + } + reachedTerminalState = true; return; } - LOG.info("Partition {} owner {} for table {} fully processed, marking as COMPLETE", - partitionId, ownerPartitionId, dataTableName); - try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(env.getConfiguration()) - .unwrap(PhoenixConnection.class)) { - updateTrackerProgress(conn, partitionId, ownerPartitionId, - currentLastProcessedTimestamp, PhoenixDatabaseMetaData.TRACKER_STATUS_COMPLETE); - } - return; + currentLastProcessedTimestamp = newTimestamp; + } catch (SQLException | IOException e) { + 
metricSource.incrementCdcBatchFailureCount(dataTableName); + long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount); + LOG.warn( + "Error processing CDC batch for partition {} owner {} table {} " + + "lastProcessedTimestamp {}. Retry #{}, sleeping {} ms", + partitionId, ownerPartitionId, dataTableName, currentLastProcessedTimestamp, retryCount, + sleepTime, e); + sleepWithLagSampling(sleepTime); } - currentLastProcessedTimestamp = newTimestamp; - } catch (SQLException | IOException e) { - metricSource.incrementCdcBatchFailureCount(dataTableName); - long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount); - LOG.warn( - "Error processing CDC batch for partition {} owner {} table {} " - + "lastProcessedTimestamp {}. Retry #{}, sleeping {} ms", - partitionId, ownerPartitionId, dataTableName, currentLastProcessedTimestamp, retryCount, - sleepTime, e); - sleepWithLagSampling(sleepTime); + } + LOG.info("Processing partition {} (owner {}) stopped before completion for table {}", + partitionId, ownerPartitionId, dataTableName); + } finally { + if (reachedTerminalState) { /* no sample is recorded when the loop ended because stopped was set, or when an exception other than SQLException/IOException propagated out */ + metricSource.updateCdcParentReplayDuration(dataTableName, + EnvironmentEdgeManager.currentTimeMillis() - replayStartTime); } } - LOG.info("Processing partition {} (owner {}) stopped before completion for table {}", - partitionId, ownerPartitionId, dataTableName); } /** @@ -989,6 +1012,9 @@ private long processCDCBatch(String partitionId, String ownerPartitionId, if (hasMoreRows) { newLastTimestamp = result.getFirst(); if (batchMutations.isEmpty()) { + if (!isParentReplay) { + progress.recordProcessed(newLastTimestamp); + } sleepWithLagSampling(ConnectionUtils.getPauseTime(pause, ++retryCount)); } } @@ -1092,6 +1118,7 @@ private long processCDCBatchGenerated(String partitionId, String ownerPartitionI List> batchStates = new ArrayList<>(); long 
newLastTimestamp = lastProcessedTimestamp; long[] lastScannedTimestamp = { lastProcessedTimestamp }; + long[] scannedRowCount = { 0L }; /* single-element out-parameter: getDataRowStatesAndTimestamp resets it to 0 and increments it once per ResultSet row */ 
boolean hasMoreRows = true; int retryCount = 0; // Captured immediately before each query so the empty-poll watermark cannot over-advance @@ -1101,22 +1128,20 @@ private long processCDCBatchGenerated(String partitionId, String ownerPartitionI lastQueryStartTime = EnvironmentEdgeManager.currentTimeMillis(); try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) { setStatementParams(scanInfo, partitionId, isParentReplay, newLastTimestamp, ps); - Pair result = - getDataRowStatesAndTimestamp(ps, newLastTimestamp, batchStates, lastScannedTimestamp); + Pair result = getDataRowStatesAndTimestamp(ps, newLastTimestamp, + batchStates, lastScannedTimestamp, scannedRowCount); hasMoreRows = result.getSecond(); if (hasMoreRows) { if (!batchStates.isEmpty()) { newLastTimestamp = result.getFirst(); } else if (retryCount >= maxDataVisibilityRetries) { LOG.warn( - "Skipping CDC events for table {} partition {} from timestamp {}" + "Skipping {} CDC events for table {} partition {} from timestamp {}" + " to {} after {} retries — data table mutations may have failed", - dataTableName, partitionId, newLastTimestamp, lastScannedTimestamp[0], retryCount); + scannedRowCount[0], dataTableName, partitionId, newLastTimestamp, + lastScannedTimestamp[0], retryCount); + metricSource.incrementCdcEventSkippedCount(dataTableName, scannedRowCount[0]); /* the holder is reset on every query, so this counts only the rows returned by the most recent attempt */ newLastTimestamp = lastScannedTimestamp[0]; - // NOTE: durable tracker advances below (newLastTimestamp > lastProcessedTimestamp) - // but progress.recordProcessed is skipped (batchStates.isEmpty()). The in-memory - // watermark lags durable state until the next empty poll heals it — over-reports - // lag temporarily (safe direction, self-healing). break; } else { // CDC index entries are written but the data is not yet visible. 
@@ -1143,8 +1168,8 @@ private long processCDCBatchGenerated(String partitionId, String ownerPartitionI int idx = scanInfo.bindParams(ps, 1); ps.setString(idx++, partitionId); ps.setDate(idx, new Date(newLastTimestamp)); - Pair result = - getDataRowStatesAndTimestamp(ps, newLastTimestamp, batchStates, lastScannedTimestamp); + Pair result = getDataRowStatesAndTimestamp(ps, newLastTimestamp, + batchStates, lastScannedTimestamp, scannedRowCount); newLastTimestamp = result.getFirst(); if (batchStates.isEmpty()) { newLastTimestamp = timestampToRefetch; @@ -1160,11 +1185,11 @@ private long processCDCBatchGenerated(String partitionId, String ownerPartitionI metricSource.updateCdcBatchProcessTime(dataTableName, EnvironmentEdgeManager.currentTimeMillis() - batchStartTime); metricSource.incrementCdcBatchCount(dataTableName); + } + if (newLastTimestamp > lastProcessedTimestamp) { if (!isParentReplay) { progress.recordProcessed(newLastTimestamp); } - } - if (newLastTimestamp > lastProcessedTimestamp) { updateTrackerProgress(conn, partitionId, ownerPartitionId, newLastTimestamp, PhoenixDatabaseMetaData.TRACKER_STATUS_IN_PROGRESS); } @@ -1188,13 +1213,15 @@ private void setStatementParams(TenantScanInfo scanInfo, String partitionId, private static Pair getDataRowStatesAndTimestamp(PreparedStatement ps, long initialLastTimestamp, List> batchStates, - long[] lastScannedTimestamp) throws SQLException, IOException { + long[] lastScannedTimestamp, long[] scannedRowCount) throws SQLException, IOException { boolean hasRows = false; long lastTimestamp = initialLastTimestamp; lastScannedTimestamp[0] = initialLastTimestamp; + scannedRowCount[0] = 0L; /* reset per call so the holder reflects only this query's rows */ try (ResultSet rs = ps.executeQuery()) { while (rs.next()) { hasRows = true; + scannedRowCount[0]++; long rowTimestamp = rs.getDate(1).getTime(); lastScannedTimestamp[0] = rowTimestamp; String cdcValue = rs.getString(2);