Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,43 @@ public interface MetricsIndexCDCConsumerSource extends BaseSource {
// Metric name and human-readable description for the count of CDC batch processing failures.
String CDC_BATCH_FAILURE_COUNT = "cdcBatchFailureCount";
String CDC_BATCH_FAILURE_COUNT_DESC = "The number of CDC batch processing failures";

// Metric name and description for CDC events that the consumer gave up on and skipped.
// Updated through incrementCdcEventSkippedCount(String, long); any non-zero value means
// the data table and its eventually consistent indexes have diverged (see description text).
String CDC_EVENT_SKIPPED_COUNT = "cdcEventSkippedCount";
String CDC_EVENT_SKIPPED_COUNT_DESC =
"The number of CDC events the consumer permanently advanced past because their data table "
+ "row state could not be read within phoenix.index.cdc.consumer.max.data.visibility.retries"
+ " attempts. Each event counted will never be applied to the eventually consistent index "
+ "\u2014 typically caused by failed or aborted data table mutations upstream. Non-zero "
+ "values indicate silent data divergence between the data table and its EC indexes.";

// Metric name and description for the gauge of regions currently replaying ancestor
// (parent-region) partitions. Moved up and down through
// incrementCdcParentReplayActiveRegions(String) / decrementCdcParentReplayActiveRegions(String).
String CDC_PARENT_REPLAY_ACTIVE_REGIONS = "cdcParentReplayActiveRegions";
String CDC_PARENT_REPLAY_ACTIVE_REGIONS_DESC =
"Gauge of regions currently in the parent-region replay phase (post-split / post-merge "
+ "catch-up before steady-state own-partition processing begins). Per-table value = number "
+ "of regions of this table on this RegionServer currently replaying ancestor partitions. "
+ "The lag histogram inflates during this phase by design (parent-replay timestamps do not "
+ "advance the child's own-partition freshness watermark) \u2014 this gauge lets operators "
+ "distinguish 'normal post-split catch-up' from 'consumer is broken'.";

// Metric name and description for the histogram of per-ancestor-partition replay durations,
// in milliseconds. Samples are added through updateCdcParentReplayDuration(String, long).
String CDC_PARENT_REPLAY_DURATION = "cdcParentReplayDuration";
String CDC_PARENT_REPLAY_DURATION_DESC =
"Histogram (milliseconds) of how long this consumer spent replaying one ancestor partition "
+ "during post-split / post-merge catch-up, measured from when this consumer joined the "
+ "replay until it reached a terminal state. One sample is emitted per parent partition "
+ "when this consumer either marks it COMPLETE or observes another consumer marking it "
+ "COMPLETE \u2014 in the latter case the sample is shorter than the end-to-end partition "
+ "replay time. Ancestors that were already COMPLETE when discovered emit no sample.";

// Metric name and description for the gauge of regions whose consumer is in steady-state
// processing. Moved up and down through incrementCdcConsumerActiveRegions(String) /
// decrementCdcConsumerActiveRegions(String); intended to be read together with
// cdcParentReplayActiveRegions, as the description text explains.
String CDC_CONSUMER_ACTIVE_REGIONS = "cdcConsumerActiveRegions";
String CDC_CONSUMER_ACTIVE_REGIONS_DESC =
"Gauge of regions whose IndexCDCConsumer is currently in steady-state own-partition "
+ "processing for this table on this RegionServer. Incremented immediately before entering "
+ "the main poll loop (after startup wait, EC-index discovery, CDC_STREAM wait, tracker "
+ "lookup, and any parent-region replay have all completed) and decremented when the loop "
+ "exits. A gauge of 0 where >0 is expected indicates the consumer either never reached "
+ "steady state (cold start, missing EC index, missing CDC_STREAM entry) or exited "
+ "(stopped, crashed, or region moved away). Combine with cdcParentReplayActiveRegions to "
+ "tell 'in catch-up' apart from 'in steady state' apart from 'not running at all'.";

String CDC_INDEX_UPDATE_LAG = "cdcIndexUpdateLag";
String CDC_INDEX_UPDATE_LAG_DESC =
"Histogram of current time minus the consumer's effective freshness watermark, in "
Expand Down Expand Up @@ -98,6 +135,50 @@ public interface MetricsIndexCDCConsumerSource extends BaseSource {
*/
void incrementCdcBatchFailureCount(String dataTableName);

/**
 * Adds {@code count} to the counter of CDC events that were permanently skipped after the
 * consumer exhausted its data-visibility retries. The metric is published under
 * {@link #CDC_EVENT_SKIPPED_COUNT}; see {@link #CDC_EVENT_SKIPPED_COUNT_DESC} for its meaning.
 * @param dataTableName physical data table name
 * @param count number of CDC events skipped in this give-up event. NOTE(review): presumably
 *          expected to be positive; this contract does not say how zero or negative values
 *          are treated, so confirm against callers.
 */
void incrementCdcEventSkippedCount(String dataTableName, long count);

/**
 * Increments the parent-region replay active gauge by 1, marking one more region as being in
 * the parent-region replay phase. Must be paired with a corresponding
 * {@link #decrementCdcParentReplayActiveRegions(String)} in a {@code finally} block so the
 * gauge returns to its previous value when the replay ends. See
 * {@link #CDC_PARENT_REPLAY_ACTIVE_REGIONS_DESC} for how the gauge is meant to be read.
 * @param dataTableName physical data table name
 */
void incrementCdcParentReplayActiveRegions(String dataTableName);

/**
 * Decrements the parent-region replay active gauge by 1, undoing one earlier
 * {@link #incrementCdcParentReplayActiveRegions(String)}. Must be invoked in a {@code finally}
 * block paired with that increment. See {@link #CDC_PARENT_REPLAY_ACTIVE_REGIONS_DESC} for how
 * the gauge is meant to be read.
 * @param dataTableName physical data table name
 */
void decrementCdcParentReplayActiveRegions(String dataTableName);

/**
 * Adds a sample to the parent-region replay duration histogram. Called once per ancestor
 * partition after its replay reaches a terminal state. See
 * {@link #CDC_PARENT_REPLAY_DURATION_DESC} for when a sample is and is not emitted.
 * @param dataTableName physical data table name
 * @param durationMs wall-clock time, in milliseconds, spent replaying this ancestor partition
 */
void updateCdcParentReplayDuration(String dataTableName, long durationMs);

/**
 * Increments the steady-state active-regions gauge by 1, marking one more region as being in
 * steady-state processing. Must be paired with a corresponding
 * {@link #decrementCdcConsumerActiveRegions(String)} in a {@code finally} block. See
 * {@link #CDC_CONSUMER_ACTIVE_REGIONS_DESC} for how the gauge is meant to be read.
 * @param dataTableName physical data table name
 */
void incrementCdcConsumerActiveRegions(String dataTableName);

/**
 * Decrements the steady-state active-regions gauge by 1, undoing one earlier
 * {@link #incrementCdcConsumerActiveRegions(String)}. Must be invoked in a {@code finally}
 * block paired with that increment. See {@link #CDC_CONSUMER_ACTIVE_REGIONS_DESC} for how the
 * gauge is meant to be read.
 * @param dataTableName physical data table name
 */
void decrementCdcConsumerActiveRegions(String dataTableName);

/**
* Updates the CDC lag histogram.
* @param dataTableName physical data table name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;

/**
* Implementation for tracking IndexCDCConsumer metrics.
Expand All @@ -33,6 +34,10 @@ public class MetricsIndexCDCConsumerSourceImpl extends BaseSourceImpl
// Aggregate metric instances held by this source. The update methods in this class also
// touch a table-scoped metric that is looked up in the metrics registry by name on each call.
private final MutableFastCounter cdcBatchCounter;
private final MutableFastCounter cdcMutationCounter;
private final MutableFastCounter cdcBatchFailureCounter;
// Registered under CDC_EVENT_SKIPPED_COUNT.
private final MutableFastCounter cdcEventSkippedCounter;
// Registered under CDC_PARENT_REPLAY_ACTIVE_REGIONS.
private final MutableGaugeLong cdcParentReplayActiveRegionsGauge;
// Registered under CDC_PARENT_REPLAY_DURATION.
private final MetricHistogram cdcParentReplayDurationHisto;
// Registered under CDC_CONSUMER_ACTIVE_REGIONS.
private final MutableGaugeLong cdcConsumerActiveRegionsGauge;
// Registered under CDC_INDEX_UPDATE_LAG.
private final MetricHistogram cdcIndexUpdateLagHisto;

public MetricsIndexCDCConsumerSourceImpl() {
Expand All @@ -54,6 +59,14 @@ public MetricsIndexCDCConsumerSourceImpl(String metricsName, String metricsDescr
getMetricsRegistry().newCounter(CDC_MUTATION_COUNT, CDC_MUTATION_COUNT_DESC, 0L);
cdcBatchFailureCounter =
getMetricsRegistry().newCounter(CDC_BATCH_FAILURE_COUNT, CDC_BATCH_FAILURE_COUNT_DESC, 0L);
cdcEventSkippedCounter =
getMetricsRegistry().newCounter(CDC_EVENT_SKIPPED_COUNT, CDC_EVENT_SKIPPED_COUNT_DESC, 0L);
cdcParentReplayActiveRegionsGauge = getMetricsRegistry()
.newGauge(CDC_PARENT_REPLAY_ACTIVE_REGIONS, CDC_PARENT_REPLAY_ACTIVE_REGIONS_DESC, 0L);
cdcParentReplayDurationHisto = getMetricsRegistry().newHistogram(CDC_PARENT_REPLAY_DURATION,
CDC_PARENT_REPLAY_DURATION_DESC);
cdcConsumerActiveRegionsGauge = getMetricsRegistry().newGauge(CDC_CONSUMER_ACTIVE_REGIONS,
CDC_CONSUMER_ACTIVE_REGIONS_DESC, 0L);
cdcIndexUpdateLagHisto =
getMetricsRegistry().newHistogram(CDC_INDEX_UPDATE_LAG, CDC_INDEX_UPDATE_LAG_DESC);
}
Expand Down Expand Up @@ -96,6 +109,44 @@ public void incrementCdcBatchFailureCount(String dataTableName) {
cdcBatchFailureCounter.incr();
}

/**
 * Records {@code count} skipped CDC events against both the table-scoped counter and the
 * aggregate counter.
 * @param dataTableName table name used to derive the table-scoped counter's name
 * @param count amount added to each counter; passed through without validation
 */
@Override
public void incrementCdcEventSkippedCount(String dataTableName, long count) {
  // Table-scoped counter first: it is fetched from the registry by its derived name.
  final String perTableMetric = getMetricName(CDC_EVENT_SKIPPED_COUNT, dataTableName);
  getMetricsRegistry().getCounter(perTableMetric, 0).incr(count);

  // Then the aggregate counter held by this source.
  cdcEventSkippedCounter.incr(count);
}

/**
 * Raises the parent-region replay gauge by one, both for the given table and in aggregate.
 * @param dataTableName table name used to derive the table-scoped gauge's name
 */
@Override
public void incrementCdcParentReplayActiveRegions(String dataTableName) {
// Table-scoped gauge, looked up by its derived name.
incrementTableSpecificGauge(CDC_PARENT_REPLAY_ACTIVE_REGIONS, dataTableName);
// Aggregate gauge held by this source.
cdcParentReplayActiveRegionsGauge.incr();
}

/**
 * Lowers the parent-region replay gauge by one, both for the given table and in aggregate.
 * @param dataTableName table name used to derive the table-scoped gauge's name
 */
@Override
public void decrementCdcParentReplayActiveRegions(String dataTableName) {
// Table-scoped gauge, looked up by its derived name.
decrementTableSpecificGauge(CDC_PARENT_REPLAY_ACTIVE_REGIONS, dataTableName);
// Aggregate gauge held by this source.
cdcParentReplayActiveRegionsGauge.decr();
}

/**
 * Adds one duration sample to the parent-region replay histogram, both for the given table
 * and in aggregate.
 * @param dataTableName table name passed on to the table-scoped histogram helper
 * @param durationMs sample value added to both histograms
 */
@Override
public void updateCdcParentReplayDuration(String dataTableName, long durationMs) {
// Table-scoped histogram.
incrementTableSpecificHistogram(CDC_PARENT_REPLAY_DURATION, dataTableName, durationMs);
// Aggregate histogram held by this source.
cdcParentReplayDurationHisto.add(durationMs);
}

/**
 * Raises the steady-state active-regions gauge by one, both for the given table and in
 * aggregate.
 * @param dataTableName table name used to derive the table-scoped gauge's name
 */
@Override
public void incrementCdcConsumerActiveRegions(String dataTableName) {
// Table-scoped gauge, looked up by its derived name.
incrementTableSpecificGauge(CDC_CONSUMER_ACTIVE_REGIONS, dataTableName);
// Aggregate gauge held by this source.
cdcConsumerActiveRegionsGauge.incr();
}

/**
 * Lowers the steady-state active-regions gauge by one, both for the given table and in
 * aggregate.
 * @param dataTableName table name used to derive the table-scoped gauge's name
 */
@Override
public void decrementCdcConsumerActiveRegions(String dataTableName) {
// Table-scoped gauge, looked up by its derived name.
decrementTableSpecificGauge(CDC_CONSUMER_ACTIVE_REGIONS, dataTableName);
// Aggregate gauge held by this source.
cdcConsumerActiveRegionsGauge.decr();
}

@Override
public void updateCdcLag(String dataTableName, long lag) {
incrementTableSpecificHistogram(CDC_INDEX_UPDATE_LAG, dataTableName, lag);
Expand All @@ -114,6 +165,18 @@ private void incrementTableSpecificHistogram(String baseName, String tableName,
tableHistogram.add(t);
}

/**
 * Raises by one the gauge registered under the table-scoped name built from {@code baseName}
 * and {@code tableName}; the registry is asked for that gauge with a starting value of 0.
 * @param baseName base metric name
 * @param tableName table name combined with the base name
 */
private void incrementTableSpecificGauge(String baseName, String tableName) {
  final String gaugeName = getMetricName(baseName, tableName);
  getMetricsRegistry().getGauge(gaugeName, 0).incr();
}

/**
 * Lowers by one the gauge registered under the table-scoped name built from {@code baseName}
 * and {@code tableName}; the registry is asked for that gauge with a starting value of 0.
 * @param baseName base metric name
 * @param tableName table name combined with the base name
 */
private void decrementTableSpecificGauge(String baseName, String tableName) {
  final String gaugeName = getMetricName(baseName, tableName);
  getMetricsRegistry().getGauge(gaugeName, 0).decr();
}

/**
 * Builds a table-scoped metric name of the form {@code baseName.tableName}.
 * @param baseName base metric name
 * @param tableName table name appended after the dot
 * @return the two parts joined by a single {@code '.'}
 */
private String getMetricName(String baseName, String tableName) {
  return String.join(".", baseName, tableName);
}
Expand Down
Loading