Skip to content

Commit 3d9d889

Browse files
ericm-dbclaude
andcommitted
Refactor: inject beforeRepair() into AutoSnapshotLoader, remove try-catch
Instead of wrapping loadWithCheckpointId's snapshot load in a try-catch that delegates to AutoSnapshotLoader on failure, route V2 through AutoSnapshotLoader from the start (like V1 already does). Added a beforeRepair() callback to AutoSnapshotLoader (default no-op) that is called once when repair begins. V2 overrides it to enrich the lineage via getFullLineage(). getEligibleSnapshots() is re-called after beforeRepair() so the enriched lineage is picked up automatically. This eliminates the try-catch and makes V2 follow the same structural pattern as V1: AutoSnapshotLoader drives the entire load path. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 7e3d12c commit 3d9d889

2 files changed

Lines changed: 63 additions & 87 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/AutoSnapshotLoader.scala

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,20 @@ abstract class AutoSnapshotLoader(
6464
/** Get a list of eligible snapshot versions in the checkpoint directory that can be loaded */
6565
protected def getEligibleSnapshots(versionToLoad: Long): Seq[Long]
6666

67+
/**
68+
* Called once when the first snapshot load fails and auto-repair is about to begin.
69+
* Returns the eligible snapshots to use for the repair loop. Subclasses can override
70+
* this to return an enriched set of snapshots (e.g., by building a fuller lineage
71+
* that was initially sparse for optimization).
72+
*
73+
* The default delegates to [[getEligibleSnapshots]].
74+
*
75+
* @param versionToLoad The version being loaded
76+
* @return Eligible snapshot versions for repair
77+
*/
78+
protected def getEligibleSnapshotsForRepair(versionToLoad: Long): Seq[Long] =
79+
getEligibleSnapshots(versionToLoad)
80+
6781
/**
6882
* Load the latest snapshot for the specified version from the checkpoint directory.
6983
* If Auto snapshot repair is enabled, the snapshot version loaded may be lower than
@@ -114,13 +128,15 @@ abstract class AutoSnapshotLoader(
114128
// we would only get here if auto snapshot repair is enabled
115129
assert(autoSnapshotRepairEnabled)
116130

117-
val remainingEligibleSnapshots = if (eligibleSnapshots.length > 1) {
118-
// skip the first snapshot, since we already tried it
119-
eligibleSnapshots.tail
120-
} else {
121-
// no more snapshots to try
122-
Seq.empty
123-
}
131+
// getEligibleSnapshotsForRepair may return a different list than the original
132+
// getEligibleSnapshots (e.g., V2 enriches a sparse lineage to discover more
133+
// snapshots). We filter by value rather than using .tail because the first
134+
// snapshot we already tried may appear at any position in the new list.
135+
val remainingEligibleSnapshots =
136+
(getEligibleSnapshotsForRepair(versionToLoad) :+ 0L)
137+
.distinct
138+
.sorted(Ordering[Long].reverse)
139+
.filter(_ != firstEligibleSnapshot)
124140

125141
// select remaining snapshots that are within the maxChangeFileReplay limit
126142
val selectedRemainingSnapshots = remainingEligibleSnapshots.filter(

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 40 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -490,89 +490,27 @@ class RocksDB(
490490
// Handle normal checkpoint loading
491491
closeDB(ignoreException = false)
492492

493-
val (latestSnapshotVersion, latestSnapshotUniqueId) = {
494-
// Special handling when version is 0.
495-
// When loading the very first version (0), stateStoreCkptId does not need to be defined
496-
// because there won't be 0.changelog / 0.zip file created in RocksDB under v2.
497-
if (version == 0) {
498-
assert(stateStoreCkptId.isEmpty,
499-
"stateStoreCkptId should be empty when version is zero")
500-
(0L, None)
501-
// When there is a snapshot file, it is the ground truth, we can skip
502-
// reconstructing the lineage from changelog file.
503-
} else if (fileManager.existsSnapshotFile(version, stateStoreCkptId)) {
493+
// Build initial lineage for version > 0. Version 0 is the empty initial state
494+
// with no checkpoint files, so lineage stays empty.
495+
if (version > 0) {
496+
// If the exact snapshot file exists on DFS, use a sparse lineage (just the
497+
// target version) as an optimization to skip reading the changelog header.
498+
// If auto-repair is needed later, getEligibleSnapshotsForRepair() will
499+
// enrich this to the full lineage via getFullLineage().
500+
if (fileManager.existsSnapshotFile(version, stateStoreCkptId)) {
504501
currVersionLineage = Array(LineageItem(version, stateStoreCkptId.get))
505-
(version, stateStoreCkptId)
506502
} else {
507503
currVersionLineage = getLineageFromChangelogFile(version, stateStoreCkptId) :+
508504
LineageItem(version, stateStoreCkptId.get)
509505
currVersionLineage = currVersionLineage.sortBy(_.version)
510-
511-
val latestSnapshotVersionsAndUniqueId =
512-
fileManager.getLatestSnapshotVersionAndUniqueIdFromLineage(currVersionLineage)
513-
latestSnapshotVersionsAndUniqueId match {
514-
case Some(pair) => (pair._1, Option(pair._2))
515-
case None if currVersionLineage.head.version == 1L =>
516-
logDebug(log"Cannot find latest snapshot based on lineage but first version " +
517-
log"is 1, use 0 as default. Lineage: ${MDC(LogKeys.LINEAGE, lineageManager)}")
518-
(0L, None)
519-
case _ =>
520-
throw QueryExecutionErrors.cannotFindBaseSnapshotCheckpoint(
521-
printLineageItems(currVersionLineage))
522-
}
523506
}
524507
}
525508

526-
logInfo(log"Loaded latestSnapshotVersion: ${
527-
MDC(LogKeys.SNAPSHOT_VERSION, latestSnapshotVersion)}, latestSnapshotUniqueId: ${
528-
MDC(LogKeys.UUID, latestSnapshotUniqueId)}")
529-
530-
try {
531-
val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion,
532-
workingDir, rocksDBFileMapping, latestSnapshotUniqueId)
533-
loadedFromDfs = version > 0
534-
loadedVersion = latestSnapshotVersion
535-
536-
lastSnapshotVersion = latestSnapshotVersion
537-
lineageManager.resetLineage(currVersionLineage)
538-
539-
fileManager.setMaxSeenVersion(version)
540-
reportSnapshotUploadToCoordinator(latestSnapshotVersion)
541-
542-
openLocalRocksDB(metadata)
543-
544-
if (loadedVersion != version) {
545-
val versionsAndUniqueIds = currVersionLineage.collect {
546-
case i if i.version > loadedVersion && i.version <= version =>
547-
(i.version, Option(i.checkpointUniqueId))
548-
}
549-
replayChangelog(versionsAndUniqueIds)
550-
loadedVersion = version
551-
lineageManager.resetLineage(currVersionLineage)
552-
}
553-
} catch {
554-
case NonFatal(e) if enableChangelogCheckpointing &&
555-
conf.stateStoreConf.autoSnapshotRepairEnabled =>
556-
logWarning(log"Failed to load V2 snapshot/changelog for version ${
557-
MDC(LogKeys.VERSION_NUM, version)}, attempting auto-repair", e)
558-
// Build the full lineage from version 1 to the target version so that
559-
// auto-repair can fall back to any snapshot (including version 0 with
560-
// full changelog replay). getFullLineage walks backward through
561-
// changelog file headers to reconstruct the complete chain.
562-
if (stateStoreCkptId.isDefined) {
563-
try {
564-
currVersionLineage = getFullLineage(1, version, stateStoreCkptId)
565-
} catch {
566-
case NonFatal(_) => // keep existing lineage; may limit fallback options
567-
}
568-
}
569-
val (_, _, autoRepaired) =
570-
loadSnapshotWithCheckpointId(version, currVersionLineage)
571-
performedSnapshotAutoRepair = autoRepaired
572-
loadedFromDfs = version > 0
573-
loadedVersion = version
574-
lineageManager.resetLineage(currVersionLineage)
575-
}
509+
// Delegate to AutoSnapshotLoader which handles both the happy path
510+
// and auto-repair fallback to older snapshots.
511+
loadSnapshotWithCheckpointId(version, stateStoreCkptId, currVersionLineage)
512+
loadedFromDfs = version > 0
513+
lineageManager.resetLineage(currVersionLineage)
576514
// After changelog replay the numKeysOnWritingVersion will be updated to
577515
// the correct number of keys in the loaded version.
578516
numKeysOnLoadedVersion = numKeysOnWritingVersion
@@ -752,13 +690,20 @@ class RocksDB(
752690
* Changelog replay is included inside the load callback so that corrupt changelogs
753691
* also trigger fallback to the next older snapshot.
754692
*
693+
* Sets [[performedSnapshotAutoRepair]] as a side effect if auto-repair was used.
694+
*
755695
* @param versionToLoad The target version to load
756-
* @param currVersionLineage The lineage for the target version
757-
* @return (loadedSnapshotVersion, loadedSnapshotUniqueId, autoRepairCompleted)
696+
* @param stateStoreCkptId The checkpoint ID for the target version (for lineage enrichment)
697+
* @param initialLineage The lineage for the target version (may be enriched by
698+
* getEligibleSnapshotsForRepair during auto-repair)
758699
*/
759700
private def loadSnapshotWithCheckpointId(
760701
versionToLoad: Long,
761-
currVersionLineage: Array[LineageItem]): (Long, Option[String], Boolean) = {
702+
stateStoreCkptId: Option[String],
703+
initialLineage: Array[LineageItem]): Unit = {
704+
// Local var so that beforeRepair can enrich the lineage and getEligibleSnapshots
705+
// (re-called after beforeRepair) sees the updated lineage.
706+
var currVersionLineage = initialLineage
762707
val allowAutoSnapshotRepair = if (enableChangelogCheckpointing) {
763708
conf.stateStoreConf.autoSnapshotRepairEnabled
764709
} else {
@@ -814,11 +759,26 @@ class RocksDB(
814759
}
815760
snapshotsInLineage.map(_._1)
816761
}
762+
763+
override protected def getEligibleSnapshotsForRepair(version: Long): Seq[Long] = {
764+
// Build the full lineage from version 1 to the target version so that
765+
// auto-repair can fall back to any snapshot (including version 0 with
766+
// full changelog replay). getFullLineage walks backward through
767+
// changelog file headers to reconstruct the complete chain.
768+
if (stateStoreCkptId.isDefined) {
769+
try {
770+
currVersionLineage = getFullLineage(1, versionToLoad, stateStoreCkptId)
771+
} catch {
772+
case NonFatal(_) => // keep existing lineage; may limit fallback options
773+
}
774+
}
775+
// Re-discover eligible snapshots from the (potentially enriched) lineage
776+
getEligibleSnapshots(version)
777+
}
817778
}
818779

819-
val (version, autoRepairCompleted) = snapshotLoader.loadSnapshot(versionToLoad)
780+
val (_, autoRepairCompleted) = snapshotLoader.loadSnapshot(versionToLoad)
820781
performedSnapshotAutoRepair = autoRepairCompleted
821-
(version, loadedSnapshotUniqueId, autoRepairCompleted)
822782
}
823783

824784
/**

0 commit comments

Comments
 (0)