From 80a356b6817e600c6ebd4ca4290578b0aa92b5c9 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Wed, 27 May 2026 19:19:57 +0530 Subject: [PATCH 01/15] PHOENIX-7863 Add replication consistency point guard to compaction On standby clusters, compaction can prematurely drop delete markers newer than the replication consistency point, causing permanent stale data. This adds a guard that floors maxLookbackWindowStart to the minimum consistency point across all HA groups when replication replay is enabled. Enabled via phoenix.replication.compaction.guard.enabled (default true), active only when phoenix.replication.replay.enabled is also true. Falls back to retaining all delete markers if the consistency point is unavailable. --- .../apache/phoenix/query/QueryServices.java | 2 + .../phoenix/query/QueryServicesOptions.java | 2 + .../coprocessor/CompactionScanner.java | 50 ++++ .../reader/ReplicationLogReplayService.java | 15 +- .../CompactionReplicationGuardDisabledIT.java | 151 ++++++++++ .../reader/CompactionReplicationGuardIT.java | 263 ++++++++++++++++++ ...CompactionScannerReplicationGuardTest.java | 99 +++++++ 7 files changed, 580 insertions(+), 2 deletions(-) create mode 100644 phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java create mode 100644 phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java create mode 100644 phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index 72075853aa8..1609d891582 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -644,6 +644,8 @@ public interface QueryServices extends SQLCloseable { String SYNCHRONOUS_REPLICATION_ENABLED = "phoenix.synchronous.replication.enabled"; + String REPLICATION_COMPACTION_GUARD_ENABLED = "phoenix.replication.compaction.guard.enabled"; + // HA Group Store sync job interval in seconds String HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = "phoenix.ha.group.store.sync.interval.seconds"; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 339b6763a45..997f91baafb 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -514,6 +514,8 @@ public class QueryServicesOptions { public static final Boolean DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED = false; + public static final boolean DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED = true; + // Default HA Group Store sync job interval in seconds (15 minutes = 900 seconds) public static final int DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = 900; diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index f12dc77f724..6945599e604 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -81,6 +81,7 @@ import org.apache.phoenix.jdbc.PhoenixPreparedStatement; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.replication.reader.ReplicationLogReplayService; import org.apache.phoenix.schema.CompiledConditionalTTLExpression; import org.apache.phoenix.schema.CompiledTTLExpression; import org.apache.phoenix.schema.ConditionalTTLExpression; @@ -199,6 +200,16 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store, this.maxLookbackWindowStart = this.maxLookbackInMillis == 0 ? compactionTime : compactionTime - (this.maxLookbackInMillis + 1); + Configuration conf = env.getConfiguration(); + boolean replayEnabled = + conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, + ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED); + boolean guardEnabled = conf.getBoolean(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, + QueryServicesOptions.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); + if (replayEnabled && guardEnabled) { + this.maxLookbackWindowStart = applyReplicationConsistencyGuard(this.maxLookbackWindowStart, + conf, tableName, columnFamilyName); + } ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor(); this.major = major && !forceMinorCompaction; this.minVersion = cfd.getMinVersions(); @@ -359,6 +370,45 @@ public static long getMaxLookbackInMillis(String tableName, String columnFamilyN : maxLookbackMap.get(tableName + CompactionScanner.SEPARATOR + columnFamilyName); } + /** + * Applies the replication replay consistency point as a floor on maxLookbackWindowStart. On + * standby clusters, this prevents compaction from dropping delete markers that have timestamps + * newer than the consistency point. + */ + @VisibleForTesting + static long applyReplicationConsistencyGuard(long currentMaxLookbackWindowStart, + Configuration conf, String tableName, String columnFamilyName) { + try { + long consistencyPoint = getConsistencyPointFromReplayService(conf); + return adjustMaxLookbackWindowStart(currentMaxLookbackWindowStart, consistencyPoint, + tableName, columnFamilyName); + } catch (Exception e) { + LOGGER + .warn("Replication guard enabled but consistency point unavailable for table={} store={}." + + " Retaining all delete markers.", tableName, columnFamilyName, e); + return 0L; + } + } + + @VisibleForTesting + static long getConsistencyPointFromReplayService(Configuration conf) throws Exception { + ReplicationLogReplayService service = ReplicationLogReplayService.getInstance(conf); + return service.getConsistencyPoint(); + } + + @VisibleForTesting + static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart, + long consistencyPoint, String tableName, String columnFamilyName) { + long adjusted = Math.min(currentMaxLookbackWindowStart, consistencyPoint); + if (adjusted < currentMaxLookbackWindowStart) { + LOGGER.info( + "Replication guard: table={} store={} maxLookbackWindowStart adjusted from {} to {}" + + " (consistencyPoint={})", + tableName, columnFamilyName, currentMaxLookbackWindowStart, adjusted, consistencyPoint); + } + return adjusted; + } + static class CellTimeComparator implements Comparator { public static final CellTimeComparator COMPARATOR = new CellTimeComparator(); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 24d40faac77..3c3c92de68f 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -83,7 +84,7 @@ public class ReplicationLogReplayService { private ScheduledExecutorService scheduler; private volatile boolean isRunning = false; - private ReplicationLogReplayService(final Configuration conf) { + protected ReplicationLogReplayService(final Configuration conf) { this.conf = conf; } @@ -105,6 +106,16 @@ public static ReplicationLogReplayService getInstance(Configuration conf) throws return instance; } + @VisibleForTesting + public static void setInstanceForTesting(ReplicationLogReplayService testInstance) { + instance = testInstance; + } + + @VisibleForTesting + public static void resetInstanceForTesting() { + instance = null; + } + /** * Starts the replication log replay service by initializing the scheduler and scheduling periodic * replay operations for each HA Group. @@ -219,7 +230,7 @@ protected void stopReplicationReplay() throws IOException, SQLException { * @throws IOException if there's an error retrieving consistency points from replication groups * @throws SQLException if there's an error accessing HA group information */ - protected long getConsistencyPoint() throws IOException, SQLException { + public long getConsistencyPoint() throws IOException, SQLException { long consistencyPoint = EnvironmentEdgeManager.currentTime(); List replicationGroups = getReplicationGroups(); for (String replicationGroup : replicationGroups) { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java new file mode 100644 index 00000000000..40adc1126af --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.reader; + +import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY; +import static org.apache.phoenix.util.TestUtil.assertRawRowCount; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Map; +import org.apache.hadoop.hbase.TableName; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.replication.reader.ReplicationLogReplayService; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ManualEnvironmentEdge; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; + +/** + * Integration test verifying that the replication compaction guard does NOT interfere with normal + * compaction when explicitly disabled via configuration. + */ +@Category(NeedsOwnMiniClusterTest.class) +public class CompactionReplicationGuardDisabledIT extends BaseTest { + + private static final int MAX_LOOKBACK_AGE = 15; + private static final int ROWS_POPULATED = 2; + private ManualEnvironmentEdge injectEdge; + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map props = Maps.newHashMapWithExpectedSize(4); + props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); + props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); + props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, + Boolean.toString(true)); + props.put(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, Boolean.toString(false)); + props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Before + public void beforeTest() throws Exception { + EnvironmentEdgeManager.reset(); + injectEdge = new ManualEnvironmentEdge(); + injectEdge.setValue(System.currentTimeMillis()); + EnvironmentEdgeManager.injectEdge(injectEdge); + } + + @After + public synchronized void afterTest() throws Exception { + ReplicationLogReplayService.resetInstanceForTesting(); + EnvironmentEdgeManager.reset(); + boolean refCountLeaked = isAnyStoreRefCountLeaked(); + assertFalse("refCount leaked", refCountLeaked); + } + + /** + * When guard is disabled, delete markers are purged normally by maxLookback even though the + * consistency point would have protected them if the guard were enabled. + */ + @Test(timeout = 120000L) + public void testGuardDisabledDeleteMarkersPurgedByMaxLookback() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis(); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Set consistency point BEFORE delete — guard would retain if enabled + long consistencyPoint = beforeDeleteTime - 1; + ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class); + when(mockService.getConsistencyPoint()).thenReturn(consistencyPoint); + ReplicationLogReplayService.setInstanceForTesting(mockService); + + // Advance past maxLookback + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); + + flush(dataTable); + majorCompact(dataTable); + + // Guard disabled — delete marker purged by maxLookback as normal + assertRawRowCount(conn, dataTable, ROWS_POPULATED - 1); + } + } + + private void flush(TableName table) throws IOException { + getUtility().getAdmin().flush(table); + } + + private void majorCompact(TableName table) throws Exception { + TestUtil.majorCompact(getUtility(), table); + } + + private void createTable(String tableName) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY, val1 VARCHAR(10)," + + " val2 VARCHAR(10), val3 VARCHAR(10))"); + conn.commit(); + } + } + + private void populateTable(String tableName) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('a', 'ab', 'abc', 'abcd')"); + conn.commit(); + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('b', 'bc', 'bcd', 'bcde')"); + conn.commit(); + } + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java new file mode 100644 index 00000000000..bfcec12b646 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.reader; + +import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY; +import static org.apache.phoenix.util.TestUtil.assertRawRowCount; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Map; +import org.apache.hadoop.hbase.TableName; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.replication.reader.ReplicationLogReplayService; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ManualEnvironmentEdge; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; + +/** + * Integration tests for the replication consistency point compaction guard. Verifies that + * CompactionScanner retains delete markers with timestamps newer than the consistency point on + * clusters where replication replay is enabled. + */ +@Category(NeedsOwnMiniClusterTest.class) +public class CompactionReplicationGuardIT extends BaseTest { + + private static final int MAX_LOOKBACK_AGE = 15; + private static final int ROWS_POPULATED = 2; + private ManualEnvironmentEdge injectEdge; + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map props = Maps.newHashMapWithExpectedSize(4); + props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); + props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); + props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, + Boolean.toString(true)); + props.put(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, Boolean.toString(true)); + props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Before + public void beforeTest() throws Exception { + EnvironmentEdgeManager.reset(); + injectEdge = new ManualEnvironmentEdge(); + injectEdge.setValue(System.currentTimeMillis()); + EnvironmentEdgeManager.injectEdge(injectEdge); + } + + @After + public synchronized void afterTest() throws Exception { + ReplicationLogReplayService.resetInstanceForTesting(); + EnvironmentEdgeManager.reset(); + boolean refCountLeaked = isAnyStoreRefCountLeaked(); + assertFalse("refCount leaked", refCountLeaked); + } + + /** + * Test 1: Guard retains delete markers that maxLookback would have purged. The consistency point + * is set BEFORE the delete timestamp, so the delete marker is newer than the consistency point + * and must be retained even after maxLookback window passes. + */ + @Test(timeout = 120000L) + public void testGuardRetainsDeleteMarkersNewerThanConsistencyPoint() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis(); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Set consistency point BEFORE the delete — meaning replay hasn't caught up to the delete + long consistencyPoint = beforeDeleteTime - 1; + injectMockConsistencyPoint(consistencyPoint); + + // Advance time past maxLookback window — without guard, marker would be purged + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); + + flush(dataTable); + majorCompact(dataTable); + + // Delete marker should be retained because its timestamp > consistencyPoint + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + } + } + + /** + * Test 2: Both maxLookback and guard allow purge. The consistency point has advanced past the + * delete marker AND maxLookback window has passed — marker should be purged. + */ + @Test(timeout = 120000L) + public void testDeleteMarkersPurgedWhenOlderThanBothConsistencyPointAndMaxLookback() + throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Advance time past maxLookback + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); + + // Set consistency point to current time — replay is fully caught up + long consistencyPoint = EnvironmentEdgeManager.currentTimeMillis(); + injectMockConsistencyPoint(consistencyPoint); + + flush(dataTable); + majorCompact(dataTable); + + // Delete marker should be purged — both guard and maxLookback agree + assertRawRowCount(conn, dataTable, ROWS_POPULATED - 1); + } + } + + /** + * Test 3: MaxLookback retains even when guard wouldn't. Consistency point has advanced past the + * delete, but we're still within the maxLookback window — marker retained by maxLookback. + */ + @Test(timeout = 120000L) + public void testMaxLookbackRetainsEvenWhenGuardAllowsPurge() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Set consistency point to current time — guard would allow purge + long consistencyPoint = EnvironmentEdgeManager.currentTimeMillis(); + injectMockConsistencyPoint(consistencyPoint); + + // Do NOT advance past maxLookback — still within the window + injectEdge.incrementValue(1); + + flush(dataTable); + majorCompact(dataTable); + + // Delete marker retained because still within maxLookback window + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + } + } + + /** + * Test 4: Guard fallback when consistency point unavailable — retains all delete markers. When + * the replay service throws an exception (e.g., not initialized), the guard falls back to + * retaining all markers to avoid data loss. + */ + @Test(timeout = 120000L) + public void testGuardFallbackRetainsAllWhenConsistencyPointUnavailable() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Inject a mock that throws — simulating uninitialized replay service + ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class); + when(mockService.getConsistencyPoint()) + .thenThrow(new IOException("HA groups not initialized")); + ReplicationLogReplayService.setInstanceForTesting(mockService); + + // Advance past maxLookback + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); + + flush(dataTable); + majorCompact(dataTable); + + // Fallback retains all — delete marker NOT purged despite maxLookback passing + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + } + } + + private void injectMockConsistencyPoint(long consistencyPoint) throws IOException, SQLException { + ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class); + when(mockService.getConsistencyPoint()).thenReturn(consistencyPoint); + ReplicationLogReplayService.setInstanceForTesting(mockService); + } + + private void flush(TableName table) throws IOException { + getUtility().getAdmin().flush(table); + } + + private void majorCompact(TableName table) throws Exception { + TestUtil.majorCompact(getUtility(), table); + } + + private void createTable(String tableName) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY, val1 VARCHAR(10)," + + " val2 VARCHAR(10), val3 VARCHAR(10))"); + conn.commit(); + } + } + + private void populateTable(String tableName) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('a', 'ab', 'abc', 'abcd')"); + conn.commit(); + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('b', 'bc', 'bcd', 'bcde')"); + conn.commit(); + } + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java new file mode 100644 index 00000000000..6fbc43bea2a --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +/** + * Tests for the replication consistency point guard in CompactionScanner. Tests the pure adjustment + * logic (adjustMaxLookbackWindowStart) which is the core of the guard — ensuring + * maxLookbackWindowStart is floored to the consistency point. + */ +public class CompactionScannerReplicationGuardTest { + + private static final String TABLE_NAME = "TEST_TABLE"; + private static final String CF_NAME = "0"; + + @Test + public void testAdjustsWindowWhenConsistencyPointIsLower() { + long maxLookbackWindowStart = 1000000L; + long consistencyPoint = 500000L; + + long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(consistencyPoint, result); + } + + @Test + public void testNoChangeWhenConsistencyPointIsHigher() { + long maxLookbackWindowStart = 500000L; + long consistencyPoint = 1000000L; + + long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(maxLookbackWindowStart, result); + } + + @Test + public void testNoChangeWhenConsistencyPointEqualsWindowStart() { + long maxLookbackWindowStart = 500000L; + long consistencyPoint = 500000L; + + long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(maxLookbackWindowStart, result); + } + + @Test + public void testConsistencyPointAtZeroRetainsAll() { + long maxLookbackWindowStart = 1000000L; + long consistencyPoint = 0L; + + long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(0L, result); + } + + @Test + public void testLargeTimestampsNoAdjustmentNeeded() { + long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; + long consistencyPoint = System.currentTimeMillis() - 120000L; + + long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(maxLookbackWindowStart, result); + } + + @Test + public void testConsistencyPointFarInPastPushesWindowBack() { + long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; + long consistencyPoint = System.currentTimeMillis() - 604800000L; + + long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(consistencyPoint, result); + } +} From 601debd047e3cc1dda28ded4df22ec2c0b6b18f5 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Wed, 27 May 2026 20:07:41 +0530 Subject: [PATCH 02/15] PHOENIX-7863 Move compaction guard logic to ReplicationLogReplayService Move applyReplicationConsistencyGuard and adjustMaxLookbackWindowStart from CompactionScanner to ReplicationLogReplayService where the consistency point logic belongs. Relocate unit test to replication.reader package accordingly. --- .../coprocessor/CompactionScanner.java | 43 ++----------------- .../reader/ReplicationLogReplayService.java | 32 ++++++++++++++ .../ReplicationCompactionGuardTest.java} | 32 +++++++------- 3 files changed, 51 insertions(+), 56 deletions(-) rename phoenix-core/src/test/java/org/apache/phoenix/{coprocessor/CompactionScannerReplicationGuardTest.java => replication/reader/ReplicationCompactionGuardTest.java} (67%) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 6945599e604..7f41dad9e7f 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -207,8 +207,9 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store, boolean guardEnabled = conf.getBoolean(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, QueryServicesOptions.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); if (replayEnabled && guardEnabled) { - this.maxLookbackWindowStart = applyReplicationConsistencyGuard(this.maxLookbackWindowStart, - conf, tableName, columnFamilyName); + this.maxLookbackWindowStart = + ReplicationLogReplayService.applyReplicationConsistencyGuard(this.maxLookbackWindowStart, + conf, tableName, columnFamilyName); } ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor(); this.major = major && !forceMinorCompaction; @@ -370,44 +371,6 @@ public static long getMaxLookbackInMillis(String tableName, String columnFamilyN : maxLookbackMap.get(tableName + CompactionScanner.SEPARATOR + columnFamilyName); } - /** - * Applies the replication replay consistency point as a floor on maxLookbackWindowStart. On - * standby clusters, this prevents compaction from dropping delete markers that have timestamps - * newer than the consistency point. - */ - @VisibleForTesting - static long applyReplicationConsistencyGuard(long currentMaxLookbackWindowStart, - Configuration conf, String tableName, String columnFamilyName) { - try { - long consistencyPoint = getConsistencyPointFromReplayService(conf); - return adjustMaxLookbackWindowStart(currentMaxLookbackWindowStart, consistencyPoint, - tableName, columnFamilyName); - } catch (Exception e) { - LOGGER - .warn("Replication guard enabled but consistency point unavailable for table={} store={}." - + " Retaining all delete markers.", tableName, columnFamilyName, e); - return 0L; - } - } - - @VisibleForTesting - static long getConsistencyPointFromReplayService(Configuration conf) throws Exception { - ReplicationLogReplayService service = ReplicationLogReplayService.getInstance(conf); - return service.getConsistencyPoint(); - } - - @VisibleForTesting - static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart, - long consistencyPoint, String tableName, String columnFamilyName) { - long adjusted = Math.min(currentMaxLookbackWindowStart, consistencyPoint); - if (adjusted < currentMaxLookbackWindowStart) { - LOGGER.info( - "Replication guard: table={} store={} maxLookbackWindowStart adjusted from {} to {}" - + " (consistencyPoint={})", - tableName, columnFamilyName, currentMaxLookbackWindowStart, adjusted, consistencyPoint); - } - return adjusted; - } static class CellTimeComparator implements Comparator { public static final CellTimeComparator COMPARATOR = new CellTimeComparator(); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 3c3c92de68f..37e84579dba 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -240,6 +240,38 @@ public long getConsistencyPoint() throws IOException, SQLException { return consistencyPoint; } + /** + * Applies the replication replay consistency point as a floor on maxLookbackWindowStart. On + * standby clusters, this prevents compaction from dropping delete markers that have timestamps + * newer than the consistency point. + */ + @VisibleForTesting + public static long applyReplicationConsistencyGuard(long currentMaxLookbackWindowStart, + Configuration conf, String tableName, String columnFamilyName) { + try { + long consistencyPoint = getInstance(conf).getConsistencyPoint(); + return adjustMaxLookbackWindowStart(currentMaxLookbackWindowStart, consistencyPoint, + tableName, columnFamilyName); + } catch (Exception e) { + LOG.warn("Replication guard enabled but consistency point unavailable for table={} store={}." + + " Retaining all delete markers.", tableName, columnFamilyName, e); + return 0L; + } + } + + @VisibleForTesting + public static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart, + long consistencyPoint, String tableName, String columnFamilyName) { + long adjusted = Math.min(currentMaxLookbackWindowStart, consistencyPoint); + if (adjusted < currentMaxLookbackWindowStart) { + LOG.info( + "Replication guard: table={} store={} maxLookbackWindowStart adjusted from {} to {}" + + " (consistencyPoint={})", + tableName, columnFamilyName, currentMaxLookbackWindowStart, adjusted, consistencyPoint); + } + return adjusted; + } + /** Returns the list of HA groups on the cluster */ protected List getReplicationGroups() throws SQLException { return HAGroupStoreManager.getInstance(conf).getHAGroupNames(); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java similarity index 67% rename from phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java rename to phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java index 6fbc43bea2a..3248106134a 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java @@ -15,18 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.phoenix.coprocessor; +package org.apache.phoenix.replication.reader; import static org.junit.Assert.assertEquals; import org.junit.Test; /** - * Tests for the replication consistency point guard in CompactionScanner. Tests the pure adjustment - * logic (adjustMaxLookbackWindowStart) which is the core of the guard — ensuring + * Tests for the replication consistency point guard in ReplicationLogReplayService. Tests the pure + * adjustment logic (adjustMaxLookbackWindowStart) which is the core of the guard — ensuring * maxLookbackWindowStart is floored to the consistency point. */ -public class CompactionScannerReplicationGuardTest { +public class ReplicationCompactionGuardTest { private static final String TABLE_NAME = "TEST_TABLE"; private static final String CF_NAME = "0"; @@ -36,8 +36,8 @@ public void testAdjustsWindowWhenConsistencyPointIsLower() { long maxLookbackWindowStart = 1000000L; long consistencyPoint = 500000L; - long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( + maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(consistencyPoint, result); } @@ -47,8 +47,8 @@ public void testNoChangeWhenConsistencyPointIsHigher() { long maxLookbackWindowStart = 500000L; long consistencyPoint = 1000000L; - long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( + maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(maxLookbackWindowStart, result); } @@ -58,8 +58,8 @@ public void testNoChangeWhenConsistencyPointEqualsWindowStart() { long maxLookbackWindowStart = 500000L; long consistencyPoint = 500000L; - long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( + maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(maxLookbackWindowStart, result); } @@ -69,8 +69,8 @@ public void testConsistencyPointAtZeroRetainsAll() { long maxLookbackWindowStart = 1000000L; long consistencyPoint = 0L; - long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( + maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(0L, result); } @@ -80,8 +80,8 @@ public void testLargeTimestampsNoAdjustmentNeeded() { long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; long consistencyPoint = System.currentTimeMillis() - 120000L; - long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( + maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(maxLookbackWindowStart, result); } @@ -91,8 +91,8 @@ public void testConsistencyPointFarInPastPushesWindowBack() { long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; long consistencyPoint = System.currentTimeMillis() - 604800000L; - long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( + maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(consistencyPoint, result); } From 2b542781c6466ca2ad19410ce15b037f21e0e74a Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Wed, 27 May 2026 20:12:52 +0530 Subject: [PATCH 03/15] PHOENIX-7863 Revert getConsistencyPoint visibility to protected Now that the guard logic lives in the same class, public access is no longer needed. Tests are in the same package and can still access it. --- .../phoenix/replication/reader/ReplicationLogReplayService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 37e84579dba..e5a53a74c64 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -230,7 +230,7 @@ protected void stopReplicationReplay() throws IOException, SQLException { * @throws IOException if there's an error retrieving consistency points from replication groups * @throws SQLException if there's an error accessing HA group information */ - public long getConsistencyPoint() throws IOException, SQLException { + protected long getConsistencyPoint() throws IOException, SQLException { long consistencyPoint = EnvironmentEdgeManager.currentTime(); List replicationGroups = getReplicationGroups(); for (String replicationGroup : replicationGroups) { From 6404215f612bca179fe59e43d60a89fb597949e7 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Wed, 27 May 2026 20:23:31 +0530 Subject: [PATCH 04/15] PHOENIX-7863 Move guard config constants to ReplicationLogReplayService Co-locate REPLICATION_COMPACTION_GUARD_ENABLED and its default with the other replication config constants in ReplicationLogReplayService, removing them from QueryServices/QueryServicesOptions. --- .../java/org/apache/phoenix/query/QueryServices.java | 2 -- .../apache/phoenix/query/QueryServicesOptions.java | 2 -- .../apache/phoenix/coprocessor/CompactionScanner.java | 5 +++-- .../reader/ReplicationLogReplayService.java | 11 +++++++++++ .../reader/CompactionReplicationGuardDisabledIT.java | 3 ++- .../reader/CompactionReplicationGuardIT.java | 3 ++- 6 files changed, 18 insertions(+), 8 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index 1609d891582..72075853aa8 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -644,8 +644,6 @@ public interface QueryServices extends SQLCloseable { String SYNCHRONOUS_REPLICATION_ENABLED = "phoenix.synchronous.replication.enabled"; - String REPLICATION_COMPACTION_GUARD_ENABLED = "phoenix.replication.compaction.guard.enabled"; - // HA Group Store sync job interval in seconds String HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = "phoenix.ha.group.store.sync.interval.seconds"; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 997f91baafb..339b6763a45 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -514,8 +514,6 @@ public class QueryServicesOptions { public static final Boolean DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED = false; - public static final boolean DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED = true; - // Default HA Group Store sync job interval in seconds (15 minutes = 900 seconds) public static final int DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = 900; diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 7f41dad9e7f..7a954d06e1d 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -204,8 +204,9 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store, boolean replayEnabled = conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED); - boolean guardEnabled = conf.getBoolean(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, - QueryServicesOptions.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); + boolean guardEnabled = conf.getBoolean( + ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, + ReplicationLogReplayService.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); if (replayEnabled && guardEnabled) { this.maxLookbackWindowStart = ReplicationLogReplayService.applyReplicationConsistencyGuard(this.maxLookbackWindowStart, diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index e5a53a74c64..33cd922bcb8 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -51,6 +51,17 @@ public class ReplicationLogReplayService { */ public static final boolean DEFAULT_REPLICATION_REPLAY_ENABLED = false; + /** + * Configuration key for enabling/disabling the replication compaction guard + */ + public static final String REPLICATION_COMPACTION_GUARD_ENABLED = + "phoenix.replication.compaction.guard.enabled"; + + /** + * Default value for replication compaction guard enabled flag + */ + public static final boolean DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED = true; + /** * Number of threads in the executor pool for the replication replay service */ diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java index 40adc1126af..3755ab73c27 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java @@ -63,7 +63,8 @@ public static synchronized void doSetup() throws Exception { props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, Boolean.toString(true)); - props.put(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, Boolean.toString(false)); + props.put(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, + Boolean.toString(false)); props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java index bfcec12b646..7b53db3c1f8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java @@ -64,7 +64,8 @@ public static synchronized void doSetup() throws Exception { props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, Boolean.toString(true)); - props.put(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, Boolean.toString(true)); + props.put(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, + Boolean.toString(true)); props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } From 28108c9cfb555124da833ba676b8912c34da03f3 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Wed, 27 May 2026 20:53:53 +0530 Subject: [PATCH 05/15] PHOENIX-7863 Apply spotless formatting --- .../coprocessor/CompactionScanner.java | 12 ++++------ .../CompactionReplicationGuardDisabledIT.java | 1 - .../reader/CompactionReplicationGuardIT.java | 1 - .../ReplicationCompactionGuardTest.java | 24 +++++++++---------- 4 files changed, 17 insertions(+), 21 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 7a954d06e1d..33fb6bf0cc8 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -204,13 +204,12 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store, boolean replayEnabled = conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED); - boolean guardEnabled = conf.getBoolean( - ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, - ReplicationLogReplayService.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); + boolean guardEnabled = + conf.getBoolean(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, + ReplicationLogReplayService.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); if (replayEnabled && guardEnabled) { - this.maxLookbackWindowStart = - ReplicationLogReplayService.applyReplicationConsistencyGuard(this.maxLookbackWindowStart, - conf, tableName, columnFamilyName); + this.maxLookbackWindowStart = ReplicationLogReplayService.applyReplicationConsistencyGuard( + this.maxLookbackWindowStart, conf, tableName, columnFamilyName); } ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor(); this.major = major && !forceMinorCompaction; @@ -372,7 +371,6 @@ public static long getMaxLookbackInMillis(String tableName, String columnFamilyN : maxLookbackMap.get(tableName + CompactionScanner.SEPARATOR + columnFamilyName); } - static class CellTimeComparator implements Comparator { public static final CellTimeComparator COMPARATOR = new CellTimeComparator(); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java index 3755ab73c27..8ed243e1690 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java @@ -32,7 +32,6 @@ import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.replication.reader.ReplicationLogReplayService; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.ManualEnvironmentEdge; import org.apache.phoenix.util.ReadOnlyProps; diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java index 7b53db3c1f8..1ad78f0ab56 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java @@ -32,7 +32,6 @@ import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.replication.reader.ReplicationLogReplayService; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.ManualEnvironmentEdge; import org.apache.phoenix.util.ReadOnlyProps; diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java index 3248106134a..0cffee289b0 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java @@ -36,8 +36,8 @@ public void testAdjustsWindowWhenConsistencyPointIsLower() { long maxLookbackWindowStart = 1000000L; long consistencyPoint = 500000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( - maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(consistencyPoint, result); } @@ -47,8 +47,8 @@ public void testNoChangeWhenConsistencyPointIsHigher() { long maxLookbackWindowStart = 500000L; long consistencyPoint = 1000000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( - maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(maxLookbackWindowStart, result); } @@ -58,8 +58,8 @@ public void testNoChangeWhenConsistencyPointEqualsWindowStart() { long maxLookbackWindowStart = 500000L; long consistencyPoint = 500000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( - maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(maxLookbackWindowStart, result); } @@ -69,8 +69,8 @@ public void testConsistencyPointAtZeroRetainsAll() { long maxLookbackWindowStart = 1000000L; long consistencyPoint = 0L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( - maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(0L, result); } @@ -80,8 +80,8 @@ public void testLargeTimestampsNoAdjustmentNeeded() { long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; long consistencyPoint = System.currentTimeMillis() - 120000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( - maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(maxLookbackWindowStart, result); } @@ -91,8 +91,8 @@ public void testConsistencyPointFarInPastPushesWindowBack() { long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; long consistencyPoint = System.currentTimeMillis() - 604800000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( - maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(consistencyPoint, result); } From 01f14cabd310ffe35b4d6b9d5f4bedab38f32656 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Fri, 29 May 2026 15:10:39 +0530 Subject: [PATCH 06/15] PHOENIX-7863 Fix visibility and capacity hints from review - Remove incorrect @VisibleForTesting from applyReplicationConsistencyGuard (it is genuinely public, called from CompactionScanner) - Reduce adjustMaxLookbackWindowStart to package-private - Fix newHashMapWithExpectedSize(4) to 5 in both IT classes --- .../replication/reader/ReplicationLogReplayService.java | 3 +-- .../reader/CompactionReplicationGuardDisabledIT.java | 2 +- .../replication/reader/CompactionReplicationGuardIT.java | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 33cd922bcb8..e413ed73e9b 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -256,7 +256,6 @@ protected long getConsistencyPoint() throws IOException, SQLException { * standby clusters, this prevents compaction from dropping delete markers that have timestamps * newer than the consistency point. */ - @VisibleForTesting public static long applyReplicationConsistencyGuard(long currentMaxLookbackWindowStart, Configuration conf, String tableName, String columnFamilyName) { try { @@ -271,7 +270,7 @@ public static long applyReplicationConsistencyGuard(long currentMaxLookbackWindo } @VisibleForTesting - public static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart, + static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart, long consistencyPoint, String tableName, String columnFamilyName) { long adjusted = Math.min(currentMaxLookbackWindowStart, consistencyPoint); if (adjusted < currentMaxLookbackWindowStart) { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java index 8ed243e1690..68ff6ed986f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java @@ -57,7 +57,7 @@ public class CompactionReplicationGuardDisabledIT extends BaseTest { @BeforeClass public static synchronized void doSetup() throws Exception { - Map props = Maps.newHashMapWithExpectedSize(4); + Map props = Maps.newHashMapWithExpectedSize(5); props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java index 1ad78f0ab56..7788155eec3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java @@ -58,7 +58,7 @@ public class CompactionReplicationGuardIT extends BaseTest { @BeforeClass public static synchronized void doSetup() throws Exception { - Map props = Maps.newHashMapWithExpectedSize(4); + Map props = Maps.newHashMapWithExpectedSize(5); props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, From fbf7a3da8d9db76d90c551994985b28a60cc6430 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Sun, 7 Jun 2026 17:14:05 +0530 Subject: [PATCH 07/15] PHOENIX-7863 Address review: row-level guard, caching, visibility fixes - Move guard enforcement from store-level to RowContext.setTTL() to fix TTL window bypass (Math.min cap after Math.max with ttlWindowStart) - Add Guava memoized cache (30s TTL) for consistency point to reduce NameNode RPCs during compaction bursts - Gate guard on major compactions only (no effect during flush/minor) - Make constructor private, replace mock injection with test factory (setConsistencyPointForTesting) - Demote per-compaction logging from INFO to DEBUG - Keep guard config flag for operational flexibility --- .../coprocessor/CompactionScanner.java | 13 ++- .../reader/ReplicationLogReplayService.java | 58 +++++++----- .../CompactionReplicationGuardDisabledIT.java | 6 +- .../reader/CompactionReplicationGuardIT.java | 15 +-- .../ReplicationCompactionGuardTest.java | 93 +++++++++++-------- 5 files changed, 100 insertions(+), 85 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 33fb6bf0cc8..a31b1fac2ef 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -140,6 +140,7 @@ public class CompactionScanner implements InternalScanner { private final Store store; private final RegionCoprocessorEnvironment env; private long maxLookbackWindowStart; + private final long replicationConsistencyPoint; private final long maxLookbackInMillis; private int minVersion; private int maxVersion; @@ -201,18 +202,20 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store, ? compactionTime : compactionTime - (this.maxLookbackInMillis + 1); Configuration conf = env.getConfiguration(); + this.major = major && !forceMinorCompaction; boolean replayEnabled = conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED); boolean guardEnabled = conf.getBoolean(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, ReplicationLogReplayService.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); - if (replayEnabled && guardEnabled) { - this.maxLookbackWindowStart = ReplicationLogReplayService.applyReplicationConsistencyGuard( - this.maxLookbackWindowStart, conf, tableName, columnFamilyName); + if (this.major && replayEnabled && guardEnabled) { + this.replicationConsistencyPoint = + ReplicationLogReplayService.resolveConsistencyPoint(conf, tableName, columnFamilyName); + } else { + this.replicationConsistencyPoint = Long.MAX_VALUE; } ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor(); - this.major = major && !forceMinorCompaction; this.minVersion = cfd.getMinVersions(); this.maxVersion = cfd.getMaxVersions(); this.keepDeletedCells = keepDeleted ? KeepDeletedCells.TTL : cfd.getKeepDeletedCells(); @@ -1670,6 +1673,8 @@ private void setTTL(long ttlInSecs) { this.ttl = Math.max(ttlInSecs * 1000, maxLookbackInMillis + 1); this.ttlWindowStart = ttlInSecs == HConstants.FOREVER ? 1 : compactionTime - ttl; this.maxLookbackWindowStartForRow = Math.max(ttlWindowStart, maxLookbackWindowStart); + this.maxLookbackWindowStartForRow = + Math.min(this.maxLookbackWindowStartForRow, replicationConsistencyPoint); if (LOGGER.isTraceEnabled()) { LOGGER.trace(String.format("RowContext:- (ttlWindowStart=%d, maxLookbackWindowStart=%d)", ttlWindowStart, maxLookbackWindowStart)); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index e413ed73e9b..9a88b7a5fba 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory; import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.phoenix.thirdparty.com.google.common.base.Supplier; +import org.apache.phoenix.thirdparty.com.google.common.base.Suppliers; import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -89,14 +91,30 @@ public class ReplicationLogReplayService { */ public static final int DEFAULT_REPLICATION_REPLAY_SERVICE_EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS = 30; + private static final long CONSISTENCY_POINT_CACHE_TTL_SECONDS = 30; + private static volatile ReplicationLogReplayService instance; private final Configuration conf; private ScheduledExecutorService scheduler; private volatile boolean isRunning = false; + private final Supplier cachedConsistencyPoint; - protected ReplicationLogReplayService(final Configuration conf) { + private ReplicationLogReplayService(final Configuration conf) { this.conf = conf; + this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration(() -> { + try { + return getConsistencyPoint(); + } catch (Exception e) { + LOG.warn("Failed to refresh cached consistency point", e); + return 0L; + } + }, CONSISTENCY_POINT_CACHE_TTL_SECONDS, TimeUnit.SECONDS); + } + + private ReplicationLogReplayService(long fixedConsistencyPoint) { + this.conf = null; + this.cachedConsistencyPoint = () -> fixedConsistencyPoint; } /** @@ -118,8 +136,8 @@ public static ReplicationLogReplayService getInstance(Configuration conf) throws } @VisibleForTesting - public static void setInstanceForTesting(ReplicationLogReplayService testInstance) { - instance = testInstance; + public static void setConsistencyPointForTesting(long fixedConsistencyPoint) { + instance = new ReplicationLogReplayService(fixedConsistencyPoint); } @VisibleForTesting @@ -252,36 +270,26 @@ protected long getConsistencyPoint() throws IOException, SQLException { } /** - * Applies the replication replay consistency point as a floor on maxLookbackWindowStart. On - * standby clusters, this prevents compaction from dropping delete markers that have timestamps - * newer than the consistency point. + * Resolves the minimum replication consistency point across all HA groups. Uses a cached value + * with a 30-second TTL to avoid repeated NameNode RPCs during compaction bursts. Returns 0L on + * any failure (caller treats 0 as "retain all delete markers"). */ - public static long applyReplicationConsistencyGuard(long currentMaxLookbackWindowStart, - Configuration conf, String tableName, String columnFamilyName) { + public static long resolveConsistencyPoint(Configuration conf, String tableName, + String columnFamilyName) { try { - long consistencyPoint = getInstance(conf).getConsistencyPoint(); - return adjustMaxLookbackWindowStart(currentMaxLookbackWindowStart, consistencyPoint, - tableName, columnFamilyName); + long consistencyPoint = getInstance(conf).cachedConsistencyPoint.get(); + if (LOG.isDebugEnabled()) { + LOG.debug("Replication guard: table={} store={} consistencyPoint={}", tableName, + columnFamilyName, consistencyPoint); + } + return consistencyPoint; } catch (Exception e) { - LOG.warn("Replication guard enabled but consistency point unavailable for table={} store={}." + LOG.warn("Replication guard: consistency point unavailable for table={} store={}." + " Retaining all delete markers.", tableName, columnFamilyName, e); return 0L; } } - @VisibleForTesting - static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart, - long consistencyPoint, String tableName, String columnFamilyName) { - long adjusted = Math.min(currentMaxLookbackWindowStart, consistencyPoint); - if (adjusted < currentMaxLookbackWindowStart) { - LOG.info( - "Replication guard: table={} store={} maxLookbackWindowStart adjusted from {} to {}" - + " (consistencyPoint={})", - tableName, columnFamilyName, currentMaxLookbackWindowStart, adjusted, consistencyPoint); - } - return adjusted; - } - /** Returns the list of HA groups on the cluster */ protected List getReplicationGroups() throws SQLException { return HAGroupStoreManager.getInstance(conf).getHAGroupNames(); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java index 68ff6ed986f..e43f8a4dfd5 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java @@ -20,8 +20,6 @@ import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY; import static org.apache.phoenix.util.TestUtil.assertRawRowCount; import static org.junit.Assert.assertFalse; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import java.io.IOException; import java.sql.Connection; @@ -106,9 +104,7 @@ public void testGuardDisabledDeleteMarkersPurgedByMaxLookback() throws Exception // Set consistency point BEFORE delete — guard would retain if enabled long consistencyPoint = beforeDeleteTime - 1; - ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class); - when(mockService.getConsistencyPoint()).thenReturn(consistencyPoint); - ReplicationLogReplayService.setInstanceForTesting(mockService); + ReplicationLogReplayService.setConsistencyPointForTesting(consistencyPoint); // Advance past maxLookback injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java index 7788155eec3..3bd0960b739 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java @@ -20,8 +20,6 @@ import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY; import static org.apache.phoenix.util.TestUtil.assertRawRowCount; import static org.junit.Assert.assertFalse; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import java.io.IOException; import java.sql.Connection; @@ -210,11 +208,8 @@ public void testGuardFallbackRetainsAllWhenConsistencyPointUnavailable() throws conn.commit(); injectEdge.incrementValue(1); - // Inject a mock that throws — simulating uninitialized replay service - ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class); - when(mockService.getConsistencyPoint()) - .thenThrow(new IOException("HA groups not initialized")); - ReplicationLogReplayService.setInstanceForTesting(mockService); + // Inject consistency point of 0 — simulating fallback when replay service is unavailable + ReplicationLogReplayService.setConsistencyPointForTesting(0L); // Advance past maxLookback injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); @@ -227,10 +222,8 @@ public void testGuardFallbackRetainsAllWhenConsistencyPointUnavailable() throws } } - private void injectMockConsistencyPoint(long consistencyPoint) throws IOException, SQLException { - ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class); - when(mockService.getConsistencyPoint()).thenReturn(consistencyPoint); - ReplicationLogReplayService.setInstanceForTesting(mockService); + private void injectMockConsistencyPoint(long consistencyPoint) { + ReplicationLogReplayService.setConsistencyPointForTesting(consistencyPoint); } private void flush(TableName table) throws IOException { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java index 0cffee289b0..82a694ad28c 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java @@ -22,78 +22,91 @@ import org.junit.Test; /** - * Tests for the replication consistency point guard in ReplicationLogReplayService. Tests the pure - * adjustment logic (adjustMaxLookbackWindowStart) which is the core of the guard — ensuring - * maxLookbackWindowStart is floored to the consistency point. + * Tests for the replication compaction guard row-level cap logic. Verifies the contract: + * maxLookbackWindowStartForRow = min(max(ttlWindowStart, maxLookbackWindowStart), consistencyPoint) */ public class ReplicationCompactionGuardTest { - private static final String TABLE_NAME = "TEST_TABLE"; - private static final String CF_NAME = "0"; + /** + * Simulates the RowContext computation in CompactionScanner.RowContext.setTTL(): + * maxLookbackWindowStartForRow = min(max(ttlWindowStart, maxLookbackWindowStart), + * replicationConsistencyPoint) + */ + private static long computeRowBoundary(long ttlWindowStart, long maxLookbackWindowStart, + long replicationConsistencyPoint) { + long rowBoundary = Math.max(ttlWindowStart, maxLookbackWindowStart); + return Math.min(rowBoundary, replicationConsistencyPoint); + } @Test - public void testAdjustsWindowWhenConsistencyPointIsLower() { - long maxLookbackWindowStart = 1000000L; - long consistencyPoint = 500000L; + public void testTtlHigherThanConsistencyPoint_capApplied() { + long now = System.currentTimeMillis(); + long maxLookbackWindowStart = now - 86400000L; + long ttlWindowStart = now - 3600000L; + long consistencyPoint = now - 7200000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); assertEquals(consistencyPoint, result); } @Test - public void testNoChangeWhenConsistencyPointIsHigher() { - long maxLookbackWindowStart = 500000L; - long consistencyPoint = 1000000L; + public void testTtlLowerThanConsistencyPoint_noCapNeeded() { + long now = System.currentTimeMillis(); + long maxLookbackWindowStart = now - 86400000L; + long ttlWindowStart = now - 14400000L; + long consistencyPoint = now - 3600000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); - assertEquals(maxLookbackWindowStart, result); + assertEquals(ttlWindowStart, result); } @Test - public void testNoChangeWhenConsistencyPointEqualsWindowStart() { - long maxLookbackWindowStart = 500000L; - long consistencyPoint = 500000L; + public void testNoTtl_capAppliedOnMaxLookback() { + long now = System.currentTimeMillis(); + long maxLookbackWindowStart = now - 86400000L; + long ttlWindowStart = 1L; + long consistencyPoint = now - 172800000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); - assertEquals(maxLookbackWindowStart, result); + assertEquals(consistencyPoint, result); } @Test - public void testConsistencyPointAtZeroRetainsAll() { - long maxLookbackWindowStart = 1000000L; - long consistencyPoint = 0L; + public void testNoTtl_consistencyPointAheadOfMaxLookback() { + long now = System.currentTimeMillis(); + long maxLookbackWindowStart = now - 86400000L; + long ttlWindowStart = 1L; + long consistencyPoint = now - 60000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); - assertEquals(0L, result); + assertEquals(maxLookbackWindowStart, result); } @Test - public void testLargeTimestampsNoAdjustmentNeeded() { - long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; - long consistencyPoint = System.currentTimeMillis() - 120000L; + public void testConsistencyPointZero_retainsAll() { + long now = System.currentTimeMillis(); + long maxLookbackWindowStart = now - 86400000L; + long ttlWindowStart = now - 3600000L; + long consistencyPoint = 0L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); - assertEquals(maxLookbackWindowStart, result); + assertEquals(0L, result); } @Test - public void testConsistencyPointFarInPastPushesWindowBack() { - long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; - long consistencyPoint = System.currentTimeMillis() - 604800000L; + public void testGuardDisabled_longMaxValueNoOp() { + long now = System.currentTimeMillis(); + long maxLookbackWindowStart = now - 86400000L; + long ttlWindowStart = now - 3600000L; + long consistencyPoint = Long.MAX_VALUE; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); - assertEquals(consistencyPoint, result); + assertEquals(ttlWindowStart, result); } } From f879b5194d5683ee1203a876a0ca2fa350c305ce Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Sun, 7 Jun 2026 18:04:23 +0530 Subject: [PATCH 08/15] PHOENIX-7863 Add cache test for consistency point resolution Verify that resolveConsistencyPoint uses cached value from Guava memoized supplier and does not re-fetch on every call within TTL. --- .../reader/ReplicationLogReplayService.java | 11 ++++++++ .../ReplicationCompactionGuardTest.java | 28 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 9a88b7a5fba..e4dd1ff0359 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -117,6 +117,12 @@ private ReplicationLogReplayService(long fixedConsistencyPoint) { this.cachedConsistencyPoint = () -> fixedConsistencyPoint; } + private ReplicationLogReplayService(Supplier supplier) { + this.conf = null; + this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration( + supplier, CONSISTENCY_POINT_CACHE_TTL_SECONDS, TimeUnit.SECONDS); + } + /** * Gets the singleton instance of the ReplicationLogReplayService using the lazy initializer * pattern. Initializes the instance if it hasn't been created yet. @@ -140,6 +146,11 @@ public static void setConsistencyPointForTesting(long fixedConsistencyPoint) { instance = new ReplicationLogReplayService(fixedConsistencyPoint); } + @VisibleForTesting + public static void setConsistencyPointSupplierForTesting(Supplier supplier) { + instance = new ReplicationLogReplayService(supplier); + } + @VisibleForTesting public static void resetInstanceForTesting() { instance = null; diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java index 82a694ad28c..85279cd88ae 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java @@ -19,6 +19,8 @@ import static org.junit.Assert.assertEquals; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; import org.junit.Test; /** @@ -109,4 +111,30 @@ public void testGuardDisabled_longMaxValueNoOp() { assertEquals(ttlWindowStart, result); } + + @Test + public void testCachedConsistencyPointAvoidsRepeatedFetches() { + AtomicInteger fetchCount = new AtomicInteger(0); + ReplicationLogReplayService.setConsistencyPointSupplierForTesting(() -> { + fetchCount.incrementAndGet(); + return 500000L; + }); + + try { + Configuration conf = new Configuration(false); + String table = "TEST_TABLE"; + String cf = "0"; + + long result1 = ReplicationLogReplayService.resolveConsistencyPoint(conf, table, cf); + long result2 = ReplicationLogReplayService.resolveConsistencyPoint(conf, table, cf); + long result3 = ReplicationLogReplayService.resolveConsistencyPoint(conf, table, cf); + + assertEquals(500000L, result1); + assertEquals(500000L, result2); + assertEquals(500000L, result3); + assertEquals(1, fetchCount.get()); + } finally { + ReplicationLogReplayService.resetInstanceForTesting(); + } + } } From 152dadaa4414667e79d158d2ef863785edfce997 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Thu, 11 Jun 2026 19:18:40 +0530 Subject: [PATCH 09/15] PHOENIX-7863 Fix cached failure and extract guard formula as single source of truth --- .../coprocessor/CompactionScanner.java | 4 +- .../reader/ReplicationLogReplayService.java | 20 ++++--- .../ReplicationCompactionGuardTest.java | 57 +++++++++++++------ 3 files changed, 55 insertions(+), 26 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index a31b1fac2ef..977c6d37608 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -1672,9 +1672,9 @@ private void init() { private void setTTL(long ttlInSecs) { this.ttl = Math.max(ttlInSecs * 1000, maxLookbackInMillis + 1); this.ttlWindowStart = ttlInSecs == HConstants.FOREVER ? 1 : compactionTime - ttl; - this.maxLookbackWindowStartForRow = Math.max(ttlWindowStart, maxLookbackWindowStart); this.maxLookbackWindowStartForRow = - Math.min(this.maxLookbackWindowStartForRow, replicationConsistencyPoint); + ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, replicationConsistencyPoint); if (LOGGER.isTraceEnabled()) { LOGGER.trace(String.format("RowContext:- (ttlWindowStart=%d, maxLookbackWindowStart=%d)", ttlWindowStart, maxLookbackWindowStart)); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index e4dd1ff0359..144628603c3 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -105,22 +105,19 @@ private ReplicationLogReplayService(final Configuration conf) { this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration(() -> { try { return getConsistencyPoint(); - } catch (Exception e) { - LOG.warn("Failed to refresh cached consistency point", e); - return 0L; + } catch (IOException | SQLException e) { + throw new RuntimeException("Failed to fetch consistency point", e); } }, CONSISTENCY_POINT_CACHE_TTL_SECONDS, TimeUnit.SECONDS); } private ReplicationLogReplayService(long fixedConsistencyPoint) { - this.conf = null; this.cachedConsistencyPoint = () -> fixedConsistencyPoint; } private ReplicationLogReplayService(Supplier supplier) { - this.conf = null; - this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration( - supplier, CONSISTENCY_POINT_CACHE_TTL_SECONDS, TimeUnit.SECONDS); + this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration(supplier, + CONSISTENCY_POINT_CACHE_TTL_SECONDS, TimeUnit.SECONDS); } /** @@ -301,6 +298,15 @@ public static long resolveConsistencyPoint(Configuration conf, String tableName, } } + /** + * Computes the effective max-lookback boundary for a row, capped by the replication consistency + * point. This is the single source of truth for the formula used by CompactionScanner.RowContext. + */ + public static long computeRowMaxLookbackWithGuard(long ttlWindowStart, + long maxLookbackWindowStart, long replicationConsistencyPoint) { + return Math.min(Math.max(ttlWindowStart, maxLookbackWindowStart), replicationConsistencyPoint); + } + /** Returns the list of HA groups on the cluster */ protected List getReplicationGroups() throws SQLException { return HAGroupStoreManager.getInstance(conf).getHAGroupNames(); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java index 85279cd88ae..13d0e01f46a 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java @@ -29,17 +29,6 @@ */ public class ReplicationCompactionGuardTest { - /** - * Simulates the RowContext computation in CompactionScanner.RowContext.setTTL(): - * maxLookbackWindowStartForRow = min(max(ttlWindowStart, maxLookbackWindowStart), - * replicationConsistencyPoint) - */ - private static long computeRowBoundary(long ttlWindowStart, long maxLookbackWindowStart, - long replicationConsistencyPoint) { - long rowBoundary = Math.max(ttlWindowStart, maxLookbackWindowStart); - return Math.min(rowBoundary, replicationConsistencyPoint); - } - @Test public void testTtlHigherThanConsistencyPoint_capApplied() { long now = System.currentTimeMillis(); @@ -47,7 +36,8 @@ public void testTtlHigherThanConsistencyPoint_capApplied() { long ttlWindowStart = now - 3600000L; long consistencyPoint = now - 7200000L; - long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); assertEquals(consistencyPoint, result); } @@ -59,7 +49,8 @@ public void testTtlLowerThanConsistencyPoint_noCapNeeded() { long ttlWindowStart = now - 14400000L; long consistencyPoint = now - 3600000L; - long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); assertEquals(ttlWindowStart, result); } @@ -71,7 +62,8 @@ public void testNoTtl_capAppliedOnMaxLookback() { long ttlWindowStart = 1L; long consistencyPoint = now - 172800000L; - long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); assertEquals(consistencyPoint, result); } @@ -83,7 +75,8 @@ public void testNoTtl_consistencyPointAheadOfMaxLookback() { long ttlWindowStart = 1L; long consistencyPoint = now - 60000L; - long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); assertEquals(maxLookbackWindowStart, result); } @@ -95,7 +88,8 @@ public void testConsistencyPointZero_retainsAll() { long ttlWindowStart = now - 3600000L; long consistencyPoint = 0L; - long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); assertEquals(0L, result); } @@ -107,7 +101,8 @@ public void testGuardDisabled_longMaxValueNoOp() { long ttlWindowStart = now - 3600000L; long consistencyPoint = Long.MAX_VALUE; - long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); assertEquals(ttlWindowStart, result); } @@ -137,4 +132,32 @@ public void testCachedConsistencyPointAvoidsRepeatedFetches() { ReplicationLogReplayService.resetInstanceForTesting(); } } + + @Test + public void testTransientFailureNotCached_retriesOnNextCall() { + AtomicInteger fetchCount = new AtomicInteger(0); + ReplicationLogReplayService.setConsistencyPointSupplierForTesting(() -> { + int attempt = fetchCount.incrementAndGet(); + if (attempt == 1) { + throw new RuntimeException("Simulated transient failure"); + } + return 700000L; + }); + + try { + Configuration conf = new Configuration(false); + String table = "TEST_TABLE"; + String cf = "0"; + + long result1 = ReplicationLogReplayService.resolveConsistencyPoint(conf, table, cf); + assertEquals(0L, result1); + + long result2 = ReplicationLogReplayService.resolveConsistencyPoint(conf, table, cf); + assertEquals(700000L, result2); + + assertEquals(2, fetchCount.get()); + } finally { + ReplicationLogReplayService.resetInstanceForTesting(); + } + } } From 16a31352a1e5d74326020aee1f6ffd333b9de14e Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Fri, 12 Jun 2026 15:40:11 +0530 Subject: [PATCH 10/15] PHOENIX-7863 Move guard formula to CompactionScanner and trim test --- .../coprocessor/CompactionScanner.java | 14 +++- .../reader/ReplicationLogReplayService.java | 9 -- ...a => ReplicationConsistencyPointTest.java} | 83 +------------------ 3 files changed, 13 insertions(+), 93 deletions(-) rename phoenix-core/src/test/java/org/apache/phoenix/replication/reader/{ReplicationCompactionGuardTest.java => ReplicationConsistencyPointTest.java} (50%) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 977c6d37608..449b4f32c2b 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -1646,6 +1646,15 @@ private String getTenantIdFromRowKey(byte[] rowKey, boolean isSharedIndex) throw } } + /** + * Computes the effective max-lookback boundary for a row, capped by the replication consistency + * point. Formula: min(max(ttlWindowStart, maxLookbackWindowStart), consistencyPoint). + */ + public static long computeRowMaxLookbackWithGuard(long ttlWindowStart, + long maxLookbackWindowStart, long replicationConsistencyPoint) { + return Math.min(Math.max(ttlWindowStart, maxLookbackWindowStart), replicationConsistencyPoint); + } + /** * The context for a given row during compaction. A row may have multiple compaction row versions. * CompactionScanner uses the same row context for these versions. @@ -1672,9 +1681,8 @@ private void init() { private void setTTL(long ttlInSecs) { this.ttl = Math.max(ttlInSecs * 1000, maxLookbackInMillis + 1); this.ttlWindowStart = ttlInSecs == HConstants.FOREVER ? 1 : compactionTime - ttl; - this.maxLookbackWindowStartForRow = - ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, - maxLookbackWindowStart, replicationConsistencyPoint); + this.maxLookbackWindowStartForRow = computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, replicationConsistencyPoint); if (LOGGER.isTraceEnabled()) { LOGGER.trace(String.format("RowContext:- (ttlWindowStart=%d, maxLookbackWindowStart=%d)", ttlWindowStart, maxLookbackWindowStart)); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 144628603c3..4b63495d7f1 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -298,15 +298,6 @@ public static long resolveConsistencyPoint(Configuration conf, String tableName, } } - /** - * Computes the effective max-lookback boundary for a row, capped by the replication consistency - * point. This is the single source of truth for the formula used by CompactionScanner.RowContext. - */ - public static long computeRowMaxLookbackWithGuard(long ttlWindowStart, - long maxLookbackWindowStart, long replicationConsistencyPoint) { - return Math.min(Math.max(ttlWindowStart, maxLookbackWindowStart), replicationConsistencyPoint); - } - /** Returns the list of HA groups on the cluster */ protected List getReplicationGroups() throws SQLException { return HAGroupStoreManager.getInstance(conf).getHAGroupNames(); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java similarity index 50% rename from phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java rename to phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java index 13d0e01f46a..f989e4b62b4 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java @@ -24,88 +24,9 @@ import org.junit.Test; /** - * Tests for the replication compaction guard row-level cap logic. Verifies the contract: - * maxLookbackWindowStartForRow = min(max(ttlWindowStart, maxLookbackWindowStart), consistencyPoint) + * Tests for ReplicationLogReplayService.resolveConsistencyPoint caching behavior. */ -public class ReplicationCompactionGuardTest { - - @Test - public void testTtlHigherThanConsistencyPoint_capApplied() { - long now = System.currentTimeMillis(); - long maxLookbackWindowStart = now - 86400000L; - long ttlWindowStart = now - 3600000L; - long consistencyPoint = now - 7200000L; - - long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, - maxLookbackWindowStart, consistencyPoint); - - assertEquals(consistencyPoint, result); - } - - @Test - public void testTtlLowerThanConsistencyPoint_noCapNeeded() { - long now = System.currentTimeMillis(); - long maxLookbackWindowStart = now - 86400000L; - long ttlWindowStart = now - 14400000L; - long consistencyPoint = now - 3600000L; - - long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, - maxLookbackWindowStart, consistencyPoint); - - assertEquals(ttlWindowStart, result); - } - - @Test - public void testNoTtl_capAppliedOnMaxLookback() { - long now = System.currentTimeMillis(); - long maxLookbackWindowStart = now - 86400000L; - long ttlWindowStart = 1L; - long consistencyPoint = now - 172800000L; - - long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, - maxLookbackWindowStart, consistencyPoint); - - assertEquals(consistencyPoint, result); - } - - @Test - public void testNoTtl_consistencyPointAheadOfMaxLookback() { - long now = System.currentTimeMillis(); - long maxLookbackWindowStart = now - 86400000L; - long ttlWindowStart = 1L; - long consistencyPoint = now - 60000L; - - long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, - maxLookbackWindowStart, consistencyPoint); - - assertEquals(maxLookbackWindowStart, result); - } - - @Test - public void testConsistencyPointZero_retainsAll() { - long now = System.currentTimeMillis(); - long maxLookbackWindowStart = now - 86400000L; - long ttlWindowStart = now - 3600000L; - long consistencyPoint = 0L; - - long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, - maxLookbackWindowStart, consistencyPoint); - - assertEquals(0L, result); - } - - @Test - public void testGuardDisabled_longMaxValueNoOp() { - long now = System.currentTimeMillis(); - long maxLookbackWindowStart = now - 86400000L; - long ttlWindowStart = now - 3600000L; - long consistencyPoint = Long.MAX_VALUE; - - long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, - maxLookbackWindowStart, consistencyPoint); - - assertEquals(ttlWindowStart, result); - } +public class ReplicationConsistencyPointTest { @Test public void testCachedConsistencyPointAvoidsRepeatedFetches() { From bb42a32dd01f4344d3054b8c3db692f1b632ee01 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Fri, 12 Jun 2026 15:47:23 +0530 Subject: [PATCH 11/15] PHOENIX-7863 Narrow guard formula visibility to private --- .../java/org/apache/phoenix/coprocessor/CompactionScanner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 449b4f32c2b..8525ccac0b9 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -1650,7 +1650,7 @@ private String getTenantIdFromRowKey(byte[] rowKey, boolean isSharedIndex) throw * Computes the effective max-lookback boundary for a row, capped by the replication consistency * point. Formula: min(max(ttlWindowStart, maxLookbackWindowStart), consistencyPoint). */ - public static long computeRowMaxLookbackWithGuard(long ttlWindowStart, + private static long computeRowMaxLookbackWithGuard(long ttlWindowStart, long maxLookbackWindowStart, long replicationConsistencyPoint) { return Math.min(Math.max(ttlWindowStart, maxLookbackWindowStart), replicationConsistencyPoint); } From 18714b1addc8354ed387ea9608e986c91771cb2e Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Tue, 23 Jun 2026 13:45:17 +0530 Subject: [PATCH 12/15] Fix build issue by initialzing the conf variable --- .../phoenix/replication/reader/ReplicationLogReplayService.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 4b63495d7f1..baf5739e450 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -112,10 +112,12 @@ private ReplicationLogReplayService(final Configuration conf) { } private ReplicationLogReplayService(long fixedConsistencyPoint) { + this.conf = null; this.cachedConsistencyPoint = () -> fixedConsistencyPoint; } private ReplicationLogReplayService(Supplier supplier) { + this.conf = null; this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration(supplier, CONSISTENCY_POINT_CACHE_TTL_SECONDS, TimeUnit.SECONDS); } From 96a0c5ee19e13c8a3df90240a36503aec47f3833 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Tue, 23 Jun 2026 23:40:31 +0530 Subject: [PATCH 13/15] PHOENIX-7863 Address review: volatile, remove guard flag, add TTL IT and formula UTs --- .../coprocessor/CompactionScanner.java | 7 +- .../reader/ReplicationLogDiscoveryReplay.java | 2 +- .../reader/ReplicationLogReplayService.java | 10 -- .../CompactionReplicationGuardDisabledIT.java | 147 ------------------ .../reader/CompactionReplicationGuardIT.java | 48 +++++- .../CompactionGuardFormulaTest.java | 100 ++++++++++++ 6 files changed, 149 insertions(+), 165 deletions(-) delete mode 100644 phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java create mode 100644 phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 8525ccac0b9..f3d3b4bf8b5 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -206,10 +206,7 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store, boolean replayEnabled = conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED); - boolean guardEnabled = - conf.getBoolean(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, - ReplicationLogReplayService.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); - if (this.major && replayEnabled && guardEnabled) { + if (this.major && replayEnabled) { this.replicationConsistencyPoint = ReplicationLogReplayService.resolveConsistencyPoint(conf, tableName, columnFamilyName); } else { @@ -1650,7 +1647,7 @@ private String getTenantIdFromRowKey(byte[] rowKey, boolean isSharedIndex) throw * Computes the effective max-lookback boundary for a row, capped by the replication consistency * point. Formula: min(max(ttlWindowStart, maxLookbackWindowStart), consistencyPoint). */ - private static long computeRowMaxLookbackWithGuard(long ttlWindowStart, + public static long computeRowMaxLookbackWithGuard(long ttlWindowStart, long maxLookbackWindowStart, long replicationConsistencyPoint) { return Math.min(Math.max(ttlWindowStart, maxLookbackWindowStart), replicationConsistencyPoint); } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java index 4d5f886d510..ec7a1da5358 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java @@ -103,7 +103,7 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery { */ public static final double DEFAULT_WAITING_BUFFER_PERCENTAGE = 15.0; - private ReplicationRound lastRoundInSync; + private volatile ReplicationRound lastRoundInSync; // AtomicReference ensures listener updates are visible to replay thread private final AtomicReference replicationReplayState = diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index baf5739e450..342fe9621a3 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -53,16 +53,6 @@ public class ReplicationLogReplayService { */ public static final boolean DEFAULT_REPLICATION_REPLAY_ENABLED = false; - /** - * Configuration key for enabling/disabling the replication compaction guard - */ - public static final String REPLICATION_COMPACTION_GUARD_ENABLED = - "phoenix.replication.compaction.guard.enabled"; - - /** - * Default value for replication compaction guard enabled flag - */ - public static final boolean DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED = true; /** * Number of threads in the executor pool for the replication replay service diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java deleted file mode 100644 index e43f8a4dfd5..00000000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.replication.reader; - -import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY; -import static org.apache.phoenix.util.TestUtil.assertRawRowCount; -import static org.junit.Assert.assertFalse; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.util.Map; -import org.apache.hadoop.hbase.TableName; -import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; -import org.apache.phoenix.query.BaseTest; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.util.EnvironmentEdgeManager; -import org.apache.phoenix.util.ManualEnvironmentEdge; -import org.apache.phoenix.util.ReadOnlyProps; -import org.apache.phoenix.util.TestUtil; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; - -/** - * Integration test verifying that the replication compaction guard does NOT interfere with normal - * compaction when explicitly disabled via configuration. - */ -@Category(NeedsOwnMiniClusterTest.class) -public class CompactionReplicationGuardDisabledIT extends BaseTest { - - private static final int MAX_LOOKBACK_AGE = 15; - private static final int ROWS_POPULATED = 2; - private ManualEnvironmentEdge injectEdge; - - @BeforeClass - public static synchronized void doSetup() throws Exception { - Map props = Maps.newHashMapWithExpectedSize(5); - props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); - props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); - props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, - Boolean.toString(true)); - props.put(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, - Boolean.toString(false)); - props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); - setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); - } - - @Before - public void beforeTest() throws Exception { - EnvironmentEdgeManager.reset(); - injectEdge = new ManualEnvironmentEdge(); - injectEdge.setValue(System.currentTimeMillis()); - EnvironmentEdgeManager.injectEdge(injectEdge); - } - - @After - public synchronized void afterTest() throws Exception { - ReplicationLogReplayService.resetInstanceForTesting(); - EnvironmentEdgeManager.reset(); - boolean refCountLeaked = isAnyStoreRefCountLeaked(); - assertFalse("refCount leaked", refCountLeaked); - } - - /** - * When guard is disabled, delete markers are purged normally by maxLookback even though the - * consistency point would have protected them if the guard were enabled. - */ - @Test(timeout = 120000L) - public void testGuardDisabledDeleteMarkersPurgedByMaxLookback() throws Exception { - try (Connection conn = DriverManager.getConnection(getUrl())) { - String dataTableName = generateUniqueName(); - createTable(dataTableName); - TableName dataTable = TableName.valueOf(dataTableName); - populateTable(dataTableName); - - injectEdge.incrementValue(1); - long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis(); - - // Delete a row - conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); - conn.commit(); - injectEdge.incrementValue(1); - - // Set consistency point BEFORE delete — guard would retain if enabled - long consistencyPoint = beforeDeleteTime - 1; - ReplicationLogReplayService.setConsistencyPointForTesting(consistencyPoint); - - // Advance past maxLookback - injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); - - flush(dataTable); - majorCompact(dataTable); - - // Guard disabled — delete marker purged by maxLookback as normal - assertRawRowCount(conn, dataTable, ROWS_POPULATED - 1); - } - } - - private void flush(TableName table) throws IOException { - getUtility().getAdmin().flush(table); - } - - private void majorCompact(TableName table) throws Exception { - TestUtil.majorCompact(getUtility(), table); - } - - private void createTable(String tableName) throws SQLException { - try (Connection conn = DriverManager.getConnection(getUrl())) { - conn.createStatement().execute( - "CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY, val1 VARCHAR(10)," - + " val2 VARCHAR(10), val3 VARCHAR(10))"); - conn.commit(); - } - } - - private void populateTable(String tableName) throws SQLException { - try (Connection conn = DriverManager.getConnection(getUrl())) { - conn.createStatement() - .execute("UPSERT INTO " + tableName + " VALUES ('a', 'ab', 'abc', 'abcd')"); - conn.commit(); - conn.createStatement() - .execute("UPSERT INTO " + tableName + " VALUES ('b', 'bc', 'bcd', 'bcde')"); - conn.commit(); - } - } -} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java index 3bd0960b739..f16a3146798 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java @@ -61,8 +61,6 @@ public static synchronized void doSetup() throws Exception { props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, Boolean.toString(true)); - props.put(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, - Boolean.toString(true)); props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } @@ -222,6 +220,43 @@ public void testGuardFallbackRetainsAllWhenConsistencyPointUnavailable() throws } } + /** + * Test 5: Guard retains delete markers on a table with explicit TTL. The consistency point is + * set BEFORE the delete, time advances past both TTL and maxLookback, and the guard still + * retains the delete marker because its timestamp is newer than the consistency point. + */ + @Test(timeout = 120000L) + public void testGuardRetainsDeleteMarkersWithExplicitTTL() throws Exception { + int ttlSeconds = 30; + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTableWithTTL(dataTableName, ttlSeconds); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis(); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Set consistency point BEFORE the delete — replay hasn't caught up + long consistencyPoint = beforeDeleteTime - 1; + injectMockConsistencyPoint(consistencyPoint); + + // Advance past both TTL and maxLookback — without guard, marker would be purged + injectEdge.incrementValue(ttlSeconds * 1000 + 1000); + + flush(dataTable); + majorCompact(dataTable); + + // Delete marker retained — guard caps purge at consistencyPoint + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + } + } + private void injectMockConsistencyPoint(long consistencyPoint) { ReplicationLogReplayService.setConsistencyPointForTesting(consistencyPoint); } @@ -243,6 +278,15 @@ private void createTable(String tableName) throws SQLException { } } + private void createTableWithTTL(String tableName, int ttlSeconds) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY, val1 VARCHAR(10)," + + " val2 VARCHAR(10), val3 VARCHAR(10)) TTL=" + ttlSeconds); + conn.commit(); + } + } + private void populateTable(String tableName) throws SQLException { try (Connection conn = DriverManager.getConnection(getUrl())) { conn.createStatement() diff --git a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java new file mode 100644 index 00000000000..f41f7d70f07 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +/** + * Unit tests for CompactionScanner.computeRowMaxLookbackWithGuard formula. + * Covers scenarios including those unreachable via standard TTL floor enforcement + * to guard against future changes to the TTL computation. + */ +public class CompactionGuardFormulaTest { + + @Test + public void testTtlDominatedGuardCaps() { + // ttlWindowStart > maxLookbackWindowStart > consistencyPoint + // Simulates conditional TTL or future removal of TTL floor enforcement + long consistencyPoint = 1000L; + long maxLookbackWindowStart = 2000L; + long ttlWindowStart = 3000L; + + long result = CompactionScanner.computeRowMaxLookbackWithGuard( + ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + + // max(3000, 2000) = 3000, min(3000, 1000) = 1000 → guard caps at consistencyPoint + assertEquals(consistencyPoint, result); + } + + @Test + public void testLookbackDominatedGuardCaps() { + // maxLookbackWindowStart > ttlWindowStart > consistencyPoint + long consistencyPoint = 1000L; + long ttlWindowStart = 2000L; + long maxLookbackWindowStart = 3000L; + + long result = CompactionScanner.computeRowMaxLookbackWithGuard( + ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + + // max(2000, 3000) = 3000, min(3000, 1000) = 1000 → guard caps at consistencyPoint + assertEquals(consistencyPoint, result); + } + + @Test + public void testConsistencyPointBeyondBoth_guardInactive() { + // consistencyPoint > max(ttlWindowStart, maxLookbackWindowStart) + long maxLookbackWindowStart = 2000L; + long ttlWindowStart = 1000L; + long consistencyPoint = 5000L; + + long result = CompactionScanner.computeRowMaxLookbackWithGuard( + ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + + // max(1000, 2000) = 2000, min(2000, 5000) = 2000 → guard doesn't restrict + assertEquals(maxLookbackWindowStart, result); + } + + @Test + public void testConsistencyPointZero_retainsAll() { + // consistencyPoint = 0 signals fallback — retain all delete markers + long maxLookbackWindowStart = 2000L; + long ttlWindowStart = 3000L; + long consistencyPoint = 0L; + + long result = CompactionScanner.computeRowMaxLookbackWithGuard( + ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + + assertEquals(0L, result); + } + + @Test + public void testConsistencyPointMaxValue_guardDisabled() { + // Long.MAX_VALUE used when replay is off — guard is effectively a no-op + long maxLookbackWindowStart = 2000L; + long ttlWindowStart = 3000L; + long consistencyPoint = Long.MAX_VALUE; + + long result = CompactionScanner.computeRowMaxLookbackWithGuard( + ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + + // max(3000, 2000) = 3000, min(3000, MAX_VALUE) = 3000 → normal behavior + assertEquals(ttlWindowStart, result); + } +} From 14c10e84307449c6d4ece2f4dd21bf190b20aa26 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Wed, 24 Jun 2026 17:45:15 +0530 Subject: [PATCH 14/15] PHOENIX-7863 Pass Configuration from caller into test constructors --- .../reader/ReplicationLogReplayService.java | 18 ++++++++++-------- .../reader/CompactionReplicationGuardIT.java | 6 ++++-- .../ReplicationConsistencyPointTest.java | 8 ++++---- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 342fe9621a3..b6802d2805a 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -101,13 +101,13 @@ private ReplicationLogReplayService(final Configuration conf) { }, CONSISTENCY_POINT_CACHE_TTL_SECONDS, TimeUnit.SECONDS); } - private ReplicationLogReplayService(long fixedConsistencyPoint) { - this.conf = null; + private ReplicationLogReplayService(Configuration conf, long fixedConsistencyPoint) { + this.conf = conf; this.cachedConsistencyPoint = () -> fixedConsistencyPoint; } - private ReplicationLogReplayService(Supplier supplier) { - this.conf = null; + private ReplicationLogReplayService(Configuration conf, Supplier supplier) { + this.conf = conf; this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration(supplier, CONSISTENCY_POINT_CACHE_TTL_SECONDS, TimeUnit.SECONDS); } @@ -131,13 +131,15 @@ public static ReplicationLogReplayService getInstance(Configuration conf) throws } @VisibleForTesting - public static void setConsistencyPointForTesting(long fixedConsistencyPoint) { - instance = new ReplicationLogReplayService(fixedConsistencyPoint); + public static void setConsistencyPointForTesting(Configuration conf, + long fixedConsistencyPoint) { + instance = new ReplicationLogReplayService(conf, fixedConsistencyPoint); } @VisibleForTesting - public static void setConsistencyPointSupplierForTesting(Supplier supplier) { - instance = new ReplicationLogReplayService(supplier); + public static void setConsistencyPointSupplierForTesting(Configuration conf, + Supplier supplier) { + instance = new ReplicationLogReplayService(conf, supplier); } @VisibleForTesting diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java index f16a3146798..d5e31b7eb3b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java @@ -207,7 +207,8 @@ public void testGuardFallbackRetainsAllWhenConsistencyPointUnavailable() throws injectEdge.incrementValue(1); // Inject consistency point of 0 — simulating fallback when replay service is unavailable - ReplicationLogReplayService.setConsistencyPointForTesting(0L); + ReplicationLogReplayService.setConsistencyPointForTesting( + getUtility().getConfiguration(), 0L); // Advance past maxLookback injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); @@ -258,7 +259,8 @@ public void testGuardRetainsDeleteMarkersWithExplicitTTL() throws Exception { } private void injectMockConsistencyPoint(long consistencyPoint) { - ReplicationLogReplayService.setConsistencyPointForTesting(consistencyPoint); + ReplicationLogReplayService.setConsistencyPointForTesting( + getUtility().getConfiguration(), consistencyPoint); } private void flush(TableName table) throws IOException { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java index f989e4b62b4..1c277b3e15b 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java @@ -30,14 +30,14 @@ public class ReplicationConsistencyPointTest { @Test public void testCachedConsistencyPointAvoidsRepeatedFetches() { + Configuration conf = new Configuration(false); AtomicInteger fetchCount = new AtomicInteger(0); - ReplicationLogReplayService.setConsistencyPointSupplierForTesting(() -> { + ReplicationLogReplayService.setConsistencyPointSupplierForTesting(conf, () -> { fetchCount.incrementAndGet(); return 500000L; }); try { - Configuration conf = new Configuration(false); String table = "TEST_TABLE"; String cf = "0"; @@ -56,8 +56,9 @@ public void testCachedConsistencyPointAvoidsRepeatedFetches() { @Test public void testTransientFailureNotCached_retriesOnNextCall() { + Configuration conf = new Configuration(false); AtomicInteger fetchCount = new AtomicInteger(0); - ReplicationLogReplayService.setConsistencyPointSupplierForTesting(() -> { + ReplicationLogReplayService.setConsistencyPointSupplierForTesting(conf, () -> { int attempt = fetchCount.incrementAndGet(); if (attempt == 1) { throw new RuntimeException("Simulated transient failure"); @@ -66,7 +67,6 @@ public void testTransientFailureNotCached_retriesOnNextCall() { }); try { - Configuration conf = new Configuration(false); String table = "TEST_TABLE"; String cf = "0"; From 1d4ad190bc37496236017c4d35d938684e90030f Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Thu, 25 Jun 2026 02:52:49 +0530 Subject: [PATCH 15/15] PHOENIX-7863 Named constants, configurable cache TTL, rate-limited logging, and trace improvements - Replace magic 0L and Long.MAX_VALUE with CONSISTENCY_POINT_UNAVAILABLE and CONSISTENCY_POINT_GUARD_DISABLED named constants for clarity - Make consistency point cache TTL configurable via phoenix.replication.compaction.guard.cache.ttl.seconds (default 30s) - Rate-limit fallback WARN log to once per 60 seconds to prevent log flooding during sustained outages - Add replicationConsistencyPoint and computed maxLookbackWindowStartForRow to existing TRACE log line for operational visibility - Update Javadoc on resolveConsistencyPoint to reflect configurable TTL and named constant references --- .../coprocessor/CompactionScanner.java | 32 ++++++-- .../reader/ReplicationLogReplayService.java | 55 +++++++++---- .../reader/CompactionReplicationGuardIT.java | 17 ++-- .../CompactionGuardFormulaTest.java | 78 +++++++++++++------ .../ReplicationConsistencyPointTest.java | 3 +- 5 files changed, 133 insertions(+), 52 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index f3d3b4bf8b5..cb8d890d1c2 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -210,7 +210,8 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store, this.replicationConsistencyPoint = ReplicationLogReplayService.resolveConsistencyPoint(conf, tableName, columnFamilyName); } else { - this.replicationConsistencyPoint = Long.MAX_VALUE; + this.replicationConsistencyPoint = + ReplicationLogReplayService.CONSISTENCY_POINT_GUARD_DISABLED; } ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor(); this.minVersion = cfd.getMinVersions(); @@ -1645,11 +1646,29 @@ private String getTenantIdFromRowKey(byte[] rowKey, boolean isSharedIndex) throw /** * Computes the effective max-lookback boundary for a row, capped by the replication consistency - * point. Formula: min(max(ttlWindowStart, maxLookbackWindowStart), consistencyPoint). + * point. The consistency point represents an exclusive upper bound: everything with ts < + * consistencyPoint has been replayed. We subtract 1 so that cells at exactly ts == + * consistencyPoint satisfy the strict-greater retention check and are retained. + * @param ttlWindowStart row TTL window start in millis since epoch + * @param maxLookbackWindowStart store-level max-lookback window start in millis since epoch + * @param replicationConsistencyPoint exclusive upper bound of replayed timestamps; + * CONSISTENCY_POINT_UNAVAILABLE (0) retains all, + * CONSISTENCY_POINT_GUARD_DISABLED (Long.MAX_VALUE) means + * guard is a no-op + * @return effective boundary for the strict-greater retention compare (millis since epoch) */ public static long computeRowMaxLookbackWithGuard(long ttlWindowStart, long maxLookbackWindowStart, long replicationConsistencyPoint) { - return Math.min(Math.max(ttlWindowStart, maxLookbackWindowStart), replicationConsistencyPoint); + if ( + replicationConsistencyPoint == ReplicationLogReplayService.CONSISTENCY_POINT_UNAVAILABLE + || replicationConsistencyPoint + == ReplicationLogReplayService.CONSISTENCY_POINT_GUARD_DISABLED + ) { + return Math.min(Math.max(ttlWindowStart, maxLookbackWindowStart), + replicationConsistencyPoint); + } + return Math.min(Math.max(ttlWindowStart, maxLookbackWindowStart), + replicationConsistencyPoint - 1); } /** @@ -1681,8 +1700,11 @@ private void setTTL(long ttlInSecs) { this.maxLookbackWindowStartForRow = computeRowMaxLookbackWithGuard(ttlWindowStart, maxLookbackWindowStart, replicationConsistencyPoint); if (LOGGER.isTraceEnabled()) { - LOGGER.trace(String.format("RowContext:- (ttlWindowStart=%d, maxLookbackWindowStart=%d)", - ttlWindowStart, maxLookbackWindowStart)); + LOGGER.trace(String.format( + "RowContext:- (ttlWindowStart=%d, maxLookbackWindowStart=%d, " + + "replicationConsistencyPoint=%d, maxLookbackWindowStartForRow=%d)", + ttlWindowStart, maxLookbackWindowStart, replicationConsistencyPoint, + maxLookbackWindowStartForRow)); } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index b6802d2805a..ac9ec10b4fe 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -53,7 +53,6 @@ public class ReplicationLogReplayService { */ public static final boolean DEFAULT_REPLICATION_REPLAY_ENABLED = false; - /** * Number of threads in the executor pool for the replication replay service */ @@ -81,7 +80,15 @@ public class ReplicationLogReplayService { */ public static final int DEFAULT_REPLICATION_REPLAY_SERVICE_EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS = 30; - private static final long CONSISTENCY_POINT_CACHE_TTL_SECONDS = 30; + public static final long CONSISTENCY_POINT_UNAVAILABLE = 0L; + public static final long CONSISTENCY_POINT_GUARD_DISABLED = Long.MAX_VALUE; + + public static final String CONSISTENCY_POINT_CACHE_TTL_SECONDS_KEY = + "phoenix.replication.compaction.guard.cache.ttl.seconds"; + public static final long DEFAULT_CONSISTENCY_POINT_CACHE_TTL_SECONDS = 30; + + private static volatile long lastFallbackWarnTime = 0; + private static final long WARN_LOG_INTERVAL_MS = 60_000; private static volatile ReplicationLogReplayService instance; @@ -92,13 +99,19 @@ public class ReplicationLogReplayService { private ReplicationLogReplayService(final Configuration conf) { this.conf = conf; + long cacheTtl = conf.getLong(CONSISTENCY_POINT_CACHE_TTL_SECONDS_KEY, + DEFAULT_CONSISTENCY_POINT_CACHE_TTL_SECONDS); + // Guava's memoizeWithExpiration does NOT cache exceptions — a thrown RuntimeException + // causes the next get() to re-invoke the supplier. We rely on this: transient failures + // (NN flap, SYSTEM.HA_GROUP unavailable) retry on the next compaction rather than + // caching a stale fallback for the full TTL. this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration(() -> { try { return getConsistencyPoint(); } catch (IOException | SQLException e) { throw new RuntimeException("Failed to fetch consistency point", e); } - }, CONSISTENCY_POINT_CACHE_TTL_SECONDS, TimeUnit.SECONDS); + }, cacheTtl, TimeUnit.SECONDS); } private ReplicationLogReplayService(Configuration conf, long fixedConsistencyPoint) { @@ -108,8 +121,10 @@ private ReplicationLogReplayService(Configuration conf, long fixedConsistencyPoi private ReplicationLogReplayService(Configuration conf, Supplier supplier) { this.conf = conf; - this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration(supplier, - CONSISTENCY_POINT_CACHE_TTL_SECONDS, TimeUnit.SECONDS); + long cacheTtl = conf.getLong(CONSISTENCY_POINT_CACHE_TTL_SECONDS_KEY, + DEFAULT_CONSISTENCY_POINT_CACHE_TTL_SECONDS); + this.cachedConsistencyPoint = + Suppliers.memoizeWithExpiration(supplier, cacheTtl, TimeUnit.SECONDS); } /** @@ -131,20 +146,25 @@ public static ReplicationLogReplayService getInstance(Configuration conf) throws } @VisibleForTesting - public static void setConsistencyPointForTesting(Configuration conf, - long fixedConsistencyPoint) { - instance = new ReplicationLogReplayService(conf, fixedConsistencyPoint); + public static void setConsistencyPointForTesting(Configuration conf, long fixedConsistencyPoint) { + synchronized (ReplicationLogReplayService.class) { + instance = new ReplicationLogReplayService(conf, fixedConsistencyPoint); + } } @VisibleForTesting public static void setConsistencyPointSupplierForTesting(Configuration conf, Supplier supplier) { - instance = new ReplicationLogReplayService(conf, supplier); + synchronized (ReplicationLogReplayService.class) { + instance = new ReplicationLogReplayService(conf, supplier); + } } @VisibleForTesting public static void resetInstanceForTesting() { - instance = null; + synchronized (ReplicationLogReplayService.class) { + instance = null; + } } /** @@ -273,8 +293,9 @@ protected long getConsistencyPoint() throws IOException, SQLException { /** * Resolves the minimum replication consistency point across all HA groups. Uses a cached value - * with a 30-second TTL to avoid repeated NameNode RPCs during compaction bursts. Returns 0L on - * any failure (caller treats 0 as "retain all delete markers"). + * with a configurable TTL (see {@link #CONSISTENCY_POINT_CACHE_TTL_SECONDS_KEY}) to avoid + * repeated RPCs during compaction bursts. Returns {@link #CONSISTENCY_POINT_UNAVAILABLE} on any + * failure (caller treats this as "retain all delete markers"). */ public static long resolveConsistencyPoint(Configuration conf, String tableName, String columnFamilyName) { @@ -286,9 +307,13 @@ public static long resolveConsistencyPoint(Configuration conf, String tableName, } return consistencyPoint; } catch (Exception e) { - LOG.warn("Replication guard: consistency point unavailable for table={} store={}." - + " Retaining all delete markers.", tableName, columnFamilyName, e); - return 0L; + long now = System.currentTimeMillis(); + if (now - lastFallbackWarnTime > WARN_LOG_INTERVAL_MS) { + lastFallbackWarnTime = now; + LOG.warn("Replication guard: consistency point unavailable for table={} store={}." + + " Retaining all delete markers.", tableName, columnFamilyName, e); + } + return CONSISTENCY_POINT_UNAVAILABLE; } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java index d5e31b7eb3b..2279ab929f8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java @@ -18,6 +18,7 @@ package org.apache.phoenix.replication.reader; import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY; +import static org.apache.phoenix.replication.reader.ReplicationLogReplayService.CONSISTENCY_POINT_UNAVAILABLE; import static org.apache.phoenix.util.TestUtil.assertRawRowCount; import static org.junit.Assert.assertFalse; @@ -206,9 +207,9 @@ public void testGuardFallbackRetainsAllWhenConsistencyPointUnavailable() throws conn.commit(); injectEdge.incrementValue(1); - // Inject consistency point of 0 — simulating fallback when replay service is unavailable - ReplicationLogReplayService.setConsistencyPointForTesting( - getUtility().getConfiguration(), 0L); + // Inject consistency point as UNAVAILABLE — simulating fallback when replay service fails + ReplicationLogReplayService.setConsistencyPointForTesting(getUtility().getConfiguration(), + CONSISTENCY_POINT_UNAVAILABLE); // Advance past maxLookback injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); @@ -222,9 +223,9 @@ public void testGuardFallbackRetainsAllWhenConsistencyPointUnavailable() throws } /** - * Test 5: Guard retains delete markers on a table with explicit TTL. The consistency point is - * set BEFORE the delete, time advances past both TTL and maxLookback, and the guard still - * retains the delete marker because its timestamp is newer than the consistency point. + * Test 5: Guard retains delete markers on a table with explicit TTL. The consistency point is set + * BEFORE the delete, time advances past both TTL and maxLookback, and the guard still retains the + * delete marker because its timestamp is newer than the consistency point. */ @Test(timeout = 120000L) public void testGuardRetainsDeleteMarkersWithExplicitTTL() throws Exception { @@ -259,8 +260,8 @@ public void testGuardRetainsDeleteMarkersWithExplicitTTL() throws Exception { } private void injectMockConsistencyPoint(long consistencyPoint) { - ReplicationLogReplayService.setConsistencyPointForTesting( - getUtility().getConfiguration(), consistencyPoint); + ReplicationLogReplayService.setConsistencyPointForTesting(getUtility().getConfiguration(), + consistencyPoint); } private void flush(TableName table) throws IOException { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java index f41f7d70f07..e13bb86d66b 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java @@ -17,14 +17,16 @@ */ package org.apache.phoenix.coprocessor; +import static org.apache.phoenix.replication.reader.ReplicationLogReplayService.CONSISTENCY_POINT_GUARD_DISABLED; +import static org.apache.phoenix.replication.reader.ReplicationLogReplayService.CONSISTENCY_POINT_UNAVAILABLE; import static org.junit.Assert.assertEquals; import org.junit.Test; /** - * Unit tests for CompactionScanner.computeRowMaxLookbackWithGuard formula. - * Covers scenarios including those unreachable via standard TTL floor enforcement - * to guard against future changes to the TTL computation. + * Unit tests for CompactionScanner.computeRowMaxLookbackWithGuard formula. Covers scenarios + * including those unreachable via standard TTL floor enforcement to guard against future changes to + * the TTL computation. */ public class CompactionGuardFormulaTest { @@ -36,11 +38,11 @@ public void testTtlDominatedGuardCaps() { long maxLookbackWindowStart = 2000L; long ttlWindowStart = 3000L; - long result = CompactionScanner.computeRowMaxLookbackWithGuard( - ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + long result = CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); - // max(3000, 2000) = 3000, min(3000, 1000) = 1000 → guard caps at consistencyPoint - assertEquals(consistencyPoint, result); + // max(3000, 2000) = 3000, min(3000, 999) = 999 → guard caps at consistencyPoint - 1 + assertEquals(consistencyPoint - 1, result); } @Test @@ -50,11 +52,11 @@ public void testLookbackDominatedGuardCaps() { long ttlWindowStart = 2000L; long maxLookbackWindowStart = 3000L; - long result = CompactionScanner.computeRowMaxLookbackWithGuard( - ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + long result = CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); - // max(2000, 3000) = 3000, min(3000, 1000) = 1000 → guard caps at consistencyPoint - assertEquals(consistencyPoint, result); + // max(2000, 3000) = 3000, min(3000, 999) = 999 → guard caps at consistencyPoint - 1 + assertEquals(consistencyPoint - 1, result); } @Test @@ -64,37 +66,67 @@ public void testConsistencyPointBeyondBoth_guardInactive() { long ttlWindowStart = 1000L; long consistencyPoint = 5000L; - long result = CompactionScanner.computeRowMaxLookbackWithGuard( - ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + long result = CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); - // max(1000, 2000) = 2000, min(2000, 5000) = 2000 → guard doesn't restrict + // max(1000, 2000) = 2000, min(2000, 4999) = 2000 → guard doesn't restrict assertEquals(maxLookbackWindowStart, result); } @Test public void testConsistencyPointZero_retainsAll() { - // consistencyPoint = 0 signals fallback — retain all delete markers + // consistencyPoint = UNAVAILABLE signals fallback — retain all delete markers long maxLookbackWindowStart = 2000L; long ttlWindowStart = 3000L; - long consistencyPoint = 0L; + long consistencyPoint = CONSISTENCY_POINT_UNAVAILABLE; - long result = CompactionScanner.computeRowMaxLookbackWithGuard( - ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + long result = CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); - assertEquals(0L, result); + assertEquals(CONSISTENCY_POINT_UNAVAILABLE, result); } @Test public void testConsistencyPointMaxValue_guardDisabled() { - // Long.MAX_VALUE used when replay is off — guard is effectively a no-op + // GUARD_DISABLED used when replay is off — guard is effectively a no-op long maxLookbackWindowStart = 2000L; long ttlWindowStart = 3000L; - long consistencyPoint = Long.MAX_VALUE; + long consistencyPoint = CONSISTENCY_POINT_GUARD_DISABLED; - long result = CompactionScanner.computeRowMaxLookbackWithGuard( - ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + long result = CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); // max(3000, 2000) = 3000, min(3000, MAX_VALUE) = 3000 → normal behavior assertEquals(ttlWindowStart, result); } + + @Test + public void testBoundaryDeleteAtExactlyConsistencyPoint_isRetained() { + // A delete marker at ts == consistencyPoint has NOT been replayed (exclusive upper bound). + // The formula must produce a boundary below consistencyPoint so that the strict-greater + // retention check (ts > boundary) retains it. + long consistencyPoint = 5000L; + long maxLookbackWindowStart = 6000L; + long ttlWindowStart = 4000L; + + long result = CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); + + // Boundary = consistencyPoint - 1 = 4999; a cell at ts=5000 satisfies ts > 4999 + assertEquals(consistencyPoint - 1, result); + } + + @Test + public void testConsistencyPointBetweenInputs() { + // ttlWindowStart < consistencyPoint < maxLookbackWindowStart + long ttlWindowStart = 1000L; + long consistencyPoint = 2500L; + long maxLookbackWindowStart = 3000L; + + long result = CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); + + // max(1000, 3000) = 3000, min(3000, 2499) = 2499 → guard caps + assertEquals(consistencyPoint - 1, result); + } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java index 1c277b3e15b..e74a63a41dc 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.replication.reader; +import static org.apache.phoenix.replication.reader.ReplicationLogReplayService.CONSISTENCY_POINT_UNAVAILABLE; import static org.junit.Assert.assertEquals; import java.util.concurrent.atomic.AtomicInteger; @@ -71,7 +72,7 @@ public void testTransientFailureNotCached_retriesOnNextCall() { String cf = "0"; long result1 = ReplicationLogReplayService.resolveConsistencyPoint(conf, table, cf); - assertEquals(0L, result1); + assertEquals(CONSISTENCY_POINT_UNAVAILABLE, result1); long result2 = ReplicationLogReplayService.resolveConsistencyPoint(conf, table, cf); assertEquals(700000L, result2);