diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java index 58c79c49e9b..5916ea86c4d 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java @@ -17,7 +17,11 @@ */ package org.apache.phoenix.jdbc; +import static org.apache.phoenix.jdbc.HighAvailabilityUtil.isMutationBlockedIOExceptionExistsInThrowable; import static org.apache.phoenix.jdbc.HighAvailabilityUtil.isStaleClusterRoleRecordExceptionExistsInThrowable; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_FAILOVER_DURATION_MS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_MUTATION_BLOCKED_COUNT; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_STALE_CRR_DETECTED_COUNT; import java.sql.Array; import java.sql.Blob; @@ -171,55 +175,63 @@ void failover(long timeoutMs) throws SQLException { return; } - PhoenixConnection newConn = null; - SQLException cause = null; - final long startTime = EnvironmentEdgeManager.currentTimeMillis(); - while (newConn == null && EnvironmentEdgeManager.currentTimeMillis() < startTime + timeoutMs) { - try { - newConn = - context.getHAGroup().connectActive(context.getProperties(), context.getHAURLInfo()); - } catch (SQLException e) { - cause = e; - LOG.info("Got exception when trying to connect to active cluster.", e); + final long failoverStartMs = EnvironmentEdgeManager.currentTimeMillis(); + try { + PhoenixConnection newConn = null; + SQLException cause = null; + final long startTime = EnvironmentEdgeManager.currentTimeMillis(); + while ( + newConn == null && EnvironmentEdgeManager.currentTimeMillis() < startTime + timeoutMs + ) { try { - Thread.sleep(100); // TODO: be smart than this - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new SQLException("Got interrupted waiting for connection failover", e); + newConn = + context.getHAGroup().connectActive(context.getProperties(), context.getHAURLInfo()); + } catch (SQLException e) { + cause = e; + LOG.info("Got exception when trying to connect to active cluster.", e); + try { + Thread.sleep(100); // TODO: be smart than this + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new SQLException("Got interrupted waiting for connection failover", e); + } } } - } - if (newConn == null) { - throw new FailoverSQLException("Can not failover connection", - context.getHAGroup().getGroupInfo().toString(), cause); - } + if (newConn == null) { + throw new FailoverSQLException("Can not failover connection", + context.getHAGroup().getGroupInfo().toString(), cause); + } - final PhoenixConnection oldConn = connection; - connection = newConn; - if (oldConn != null) { - // aggregate metrics - previousMutationMetrics = oldConn.getMutationMetrics(); - previousReadMetrics = oldConn.getReadMetrics(); - oldConn.clearMetrics(); - - // close old connection - if (!oldConn.isClosed()) { - // TODO: what happens to in-flight edits/mutations? - // Can we copy into the new connection we do not allow this failover? - // MutationState state = oldConn.getMutationState(); - try { - oldConn.close(new SQLExceptionInfo.Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER) - .setMessage("Phoenix connection got closed due to failover") - .setHaGroupInfo(context.getHAGroup().getGroupInfo().toString()).build() - .buildException()); - } catch (SQLException e) { - LOG.error("Failed to close old connection after failover: {}", e.getMessage()); - LOG.info("Full stack when closing old connection after failover", e); + final PhoenixConnection oldConn = connection; + connection = newConn; + if (oldConn != null) { + // aggregate metrics + previousMutationMetrics = oldConn.getMutationMetrics(); + previousReadMetrics = oldConn.getReadMetrics(); + oldConn.clearMetrics(); + + // close old connection + if (!oldConn.isClosed()) { + // TODO: what happens to in-flight edits/mutations? + // Can we copy into the new connection we do not allow this failover? + // MutationState state = oldConn.getMutationState(); + try { + oldConn.close(new SQLExceptionInfo.Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER) + .setMessage("Phoenix connection got closed due to failover") + .setHaGroupInfo(context.getHAGroup().getGroupInfo().toString()).build() + .buildException()); + } catch (SQLException e) { + LOG.error("Failed to close old connection after failover: {}", e.getMessage()); + LOG.info("Full stack when closing old connection after failover", e); + } } } + LOG.info("Connection {} failed over to {}", context.getHAGroup().getGroupInfo(), + connection.getURL()); + } finally { + GLOBAL_HA_FAILOVER_DURATION_MS + .update(EnvironmentEdgeManager.currentTimeMillis() - failoverStartMs); } - LOG.info("Connection {} failed over to {}", context.getHAGroup().getGroupInfo(), - connection.getURL()); } /** @@ -324,6 +336,7 @@ T wrapActionDuringFailover(SupplierWithSQLException s) throws SQLExceptio return s.get(); } catch (Exception e) { if (isStaleClusterRoleRecordExceptionExistsInThrowable(e)) { + GLOBAL_HA_STALE_CRR_DETECTED_COUNT.increment(); // If we receive StaleClusterRoleRecordException, that means Operation was // supposed to be executed on Active Cluster but was in reality was sent to // STANDBY Cluster, that can happen only when Failover is in Progress, So we @@ -348,6 +361,9 @@ T wrapActionDuringFailover(SupplierWithSQLException s) throws SQLExceptio context.getHAGroup(), e)) .build().buildException(); } + if (isMutationBlockedIOExceptionExistsInThrowable(e)) { + GLOBAL_HA_MUTATION_BLOCKED_COUNT.increment(); + } if (policy.shouldFailover(e, ++failoverCount)) { failover(timeoutMs); } else { diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java index 6b9728e7970..2239d545a9d 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java @@ -17,6 +17,9 @@ */ package org.apache.phoenix.jdbc; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_CRR_CACHE_AGE_MS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_CRR_REFRESH_COUNT; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_FAILOVER_COUNT; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; @@ -629,6 +632,7 @@ public void init() throws IOException, SQLException { LOG.info("Initial cluster role for HA group {} is {}", info, roleRecordFromEndpoint); roleRecord = roleRecordFromEndpoint; + lastClusterRoleRecordRefreshTime = System.currentTimeMillis(); state = State.READY; } @@ -644,6 +648,11 @@ public Connection connect(Properties properties, HAURLInfo haurlInfo) throws SQL .setMessage("HA group is not ready!").setHaGroupInfo(info.toString()).build() .buildException(); } + // GAUGE: most-recent-sample of "milliseconds since the last successful CRR refresh". + // Use set(...) to overwrite the prior sample; do NOT increment/update — that would turn + // the gauge into an accumulator and break "current age" semantics. + GLOBAL_HA_CRR_CACHE_AGE_MS.getMetric() + .set(System.currentTimeMillis() - lastClusterRoleRecordRefreshTime); return roleRecord.getPolicy().provide(this, properties, haurlInfo); } @@ -1056,7 +1065,14 @@ public boolean refreshClusterRoleRecord(boolean forceRefresh) throws SQLExceptio } ClusterRoleRecord newRoleRecord = getClusterRoleRecordFromEndpoint(); + // Count only refreshes that actually fetched a CRR from the endpoint. Callers that + // short-circuit on shouldRefreshRoleRecord() above never touch the network and would + // otherwise inflate this counter against its name (a "refresh" with no fetch is a no-op + // from a CRR-state perspective). + GLOBAL_HA_CRR_REFRESH_COUNT.increment(); if (roleRecord == null) { + // First-load init path: no prior cache state to compare against, so this is not a + // failover transition and HA_FAILOVER_COUNT is intentionally NOT incremented here. roleRecord = newRoleRecord; lastClusterRoleRecordRefreshTime = System.currentTimeMillis(); state = State.READY; @@ -1097,8 +1113,10 @@ public boolean refreshClusterRoleRecord(boolean forceRefresh) throws SQLExceptio long maxTransitionTimeMs = StringUtils.isNotEmpty(transitionTimeoutProp) ? Long.parseLong(transitionTimeoutProp) : PHOENIX_HA_TRANSITION_TIMEOUT_MS_DEFAULT; + boolean transitionSucceeded = false; try { future.get(maxTransitionTimeMs, TimeUnit.MILLISECONDS); + transitionSucceeded = true; } catch (InterruptedException ie) { LOG.error("Got interrupted when transiting cluster roles for HA group {}", info, ie); future.cancel(true); @@ -1124,6 +1142,17 @@ public boolean refreshClusterRoleRecord(boolean forceRefresh) throws SQLExceptio // The goal here is to gain higher availability even though existing resources against // previous ACTIVE cluster may have not been closed cleanly. } + // Count the transition as a failover only when the policy-side transition actually + // succeeded AND an active cluster is established or moves between peers. Operator-driven + // transitions to a no-active state (both clusters STANDBY) are not counted as failovers; + // recovery from no-active back to having an ACTIVE peer is counted. Transitions where + // future.get() failed (ExecutionException/TimeoutException) are best-effort fall-through + // per the comment above, but they are NOT counted as successful failovers. Gate decision + // factored into the package-private static {@link #shouldCountFailover} so it can be + // unit-tested directly without driving a full mini-cluster transition. + if (shouldCountFailover(transitionSucceeded, oldRecord, newRoleRecord)) { + GLOBAL_HA_FAILOVER_COUNT.increment(); + } // Update the role record and the last refresh time roleRecord = newRoleRecord; lastClusterRoleRecordRefreshTime = System.currentTimeMillis(); @@ -1147,4 +1176,59 @@ public boolean shouldRefreshRoleRecord() { long cacheAge = System.currentTimeMillis() - lastClusterRoleRecordRefreshTime; return cacheAge >= clusterRoleRecordCacheFrequency; } + + /** + * Returns the current age, in milliseconds, of the last successful ClusterRoleRecord refresh + * (i.e. {@code now - lastClusterRoleRecordRefreshTime}). Intended for periodic sampling by + * external callers (e.g. the CRR poller) so the {@code HA_CRR_CACHE_AGE_MS} counter-backed gauge + * reflects current age without waiting for the next {@link #connect} call. + *

+ * Returns the sentinel value {@code -1L} when no refresh has occurred yet (i.e. + * {@code lastClusterRoleRecordRefreshTime == 0}). This disambiguates "never sampled" from + * "refreshed within the same millisecond" — both of which would otherwise return {@code 0L} and + * be indistinguishable on the gauge. Dashboards and alert rules should treat {@code -1L} as "no + * data yet" and gate threshold checks on {@code value >= 0}. + *

+ * The {@code -1L} sentinel also supersedes a latent bug in the prior {@code connect()}-only + * gauge-publish path: because the CRR poller is scheduled with an initial delay of {@code 0} (see + * {@code GetClusterRoleRecordUtil#schedulePoller}) it can fire its first tick before + * {@link #init} has assigned {@code lastClusterRoleRecordRefreshTime}, in which case the raw + * arithmetic {@code now - 0} would publish a giant value (~{@code currentTimeMillis()}) to the + * gauge and spuriously trip every "age > threshold" alert. Returning {@code -1L} during that race + * window publishes a clean "not yet sampled" marker instead. + *

+ * Do NOT change this back to {@code return 0L}: the connect()-site at line ~654 still uses raw + * arithmetic (it is state-gated and unreachable before {@code init()} seeds the timestamp), so + * the {@code -1L} sentinel only ever surfaces through the poller-tick sample path. + */ + public long getCacheAgeMs() { + if (lastClusterRoleRecordRefreshTime == 0) { + return -1L; + } + return System.currentTimeMillis() - lastClusterRoleRecordRefreshTime; + } + + /** + * Decides whether a CRR transition counts as a "failover" for {@code HA_FAILOVER_COUNT} purposes. + * Returns {@code true} iff: + *

    + *
  1. the policy-side transition (via {@code future.get(timeout)}) actually succeeded — a + * {@code TimeoutException} / {@code ExecutionException} fall-through is NOT a failover, AND
  2. + *
  3. the active-cluster URL actually moved between peers (or recovered from no-active), AND
  4. + *
  5. the new record has an ACTIVE cluster — transitions INTO a no-active state are not + * failovers.
  6. + *
+ * Pure function of its inputs (no global state read, no clock read) so it is straightforward to + * unit-test the gate without driving a full mini-cluster transition. Package-private rather than + * private so {@code HighAvailabilityGroupTest} can call it directly. + * @param transitionSucceeded whether {@code future.get(maxTransitionTimeMs)} returned normally + * @param oldRecord the previous {@link ClusterRoleRecord} + * @param newRecord the candidate new {@link ClusterRoleRecord} + * @return whether to increment {@code HA_FAILOVER_COUNT} for this transition + */ + static boolean shouldCountFailover(boolean transitionSucceeded, ClusterRoleRecord oldRecord, + ClusterRoleRecord newRecord) { + return transitionSucceeded && !oldRecord.getActiveUrl().equals(newRecord.getActiveUrl()) + && newRecord.getActiveUrl().isPresent(); + } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityUtil.java index 1893ad3169c..386529a6a75 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityUtil.java @@ -18,6 +18,7 @@ package org.apache.phoenix.jdbc; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.phoenix.exception.MutationBlockedIOException; import org.apache.phoenix.exception.StaleClusterRoleRecordException; /** @@ -45,7 +46,40 @@ public static boolean isStaleClusterRoleRecordExceptionExistsInThrowable(Throwab return false; } + /** + * Walks the throwable chain (including {@link RetriesExhaustedWithDetailsException} causes) to + * detect a {@link MutationBlockedIOException} surface. Mirrors the structure of + * {@link #isStaleClusterRoleRecordExceptionExistsInThrowable(Throwable)} so that batched mutation + * rejections wrapped at varying depth are still attributable to the mutation-block gate. + */ + public static boolean isMutationBlockedIOExceptionExistsInThrowable(Throwable e) { + if (e == null) { + return false; + } + if (isGivenThrowableMutationBlockedException(e)) { + return true; + } + + if (e instanceof RetriesExhaustedWithDetailsException) { + for (Throwable t : ((RetriesExhaustedWithDetailsException) e).getCauses()) { + if (isGivenThrowableMutationBlockedException(t)) { + return true; + } + } + } + + if (e.getCause() != null) { + return isMutationBlockedIOExceptionExistsInThrowable(e.getCause()); + } + + return false; + } + private static boolean isGivenThrowableStaleException(Throwable t) { return t instanceof StaleClusterRoleRecordException; } + + private static boolean isGivenThrowableMutationBlockedException(Throwable t) { + return t instanceof MutationBlockedIOException; + } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java index 89f04f8827d..bbd23bcbc52 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java @@ -35,6 +35,11 @@ import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_CALLS; import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_RETRIES; import static org.apache.phoenix.monitoring.MetricType.COUNT_SCANNED_REGIONS; +import static org.apache.phoenix.monitoring.MetricType.HA_CRR_CACHE_AGE_MS; +import static org.apache.phoenix.monitoring.MetricType.HA_CRR_REFRESH_COUNT; +import static org.apache.phoenix.monitoring.MetricType.HA_FAILOVER_COUNT; +import static org.apache.phoenix.monitoring.MetricType.HA_FAILOVER_DURATION_MS; +import static org.apache.phoenix.monitoring.MetricType.HA_MUTATION_BLOCKED_COUNT; import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_CONNECTION_CREATED_COUNTER; import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_CONNECTION_ERROR_COUNTER; import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_CONNECTION_FALLBACK_COUNTER; @@ -49,6 +54,9 @@ import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_POOL2_TASK_QUEUE_WAIT_TIME; import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_POOL2_TASK_REJECTED_COUNTER; import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_TASK_TIMEOUT_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.HA_POLLER_TICK_COUNT; +import static org.apache.phoenix.monitoring.MetricType.HA_POLLER_TICK_FAILURES; +import static org.apache.phoenix.monitoring.MetricType.HA_STALE_CRR_DETECTED_COUNT; import static org.apache.phoenix.monitoring.MetricType.HCONNECTIONS_COUNTER; import static org.apache.phoenix.monitoring.MetricType.INDEX_COMMIT_FAILURE_SIZE; import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES; @@ -163,6 +171,17 @@ public enum GlobalClientMetrics { GLOBAL_HA_PARALLEL_CONNECTION_ERROR_COUNTER(HA_PARALLEL_CONNECTION_ERROR_COUNTER), GLOBAL_HA_PARALLEL_CONNECTION_CREATED_COUNTER(HA_PARALLEL_CONNECTION_CREATED_COUNTER), + GLOBAL_HA_FAILOVER_COUNT(HA_FAILOVER_COUNT), + GLOBAL_HA_FAILOVER_DURATION_MS(HA_FAILOVER_DURATION_MS), + GLOBAL_HA_MUTATION_BLOCKED_COUNT(HA_MUTATION_BLOCKED_COUNT), + GLOBAL_HA_STALE_CRR_DETECTED_COUNT(HA_STALE_CRR_DETECTED_COUNT), + GLOBAL_HA_CRR_REFRESH_COUNT(HA_CRR_REFRESH_COUNT), + // GAUGE: most-recent-sample. Use getMetric().set(ageMs) at the sampling site; + // do NOT increment or update — that would accumulate and break gauge semantics. + GLOBAL_HA_CRR_CACHE_AGE_MS(HA_CRR_CACHE_AGE_MS), + GLOBAL_HA_POLLER_TICK_COUNT(HA_POLLER_TICK_COUNT), + GLOBAL_HA_POLLER_TICK_FAILURES(HA_POLLER_TICK_FAILURES), + GLOBAL_CLIENT_METADATA_CACHE_MISS_COUNTER(CLIENT_METADATA_CACHE_MISS_COUNTER), GLOBAL_CLIENT_METADATA_CACHE_HIT_COUNTER(CLIENT_METADATA_CACHE_HIT_COUNTER), GLOBAL_CLIENT_METADATA_CACHE_EVICTION_COUNTER(CLIENT_METADATA_CACHE_EVICTION_COUNTER), diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java index ff80705c0d4..22a9ecbdcf3 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java @@ -353,6 +353,35 @@ public enum MetricType { LogLevel.DEBUG, PLong.INSTANCE), HA_PARALLEL_CONNECTION_CREATED_COUNTER("hpccc", "Counter for the number of parallel phoenix connections that were created", LogLevel.DEBUG, + PLong.INSTANCE), + HA_FAILOVER_COUNT("hafc", + "Counter for cluster-level failover transitions (active URL flips between peers or recovers " + + "from no-active state) recorded at the CRR write site", + LogLevel.DEBUG, PLong.INSTANCE), + HA_FAILOVER_DURATION_MS("hafd", + "Total time in milliseconds spent in connection-level failover transitions, summed across " + + "all observing connections (per-connection observation, not per-cluster-event)", + LogLevel.DEBUG, PLong.INSTANCE), + HA_MUTATION_BLOCKED_COUNT("hambc", + "Counter for MutationBlockedIOException surfaces caught by wrapActionDuringFailover", + LogLevel.DEBUG, PLong.INSTANCE), + HA_STALE_CRR_DETECTED_COUNT("hascd", + "Counter for StaleClusterRoleRecordException surfaces caught by wrapActionDuringFailover", + LogLevel.DEBUG, PLong.INSTANCE), + HA_CRR_REFRESH_COUNT("hcrc", "Counter for refreshClusterRoleRecord invocations", LogLevel.DEBUG, + PLong.INSTANCE), + // GAUGE SEMANTICS: most-recent-sample, NOT an accumulator. Callers MUST use + // getMetric().set(ageMs) to overwrite the previous sample, never increment/update. + // Misuse will produce a monotonically-growing accumulator, which is the wrong shape for an + // "age since last refresh" gauge. + HA_CRR_CACHE_AGE_MS("hccg", + "Gauge: most-recent-sample of milliseconds since the last successful CRR refresh " + + "(use getMetric().set(ageMs))", + LogLevel.DEBUG, PLong.INSTANCE), + HA_POLLER_TICK_COUNT("hptc", "Counter for non-active CRR poller ticks executed", LogLevel.DEBUG, + PLong.INSTANCE), + HA_POLLER_TICK_FAILURES("hptf", + "Counter for non-active CRR poller tick failures (caught SQLException)", LogLevel.DEBUG, PLong.INSTANCE); private final String description; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java index 5adbedb1692..d76d54fc62c 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java @@ -42,6 +42,7 @@ import org.apache.phoenix.jdbc.HighAvailabilityGroup; import org.apache.phoenix.jdbc.HighAvailabilityPolicy; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.monitoring.GlobalClientMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -216,6 +217,20 @@ private static void schedulePoller(String url1, String url2, String haGroupName, Runnable pollingTask = () -> { // Increment unconditionally so a failed tick still alternates next iteration. long tick = tickCount.getAndIncrement(); + GlobalClientMetrics.GLOBAL_HA_POLLER_TICK_COUNT.increment(); + // Sample current CRR cache age into the gauge each tick. Without this, the + // HA_CRR_CACHE_AGE_MS counter-backed gauge is only updated on connect() and would + // not advance during idle periods between connects, making it look fresher than it + // actually is. Sampling here puts the gauge on a steady wall-clock cadence matching + // the poller's polling interval. + try { + GlobalClientMetrics.GLOBAL_HA_CRR_CACHE_AGE_MS.getMetric().set(haGroup.getCacheAgeMs()); + } catch (Throwable t) { + // Metric sampling is best-effort; never let a metric write break the poller tick. + LOGGER.warn( + "Failed to sample HA_CRR_CACHE_AGE_MS on poller tick for HA group {}; " + "continuing", + haGroupName, t); + } String tickUrl = selectUrlForTick(url1, url2, tick); try { ClusterRoleRecord polledCrr = @@ -242,6 +257,7 @@ private static void schedulePoller(String url1, String url2, String haGroupName, } } } catch (SQLException e) { + GlobalClientMetrics.GLOBAL_HA_POLLER_TICK_FAILURES.increment(); LOGGER.error( "Exception found while polling for ClusterRoleRecord on {} for HA group" + " {}: {}", tickUrl, haGroupName, e.getMessage()); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 25477196950..287cd707160 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -115,6 +115,7 @@ import org.apache.phoenix.hbase.index.builder.IndexBuilder; import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.hbase.index.metrics.MetricsHaBypassSourceFactory; import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource; import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; @@ -746,6 +747,24 @@ public void preBatchMutate(ObserverContext c, // Extract HAGroupName from the mutations Optional logGroup = getHAGroupFromBatch(c.getEnvironment(), miniBatchOp); + // Path-coverage counter: increments whenever a mutation batch reaches preBatchMutate + // without a resolvable HA group attribute, so the cluster-role-based mutation-block gate + // has no haGroupName to evaluate against and is skipped. This counts the code path being + // short-circuited — it does NOT imply any safety property was breached (when the block + // feature is disabled or no block window is active, there is no property to breach). + // Tracked globally rather than per-table so operators can compare baseline vs. + // post-deploy delta to spot new write paths that forgot to attach _HAGroupName. + // Intentionally scoped to !logGroup.isPresent() regardless of dataTableName — + // system-HA-group writes WITH a haGroup are an intended gate exemption (state writes + // must proceed during a block window) and are not counted here. + if (!logGroup.isPresent()) { + try { + MetricsHaBypassSourceFactory.getInstance().incrementBypassedMutationBlockCount(); + } catch (Throwable t) { + LOG.warn("Failed to increment bypassed mutation block count metric; continuing", t); + } + } + // We don't want to check for mutation blocking for the system ha group table if (!dataTableName.equals(SYSTEM_HA_GROUP_NAME) && logGroup.isPresent()) { // Check if mutation is blocked for the HA Group diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSource.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSource.java new file mode 100644 index 00000000000..276f1f2bb63 --- /dev/null +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSource.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.metrics; + +import org.apache.hadoop.hbase.metrics.BaseSource; + +/** + * Server-side JMX metrics source that counts how many mutation batches pass through + * {@code IndexRegionObserver.preBatchMutate} without a resolvable HA group attribute, so + * the cluster-role-based mutation-block gate has no haGroupName to evaluate against and is skipped + * for that batch. + *

+ * This is a path-coverage detector, not a safety violation alarm. The counter is + * incremented in {@code IndexRegionObserver.preBatchMutate} for every mutation batch whose + * {@code _HAGroupName} attribute cannot be resolved — regardless of whether the cluster-role-based + * mutation-block feature is enabled, regardless of whether a mutation-block window is currently + * active for any HA group, and regardless of whether the table being written to is HA-replicated. + * The "bypass" terminology refers strictly to the gate-evaluation code path being short-circuited; + * it does not imply that any safety property was breached — when the block feature + * is disabled or no block window is active, there is no property to breach in the first place. + *

+ * Intended operator use: + *

    + *
  • Path coverage: baseline rate tells you what fraction of mutations on this + * RegionServer reach the gate without an HA group attribute. A sustained baseline of zero on an + * HA-enabled cluster suggests the gate is effectively dead code on that path; a sustained non-zero + * baseline tells you those write paths exist and need to be either tagged or consciously + * exempted.
  • + *
  • Regression signal: a delta against baseline (especially after a + * deploy that added a new mutation path) is the actionable signal — it indicates a newly introduced + * write path is not tagging mutations with {@code _HAGroupName}.
  • + *
+ * The absolute value alone is not actionable; pair it with deploy markers and the + * mutation-block-enabled config to interpret correctly. + */ +public interface MetricsHaBypassSource extends BaseSource { + + String METRICS_NAME = "HaBypass"; + String METRICS_CONTEXT = "phoenix"; + String METRICS_DESCRIPTION = + "Metrics for cluster-role-based mutation-block gate path-coverage on the RegionServer"; + String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME; + + String BYPASSED_MUTATION_BLOCK_COUNT = "bypassedMutationBlockCount"; + String BYPASSED_MUTATION_BLOCK_COUNT_DESC = + "Path-coverage counter: number of mutation batches that reached preBatchMutate without a " + + "resolvable _HAGroupName attribute (so the cluster-role-based mutation-block gate had " + + "nothing to evaluate against and was skipped). Counts the code path being skipped, not " + + "a safety breach — when the block feature is disabled or no block window is active, " + + "there is no property to breach. Actionable signal is delta-against-baseline (e.g., a " + + "spike after a deploy introducing a new mutation path), not absolute value"; + + /** + * Increments the gate-skipped-path counter. Called unconditionally from + * {@code IndexRegionObserver.preBatchMutate} whenever the resolved + * {@code Optional} is empty — i.e., the mutation batch carries no + * {@code _HAGroupName} attribute and the cluster-role-based mutation-block gate cannot be + * evaluated for it. Independent of whether the mutation-block feature is enabled or any block + * window is active. + */ + void incrementBypassedMutationBlockCount(); +} diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactory.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactory.java new file mode 100644 index 00000000000..66e6f3aadd9 --- /dev/null +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.metrics; + +/** + * Factory for the per-RegionServer {@link MetricsHaBypassSource} singleton. Unlike the per-haGroup + * factories elsewhere in the codebase, the bypass counter is a single global (per-JVM, + * per-RegionServer) counter, so this factory holds one eagerly-initialized instance rather than a + * {@code ConcurrentHashMap} keyed on haGroupName. + */ +public final class MetricsHaBypassSourceFactory { + + private static final MetricsHaBypassSource INSTANCE = new MetricsHaBypassSourceImpl(); + + private MetricsHaBypassSourceFactory() { + } + + /** + * Returns the process-wide {@link MetricsHaBypassSource} singleton. The instance is initialized + * eagerly at class-load time, so this method is thread-safe without any additional + * synchronization. + * @return the singleton {@link MetricsHaBypassSource} for this RegionServer JVM + */ + public static MetricsHaBypassSource getInstance() { + return INSTANCE; + } +} diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceImpl.java new file mode 100644 index 00000000000..5963b11fae7 --- /dev/null +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceImpl.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.metrics; + +import org.apache.hadoop.hbase.metrics.BaseSourceImpl; +import org.apache.hadoop.metrics2.lib.MutableFastCounter; + +/** + * Implementation of {@link MetricsHaBypassSource} backed by a single {@link MutableFastCounter} on + * the RegionServer JMX registry. The counter is per-RegionServer (not keyed by HA group, by design + * — a bypass event has no haGroupName to attribute against). + */ +public class MetricsHaBypassSourceImpl extends BaseSourceImpl implements MetricsHaBypassSource { + + private final MutableFastCounter bypassedMutationBlockCount; + + /** + * Default constructor used by {@link MetricsHaBypassSourceFactory}. Registers the source under + * the standard {@code HaBypass} JMX context shared with operator dashboards. + */ + public MetricsHaBypassSourceImpl() { + this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT); + } + + /** + * Test-friendly constructor that lets callers override the metrics-registry naming, so unit tests + * can register an isolated source without colliding with the production singleton. + * @param metricsName short name reported to the metrics registry + * @param metricsDescription human-readable description for the registry + * @param metricsContext hadoop-metrics2 context name + * @param metricsJmxContext JMX context (typically {@code "RegionServer,sub=" + metricsName}) + */ + public MetricsHaBypassSourceImpl(String metricsName, String metricsDescription, + String metricsContext, String metricsJmxContext) { + super(metricsName, metricsDescription, metricsContext, metricsJmxContext); + bypassedMutationBlockCount = getMetricsRegistry().newCounter(BYPASSED_MUTATION_BLOCK_COUNT, + BYPASSED_MUTATION_BLOCK_COUNT_DESC, 0L); + } + + @Override + public void incrementBypassedMutationBlockCount() { + bypassedMutationBlockCount.incr(); + } + + /** + * Test-only accessor returning the current counter value. Public so unit/integration tests + * outside this package can assert increments without going through JMX read scaffolding. + */ + public long getBypassedMutationBlockCountForTesting() { + return bypassedMutationBlockCount.value(); + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BypassedMutationBlockMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BypassedMutationBlockMetricsIT.java new file mode 100644 index 00000000000..4db777d2877 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BypassedMutationBlockMetricsIT.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import static org.apache.phoenix.query.BaseTest.generateUniqueName; +import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Properties; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.hbase.index.metrics.MetricsHaBypassSourceFactory; +import org.apache.phoenix.hbase.index.metrics.MetricsHaBypassSourceImpl; +import org.apache.phoenix.jdbc.HABaseIT; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Integration test for the server-side {@code bypassedMutationBlockCount} JMX counter. A "bypass" + * is a mutation batch that reaches {@code IndexRegionObserver.preBatchMutate} without an associated + * HA group attribute, causing the cluster-role-based mutation-block gate to be skipped. This IT + * drives a write that does not carry {@code _HAGroupName} and asserts the counter increments. + */ +@Category(NeedsOwnMiniClusterTest.class) +public class BypassedMutationBlockMetricsIT extends HABaseIT { + + @BeforeClass + public static synchronized void doSetup() throws Exception { + CLUSTERS.getHBaseCluster1().getConfiguration() + .setBoolean(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, true); + CLUSTERS.getHBaseCluster2().getConfiguration() + .setBoolean(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, true); + CLUSTERS.start(); + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + } + + @Test(timeout = 300000) + public void testBypassedMutationBlockCount() throws Exception { + MetricsHaBypassSourceImpl source = + (MetricsHaBypassSourceImpl) MetricsHaBypassSourceFactory.getInstance(); + long before = source.getBypassedMutationBlockCountForTesting(); + + String dataTableName = generateUniqueName(); + String indexName = generateUniqueName(); + + // Connect directly to cluster1 (single-cluster JDBC URL — no _HAGroupName attribute). + // Use the master/RPC address rather than the ZK url so the rpc URL parser doesn't + // choke on the `=` token that ZK urls embed. + Properties props = new Properties(); + try (Connection conn = + DriverManager.getConnection(CLUSTERS.getJdbcUrl(CLUSTERS.getMasterAddress1()), props)) { + conn.createStatement().execute( + "CREATE TABLE " + dataTableName + " (id VARCHAR PRIMARY KEY, name VARCHAR, age INTEGER)"); + conn.createStatement() + .execute("CREATE INDEX " + indexName + " ON " + dataTableName + "(name)"); + conn.createStatement().execute("UPSERT INTO " + dataTableName + " VALUES ('1', 'A', 1)"); + conn.commit(); + } + + long after = source.getBypassedMutationBlockCountForTesting(); + assertTrue( + "bypassedMutationBlockCount should increment for mutations without _HAGroupName attribute", + after > before); + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupMetricsIT.java new file mode 100644 index 00000000000..088ccff183a --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupMetricsIT.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvailibilityGroup; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_CRR_CACHE_AGE_MS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_CRR_REFRESH_COUNT; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_FAILOVER_COUNT; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_FAILOVER_DURATION_MS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_MUTATION_BLOCKED_COUNT; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_POLLER_TICK_COUNT; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_POLLER_TICK_FAILURES; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_STALE_CRR_DETECTED_COUNT; +import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Properties; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.exception.MutationBlockedIOException; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.execute.CommitException; +import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for the client-side HA observability metrics. Each test resets the relevant + * {@link org.apache.phoenix.monitoring.GlobalClientMetrics} entries before exercising the code path + * that should emit them, then asserts the post-condition counter/gauge value. + */ +@Category(NeedsOwnMiniClusterTest.class) +public class HAGroupMetricsIT extends HABaseIT { + private static final Logger LOG = LoggerFactory.getLogger(HAGroupMetricsIT.class); + + @Rule + public final TestName testName = new TestName(); + + private Properties clientProperties; + private HighAvailabilityGroup haGroup; + private String tableName; + private String haGroupName; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + CLUSTERS.getHBaseCluster1().getConfiguration() + .setBoolean(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, true); + CLUSTERS.getHBaseCluster2().getConfiguration() + .setBoolean(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, true); + CLUSTERS.start(); + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); + CLUSTERS.close(); + } + + @Before + public void setUp() throws Exception { + haGroupName = testName.getMethodName(); + clientProperties = HighAvailabilityTestingUtility.getHATestProperties(); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName); + CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER); + haGroup = getHighAvailibilityGroup(CLUSTERS.getJdbcHAUrl(), clientProperties); + tableName = testName.getMethodName().toUpperCase(); + CLUSTERS.createTableOnClusterPair(haGroup, tableName); + resetAllHaMetrics(); + } + + @After + public void tearDown() throws Exception { + try { + haGroup.close(); + } catch (Exception e) { + LOG.error("Fail to tear down HA group; ignoring", e); + } + } + + @Test(timeout = 300000) + public void testFailoverCountAndDuration() throws Exception { + long countBefore = GLOBAL_HA_FAILOVER_COUNT.getMetric().getValue(); + long durationBefore = GLOBAL_HA_FAILOVER_DURATION_MS.getMetric().getValue(); + + try (Connection conn = DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties)) { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (1, 1)"); + conn.commit(); + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE); + } + + long countAfter = GLOBAL_HA_FAILOVER_COUNT.getMetric().getValue(); + long durationAfter = GLOBAL_HA_FAILOVER_DURATION_MS.getMetric().getValue(); + assertTrue("HA_FAILOVER_COUNT should increment on cluster role transition", + countAfter > countBefore); + assertTrue("HA_FAILOVER_DURATION_MS sum should grow on transition", + durationAfter >= durationBefore); + } + + @Test(timeout = 300000) + public void testStaleCrrDetectedCount() throws Exception { + long before = GLOBAL_HA_STALE_CRR_DETECTED_COUNT.getMetric().getValue(); + // Drive a cluster-role transition that the wrapped connection observes as + // stale-CRR — failover() flags STALE_CLUSTER_ROLE_RECORD and increments the counter. + try (Connection conn = DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties)) { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (2, 2)"); + conn.commit(); + CLUSTERS.transitClusterRole(haGroup, ClusterRole.ACTIVE_TO_STANDBY, ClusterRole.STANDBY); + // doRefreshHAGroup=false: keep haGroup's CRR snapshot stale on purpose so that + // the next mutation drives StaleClusterRoleRecordException through wrapActionDuringFailover, + // exercising the GLOBAL_HA_STALE_CRR_DETECTED_COUNT increment path. + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE, false); + // Issue another mutation while the connection still holds the pre-transition CRR snapshot. + // The server's HAGroupStoreManager will detect the version mismatch and throw + // StaleClusterRoleRecordException, which wrapActionDuringFailover catches to increment the + // counter (and rethrow FAILOVER_IN_PROGRESS). + try { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (3, 3)"); + conn.commit(); + } catch (SQLException expected) { + // Stale-CRR surfaces here as a SQLException with FAILOVER_IN_PROGRESS (1990 / F1Q90): + // wrapActionDuringFailover catches StaleClusterRoleRecordException, increments + // GLOBAL_HA_STALE_CRR_DETECTED_COUNT, then rethrows a freshly-built SQLException via + // SQLExceptionInfo.Builder(FAILOVER_IN_PROGRESS).build().buildException(). The error + // code is the contracted surface — any other SQLException must fail this assertion + // rather than be silently swallowed (the prior catch(Exception) hid setup bugs by + // treating ANY failure as the expected path). + assertTrue( + "Expected FAILOVER_IN_PROGRESS error code, got " + expected.getErrorCode() + " (" + + expected.getSQLState() + ")", + expected.getErrorCode() == SQLExceptionCode.FAILOVER_IN_PROGRESS.getErrorCode()); + LOG.info( + "Expected FAILOVER_IN_PROGRESS SQLException surfaced from stale-CRR mutation " + "in {}", + testName.getMethodName(), expected); + } + } + long after = GLOBAL_HA_STALE_CRR_DETECTED_COUNT.getMetric().getValue(); + assertTrue("HA_STALE_CRR_DETECTED_COUNT should strictly increment when CRR is detected stale " + + "(before=" + before + ", after=" + after + ")", after > before); + } + + @Test(timeout = 300000) + public void testMutationBlockedCount() throws Exception { + long before = GLOBAL_HA_MUTATION_BLOCKED_COUNT.getMetric().getValue(); + String peerZkUrl = CLUSTERS.getZkUrl2(); + PhoenixHAAdmin haAdmin = CLUSTERS.getHaAdmin1(); + + try (FailoverPhoenixConnection conn = (FailoverPhoenixConnection) DriverManager + .getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties)) { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (3, 3)"); + conn.commit(); + + HAGroupStoreRecord blocking = + new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, + HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), peerZkUrl, CLUSTERS.getMasterAddress1(), + CLUSTERS.getMasterAddress2(), CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L); + haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, blocking, -1); + Thread.sleep(1000L); + + try { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (4, 4)"); + conn.commit(); + fail("Expected MutationBlockedIOException during ACTIVE_TO_STANDBY"); + } catch (CommitException e) { + Throwable cause = e.getCause(); + assertTrue("Expected MutationBlockedIOException in chain", + cause instanceof RetriesExhaustedWithDetailsException + && ((RetriesExhaustedWithDetailsException) cause) + .getCause(0) instanceof MutationBlockedIOException); + } + } + long after = GLOBAL_HA_MUTATION_BLOCKED_COUNT.getMetric().getValue(); + assertTrue("HA_MUTATION_BLOCKED_COUNT should increment when MBE is observed", after > before); + } + + @Test(timeout = 300000) + public void testCrrRefreshCount() throws Exception { + long before = GLOBAL_HA_CRR_REFRESH_COUNT.getMetric().getValue(); + // Force-refresh both calls: HA_CRR_REFRESH_COUNT only increments on a refresh that + // actually fetched a CRR from the endpoint — a non-force refresh inside the cache window + // short-circuits before getClusterRoleRecordFromEndpoint() and intentionally does NOT + // bump the counter (counter measures "fresh fetches", not "refresh calls"). + haGroup.refreshClusterRoleRecord(true); + haGroup.refreshClusterRoleRecord(true); + long after = GLOBAL_HA_CRR_REFRESH_COUNT.getMetric().getValue(); + assertTrue("HA_CRR_REFRESH_COUNT should increment on each force-refresh", after - before >= 2); + } + + @Test(timeout = 300000) + public void testCrrCacheAgeMs() throws Exception { + // Pre-refresh assertion: a connect() against a freshly init()-ed HA group must sample a + // bounded age; if init() forgets to seed lastClusterRoleRecordRefreshTime the gauge will + // record currentTimeMillis() - 0 (~1.7e12 ms) and this assertion will fail. + try (Connection conn = DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties)) { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (4, 4)"); + conn.commit(); + } + long ageBeforeRefresh = GLOBAL_HA_CRR_CACHE_AGE_MS.getMetric().getValue(); + assertTrue("HA_CRR_CACHE_AGE_MS gauge must be bounded on init path (was " + ageBeforeRefresh + + " ms, expected < 60_000)", ageBeforeRefresh < 60_000L); + + // Force a refresh so lastClusterRoleRecordRefreshTime is recent, then sleep so the + // gauge sample (taken on connect()) records a non-trivial age. + haGroup.refreshClusterRoleRecord(false); + Thread.sleep(50L); + try (Connection conn = DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties)) { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (5, 5)"); + conn.commit(); + } + long ageMs = GLOBAL_HA_CRR_CACHE_AGE_MS.getMetric().getValue(); + // Gauge holds the most-recent set() sample; should be > 0 and within a sane bound. + assertTrue("HA_CRR_CACHE_AGE_MS gauge should be > 0 after a connect()", ageMs > 0L); + assertTrue("HA_CRR_CACHE_AGE_MS gauge should be < 5 minutes for a fresh HA group", + ageMs < 5 * 60 * 1000L); + } + + @Test(timeout = 300000) + public void testPollerTickCount() throws Exception { + // The poller starts only when fetchClusterRoleRecord observes both roles non-active under + // FAILOVER policy. Drive that state, await a couple of ticks, and verify the counter moved. + long beforeTicks = GLOBAL_HA_POLLER_TICK_COUNT.getMetric().getValue(); + long beforeFailures = GLOBAL_HA_POLLER_TICK_FAILURES.getMetric().getValue(); + + CLUSTERS.transitClusterRole(haGroup, ClusterRole.ACTIVE_TO_STANDBY, ClusterRole.STANDBY); + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.STANDBY); + + // Allow at least 2 poller ticks at default interval to land. + Thread.sleep(15_000L); + + long afterTicks = GLOBAL_HA_POLLER_TICK_COUNT.getMetric().getValue(); + long afterFailures = GLOBAL_HA_POLLER_TICK_FAILURES.getMetric().getValue(); + assertTrue("HA_POLLER_TICK_COUNT should advance once poller is scheduled", + afterTicks > beforeTicks); + // Failures may or may not occur in mini-cluster; just assert non-decreasing. + assertTrue("HA_POLLER_TICK_FAILURES should be monotonic", afterFailures >= beforeFailures); + // Verify the poller-tick gauge sample site fires: by the time we've slept 15s and seen + // multiple ticks, init() has long-since seeded lastClusterRoleRecordRefreshTime, so the + // gauge holds a real age sample. >= 0 guards against the -1L "never-refreshed" sentinel + // leaking out via a buggy refactor — if the poller-tick .set() path is broken or the + // sentinel escapes the init/poller race window, this assertion catches it. + long ageGauge = GLOBAL_HA_CRR_CACHE_AGE_MS.getMetric().getValue(); + assertTrue("HA_CRR_CACHE_AGE_MS gauge should hold a non-negative age sample after poller ticks " + + "(got " + ageGauge + ")", ageGauge >= 0L); + } + + private void resetAllHaMetrics() { + GLOBAL_HA_FAILOVER_COUNT.getMetric().reset(); + GLOBAL_HA_FAILOVER_DURATION_MS.getMetric().reset(); + GLOBAL_HA_MUTATION_BLOCKED_COUNT.getMetric().reset(); + GLOBAL_HA_STALE_CRR_DETECTED_COUNT.getMetric().reset(); + GLOBAL_HA_CRR_REFRESH_COUNT.getMetric().reset(); + GLOBAL_HA_CRR_CACHE_AGE_MS.getMetric().reset(); + GLOBAL_HA_POLLER_TICK_COUNT.getMetric().reset(); + GLOBAL_HA_POLLER_TICK_FAILURES.getMetric().reset(); + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactoryTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactoryTest.java new file mode 100644 index 00000000000..18110784eab --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactoryTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.metrics; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; + +import org.junit.Test; + +/** + * Unit tests for {@link MetricsHaBypassSourceFactory}. Asserts the eager-init singleton contract so + * a future refactor that swaps the field for a non-singleton (or a non-thread-safe) shape gets + * caught here rather than in production JMX double-registration failures. + */ +public class MetricsHaBypassSourceFactoryTest { + + @Test + public void testGetInstanceReturnsNonNull() { + assertNotNull("getInstance() must return a non-null singleton", + MetricsHaBypassSourceFactory.getInstance()); + } + + @Test + public void testGetInstanceIsIdempotent() { + MetricsHaBypassSource first = MetricsHaBypassSourceFactory.getInstance(); + MetricsHaBypassSource second = MetricsHaBypassSourceFactory.getInstance(); + assertSame("getInstance() must return the same singleton across calls", first, second); + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTest.java index 8f52b5110c1..cc58727838d 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTest.java @@ -18,11 +18,14 @@ package org.apache.phoenix.jdbc; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.sql.SQLException; import java.util.Properties; import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,4 +126,63 @@ private void getAndAssertUrlInfo(String url, String additionalParams, String pri private void getAndAssertUrlInfo(String url, String additionalParams) throws Exception { getAndAssertUrlInfo(url, additionalParams, null); } + + /** + * Verifies the gate decision for {@code HA_FAILOVER_COUNT} — exercised directly via the + * package-private {@link HighAvailabilityGroup#shouldCountFailover} helper rather than driving a + * mini-cluster transition. Together these cases pin down that the gate (a) counts a real ACTIVE + * URL move, (b) does NOT count a no-op (same active URL), (c) does NOT count a transition INTO a + * no-active state, and (d) does NOT count a transition where the policy callback failed + * ({@code transitionSucceeded == false}, i.e. {@code future.get()} threw {@code TimeoutException} + * / {@code ExecutionException}). The (d) negative-path assertion is the regression guard: someone + * removing the {@code transitionSucceeded &&} clause from + * {@code HighAvailabilityGroup#shouldCountFailover} would silently start counting failed + * transitions as successful failovers, and this test would fail. + */ + @Test + public void testShouldCountFailoverGate() { + String haGroupName = "testShouldCountFailoverGate"; + String url1 = "host1\\:60010"; + String url2 = "host2\\:60010"; + + ClusterRoleRecord aActiveBStandby = new ClusterRoleRecord(haGroupName, + HighAvailabilityPolicy.FAILOVER, url1, ClusterRole.ACTIVE, url2, ClusterRole.STANDBY, 1L); + ClusterRoleRecord aStandbyBActive = new ClusterRoleRecord(haGroupName, + HighAvailabilityPolicy.FAILOVER, url1, ClusterRole.STANDBY, url2, ClusterRole.ACTIVE, 2L); + ClusterRoleRecord bothStandby = new ClusterRoleRecord(haGroupName, + HighAvailabilityPolicy.FAILOVER, url1, ClusterRole.STANDBY, url2, ClusterRole.STANDBY, 3L); + + // (a) Real active-URL move with a successful policy transition → COUNT. + assertTrue( + "ACTIVE moving from cluster 1 to cluster 2 with successful policy transition " + + "should count as a failover", + HighAvailabilityGroup.shouldCountFailover(true, aActiveBStandby, aStandbyBActive)); + + // (b) Same active URL (no-op transition) → DO NOT COUNT, even if policy succeeded. + assertFalse( + "Same active URL on both sides should NOT count as a failover even with a " + + "successful policy transition", + HighAvailabilityGroup.shouldCountFailover(true, aActiveBStandby, aActiveBStandby)); + + // (c) Transition INTO no-active state (both STANDBY) → DO NOT COUNT. + assertFalse("Transition into a no-active (both STANDBY) state should NOT count as a failover", + HighAvailabilityGroup.shouldCountFailover(true, aActiveBStandby, bothStandby)); + + // (d) NEGATIVE PATH — policy callback failed (transitionSucceeded=false, simulating + // future.get() throwing TimeoutException or ExecutionException) → DO NOT COUNT, even if + // the active URL appears to have moved. This is the regression guard for the + // {@code transitionSucceeded &&} clause; removing it would silently inflate + // HA_FAILOVER_COUNT on failed transitions. + assertFalse( + "Failed policy transition (transitionSucceeded=false) must NOT count as a failover even " + + "when the candidate new record shows a different active URL", + HighAvailabilityGroup.shouldCountFailover(false, aActiveBStandby, aStandbyBActive)); + + // (e) Recovery from no-active back to having an ACTIVE peer with a successful + // transition → COUNT (operationally a real failover-recovery event). + assertTrue( + "Recovery from no-active back to ACTIVE with a successful policy transition " + + "should count as a failover", + HighAvailabilityGroup.shouldCountFailover(true, bothStandby, aStandbyBActive)); + } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HighAvailabilityUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HighAvailabilityUtilTest.java new file mode 100644 index 00000000000..0f28104a2c7 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HighAvailabilityUtilTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.client.Row; +import org.apache.phoenix.exception.MutationBlockedIOException; +import org.junit.Test; + +/** + * Unit tests for {@link HighAvailabilityUtil#isMutationBlockedIOExceptionExistsInThrowable}. Walks + * the four detection branches (null guard, direct match, RetriesExhaustedWithDetailsException + * causes, recursive {@code getCause()}) since the helper is invoked from commit-path error handling + * where regression of any branch would silently mis-attribute mutation-block rejections. + */ +public class HighAvailabilityUtilTest { + + @Test + public void testNullThrowableReturnsFalse() { + assertFalse("null Throwable must short-circuit to false", + HighAvailabilityUtil.isMutationBlockedIOExceptionExistsInThrowable(null)); + } + + @Test + public void testDirectMutationBlockedIOExceptionMatches() { + Throwable e = new MutationBlockedIOException("blocked"); + assertTrue("direct MutationBlockedIOException must match", + HighAvailabilityUtil.isMutationBlockedIOExceptionExistsInThrowable(e)); + } + + @Test + public void testRetriesExhaustedCausesAreScanned() { + MutationBlockedIOException blocked = new MutationBlockedIOException("blocked-in-batch"); + IOException unrelated = new IOException("unrelated"); + RetriesExhaustedWithDetailsException rewde = + new RetriesExhaustedWithDetailsException(Arrays. asList(unrelated, blocked), + Collections. emptyList(), Collections. emptyList()); + assertTrue("RetriesExhaustedWithDetailsException causes must be scanned for MBIOE", + HighAvailabilityUtil.isMutationBlockedIOExceptionExistsInThrowable(rewde)); + } + + @Test + public void testRecursiveGetCauseFindsNestedMutationBlocked() { + MutationBlockedIOException root = new MutationBlockedIOException("root-cause"); + Throwable mid = new RuntimeException("mid", root); + Throwable outer = new RuntimeException("outer", mid); + assertTrue("recursive getCause() walk must surface a deeply-nested MBIOE", + HighAvailabilityUtil.isMutationBlockedIOExceptionExistsInThrowable(outer)); + } +}