From 717b765481f7f5e27ba7c7d35791761ce71d480e Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Thu, 25 Jun 2026 13:49:02 -0700 Subject: [PATCH] PHOENIX-7562 HAGroupStore peer cache: fail-closed replay on peer loss Extract peer-connection handling from HAGroupStoreClient into a dedicated PeerClusterWatcher: peer cache lifecycle, background retry when peer ZK is unreachable, connection-state handling, de-duplicated delivery with one forced redelivery after reconnect, and a visible/blind state machine. While this RegionServer is STANDBY and cannot see the peer, present an effective local DEGRADED_STANDBY so replication replay fails closed. The overlay is in-memory only; the shared HA record is never modified. Replay consumes the effective HA state rather than peer-connectivity details, and peer reconcile runs off Curator event threads. Add phoenix.ha.group.store.peer.cache.retry.interval.seconds (default 60s) with retry jitter and rate-limited, reason-tagged logging. Co-authored-by: Cursor --- .../phoenix/jdbc/HAGroupStoreCacheUtil.java | 98 ++++ .../phoenix/jdbc/HAGroupStoreClient.java | 441 +++++++-------- .../phoenix/jdbc/HAGroupStoreManager.java | 26 + .../phoenix/jdbc/HAGroupStoreRecord.java | 18 + .../phoenix/jdbc/PeerClusterWatcher.java | 410 ++++++++++++++ .../apache/phoenix/query/QueryServices.java | 12 + .../phoenix/query/QueryServicesOptions.java | 5 + .../replication/ReplicationLogGroup.java | 3 +- .../reader/ReplicationLogDiscoveryReplay.java | 8 +- .../phoenix/jdbc/HAGroupStoreClientIT.java | 521 +++++++++++++++++- .../ReplicationLogDiscoveryReplayTestIT.java | 252 ++++++++- .../phoenix/jdbc/HAGroupStoreRecordTest.java | 24 + .../phoenix/jdbc/PeerClusterWatcherTest.java | 158 ++++++ 13 files changed, 1743 insertions(+), 233 deletions(-) create mode 100644 phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreCacheUtil.java create mode 100644 phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PeerClusterWatcher.java create mode 100644 phoenix-core/src/test/java/org/apache/phoenix/jdbc/PeerClusterWatcherTest.java diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreCacheUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreCacheUtil.java new file mode 100644 index 00000000000..b6cedf6f687 --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreCacheUtil.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.data.Stat; + +/** + * Helpers shared by the local and peer {@link PathChildrenCache}s backing + * {@link HAGroupStoreClient} and {@link PeerClusterWatcher}: parsing node data into a record and + * building a started cache. + */ +final class HAGroupStoreCacheUtil { + + private HAGroupStoreCacheUtil() { + } + + /** Parse a node's data into (record, stat); (null, null) when absent or unparseable. */ + static Pair recordAndStat(ChildData childData) { + if (childData == null) { + return Pair.of(null, null); + } + return Pair.of(HAGroupStoreRecord.fromJson(childData.getData()).orElse(null), + childData.getStat()); + } + + /** Read the current (record, stat) for {@code path} from {@code cache}. */ + static Pair recordAndStatAt(PathChildrenCache cache, String path) { + return cache == null ? Pair.of(null, null) : recordAndStat(cache.getCurrentData(path)); + } + + /** + * Build and start a cache, waiting up to {@code timeoutMs} for the initial load. The supplied + * {@code listener} receives every cache event; this method releases its initial-load latch on the + * {@code INITIALIZED} event in a {@code finally}, after the listener returns, so a listener that + * throws while handling {@code INITIALIZED} cannot strand startup. Returns the started cache, or + * null (closed) if it did not initialize within {@code timeoutMs}. + */ + static PathChildrenCache startCache(CuratorFramework curator, PathChildrenCacheListener listener, + long timeoutMs) throws Exception { + PathChildrenCache cache = new PathChildrenCache(curator, ZKPaths.PATH_SEPARATOR, true); + try { + CountDownLatch initialized = new CountDownLatch(1); + cache.getListenable().addListener((c, e) -> { + // Always release the latch on INITIALIZED, even if the caller listener throws while + // handling it; otherwise startCache would time out and return null for a cache that + // actually initialized. + try { + listener.childEvent(c, e); + } finally { + if (e.getType() == PathChildrenCacheEvent.Type.INITIALIZED) { + initialized.countDown(); + } + } + }); + cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); + if (initialized.await(timeoutMs, TimeUnit.MILLISECONDS)) { + return cache; + } + cache.close(); + return null; + } catch (Exception e) { + try { + cache.close(); + } catch (IOException ignore) { + // best effort + } + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw e; + } + } +} diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java index edbc5303759..ab267026224 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java @@ -59,7 +59,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; @@ -110,7 +109,6 @@ public class HAGroupStoreClient implements Closeable { // Exclusive upper bound for initial-delay jitter on the periodic reconciler (0..30s). private static final long LEGACY_CRR_SYNC_JOB_MAX_JITTER_SECONDS = 31; private PhoenixHAAdmin phoenixHaAdmin; - private PhoenixHAAdmin peerPhoenixHaAdmin; // Admin + NodeCache on /phoenix/ha; null when feature disabled. private volatile PhoenixHAAdmin legacyHaAdmin; private volatile NodeCache legacyCrrNodeCache; @@ -122,23 +120,30 @@ public class HAGroupStoreClient implements Closeable { private final String haGroupName; // PathChildrenCache for current cluster and HAGroupName private PathChildrenCache pathChildrenCache = null; - // PathChildrenCache for peer cluster and HAGroupName - private PathChildrenCache peerPathChildrenCache = null; + // Watches the peer cluster's HA record; owns the peer cache/admin, retry, and visibility. + private final PeerClusterWatcher peerWatcher; + // True while this RegionServer presents a local-only DEGRADED_STANDBY. The shared HA record is + // not changed; this only affects the effective local view and de-dupes the synthetic + // degrade/recover notifications. + private volatile boolean localDegradedStandbyActive = false; // Whether the client is healthy private volatile boolean isHealthy = false; // Configuration private final Configuration conf; // ZK URL for the current cluster and HAGroupName private String zkUrl; - // Peer Custom Event Listener - private final PathChildrenCacheListener peerCustomPathChildrenCacheListener; // Wait time for sync mode private final long waitTimeForSyncModeInMs; // Rotation time for a log private final long rotationTimeMs; // State tracking for transition detection private volatile HAGroupState lastKnownLocalState; + // Last peer state delivered to subscribers; used only to populate the "from" state in PEER + // notifications. Written from the peer watcher's single delivery thread. private volatile HAGroupState lastKnownPeerState; + // Guards the local-only DEGRADED_STANDBY flag (peer blind/visible can fire from different + // threads); subscriber notification runs outside this lock. + private final Object localDegradedStandbyLock = new Object(); // Subscription storage for HA group state change notifications per client instance // Map key format: "clusterType:targetState" -> Set @@ -179,7 +184,7 @@ public static HAGroupStoreClient getInstanceForZkUrl(Configuration conf, String result = instances.getOrDefault(localZkUrl, new ConcurrentHashMap<>()) .getOrDefault(haGroupName, null); if (result == null || !result.isHealthy) { - result = new HAGroupStoreClient(conf, null, null, haGroupName, zkUrl); + result = new HAGroupStoreClient(conf, null, haGroupName, zkUrl); if (!result.isHealthy) { result.close(); result = null; @@ -244,10 +249,17 @@ public static List getHAGroupNames(String zkUrl) throws SQLException { return result; } + /** + * Visible-for-testing constructor. A non-null {@code pathChildrenCacheListener} replaces the + * default LOCAL cache listener only; it does not affect peer handling. There is no peer-listener + * injection point anymore - peer cache events are owned by {@link PeerClusterWatcher}. Tests that + * need peer-event visibility should subscribe with + * {@code subscribeToTargetState(state, ClusterType.PEER, listener)} or use the package-private + * {@code peerWatcher} field directly. + */ @VisibleForTesting HAGroupStoreClient(final Configuration conf, - final PathChildrenCacheListener pathChildrenCacheListener, - final PathChildrenCacheListener peerPathChildrenCacheListener, final String haGroupName, + final PathChildrenCacheListener pathChildrenCacheListener, final String haGroupName, final String zkUrl) { this.conf = conf; this.haGroupName = haGroupName; @@ -258,21 +270,22 @@ public static List getHAGroupNames(String zkUrl) throws SQLException { QueryServicesOptions.DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS); this.legacyCrrSyncEnabled = conf.getBoolean(PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED, DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED); - // Custom Event Listener - this.peerCustomPathChildrenCacheListener = peerPathChildrenCacheListener; + this.peerWatcher = new PeerClusterWatcher(conf, haGroupName, + ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE, new PeerListener()); try { - // Initialize Phoenix HA Admin this.phoenixHaAdmin = new PhoenixHAAdmin(this.zkUrl, conf, ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE); - // Initialize local cache + long initTimeoutMs = conf.getLong(PHOENIX_HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS, + DEFAULT_HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS); + PathChildrenCacheListener localListener = + pathChildrenCacheListener != null ? pathChildrenCacheListener : localCacheListener(); this.pathChildrenCache = - initializePathChildrenCache(phoenixHaAdmin, pathChildrenCacheListener, ClusterType.LOCAL); - // Initialize ZNode if not present in ZK + HAGroupStoreCacheUtil.startCache(phoenixHaAdmin.getCurator(), localListener, initTimeoutMs); initializeZNodeIfNeeded(); if (this.pathChildrenCache != null) { this.isHealthy = true; - // Initialize peer cache - maybeInitializePeerPathChildrenCache(); + HAGroupStoreRecord local = getHAGroupStoreRecord(); + peerWatcher.reconfigure(local != null ? local.getPeerZKUrl() : null); } else { LOGGER.error("PathChildrenCache is not initialized, HAGroupStoreClient for " + haGroupName + " is unhealthy"); @@ -311,14 +324,7 @@ public void rebuild() throws Exception { if (pathChildrenCache != null) { pathChildrenCache.rebuild(); } - if (peerPathChildrenCache != null) { - try { - peerPathChildrenCache.rebuild(); - } catch (Exception e) { - LOGGER.error("Unexpected error occurred while rebuilding peerPathChildrenCache, continuing", - e); - } - } + peerWatcher.rebuild(); LOGGER.info("Rebuild Complete for HAGroupStoreClient for HA group {}", haGroupName); } @@ -331,8 +337,7 @@ public HAGroupStoreRecord getHAGroupStoreRecord() throws IOException { if (!isHealthy) { throw new IOException("HAGroupStoreClient is not healthy"); } - return fetchCacheRecordAndPopulateZKIfNeeded(this.pathChildrenCache, ClusterType.LOCAL) - .getLeft(); + return fetchLocalRecordAndPopulateZKIfNeeded().getLeft(); } /** @@ -369,8 +374,7 @@ public long setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState haGroupStat if (!isHealthy) { throw new IOException("HAGroupStoreClient is not healthy"); } - Pair cacheRecord = - fetchCacheRecordAndPopulateZKIfNeeded(this.pathChildrenCache, ClusterType.LOCAL); + Pair cacheRecord = fetchLocalRecordAndPopulateZKIfNeeded(); HAGroupStoreRecord currentHAGroupStoreRecord = cacheRecord.getLeft(); Stat currentHAGroupStoreRecordStat = cacheRecord.getRight(); if (currentHAGroupStoreRecord == null) { @@ -462,8 +466,7 @@ public HAGroupStoreRecord getHAGroupStoreRecordFromPeer() throws IOException { if (!isHealthy) { throw new IOException("HAGroupStoreClient is not healthy"); } - return fetchCacheRecordAndPopulateZKIfNeeded(this.peerPathChildrenCache, ClusterType.PEER) - .getLeft(); + return peerWatcher.getCurrentPeerRecord(); } private void initializeZNodeIfNeeded() throws IOException, SQLException { @@ -866,214 +869,197 @@ private void syncZKToSystemTable() { } } - private void maybeInitializePeerPathChildrenCache() throws IOException { - // There is an edge case when the cache is not initialized yet, but we get CHILD_ADDED event - // so we need to get the record from ZK. - HAGroupStoreRecord currentHAGroupStoreRecord = - phoenixHaAdmin.getHAGroupStoreRecordInZooKeeper(haGroupName).getLeft(); - if (currentHAGroupStoreRecord == null) { - LOGGER.error( - "Current HAGroupStoreRecord is null," + "skipping peer path children cache initialization"); - return; - } - String peerZKUrl = currentHAGroupStoreRecord.getPeerZKUrl(); - if (StringUtils.isNotBlank(peerZKUrl)) { - try { - // Setup peer connection if needed (first time or ZK Url changed) - if ( - peerPathChildrenCache == null || peerPhoenixHaAdmin != null - && !StringUtils.equals(peerZKUrl, peerPhoenixHaAdmin.getZkUrl()) - ) { - // Clean up existing peer connection if it exists - closePeerConnection(); - // Setup new peer connection - this.peerPhoenixHaAdmin = - new PhoenixHAAdmin(peerZKUrl, conf, ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE); - // Create new PeerPathChildrenCache - this.peerPathChildrenCache = initializePathChildrenCache(peerPhoenixHaAdmin, - this.peerCustomPathChildrenCacheListener, ClusterType.PEER); - } - } catch (Exception e) { - closePeerConnection(); - LOGGER.error("Unable to initialize PeerPathChildrenCache for HAGroupStoreClient", e); - // Don't think we should mark HAGroupStoreClient as unhealthy if - // peerCache is unhealthy, if needed we can introduce a config to control behavior. - } - } else { - // Close Peer Cache for this HAGroupName if currentClusterRecord is null - // or peerZKUrl is blank - closePeerConnection(); - LOGGER.error("Not initializing PeerPathChildrenCache for HAGroupStoreClient " - + "with HAGroupName {} as peerZKUrl is blank", haGroupName); - } - } - - private PathChildrenCache initializePathChildrenCache(PhoenixHAAdmin admin, - PathChildrenCacheListener customListener, ClusterType cacheType) { - LOGGER.info("Initializing {} PathChildrenCache with URL {}", cacheType, admin.getZkUrl()); - PathChildrenCache newPathChildrenCache = null; - try { - newPathChildrenCache = - new PathChildrenCache(admin.getCurator(), ZKPaths.PATH_SEPARATOR, true); - final CountDownLatch latch = new CountDownLatch(1); - newPathChildrenCache.getListenable().addListener( - customListener != null ? customListener : createCacheListener(latch, cacheType)); - newPathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); - boolean initialized = - latch.await(conf.getLong(PHOENIX_HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS, - DEFAULT_HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS), TimeUnit.MILLISECONDS); - if (!initialized && customListener == null) { - newPathChildrenCache.close(); - return null; - } - return newPathChildrenCache; - } catch (Exception e) { - if (newPathChildrenCache != null) { - try { - newPathChildrenCache.close(); - } catch (IOException ioe) { - LOGGER.error("Failed to close {} PathChildrenCache with ZKUrl", cacheType, ioe); - } - } - LOGGER.error("Failed to initialize {} PathChildrenCache", cacheType, e); - return null; - } - } - - private PathChildrenCacheListener createCacheListener(CountDownLatch latch, - ClusterType cacheType) { + /** + * Listener for the local cache: tracks local state transitions (suppressing STANDBY while + * degraded), keeps the peer watcher pointed at the current peer ZK, drives the legacy CRR sync, + * and toggles health on connection loss/reconnect. The initial-load latch is handled by + * {@link HAGroupStoreCacheUtil#startCache}. + */ + private PathChildrenCacheListener localCacheListener() { return (client, event) -> { - final ChildData childData = event.getData(); - Pair eventRecordAndStat = - extractHAGroupStoreRecordOrNull(childData); - HAGroupStoreRecord eventRecord = eventRecordAndStat.getLeft(); - Stat eventStat = eventRecordAndStat.getRight(); - if (eventRecord != null && !Objects.equals(eventRecord.getHaGroupName(), haGroupName)) { + Pair recordAndStat = + HAGroupStoreCacheUtil.recordAndStat(event.getData()); + HAGroupStoreRecord record = recordAndStat.getLeft(); + if (record != null && !Objects.equals(record.getHaGroupName(), haGroupName)) { return; } - LOGGER.info( - "HAGroupStoreClient Cache {} received event {} type {} at {} with ZKUrl {} and " - + "PeerZKUrl {} for haGroupName {}", - cacheType, eventRecord, event.getType(), System.currentTimeMillis(), - phoenixHaAdmin.getZkUrl(), - peerPhoenixHaAdmin != null ? peerPhoenixHaAdmin.getZkUrl() : "peerPhoenixHaAdmin is null", - haGroupName); switch (event.getType()) { case CHILD_ADDED: case CHILD_UPDATED: - if (eventRecord != null && Objects.equals(eventRecord.getHaGroupName(), haGroupName)) { - handleStateChange(eventRecord, eventStat, cacheType); - // Reinitialize peer path children cache if peer url is added or updated. - if (cacheType == ClusterType.LOCAL) { - maybeInitializePeerPathChildrenCache(); - } - // Offload the legacy CRR sync (it does ZK + JDBC I/O) so we don't block - // Curator's per-namespace event dispatcher. - ScheduledExecutorService syncExec = legacyCrrSyncExecutor; - if (syncExec != null) { - try { - syncExec.execute(this::syncLegacyCRRIfRoleChanged); - } catch (RejectedExecutionException ree) { - // Executor already shutting down (close() race); drop silently. - LOGGER.debug("Legacy CRR sync skipped for HA group {}: executor shut down", - haGroupName); - } - } + if (record != null) { + handleLocalStateChange(record, recordAndStat.getRight()); + // Keep the peer watcher aimed at the current peer ZK; reconcile off the Curator + // dispatcher so a slow or unreachable peer never blocks local event processing. + peerWatcher.reconfigureAsync(record.getPeerZKUrl()); + offloadLegacyCrrSync(); } break; - case CHILD_REMOVED: - // No-op: the legacy /phoenix/ha znode is never deleted by this client. - break; - case INITIALIZED: - latch.countDown(); - break; case CONNECTION_LOST: case CONNECTION_SUSPENDED: - if (ClusterType.LOCAL.equals(cacheType)) { - isHealthy = false; - } - LOGGER.warn("{} HAGroupStoreClient cache connection lost/suspended", cacheType); + isHealthy = false; + LOGGER.warn("LOCAL HAGroupStoreClient cache connection lost/suspended for HA group {}", + haGroupName); break; case CONNECTION_RECONNECTED: - if (ClusterType.LOCAL.equals(cacheType)) { - isHealthy = true; - } - LOGGER.info("{} HAGroupStoreClient cache connection reconnected", cacheType); + isHealthy = true; + LOGGER.info("LOCAL HAGroupStoreClient cache connection reconnected for HA group {}", + haGroupName); break; default: - LOGGER.warn("Unexpected {} event type {}, complete event {}", cacheType, event.getType(), - event); + break; } }; } - private Pair - fetchCacheRecordAndPopulateZKIfNeeded(PathChildrenCache cache, ClusterType cacheType) { - if (cache == null) { - LOGGER.warn("{} HAGroupStoreClient cache is null, returning null", cacheType); + /** + * Read the local record from the cache; if absent, rebuild once from the system table (the znode + * may have been deleted) and re-read. Returns (null, null) when still absent. + */ + private Pair fetchLocalRecordAndPopulateZKIfNeeded() { + if (pathChildrenCache == null) { + LOGGER.warn("LOCAL HAGroupStoreClient cache is null for HA group {}, returning null", + haGroupName); return Pair.of(null, null); } String targetPath = toPath(this.haGroupName); - // Try to get record from current cache data - Pair result = extractRecordAndStat(cache, targetPath, cacheType); + Pair result = + HAGroupStoreCacheUtil.recordAndStatAt(pathChildrenCache, targetPath); if (result.getLeft() != null) { return result; } - - if (cacheType.equals(ClusterType.PEER)) { - return Pair.of(null, null); - } - // If no record found, try to rebuild and fetch again - LOGGER.info("No record found at path {} for {} cluster, trying to initialize ZNode " - + "from System Table in case it might have been deleted", targetPath, cacheType); + LOGGER.info("No record found at path {} for LOCAL cluster, trying to initialize ZNode from " + + "System Table in case it might have been deleted", targetPath); try { rebuild(); - return extractRecordAndStat(cache, targetPath, cacheType); + return HAGroupStoreCacheUtil.recordAndStatAt(pathChildrenCache, targetPath); } catch (Exception e) { - LOGGER.error( - "Failed to initialize ZNode from System Table, giving up " + "and returning null", e); + LOGGER.error("Failed to initialize ZNode from System Table, giving up and returning null", e); return Pair.of(null, null); } } - private Pair extractRecordAndStat(PathChildrenCache cache, - String targetPath, ClusterType cacheType) { - ChildData childData = cache.getCurrentData(targetPath); - if (childData != null) { - Pair recordAndStat = extractHAGroupStoreRecordOrNull(childData); - LOGGER.debug("Built {} cluster record: {}", cacheType, recordAndStat.getLeft()); - return recordAndStat; + /** + * The effective local HA record, or {@code null} when no local record exists. Identical to + * {@link #getHAGroupStoreRecord()} except that, while this RegionServer cannot see the peer, a + * local STANDBY is reported as DEGRADED_STANDBY. The DEGRADED_STANDBY overlay is in-memory only + * and is never written to the shared HA record; peer connectivity is never exposed directly. The + * overlay is applied only on top of a successfully read local record, so this requires the LOCAL + * client to be healthy: a lost LOCAL connection is bounded (Curator reconnects, or the + * RegionServer is aborted on prolonged ZK loss), so no stale-record fallback is provided here. + * @throws IOException if the LOCAL client is not healthy (same contract as + * {@link #getHAGroupStoreRecord()}) + */ + public HAGroupStoreRecord getEffectiveHAGroupStoreRecord() throws IOException { + synchronized (localDegradedStandbyLock) { + HAGroupStoreRecord local = getHAGroupStoreRecord(); + if ( + localDegradedStandbyActive && local != null + && local.getHAGroupState() == HAGroupState.STANDBY + ) { + return local.withHAGroupState(HAGroupState.DEGRADED_STANDBY); + } + return local; + } + } + + /** + * Receives peer observations from {@link PeerClusterWatcher}: peer record changes (already + * de-duplicated, with one forced redelivery on reconnect) and peer visible/blind transitions. + * Visibility transitions drive this RegionServer's local-only effective state; they are in-memory + * only and never written to the shared HA record. + */ + private final class PeerListener implements PeerClusterWatcher.Listener { + @Override + public void onPeerStateChanged(HAGroupStoreRecord peerRecord, Stat stat) { + HAGroupState fromState = lastKnownPeerState; + HAGroupState toState = peerRecord.getHAGroupState(); + lastKnownPeerState = toState; + LOGGER.info("Detected state transition for HA group {} from {} to {} on PEER cluster", + haGroupName, fromState, toState); + notifySubscribers(fromState, toState, stat != null ? stat.getMtime() : 0L, ClusterType.PEER, + peerRecord.getLastSyncStateTimeInMs()); + // A peer role change alters the combined legacy CRR; re-derive it off this callback thread. + offloadLegacyCrrSync(); + } + + @Override + public void onPeerVisible() { + clearLocalDegradedStandbyIfStillStandby(); + } + + @Override + public void onPeerBlind() { + presentLocalDegradedStandbyIfStandby(); } - return Pair.of(null, null); } - private Pair - extractHAGroupStoreRecordOrNull(final ChildData childData) { - if (childData != null) { - byte[] data = childData.getData(); - return Pair.of(HAGroupStoreRecord.fromJson(data).orElse(null), childData.getStat()); + /** Offload the legacy CRR sync (ZK + JDBC I/O) off the calling cache/event thread. */ + private void offloadLegacyCrrSync() { + ScheduledExecutorService syncExec = legacyCrrSyncExecutor; + if (syncExec != null) { + try { + syncExec.execute(this::syncLegacyCRRIfRoleChanged); + } catch (RejectedExecutionException ree) { + LOGGER.debug("Legacy CRR sync skipped for HA group {}: executor shut down", haGroupName); + } } - return Pair.of(null, null); } /** - * Closes the peer connection and cleans up peer-related resources. + * The peer became invisible. If this cluster is STANDBY, present DEGRADED_STANDBY locally so + * consumers treat this RegionServer as degraded until peer visibility returns. In-memory only; + * the shared HA record in ZK and local PathChildrenCache is untouched. */ - private void closePeerConnection() { - try { - if (peerPathChildrenCache != null) { - peerPathChildrenCache.close(); - peerPathChildrenCache = null; + private void presentLocalDegradedStandbyIfStandby() { + HAGroupStoreRecord local = null; + boolean degrade = false; + synchronized (localDegradedStandbyLock) { + local = readLocalRecordQuietly(); + if ( + !localDegradedStandbyActive && local != null + && local.getHAGroupState() == HAGroupState.STANDBY + ) { + localDegradedStandbyActive = true; + degrade = true; } - if (peerPhoenixHaAdmin != null) { - peerPhoenixHaAdmin.close(); - peerPhoenixHaAdmin = null; + } + if (degrade) { + LOGGER.warn("Peer not visible for HA group {}; presenting local DEGRADED_STANDBY " + + "(reason=peer-blind)", haGroupName); + notifySubscribers(HAGroupState.STANDBY, HAGroupState.DEGRADED_STANDBY, + System.currentTimeMillis(), ClusterType.LOCAL, local.getLastSyncStateTimeInMs()); + } + } + + /** + * The peer is visible again: lift the local-only degrade. If this cluster is still STANDBY, + * notify subscribers that the effective local state is STANDBY again. If it failed over while + * blind, the real state already reached subscribers. + */ + private void clearLocalDegradedStandbyIfStillStandby() { + HAGroupStoreRecord local = null; + boolean recover = false; + synchronized (localDegradedStandbyLock) { + if (!localDegradedStandbyActive) { + return; } - } catch (Exception e) { - LOGGER.warn("Failed to close peer connection", e); + local = readLocalRecordQuietly(); + localDegradedStandbyActive = false; + recover = local != null && local.getHAGroupState() == HAGroupState.STANDBY; + } + if (recover) { + LOGGER.warn("Peer visible again for HA group {}; clearing local DEGRADED_STANDBY " + + "(reason=peer-blind)", haGroupName); + notifySubscribers(HAGroupState.DEGRADED_STANDBY, HAGroupState.STANDBY, + System.currentTimeMillis(), ClusterType.LOCAL, local.getLastSyncStateTimeInMs()); } } + /** Current local record straight from the cache, or null; never triggers a rebuild. */ + private HAGroupStoreRecord readLocalRecordQuietly() { + return HAGroupStoreCacheUtil.recordAndStatAt(pathChildrenCache, toPath(haGroupName)).getLeft(); + } + /** * Remove this instance from the static {@link #instances} map. Idempotent. Uses value-based * remove so that, if a concurrent {@link #getInstanceForZkUrl} has already swapped in a fresh @@ -1120,11 +1106,11 @@ public void close() { // listener sees either a live or null reference, never half-closed. shutdownSyncExecutor(); shutdownLegacyCrrSyncExecutor(); + peerWatcher.close(); if (pathChildrenCache != null) { pathChildrenCache.close(); pathChildrenCache = null; } - closePeerConnection(); NodeCache nodeCache = this.legacyCrrNodeCache; this.legacyCrrNodeCache = null; if (nodeCache != null) { @@ -1359,6 +1345,15 @@ private void startLegacyCrrReconciliation() { /** * Subscribe to be notified when any transition to a target state occurs. + *

+ * Listener callbacks run on internal cache-event/transition threads and may be invoked while + * internal locks are held. A listener must not block and must not re-enter this client or the + * peer watcher (e.g. {@code getEffectiveHAGroupStoreRecord}, {@code reconfigure}, + * {@code subscribeToTargetState}); doing so risks deadlock or lock-order inversion. + *

+ * A PEER transition may be redelivered once after a peer reconnect (the watcher forces one + * redelivery so no transition is missed across the disconnect window), so listeners must tolerate + * duplicate notifications; side-effecting or counting listeners should be idempotent. * @param targetState the target state to watch for * @param clusterType whether to monitor local or peer cluster * @param listener the listener to notify when any transition to the target state occurs @@ -1391,31 +1386,43 @@ public void unsubscribeFromTargetState(HAGroupState targetState, ClusterType clu } /** - * Handle state change detection and notify subscribers if a transition occurred. - * @param newRecord the new HA group store record - * @param cacheType the type of cache (LOCAL or PEER) + * Handle a local state transition and notify subscribers. While the peer is not visible a local + * STANDBY must surface as DEGRADED_STANDBY (fail closed): if the overlay is already active the + * real STANDBY is suppressed, and if the local role only now reaches STANDBY while still blind the + * overlay is established here so it does not stay fail-open until the next visible->blind edge. */ - private void handleStateChange(HAGroupStoreRecord newRecord, Stat newStat, - ClusterType cacheType) { + private void handleLocalStateChange(HAGroupStoreRecord newRecord, Stat newStat) { HAGroupState newState = newRecord.getHAGroupState(); - HAGroupState oldState; - ClusterType clusterType; - - if (ClusterType.LOCAL.equals(cacheType)) { - oldState = lastKnownLocalState; - lastKnownLocalState = newState; - clusterType = ClusterType.LOCAL; - } else { - oldState = lastKnownPeerState; - lastKnownPeerState = newState; - clusterType = ClusterType.PEER; + HAGroupState oldState = lastKnownLocalState; + lastKnownLocalState = newState; + // Read the degrade flag under its lock for consistency with the present/clear paths; the + // notification below runs outside the lock so we never hold it across subscriber callbacks. The + // read-then-decide is not atomic with clearLocalDegradedStandbyIfStillStandby, so a concurrent + // clear can yield a transient duplicate STANDBY notification; subscribers are + // duplicate-tolerant. + boolean degraded; + synchronized (localDegradedStandbyLock) { + degraded = localDegradedStandbyActive; + } + // Fail closed for a final STANDBY while the peer is not visible. STANDBY_TO_ACTIVE and + // ABORT_TO_STANDBY pass through because replay listens for those failover signals. + if (newState == HAGroupState.STANDBY) { + if (degraded) { + LOGGER.info("Suppressing LOCAL STANDBY notification for HA group {} while peer is not " + + "visible (still presenting DEGRADED_STANDBY)", haGroupName); + return; + } + if (peerWatcher.isBlind()) { + // The role reached STANDBY after the peer went blind, so the visible->blind edge never + // presented the overlay. Establish it now (idempotent) instead of emitting a bare STANDBY. + presentLocalDegradedStandbyIfStandby(); + return; + } } - - // Only notify if there's an actual state transition or initial state if (oldState == null || !oldState.equals(newState)) { - LOGGER.info("Detected state transition for HA group {} from {} to {} on {} cluster", - haGroupName, oldState, newState, clusterType); - notifySubscribers(oldState, newState, newStat.getMtime(), clusterType, + LOGGER.info("Detected state transition for HA group {} from {} to {} on LOCAL cluster", + haGroupName, oldState, newState); + notifySubscribers(oldState, newState, newStat.getMtime(), ClusterType.LOCAL, newRecord.getLastSyncStateTimeInMs()); } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java index 9d1c7e19a8a..4240748fc03 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java @@ -287,6 +287,22 @@ public Optional getHAGroupStoreRecord(final String haGroupNa return Optional.ofNullable(haGroupStoreClient.getHAGroupStoreRecord()); } + /** + * Returns the effective HAGroupStoreRecord the replayer should act on. This matches + * {@link #getHAGroupStoreRecord(String)} except that, while this RegionServer's replay is failed + * closed because the peer cluster is not visible, a STANDBY record is reported as + * DEGRADED_STANDBY. Peer connectivity itself is never exposed. + * @param haGroupName name of the HA group + * @return Optional effective HAGroupStoreRecord, empty if the HA group is not found. + * @throws IOException when HAGroupStoreClient is not healthy. + */ + public Optional getEffectiveHAGroupStoreRecord(final String haGroupName) + throws IOException { + HAGroupStoreClient haGroupStoreClient = + getHAGroupStoreClientAndSetupFailoverManagement(haGroupName); + return Optional.ofNullable(haGroupStoreClient.getEffectiveHAGroupStoreRecord()); + } + /** * Returns the HAGroupStoreRecord for a specific HA group from peer cluster. * @param haGroupName name of the HA group @@ -448,6 +464,7 @@ public long setReaderToDegraded(final String haGroupName) throws IOException, if (currentRecord == null) { throw new IOException("Current HAGroupStoreRecord is null for HA group: " + haGroupName); } + LOGGER.info("Persisting DEGRADED_STANDBY (reason=reader-degrade) for HA group {}", haGroupName); return haGroupStoreClient .setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY); } @@ -506,6 +523,15 @@ public ClusterRoleRecord getClusterRoleRecord(String haGroupName) throws SQLExce /** * Subscribe to be notified when any transition to a target state occurs. + *

+ * Listener callbacks run on internal cache-event/transition threads and may be invoked while + * internal locks are held. A listener must not block and must not re-enter the HAGroupStore + * client or the peer watcher (e.g. {@code getEffectiveHAGroupStoreRecord}, {@code reconfigure}, + * {@code subscribeToTargetState}); doing so risks deadlock or lock-order inversion. + *

+ * A PEER transition may be redelivered once after a peer reconnect (the watcher forces one + * redelivery so no transition is missed across the disconnect window), so listeners must tolerate + * duplicate notifications; side-effecting or counting listeners should be idempotent. * @param haGroupName the name of the HA group to monitor * @param toState the target state to watch for * @param clusterType whether to monitor local or peer cluster diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java index b72cc39b1cd..0494ca9d344 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java @@ -58,6 +58,14 @@ public enum HAGroupState { ACTIVE_NOT_IN_SYNC_WITH_OFFLINE_PEER, ACTIVE_IN_SYNC_TO_STANDBY, ACTIVE_WITH_OFFLINE_PEER, + /** + * Degraded standby. Used two ways: (1) a persisted state for a reader-degraded standby (see + * {@link HAGroupStoreManager#setReaderToDegraded(String)}), governed by the allowedTransitions + * below; and (2) a local, in-memory effective state that + * {@link HAGroupStoreClient#getEffectiveHAGroupStoreRecord()} presents when a STANDBY cluster + * cannot see its peer - that overlay is never persisted to ZK and is cleared (not transitioned) + * when the peer becomes visible again. + */ DEGRADED_STANDBY, OFFLINE, STANDBY, @@ -179,6 +187,16 @@ public HAGroupStoreRecord(@JsonProperty("protocolVersion") String protocolVersio this.peerHdfsUrl = peerHdfsUrl; } + /** + * Returns a copy of this record with the HA group state replaced. Used to present an effective + * state (for example DEGRADED_STANDBY when the peer is not visible) without mutating the + * persisted record. Thread-safe: returns a new immutable instance and never mutates this record. + */ + public HAGroupStoreRecord withHAGroupState(HAGroupState newState) { + return new HAGroupStoreRecord(protocolVersion, haGroupName, newState, lastSyncStateTimeInMs, + policy, peerZKUrl, clusterUrl, peerClusterUrl, hdfsUrl, peerHdfsUrl, adminCRRVersion); + } + public static Optional fromJson(byte[] bytes) { if (bytes == null) { return Optional.empty(); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PeerClusterWatcher.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PeerClusterWatcher.java new file mode 100644 index 00000000000..884c3f6c42f --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PeerClusterWatcher.java @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath; +import static org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_PEER_CACHE_RETRY_INTERVAL_SECONDS; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_HA_GROUP_STORE_PEER_CACHE_RETRY_INTERVAL_SECONDS; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.hadoop.conf.Configuration; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.MoreExecutors; + +/** + * Watches one peer cluster's {@link HAGroupStoreRecord} over the peer's ZooKeeper, for a single HA + * group on one RegionServer. Owns the peer cache + admin and a background retry, and reports peer + * state changes (de-duplicated by znode version, with one forced redelivery after a reconnect) and + * visible<->blind transitions. Thread-safe; the (possibly blocking) cache build runs on the + * caller's thread for the initial {@link #reconfigure} and on the retry executor afterwards, never + * holding {@link #connectionStateLock}. Listener callbacks fire outside the lock. + */ +final class PeerClusterWatcher implements Closeable { + + /** + * Sink for what the watcher observes. Implemented by {@link HAGroupStoreClient}. + *

+ * Threading: {@link #onPeerStateChanged} fires on the peer Curator event thread; + * {@link #onPeerVisible} / {@link #onPeerBlind} fire on whichever thread drives the visibility + * transition (the constructor's caller, the retry executor, or the Curator event thread). All + * callbacks run while the watcher holds {@code transitionLock} and outside + * {@code connectionStateLock}; implementations must not re-enter the watcher (for example + * {@code reconfigure} / {@code close}) from a callback and should offload any blocking work. + */ + interface Listener { + /** + * Current peer HA record. May be redelivered once after reconnect even if the znode version did + * not change; consumers must tolerate duplicate same-state delivery. + */ + void onPeerStateChanged(HAGroupStoreRecord peerRecord, Stat stat); + + /** Peer connectivity is visible again; this is not a peer HA state transition. */ + void onPeerVisible(); + + /** Peer connectivity is unavailable; this is not a peer HA state transition. */ + void onPeerBlind(); + } + + private enum Visibility { + UNKNOWN, + VISIBLE, + BLIND + } + + private static final Logger LOGGER = LoggerFactory.getLogger(PeerClusterWatcher.class); + private static final long RETRY_WARN_EVERY_N_ATTEMPTS = 10L; + + private final Configuration conf; + private final String haGroupName; + private final String namespace; + private final Listener listener; + private final long initTimeoutMs; + private final long retryIntervalSec; + + // Serializes a whole reconcile (close/build/publish) so the constructor's synchronous reconcile + // and the retry executor never build concurrently. Held only by ensureConnection; + // connectionStateLock is taken briefly inside it for field access (ordering is always + // reconcileLock -> connectionStateLock). + private final Object reconcileLock = new Object(); + private final Object connectionStateLock = new Object(); + // Serializes a visibility transition with its notification so concurrent transitions (peer event + // thread vs reconcile/retry executor) cannot reorder the notifications they deliver. + private final Object transitionLock = new Object(); + private String peerZkUrl; // desired peer; blank = none + private PhoenixHAAdmin admin; + private PathChildrenCache cache; + private int lastDeliveredVersion = -1; + private long peerCacheRetryAttempts = 0L; + private volatile boolean closed = false; + private ScheduledExecutorService retryExecutor; + private volatile Visibility visibility = Visibility.UNKNOWN; + + PeerClusterWatcher(Configuration conf, String haGroupName, String namespace, Listener listener) { + this.conf = conf; + this.haGroupName = haGroupName; + this.namespace = namespace; + this.listener = listener; + this.initTimeoutMs = + conf.getLong(HAGroupStoreClient.PHOENIX_HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS, + HAGroupStoreClient.DEFAULT_HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS); + this.retryIntervalSec = conf.getLong(HA_GROUP_STORE_PEER_CACHE_RETRY_INTERVAL_SECONDS, + DEFAULT_HA_GROUP_STORE_PEER_CACHE_RETRY_INTERVAL_SECONDS); + // Always create the executor so reconfigure can run off the caller's (Curator) thread; only + // schedule the periodic retry when an interval is configured. + this.retryExecutor = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "PeerClusterWatcher-" + haGroupName); + t.setDaemon(true); + return t; + }); + if (retryIntervalSec > 0) { + long initialDelaySec = ThreadLocalRandom.current().nextLong(1, retryIntervalSec + 1); + this.retryExecutor.scheduleAtFixedRate(this::retryIfBlind, initialDelaySec, retryIntervalSec, + TimeUnit.SECONDS); + } + } + + /** Set/change/clear the peer and reconcile the connection synchronously. */ + void reconfigure(String url) { + synchronized (connectionStateLock) { + if (closed) { + return; + } + peerZkUrl = url; + } + ensureConnection(); + } + + /** Reconcile off the caller's thread; used from the Curator event thread. */ + void reconfigureAsync(String url) { + ScheduledExecutorService ex = retryExecutor; + if (ex == null) { + reconfigure(url); + return; + } + try { + ex.execute(() -> reconfigure(url)); + } catch (RejectedExecutionException e) { + LOGGER.debug("Peer reconfigure skipped for HA group {}: watcher closing", haGroupName); + } + } + + /** Current peer record, or null when the peer is not visible. */ + HAGroupStoreRecord getCurrentPeerRecord() { + // Read the cache under the lock: close() nulls the field and closes the cache only after + // acquiring this lock, so the O(1) in-memory read here always sees a live cache. + synchronized (connectionStateLock) { + return HAGroupStoreCacheUtil.recordAndStatAt(cache, toPath(haGroupName)).getLeft(); + } + } + + /** Visible for testing: true when the peer is not currently visible (unknown or lost). */ + boolean isBlind() { + return visibility != Visibility.VISIBLE; + } + + /** Visible for testing: whether the peer cache is currently built and live. */ + boolean hasPeerCache() { + synchronized (connectionStateLock) { + return cache != null; + } + } + + /** Blocking, event-free rebuild of the peer cache (mirrors {@code PathChildrenCache.rebuild}). */ + void rebuild() { + PathChildrenCache c; + synchronized (connectionStateLock) { + c = cache; + } + if (c != null) { + try { + c.rebuild(); + } catch (Exception e) { + LOGGER.error("Peer cache rebuild failed for HA group {}", haGroupName, e); + } + } + } + + @Override + public void close() { + ScheduledExecutorService ex; + synchronized (connectionStateLock) { + closed = true; + ex = retryExecutor; + retryExecutor = null; + } + if (ex != null) { + MoreExecutors.shutdownAndAwaitTermination(ex, 5, TimeUnit.SECONDS); + } + closeConnection(); + } + + private void retryIfBlind() { + boolean needsBuild; + String desired; + long attempt = 0L; + synchronized (connectionStateLock) { + needsBuild = !closed && StringUtils.isNotBlank(peerZkUrl) && cache == null; + desired = peerZkUrl; + if (needsBuild) { + attempt = ++peerCacheRetryAttempts; + } + } + if (needsBuild) { + if (attempt == 1L || attempt % RETRY_WARN_EVERY_N_ATTEMPTS == 0L) { + LOGGER.warn("Retrying peer cache build for HA group {} with peer ZK URL {} (attempt {})", + haGroupName, desired, attempt); + } else { + LOGGER.debug("Retrying peer cache build for HA group {} with peer ZK URL {} (attempt {})", + haGroupName, desired, attempt); + } + ensureConnection(); + } + } + + /** + * Reconcile the live connection to the desired peer URL. Serialized by {@link #reconcileLock} so + * builds never overlap; the (possibly blocking) build runs without holding + * {@link #connectionStateLock}. + */ + private void ensureConnection() { + synchronized (reconcileLock) { + String desired; + synchronized (connectionStateLock) { + if (closed) { + return; + } + desired = peerZkUrl; + if ( + StringUtils.isNotBlank(desired) && cache != null + && StringUtils.equals(desired, admin.getZkUrl()) + ) { + return; // already connected to this peer + } + } + closeConnection(); + if (StringUtils.isBlank(desired)) { + setVisible(); // no peer configured: nothing to be blind about + return; + } + PhoenixHAAdmin newAdmin = null; + PathChildrenCache newCache = null; + try { + newAdmin = new PhoenixHAAdmin(desired, conf, namespace); + newCache = HAGroupStoreCacheUtil.startCache(newAdmin.getCurator(), peerCacheListener(), + initTimeoutMs); + } catch (Exception e) { + LOGGER.error("Unable to build peer cache for HA group {}", haGroupName, e); + } + if (newCache == null) { + closeAdminQuietly(newAdmin); + setBlind(); + return; + } + boolean published; + synchronized (connectionStateLock) { + published = !closed; + if (published) { + admin = newAdmin; + cache = newCache; + peerCacheRetryAttempts = 0L; + } + } + if (published) { + setVisible(); + } else { + closeCacheQuietly(newCache); + closeAdminQuietly(newAdmin); + } + } + } + + private PathChildrenCacheListener peerCacheListener() { + return (client, event) -> { + switch (event.getType()) { + case CHILD_ADDED: + case CHILD_UPDATED: + deliver(HAGroupStoreCacheUtil.recordAndStat(event.getData()), false); + break; + case CONNECTION_RECONNECTED: + onReconnected(); + break; + case CONNECTION_SUSPENDED: + case CONNECTION_LOST: + setBlind(); + break; + default: + break; + } + }; + } + + private void onReconnected() { + // Force one redelivery of the current peer record after a reconnect: while the peer ZK + // connection was down we may have missed CHILD_UPDATED events, so we re-deliver to guarantee no + // peer state transition is dropped across the disconnect window. Subscribers tolerate the + // duplicate (per the Listener contract). Snapshot under the lock so a concurrent close() cannot + // turn this into a read off a closed cache; deliver() runs outside the lock and no-ops if + // empty. + Pair snapshot; + synchronized (connectionStateLock) { + lastDeliveredVersion = -1; // bypass the de-dup check so the forced redelivery is not skipped + snapshot = HAGroupStoreCacheUtil.recordAndStatAt(cache, toPath(haGroupName)); + } + setVisible(); + deliver(snapshot, true); + } + + private void deliver(Pair recordAndStat, boolean forced) { + HAGroupStoreRecord record = recordAndStat.getLeft(); + if (record == null || !Objects.equals(record.getHaGroupName(), haGroupName)) { + return; + } + Stat stat = recordAndStat.getRight(); + synchronized (connectionStateLock) { + int version = stat != null ? stat.getVersion() : -1; + if (!forced && version <= lastDeliveredVersion) { + return; // duplicate or stale peer event + } + lastDeliveredVersion = version; + } + listener.onPeerStateChanged(record, stat); + } + + // Package-private for tests; production callers reach these via ensureConnection / the cache + // listener. The whole decide-and-notify runs under transitionLock so a concurrent opposite + // transition cannot interleave or reorder its notification with this one. The closed/visibility + // decision is taken under connectionStateLock so a concurrent close() (which sets closed before + // nulling the cache) cannot leave a stale VISIBLE after the cache is gone. The listener callback + // runs outside connectionStateLock. + void setVisible() { + synchronized (transitionLock) { + synchronized (connectionStateLock) { + if (closed || visibility == Visibility.VISIBLE) { + return; + } + visibility = Visibility.VISIBLE; + } + LOGGER.info("Peer visible for HA group {}", haGroupName); + listener.onPeerVisible(); + } + } + + void setBlind() { + synchronized (transitionLock) { + String url; + synchronized (connectionStateLock) { + if (closed || visibility == Visibility.BLIND) { + return; + } + visibility = Visibility.BLIND; + url = peerZkUrl; + } + LOGGER.warn("Peer not visible for HA group {} (peer ZK {}); peer ZK may be unreachable", + haGroupName, url); + listener.onPeerBlind(); + } + } + + private void closeConnection() { + PathChildrenCache c; + PhoenixHAAdmin a; + synchronized (connectionStateLock) { + c = cache; + cache = null; + a = admin; + admin = null; + lastDeliveredVersion = -1; + } + closeCacheQuietly(c); + closeAdminQuietly(a); + } + + private void closeCacheQuietly(PathChildrenCache c) { + if (c != null) { + try { + c.close(); + } catch (IOException e) { + LOGGER.warn("Failed to close peer cache for HA group {}", haGroupName, e); + } + } + } + + private void closeAdminQuietly(PhoenixHAAdmin a) { + if (a != null) { + try { + a.close(); + } catch (Exception e) { + LOGGER.warn("Failed to close peer admin for HA group {}", haGroupName, e); + } + } + } +} diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index 0d2a4f33c2b..93eda74fd45 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -659,6 +659,18 @@ public interface QueryServices extends SQLCloseable { // HA Group Store sync job interval in seconds String HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = "phoenix.ha.group.store.sync.interval.seconds"; + /** + * How often (in seconds) to retry building the peer HA group store cache when the peer ZK is + * unreachable. Defaults to + * {@link QueryServicesOptions#DEFAULT_HA_GROUP_STORE_PEER_CACHE_RETRY_INTERVAL_SECONDS}. A + * non-positive value disables the periodic retry; the watcher still creates its executor so peer + * reconfigures run off the Curator event thread. Retries log the first and every tenth attempt at + * WARN and the rest at DEBUG. In large deployments (many RegionServers x many HA groups), + * consider raising this further: during a sustained peer ZK outage each HA group retries on this + * fixed period, so the aggregate retry rate scales with RegionServer count x HA-group count. + */ + String HA_GROUP_STORE_PEER_CACHE_RETRY_INTERVAL_SECONDS = + "phoenix.ha.group.store.peer.cache.retry.interval.seconds"; // "CRR" = Cluster Role Record. Master switch for syncing the legacy /phoenix/ha cluster // role record from /phoenix/consistentHA. When false, no legacy znode is read, written, or diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index b4c68f07414..2a158112e6a 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -61,6 +61,7 @@ import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB; import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB; import static org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_CLIENT_PREWARM_ENABLED; +import static org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_PEER_CACHE_RETRY_INTERVAL_SECONDS; import static org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_SYNC_INTERVAL_SECONDS; import static org.apache.phoenix.query.QueryServices.HBASE_CLIENT_SCANNER_TIMEOUT_ATTRIB; import static org.apache.phoenix.query.QueryServices.IMMUTABLE_ROWS_ATTRIB; @@ -527,6 +528,8 @@ public class QueryServicesOptions { // Default HA Group Store sync job interval in seconds (15 minutes = 900 seconds) public static final int DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = 900; + // Default peer HA group store cache retry interval in seconds. + public static final long DEFAULT_HA_GROUP_STORE_PEER_CACHE_RETRY_INTERVAL_SECONDS = 60L; // Legacy /phoenix/ha CRR sync is opt-in (default off). public static final boolean DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED = false; @@ -660,6 +663,8 @@ public static QueryServicesOptions withDefaults() { .setIfUnset(REPLICATION_LOG_ROTATION_TIME_MS_KEY, DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS) .setIfUnset(HA_GROUP_STORE_SYNC_INTERVAL_SECONDS, DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS) + .setIfUnset(HA_GROUP_STORE_PEER_CACHE_RETRY_INTERVAL_SECONDS, + DEFAULT_HA_GROUP_STORE_PEER_CACHE_RETRY_INTERVAL_SECONDS) .setIfUnset(HA_GROUP_STORE_CLIENT_PREWARM_ENABLED, DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED) .setIfUnset(PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED, DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java index 0a862d640aa..ea9580142c5 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java @@ -464,7 +464,8 @@ protected ReplicationLogGroup(Configuration conf, ServerName serverName, String */ protected void init() throws IOException { LOG.info("Initializing ReplicationLogGroup {}", haGroupName); - Optional haRecord = haGroupStoreManager.getHAGroupStoreRecord(haGroupName); + Optional haRecord = + haGroupStoreManager.getEffectiveHAGroupStoreRecord(haGroupName); if (!haRecord.isPresent()) { String message = String.format("HAGroup %s got an empty group store record while initializing mode", this); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java index 4d5f886d510..91fa625d19d 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java @@ -459,9 +459,15 @@ protected boolean getFailoverPending() { return this.failoverPending.get(); } + /** + * Effective HA record used to decide the replay mode at startup. A STANDBY whose peer cluster is + * not currently visible is reported as DEGRADED_STANDBY, so this RegionServer starts failed + * closed until the peer is confirmed reachable. Runtime degrade/recover transitions arrive + * through the LOCAL state subscribers registered in {@link #init()}. + */ protected HAGroupStoreRecord getHAGroupRecord() throws IOException { Optional optionalHAGroupStateRecord = - HAGroupStoreManager.getInstance(conf).getHAGroupStoreRecord(haGroupName); + HAGroupStoreManager.getInstance(conf).getEffectiveHAGroupStoreRecord(haGroupName); if (!optionalHAGroupStateRecord.isPresent()) { throw new IOException("HAGroupStoreRecord not found for HA Group: " + haGroupName); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java index e70dbd5bda7..f5348759fe8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java @@ -22,6 +22,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_HA_GROUP_NAME; import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl; import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath; +import static org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_PEER_CACHE_RETRY_INTERVAL_SECONDS; import static org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS; import static org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED; import static org.apache.phoenix.replication.reader.ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED; @@ -51,9 +52,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -171,11 +174,11 @@ public void testHAGroupStoreClientChangingPeerZKUrlToNullUrlToValidUrlToInvalidU assert currentRecord != null && currentRecord.getHAGroupState() == HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC; - // Base case: Check that peerPathChildrenCache is not null in HAGroupStoreClient via reflection - Field peerPathChildrenCache = - HAGroupStoreClient.class.getDeclaredField("peerPathChildrenCache"); - peerPathChildrenCache.setAccessible(true); - assertNotNull(peerPathChildrenCache.get(haGroupStoreClient)); + // Base case: the peer watcher should have built its cache for the valid peer. + Field peerWatcherField = HAGroupStoreClient.class.getDeclaredField("peerWatcher"); + peerWatcherField.setAccessible(true); + PeerClusterWatcher peerWatcher = (PeerClusterWatcher) peerWatcherField.get(haGroupStoreClient); + assertTrue(peerWatcher.hasPeerCache()); // Now update peerZKUrl to null and rebuild record = @@ -184,7 +187,7 @@ record = CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L); createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, record); Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); - assertNull(peerPathChildrenCache.get(haGroupStoreClient)); + assertFalse(peerWatcher.hasPeerCache()); // Now update System table to contain valid peer ZK URL and also change local cluster role to // STANDBY @@ -197,8 +200,8 @@ record = new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupS assertNotNull(currentRecord); assertEquals(HAGroupStoreRecord.HAGroupState.STANDBY, currentRecord.getHAGroupState()); - // Check that peerPathChildrenCache is not null now in HAGroupStoreClient via reflection - assertNotNull(peerPathChildrenCache.get(haGroupStoreClient)); + // The peer watcher should have rebuilt its cache for the restored valid peer. + assertTrue(peerWatcher.hasPeerCache()); // Now update local HAGroupStoreRecord to STANDBY to verify that HAGroupStoreClient is working // as normal @@ -234,8 +237,8 @@ record = new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupS ClusterRoleRecord.ClusterRole.UNKNOWN, 0); assertEquals(expected, clusterRoleRecord); - // Check that peerPathChildrenCache is null now in HAGroupStoreClient via reflection - assertNull(peerPathChildrenCache.get(haGroupStoreClient)); + // Invalid/unreachable peer URL: the watcher tears the old cache down and cannot rebuild it. + assertFalse(peerWatcher.hasPeerCache()); } @Test @@ -583,6 +586,463 @@ public void testThrowsExceptionWithZKDisconnectionAndThenConnection() throws Exc assertTrue((boolean) isHealthyField.get(haGroupStoreClient)); } + /** + * After a peer ZK session loss and reconnect, the client re-delivers the current peer state to + * subscribers even though the value did not change while disconnected. A subscriber registered + * after the initial delivery sees zero deliveries until the reconnect, then at least one - which + * would not happen without the forced reconnect redelivery. + */ + @Test + public void testPeerReconnectRedeliversUnchangedPeerState() throws Exception { + String haGroupName = testName.getMethodName(); + // Local record (this cluster ACTIVE) pointing at the peer (cluster 2). + createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, + 0L, HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, this.masterUrl, + this.peerMasterUrl, CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L)); + // Peer record (cluster 2) in STANDBY. + createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName, + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, this.peerMasterUrl, + this.masterUrl, CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L)); + + HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient + .getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(), haGroupName, zkUrl); + assertNotNull(haGroupStoreClient); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Subscribe AFTER the initial peer delivery, so only a reconnect redelivery can fire this. + AtomicInteger peerStandbyDeliveries = new AtomicInteger(0); + haGroupStoreClient.subscribeToTargetState(HAGroupStoreRecord.HAGroupState.STANDBY, + ClusterType.PEER, + (group, from, to, mtime, clusterType, lastSync) -> peerStandbyDeliveries.incrementAndGet()); + assertEquals("No delivery expected before reconnect (peer state unchanged)", 0, + peerStandbyDeliveries.get()); + + // Bounce the peer ZK on the same port to trigger peer CONNECTION_RECONNECTED. + int peerZkPort = Integer.parseInt( + CLUSTERS.getHBaseCluster2().getConfiguration().get("hbase.zookeeper.property.clientPort")); + CLUSTERS.getHBaseCluster2().shutdownMiniZKCluster(); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + CLUSTERS.getHBaseCluster2().startMiniZKCluster(1, peerZkPort); + + long deadline = System.currentTimeMillis() + 60000L; + while (peerStandbyDeliveries.get() < 1 && System.currentTimeMillis() < deadline) { + Thread.sleep(500L); + } + assertTrue("Peer reconnect should redeliver the unchanged peer state at least once", + peerStandbyDeliveries.get() >= 1); + } + + /** + * When peer ZK is unreachable at startup the peer cache cannot be built, but the background retry + * creates it within one retry interval once peer ZK returns. + */ + @Test + public void testPeerCacheRetryCreatesCacheAfterPeerZkReturns() throws Exception { + String haGroupName = testName.getMethodName(); + createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, + 0L, HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, this.masterUrl, + this.peerMasterUrl, CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L)); + + int peerZkPort = Integer.parseInt( + CLUSTERS.getHBaseCluster2().getConfiguration().get("hbase.zookeeper.property.clientPort")); + // Peer ZK down before the client starts. + CLUSTERS.getHBaseCluster2().shutdownMiniZKCluster(); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + Configuration conf = new Configuration(CLUSTERS.getHBaseCluster1().getConfiguration()); + // Retry quickly so the peer cache recovers within seconds once peer ZK returns. + conf.setLong(HA_GROUP_STORE_PEER_CACHE_RETRY_INTERVAL_SECONDS, 2L); + + HAGroupStoreClient haGroupStoreClient = + HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(haGroupStoreClient); + + Field peerWatcherField = HAGroupStoreClient.class.getDeclaredField("peerWatcher"); + peerWatcherField.setAccessible(true); + PeerClusterWatcher peerWatcher = (PeerClusterWatcher) peerWatcherField.get(haGroupStoreClient); + assertFalse("Peer cache should be absent while peer ZK is down", peerWatcher.hasPeerCache()); + + // Bring peer ZK back; the retry should build the peer cache within ~one interval. + CLUSTERS.getHBaseCluster2().startMiniZKCluster(1, peerZkPort); + long deadline = System.currentTimeMillis() + 60000L; + while (!peerWatcher.hasPeerCache() && System.currentTimeMillis() < deadline) { + Thread.sleep(500L); + } + assertTrue("Peer cache should be created by the retry after peer ZK returns", + peerWatcher.hasPeerCache()); + } + + /** + * When the peer becomes invisible while this cluster is STANDBY, the client fails local replay + * closed by presenting DEGRADED_STANDBY (LOCAL) without touching the persisted record, and + * recovers (presents STANDBY again) once the peer is visible. + */ + @Test + public void testPeerLossDegradesLocalStandbyAndRecovers() throws Exception { + String haGroupName = testName.getMethodName(); + createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, this.masterUrl, + this.peerMasterUrl, CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L)); + + int peerZkPort = Integer.parseInt( + CLUSTERS.getHBaseCluster2().getConfiguration().get("hbase.zookeeper.property.clientPort")); + + HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient + .getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(), haGroupName, zkUrl); + assertNotNull(haGroupStoreClient); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Peer visible: the effective record matches the real STANDBY. + assertEquals(HAGroupStoreRecord.HAGroupState.STANDBY, + haGroupStoreClient.getEffectiveHAGroupStoreRecord().getHAGroupState()); + + AtomicInteger degrades = new AtomicInteger(0); + AtomicInteger recovers = new AtomicInteger(0); + haGroupStoreClient.subscribeToTargetState(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, + ClusterType.LOCAL, (g, f, t, m, c, s) -> degrades.incrementAndGet()); + haGroupStoreClient.subscribeToTargetState(HAGroupStoreRecord.HAGroupState.STANDBY, + ClusterType.LOCAL, (g, f, t, m, c, s) -> recovers.incrementAndGet()); + + // Peer ZK goes away: the watcher reports blind and the client fails local replay closed. + CLUSTERS.getHBaseCluster2().shutdownMiniZKCluster(); + long deadline = System.currentTimeMillis() + 60000L; + while (degrades.get() < 1 && System.currentTimeMillis() < deadline) { + Thread.sleep(500L); + } + assertTrue("Peer loss should fail local replay closed (LOCAL DEGRADED_STANDBY notification)", + degrades.get() >= 1); + assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, + haGroupStoreClient.getEffectiveHAGroupStoreRecord().getHAGroupState()); + assertEquals("Peer-blind degrade must not update the persisted HA record", + HAGroupStoreRecord.HAGroupState.STANDBY, + haGroupStoreClient.getHAGroupStoreRecord().getHAGroupState()); + + // Peer ZK returns: the watcher reports visible and the client recovers local replay. + CLUSTERS.getHBaseCluster2().startMiniZKCluster(1, peerZkPort); + deadline = System.currentTimeMillis() + 60000L; + while (recovers.get() < 1 && System.currentTimeMillis() < deadline) { + Thread.sleep(500L); + } + assertTrue("Peer recovery should recover local replay (LOCAL STANDBY notification)", + recovers.get() >= 1); + assertEquals(HAGroupStoreRecord.HAGroupState.STANDBY, + haGroupStoreClient.getEffectiveHAGroupStoreRecord().getHAGroupState()); + } + + /** + * Failing replay closed on peer loss is gated on this cluster being STANDBY. An ACTIVE cluster + * losing sight of the peer must not degrade: the effective record stays unchanged. + */ + @Test + public void testPeerLossDoesNotDegradeWhenLocalActive() throws Exception { + String haGroupName = testName.getMethodName(); + createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, + 0L, HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, this.masterUrl, + this.peerMasterUrl, CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L)); + + int peerZkPort = Integer.parseInt( + CLUSTERS.getHBaseCluster2().getConfiguration().get("hbase.zookeeper.property.clientPort")); + + HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient + .getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(), haGroupName, zkUrl); + assertNotNull(haGroupStoreClient); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + AtomicInteger degrades = new AtomicInteger(0); + haGroupStoreClient.subscribeToTargetState(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, + ClusterType.LOCAL, (g, f, t, m, c, s) -> degrades.incrementAndGet()); + + Field peerWatcherField = HAGroupStoreClient.class.getDeclaredField("peerWatcher"); + peerWatcherField.setAccessible(true); + PeerClusterWatcher peerWatcher = (PeerClusterWatcher) peerWatcherField.get(haGroupStoreClient); + + // Peer ZK goes away; wait until the watcher actually observes the peer as blind. + CLUSTERS.getHBaseCluster2().shutdownMiniZKCluster(); + long deadline = System.currentTimeMillis() + 60000L; + while (!peerWatcher.isBlind() && System.currentTimeMillis() < deadline) { + Thread.sleep(500L); + } + assertTrue("Peer should be observed as blind after peer ZK shutdown", peerWatcher.isBlind()); + + // ACTIVE cluster must not fail replay closed: no degrade and the effective record is unchanged. + assertEquals(0, degrades.get()); + assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, + haGroupStoreClient.getEffectiveHAGroupStoreRecord().getHAGroupState()); + + // Restore peer ZK so later tests start from a healthy peer. + CLUSTERS.getHBaseCluster2().startMiniZKCluster(1, peerZkPort); + } + + /** + * While failed closed on peer loss, a local record that leaves and re-enters STANDBY (e.g. a + * failover that is started then aborted) must not recover the replayer: the re-entry to STANDBY + * is suppressed so recovery happens only once the peer is visible again. + */ + @Test + public void testDegradedStandbySuppressesLocalStandbyUntilPeerVisible() throws Exception { + String haGroupName = testName.getMethodName(); + createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, this.masterUrl, + this.peerMasterUrl, CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L)); + + int peerZkPort = Integer.parseInt( + CLUSTERS.getHBaseCluster2().getConfiguration().get("hbase.zookeeper.property.clientPort")); + + HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient + .getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(), haGroupName, zkUrl); + assertNotNull(haGroupStoreClient); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + AtomicInteger localStandby = new AtomicInteger(0); + AtomicInteger localAbortToStandby = new AtomicInteger(0); + haGroupStoreClient.subscribeToTargetState(HAGroupStoreRecord.HAGroupState.STANDBY, + ClusterType.LOCAL, (groupName, fromState, toState, modifiedTime, clusterType, + lastSyncStateTimeInMs) -> localStandby.incrementAndGet()); + haGroupStoreClient.subscribeToTargetState(HAGroupStoreRecord.HAGroupState.ABORT_TO_STANDBY, + ClusterType.LOCAL, (groupName, fromState, toState, modifiedTime, clusterType, + lastSyncStateTimeInMs) -> localAbortToStandby.incrementAndGet()); + + // Peer goes blind: we fail closed (DEGRADED_STANDBY). + CLUSTERS.getHBaseCluster2().shutdownMiniZKCluster(); + long deadline = System.currentTimeMillis() + 60000L; + while ( + haGroupStoreClient.getEffectiveHAGroupStoreRecord().getHAGroupState() + != HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY + && System.currentTimeMillis() < deadline + ) { + Thread.sleep(500L); + } + assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, + haGroupStoreClient.getEffectiveHAGroupStoreRecord().getHAGroupState()); + + // While still blind, the local record moves to STANDBY_TO_ACTIVE, aborts, and returns to + // STANDBY. The abort signal must be delivered, but final STANDBY must stay suppressed. + createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE, + 0L, HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, this.masterUrl, + this.peerMasterUrl, CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L)); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.ABORT_TO_STANDBY, + 0L, HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, this.masterUrl, + this.peerMasterUrl, CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L)); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, this.masterUrl, + this.peerMasterUrl, CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L)); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + assertEquals("LOCAL ABORT_TO_STANDBY must still be delivered while peer is not visible", 1, + localAbortToStandby.get()); + assertEquals("Local STANDBY must be suppressed while the peer is not visible", 0, + localStandby.get()); + assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, + haGroupStoreClient.getEffectiveHAGroupStoreRecord().getHAGroupState()); + + // Peer returns: recovery is now allowed and LOCAL STANDBY is delivered. + CLUSTERS.getHBaseCluster2().startMiniZKCluster(1, peerZkPort); + deadline = System.currentTimeMillis() + 60000L; + while (localStandby.get() < 1 && System.currentTimeMillis() < deadline) { + Thread.sleep(500L); + } + assertTrue("Peer recovery should deliver LOCAL STANDBY", localStandby.get() >= 1); + assertEquals(HAGroupStoreRecord.HAGroupState.STANDBY, + haGroupStoreClient.getEffectiveHAGroupStoreRecord().getHAGroupState()); + } + + /** + * If a failover proceeds while the peer is not visible (STANDBY -> ... -> ACTIVE), peer + * recovery must not force a synthetic LOCAL STANDBY: the cluster is no longer a standby, so the + * replayer must keep the real ACTIVE state rather than be rewound into recovery. + */ + @Test + public void testRecoveryDoesNotForceStandbyAfterFailoverWhileBlind() throws Exception { + String haGroupName = testName.getMethodName(); + createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, this.masterUrl, + this.peerMasterUrl, CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L)); + + int peerZkPort = Integer.parseInt( + CLUSTERS.getHBaseCluster2().getConfiguration().get("hbase.zookeeper.property.clientPort")); + + HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient + .getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(), haGroupName, zkUrl); + assertNotNull(haGroupStoreClient); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + AtomicInteger localStandby = new AtomicInteger(0); + haGroupStoreClient.subscribeToTargetState(HAGroupStoreRecord.HAGroupState.STANDBY, + ClusterType.LOCAL, (g, f, t, m, c, s) -> localStandby.incrementAndGet()); + + Field peerWatcherField = HAGroupStoreClient.class.getDeclaredField("peerWatcher"); + peerWatcherField.setAccessible(true); + PeerClusterWatcher peerWatcher = (PeerClusterWatcher) peerWatcherField.get(haGroupStoreClient); + + // Peer goes blind: we fail closed. + CLUSTERS.getHBaseCluster2().shutdownMiniZKCluster(); + long deadline = System.currentTimeMillis() + 60000L; + while (!peerWatcher.isBlind() && System.currentTimeMillis() < deadline) { + Thread.sleep(500L); + } + assertTrue("Peer should be observed blind", peerWatcher.isBlind()); + + // A real failover proceeds while blind: STANDBY -> STANDBY_TO_ACTIVE -> ACTIVE_IN_SYNC. + createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE, + 0L, HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, this.masterUrl, + this.peerMasterUrl, CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L)); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, + 0L, HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, this.masterUrl, + this.peerMasterUrl, CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L)); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Peer returns; wait until it is observed visible (recovery runs on that transition). + CLUSTERS.getHBaseCluster2().startMiniZKCluster(1, peerZkPort); + deadline = System.currentTimeMillis() + 60000L; + while (peerWatcher.isBlind() && System.currentTimeMillis() < deadline) { + Thread.sleep(500L); + } + assertFalse("Peer should be observed visible again", peerWatcher.isBlind()); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Cluster is ACTIVE now: recovery must not have forced a LOCAL STANDBY. + assertEquals(0, localStandby.get()); + assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, + haGroupStoreClient.getEffectiveHAGroupStoreRecord().getHAGroupState()); + } + + /** + * Fail closed must engage even when the local role reaches STANDBY after the peer is + * already blind. Starting ACTIVE (which does not degrade) then failing over to STANDBY while the + * peer is not visible must present DEGRADED_STANDBY, not a bare STANDBY (which would be fail-open). + */ + @Test + public void testPeerBlindThenLocalBecomesStandbyDegrades() throws Exception { + String haGroupName = testName.getMethodName(); + createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, this.masterUrl, + this.peerMasterUrl, CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L)); + + int peerZkPort = Integer.parseInt( + CLUSTERS.getHBaseCluster2().getConfiguration().get("hbase.zookeeper.property.clientPort")); + + HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient + .getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(), haGroupName, zkUrl); + assertNotNull(haGroupStoreClient); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + AtomicInteger degrades = new AtomicInteger(0); + haGroupStoreClient.subscribeToTargetState(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, + ClusterType.LOCAL, (g, f, t, m, c, s) -> degrades.incrementAndGet()); + + Field peerWatcherField = HAGroupStoreClient.class.getDeclaredField("peerWatcher"); + peerWatcherField.setAccessible(true); + PeerClusterWatcher peerWatcher = (PeerClusterWatcher) peerWatcherField.get(haGroupStoreClient); + + try { + // Peer goes blind while local is ACTIVE: no degrade yet (ACTIVE does not fail closed). + CLUSTERS.getHBaseCluster2().shutdownMiniZKCluster(); + long deadline = System.currentTimeMillis() + 60000L; + while (!peerWatcher.isBlind() && System.currentTimeMillis() < deadline) { + Thread.sleep(500L); + } + assertTrue("Peer should be observed as blind after peer ZK shutdown", peerWatcher.isBlind()); + assertEquals(0, degrades.get()); + + // Fail over to STANDBY while still blind: ACTIVE_IN_SYNC -> ACTIVE_IN_SYNC_TO_STANDBY -> + // STANDBY. The final STANDBY must present DEGRADED_STANDBY rather than stay fail-open. + createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, new HAGroupStoreRecord( + "v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, this.masterUrl, + this.peerMasterUrl, CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L)); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, this.masterUrl, + this.peerMasterUrl, CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L)); + + deadline = System.currentTimeMillis() + 60000L; + while ( + haGroupStoreClient.getEffectiveHAGroupStoreRecord().getHAGroupState() + != HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY + && System.currentTimeMillis() < deadline + ) { + Thread.sleep(500L); + } + assertEquals("Reaching STANDBY while the peer is blind must present DEGRADED_STANDBY", + HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, + haGroupStoreClient.getEffectiveHAGroupStoreRecord().getHAGroupState()); + assertTrue("A DEGRADED_STANDBY notification must be delivered", degrades.get() >= 1); + assertEquals("Peer-blind degrade must not update the persisted HA record", + HAGroupStoreRecord.HAGroupState.STANDBY, + haGroupStoreClient.getHAGroupStoreRecord().getHAGroupState()); + } finally { + // Restore peer ZK so later tests start from a healthy peer. + CLUSTERS.getHBaseCluster2().startMiniZKCluster(1, peerZkPort); + } + } + + /** + * Cold start with the peer ZK already down: a STANDBY whose peer cache can never be built at + * startup must still present DEGRADED_STANDBY (fail closed) from the synchronous reconcile during + * initialization, without writing the persisted record. + */ + @Test + public void testColdStartPeerZkDownPresentsDegradedStandby() throws Exception { + String haGroupName = testName.getMethodName(); + createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, this.masterUrl, + this.peerMasterUrl, CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L)); + + int peerZkPort = Integer.parseInt( + CLUSTERS.getHBaseCluster2().getConfiguration().get("hbase.zookeeper.property.clientPort")); + // Peer ZK down before the client starts, so the peer cache can never be built at init. + CLUSTERS.getHBaseCluster2().shutdownMiniZKCluster(); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + Configuration conf = new Configuration(CLUSTERS.getHBaseCluster1().getConfiguration()); + // Retry quickly so a missed cold-start present would still recover within seconds. + conf.setLong(HA_GROUP_STORE_PEER_CACHE_RETRY_INTERVAL_SECONDS, 2L); + + try { + HAGroupStoreClient haGroupStoreClient = + HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(haGroupStoreClient); + + // Effective state is DEGRADED_STANDBY even though the peer was never reachable; the persisted + // record stays STANDBY. + long deadline = System.currentTimeMillis() + 60000L; + while ( + haGroupStoreClient.getEffectiveHAGroupStoreRecord().getHAGroupState() + != HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY + && System.currentTimeMillis() < deadline + ) { + Thread.sleep(500L); + } + assertEquals("Cold start with peer ZK down must present DEGRADED_STANDBY", + HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, + haGroupStoreClient.getEffectiveHAGroupStoreRecord().getHAGroupState()); + assertEquals("Cold-start peer-blind degrade must not update the persisted HA record", + HAGroupStoreRecord.HAGroupState.STANDBY, + haGroupStoreClient.getHAGroupStoreRecord().getHAGroupState()); + } finally { + // Restore peer ZK so later tests start from a healthy peer. + CLUSTERS.getHBaseCluster2().startMiniZKCluster(1, peerZkPort); + } + } + // Tests for setHAGroupStatusIfNeeded method @Test public void testSetHAGroupStatusIfNeededDeleteZKAndSystemTableRecord() throws Exception { @@ -824,6 +1284,43 @@ public void testSetHAGroupStatusIfNeededMultipleTransitions() throws Exception { assertEquals(HAGroupStoreRecord.HAGroupState.STANDBY, afterSecond.getHAGroupState()); } + /** + * Regression test for the startCache INITIALIZED-latch contract: a LOCAL cache listener that + * throws while handling the INITIALIZED event must not strand startup. startCache releases its + * initial-load latch in a finally, so the client still comes up healthy; dropping that finally + * (which regressed once) would time out startCache and leave the client unhealthy. + */ + @Test + public void testStartCacheReleasesLatchWhenInitializedListenerThrows() throws Exception { + String haGroupName = testName.getMethodName(); + createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, + 0L, HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, this.masterUrl, + this.peerMasterUrl, CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L)); + + PathChildrenCacheListener throwingOnInitialized = (client, event) -> { + if (event.getType() == PathChildrenCacheEvent.Type.INITIALIZED) { + throw new RuntimeException("Intentional failure while handling INITIALIZED"); + } + }; + + // Short init timeout so a regression (no finally) fails fast instead of blocking the 30s + // default. + Configuration testConf = new Configuration(CLUSTERS.getHBaseCluster1().getConfiguration()); + testConf.setLong(HAGroupStoreClient.PHOENIX_HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS, + 5000L); + + try (HAGroupStoreClient haGroupStoreClient = + new HAGroupStoreClient(testConf, throwingOnInitialized, haGroupName, zkUrl)) { + // Healthy means startCache returned the cache despite the throwing INITIALIZED listener; an + // unhealthy client would instead throw IOException("not healthy") from getHAGroupStoreRecord. + HAGroupStoreRecord record = haGroupStoreClient.getHAGroupStoreRecord(); + assertNotNull("Client must initialize healthy despite a throwing INITIALIZED listener", + record); + assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, record.getHAGroupState()); + } + } + /** * This test verifies that the updates coming via PathChildrenCacheListener are in order in which * updates are sent to ZK @@ -853,7 +1350,7 @@ public void testHAGroupStoreClientWithMultiThreadedUpdates() throws Exception { // Start a new HAGroupStoreClient with custom listener. new HAGroupStoreClient(CLUSTERS.getHBaseCluster1().getConfiguration(), - pathChildrenCacheListener, null, haGroupName, zkUrl); + pathChildrenCacheListener, haGroupName, zkUrl); // Create multiple threads for update to ZK. final CountDownLatch updateLatch = new CountDownLatch(threadCount); @@ -1142,7 +1639,7 @@ public void testPeriodicSyncJobExecutorStartsAndSyncsData() throws Exception { testConf.setLong("phoenix.ha.group.store.sync.interval.seconds", 15); try (HAGroupStoreClient haGroupStoreClient = - new HAGroupStoreClient(testConf, null, null, haGroupName, zkUrl)) { + new HAGroupStoreClient(testConf, null, haGroupName, zkUrl)) { // The sync executor must be started and running. Field syncExecutorField = HAGroupStoreClient.class.getDeclaredField("syncExecutor"); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java index 628652d0016..6e3b1cc85ec 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.replication.reader; +import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -35,6 +36,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -46,11 +48,13 @@ import org.apache.phoenix.jdbc.HAGroupStoreManager; import org.apache.phoenix.jdbc.HAGroupStoreRecord; import org.apache.phoenix.jdbc.HighAvailabilityPolicy; +import org.apache.phoenix.jdbc.PhoenixHAAdmin; import org.apache.phoenix.replication.ReplicationLogTracker; import org.apache.phoenix.replication.ReplicationRound; import org.apache.phoenix.replication.ReplicationShardDirectoryManager; import org.apache.phoenix.replication.metrics.*; import org.apache.phoenix.util.HAGroupStoreTestUtil; +import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; @@ -1116,6 +1120,174 @@ public void testReplay_StateTransition_DegradedToSyncedRecovery() throws IOExcep } } + /** + * Tests the runtime degrade -> abort -> recover sequence. The replayer starts DEGRADED with a + * failover already requested; the failover is then aborted (failoverPending cleared) and the + * cluster recovers via SYNCED_RECOVERY back to SYNC. Validates that the aborted failover is NOT + * triggered when replay returns to SYNC, that lastRoundInSync is preserved during DEGRADED and + * caught up after recovery, and that the final state is SYNC. Abort is simulated via + * setFailoverPending, matching how the other transition tests in this class drive listener-driven + * changes. + */ + @Test + public void testReplay_StateTransition_DegradedToAbortToRecover() throws IOException { + TestableReplicationLogTracker fileTracker = + createReplicationLogTracker(conf1, haGroupName, rootFs, rootUri); + + try { + long initialEndTime = 1704499200000L; + long roundTimeMills = + fileTracker.getReplicationShardDirectoryManager().getReplicationRoundDurationSeconds() + * 1000L; + long bufferMillis = (long) (roundTimeMills * 0.15); + + // DEGRADED record (peer-blind degraded standby). + HAGroupStoreRecord mockRecord = + new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, + HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, initialEndTime, + HighAvailabilityPolicy.FAILOVER.toString(), peerZkUrl, CLUSTERS.getMasterAddress1(), + CLUSTERS.getMasterAddress2(), CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L); + + // Allow processing up to 5 rounds. + long currentTime = initialEndTime + (5 * roundTimeMills) + bufferMillis; + EnvironmentEdge edge = () -> currentTime; + EnvironmentEdgeManager.injectEdge(edge); + + try { + TestableReplicationLogDiscoveryReplay discovery = + new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord); + discovery.init(); + + ReplicationRound lastInSyncRound = + new ReplicationRound(initialEndTime - roundTimeMills, initialEndTime); + discovery.setLastRoundProcessed(new ReplicationRound(initialEndTime + roundTimeMills, + initialEndTime + (2 * roundTimeMills))); + discovery.setLastRoundInSync(lastInSyncRound); + discovery + .setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED); + // A failover was requested while the cluster was degraded. + discovery.setFailoverPending(true); + + // Abort the failover after round 1, then recover (SYNCED_RECOVERY) after round 2 - same + // round dynamics as the degrade->recovery test: 2 in DEGRADED, rewind, then 5 in SYNC. + discovery.setFailoverPendingAfterRounds(1, false); + discovery.setStateChangeAfterRounds(2, + ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNCED_RECOVERY); + + discovery.replay(); + + assertEquals("processRound should be called 7 times", 7, + discovery.getProcessRoundCallCount()); + + // The abort cleared the pending failover... + assertFalse("Failover pending should be cleared by the abort", + discovery.getFailoverPending()); + // ...so even though replay returned to SYNC, the failover must NOT be triggered. + assertEquals("Aborted failover must not be triggered on return to SYNC", 0, + discovery.getTriggerFailoverCallCount()); + + // Recovery completed: state back to SYNC and in-sync caught up to last processed. + assertEquals("State should be SYNC after recovery", + ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC, + discovery.getReplicationReplayState()); + + ReplicationRound expectedFinalRound = new ReplicationRound( + initialEndTime + (4 * roundTimeMills), initialEndTime + (5 * roundTimeMills)); + assertEquals("Last round processed should be the final round", expectedFinalRound, + discovery.getLastRoundProcessed()); + assertEquals("Last round in sync should match last round processed after recovery", + expectedFinalRound, discovery.getLastRoundInSync()); + } finally { + EnvironmentEdgeManager.reset(); + } + } finally { + fileTracker.close(); + } + } + + /** + * End-to-end runtime test that drives ReplicationLogDiscoveryReplay's real LOCAL listeners + * (degraded/recovery/trigger/abort) through a degrade -> failover-request -> abort -> + * recover sequence by transitioning the LOCAL HA record in ZooKeeper, asserting the replayer's + * runtime state reacts. Uses the null-injected-record discovery so it reads the real effective + * record and subscribes its listeners to HAGroupStoreManager - exercising the listener wiring + * that the setter-driven transition tests above do not. + */ + @Test + public void testReplay_RuntimeListeners_DegradeAbortRecover() throws Exception { + PhoenixHAAdmin haAdmin = CLUSTERS.getHaAdmin1(); + // Base state: local STANDBY, written before init() so the discovery's client picks it up. + writeLocalRecord(haAdmin, HAGroupStoreRecord.HAGroupState.STANDBY); + + TestableReplicationLogTracker fileTracker = + createReplicationLogTracker(conf1, haGroupName, rootFs, rootUri); + fileTracker.init(); + try { + // Null injected record: read the real effective record and subscribe the real LOCAL listeners + // to HAGroupStoreManager. + TestableReplicationLogDiscoveryReplay discovery = + new TestableReplicationLogDiscoveryReplay(fileTracker, null); + discovery.init(); + // Let the initial STANDBY load settle so it cannot race the degrade below. + Thread.sleep(5000L); + + // degrade: LOCAL -> DEGRADED_STANDBY must drive the degradedListener to DEGRADED. + writeLocalRecord(haAdmin, HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY); + awaitCondition( + () -> discovery.getReplicationReplayState() + == ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED, + "degradedListener should drive replay state to DEGRADED"); + + // failover requested: LOCAL -> STANDBY_TO_ACTIVE must set failoverPending. + writeLocalRecord(haAdmin, HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE); + awaitCondition(discovery::getFailoverPending, + "triggerFailoverListner should set failoverPending"); + + // abort: LOCAL -> ABORT_TO_STANDBY must clear failoverPending. + writeLocalRecord(haAdmin, HAGroupStoreRecord.HAGroupState.ABORT_TO_STANDBY); + awaitCondition(() -> !discovery.getFailoverPending(), + "abortFailoverListner should clear failoverPending"); + + // recover: LOCAL -> STANDBY must drive the recoveryListener to SYNCED_RECOVERY. + writeLocalRecord(haAdmin, HAGroupStoreRecord.HAGroupState.STANDBY); + awaitCondition( + () -> discovery.getReplicationReplayState() + == ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNCED_RECOVERY, + "recoveryListener should drive replay state to SYNCED_RECOVERY"); + } finally { + fileTracker.close(); + haAdmin.getCurator().delete().quietly().forPath(toPath(haGroupName)); + } + } + + /** Write (create, or version-checked update) the LOCAL HA record for this group on ZooKeeper. */ + private void writeLocalRecord(PhoenixHAAdmin haAdmin, HAGroupStoreRecord.HAGroupState state) + throws Exception { + HAGroupStoreRecord record = + new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, state, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), peerZkUrl, CLUSTERS.getMasterAddress1(), + CLUSTERS.getMasterAddress2(), CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L); + String path = toPath(haGroupName); + if (haAdmin.getCurator().checkExists().forPath(path) == null) { + haAdmin.createHAGroupStoreRecordInZooKeeper(record); + } else { + Pair current = + haAdmin.getHAGroupStoreRecordInZooKeeper(haGroupName); + haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, record, + current.getRight().getVersion()); + } + } + + /** Poll until the condition holds or a fixed timeout elapses, then assert it. */ + private static void awaitCondition(java.util.function.BooleanSupplier condition, String message) + throws InterruptedException { + long deadline = System.currentTimeMillis() + 30000L; + while (!condition.getAsBoolean() && System.currentTimeMillis() < deadline) { + Thread.sleep(250L); + } + assertTrue(message, condition.getAsBoolean()); + } + /** * Tests state transition from SYNC to DEGRADED and back through SYNCED_RECOVERY to SYNC. * Validates lastRoundInSync preservation during DEGRADED, rewind in SYNCED_RECOVERY, and update @@ -2184,6 +2356,80 @@ public void testTriggerFailover() throws IOException, SQLException { } } + /** + * End-to-end peer-visibility wiring: when this STANDBY cluster cannot see its peer, the effective + * record presented to the replayer is DEGRADED_STANDBY, so a replayer initializing during the + * outage starts failed closed (DEGRADED) even though the persisted record is still STANDBY. + * Exercises the real {@link HAGroupStoreManager} path (no injected record). + */ + @Test + public void testInitializeLastRoundProcessed_PeerBlindStartsDegraded() throws Exception { + final String haGroupName = "testPeerBlindStartsDegradedHAGroup"; + // Local cluster STANDBY; peer is cluster 2. + HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, zkUrl, peerZkUrl, + CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(), + ClusterRoleRecord.ClusterRole.STANDBY, ClusterRoleRecord.ClusterRole.ACTIVE, null, + CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2()); + + int peerZkPort = Integer.parseInt( + CLUSTERS.getHBaseCluster2().getConfiguration().get("hbase.zookeeper.property.clientPort")); + HAGroupStoreManager manager = HAGroupStoreManager.getInstance(conf1); + TestableReplicationLogTracker fileTracker = null; + try { + // Materialize the client + peer connection; effective state is the real STANDBY. + awaitEffectiveState(manager, haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY); + + // Peer ZK goes away; the client reports peer-blind and presents DEGRADED_STANDBY. + CLUSTERS.getHBaseCluster2().shutdownMiniZKCluster(); + awaitEffectiveState(manager, haGroupName, HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY); + + // A replayer initializing now reads the effective record and starts DEGRADED. + fileTracker = createReplicationLogTracker(conf1, haGroupName, rootFs, rootUri); + fileTracker.init(); + TestableReplicationLogDiscoveryReplay discovery = + new TestableReplicationLogDiscoveryReplay(fileTracker, null); + discovery.initializeLastRoundProcessed(); + assertEquals("Replayer must start DEGRADED while the peer is not visible", + ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED, + discovery.getReplicationReplayState()); + } finally { + // Restore peer ZK so later tests start from a healthy peer. + try { + CLUSTERS.getHBaseCluster2().startMiniZKCluster(1, peerZkPort); + } catch (Exception ignore) { + LOG.warn("Failed to restart peer ZK after test"); + } + if (fileTracker != null) { + fileTracker.close(); + } + try { + HAGroupStoreTestUtil.deleteHAGroupRecordInSystemTable(haGroupName, zkUrl); + } catch (Exception e) { + LOG.warn("Failed to clean up HA group store record", e); + } + } + } + + /** + * Poll the effective HA state from the manager until it reaches {@code expected} or times out. + */ + private void awaitEffectiveState(HAGroupStoreManager manager, String haGroupName, + HAGroupStoreRecord.HAGroupState expected) throws Exception { + long deadline = System.currentTimeMillis() + 60000L; + HAGroupStoreRecord.HAGroupState actual = null; + while (System.currentTimeMillis() < deadline) { + Optional record = manager.getEffectiveHAGroupStoreRecord(haGroupName); + if (record.isPresent()) { + actual = record.get().getHAGroupState(); + if (actual == expected) { + return; + } + } + Thread.sleep(500L); + } + assertEquals("Effective HA state did not reach expected within timeout", expected, actual); + } + /** * Tests getConsistencyPoint method in SYNC state with in-progress files present. Should return * the minimum timestamp from in-progress files. @@ -2556,8 +2802,10 @@ public TestableReplicationLogDiscoveryReplay( } @Override - protected HAGroupStoreRecord getHAGroupRecord() { - return haGroupStoreRecord; + protected HAGroupStoreRecord getHAGroupRecord() throws IOException { + // A null injected record means "use the real effective record from HAGroupStoreManager", so + // tests can exercise the peer-visibility wiring end to end. + return haGroupStoreRecord != null ? haGroupStoreRecord : super.getHAGroupRecord(); } @Override diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HAGroupStoreRecordTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HAGroupStoreRecordTest.java index 7a466986709..3fb77d4bd4d 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HAGroupStoreRecordTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HAGroupStoreRecordTest.java @@ -161,6 +161,30 @@ public void testGetters() { assertEquals(haGroupState.getClusterRole(), record.getClusterRole()); } + @Test + public void testWithHAGroupState() { + HAGroupStoreRecord original = new HAGroupStoreRecord(PROTOCOL_VERSION, testName.getMethodName(), + HAGroupStoreRecord.HAGroupState.STANDBY, 12345L, HighAvailabilityPolicy.FAILOVER.toString(), + "peerZKUrl", "clusterUrl", "peerClusterUrl", TEST_HDFS_URL, TEST_PEER_HDFS_URL, 7L); + + HAGroupStoreRecord overlaid = + original.withHAGroupState(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY); + + // Only the state changes; every other field is preserved and the original is untouched. + assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, overlaid.getHAGroupState()); + assertEquals(HAGroupStoreRecord.HAGroupState.STANDBY, original.getHAGroupState()); + assertEquals(original.getProtocolVersion(), overlaid.getProtocolVersion()); + assertEquals(original.getHaGroupName(), overlaid.getHaGroupName()); + assertEquals(original.getLastSyncStateTimeInMs(), overlaid.getLastSyncStateTimeInMs()); + assertEquals(original.getPolicy(), overlaid.getPolicy()); + assertEquals(original.getPeerZKUrl(), overlaid.getPeerZKUrl()); + assertEquals(original.getClusterUrl(), overlaid.getClusterUrl()); + assertEquals(original.getPeerClusterUrl(), overlaid.getPeerClusterUrl()); + assertEquals(original.getHdfsUrl(), overlaid.getHdfsUrl()); + assertEquals(original.getPeerHdfsUrl(), overlaid.getPeerHdfsUrl()); + assertEquals(original.getAdminCRRVersion(), overlaid.getAdminCRRVersion()); + } + @Test public void testEqualsAndHashCode() { String haGroupName = testName.getMethodName(); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PeerClusterWatcherTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PeerClusterWatcherTest.java new file mode 100644 index 00000000000..cc3055db9b9 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PeerClusterWatcherTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import static org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_PEER_CACHE_RETRY_INTERVAL_SECONDS; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.zookeeper.data.Stat; +import org.junit.Test; + +/** + * Unit test for {@link PeerClusterWatcher} concurrency behavior. + */ +public class PeerClusterWatcherTest { + + /** + * A visible/blind transition and its listener notification must be delivered atomically: while + * one transition is mid-notification, a concurrent opposite transition must not deliver its own + * notification, otherwise the two can reorder and leave the replayer's state disagreeing with the + * peer's actual visibility. + */ + @Test(timeout = 30000) + public void testVisibilityTransitionsAreSerializedWithTheirNotifications() throws Exception { + Configuration conf = new Configuration(); + conf.setLong(HA_GROUP_STORE_PEER_CACHE_RETRY_INTERVAL_SECONDS, 0L); + + CountDownLatch visibleEntered = new CountDownLatch(1); + CountDownLatch releaseVisible = new CountDownLatch(1); + CountDownLatch blindNotified = new CountDownLatch(1); + + PeerClusterWatcher.Listener listener = new PeerClusterWatcher.Listener() { + @Override + public void onPeerStateChanged(HAGroupStoreRecord peerRecord, Stat stat) { + } + + @Override + public void onPeerVisible() { + visibleEntered.countDown(); + try { + releaseVisible.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void onPeerBlind() { + blindNotified.countDown(); + } + }; + + PeerClusterWatcher watcher = new PeerClusterWatcher(conf, "g", "ns", listener); + try { + Thread visible = new Thread(watcher::setVisible, "setVisible"); + visible.start(); + assertTrue("setVisible should enter its notification", + visibleEntered.await(5, TimeUnit.SECONDS)); + + Thread blind = new Thread(watcher::setBlind, "setBlind"); + blind.start(); + assertFalse("setBlind delivered its notification while setVisible was still mid-notification", + blindNotified.await(2, TimeUnit.SECONDS)); + + releaseVisible.countDown(); + assertTrue("setBlind should deliver once setVisible completes", + blindNotified.await(5, TimeUnit.SECONDS)); + visible.join(); + blind.join(); + } finally { + watcher.close(); + } + } + + /** + * A blank peer URL means there is no peer to watch, so reconcile reports the peer as visible + * ("nothing to be blind about") without building any cache. + */ + @Test(timeout = 30000) + public void testBlankPeerUrlIsReportedVisible() throws Exception { + Configuration conf = new Configuration(); + conf.setLong(HA_GROUP_STORE_PEER_CACHE_RETRY_INTERVAL_SECONDS, 0L); + + CountDownLatch visible = new CountDownLatch(1); + PeerClusterWatcher.Listener listener = new PeerClusterWatcher.Listener() { + @Override + public void onPeerStateChanged(HAGroupStoreRecord peerRecord, Stat stat) { + } + + @Override + public void onPeerVisible() { + visible.countDown(); + } + + @Override + public void onPeerBlind() { + } + }; + + PeerClusterWatcher watcher = new PeerClusterWatcher(conf, "g", "ns", listener); + try { + watcher.reconfigure(""); + assertTrue("blank peer URL should be reported visible", visible.await(5, TimeUnit.SECONDS)); + assertFalse("no peer cache should be built for a blank URL", watcher.hasPeerCache()); + } finally { + watcher.close(); + } + } + + /** + * close() must be idempotent and must stop further reconciliation: a second close() is a no-op, + * and a post-close reconfigure neither throws nor builds a cache. + */ + @Test(timeout = 30000) + public void testCloseIsIdempotentAndStopsReconcile() throws Exception { + Configuration conf = new Configuration(); + // Non-zero interval so the watcher schedules a periodic retry that close() must shut down. + conf.setLong(HA_GROUP_STORE_PEER_CACHE_RETRY_INTERVAL_SECONDS, 1L); + + PeerClusterWatcher.Listener listener = new PeerClusterWatcher.Listener() { + @Override + public void onPeerStateChanged(HAGroupStoreRecord peerRecord, Stat stat) { + } + + @Override + public void onPeerVisible() { + } + + @Override + public void onPeerBlind() { + } + }; + + PeerClusterWatcher watcher = new PeerClusterWatcher(conf, "g", "ns", listener); + watcher.close(); + watcher.close(); // second close is a no-op + watcher.reconfigure(""); // ignored after close + assertFalse("closed watcher must not build a peer cache", watcher.hasPeerCache()); + } +}