diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index f023b573438..0156a189e61 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -14,6 +14,7 @@ * Bundle variable references now accept Unicode letters in path segments (e.g. `${var.变量}`). ([#5532](https://github.com/databricks/cli/pull/5532)) * Ignore remote changes for vector search direct_access_index_spec.schema_json to prevent drift when the backend normalizes the schema ([#5481](https://github.com/databricks/cli/pull/5481)). * Remove hidden, never-functional `--existing-dashboard-id`, `--existing-dashboard-path`, `--existing-alert-id`, and `--existing-genie-space-id` alias flags from `bundle generate`; use the documented `--existing-id` / `--existing-path` flags instead ([#5591](https://github.com/databricks/cli/pull/5591)). +* engine/direct: Fix WAL corruption after two consecutive failed deploys ([#5557](https://github.com/databricks/cli/issues/5557)). ### Dependency updates diff --git a/acceptance/bundle/deploy/wal/two-crashed-deploys/databricks.yml b/acceptance/bundle/deploy/wal/two-crashed-deploys/databricks.yml new file mode 100644 index 00000000000..3d65ac2bfca --- /dev/null +++ b/acceptance/bundle/deploy/wal/two-crashed-deploys/databricks.yml @@ -0,0 +1,14 @@ +bundle: + name: wal-two-crashed-deploys + +resources: + jobs: + test_job: + name: "test-job" + tasks: + - task_key: "test-task" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge diff --git a/acceptance/bundle/deploy/wal/two-crashed-deploys/out.test.toml b/acceptance/bundle/deploy/wal/two-crashed-deploys/out.test.toml new file mode 100644 index 00000000000..e90b6d5d1ba --- /dev/null +++ b/acceptance/bundle/deploy/wal/two-crashed-deploys/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/deploy/wal/two-crashed-deploys/output.txt b/acceptance/bundle/deploy/wal/two-crashed-deploys/output.txt new file mode 100644 index 00000000000..1c70b4ae2ba --- /dev/null +++ b/acceptance/bundle/deploy/wal/two-crashed-deploys/output.txt @@ -0,0 +1,39 @@ + +=== First deploy (killed before recording the job, leaves a header-only WAL) +>>> errcode [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-two-crashed-deploys/default/files... +Deploying resources... +[PROCESS_KILLED] + +Exit code: [KILLED] + +>>> cat .databricks/bundle/default/resources.json.wal +{"state_version":2,"cli_version":"[DEV_VERSION]","lineage":"[UUID]","serial":1} + +=== Second deploy (killed again, leaves another header-only WAL) +>>> errcode [CLI] bundle deploy --force-lock +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-two-crashed-deploys/default/files... +Deploying resources... +[PROCESS_KILLED] + +Exit code: [KILLED] + +>>> cat .databricks/bundle/default/resources.json.wal +{"state_version":2,"cli_version":"[DEV_VERSION]","lineage":"[UUID]","serial":1} + +=== Third deploy (must recover and succeed, not blocked by the leftover WAL) +>>> errcode [CLI] bundle deploy --force-lock +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-two-crashed-deploys/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> errcode assert_not_exists.py .databricks/bundle/default/resources.json.wal + +>>> errcode cat .databricks/bundle/default/resources.json +{ + "serial": 1, + "state_keys": [ + "resources.jobs.test_job" + ] +} diff --git a/acceptance/bundle/deploy/wal/two-crashed-deploys/script b/acceptance/bundle/deploy/wal/two-crashed-deploys/script new file mode 100644 index 00000000000..89da7e1a277 --- /dev/null +++ b/acceptance/bundle/deploy/wal/two-crashed-deploys/script @@ -0,0 +1,22 @@ +# Two consecutive deploys are killed mid-apply, after UpgradeToWrite has written +# the WAL header but before Finalize runs (killed on the jobs/create call, before +# the job's state is recorded). A kill cannot be prevented by bailing out early, +# so each crash leaves a header-only WAL behind. Recovery must discard those +# no-op WALs without advancing the serial; otherwise the second crash would write +# its WAL header two serials ahead of the committed state and block every later +# command. Regression test for the dstate recovery fix. +kill_after.py "POST /api/2.2/jobs/create" 0 2 + +title "First deploy (killed before recording the job, leaves a header-only WAL)" +trace errcode $CLI bundle deploy +trace cat .databricks/bundle/default/resources.json.wal + +title "Second deploy (killed again, leaves another header-only WAL)" +trace errcode $CLI bundle deploy --force-lock +trace cat .databricks/bundle/default/resources.json.wal + +title "Third deploy (must recover and succeed, not blocked by the leftover WAL)" +trace errcode $CLI bundle deploy --force-lock + +trace errcode assert_not_exists.py .databricks/bundle/default/resources.json.wal +trace errcode cat .databricks/bundle/default/resources.json | jq -S '{serial: .serial, state_keys: (.state | keys)}' diff --git a/acceptance/bundle/deploy/wal/two-crashed-deploys/test.py b/acceptance/bundle/deploy/wal/two-crashed-deploys/test.py new file mode 100644 index 00000000000..1ff8e07c707 --- /dev/null +++ b/acceptance/bundle/deploy/wal/two-crashed-deploys/test.py @@ -0,0 +1 @@ +print("test") diff --git a/acceptance/bundle/deploy/wal/two-failed-deploys/databricks.yml b/acceptance/bundle/deploy/wal/two-failed-deploys/databricks.yml new file mode 100644 index 00000000000..942927ad95d --- /dev/null +++ b/acceptance/bundle/deploy/wal/two-failed-deploys/databricks.yml @@ -0,0 +1,14 @@ +bundle: + name: wal-two-failed-deploys + +resources: + jobs: + test_job: + name: "test-job" + tasks: + - task_key: "test-task" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge diff --git a/acceptance/bundle/deploy/wal/two-failed-deploys/out.test.toml b/acceptance/bundle/deploy/wal/two-failed-deploys/out.test.toml new file mode 100644 index 00000000000..e90b6d5d1ba --- /dev/null +++ b/acceptance/bundle/deploy/wal/two-failed-deploys/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/deploy/wal/two-failed-deploys/output.txt b/acceptance/bundle/deploy/wal/two-failed-deploys/output.txt new file mode 100644 index 00000000000..3fec3f807a7 --- /dev/null +++ b/acceptance/bundle/deploy/wal/two-failed-deploys/output.txt @@ -0,0 +1,48 @@ + +=== Deploy 1 (normal: creates the job and the committed state) +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-two-failed-deploys/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +=== Deploy 2 (planning fails, must not leave a WAL) +>>> errcode [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-two-failed-deploys/default/files... +Error: cannot plan resources.jobs.test_job: reading id="[NUMID]": Fault injected by test. (403 INJECTED) + +Endpoint: GET [DATABRICKS_URL]/api/2.2/jobs/get?job_id=[NUMID] +HTTP Status: 403 Forbidden +API error_code: INJECTED +API message: Fault injected by test. + +Error: planning failed + + +Exit code: 1 + +>>> assert_not_exists.py .databricks/bundle/default/resources.json.wal + +=== Deploy 3 (planning fails again, must not leave a WAL) +>>> errcode [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-two-failed-deploys/default/files... +Error: cannot plan resources.jobs.test_job: reading id="[NUMID]": Fault injected by test. (403 INJECTED) + +Endpoint: GET [DATABRICKS_URL]/api/2.2/jobs/get?job_id=[NUMID] +HTTP Status: 403 Forbidden +API error_code: INJECTED +API message: Fault injected by test. + +Error: planning failed + + +Exit code: 1 + +>>> assert_not_exists.py .databricks/bundle/default/resources.json.wal + +=== Deploy 4 (fault expired: recovers and succeeds) +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-two-failed-deploys/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! diff --git a/acceptance/bundle/deploy/wal/two-failed-deploys/script b/acceptance/bundle/deploy/wal/two-failed-deploys/script new file mode 100644 index 00000000000..b4fd4878627 --- /dev/null +++ b/acceptance/bundle/deploy/wal/two-failed-deploys/script @@ -0,0 +1,29 @@ +# A failed plan must not leave a write-ahead log behind, so repeated planning +# failures never block a later, healthy deploy. Previously a failed plan still +# opened the WAL for write (UpgradeToWrite) and returned without finalizing, +# leaving a header-only WAL; after two failures the WAL serial drifted two ahead +# of the committed serial and every later command failed WAL recovery until the +# WAL was deleted by hand. +# +# A first deploy creates the job normally. An injected fault then makes the next +# two deploys fail while planning (planning refreshes the existing job via +# jobs/get). The final deploy, with the fault expired, must recover and succeed. +# A non-retried 403 is used so the failure is immediate; a 5xx would be retried +# with backoff. + +title "Deploy 1 (normal: creates the job and the committed state)" +trace $CLI bundle deploy + +# Fail the plan-stage refresh GET for the next two deploys only. +fault.py "GET /api/2.2/jobs/get" 403 0 2 + +title "Deploy 2 (planning fails, must not leave a WAL)" +trace errcode $CLI bundle deploy +trace assert_not_exists.py .databricks/bundle/default/resources.json.wal + +title "Deploy 3 (planning fails again, must not leave a WAL)" +trace errcode $CLI bundle deploy +trace assert_not_exists.py .databricks/bundle/default/resources.json.wal + +title "Deploy 4 (fault expired: recovers and succeeds)" +trace $CLI bundle deploy diff --git a/acceptance/bundle/deploy/wal/two-failed-deploys/test.py b/acceptance/bundle/deploy/wal/two-failed-deploys/test.py new file mode 100644 index 00000000000..1ff8e07c707 --- /dev/null +++ b/acceptance/bundle/deploy/wal/two-failed-deploys/test.py @@ -0,0 +1 @@ +print("test") diff --git a/bundle/direct/dstate/state.go b/bundle/direct/dstate/state.go index 54505677663..dff484ca30c 100644 --- a/bundle/direct/dstate/state.go +++ b/bundle/direct/dstate/state.go @@ -286,6 +286,7 @@ func (db *DeploymentState) mergeWalIntoState(ctx context.Context) (bool, error) scanner.Buffer(make([]byte, 0, initialBufferSize), maxWalEntrySize) lineNumber := 0 var corruptedLines [][]byte + var newSerial int for scanner.Scan() { lineNumber++ @@ -309,7 +310,7 @@ func (db *DeploymentState) mergeWalIntoState(ctx context.Context) (bool, error) if header.Serial > expectedSerial { return false, fmt.Errorf("WAL serial (%d) is ahead of expected (%d), state may be corrupted", header.Serial, expectedSerial) } - db.Data.Serial = expectedSerial + newSerial = header.Serial } else { var entry WALEntry if err := json.Unmarshal(line, &entry); err != nil { @@ -344,7 +345,19 @@ func (db *DeploymentState) mergeWalIntoState(ctx context.Context) (bool, error) } } - return lineNumber > 1, nil + hasEntries := lineNumber > 1 + + // Only advance the serial when the WAL carried entries, because the caller + // (replayWAL) persists the new state file only in that case. A header-only + // WAL is a deploy that started but committed nothing; advancing the serial + // for it leaves the in-memory serial ahead of the persisted one, so the + // next deploy writes its WAL header at serial+2 and recovery rejects it as + // "ahead of expected". See acceptance/bundle/deploy/wal/two-crashed-deploys. + if hasEntries { + db.Data.Serial = newSerial + } + + return hasEntries, nil } // Finalize replays the WAL (if open for write), captures the resulting state, and resets. diff --git a/bundle/direct/dstate/state_test.go b/bundle/direct/dstate/state_test.go index bbfd2559951..b2d13c0a6c7 100644 --- a/bundle/direct/dstate/state_test.go +++ b/bundle/direct/dstate/state_test.go @@ -1,6 +1,7 @@ package dstate import ( + "encoding/json" "os" "path/filepath" "testing" @@ -55,6 +56,40 @@ func TestPanicOnDoubleOpen(t *testing.T) { mustFinalize(t, &db) } +func TestHeaderOnlyWALRecoveryDoesNotAdvanceSerial(t *testing.T) { + path := filepath.Join(t.TempDir(), "state.json") + walPath := path + walSuffix + + // Commit serial 1 with one resource. + var db DeploymentState + require.NoError(t, db.Open(t.Context(), path, WithRecovery(true), WithWrite(true))) + require.NoError(t, db.SaveState("jobs.my_job", "123", map[string]string{}, nil)) + mustFinalize(t, &db) + + var committed DeploymentState + require.NoError(t, committed.Open(t.Context(), path, WithRecovery(false), WithWrite(false))) + lineage := committed.Data.Lineage + require.Equal(t, 1, committed.Data.Serial) + mustFinalize(t, &committed) + + // A deploy that opens the WAL for write but commits nothing (e.g. planning + // fails after UpgradeToWrite) leaves a header-only WAL behind, here at the + // expected serial 2. Recovering it must not advance the serial past the + // committed 1, otherwise a second such failed deploy would write its header + // at serial 3 and be rejected as ahead of the committed state. + header := Header{Lineage: lineage, Serial: 2, StateVersion: currentStateVersion} + headerLine, err := json.Marshal(header) + require.NoError(t, err) + require.NoError(t, os.WriteFile(walPath, append(headerLine, '\n'), 0o600)) + + var recovered DeploymentState + require.NoError(t, recovered.Open(t.Context(), path, WithRecovery(true), WithWrite(false))) + assert.Equal(t, 1, recovered.Data.Serial) + assert.Equal(t, "123", recovered.GetResourceID("jobs.my_job")) + assert.NoFileExists(t, walPath) + mustFinalize(t, &recovered) +} + func TestDeleteState(t *testing.T) { path := filepath.Join(t.TempDir(), "state.json") diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 8518230770a..e3b0b777eb4 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -170,6 +170,13 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand plan = RunPlan(ctx, b, engine) } + // Stop before opening the WAL for write if planning failed. UpgradeToWrite + // writes a WAL header that only deployCore's Finalize commits or discards; + // returning past it without finalizing leaves a header-only WAL behind. + if logdiag.HasError(ctx) { + return + } + if engine.IsDirect() { // Upgrade from read (opened by process.go) to write mode if err := b.DeploymentBundle.StateDB.UpgradeToWrite(); err != nil { @@ -187,6 +194,9 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand } } + // InitForApply receives ctx and could log a diagnostic without returning an + // error, so re-check before deploying. (UpgradeToWrite above takes no ctx and + // thus cannot log, so the earlier check is enough to guard the WAL open.) if logdiag.HasError(ctx) { return }