Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
*/
package org.apache.phoenix.jdbc;

import static org.apache.phoenix.jdbc.HighAvailabilityUtil.isMutationBlockedIOExceptionExistsInThrowable;
import static org.apache.phoenix.jdbc.HighAvailabilityUtil.isStaleClusterRoleRecordExceptionExistsInThrowable;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_FAILOVER_DURATION_MS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_MUTATION_BLOCKED_COUNT;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_STALE_CRR_DETECTED_COUNT;

import java.sql.Array;
import java.sql.Blob;
Expand Down Expand Up @@ -171,55 +175,63 @@ void failover(long timeoutMs) throws SQLException {
return;
}

PhoenixConnection newConn = null;
SQLException cause = null;
final long startTime = EnvironmentEdgeManager.currentTimeMillis();
while (newConn == null && EnvironmentEdgeManager.currentTimeMillis() < startTime + timeoutMs) {
try {
newConn =
context.getHAGroup().connectActive(context.getProperties(), context.getHAURLInfo());
} catch (SQLException e) {
cause = e;
LOG.info("Got exception when trying to connect to active cluster.", e);
final long failoverStartMs = EnvironmentEdgeManager.currentTimeMillis();
try {
PhoenixConnection newConn = null;
SQLException cause = null;
final long startTime = EnvironmentEdgeManager.currentTimeMillis();
while (
newConn == null && EnvironmentEdgeManager.currentTimeMillis() < startTime + timeoutMs
) {
try {
Thread.sleep(100); // TODO: be smart than this
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new SQLException("Got interrupted waiting for connection failover", e);
newConn =
context.getHAGroup().connectActive(context.getProperties(), context.getHAURLInfo());
} catch (SQLException e) {
cause = e;
LOG.info("Got exception when trying to connect to active cluster.", e);
try {
Thread.sleep(100); // TODO: be smart than this
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new SQLException("Got interrupted waiting for connection failover", e);
}
}
}
}
if (newConn == null) {
throw new FailoverSQLException("Can not failover connection",
context.getHAGroup().getGroupInfo().toString(), cause);
}
if (newConn == null) {
throw new FailoverSQLException("Can not failover connection",
context.getHAGroup().getGroupInfo().toString(), cause);
}

final PhoenixConnection oldConn = connection;
connection = newConn;
if (oldConn != null) {
// aggregate metrics
previousMutationMetrics = oldConn.getMutationMetrics();
previousReadMetrics = oldConn.getReadMetrics();
oldConn.clearMetrics();

// close old connection
if (!oldConn.isClosed()) {
// TODO: what happens to in-flight edits/mutations?
// Can we copy into the new connection we do not allow this failover?
// MutationState state = oldConn.getMutationState();
try {
oldConn.close(new SQLExceptionInfo.Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER)
.setMessage("Phoenix connection got closed due to failover")
.setHaGroupInfo(context.getHAGroup().getGroupInfo().toString()).build()
.buildException());
} catch (SQLException e) {
LOG.error("Failed to close old connection after failover: {}", e.getMessage());
LOG.info("Full stack when closing old connection after failover", e);
final PhoenixConnection oldConn = connection;
connection = newConn;
if (oldConn != null) {
// aggregate metrics
previousMutationMetrics = oldConn.getMutationMetrics();
previousReadMetrics = oldConn.getReadMetrics();
oldConn.clearMetrics();

// close old connection
if (!oldConn.isClosed()) {
// TODO: what happens to in-flight edits/mutations?
// Can we copy into the new connection we do not allow this failover?
// MutationState state = oldConn.getMutationState();
try {
oldConn.close(new SQLExceptionInfo.Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER)
.setMessage("Phoenix connection got closed due to failover")
.setHaGroupInfo(context.getHAGroup().getGroupInfo().toString()).build()
.buildException());
} catch (SQLException e) {
LOG.error("Failed to close old connection after failover: {}", e.getMessage());
LOG.info("Full stack when closing old connection after failover", e);
}
}
}
LOG.info("Connection {} failed over to {}", context.getHAGroup().getGroupInfo(),
connection.getURL());
} finally {
GLOBAL_HA_FAILOVER_DURATION_MS
.update(EnvironmentEdgeManager.currentTimeMillis() - failoverStartMs);
}
LOG.info("Connection {} failed over to {}", context.getHAGroup().getGroupInfo(),
connection.getURL());
}

/**
Expand Down Expand Up @@ -324,6 +336,7 @@ <T> T wrapActionDuringFailover(SupplierWithSQLException<T> s) throws SQLExceptio
return s.get();
} catch (Exception e) {
if (isStaleClusterRoleRecordExceptionExistsInThrowable(e)) {
GLOBAL_HA_STALE_CRR_DETECTED_COUNT.increment();
// If we receive StaleClusterRoleRecordException, that means Operation was
// supposed to be executed on Active Cluster but in reality was sent to
// STANDBY Cluster, that can happen only when Failover is in Progress, So we
Expand All @@ -348,6 +361,9 @@ <T> T wrapActionDuringFailover(SupplierWithSQLException<T> s) throws SQLExceptio
context.getHAGroup(), e))
.build().buildException();
}
if (isMutationBlockedIOExceptionExistsInThrowable(e)) {
GLOBAL_HA_MUTATION_BLOCKED_COUNT.increment();
}
if (policy.shouldFailover(e, ++failoverCount)) {
failover(timeoutMs);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.phoenix.jdbc;

import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_CRR_CACHE_AGE_MS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_CRR_REFRESH_COUNT;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_FAILOVER_COUNT;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;

Expand Down Expand Up @@ -629,6 +632,7 @@ public void init() throws IOException, SQLException {

LOG.info("Initial cluster role for HA group {} is {}", info, roleRecordFromEndpoint);
roleRecord = roleRecordFromEndpoint;
lastClusterRoleRecordRefreshTime = System.currentTimeMillis();
state = State.READY;
}

Expand All @@ -644,6 +648,11 @@ public Connection connect(Properties properties, HAURLInfo haurlInfo) throws SQL
.setMessage("HA group is not ready!").setHaGroupInfo(info.toString()).build()
.buildException();
}
// GAUGE: most-recent-sample of "milliseconds since the last successful CRR refresh".
// Use set(...) to overwrite the prior sample; do NOT increment/update — that would turn
// the gauge into an accumulator and break "current age" semantics.
GLOBAL_HA_CRR_CACHE_AGE_MS.getMetric()
.set(System.currentTimeMillis() - lastClusterRoleRecordRefreshTime);
return roleRecord.getPolicy().provide(this, properties, haurlInfo);
}

Expand Down Expand Up @@ -1056,7 +1065,14 @@ public boolean refreshClusterRoleRecord(boolean forceRefresh) throws SQLExceptio
}

ClusterRoleRecord newRoleRecord = getClusterRoleRecordFromEndpoint();
// Count only refreshes that actually fetched a CRR from the endpoint. Callers that
// short-circuit on shouldRefreshRoleRecord() above never touch the network and would
// otherwise inflate this counter against its name (a "refresh" with no fetch is a no-op
// from a CRR-state perspective).
GLOBAL_HA_CRR_REFRESH_COUNT.increment();
if (roleRecord == null) {
// First-load init path: no prior cache state to compare against, so this is not a
// failover transition and HA_FAILOVER_COUNT is intentionally NOT incremented here.
roleRecord = newRoleRecord;
lastClusterRoleRecordRefreshTime = System.currentTimeMillis();
state = State.READY;
Expand Down Expand Up @@ -1097,8 +1113,10 @@ public boolean refreshClusterRoleRecord(boolean forceRefresh) throws SQLExceptio
long maxTransitionTimeMs = StringUtils.isNotEmpty(transitionTimeoutProp)
? Long.parseLong(transitionTimeoutProp)
: PHOENIX_HA_TRANSITION_TIMEOUT_MS_DEFAULT;
boolean transitionSucceeded = false;
try {
future.get(maxTransitionTimeMs, TimeUnit.MILLISECONDS);
transitionSucceeded = true;
} catch (InterruptedException ie) {
LOG.error("Got interrupted when transiting cluster roles for HA group {}", info, ie);
future.cancel(true);
Expand All @@ -1124,6 +1142,17 @@ public boolean refreshClusterRoleRecord(boolean forceRefresh) throws SQLExceptio
// The goal here is to gain higher availability even though existing resources against
// previous ACTIVE cluster may have not been closed cleanly.
}
// Count the transition as a failover only when the policy-side transition actually
// succeeded AND an active cluster is established or moves between peers. Operator-driven
// transitions to a no-active state (both clusters STANDBY) are not counted as failovers;
// recovery from no-active back to having an ACTIVE peer is counted. Transitions where
// future.get() failed (ExecutionException/TimeoutException) are best-effort fall-through
// per the comment above, but they are NOT counted as successful failovers. Gate decision
// factored into the package-private static {@link #shouldCountFailover} so it can be
// unit-tested directly without driving a full mini-cluster transition.
if (shouldCountFailover(transitionSucceeded, oldRecord, newRoleRecord)) {
GLOBAL_HA_FAILOVER_COUNT.increment();
}
// Update the role record and the last refresh time
roleRecord = newRoleRecord;
lastClusterRoleRecordRefreshTime = System.currentTimeMillis();
Expand All @@ -1147,4 +1176,59 @@ public boolean shouldRefreshRoleRecord() {
long cacheAge = System.currentTimeMillis() - lastClusterRoleRecordRefreshTime;
return cacheAge >= clusterRoleRecordCacheFrequency;
}

/**
 * Returns the current age, in milliseconds, of the last successful ClusterRoleRecord refresh
 * (i.e. {@code now - lastClusterRoleRecordRefreshTime}, never negative). Intended for periodic
 * sampling by external callers (e.g. the CRR poller) so the {@code HA_CRR_CACHE_AGE_MS}
 * counter-backed gauge reflects current age without waiting for the next {@link #connect} call.
 * <p>
 * Returns the sentinel value <strong>{@code -1L} when no refresh has occurred yet</strong> (i.e.
 * {@code lastClusterRoleRecordRefreshTime == 0}). This disambiguates "never sampled" from
 * "refreshed within the same millisecond", both of which would otherwise return {@code 0L} and
 * be indistinguishable on the gauge. Dashboards and alert rules should treat {@code -1L} as "no
 * data yet" and gate threshold checks on {@code value >= 0}.
 * <p>
 * The sentinel also protects samplers that may run before the timestamp has been seeded (the CRR
 * poller is described as being scheduled with an initial delay of {@code 0}, see
 * {@code GetClusterRoleRecordUtil#schedulePoller}): without it the raw arithmetic
 * {@code now - 0} would publish a value of roughly {@code currentTimeMillis()} and spuriously
 * trip every "age &gt; threshold" alert.
 * <p>
 * Because {@link System#currentTimeMillis()} is wall-clock time and may step backwards, a
 * computed age below zero is reported as {@code 0L}. This keeps {@code -1L} reserved exclusively
 * for the "no refresh yet" case instead of letting clock adjustments masquerade as it.
 * <p>
 * Do NOT change the sentinel back to {@code 0L}. The gauge publish inside {@link #connect} still
 * uses raw arithmetic, but it sits behind the "HA group is not ready" state check, so the
 * {@code -1L} value only ever surfaces through callers of this method.
 * @return the non-negative cache age in milliseconds, or {@code -1L} if no refresh has been
 *         recorded yet
 */
public long getCacheAgeMs() {
  // Read the field once so the zero check and the subtraction operate on the same value.
  final long lastRefreshMs = lastClusterRoleRecordRefreshTime;
  if (lastRefreshMs == 0) {
    return -1L;
  }
  // Clamp at zero: a backwards wall-clock step must not yield a negative age that could be
  // mistaken for the -1L "no data yet" sentinel.
  return Math.max(0L, System.currentTimeMillis() - lastRefreshMs);
}

/**
 * Gate for {@code HA_FAILOVER_COUNT}: tells the caller whether a ClusterRoleRecord transition
 * should be recorded as a failover.
 * <p>
 * The answer is {@code true} only when all three conditions hold, evaluated in this order:
 * <ol>
 * <li>{@code transitionSucceeded} is {@code true}, i.e. the policy-side transition completed
 * normally rather than falling through after a timeout or execution failure;</li>
 * <li>the active URL reported by {@code oldRecord} differs from the one reported by
 * {@code newRecord} (this includes going from "no active" to "some active");</li>
 * <li>{@code newRecord} actually has an active URL, so a move INTO a no-active state is not
 * counted.</li>
 * </ol>
 * The method only inspects its arguments (no clock or shared state), which lets it be exercised
 * directly from unit tests; it is package-private for that reason. When
 * {@code transitionSucceeded} is {@code false} neither record is dereferenced.
 * @param transitionSucceeded whether {@code future.get(maxTransitionTimeMs)} returned normally
 * @param oldRecord           the {@link ClusterRoleRecord} in effect before the transition
 * @param newRecord           the {@link ClusterRoleRecord} being transitioned to
 * @return {@code true} if {@code HA_FAILOVER_COUNT} should be incremented for this transition
 */
static boolean shouldCountFailover(boolean transitionSucceeded, ClusterRoleRecord oldRecord,
  ClusterRoleRecord newRecord) {
  if (!transitionSucceeded) {
    // Best-effort fall-through after a failed or timed-out transition is never a failover.
    return false;
  }
  if (oldRecord.getActiveUrl().equals(newRecord.getActiveUrl())) {
    // The active cluster did not move, so there is nothing to count.
    return false;
  }
  // The active URL changed; count it only if the new record still has an ACTIVE cluster.
  return newRecord.getActiveUrl().isPresent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.phoenix.jdbc;

import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.phoenix.exception.MutationBlockedIOException;
import org.apache.phoenix.exception.StaleClusterRoleRecordException;

/**
Expand Down Expand Up @@ -45,7 +46,40 @@ public static boolean isStaleClusterRoleRecordExceptionExistsInThrowable(Throwab
return false;
}

/**
 * Reports whether a {@link MutationBlockedIOException} can be found along the cause chain of the
 * given throwable. It is the mutation-block counterpart of
 * {@link #isStaleClusterRoleRecordExceptionExistsInThrowable(Throwable)}.
 * <p>
 * Each link of the chain is checked itself and, when the link is a
 * {@link RetriesExhaustedWithDetailsException}, every entry returned by its {@code getCauses()}
 * is checked as well (those entries are tested directly; their own cause chains are not
 * followed). The walk then continues with {@code getCause()} until the chain ends.
 * @param e the throwable to inspect; may be {@code null}
 * @return {@code true} if a {@link MutationBlockedIOException} was found, {@code false}
 *         otherwise, including when {@code e} is {@code null}
 */
public static boolean isMutationBlockedIOExceptionExistsInThrowable(Throwable e) {
  for (Throwable current = e; current != null; current = current.getCause()) {
    if (isGivenThrowableMutationBlockedException(current)) {
      return true;
    }

    if (current instanceof RetriesExhaustedWithDetailsException) {
      // Batched failures carry their per-action causes in a separate list.
      RetriesExhaustedWithDetailsException retriesExhausted =
        (RetriesExhaustedWithDetailsException) current;
      for (Throwable batchCause : retriesExhausted.getCauses()) {
        if (isGivenThrowableMutationBlockedException(batchCause)) {
          return true;
        }
      }
    }
  }

  return false;
}

/**
 * Type test for a single throwable; does not look at its cause chain.
 * @param t the throwable to test; may be {@code null}
 * @return {@code true} if {@code t} is a {@link StaleClusterRoleRecordException}, {@code false}
 *         otherwise (including for {@code null})
 */
private static boolean isGivenThrowableStaleException(Throwable t) {
  return StaleClusterRoleRecordException.class.isInstance(t);
}

/**
 * Type test for a single throwable; does not look at its cause chain.
 * @param t the throwable to test; may be {@code null}
 * @return {@code true} if {@code t} is a {@link MutationBlockedIOException}, {@code false}
 *         otherwise (including for {@code null})
 */
private static boolean isGivenThrowableMutationBlockedException(Throwable t) {
  return MutationBlockedIOException.class.isInstance(t);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_CALLS;
import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_RETRIES;
import static org.apache.phoenix.monitoring.MetricType.COUNT_SCANNED_REGIONS;
import static org.apache.phoenix.monitoring.MetricType.HA_CRR_CACHE_AGE_MS;
import static org.apache.phoenix.monitoring.MetricType.HA_CRR_REFRESH_COUNT;
import static org.apache.phoenix.monitoring.MetricType.HA_FAILOVER_COUNT;
import static org.apache.phoenix.monitoring.MetricType.HA_FAILOVER_DURATION_MS;
import static org.apache.phoenix.monitoring.MetricType.HA_MUTATION_BLOCKED_COUNT;
import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_CONNECTION_CREATED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_CONNECTION_ERROR_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_CONNECTION_FALLBACK_COUNTER;
Expand All @@ -49,6 +54,9 @@
import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_POOL2_TASK_QUEUE_WAIT_TIME;
import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_POOL2_TASK_REJECTED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_TASK_TIMEOUT_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.HA_POLLER_TICK_COUNT;
import static org.apache.phoenix.monitoring.MetricType.HA_POLLER_TICK_FAILURES;
import static org.apache.phoenix.monitoring.MetricType.HA_STALE_CRR_DETECTED_COUNT;
import static org.apache.phoenix.monitoring.MetricType.HCONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.INDEX_COMMIT_FAILURE_SIZE;
import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
Expand Down Expand Up @@ -163,6 +171,17 @@ public enum GlobalClientMetrics {
GLOBAL_HA_PARALLEL_CONNECTION_ERROR_COUNTER(HA_PARALLEL_CONNECTION_ERROR_COUNTER),
GLOBAL_HA_PARALLEL_CONNECTION_CREATED_COUNTER(HA_PARALLEL_CONNECTION_CREATED_COUNTER),

GLOBAL_HA_FAILOVER_COUNT(HA_FAILOVER_COUNT),
GLOBAL_HA_FAILOVER_DURATION_MS(HA_FAILOVER_DURATION_MS),
GLOBAL_HA_MUTATION_BLOCKED_COUNT(HA_MUTATION_BLOCKED_COUNT),
GLOBAL_HA_STALE_CRR_DETECTED_COUNT(HA_STALE_CRR_DETECTED_COUNT),
GLOBAL_HA_CRR_REFRESH_COUNT(HA_CRR_REFRESH_COUNT),
// GAUGE: most-recent-sample. Use getMetric().set(ageMs) at the sampling site;
// do NOT increment or update — that would accumulate and break gauge semantics.
GLOBAL_HA_CRR_CACHE_AGE_MS(HA_CRR_CACHE_AGE_MS),
GLOBAL_HA_POLLER_TICK_COUNT(HA_POLLER_TICK_COUNT),
GLOBAL_HA_POLLER_TICK_FAILURES(HA_POLLER_TICK_FAILURES),

GLOBAL_CLIENT_METADATA_CACHE_MISS_COUNTER(CLIENT_METADATA_CACHE_MISS_COUNTER),
GLOBAL_CLIENT_METADATA_CACHE_HIT_COUNTER(CLIENT_METADATA_CACHE_HIT_COUNTER),
GLOBAL_CLIENT_METADATA_CACHE_EVICTION_COUNTER(CLIENT_METADATA_CACHE_EVICTION_COUNTER),
Expand Down
Loading