Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,20 @@ abstract class AutoSnapshotLoader(
/** Get a list of eligible snapshot versions in the checkpoint directory that can be loaded */
protected def getEligibleSnapshots(versionToLoad: Long): Seq[Long]

/**
 * Supplies the candidate snapshot versions for the auto-repair loop.
 *
 * Intended to be invoked once, after the first snapshot load has failed and just
 * before auto-repair starts. A subclass may override this to hand back a richer
 * set of snapshots than [[getEligibleSnapshots]] produced initially (e.g., when the
 * initial lineage was kept sparse as an optimization and needs to be expanded).
 *
 * Unless overridden, this simply forwards to [[getEligibleSnapshots]].
 *
 * @param versionToLoad The version being loaded
 * @return Snapshot versions that the repair loop is allowed to try
 */
protected def getEligibleSnapshotsForRepair(versionToLoad: Long): Seq[Long] = {
  getEligibleSnapshots(versionToLoad)
}

/**
* Load the latest snapshot for the specified version from the checkpoint directory.
* If Auto snapshot repair is enabled, the snapshot version loaded may be lower than
Expand Down Expand Up @@ -114,13 +128,15 @@ abstract class AutoSnapshotLoader(
// we would only get here if auto snapshot repair is enabled
assert(autoSnapshotRepairEnabled)

val remainingEligibleSnapshots = if (eligibleSnapshots.length > 1) {
// skip the first snapshot, since we already tried it
eligibleSnapshots.tail
} else {
// no more snapshots to try
Seq.empty
}
// getEligibleSnapshotsForRepair may return a different list than the original
// getEligibleSnapshots (e.g., V2 enriches a sparse lineage to discover more
// snapshots). We filter by value rather than using .tail because the first
// snapshot we already tried may appear at any position in the new list.
val remainingEligibleSnapshots =
(getEligibleSnapshotsForRepair(versionToLoad) :+ 0L)
.distinct
.sorted(Ordering[Long].reverse)
.filter(_ != firstEligibleSnapshot)

// select remaining snapshots that are within the maxChangeFileReplay limit
val selectedRemainingSnapshots = remainingEligibleSnapshots.filter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,70 +490,35 @@ class RocksDB(
// Handle normal checkpoint loading
closeDB(ignoreException = false)

val (latestSnapshotVersion, latestSnapshotUniqueId) = {
// Special handling when version is 0.
// When loading the very first version (0), stateStoreCkptId does not need to be defined
// because there won't be 0.changelog / 0.zip file created in RocksDB under v2.
if (version == 0) {
assert(stateStoreCkptId.isEmpty,
"stateStoreCkptId should be empty when version is zero")
(0L, None)
// When there is a snapshot file, it is the ground truth, we can skip
// reconstructing the lineage from changelog file.
} else if (fileManager.existsSnapshotFile(version, stateStoreCkptId)) {
// Build initial lineage for version > 0. Version 0 is the empty initial state
// with no checkpoint files, so lineage stays empty.
if (version == 0) {
assert(stateStoreCkptId.isEmpty,
"stateStoreCkptId should be empty when version is zero")
// Clear stale lineage from a previous session so that
// loadSnapshotWithCheckpointId only considers version 0 (empty state).
currVersionLineage = Array.empty
}
if (version > 0) {
// If the exact snapshot file exists on DFS, use a sparse lineage (just the
// target version) as an optimization to skip reading the changelog header.
// If auto-repair is needed later, getEligibleSnapshotsForRepair() will
// enrich this to the full lineage via getFullLineage().
if (fileManager.existsSnapshotFile(version, stateStoreCkptId)) {
currVersionLineage = Array(LineageItem(version, stateStoreCkptId.get))
(version, stateStoreCkptId)
} else {
currVersionLineage = getLineageFromChangelogFile(version, stateStoreCkptId) :+
LineageItem(version, stateStoreCkptId.get)
currVersionLineage = currVersionLineage.sortBy(_.version)

val latestSnapshotVersionsAndUniqueId =
fileManager.getLatestSnapshotVersionAndUniqueIdFromLineage(currVersionLineage)
latestSnapshotVersionsAndUniqueId match {
case Some(pair) => (pair._1, Option(pair._2))
case None if currVersionLineage.head.version == 1L =>
logDebug(log"Cannot find latest snapshot based on lineage but first version " +
log"is 1, use 0 as default. Lineage: ${MDC(LogKeys.LINEAGE, lineageManager)}")
(0L, None)
case _ =>
throw QueryExecutionErrors.cannotFindBaseSnapshotCheckpoint(
printLineageItems(currVersionLineage))
}
}
}

logInfo(log"Loaded latestSnapshotVersion: ${
MDC(LogKeys.SNAPSHOT_VERSION, latestSnapshotVersion)}, latestSnapshotUniqueId: ${
MDC(LogKeys.UUID, latestSnapshotUniqueId)}")

val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion,
workingDir, rocksDBFileMapping, latestSnapshotUniqueId)
// version 0 is the empty initial state; loadCheckpointFromDfs skips DFS for it
// Delegate to AutoSnapshotLoader which handles both the happy path
// and auto-repair fallback to older snapshots.
currVersionLineage =
loadSnapshotWithCheckpointId(version, stateStoreCkptId, currVersionLineage)
loadedFromDfs = version > 0
loadedVersion = latestSnapshotVersion

// reset the last snapshot version to the latest available snapshot version
lastSnapshotVersion = latestSnapshotVersion
lineageManager.resetLineage(currVersionLineage)

// Initialize maxVersion upon successful load from DFS
fileManager.setMaxSeenVersion(version)

// Report this snapshot version to the coordinator
reportSnapshotUploadToCoordinator(latestSnapshotVersion)

openLocalRocksDB(metadata)

if (loadedVersion != version) {
val versionsAndUniqueIds = currVersionLineage.collect {
case i if i.version > loadedVersion && i.version <= version =>
(i.version, Option(i.checkpointUniqueId))
}
replayChangelog(versionsAndUniqueIds)
loadedVersion = version
lineageManager.resetLineage(currVersionLineage)
}
// After changelog replay the numKeysOnWritingVersion will be updated to
// the correct number of keys in the loaded version.
numKeysOnLoadedVersion = numKeysOnWritingVersion
Expand Down Expand Up @@ -727,6 +692,107 @@ class RocksDB(
version
}

/**
 * Loads `versionToLoad` through an [[AutoSnapshotLoader]] that discovers candidate
 * snapshots from the checkpoint lineage.
 *
 * The load callback both opens the snapshot and replays the changelogs up to
 * `versionToLoad`, so a failure during replay surfaces to the loader as a failed
 * snapshot load in the same way as a failure to open the snapshot itself.
 *
 * Side effects: updates [[performedSnapshotAutoRepair]] with the auto-repair flag
 * returned by the loader, and the load callback mutates `loadedVersion` and
 * `lastSnapshotVersion`.
 *
 * @param versionToLoad The target version to load
 * @param stateStoreCkptId The checkpoint ID for the target version; when defined it is
 *                         used to rebuild the full lineage if repair is needed
 * @param initialLineage The lineage for the target version; it may be replaced by the
 *                       full lineage inside getEligibleSnapshotsForRepair
 * @return The lineage in effect after loading, which may hold more entries than
 *         `initialLineage` if it was enriched during auto-repair.
 */
private def loadSnapshotWithCheckpointId(
    versionToLoad: Long,
    stateStoreCkptId: Option[String],
    initialLineage: Array[LineageItem]): Array[LineageItem] = {
  // Kept as a local var: getEligibleSnapshotsForRepair may swap in the full lineage,
  // and both getEligibleSnapshots and the changelog replay must observe that update.
  var currVersionLineage = initialLineage

  // Auto-repair is only considered when changelog checkpointing is enabled.
  val allowAutoSnapshotRepair =
    enableChangelogCheckpointing && conf.stateStoreConf.autoSnapshotRepairEnabled

  // Side channel (snapshot version -> checkpoint unique id): refreshed by
  // getEligibleSnapshots, read back by loadSnapshotFromCheckpoint.
  val eligibleSnapshotUniqueIds = mutable.Map.empty[Long, String]

  val snapshotLoader = new AutoSnapshotLoader(
    allowAutoSnapshotRepair,
    conf.stateStoreConf.autoSnapshotRepairNumFailuresBeforeActivating,
    conf.stateStoreConf.autoSnapshotRepairMaxChangeFileReplay,
    loggingId) {

    override protected def beforeLoad(): Unit = {
      closeDB(ignoreException = false)
    }

    override protected def loadSnapshotFromCheckpoint(snapshotVersion: Long): Unit = {
      // No entry in the side channel (e.g. nothing was discovered for this version)
      // yields None, which is passed through to the file manager as-is.
      val ckptUniqueId = eligibleSnapshotUniqueIds.get(snapshotVersion)
      val checkpointMetadata = fileManager.loadCheckpointFromDfs(
        snapshotVersion, workingDir, rocksDBFileMapping, ckptUniqueId)

      loadedVersion = snapshotVersion
      fileManager.setMaxSeenVersion(versionToLoad)
      openLocalRocksDB(checkpointMetadata)
      lastSnapshotVersion = snapshotVersion
      reportSnapshotUploadToCoordinator(snapshotVersion)

      // Bring the store from the snapshot up to the requested version by replaying
      // every lineage entry in (snapshotVersion, versionToLoad].
      if (snapshotVersion != versionToLoad) {
        val changelogsToReplay = currVersionLineage
          .filter(item => item.version > snapshotVersion && item.version <= versionToLoad)
          .map(item => (item.version, Option(item.checkpointUniqueId)))
        replayChangelog(changelogsToReplay)
        loadedVersion = versionToLoad
      }
    }

    override protected def onLoadSnapshotFromCheckpointFailure(): Unit = {
      // Mark the store as not holding any loaded version after a failed attempt.
      loadedVersion = -1
    }

    override protected def getEligibleSnapshots(version: Long): Seq[Long] = {
      val found = fileManager.getSnapshotVersionsAndUniqueIdsFromLineage(currVersionLineage)
      // Rebuild the side channel from scratch so it mirrors the current lineage only.
      eligibleSnapshotUniqueIds.clear()
      eligibleSnapshotUniqueIds ++= found
      found.map { case (snapshotVer, _) => snapshotVer }
    }

    override protected def getEligibleSnapshotsForRepair(version: Long): Seq[Long] = {
      // Rebuild the lineage from version 1 up to the target version so that repair can
      // fall back to any snapshot in the chain (including version 0 with full changelog
      // replay). A failure here is logged and the existing lineage is kept.
      if (stateStoreCkptId.nonEmpty) {
        try {
          currVersionLineage = getFullLineage(1, versionToLoad, stateStoreCkptId)
        } catch {
          case NonFatal(e) =>
            logWarning(log"Failed to enrich lineage via getFullLineage during " +
              log"auto-repair for version ${MDC(LogKeys.VERSION_NUM, versionToLoad)}. " +
              log"Falling back to existing lineage; repair options may be limited.", e)
        }
      }
      // Recompute candidates (and the unique-id side channel) from whichever lineage
      // is now in effect.
      getEligibleSnapshots(version)
    }
  }

  val (_, repaired) = snapshotLoader.loadSnapshot(versionToLoad)
  performedSnapshotAutoRepair = repaired
  currVersionLineage
}

/**
* Function to check if col family is internal or not based on information recorded in
* checkpoint metadata.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,14 +455,14 @@ class RocksDBFileManager(
/**
* Based on the ground truth lineage loaded from changelog file (lineage), this function
* does file listing to find all snapshot (version, uniqueId) pairs, and finds
* the ground truth latest snapshot (version, uniqueId) the db instance needs to load.
* all snapshots that match the lineage, sorted by version descending.
*
* @param lineage The ground truth lineage loaded from changelog file, sorted by id
* @return The ground truth latest snapshot (version, uniqueId) the db instance needs to load,
* when the return value is None it means there is no such snapshot found.
* @return All snapshot (version, uniqueId) pairs matching the lineage, sorted descending
* by version. Empty if no matching snapshots found.
*/
def getLatestSnapshotVersionAndUniqueIdFromLineage(
lineage: Array[LineageItem]): Option[(Long, String)] = {
def getSnapshotVersionsAndUniqueIdsFromLineage(
lineage: Array[LineageItem]): Seq[(Long, String)] = {
val path = new Path(dfsRootDir)
if (fm.exists(path)) {
fm.list(path, onlyZipFiles)
Expand All @@ -473,12 +473,26 @@ class RocksDBFileManager(
}
.sortBy(_._1)
.reverse
.headOption
.toSeq
} else {
None
Seq.empty
}
}

/**
 * Finds the latest snapshot (version, uniqueId) that belongs to the given lineage,
 * i.e. the one the db instance needs to load.
 *
 * This is the first element of [[getSnapshotVersionsAndUniqueIdsFromLineage]], which
 * lists the snapshot files on DFS and returns the matches sorted by version descending.
 *
 * @param lineage The ground truth lineage loaded from changelog file, sorted by id
 * @return The latest matching snapshot (version, uniqueId), or None when there is no
 *         such snapshot.
 */
def getLatestSnapshotVersionAndUniqueIdFromLineage(
    lineage: Array[LineageItem]): Option[(Long, String)] =
  getSnapshotVersionsAndUniqueIdsFromLineage(lineage).headOption

/** Get the latest version available in the DFS directory. If no data present, it returns 0. */
def getLatestVersion(): Long = {
val path = new Path(dfsRootDir)
Expand Down
Loading