From 24411509d08e5c80e23c1aaa8b2e3c0c41fe96c3 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Wed, 27 May 2026 19:19:57 +0530 Subject: [PATCH 01/13] PHOENIX-7863 Add replication consistency point guard to compaction On standby clusters, compaction can prematurely drop delete markers newer than the replication consistency point, causing permanent stale data. This adds a guard that floors maxLookbackWindowStart to the minimum consistency point across all HA groups when replication replay is enabled. Enabled via phoenix.replication.compaction.guard.enabled (default true), active only when phoenix.replication.replay.enabled is also true. Falls back to retaining all delete markers if the consistency point is unavailable. --- .../apache/phoenix/query/QueryServices.java | 2 + .../phoenix/query/QueryServicesOptions.java | 2 + .../coprocessor/CompactionScanner.java | 50 ++++ .../reader/ReplicationLogReplayService.java | 15 +- .../CompactionReplicationGuardDisabledIT.java | 151 ++++++++++ .../reader/CompactionReplicationGuardIT.java | 263 ++++++++++++++++++ ...CompactionScannerReplicationGuardTest.java | 99 +++++++ 7 files changed, 580 insertions(+), 2 deletions(-) create mode 100644 phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java create mode 100644 phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java create mode 100644 phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index 72075853aa8..1609d891582 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -644,6 +644,8 @@ public interface QueryServices extends SQLCloseable { String SYNCHRONOUS_REPLICATION_ENABLED = "phoenix.synchronous.replication.enabled"; + String REPLICATION_COMPACTION_GUARD_ENABLED = "phoenix.replication.compaction.guard.enabled"; + // HA Group Store sync job interval in seconds String HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = "phoenix.ha.group.store.sync.interval.seconds"; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 339b6763a45..997f91baafb 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -514,6 +514,8 @@ public class QueryServicesOptions { public static final Boolean DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED = false; + public static final boolean DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED = true; + // Default HA Group Store sync job interval in seconds (15 minutes = 900 seconds) public static final int DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = 900; diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index f12dc77f724..6945599e604 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -81,6 +81,7 @@ import org.apache.phoenix.jdbc.PhoenixPreparedStatement; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.replication.reader.ReplicationLogReplayService; import org.apache.phoenix.schema.CompiledConditionalTTLExpression; import org.apache.phoenix.schema.CompiledTTLExpression; import org.apache.phoenix.schema.ConditionalTTLExpression; @@ -199,6 +200,16 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store, this.maxLookbackWindowStart = this.maxLookbackInMillis == 0 ? compactionTime : compactionTime - (this.maxLookbackInMillis + 1); + Configuration conf = env.getConfiguration(); + boolean replayEnabled = + conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, + ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED); + boolean guardEnabled = conf.getBoolean(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, + QueryServicesOptions.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); + if (replayEnabled && guardEnabled) { + this.maxLookbackWindowStart = applyReplicationConsistencyGuard(this.maxLookbackWindowStart, + conf, tableName, columnFamilyName); + } ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor(); this.major = major && !forceMinorCompaction; this.minVersion = cfd.getMinVersions(); @@ -359,6 +370,45 @@ public static long getMaxLookbackInMillis(String tableName, String columnFamilyN : maxLookbackMap.get(tableName + CompactionScanner.SEPARATOR + columnFamilyName); } + /** + * Applies the replication replay consistency point as a floor on maxLookbackWindowStart. On + * standby clusters, this prevents compaction from dropping delete markers that have timestamps + * newer than the consistency point. + */ + @VisibleForTesting + static long applyReplicationConsistencyGuard(long currentMaxLookbackWindowStart, + Configuration conf, String tableName, String columnFamilyName) { + try { + long consistencyPoint = getConsistencyPointFromReplayService(conf); + return adjustMaxLookbackWindowStart(currentMaxLookbackWindowStart, consistencyPoint, + tableName, columnFamilyName); + } catch (Exception e) { + LOGGER + .warn("Replication guard enabled but consistency point unavailable for table={} store={}." + + " Retaining all delete markers.", tableName, columnFamilyName, e); + return 0L; + } + } + + @VisibleForTesting + static long getConsistencyPointFromReplayService(Configuration conf) throws Exception { + ReplicationLogReplayService service = ReplicationLogReplayService.getInstance(conf); + return service.getConsistencyPoint(); + } + + @VisibleForTesting + static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart, + long consistencyPoint, String tableName, String columnFamilyName) { + long adjusted = Math.min(currentMaxLookbackWindowStart, consistencyPoint); + if (adjusted < currentMaxLookbackWindowStart) { + LOGGER.info( + "Replication guard: table={} store={} maxLookbackWindowStart adjusted from {} to {}" + + " (consistencyPoint={})", + tableName, columnFamilyName, currentMaxLookbackWindowStart, adjusted, consistencyPoint); + } + return adjusted; + } + static class CellTimeComparator implements Comparator { public static final CellTimeComparator COMPARATOR = new CellTimeComparator(); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 24d40faac77..3c3c92de68f 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -83,7 +84,7 @@ public class ReplicationLogReplayService { private ScheduledExecutorService scheduler; private volatile boolean isRunning = false; - private ReplicationLogReplayService(final Configuration conf) { + protected ReplicationLogReplayService(final Configuration conf) { this.conf = conf; } @@ -105,6 +106,16 @@ public static ReplicationLogReplayService getInstance(Configuration conf) throws return instance; } + @VisibleForTesting + public static void setInstanceForTesting(ReplicationLogReplayService testInstance) { + instance = testInstance; + } + + @VisibleForTesting + public static void resetInstanceForTesting() { + instance = null; + } + /** * Starts the replication log replay service by initializing the scheduler and scheduling periodic * replay operations for each HA Group. @@ -219,7 +230,7 @@ protected void stopReplicationReplay() throws IOException, SQLException { * @throws IOException if there's an error retrieving consistency points from replication groups * @throws SQLException if there's an error accessing HA group information */ - protected long getConsistencyPoint() throws IOException, SQLException { + public long getConsistencyPoint() throws IOException, SQLException { long consistencyPoint = EnvironmentEdgeManager.currentTime(); List replicationGroups = getReplicationGroups(); for (String replicationGroup : replicationGroups) { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java new file mode 100644 index 00000000000..40adc1126af --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.reader; + +import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY; +import static org.apache.phoenix.util.TestUtil.assertRawRowCount; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Map; +import org.apache.hadoop.hbase.TableName; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.replication.reader.ReplicationLogReplayService; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ManualEnvironmentEdge; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; + +/** + * Integration test verifying that the replication compaction guard does NOT interfere with normal + * compaction when explicitly disabled via configuration. + */ +@Category(NeedsOwnMiniClusterTest.class) +public class CompactionReplicationGuardDisabledIT extends BaseTest { + + private static final int MAX_LOOKBACK_AGE = 15; + private static final int ROWS_POPULATED = 2; + private ManualEnvironmentEdge injectEdge; + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map props = Maps.newHashMapWithExpectedSize(4); + props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); + props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); + props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, + Boolean.toString(true)); + props.put(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, Boolean.toString(false)); + props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Before + public void beforeTest() throws Exception { + EnvironmentEdgeManager.reset(); + injectEdge = new ManualEnvironmentEdge(); + injectEdge.setValue(System.currentTimeMillis()); + EnvironmentEdgeManager.injectEdge(injectEdge); + } + + @After + public synchronized void afterTest() throws Exception { + ReplicationLogReplayService.resetInstanceForTesting(); + EnvironmentEdgeManager.reset(); + boolean refCountLeaked = isAnyStoreRefCountLeaked(); + assertFalse("refCount leaked", refCountLeaked); + } + + /** + * When guard is disabled, delete markers are purged normally by maxLookback even though the + * consistency point would have protected them if the guard were enabled. + */ + @Test(timeout = 120000L) + public void testGuardDisabledDeleteMarkersPurgedByMaxLookback() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis(); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Set consistency point BEFORE delete — guard would retain if enabled + long consistencyPoint = beforeDeleteTime - 1; + ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class); + when(mockService.getConsistencyPoint()).thenReturn(consistencyPoint); + ReplicationLogReplayService.setInstanceForTesting(mockService); + + // Advance past maxLookback + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); + + flush(dataTable); + majorCompact(dataTable); + + // Guard disabled — delete marker purged by maxLookback as normal + assertRawRowCount(conn, dataTable, ROWS_POPULATED - 1); + } + } + + private void flush(TableName table) throws IOException { + getUtility().getAdmin().flush(table); + } + + private void majorCompact(TableName table) throws Exception { + TestUtil.majorCompact(getUtility(), table); + } + + private void createTable(String tableName) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY, val1 VARCHAR(10)," + + " val2 VARCHAR(10), val3 VARCHAR(10))"); + conn.commit(); + } + } + + private void populateTable(String tableName) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('a', 'ab', 'abc', 'abcd')"); + conn.commit(); + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('b', 'bc', 'bcd', 'bcde')"); + conn.commit(); + } + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java new file mode 100644 index 00000000000..bfcec12b646 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.reader; + +import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY; +import static org.apache.phoenix.util.TestUtil.assertRawRowCount; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Map; +import org.apache.hadoop.hbase.TableName; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.replication.reader.ReplicationLogReplayService; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ManualEnvironmentEdge; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; + +/** + * Integration tests for the replication consistency point compaction guard. Verifies that + * CompactionScanner retains delete markers with timestamps newer than the consistency point on + * clusters where replication replay is enabled. + */ +@Category(NeedsOwnMiniClusterTest.class) +public class CompactionReplicationGuardIT extends BaseTest { + + private static final int MAX_LOOKBACK_AGE = 15; + private static final int ROWS_POPULATED = 2; + private ManualEnvironmentEdge injectEdge; + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map props = Maps.newHashMapWithExpectedSize(4); + props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); + props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); + props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, + Boolean.toString(true)); + props.put(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, Boolean.toString(true)); + props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Before + public void beforeTest() throws Exception { + EnvironmentEdgeManager.reset(); + injectEdge = new ManualEnvironmentEdge(); + injectEdge.setValue(System.currentTimeMillis()); + EnvironmentEdgeManager.injectEdge(injectEdge); + } + + @After + public synchronized void afterTest() throws Exception { + ReplicationLogReplayService.resetInstanceForTesting(); + EnvironmentEdgeManager.reset(); + boolean refCountLeaked = isAnyStoreRefCountLeaked(); + assertFalse("refCount leaked", refCountLeaked); + } + + /** + * Test 1: Guard retains delete markers that maxLookback would have purged. The consistency point + * is set BEFORE the delete timestamp, so the delete marker is newer than the consistency point + * and must be retained even after maxLookback window passes. + */ + @Test(timeout = 120000L) + public void testGuardRetainsDeleteMarkersNewerThanConsistencyPoint() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis(); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Set consistency point BEFORE the delete — meaning replay hasn't caught up to the delete + long consistencyPoint = beforeDeleteTime - 1; + injectMockConsistencyPoint(consistencyPoint); + + // Advance time past maxLookback window — without guard, marker would be purged + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); + + flush(dataTable); + majorCompact(dataTable); + + // Delete marker should be retained because its timestamp > consistencyPoint + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + } + } + + /** + * Test 2: Both maxLookback and guard allow purge. The consistency point has advanced past the + * delete marker AND maxLookback window has passed — marker should be purged. + */ + @Test(timeout = 120000L) + public void testDeleteMarkersPurgedWhenOlderThanBothConsistencyPointAndMaxLookback() + throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Advance time past maxLookback + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); + + // Set consistency point to current time — replay is fully caught up + long consistencyPoint = EnvironmentEdgeManager.currentTimeMillis(); + injectMockConsistencyPoint(consistencyPoint); + + flush(dataTable); + majorCompact(dataTable); + + // Delete marker should be purged — both guard and maxLookback agree + assertRawRowCount(conn, dataTable, ROWS_POPULATED - 1); + } + } + + /** + * Test 3: MaxLookback retains even when guard wouldn't. Consistency point has advanced past the + * delete, but we're still within the maxLookback window — marker retained by maxLookback. + */ + @Test(timeout = 120000L) + public void testMaxLookbackRetainsEvenWhenGuardAllowsPurge() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Set consistency point to current time — guard would allow purge + long consistencyPoint = EnvironmentEdgeManager.currentTimeMillis(); + injectMockConsistencyPoint(consistencyPoint); + + // Do NOT advance past maxLookback — still within the window + injectEdge.incrementValue(1); + + flush(dataTable); + majorCompact(dataTable); + + // Delete marker retained because still within maxLookback window + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + } + } + + /** + * Test 4: Guard fallback when consistency point unavailable — retains all delete markers. When + * the replay service throws an exception (e.g., not initialized), the guard falls back to + * retaining all markers to avoid data loss. + */ + @Test(timeout = 120000L) + public void testGuardFallbackRetainsAllWhenConsistencyPointUnavailable() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Inject a mock that throws — simulating uninitialized replay service + ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class); + when(mockService.getConsistencyPoint()) + .thenThrow(new IOException("HA groups not initialized")); + ReplicationLogReplayService.setInstanceForTesting(mockService); + + // Advance past maxLookback + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); + + flush(dataTable); + majorCompact(dataTable); + + // Fallback retains all — delete marker NOT purged despite maxLookback passing + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + } + } + + private void injectMockConsistencyPoint(long consistencyPoint) throws IOException, SQLException { + ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class); + when(mockService.getConsistencyPoint()).thenReturn(consistencyPoint); + ReplicationLogReplayService.setInstanceForTesting(mockService); + } + + private void flush(TableName table) throws IOException { + getUtility().getAdmin().flush(table); + } + + private void majorCompact(TableName table) throws Exception { + TestUtil.majorCompact(getUtility(), table); + } + + private void createTable(String tableName) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY, val1 VARCHAR(10)," + + " val2 VARCHAR(10), val3 VARCHAR(10))"); + conn.commit(); + } + } + + private void populateTable(String tableName) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('a', 'ab', 'abc', 'abcd')"); + conn.commit(); + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('b', 'bc', 'bcd', 'bcde')"); + conn.commit(); + } + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java new file mode 100644 index 00000000000..6fbc43bea2a --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +/** + * Tests for the replication consistency point guard in CompactionScanner. Tests the pure adjustment + * logic (adjustMaxLookbackWindowStart) which is the core of the guard — ensuring + * maxLookbackWindowStart is floored to the consistency point. + */ +public class CompactionScannerReplicationGuardTest { + + private static final String TABLE_NAME = "TEST_TABLE"; + private static final String CF_NAME = "0"; + + @Test + public void testAdjustsWindowWhenConsistencyPointIsLower() { + long maxLookbackWindowStart = 1000000L; + long consistencyPoint = 500000L; + + long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(consistencyPoint, result); + } + + @Test + public void testNoChangeWhenConsistencyPointIsHigher() { + long maxLookbackWindowStart = 500000L; + long consistencyPoint = 1000000L; + + long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(maxLookbackWindowStart, result); + } + + @Test + public void testNoChangeWhenConsistencyPointEqualsWindowStart() { + long maxLookbackWindowStart = 500000L; + long consistencyPoint = 500000L; + + long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(maxLookbackWindowStart, result); + } + + @Test + public void testConsistencyPointAtZeroRetainsAll() { + long maxLookbackWindowStart = 1000000L; + long consistencyPoint = 0L; + + long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(0L, result); + } + + @Test + public void testLargeTimestampsNoAdjustmentNeeded() { + long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; + long consistencyPoint = System.currentTimeMillis() - 120000L; + + long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(maxLookbackWindowStart, result); + } + + @Test + public void testConsistencyPointFarInPastPushesWindowBack() { + long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; + long consistencyPoint = System.currentTimeMillis() - 604800000L; + + long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(consistencyPoint, result); + } +} From 38679e800f23ab8e9ebd6b2582a506c4116cb0a6 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Wed, 27 May 2026 20:07:41 +0530 Subject: [PATCH 02/13] PHOENIX-7863 Move compaction guard logic to ReplicationLogReplayService Move applyReplicationConsistencyGuard and adjustMaxLookbackWindowStart from CompactionScanner to ReplicationLogReplayService where the consistency point logic belongs. Relocate unit test to replication.reader package accordingly. --- .../coprocessor/CompactionScanner.java | 43 ++----------------- .../reader/ReplicationLogReplayService.java | 32 ++++++++++++++ .../ReplicationCompactionGuardTest.java} | 32 +++++++------- 3 files changed, 51 insertions(+), 56 deletions(-) rename phoenix-core/src/test/java/org/apache/phoenix/{coprocessor/CompactionScannerReplicationGuardTest.java => replication/reader/ReplicationCompactionGuardTest.java} (67%) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 6945599e604..7f41dad9e7f 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -207,8 +207,9 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store, boolean guardEnabled = conf.getBoolean(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, QueryServicesOptions.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); if (replayEnabled && guardEnabled) { - this.maxLookbackWindowStart = applyReplicationConsistencyGuard(this.maxLookbackWindowStart, - conf, tableName, columnFamilyName); + this.maxLookbackWindowStart = + ReplicationLogReplayService.applyReplicationConsistencyGuard(this.maxLookbackWindowStart, + conf, tableName, columnFamilyName); } ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor(); this.major = major && !forceMinorCompaction; @@ -370,44 +371,6 @@ public static long getMaxLookbackInMillis(String tableName, String columnFamilyN : maxLookbackMap.get(tableName + CompactionScanner.SEPARATOR + columnFamilyName); } - /** - * Applies the replication replay consistency point as a floor on maxLookbackWindowStart. On - * standby clusters, this prevents compaction from dropping delete markers that have timestamps - * newer than the consistency point. - */ - @VisibleForTesting - static long applyReplicationConsistencyGuard(long currentMaxLookbackWindowStart, - Configuration conf, String tableName, String columnFamilyName) { - try { - long consistencyPoint = getConsistencyPointFromReplayService(conf); - return adjustMaxLookbackWindowStart(currentMaxLookbackWindowStart, consistencyPoint, - tableName, columnFamilyName); - } catch (Exception e) { - LOGGER - .warn("Replication guard enabled but consistency point unavailable for table={} store={}." - + " Retaining all delete markers.", tableName, columnFamilyName, e); - return 0L; - } - } - - @VisibleForTesting - static long getConsistencyPointFromReplayService(Configuration conf) throws Exception { - ReplicationLogReplayService service = ReplicationLogReplayService.getInstance(conf); - return service.getConsistencyPoint(); - } - - @VisibleForTesting - static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart, - long consistencyPoint, String tableName, String columnFamilyName) { - long adjusted = Math.min(currentMaxLookbackWindowStart, consistencyPoint); - if (adjusted < currentMaxLookbackWindowStart) { - LOGGER.info( - "Replication guard: table={} store={} maxLookbackWindowStart adjusted from {} to {}" - + " (consistencyPoint={})", - tableName, columnFamilyName, currentMaxLookbackWindowStart, adjusted, consistencyPoint); - } - return adjusted; - } static class CellTimeComparator implements Comparator { public static final CellTimeComparator COMPARATOR = new CellTimeComparator(); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 3c3c92de68f..37e84579dba 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -240,6 +240,38 @@ public long getConsistencyPoint() throws IOException, SQLException { return consistencyPoint; } + /** + * Applies the replication replay consistency point as a floor on maxLookbackWindowStart. On + * standby clusters, this prevents compaction from dropping delete markers that have timestamps + * newer than the consistency point. + */ + @VisibleForTesting + public static long applyReplicationConsistencyGuard(long currentMaxLookbackWindowStart, + Configuration conf, String tableName, String columnFamilyName) { + try { + long consistencyPoint = getInstance(conf).getConsistencyPoint(); + return adjustMaxLookbackWindowStart(currentMaxLookbackWindowStart, consistencyPoint, + tableName, columnFamilyName); + } catch (Exception e) { + LOG.warn("Replication guard enabled but consistency point unavailable for table={} store={}." + + " Retaining all delete markers.", tableName, columnFamilyName, e); + return 0L; + } + } + + @VisibleForTesting + public static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart, + long consistencyPoint, String tableName, String columnFamilyName) { + long adjusted = Math.min(currentMaxLookbackWindowStart, consistencyPoint); + if (adjusted < currentMaxLookbackWindowStart) { + LOG.info( + "Replication guard: table={} store={} maxLookbackWindowStart adjusted from {} to {}" + + " (consistencyPoint={})", + tableName, columnFamilyName, currentMaxLookbackWindowStart, adjusted, consistencyPoint); + } + return adjusted; + } + /** Returns the list of HA groups on the cluster */ protected List getReplicationGroups() throws SQLException { return HAGroupStoreManager.getInstance(conf).getHAGroupNames(); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java similarity index 67% rename from phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java rename to phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java index 6fbc43bea2a..3248106134a 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java @@ -15,18 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.phoenix.coprocessor; +package org.apache.phoenix.replication.reader; import static org.junit.Assert.assertEquals; import org.junit.Test; /** - * Tests for the replication consistency point guard in CompactionScanner. Tests the pure adjustment - * logic (adjustMaxLookbackWindowStart) which is the core of the guard — ensuring + * Tests for the replication consistency point guard in ReplicationLogReplayService. Tests the pure + * adjustment logic (adjustMaxLookbackWindowStart) which is the core of the guard — ensuring * maxLookbackWindowStart is floored to the consistency point. */ -public class CompactionScannerReplicationGuardTest { +public class ReplicationCompactionGuardTest { private static final String TABLE_NAME = "TEST_TABLE"; private static final String CF_NAME = "0"; @@ -36,8 +36,8 @@ public void testAdjustsWindowWhenConsistencyPointIsLower() { long maxLookbackWindowStart = 1000000L; long consistencyPoint = 500000L; - long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( + maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(consistencyPoint, result); } @@ -47,8 +47,8 @@ public void testNoChangeWhenConsistencyPointIsHigher() { long maxLookbackWindowStart = 500000L; long consistencyPoint = 1000000L; - long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( + maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(maxLookbackWindowStart, result); } @@ -58,8 +58,8 @@ public void testNoChangeWhenConsistencyPointEqualsWindowStart() { long maxLookbackWindowStart = 500000L; long consistencyPoint = 500000L; - long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( + maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(maxLookbackWindowStart, result); } @@ -69,8 +69,8 @@ public void testConsistencyPointAtZeroRetainsAll() { long maxLookbackWindowStart = 1000000L; long consistencyPoint = 0L; - long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( + maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(0L, result); } @@ -80,8 +80,8 @@ public void testLargeTimestampsNoAdjustmentNeeded() { long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; long consistencyPoint = System.currentTimeMillis() - 120000L; - long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( + maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(maxLookbackWindowStart, result); } @@ -91,8 +91,8 @@ public void testConsistencyPointFarInPastPushesWindowBack() { long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; long consistencyPoint = System.currentTimeMillis() - 604800000L; - long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( + maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(consistencyPoint, result); } From 031fb6ec3faf8f575f9becb9a2b9be97eeb810e3 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Wed, 27 May 2026 20:12:52 +0530 Subject: [PATCH 03/13] PHOENIX-7863 Revert getConsistencyPoint visibility to protected Now that the guard logic lives in the same class, public access is no longer needed. Tests are in the same package and can still access it. --- .../phoenix/replication/reader/ReplicationLogReplayService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 37e84579dba..e5a53a74c64 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -230,7 +230,7 @@ protected void stopReplicationReplay() throws IOException, SQLException { * @throws IOException if there's an error retrieving consistency points from replication groups * @throws SQLException if there's an error accessing HA group information */ - public long getConsistencyPoint() throws IOException, SQLException { + protected long getConsistencyPoint() throws IOException, SQLException { long consistencyPoint = EnvironmentEdgeManager.currentTime(); List replicationGroups = getReplicationGroups(); for (String replicationGroup : replicationGroups) { From 9fa0e8559878b5a8e755fbd6b11e6be4185aa693 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Wed, 27 May 2026 20:23:31 +0530 Subject: [PATCH 04/13] PHOENIX-7863 Move guard config constants to ReplicationLogReplayService Co-locate REPLICATION_COMPACTION_GUARD_ENABLED and its default with the other replication config constants in ReplicationLogReplayService, removing them from QueryServices/QueryServicesOptions. --- .../java/org/apache/phoenix/query/QueryServices.java | 2 -- .../apache/phoenix/query/QueryServicesOptions.java | 2 -- .../apache/phoenix/coprocessor/CompactionScanner.java | 5 +++-- .../reader/ReplicationLogReplayService.java | 11 +++++++++++ .../reader/CompactionReplicationGuardDisabledIT.java | 3 ++- .../reader/CompactionReplicationGuardIT.java | 3 ++- 6 files changed, 18 insertions(+), 8 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index 1609d891582..72075853aa8 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -644,8 +644,6 @@ public interface QueryServices extends SQLCloseable { String SYNCHRONOUS_REPLICATION_ENABLED = "phoenix.synchronous.replication.enabled"; - String REPLICATION_COMPACTION_GUARD_ENABLED = "phoenix.replication.compaction.guard.enabled"; - // HA Group Store sync job interval in seconds String HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = "phoenix.ha.group.store.sync.interval.seconds"; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 997f91baafb..339b6763a45 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -514,8 +514,6 @@ public class QueryServicesOptions { public static final Boolean DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED = false; - public static final boolean DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED = true; - // Default HA Group Store sync job interval in seconds (15 minutes = 900 seconds) public static final int DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = 900; diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 7f41dad9e7f..7a954d06e1d 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -204,8 +204,9 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store, boolean replayEnabled = conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED); - boolean guardEnabled = conf.getBoolean(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, - QueryServicesOptions.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); + boolean guardEnabled = conf.getBoolean( + ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, + ReplicationLogReplayService.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); if (replayEnabled && guardEnabled) { this.maxLookbackWindowStart = ReplicationLogReplayService.applyReplicationConsistencyGuard(this.maxLookbackWindowStart, diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index e5a53a74c64..33cd922bcb8 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -51,6 +51,17 @@ public class ReplicationLogReplayService { */ public static final boolean DEFAULT_REPLICATION_REPLAY_ENABLED = false; + /** + * Configuration key for enabling/disabling the replication compaction guard + */ + public static final String REPLICATION_COMPACTION_GUARD_ENABLED = + "phoenix.replication.compaction.guard.enabled"; + + /** + * Default value for replication compaction guard enabled flag + */ + public static final boolean DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED = true; + /** * Number of threads in the executor pool for the replication replay service */ diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java index 40adc1126af..3755ab73c27 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java @@ -63,7 +63,8 @@ public static synchronized void doSetup() throws Exception { props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, Boolean.toString(true)); - props.put(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, Boolean.toString(false)); + props.put(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, + Boolean.toString(false)); props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java index bfcec12b646..7b53db3c1f8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java @@ -64,7 +64,8 @@ public static synchronized void doSetup() throws Exception { props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, Boolean.toString(true)); - props.put(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, Boolean.toString(true)); + props.put(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, + Boolean.toString(true)); props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } From ca86133d90aca12ba3447efb11d63671d894b6c3 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Wed, 27 May 2026 20:53:53 +0530 Subject: [PATCH 05/13] PHOENIX-7863 Apply spotless formatting --- .../coprocessor/CompactionScanner.java | 12 ++++------ .../CompactionReplicationGuardDisabledIT.java | 1 - .../reader/CompactionReplicationGuardIT.java | 1 - .../ReplicationCompactionGuardTest.java | 24 +++++++++---------- 4 files changed, 17 insertions(+), 21 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 7a954d06e1d..33fb6bf0cc8 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -204,13 +204,12 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store, boolean replayEnabled = conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED); - boolean guardEnabled = conf.getBoolean( - ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, - ReplicationLogReplayService.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); + boolean guardEnabled = + conf.getBoolean(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, + ReplicationLogReplayService.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); if (replayEnabled && guardEnabled) { - this.maxLookbackWindowStart = - ReplicationLogReplayService.applyReplicationConsistencyGuard(this.maxLookbackWindowStart, - conf, tableName, columnFamilyName); + this.maxLookbackWindowStart = ReplicationLogReplayService.applyReplicationConsistencyGuard( + this.maxLookbackWindowStart, conf, tableName, columnFamilyName); } ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor(); this.major = major && !forceMinorCompaction; @@ -372,7 +371,6 @@ public static long getMaxLookbackInMillis(String tableName, String columnFamilyN : maxLookbackMap.get(tableName + CompactionScanner.SEPARATOR + columnFamilyName); } - static class CellTimeComparator implements Comparator { public static final CellTimeComparator COMPARATOR = new CellTimeComparator(); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java index 3755ab73c27..8ed243e1690 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java @@ -32,7 +32,6 @@ import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.replication.reader.ReplicationLogReplayService; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.ManualEnvironmentEdge; import org.apache.phoenix.util.ReadOnlyProps; diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java index 7b53db3c1f8..1ad78f0ab56 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java @@ -32,7 +32,6 @@ import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.replication.reader.ReplicationLogReplayService; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.ManualEnvironmentEdge; import org.apache.phoenix.util.ReadOnlyProps; diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java index 3248106134a..0cffee289b0 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java @@ -36,8 +36,8 @@ public void testAdjustsWindowWhenConsistencyPointIsLower() { long maxLookbackWindowStart = 1000000L; long consistencyPoint = 500000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( - maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(consistencyPoint, result); } @@ -47,8 +47,8 @@ public void testNoChangeWhenConsistencyPointIsHigher() { long maxLookbackWindowStart = 500000L; long consistencyPoint = 1000000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( - maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(maxLookbackWindowStart, result); } @@ -58,8 +58,8 @@ public void testNoChangeWhenConsistencyPointEqualsWindowStart() { long maxLookbackWindowStart = 500000L; long consistencyPoint = 500000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( - maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(maxLookbackWindowStart, result); } @@ -69,8 +69,8 @@ public void testConsistencyPointAtZeroRetainsAll() { long maxLookbackWindowStart = 1000000L; long consistencyPoint = 0L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( - maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(0L, result); } @@ -80,8 +80,8 @@ public void testLargeTimestampsNoAdjustmentNeeded() { long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; long consistencyPoint = System.currentTimeMillis() - 120000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( - maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(maxLookbackWindowStart, result); } @@ -91,8 +91,8 @@ public void testConsistencyPointFarInPastPushesWindowBack() { long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; long consistencyPoint = System.currentTimeMillis() - 604800000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( - maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(consistencyPoint, result); } From bca0768d34a141fd4e2803fa7333c5769cb031e3 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Fri, 29 May 2026 15:10:39 +0530 Subject: [PATCH 06/13] PHOENIX-7863 Fix visibility and capacity hints from review - Remove incorrect @VisibleForTesting from applyReplicationConsistencyGuard (it is genuinely public, called from CompactionScanner) - Reduce adjustMaxLookbackWindowStart to package-private - Fix newHashMapWithExpectedSize(4) to 5 in both IT classes --- .../replication/reader/ReplicationLogReplayService.java | 3 +-- .../reader/CompactionReplicationGuardDisabledIT.java | 2 +- .../replication/reader/CompactionReplicationGuardIT.java | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 33cd922bcb8..e413ed73e9b 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -256,7 +256,6 @@ protected long getConsistencyPoint() throws IOException, SQLException { * standby clusters, this prevents compaction from dropping delete markers that have timestamps * newer than the consistency point. */ - @VisibleForTesting public static long applyReplicationConsistencyGuard(long currentMaxLookbackWindowStart, Configuration conf, String tableName, String columnFamilyName) { try { @@ -271,7 +270,7 @@ public static long applyReplicationConsistencyGuard(long currentMaxLookbackWindo } @VisibleForTesting - public static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart, + static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart, long consistencyPoint, String tableName, String columnFamilyName) { long adjusted = Math.min(currentMaxLookbackWindowStart, consistencyPoint); if (adjusted < currentMaxLookbackWindowStart) { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java index 8ed243e1690..68ff6ed986f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java @@ -57,7 +57,7 @@ public class CompactionReplicationGuardDisabledIT extends BaseTest { @BeforeClass public static synchronized void doSetup() throws Exception { - Map props = Maps.newHashMapWithExpectedSize(4); + Map props = Maps.newHashMapWithExpectedSize(5); props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java index 1ad78f0ab56..7788155eec3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java @@ -58,7 +58,7 @@ public class CompactionReplicationGuardIT extends BaseTest { @BeforeClass public static synchronized void doSetup() throws Exception { - Map props = Maps.newHashMapWithExpectedSize(4); + Map props = Maps.newHashMapWithExpectedSize(5); props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, From a1396a8f51b0559123fb10178f39d359165bb7f8 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Sun, 7 Jun 2026 17:14:05 +0530 Subject: [PATCH 07/13] PHOENIX-7863 Address review: row-level guard, caching, visibility fixes - Move guard enforcement from store-level to RowContext.setTTL() to fix TTL window bypass (Math.min cap after Math.max with ttlWindowStart) - Add Guava memoized cache (30s TTL) for consistency point to reduce NameNode RPCs during compaction bursts - Gate guard on major compactions only (no effect during flush/minor) - Make constructor private, replace mock injection with test factory (setConsistencyPointForTesting) - Demote per-compaction logging from INFO to DEBUG - Keep guard config flag for operational flexibility --- .../coprocessor/CompactionScanner.java | 13 ++- .../reader/ReplicationLogReplayService.java | 58 +++++++----- .../CompactionReplicationGuardDisabledIT.java | 6 +- .../reader/CompactionReplicationGuardIT.java | 15 +-- .../ReplicationCompactionGuardTest.java | 93 +++++++++++-------- 5 files changed, 100 insertions(+), 85 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 33fb6bf0cc8..a31b1fac2ef 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -140,6 +140,7 @@ public class CompactionScanner implements InternalScanner { private final Store store; private final RegionCoprocessorEnvironment env; private long maxLookbackWindowStart; + private final long replicationConsistencyPoint; private final long maxLookbackInMillis; private int minVersion; private int maxVersion; @@ -201,18 +202,20 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store, ? compactionTime : compactionTime - (this.maxLookbackInMillis + 1); Configuration conf = env.getConfiguration(); + this.major = major && !forceMinorCompaction; boolean replayEnabled = conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED); boolean guardEnabled = conf.getBoolean(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, ReplicationLogReplayService.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); - if (replayEnabled && guardEnabled) { - this.maxLookbackWindowStart = ReplicationLogReplayService.applyReplicationConsistencyGuard( - this.maxLookbackWindowStart, conf, tableName, columnFamilyName); + if (this.major && replayEnabled && guardEnabled) { + this.replicationConsistencyPoint = + ReplicationLogReplayService.resolveConsistencyPoint(conf, tableName, columnFamilyName); + } else { + this.replicationConsistencyPoint = Long.MAX_VALUE; } ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor(); - this.major = major && !forceMinorCompaction; this.minVersion = cfd.getMinVersions(); this.maxVersion = cfd.getMaxVersions(); this.keepDeletedCells = keepDeleted ? KeepDeletedCells.TTL : cfd.getKeepDeletedCells(); @@ -1670,6 +1673,8 @@ private void setTTL(long ttlInSecs) { this.ttl = Math.max(ttlInSecs * 1000, maxLookbackInMillis + 1); this.ttlWindowStart = ttlInSecs == HConstants.FOREVER ? 1 : compactionTime - ttl; this.maxLookbackWindowStartForRow = Math.max(ttlWindowStart, maxLookbackWindowStart); + this.maxLookbackWindowStartForRow = + Math.min(this.maxLookbackWindowStartForRow, replicationConsistencyPoint); if (LOGGER.isTraceEnabled()) { LOGGER.trace(String.format("RowContext:- (ttlWindowStart=%d, maxLookbackWindowStart=%d)", ttlWindowStart, maxLookbackWindowStart)); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index e413ed73e9b..9a88b7a5fba 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory; import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.phoenix.thirdparty.com.google.common.base.Supplier; +import org.apache.phoenix.thirdparty.com.google.common.base.Suppliers; import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -89,14 +91,30 @@ public class ReplicationLogReplayService { */ public static final int DEFAULT_REPLICATION_REPLAY_SERVICE_EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS = 30; + private static final long CONSISTENCY_POINT_CACHE_TTL_SECONDS = 30; + private static volatile ReplicationLogReplayService instance; private final Configuration conf; private ScheduledExecutorService scheduler; private volatile boolean isRunning = false; + private final Supplier cachedConsistencyPoint; - protected ReplicationLogReplayService(final Configuration conf) { + private ReplicationLogReplayService(final Configuration conf) { this.conf = conf; + this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration(() -> { + try { + return getConsistencyPoint(); + } catch (Exception e) { + LOG.warn("Failed to refresh cached consistency point", e); + return 0L; + } + }, CONSISTENCY_POINT_CACHE_TTL_SECONDS, TimeUnit.SECONDS); + } + + private ReplicationLogReplayService(long fixedConsistencyPoint) { + this.conf = null; + this.cachedConsistencyPoint = () -> fixedConsistencyPoint; } /** @@ -118,8 +136,8 @@ public static ReplicationLogReplayService getInstance(Configuration conf) throws } @VisibleForTesting - public static void setInstanceForTesting(ReplicationLogReplayService testInstance) { - instance = testInstance; + public static void setConsistencyPointForTesting(long fixedConsistencyPoint) { + instance = new ReplicationLogReplayService(fixedConsistencyPoint); } @VisibleForTesting @@ -252,36 +270,26 @@ protected long getConsistencyPoint() throws IOException, SQLException { } /** - * Applies the replication replay consistency point as a floor on maxLookbackWindowStart. On - * standby clusters, this prevents compaction from dropping delete markers that have timestamps - * newer than the consistency point. + * Resolves the minimum replication consistency point across all HA groups. Uses a cached value + * with a 30-second TTL to avoid repeated NameNode RPCs during compaction bursts. Returns 0L on + * any failure (caller treats 0 as "retain all delete markers"). */ - public static long applyReplicationConsistencyGuard(long currentMaxLookbackWindowStart, - Configuration conf, String tableName, String columnFamilyName) { + public static long resolveConsistencyPoint(Configuration conf, String tableName, + String columnFamilyName) { try { - long consistencyPoint = getInstance(conf).getConsistencyPoint(); - return adjustMaxLookbackWindowStart(currentMaxLookbackWindowStart, consistencyPoint, - tableName, columnFamilyName); + long consistencyPoint = getInstance(conf).cachedConsistencyPoint.get(); + if (LOG.isDebugEnabled()) { + LOG.debug("Replication guard: table={} store={} consistencyPoint={}", tableName, + columnFamilyName, consistencyPoint); + } + return consistencyPoint; } catch (Exception e) { - LOG.warn("Replication guard enabled but consistency point unavailable for table={} store={}." + LOG.warn("Replication guard: consistency point unavailable for table={} store={}." + " Retaining all delete markers.", tableName, columnFamilyName, e); return 0L; } } - @VisibleForTesting - static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart, - long consistencyPoint, String tableName, String columnFamilyName) { - long adjusted = Math.min(currentMaxLookbackWindowStart, consistencyPoint); - if (adjusted < currentMaxLookbackWindowStart) { - LOG.info( - "Replication guard: table={} store={} maxLookbackWindowStart adjusted from {} to {}" - + " (consistencyPoint={})", - tableName, columnFamilyName, currentMaxLookbackWindowStart, adjusted, consistencyPoint); - } - return adjusted; - } - /** Returns the list of HA groups on the cluster */ protected List getReplicationGroups() throws SQLException { return HAGroupStoreManager.getInstance(conf).getHAGroupNames(); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java index 68ff6ed986f..e43f8a4dfd5 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java @@ -20,8 +20,6 @@ import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY; import static org.apache.phoenix.util.TestUtil.assertRawRowCount; import static org.junit.Assert.assertFalse; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import java.io.IOException; import java.sql.Connection; @@ -106,9 +104,7 @@ public void testGuardDisabledDeleteMarkersPurgedByMaxLookback() throws Exception // Set consistency point BEFORE delete — guard would retain if enabled long consistencyPoint = beforeDeleteTime - 1; - ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class); - when(mockService.getConsistencyPoint()).thenReturn(consistencyPoint); - ReplicationLogReplayService.setInstanceForTesting(mockService); + ReplicationLogReplayService.setConsistencyPointForTesting(consistencyPoint); // Advance past maxLookback injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java index 7788155eec3..3bd0960b739 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java @@ -20,8 +20,6 @@ import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY; import static org.apache.phoenix.util.TestUtil.assertRawRowCount; import static org.junit.Assert.assertFalse; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import java.io.IOException; import java.sql.Connection; @@ -210,11 +208,8 @@ public void testGuardFallbackRetainsAllWhenConsistencyPointUnavailable() throws conn.commit(); injectEdge.incrementValue(1); - // Inject a mock that throws — simulating uninitialized replay service - ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class); - when(mockService.getConsistencyPoint()) - .thenThrow(new IOException("HA groups not initialized")); - ReplicationLogReplayService.setInstanceForTesting(mockService); + // Inject consistency point of 0 — simulating fallback when replay service is unavailable + ReplicationLogReplayService.setConsistencyPointForTesting(0L); // Advance past maxLookback injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); @@ -227,10 +222,8 @@ public void testGuardFallbackRetainsAllWhenConsistencyPointUnavailable() throws } } - private void injectMockConsistencyPoint(long consistencyPoint) throws IOException, SQLException { - ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class); - when(mockService.getConsistencyPoint()).thenReturn(consistencyPoint); - ReplicationLogReplayService.setInstanceForTesting(mockService); + private void injectMockConsistencyPoint(long consistencyPoint) { + ReplicationLogReplayService.setConsistencyPointForTesting(consistencyPoint); } private void flush(TableName table) throws IOException { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java index 0cffee289b0..82a694ad28c 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java @@ -22,78 +22,91 @@ import org.junit.Test; /** - * Tests for the replication consistency point guard in ReplicationLogReplayService. Tests the pure - * adjustment logic (adjustMaxLookbackWindowStart) which is the core of the guard — ensuring - * maxLookbackWindowStart is floored to the consistency point. + * Tests for the replication compaction guard row-level cap logic. Verifies the contract: + * maxLookbackWindowStartForRow = min(max(ttlWindowStart, maxLookbackWindowStart), consistencyPoint) */ public class ReplicationCompactionGuardTest { - private static final String TABLE_NAME = "TEST_TABLE"; - private static final String CF_NAME = "0"; + /** + * Simulates the RowContext computation in CompactionScanner.RowContext.setTTL(): + * maxLookbackWindowStartForRow = min(max(ttlWindowStart, maxLookbackWindowStart), + * replicationConsistencyPoint) + */ + private static long computeRowBoundary(long ttlWindowStart, long maxLookbackWindowStart, + long replicationConsistencyPoint) { + long rowBoundary = Math.max(ttlWindowStart, maxLookbackWindowStart); + return Math.min(rowBoundary, replicationConsistencyPoint); + } @Test - public void testAdjustsWindowWhenConsistencyPointIsLower() { - long maxLookbackWindowStart = 1000000L; - long consistencyPoint = 500000L; + public void testTtlHigherThanConsistencyPoint_capApplied() { + long now = System.currentTimeMillis(); + long maxLookbackWindowStart = now - 86400000L; + long ttlWindowStart = now - 3600000L; + long consistencyPoint = now - 7200000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); assertEquals(consistencyPoint, result); } @Test - public void testNoChangeWhenConsistencyPointIsHigher() { - long maxLookbackWindowStart = 500000L; - long consistencyPoint = 1000000L; + public void testTtlLowerThanConsistencyPoint_noCapNeeded() { + long now = System.currentTimeMillis(); + long maxLookbackWindowStart = now - 86400000L; + long ttlWindowStart = now - 14400000L; + long consistencyPoint = now - 3600000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); - assertEquals(maxLookbackWindowStart, result); + assertEquals(ttlWindowStart, result); } @Test - public void testNoChangeWhenConsistencyPointEqualsWindowStart() { - long maxLookbackWindowStart = 500000L; - long consistencyPoint = 500000L; + public void testNoTtl_capAppliedOnMaxLookback() { + long now = System.currentTimeMillis(); + long maxLookbackWindowStart = now - 86400000L; + long ttlWindowStart = 1L; + long consistencyPoint = now - 172800000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); - assertEquals(maxLookbackWindowStart, result); + assertEquals(consistencyPoint, result); } @Test - public void testConsistencyPointAtZeroRetainsAll() { - long maxLookbackWindowStart = 1000000L; - long consistencyPoint = 0L; + public void testNoTtl_consistencyPointAheadOfMaxLookback() { + long now = System.currentTimeMillis(); + long maxLookbackWindowStart = now - 86400000L; + long ttlWindowStart = 1L; + long consistencyPoint = now - 60000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); - assertEquals(0L, result); + assertEquals(maxLookbackWindowStart, result); } @Test - public void testLargeTimestampsNoAdjustmentNeeded() { - long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; - long consistencyPoint = System.currentTimeMillis() - 120000L; + public void testConsistencyPointZero_retainsAll() { + long now = System.currentTimeMillis(); + long maxLookbackWindowStart = now - 86400000L; + long ttlWindowStart = now - 3600000L; + long consistencyPoint = 0L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); - assertEquals(maxLookbackWindowStart, result); + assertEquals(0L, result); } @Test - public void testConsistencyPointFarInPastPushesWindowBack() { - long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; - long consistencyPoint = System.currentTimeMillis() - 604800000L; + public void testGuardDisabled_longMaxValueNoOp() { + long now = System.currentTimeMillis(); + long maxLookbackWindowStart = now - 86400000L; + long ttlWindowStart = now - 3600000L; + long consistencyPoint = Long.MAX_VALUE; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); - assertEquals(consistencyPoint, result); + assertEquals(ttlWindowStart, result); } } From 05bdea560a1c5843673d2e4b43effb06e9ad3bdf Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Sun, 7 Jun 2026 18:04:23 +0530 Subject: [PATCH 08/13] PHOENIX-7863 Add cache test for consistency point resolution Verify that resolveConsistencyPoint uses cached value from Guava memoized supplier and does not re-fetch on every call within TTL. --- .../reader/ReplicationLogReplayService.java | 11 ++++++++ .../ReplicationCompactionGuardTest.java | 28 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 9a88b7a5fba..e4dd1ff0359 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -117,6 +117,12 @@ private ReplicationLogReplayService(long fixedConsistencyPoint) { this.cachedConsistencyPoint = () -> fixedConsistencyPoint; } + private ReplicationLogReplayService(Supplier supplier) { + this.conf = null; + this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration( + supplier, CONSISTENCY_POINT_CACHE_TTL_SECONDS, TimeUnit.SECONDS); + } + /** * Gets the singleton instance of the ReplicationLogReplayService using the lazy initializer * pattern. Initializes the instance if it hasn't been created yet. @@ -140,6 +146,11 @@ public static void setConsistencyPointForTesting(long fixedConsistencyPoint) { instance = new ReplicationLogReplayService(fixedConsistencyPoint); } + @VisibleForTesting + public static void setConsistencyPointSupplierForTesting(Supplier supplier) { + instance = new ReplicationLogReplayService(supplier); + } + @VisibleForTesting public static void resetInstanceForTesting() { instance = null; diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java index 82a694ad28c..85279cd88ae 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java @@ -19,6 +19,8 @@ import static org.junit.Assert.assertEquals; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; import org.junit.Test; /** @@ -109,4 +111,30 @@ public void testGuardDisabled_longMaxValueNoOp() { assertEquals(ttlWindowStart, result); } + + @Test + public void testCachedConsistencyPointAvoidsRepeatedFetches() { + AtomicInteger fetchCount = new AtomicInteger(0); + ReplicationLogReplayService.setConsistencyPointSupplierForTesting(() -> { + fetchCount.incrementAndGet(); + return 500000L; + }); + + try { + Configuration conf = new Configuration(false); + String table = "TEST_TABLE"; + String cf = "0"; + + long result1 = ReplicationLogReplayService.resolveConsistencyPoint(conf, table, cf); + long result2 = ReplicationLogReplayService.resolveConsistencyPoint(conf, table, cf); + long result3 = ReplicationLogReplayService.resolveConsistencyPoint(conf, table, cf); + + assertEquals(500000L, result1); + assertEquals(500000L, result2); + assertEquals(500000L, result3); + assertEquals(1, fetchCount.get()); + } finally { + ReplicationLogReplayService.resetInstanceForTesting(); + } + } } From 438072f3d6b8ecc6fecab0470376214ba908645d Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Thu, 11 Jun 2026 19:18:40 +0530 Subject: [PATCH 09/13] PHOENIX-7863 Fix cached failure and extract guard formula as single source of truth --- .../coprocessor/CompactionScanner.java | 4 +- .../reader/ReplicationLogReplayService.java | 20 ++++--- .../ReplicationCompactionGuardTest.java | 57 +++++++++++++------ 3 files changed, 55 insertions(+), 26 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index a31b1fac2ef..977c6d37608 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -1672,9 +1672,9 @@ private void init() { private void setTTL(long ttlInSecs) { this.ttl = Math.max(ttlInSecs * 1000, maxLookbackInMillis + 1); this.ttlWindowStart = ttlInSecs == HConstants.FOREVER ? 1 : compactionTime - ttl; - this.maxLookbackWindowStartForRow = Math.max(ttlWindowStart, maxLookbackWindowStart); this.maxLookbackWindowStartForRow = - Math.min(this.maxLookbackWindowStartForRow, replicationConsistencyPoint); + ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, replicationConsistencyPoint); if (LOGGER.isTraceEnabled()) { LOGGER.trace(String.format("RowContext:- (ttlWindowStart=%d, maxLookbackWindowStart=%d)", ttlWindowStart, maxLookbackWindowStart)); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index e4dd1ff0359..144628603c3 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -105,22 +105,19 @@ private ReplicationLogReplayService(final Configuration conf) { this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration(() -> { try { return getConsistencyPoint(); - } catch (Exception e) { - LOG.warn("Failed to refresh cached consistency point", e); - return 0L; + } catch (IOException | SQLException e) { + throw new RuntimeException("Failed to fetch consistency point", e); } }, CONSISTENCY_POINT_CACHE_TTL_SECONDS, TimeUnit.SECONDS); } private ReplicationLogReplayService(long fixedConsistencyPoint) { - this.conf = null; this.cachedConsistencyPoint = () -> fixedConsistencyPoint; } private ReplicationLogReplayService(Supplier supplier) { - this.conf = null; - this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration( - supplier, CONSISTENCY_POINT_CACHE_TTL_SECONDS, TimeUnit.SECONDS); + this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration(supplier, + CONSISTENCY_POINT_CACHE_TTL_SECONDS, TimeUnit.SECONDS); } /** @@ -301,6 +298,15 @@ public static long resolveConsistencyPoint(Configuration conf, String tableName, } } + /** + * Computes the effective max-lookback boundary for a row, capped by the replication consistency + * point. This is the single source of truth for the formula used by CompactionScanner.RowContext. + */ + public static long computeRowMaxLookbackWithGuard(long ttlWindowStart, + long maxLookbackWindowStart, long replicationConsistencyPoint) { + return Math.min(Math.max(ttlWindowStart, maxLookbackWindowStart), replicationConsistencyPoint); + } + /** Returns the list of HA groups on the cluster */ protected List getReplicationGroups() throws SQLException { return HAGroupStoreManager.getInstance(conf).getHAGroupNames(); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java index 85279cd88ae..13d0e01f46a 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java @@ -29,17 +29,6 @@ */ public class ReplicationCompactionGuardTest { - /** - * Simulates the RowContext computation in CompactionScanner.RowContext.setTTL(): - * maxLookbackWindowStartForRow = min(max(ttlWindowStart, maxLookbackWindowStart), - * replicationConsistencyPoint) - */ - private static long computeRowBoundary(long ttlWindowStart, long maxLookbackWindowStart, - long replicationConsistencyPoint) { - long rowBoundary = Math.max(ttlWindowStart, maxLookbackWindowStart); - return Math.min(rowBoundary, replicationConsistencyPoint); - } - @Test public void testTtlHigherThanConsistencyPoint_capApplied() { long now = System.currentTimeMillis(); @@ -47,7 +36,8 @@ public void testTtlHigherThanConsistencyPoint_capApplied() { long ttlWindowStart = now - 3600000L; long consistencyPoint = now - 7200000L; - long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); assertEquals(consistencyPoint, result); } @@ -59,7 +49,8 @@ public void testTtlLowerThanConsistencyPoint_noCapNeeded() { long ttlWindowStart = now - 14400000L; long consistencyPoint = now - 3600000L; - long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); assertEquals(ttlWindowStart, result); } @@ -71,7 +62,8 @@ public void testNoTtl_capAppliedOnMaxLookback() { long ttlWindowStart = 1L; long consistencyPoint = now - 172800000L; - long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); assertEquals(consistencyPoint, result); } @@ -83,7 +75,8 @@ public void testNoTtl_consistencyPointAheadOfMaxLookback() { long ttlWindowStart = 1L; long consistencyPoint = now - 60000L; - long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); assertEquals(maxLookbackWindowStart, result); } @@ -95,7 +88,8 @@ public void testConsistencyPointZero_retainsAll() { long ttlWindowStart = now - 3600000L; long consistencyPoint = 0L; - long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); assertEquals(0L, result); } @@ -107,7 +101,8 @@ public void testGuardDisabled_longMaxValueNoOp() { long ttlWindowStart = now - 3600000L; long consistencyPoint = Long.MAX_VALUE; - long result = computeRowBoundary(ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); assertEquals(ttlWindowStart, result); } @@ -137,4 +132,32 @@ public void testCachedConsistencyPointAvoidsRepeatedFetches() { ReplicationLogReplayService.resetInstanceForTesting(); } } + + @Test + public void testTransientFailureNotCached_retriesOnNextCall() { + AtomicInteger fetchCount = new AtomicInteger(0); + ReplicationLogReplayService.setConsistencyPointSupplierForTesting(() -> { + int attempt = fetchCount.incrementAndGet(); + if (attempt == 1) { + throw new RuntimeException("Simulated transient failure"); + } + return 700000L; + }); + + try { + Configuration conf = new Configuration(false); + String table = "TEST_TABLE"; + String cf = "0"; + + long result1 = ReplicationLogReplayService.resolveConsistencyPoint(conf, table, cf); + assertEquals(0L, result1); + + long result2 = ReplicationLogReplayService.resolveConsistencyPoint(conf, table, cf); + assertEquals(700000L, result2); + + assertEquals(2, fetchCount.get()); + } finally { + ReplicationLogReplayService.resetInstanceForTesting(); + } + } } From aff95ca0742ee7b5f0a56a999a6d7d5aef5f7e0e Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Fri, 12 Jun 2026 15:40:11 +0530 Subject: [PATCH 10/13] PHOENIX-7863 Move guard formula to CompactionScanner and trim test --- .../coprocessor/CompactionScanner.java | 14 +++- .../reader/ReplicationLogReplayService.java | 9 -- ...a => ReplicationConsistencyPointTest.java} | 83 +------------------ 3 files changed, 13 insertions(+), 93 deletions(-) rename phoenix-core/src/test/java/org/apache/phoenix/replication/reader/{ReplicationCompactionGuardTest.java => ReplicationConsistencyPointTest.java} (50%) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 977c6d37608..449b4f32c2b 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -1646,6 +1646,15 @@ private String getTenantIdFromRowKey(byte[] rowKey, boolean isSharedIndex) throw } } + /** + * Computes the effective max-lookback boundary for a row, capped by the replication consistency + * point. Formula: min(max(ttlWindowStart, maxLookbackWindowStart), consistencyPoint). + */ + public static long computeRowMaxLookbackWithGuard(long ttlWindowStart, + long maxLookbackWindowStart, long replicationConsistencyPoint) { + return Math.min(Math.max(ttlWindowStart, maxLookbackWindowStart), replicationConsistencyPoint); + } + /** * The context for a given row during compaction. A row may have multiple compaction row versions. * CompactionScanner uses the same row context for these versions. @@ -1672,9 +1681,8 @@ private void init() { private void setTTL(long ttlInSecs) { this.ttl = Math.max(ttlInSecs * 1000, maxLookbackInMillis + 1); this.ttlWindowStart = ttlInSecs == HConstants.FOREVER ? 1 : compactionTime - ttl; - this.maxLookbackWindowStartForRow = - ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, - maxLookbackWindowStart, replicationConsistencyPoint); + this.maxLookbackWindowStartForRow = computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, replicationConsistencyPoint); if (LOGGER.isTraceEnabled()) { LOGGER.trace(String.format("RowContext:- (ttlWindowStart=%d, maxLookbackWindowStart=%d)", ttlWindowStart, maxLookbackWindowStart)); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 144628603c3..4b63495d7f1 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -298,15 +298,6 @@ public static long resolveConsistencyPoint(Configuration conf, String tableName, } } - /** - * Computes the effective max-lookback boundary for a row, capped by the replication consistency - * point. This is the single source of truth for the formula used by CompactionScanner.RowContext. - */ - public static long computeRowMaxLookbackWithGuard(long ttlWindowStart, - long maxLookbackWindowStart, long replicationConsistencyPoint) { - return Math.min(Math.max(ttlWindowStart, maxLookbackWindowStart), replicationConsistencyPoint); - } - /** Returns the list of HA groups on the cluster */ protected List getReplicationGroups() throws SQLException { return HAGroupStoreManager.getInstance(conf).getHAGroupNames(); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java similarity index 50% rename from phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java rename to phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java index 13d0e01f46a..f989e4b62b4 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java @@ -24,88 +24,9 @@ import org.junit.Test; /** - * Tests for the replication compaction guard row-level cap logic. Verifies the contract: - * maxLookbackWindowStartForRow = min(max(ttlWindowStart, maxLookbackWindowStart), consistencyPoint) + * Tests for ReplicationLogReplayService.resolveConsistencyPoint caching behavior. */ -public class ReplicationCompactionGuardTest { - - @Test - public void testTtlHigherThanConsistencyPoint_capApplied() { - long now = System.currentTimeMillis(); - long maxLookbackWindowStart = now - 86400000L; - long ttlWindowStart = now - 3600000L; - long consistencyPoint = now - 7200000L; - - long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, - maxLookbackWindowStart, consistencyPoint); - - assertEquals(consistencyPoint, result); - } - - @Test - public void testTtlLowerThanConsistencyPoint_noCapNeeded() { - long now = System.currentTimeMillis(); - long maxLookbackWindowStart = now - 86400000L; - long ttlWindowStart = now - 14400000L; - long consistencyPoint = now - 3600000L; - - long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, - maxLookbackWindowStart, consistencyPoint); - - assertEquals(ttlWindowStart, result); - } - - @Test - public void testNoTtl_capAppliedOnMaxLookback() { - long now = System.currentTimeMillis(); - long maxLookbackWindowStart = now - 86400000L; - long ttlWindowStart = 1L; - long consistencyPoint = now - 172800000L; - - long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, - maxLookbackWindowStart, consistencyPoint); - - assertEquals(consistencyPoint, result); - } - - @Test - public void testNoTtl_consistencyPointAheadOfMaxLookback() { - long now = System.currentTimeMillis(); - long maxLookbackWindowStart = now - 86400000L; - long ttlWindowStart = 1L; - long consistencyPoint = now - 60000L; - - long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, - maxLookbackWindowStart, consistencyPoint); - - assertEquals(maxLookbackWindowStart, result); - } - - @Test - public void testConsistencyPointZero_retainsAll() { - long now = System.currentTimeMillis(); - long maxLookbackWindowStart = now - 86400000L; - long ttlWindowStart = now - 3600000L; - long consistencyPoint = 0L; - - long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, - maxLookbackWindowStart, consistencyPoint); - - assertEquals(0L, result); - } - - @Test - public void testGuardDisabled_longMaxValueNoOp() { - long now = System.currentTimeMillis(); - long maxLookbackWindowStart = now - 86400000L; - long ttlWindowStart = now - 3600000L; - long consistencyPoint = Long.MAX_VALUE; - - long result = ReplicationLogReplayService.computeRowMaxLookbackWithGuard(ttlWindowStart, - maxLookbackWindowStart, consistencyPoint); - - assertEquals(ttlWindowStart, result); - } +public class ReplicationConsistencyPointTest { @Test public void testCachedConsistencyPointAvoidsRepeatedFetches() { From 0a86320dd12d69666da2e9bd35481094021f0905 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Fri, 12 Jun 2026 15:47:23 +0530 Subject: [PATCH 11/13] PHOENIX-7863 Narrow guard formula visibility to private --- .../java/org/apache/phoenix/coprocessor/CompactionScanner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 449b4f32c2b..8525ccac0b9 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -1650,7 +1650,7 @@ private String getTenantIdFromRowKey(byte[] rowKey, boolean isSharedIndex) throw * Computes the effective max-lookback boundary for a row, capped by the replication consistency * point. Formula: min(max(ttlWindowStart, maxLookbackWindowStart), consistencyPoint). */ - public static long computeRowMaxLookbackWithGuard(long ttlWindowStart, + private static long computeRowMaxLookbackWithGuard(long ttlWindowStart, long maxLookbackWindowStart, long replicationConsistencyPoint) { return Math.min(Math.max(ttlWindowStart, maxLookbackWindowStart), replicationConsistencyPoint); } From 6463bce494db2878bc1c09ec3e8290c5932e909f Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Tue, 23 Jun 2026 13:45:17 +0530 Subject: [PATCH 12/13] Fix build issue by initialzing the conf variable --- .../phoenix/replication/reader/ReplicationLogReplayService.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 4b63495d7f1..baf5739e450 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -112,10 +112,12 @@ private ReplicationLogReplayService(final Configuration conf) { } private ReplicationLogReplayService(long fixedConsistencyPoint) { + this.conf = null; this.cachedConsistencyPoint = () -> fixedConsistencyPoint; } private ReplicationLogReplayService(Supplier supplier) { + this.conf = null; this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration(supplier, CONSISTENCY_POINT_CACHE_TTL_SECONDS, TimeUnit.SECONDS); } From a454b06a27389452a70e0d1f3617dce413517b2c Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Tue, 23 Jun 2026 23:40:31 +0530 Subject: [PATCH 13/13] PHOENIX-7863 Address review: volatile, remove guard flag, add TTL IT and formula UTs --- .../coprocessor/CompactionScanner.java | 7 +- .../reader/ReplicationLogDiscoveryReplay.java | 2 +- .../reader/ReplicationLogReplayService.java | 10 -- .../CompactionReplicationGuardDisabledIT.java | 147 ------------------ .../reader/CompactionReplicationGuardIT.java | 48 +++++- .../CompactionGuardFormulaTest.java | 100 ++++++++++++ 6 files changed, 149 insertions(+), 165 deletions(-) delete mode 100644 phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java create mode 100644 phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 8525ccac0b9..f3d3b4bf8b5 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -206,10 +206,7 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store, boolean replayEnabled = conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED); - boolean guardEnabled = - conf.getBoolean(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, - ReplicationLogReplayService.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); - if (this.major && replayEnabled && guardEnabled) { + if (this.major && replayEnabled) { this.replicationConsistencyPoint = ReplicationLogReplayService.resolveConsistencyPoint(conf, tableName, columnFamilyName); } else { @@ -1650,7 +1647,7 @@ private String getTenantIdFromRowKey(byte[] rowKey, boolean isSharedIndex) throw * Computes the effective max-lookback boundary for a row, capped by the replication consistency * point. Formula: min(max(ttlWindowStart, maxLookbackWindowStart), consistencyPoint). */ - private static long computeRowMaxLookbackWithGuard(long ttlWindowStart, + public static long computeRowMaxLookbackWithGuard(long ttlWindowStart, long maxLookbackWindowStart, long replicationConsistencyPoint) { return Math.min(Math.max(ttlWindowStart, maxLookbackWindowStart), replicationConsistencyPoint); } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java index 4d5f886d510..ec7a1da5358 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java @@ -103,7 +103,7 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery { */ public static final double DEFAULT_WAITING_BUFFER_PERCENTAGE = 15.0; - private ReplicationRound lastRoundInSync; + private volatile ReplicationRound lastRoundInSync; // AtomicReference ensures listener updates are visible to replay thread private final AtomicReference replicationReplayState = diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index baf5739e450..342fe9621a3 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -53,16 +53,6 @@ public class ReplicationLogReplayService { */ public static final boolean DEFAULT_REPLICATION_REPLAY_ENABLED = false; - /** - * Configuration key for enabling/disabling the replication compaction guard - */ - public static final String REPLICATION_COMPACTION_GUARD_ENABLED = - "phoenix.replication.compaction.guard.enabled"; - - /** - * Default value for replication compaction guard enabled flag - */ - public static final boolean DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED = true; /** * Number of threads in the executor pool for the replication replay service diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java deleted file mode 100644 index e43f8a4dfd5..00000000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.replication.reader; - -import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY; -import static org.apache.phoenix.util.TestUtil.assertRawRowCount; -import static org.junit.Assert.assertFalse; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.util.Map; -import org.apache.hadoop.hbase.TableName; -import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; -import org.apache.phoenix.query.BaseTest; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.util.EnvironmentEdgeManager; -import org.apache.phoenix.util.ManualEnvironmentEdge; -import org.apache.phoenix.util.ReadOnlyProps; -import org.apache.phoenix.util.TestUtil; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; - -/** - * Integration test verifying that the replication compaction guard does NOT interfere with normal - * compaction when explicitly disabled via configuration. - */ -@Category(NeedsOwnMiniClusterTest.class) -public class CompactionReplicationGuardDisabledIT extends BaseTest { - - private static final int MAX_LOOKBACK_AGE = 15; - private static final int ROWS_POPULATED = 2; - private ManualEnvironmentEdge injectEdge; - - @BeforeClass - public static synchronized void doSetup() throws Exception { - Map props = Maps.newHashMapWithExpectedSize(5); - props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); - props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); - props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, - Boolean.toString(true)); - props.put(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, - Boolean.toString(false)); - props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); - setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); - } - - @Before - public void beforeTest() throws Exception { - EnvironmentEdgeManager.reset(); - injectEdge = new ManualEnvironmentEdge(); - injectEdge.setValue(System.currentTimeMillis()); - EnvironmentEdgeManager.injectEdge(injectEdge); - } - - @After - public synchronized void afterTest() throws Exception { - ReplicationLogReplayService.resetInstanceForTesting(); - EnvironmentEdgeManager.reset(); - boolean refCountLeaked = isAnyStoreRefCountLeaked(); - assertFalse("refCount leaked", refCountLeaked); - } - - /** - * When guard is disabled, delete markers are purged normally by maxLookback even though the - * consistency point would have protected them if the guard were enabled. - */ - @Test(timeout = 120000L) - public void testGuardDisabledDeleteMarkersPurgedByMaxLookback() throws Exception { - try (Connection conn = DriverManager.getConnection(getUrl())) { - String dataTableName = generateUniqueName(); - createTable(dataTableName); - TableName dataTable = TableName.valueOf(dataTableName); - populateTable(dataTableName); - - injectEdge.incrementValue(1); - long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis(); - - // Delete a row - conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); - conn.commit(); - injectEdge.incrementValue(1); - - // Set consistency point BEFORE delete — guard would retain if enabled - long consistencyPoint = beforeDeleteTime - 1; - ReplicationLogReplayService.setConsistencyPointForTesting(consistencyPoint); - - // Advance past maxLookback - injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); - - flush(dataTable); - majorCompact(dataTable); - - // Guard disabled — delete marker purged by maxLookback as normal - assertRawRowCount(conn, dataTable, ROWS_POPULATED - 1); - } - } - - private void flush(TableName table) throws IOException { - getUtility().getAdmin().flush(table); - } - - private void majorCompact(TableName table) throws Exception { - TestUtil.majorCompact(getUtility(), table); - } - - private void createTable(String tableName) throws SQLException { - try (Connection conn = DriverManager.getConnection(getUrl())) { - conn.createStatement().execute( - "CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY, val1 VARCHAR(10)," - + " val2 VARCHAR(10), val3 VARCHAR(10))"); - conn.commit(); - } - } - - private void populateTable(String tableName) throws SQLException { - try (Connection conn = DriverManager.getConnection(getUrl())) { - conn.createStatement() - .execute("UPSERT INTO " + tableName + " VALUES ('a', 'ab', 'abc', 'abcd')"); - conn.commit(); - conn.createStatement() - .execute("UPSERT INTO " + tableName + " VALUES ('b', 'bc', 'bcd', 'bcde')"); - conn.commit(); - } - } -} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java index 3bd0960b739..f16a3146798 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java @@ -61,8 +61,6 @@ public static synchronized void doSetup() throws Exception { props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, Boolean.toString(true)); - props.put(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, - Boolean.toString(true)); props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } @@ -222,6 +220,43 @@ public void testGuardFallbackRetainsAllWhenConsistencyPointUnavailable() throws } } + /** + * Test 5: Guard retains delete markers on a table with explicit TTL. The consistency point is + * set BEFORE the delete, time advances past both TTL and maxLookback, and the guard still + * retains the delete marker because its timestamp is newer than the consistency point. + */ + @Test(timeout = 120000L) + public void testGuardRetainsDeleteMarkersWithExplicitTTL() throws Exception { + int ttlSeconds = 30; + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTableWithTTL(dataTableName, ttlSeconds); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis(); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Set consistency point BEFORE the delete — replay hasn't caught up + long consistencyPoint = beforeDeleteTime - 1; + injectMockConsistencyPoint(consistencyPoint); + + // Advance past both TTL and maxLookback — without guard, marker would be purged + injectEdge.incrementValue(ttlSeconds * 1000 + 1000); + + flush(dataTable); + majorCompact(dataTable); + + // Delete marker retained — guard caps purge at consistencyPoint + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + } + } + private void injectMockConsistencyPoint(long consistencyPoint) { ReplicationLogReplayService.setConsistencyPointForTesting(consistencyPoint); } @@ -243,6 +278,15 @@ private void createTable(String tableName) throws SQLException { } } + private void createTableWithTTL(String tableName, int ttlSeconds) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY, val1 VARCHAR(10)," + + " val2 VARCHAR(10), val3 VARCHAR(10)) TTL=" + ttlSeconds); + conn.commit(); + } + } + private void populateTable(String tableName) throws SQLException { try (Connection conn = DriverManager.getConnection(getUrl())) { conn.createStatement() diff --git a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java new file mode 100644 index 00000000000..f41f7d70f07 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +/** + * Unit tests for CompactionScanner.computeRowMaxLookbackWithGuard formula. + * Covers scenarios including those unreachable via standard TTL floor enforcement + * to guard against future changes to the TTL computation. + */ +public class CompactionGuardFormulaTest { + + @Test + public void testTtlDominatedGuardCaps() { + // ttlWindowStart > maxLookbackWindowStart > consistencyPoint + // Simulates conditional TTL or future removal of TTL floor enforcement + long consistencyPoint = 1000L; + long maxLookbackWindowStart = 2000L; + long ttlWindowStart = 3000L; + + long result = CompactionScanner.computeRowMaxLookbackWithGuard( + ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + + // max(3000, 2000) = 3000, min(3000, 1000) = 1000 → guard caps at consistencyPoint + assertEquals(consistencyPoint, result); + } + + @Test + public void testLookbackDominatedGuardCaps() { + // maxLookbackWindowStart > ttlWindowStart > consistencyPoint + long consistencyPoint = 1000L; + long ttlWindowStart = 2000L; + long maxLookbackWindowStart = 3000L; + + long result = CompactionScanner.computeRowMaxLookbackWithGuard( + ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + + // max(2000, 3000) = 3000, min(3000, 1000) = 1000 → guard caps at consistencyPoint + assertEquals(consistencyPoint, result); + } + + @Test + public void testConsistencyPointBeyondBoth_guardInactive() { + // consistencyPoint > max(ttlWindowStart, maxLookbackWindowStart) + long maxLookbackWindowStart = 2000L; + long ttlWindowStart = 1000L; + long consistencyPoint = 5000L; + + long result = CompactionScanner.computeRowMaxLookbackWithGuard( + ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + + // max(1000, 2000) = 2000, min(2000, 5000) = 2000 → guard doesn't restrict + assertEquals(maxLookbackWindowStart, result); + } + + @Test + public void testConsistencyPointZero_retainsAll() { + // consistencyPoint = 0 signals fallback — retain all delete markers + long maxLookbackWindowStart = 2000L; + long ttlWindowStart = 3000L; + long consistencyPoint = 0L; + + long result = CompactionScanner.computeRowMaxLookbackWithGuard( + ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + + assertEquals(0L, result); + } + + @Test + public void testConsistencyPointMaxValue_guardDisabled() { + // Long.MAX_VALUE used when replay is off — guard is effectively a no-op + long maxLookbackWindowStart = 2000L; + long ttlWindowStart = 3000L; + long consistencyPoint = Long.MAX_VALUE; + + long result = CompactionScanner.computeRowMaxLookbackWithGuard( + ttlWindowStart, maxLookbackWindowStart, consistencyPoint); + + // max(3000, 2000) = 3000, min(3000, MAX_VALUE) = 3000 → normal behavior + assertEquals(ttlWindowStart, result); + } +}