Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.replication.reader.ReplicationLogReplayService;
import org.apache.phoenix.schema.CompiledConditionalTTLExpression;
import org.apache.phoenix.schema.CompiledTTLExpression;
import org.apache.phoenix.schema.ConditionalTTLExpression;
Expand Down Expand Up @@ -139,6 +140,7 @@ public class CompactionScanner implements InternalScanner {
private final Store store;
private final RegionCoprocessorEnvironment env;
private long maxLookbackWindowStart;
private final long replicationConsistencyPoint;
private final long maxLookbackInMillis;
private int minVersion;
private int maxVersion;
Expand Down Expand Up @@ -199,8 +201,18 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store,
this.maxLookbackWindowStart = this.maxLookbackInMillis == 0
? compactionTime
: compactionTime - (this.maxLookbackInMillis + 1);
ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
Configuration conf = env.getConfiguration();
this.major = major && !forceMinorCompaction;
boolean replayEnabled =
conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED,
ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED);
if (this.major && replayEnabled) {
this.replicationConsistencyPoint =
ReplicationLogReplayService.resolveConsistencyPoint(conf, tableName, columnFamilyName);
} else {
this.replicationConsistencyPoint = Long.MAX_VALUE;
}
ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
this.minVersion = cfd.getMinVersions();
this.maxVersion = cfd.getMaxVersions();
this.keepDeletedCells = keepDeleted ? KeepDeletedCells.TTL : cfd.getKeepDeletedCells();
Expand Down Expand Up @@ -1631,6 +1643,15 @@ private String getTenantIdFromRowKey(byte[] rowKey, boolean isSharedIndex) throw
}
}

/**
 * Returns the per-row max-lookback window start, limited from above by the replication
 * consistency point.
 * <p>
 * The result is {@code min(max(ttlWindowStart, maxLookbackWindowStart),
 * replicationConsistencyPoint)}.
 * @param ttlWindowStart start of the TTL window for the row
 * @param maxLookbackWindowStart start of the max-lookback window
 * @param replicationConsistencyPoint upper bound applied to the larger of the two window starts
 * @return the larger window start, or the consistency point if that is smaller
 */
public static long computeRowMaxLookbackWithGuard(long ttlWindowStart,
  long maxLookbackWindowStart, long replicationConsistencyPoint) {
  // The unguarded boundary is whichever window starts later.
  final long unguardedStart = Math.max(ttlWindowStart, maxLookbackWindowStart);
  // The guard can only pull the boundary back, never push it forward.
  final long guardedStart = Math.min(unguardedStart, replicationConsistencyPoint);
  return guardedStart;
}

/**
* The context for a given row during compaction. A row may have multiple compaction row versions.
* CompactionScanner uses the same row context for these versions.
Expand All @@ -1657,7 +1678,8 @@ private void init() {
private void setTTL(long ttlInSecs) {
this.ttl = Math.max(ttlInSecs * 1000, maxLookbackInMillis + 1);
this.ttlWindowStart = ttlInSecs == HConstants.FOREVER ? 1 : compactionTime - ttl;
this.maxLookbackWindowStartForRow = Math.max(ttlWindowStart, maxLookbackWindowStart);
this.maxLookbackWindowStartForRow = computeRowMaxLookbackWithGuard(ttlWindowStart,
maxLookbackWindowStart, replicationConsistencyPoint);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(String.format("RowContext:- (ttlWindowStart=%d, maxLookbackWindowStart=%d)",
ttlWindowStart, maxLookbackWindowStart));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery {
*/
public static final double DEFAULT_WAITING_BUFFER_PERCENTAGE = 15.0;

private ReplicationRound lastRoundInSync;
private volatile ReplicationRound lastRoundInSync;

// AtomicReference ensures listener updates are visible to replay thread
private final AtomicReference<ReplicationReplayState> replicationReplayState =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.Supplier;
import org.apache.phoenix.thirdparty.com.google.common.base.Suppliers;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
Expand All @@ -50,6 +53,7 @@ public class ReplicationLogReplayService {
*/
public static final boolean DEFAULT_REPLICATION_REPLAY_ENABLED = false;


/**
* Number of threads in the executor pool for the replication replay service
*/
Expand Down Expand Up @@ -77,14 +81,35 @@ public class ReplicationLogReplayService {
*/
public static final int DEFAULT_REPLICATION_REPLAY_SERVICE_EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS = 30;

/** Expiration, in seconds, passed to the memoizing supplier that caches the consistency point. */
private static final long CONSISTENCY_POINT_CACHE_TTL_SECONDS = 30;

/** Shared service instance; the *ForTesting helpers replace or clear it. */
private static volatile ReplicationLogReplayService instance;

/** Configuration supplied at construction; null when built through the test-only constructors. */
private final Configuration conf;
// NOTE(review): presumably created when the service is started — confirm against start().
private ScheduledExecutorService scheduler;
private volatile boolean isRunning = false;
/** Supplier read by resolveConsistencyPoint to obtain the consistency point. */
private final Supplier<Long> cachedConsistencyPoint;

private ReplicationLogReplayService(final Configuration conf) {
this.conf = conf;
this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration(() -> {
try {
return getConsistencyPoint();
} catch (IOException | SQLException e) {
throw new RuntimeException("Failed to fetch consistency point", e);
}
}, CONSISTENCY_POINT_CACHE_TTL_SECONDS, TimeUnit.SECONDS);
}

/**
 * Creates a service with no configuration whose consistency point is always the given value.
 * Used by {@link #setConsistencyPointForTesting(long)}.
 * @param fixedConsistencyPoint value the consistency-point supplier returns on every read
 */
private ReplicationLogReplayService(long fixedConsistencyPoint) {
  // Constant supplier: no expiration and no remote lookup.
  this.cachedConsistencyPoint = Suppliers.ofInstance(Long.valueOf(fixedConsistencyPoint));
  this.conf = null;
}

private ReplicationLogReplayService(Supplier<Long> supplier) {
this.conf = null;
this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration(supplier,
CONSISTENCY_POINT_CACHE_TTL_SECONDS, TimeUnit.SECONDS);
}

/**
Expand All @@ -105,6 +130,21 @@ public static ReplicationLogReplayService getInstance(Configuration conf) throws
return instance;
}

/**
 * Replaces the shared service instance with one that always reports the given consistency point.
 * Intended for tests only.
 * @param fixedConsistencyPoint consistency point the replacement instance reports
 */
@VisibleForTesting
public static void setConsistencyPointForTesting(long fixedConsistencyPoint) {
  final ReplicationLogReplayService fixedPointService =
    new ReplicationLogReplayService(fixedConsistencyPoint);
  instance = fixedPointService;
}

/**
 * Replaces the shared service instance with one that reads its consistency point from the given
 * supplier. Intended for tests only.
 * @param supplier source of the consistency point for the replacement instance
 */
@VisibleForTesting
public static void setConsistencyPointSupplierForTesting(Supplier<Long> supplier) {
  final ReplicationLogReplayService supplierBackedService =
    new ReplicationLogReplayService(supplier);
  instance = supplierBackedService;
}

/**
 * Clears the shared service instance by setting it to {@code null}. Intended for tests only.
 * NOTE(review): nothing here stops a previously started instance — confirm callers handle that.
 */
@VisibleForTesting
public static void resetInstanceForTesting() {
instance = null;
}

/**
* Starts the replication log replay service by initializing the scheduler and scheduling periodic
* replay operations for each HA Group.
Expand Down Expand Up @@ -229,6 +269,27 @@ protected long getConsistencyPoint() throws IOException, SQLException {
return consistencyPoint;
}

/**
 * Looks up the replication consistency point through the shared service's cached supplier, so
 * that bursts of compactions do not each trigger a fresh lookup.
 * <p>
 * The table and column family names are used only in log messages. Any exception raised while
 * obtaining the service or reading the cached value is logged at WARN and turned into a return
 * value of {@code 0L}, which the caller treats as "retain all delete markers".
 * @param conf configuration handed to {@link #getInstance(Configuration)}
 * @param tableName table name, for logging
 * @param columnFamilyName column family (store) name, for logging
 * @return the cached consistency point, or {@code 0L} if it could not be obtained
 */
public static long resolveConsistencyPoint(Configuration conf, String tableName,
  String columnFamilyName) {
  try {
    final ReplicationLogReplayService service = getInstance(conf);
    final long resolvedPoint = service.cachedConsistencyPoint.get();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Replication guard: table={} store={} consistencyPoint={}", tableName,
        columnFamilyName, resolvedPoint);
    }
    return resolvedPoint;
  } catch (Exception e) {
    // Fail safe: a zero consistency point keeps every delete marker.
    LOG.warn("Replication guard: consistency point unavailable for table={} store={}."
      + " Retaining all delete markers.", tableName, columnFamilyName, e);
    return 0L;
  }
}

/** Returns the list of HA groups on the cluster */
protected List<String> getReplicationGroups() throws SQLException {
return HAGroupStoreManager.getInstance(conf).getHAGroupNames();
Expand Down
Loading