From 1ff533b7ea9dd2be792d0e895c0c6c026d8f8955 Mon Sep 17 00:00:00 2001 From: lokiore Date: Mon, 8 Jun 2026 15:13:23 -0700 Subject: [PATCH 1/2] PHOENIX-7872 :- HA observability metrics for poller, CRR refresh/age, failover, mutation-block + RegionServer bypass counter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds client-side and server-side observability metrics for the Consistent Failover (CCF) high-availability path: Tier-1 client-side counters (4): - HA_POLLER_TICK_COUNT — total poller ticks across all HA groups - HA_POLLER_TICK_FAILURES — per-tick CRR fetch failures - HA_FAILOVER_COUNT — failover transitions executed by the client - HA_MUTATION_BLOCKED_COUNT — MutationBlockedIOException occurrences detected via the wrap-and-propagate path Tier-2 client-side metrics (4): - HA_FAILOVER_DURATION_MS — failover end-to-end latency histogram - HA_STALE_CRR_DETECTED_COUNT — StaleClusterRoleRecordException occurrences - HA_CRR_CACHE_AGE_MS — gauge of staleness of the in-memory CRR cache - (HA_FAILOVER_COUNT moved to applyClusterRoleRecord with role-transition guard so it only fires on actual ACTIVE -> STANDBY or STANDBY -> ACTIVE transitions) Tier-2 server-side counter (1): - BYPASSED_MUTATION_BLOCK_COUNT — emitted from IndexRegionObserver when a mutation bypasses the mutation-block check because no log group is present. Implemented as 3-file Hadoop-metrics2 source: interface + static factory (DefaultMetricsSystem.instance()) + impl. Tests: - HAGroupMetricsIT — IT covering all 8 client-side metrics - BypassedMutationBlockMetricsIT — IT covering server-side bypass counter - HighAvailabilityUtilTest — UT covering RetriesExhaustedWithDetailsException + IOException cause-chain MBIOE detection - MetricsHaBypassSourceFactoryTest — UT covering factory thread-safety Generated-by: Claude Code (Opus 4.7) --- .../jdbc/FailoverPhoenixConnection.java | 100 ++++--- .../phoenix/jdbc/HighAvailabilityGroup.java | 22 ++ .../phoenix/jdbc/HighAvailabilityUtil.java | 34 +++ .../monitoring/GlobalClientMetrics.java | 19 ++ .../apache/phoenix/monitoring/MetricType.java | 29 ++ .../util/GetClusterRoleRecordUtil.java | 3 + .../hbase/index/IndexRegionObserver.java | 15 + .../index/metrics/MetricsHaBypassSource.java | 53 ++++ .../metrics/MetricsHaBypassSourceFactory.java | 42 +++ .../metrics/MetricsHaBypassSourceImpl.java | 67 +++++ .../index/BypassedMutationBlockMetricsIT.java | 83 ++++++ .../apache/phoenix/jdbc/HAGroupMetricsIT.java | 259 ++++++++++++++++++ .../MetricsHaBypassSourceFactoryTest.java | 44 +++ .../jdbc/HighAvailabilityUtilTest.java | 71 +++++ 14 files changed, 799 insertions(+), 42 deletions(-) create mode 100644 phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSource.java create mode 100644 phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactory.java create mode 100644 phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceImpl.java create mode 100644 phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BypassedMutationBlockMetricsIT.java create mode 100644 phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupMetricsIT.java create mode 100644 phoenix-core/src/test/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactoryTest.java create mode 100644 phoenix-core/src/test/java/org/apache/phoenix/jdbc/HighAvailabilityUtilTest.java diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java index 58c79c49e9b..5916ea86c4d 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java @@ -17,7 +17,11 @@ */ package org.apache.phoenix.jdbc; +import static org.apache.phoenix.jdbc.HighAvailabilityUtil.isMutationBlockedIOExceptionExistsInThrowable; import static org.apache.phoenix.jdbc.HighAvailabilityUtil.isStaleClusterRoleRecordExceptionExistsInThrowable; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_FAILOVER_DURATION_MS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_MUTATION_BLOCKED_COUNT; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_STALE_CRR_DETECTED_COUNT; import java.sql.Array; import java.sql.Blob; @@ -171,55 +175,63 @@ void failover(long timeoutMs) throws SQLException { return; } - PhoenixConnection newConn = null; - SQLException cause = null; - final long startTime = EnvironmentEdgeManager.currentTimeMillis(); - while (newConn == null && EnvironmentEdgeManager.currentTimeMillis() < startTime + timeoutMs) { - try { - newConn = - context.getHAGroup().connectActive(context.getProperties(), context.getHAURLInfo()); - } catch (SQLException e) { - cause = e; - LOG.info("Got exception when trying to connect to active cluster.", e); + final long failoverStartMs = EnvironmentEdgeManager.currentTimeMillis(); + try { + PhoenixConnection newConn = null; + SQLException cause = null; + final long startTime = EnvironmentEdgeManager.currentTimeMillis(); + while ( + newConn == null && EnvironmentEdgeManager.currentTimeMillis() < startTime + timeoutMs + ) { try { - Thread.sleep(100); // TODO: be smart than this - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new SQLException("Got interrupted waiting for connection failover", e); + newConn = + context.getHAGroup().connectActive(context.getProperties(), context.getHAURLInfo()); + } catch (SQLException e) { + cause = e; + LOG.info("Got exception when trying to connect to active cluster.", e); + try { + Thread.sleep(100); // TODO: be smart than this + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new SQLException("Got interrupted waiting for connection failover", e); + } } } - } - if (newConn == null) { - throw new FailoverSQLException("Can not failover connection", - context.getHAGroup().getGroupInfo().toString(), cause); - } + if (newConn == null) { + throw new FailoverSQLException("Can not failover connection", + context.getHAGroup().getGroupInfo().toString(), cause); + } - final PhoenixConnection oldConn = connection; - connection = newConn; - if (oldConn != null) { - // aggregate metrics - previousMutationMetrics = oldConn.getMutationMetrics(); - previousReadMetrics = oldConn.getReadMetrics(); - oldConn.clearMetrics(); - - // close old connection - if (!oldConn.isClosed()) { - // TODO: what happens to in-flight edits/mutations? - // Can we copy into the new connection we do not allow this failover? - // MutationState state = oldConn.getMutationState(); - try { - oldConn.close(new SQLExceptionInfo.Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER) - .setMessage("Phoenix connection got closed due to failover") - .setHaGroupInfo(context.getHAGroup().getGroupInfo().toString()).build() - .buildException()); - } catch (SQLException e) { - LOG.error("Failed to close old connection after failover: {}", e.getMessage()); - LOG.info("Full stack when closing old connection after failover", e); + final PhoenixConnection oldConn = connection; + connection = newConn; + if (oldConn != null) { + // aggregate metrics + previousMutationMetrics = oldConn.getMutationMetrics(); + previousReadMetrics = oldConn.getReadMetrics(); + oldConn.clearMetrics(); + + // close old connection + if (!oldConn.isClosed()) { + // TODO: what happens to in-flight edits/mutations? + // Can we copy into the new connection we do not allow this failover? + // MutationState state = oldConn.getMutationState(); + try { + oldConn.close(new SQLExceptionInfo.Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER) + .setMessage("Phoenix connection got closed due to failover") + .setHaGroupInfo(context.getHAGroup().getGroupInfo().toString()).build() + .buildException()); + } catch (SQLException e) { + LOG.error("Failed to close old connection after failover: {}", e.getMessage()); + LOG.info("Full stack when closing old connection after failover", e); + } } } + LOG.info("Connection {} failed over to {}", context.getHAGroup().getGroupInfo(), + connection.getURL()); + } finally { + GLOBAL_HA_FAILOVER_DURATION_MS + .update(EnvironmentEdgeManager.currentTimeMillis() - failoverStartMs); } - LOG.info("Connection {} failed over to {}", context.getHAGroup().getGroupInfo(), - connection.getURL()); } /** @@ -324,6 +336,7 @@ T wrapActionDuringFailover(SupplierWithSQLException s) throws SQLExceptio return s.get(); } catch (Exception e) { if (isStaleClusterRoleRecordExceptionExistsInThrowable(e)) { + GLOBAL_HA_STALE_CRR_DETECTED_COUNT.increment(); // If we receive StaleClusterRoleRecordException, that means Operation was // supposed to be executed on Active Cluster but was in reality was sent to // STANDBY Cluster, that can happen only when Failover is in Progress, So we @@ -348,6 +361,9 @@ T wrapActionDuringFailover(SupplierWithSQLException s) throws SQLExceptio context.getHAGroup(), e)) .build().buildException(); } + if (isMutationBlockedIOExceptionExistsInThrowable(e)) { + GLOBAL_HA_MUTATION_BLOCKED_COUNT.increment(); + } if (policy.shouldFailover(e, ++failoverCount)) { failover(timeoutMs); } else { diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java index 6b9728e7970..7181b11c5f5 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java @@ -17,6 +17,9 @@ */ package org.apache.phoenix.jdbc; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_CRR_CACHE_AGE_MS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_CRR_REFRESH_COUNT; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_FAILOVER_COUNT; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; @@ -629,6 +632,7 @@ public void init() throws IOException, SQLException { LOG.info("Initial cluster role for HA group {} is {}", info, roleRecordFromEndpoint); roleRecord = roleRecordFromEndpoint; + lastClusterRoleRecordRefreshTime = System.currentTimeMillis(); state = State.READY; } @@ -644,6 +648,11 @@ public Connection connect(Properties properties, HAURLInfo haurlInfo) throws SQL .setMessage("HA group is not ready!").setHaGroupInfo(info.toString()).build() .buildException(); } + // GAUGE: most-recent-sample of "milliseconds since the last successful CRR refresh". + // Use set(...) to overwrite the prior sample; do NOT increment/update — that would turn + // the gauge into an accumulator and break "current age" semantics. + GLOBAL_HA_CRR_CACHE_AGE_MS.getMetric() + .set(System.currentTimeMillis() - lastClusterRoleRecordRefreshTime); return roleRecord.getPolicy().provide(this, properties, haurlInfo); } @@ -1035,6 +1044,7 @@ private ClusterRoleRecord getClusterRoleRecordFromEndpoint() throws SQLException * @throws SQLException if there is an error getting the ClusterRoleRecord */ public boolean refreshClusterRoleRecord(boolean forceRefresh) throws SQLException { + GLOBAL_HA_CRR_REFRESH_COUNT.increment(); // Allow concurrent reads to return fast in case refresh is not needed readLock.lock(); try { @@ -1057,6 +1067,8 @@ public boolean refreshClusterRoleRecord(boolean forceRefresh) throws SQLExceptio ClusterRoleRecord newRoleRecord = getClusterRoleRecordFromEndpoint(); if (roleRecord == null) { + // First-load init path: no prior cache state to compare against, so this is not a + // failover transition and HA_FAILOVER_COUNT is intentionally NOT incremented here. roleRecord = newRoleRecord; lastClusterRoleRecordRefreshTime = System.currentTimeMillis(); state = State.READY; @@ -1124,6 +1136,16 @@ public boolean refreshClusterRoleRecord(boolean forceRefresh) throws SQLExceptio // The goal here is to gain higher availability even though existing resources against // previous ACTIVE cluster may have not been closed cleanly. } + // Count the transition as a failover only when an active cluster is established + // or moves between peers. Operator-driven transitions to a no-active state + // (both clusters STANDBY) are not counted as failovers; recovery from no-active + // back to having an ACTIVE peer is counted. + if ( + !oldRecord.getActiveUrl().equals(newRoleRecord.getActiveUrl()) + && newRoleRecord.getActiveUrl().isPresent() + ) { + GLOBAL_HA_FAILOVER_COUNT.increment(); + } // Update the role record and the last refresh time roleRecord = newRoleRecord; lastClusterRoleRecordRefreshTime = System.currentTimeMillis(); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityUtil.java index 1893ad3169c..386529a6a75 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityUtil.java @@ -18,6 +18,7 @@ package org.apache.phoenix.jdbc; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.phoenix.exception.MutationBlockedIOException; import org.apache.phoenix.exception.StaleClusterRoleRecordException; /** @@ -45,7 +46,40 @@ public static boolean isStaleClusterRoleRecordExceptionExistsInThrowable(Throwab return false; } + /** + * Walks the throwable chain (including {@link RetriesExhaustedWithDetailsException} causes) to + * detect a {@link MutationBlockedIOException} surface. Mirrors the structure of + * {@link #isStaleClusterRoleRecordExceptionExistsInThrowable(Throwable)} so that batched mutation + * rejections wrapped at varying depth are still attributable to the mutation-block gate. + */ + public static boolean isMutationBlockedIOExceptionExistsInThrowable(Throwable e) { + if (e == null) { + return false; + } + if (isGivenThrowableMutationBlockedException(e)) { + return true; + } + + if (e instanceof RetriesExhaustedWithDetailsException) { + for (Throwable t : ((RetriesExhaustedWithDetailsException) e).getCauses()) { + if (isGivenThrowableMutationBlockedException(t)) { + return true; + } + } + } + + if (e.getCause() != null) { + return isMutationBlockedIOExceptionExistsInThrowable(e.getCause()); + } + + return false; + } + private static boolean isGivenThrowableStaleException(Throwable t) { return t instanceof StaleClusterRoleRecordException; } + + private static boolean isGivenThrowableMutationBlockedException(Throwable t) { + return t instanceof MutationBlockedIOException; + } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java index 89f04f8827d..bbd23bcbc52 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java @@ -35,6 +35,11 @@ import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_CALLS; import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_RETRIES; import static org.apache.phoenix.monitoring.MetricType.COUNT_SCANNED_REGIONS; +import static org.apache.phoenix.monitoring.MetricType.HA_CRR_CACHE_AGE_MS; +import static org.apache.phoenix.monitoring.MetricType.HA_CRR_REFRESH_COUNT; +import static org.apache.phoenix.monitoring.MetricType.HA_FAILOVER_COUNT; +import static org.apache.phoenix.monitoring.MetricType.HA_FAILOVER_DURATION_MS; +import static org.apache.phoenix.monitoring.MetricType.HA_MUTATION_BLOCKED_COUNT; import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_CONNECTION_CREATED_COUNTER; import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_CONNECTION_ERROR_COUNTER; import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_CONNECTION_FALLBACK_COUNTER; @@ -49,6 +54,9 @@ import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_POOL2_TASK_QUEUE_WAIT_TIME; import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_POOL2_TASK_REJECTED_COUNTER; import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_TASK_TIMEOUT_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.HA_POLLER_TICK_COUNT; +import static org.apache.phoenix.monitoring.MetricType.HA_POLLER_TICK_FAILURES; +import static org.apache.phoenix.monitoring.MetricType.HA_STALE_CRR_DETECTED_COUNT; import static org.apache.phoenix.monitoring.MetricType.HCONNECTIONS_COUNTER; import static org.apache.phoenix.monitoring.MetricType.INDEX_COMMIT_FAILURE_SIZE; import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES; @@ -163,6 +171,17 @@ public enum GlobalClientMetrics { GLOBAL_HA_PARALLEL_CONNECTION_ERROR_COUNTER(HA_PARALLEL_CONNECTION_ERROR_COUNTER), GLOBAL_HA_PARALLEL_CONNECTION_CREATED_COUNTER(HA_PARALLEL_CONNECTION_CREATED_COUNTER), + GLOBAL_HA_FAILOVER_COUNT(HA_FAILOVER_COUNT), + GLOBAL_HA_FAILOVER_DURATION_MS(HA_FAILOVER_DURATION_MS), + GLOBAL_HA_MUTATION_BLOCKED_COUNT(HA_MUTATION_BLOCKED_COUNT), + GLOBAL_HA_STALE_CRR_DETECTED_COUNT(HA_STALE_CRR_DETECTED_COUNT), + GLOBAL_HA_CRR_REFRESH_COUNT(HA_CRR_REFRESH_COUNT), + // GAUGE: most-recent-sample. Use getMetric().set(ageMs) at the sampling site; + // do NOT increment or update — that would accumulate and break gauge semantics. + GLOBAL_HA_CRR_CACHE_AGE_MS(HA_CRR_CACHE_AGE_MS), + GLOBAL_HA_POLLER_TICK_COUNT(HA_POLLER_TICK_COUNT), + GLOBAL_HA_POLLER_TICK_FAILURES(HA_POLLER_TICK_FAILURES), + GLOBAL_CLIENT_METADATA_CACHE_MISS_COUNTER(CLIENT_METADATA_CACHE_MISS_COUNTER), GLOBAL_CLIENT_METADATA_CACHE_HIT_COUNTER(CLIENT_METADATA_CACHE_HIT_COUNTER), GLOBAL_CLIENT_METADATA_CACHE_EVICTION_COUNTER(CLIENT_METADATA_CACHE_EVICTION_COUNTER), diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java index ff80705c0d4..22a9ecbdcf3 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java @@ -353,6 +353,35 @@ public enum MetricType { LogLevel.DEBUG, PLong.INSTANCE), HA_PARALLEL_CONNECTION_CREATED_COUNTER("hpccc", "Counter for the number of parallel phoenix connections that were created", LogLevel.DEBUG, + PLong.INSTANCE), + HA_FAILOVER_COUNT("hafc", + "Counter for cluster-level failover transitions (active URL flips between peers or recovers " + + "from no-active state) recorded at the CRR write site", + LogLevel.DEBUG, PLong.INSTANCE), + HA_FAILOVER_DURATION_MS("hafd", + "Total time in milliseconds spent in connection-level failover transitions, summed across " + + "all observing connections (per-connection observation, not per-cluster-event)", + LogLevel.DEBUG, PLong.INSTANCE), + HA_MUTATION_BLOCKED_COUNT("hambc", + "Counter for MutationBlockedIOException surfaces caught by wrapActionDuringFailover", + LogLevel.DEBUG, PLong.INSTANCE), + HA_STALE_CRR_DETECTED_COUNT("hascd", + "Counter for StaleClusterRoleRecordException surfaces caught by wrapActionDuringFailover", + LogLevel.DEBUG, PLong.INSTANCE), + HA_CRR_REFRESH_COUNT("hcrc", "Counter for refreshClusterRoleRecord invocations", LogLevel.DEBUG, + PLong.INSTANCE), + // GAUGE SEMANTICS: most-recent-sample, NOT an accumulator. Callers MUST use + // getMetric().set(ageMs) to overwrite the previous sample, never increment/update. + // Misuse will produce a monotonically-growing accumulator, which is the wrong shape for an + // "age since last refresh" gauge. + HA_CRR_CACHE_AGE_MS("hccg", + "Gauge: most-recent-sample of milliseconds since the last successful CRR refresh " + + "(use getMetric().set(ageMs))", + LogLevel.DEBUG, PLong.INSTANCE), + HA_POLLER_TICK_COUNT("hptc", "Counter for non-active CRR poller ticks executed", LogLevel.DEBUG, + PLong.INSTANCE), + HA_POLLER_TICK_FAILURES("hptf", + "Counter for non-active CRR poller tick failures (caught SQLException)", LogLevel.DEBUG, PLong.INSTANCE); private final String description; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java index 5adbedb1692..44d859fedf4 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java @@ -42,6 +42,7 @@ import org.apache.phoenix.jdbc.HighAvailabilityGroup; import org.apache.phoenix.jdbc.HighAvailabilityPolicy; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.monitoring.GlobalClientMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -216,6 +217,7 @@ private static void schedulePoller(String url1, String url2, String haGroupName, Runnable pollingTask = () -> { // Increment unconditionally so a failed tick still alternates next iteration. long tick = tickCount.getAndIncrement(); + GlobalClientMetrics.GLOBAL_HA_POLLER_TICK_COUNT.increment(); String tickUrl = selectUrlForTick(url1, url2, tick); try { ClusterRoleRecord polledCrr = @@ -242,6 +244,7 @@ private static void schedulePoller(String url1, String url2, String haGroupName, } } } catch (SQLException e) { + GlobalClientMetrics.GLOBAL_HA_POLLER_TICK_FAILURES.increment(); LOGGER.error( "Exception found while polling for ClusterRoleRecord on {} for HA group" + " {}: {}", tickUrl, haGroupName, e.getMessage()); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 25477196950..0ba7044037a 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -115,6 +115,7 @@ import org.apache.phoenix.hbase.index.builder.IndexBuilder; import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.hbase.index.metrics.MetricsHaBypassSourceFactory; import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource; import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; @@ -746,6 +747,20 @@ public void preBatchMutate(ObserverContext c, // Extract HAGroupName from the mutations Optional logGroup = getHAGroupFromBatch(c.getEnvironment(), miniBatchOp); + // Mutation batches that arrive without a resolvable HA group cannot be evaluated against + // the cluster-role-based mutation-block gate. Track the bypass globally (not per-table) + // so operators can spot regressions where a write path forgets to attach the + // _HAGroupName attribute. Scope is intentionally !logGroup.isPresent() regardless of + // dataTableName — system-HA-group writes WITH a haGroup are an *intended* gate exemption + // (state writes must proceed during a block window) and are not counted as bypasses. + if (!logGroup.isPresent()) { + try { + MetricsHaBypassSourceFactory.getInstance().incrementBypassedMutationBlockCount(); + } catch (Throwable t) { + LOG.warn("Failed to increment bypassed mutation block count metric; continuing", t); + } + } + // We don't want to check for mutation blocking for the system ha group table if (!dataTableName.equals(SYSTEM_HA_GROUP_NAME) && logGroup.isPresent()) { // Check if mutation is blocked for the HA Group diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSource.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSource.java new file mode 100644 index 00000000000..a44dcb072da --- /dev/null +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSource.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.metrics; + +import org.apache.hadoop.hbase.metrics.BaseSource; + +/** + * Server-side JMX metrics source for tracking mutations that bypass the cluster-role-based + * mutation-block gate inside {@code IndexRegionObserver.preBatchMutate}. A bypass occurs when the + * mutation batch reaches the gate without an associated HA group attribute, so the gate has no + * haGroupName to evaluate state against and skips the block check entirely. + *

+ * A non-zero counter post-deploy can indicate that some write paths reach the gate without carrying + * the {@code _HAGroupName} attribute, which in turn means those writes can proceed during a + * mutation-block window and bypass the safety property the gate exists to enforce. Operators should + * treat sustained non-zero values as a regression signal. + */ +public interface MetricsHaBypassSource extends BaseSource { + + String METRICS_NAME = "HaBypass"; + String METRICS_CONTEXT = "phoenix"; + String METRICS_DESCRIPTION = + "Metrics for cluster-role-based mutation-block bypass events on the RegionServer"; + String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME; + + String BYPASSED_MUTATION_BLOCK_COUNT = "bypassedMutationBlockCount"; + String BYPASSED_MUTATION_BLOCK_COUNT_DESC = + "Counter for mutation batches that reached preBatchMutate without an associated HA group " + + "(no _HAGroupName attribute), causing the cluster-role-based mutation-block gate to be " + + "skipped"; + + /** + * Increments the bypass counter. Called from {@code IndexRegionObserver.preBatchMutate} when the + * resolved {@code Optional} is empty (i.e., the mutation batch carries no HA + * group attribute and the mutation-block gate cannot be evaluated). + */ + void incrementBypassedMutationBlockCount(); +} diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactory.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactory.java new file mode 100644 index 00000000000..66e6f3aadd9 --- /dev/null +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.metrics; + +/** + * Factory for the per-RegionServer {@link MetricsHaBypassSource} singleton. Unlike the per-haGroup + * factories elsewhere in the codebase, the bypass counter is a single global (per-JVM, + * per-RegionServer) counter, so this factory holds one eagerly-initialized instance rather than a + * {@code ConcurrentHashMap} keyed on haGroupName. + */ +public final class MetricsHaBypassSourceFactory { + + private static final MetricsHaBypassSource INSTANCE = new MetricsHaBypassSourceImpl(); + + private MetricsHaBypassSourceFactory() { + } + + /** + * Returns the process-wide {@link MetricsHaBypassSource} singleton. The instance is initialized + * eagerly at class-load time, so this method is thread-safe without any additional + * synchronization. + * @return the singleton {@link MetricsHaBypassSource} for this RegionServer JVM + */ + public static MetricsHaBypassSource getInstance() { + return INSTANCE; + } +} diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceImpl.java new file mode 100644 index 00000000000..5963b11fae7 --- /dev/null +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceImpl.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.metrics; + +import org.apache.hadoop.hbase.metrics.BaseSourceImpl; +import org.apache.hadoop.metrics2.lib.MutableFastCounter; + +/** + * Implementation of {@link MetricsHaBypassSource} backed by a single {@link MutableFastCounter} on + * the RegionServer JMX registry. The counter is per-RegionServer (not keyed by HA group, by design + * — a bypass event has no haGroupName to attribute against). + */ +public class MetricsHaBypassSourceImpl extends BaseSourceImpl implements MetricsHaBypassSource { + + private final MutableFastCounter bypassedMutationBlockCount; + + /** + * Default constructor used by {@link MetricsHaBypassSourceFactory}. Registers the source under + * the standard {@code HaBypass} JMX context shared with operator dashboards. + */ + public MetricsHaBypassSourceImpl() { + this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT); + } + + /** + * Test-friendly constructor that lets callers override the metrics-registry naming, so unit tests + * can register an isolated source without colliding with the production singleton. + * @param metricsName short name reported to the metrics registry + * @param metricsDescription human-readable description for the registry + * @param metricsContext hadoop-metrics2 context name + * @param metricsJmxContext JMX context (typically {@code "RegionServer,sub=" + metricsName}) + */ + public MetricsHaBypassSourceImpl(String metricsName, String metricsDescription, + String metricsContext, String metricsJmxContext) { + super(metricsName, metricsDescription, metricsContext, metricsJmxContext); + bypassedMutationBlockCount = getMetricsRegistry().newCounter(BYPASSED_MUTATION_BLOCK_COUNT, + BYPASSED_MUTATION_BLOCK_COUNT_DESC, 0L); + } + + @Override + public void incrementBypassedMutationBlockCount() { + bypassedMutationBlockCount.incr(); + } + + /** + * Test-only accessor returning the current counter value. Public so unit/integration tests + * outside this package can assert increments without going through JMX read scaffolding. + */ + public long getBypassedMutationBlockCountForTesting() { + return bypassedMutationBlockCount.value(); + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BypassedMutationBlockMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BypassedMutationBlockMetricsIT.java new file mode 100644 index 00000000000..4db777d2877 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BypassedMutationBlockMetricsIT.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import static org.apache.phoenix.query.BaseTest.generateUniqueName; +import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Properties; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.hbase.index.metrics.MetricsHaBypassSourceFactory; +import org.apache.phoenix.hbase.index.metrics.MetricsHaBypassSourceImpl; +import org.apache.phoenix.jdbc.HABaseIT; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Integration test for the server-side {@code bypassedMutationBlockCount} JMX counter. A "bypass" + * is a mutation batch that reaches {@code IndexRegionObserver.preBatchMutate} without an associated + * HA group attribute, causing the cluster-role-based mutation-block gate to be skipped. This IT + * drives a write that does not carry {@code _HAGroupName} and asserts the counter increments. + */ +@Category(NeedsOwnMiniClusterTest.class) +public class BypassedMutationBlockMetricsIT extends HABaseIT { + + @BeforeClass + public static synchronized void doSetup() throws Exception { + CLUSTERS.getHBaseCluster1().getConfiguration() + .setBoolean(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, true); + CLUSTERS.getHBaseCluster2().getConfiguration() + .setBoolean(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, true); + CLUSTERS.start(); + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + } + + @Test(timeout = 300000) + public void testBypassedMutationBlockCount() throws Exception { + MetricsHaBypassSourceImpl source = + (MetricsHaBypassSourceImpl) MetricsHaBypassSourceFactory.getInstance(); + long before = source.getBypassedMutationBlockCountForTesting(); + + String dataTableName = generateUniqueName(); + String indexName = generateUniqueName(); + + // Connect directly to cluster1 (single-cluster JDBC URL — no _HAGroupName attribute). + // Use the master/RPC address rather than the ZK url so the rpc URL parser doesn't + // choke on the `=` token that ZK urls embed. + Properties props = new Properties(); + try (Connection conn = + DriverManager.getConnection(CLUSTERS.getJdbcUrl(CLUSTERS.getMasterAddress1()), props)) { + conn.createStatement().execute( + "CREATE TABLE " + dataTableName + " (id VARCHAR PRIMARY KEY, name VARCHAR, age INTEGER)"); + conn.createStatement() + .execute("CREATE INDEX " + indexName + " ON " + dataTableName + "(name)"); + conn.createStatement().execute("UPSERT INTO " + dataTableName + " VALUES ('1', 'A', 1)"); + conn.commit(); + } + + long after = source.getBypassedMutationBlockCountForTesting(); + assertTrue( + "bypassedMutationBlockCount should increment for mutations without _HAGroupName attribute", + after > before); + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupMetricsIT.java new file mode 100644 index 00000000000..e1b65d317ba --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupMetricsIT.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvailibilityGroup; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_CRR_CACHE_AGE_MS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_CRR_REFRESH_COUNT; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_FAILOVER_COUNT; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_FAILOVER_DURATION_MS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_MUTATION_BLOCKED_COUNT; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_POLLER_TICK_COUNT; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_POLLER_TICK_FAILURES; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_STALE_CRR_DETECTED_COUNT; +import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Properties; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.exception.MutationBlockedIOException; +import org.apache.phoenix.execute.CommitException; +import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for the client-side HA observability metrics. Each test resets the relevant + * {@link org.apache.phoenix.monitoring.GlobalClientMetrics} entries before exercising the code path + * that should emit them, then asserts the post-condition counter/gauge value. + */ +@Category(NeedsOwnMiniClusterTest.class) +public class HAGroupMetricsIT extends HABaseIT { + private static final Logger LOG = LoggerFactory.getLogger(HAGroupMetricsIT.class); + + @Rule + public final TestName testName = new TestName(); + + private Properties clientProperties; + private HighAvailabilityGroup haGroup; + private String tableName; + private String haGroupName; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + CLUSTERS.getHBaseCluster1().getConfiguration() + .setBoolean(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, true); + CLUSTERS.getHBaseCluster2().getConfiguration() + .setBoolean(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, true); + CLUSTERS.start(); + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); + CLUSTERS.close(); + } + + @Before + public void setUp() throws Exception { + haGroupName = testName.getMethodName(); + clientProperties = HighAvailabilityTestingUtility.getHATestProperties(); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName); + CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER); + haGroup = getHighAvailibilityGroup(CLUSTERS.getJdbcHAUrl(), clientProperties); + tableName = testName.getMethodName().toUpperCase(); + CLUSTERS.createTableOnClusterPair(haGroup, tableName); + resetAllHaMetrics(); + } + + @After + public void tearDown() throws Exception { + try { + haGroup.close(); + } catch (Exception e) { + LOG.error("Fail to tear down HA group; ignoring", e); + } + } + + @Test(timeout = 300000) + public void testFailoverCountAndDuration() throws Exception { + long countBefore = GLOBAL_HA_FAILOVER_COUNT.getMetric().getValue(); + long durationBefore = GLOBAL_HA_FAILOVER_DURATION_MS.getMetric().getValue(); + + try (Connection conn = DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties)) { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (1, 1)"); + conn.commit(); + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE); + } + + long countAfter = GLOBAL_HA_FAILOVER_COUNT.getMetric().getValue(); + long durationAfter = GLOBAL_HA_FAILOVER_DURATION_MS.getMetric().getValue(); + assertTrue("HA_FAILOVER_COUNT should increment on cluster role transition", + countAfter > countBefore); + assertTrue("HA_FAILOVER_DURATION_MS sum should grow on transition", + durationAfter >= durationBefore); + } + + @Test(timeout = 300000) + public void testStaleCrrDetectedCount() throws Exception { + long before = GLOBAL_HA_STALE_CRR_DETECTED_COUNT.getMetric().getValue(); + // Drive a cluster-role transition that the wrapped connection observes as + // stale-CRR — failover() flags STALE_CLUSTER_ROLE_RECORD and increments the counter. + try (Connection conn = DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties)) { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (2, 2)"); + conn.commit(); + CLUSTERS.transitClusterRole(haGroup, ClusterRole.ACTIVE_TO_STANDBY, ClusterRole.STANDBY); + // doRefreshHAGroup=false: keep haGroup's CRR snapshot stale on purpose so that + // the next mutation drives StaleClusterRoleRecordException through wrapActionDuringFailover, + // exercising the GLOBAL_HA_STALE_CRR_DETECTED_COUNT increment path. + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE, false); + // Issue another mutation while the connection still holds the pre-transition CRR snapshot. + // The server's HAGroupStoreManager will detect the version mismatch and throw + // StaleClusterRoleRecordException, which wrapActionDuringFailover catches to increment the + // counter (and rethrow FAILOVER_IN_PROGRESS). + try { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (3, 3)"); + conn.commit(); + } catch (Exception expected) { + // Stale-CRR surfaces as FAILOVER_IN_PROGRESS after the wrap-and-rethrow path; the + // increment-side-effect is what we are asserting on, not the exception type itself. + } + } + long after = GLOBAL_HA_STALE_CRR_DETECTED_COUNT.getMetric().getValue(); + assertTrue("HA_STALE_CRR_DETECTED_COUNT should strictly increment when CRR is detected stale " + + "(before=" + before + ", after=" + after + ")", after > before); + } + + @Test(timeout = 300000) + public void testMutationBlockedCount() throws Exception { + long before = GLOBAL_HA_MUTATION_BLOCKED_COUNT.getMetric().getValue(); + String peerZkUrl = CLUSTERS.getZkUrl2(); + PhoenixHAAdmin haAdmin = CLUSTERS.getHaAdmin1(); + + try (FailoverPhoenixConnection conn = (FailoverPhoenixConnection) DriverManager + .getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties)) { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (3, 3)"); + conn.commit(); + + HAGroupStoreRecord blocking = + new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, + HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), peerZkUrl, CLUSTERS.getMasterAddress1(), + CLUSTERS.getMasterAddress2(), CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L); + haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, blocking, -1); + Thread.sleep(1000L); + + try { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (4, 4)"); + conn.commit(); + fail("Expected MutationBlockedIOException during ACTIVE_TO_STANDBY"); + } catch (CommitException e) { + Throwable cause = e.getCause(); + assertTrue("Expected MutationBlockedIOException in chain", + cause instanceof RetriesExhaustedWithDetailsException + && ((RetriesExhaustedWithDetailsException) cause) + .getCause(0) instanceof MutationBlockedIOException); + } + } + long after = GLOBAL_HA_MUTATION_BLOCKED_COUNT.getMetric().getValue(); + assertTrue("HA_MUTATION_BLOCKED_COUNT should increment when MBE is observed", after > before); + } + + @Test(timeout = 300000) + public void testCrrRefreshCount() throws Exception { + long before = GLOBAL_HA_CRR_REFRESH_COUNT.getMetric().getValue(); + haGroup.refreshClusterRoleRecord(false); + haGroup.refreshClusterRoleRecord(false); + long after = GLOBAL_HA_CRR_REFRESH_COUNT.getMetric().getValue(); + assertTrue("HA_CRR_REFRESH_COUNT should increment on each refresh", after - before >= 2); + } + + @Test(timeout = 300000) + public void testCrrCacheAgeMs() throws Exception { + // Pre-refresh assertion: a connect() against a freshly init()-ed HA group must sample a + // bounded age; if init() forgets to seed lastClusterRoleRecordRefreshTime the gauge will + // record currentTimeMillis() - 0 (~1.7e12 ms) and this assertion will fail. + try (Connection conn = DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties)) { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (4, 4)"); + conn.commit(); + } + long ageBeforeRefresh = GLOBAL_HA_CRR_CACHE_AGE_MS.getMetric().getValue(); + assertTrue("HA_CRR_CACHE_AGE_MS gauge must be bounded on init path (was " + ageBeforeRefresh + + " ms, expected < 60_000)", ageBeforeRefresh < 60_000L); + + // Force a refresh so lastClusterRoleRecordRefreshTime is recent, then sleep so the + // gauge sample (taken on connect()) records a non-trivial age. + haGroup.refreshClusterRoleRecord(false); + Thread.sleep(50L); + try (Connection conn = DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties)) { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (5, 5)"); + conn.commit(); + } + long ageMs = GLOBAL_HA_CRR_CACHE_AGE_MS.getMetric().getValue(); + // Gauge holds the most-recent set() sample; should be > 0 and within a sane bound. + assertTrue("HA_CRR_CACHE_AGE_MS gauge should be > 0 after a connect()", ageMs > 0L); + assertTrue("HA_CRR_CACHE_AGE_MS gauge should be < 5 minutes for a fresh HA group", + ageMs < 5 * 60 * 1000L); + } + + @Test(timeout = 300000) + public void testPollerTickCount() throws Exception { + // The poller starts only when fetchClusterRoleRecord observes both roles non-active under + // FAILOVER policy. Drive that state, await a couple of ticks, and verify the counter moved. + long beforeTicks = GLOBAL_HA_POLLER_TICK_COUNT.getMetric().getValue(); + long beforeFailures = GLOBAL_HA_POLLER_TICK_FAILURES.getMetric().getValue(); + + CLUSTERS.transitClusterRole(haGroup, ClusterRole.ACTIVE_TO_STANDBY, ClusterRole.STANDBY); + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.STANDBY); + + // Allow at least 2 poller ticks at default interval to land. + Thread.sleep(15_000L); + + long afterTicks = GLOBAL_HA_POLLER_TICK_COUNT.getMetric().getValue(); + long afterFailures = GLOBAL_HA_POLLER_TICK_FAILURES.getMetric().getValue(); + assertTrue("HA_POLLER_TICK_COUNT should advance once poller is scheduled", + afterTicks > beforeTicks); + // Failures may or may not occur in mini-cluster; just assert non-decreasing. + assertTrue("HA_POLLER_TICK_FAILURES should be monotonic", afterFailures >= beforeFailures); + } + + private void resetAllHaMetrics() { + GLOBAL_HA_FAILOVER_COUNT.getMetric().reset(); + GLOBAL_HA_FAILOVER_DURATION_MS.getMetric().reset(); + GLOBAL_HA_MUTATION_BLOCKED_COUNT.getMetric().reset(); + GLOBAL_HA_STALE_CRR_DETECTED_COUNT.getMetric().reset(); + GLOBAL_HA_CRR_REFRESH_COUNT.getMetric().reset(); + GLOBAL_HA_CRR_CACHE_AGE_MS.getMetric().reset(); + GLOBAL_HA_POLLER_TICK_COUNT.getMetric().reset(); + GLOBAL_HA_POLLER_TICK_FAILURES.getMetric().reset(); + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactoryTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactoryTest.java new file mode 100644 index 00000000000..18110784eab --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactoryTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.metrics; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; + +import org.junit.Test; + +/** + * Unit tests for {@link MetricsHaBypassSourceFactory}. Asserts the eager-init singleton contract so + * a future refactor that swaps the field for a non-singleton (or a non-thread-safe) shape gets + * caught here rather than in production JMX double-registration failures. + */ +public class MetricsHaBypassSourceFactoryTest { + + @Test + public void testGetInstanceReturnsNonNull() { + assertNotNull("getInstance() must return a non-null singleton", + MetricsHaBypassSourceFactory.getInstance()); + } + + @Test + public void testGetInstanceIsIdempotent() { + MetricsHaBypassSource first = MetricsHaBypassSourceFactory.getInstance(); + MetricsHaBypassSource second = MetricsHaBypassSourceFactory.getInstance(); + assertSame("getInstance() must return the same singleton across calls", first, second); + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HighAvailabilityUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HighAvailabilityUtilTest.java new file mode 100644 index 00000000000..0f28104a2c7 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HighAvailabilityUtilTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.client.Row; +import org.apache.phoenix.exception.MutationBlockedIOException; +import org.junit.Test; + +/** + * Unit tests for {@link HighAvailabilityUtil#isMutationBlockedIOExceptionExistsInThrowable}. Walks + * the four detection branches (null guard, direct match, RetriesExhaustedWithDetailsException + * causes, recursive {@code getCause()}) since the helper is invoked from commit-path error handling + * where regression of any branch would silently mis-attribute mutation-block rejections. + */ +public class HighAvailabilityUtilTest { + + @Test + public void testNullThrowableReturnsFalse() { + assertFalse("null Throwable must short-circuit to false", + HighAvailabilityUtil.isMutationBlockedIOExceptionExistsInThrowable(null)); + } + + @Test + public void testDirectMutationBlockedIOExceptionMatches() { + Throwable e = new MutationBlockedIOException("blocked"); + assertTrue("direct MutationBlockedIOException must match", + HighAvailabilityUtil.isMutationBlockedIOExceptionExistsInThrowable(e)); + } + + @Test + public void testRetriesExhaustedCausesAreScanned() { + MutationBlockedIOException blocked = new MutationBlockedIOException("blocked-in-batch"); + IOException unrelated = new IOException("unrelated"); + RetriesExhaustedWithDetailsException rewde = + new RetriesExhaustedWithDetailsException(Arrays. asList(unrelated, blocked), + Collections. emptyList(), Collections. emptyList()); + assertTrue("RetriesExhaustedWithDetailsException causes must be scanned for MBIOE", + HighAvailabilityUtil.isMutationBlockedIOExceptionExistsInThrowable(rewde)); + } + + @Test + public void testRecursiveGetCauseFindsNestedMutationBlocked() { + MutationBlockedIOException root = new MutationBlockedIOException("root-cause"); + Throwable mid = new RuntimeException("mid", root); + Throwable outer = new RuntimeException("outer", mid); + assertTrue("recursive getCause() walk must surface a deeply-nested MBIOE", + HighAvailabilityUtil.isMutationBlockedIOExceptionExistsInThrowable(outer)); + } +} From cb72d1cc92a0a63717d589e4aa6f922125b37a84 Mon Sep 17 00:00:00 2001 From: lokiore Date: Tue, 23 Jun 2026 17:29:59 -0700 Subject: [PATCH 2/2] PHOENIX-7872 :- Address apurtell review feedback (5 metric semantics + IT catch narrowing + review nits) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five must-fixes from the PR #2502 review: 1. HA_FAILOVER_COUNT semantic — moved increment inside the transition try and gated on transitionSucceeded so only successful policy transitions count; preserves the existing metric name. Gate decision factored into the package-private static HighAvailabilityGroup#shouldCountFailover for direct unit-test coverage of the negative path (see Nit 1 below). 2. HA_CRR_REFRESH_COUNT semantic — moved increment to after a successful getClusterRoleRecordFromEndpoint() so no-op refreshes inside the cache window do NOT inflate the counter (counter now measures fresh fetches, not refresh-method invocations). 3. HA_CRR_CACHE_AGE_MS sampling — added a poller-tick sample site so the gauge updates on every poller iteration via a new HighAvailabilityGroup #getCacheAgeMs() accessor. The connect()-site sample is retained. 4. BYPASSED_MUTATION_BLOCK_COUNT framing — rewrote the IndexRegionObserver inline comment and MetricsHaBypassSource Javadoc/descriptions as a path-coverage detector (counts the short-circuit code path; does NOT imply any safety property was breached). 5. HAGroupMetricsIT catch narrowing — narrowed the stale-CRR catch from the broad catch(Exception) to catch(SQLException) and asserted the error code is FAILOVER_IN_PROGRESS (the contracted surface from FailoverPhoenixConnection.wrapActionDuringFailover); added a LOG.info so the expected exception is recorded. Tests adjusted to match the new main-code semantics: - HAGroupMetricsIT.testCrrRefreshCount — switched both refresh calls to force-refresh so the assertion exercises the actual-fetch path the counter now measures. Inline comment explains WHY non-force inside the cache window intentionally no longer increments. Review-nit follow-ups: - Nit 1 (negative-path coverage for Fix #1): extracted the failover-gate decision into a pure, package-private static helper HighAvailabilityGroup#shouldCountFailover(boolean, ClusterRoleRecord, ClusterRoleRecord) and added HighAvailabilityGroupTest #testShouldCountFailoverGate covering 5 cases: (a) real ACTIVE-URL move counts, (b) same-active-URL no-op does NOT count, (c) transition INTO no-active does NOT count, (d) transitionSucceeded=false (failed policy callback) does NOT count — the regression guard, and (e) recovery from no-active back to ACTIVE counts. - Nit 2 (poller-tick gauge value): HAGroupMetricsIT.testPollerTickCount now also asserts GLOBAL_HA_CRR_CACHE_AGE_MS.getValue() >= 0L after the poller has ticked (guards against the -1L never-refreshed sentinel leaking out through the poller-tick sample site). - Nit 3 (never-refreshed disambiguation): HighAvailabilityGroup #getCacheAgeMs() now returns -1L when lastClusterRoleRecordRefreshTime is 0, instead of 0L. This disambiguates "never sampled" from "refreshed within the same millisecond" on the gauge, and supersedes a latent bug: because the CRR poller is scheduled with initial-delay 0, its first tick can fire before init() seeds the timestamp; under the prior code the raw arithmetic now - 0 would publish a giant value (~currentTimeMillis()) to the gauge and spuriously trip every age > threshold alert. -1L publishes a clean "not yet sampled" marker. Javadoc documents the rationale + warns future readers not to revert to return 0. Connect()-site (state-gated) is unaffected and continues to use raw arithmetic. - Nit 4 (logger arg): HAGroupMetricsIT.testStaleCrrDetectedCount LOG.info now passes testName.getMethodName() (was relying on TestName.toString() to do the right thing). Generated-by: Claude Code (Opus 4.7) --- .../phoenix/jdbc/HighAvailabilityGroup.java | 80 ++++++++++++++++--- .../util/GetClusterRoleRecordUtil.java | 13 +++ .../hbase/index/IndexRegionObserver.java | 16 ++-- .../index/metrics/MetricsHaBypassSource.java | 54 +++++++++---- .../apache/phoenix/jdbc/HAGroupMetricsIT.java | 38 +++++++-- .../jdbc/HighAvailabilityGroupTest.java | 62 ++++++++++++++ 6 files changed, 227 insertions(+), 36 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java index 7181b11c5f5..2239d545a9d 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java @@ -1044,7 +1044,6 @@ private ClusterRoleRecord getClusterRoleRecordFromEndpoint() throws SQLException * @throws SQLException if there is an error getting the ClusterRoleRecord */ public boolean refreshClusterRoleRecord(boolean forceRefresh) throws SQLException { - GLOBAL_HA_CRR_REFRESH_COUNT.increment(); // Allow concurrent reads to return fast in case refresh is not needed readLock.lock(); try { @@ -1066,6 +1065,11 @@ public boolean refreshClusterRoleRecord(boolean forceRefresh) throws SQLExceptio } ClusterRoleRecord newRoleRecord = getClusterRoleRecordFromEndpoint(); + // Count only refreshes that actually fetched a CRR from the endpoint. Callers that + // short-circuit on shouldRefreshRoleRecord() above never touch the network and would + // otherwise inflate this counter against its name (a "refresh" with no fetch is a no-op + // from a CRR-state perspective). + GLOBAL_HA_CRR_REFRESH_COUNT.increment(); if (roleRecord == null) { // First-load init path: no prior cache state to compare against, so this is not a // failover transition and HA_FAILOVER_COUNT is intentionally NOT incremented here. @@ -1109,8 +1113,10 @@ public boolean refreshClusterRoleRecord(boolean forceRefresh) throws SQLExceptio long maxTransitionTimeMs = StringUtils.isNotEmpty(transitionTimeoutProp) ? Long.parseLong(transitionTimeoutProp) : PHOENIX_HA_TRANSITION_TIMEOUT_MS_DEFAULT; + boolean transitionSucceeded = false; try { future.get(maxTransitionTimeMs, TimeUnit.MILLISECONDS); + transitionSucceeded = true; } catch (InterruptedException ie) { LOG.error("Got interrupted when transiting cluster roles for HA group {}", info, ie); future.cancel(true); @@ -1136,14 +1142,15 @@ public boolean refreshClusterRoleRecord(boolean forceRefresh) throws SQLExceptio // The goal here is to gain higher availability even though existing resources against // previous ACTIVE cluster may have not been closed cleanly. } - // Count the transition as a failover only when an active cluster is established - // or moves between peers. Operator-driven transitions to a no-active state - // (both clusters STANDBY) are not counted as failovers; recovery from no-active - // back to having an ACTIVE peer is counted. - if ( - !oldRecord.getActiveUrl().equals(newRoleRecord.getActiveUrl()) - && newRoleRecord.getActiveUrl().isPresent() - ) { + // Count the transition as a failover only when the policy-side transition actually + // succeeded AND an active cluster is established or moves between peers. Operator-driven + // transitions to a no-active state (both clusters STANDBY) are not counted as failovers; + // recovery from no-active back to having an ACTIVE peer is counted. Transitions where + // future.get() failed (ExecutionException/TimeoutException) are best-effort fall-through + // per the comment above, but they are NOT counted as successful failovers. Gate decision + // factored into the package-private static {@link #shouldCountFailover} so it can be + // unit-tested directly without driving a full mini-cluster transition. + if (shouldCountFailover(transitionSucceeded, oldRecord, newRoleRecord)) { GLOBAL_HA_FAILOVER_COUNT.increment(); } // Update the role record and the last refresh time @@ -1169,4 +1176,59 @@ public boolean shouldRefreshRoleRecord() { long cacheAge = System.currentTimeMillis() - lastClusterRoleRecordRefreshTime; return cacheAge >= clusterRoleRecordCacheFrequency; } + + /** + * Returns the current age, in milliseconds, of the last successful ClusterRoleRecord refresh + * (i.e. {@code now - lastClusterRoleRecordRefreshTime}). Intended for periodic sampling by + * external callers (e.g. the CRR poller) so the {@code HA_CRR_CACHE_AGE_MS} counter-backed gauge + * reflects current age without waiting for the next {@link #connect} call. + *

+ * Returns the sentinel value {@code -1L} when no refresh has occurred yet (i.e. + * {@code lastClusterRoleRecordRefreshTime == 0}). This disambiguates "never sampled" from + * "refreshed within the same millisecond" — both of which would otherwise return {@code 0L} and + * be indistinguishable on the gauge. Dashboards and alert rules should treat {@code -1L} as "no + * data yet" and gate threshold checks on {@code value >= 0}. + *

+ * The {@code -1L} sentinel also supersedes a latent bug in the prior {@code connect()}-only + * gauge-publish path: because the CRR poller is scheduled with an initial delay of {@code 0} (see + * {@code GetClusterRoleRecordUtil#schedulePoller}) it can fire its first tick before + * {@link #init} has assigned {@code lastClusterRoleRecordRefreshTime}, in which case the raw + * arithmetic {@code now - 0} would publish a giant value (~{@code currentTimeMillis()}) to the + * gauge and spuriously trip every "age > threshold" alert. Returning {@code -1L} during that race + * window publishes a clean "not yet sampled" marker instead. + *

+ * Do NOT change this back to {@code return 0L}: the connect()-site at line ~654 still uses raw + * arithmetic (it is state-gated and unreachable before {@code init()} seeds the timestamp), so + * the {@code -1L} sentinel only ever surfaces through the poller-tick sample path. + */ + public long getCacheAgeMs() { + if (lastClusterRoleRecordRefreshTime == 0) { + return -1L; + } + return System.currentTimeMillis() - lastClusterRoleRecordRefreshTime; + } + + /** + * Decides whether a CRR transition counts as a "failover" for {@code HA_FAILOVER_COUNT} purposes. + * Returns {@code true} iff: + *

    + *
  1. the policy-side transition (via {@code future.get(timeout)}) actually succeeded — a + * {@code TimeoutException} / {@code ExecutionException} fall-through is NOT a failover, AND
  2. + *
  3. the active-cluster URL actually moved between peers (or recovered from no-active), AND
  4. + *
  5. the new record has an ACTIVE cluster — transitions INTO a no-active state are not + * failovers.
  6. + *
+ * Pure function of its inputs (no global state read, no clock read) so it is straightforward to + * unit-test the gate without driving a full mini-cluster transition. Package-private rather than + * private so {@code HighAvailabilityGroupTest} can call it directly. + * @param transitionSucceeded whether {@code future.get(maxTransitionTimeMs)} returned normally + * @param oldRecord the previous {@link ClusterRoleRecord} + * @param newRecord the candidate new {@link ClusterRoleRecord} + * @return whether to increment {@code HA_FAILOVER_COUNT} for this transition + */ + static boolean shouldCountFailover(boolean transitionSucceeded, ClusterRoleRecord oldRecord, + ClusterRoleRecord newRecord) { + return transitionSucceeded && !oldRecord.getActiveUrl().equals(newRecord.getActiveUrl()) + && newRecord.getActiveUrl().isPresent(); + } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java index 44d859fedf4..d76d54fc62c 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java @@ -218,6 +218,19 @@ private static void schedulePoller(String url1, String url2, String haGroupName, // Increment unconditionally so a failed tick still alternates next iteration. long tick = tickCount.getAndIncrement(); GlobalClientMetrics.GLOBAL_HA_POLLER_TICK_COUNT.increment(); + // Sample current CRR cache age into the gauge each tick. Without this, the + // HA_CRR_CACHE_AGE_MS counter-backed gauge is only updated on connect() and would + // not advance during idle periods between connects, making it look fresher than it + // actually is. Sampling here puts the gauge on a steady wall-clock cadence matching + // the poller's polling interval. + try { + GlobalClientMetrics.GLOBAL_HA_CRR_CACHE_AGE_MS.getMetric().set(haGroup.getCacheAgeMs()); + } catch (Throwable t) { + // Metric sampling is best-effort; never let a metric write break the poller tick. + LOGGER.warn( + "Failed to sample HA_CRR_CACHE_AGE_MS on poller tick for HA group {}; " + "continuing", + haGroupName, t); + } String tickUrl = selectUrlForTick(url1, url2, tick); try { ClusterRoleRecord polledCrr = diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 0ba7044037a..287cd707160 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -747,12 +747,16 @@ public void preBatchMutate(ObserverContext c, // Extract HAGroupName from the mutations Optional logGroup = getHAGroupFromBatch(c.getEnvironment(), miniBatchOp); - // Mutation batches that arrive without a resolvable HA group cannot be evaluated against - // the cluster-role-based mutation-block gate. Track the bypass globally (not per-table) - // so operators can spot regressions where a write path forgets to attach the - // _HAGroupName attribute. Scope is intentionally !logGroup.isPresent() regardless of - // dataTableName — system-HA-group writes WITH a haGroup are an *intended* gate exemption - // (state writes must proceed during a block window) and are not counted as bypasses. + // Path-coverage counter: increments whenever a mutation batch reaches preBatchMutate + // without a resolvable HA group attribute, so the cluster-role-based mutation-block gate + // has no haGroupName to evaluate against and is skipped. This counts the code path being + // short-circuited — it does NOT imply any safety property was breached (when the block + // feature is disabled or no block window is active, there is no property to breach). + // Tracked globally rather than per-table so operators can compare baseline vs. + // post-deploy delta to spot new write paths that forgot to attach _HAGroupName. + // Intentionally scoped to !logGroup.isPresent() regardless of dataTableName — + // system-HA-group writes WITH a haGroup are an intended gate exemption (state writes + // must proceed during a block window) and are not counted here. if (!logGroup.isPresent()) { try { MetricsHaBypassSourceFactory.getInstance().incrementBypassedMutationBlockCount(); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSource.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSource.java index a44dcb072da..276f1f2bb63 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSource.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSource.java @@ -20,34 +20,58 @@ import org.apache.hadoop.hbase.metrics.BaseSource; /** - * Server-side JMX metrics source for tracking mutations that bypass the cluster-role-based - * mutation-block gate inside {@code IndexRegionObserver.preBatchMutate}. A bypass occurs when the - * mutation batch reaches the gate without an associated HA group attribute, so the gate has no - * haGroupName to evaluate state against and skips the block check entirely. + * Server-side JMX metrics source that counts how many mutation batches pass through + * {@code IndexRegionObserver.preBatchMutate} without a resolvable HA group attribute, so + * the cluster-role-based mutation-block gate has no haGroupName to evaluate against and is skipped + * for that batch. *

- * A non-zero counter post-deploy can indicate that some write paths reach the gate without carrying - * the {@code _HAGroupName} attribute, which in turn means those writes can proceed during a - * mutation-block window and bypass the safety property the gate exists to enforce. Operators should - * treat sustained non-zero values as a regression signal. + * This is a path-coverage detector, not a safety violation alarm. The counter is + * incremented in {@code IndexRegionObserver.preBatchMutate} for every mutation batch whose + * {@code _HAGroupName} attribute cannot be resolved — regardless of whether the cluster-role-based + * mutation-block feature is enabled, regardless of whether a mutation-block window is currently + * active for any HA group, and regardless of whether the table being written to is HA-replicated. + * The "bypass" terminology refers strictly to the gate-evaluation code path being short-circuited; + * it does not imply that any safety property was breached — when the block feature + * is disabled or no block window is active, there is no property to breach in the first place. + *

+ * Intended operator use: + *

    + *
  • Path coverage: baseline rate tells you what fraction of mutations on this + * RegionServer reach the gate without an HA group attribute. A sustained baseline of zero on an + * HA-enabled cluster suggests the gate is effectively dead code on that path; a sustained non-zero + * baseline tells you those write paths exist and need to be either tagged or consciously + * exempted.
  • + *
  • Regression signal: a delta against baseline (especially after a + * deploy that added a new mutation path) is the actionable signal — it indicates a newly introduced + * write path is not tagging mutations with {@code _HAGroupName}.
  • + *
+ * The absolute value alone is not actionable; pair it with deploy markers and the + * mutation-block-enabled config to interpret correctly. */ public interface MetricsHaBypassSource extends BaseSource { String METRICS_NAME = "HaBypass"; String METRICS_CONTEXT = "phoenix"; String METRICS_DESCRIPTION = - "Metrics for cluster-role-based mutation-block bypass events on the RegionServer"; + "Metrics for cluster-role-based mutation-block gate path-coverage on the RegionServer"; String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME; String BYPASSED_MUTATION_BLOCK_COUNT = "bypassedMutationBlockCount"; String BYPASSED_MUTATION_BLOCK_COUNT_DESC = - "Counter for mutation batches that reached preBatchMutate without an associated HA group " - + "(no _HAGroupName attribute), causing the cluster-role-based mutation-block gate to be " - + "skipped"; + "Path-coverage counter: number of mutation batches that reached preBatchMutate without a " + + "resolvable _HAGroupName attribute (so the cluster-role-based mutation-block gate had " + + "nothing to evaluate against and was skipped). Counts the code path being skipped, not " + + "a safety breach — when the block feature is disabled or no block window is active, " + + "there is no property to breach. Actionable signal is delta-against-baseline (e.g., a " + + "spike after a deploy introducing a new mutation path), not absolute value"; /** - * Increments the bypass counter. Called from {@code IndexRegionObserver.preBatchMutate} when the - * resolved {@code Optional} is empty (i.e., the mutation batch carries no HA - * group attribute and the mutation-block gate cannot be evaluated). + * Increments the gate-skipped-path counter. Called unconditionally from + * {@code IndexRegionObserver.preBatchMutate} whenever the resolved + * {@code Optional} is empty — i.e., the mutation batch carries no + * {@code _HAGroupName} attribute and the cluster-role-based mutation-block gate cannot be + * evaluated for it. Independent of whether the mutation-block feature is enabled or any block + * window is active. */ void incrementBypassedMutationBlockCount(); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupMetricsIT.java index e1b65d317ba..088ccff183a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupMetricsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupMetricsIT.java @@ -33,10 +33,12 @@ import java.sql.Connection; import java.sql.DriverManager; +import java.sql.SQLException; import java.util.Properties; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.exception.MutationBlockedIOException; +import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.execute.CommitException; import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; import org.junit.After; @@ -143,9 +145,21 @@ public void testStaleCrrDetectedCount() throws Exception { try { conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (3, 3)"); conn.commit(); - } catch (Exception expected) { - // Stale-CRR surfaces as FAILOVER_IN_PROGRESS after the wrap-and-rethrow path; the - // increment-side-effect is what we are asserting on, not the exception type itself. + } catch (SQLException expected) { + // Stale-CRR surfaces here as a SQLException with FAILOVER_IN_PROGRESS (1990 / F1Q90): + // wrapActionDuringFailover catches StaleClusterRoleRecordException, increments + // GLOBAL_HA_STALE_CRR_DETECTED_COUNT, then rethrows a freshly-built SQLException via + // SQLExceptionInfo.Builder(FAILOVER_IN_PROGRESS).build().buildException(). The error + // code is the contracted surface — any other SQLException must fail this assertion + // rather than be silently swallowed (the prior catch(Exception) hid setup bugs by + // treating ANY failure as the expected path). + assertTrue( + "Expected FAILOVER_IN_PROGRESS error code, got " + expected.getErrorCode() + " (" + + expected.getSQLState() + ")", + expected.getErrorCode() == SQLExceptionCode.FAILOVER_IN_PROGRESS.getErrorCode()); + LOG.info( + "Expected FAILOVER_IN_PROGRESS SQLException surfaced from stale-CRR mutation " + "in {}", + testName.getMethodName(), expected); } } long after = GLOBAL_HA_STALE_CRR_DETECTED_COUNT.getMetric().getValue(); @@ -191,10 +205,14 @@ public void testMutationBlockedCount() throws Exception { @Test(timeout = 300000) public void testCrrRefreshCount() throws Exception { long before = GLOBAL_HA_CRR_REFRESH_COUNT.getMetric().getValue(); - haGroup.refreshClusterRoleRecord(false); - haGroup.refreshClusterRoleRecord(false); + // Force-refresh both calls: HA_CRR_REFRESH_COUNT only increments on a refresh that + // actually fetched a CRR from the endpoint — a non-force refresh inside the cache window + // short-circuits before getClusterRoleRecordFromEndpoint() and intentionally does NOT + // bump the counter (counter measures "fresh fetches", not "refresh calls"). + haGroup.refreshClusterRoleRecord(true); + haGroup.refreshClusterRoleRecord(true); long after = GLOBAL_HA_CRR_REFRESH_COUNT.getMetric().getValue(); - assertTrue("HA_CRR_REFRESH_COUNT should increment on each refresh", after - before >= 2); + assertTrue("HA_CRR_REFRESH_COUNT should increment on each force-refresh", after - before >= 2); } @Test(timeout = 300000) @@ -244,6 +262,14 @@ public void testPollerTickCount() throws Exception { afterTicks > beforeTicks); // Failures may or may not occur in mini-cluster; just assert non-decreasing. assertTrue("HA_POLLER_TICK_FAILURES should be monotonic", afterFailures >= beforeFailures); + // Verify the poller-tick gauge sample site fires: by the time we've slept 15s and seen + // multiple ticks, init() has long-since seeded lastClusterRoleRecordRefreshTime, so the + // gauge holds a real age sample. >= 0 guards against the -1L "never-refreshed" sentinel + // leaking out via a buggy refactor — if the poller-tick .set() path is broken or the + // sentinel escapes the init/poller race window, this assertion catches it. + long ageGauge = GLOBAL_HA_CRR_CACHE_AGE_MS.getMetric().getValue(); + assertTrue("HA_CRR_CACHE_AGE_MS gauge should hold a non-negative age sample after poller ticks " + + "(got " + ageGauge + ")", ageGauge >= 0L); } private void resetAllHaMetrics() { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTest.java index 8f52b5110c1..cc58727838d 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTest.java @@ -18,11 +18,14 @@ package org.apache.phoenix.jdbc; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.sql.SQLException; import java.util.Properties; import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,4 +126,63 @@ private void getAndAssertUrlInfo(String url, String additionalParams, String pri private void getAndAssertUrlInfo(String url, String additionalParams) throws Exception { getAndAssertUrlInfo(url, additionalParams, null); } + + /** + * Verifies the gate decision for {@code HA_FAILOVER_COUNT} — exercised directly via the + * package-private {@link HighAvailabilityGroup#shouldCountFailover} helper rather than driving a + * mini-cluster transition. Together these cases pin down that the gate (a) counts a real ACTIVE + * URL move, (b) does NOT count a no-op (same active URL), (c) does NOT count a transition INTO a + * no-active state, and (d) does NOT count a transition where the policy callback failed + * ({@code transitionSucceeded == false}, i.e. {@code future.get()} threw {@code TimeoutException} + * / {@code ExecutionException}). The (d) negative-path assertion is the regression guard: someone + * removing the {@code transitionSucceeded &&} clause from + * {@code HighAvailabilityGroup#shouldCountFailover} would silently start counting failed + * transitions as successful failovers, and this test would fail. + */ + @Test + public void testShouldCountFailoverGate() { + String haGroupName = "testShouldCountFailoverGate"; + String url1 = "host1\\:60010"; + String url2 = "host2\\:60010"; + + ClusterRoleRecord aActiveBStandby = new ClusterRoleRecord(haGroupName, + HighAvailabilityPolicy.FAILOVER, url1, ClusterRole.ACTIVE, url2, ClusterRole.STANDBY, 1L); + ClusterRoleRecord aStandbyBActive = new ClusterRoleRecord(haGroupName, + HighAvailabilityPolicy.FAILOVER, url1, ClusterRole.STANDBY, url2, ClusterRole.ACTIVE, 2L); + ClusterRoleRecord bothStandby = new ClusterRoleRecord(haGroupName, + HighAvailabilityPolicy.FAILOVER, url1, ClusterRole.STANDBY, url2, ClusterRole.STANDBY, 3L); + + // (a) Real active-URL move with a successful policy transition → COUNT. + assertTrue( + "ACTIVE moving from cluster 1 to cluster 2 with successful policy transition " + + "should count as a failover", + HighAvailabilityGroup.shouldCountFailover(true, aActiveBStandby, aStandbyBActive)); + + // (b) Same active URL (no-op transition) → DO NOT COUNT, even if policy succeeded. + assertFalse( + "Same active URL on both sides should NOT count as a failover even with a " + + "successful policy transition", + HighAvailabilityGroup.shouldCountFailover(true, aActiveBStandby, aActiveBStandby)); + + // (c) Transition INTO no-active state (both STANDBY) → DO NOT COUNT. + assertFalse("Transition into a no-active (both STANDBY) state should NOT count as a failover", + HighAvailabilityGroup.shouldCountFailover(true, aActiveBStandby, bothStandby)); + + // (d) NEGATIVE PATH — policy callback failed (transitionSucceeded=false, simulating + // future.get() throwing TimeoutException or ExecutionException) → DO NOT COUNT, even if + // the active URL appears to have moved. This is the regression guard for the + // {@code transitionSucceeded &&} clause; removing it would silently inflate + // HA_FAILOVER_COUNT on failed transitions. + assertFalse( + "Failed policy transition (transitionSucceeded=false) must NOT count as a failover even " + + "when the candidate new record shows a different active URL", + HighAvailabilityGroup.shouldCountFailover(false, aActiveBStandby, aStandbyBActive)); + + // (e) Recovery from no-active back to having an ACTIVE peer with a successful + // transition → COUNT (operationally a real failover-recovery event). + assertTrue( + "Recovery from no-active back to ACTIVE with a successful policy transition " + + "should count as a failover", + HighAvailabilityGroup.shouldCountFailover(true, bothStandby, aStandbyBActive)); + } }