Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* Bundle variable references now accept Unicode letters in path segments (e.g. `${var.变量}`). ([#5532](https://github.com/databricks/cli/pull/5532))
* Ignore remote changes for vector search direct_access_index_spec.schema_json to prevent drift when the backend normalizes the schema ([#5481](https://github.com/databricks/cli/pull/5481)).
* Remove hidden, never-functional `--existing-dashboard-id`, `--existing-dashboard-path`, `--existing-alert-id`, and `--existing-genie-space-id` alias flags from `bundle generate`; use the documented `--existing-id` / `--existing-path` flags instead ([#5591](https://github.com/databricks/cli/pull/5591)).
* engine/direct: Fix WAL corruption after two consecutive failed deploys ([#5557](https://github.com/databricks/cli/issues/5557)).

### Dependency updates

Expand Down
14 changes: 14 additions & 0 deletions acceptance/bundle/deploy/wal/header-only-wal/databricks.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
bundle:
name: test-bundle

resources:
jobs:
test_job:
name: "test-job"
tasks:
- task_key: "test-task"
spark_python_task:
python_file: ./test.py
new_cluster:
spark_version: 15.4.x-scala2.12
node_type_id: i3.xlarge
3 changes: 3 additions & 0 deletions acceptance/bundle/deploy/wal/header-only-wal/out.test.toml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions acceptance/bundle/deploy/wal/header-only-wal/output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@

=== First deploy (killed before recording the job, leaves a header-only WAL)
>>> errcode [CLI] bundle deploy
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-bundle/default/files...
Deploying resources...
[PROCESS_KILLED]

Exit code: [KILLED]

>>> cat .databricks/bundle/default/resources.json.wal
{"state_version":2,"cli_version":"[DEV_VERSION]","lineage":"[UUID]","serial":1}

=== Second deploy (killed again, leaves another header-only WAL)
Comment thread
denik marked this conversation as resolved.
>>> errcode [CLI] bundle deploy --force-lock
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-bundle/default/files...
Deploying resources...
[PROCESS_KILLED]

Exit code: [KILLED]

>>> cat .databricks/bundle/default/resources.json.wal
{"state_version":2,"cli_version":"[DEV_VERSION]","lineage":"[UUID]","serial":1}

=== Third deploy (must recover and succeed, not blocked by the leftover WAL)
>>> errcode [CLI] bundle deploy --force-lock
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-bundle/default/files...
Deploying resources...
Updating deployment state...
Deployment complete!

>>> errcode assert_not_exists.py .databricks/bundle/default/resources.json.wal

>>> errcode cat .databricks/bundle/default/resources.json
{
"serial": 1,
"state_keys": [
"resources.jobs.test_job"
]
}
22 changes: 22 additions & 0 deletions acceptance/bundle/deploy/wal/header-only-wal/script
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Two consecutive deploys are killed mid-apply, after UpgradeToWrite has written
# the WAL header but before Finalize runs (killed on the jobs/create call, before
# the job's state is recorded). A kill cannot be prevented by bailing out early,
# so each crash leaves a header-only WAL behind. Recovery must discard those
# no-op WALs without advancing the serial; otherwise the second crash would write
# its WAL header two serials ahead of the committed state and block every later
# command. Regression test for the dstate recovery fix.
kill_after.py "POST /api/2.2/jobs/create" 0 2

title "First deploy (killed before recording the job, leaves a header-only WAL)"
trace errcode $CLI bundle deploy
trace cat .databricks/bundle/default/resources.json.wal

title "Second deploy (killed again, leaves another header-only WAL)"
trace errcode $CLI bundle deploy --force-lock
trace cat .databricks/bundle/default/resources.json.wal

title "Third deploy (must recover and succeed, not blocked by the leftover WAL)"
trace errcode $CLI bundle deploy --force-lock

trace errcode assert_not_exists.py .databricks/bundle/default/resources.json.wal
trace errcode cat .databricks/bundle/default/resources.json | jq -S '{serial: .serial, state_keys: (.state | keys)}'
1 change: 1 addition & 0 deletions acceptance/bundle/deploy/wal/header-only-wal/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
print("test")
17 changes: 15 additions & 2 deletions bundle/direct/dstate/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ func (db *DeploymentState) mergeWalIntoState(ctx context.Context) (bool, error)
scanner.Buffer(make([]byte, 0, initialBufferSize), maxWalEntrySize)
lineNumber := 0
var corruptedLines [][]byte
var newSerial int

for scanner.Scan() {
lineNumber++
Expand All @@ -309,7 +310,7 @@ func (db *DeploymentState) mergeWalIntoState(ctx context.Context) (bool, error)
if header.Serial > expectedSerial {
return false, fmt.Errorf("WAL serial (%d) is ahead of expected (%d), state may be corrupted", header.Serial, expectedSerial)
}
db.Data.Serial = expectedSerial
newSerial = header.Serial
} else {
var entry WALEntry
if err := json.Unmarshal(line, &entry); err != nil {
Expand Down Expand Up @@ -344,7 +345,19 @@ func (db *DeploymentState) mergeWalIntoState(ctx context.Context) (bool, error)
}
}

return lineNumber > 1, nil
hasEntries := lineNumber > 1

// Only advance the serial when the WAL carried entries, because the caller
// (replayWAL) persists the new state file only in that case. A header-only
// WAL is a deploy that started but committed nothing; advancing the serial
// for it leaves the in-memory serial ahead of the persisted one, so the
// next deploy writes its WAL header at serial+2 and recovery rejects it as
// "ahead of expected". See acceptance/bundle/deploy/wal/header-only-wal.
if hasEntries {
db.Data.Serial = newSerial
}

return hasEntries, nil
}

// Finalize replays the WAL (if open for write), captures the resulting state, and resets.
Expand Down
35 changes: 35 additions & 0 deletions bundle/direct/dstate/state_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dstate

import (
"encoding/json"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -55,6 +56,40 @@ func TestPanicOnDoubleOpen(t *testing.T) {
mustFinalize(t, &db)
}

func TestHeaderOnlyWALRecoveryDoesNotAdvanceSerial(t *testing.T) {
path := filepath.Join(t.TempDir(), "state.json")
walPath := path + walSuffix

// Commit serial 1 with one resource.
var db DeploymentState
require.NoError(t, db.Open(t.Context(), path, WithRecovery(true), WithWrite(true)))
require.NoError(t, db.SaveState("jobs.my_job", "123", map[string]string{}, nil))
mustFinalize(t, &db)

var committed DeploymentState
require.NoError(t, committed.Open(t.Context(), path, WithRecovery(false), WithWrite(false)))
lineage := committed.Data.Lineage
require.Equal(t, 1, committed.Data.Serial)
mustFinalize(t, &committed)

// A deploy that opens the WAL for write but commits nothing (e.g. planning
// fails after UpgradeToWrite) leaves a header-only WAL behind, here at the
// expected serial 2. Recovering it must not advance the serial past the
// committed 1, otherwise a second such failed deploy would write its header
// at serial 3 and be rejected as ahead of the committed state.
header := Header{Lineage: lineage, Serial: 2, StateVersion: currentStateVersion}
headerLine, err := json.Marshal(header)
require.NoError(t, err)
require.NoError(t, os.WriteFile(walPath, append(headerLine, '\n'), 0o600))

var recovered DeploymentState
require.NoError(t, recovered.Open(t.Context(), path, WithRecovery(true), WithWrite(false)))
assert.Equal(t, 1, recovered.Data.Serial)
assert.Equal(t, "123", recovered.GetResourceID("jobs.my_job"))
assert.NoFileExists(t, walPath)
mustFinalize(t, &recovered)
}

func TestDeleteState(t *testing.T) {
path := filepath.Join(t.TempDir(), "state.json")

Expand Down
Loading