From e805ded8bc308813150f5685c9aef92107d2a590 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Thu, 14 May 2026 19:06:16 -0400 Subject: [PATCH 1/4] feat: port S3-compatible IaC state store into aws plugin (no DO env fallbacks) --- go.mod | 2 +- internal/statebackend/s3.go | 417 ++++++++++++++++++++++++++++ internal/statebackend/s3_test.go | 460 +++++++++++++++++++++++++++++++ 3 files changed, 878 insertions(+), 1 deletion(-) create mode 100644 internal/statebackend/s3.go create mode 100644 internal/statebackend/s3_test.go diff --git a/go.mod b/go.mod index 126d2a0..55228b6 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/rds v1.115.0 github.com/aws/aws-sdk-go-v2/service/route53 v1.62.5 github.com/aws/aws-sdk-go-v2/service/s3 v1.97.2 + google.golang.org/grpc v1.80.0 google.golang.org/protobuf v1.36.11 ) @@ -224,7 +225,6 @@ require ( google.golang.org/genproto v0.0.0-20260319201613-d00831a3d3e7 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260420184626-e10c466a9529 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260420184626-e10c466a9529 // indirect - google.golang.org/grpc v1.80.0 // indirect google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.6.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect modernc.org/libc v1.70.0 // indirect diff --git a/internal/statebackend/s3.go b/internal/statebackend/s3.go new file mode 100644 index 0000000..034bcb7 --- /dev/null +++ b/internal/statebackend/s3.go @@ -0,0 +1,417 @@ +// Package statebackend provides the s3 IaCStateBackend implementation, ported +// from workflow core's module/iac_state_spaces.go (cloud-SDK-extraction effort, +// decisions/0033-0036). It is self-contained: it carries its own IaCState type, +// IaCStateStore interface, and S3Client interface so the plugin can SERVE the +// s3 backend over the typed IaCStateBackend gRPC contract without depending on +// workflow core's unexported state helpers. +// +// Unlike workflow-plugin-digitalocean's port (which serves `spaces` and keeps +// the DO_SPACES_* env fallbacks), this aws-plugin port deliberately drops the +// DigitalOcean-specific credential env fallbacks: when no static credentials +// are supplied it defers to aws-sdk-go-v2's default credential chain. +package statebackend + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "strings" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" +) + +// IaCState tracks the state of an infrastructure resource. Mirrors +// module.IaCState (workflow core module/iac_state.go) and the proto IaCState +// message (plugin/external/proto/iac.proto) — kept local so this package is +// self-contained. +type IaCState struct { + ResourceID string `json:"resource_id"` + ResourceType string `json:"resource_type"` // e.g. "kubernetes", "ecs" + Provider string `json:"provider"` // e.g. "aws", "gcp", "local" + ProviderRef string `json:"provider_ref,omitempty"` + ProviderID string `json:"provider_id,omitempty"` + ConfigHash string `json:"config_hash,omitempty"` + Status string `json:"status"` // planned, provisioning, active, destroying, destroyed, error + Outputs map[string]any `json:"outputs"` // provider-specific outputs + Config map[string]any `json:"config"` // the config used to provision + Dependencies []string `json:"dependencies,omitempty"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` + Error string `json:"error,omitempty"` +} + +// IaCStateStore is the interface for IaC state persistence backends. Mirrors +// the ctx-ful module.IaCStateStore (workflow core module/iac_state.go) — kept +// local so this package is self-contained. +type IaCStateStore interface { + // GetState retrieves a state record by resource ID. Returns nil, nil when not found. + GetState(ctx context.Context, resourceID string) (*IaCState, error) + // SaveState inserts or replaces a state record. + SaveState(ctx context.Context, state *IaCState) error + // ListStates returns all state records matching the provided key=value filter. + // Pass a nil or empty map to return all records. + ListStates(ctx context.Context, filter map[string]string) ([]*IaCState, error) + // DeleteState removes a state record by resource ID. + DeleteState(ctx context.Context, resourceID string) error + // Lock acquires an exclusive lock for the given resource ID. + Lock(ctx context.Context, resourceID string) error + // Unlock releases the lock for the given resource ID. + Unlock(ctx context.Context, resourceID string) error +} + +// sanitizeID replaces path-unsafe characters so resource IDs can be used as object keys. +func sanitizeID(id string) string { + id = strings.ReplaceAll(id, "/", "_") + id = strings.ReplaceAll(id, "\\", "_") + return id +} + +// matchesFilter returns true if state satisfies all entries in the filter map. +// Only the allow-listed keys "resource_type", "provider", and "status" are +// honored — any other key is ignored. This mirrors the FS and in-memory +// IaCStateStore implementations in workflow core so all backends filter +// identically. +func matchesFilter(st *IaCState, filter map[string]string) bool { + for k, v := range filter { + switch k { + case "resource_type": + if st.ResourceType != v { + return false + } + case "provider": + if st.Provider != v { + return false + } + case "status": + if st.Status != v { + return false + } + } + } + return true +} + +// S3Client abstracts the S3 API methods used by S3IaCStateStore, allowing a +// mock to be injected for testing. +type S3Client interface { + GetObject(ctx context.Context, input *s3.GetObjectInput, opts ...func(*s3.Options)) (*s3.GetObjectOutput, error) + PutObject(ctx context.Context, input *s3.PutObjectInput, opts ...func(*s3.Options)) (*s3.PutObjectOutput, error) + DeleteObject(ctx context.Context, input *s3.DeleteObjectInput, opts ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) + ListObjectsV2(ctx context.Context, input *s3.ListObjectsV2Input, opts ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) + HeadObject(ctx context.Context, input *s3.HeadObjectInput, opts ...func(*s3.Options)) (*s3.HeadObjectOutput, error) +} + +// S3IaCStateStore persists IaC state as JSON objects in an S3 (or S3-compatible) +// bucket. Lock objects are used for advisory locking. +type S3IaCStateStore struct { + client S3Client + bucket string + prefix string + mu sync.Mutex +} + +// NewS3IaCStateStore creates an S3 / S3-compatible state store. +// +// Parameters: +// - region: AWS region (e.g. "us-east-1"); used by the SDK to resolve the +// S3 endpoint unless endpoint is set. +// - bucket: S3 bucket name (required). +// - prefix: optional key prefix (default "iac-state/"). +// - accessKey: optional static access key. When accessKey/secretKey are both +// empty, the aws-sdk-go-v2 default credential chain +// (AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY env, shared config, instance +// role, etc.) applies — no DigitalOcean-specific env fallback. +// - secretKey: optional static secret key (see accessKey). +// - endpoint: optional custom endpoint override for S3-compatible stores. +func NewS3IaCStateStore(region, bucket, prefix, accessKey, secretKey, endpoint string) (*S3IaCStateStore, error) { + if bucket == "" { + return nil, fmt.Errorf("iac s3 state: bucket must not be empty") + } + if prefix == "" { + prefix = "iac-state/" + } + if region == "" && endpoint == "" { + return nil, fmt.Errorf("iac s3 state: either region or endpoint must be set") + } + + opts := []func(*awsconfig.LoadOptions) error{ + awsconfig.WithRegion(regionOrDefault(region)), + } + // Only inject static credentials when BOTH are explicitly provided. + // Otherwise defer to aws-sdk-go-v2's default credential chain — there is + // deliberately no DigitalOcean-specific env-var fallback here. + if accessKey != "" && secretKey != "" { + opts = append(opts, awsconfig.WithCredentialsProvider( + credentials.NewStaticCredentialsProvider(accessKey, secretKey, ""))) + } + + cfg, err := awsconfig.LoadDefaultConfig(context.Background(), opts...) + if err != nil { + return nil, fmt.Errorf("iac s3 state: load config: %w", err) + } + + client := s3.NewFromConfig(cfg, func(o *s3.Options) { + if endpoint != "" { + o.BaseEndpoint = &endpoint + } + o.UsePathStyle = true + }) + + return &S3IaCStateStore{ + client: client, + bucket: bucket, + prefix: prefix, + }, nil +} + +// NewS3IaCStateStoreWithClient creates a store with an injected client (for testing). +func NewS3IaCStateStoreWithClient(client S3Client, bucket, prefix string) *S3IaCStateStore { + if prefix == "" { + prefix = "iac-state/" + } + return &S3IaCStateStore{ + client: client, + bucket: bucket, + prefix: prefix, + } +} + +func regionOrDefault(region string) string { + if region == "" { + return "us-east-1" + } + return region +} + +// stateKey returns the S3 key for a resource's state JSON. +func (s *S3IaCStateStore) stateKey(resourceID string) string { + return s.prefix + sanitizeID(resourceID) + ".json" +} + +// lockKey returns the S3 key for a resource's lock object. +func (s *S3IaCStateStore) lockKey(resourceID string) string { + return s.prefix + sanitizeID(resourceID) + ".lock" +} + +// GetState retrieves a state record by resource ID. Returns nil, nil when not found. +func (s *S3IaCStateStore) GetState(ctx context.Context, resourceID string) (*IaCState, error) { + key := s.stateKey(resourceID) + out, err := s.client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: &s.bucket, + Key: &key, + }) + if err != nil { + if isNotFoundErr(err) { + return nil, nil + } + return nil, fmt.Errorf("iac s3 state: GetState %q: %w", resourceID, err) + } + defer out.Body.Close() + + data, err := io.ReadAll(out.Body) + if err != nil { + return nil, fmt.Errorf("iac s3 state: GetState %q: read body: %w", resourceID, err) + } + + var st IaCState + if err := json.Unmarshal(data, &st); err != nil { + return nil, fmt.Errorf("iac s3 state: GetState %q: unmarshal: %w", resourceID, err) + } + return &st, nil +} + +// SaveState writes the state record as a JSON object to S3. +func (s *S3IaCStateStore) SaveState(ctx context.Context, state *IaCState) error { + if state == nil { + return fmt.Errorf("iac s3 state: SaveState: state must not be nil") + } + if state.ResourceID == "" { + return fmt.Errorf("iac s3 state: SaveState: resource_id must not be empty") + } + + data, err := json.MarshalIndent(state, "", " ") + if err != nil { + return fmt.Errorf("iac s3 state: SaveState %q: marshal: %w", state.ResourceID, err) + } + + key := s.stateKey(state.ResourceID) + contentType := "application/json" + _, err = s.client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: &s.bucket, + Key: &key, + Body: bytes.NewReader(data), + ContentType: &contentType, + }) + if err != nil { + return fmt.Errorf("iac s3 state: SaveState %q: put: %w", state.ResourceID, err) + } + return nil +} + +// ListStates lists all state objects under the prefix and returns those matching filter. +// Supported filter keys: "resource_type", "provider", "status". +func (s *S3IaCStateStore) ListStates(ctx context.Context, filter map[string]string) ([]*IaCState, error) { + var results []*IaCState + var continuationToken *string + + for { + out, err := s.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ + Bucket: &s.bucket, + Prefix: &s.prefix, + ContinuationToken: continuationToken, + }) + if err != nil { + return nil, fmt.Errorf("iac s3 state: ListStates: %w", err) + } + + for _, obj := range out.Contents { + key := aws.ToString(obj.Key) + // Skip lock files and non-JSON objects. + if strings.HasSuffix(key, ".lock") || !strings.HasSuffix(key, ".json") { + continue + } + + getOut, err := s.client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: &s.bucket, + Key: obj.Key, + }) + if err != nil { + continue // skip unreadable objects + } + data, err := io.ReadAll(getOut.Body) + getOut.Body.Close() + if err != nil { + continue + } + + var st IaCState + if err := json.Unmarshal(data, &st); err != nil { + continue + } + if matchesFilter(&st, filter) { + results = append(results, &st) + } + } + + if !aws.ToBool(out.IsTruncated) { + break + } + continuationToken = out.NextContinuationToken + } + + return results, nil +} + +// DeleteState removes the state object for resourceID. +func (s *S3IaCStateStore) DeleteState(ctx context.Context, resourceID string) error { + // Verify existence first to return a meaningful error. + key := s.stateKey(resourceID) + _, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: &s.bucket, + Key: &key, + }) + if err != nil { + if isNotFoundErr(err) { + return fmt.Errorf("iac s3 state: DeleteState %q: not found", resourceID) + } + return fmt.Errorf("iac s3 state: DeleteState %q: head: %w", resourceID, err) + } + + _, err = s.client.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: &s.bucket, + Key: &key, + }) + if err != nil { + return fmt.Errorf("iac s3 state: DeleteState %q: %w", resourceID, err) + } + return nil +} + +// Lock creates a lock object for resourceID using S3 conditional writes (If-None-Match: *) +// for atomic, race-free lock acquisition. Fails if the lock already exists. +func (s *S3IaCStateStore) Lock(ctx context.Context, resourceID string) error { + s.mu.Lock() + defer s.mu.Unlock() + + key := s.lockKey(resourceID) + body := []byte(time.Now().UTC().Format(time.RFC3339)) + ifNoneMatch := "*" + + _, err := s.client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: &s.bucket, + Key: &key, + Body: bytes.NewReader(body), + IfNoneMatch: &ifNoneMatch, + }) + if err != nil { + // S3 returns 412 Precondition Failed when the object already exists. + if isPreconditionFailedErr(err) { + return fmt.Errorf("iac s3 state: Lock %q: resource is already locked", resourceID) + } + return fmt.Errorf("iac s3 state: Lock %q: put: %w", resourceID, err) + } + return nil +} + +// Unlock removes the lock object for resourceID. +func (s *S3IaCStateStore) Unlock(ctx context.Context, resourceID string) error { + s.mu.Lock() + defer s.mu.Unlock() + + key := s.lockKey(resourceID) + + // Verify lock exists. + _, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: &s.bucket, + Key: &key, + }) + if err != nil { + if isNotFoundErr(err) { + return fmt.Errorf("iac s3 state: Unlock %q: not locked", resourceID) + } + return fmt.Errorf("iac s3 state: Unlock %q: head: %w", resourceID, err) + } + + _, err = s.client.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: &s.bucket, + Key: &key, + }) + if err != nil { + return fmt.Errorf("iac s3 state: Unlock %q: %w", resourceID, err) + } + return nil +} + +// isPreconditionFailedErr returns true for HTTP 412 Precondition Failed responses, +// which S3 returns when a conditional write fails (e.g. If-None-Match: * on an existing object). +func isPreconditionFailedErr(err error) bool { + if err == nil { + return false + } + msg := err.Error() + return strings.Contains(msg, "PreconditionFailed") || strings.Contains(msg, "412") +} + +// isNotFoundErr checks whether an S3 error indicates the key was not found. +func isNotFoundErr(err error) bool { + var nsk *types.NoSuchKey + if errors.As(err, &nsk) { + return true + } + // HeadObject returns a generic "NotFound" status, not NoSuchKey. + var nf *types.NotFound + if errors.As(err, &nf) { + return true + } + // Some S3-compatible stores return a plain "not found" in the message. + return strings.Contains(err.Error(), "NotFound") || strings.Contains(err.Error(), "NoSuchKey") +} diff --git a/internal/statebackend/s3_test.go b/internal/statebackend/s3_test.go new file mode 100644 index 0000000..ff87473 --- /dev/null +++ b/internal/statebackend/s3_test.go @@ -0,0 +1,460 @@ +package statebackend + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "strings" + "sync" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" +) + +// mockS3Client implements S3Client for testing. +type mockS3Client struct { + mu sync.Mutex + objects map[string][]byte // key -> body +} + +func newMockS3Client() *mockS3Client { + return &mockS3Client{objects: make(map[string][]byte)} +} + +func (m *mockS3Client) GetObject(_ context.Context, input *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + m.mu.Lock() + defer m.mu.Unlock() + key := aws.ToString(input.Key) + data, ok := m.objects[key] + if !ok { + return nil, &types.NoSuchKey{Message: aws.String("NoSuchKey: " + key)} + } + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader(data)), + }, nil +} + +func (m *mockS3Client) PutObject(_ context.Context, input *s3.PutObjectInput, _ ...func(*s3.Options)) (*s3.PutObjectOutput, error) { + m.mu.Lock() + defer m.mu.Unlock() + key := aws.ToString(input.Key) + // Honour If-None-Match: * — fail if the object already exists (atomic lock semantics). + if aws.ToString(input.IfNoneMatch) == "*" { + if _, exists := m.objects[key]; exists { + return nil, fmt.Errorf("PreconditionFailed: object %q already exists", key) + } + } + data, err := io.ReadAll(input.Body) + if err != nil { + return nil, err + } + m.objects[key] = data + return &s3.PutObjectOutput{}, nil +} + +func (m *mockS3Client) DeleteObject(_ context.Context, input *s3.DeleteObjectInput, _ ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { + m.mu.Lock() + defer m.mu.Unlock() + key := aws.ToString(input.Key) + delete(m.objects, key) + return &s3.DeleteObjectOutput{}, nil +} + +func (m *mockS3Client) ListObjectsV2(_ context.Context, input *s3.ListObjectsV2Input, _ ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { + m.mu.Lock() + defer m.mu.Unlock() + prefix := aws.ToString(input.Prefix) + var contents []types.Object + for key := range m.objects { + if strings.HasPrefix(key, prefix) { + contents = append(contents, types.Object{Key: aws.String(key)}) + } + } + return &s3.ListObjectsV2Output{ + Contents: contents, + IsTruncated: aws.Bool(false), + }, nil +} + +func (m *mockS3Client) HeadObject(_ context.Context, input *s3.HeadObjectInput, _ ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + m.mu.Lock() + defer m.mu.Unlock() + key := aws.ToString(input.Key) + if _, ok := m.objects[key]; !ok { + return nil, &types.NotFound{Message: aws.String("NotFound: " + key)} + } + return &s3.HeadObjectOutput{}, nil +} + +func newTestS3Store(client *mockS3Client) *S3IaCStateStore { + return NewS3IaCStateStoreWithClient(client, "test-bucket", "iac-state/") +} + +func TestS3IaCStateStore_GetState_NotFound(t *testing.T) { + store := newTestS3Store(newMockS3Client()) + + st, err := store.GetState(context.Background(), "nonexistent") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if st != nil { + t.Fatalf("expected nil state, got %+v", st) + } +} + +func TestS3IaCStateStore_SaveAndGetState(t *testing.T) { + store := newTestS3Store(newMockS3Client()) + + state := &IaCState{ + ResourceID: "cluster-1", + ResourceType: "kubernetes", + Provider: "aws", + Status: "active", + Outputs: map[string]any{"endpoint": "https://k8s.example.com"}, + Config: map[string]any{"region": "us-east-1"}, + Dependencies: []string{"network-1"}, + CreatedAt: "2026-03-09T00:00:00Z", + UpdatedAt: "2026-03-09T00:00:00Z", + } + + if err := store.SaveState(context.Background(), state); err != nil { + t.Fatalf("SaveState: %v", err) + } + + got, err := store.GetState(context.Background(), "cluster-1") + if err != nil { + t.Fatalf("GetState: %v", err) + } + if got == nil { + t.Fatal("expected state, got nil") + } + if got.ResourceID != "cluster-1" { + t.Errorf("ResourceID = %q, want %q", got.ResourceID, "cluster-1") + } + if got.Provider != "aws" { + t.Errorf("Provider = %q, want %q", got.Provider, "aws") + } + if got.Status != "active" { + t.Errorf("Status = %q, want %q", got.Status, "active") + } + if len(got.Dependencies) != 1 || got.Dependencies[0] != "network-1" { + t.Errorf("Dependencies = %#v, want [network-1]", got.Dependencies) + } +} + +func TestS3IaCStateStore_SaveState_Nil(t *testing.T) { + store := newTestS3Store(newMockS3Client()) + + err := store.SaveState(context.Background(), nil) + if err == nil { + t.Fatal("expected error for nil state") + } +} + +func TestS3IaCStateStore_SaveState_EmptyID(t *testing.T) { + store := newTestS3Store(newMockS3Client()) + + err := store.SaveState(context.Background(), &IaCState{}) + if err == nil { + t.Fatal("expected error for empty resource_id") + } +} + +func TestS3IaCStateStore_ListStates(t *testing.T) { + client := newMockS3Client() + store := newTestS3Store(client) + + states := []*IaCState{ + {ResourceID: "r1", ResourceType: "kubernetes", Provider: "aws", Status: "active"}, + {ResourceID: "r2", ResourceType: "database", Provider: "digitalocean", Status: "active"}, + {ResourceID: "r3", ResourceType: "kubernetes", Provider: "aws", Status: "destroyed"}, + } + for _, st := range states { + if err := store.SaveState(context.Background(), st); err != nil { + t.Fatalf("SaveState %q: %v", st.ResourceID, err) + } + } + + // No filter — returns all. + all, err := store.ListStates(context.Background(), nil) + if err != nil { + t.Fatalf("ListStates(nil): %v", err) + } + if len(all) != 3 { + t.Errorf("ListStates(nil) = %d items, want 3", len(all)) + } + + // Filter by provider. + filtered, err := store.ListStates(context.Background(), map[string]string{"provider": "aws"}) + if err != nil { + t.Fatalf("ListStates(provider=aws): %v", err) + } + if len(filtered) != 2 { + t.Errorf("ListStates(provider=aws) = %d items, want 2", len(filtered)) + } + + // Filter by status. + active, err := store.ListStates(context.Background(), map[string]string{"status": "active"}) + if err != nil { + t.Fatalf("ListStates(status=active): %v", err) + } + if len(active) != 2 { + t.Errorf("ListStates(status=active) = %d items, want 2", len(active)) + } +} + +func TestS3IaCStateStore_ListStates_SkipsLockFiles(t *testing.T) { + client := newMockS3Client() + store := newTestS3Store(client) + + // Save a state and lock it — lock file should be skipped in list. + if err := store.SaveState(context.Background(), &IaCState{ResourceID: "r1", Status: "active"}); err != nil { + t.Fatalf("SaveState: %v", err) + } + if err := store.Lock(context.Background(), "r1"); err != nil { + t.Fatalf("Lock: %v", err) + } + + results, err := store.ListStates(context.Background(), nil) + if err != nil { + t.Fatalf("ListStates: %v", err) + } + if len(results) != 1 { + t.Errorf("ListStates returned %d items (expected 1, lock file should be excluded)", len(results)) + } +} + +func TestS3IaCStateStore_DeleteState(t *testing.T) { + store := newTestS3Store(newMockS3Client()) + + if err := store.SaveState(context.Background(), &IaCState{ResourceID: "del-me", Status: "active"}); err != nil { + t.Fatalf("SaveState: %v", err) + } + + if err := store.DeleteState(context.Background(), "del-me"); err != nil { + t.Fatalf("DeleteState: %v", err) + } + + // Should be gone. + st, err := store.GetState(context.Background(), "del-me") + if err != nil { + t.Fatalf("GetState after delete: %v", err) + } + if st != nil { + t.Fatal("expected nil after delete") + } +} + +func TestS3IaCStateStore_DeleteState_NotFound(t *testing.T) { + store := newTestS3Store(newMockS3Client()) + + err := store.DeleteState(context.Background(), "nonexistent") + if err == nil { + t.Fatal("expected error deleting nonexistent state") + } + if !strings.Contains(err.Error(), "not found") { + t.Errorf("error = %q, expected 'not found'", err) + } +} + +func TestS3IaCStateStore_LockUnlock(t *testing.T) { + store := newTestS3Store(newMockS3Client()) + + // Lock should succeed. + if err := store.Lock(context.Background(), "res-1"); err != nil { + t.Fatalf("Lock: %v", err) + } + + // Double-lock should fail. + if err := store.Lock(context.Background(), "res-1"); err == nil { + t.Fatal("expected error on double lock") + } + + // Unlock should succeed. + if err := store.Unlock(context.Background(), "res-1"); err != nil { + t.Fatalf("Unlock: %v", err) + } + + // Re-lock after unlock should succeed. + if err := store.Lock(context.Background(), "res-1"); err != nil { + t.Fatalf("Lock after unlock: %v", err) + } +} + +func TestS3IaCStateStore_Unlock_NotLocked(t *testing.T) { + store := newTestS3Store(newMockS3Client()) + + err := store.Unlock(context.Background(), "not-locked") + if err == nil { + t.Fatal("expected error unlocking a resource that is not locked") + } + if !strings.Contains(err.Error(), "not locked") { + t.Errorf("error = %q, expected 'not locked'", err) + } +} + +func TestS3IaCStateStore_SaveState_Overwrite(t *testing.T) { + store := newTestS3Store(newMockS3Client()) + + original := &IaCState{ResourceID: "r1", Status: "planned"} + if err := store.SaveState(context.Background(), original); err != nil { + t.Fatalf("SaveState (original): %v", err) + } + + updated := &IaCState{ResourceID: "r1", Status: "active"} + if err := store.SaveState(context.Background(), updated); err != nil { + t.Fatalf("SaveState (updated): %v", err) + } + + got, err := store.GetState(context.Background(), "r1") + if err != nil { + t.Fatalf("GetState: %v", err) + } + if got.Status != "active" { + t.Errorf("Status = %q, want %q (overwrite failed)", got.Status, "active") + } +} + +func TestS3IaCStateStore_SanitizesResourceID(t *testing.T) { + client := newMockS3Client() + store := newTestS3Store(client) + + state := &IaCState{ResourceID: "ns/cluster\\1", Status: "active"} + if err := store.SaveState(context.Background(), state); err != nil { + t.Fatalf("SaveState: %v", err) + } + + // Verify the key was sanitized. + client.mu.Lock() + _, exists := client.objects["iac-state/ns_cluster_1.json"] + client.mu.Unlock() + if !exists { + t.Error("expected sanitized key 'iac-state/ns_cluster_1.json' in mock objects") + } + + // Retrieve by original ID. + got, err := store.GetState(context.Background(), "ns/cluster\\1") + if err != nil { + t.Fatalf("GetState: %v", err) + } + if got == nil { + t.Fatal("expected state, got nil") + } +} + +// TestS3IaCStateStore_GetState_BadJSON verifies graceful handling of corrupt data. +func TestS3IaCStateStore_GetState_BadJSON(t *testing.T) { + client := newMockS3Client() + store := newTestS3Store(client) + + // Manually inject bad JSON. + client.mu.Lock() + client.objects["iac-state/bad.json"] = []byte("{invalid json") + client.mu.Unlock() + + _, err := store.GetState(context.Background(), "bad") + if err == nil { + t.Fatal("expected unmarshal error for bad JSON") + } + if !strings.Contains(err.Error(), "unmarshal") { + t.Errorf("error = %q, expected 'unmarshal' substring", err) + } +} + +// Ensure the mock properly serializes JSON round-trip. +func TestS3IaCStateStore_JSONRoundTrip(t *testing.T) { + store := newTestS3Store(newMockS3Client()) + + state := &IaCState{ + ResourceID: "rt-1", + ResourceType: "ecs", + Provider: "aws", + ProviderID: "arn:aws:ecs:us-east-1:123:cluster/test", + ConfigHash: "config-hash-rt-1", + Status: "provisioning", + Outputs: map[string]any{"arn": "arn:aws:ecs:us-east-1:123:cluster/test"}, + Config: map[string]any{"cpu": float64(256), "memory": float64(512)}, + CreatedAt: "2026-01-01T00:00:00Z", + UpdatedAt: "2026-03-09T12:00:00Z", + Error: "timeout waiting for stabilization", + } + + if err := store.SaveState(context.Background(), state); err != nil { + t.Fatalf("SaveState: %v", err) + } + + got, err := store.GetState(context.Background(), "rt-1") + if err != nil { + t.Fatalf("GetState: %v", err) + } + + // Compare via JSON to handle map ordering. + wantJSON, _ := json.Marshal(state) + gotJSON, _ := json.Marshal(got) + if string(wantJSON) != string(gotJSON) { + t.Errorf("round-trip mismatch:\n want: %s\n got: %s", wantJSON, gotJSON) + } +} + +// errS3Client is a mock that returns errors for all operations. +type errS3Client struct{} + +func (e *errS3Client) GetObject(_ context.Context, _ *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + return nil, fmt.Errorf("simulated GetObject failure") +} +func (e *errS3Client) PutObject(_ context.Context, _ *s3.PutObjectInput, _ ...func(*s3.Options)) (*s3.PutObjectOutput, error) { + return nil, fmt.Errorf("simulated PutObject failure") +} +func (e *errS3Client) DeleteObject(_ context.Context, _ *s3.DeleteObjectInput, _ ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { + return nil, fmt.Errorf("simulated DeleteObject failure") +} +func (e *errS3Client) ListObjectsV2(_ context.Context, _ *s3.ListObjectsV2Input, _ ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { + return nil, fmt.Errorf("simulated ListObjectsV2 failure") +} +func (e *errS3Client) HeadObject(_ context.Context, _ *s3.HeadObjectInput, _ ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + return nil, fmt.Errorf("simulated HeadObject failure") +} + +func TestS3IaCStateStore_ErrorPropagation(t *testing.T) { + store := NewS3IaCStateStoreWithClient(&errS3Client{}, "test-bucket", "iac-state/") + + // GetState error. + _, err := store.GetState(context.Background(), "x") + if err == nil || !strings.Contains(err.Error(), "simulated") { + t.Errorf("GetState error = %v, want simulated error", err) + } + + // SaveState error. + err = store.SaveState(context.Background(), &IaCState{ResourceID: "x"}) + if err == nil || !strings.Contains(err.Error(), "simulated") { + t.Errorf("SaveState error = %v, want simulated error", err) + } + + // ListStates error. + _, err = store.ListStates(context.Background(), nil) + if err == nil || !strings.Contains(err.Error(), "simulated") { + t.Errorf("ListStates error = %v, want simulated error", err) + } + + // DeleteState error (HeadObject fails). + err = store.DeleteState(context.Background(), "x") + if err == nil || !strings.Contains(err.Error(), "simulated") { + t.Errorf("DeleteState error = %v, want simulated error", err) + } + + // Lock error (HeadObject fails with non-NotFound). + err = store.Lock(context.Background(), "x") + if err == nil || !strings.Contains(err.Error(), "simulated") { + t.Errorf("Lock error = %v, want simulated error", err) + } + + // Unlock error (HeadObject fails with non-NotFound). + err = store.Unlock(context.Background(), "x") + if err == nil || !strings.Contains(err.Error(), "simulated") { + t.Errorf("Unlock error = %v, want simulated error", err) + } +} From cd6dd5c589d7c4fbb97be396451cc8edcc8a59fd Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Thu, 14 May 2026 19:12:20 -0400 Subject: [PATCH 2/4] feat: serve s3 IaC state backend via pb.IaCStateBackendServer + Configure --- go.mod | 6 +- go.sum | 20 +- internal/host_conformance_test.go | 36 ++++ internal/iacserver.go | 11 ++ internal/statebackend_server.go | 286 +++++++++++++++++++++++++++ internal/statebackend_server_test.go | 236 ++++++++++++++++++++++ plugin.json | 3 +- 7 files changed, 574 insertions(+), 24 deletions(-) create mode 100644 internal/statebackend_server.go create mode 100644 internal/statebackend_server_test.go diff --git a/go.mod b/go.mod index 55228b6..b2505f7 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/GoCodeAlone/workflow-plugin-aws go 1.26.0 require ( - github.com/GoCodeAlone/workflow v0.51.7 + github.com/GoCodeAlone/workflow v0.51.11-0.20260514225636-522748f35474 github.com/aws/aws-sdk-go-v2 v1.41.7 github.com/aws/aws-sdk-go-v2/config v1.32.16 github.com/aws/aws-sdk-go-v2/credentials v1.19.15 @@ -34,9 +34,6 @@ require ( cloud.google.com/go/iam v1.5.3 // indirect cloud.google.com/go/monitoring v1.24.3 // indirect cloud.google.com/go/storage v1.61.3 // indirect - github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.0 // indirect - github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 // indirect - github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4 // indirect github.com/BurntSushi/toml v1.6.0 // indirect github.com/DataDog/datadog-go/v5 v5.8.3 // indirect github.com/GoCodeAlone/go-plugin v1.7.0 // indirect @@ -61,7 +58,6 @@ require ( github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.23 // indirect - github.com/aws/aws-sdk-go-v2/service/codebuild v1.68.12 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.8 // indirect github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.12 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.22 // indirect diff --git a/go.sum b/go.sum index be0d221..456d6f6 100644 --- a/go.sum +++ b/go.sum @@ -23,20 +23,8 @@ cloud.google.com/go/trace v1.11.7 h1:kDNDX8JkaAG3R2nq1lIdkb7FCSi1rCmsEtKVsty7p+U cloud.google.com/go/trace v1.11.7/go.mod h1:TNn9d5V3fQVf6s4SCveVMIBS2LJUqo73GACmq/Tky0s= dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8= dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.0 h1:fou+2+WFTib47nS+nz/ozhEBnvU96bKHy6LjRsY4E28= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.0/go.mod h1:t76Ruy8AHvUAC8GfMWJMa0ElSbuIcO03NLpynfbgsPA= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1 h1:Hk5QBxZQC1jb2Fwj6mpzme37xbCDdNTxU7O9eb5+LB4= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1/go.mod h1:IYus9qsFobWIc2YVwe/WPjcnyCkPKtnHAqUYeebc8z0= -github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 h1:9iefClla7iYpfYWdzPCRDozdmndjTm8DXdpCzPajMgA= -github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2/go.mod h1:XtLgD3ZD34DAaVIIAyG3objl5DynM3CQ/vMcbBNJZGI= -github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.8.1 h1:/Zt+cDPnpC3OVDm/JKLOs7M2DKmLRIIp3XIx9pHHiig= -github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.8.1/go.mod h1:Ng3urmn6dYe8gnbCMoHHVl5APYz2txho3koEkV2o2HA= -github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4 h1:jWQK1GI+LeGGUKBADtcH2rRqPxYB1Ljwms5gFA2LqrM= -github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4/go.mod h1:8mwH4klAm9DUgR2EEHyEEAQlRDvLPyg5fQry3y+cDew= github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c h1:udKWzYgxTojEKWjV8V+WSxDXJ4NFATAsZjh8iIbsQIg= github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= -github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0 h1:XRzhVemXdgvJqCH0sFfrBUTnUJSBrBf7++ypk+twtRs= -github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0/go.mod h1:HKpQxkWaGLJ+D/5H8QRpyQXA1eKjxkFlOMwck5+33Jk= github.com/BurntSushi/toml v1.6.0 h1:dRaEfpa2VI55EwlIW72hMRHdWouJeRF7TPYhI+AUQjk= github.com/BurntSushi/toml v1.6.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= @@ -56,8 +44,8 @@ github.com/GoCodeAlone/modular/modules/jsonschema v1.15.0 h1:xb1mI4NZkzvNKQ2F6nk github.com/GoCodeAlone/modular/modules/jsonschema v1.15.0/go.mod h1:hhGouwAVsonmJ4Lain4jINZ9nZCoc9l9eF3BHbmR8eE= github.com/GoCodeAlone/modular/modules/reverseproxy/v2 v2.8.0 h1:cvdLHbM/vzvygQTcAWSJsy+dAPzzwWyjzKMmTBFcFIo= github.com/GoCodeAlone/modular/modules/reverseproxy/v2 v2.8.0/go.mod h1:/9ipMG4qM2CHQ14BfXKdVlYRJelef6M8MFI5TbZv67M= -github.com/GoCodeAlone/workflow v0.51.7 h1:+81UNlLQPfnB6hwncWM6DPHHmonLoiqBL0YGQ6OW9g4= -github.com/GoCodeAlone/workflow v0.51.7/go.mod h1:5dh9esKq48kH4zKWjccXmyOirWL+T+YzfLclzhdRIV4= +github.com/GoCodeAlone/workflow v0.51.11-0.20260514225636-522748f35474 h1:C5Hi9BCtTDDP7k/++++LOfj2LxyaKP4YtgB0h5xgkeQ= +github.com/GoCodeAlone/workflow v0.51.11-0.20260514225636-522748f35474/go.mod h1:L1kIOZqebO1WPriHXcqT7bg/uS7pExR8pOrWvurqhR4= github.com/GoCodeAlone/yaegi v0.17.2 h1:WK6Y6e0t1a6U7r+S2dN3CGWW1PizYD3zO0zneToZPxM= github.com/GoCodeAlone/yaegi v0.17.2/go.mod h1:z5Pr6Wse6QJcQvpgxTxzMAevFarH0N37TG88Y9dprx0= github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.32.0 h1:rIkQfkCOVKc1OiRCNcSDD8ml5RJlZbH/Xsq7lbpynwc= @@ -114,8 +102,6 @@ github.com/aws/aws-sdk-go-v2/service/applicationautoscaling v1.41.16 h1:9ePxWacy github.com/aws/aws-sdk-go-v2/service/applicationautoscaling v1.41.16/go.mod h1:gR1tnThD1DBemyG1rmZ9U5+WbfGoiLUaZDvsQ6wbAjM= github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.55.2 h1:mleWBVIxwceEzyItUVoqMFiv6TmOP6ECPoN6WB/VWXc= github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.55.2/go.mod h1:cMApt548kNgu87UsBTNWVv+fpzjbUTFRSFjD1688SBs= -github.com/aws/aws-sdk-go-v2/service/codebuild v1.68.12 h1:lQTVEv/YAk8Rw1Yf4XZS/jNNxF9klCN10WcSR3xlMtU= -github.com/aws/aws-sdk-go-v2/service/codebuild v1.68.12/go.mod h1:yoa0R6Xku788EmJYkFiARzJBxt4A3hgFjQPRmMAttr0= github.com/aws/aws-sdk-go-v2/service/ec2 v1.296.0 h1:98Miqj16un1WLNyM1RjVDhXYumhqZrQfAeG8i4jPG6o= github.com/aws/aws-sdk-go-v2/service/ec2 v1.296.0/go.mod h1:T6ndRfdhnXLIY5oKBHjYZDVj706los2zGdpThppquvA= github.com/aws/aws-sdk-go-v2/service/ecr v1.44.2 h1:USCQWra7IXZiH25796EZizvSRmJeS5wJNrdv70JIk0o= @@ -553,8 +539,6 @@ github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144T github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4/v4 v4.1.26 h1:GrpZw1gZttORinvzBdXPUXATeqlJjqUG/D87TKMnhjY= github.com/pierrec/lz4/v4 v4.1.26/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= -github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= -github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/internal/host_conformance_test.go b/internal/host_conformance_test.go index 8ebd26b..bcff0d4 100644 --- a/internal/host_conformance_test.go +++ b/internal/host_conformance_test.go @@ -138,3 +138,39 @@ func capabilitiesHasResource(capabilities *pb.CapabilitiesResponse, resourceType } return false } + +// TestCapabilityParity_IaCStateBackends asserts that every iac.state backend +// name declared in plugin.json capabilities.iacStateBackends is actually +// served by the plugin — i.e. returned by NewIaCServer().ListBackendNames. +// This guards against a manifest claiming a backend the plugin does not serve. +func TestCapabilityParity_IaCStateBackends(t *testing.T) { + repoRoot := hostConformanceRepoRoot(t) + data, err := os.ReadFile(filepath.Join(repoRoot, "plugin.json")) + if err != nil { + t.Fatalf("read plugin.json: %v", err) + } + var manifest struct { + Capabilities struct { + IaCStateBackends []string `json:"iacStateBackends"` + } `json:"capabilities"` + } + if err := json.Unmarshal(data, &manifest); err != nil { + t.Fatalf("parse plugin.json: %v", err) + } + + resp, err := NewIaCServer().ListBackendNames(context.Background(), &pb.ListBackendNamesRequest{}) + if err != nil { + t.Fatalf("ListBackendNames: %v", err) + } + served := make(map[string]bool, len(resp.GetBackendNames())) + for _, n := range resp.GetBackendNames() { + served[n] = true + } + + for _, declared := range manifest.Capabilities.IaCStateBackends { + if !served[declared] { + t.Errorf("plugin.json declares iacStateBackends entry %q but ListBackendNames does not serve it (served: %v)", + declared, resp.GetBackendNames()) + } + } +} diff --git a/internal/iacserver.go b/internal/iacserver.go index ab3f3b4..088b7a6 100644 --- a/internal/iacserver.go +++ b/internal/iacserver.go @@ -42,8 +42,15 @@ type awsIaCServer struct { pb.UnimplementedIaCProviderValidatorServer pb.UnimplementedIaCProviderDriftConfigDetectorServer pb.UnimplementedResourceDriverServer + pb.UnimplementedIaCStateBackendServer provider *provider.AWSProvider + + // stateBackend serves the typed pb.IaCStateBackendServer surface + // (s3 backend). Per decisions/0035, this one type carries both the + // IaC-provider and the IaC-state-backend concerns. The backing store is + // constructed lazily via the Configure RPC — see internal/statebackend_server.go. + stateBackend stateBackend } // newAWSIaCServer constructs a typed-IaC server backed by the given @@ -72,6 +79,10 @@ var ( // delegates to DetectDrift (existence-only behavior; ignores the specs map). _ pb.IaCProviderDriftDetectorServer = (*awsIaCServer)(nil) _ pb.ResourceDriverServer = (*awsIaCServer)(nil) + // awsIaCServer also SERVES the typed IaC state-backend contract (s3 + // backend). The SDK serve hook auto-registers this via type-assertion at + // plugin startup — see cmd/workflow-plugin-aws/main.go. + _ pb.IaCStateBackendServer = (*awsIaCServer)(nil) ) // ── Required service methods ──────────────────────────────────────────────── diff --git a/internal/statebackend_server.go b/internal/statebackend_server.go new file mode 100644 index 0000000..2fcc975 --- /dev/null +++ b/internal/statebackend_server.go @@ -0,0 +1,286 @@ +// Package internal — typed pb.IaCStateBackendServer implementation. +// +// Per decisions/0035 (one type carries both concerns), awsIaCServer ALSO +// serves the typed IaC state-backend contract: it persists IaC state via an +// S3IaCStateStore (ported from workflow core) and answers ListBackendNames +// with the single backend name "s3". +// +// Hard invariants (strict-contracts force-cutover): +// - NO structpb.Struct on the wire — the free-form Outputs / Config +// map[string]any fields of IaCState cross as JSON bytes (outputs_json, +// config_json), converted locally via encoding/json below. +// - The store is lazily constructed: the host delivers the iac.state module +// config via the Configure RPC (decisions/0036); until then GetState/etc. +// return a clear FailedPrecondition error rather than panicking. +package internal + +import ( + "context" + "encoding/json" + "fmt" + "sync" + + "github.com/GoCodeAlone/workflow-plugin-aws/internal/statebackend" + pb "github.com/GoCodeAlone/workflow/plugin/external/proto" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// awsStateBackendName is the single iac.state backend name this plugin serves. +const awsStateBackendName = "s3" + +// stateBackend holds the lazily-constructed s3 state store plus the guard that +// builds it once. It is embedded into awsIaCServer. +type stateBackend struct { + mu sync.Mutex + store *statebackend.S3IaCStateStore +} + +// resolveStore returns the configured store, or a codes.FailedPrecondition +// gRPC status if the host has not yet delivered the iac.state config for this +// plugin process via the Configure RPC. +// +// An unset store yields FailedPrecondition — distinct from a generic Internal +// error — so the engine can tell "backend not configured" apart from a real +// storage failure, rather than the server panicking on a nil store. +func (b *stateBackend) resolveStore() (*statebackend.S3IaCStateStore, error) { + b.mu.Lock() + defer b.mu.Unlock() + if b.store == nil { + return nil, status.Error(codes.FailedPrecondition, + "aws state backend: s3 backend is not configured") + } + return b.store, nil +} + +// setStateStore injects the backing store. Used by the Configure handler and tests. +func (b *stateBackend) setStateStore(s *statebackend.S3IaCStateStore) { + b.mu.Lock() + defer b.mu.Unlock() + b.store = s +} + +// s3Config is the iac.state module config the host delivers via the Configure +// RPC. The JSON keys mirror exactly the config keys the in-core spaces/s3 +// switch case in workflow core's iac_module.go read. +type s3Config struct { + Region string `json:"region"` + Bucket string `json:"bucket"` + Prefix string `json:"prefix"` + AccessKey string `json:"accessKey"` + SecretKey string `json:"secretKey"` + Endpoint string `json:"endpoint"` +} + +// ── pb.IaCStateBackendServer methods (on awsIaCServer) ────────────────────── + +// Configure decodes the host-delivered iac.state module config and lazily +// constructs the s3 state store, satisfying decisions/0036's host-config +// plumbing. Until Configure runs, the state RPCs return FailedPrecondition +// (see resolveStore). The config_json bytes carry the module config +// map[string]any per the iac.proto JSON-bytes invariant. +func (s *awsIaCServer) Configure(_ context.Context, req *pb.ConfigureRequest) (*pb.ConfigureResponse, error) { + if req.GetBackendName() != awsStateBackendName { + return nil, status.Errorf(codes.InvalidArgument, + "aws state backend: Configure for backend %q — this plugin serves only %q", + req.GetBackendName(), awsStateBackendName) + } + var cfg s3Config + if err := json.Unmarshal(req.GetConfigJson(), &cfg); err != nil { + return nil, status.Errorf(codes.InvalidArgument, + "aws state backend: decode Configure config: %v", err) + } + if cfg.Bucket == "" { + return nil, status.Error(codes.InvalidArgument, + "aws state backend: s3 backend requires 'bucket' config") + } + store, err := statebackend.NewS3IaCStateStore( + cfg.Region, cfg.Bucket, cfg.Prefix, cfg.AccessKey, cfg.SecretKey, cfg.Endpoint) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, + "aws state backend: construct s3 store: %v", err) + } + s.stateBackend.setStateStore(store) + return &pb.ConfigureResponse{}, nil +} + +// GetState retrieves a state record by resource ID. +func (s *awsIaCServer) GetState(ctx context.Context, req *pb.GetStateRequest) (*pb.GetStateResponse, error) { + store, err := s.stateBackend.resolveStore() + if err != nil { + return nil, err + } + st, err := store.GetState(ctx, req.GetResourceId()) + if err != nil { + return nil, err + } + if st == nil { + return &pb.GetStateResponse{Exists: false}, nil + } + pbState, err := iacStateToPB(st) + if err != nil { + return nil, fmt.Errorf("aws state backend: encode GetState response: %w", err) + } + return &pb.GetStateResponse{State: pbState, Exists: true}, nil +} + +// SaveState inserts or replaces a state record. +func (s *awsIaCServer) SaveState(ctx context.Context, req *pb.SaveStateRequest) (*pb.SaveStateResponse, error) { + store, err := s.stateBackend.resolveStore() + if err != nil { + return nil, err + } + st, err := iacStateFromPB(req.GetState()) + if err != nil { + return nil, fmt.Errorf("aws state backend: decode SaveState request: %w", err) + } + if err := store.SaveState(ctx, st); err != nil { + return nil, err + } + return &pb.SaveStateResponse{}, nil +} + +// ListStates returns all state records matching the provided key=value filter. +func (s *awsIaCServer) ListStates(ctx context.Context, req *pb.ListStatesRequest) (*pb.ListStatesResponse, error) { + store, err := s.stateBackend.resolveStore() + if err != nil { + return nil, err + } + states, err := store.ListStates(ctx, req.GetFilter()) + if err != nil { + return nil, err + } + pbStates := make([]*pb.IaCState, 0, len(states)) + for _, st := range states { + pbState, err := iacStateToPB(st) + if err != nil { + return nil, fmt.Errorf("aws state backend: encode ListStates response: %w", err) + } + pbStates = append(pbStates, pbState) + } + return &pb.ListStatesResponse{States: pbStates}, nil +} + +// DeleteState removes a state record by resource ID. +func (s *awsIaCServer) DeleteState(ctx context.Context, req *pb.DeleteStateRequest) (*pb.DeleteStateResponse, error) { + store, err := s.stateBackend.resolveStore() + if err != nil { + return nil, err + } + if err := store.DeleteState(ctx, req.GetResourceId()); err != nil { + return nil, err + } + return &pb.DeleteStateResponse{}, nil +} + +// Lock acquires an exclusive lock for the given resource ID. +func (s *awsIaCServer) Lock(ctx context.Context, req *pb.LockRequest) (*pb.LockResponse, error) { + store, err := s.stateBackend.resolveStore() + if err != nil { + return nil, err + } + if err := store.Lock(ctx, req.GetResourceId()); err != nil { + return nil, err + } + return &pb.LockResponse{}, nil +} + +// Unlock releases the lock for the given resource ID. +func (s *awsIaCServer) Unlock(ctx context.Context, req *pb.UnlockRequest) (*pb.UnlockResponse, error) { + store, err := s.stateBackend.resolveStore() + if err != nil { + return nil, err + } + if err := store.Unlock(ctx, req.GetResourceId()); err != nil { + return nil, err + } + return &pb.UnlockResponse{}, nil +} + +// ListBackendNames reports the iac.state backend names this plugin serves. +func (s *awsIaCServer) ListBackendNames(_ context.Context, _ *pb.ListBackendNamesRequest) (*pb.ListBackendNamesResponse, error) { + return &pb.ListBackendNamesResponse{BackendNames: []string{awsStateBackendName}}, nil +} + +// ── IaCState ⇄ pb.IaCState converters ─────────────────────────────────────── +// +// Local re-implementation of workflow core's unexported iacStateToProto / +// iacStateFromProto. The Outputs / Config map[string]any fields cross the wire +// as JSON bytes (outputs_json / config_json) per the iac.proto invariant — NO +// structpb. + +func iacStateToPB(st *statebackend.IaCState) (*pb.IaCState, error) { + if st == nil { + return nil, nil + } + outputsJSON, err := marshalIaCMap(st.Outputs) + if err != nil { + return nil, fmt.Errorf("marshal outputs: %w", err) + } + configJSON, err := marshalIaCMap(st.Config) + if err != nil { + return nil, fmt.Errorf("marshal config: %w", err) + } + return &pb.IaCState{ + ResourceId: st.ResourceID, + ResourceType: st.ResourceType, + Provider: st.Provider, + ProviderRef: st.ProviderRef, + ProviderId: st.ProviderID, + ConfigHash: st.ConfigHash, + Status: st.Status, + OutputsJson: outputsJSON, + ConfigJson: configJSON, + Dependencies: append([]string(nil), st.Dependencies...), + CreatedAt: st.CreatedAt, + UpdatedAt: st.UpdatedAt, + Error: st.Error, + }, nil +} + +func iacStateFromPB(s *pb.IaCState) (*statebackend.IaCState, error) { + if s == nil { + return nil, fmt.Errorf("iac state must not be nil") + } + outputs, err := unmarshalIaCMap(s.GetOutputsJson()) + if err != nil { + return nil, fmt.Errorf("unmarshal outputs: %w", err) + } + config, err := unmarshalIaCMap(s.GetConfigJson()) + if err != nil { + return nil, fmt.Errorf("unmarshal config: %w", err) + } + return &statebackend.IaCState{ + ResourceID: s.GetResourceId(), + ResourceType: s.GetResourceType(), + Provider: s.GetProvider(), + ProviderRef: s.GetProviderRef(), + ProviderID: s.GetProviderId(), + ConfigHash: s.GetConfigHash(), + Status: s.GetStatus(), + Outputs: outputs, + Config: config, + Dependencies: append([]string(nil), s.GetDependencies()...), + CreatedAt: s.GetCreatedAt(), + UpdatedAt: s.GetUpdatedAt(), + Error: s.GetError(), + }, nil +} + +func marshalIaCMap(m map[string]any) ([]byte, error) { + if m == nil { + return nil, nil + } + return json.Marshal(m) +} + +func unmarshalIaCMap(b []byte) (map[string]any, error) { + if len(b) == 0 { + return nil, nil + } + var out map[string]any + if err := json.Unmarshal(b, &out); err != nil { + return nil, err + } + return out, nil +} diff --git a/internal/statebackend_server_test.go b/internal/statebackend_server_test.go new file mode 100644 index 0000000..cdc3b35 --- /dev/null +++ b/internal/statebackend_server_test.go @@ -0,0 +1,236 @@ +package internal + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "strings" + "sync" + "testing" + + "github.com/GoCodeAlone/workflow-plugin-aws/internal/statebackend" + pb "github.com/GoCodeAlone/workflow/plugin/external/proto" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" +) + +// Compile-time guard: awsIaCServer MUST satisfy the typed state-backend +// contract so the SDK serve hook auto-registers it at plugin startup. +var _ pb.IaCStateBackendServer = (*awsIaCServer)(nil) + +func TestIaCServer_ListBackendNames(t *testing.T) { + s := NewIaCServer() + resp, err := s.ListBackendNames(context.Background(), &pb.ListBackendNamesRequest{}) + if err != nil { + t.Fatalf("ListBackendNames: %v", err) + } + got := resp.GetBackendNames() + if len(got) != 1 || got[0] != "s3" { + t.Errorf("ListBackendNames = %v, want [s3]", got) + } +} + +func TestIaCServer_StateBackend_NotConfigured(t *testing.T) { + s := NewIaCServer() + // With no store injected, the state RPCs must return a clear error rather + // than panicking on a nil store. + if _, err := s.GetState(context.Background(), &pb.GetStateRequest{ResourceId: "x"}); err == nil { + t.Error("GetState: expected error when backend not configured") + } + if _, err := s.SaveState(context.Background(), &pb.SaveStateRequest{State: &pb.IaCState{ResourceId: "x"}}); err == nil { + t.Error("SaveState: expected error when backend not configured") + } + if _, err := s.ListStates(context.Background(), &pb.ListStatesRequest{}); err == nil { + t.Error("ListStates: expected error when backend not configured") + } + if _, err := s.DeleteState(context.Background(), &pb.DeleteStateRequest{ResourceId: "x"}); err == nil { + t.Error("DeleteState: expected error when backend not configured") + } + if _, err := s.Lock(context.Background(), &pb.LockRequest{ResourceId: "x"}); err == nil { + t.Error("Lock: expected error when backend not configured") + } + if _, err := s.Unlock(context.Background(), &pb.UnlockRequest{ResourceId: "x"}); err == nil { + t.Error("Unlock: expected error when backend not configured") + } +} + +func TestIaCServer_StateBackend_Configure(t *testing.T) { + s := NewIaCServer() + + // Before Configure, the backend is unconfigured — resolveStore must fail. + if _, err := s.stateBackend.resolveStore(); err == nil { + t.Fatal("resolveStore: expected FailedPrecondition before Configure") + } + + cfg := map[string]any{ + "region": "us-east-1", + "bucket": "tfstate", + "prefix": "iac-state/", + } + cfgJSON, err := json.Marshal(cfg) + if err != nil { + t.Fatalf("marshal cfg: %v", err) + } + if _, err := s.Configure(context.Background(), &pb.ConfigureRequest{ + BackendName: "s3", + ConfigJson: cfgJSON, + }); err != nil { + t.Fatalf("Configure: %v", err) + } + + // After Configure, the lazily-constructed store must resolve. + store, err := s.stateBackend.resolveStore() + if err != nil { + t.Fatalf("resolveStore after Configure: %v", err) + } + if store == nil { + t.Fatal("resolveStore after Configure: store is nil") + } + + // A Configure for a backend name this plugin does not serve must be rejected. + if _, err := s.Configure(context.Background(), &pb.ConfigureRequest{ + BackendName: "azure_blob", + ConfigJson: cfgJSON, + }); err == nil { + t.Error("Configure: expected error for unknown backend name") + } + + // A Configure missing the required 'bucket' config must be rejected. + noBucket, _ := json.Marshal(map[string]any{"region": "us-east-1"}) + if _, err := s.Configure(context.Background(), &pb.ConfigureRequest{ + BackendName: "s3", + ConfigJson: noBucket, + }); err == nil { + t.Error("Configure: expected error when 'bucket' config is missing") + } +} + +func TestIaCServer_StateBackend_RoundTrip(t *testing.T) { + s := NewIaCServer() + store := statebackend.NewS3IaCStateStoreWithClient(newMockS3StateClient(), "test-bucket", "iac-state/") + s.stateBackend.setStateStore(store) + + ctx := context.Background() + in := &pb.IaCState{ + ResourceId: "s3-rt", + ResourceType: "kubernetes", + Provider: "aws", + Status: "active", + OutputsJson: []byte(`{"endpoint":"https://k8s.example.com"}`), + ConfigJson: []byte(`{"region":"us-east-1"}`), + } + if _, err := s.SaveState(ctx, &pb.SaveStateRequest{State: in}); err != nil { + t.Fatalf("SaveState: %v", err) + } + + got, err := s.GetState(ctx, &pb.GetStateRequest{ResourceId: "s3-rt"}) + if err != nil { + t.Fatalf("GetState: %v", err) + } + if !got.GetExists() { + t.Fatal("GetState: expected exists=true") + } + if got.GetState().GetProvider() != "aws" { + t.Errorf("Provider = %q, want aws", got.GetState().GetProvider()) + } + if string(got.GetState().GetOutputsJson()) != `{"endpoint":"https://k8s.example.com"}` { + t.Errorf("OutputsJson round-trip mismatch: %s", got.GetState().GetOutputsJson()) + } + + list, err := s.ListStates(ctx, &pb.ListStatesRequest{}) + if err != nil { + t.Fatalf("ListStates: %v", err) + } + if len(list.GetStates()) != 1 { + t.Errorf("ListStates = %d, want 1", len(list.GetStates())) + } + + if _, err := s.Lock(ctx, &pb.LockRequest{ResourceId: "s3-rt"}); err != nil { + t.Fatalf("Lock: %v", err) + } + if _, err := s.Unlock(ctx, &pb.UnlockRequest{ResourceId: "s3-rt"}); err != nil { + t.Fatalf("Unlock: %v", err) + } + + if _, err := s.DeleteState(ctx, &pb.DeleteStateRequest{ResourceId: "s3-rt"}); err != nil { + t.Fatalf("DeleteState: %v", err) + } + after, err := s.GetState(ctx, &pb.GetStateRequest{ResourceId: "s3-rt"}) + if err != nil { + t.Fatalf("GetState after delete: %v", err) + } + if after.GetExists() { + t.Error("GetState after delete: expected exists=false") + } +} + +// mockS3StateClient is an in-memory statebackend.S3Client for the round-trip test. +type mockS3StateClient struct { + mu sync.Mutex + objects map[string][]byte +} + +func newMockS3StateClient() *mockS3StateClient { + return &mockS3StateClient{objects: make(map[string][]byte)} +} + +func (m *mockS3StateClient) GetObject(_ context.Context, input *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + m.mu.Lock() + defer m.mu.Unlock() + key := aws.ToString(input.Key) + data, ok := m.objects[key] + if !ok { + return nil, &types.NoSuchKey{Message: aws.String("NoSuchKey: " + key)} + } + return &s3.GetObjectOutput{Body: io.NopCloser(bytes.NewReader(data))}, nil +} + +func (m *mockS3StateClient) PutObject(_ context.Context, input *s3.PutObjectInput, _ ...func(*s3.Options)) (*s3.PutObjectOutput, error) { + m.mu.Lock() + defer m.mu.Unlock() + key := aws.ToString(input.Key) + if aws.ToString(input.IfNoneMatch) == "*" { + if _, exists := m.objects[key]; exists { + return nil, fmt.Errorf("PreconditionFailed: object %q already exists", key) + } + } + data, err := io.ReadAll(input.Body) + if err != nil { + return nil, err + } + m.objects[key] = data + return &s3.PutObjectOutput{}, nil +} + +func (m *mockS3StateClient) DeleteObject(_ context.Context, input *s3.DeleteObjectInput, _ ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.objects, aws.ToString(input.Key)) + return &s3.DeleteObjectOutput{}, nil +} + +func (m *mockS3StateClient) ListObjectsV2(_ context.Context, input *s3.ListObjectsV2Input, _ ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { + m.mu.Lock() + defer m.mu.Unlock() + prefix := aws.ToString(input.Prefix) + var contents []types.Object + for key := range m.objects { + if strings.HasPrefix(key, prefix) { + contents = append(contents, types.Object{Key: aws.String(key)}) + } + } + return &s3.ListObjectsV2Output{Contents: contents, IsTruncated: aws.Bool(false)}, nil +} + +func (m *mockS3StateClient) HeadObject(_ context.Context, input *s3.HeadObjectInput, _ ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + m.mu.Lock() + defer m.mu.Unlock() + key := aws.ToString(input.Key) + if _, ok := m.objects[key]; !ok { + return nil, &types.NotFound{Message: aws.String("NotFound: " + key)} + } + return &s3.HeadObjectOutput{}, nil +} diff --git a/plugin.json b/plugin.json index bdbcea7..965a2e8 100644 --- a/plugin.json +++ b/plugin.json @@ -16,7 +16,8 @@ "iac.provider" ], "stepTypes": [], - "triggerTypes": [] + "triggerTypes": [], + "iacStateBackends": ["s3"] }, "downloads": [ { From 2211a62c13b2a3cd4b3008422ed525b0b522ab0c Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Thu, 14 May 2026 19:27:47 -0400 Subject: [PATCH 3/4] fix: bump minEngineVersion to the engine version providing the Configure RPC --- plugin.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin.json b/plugin.json index 965a2e8..a800e46 100644 --- a/plugin.json +++ b/plugin.json @@ -6,7 +6,7 @@ "license": "MIT", "type": "external", "tier": "community", - "minEngineVersion": "0.51.0", + "minEngineVersion": "0.51.11-0.20260514225636-522748f35474", "keywords": ["aws", "iac", "infrastructure", "ecs", "eks", "rds", "vpc", "s3", "autoscaling"], "homepage": "https://github.com/GoCodeAlone/workflow-plugin-aws", "repository": "https://github.com/GoCodeAlone/workflow-plugin-aws", From a1b5d848938ed9d65afe28603c6c043ae3b1df6e Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Thu, 14 May 2026 23:45:38 -0400 Subject: [PATCH 4/4] =?UTF-8?q?fix:=20bump=20minEngineVersion=20to=20v0.52?= =?UTF-8?q?.0=20(clean=20semver=20=E2=80=94=20Configure=20RPC=20available)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugin.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin.json b/plugin.json index a800e46..f93840a 100644 --- a/plugin.json +++ b/plugin.json @@ -6,7 +6,7 @@ "license": "MIT", "type": "external", "tier": "community", - "minEngineVersion": "0.51.11-0.20260514225636-522748f35474", + "minEngineVersion": "0.52.0", "keywords": ["aws", "iac", "infrastructure", "ecs", "eks", "rds", "vpc", "s3", "autoscaling"], "homepage": "https://github.com/GoCodeAlone/workflow-plugin-aws", "repository": "https://github.com/GoCodeAlone/workflow-plugin-aws",