diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index f12dc77f724..cb8d890d1c2 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -81,6 +81,7 @@ import org.apache.phoenix.jdbc.PhoenixPreparedStatement; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.replication.reader.ReplicationLogReplayService; import org.apache.phoenix.schema.CompiledConditionalTTLExpression; import org.apache.phoenix.schema.CompiledTTLExpression; import org.apache.phoenix.schema.ConditionalTTLExpression; @@ -139,6 +140,7 @@ public class CompactionScanner implements InternalScanner { private final Store store; private final RegionCoprocessorEnvironment env; private long maxLookbackWindowStart; + private final long replicationConsistencyPoint; private final long maxLookbackInMillis; private int minVersion; private int maxVersion; @@ -199,8 +201,19 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store, this.maxLookbackWindowStart = this.maxLookbackInMillis == 0 ? 
compactionTime : compactionTime - (this.maxLookbackInMillis + 1); - ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor(); + Configuration conf = env.getConfiguration(); this.major = major && !forceMinorCompaction; + boolean replayEnabled = + conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, + ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED); + if (this.major && replayEnabled) { + this.replicationConsistencyPoint = + ReplicationLogReplayService.resolveConsistencyPoint(conf, tableName, columnFamilyName); + } else { + this.replicationConsistencyPoint = + ReplicationLogReplayService.CONSISTENCY_POINT_GUARD_DISABLED; + } + ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor(); this.minVersion = cfd.getMinVersions(); this.maxVersion = cfd.getMaxVersions(); this.keepDeletedCells = keepDeleted ? KeepDeletedCells.TTL : cfd.getKeepDeletedCells(); @@ -1631,6 +1644,33 @@ private String getTenantIdFromRowKey(byte[] rowKey, boolean isSharedIndex) throw } } + /** + * Computes the effective max-lookback boundary for a row, capped by the replication consistency + * point. The consistency point represents an exclusive upper bound: everything with ts < + * consistencyPoint has been replayed. We subtract 1 so that cells at exactly ts == + * consistencyPoint satisfy the strict-greater retention check and are retained. 
+ * @param ttlWindowStart row TTL window start in millis since epoch + * @param maxLookbackWindowStart store-level max-lookback window start in millis since epoch + * @param replicationConsistencyPoint exclusive upper bound of replayed timestamps; + * CONSISTENCY_POINT_UNAVAILABLE (0) retains all, + * CONSISTENCY_POINT_GUARD_DISABLED (Long.MAX_VALUE) means + * guard is a no-op + * @return effective boundary for the strict-greater retention compare (millis since epoch) + */ + public static long computeRowMaxLookbackWithGuard(long ttlWindowStart, + long maxLookbackWindowStart, long replicationConsistencyPoint) { + if ( + replicationConsistencyPoint == ReplicationLogReplayService.CONSISTENCY_POINT_UNAVAILABLE + || replicationConsistencyPoint + == ReplicationLogReplayService.CONSISTENCY_POINT_GUARD_DISABLED + ) { + return Math.min(Math.max(ttlWindowStart, maxLookbackWindowStart), + replicationConsistencyPoint); + } + return Math.min(Math.max(ttlWindowStart, maxLookbackWindowStart), + replicationConsistencyPoint - 1); + } + /** * The context for a given row during compaction. A row may have multiple compaction row versions. * CompactionScanner uses the same row context for these versions. @@ -1657,10 +1697,14 @@ private void init() { private void setTTL(long ttlInSecs) { this.ttl = Math.max(ttlInSecs * 1000, maxLookbackInMillis + 1); this.ttlWindowStart = ttlInSecs == HConstants.FOREVER ? 
1 : compactionTime - ttl; - this.maxLookbackWindowStartForRow = Math.max(ttlWindowStart, maxLookbackWindowStart); + this.maxLookbackWindowStartForRow = computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, replicationConsistencyPoint); if (LOGGER.isTraceEnabled()) { - LOGGER.trace(String.format("RowContext:- (ttlWindowStart=%d, maxLookbackWindowStart=%d)", - ttlWindowStart, maxLookbackWindowStart)); + LOGGER.trace(String.format( + "RowContext:- (ttlWindowStart=%d, maxLookbackWindowStart=%d, " + + "replicationConsistencyPoint=%d, maxLookbackWindowStartForRow=%d)", + ttlWindowStart, maxLookbackWindowStart, replicationConsistencyPoint, + maxLookbackWindowStartForRow)); } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java index 4d5f886d510..ec7a1da5358 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java @@ -103,7 +103,7 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery { */ public static final double DEFAULT_WAITING_BUFFER_PERCENTAGE = 15.0; - private ReplicationRound lastRoundInSync; + private volatile ReplicationRound lastRoundInSync; // AtomicReference ensures listener updates are visible to replay thread private final AtomicReference replicationReplayState = diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 24d40faac77..ac9ec10b4fe 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ 
/**
 * Creates the production service instance. The consistency point is fetched lazily through
 * {@link #getConsistencyPoint()} and cached for the TTL configured under
 * {@link #CONSISTENCY_POINT_CACHE_TTL_SECONDS_KEY}.
 * @param conf configuration the service (and the cache TTL) is read from
 */
private ReplicationLogReplayService(final Configuration conf) {
  this.conf = conf;
  // Guava's memoizeWithExpiration does NOT cache exceptions — a thrown RuntimeException
  // causes the next get() to re-invoke the supplier. We rely on this: transient failures
  // (NN flap, SYSTEM.HA_GROUP unavailable) retry on the next compaction rather than
  // caching a stale fallback for the full TTL.
  this.cachedConsistencyPoint = withExpiry(() -> {
    try {
      return getConsistencyPoint();
    } catch (IOException | SQLException e) {
      // The supplier interface cannot throw checked exceptions; keep the cause attached.
      throw new RuntimeException("Failed to fetch consistency point", e);
    }
  }, cacheTtlSeconds(conf));
}

/**
 * Test-only constructor that pins the consistency point to a constant. No fetching or expiry is
 * involved; every read returns {@code fixedConsistencyPoint}.
 * @param conf                  configuration kept on the instance
 * @param fixedConsistencyPoint value every lookup returns
 */
private ReplicationLogReplayService(Configuration conf, long fixedConsistencyPoint) {
  this.conf = conf;
  this.cachedConsistencyPoint = () -> fixedConsistencyPoint;
}

/**
 * Test-only constructor that wraps a caller-provided supplier in the same expiring cache the
 * production constructor uses, so caching behavior can be exercised without a cluster.
 * @param conf     configuration the cache TTL is read from
 * @param supplier source of consistency point values
 */
private ReplicationLogReplayService(Configuration conf, Supplier<Long> supplier) {
  this.conf = conf;
  this.cachedConsistencyPoint = withExpiry(supplier, cacheTtlSeconds(conf));
}

/** Reads the consistency point cache TTL (seconds) from {@code conf}, applying the default. */
private static long cacheTtlSeconds(Configuration conf) {
  return conf.getLong(CONSISTENCY_POINT_CACHE_TTL_SECONDS_KEY,
    DEFAULT_CONSISTENCY_POINT_CACHE_TTL_SECONDS);
}

/**
 * Wraps {@code delegate} in an expiring memoizer, tolerating a non-positive TTL.
 * <p>
 * {@code Suppliers.memoizeWithExpiration} rejects a duration that is not positive with an
 * IllegalArgumentException. Passing the configured value straight through would therefore make
 * construction of this service fail whenever the TTL is set to 0 or a negative number. Such a
 * value is treated as "caching disabled": the delegate is returned as-is and is consulted on
 * every lookup.
 */
private static Supplier<Long> withExpiry(Supplier<Long> delegate, long ttlSeconds) {
  if (ttlSeconds <= 0) {
    LOG.warn("{}={} is not positive; consistency point caching is disabled",
      CONSISTENCY_POINT_CACHE_TTL_SECONDS_KEY, ttlSeconds);
    return delegate;
  }
  return Suppliers.memoizeWithExpiration(delegate, ttlSeconds, TimeUnit.SECONDS);
}
Uses a cached value + * with a configurable TTL (see {@link #CONSISTENCY_POINT_CACHE_TTL_SECONDS_KEY}) to avoid + * repeated RPCs during compaction bursts. Returns {@link #CONSISTENCY_POINT_UNAVAILABLE} on any + * failure (caller treats this as "retain all delete markers"). + */ + public static long resolveConsistencyPoint(Configuration conf, String tableName, + String columnFamilyName) { + try { + long consistencyPoint = getInstance(conf).cachedConsistencyPoint.get(); + if (LOG.isDebugEnabled()) { + LOG.debug("Replication guard: table={} store={} consistencyPoint={}", tableName, + columnFamilyName, consistencyPoint); + } + return consistencyPoint; + } catch (Exception e) { + long now = System.currentTimeMillis(); + if (now - lastFallbackWarnTime > WARN_LOG_INTERVAL_MS) { + lastFallbackWarnTime = now; + LOG.warn("Replication guard: consistency point unavailable for table={} store={}." + + " Retaining all delete markers.", tableName, columnFamilyName, e); + } + return CONSISTENCY_POINT_UNAVAILABLE; + } + } + /** Returns the list of HA groups on the cluster */ protected List getReplicationGroups() throws SQLException { return HAGroupStoreManager.getInstance(conf).getHAGroupNames(); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java new file mode 100644 index 00000000000..2279ab929f8 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.reader; + +import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY; +import static org.apache.phoenix.replication.reader.ReplicationLogReplayService.CONSISTENCY_POINT_UNAVAILABLE; +import static org.apache.phoenix.util.TestUtil.assertRawRowCount; +import static org.junit.Assert.assertFalse; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Map; +import org.apache.hadoop.hbase.TableName; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ManualEnvironmentEdge; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; + +/** + * Integration tests for the replication consistency point compaction guard. Verifies that + * CompactionScanner retains delete markers with timestamps newer than the consistency point on + * clusters where replication replay is enabled. 
+ */ +@Category(NeedsOwnMiniClusterTest.class) +public class CompactionReplicationGuardIT extends BaseTest { + + private static final int MAX_LOOKBACK_AGE = 15; + private static final int ROWS_POPULATED = 2; + private ManualEnvironmentEdge injectEdge; + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map props = Maps.newHashMapWithExpectedSize(5); + props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); + props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); + props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, + Boolean.toString(true)); + props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Before + public void beforeTest() throws Exception { + EnvironmentEdgeManager.reset(); + injectEdge = new ManualEnvironmentEdge(); + injectEdge.setValue(System.currentTimeMillis()); + EnvironmentEdgeManager.injectEdge(injectEdge); + } + + @After + public synchronized void afterTest() throws Exception { + ReplicationLogReplayService.resetInstanceForTesting(); + EnvironmentEdgeManager.reset(); + boolean refCountLeaked = isAnyStoreRefCountLeaked(); + assertFalse("refCount leaked", refCountLeaked); + } + + /** + * Test 1: Guard retains delete markers that maxLookback would have purged. The consistency point + * is set BEFORE the delete timestamp, so the delete marker is newer than the consistency point + * and must be retained even after maxLookback window passes. 
+ */ + @Test(timeout = 120000L) + public void testGuardRetainsDeleteMarkersNewerThanConsistencyPoint() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis(); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Set consistency point BEFORE the delete — meaning replay hasn't caught up to the delete + long consistencyPoint = beforeDeleteTime - 1; + injectMockConsistencyPoint(consistencyPoint); + + // Advance time past maxLookback window — without guard, marker would be purged + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); + + flush(dataTable); + majorCompact(dataTable); + + // Delete marker should be retained because its timestamp > consistencyPoint + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + } + } + + /** + * Test 2: Both maxLookback and guard allow purge. The consistency point has advanced past the + * delete marker AND maxLookback window has passed — marker should be purged. 
+ */ + @Test(timeout = 120000L) + public void testDeleteMarkersPurgedWhenOlderThanBothConsistencyPointAndMaxLookback() + throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Advance time past maxLookback + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); + + // Set consistency point to current time — replay is fully caught up + long consistencyPoint = EnvironmentEdgeManager.currentTimeMillis(); + injectMockConsistencyPoint(consistencyPoint); + + flush(dataTable); + majorCompact(dataTable); + + // Delete marker should be purged — both guard and maxLookback agree + assertRawRowCount(conn, dataTable, ROWS_POPULATED - 1); + } + } + + /** + * Test 3: MaxLookback retains even when guard wouldn't. Consistency point has advanced past the + * delete, but we're still within the maxLookback window — marker retained by maxLookback. 
+ */ + @Test(timeout = 120000L) + public void testMaxLookbackRetainsEvenWhenGuardAllowsPurge() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Set consistency point to current time — guard would allow purge + long consistencyPoint = EnvironmentEdgeManager.currentTimeMillis(); + injectMockConsistencyPoint(consistencyPoint); + + // Do NOT advance past maxLookback — still within the window + injectEdge.incrementValue(1); + + flush(dataTable); + majorCompact(dataTable); + + // Delete marker retained because still within maxLookback window + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + } + } + + /** + * Test 4: Guard fallback when consistency point unavailable — retains all delete markers. When + * the replay service throws an exception (e.g., not initialized), the guard falls back to + * retaining all markers to avoid data loss. 
+ */ + @Test(timeout = 120000L) + public void testGuardFallbackRetainsAllWhenConsistencyPointUnavailable() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Inject consistency point as UNAVAILABLE — simulating fallback when replay service fails + ReplicationLogReplayService.setConsistencyPointForTesting(getUtility().getConfiguration(), + CONSISTENCY_POINT_UNAVAILABLE); + + // Advance past maxLookback + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); + + flush(dataTable); + majorCompact(dataTable); + + // Fallback retains all — delete marker NOT purged despite maxLookback passing + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + } + } + + /** + * Test 5: Guard retains delete markers on a table with explicit TTL. The consistency point is set + * BEFORE the delete, time advances past both TTL and maxLookback, and the guard still retains the + * delete marker because its timestamp is newer than the consistency point. 
+ */ + @Test(timeout = 120000L) + public void testGuardRetainsDeleteMarkersWithExplicitTTL() throws Exception { + int ttlSeconds = 30; + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTableWithTTL(dataTableName, ttlSeconds); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis(); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Set consistency point BEFORE the delete — replay hasn't caught up + long consistencyPoint = beforeDeleteTime - 1; + injectMockConsistencyPoint(consistencyPoint); + + // Advance past both TTL and maxLookback — without guard, marker would be purged + injectEdge.incrementValue(ttlSeconds * 1000 + 1000); + + flush(dataTable); + majorCompact(dataTable); + + // Delete marker retained — guard caps purge at consistencyPoint + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + } + } + + private void injectMockConsistencyPoint(long consistencyPoint) { + ReplicationLogReplayService.setConsistencyPointForTesting(getUtility().getConfiguration(), + consistencyPoint); + } + + private void flush(TableName table) throws IOException { + getUtility().getAdmin().flush(table); + } + + private void majorCompact(TableName table) throws Exception { + TestUtil.majorCompact(getUtility(), table); + } + + private void createTable(String tableName) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY, val1 VARCHAR(10)," + + " val2 VARCHAR(10), val3 VARCHAR(10))"); + conn.commit(); + } + } + + private void createTableWithTTL(String tableName, int ttlSeconds) throws SQLException { + try (Connection conn = 
DriverManager.getConnection(getUrl())) { + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY, val1 VARCHAR(10)," + + " val2 VARCHAR(10), val3 VARCHAR(10)) TTL=" + ttlSeconds); + conn.commit(); + } + } + + private void populateTable(String tableName) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('a', 'ab', 'abc', 'abcd')"); + conn.commit(); + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('b', 'bc', 'bcd', 'bcde')"); + conn.commit(); + } + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java new file mode 100644 index 00000000000..e13bb86d66b --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.coprocessor; + +import static org.apache.phoenix.replication.reader.ReplicationLogReplayService.CONSISTENCY_POINT_GUARD_DISABLED; +import static org.apache.phoenix.replication.reader.ReplicationLogReplayService.CONSISTENCY_POINT_UNAVAILABLE; +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +/** + * Unit tests for CompactionScanner.computeRowMaxLookbackWithGuard formula. Covers scenarios + * including those unreachable via standard TTL floor enforcement to guard against future changes to + * the TTL computation. + */ +public class CompactionGuardFormulaTest { + + @Test + public void testTtlDominatedGuardCaps() { + // ttlWindowStart > maxLookbackWindowStart > consistencyPoint + // Simulates conditional TTL or future removal of TTL floor enforcement + long consistencyPoint = 1000L; + long maxLookbackWindowStart = 2000L; + long ttlWindowStart = 3000L; + + long result = CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); + + // max(3000, 2000) = 3000, min(3000, 999) = 999 → guard caps at consistencyPoint - 1 + assertEquals(consistencyPoint - 1, result); + } + + @Test + public void testLookbackDominatedGuardCaps() { + // maxLookbackWindowStart > ttlWindowStart > consistencyPoint + long consistencyPoint = 1000L; + long ttlWindowStart = 2000L; + long maxLookbackWindowStart = 3000L; + + long result = CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); + + // max(2000, 3000) = 3000, min(3000, 999) = 999 → guard caps at consistencyPoint - 1 + assertEquals(consistencyPoint - 1, result); + } + + @Test + public void testConsistencyPointBeyondBoth_guardInactive() { + // consistencyPoint > max(ttlWindowStart, maxLookbackWindowStart) + long maxLookbackWindowStart = 2000L; + long ttlWindowStart = 1000L; + long consistencyPoint = 5000L; + + long result = 
CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); + + // max(1000, 2000) = 2000, min(2000, 4999) = 2000 → guard doesn't restrict + assertEquals(maxLookbackWindowStart, result); + } + + @Test + public void testConsistencyPointZero_retainsAll() { + // consistencyPoint = UNAVAILABLE signals fallback — retain all delete markers + long maxLookbackWindowStart = 2000L; + long ttlWindowStart = 3000L; + long consistencyPoint = CONSISTENCY_POINT_UNAVAILABLE; + + long result = CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); + + assertEquals(CONSISTENCY_POINT_UNAVAILABLE, result); + } + + @Test + public void testConsistencyPointMaxValue_guardDisabled() { + // GUARD_DISABLED used when replay is off — guard is effectively a no-op + long maxLookbackWindowStart = 2000L; + long ttlWindowStart = 3000L; + long consistencyPoint = CONSISTENCY_POINT_GUARD_DISABLED; + + long result = CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); + + // max(3000, 2000) = 3000, min(3000, MAX_VALUE) = 3000 → normal behavior + assertEquals(ttlWindowStart, result); + } + + @Test + public void testBoundaryDeleteAtExactlyConsistencyPoint_isRetained() { + // A delete marker at ts == consistencyPoint has NOT been replayed (exclusive upper bound). + // The formula must produce a boundary below consistencyPoint so that the strict-greater + // retention check (ts > boundary) retains it. 
+ long consistencyPoint = 5000L; + long maxLookbackWindowStart = 6000L; + long ttlWindowStart = 4000L; + + long result = CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); + + // Boundary = consistencyPoint - 1 = 4999; a cell at ts=5000 satisfies ts > 4999 + assertEquals(consistencyPoint - 1, result); + } + + @Test + public void testConsistencyPointBetweenInputs() { + // ttlWindowStart < consistencyPoint < maxLookbackWindowStart + long ttlWindowStart = 1000L; + long consistencyPoint = 2500L; + long maxLookbackWindowStart = 3000L; + + long result = CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart, + maxLookbackWindowStart, consistencyPoint); + + // max(1000, 3000) = 3000, min(3000, 2499) = 2499 → guard caps + assertEquals(consistencyPoint - 1, result); + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java new file mode 100644 index 00000000000..e74a63a41dc --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.reader; + +import static org.apache.phoenix.replication.reader.ReplicationLogReplayService.CONSISTENCY_POINT_UNAVAILABLE; +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +/** + * Tests for ReplicationLogReplayService.resolveConsistencyPoint caching behavior. + */ +public class ReplicationConsistencyPointTest { + + @Test + public void testCachedConsistencyPointAvoidsRepeatedFetches() { + Configuration conf = new Configuration(false); + AtomicInteger fetchCount = new AtomicInteger(0); + ReplicationLogReplayService.setConsistencyPointSupplierForTesting(conf, () -> { + fetchCount.incrementAndGet(); + return 500000L; + }); + + try { + String table = "TEST_TABLE"; + String cf = "0"; + + long result1 = ReplicationLogReplayService.resolveConsistencyPoint(conf, table, cf); + long result2 = ReplicationLogReplayService.resolveConsistencyPoint(conf, table, cf); + long result3 = ReplicationLogReplayService.resolveConsistencyPoint(conf, table, cf); + + assertEquals(500000L, result1); + assertEquals(500000L, result2); + assertEquals(500000L, result3); + assertEquals(1, fetchCount.get()); + } finally { + ReplicationLogReplayService.resetInstanceForTesting(); + } + } + + @Test + public void testTransientFailureNotCached_retriesOnNextCall() { + Configuration conf = new Configuration(false); + AtomicInteger fetchCount = new AtomicInteger(0); + ReplicationLogReplayService.setConsistencyPointSupplierForTesting(conf, () -> { + int attempt = fetchCount.incrementAndGet(); + if (attempt == 1) { + throw new RuntimeException("Simulated transient failure"); + } + return 700000L; + }); + + try { + String table = "TEST_TABLE"; + String cf = "0"; + + long result1 = ReplicationLogReplayService.resolveConsistencyPoint(conf, 
table, cf); + assertEquals(CONSISTENCY_POINT_UNAVAILABLE, result1); + + long result2 = ReplicationLogReplayService.resolveConsistencyPoint(conf, table, cf); + assertEquals(700000L, result2); + + assertEquals(2, fetchCount.get()); + } finally { + ReplicationLogReplayService.resetInstanceForTesting(); + } + } +}