Skip to content

Commit 1580bde

Browse files
ericm-db and claude committed
Fix V2 auto-repair issues and add comprehensive tests
- Fix setMaxSeenVersion to use target version instead of snapshot version
- Add logWarning when getFullLineage fails during auto-repair
- Remove dead loadedSnapshotUniqueId variable
- Propagate enriched lineage back from loadSnapshotWithCheckpointId
- Restore version 0 assertion for stateStoreCkptId
- Add tests: maxChangeFileReplay limit, load-after-repair roundtrip, no snapshots + full replay, getFullLineage failure handling

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 3d9d889 commit 1580bde

2 files changed

Lines changed: 287 additions & 6 deletions

File tree

  • sql/core/src

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,13 @@ class RocksDB(
492492

493493
// Build initial lineage for version > 0. Version 0 is the empty initial state
494494
// with no checkpoint files, so lineage stays empty.
495+
if (version == 0) {
496+
assert(stateStoreCkptId.isEmpty,
497+
"stateStoreCkptId should be empty when version is zero")
498+
// Clear stale lineage from a previous session so that
499+
// loadSnapshotWithCheckpointId only considers version 0 (empty state).
500+
currVersionLineage = Array.empty
501+
}
495502
if (version > 0) {
496503
// If the exact snapshot file exists on DFS, use a sparse lineage (just the
497504
// target version) as an optimization to skip reading the changelog header.
@@ -508,7 +515,8 @@ class RocksDB(
508515

509516
// Delegate to AutoSnapshotLoader which handles both the happy path
510517
// and auto-repair fallback to older snapshots.
511-
loadSnapshotWithCheckpointId(version, stateStoreCkptId, currVersionLineage)
518+
currVersionLineage =
519+
loadSnapshotWithCheckpointId(version, stateStoreCkptId, currVersionLineage)
512520
loadedFromDfs = version > 0
513521
lineageManager.resetLineage(currVersionLineage)
514522
// After changelog replay the numKeysOnWritingVersion will be updated to
@@ -696,11 +704,13 @@ class RocksDB(
696704
* @param stateStoreCkptId The checkpoint ID for the target version (for lineage enrichment)
697705
* @param initialLineage The lineage for the target version (may be enriched by
698706
* getEligibleSnapshotsForRepair during auto-repair)
707+
* @return The (potentially enriched) lineage after loading. During auto-repair,
708+
* this may contain more entries than the initial sparse lineage.
699709
*/
700710
private def loadSnapshotWithCheckpointId(
701711
versionToLoad: Long,
702712
stateStoreCkptId: Option[String],
703-
initialLineage: Array[LineageItem]): Unit = {
713+
initialLineage: Array[LineageItem]): Array[LineageItem] = {
704714
// Local var so that beforeRepair can enrich the lineage and getEligibleSnapshots
705715
// (re-called after beforeRepair) sees the updated lineage.
706716
var currVersionLineage = initialLineage
@@ -713,7 +723,6 @@ class RocksDB(
713723
// Side-channel map: version -> uniqueId, populated by getEligibleSnapshots,
714724
// consumed by loadSnapshotFromCheckpoint
715725
val eligibleSnapshotUniqueIds = mutable.Map[Long, String]()
716-
var loadedSnapshotUniqueId: Option[String] = None
717726

718727
val snapshotLoader = new AutoSnapshotLoader(
719728
allowAutoSnapshotRepair,
@@ -728,7 +737,7 @@ class RocksDB(
728737
workingDir, rocksDBFileMapping, uniqueId)
729738

730739
loadedVersion = snapshotVersion
731-
fileManager.setMaxSeenVersion(snapshotVersion)
740+
fileManager.setMaxSeenVersion(versionToLoad)
732741
openLocalRocksDB(remoteMetaData)
733742
lastSnapshotVersion = snapshotVersion
734743
reportSnapshotUploadToCoordinator(snapshotVersion)
@@ -743,7 +752,6 @@ class RocksDB(
743752
loadedVersion = versionToLoad
744753
}
745754

746-
loadedSnapshotUniqueId = uniqueId
747755
}
748756

749757
override protected def onLoadSnapshotFromCheckpointFailure(): Unit = {
@@ -769,7 +777,10 @@ class RocksDB(
769777
try {
770778
currVersionLineage = getFullLineage(1, versionToLoad, stateStoreCkptId)
771779
} catch {
772-
case NonFatal(_) => // keep existing lineage; may limit fallback options
780+
case NonFatal(e) =>
781+
logWarning(log"Failed to enrich lineage via getFullLineage during " +
782+
log"auto-repair for version ${MDC(LogKeys.VERSION_NUM, versionToLoad)}. " +
783+
log"Falling back to existing lineage; repair options may be limited.", e)
773784
}
774785
}
775786
// Re-discover eligible snapshots from the (potentially enriched) lineage
@@ -779,6 +790,7 @@ class RocksDB(
779790

780791
val (_, autoRepairCompleted) = snapshotLoader.loadSnapshot(versionToLoad)
781792
performedSnapshotAutoRepair = autoRepairCompleted
793+
currVersionLineage
782794
}
783795

784796
/**

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala

Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4256,6 +4256,275 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
42564256
}
42574257
}
42584258

4259+
testWithChangelogCheckpointingEnabled(
4260+
"V2 auto-repair respects maxChangeFileReplay limit") {
4261+
withSQLConf(
4262+
SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString,
4263+
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "2",
4264+
SQLConf.STATE_STORE_AUTO_SNAPSHOT_REPAIR_ENABLED.key -> true.toString,
4265+
SQLConf.STATE_STORE_AUTO_SNAPSHOT_REPAIR_NUM_FAILURES_BEFORE_ACTIVATING.key -> "1"
4266+
) {
4267+
withTempDir { dir =>
4268+
val remoteDir = dir.getCanonicalPath
4269+
val versionToUniqueId = mutable.Map[Long, String]()
4270+
4271+
// Build 6 versions with snapshots at 2, 4, and 6
4272+
withDB(remoteDir, enableStateStoreCheckpointIds = true,
4273+
versionToUniqueId = versionToUniqueId) { db =>
4274+
for (i <- 0 until 6) {
4275+
db.load(i)
4276+
db.put(s"k$i", s"v$i")
4277+
db.commit()
4278+
if (i > 0 && i % 2 == 1) db.doMaintenance() // snapshots at 2, 4, 6
4279+
}
4280+
}
4281+
4282+
def corruptFile(file: File): Unit =
4283+
new PrintWriter(file) { close() }
4284+
4285+
// Corrupt snapshots at version 6 and 4
4286+
val uuid6 = versionToUniqueId(6)
4287+
val uuid4 = versionToUniqueId(4)
4288+
corruptFile(new File(remoteDir, s"6_${uuid6}.zip"))
4289+
corruptFile(new File(remoteDir, s"4_${uuid4}.zip"))
4290+
4291+
// With maxChangeFileReplay=2, snapshot 2 (4 changelogs away) should be
4292+
// skipped, and version 0 (6 changelogs away) should also be skipped.
4293+
// Auto-repair should fail since no eligible snapshot is within range.
4294+
withSQLConf(
4295+
SQLConf.STATE_STORE_AUTO_SNAPSHOT_REPAIR_MAX_CHANGE_FILE_REPLAY.key -> "2"
4296+
) {
4297+
withDB(remoteDir, enableStateStoreCheckpointIds = true,
4298+
versionToUniqueId = versionToUniqueId) { db =>
4299+
intercept[StateStoreAutoSnapshotRepairFailed] {
4300+
db.load(6)
4301+
}
4302+
}
4303+
}
4304+
4305+
// With maxChangeFileReplay=5, snapshot 2 (4 changelogs away) should be
4306+
// eligible and auto-repair should succeed.
4307+
withSQLConf(
4308+
SQLConf.STATE_STORE_AUTO_SNAPSHOT_REPAIR_MAX_CHANGE_FILE_REPLAY.key -> "5"
4309+
) {
4310+
withDB(remoteDir, enableStateStoreCheckpointIds = true,
4311+
versionToUniqueId = versionToUniqueId) { db =>
4312+
db.load(6)
4313+
for (i <- 0 until 6) {
4314+
assert(toStr(db.get(s"k$i")) == s"v$i")
4315+
}
4316+
db.commit()
4317+
assert(db.metricsOpt.get.numSnapshotsAutoRepaired == 1)
4318+
}
4319+
}
4320+
}
4321+
}
4322+
}
4323+
4324+
testWithChangelogCheckpointingEnabled(
4325+
"V2 auto-repair followed by commit produces valid state for subsequent loads") {
4326+
withSQLConf(
4327+
SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString,
4328+
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "2",
4329+
SQLConf.STATE_STORE_AUTO_SNAPSHOT_REPAIR_ENABLED.key -> true.toString,
4330+
SQLConf.STATE_STORE_AUTO_SNAPSHOT_REPAIR_NUM_FAILURES_BEFORE_ACTIVATING.key -> "1",
4331+
SQLConf.STATE_STORE_AUTO_SNAPSHOT_REPAIR_MAX_CHANGE_FILE_REPLAY.key -> "5"
4332+
) {
4333+
withTempDir { dir =>
4334+
val remoteDir = dir.getCanonicalPath
4335+
val versionToUniqueId = mutable.Map[Long, String]()
4336+
4337+
// Build 4 versions with snapshots at 2 and 4
4338+
withDB(remoteDir, enableStateStoreCheckpointIds = true,
4339+
versionToUniqueId = versionToUniqueId) { db =>
4340+
db.load(0)
4341+
db.put("a", "0")
4342+
db.commit()
4343+
4344+
db.load(1)
4345+
db.put("b", "1")
4346+
db.commit()
4347+
db.doMaintenance() // snapshot at 2
4348+
4349+
db.load(2)
4350+
db.put("c", "2")
4351+
db.commit()
4352+
4353+
db.load(3)
4354+
db.put("d", "3")
4355+
db.commit()
4356+
db.doMaintenance() // snapshot at 4
4357+
}
4358+
4359+
def corruptFile(file: File): Unit =
4360+
new PrintWriter(file) { close() }
4361+
4362+
// Corrupt snapshot at version 4
4363+
val uuid4 = versionToUniqueId(4)
4364+
corruptFile(new File(remoteDir, s"4_${uuid4}.zip"))
4365+
4366+
// Auto-repair loads version 4 from snapshot 2 + replay, then commit version 5
4367+
withDB(remoteDir, enableStateStoreCheckpointIds = true,
4368+
versionToUniqueId = versionToUniqueId) { db =>
4369+
db.load(4)
4370+
assert(toStr(db.get("d")) == "3")
4371+
db.put("e", "4")
4372+
db.commit()
4373+
assert(db.metricsOpt.get.numSnapshotsAutoRepaired == 1)
4374+
db.doMaintenance() // upload snapshot at version 5
4375+
}
4376+
4377+
// Reload version 5 in a fresh DB instance - verifies the lineage chain is intact
4378+
withDB(remoteDir, enableStateStoreCheckpointIds = true,
4379+
versionToUniqueId = versionToUniqueId) { db =>
4380+
db.load(5)
4381+
assert(toStr(db.get("a")) == "0")
4382+
assert(toStr(db.get("b")) == "1")
4383+
assert(toStr(db.get("c")) == "2")
4384+
assert(toStr(db.get("d")) == "3")
4385+
assert(toStr(db.get("e")) == "4")
4386+
4387+
// Commit another version to verify the chain continues
4388+
db.put("f", "5")
4389+
db.commit()
4390+
assert(db.metricsOpt.get.numSnapshotsAutoRepaired == 0)
4391+
}
4392+
4393+
// Reload version 6 to verify continued integrity
4394+
withDB(remoteDir, enableStateStoreCheckpointIds = true,
4395+
versionToUniqueId = versionToUniqueId) { db =>
4396+
db.load(6)
4397+
assert(toStr(db.get("f")) == "5")
4398+
db.commit()
4399+
assert(db.metricsOpt.get.numSnapshotsAutoRepaired == 0)
4400+
}
4401+
}
4402+
}
4403+
}
4404+
4405+
testWithChangelogCheckpointingEnabled(
4406+
"V2 auto-repair with no snapshots falls back to version 0 + full replay") {
4407+
withSQLConf(
4408+
SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString,
4409+
// Set very high so no snapshots are ever created
4410+
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "100",
4411+
SQLConf.STATE_STORE_AUTO_SNAPSHOT_REPAIR_ENABLED.key -> true.toString,
4412+
SQLConf.STATE_STORE_AUTO_SNAPSHOT_REPAIR_NUM_FAILURES_BEFORE_ACTIVATING.key -> "1",
4413+
SQLConf.STATE_STORE_AUTO_SNAPSHOT_REPAIR_MAX_CHANGE_FILE_REPLAY.key -> "10"
4414+
) {
4415+
withTempDir { dir =>
4416+
val remoteDir = dir.getCanonicalPath
4417+
val versionToUniqueId = mutable.Map[Long, String]()
4418+
4419+
// Build 4 versions with only changelogs (no snapshots due to high minDeltas)
4420+
withDB(remoteDir, enableStateStoreCheckpointIds = true,
4421+
versionToUniqueId = versionToUniqueId) { db =>
4422+
for (i <- 0 until 4) {
4423+
db.load(i)
4424+
db.put(s"k$i", s"v$i")
4425+
db.commit()
4426+
db.doMaintenance() // no snapshot will be uploaded
4427+
}
4428+
}
4429+
4430+
// Verify no snapshot files exist
4431+
val snapshotFiles = dir.listFiles().filter(_.getName.endsWith(".zip"))
4432+
assert(snapshotFiles.isEmpty, "Expected no snapshot files")
4433+
4434+
// Loading should succeed via version 0 + full changelog replay.
4435+
// getEligibleSnapshots returns empty (no snapshots in lineage),
4436+
// AutoSnapshotLoader appends 0L, so we load version 0 and replay all changelogs.
4437+
withDB(remoteDir, enableStateStoreCheckpointIds = true,
4438+
versionToUniqueId = versionToUniqueId) { db =>
4439+
db.load(4)
4440+
for (i <- 0 until 4) {
4441+
assert(toStr(db.get(s"k$i")) == s"v$i")
4442+
}
4443+
db.commit()
4444+
// No auto-repair is recorded: version 0 is the first eligible candidate and loading it succeeds
4445+
assert(db.metricsOpt.get.numSnapshotsAutoRepaired == 0)
4446+
}
4447+
}
4448+
}
4449+
}
4450+
4451+
testWithChangelogCheckpointingEnabled(
4452+
"V2 auto-repair with getFullLineage failure falls back using sparse lineage") {
4453+
withSQLConf(
4454+
SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString,
4455+
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "2",
4456+
SQLConf.STATE_STORE_AUTO_SNAPSHOT_REPAIR_ENABLED.key -> true.toString,
4457+
SQLConf.STATE_STORE_AUTO_SNAPSHOT_REPAIR_NUM_FAILURES_BEFORE_ACTIVATING.key -> "1",
4458+
SQLConf.STATE_STORE_AUTO_SNAPSHOT_REPAIR_MAX_CHANGE_FILE_REPLAY.key -> "10"
4459+
) {
4460+
withTempDir { dir =>
4461+
val remoteDir = dir.getCanonicalPath
4462+
val versionToUniqueId = mutable.Map[Long, String]()
4463+
4464+
// Build 4 versions with snapshots at 2 and 4
4465+
withDB(remoteDir, enableStateStoreCheckpointIds = true,
4466+
versionToUniqueId = versionToUniqueId) { db =>
4467+
db.load(0)
4468+
db.put("a", "0")
4469+
db.commit()
4470+
4471+
db.load(1)
4472+
db.put("b", "1")
4473+
db.commit()
4474+
db.doMaintenance() // snapshot at 2
4475+
4476+
db.load(2)
4477+
db.put("c", "2")
4478+
db.commit()
4479+
4480+
db.load(3)
4481+
db.put("d", "3")
4482+
db.commit()
4483+
db.doMaintenance() // snapshot at 4
4484+
}
4485+
4486+
def corruptFile(file: File): Unit =
4487+
new PrintWriter(file) { close() }
4488+
4489+
// Corrupt snapshot at version 4 (to trigger auto-repair)
4490+
val uuid4 = versionToUniqueId(4)
4491+
corruptFile(new File(remoteDir, s"4_${uuid4}.zip"))
4492+
4493+
// Also corrupt changelog at version 2 to break getFullLineage's backward walk.
4494+
// getFullLineage reads changelogs from version 4 backward; corrupting version 2's
4495+
// changelog means the backward walk will fail. The NonFatal catch should handle
4496+
// this gracefully and repair using the sparse lineage (just version 4).
4497+
// With sparse lineage, the only fallback is version 0 + full replay.
4498+
val uuid2 = versionToUniqueId(2)
4499+
corruptFile(new File(remoteDir, s"2_${uuid2}.changelog"))
4500+
4501+
// Expected outcome: auto-repair is attempted but ultimately fails.
4502+
// 1. Snapshot 4 is corrupt, so the direct load fails and auto-repair kicks in.
4503+
// 2. Auto-repair calls getFullLineage to enrich the sparse lineage (just version 4),
4504+
//    but the backward walk hits the corrupt changelog for version 2 and throws.
4505+
// 3. The NonFatal handler logs a warning and keeps the sparse lineage.
4506+
// 4. Version 2's snapshot is still intact, but it is not discoverable because
4507+
//    the lineage was never enriched.
4508+
// 5. The only remaining candidate is version 0 (empty state) plus full replay,
4509+
//    which needs every changelog from 1 through 4.
4510+
//    Changelog 2 is corrupt, so that replay fails as well.
4511+
// With no candidate left, load() throws StateStoreAutoSnapshotRepairFailed.
4512+
// The lineage-enrichment error itself is swallowed (logged, not rethrown);
4513+
// only the final repair failure is expected to surface to the caller.
4514+
// The test verifies getFullLineage failure is handled gracefully (logged, not thrown).
4515+
withDB(remoteDir, enableStateStoreCheckpointIds = true,
4516+
versionToUniqueId = versionToUniqueId) { db =>
4517+
// All repair paths fail: snapshot 4 corrupt, getFullLineage fails,
4518+
// sparse lineage only knows version 4, version 0 + full replay fails
4519+
// because changelog 2 is corrupt.
4520+
intercept[StateStoreAutoSnapshotRepairFailed] {
4521+
db.load(4)
4522+
}
4523+
}
4524+
}
4525+
}
4526+
}
4527+
42594528
testWithChangelogCheckpointingEnabled("SPARK-51922 - Changelog writer v1 with large key" +
42604529
" does not cause UTFDataFormatException") {
42614530
val remoteDir = Utils.createTempDir()

0 commit comments

Comments
 (0)