diff --git a/cmd/wfctl/infra_state_store.go b/cmd/wfctl/infra_state_store.go index 1db82ed1..db37723d 100644 --- a/cmd/wfctl/infra_state_store.go +++ b/cmd/wfctl/infra_state_store.go @@ -260,8 +260,8 @@ type spacesWfctlStateStore struct { inner *module.SpacesIaCStateStore } -func (s *spacesWfctlStateStore) ListResources(_ context.Context) ([]interfaces.ResourceState, error) { - records, err := s.inner.ListStates(nil) +func (s *spacesWfctlStateStore) ListResources(ctx context.Context) ([]interfaces.ResourceState, error) { + records, err := s.inner.ListStates(ctx, nil) if err != nil { return nil, fmt.Errorf("list spaces state: %w", err) } @@ -272,12 +272,12 @@ func (s *spacesWfctlStateStore) ListResources(_ context.Context) ([]interfaces.R return states, nil } -func (s *spacesWfctlStateStore) SaveResource(_ context.Context, state interfaces.ResourceState) error { - return s.inner.SaveState(resourceStateToIaCState(state)) +func (s *spacesWfctlStateStore) SaveResource(ctx context.Context, state interfaces.ResourceState) error { + return s.inner.SaveState(ctx, resourceStateToIaCState(state)) } -func (s *spacesWfctlStateStore) DeleteResource(_ context.Context, name string) error { - return s.inner.DeleteState(name) +func (s *spacesWfctlStateStore) DeleteResource(ctx context.Context, name string) error { + return s.inner.DeleteState(ctx, name) } // ── Postgres backend ─────────────────────────────────────────────────────────── @@ -306,8 +306,8 @@ type postgresWfctlStateStore struct { inner *module.PostgresIaCStateStore } -func (s *postgresWfctlStateStore) ListResources(_ context.Context) ([]interfaces.ResourceState, error) { - records, err := s.inner.ListStates(nil) +func (s *postgresWfctlStateStore) ListResources(ctx context.Context) ([]interfaces.ResourceState, error) { + records, err := s.inner.ListStates(ctx, nil) if err != nil { return nil, fmt.Errorf("list postgres state: %w", err) } @@ -318,12 +318,12 @@ func (s *postgresWfctlStateStore) ListResources(_ context.Context) ([]interfaces return states, nil } -func (s *postgresWfctlStateStore) SaveResource(_ context.Context, state interfaces.ResourceState) error { - return s.inner.SaveState(resourceStateToIaCState(state)) +func (s *postgresWfctlStateStore) SaveResource(ctx context.Context, state interfaces.ResourceState) error { + return s.inner.SaveState(ctx, resourceStateToIaCState(state)) } -func (s *postgresWfctlStateStore) DeleteResource(_ context.Context, name string) error { - return s.inner.DeleteState(name) +func (s *postgresWfctlStateStore) DeleteResource(ctx context.Context, name string) error { + return s.inner.DeleteState(ctx, name) } // ── Conversion helpers ───────────────────────────────────────────────────────── diff --git a/module/benchmark_iac_state_backend_test.go b/module/benchmark_iac_state_backend_test.go index 68093414..9771d39a 100644 --- a/module/benchmark_iac_state_backend_test.go +++ b/module/benchmark_iac_state_backend_test.go @@ -31,18 +31,19 @@ func oneMBState() *IaCState { func BenchmarkIaCStateBackend_InProcess(b *testing.B) { store := NewMemoryIaCStateStore() st := oneMBState() + ctx := context.Background() b.ResetTimer() for i := 0; i < b.N; i++ { - if err := store.Lock(st.ResourceID); err != nil { + if err := store.Lock(ctx, st.ResourceID); err != nil { b.Fatal(err) } - if _, err := store.GetState(st.ResourceID); err != nil { + if _, err := store.GetState(ctx, st.ResourceID); err != nil { b.Fatal(err) } - if err := store.SaveState(st); err != nil { + if err := store.SaveState(ctx, st); err != nil { b.Fatal(err) } - if err := store.Unlock(st.ResourceID); err != nil { + if err := store.Unlock(ctx, st.ResourceID); err != nil { b.Fatal(err) } } diff --git a/module/iac_state.go b/module/iac_state.go index 21ca5e84..dc4a84e0 100644 --- a/module/iac_state.go +++ b/module/iac_state.go @@ -1,5 +1,7 @@ package module +import "context" + // IaCState tracks the state of an infrastructure resource. type IaCState struct { ResourceID string `json:"resource_id"` @@ -20,22 +22,23 @@ type IaCState struct { // IaCStateStore is the interface for IaC state persistence backends. type IaCStateStore interface { // GetState retrieves a state record by resource ID. Returns nil, nil when not found. - GetState(resourceID string) (*IaCState, error) + GetState(ctx context.Context, resourceID string) (*IaCState, error) // SaveState inserts or replaces a state record. - SaveState(state *IaCState) error + SaveState(ctx context.Context, state *IaCState) error // ListStates returns all state records matching the provided key=value filter. - // Pass an empty map to return all records. - ListStates(filter map[string]string) ([]*IaCState, error) + // Pass a nil or empty map to return all records — both are treated as "no + // filter" (ranging over a nil map is valid Go, and most call sites pass nil). + ListStates(ctx context.Context, filter map[string]string) ([]*IaCState, error) // DeleteState removes a state record by resource ID. - DeleteState(resourceID string) error + DeleteState(ctx context.Context, resourceID string) error // Lock acquires an exclusive lock for the given resource ID. // Returns an error if the resource is already locked. - Lock(resourceID string) error + Lock(ctx context.Context, resourceID string) error // Unlock releases the lock for the given resource ID. - Unlock(resourceID string) error + Unlock(ctx context.Context, resourceID string) error } diff --git a/module/iac_state_azure.go b/module/iac_state_azure.go index d4f71f09..7a216b32 100644 --- a/module/iac_state_azure.go +++ b/module/iac_state_azure.go @@ -81,8 +81,8 @@ func (s *AzureBlobIaCStateStore) lockBlobName(resourceID string) string { } // GetState retrieves a state record by resource ID. Returns nil, nil when not found. -func (s *AzureBlobIaCStateStore) GetState(resourceID string) (*IaCState, error) { - data, err := s.client.DownloadBlob(context.Background(), s.blobName(resourceID)) +func (s *AzureBlobIaCStateStore) GetState(ctx context.Context, resourceID string) (*IaCState, error) { + data, err := s.client.DownloadBlob(ctx, s.blobName(resourceID)) if err != nil { if errors.Is(err, ErrAzureBlobNotFound) { return nil, nil @@ -97,7 +97,7 @@ func (s *AzureBlobIaCStateStore) GetState(resourceID string) (*IaCState, error) } // SaveState writes the state record as a JSON blob. -func (s *AzureBlobIaCStateStore) SaveState(state *IaCState) error { +func (s *AzureBlobIaCStateStore) SaveState(ctx context.Context, state *IaCState) error { if state == nil { return fmt.Errorf("iac azure state: SaveState: state must not be nil") } @@ -108,15 +108,15 @@ func (s *AzureBlobIaCStateStore) SaveState(state *IaCState) error { if err != nil { return fmt.Errorf("iac azure state: SaveState %q: marshal: %w", state.ResourceID, err) } - if err := s.client.UploadBlob(context.Background(), s.blobName(state.ResourceID), data, "application/json"); err != nil { + if err := s.client.UploadBlob(ctx, s.blobName(state.ResourceID), data, "application/json"); err != nil { return fmt.Errorf("iac azure state: SaveState %q: upload: %w", state.ResourceID, err) } return nil } // ListStates lists all state blobs and returns those matching the filter. -func (s *AzureBlobIaCStateStore) ListStates(filter map[string]string) ([]*IaCState, error) { - names, err := s.client.ListBlobs(context.Background(), s.prefix) +func (s *AzureBlobIaCStateStore) ListStates(ctx context.Context, filter map[string]string) ([]*IaCState, error) { + names, err := s.client.ListBlobs(ctx, s.prefix) if err != nil { return nil, fmt.Errorf("iac azure state: ListStates: %w", err) } @@ -125,8 +125,14 @@ func (s *AzureBlobIaCStateStore) ListStates(filter map[string]string) ([]*IaCSta if !strings.HasSuffix(name, ".json") { continue } - data, err := s.client.DownloadBlob(context.Background(), name) + data, err := s.client.DownloadBlob(ctx, name) if err != nil { + // A canceled / deadlined context must abort the listing rather + // than silently return partial results; only genuinely unreadable + // blobs are skipped. + if ctx.Err() != nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return nil, fmt.Errorf("iac azure state: ListStates: %w", err) + } continue } var st IaCState @@ -141,8 +147,8 @@ func (s *AzureBlobIaCStateStore) ListStates(filter map[string]string) ([]*IaCSta } // DeleteState removes the state blob for resourceID. -func (s *AzureBlobIaCStateStore) DeleteState(resourceID string) error { - if err := s.client.DeleteBlob(context.Background(), s.blobName(resourceID)); err != nil { +func (s *AzureBlobIaCStateStore) DeleteState(ctx context.Context, resourceID string) error { + if err := s.client.DeleteBlob(ctx, s.blobName(resourceID)); err != nil { if errors.Is(err, ErrAzureBlobNotFound) { return fmt.Errorf("iac azure state: DeleteState %q: not found", resourceID) } @@ -152,12 +158,12 @@ func (s *AzureBlobIaCStateStore) DeleteState(resourceID string) error { } // Lock acquires a blob lease on the lock blob for resourceID (60-second duration). -func (s *AzureBlobIaCStateStore) Lock(resourceID string) error { +func (s *AzureBlobIaCStateStore) Lock(ctx context.Context, resourceID string) error { s.mu.Lock() defer s.mu.Unlock() lockBlob := s.lockBlobName(resourceID) - leaseID, err := s.client.AcquireLease(context.Background(), lockBlob, 60) + leaseID, err := s.client.AcquireLease(ctx, lockBlob, 60) if err != nil { if strings.Contains(err.Error(), "already leased") || strings.Contains(err.Error(), "leased") { return fmt.Errorf("iac azure state: Lock %q: resource is already locked", resourceID) @@ -169,7 +175,7 @@ func (s *AzureBlobIaCStateStore) Lock(resourceID string) error { } // Unlock releases the lease on the lock blob for resourceID. -func (s *AzureBlobIaCStateStore) Unlock(resourceID string) error { +func (s *AzureBlobIaCStateStore) Unlock(ctx context.Context, resourceID string) error { s.mu.Lock() defer s.mu.Unlock() @@ -178,7 +184,7 @@ func (s *AzureBlobIaCStateStore) Unlock(resourceID string) error { return fmt.Errorf("iac azure state: Unlock %q: not locked", resourceID) } lockBlob := s.lockBlobName(resourceID) - if err := s.client.ReleaseLease(context.Background(), lockBlob, leaseID); err != nil { + if err := s.client.ReleaseLease(ctx, lockBlob, leaseID); err != nil { return fmt.Errorf("iac azure state: Unlock %q: %w", resourceID, err) } delete(s.leaseIDs, resourceID) diff --git a/module/iac_state_azure_test.go b/module/iac_state_azure_test.go index 0ed76190..148a686f 100644 --- a/module/iac_state_azure_test.go +++ b/module/iac_state_azure_test.go @@ -97,7 +97,7 @@ func newTestAzureStore(client module.AzureBlobClient) *module.AzureBlobIaCStateS func TestAzureBlobIaCStateStore_GetState_NotFound(t *testing.T) { store := newTestAzureStore(newMockAzureClient()) - st, err := store.GetState("nonexistent") + st, err := store.GetState(context.Background(), "nonexistent") if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -115,11 +115,11 @@ func TestAzureBlobIaCStateStore_SaveAndGetState(t *testing.T) { Provider: "azure", Status: "active", } - if err := store.SaveState(state); err != nil { + if err := store.SaveState(context.Background(), state); err != nil { t.Fatalf("SaveState: %v", err) } - got, err := store.GetState("az-cluster") + got, err := store.GetState(context.Background(), "az-cluster") if err != nil { t.Fatalf("GetState: %v", err) } @@ -133,14 +133,14 @@ func TestAzureBlobIaCStateStore_SaveAndGetState(t *testing.T) { func TestAzureBlobIaCStateStore_SaveState_Nil(t *testing.T) { store := newTestAzureStore(newMockAzureClient()) - if err := store.SaveState(nil); err == nil { + if err := store.SaveState(context.Background(), nil); err == nil { t.Fatal("expected error for nil state") } } func TestAzureBlobIaCStateStore_SaveState_EmptyID(t *testing.T) { store := newTestAzureStore(newMockAzureClient()) - if err := store.SaveState(&module.IaCState{}); err == nil { + if err := store.SaveState(context.Background(), &module.IaCState{}); err == nil { t.Fatal("expected error for empty resource_id") } } @@ -153,12 +153,12 @@ func TestAzureBlobIaCStateStore_ListStates(t *testing.T) { {ResourceID: "r2", ResourceType: "db", Provider: "azure", Status: "active"}, {ResourceID: "r3", ResourceType: "k8s", Provider: "gcp", Status: "destroyed"}, } { - if err := store.SaveState(st); err != nil { + if err := store.SaveState(context.Background(), st); err != nil { t.Fatalf("SaveState %q: %v", st.ResourceID, err) } } - all, err := store.ListStates(nil) + all, err := store.ListStates(context.Background(), nil) if err != nil { t.Fatalf("ListStates(nil): %v", err) } @@ -166,7 +166,7 @@ func TestAzureBlobIaCStateStore_ListStates(t *testing.T) { t.Errorf("ListStates = %d, want 3", len(all)) } - filtered, err := store.ListStates(map[string]string{"provider": "azure"}) + filtered, err := store.ListStates(context.Background(), map[string]string{"provider": "azure"}) if err != nil { t.Fatalf("ListStates(provider=azure): %v", err) } @@ -178,14 +178,14 @@ func TestAzureBlobIaCStateStore_ListStates(t *testing.T) { func TestAzureBlobIaCStateStore_ListStates_SkipsLockBlobs(t *testing.T) { store := newTestAzureStore(newMockAzureClient()) - if err := store.SaveState(&module.IaCState{ResourceID: "r1", Status: "active"}); err != nil { + if err := store.SaveState(context.Background(), &module.IaCState{ResourceID: "r1", Status: "active"}); err != nil { t.Fatalf("SaveState: %v", err) } - if err := store.Lock("r1"); err != nil { + if err := store.Lock(context.Background(), "r1"); err != nil { t.Fatalf("Lock: %v", err) } - results, err := store.ListStates(nil) + results, err := store.ListStates(context.Background(), nil) if err != nil { t.Fatalf("ListStates: %v", err) } @@ -197,13 +197,13 @@ func TestAzureBlobIaCStateStore_ListStates_SkipsLockBlobs(t *testing.T) { func TestAzureBlobIaCStateStore_DeleteState(t *testing.T) { store := newTestAzureStore(newMockAzureClient()) - if err := store.SaveState(&module.IaCState{ResourceID: "del-me", Status: "active"}); err != nil { + if err := store.SaveState(context.Background(), &module.IaCState{ResourceID: "del-me", Status: "active"}); err != nil { t.Fatalf("SaveState: %v", err) } - if err := store.DeleteState("del-me"); err != nil { + if err := store.DeleteState(context.Background(), "del-me"); err != nil { t.Fatalf("DeleteState: %v", err) } - st, err := store.GetState("del-me") + st, err := store.GetState(context.Background(), "del-me") if err != nil { t.Fatalf("GetState after delete: %v", err) } @@ -214,7 +214,7 @@ func TestAzureBlobIaCStateStore_DeleteState(t *testing.T) { func TestAzureBlobIaCStateStore_DeleteState_NotFound(t *testing.T) { store := newTestAzureStore(newMockAzureClient()) - if err := store.DeleteState("nonexistent"); err == nil { + if err := store.DeleteState(context.Background(), "nonexistent"); err == nil { t.Fatal("expected error deleting nonexistent state") } } @@ -222,23 +222,23 @@ func TestAzureBlobIaCStateStore_DeleteState_NotFound(t *testing.T) { func TestAzureBlobIaCStateStore_LockUnlock(t *testing.T) { store := newTestAzureStore(newMockAzureClient()) - if err := store.Lock("res-1"); err != nil { + if err := store.Lock(context.Background(), "res-1"); err != nil { t.Fatalf("Lock: %v", err) } - if err := store.Lock("res-1"); err == nil { + if err := store.Lock(context.Background(), "res-1"); err == nil { t.Fatal("expected error on double lock") } - if err := store.Unlock("res-1"); err != nil { + if err := store.Unlock(context.Background(), "res-1"); err != nil { t.Fatalf("Unlock: %v", err) } - if err := store.Lock("res-1"); err != nil { + if err := store.Lock(context.Background(), "res-1"); err != nil { t.Fatalf("Lock after unlock: %v", err) } } func TestAzureBlobIaCStateStore_Unlock_NotLocked(t *testing.T) { store := newTestAzureStore(newMockAzureClient()) - if err := store.Unlock("not-locked"); err == nil { + if err := store.Unlock(context.Background(), "not-locked"); err == nil { t.Fatal("expected error unlocking non-locked resource") } } @@ -250,15 +250,15 @@ func TestAzureBlobIaCStateStore_Unlock_PassesLeaseID(t *testing.T) { client := newMockAzureClient() store := newTestAzureStore(client) - if err := store.Lock("res-lease"); err != nil { + if err := store.Lock(context.Background(), "res-lease"); err != nil { t.Fatalf("Lock: %v", err) } // Unlock must pass the correct leaseID — mock rejects wrong/empty leaseIDs. - if err := store.Unlock("res-lease"); err != nil { + if err := store.Unlock(context.Background(), "res-lease"); err != nil { t.Fatalf("Unlock with leaseID: %v", err) } // After unlock, should be able to re-lock. - if err := store.Lock("res-lease"); err != nil { + if err := store.Lock(context.Background(), "res-lease"); err != nil { t.Fatalf("Lock after Unlock: %v", err) } } @@ -272,10 +272,10 @@ func TestAzureBlobIaCStateStore_JSONRoundTrip(t *testing.T) { Status: "active", Outputs: map[string]any{"fqdn": "myapp.azurewebsites.net"}, } - if err := store.SaveState(state); err != nil { + if err := store.SaveState(context.Background(), state); err != nil { t.Fatalf("SaveState: %v", err) } - got, err := store.GetState("az-rt") + got, err := store.GetState(context.Background(), "az-rt") if err != nil { t.Fatalf("GetState: %v", err) } diff --git a/module/iac_state_fs.go b/module/iac_state_fs.go index f7450989..a4038847 100644 --- a/module/iac_state_fs.go +++ b/module/iac_state_fs.go @@ -1,6 +1,7 @@ package module import ( + "context" "encoding/json" "fmt" "os" @@ -46,7 +47,7 @@ func (s *FSIaCStateStore) ensureDir() error { } // GetState reads the JSON state file for resourceID. Returns nil, nil when not found. -func (s *FSIaCStateStore) GetState(resourceID string) (*IaCState, error) { +func (s *FSIaCStateStore) GetState(ctx context.Context, resourceID string) (*IaCState, error) { if err := s.ensureDir(); err != nil { return nil, fmt.Errorf("iac fs state: GetState %q: %w", resourceID, err) } @@ -65,7 +66,7 @@ func (s *FSIaCStateStore) GetState(resourceID string) (*IaCState, error) { } // SaveState writes the state record as a JSON file, creating the directory as needed. -func (s *FSIaCStateStore) SaveState(state *IaCState) error { +func (s *FSIaCStateStore) SaveState(ctx context.Context, state *IaCState) error { if state == nil { return fmt.Errorf("iac fs state: SaveState: state must not be nil") } @@ -87,7 +88,7 @@ func (s *FSIaCStateStore) SaveState(state *IaCState) error { // ListStates reads all JSON files from the directory and returns those matching filter. // Supported filter keys: "resource_type", "provider", "status". -func (s *FSIaCStateStore) ListStates(filter map[string]string) ([]*IaCState, error) { +func (s *FSIaCStateStore) ListStates(ctx context.Context, filter map[string]string) ([]*IaCState, error) { if err := s.ensureDir(); err != nil { return nil, fmt.Errorf("iac fs state: ListStates: %w", err) } @@ -116,7 +117,7 @@ func (s *FSIaCStateStore) ListStates(filter map[string]string) ([]*IaCState, err } // DeleteState removes the JSON state file for resourceID. -func (s *FSIaCStateStore) DeleteState(resourceID string) error { +func (s *FSIaCStateStore) DeleteState(ctx context.Context, resourceID string) error { path := s.statePath(resourceID) if err := os.Remove(path); err != nil { if os.IsNotExist(err) { @@ -128,7 +129,7 @@ func (s *FSIaCStateStore) DeleteState(resourceID string) error { } // Lock creates a lock file for resourceID. Fails if the lock file already exists. -func (s *FSIaCStateStore) Lock(resourceID string) error { +func (s *FSIaCStateStore) Lock(ctx context.Context, resourceID string) error { s.mu.Lock() defer s.mu.Unlock() if err := s.ensureDir(); err != nil { @@ -150,7 +151,7 @@ func (s *FSIaCStateStore) Lock(resourceID string) error { } // Unlock removes the lock file for resourceID. -func (s *FSIaCStateStore) Unlock(resourceID string) error { +func (s *FSIaCStateStore) Unlock(ctx context.Context, resourceID string) error { s.mu.Lock() defer s.mu.Unlock() lp := s.lockPath(resourceID) diff --git a/module/iac_state_gcs.go b/module/iac_state_gcs.go index 38ff696a..f2d8ebaf 100644 --- a/module/iac_state_gcs.go +++ b/module/iac_state_gcs.go @@ -77,8 +77,8 @@ func (s *GCSIaCStateStore) lockKey(resourceID string) string { } // GetState retrieves a state record by resource ID. Returns nil, nil when not found. -func (s *GCSIaCStateStore) GetState(resourceID string) (*IaCState, error) { - data, _, err := s.client.ReadObject(context.Background(), s.stateKey(resourceID)) +func (s *GCSIaCStateStore) GetState(ctx context.Context, resourceID string) (*IaCState, error) { + data, _, err := s.client.ReadObject(ctx, s.stateKey(resourceID)) if err != nil { if errors.Is(err, ErrGCSNotFound) { return nil, nil @@ -93,7 +93,7 @@ func (s *GCSIaCStateStore) GetState(resourceID string) (*IaCState, error) { } // SaveState writes the state record as a JSON object to GCS. -func (s *GCSIaCStateStore) SaveState(state *IaCState) error { +func (s *GCSIaCStateStore) SaveState(ctx context.Context, state *IaCState) error { if state == nil { return fmt.Errorf("iac gcs state: SaveState: state must not be nil") } @@ -104,15 +104,15 @@ func (s *GCSIaCStateStore) SaveState(state *IaCState) error { if err != nil { return fmt.Errorf("iac gcs state: SaveState %q: marshal: %w", state.ResourceID, err) } - if _, err := s.client.WriteObject(context.Background(), s.stateKey(state.ResourceID), data, "application/json"); err != nil { + if _, err := s.client.WriteObject(ctx, s.stateKey(state.ResourceID), data, "application/json"); err != nil { return fmt.Errorf("iac gcs state: SaveState %q: write: %w", state.ResourceID, err) } return nil } // ListStates lists all state objects and returns those matching the filter. -func (s *GCSIaCStateStore) ListStates(filter map[string]string) ([]*IaCState, error) { - keys, err := s.client.ListObjects(context.Background(), s.prefix) +func (s *GCSIaCStateStore) ListStates(ctx context.Context, filter map[string]string) ([]*IaCState, error) { + keys, err := s.client.ListObjects(ctx, s.prefix) if err != nil { return nil, fmt.Errorf("iac gcs state: ListStates: %w", err) } @@ -121,8 +121,14 @@ func (s *GCSIaCStateStore) ListStates(filter map[string]string) ([]*IaCState, er if !strings.HasSuffix(key, ".json") { continue } - data, _, err := s.client.ReadObject(context.Background(), key) + data, _, err := s.client.ReadObject(ctx, key) if err != nil { + // A canceled / deadlined context must abort the listing rather + // than silently return partial results; only genuinely unreadable + // objects are skipped. + if ctx.Err() != nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return nil, fmt.Errorf("iac gcs state: ListStates: %w", err) + } continue } var st IaCState @@ -137,8 +143,8 @@ func (s *GCSIaCStateStore) ListStates(filter map[string]string) ([]*IaCState, er } // DeleteState removes the state object for resourceID. -func (s *GCSIaCStateStore) DeleteState(resourceID string) error { - if err := s.client.DeleteObject(context.Background(), s.stateKey(resourceID)); err != nil { +func (s *GCSIaCStateStore) DeleteState(ctx context.Context, resourceID string) error { + if err := s.client.DeleteObject(ctx, s.stateKey(resourceID)); err != nil { if errors.Is(err, ErrGCSNotFound) { return fmt.Errorf("iac gcs state: DeleteState %q: not found", resourceID) } @@ -149,10 +155,10 @@ func (s *GCSIaCStateStore) DeleteState(resourceID string) error { // Lock acquires an advisory lock using GCS generation-match preconditions. // The lock object is written with If-None-Match (generation 0), which is atomic. -func (s *GCSIaCStateStore) Lock(resourceID string) error { +func (s *GCSIaCStateStore) Lock(ctx context.Context, resourceID string) error { key := s.lockKey(resourceID) body := []byte("locked") - gen, err := s.client.WriteObjectIfGenerationMatch(context.Background(), key, body, "text/plain", 0) + gen, err := s.client.WriteObjectIfGenerationMatch(ctx, key, body, "text/plain", 0) if err != nil { if strings.Contains(err.Error(), "precondition failed") || strings.Contains(err.Error(), "exists") { return fmt.Errorf("iac gcs state: Lock %q: resource is already locked", resourceID) @@ -164,9 +170,9 @@ func (s *GCSIaCStateStore) Lock(resourceID string) error { } // Unlock removes the lock object for resourceID. -func (s *GCSIaCStateStore) Unlock(resourceID string) error { +func (s *GCSIaCStateStore) Unlock(ctx context.Context, resourceID string) error { key := s.lockKey(resourceID) - if err := s.client.DeleteObject(context.Background(), key); err != nil { + if err := s.client.DeleteObject(ctx, key); err != nil { if errors.Is(err, ErrGCSNotFound) { return fmt.Errorf("iac gcs state: Unlock %q: not locked", resourceID) } diff --git a/module/iac_state_gcs_test.go b/module/iac_state_gcs_test.go index 5f9e1caa..b5b35504 100644 --- a/module/iac_state_gcs_test.go +++ b/module/iac_state_gcs_test.go @@ -95,7 +95,7 @@ func newTestGCSStore(client module.GCSObjectClient) *module.GCSIaCStateStore { func TestGCSIaCStateStore_GetState_NotFound(t *testing.T) { store := newTestGCSStore(newMockGCSClient()) - st, err := store.GetState("nonexistent") + st, err := store.GetState(context.Background(), "nonexistent") if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -113,11 +113,11 @@ func TestGCSIaCStateStore_SaveAndGetState(t *testing.T) { Provider: "gcp", Status: "active", } - if err := store.SaveState(state); err != nil { + if err := store.SaveState(context.Background(), state); err != nil { t.Fatalf("SaveState: %v", err) } - got, err := store.GetState("gcs-cluster") + got, err := store.GetState(context.Background(), "gcs-cluster") if err != nil { t.Fatalf("GetState: %v", err) } @@ -131,14 +131,14 @@ func TestGCSIaCStateStore_SaveAndGetState(t *testing.T) { func TestGCSIaCStateStore_SaveState_Nil(t *testing.T) { store := newTestGCSStore(newMockGCSClient()) - if err := store.SaveState(nil); err == nil { + if err := store.SaveState(context.Background(), nil); err == nil { t.Fatal("expected error for nil state") } } func TestGCSIaCStateStore_SaveState_EmptyID(t *testing.T) { store := newTestGCSStore(newMockGCSClient()) - if err := store.SaveState(&module.IaCState{}); err == nil { + if err := store.SaveState(context.Background(), &module.IaCState{}); err == nil { t.Fatal("expected error for empty resource_id") } } @@ -151,12 +151,12 @@ func TestGCSIaCStateStore_ListStates(t *testing.T) { {ResourceID: "r2", ResourceType: "db", Provider: "gcp", Status: "active"}, {ResourceID: "r3", ResourceType: "k8s", Provider: "aws", Status: "destroyed"}, } { - if err := store.SaveState(st); err != nil { + if err := store.SaveState(context.Background(), st); err != nil { t.Fatalf("SaveState %q: %v", st.ResourceID, err) } } - all, err := store.ListStates(nil) + all, err := store.ListStates(context.Background(), nil) if err != nil { t.Fatalf("ListStates(nil): %v", err) } @@ -164,7 +164,7 @@ func TestGCSIaCStateStore_ListStates(t *testing.T) { t.Errorf("ListStates = %d, want 3", len(all)) } - filtered, err := store.ListStates(map[string]string{"provider": "gcp"}) + filtered, err := store.ListStates(context.Background(), map[string]string{"provider": "gcp"}) if err != nil { t.Fatalf("ListStates(provider=gcp): %v", err) } @@ -176,13 +176,13 @@ func TestGCSIaCStateStore_ListStates(t *testing.T) { func TestGCSIaCStateStore_DeleteState(t *testing.T) { store := newTestGCSStore(newMockGCSClient()) - if err := store.SaveState(&module.IaCState{ResourceID: "del-me", Status: "active"}); err != nil { + if err := store.SaveState(context.Background(), &module.IaCState{ResourceID: "del-me", Status: "active"}); err != nil { t.Fatalf("SaveState: %v", err) } - if err := store.DeleteState("del-me"); err != nil { + if err := store.DeleteState(context.Background(), "del-me"); err != nil { t.Fatalf("DeleteState: %v", err) } - st, err := store.GetState("del-me") + st, err := store.GetState(context.Background(), "del-me") if err != nil { t.Fatalf("GetState after delete: %v", err) } @@ -193,7 +193,7 @@ func TestGCSIaCStateStore_DeleteState(t *testing.T) { func TestGCSIaCStateStore_DeleteState_NotFound(t *testing.T) { store := newTestGCSStore(newMockGCSClient()) - if err := store.DeleteState("nonexistent"); err == nil { + if err := store.DeleteState(context.Background(), "nonexistent"); err == nil { t.Fatal("expected error deleting nonexistent state") } } @@ -201,25 +201,25 @@ func TestGCSIaCStateStore_DeleteState_NotFound(t *testing.T) { func TestGCSIaCStateStore_LockUnlock(t *testing.T) { store := newTestGCSStore(newMockGCSClient()) - if err := store.Lock("res-1"); err != nil { + if err := store.Lock(context.Background(), "res-1"); err != nil { t.Fatalf("Lock: %v", err) } // Double lock must fail. - if err := store.Lock("res-1"); err == nil { + if err := store.Lock(context.Background(), "res-1"); err == nil { t.Fatal("expected error on double lock") } - if err := store.Unlock("res-1"); err != nil { + if err := store.Unlock(context.Background(), "res-1"); err != nil { t.Fatalf("Unlock: %v", err) } // Re-lock should succeed after unlock. - if err := store.Lock("res-1"); err != nil { + if err := store.Lock(context.Background(), "res-1"); err != nil { t.Fatalf("Lock after unlock: %v", err) } } func TestGCSIaCStateStore_Unlock_NotLocked(t *testing.T) { store := newTestGCSStore(newMockGCSClient()) - if err := store.Unlock("not-locked"); err == nil { + if err := store.Unlock(context.Background(), "not-locked"); err == nil { t.Fatal("expected error unlocking non-locked resource") } } @@ -233,10 +233,10 @@ func TestGCSIaCStateStore_JSONRoundTrip(t *testing.T) { Status: "active", Outputs: map[string]any{"endpoint": "https://gcs.example.com"}, } - if err := store.SaveState(state); err != nil { + if err := store.SaveState(context.Background(), state); err != nil { t.Fatalf("SaveState: %v", err) } - got, err := store.GetState("rt-gcs") + got, err := store.GetState(context.Background(), "rt-gcs") if err != nil { t.Fatalf("GetState: %v", err) } diff --git a/module/iac_state_grpc_client.go b/module/iac_state_grpc_client.go index 999ac6b7..e7c145d9 100644 --- a/module/iac_state_grpc_client.go +++ b/module/iac_state_grpc_client.go @@ -98,11 +98,6 @@ func jsonBytesToMap(b []byte) (map[string]any, error) { // ───────────────────────────────────────────────────────────────────────────── // grpcIaCStateStore adapts a pb.IaCStateBackendClient to module.IaCStateStore. -// -// All six methods call the backend with context.Background(): the -// module.IaCStateStore interface has no ctx parameter, so there is no caller -// context to plumb today. Threading a real context through IaCStateStore is a -// known follow-up, out of scope for this extraction. type grpcIaCStateStore struct { client pb.IaCStateBackendClient } @@ -114,8 +109,8 @@ func newGRPCIaCStateStore(c pb.IaCStateBackendClient) *grpcIaCStateStore { // GetState retrieves a state record by resource ID. Returns nil, nil when the // backend reports the record does not exist. -func (s *grpcIaCStateStore) GetState(resourceID string) (*IaCState, error) { - resp, err := s.client.GetState(context.Background(), &pb.GetStateRequest{ResourceId: resourceID}) +func (s *grpcIaCStateStore) GetState(ctx context.Context, resourceID string) (*IaCState, error) { + resp, err := s.client.GetState(ctx, &pb.GetStateRequest{ResourceId: resourceID}) if err != nil { return nil, err } @@ -126,18 +121,18 @@ func (s *grpcIaCStateStore) GetState(resourceID string) (*IaCState, error) { } // SaveState inserts or replaces a state record. -func (s *grpcIaCStateStore) SaveState(state *IaCState) error { +func (s *grpcIaCStateStore) SaveState(ctx context.Context, state *IaCState) error { pbState, err := iacStateToProto(state) if err != nil { return err } - _, err = s.client.SaveState(context.Background(), &pb.SaveStateRequest{State: pbState}) + _, err = s.client.SaveState(ctx, &pb.SaveStateRequest{State: pbState}) return err } // ListStates returns all state records matching the provided key=value filter. -func (s *grpcIaCStateStore) ListStates(filter map[string]string) ([]*IaCState, error) { - resp, err := s.client.ListStates(context.Background(), &pb.ListStatesRequest{Filter: filter}) +func (s *grpcIaCStateStore) ListStates(ctx context.Context, filter map[string]string) ([]*IaCState, error) { + resp, err := s.client.ListStates(ctx, &pb.ListStatesRequest{Filter: filter}) if err != nil { return nil, err } @@ -153,20 +148,20 @@ func (s *grpcIaCStateStore) ListStates(filter map[string]string) ([]*IaCState, e } // DeleteState removes a state record by resource ID. -func (s *grpcIaCStateStore) DeleteState(resourceID string) error { - _, err := s.client.DeleteState(context.Background(), &pb.DeleteStateRequest{ResourceId: resourceID}) +func (s *grpcIaCStateStore) DeleteState(ctx context.Context, resourceID string) error { + _, err := s.client.DeleteState(ctx, &pb.DeleteStateRequest{ResourceId: resourceID}) return err } // Lock acquires an exclusive lock for the given resource ID. -func (s *grpcIaCStateStore) Lock(resourceID string) error { - _, err := s.client.Lock(context.Background(), &pb.LockRequest{ResourceId: resourceID}) +func (s *grpcIaCStateStore) Lock(ctx context.Context, resourceID string) error { + _, err := s.client.Lock(ctx, &pb.LockRequest{ResourceId: resourceID}) return err } // Unlock releases the lock for the given resource ID. -func (s *grpcIaCStateStore) Unlock(resourceID string) error { - _, err := s.client.Unlock(context.Background(), &pb.UnlockRequest{ResourceId: resourceID}) +func (s *grpcIaCStateStore) Unlock(ctx context.Context, resourceID string) error { + _, err := s.client.Unlock(ctx, &pb.UnlockRequest{ResourceId: resourceID}) return err } @@ -184,8 +179,8 @@ type iacStateBackendServer struct { // GetState delegates to the backing store, mapping a not-found (nil) result to // GetStateResponse{Exists: false}. -func (s *iacStateBackendServer) GetState(_ context.Context, r *pb.GetStateRequest) (*pb.GetStateResponse, error) { - st, err := s.store.GetState(r.ResourceId) +func (s *iacStateBackendServer) GetState(ctx context.Context, r *pb.GetStateRequest) (*pb.GetStateResponse, error) { + st, err := s.store.GetState(ctx, r.ResourceId) if err != nil { return nil, err } @@ -200,20 +195,20 @@ func (s *iacStateBackendServer) GetState(_ context.Context, r *pb.GetStateReques } // SaveState delegates a full-state replace to the backing store. -func (s *iacStateBackendServer) SaveState(_ context.Context, r *pb.SaveStateRequest) (*pb.SaveStateResponse, error) { +func (s *iacStateBackendServer) SaveState(ctx context.Context, r *pb.SaveStateRequest) (*pb.SaveStateResponse, error) { st, err := iacStateFromProto(r.State) if err != nil { return nil, err } - if err := s.store.SaveState(st); err != nil { + if err := s.store.SaveState(ctx, st); err != nil { return nil, err } return &pb.SaveStateResponse{}, nil } // ListStates delegates a filtered listing to the backing store. -func (s *iacStateBackendServer) ListStates(_ context.Context, r *pb.ListStatesRequest) (*pb.ListStatesResponse, error) { - states, err := s.store.ListStates(r.Filter) +func (s *iacStateBackendServer) ListStates(ctx context.Context, r *pb.ListStatesRequest) (*pb.ListStatesResponse, error) { + states, err := s.store.ListStates(ctx, r.Filter) if err != nil { return nil, err } @@ -229,24 +224,24 @@ func (s *iacStateBackendServer) ListStates(_ context.Context, r *pb.ListStatesRe } // DeleteState delegates a delete-by-ID to the backing store. -func (s *iacStateBackendServer) DeleteState(_ context.Context, r *pb.DeleteStateRequest) (*pb.DeleteStateResponse, error) { - if err := s.store.DeleteState(r.ResourceId); err != nil { +func (s *iacStateBackendServer) DeleteState(ctx context.Context, r *pb.DeleteStateRequest) (*pb.DeleteStateResponse, error) { + if err := s.store.DeleteState(ctx, r.ResourceId); err != nil { return nil, err } return &pb.DeleteStateResponse{}, nil } // Lock delegates lock acquisition to the backing store. -func (s *iacStateBackendServer) Lock(_ context.Context, r *pb.LockRequest) (*pb.LockResponse, error) { - if err := s.store.Lock(r.ResourceId); err != nil { +func (s *iacStateBackendServer) Lock(ctx context.Context, r *pb.LockRequest) (*pb.LockResponse, error) { + if err := s.store.Lock(ctx, r.ResourceId); err != nil { return nil, err } return &pb.LockResponse{}, nil } // Unlock delegates lock release to the backing store. -func (s *iacStateBackendServer) Unlock(_ context.Context, r *pb.UnlockRequest) (*pb.UnlockResponse, error) { - if err := s.store.Unlock(r.ResourceId); err != nil { +func (s *iacStateBackendServer) Unlock(ctx context.Context, r *pb.UnlockRequest) (*pb.UnlockResponse, error) { + if err := s.store.Unlock(ctx, r.ResourceId); err != nil { return nil, err } return &pb.UnlockResponse{}, nil diff --git a/module/iac_state_grpc_client_test.go b/module/iac_state_grpc_client_test.go index 93af5230..ab4a2675 100644 --- a/module/iac_state_grpc_client_test.go +++ b/module/iac_state_grpc_client_test.go @@ -28,27 +28,28 @@ func TestGRPCIaCStateStoreRoundTrip(t *testing.T) { defer conn.Close() var store IaCStateStore = newGRPCIaCStateStore(pb.NewIaCStateBackendClient(conn)) + ctx := context.Background() want := &IaCState{ResourceID: "r1", ResourceType: "kubernetes", Provider: "azure", Status: "active", Outputs: map[string]any{"endpoint": "https://x"}, Config: map[string]any{"size": "L"}} - if err := store.SaveState(want); err != nil { + if err := store.SaveState(ctx, want); err != nil { t.Fatalf("SaveState: %v", err) } - got, err := store.GetState("r1") + got, err := store.GetState(ctx, "r1") if err != nil || got == nil { t.Fatalf("GetState: %v (got=%v)", err, got) } if got.ResourceID != "r1" || got.Status != "active" || got.Outputs["endpoint"] != "https://x" { t.Fatalf("round-trip mismatch: %+v", got) } - if err := store.Lock("r1"); err != nil { + if err := store.Lock(ctx, "r1"); err != nil { t.Fatalf("Lock: %v", err) } - missing, err := store.GetState("nope") + missing, err := store.GetState(ctx, "nope") if err != nil || missing != nil { t.Fatalf("GetState(missing) should be nil,nil — got %v,%v", missing, err) } - if err := store.Unlock("r1"); err != nil { + if err := store.Unlock(ctx, "r1"); err != nil { t.Fatalf("Unlock: %v", err) } } diff --git a/module/iac_state_memory.go b/module/iac_state_memory.go index 9f659643..e90f2384 100644 --- a/module/iac_state_memory.go +++ b/module/iac_state_memory.go @@ -1,6 +1,7 @@ package module import ( + "context" "fmt" "sync" ) @@ -22,7 +23,7 @@ func NewMemoryIaCStateStore() *MemoryIaCStateStore { } // GetState retrieves a state record by resource ID. Returns nil, nil when not found. -func (s *MemoryIaCStateStore) GetState(resourceID string) (*IaCState, error) { +func (s *MemoryIaCStateStore) GetState(ctx context.Context, resourceID string) (*IaCState, error) { s.mu.RLock() defer s.mu.RUnlock() st, ok := s.states[resourceID] @@ -35,7 +36,7 @@ func (s *MemoryIaCStateStore) GetState(resourceID string) (*IaCState, error) { } // SaveState inserts or replaces a state record. -func (s *MemoryIaCStateStore) SaveState(state *IaCState) error { +func (s *MemoryIaCStateStore) SaveState(ctx context.Context, state *IaCState) error { if state == nil { return fmt.Errorf("iac state store: SaveState: state must not be nil") } @@ -51,7 +52,7 @@ func (s *MemoryIaCStateStore) SaveState(state *IaCState) error { // ListStates returns all state records matching the provided key=value filter. // Supported filter keys: "resource_type", "provider", "status". -func (s *MemoryIaCStateStore) ListStates(filter map[string]string) ([]*IaCState, error) { +func (s *MemoryIaCStateStore) ListStates(ctx context.Context, filter map[string]string) ([]*IaCState, error) { s.mu.RLock() defer s.mu.RUnlock() var results []*IaCState @@ -65,7 +66,7 @@ func (s *MemoryIaCStateStore) ListStates(filter map[string]string) ([]*IaCState, } // DeleteState removes a state record by resource ID. -func (s *MemoryIaCStateStore) DeleteState(resourceID string) error { +func (s *MemoryIaCStateStore) DeleteState(ctx context.Context, resourceID string) error { s.mu.Lock() defer s.mu.Unlock() if _, ok := s.states[resourceID]; !ok { @@ -76,7 +77,7 @@ func (s *MemoryIaCStateStore) DeleteState(resourceID string) error { } // Lock acquires an exclusive advisory lock for the given resource ID. -func (s *MemoryIaCStateStore) Lock(resourceID string) error { +func (s *MemoryIaCStateStore) Lock(ctx context.Context, resourceID string) error { s.mu.Lock() defer s.mu.Unlock() if s.locks[resourceID] { @@ -87,7 +88,7 @@ func (s *MemoryIaCStateStore) Lock(resourceID string) error { } // Unlock releases the advisory lock for the given resource ID. -func (s *MemoryIaCStateStore) Unlock(resourceID string) error { +func (s *MemoryIaCStateStore) Unlock(ctx context.Context, resourceID string) error { s.mu.Lock() defer s.mu.Unlock() if !s.locks[resourceID] { diff --git a/module/iac_state_postgres.go b/module/iac_state_postgres.go index a4c994bb..ca66c40f 100644 --- a/module/iac_state_postgres.go +++ b/module/iac_state_postgres.go @@ -61,8 +61,8 @@ func NewPostgresIaCStateStoreWithConn(conn PostgresConn) *PostgresIaCStateStore } // GetState retrieves a state record by resource ID. Returns nil, nil when not found. -func (s *PostgresIaCStateStore) GetState(resourceID string) (*IaCState, error) { - st, err := s.conn.GetState(context.Background(), resourceID) +func (s *PostgresIaCStateStore) GetState(ctx context.Context, resourceID string) (*IaCState, error) { + st, err := s.conn.GetState(ctx, resourceID) if err != nil { return nil, fmt.Errorf("iac postgres state: GetState %q: %w", resourceID, err) } @@ -70,22 +70,22 @@ func (s *PostgresIaCStateStore) GetState(resourceID string) (*IaCState, error) { } // SaveState inserts or replaces a state record. -func (s *PostgresIaCStateStore) SaveState(state *IaCState) error { +func (s *PostgresIaCStateStore) SaveState(ctx context.Context, state *IaCState) error { if state == nil { return fmt.Errorf("iac postgres state: SaveState: state must not be nil") } if state.ResourceID == "" { return fmt.Errorf("iac postgres state: SaveState: resource_id must not be empty") } - if err := s.conn.UpsertState(context.Background(), state); err != nil { + if err := s.conn.UpsertState(ctx, state); err != nil { return fmt.Errorf("iac postgres state: SaveState %q: %w", state.ResourceID, err) } return nil } // ListStates returns all state records matching the provided key=value filter. -func (s *PostgresIaCStateStore) ListStates(filter map[string]string) ([]*IaCState, error) { - rows, err := s.conn.ListRows(context.Background()) +func (s *PostgresIaCStateStore) ListStates(ctx context.Context, filter map[string]string) ([]*IaCState, error) { + rows, err := s.conn.ListRows(ctx) if err != nil { return nil, fmt.Errorf("iac postgres state: ListStates: %w", err) } @@ -99,8 +99,8 @@ func (s *PostgresIaCStateStore) ListStates(filter map[string]string) ([]*IaCStat } // DeleteState removes a state record by resource ID. -func (s *PostgresIaCStateStore) DeleteState(resourceID string) error { - deleted, err := s.conn.DeleteRow(context.Background(), resourceID) +func (s *PostgresIaCStateStore) DeleteState(ctx context.Context, resourceID string) error { + deleted, err := s.conn.DeleteRow(ctx, resourceID) if err != nil { return fmt.Errorf("iac postgres state: DeleteState %q: %w", resourceID, err) } @@ -111,7 +111,7 @@ func (s *PostgresIaCStateStore) DeleteState(resourceID string) error { } // Lock acquires a PostgreSQL advisory lock for the resource. -func (s *PostgresIaCStateStore) Lock(resourceID string) error { +func (s *PostgresIaCStateStore) Lock(ctx context.Context, resourceID string) error { s.mu.Lock() defer s.mu.Unlock() @@ -119,7 +119,7 @@ func (s *PostgresIaCStateStore) Lock(resourceID string) error { return fmt.Errorf("iac postgres state: Lock %q: already locked", resourceID) } key := advisoryKey(resourceID) - if err := s.conn.AcquireAdvisoryLock(context.Background(), key); err != nil { + if err := s.conn.AcquireAdvisoryLock(ctx, key); err != nil { return fmt.Errorf("iac postgres state: Lock %q: %w", resourceID, err) } s.held[resourceID] = key @@ -127,7 +127,7 @@ func (s *PostgresIaCStateStore) Lock(resourceID string) error { } // Unlock releases the PostgreSQL advisory lock for the resource. -func (s *PostgresIaCStateStore) Unlock(resourceID string) error { +func (s *PostgresIaCStateStore) Unlock(ctx context.Context, resourceID string) error { s.mu.Lock() defer s.mu.Unlock() @@ -135,7 +135,7 @@ func (s *PostgresIaCStateStore) Unlock(resourceID string) error { if !held { return fmt.Errorf("iac postgres state: Unlock %q: not locked", resourceID) } - if _, err := s.conn.ReleaseAdvisoryLock(context.Background(), key); err != nil { + if _, err := s.conn.ReleaseAdvisoryLock(ctx, key); err != nil { return fmt.Errorf("iac postgres state: Unlock %q: %w", resourceID, err) } delete(s.held, resourceID) diff --git a/module/iac_state_postgres_test.go b/module/iac_state_postgres_test.go index 7694ffdb..6e72bd54 100644 --- a/module/iac_state_postgres_test.go +++ b/module/iac_state_postgres_test.go @@ -168,7 +168,7 @@ func newTestPostgresStore(conn module.PostgresConn) *module.PostgresIaCStateStor func TestPostgresIaCStateStore_GetState_NotFound(t *testing.T) { store := newTestPostgresStore(newMockPGConn()) - st, err := store.GetState("nonexistent") + st, err := store.GetState(context.Background(), "nonexistent") if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -192,11 +192,11 @@ func TestPostgresIaCStateStore_SaveAndGetState(t *testing.T) { Outputs: map[string]any{"endpoint": "https://k8s.example.com"}, Dependencies: []string{"network"}, } - if err := store.SaveState(state); err != nil { + if err := store.SaveState(context.Background(), state); err != nil { t.Fatalf("SaveState: %v", err) } - got, err := store.GetState("pg-cluster") + got, err := store.GetState(context.Background(), "pg-cluster") if err != nil { t.Fatalf("GetState: %v", err) } @@ -225,14 +225,14 @@ func TestPostgresIaCStateStore_SaveAndGetState(t *testing.T) { func TestPostgresIaCStateStore_SaveState_Nil(t *testing.T) { store := newTestPostgresStore(newMockPGConn()) - if err := store.SaveState(nil); err == nil { + if err := store.SaveState(context.Background(), nil); err == nil { t.Fatal("expected error for nil state") } } func TestPostgresIaCStateStore_SaveState_EmptyID(t *testing.T) { store := newTestPostgresStore(newMockPGConn()) - if err := store.SaveState(&module.IaCState{}); err == nil { + if err := store.SaveState(context.Background(), &module.IaCState{}); err == nil { t.Fatal("expected error for empty resource_id") } } @@ -247,12 +247,12 @@ func TestPostgresIaCStateStore_ListStates(t *testing.T) { {ResourceID: "r3", ResourceType: "k8s", Provider: "aws", ProviderRef: "aws-west", Status: "destroyed"}, } for _, st := range states { - if err := store.SaveState(st); err != nil { + if err := store.SaveState(context.Background(), st); err != nil { t.Fatalf("SaveState %q: %v", st.ResourceID, err) } } - all, err := store.ListStates(nil) + all, err := store.ListStates(context.Background(), nil) if err != nil { t.Fatalf("ListStates(nil): %v", err) } @@ -260,7 +260,7 @@ func TestPostgresIaCStateStore_ListStates(t *testing.T) { t.Errorf("ListStates = %d, want 3", len(all)) } - filtered, err := store.ListStates(map[string]string{"provider": "aws"}) + filtered, err := store.ListStates(context.Background(), map[string]string{"provider": "aws"}) if err != nil { t.Fatalf("ListStates(provider=aws): %v", err) } @@ -331,13 +331,13 @@ func TestScanIaCStateRows_ReturnsOutputsDecodeError(t *testing.T) { func TestPostgresIaCStateStore_DeleteState(t *testing.T) { store := newTestPostgresStore(newMockPGConn()) - if err := store.SaveState(&module.IaCState{ResourceID: "del-me", Status: "active"}); err != nil { + if err := store.SaveState(context.Background(), &module.IaCState{ResourceID: "del-me", Status: "active"}); err != nil { t.Fatalf("SaveState: %v", err) } - if err := store.DeleteState("del-me"); err != nil { + if err := store.DeleteState(context.Background(), "del-me"); err != nil { t.Fatalf("DeleteState: %v", err) } - st, err := store.GetState("del-me") + st, err := store.GetState(context.Background(), "del-me") if err != nil { t.Fatalf("GetState after delete: %v", err) } @@ -348,7 +348,7 @@ func TestPostgresIaCStateStore_DeleteState(t *testing.T) { func TestPostgresIaCStateStore_DeleteState_NotFound(t *testing.T) { store := newTestPostgresStore(newMockPGConn()) - if err := store.DeleteState("nonexistent"); err == nil { + if err := store.DeleteState(context.Background(), "nonexistent"); err == nil { t.Fatal("expected error deleting nonexistent state") } } @@ -356,23 +356,23 @@ func TestPostgresIaCStateStore_DeleteState_NotFound(t *testing.T) { func TestPostgresIaCStateStore_LockUnlock(t *testing.T) { store := newTestPostgresStore(newMockPGConn()) - if err := store.Lock("res-1"); err != nil { + if err := store.Lock(context.Background(), "res-1"); err != nil { t.Fatalf("Lock: %v", err) } - if err := store.Lock("res-1"); err == nil { + if err := store.Lock(context.Background(), "res-1"); err == nil { t.Fatal("expected error on double lock") } - if err := store.Unlock("res-1"); err != nil { + if err := store.Unlock(context.Background(), "res-1"); err != nil { t.Fatalf("Unlock: %v", err) } - if err := store.Lock("res-1"); err != nil { + if err := store.Lock(context.Background(), "res-1"); err != nil { t.Fatalf("Lock after unlock: %v", err) } } func TestPostgresIaCStateStore_Unlock_NotLocked(t *testing.T) { store := newTestPostgresStore(newMockPGConn()) - if err := store.Unlock("not-locked"); err == nil { + if err := store.Unlock(context.Background(), "not-locked"); err == nil { t.Fatal("expected error unlocking non-locked resource") } } diff --git a/module/iac_state_spaces.go b/module/iac_state_spaces.go index 51c08145..365d9e1a 100644 --- a/module/iac_state_spaces.go +++ b/module/iac_state_spaces.go @@ -118,9 +118,9 @@ func (s *SpacesIaCStateStore) lockKey(resourceID string) string { } // GetState retrieves a state record by resource ID. Returns nil, nil when not found. -func (s *SpacesIaCStateStore) GetState(resourceID string) (*IaCState, error) { +func (s *SpacesIaCStateStore) GetState(ctx context.Context, resourceID string) (*IaCState, error) { key := s.stateKey(resourceID) - out, err := s.client.GetObject(context.Background(), &s3.GetObjectInput{ + out, err := s.client.GetObject(ctx, &s3.GetObjectInput{ Bucket: &s.bucket, Key: &key, }) @@ -145,7 +145,7 @@ func (s *SpacesIaCStateStore) GetState(resourceID string) (*IaCState, error) { } // SaveState writes the state record as a JSON object to Spaces. -func (s *SpacesIaCStateStore) SaveState(state *IaCState) error { +func (s *SpacesIaCStateStore) SaveState(ctx context.Context, state *IaCState) error { if state == nil { return fmt.Errorf("iac spaces state: SaveState: state must not be nil") } @@ -160,7 +160,7 @@ func (s *SpacesIaCStateStore) SaveState(state *IaCState) error { key := s.stateKey(state.ResourceID) contentType := "application/json" - _, err = s.client.PutObject(context.Background(), &s3.PutObjectInput{ + _, err = s.client.PutObject(ctx, &s3.PutObjectInput{ Bucket: &s.bucket, Key: &key, Body: bytes.NewReader(data), @@ -174,12 +174,12 @@ func (s *SpacesIaCStateStore) SaveState(state *IaCState) error { // ListStates lists all state objects under the prefix and returns those matching filter. // Supported filter keys: "resource_type", "provider", "status". -func (s *SpacesIaCStateStore) ListStates(filter map[string]string) ([]*IaCState, error) { +func (s *SpacesIaCStateStore) ListStates(ctx context.Context, filter map[string]string) ([]*IaCState, error) { var results []*IaCState var continuationToken *string for { - out, err := s.client.ListObjectsV2(context.Background(), &s3.ListObjectsV2Input{ + out, err := s.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ Bucket: &s.bucket, Prefix: &s.prefix, ContinuationToken: continuationToken, @@ -195,7 +195,7 @@ func (s *SpacesIaCStateStore) ListStates(filter map[string]string) ([]*IaCState, continue } - getOut, err := s.client.GetObject(context.Background(), &s3.GetObjectInput{ + getOut, err := s.client.GetObject(ctx, &s3.GetObjectInput{ Bucket: &s.bucket, Key: obj.Key, }) @@ -227,10 +227,10 @@ func (s *SpacesIaCStateStore) ListStates(filter map[string]string) ([]*IaCState, } // DeleteState removes the state object for resourceID. -func (s *SpacesIaCStateStore) DeleteState(resourceID string) error { +func (s *SpacesIaCStateStore) DeleteState(ctx context.Context, resourceID string) error { // Verify existence first to return a meaningful error. key := s.stateKey(resourceID) - _, err := s.client.HeadObject(context.Background(), &s3.HeadObjectInput{ + _, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{ Bucket: &s.bucket, Key: &key, }) @@ -241,7 +241,7 @@ func (s *SpacesIaCStateStore) DeleteState(resourceID string) error { return fmt.Errorf("iac spaces state: DeleteState %q: head: %w", resourceID, err) } - _, err = s.client.DeleteObject(context.Background(), &s3.DeleteObjectInput{ + _, err = s.client.DeleteObject(ctx, &s3.DeleteObjectInput{ Bucket: &s.bucket, Key: &key, }) @@ -253,7 +253,7 @@ func (s *SpacesIaCStateStore) DeleteState(resourceID string) error { // Lock creates a lock object for resourceID using S3 conditional writes (If-None-Match: *) // for atomic, race-free lock acquisition. Fails if the lock already exists. -func (s *SpacesIaCStateStore) Lock(resourceID string) error { +func (s *SpacesIaCStateStore) Lock(ctx context.Context, resourceID string) error { s.mu.Lock() defer s.mu.Unlock() @@ -261,7 +261,7 @@ func (s *SpacesIaCStateStore) Lock(resourceID string) error { body := []byte(time.Now().UTC().Format(time.RFC3339)) ifNoneMatch := "*" - _, err := s.client.PutObject(context.Background(), &s3.PutObjectInput{ + _, err := s.client.PutObject(ctx, &s3.PutObjectInput{ Bucket: &s.bucket, Key: &key, Body: bytes.NewReader(body), @@ -278,14 +278,14 @@ func (s *SpacesIaCStateStore) Lock(resourceID string) error { } // Unlock removes the lock object for resourceID. -func (s *SpacesIaCStateStore) Unlock(resourceID string) error { +func (s *SpacesIaCStateStore) Unlock(ctx context.Context, resourceID string) error { s.mu.Lock() defer s.mu.Unlock() key := s.lockKey(resourceID) // Verify lock exists. - _, err := s.client.HeadObject(context.Background(), &s3.HeadObjectInput{ + _, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{ Bucket: &s.bucket, Key: &key, }) @@ -296,7 +296,7 @@ func (s *SpacesIaCStateStore) Unlock(resourceID string) error { return fmt.Errorf("iac spaces state: Unlock %q: head: %w", resourceID, err) } - _, err = s.client.DeleteObject(context.Background(), &s3.DeleteObjectInput{ + _, err = s.client.DeleteObject(ctx, &s3.DeleteObjectInput{ Bucket: &s.bucket, Key: &key, }) diff --git a/module/iac_state_spaces_test.go b/module/iac_state_spaces_test.go index 1127caa6..2de2fd1d 100644 --- a/module/iac_state_spaces_test.go +++ b/module/iac_state_spaces_test.go @@ -98,7 +98,7 @@ func newTestSpacesStore(client *mockS3Client) *module.SpacesIaCStateStore { func TestSpacesIaCStateStore_GetState_NotFound(t *testing.T) { store := newTestSpacesStore(newMockS3Client()) - st, err := store.GetState("nonexistent") + st, err := store.GetState(context.Background(), "nonexistent") if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -122,11 +122,11 @@ func TestSpacesIaCStateStore_SaveAndGetState(t *testing.T) { UpdatedAt: "2026-03-09T00:00:00Z", } - if err := store.SaveState(state); err != nil { + if err := store.SaveState(context.Background(), state); err != nil { t.Fatalf("SaveState: %v", err) } - got, err := store.GetState("cluster-1") + got, err := store.GetState(context.Background(), "cluster-1") if err != nil { t.Fatalf("GetState: %v", err) } @@ -150,7 +150,7 @@ func TestSpacesIaCStateStore_SaveAndGetState(t *testing.T) { func TestSpacesIaCStateStore_SaveState_Nil(t *testing.T) { store := newTestSpacesStore(newMockS3Client()) - err := store.SaveState(nil) + err := store.SaveState(context.Background(), nil) if err == nil { t.Fatal("expected error for nil state") } @@ -159,7 +159,7 @@ func TestSpacesIaCStateStore_SaveState_Nil(t *testing.T) { func TestSpacesIaCStateStore_SaveState_EmptyID(t *testing.T) { store := newTestSpacesStore(newMockS3Client()) - err := store.SaveState(&module.IaCState{}) + err := store.SaveState(context.Background(), &module.IaCState{}) if err == nil { t.Fatal("expected error for empty resource_id") } @@ -175,13 +175,13 @@ func TestSpacesIaCStateStore_ListStates(t *testing.T) { {ResourceID: "r3", ResourceType: "kubernetes", Provider: "aws", Status: "destroyed"}, } for _, st := range states { - if err := store.SaveState(st); err != nil { + if err := store.SaveState(context.Background(), st); err != nil { t.Fatalf("SaveState %q: %v", st.ResourceID, err) } } // No filter — returns all. - all, err := store.ListStates(nil) + all, err := store.ListStates(context.Background(), nil) if err != nil { t.Fatalf("ListStates(nil): %v", err) } @@ -190,7 +190,7 @@ func TestSpacesIaCStateStore_ListStates(t *testing.T) { } // Filter by provider. - filtered, err := store.ListStates(map[string]string{"provider": "aws"}) + filtered, err := store.ListStates(context.Background(), map[string]string{"provider": "aws"}) if err != nil { t.Fatalf("ListStates(provider=aws): %v", err) } @@ -199,7 +199,7 @@ func TestSpacesIaCStateStore_ListStates(t *testing.T) { } // Filter by status. - active, err := store.ListStates(map[string]string{"status": "active"}) + active, err := store.ListStates(context.Background(), map[string]string{"status": "active"}) if err != nil { t.Fatalf("ListStates(status=active): %v", err) } @@ -213,14 +213,14 @@ func TestSpacesIaCStateStore_ListStates_SkipsLockFiles(t *testing.T) { store := newTestSpacesStore(client) // Save a state and lock it — lock file should be skipped in list. - if err := store.SaveState(&module.IaCState{ResourceID: "r1", Status: "active"}); err != nil { + if err := store.SaveState(context.Background(), &module.IaCState{ResourceID: "r1", Status: "active"}); err != nil { t.Fatalf("SaveState: %v", err) } - if err := store.Lock("r1"); err != nil { + if err := store.Lock(context.Background(), "r1"); err != nil { t.Fatalf("Lock: %v", err) } - results, err := store.ListStates(nil) + results, err := store.ListStates(context.Background(), nil) if err != nil { t.Fatalf("ListStates: %v", err) } @@ -232,16 +232,16 @@ func TestSpacesIaCStateStore_ListStates_SkipsLockFiles(t *testing.T) { func TestSpacesIaCStateStore_DeleteState(t *testing.T) { store := newTestSpacesStore(newMockS3Client()) - if err := store.SaveState(&module.IaCState{ResourceID: "del-me", Status: "active"}); err != nil { + if err := store.SaveState(context.Background(), &module.IaCState{ResourceID: "del-me", Status: "active"}); err != nil { t.Fatalf("SaveState: %v", err) } - if err := store.DeleteState("del-me"); err != nil { + if err := store.DeleteState(context.Background(), "del-me"); err != nil { t.Fatalf("DeleteState: %v", err) } // Should be gone. - st, err := store.GetState("del-me") + st, err := store.GetState(context.Background(), "del-me") if err != nil { t.Fatalf("GetState after delete: %v", err) } @@ -253,7 +253,7 @@ func TestSpacesIaCStateStore_DeleteState(t *testing.T) { func TestSpacesIaCStateStore_DeleteState_NotFound(t *testing.T) { store := newTestSpacesStore(newMockS3Client()) - err := store.DeleteState("nonexistent") + err := store.DeleteState(context.Background(), "nonexistent") if err == nil { t.Fatal("expected error deleting nonexistent state") } @@ -266,22 +266,22 @@ func TestSpacesIaCStateStore_LockUnlock(t *testing.T) { store := newTestSpacesStore(newMockS3Client()) // Lock should succeed. - if err := store.Lock("res-1"); err != nil { + if err := store.Lock(context.Background(), "res-1"); err != nil { t.Fatalf("Lock: %v", err) } // Double-lock should fail. - if err := store.Lock("res-1"); err == nil { + if err := store.Lock(context.Background(), "res-1"); err == nil { t.Fatal("expected error on double lock") } // Unlock should succeed. - if err := store.Unlock("res-1"); err != nil { + if err := store.Unlock(context.Background(), "res-1"); err != nil { t.Fatalf("Unlock: %v", err) } // Re-lock after unlock should succeed. - if err := store.Lock("res-1"); err != nil { + if err := store.Lock(context.Background(), "res-1"); err != nil { t.Fatalf("Lock after unlock: %v", err) } } @@ -289,7 +289,7 @@ func TestSpacesIaCStateStore_LockUnlock(t *testing.T) { func TestSpacesIaCStateStore_Unlock_NotLocked(t *testing.T) { store := newTestSpacesStore(newMockS3Client()) - err := store.Unlock("not-locked") + err := store.Unlock(context.Background(), "not-locked") if err == nil { t.Fatal("expected error unlocking a resource that is not locked") } @@ -302,16 +302,16 @@ func TestSpacesIaCStateStore_SaveState_Overwrite(t *testing.T) { store := newTestSpacesStore(newMockS3Client()) original := &module.IaCState{ResourceID: "r1", Status: "planned"} - if err := store.SaveState(original); err != nil { + if err := store.SaveState(context.Background(), original); err != nil { t.Fatalf("SaveState (original): %v", err) } updated := &module.IaCState{ResourceID: "r1", Status: "active"} - if err := store.SaveState(updated); err != nil { + if err := store.SaveState(context.Background(), updated); err != nil { t.Fatalf("SaveState (updated): %v", err) } - got, err := store.GetState("r1") + got, err := store.GetState(context.Background(), "r1") if err != nil { t.Fatalf("GetState: %v", err) } @@ -325,7 +325,7 @@ func TestSpacesIaCStateStore_SanitizesResourceID(t *testing.T) { store := newTestSpacesStore(client) state := &module.IaCState{ResourceID: "ns/cluster\\1", Status: "active"} - if err := store.SaveState(state); err != nil { + if err := store.SaveState(context.Background(), state); err != nil { t.Fatalf("SaveState: %v", err) } @@ -338,7 +338,7 @@ func TestSpacesIaCStateStore_SanitizesResourceID(t *testing.T) { } // Retrieve by original ID. - got, err := store.GetState("ns/cluster\\1") + got, err := store.GetState(context.Background(), "ns/cluster\\1") if err != nil { t.Fatalf("GetState: %v", err) } @@ -357,7 +357,7 @@ func TestSpacesIaCStateStore_GetState_BadJSON(t *testing.T) { client.objects["iac-state/bad.json"] = []byte("{invalid json") client.mu.Unlock() - _, err := store.GetState("bad") + _, err := store.GetState(context.Background(), "bad") if err == nil { t.Fatal("expected unmarshal error for bad JSON") } @@ -384,11 +384,11 @@ func TestSpacesIaCStateStore_JSONRoundTrip(t *testing.T) { Error: "timeout waiting for stabilization", } - if err := store.SaveState(state); err != nil { + if err := store.SaveState(context.Background(), state); err != nil { t.Fatalf("SaveState: %v", err) } - got, err := store.GetState("rt-1") + got, err := store.GetState(context.Background(), "rt-1") if err != nil { t.Fatalf("GetState: %v", err) } @@ -424,37 +424,37 @@ func TestSpacesIaCStateStore_ErrorPropagation(t *testing.T) { store := module.NewSpacesIaCStateStoreWithClient(&errS3Client{}, "test-bucket", "iac-state/") // GetState error. - _, err := store.GetState("x") + _, err := store.GetState(context.Background(), "x") if err == nil || !strings.Contains(err.Error(), "simulated") { t.Errorf("GetState error = %v, want simulated error", err) } // SaveState error. - err = store.SaveState(&module.IaCState{ResourceID: "x"}) + err = store.SaveState(context.Background(), &module.IaCState{ResourceID: "x"}) if err == nil || !strings.Contains(err.Error(), "simulated") { t.Errorf("SaveState error = %v, want simulated error", err) } // ListStates error. - _, err = store.ListStates(nil) + _, err = store.ListStates(context.Background(), nil) if err == nil || !strings.Contains(err.Error(), "simulated") { t.Errorf("ListStates error = %v, want simulated error", err) } // DeleteState error (HeadObject fails). - err = store.DeleteState("x") + err = store.DeleteState(context.Background(), "x") if err == nil || !strings.Contains(err.Error(), "simulated") { t.Errorf("DeleteState error = %v, want simulated error", err) } // Lock error (HeadObject fails with non-NotFound). - err = store.Lock("x") + err = store.Lock(context.Background(), "x") if err == nil || !strings.Contains(err.Error(), "simulated") { t.Errorf("Lock error = %v, want simulated error", err) } // Unlock error (HeadObject fails with non-NotFound). - err = store.Unlock("x") + err = store.Unlock(context.Background(), "x") if err == nil || !strings.Contains(err.Error(), "simulated") { t.Errorf("Unlock error = %v, want simulated error", err) } diff --git a/module/iac_state_test.go b/module/iac_state_test.go index d8c0b080..1e5c2852 100644 --- a/module/iac_state_test.go +++ b/module/iac_state_test.go @@ -32,10 +32,10 @@ func runStateStoreSuite(t *testing.T, store module.IaCStateStore) { t.Run("SaveAndGet", func(t *testing.T) { st := makeState("res-1", "kubernetes", "local", "planned") - if err := store.SaveState(st); err != nil { + if err := store.SaveState(context.Background(), st); err != nil { t.Fatalf("SaveState: %v", err) } - got, err := store.GetState("res-1") + got, err := store.GetState(context.Background(), "res-1") if err != nil { t.Fatalf("GetState: %v", err) } @@ -51,7 +51,7 @@ func runStateStoreSuite(t *testing.T, store module.IaCStateStore) { }) t.Run("GetNotFound", func(t *testing.T) { - got, err := store.GetState("nonexistent") + got, err := store.GetState(context.Background(), "nonexistent") if err != nil { t.Fatalf("GetState unexpected error: %v", err) } @@ -61,28 +61,28 @@ func runStateStoreSuite(t *testing.T, store module.IaCStateStore) { }) t.Run("SaveState_NilError", func(t *testing.T) { - if err := store.SaveState(nil); err == nil { + if err := store.SaveState(context.Background(), nil); err == nil { t.Error("expected error for nil state, got nil") } }) t.Run("SaveState_EmptyIDError", func(t *testing.T) { st := &module.IaCState{ResourceID: "", Status: "planned"} - if err := store.SaveState(st); err == nil { + if err := store.SaveState(context.Background(), st); err == nil { t.Error("expected error for empty resource_id, got nil") } }) t.Run("UpdateState", func(t *testing.T) { st := makeState("res-update", "kubernetes", "local", "planned") - if err := store.SaveState(st); err != nil { + if err := store.SaveState(context.Background(), st); err != nil { t.Fatalf("SaveState: %v", err) } st.Status = "active" - if err := store.SaveState(st); err != nil { + if err := store.SaveState(context.Background(), st); err != nil { t.Fatalf("SaveState update: %v", err) } - got, _ := store.GetState("res-update") + got, _ := store.GetState(context.Background(), "res-update") if got.Status != "active" { t.Errorf("expected status=active after update, got %q", got.Status) } @@ -92,10 +92,10 @@ func runStateStoreSuite(t *testing.T, store module.IaCStateStore) { t.Run("ListAll", func(t *testing.T) { // Seed two distinct resources. - _ = store.SaveState(makeState("list-a", "kubernetes", "aws", "active")) - _ = store.SaveState(makeState("list-b", "ecs", "aws", "planned")) + _ = store.SaveState(context.Background(), makeState("list-a", "kubernetes", "aws", "active")) + _ = store.SaveState(context.Background(), makeState("list-b", "ecs", "aws", "planned")) - all, err := store.ListStates(map[string]string{}) + all, err := store.ListStates(context.Background(), map[string]string{}) if err != nil { t.Fatalf("ListStates: %v", err) } @@ -105,10 +105,10 @@ func runStateStoreSuite(t *testing.T, store module.IaCStateStore) { }) t.Run("ListByStatus", func(t *testing.T) { - _ = store.SaveState(makeState("filter-active", "kubernetes", "gcp", "active")) - _ = store.SaveState(makeState("filter-destroyed", "kubernetes", "gcp", "destroyed")) + _ = store.SaveState(context.Background(), makeState("filter-active", "kubernetes", "gcp", "active")) + _ = store.SaveState(context.Background(), makeState("filter-destroyed", "kubernetes", "gcp", "destroyed")) - active, err := store.ListStates(map[string]string{"status": "active"}) + active, err := store.ListStates(context.Background(), map[string]string{"status": "active"}) if err != nil { t.Fatalf("ListStates by status: %v", err) } @@ -120,10 +120,10 @@ func runStateStoreSuite(t *testing.T, store module.IaCStateStore) { }) t.Run("ListByProvider", func(t *testing.T) { - _ = store.SaveState(makeState("prov-aws", "kubernetes", "aws", "active")) - _ = store.SaveState(makeState("prov-gcp", "kubernetes", "gcp", "active")) + _ = store.SaveState(context.Background(), makeState("prov-aws", "kubernetes", "aws", "active")) + _ = store.SaveState(context.Background(), makeState("prov-gcp", "kubernetes", "gcp", "active")) - awsOnly, err := store.ListStates(map[string]string{"provider": "aws"}) + awsOnly, err := store.ListStates(context.Background(), map[string]string{"provider": "aws"}) if err != nil { t.Fatalf("ListStates by provider: %v", err) } @@ -137,18 +137,18 @@ func runStateStoreSuite(t *testing.T, store module.IaCStateStore) { // ── DeleteState ─────────────────────────────────────────────────────────── t.Run("DeleteState", func(t *testing.T) { - _ = store.SaveState(makeState("del-me", "kubernetes", "local", "active")) - if err := store.DeleteState("del-me"); err != nil { + _ = store.SaveState(context.Background(), makeState("del-me", "kubernetes", "local", "active")) + if err := store.DeleteState(context.Background(), "del-me"); err != nil { t.Fatalf("DeleteState: %v", err) } - got, _ := store.GetState("del-me") + got, _ := store.GetState(context.Background(), "del-me") if got != nil { t.Error("expected nil after delete, got non-nil") } }) t.Run("DeleteNotFound", func(t *testing.T) { - if err := store.DeleteState("ghost-resource"); err == nil { + if err := store.DeleteState(context.Background(), "ghost-resource"); err == nil { t.Error("expected error for nonexistent resource, got nil") } }) @@ -156,28 +156,28 @@ func runStateStoreSuite(t *testing.T, store module.IaCStateStore) { // ── Lock / Unlock ───────────────────────────────────────────────────────── t.Run("LockAndUnlock", func(t *testing.T) { - if err := store.Lock("lock-res"); err != nil { + if err := store.Lock(context.Background(), "lock-res"); err != nil { t.Fatalf("Lock: %v", err) } - if err := store.Unlock("lock-res"); err != nil { + if err := store.Unlock(context.Background(), "lock-res"); err != nil { t.Fatalf("Unlock: %v", err) } }) t.Run("DoubleLock", func(t *testing.T) { - if err := store.Lock("double-lock"); err != nil { + if err := store.Lock(context.Background(), "double-lock"); err != nil { t.Fatalf("first Lock: %v", err) } // Second lock must fail. - if err := store.Lock("double-lock"); err == nil { + if err := store.Lock(context.Background(), "double-lock"); err == nil { t.Error("expected error on double-lock, got nil") } // Clean up. - _ = store.Unlock("double-lock") + _ = store.Unlock(context.Background(), "double-lock") }) t.Run("UnlockNotLocked", func(t *testing.T) { - if err := store.Unlock("never-locked"); err == nil { + if err := store.Unlock(context.Background(), "never-locked"); err == nil { t.Error("expected error unlocking a non-locked resource, got nil") } }) @@ -203,13 +203,13 @@ func TestIaCStateStore_Filesystem_PersistAcrossInstances(t *testing.T) { st := makeState("persist-res", "kubernetes", "local", "active") store1 := module.NewFSIaCStateStore(dir) - if err := store1.SaveState(st); err != nil { + if err := store1.SaveState(context.Background(), st); err != nil { t.Fatalf("SaveState: %v", err) } // New store instance pointing at the same directory. store2 := module.NewFSIaCStateStore(dir) - got, err := store2.GetState("persist-res") + got, err := store2.GetState(context.Background(), "persist-res") if err != nil { t.Fatalf("GetState: %v", err) } @@ -222,7 +222,7 @@ func TestIaCStateStore_Filesystem_JSONFiles(t *testing.T) { dir := t.TempDir() store := module.NewFSIaCStateStore(dir) - if err := store.SaveState(makeState("json-check", "ecs", "aws", "planned")); err != nil { + if err := store.SaveState(context.Background(), makeState("json-check", "ecs", "aws", "planned")); err != nil { t.Fatalf("SaveState: %v", err) } diff --git a/module/pipeline_step_iac.go b/module/pipeline_step_iac.go index 63773bf4..52246e59 100644 --- a/module/pipeline_step_iac.go +++ b/module/pipeline_step_iac.go @@ -11,6 +11,19 @@ import ( // ─── helpers ────────────────────────────────────────────────────────────────── +// lookupExistingState fetches the current state record for a step's resource. +// A nil record (not found) is reported as (nil, nil); any store error — +// including a context cancellation / deadline now that IaCStateStore carries +// the caller's ctx — is returned so the step aborts rather than proceeding with +// a stale notion of "existing". +func lookupExistingState(ctx context.Context, store IaCStateStore, resourceID string) (*IaCState, error) { + existing, err := store.GetState(ctx, resourceID) + if err != nil { + return nil, fmt.Errorf("lookup existing iac.state for %q: %w", resourceID, err) + } + return existing, nil +} + // resolveIaCStore looks up an IaCStateStore from the service registry. func resolveIaCStore(app modular.Application, storeName, stepName string) (IaCStateStore, error) { if app == nil { @@ -100,7 +113,7 @@ func NewIaCPlanStepFactory() StepFactory { func (s *IaCPlanStep) Name() string { return s.name } -func (s *IaCPlanStep) Execute(_ context.Context, _ *PipelineContext) (*StepResult, error) { +func (s *IaCPlanStep) Execute(ctx context.Context, _ *PipelineContext) (*StepResult, error) { provider, err := resolvePlatformProvider(s.app, s.platform, s.name) if err != nil { return nil, err @@ -116,7 +129,10 @@ func (s *IaCPlanStep) Execute(_ context.Context, _ *PipelineContext) (*StepResul } // Persist planned state. - existing, _ := store.GetState(s.resourceID) + existing, err := lookupExistingState(ctx, store, s.resourceID) + if err != nil { + return nil, fmt.Errorf("iac_plan step %q: %w", s.name, err) + } now := nowUTC() st := &IaCState{ ResourceID: s.resourceID, @@ -134,7 +150,7 @@ func (s *IaCPlanStep) Execute(_ context.Context, _ *PipelineContext) (*StepResul st.CreatedAt = now } - if err := store.SaveState(st); err != nil { + if err := store.SaveState(ctx, st); err != nil { return nil, fmt.Errorf("iac_plan step %q: save state: %w", s.name, err) } @@ -185,7 +201,7 @@ func NewIaCApplyStepFactory() StepFactory { func (s *IaCApplyStep) Name() string { return s.name } -func (s *IaCApplyStep) Execute(_ context.Context, _ *PipelineContext) (*StepResult, error) { +func (s *IaCApplyStep) Execute(ctx context.Context, _ *PipelineContext) (*StepResult, error) { provider, err := resolvePlatformProvider(s.app, s.platform, s.name) if err != nil { return nil, err @@ -198,11 +214,14 @@ func (s *IaCApplyStep) Execute(_ context.Context, _ *PipelineContext) (*StepResu now := nowUTC() // Transition state to provisioning before calling Apply. - existing, _ := store.GetState(s.resourceID) + existing, err := lookupExistingState(ctx, store, s.resourceID) + if err != nil { + return nil, fmt.Errorf("iac_apply step %q: %w", s.name, err) + } if existing != nil { existing.Status = "provisioning" existing.UpdatedAt = now - _ = store.SaveState(existing) + _ = store.SaveState(ctx, existing) } result, err := provider.Apply() @@ -212,7 +231,7 @@ func (s *IaCApplyStep) Execute(_ context.Context, _ *PipelineContext) (*StepResu existing.Status = "error" existing.Error = err.Error() existing.UpdatedAt = nowUTC() - _ = store.SaveState(existing) + _ = store.SaveState(ctx, existing) } return nil, fmt.Errorf("iac_apply step %q: Apply: %w", s.name, err) } @@ -255,7 +274,7 @@ func (s *IaCApplyStep) Execute(_ context.Context, _ *PipelineContext) (*StepResu st.CreatedAt = st.UpdatedAt } - if err := store.SaveState(st); err != nil { + if err := store.SaveState(ctx, st); err != nil { return nil, fmt.Errorf("iac_apply step %q: save state: %w", s.name, err) } @@ -308,7 +327,7 @@ func NewIaCStatusStepFactory() StepFactory { func (s *IaCStatusStep) Name() string { return s.name } -func (s *IaCStatusStep) Execute(_ context.Context, _ *PipelineContext) (*StepResult, error) { +func (s *IaCStatusStep) Execute(ctx context.Context, _ *PipelineContext) (*StepResult, error) { provider, err := resolvePlatformProvider(s.app, s.platform, s.name) if err != nil { return nil, err @@ -323,7 +342,7 @@ func (s *IaCStatusStep) Execute(_ context.Context, _ *PipelineContext) (*StepRes return nil, fmt.Errorf("iac_status step %q: Status: %w", s.name, err) } - st, err := store.GetState(s.resourceID) + st, err := store.GetState(ctx, s.resourceID) if err != nil { return nil, fmt.Errorf("iac_status step %q: get state: %w", s.name, err) } @@ -379,7 +398,7 @@ func NewIaCDestroyStepFactory() StepFactory { func (s *IaCDestroyStep) Name() string { return s.name } -func (s *IaCDestroyStep) Execute(_ context.Context, _ *PipelineContext) (*StepResult, error) { +func (s *IaCDestroyStep) Execute(ctx context.Context, _ *PipelineContext) (*StepResult, error) { provider, err := resolvePlatformProvider(s.app, s.platform, s.name) if err != nil { return nil, err @@ -391,11 +410,14 @@ func (s *IaCDestroyStep) Execute(_ context.Context, _ *PipelineContext) (*StepRe now := nowUTC() - existing, _ := store.GetState(s.resourceID) + existing, err := lookupExistingState(ctx, store, s.resourceID) + if err != nil { + return nil, fmt.Errorf("iac_destroy step %q: %w", s.name, err) + } if existing != nil { existing.Status = "destroying" existing.UpdatedAt = now - _ = store.SaveState(existing) + _ = store.SaveState(ctx, existing) } if err := provider.Destroy(); err != nil { @@ -403,7 +425,7 @@ func (s *IaCDestroyStep) Execute(_ context.Context, _ *PipelineContext) (*StepRe existing.Status = "error" existing.Error = err.Error() existing.UpdatedAt = nowUTC() - _ = store.SaveState(existing) + _ = store.SaveState(ctx, existing) } return nil, fmt.Errorf("iac_destroy step %q: Destroy: %w", s.name, err) } @@ -423,7 +445,7 @@ func (s *IaCDestroyStep) Execute(_ context.Context, _ *PipelineContext) (*StepRe st.CreatedAt = st.UpdatedAt } - if err := store.SaveState(st); err != nil { + if err := store.SaveState(ctx, st); err != nil { return nil, fmt.Errorf("iac_destroy step %q: save state: %w", s.name, err) } @@ -476,13 +498,13 @@ func NewIaCDriftDetectStepFactory() StepFactory { func (s *IaCDriftDetectStep) Name() string { return s.name } -func (s *IaCDriftDetectStep) Execute(_ context.Context, _ *PipelineContext) (*StepResult, error) { +func (s *IaCDriftDetectStep) Execute(ctx context.Context, _ *PipelineContext) (*StepResult, error) { store, err := resolveIaCStore(s.app, s.storeName, s.name) if err != nil { return nil, err } - st, err := store.GetState(s.resourceID) + st, err := store.GetState(ctx, s.resourceID) if err != nil { return nil, fmt.Errorf("iac_drift_detect step %q: get state: %w", s.name, err) } diff --git a/module/pipeline_step_iac_test.go b/module/pipeline_step_iac_test.go index 9de27b65..bae0712c 100644 --- a/module/pipeline_step_iac_test.go +++ b/module/pipeline_step_iac_test.go @@ -104,7 +104,7 @@ func TestIaCPlanStep_BasicPlan(t *testing.T) { } // State should be persisted. - st, _ := store.GetState("my-cluster") + st, _ := store.GetState(context.Background(), "my-cluster") if st == nil { t.Fatal("expected state to be persisted after plan") } @@ -178,7 +178,7 @@ func TestIaCApplyStep_BasicApply(t *testing.T) { } // State should be active. - st, _ := store.GetState("my-cluster") + st, _ := store.GetState(context.Background(), "my-cluster") if st == nil || st.Status != "active" { t.Errorf("expected stored status=active, got %v", st) } @@ -189,7 +189,7 @@ func TestIaCApplyStep_ApplyError(t *testing.T) { provider.applyErr = errors.New("apply failed: insufficient resources") // Seed a planned state. - _ = store.SaveState(makeState("my-cluster", "kubernetes", "local", "planned")) + _ = store.SaveState(context.Background(), makeState("my-cluster", "kubernetes", "local", "planned")) factory := module.NewIaCApplyStepFactory() step, _ := factory("apply", baseIaCCfg(), app) @@ -199,7 +199,7 @@ func TestIaCApplyStep_ApplyError(t *testing.T) { } // State should be error. - st, _ := store.GetState("my-cluster") + st, _ := store.GetState(context.Background(), "my-cluster") if st == nil || st.Status != "error" { t.Errorf("expected stored status=error after apply failure, got %v", st) } @@ -217,7 +217,7 @@ func TestIaCApplyStep_MissingPlatform(t *testing.T) { func TestIaCStatusStep_BasicStatus(t *testing.T) { app, store, _ := setupIaCApp(t) - _ = store.SaveState(makeState("my-cluster", "kubernetes", "local", "active")) + _ = store.SaveState(context.Background(), makeState("my-cluster", "kubernetes", "local", "active")) factory := module.NewIaCStatusStepFactory() step, err := factory("status", baseIaCCfg(), app) @@ -263,7 +263,7 @@ func TestIaCStatusStep_ProviderStatusError(t *testing.T) { func TestIaCDestroyStep_BasicDestroy(t *testing.T) { app, store, provider := setupIaCApp(t) - _ = store.SaveState(makeState("my-cluster", "kubernetes", "local", "active")) + _ = store.SaveState(context.Background(), makeState("my-cluster", "kubernetes", "local", "active")) factory := module.NewIaCDestroyStepFactory() step, err := factory("destroy", baseIaCCfg(), app) @@ -285,7 +285,7 @@ func TestIaCDestroyStep_BasicDestroy(t *testing.T) { } // State should be destroyed. - st, _ := store.GetState("my-cluster") + st, _ := store.GetState(context.Background(), "my-cluster") if st == nil || st.Status != "destroyed" { t.Errorf("expected stored status=destroyed, got %v", st) } @@ -294,7 +294,7 @@ func TestIaCDestroyStep_BasicDestroy(t *testing.T) { func TestIaCDestroyStep_DestroyError(t *testing.T) { app, store, provider := setupIaCApp(t) provider.destroyErr = errors.New("cannot delete: cluster in use") - _ = store.SaveState(makeState("my-cluster", "kubernetes", "local", "active")) + _ = store.SaveState(context.Background(), makeState("my-cluster", "kubernetes", "local", "active")) factory := module.NewIaCDestroyStepFactory() step, _ := factory("destroy", baseIaCCfg(), app) @@ -303,7 +303,7 @@ func TestIaCDestroyStep_DestroyError(t *testing.T) { t.Error("expected error from Destroy, got nil") } - st, _ := store.GetState("my-cluster") + st, _ := store.GetState(context.Background(), "my-cluster") if st == nil || st.Status != "error" { t.Errorf("expected stored status=error after destroy failure, got %v", st) } @@ -325,7 +325,7 @@ func TestIaCDriftDetect_NoDrift(t *testing.T) { // Store state with a config snapshot. st := makeState("my-cluster", "kubernetes", "local", "active") st.Config = map[string]any{"version": "1.29", "nodeCount": 3} - _ = store.SaveState(st) + _ = store.SaveState(context.Background(), st) cfg := map[string]any{ "platform": "test-platform", @@ -358,7 +358,7 @@ func TestIaCDriftDetect_WithDrift(t *testing.T) { // Store state with original config. st := makeState("my-cluster", "kubernetes", "local", "active") st.Config = map[string]any{"version": "1.29", "nodeCount": 3} - _ = store.SaveState(st) + _ = store.SaveState(context.Background(), st) // Current config has modified nodeCount and a new key. cfg := map[string]any{ @@ -406,7 +406,7 @@ func TestIaCDriftDetect_RemovedKey(t *testing.T) { st := makeState("my-cluster", "kubernetes", "local", "active") st.Config = map[string]any{"version": "1.29", "tags": "prod"} - _ = store.SaveState(st) + _ = store.SaveState(context.Background(), st) // Current config is missing the "tags" key. cfg := map[string]any{ @@ -463,7 +463,7 @@ func TestIaCLifecycle_PlanApplyStatusDestroy(t *testing.T) { if err != nil { t.Fatalf("Plan: %v", err) } - st, _ := store.GetState("my-cluster") + st, _ := store.GetState(context.Background(), "my-cluster") if st.Status != "planned" { t.Fatalf("expected planned, got %q", st.Status) } @@ -474,7 +474,7 @@ func TestIaCLifecycle_PlanApplyStatusDestroy(t *testing.T) { if err != nil { t.Fatalf("Apply: %v", err) } - st, _ = store.GetState("my-cluster") + st, _ = store.GetState(context.Background(), "my-cluster") if st.Status != "active" { t.Fatalf("expected active, got %q", st.Status) } @@ -495,7 +495,7 @@ func TestIaCLifecycle_PlanApplyStatusDestroy(t *testing.T) { if err != nil { t.Fatalf("Destroy: %v", err) } - st, _ = store.GetState("my-cluster") + st, _ = store.GetState(context.Background(), "my-cluster") if st.Status != "destroyed" { t.Fatalf("expected destroyed, got %q", st.Status) }