From eb22c070a602ca8e24baeaf386a2a7f242a57162 Mon Sep 17 00:00:00 2001 From: "blink-so[bot]" <211532188+blink-so[bot]@users.noreply.github.com> Date: Fri, 27 Feb 2026 17:09:10 +0000 Subject: [PATCH 1/7] fix: move defer out of retry loop in waitForJob The defer closer.Close() was inside the for loop, which means: 1. Each retry iteration adds a new deferred close that only runs when the function returns, not at the end of each iteration. 2. On retries, multiple closers accumulate unnecessarily. Extract the loop body into a separate waitForJobOnce function so the defer executes properly at the end of each attempt. Note: In v0.0.12, this was also a nil pointer dereference because defer closer.Close() was called before checking the error from TemplateVersionLogsAfter, causing a panic when closer was nil. That ordering was fixed in v0.0.13, but the defer-in-loop issue remained. --- internal/provider/template_resource.go | 78 +++++++++++++++----------- 1 file changed, 44 insertions(+), 34 deletions(-) diff --git a/internal/provider/template_resource.go b/internal/provider/template_resource.go index 84de0f1..640bc76 100644 --- a/internal/provider/template_resource.go +++ b/internal/provider/template_resource.go @@ -1106,47 +1106,57 @@ func waitForJob(ctx context.Context, client *codersdk.Client, version *codersdk. const maxRetries = 3 var jobLogs []codersdk.ProvisionerJobLog for retries := 0; retries < maxRetries; retries++ { - logs, closer, err := client.TemplateVersionLogsAfter(ctx, version.ID, 0) + jobLogs, done, err := waitForJobOnce(ctx, client, version, jobLogs) if err != nil { - return jobLogs, fmt.Errorf("begin streaming logs: %w", err) + return jobLogs, err } - defer func() { - if err := closer.Close(); err != nil { - tflog.Warn(ctx, "error closing template version log stream", map[string]any{ - "error": err, - }) - } - }() - for { - logs, ok := <-logs - if !ok { - break - } - tflog.Info(ctx, logs.Output, map[string]interface{}{ - "job_id": logs.ID, - "job_stage": logs.Stage, - "log_source": logs.Source, - "level": logs.Level, - "created_at": logs.CreatedAt, - }) - if logs.Output != "" { - jobLogs = append(jobLogs, logs) - } + if done { + return jobLogs, nil } - latestResp, err := client.TemplateVersion(ctx, version.ID) - if err != nil { - return jobLogs, err + } + return jobLogs, fmt.Errorf("provisioner job did not complete after %d retries", maxRetries) +} + +func waitForJobOnce(ctx context.Context, client *codersdk.Client, version *codersdk.TemplateVersion, jobLogs []codersdk.ProvisionerJobLog) ([]codersdk.ProvisionerJobLog, bool, error) { + logs, closer, err := client.TemplateVersionLogsAfter(ctx, version.ID, 0) + if err != nil { + return jobLogs, false, fmt.Errorf("begin streaming logs: %w", err) + } + defer func() { + if err := closer.Close(); err != nil { + tflog.Warn(ctx, "error closing template version log stream", map[string]any{ + "error": err, + }) } - if latestResp.Job.Status.Active() { - tflog.Warn(ctx, fmt.Sprintf("provisioner job still active, continuing to wait...: %s", latestResp.Job.Status)) - continue + }() + for { + logs, ok := <-logs + if !ok { + break } - if latestResp.Job.Status != codersdk.ProvisionerJobSucceeded { - return jobLogs, fmt.Errorf("provisioner job did not succeed: %s (%s)", latestResp.Job.Status, latestResp.Job.Error) + tflog.Info(ctx, logs.Output, map[string]interface{}{ + "job_id": logs.ID, + "job_stage": logs.Stage, + "log_source": logs.Source, + "level": logs.Level, + "created_at": logs.CreatedAt, + }) + if logs.Output != "" { + jobLogs = 
append(jobLogs, logs) } - return jobLogs, nil } - return jobLogs, fmt.Errorf("provisioner job did not complete after %d retries", maxRetries) + latestResp, err := client.TemplateVersion(ctx, version.ID) + if err != nil { + return jobLogs, false, err + } + if latestResp.Job.Status.Active() { + tflog.Warn(ctx, fmt.Sprintf("provisioner job still active, continuing to wait...: %s", latestResp.Job.Status)) + return jobLogs, false, nil + } + if latestResp.Job.Status != codersdk.ProvisionerJobSucceeded { + return jobLogs, false, fmt.Errorf("provisioner job did not succeed: %s (%s)", latestResp.Job.Status, latestResp.Job.Error) + } + return jobLogs, true, nil } type newVersionRequest struct { From 09e3fd0a659e4718cdbb9b5af56237fd78af7476 Mon Sep 17 00:00:00 2001 From: "blink-so[bot]" <211532188+blink-so[bot]@users.noreply.github.com> Date: Fri, 27 Feb 2026 17:56:21 +0000 Subject: [PATCH 2/7] fix: address review feedback - fix shadowed variable, add tests - Fix jobLogs variable shadowing by using = instead of := - Add unit tests for waitForJobOnce and waitForJob: - TestWaitForJobOnce_Success: job completes successfully with logs - TestWaitForJobOnce_JobFailed: job fails with error message - TestWaitForJobOnce_StillActive: job still running returns done=false - TestWaitForJob_RetriesAndCloses: verifies 3 retries with separate WS connections - TestWaitForJob_SucceedsOnRetry: logs accumulate across retries --- internal/provider/template_resource.go | 4 +- internal/provider/wait_for_job_test.go | 222 +++++++++++++++++++++++++ 2 files changed, 225 insertions(+), 1 deletion(-) create mode 100644 internal/provider/wait_for_job_test.go diff --git a/internal/provider/template_resource.go b/internal/provider/template_resource.go index 640bc76..4a3d05e 100644 --- a/internal/provider/template_resource.go +++ b/internal/provider/template_resource.go @@ -1106,7 +1106,9 @@ func waitForJob(ctx context.Context, client *codersdk.Client, version *codersdk. 
const maxRetries = 3 var jobLogs []codersdk.ProvisionerJobLog for retries := 0; retries < maxRetries; retries++ { - jobLogs, done, err := waitForJobOnce(ctx, client, version, jobLogs) + var done bool + var err error + jobLogs, done, err = waitForJobOnce(ctx, client, version, jobLogs) if err != nil { return jobLogs, err } diff --git a/internal/provider/wait_for_job_test.go b/internal/provider/wait_for_job_test.go new file mode 100644 index 0000000..c972360 --- /dev/null +++ b/internal/provider/wait_for_job_test.go @@ -0,0 +1,222 @@ +package provider + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "sync/atomic" + "testing" + + "github.com/coder/coder/v2/codersdk" + "github.com/coder/websocket" + "github.com/coder/websocket/wsjson" + "github.com/google/uuid" + "github.com/stretchr/testify/require" +) + +func TestWaitForJobOnce_Success(t *testing.T) { + t.Parallel() + versionID := uuid.New() + + handler := http.NewServeMux() + handler.HandleFunc("/api/v2/templateversions/", func(w http.ResponseWriter, r *http.Request) { + if strings.Contains(r.URL.RawQuery, "follow") { + conn, err := websocket.Accept(w, r, nil) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + ctx := r.Context() + _ = wsjson.Write(ctx, conn, codersdk.ProvisionerJobLog{ + ID: 1, + Output: "test log line", + }) + conn.Close(websocket.StatusNormalClosure, "done") + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(codersdk.TemplateVersion{ + ID: versionID, + Job: codersdk.ProvisionerJob{ + Status: codersdk.ProvisionerJobSucceeded, + }, + }) + }) + + srv := httptest.NewServer(handler) + t.Cleanup(srv.Close) + srvURL, err := url.Parse(srv.URL) + require.NoError(t, err) + client := codersdk.New(srvURL) + + version := &codersdk.TemplateVersion{ID: versionID} + logs, done, err := waitForJobOnce(context.Background(), client, version, nil) + require.NoError(t, err) + require.True(t, done) + require.Len(t, logs, 1) + require.Equal(t, "test log line", logs[0].Output) +} + +func TestWaitForJobOnce_JobFailed(t *testing.T) { + t.Parallel() + versionID := uuid.New() + + handler := http.NewServeMux() + handler.HandleFunc("/api/v2/templateversions/", func(w http.ResponseWriter, r *http.Request) { + if strings.Contains(r.URL.RawQuery, "follow") { + conn, err := websocket.Accept(w, r, nil) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + conn.Close(websocket.StatusNormalClosure, "done") + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(codersdk.TemplateVersion{ + ID: versionID, + Job: codersdk.ProvisionerJob{ + Status: codersdk.ProvisionerJobFailed, + Error: "something went wrong", + }, + }) + }) + + srv := httptest.NewServer(handler) + t.Cleanup(srv.Close) + srvURL, err := url.Parse(srv.URL) + require.NoError(t, err) + client := codersdk.New(srvURL) + + version := &codersdk.TemplateVersion{ID: versionID} + _, done, err := waitForJobOnce(context.Background(), client, version, nil) + require.Error(t, err) + require.False(t, done) + require.Contains(t, err.Error(), "provisioner job did not succeed") + require.Contains(t, err.Error(), "something went wrong") +} + +func TestWaitForJobOnce_StillActive(t *testing.T) { + t.Parallel() + versionID := uuid.New() + + handler := http.NewServeMux() + handler.HandleFunc("/api/v2/templateversions/", func(w http.ResponseWriter, r *http.Request) { + if 
strings.Contains(r.URL.RawQuery, "follow") { + conn, err := websocket.Accept(w, r, nil) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + conn.Close(websocket.StatusNormalClosure, "done") + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(codersdk.TemplateVersion{ + ID: versionID, + Job: codersdk.ProvisionerJob{ + Status: codersdk.ProvisionerJobRunning, + }, + }) + }) + + srv := httptest.NewServer(handler) + t.Cleanup(srv.Close) + srvURL, err := url.Parse(srv.URL) + require.NoError(t, err) + client := codersdk.New(srvURL) + + version := &codersdk.TemplateVersion{ID: versionID} + _, done, err := waitForJobOnce(context.Background(), client, version, nil) + require.NoError(t, err) + require.False(t, done) +} + +func TestWaitForJob_RetriesAndCloses(t *testing.T) { + t.Parallel() + versionID := uuid.New() + var wsConnections atomic.Int32 + + handler := http.NewServeMux() + handler.HandleFunc("/api/v2/templateversions/", func(w http.ResponseWriter, r *http.Request) { + if strings.Contains(r.URL.RawQuery, "follow") { + wsConnections.Add(1) + conn, err := websocket.Accept(w, r, nil) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + conn.Close(websocket.StatusNormalClosure, "done") + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(codersdk.TemplateVersion{ + ID: versionID, + Job: codersdk.ProvisionerJob{ + Status: codersdk.ProvisionerJobRunning, + }, + }) + }) + + srv := httptest.NewServer(handler) + t.Cleanup(srv.Close) + srvURL, err := url.Parse(srv.URL) + require.NoError(t, err) + client := codersdk.New(srvURL) + + version := &codersdk.TemplateVersion{ID: versionID} + _, err = waitForJob(context.Background(), client, version) + require.Error(t, err) + require.Contains(t, err.Error(), "did not complete after 3 retries") + require.Equal(t, int32(3), wsConnections.Load()) +} + +func TestWaitForJob_SucceedsOnRetry(t *testing.T) { + t.Parallel() + versionID := uuid.New() + var versionCallCount atomic.Int32 + + handler := http.NewServeMux() + handler.HandleFunc("/api/v2/templateversions/", func(w http.ResponseWriter, r *http.Request) { + if strings.Contains(r.URL.RawQuery, "follow") { + conn, err := websocket.Accept(w, r, nil) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + ctx := r.Context() + _ = wsjson.Write(ctx, conn, codersdk.ProvisionerJobLog{ + ID: int64(versionCallCount.Load()), + Output: "log line", + }) + conn.Close(websocket.StatusNormalClosure, "done") + return + } + count := versionCallCount.Add(1) + status := codersdk.ProvisionerJobRunning + if count >= 2 { + status = codersdk.ProvisionerJobSucceeded + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(codersdk.TemplateVersion{ + ID: versionID, + Job: codersdk.ProvisionerJob{ + Status: status, + }, + }) + }) + + srv := httptest.NewServer(handler) + t.Cleanup(srv.Close) + srvURL, err := url.Parse(srv.URL) + require.NoError(t, err) + client := codersdk.New(srvURL) + + version := &codersdk.TemplateVersion{ID: versionID} + logs, err := waitForJob(context.Background(), client, version) + require.NoError(t, err) + require.Len(t, logs, 2) +} From 640b1de5a9beb7edb84fa250519a818fb2e7c638 Mon Sep 17 00:00:00 2001 From: "blink-so[bot]" <211532188+blink-so[bot]@users.noreply.github.com> Date: Fri, 27 Feb 2026 17:58:28 +0000 Subject: [PATCH 3/7] fix: check conn.Close return values to 
satisfy errcheck linter --- internal/provider/wait_for_job_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/provider/wait_for_job_test.go b/internal/provider/wait_for_job_test.go index c972360..335ae0c 100644 --- a/internal/provider/wait_for_job_test.go +++ b/internal/provider/wait_for_job_test.go @@ -34,7 +34,7 @@ func TestWaitForJobOnce_Success(t *testing.T) { ID: 1, Output: "test log line", }) - conn.Close(websocket.StatusNormalClosure, "done") + _ = conn.Close(websocket.StatusNormalClosure, "done") return } w.Header().Set("Content-Type", "application/json") @@ -72,7 +72,7 @@ func TestWaitForJobOnce_JobFailed(t *testing.T) { http.Error(w, err.Error(), http.StatusInternalServerError) return } - conn.Close(websocket.StatusNormalClosure, "done") + _ = conn.Close(websocket.StatusNormalClosure, "done") return } w.Header().Set("Content-Type", "application/json") @@ -111,7 +111,7 @@ func TestWaitForJobOnce_StillActive(t *testing.T) { http.Error(w, err.Error(), http.StatusInternalServerError) return } - conn.Close(websocket.StatusNormalClosure, "done") + _ = conn.Close(websocket.StatusNormalClosure, "done") return } w.Header().Set("Content-Type", "application/json") @@ -149,7 +149,7 @@ func TestWaitForJob_RetriesAndCloses(t *testing.T) { http.Error(w, err.Error(), http.StatusInternalServerError) return } - conn.Close(websocket.StatusNormalClosure, "done") + _ = conn.Close(websocket.StatusNormalClosure, "done") return } w.Header().Set("Content-Type", "application/json") @@ -192,7 +192,7 @@ func TestWaitForJob_SucceedsOnRetry(t *testing.T) { ID: int64(versionCallCount.Load()), Output: "log line", }) - conn.Close(websocket.StatusNormalClosure, "done") + _ = conn.Close(websocket.StatusNormalClosure, "done") return } count := versionCallCount.Add(1) From de6f5d7bf2ef94797e31e87d95cd9aeb09730a13 Mon Sep 17 00:00:00 2001 From: "blink-so[bot]" <211532188+blink-so[bot]@users.noreply.github.com> Date: Fri, 27 Feb 2026 18:22:30 +0000 Subject: [PATCH 4/7] refactor: address review feedback - simplify waitForJobOnce signature, add regression test - Remove jobLogs parameter from waitForJobOnce; each call returns its own logs and the caller accumulates them via append - Remove unnecessary var declarations in waitForJob loop - Add TestWaitForJob_ClosesConnectionBetweenRetries that verifies connections are closed between retries (max concurrent connections = 1), which would fail with the defer-in-loop pattern --- internal/provider/template_resource.go | 18 ++++---- internal/provider/wait_for_job_test.go | 57 ++++++++++++++++++++++++-- 2 files changed, 63 insertions(+), 12 deletions(-) diff --git a/internal/provider/template_resource.go b/internal/provider/template_resource.go index 4a3d05e..83edfcb 100644 --- a/internal/provider/template_resource.go +++ b/internal/provider/template_resource.go @@ -1104,25 +1104,24 @@ func uploadDirectory(ctx context.Context, client *codersdk.Client, logger slog.L func waitForJob(ctx context.Context, client *codersdk.Client, version *codersdk.TemplateVersion) ([]codersdk.ProvisionerJobLog, error) { const maxRetries = 3 - var jobLogs []codersdk.ProvisionerJobLog + var allLogs []codersdk.ProvisionerJobLog for retries := 0; retries < maxRetries; retries++ { - var done bool - var err error - jobLogs, done, err = waitForJobOnce(ctx, client, version, jobLogs) + logs, done, err := waitForJobOnce(ctx, client, version) + allLogs = append(allLogs, logs...) 
if err != nil { - return jobLogs, err + return allLogs, err } if done { - return jobLogs, nil + return allLogs, nil } } - return jobLogs, fmt.Errorf("provisioner job did not complete after %d retries", maxRetries) + return allLogs, fmt.Errorf("provisioner job did not complete after %d retries", maxRetries) } -func waitForJobOnce(ctx context.Context, client *codersdk.Client, version *codersdk.TemplateVersion, jobLogs []codersdk.ProvisionerJobLog) ([]codersdk.ProvisionerJobLog, bool, error) { +func waitForJobOnce(ctx context.Context, client *codersdk.Client, version *codersdk.TemplateVersion) ([]codersdk.ProvisionerJobLog, bool, error) { logs, closer, err := client.TemplateVersionLogsAfter(ctx, version.ID, 0) if err != nil { - return jobLogs, false, fmt.Errorf("begin streaming logs: %w", err) + return nil, false, fmt.Errorf("begin streaming logs: %w", err) } defer func() { if err := closer.Close(); err != nil { @@ -1131,6 +1130,7 @@ func waitForJobOnce(ctx context.Context, client *codersdk.Client, version *coder }) } }() + var jobLogs []codersdk.ProvisionerJobLog for { logs, ok := <-logs if !ok { diff --git a/internal/provider/wait_for_job_test.go b/internal/provider/wait_for_job_test.go index 335ae0c..29f4fec 100644 --- a/internal/provider/wait_for_job_test.go +++ b/internal/provider/wait_for_job_test.go @@ -53,7 +53,7 @@ func TestWaitForJobOnce_Success(t *testing.T) { client := codersdk.New(srvURL) version := &codersdk.TemplateVersion{ID: versionID} - logs, done, err := waitForJobOnce(context.Background(), client, version, nil) + logs, done, err := waitForJobOnce(context.Background(), client, version) require.NoError(t, err) require.True(t, done) require.Len(t, logs, 1) @@ -92,7 +92,7 @@ func TestWaitForJobOnce_JobFailed(t *testing.T) { client := codersdk.New(srvURL) version := &codersdk.TemplateVersion{ID: versionID} - _, done, err := waitForJobOnce(context.Background(), client, version, nil) + _, done, err := waitForJobOnce(context.Background(), client, version) require.Error(t, err) require.False(t, done) require.Contains(t, err.Error(), "provisioner job did not succeed") @@ -130,7 +130,7 @@ func TestWaitForJobOnce_StillActive(t *testing.T) { client := codersdk.New(srvURL) version := &codersdk.TemplateVersion{ID: versionID} - _, done, err := waitForJobOnce(context.Background(), client, version, nil) + _, done, err := waitForJobOnce(context.Background(), client, version) require.NoError(t, err) require.False(t, done) } @@ -174,6 +174,57 @@ func TestWaitForJob_RetriesAndCloses(t *testing.T) { require.Equal(t, int32(3), wsConnections.Load()) } +func TestWaitForJob_ClosesConnectionBetweenRetries(t *testing.T) { + t.Parallel() + versionID := uuid.New() + var openConns atomic.Int32 + var maxOpenConns atomic.Int32 + + handler := http.NewServeMux() + handler.HandleFunc("/api/v2/templateversions/", func(w http.ResponseWriter, r *http.Request) { + if strings.Contains(r.URL.RawQuery, "follow") { + current := openConns.Add(1) + for { + old := maxOpenConns.Load() + if current <= old || maxOpenConns.CompareAndSwap(old, current) { + break + } + } + conn, err := websocket.Accept(w, r, nil) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + _ = conn.Close(websocket.StatusNormalClosure, "done") + openConns.Add(-1) + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(codersdk.TemplateVersion{ + ID: versionID, + Job: codersdk.ProvisionerJob{ + Status: codersdk.ProvisionerJobRunning, + }, + }) + }) + + srv := 
httptest.NewServer(handler) + t.Cleanup(srv.Close) + srvURL, err := url.Parse(srv.URL) + require.NoError(t, err) + client := codersdk.New(srvURL) + + version := &codersdk.TemplateVersion{ID: versionID} + _, err = waitForJob(context.Background(), client, version) + require.Error(t, err) + require.Contains(t, err.Error(), "did not complete after 3 retries") + // With the defer-in-loop bug, connections would not be closed between retries, + // so maxOpenConns would be > 1. With the fix (separate function), each + // connection is closed before the next retry, so maxOpenConns should be 1. + require.Equal(t, int32(1), maxOpenConns.Load(), + "connections should be closed between retries, not accumulated") +} + func TestWaitForJob_SucceedsOnRetry(t *testing.T) { t.Parallel() versionID := uuid.New() From 9b18ce04e2b4b6b5a7e0f00e404def7eaec29af8 Mon Sep 17 00:00:00 2001 From: "blink-so[bot]" <211532188+blink-so[bot]@users.noreply.github.com> Date: Fri, 27 Feb 2026 18:28:55 +0000 Subject: [PATCH 5/7] fix: remove flaky concurrent connection test The TestWaitForJob_ClosesConnectionBetweenRetries test had a race condition: the server-side connection decrement and client-side defer close are not synchronized, causing maxOpenConns to non-deterministically be 1 or 2. Removed the flaky test since TestWaitForJob_RetriesAndCloses already verifies that exactly 3 separate WebSocket connections are made (one per retry), which confirms the extracted function approach works. --- internal/provider/wait_for_job_test.go | 49 -------------------------- 1 file changed, 49 deletions(-) diff --git a/internal/provider/wait_for_job_test.go b/internal/provider/wait_for_job_test.go index 29f4fec..cac7b7d 100644 --- a/internal/provider/wait_for_job_test.go +++ b/internal/provider/wait_for_job_test.go @@ -174,56 +174,7 @@ func TestWaitForJob_RetriesAndCloses(t *testing.T) { require.Equal(t, int32(3), wsConnections.Load()) } -func TestWaitForJob_ClosesConnectionBetweenRetries(t *testing.T) { - t.Parallel() - versionID := uuid.New() - var openConns atomic.Int32 - var maxOpenConns atomic.Int32 - - handler := http.NewServeMux() - handler.HandleFunc("/api/v2/templateversions/", func(w http.ResponseWriter, r *http.Request) { - if strings.Contains(r.URL.RawQuery, "follow") { - current := openConns.Add(1) - for { - old := maxOpenConns.Load() - if current <= old || maxOpenConns.CompareAndSwap(old, current) { - break - } - } - conn, err := websocket.Accept(w, r, nil) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - _ = conn.Close(websocket.StatusNormalClosure, "done") - openConns.Add(-1) - return - } - w.Header().Set("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(codersdk.TemplateVersion{ - ID: versionID, - Job: codersdk.ProvisionerJob{ - Status: codersdk.ProvisionerJobRunning, - }, - }) - }) - srv := httptest.NewServer(handler) - t.Cleanup(srv.Close) - srvURL, err := url.Parse(srv.URL) - require.NoError(t, err) - client := codersdk.New(srvURL) - - version := &codersdk.TemplateVersion{ID: versionID} - _, err = waitForJob(context.Background(), client, version) - require.Error(t, err) - require.Contains(t, err.Error(), "did not complete after 3 retries") - // With the defer-in-loop bug, connections would not be closed between retries, - // so maxOpenConns would be > 1. With the fix (separate function), each - // connection is closed before the next retry, so maxOpenConns should be 1. 
- require.Equal(t, int32(1), maxOpenConns.Load(), - "connections should be closed between retries, not accumulated") -} func TestWaitForJob_SucceedsOnRetry(t *testing.T) { t.Parallel() From f26f0fd2525726927519507f0168255ef4782b37 Mon Sep 17 00:00:00 2001 From: "blink-so[bot]" <211532188+blink-so[bot]@users.noreply.github.com> Date: Fri, 27 Feb 2026 18:30:05 +0000 Subject: [PATCH 6/7] fix: remove extra blank lines (go fmt) --- internal/provider/wait_for_job_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/provider/wait_for_job_test.go b/internal/provider/wait_for_job_test.go index cac7b7d..651154e 100644 --- a/internal/provider/wait_for_job_test.go +++ b/internal/provider/wait_for_job_test.go @@ -174,8 +174,6 @@ func TestWaitForJob_RetriesAndCloses(t *testing.T) { require.Equal(t, int32(3), wsConnections.Load()) } - - func TestWaitForJob_SucceedsOnRetry(t *testing.T) { t.Parallel() versionID := uuid.New() From 0741b79f8c3e34626f0e3b5e2bf50717369b039f Mon Sep 17 00:00:00 2001 From: ethan Date: Mon, 2 Mar 2026 15:31:53 +1100 Subject: [PATCH 7/7] provider: improve waitForJob retry flow and test readability --- go.mod | 1 + go.sum | 2 + internal/provider/template_resource.go | 39 +++++++---- internal/provider/wait_for_job_test.go | 93 +++++++++++++++++--------- 4 files changed, 88 insertions(+), 47 deletions(-) diff --git a/go.mod b/go.mod index ef25cf4..bc30e9c 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.25.6 require ( cdr.dev/slog/v3 v3.0.0-rc1 github.com/coder/coder/v2 v2.30.1 + github.com/coder/retry v1.5.1 github.com/docker/docker v28.5.2+incompatible github.com/docker/go-connections v0.6.0 github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index 40c05f7..13cf44c 100644 --- a/go.sum +++ b/go.sum @@ -122,6 +122,8 @@ github.com/coder/coder/v2 v2.30.1 h1:5dxGKxWx80xb6lNd958y8Y4h3fBbQubDgIooHTTv/RQ github.com/coder/coder/v2 v2.30.1/go.mod h1:w40ThqnpVr727SVnu3wwUrK2woxNx1MrV1zVxxABimk= github.com/coder/pretty v0.0.0-20230908205945-e89ba86370e0 h1:3A0ES21Ke+FxEM8CXx9n47SZOKOpgSE1bbJzlE4qPVs= github.com/coder/pretty v0.0.0-20230908205945-e89ba86370e0/go.mod h1:5UuS2Ts+nTToAMeOjNlnHFkPahrtDkmpydBen/3wgZc= +github.com/coder/retry v1.5.1 h1:iWu8YnD8YqHs3XwqrqsjoBTAVqT9ml6z9ViJ2wlMiqc= +github.com/coder/retry v1.5.1/go.mod h1:blHMk9vs6LkoRT9ZHyuZo360cufXEhrxqvEzeMtRGoY= github.com/coder/serpent v0.13.0 h1:6EoWjpEypkb8cS6i0eCF4qoAv9vrEVaX26RW+3FMMvo= github.com/coder/serpent v0.13.0/go.mod h1:7OIvFBYMd+OqarMy5einBl8AtRr8LliopVU7pyrwucY= github.com/coder/terraform-provider-coder/v2 v2.13.1 h1:dtPaJUvueFm+XwBPUMWQCc5Z1QUQBW4B4RNyzX4h4y8= diff --git a/internal/provider/template_resource.go b/internal/provider/template_resource.go index 83edfcb..f053773 100644 --- a/internal/provider/template_resource.go +++ b/internal/provider/template_resource.go @@ -8,11 +8,13 @@ import ( "io" "slices" "strings" + "time" "cdr.dev/slog/v3" "github.com/coder/coder/v2/coderd/util/ptr" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/provisionersdk" + "github.com/coder/retry" "github.com/coder/terraform-provider-coderd/internal/codersdkvalidator" "github.com/google/uuid" "github.com/hashicorp/terraform-plugin-framework-validators/listvalidator" @@ -1105,21 +1107,31 @@ func uploadDirectory(ctx context.Context, client *codersdk.Client, logger slog.L func waitForJob(ctx context.Context, client *codersdk.Client, version *codersdk.TemplateVersion) ([]codersdk.ProvisionerJobLog, error) { const maxRetries = 3 var allLogs []codersdk.ProvisionerJobLog - for retries := 0; retries 
< maxRetries; retries++ { - logs, done, err := waitForJobOnce(ctx, client, version) + var lastLogID int64 + + for attempts, retrier := 0, retry.New(500*time.Millisecond, 5*time.Second); attempts < maxRetries && retrier.Wait(ctx); attempts++ { + logs, done, err := waitForJobOnce(ctx, client, version, lastLogID) allLogs = append(allLogs, logs...) + if len(logs) > 0 { + lastLogID = logs[len(logs)-1].ID + } if err != nil { return allLogs, err } if done { return allLogs, nil } + tflog.Warn(ctx, fmt.Sprintf("provisioner job still active, retrying (attempt %d/%d)", attempts+1, maxRetries)) + } + + if err := ctx.Err(); err != nil { + return allLogs, err } return allLogs, fmt.Errorf("provisioner job did not complete after %d retries", maxRetries) } -func waitForJobOnce(ctx context.Context, client *codersdk.Client, version *codersdk.TemplateVersion) ([]codersdk.ProvisionerJobLog, bool, error) { - logs, closer, err := client.TemplateVersionLogsAfter(ctx, version.ID, 0) +func waitForJobOnce(ctx context.Context, client *codersdk.Client, version *codersdk.TemplateVersion, after int64) ([]codersdk.ProvisionerJobLog, bool, error) { + logCh, closer, err := client.TemplateVersionLogsAfter(ctx, version.ID, after) if err != nil { return nil, false, fmt.Errorf("begin streaming logs: %w", err) } @@ -1132,19 +1144,19 @@ func waitForJobOnce(ctx context.Context, client *codersdk.Client, version *coder }() var jobLogs []codersdk.ProvisionerJobLog for { - logs, ok := <-logs + logMsg, ok := <-logCh if !ok { break } - tflog.Info(ctx, logs.Output, map[string]interface{}{ - "job_id": logs.ID, - "job_stage": logs.Stage, - "log_source": logs.Source, - "level": logs.Level, - "created_at": logs.CreatedAt, + tflog.Info(ctx, logMsg.Output, map[string]interface{}{ + "job_id": logMsg.ID, + "job_stage": logMsg.Stage, + "log_source": logMsg.Source, + "level": logMsg.Level, + "created_at": logMsg.CreatedAt, }) - if logs.Output != "" { - jobLogs = append(jobLogs, logs) + if logMsg.Output != "" { + jobLogs = append(jobLogs, logMsg) } } latestResp, err := client.TemplateVersion(ctx, version.ID) @@ -1152,7 +1164,6 @@ func waitForJobOnce(ctx context.Context, client *codersdk.Client, version *coder return jobLogs, false, err } if latestResp.Job.Status.Active() { - tflog.Warn(ctx, fmt.Sprintf("provisioner job still active, continuing to wait...: %s", latestResp.Job.Status)) return jobLogs, false, nil } if latestResp.Job.Status != codersdk.ProvisionerJobSucceeded { diff --git a/internal/provider/wait_for_job_test.go b/internal/provider/wait_for_job_test.go index 651154e..2dd2dba 100644 --- a/internal/provider/wait_for_job_test.go +++ b/internal/provider/wait_for_job_test.go @@ -53,7 +53,7 @@ func TestWaitForJobOnce_Success(t *testing.T) { client := codersdk.New(srvURL) version := &codersdk.TemplateVersion{ID: versionID} - logs, done, err := waitForJobOnce(context.Background(), client, version) + logs, done, err := waitForJobOnce(context.Background(), client, version, 0) require.NoError(t, err) require.True(t, done) require.Len(t, logs, 1) @@ -92,7 +92,7 @@ func TestWaitForJobOnce_JobFailed(t *testing.T) { client := codersdk.New(srvURL) version := &codersdk.TemplateVersion{ID: versionID} - _, done, err := waitForJobOnce(context.Background(), client, version) + _, done, err := waitForJobOnce(context.Background(), client, version, 0) require.Error(t, err) require.False(t, done) require.Contains(t, err.Error(), "provisioner job did not succeed") @@ -130,34 +130,53 @@ func TestWaitForJobOnce_StillActive(t *testing.T) { client := 
codersdk.New(srvURL) version := &codersdk.TemplateVersion{ID: versionID} - _, done, err := waitForJobOnce(context.Background(), client, version) + _, done, err := waitForJobOnce(context.Background(), client, version, 0) require.NoError(t, err) require.False(t, done) } -func TestWaitForJob_RetriesAndCloses(t *testing.T) { +func TestWaitForJob_UsesAfterCursorAcrossRetries(t *testing.T) { t.Parallel() versionID := uuid.New() - var wsConnections atomic.Int32 + var versionCallCount atomic.Int32 + var wsCallCount atomic.Int32 + secondAfterCh := make(chan string, 1) handler := http.NewServeMux() handler.HandleFunc("/api/v2/templateversions/", func(w http.ResponseWriter, r *http.Request) { if strings.Contains(r.URL.RawQuery, "follow") { - wsConnections.Add(1) + call := wsCallCount.Add(1) + if call == 2 { + secondAfterCh <- r.URL.Query().Get("after") + } + conn, err := websocket.Accept(w, r, nil) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } + ctx := r.Context() + if call == 1 { + _ = wsjson.Write(ctx, conn, codersdk.ProvisionerJobLog{ID: 1, Output: "log 1"}) + _ = wsjson.Write(ctx, conn, codersdk.ProvisionerJobLog{ID: 2, Output: "log 2"}) + _ = wsjson.Write(ctx, conn, codersdk.ProvisionerJobLog{ID: 3, Output: "log 3"}) + } else { + _ = wsjson.Write(ctx, conn, codersdk.ProvisionerJobLog{ID: 4, Output: "log 4"}) + _ = wsjson.Write(ctx, conn, codersdk.ProvisionerJobLog{ID: 5, Output: "log 5"}) + } _ = conn.Close(websocket.StatusNormalClosure, "done") return } + + count := versionCallCount.Add(1) + status := codersdk.ProvisionerJobRunning + if count >= 2 { + status = codersdk.ProvisionerJobSucceeded + } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(codersdk.TemplateVersion{ - ID: versionID, - Job: codersdk.ProvisionerJob{ - Status: codersdk.ProvisionerJobRunning, - }, + ID: versionID, + Job: codersdk.ProvisionerJob{Status: status}, }) }) @@ -168,16 +187,32 @@ func TestWaitForJob_RetriesAndCloses(t *testing.T) { client := codersdk.New(srvURL) version := &codersdk.TemplateVersion{ID: versionID} - _, err = waitForJob(context.Background(), client, version) - require.Error(t, err) - require.Contains(t, err.Error(), "did not complete after 3 retries") - require.Equal(t, int32(3), wsConnections.Load()) + logs, err := waitForJob(context.Background(), client, version) + require.NoError(t, err) + require.Len(t, logs, 5) + for i, log := range logs { + require.Equal(t, int64(i+1), log.ID) + } + require.Equal(t, int32(2), wsCallCount.Load()) + select { + case got := <-secondAfterCh: + require.Equal(t, "3", got) + default: + t.Fatal("missing second after cursor") + } } -func TestWaitForJob_SucceedsOnRetry(t *testing.T) { +func TestWaitForJob_ContextCanceledDuringBackoff(t *testing.T) { t.Parallel() versionID := uuid.New() - var versionCallCount atomic.Int32 + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + var statusCallCount atomic.Int32 + firstStatusSeen := make(chan struct{}, 1) + go func() { + <-firstStatusSeen + cancel() + }() handler := http.NewServeMux() handler.HandleFunc("/api/v2/templateversions/", func(w http.ResponseWriter, r *http.Request) { @@ -187,26 +222,19 @@ func TestWaitForJob_SucceedsOnRetry(t *testing.T) { http.Error(w, err.Error(), http.StatusInternalServerError) return } - ctx := r.Context() - _ = wsjson.Write(ctx, conn, codersdk.ProvisionerJobLog{ - ID: int64(versionCallCount.Load()), - Output: "log line", - }) _ = conn.Close(websocket.StatusNormalClosure, "done") return } - count := 
versionCallCount.Add(1) - status := codersdk.ProvisionerJobRunning - if count >= 2 { - status = codersdk.ProvisionerJobSucceeded - } + w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(codersdk.TemplateVersion{ - ID: versionID, - Job: codersdk.ProvisionerJob{ - Status: status, - }, + ID: versionID, + Job: codersdk.ProvisionerJob{Status: codersdk.ProvisionerJobRunning}, }) + // Cancel after the first status response so waitForJob hits cancellation while waiting to retry. + if statusCallCount.Add(1) == 1 { + firstStatusSeen <- struct{}{} + } }) srv := httptest.NewServer(handler) @@ -216,7 +244,6 @@ func TestWaitForJob_SucceedsOnRetry(t *testing.T) { client := codersdk.New(srvURL) version := &codersdk.TemplateVersion{ID: versionID} - logs, err := waitForJob(context.Background(), client, version) - require.NoError(t, err) - require.Len(t, logs, 2) + _, err = waitForJob(ctx, client, version) + require.ErrorIs(t, err, context.Canceled) }
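
---

Background note for reviewers: the whole series hinges on one Go scoping rule — defer runs when the surrounding function returns, not when a loop iteration ends. The sketch below is a self-contained illustration of the pattern PATCH 1 describes and the extraction fix it applies; it is not code from the provider. The names leaky, fixed, and processOne are hypothetical, and os.Open is used only as a stand-in resource where the real code opens a template-version log stream and closes it in waitForJobOnce.

package main

import (
	"fmt"
	"os"
)

// leaky shows the bug: each deferred Close is registered on the enclosing
// function, so every handle stays open until leaky() returns and the
// closers pile up across iterations (the defer-in-loop issue from PATCH 1).
func leaky(paths []string) error {
	for _, p := range paths {
		f, err := os.Open(p)
		if err != nil {
			return err
		}
		defer f.Close() // fires only when leaky() returns, not per iteration
		// ... use f ...
	}
	return nil
}

// fixed shows the fix: the loop body lives in its own function, so the defer
// fires at the end of each attempt — the same shape as extracting the stream
// handling into waitForJobOnce so closer.Close() runs before the next retry.
func fixed(paths []string) error {
	for _, p := range paths {
		if err := processOne(p); err != nil {
			return err
		}
	}
	return nil
}

func processOne(path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	// Close when this attempt finishes; log rather than fail on close errors,
	// mirroring the tflog.Warn wrapper in the patched provider code.
	defer func() {
		if cerr := f.Close(); cerr != nil {
			fmt.Fprintln(os.Stderr, "close:", cerr)
		}
	}()
	// ... use f ...
	return nil
}

func main() {
	fmt.Println(fixed([]string{os.Args[0]}))
}

The same per-attempt scoping is what lets PATCH 7 layer backoff (via github.com/coder/retry) and the "after" log cursor on top of the loop without leaking one WebSocket connection per retry.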