diff --git a/.phase-b-complete b/.phase-b-complete new file mode 100644 index 00000000..e69de29b diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index daa17a00..fb5cd924 100644 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -179,7 +179,6 @@ flowchart TD | `step.dlq_replay` | Replays messages from the dead-letter queue | pipelinesteps | | `step.retry_with_backoff` | Retries a sub-pipeline with exponential backoff | pipelinesteps | | `step.resilient_circuit_breaker` | Wraps a sub-pipeline with a circuit breaker | pipelinesteps | -| `step.s3_upload` | Uploads a file or data to an S3-compatible bucket | pipelinesteps | | `step.auth_validate` | Validates an authentication token and populates claims | pipelinesteps | | `step.token_revoke` | Revokes an auth token | pipelinesteps | | `step.field_reencrypt` | Re-encrypts a field with a new key | pipelinesteps | @@ -537,7 +536,6 @@ See [v0.53.0 migration guide](docs/migrations/v0.53.0-aws-iac-removal.md). ### Storage | Type | Description | Plugin | |------|-------------|--------| -| `storage.s3` | Amazon S3 storage | storage | | `storage.gcs` | Google Cloud Storage | storage | | `storage.local` | Local filesystem storage | storage | | `storage.sqlite` | SQLite storage | storage | diff --git a/cmd/wfctl/infra_state_store.go b/cmd/wfctl/infra_state_store.go index db37723d..f4f5c757 100644 --- a/cmd/wfctl/infra_state_store.go +++ b/cmd/wfctl/infra_state_store.go @@ -82,16 +82,16 @@ func resolveStateStore(cfgFile, envName string) (infraStateStore, error) { } return &fsWfctlStateStore{dir: dir}, nil - case "spaces": - return resolveSpacesStateStore(cfg) - case "postgres": return resolvePostgresStateStore(cfg) + case "spaces": + return nil, fmt.Errorf("iac.state backend %q is now plugin-served by workflow-plugin-digitalocean v1.1.0; "+ + "install and load the plugin to use the Spaces backend (wfctl direct-path commands no longer support in-tree spaces)", backend) + case "s3": - return nil, fmt.Errorf("s3 state store backend not yet supported by wfctl direct-path commands; " + - "create the bucket manually and reference it in iac.state.bucket. " + - "Contribute a resolveS3StateStore helper to unblock this") + return nil, fmt.Errorf("iac.state backend %q is now plugin-served by workflow-plugin-aws v1.1.0; "+ + "install and load the plugin to use the S3 backend (wfctl direct-path commands no longer support in-tree s3)", backend) case "gcs": return nil, fmt.Errorf("gcs state store backend not yet supported by wfctl direct-path commands; " + @@ -117,10 +117,9 @@ type fsWfctlStateStore struct { dir string } -// iacStateRecord mirrors the JSON schema used by the filesystem and Spaces -// backends. The field names must stay stable to remain compatible with the -// existing loadFSState reader and the importFromTFState / importFromPulumi -// writers. +// iacStateRecord mirrors the JSON schema used by the filesystem backend. The +// field names must stay stable to remain compatible with the existing +// loadFSState reader and the importFromTFState / importFromPulumi writers. type iacStateRecord struct { ResourceID string `json:"resource_id"` ResourceType string `json:"resource_type"` @@ -226,60 +225,6 @@ func (s *fsWfctlStateStore) SaveMetadata(_ context.Context, meta interfaces.Gene return nil } -// ── Spaces backend ───────────────────────────────────────────────────────────── - -// resolveSpacesStateStore builds a Spaces-backed state store from the expanded -// iac.state module config. Credentials fall back to DO_SPACES_ACCESS_KEY / -// DO_SPACES_SECRET_KEY environment variables via module.NewSpacesIaCStateStore. -func resolveSpacesStateStore(cfg map[string]any) (infraStateStore, error) { - bucket, _ := cfg["bucket"].(string) - region, _ := cfg["region"].(string) - prefix, _ := cfg["prefix"].(string) - - accessKey, _ := cfg["accessKey"].(string) - if accessKey == "" { - accessKey, _ = cfg["access_key"].(string) - } - secretKey, _ := cfg["secretKey"].(string) - if secretKey == "" { - secretKey, _ = cfg["secret_key"].(string) - } - if bucket == "" { - return nil, fmt.Errorf("iac.state backend=spaces requires 'bucket' in config") - } - inner, err := module.NewSpacesIaCStateStore(region, bucket, prefix, accessKey, secretKey, "") - if err != nil { - return nil, fmt.Errorf("init spaces state store: %w", err) - } - return &spacesWfctlStateStore{inner: inner}, nil -} - -// spacesWfctlStateStore wraps module.SpacesIaCStateStore to implement -// infraStateStore, bridging module.IaCState ↔ interfaces.ResourceState. -type spacesWfctlStateStore struct { - inner *module.SpacesIaCStateStore -} - -func (s *spacesWfctlStateStore) ListResources(ctx context.Context) ([]interfaces.ResourceState, error) { - records, err := s.inner.ListStates(ctx, nil) - if err != nil { - return nil, fmt.Errorf("list spaces state: %w", err) - } - states := make([]interfaces.ResourceState, 0, len(records)) - for _, r := range records { - states = append(states, iacStateToResourceState(r)) - } - return states, nil -} - -func (s *spacesWfctlStateStore) SaveResource(ctx context.Context, state interfaces.ResourceState) error { - return s.inner.SaveState(ctx, resourceStateToIaCState(state)) -} - -func (s *spacesWfctlStateStore) DeleteResource(ctx context.Context, name string) error { - return s.inner.DeleteState(ctx, name) -} - // ── Postgres backend ─────────────────────────────────────────────────────────── // resolvePostgresStateStore builds a Postgres-backed state store from the diff --git a/cmd/wfctl/state_compat_test.go b/cmd/wfctl/state_compat_test.go index ac8685af..fda70332 100644 --- a/cmd/wfctl/state_compat_test.go +++ b/cmd/wfctl/state_compat_test.go @@ -27,9 +27,9 @@ import ( // the real-world fidelity check. // // - The test reads the fixture via the v1.0.0 wfctl iacStateRecord -// decoder (the same path loadFSState / spacesWfctlStateStore. -// ListResources use), then converts it via iacRecordToResourceState -// and asserts every load-bearing field survived. +// decoder (the same path loadFSState / fsWfctlStateStore.ListResources +// use), then converts it via iacRecordToResourceState and asserts every +// load-bearing field survived. // // If this test FAILS in CI: PR 5 cascade-block surfaces. Plan response // (per Task 31 §If FAIL): diff --git a/docs/migrations/2026-05-15-plugin-modules-on-iac.md b/docs/migrations/2026-05-15-plugin-modules-on-iac.md new file mode 100644 index 00000000..b722e745 --- /dev/null +++ b/docs/migrations/2026-05-15-plugin-modules-on-iac.md @@ -0,0 +1,135 @@ +# 2026-05-15 — Plugin-modules-on-IaC: Phase B clean break + +This migration covers **Phase B** of the +[plugin-modules-on-IaC plan](../plans/2026-05-15-plugin-modules-on-iac.md): +workflow-core sheds the remaining in-core AWS/DO storage + state surfaces and +the SDK-bearing AWS credential resolvers. Each surface is now plugin-native. + +The companion **Phase C** migration (GCP) follows in a separate PR; this doc is +amended in-place when that ships. + +## Engine floor + +Phase B requires **workflow `>= v0.53.0`** in any deployment that uses the +affected backends. The `>= v0.53.0` engine has the typed `IaCStateBackend` +gRPC contract (Phase A, decisions/0036), the `Configure` RPC that delivers the +`iac.state` module YAML to the plugin, and the plugin-backend registry that +`IaCModule.Init` consults in its `default:`-arm. + +## What changed + +| Surface | Was | Now | +|---|---|---| +| `iac.state` `backend: spaces` | in-core `module.SpacesIaCStateStore` | plugin-served by [`workflow-plugin-digitalocean`](https://github.com/GoCodeAlone/workflow-plugin-digitalocean) `>= v1.1.0` | +| `iac.state` `backend: s3` | (already moved in v0.53.0; no in-core impl since then) | plugin-served by [`workflow-plugin-aws`](https://github.com/GoCodeAlone/workflow-plugin-aws) `>= v1.1.0` | +| `storage.s3` module | in-core `module.S3Storage` (registered by `plugins/storage`) | plugin-native in `workflow-plugin-aws >= v1.1.0` | +| `step.s3_upload` pipeline step | in-core `module.S3UploadStep` (registered by `plugins/pipelinesteps`) | plugin-native in `workflow-plugin-aws >= v1.1.0` | +| `cloud.account` `provider: aws` + `credentials.type: profile` or `role_arn` | SDK-bearing resolver loaded the profile / called `sts:AssumeRole` in-core | core records a `credential_source` marker only; the aws plugin performs SDK resolution via `awscreds.BuildAWSConfig` (decisions/0036 + 0038) | + +The YAML field names and `backend:` values are **unchanged**. The break is +strictly about *which binary* serves them. + +## Why + +Workflow core owns IaC orchestration interfaces, not provider SDKs. Provider +SDKs ride with the provider plugin, where Dependabot bumps and SDK CVE patches +belong. This continues the pattern set by the `godo` removal +([v0.52.0](v0.52.0-godo-removal.md)) and the AWS IaC core removal +([v0.53.0](v0.53.0-aws-iac-removal.md)). + +## Breaking change — action required + +### `iac.state backend: spaces` + +Load `workflow-plugin-digitalocean >= v1.1.0`. The YAML `backend: spaces` +value is unchanged; all existing config keys (`region`, `bucket`, `prefix`, +`accessKey`, `secretKey`, `endpoint`) keep their semantics. The +`DO_SPACES_ACCESS_KEY` / `DO_SPACES_SECRET_KEY` environment fallbacks are +preserved by the plugin port. + +Without the plugin, `IaCModule.Init` fails fast: + +``` +iac.state "": backend "spaces" is not built into workflow core +(in-core backends: 'memory', 'filesystem', 'gcs', 'postgres'). +If "spaces" is a plugin-provided backend (e.g. 'azure_blob' via +workflow-plugin-azure, 'spaces' via workflow-plugin-digitalocean, +'s3' via workflow-plugin-aws), install and load that plugin +``` + +### `iac.state backend: s3` + +Load `workflow-plugin-aws >= v1.1.0`. Same shape as the `spaces` migration — +YAML unchanged, error message above identifies the missing plugin. + +### `storage.s3` module + `step.s3_upload` pipeline step + +Both move into `workflow-plugin-aws >= v1.1.0`. Credentials can be inline in +the module/step config, or referenced via `credentials_ref:` pointing at an +`aws.credentials` module loaded by the plugin. With no plugin loaded the +module type / step type is unknown at engine boot — load the plugin in the +deployment's plugin manifest. + +### `cloud.account provider: aws` with `credentials.type: profile` or `role_arn` + +The credential config sits under the nested `credentials:` map on the +`cloud.account` module (the key is `credentials.type`, not a flat +`credentialType:`). The affected shape: + +```yaml +modules: + - name: aws-account + type: cloud.account + config: + provider: aws + region: us-east-1 + credentials: + type: profile # or role_arn + profile: team-prod # for type=profile + # roleArn / externalId / sessionName for type=role_arn +``` + +Core no longer resolves the profile or calls `sts:AssumeRole`. Instead the +resolver records `Extra["credential_source"] = "profile"` or `"role_arn"` +(plus `Extra["profile"]` / `m.creds.RoleARN` + `Extra["external_id"]`) and +logs a `workflow: aws credential_source=…` warning. + +The aws plugin's `awscreds.BuildAWSConfig` consumes the marker at the point of +need and performs the SDK-bearing resolution in-plugin. This is a +**co-deploy** requirement: core `>= v0.53.0` AND `workflow-plugin-aws +>= v1.1.0` must be deployed together. Mixing an old plugin against new core +results in a `credential_source` marker the plugin can't interpret — the core +warning is what tells operators which side to upgrade. + +`credentials.type: static` and `credentials.type: env` are unaffected — those +paths have always been SDK-free and resolve in-core. + +## Rollback + +Phase B's clean-breaks roll back only as a **matched pair** with the plugin +releases that serve them — reverting PR `feat/phase-b-core-deletion` +restores the in-core paths, but the plugin v1.1.0 tags are immutable. A +patch-level defect in either plugin port is resolved with a `v1.1.1` +release, not by re-introducing the in-core implementation. + +The `cloud_account_aws.go` deletion (164 lines of dead code that #653 had +already orphaned) is not part of the matched-pair rollback — it had zero +non-test consumers. + +## Verification + +Once Phase B is merged: + +- `go mod tidy` against the merged tree should make no net change to AWS SDK + service modules — `aws-sdk-go-v2` stays in `go.mod` because `provider/aws/`, + `plugin/rbac/aws.go`, `iam/aws.go`, and `artifact/s3.go` still import it. +- The `.phase-b-complete` marker arms + `scripts/audit-cloud-symbols.sh --check`'s zero-`aws-sdk-go-v2` invariant on + `module/cloud_account_aws_creds.go`. Running the audit script post-merge + must report `audit-cloud-symbols: OK`. + +## Related design + plans + +- Plan: [docs/plans/2026-05-15-plugin-modules-on-iac.md](../plans/2026-05-15-plugin-modules-on-iac.md) +- Decisions: 0034 (autonomous plugin releases), 0035 (assumed-seam grep), 0036 (Configure RPC), 0038 (credential_source marker) +- Predecessors: [v0.52.0 godo removal](v0.52.0-godo-removal.md), [v0.53.0 AWS IaC removal](v0.53.0-aws-iac-removal.md), [2026-05-14 azure plugin extraction](2026-05-14-cloud-sdk-extraction.md) diff --git a/module/cloud_account_aws.go b/module/cloud_account_aws.go deleted file mode 100644 index f03d7384..00000000 --- a/module/cloud_account_aws.go +++ /dev/null @@ -1,98 +0,0 @@ -package module - -import ( - "context" - "fmt" - - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" - "github.com/aws/aws-sdk-go-v2/credentials/stscreds" - "github.com/aws/aws-sdk-go-v2/service/sts" -) - -// AWSConfigProvider extends CloudCredentialProvider with AWS SDK config loading. -// Platform modules that need to call AWS APIs type-assert their CloudCredentialProvider -// to this interface to obtain a properly configured aws.Config. -type AWSConfigProvider interface { - CloudCredentialProvider - // AWSConfig returns a configured aws.Config for the current credential set. - AWSConfig(ctx context.Context) (aws.Config, error) -} - -// AWSConfig builds an aws.Config from the cloud.account configuration. -// Supports credential types: static/access_key, role_arn, env, profile, default. -// This satisfies the AWSConfigProvider interface. -func (m *CloudAccount) AWSConfig(ctx context.Context) (aws.Config, error) { - region := m.region - - credsMap, _ := m.config["credentials"].(map[string]any) - credType := "default" - if credsMap != nil { - if t, ok := credsMap["type"].(string); ok && t != "" { - credType = t - } - } - - switch credType { - case "static", "access_key": - accessKey, _ := credsMap["accessKey"].(string) - secretKey, _ := credsMap["secretKey"].(string) - sessionToken, _ := credsMap["sessionToken"].(string) - return config.LoadDefaultConfig(ctx, - config.WithRegion(region), - config.WithCredentialsProvider( - credentials.NewStaticCredentialsProvider(accessKey, secretKey, sessionToken), - ), - ) - - case "role_arn": - roleARN, _ := credsMap["roleArn"].(string) - if roleARN == "" { - return aws.Config{}, fmt.Errorf("cloud.account %q: role_arn credential requires 'roleArn'", m.name) - } - baseCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(region)) - if err != nil { - return aws.Config{}, fmt.Errorf("cloud.account %q: loading base config for role_arn: %w", m.name, err) - } - stsClient := sts.NewFromConfig(baseCfg) - provider := stscreds.NewAssumeRoleProvider(stsClient, roleARN) - return config.LoadDefaultConfig(ctx, - config.WithRegion(region), - config.WithCredentialsProvider(aws.NewCredentialsCache(provider)), - ) - - case "env", "default": - return config.LoadDefaultConfig(ctx, config.WithRegion(region)) - - case "profile": - profile := "" - if credsMap != nil { - profile, _ = credsMap["profile"].(string) - } - if profile == "" { - profile = "default" - } - return config.LoadDefaultConfig(ctx, - config.WithRegion(region), - config.WithSharedConfigProfile(profile), - ) - - default: - return aws.Config{}, fmt.Errorf("cloud.account %q: AWSConfig unsupported credential type %q", m.name, credType) - } -} - -// ValidateCredentials calls sts:GetCallerIdentity to verify the AWS credentials work. -func (m *CloudAccount) ValidateCredentials(ctx context.Context) error { - cfg, err := m.AWSConfig(ctx) - if err != nil { - return fmt.Errorf("cloud.account %q: AWSConfig failed: %w", m.name, err) - } - stsClient := sts.NewFromConfig(cfg) - _, err = stsClient.GetCallerIdentity(ctx, &sts.GetCallerIdentityInput{}) - if err != nil { - return fmt.Errorf("cloud.account %q: GetCallerIdentity failed: %w", m.name, err) - } - return nil -} diff --git a/module/cloud_account_aws_creds.go b/module/cloud_account_aws_creds.go index 8edc3e0f..19d8ff0b 100644 --- a/module/cloud_account_aws_creds.go +++ b/module/cloud_account_aws_creds.go @@ -1,14 +1,9 @@ package module import ( - "context" "fmt" + "log" "os" - - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" - "github.com/aws/aws-sdk-go-v2/service/sts" ) func init() { @@ -55,8 +50,9 @@ func (r *awsEnvResolver) Resolve(m *CloudAccount) error { return nil } -// awsProfileResolver resolves AWS credentials from a named shared-config profile -// using aws-sdk-go-v2/config.LoadDefaultConfig with WithSharedConfigProfile. +// awsProfileResolver records a profile credential_source marker; SDK-bearing +// resolution happens in the aws plugin (decisions/0036 + 0038). Core no longer +// imports aws-sdk-go-v2/config — keeping the workflow binary SDK-free. type awsProfileResolver struct{} func (r *awsProfileResolver) Provider() string { return "aws" } @@ -80,26 +76,13 @@ func (r *awsProfileResolver) Resolve(m *CloudAccount) error { } m.creds.Extra["profile"] = profile - // Load credentials from the named profile using the AWS SDK. - // A missing local profile file is normal in CI/prod — don't hard-fail. - ctx := context.Background() - cfg, loadErr := config.LoadDefaultConfig(ctx, config.WithSharedConfigProfile(profile)) - if loadErr != nil { - return nil //nolint:nilerr // missing profile is normal in CI - } - creds, credErr := cfg.Credentials.Retrieve(ctx) - if credErr != nil { - return nil //nolint:nilerr // credential retrieval failure is non-fatal - } - m.creds.AccessKey = creds.AccessKeyID - m.creds.SecretKey = creds.SecretAccessKey - m.creds.SessionToken = creds.SessionToken + m.creds.Extra["credential_source"] = "profile" + logCredentialSourceMarker("aws", "profile") return nil } -// awsRoleARNResolver resolves AWS credentials via STS AssumeRole. -// It loads base credentials (from the environment or inline config), then calls -// sts:AssumeRole to obtain temporary credentials for the target role. +// awsRoleARNResolver records a role_arn credential_source marker; the actual +// sts:AssumeRole call is performed by the aws plugin (decisions/0036 + 0038). type awsRoleARNResolver struct{} func (r *awsRoleARNResolver) Provider() string { return "aws" } @@ -114,7 +97,7 @@ func (r *awsRoleARNResolver) Resolve(m *CloudAccount) error { roleARN, _ := credsMap["roleArn"].(string) externalID, _ := credsMap["externalId"].(string) - // Always record the role ARN so AWSConfig() can use stscreds.AssumeRoleProvider. + // Always record the role ARN so the plugin can use stscreds.AssumeRoleProvider. m.creds.RoleARN = roleARN if m.creds.Extra == nil { m.creds.Extra = map[string]string{} @@ -125,53 +108,20 @@ func (r *awsRoleARNResolver) Resolve(m *CloudAccount) error { return fmt.Errorf("awsRoleARNResolver: roleArn is required") } - sessionName, _ := credsMap["sessionName"].(string) - if sessionName == "" { - sessionName = "workflow-session" - } - - // Build base credentials. Inline accessKey/secretKey take priority over the - // default credential chain. - ctx := context.Background() - var baseCfgOpts []func(*config.LoadOptions) error - if region := m.region; region != "" { - baseCfgOpts = append(baseCfgOpts, config.WithRegion(region)) - } - accessKey, _ := credsMap["accessKey"].(string) - secretKey, _ := credsMap["secretKey"].(string) - if accessKey != "" && secretKey != "" { - sessionToken, _ := credsMap["sessionToken"].(string) - baseCfgOpts = append(baseCfgOpts, config.WithCredentialsProvider( - credentials.NewStaticCredentialsProvider(accessKey, secretKey, sessionToken), - )) - } - - baseCfg, loadErr := config.LoadDefaultConfig(ctx, baseCfgOpts...) - if loadErr != nil { - // AWSConfig() will retry via stscreds.AssumeRoleProvider at call time. - return nil //nolint:nilerr // config load failure is non-fatal - } - - stsClient := sts.NewFromConfig(baseCfg) - input := &sts.AssumeRoleInput{ - RoleArn: aws.String(roleARN), - RoleSessionName: aws.String(sessionName), - } - if externalID != "" { - input.ExternalId = aws.String(externalID) - } - - out, assumeErr := stsClient.AssumeRole(ctx, input) - if assumeErr != nil { - // AssumeRole may fail at config-load time without real credentials; - // AWSConfig() handles deferred token refresh via stscreds. - return nil //nolint:nilerr // AssumeRole failure handled by deferred refresh - } - - if out.Credentials != nil { - m.creds.AccessKey = aws.ToString(out.Credentials.AccessKeyId) - m.creds.SecretKey = aws.ToString(out.Credentials.SecretAccessKey) - m.creds.SessionToken = aws.ToString(out.Credentials.SessionToken) - } + m.creds.Extra["credential_source"] = "role_arn" + logCredentialSourceMarker("aws", "role_arn") return nil } + +// logCredentialSourceMarker emits via the stdlib `log` package (not the app +// logger). The resolver path runs before module Init / app-logger plumbing, +// so it has no handle on `app.Logger()`. Future migration to the structured +// logger would require storing the logger on `CloudAccount` at construction +// time; that's out of scope for the credential_source marker rollout. +// +// The warning matters during the gap window where an old plugin version may +// see a marker it doesn't yet understand — the message tells operators where +// the resolution moved. +func logCredentialSourceMarker(provider, source string) { + log.Printf("workflow: %s credential_source=%q recorded; resolution deferred to plugin (decisions/0036+0038)", provider, source) +} diff --git a/module/cloud_account_aws_creds_test.go b/module/cloud_account_aws_creds_test.go new file mode 100644 index 00000000..beeb9775 --- /dev/null +++ b/module/cloud_account_aws_creds_test.go @@ -0,0 +1,167 @@ +package module_test + +import ( + "bytes" + "context" + "log" + "strings" + "testing" + + "github.com/GoCodeAlone/workflow/module" +) + +// captureLog redirects log output to a buffer for the duration of fn and +// returns the captured bytes. The default log destination is restored when fn +// returns. +func captureLog(t *testing.T, fn func()) string { + t.Helper() + var buf bytes.Buffer + orig := log.Writer() + flags := log.Flags() + log.SetOutput(&buf) + log.SetFlags(0) + defer func() { + log.SetOutput(orig) + log.SetFlags(flags) + }() + fn() + return buf.String() +} + +// TestCloudAccount_AWS_ProfileResolver_Marker verifies the awsProfileResolver +// declares Extra["credential_source"]="profile" + Extra["profile"]= and +// does NOT touch the AWS SDK. The actual profile resolution is deferred to the +// aws plugin (decisions/0036 + 0038). +func TestCloudAccount_AWS_ProfileResolver_Marker(t *testing.T) { + acc := module.NewCloudAccount("aws-profile", map[string]any{ + "provider": "aws", + "region": "us-west-2", + "credentials": map[string]any{ + "type": "profile", + "profile": "my-team", + }, + }) + + app := module.NewMockApplication() + logged := captureLog(t, func() { + if err := acc.Init(app); err != nil { + t.Fatalf("Init failed: %v", err) + } + }) + + creds, err := acc.GetCredentials(context.Background()) + if err != nil { + t.Fatalf("GetCredentials failed: %v", err) + } + if got := creds.Extra["credential_source"]; got != "profile" { + t.Errorf("Extra[credential_source] = %q, want %q", got, "profile") + } + if got := creds.Extra["profile"]; got != "my-team" { + t.Errorf("Extra[profile] = %q, want %q", got, "my-team") + } + // Resolution is deferred — the core resolver must not populate access/secret keys. + if creds.AccessKey != "" || creds.SecretKey != "" { + t.Errorf("expected empty AccessKey/SecretKey (resolution deferred), got %q/%q", creds.AccessKey, creds.SecretKey) + } + if !strings.Contains(logged, `credential_source="profile"`) { + t.Errorf("expected warning log mentioning credential_source=\"profile\", got: %q", logged) + } +} + +// TestCloudAccount_AWS_ProfileResolver_DefaultProfile verifies the resolver +// falls back to "default" when neither the config nor AWS_PROFILE is set. +func TestCloudAccount_AWS_ProfileResolver_DefaultProfile(t *testing.T) { + t.Setenv("AWS_PROFILE", "") + + acc := module.NewCloudAccount("aws-default-profile", map[string]any{ + "provider": "aws", + "region": "us-west-2", + "credentials": map[string]any{ + "type": "profile", + }, + }) + + app := module.NewMockApplication() + captureLog(t, func() { + if err := acc.Init(app); err != nil { + t.Fatalf("Init failed: %v", err) + } + }) + + creds, err := acc.GetCredentials(context.Background()) + if err != nil { + t.Fatalf("GetCredentials failed: %v", err) + } + if got := creds.Extra["profile"]; got != "default" { + t.Errorf("Extra[profile] = %q, want %q", got, "default") + } + if got := creds.Extra["credential_source"]; got != "profile" { + t.Errorf("Extra[credential_source] = %q, want %q", got, "profile") + } +} + +// TestCloudAccount_AWS_RoleARNResolver_Marker verifies awsRoleARNResolver +// declares Extra["credential_source"]="role_arn", records the role ARN + +// external_id, and does NOT call STS. Resolution is deferred to the aws plugin. +func TestCloudAccount_AWS_RoleARNResolver_Marker(t *testing.T) { + acc := module.NewCloudAccount("aws-role", map[string]any{ + "provider": "aws", + "region": "us-east-1", + "credentials": map[string]any{ + "type": "role_arn", + "roleArn": "arn:aws:iam::123456789012:role/workflow", + "externalId": "ext-token-7", + }, + }) + + app := module.NewMockApplication() + logged := captureLog(t, func() { + if err := acc.Init(app); err != nil { + t.Fatalf("Init failed: %v", err) + } + }) + + creds, err := acc.GetCredentials(context.Background()) + if err != nil { + t.Fatalf("GetCredentials failed: %v", err) + } + if got := creds.Extra["credential_source"]; got != "role_arn" { + t.Errorf("Extra[credential_source] = %q, want %q", got, "role_arn") + } + if creds.RoleARN != "arn:aws:iam::123456789012:role/workflow" { + t.Errorf("RoleARN = %q, want %q", creds.RoleARN, "arn:aws:iam::123456789012:role/workflow") + } + if got := creds.Extra["external_id"]; got != "ext-token-7" { + t.Errorf("Extra[external_id] = %q, want %q", got, "ext-token-7") + } + // Resolution deferred — no STS call, so no temporary keys. + if creds.AccessKey != "" || creds.SecretKey != "" || creds.SessionToken != "" { + t.Errorf("expected empty access/secret/session (resolution deferred), got %q/%q/%q", + creds.AccessKey, creds.SecretKey, creds.SessionToken) + } + if !strings.Contains(logged, `credential_source="role_arn"`) { + t.Errorf("expected warning log mentioning credential_source=\"role_arn\", got: %q", logged) + } +} + +// TestCloudAccount_AWS_RoleARNResolver_MissingRoleArn verifies the required-check: +// an empty roleArn must surface as an error from Init (propagated by resolveCredentials). +func TestCloudAccount_AWS_RoleARNResolver_MissingRoleArn(t *testing.T) { + acc := module.NewCloudAccount("aws-role-missing", map[string]any{ + "provider": "aws", + "region": "us-east-1", + "credentials": map[string]any{ + "type": "role_arn", + "roleArn": "", + }, + }) + + app := module.NewMockApplication() + err := acc.Init(app) + if err == nil { + t.Fatal("expected Init to fail for empty roleArn, got nil") + } + if !strings.Contains(err.Error(), "roleArn is required") { + t.Errorf("error = %q, want substring %q", err.Error(), "roleArn is required") + } +} diff --git a/module/cloud_account_integration_test.go b/module/cloud_account_integration_test.go deleted file mode 100644 index 10de72f8..00000000 --- a/module/cloud_account_integration_test.go +++ /dev/null @@ -1,66 +0,0 @@ -//go:build integration - -package module_test - -import ( - "context" - "os" - "testing" - - "github.com/GoCodeAlone/workflow/module" -) - -// TestCloudAccount_AWS_ValidateCredentials verifies that a cloud.account with -// real AWS credentials can call sts:GetCallerIdentity. -// Requires: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION env vars. -func TestCloudAccount_AWS_ValidateCredentials(t *testing.T) { - region := os.Getenv("AWS_REGION") - if region == "" { - region = "us-east-1" - } - acc := module.NewCloudAccount("aws-integration", map[string]any{ - "provider": "aws", - "region": region, - "credentials": map[string]any{ - "type": "env", - }, - }) - app := module.NewMockApplication() - if err := acc.Init(app); err != nil { - t.Fatalf("Init: %v", err) - } - - if err := acc.ValidateCredentials(context.Background()); err != nil { - t.Fatalf("ValidateCredentials: %v", err) - } -} - -// TestCloudAccount_AWS_AWSConfig_Static verifies that a static credential -// cloud.account produces a valid aws.Config. -func TestCloudAccount_AWS_AWSConfig_Static(t *testing.T) { - acc := module.NewCloudAccount("aws-static", map[string]any{ - "provider": "aws", - "region": "us-east-1", - "credentials": map[string]any{ - "type": "static", - "accessKey": os.Getenv("AWS_ACCESS_KEY_ID"), - "secretKey": os.Getenv("AWS_SECRET_ACCESS_KEY"), - }, - }) - app := module.NewMockApplication() - if err := acc.Init(app); err != nil { - t.Fatalf("Init: %v", err) - } - - awsProv, ok := any(acc).(module.AWSConfigProvider) - if !ok { - t.Fatal("CloudAccount does not implement AWSConfigProvider") - } - cfg, err := awsProv.AWSConfig(context.Background()) - if err != nil { - t.Fatalf("AWSConfig: %v", err) - } - if cfg.Region == "" { - t.Error("expected non-empty region in aws.Config") - } -} diff --git a/module/iac_module.go b/module/iac_module.go index 1a359635..0ee73c2d 100644 --- a/module/iac_module.go +++ b/module/iac_module.go @@ -10,10 +10,10 @@ import ( ) // IaCModule registers an IaCStateStore in the service registry. -// Supported in-core backends: "memory" (default), "filesystem", "spaces" -// (DigitalOcean Spaces / S3-compatible), "gcs", and "postgres" — plus any -// backend provided by a loaded plugin (e.g. "azure_blob" via -// workflow-plugin-azure). +// Supported in-core backends: "memory" (default), "filesystem", "gcs", and +// "postgres" — plus any backend provided by a loaded plugin (e.g. "azure_blob" +// via workflow-plugin-azure, "spaces" via workflow-plugin-digitalocean, "s3" +// via workflow-plugin-aws). // // Config example: // @@ -54,21 +54,6 @@ func (m *IaCModule) Init(app modular.Application) error { dir = "/var/lib/workflow/iac-state" } m.store = NewFSIaCStateStore(dir) - case "spaces": - region, _ := m.config["region"].(string) - bucket, _ := m.config["bucket"].(string) - prefix, _ := m.config["prefix"].(string) - accessKey, _ := m.config["accessKey"].(string) - secretKey, _ := m.config["secretKey"].(string) - endpoint, _ := m.config["endpoint"].(string) - if bucket == "" { - return fmt.Errorf("iac.state %q: spaces backend requires 'bucket' config", m.name) - } - store, err := NewSpacesIaCStateStore(region, bucket, prefix, accessKey, secretKey, endpoint) - if err != nil { - return fmt.Errorf("iac.state %q: spaces backend: %w", m.name, err) - } - m.store = store case "gcs": bucket, _ := m.config["bucket"].(string) prefix, _ := m.config["prefix"].(string) @@ -111,8 +96,9 @@ func (m *IaCModule) Init(app modular.Application) error { break } return fmt.Errorf("iac.state %q: backend %q is not built into workflow core "+ - "(in-core backends: 'memory', 'filesystem', 'spaces', 'gcs', 'postgres'). "+ - "If %q is a plugin-provided backend (e.g. 'azure_blob' via workflow-plugin-azure), "+ + "(in-core backends: 'memory', 'filesystem', 'gcs', 'postgres'). "+ + "If %q is a plugin-provided backend (e.g. 'azure_blob' via workflow-plugin-azure, "+ + "'spaces' via workflow-plugin-digitalocean, 's3' via workflow-plugin-aws), "+ "install and load that plugin", m.name, m.backend, m.backend) } diff --git a/module/iac_state_spaces.go b/module/iac_state_spaces.go deleted file mode 100644 index 365d9e1a..00000000 --- a/module/iac_state_spaces.go +++ /dev/null @@ -1,332 +0,0 @@ -package module - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "io" - "os" - "strings" - "sync" - "time" - - "github.com/aws/aws-sdk-go-v2/aws" - awsconfig "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" - "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/aws/aws-sdk-go-v2/service/s3/types" -) - -// SpacesS3Client abstracts the S3 API methods used by SpacesIaCStateStore, -// allowing a mock to be injected for testing. -type SpacesS3Client interface { - GetObject(ctx context.Context, input *s3.GetObjectInput, opts ...func(*s3.Options)) (*s3.GetObjectOutput, error) - PutObject(ctx context.Context, input *s3.PutObjectInput, opts ...func(*s3.Options)) (*s3.PutObjectOutput, error) - DeleteObject(ctx context.Context, input *s3.DeleteObjectInput, opts ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) - ListObjectsV2(ctx context.Context, input *s3.ListObjectsV2Input, opts ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) - HeadObject(ctx context.Context, input *s3.HeadObjectInput, opts ...func(*s3.Options)) (*s3.HeadObjectOutput, error) -} - -// SpacesIaCStateStore persists IaC state as JSON objects in a DigitalOcean Spaces -// bucket (or any S3-compatible store). Lock objects are used for advisory locking. -type SpacesIaCStateStore struct { - client SpacesS3Client - bucket string - prefix string - mu sync.Mutex -} - -// NewSpacesIaCStateStore creates a Spaces/S3-compatible state store. -// -// Parameters: -// - region: DO region (e.g. "nyc3"); used to construct the endpoint -// https://.digitaloceanspaces.com unless endpoint is set. -// - bucket: Spaces bucket name (required). -// - prefix: optional key prefix (default "iac-state/"). -// - accessKey: Spaces access key; falls back to DO_SPACES_ACCESS_KEY env var. -// - secretKey: Spaces secret key; falls back to DO_SPACES_SECRET_KEY env var. -// - endpoint: optional custom endpoint override. -func NewSpacesIaCStateStore(region, bucket, prefix, accessKey, secretKey, endpoint string) (*SpacesIaCStateStore, error) { - if bucket == "" { - return nil, fmt.Errorf("iac spaces state: bucket must not be empty") - } - if prefix == "" { - prefix = "iac-state/" - } - if accessKey == "" { - accessKey = os.Getenv("DO_SPACES_ACCESS_KEY") - } - if secretKey == "" { - secretKey = os.Getenv("DO_SPACES_SECRET_KEY") - } - if endpoint == "" && region != "" { - endpoint = fmt.Sprintf("https://%s.digitaloceanspaces.com", region) - } - if endpoint == "" { - return nil, fmt.Errorf("iac spaces state: either region or endpoint must be set") - } - - cfg, err := awsconfig.LoadDefaultConfig(context.Background(), - awsconfig.WithRegion(regionOrDefault(region)), - awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")), - ) - if err != nil { - return nil, fmt.Errorf("iac spaces state: load config: %w", err) - } - - client := s3.NewFromConfig(cfg, func(o *s3.Options) { - o.BaseEndpoint = &endpoint - o.UsePathStyle = true - }) - - return &SpacesIaCStateStore{ - client: client, - bucket: bucket, - prefix: prefix, - }, nil -} - -// NewSpacesIaCStateStoreWithClient creates a store with an injected client (for testing). -func NewSpacesIaCStateStoreWithClient(client SpacesS3Client, bucket, prefix string) *SpacesIaCStateStore { - if prefix == "" { - prefix = "iac-state/" - } - return &SpacesIaCStateStore{ - client: client, - bucket: bucket, - prefix: prefix, - } -} - -func regionOrDefault(region string) string { - if region == "" { - return "us-east-1" - } - return region -} - -// stateKey returns the S3 key for a resource's state JSON. -func (s *SpacesIaCStateStore) stateKey(resourceID string) string { - return s.prefix + sanitizeID(resourceID) + ".json" -} - -// lockKey returns the S3 key for a resource's lock object. -func (s *SpacesIaCStateStore) lockKey(resourceID string) string { - return s.prefix + sanitizeID(resourceID) + ".lock" -} - -// GetState retrieves a state record by resource ID. Returns nil, nil when not found. -func (s *SpacesIaCStateStore) GetState(ctx context.Context, resourceID string) (*IaCState, error) { - key := s.stateKey(resourceID) - out, err := s.client.GetObject(ctx, &s3.GetObjectInput{ - Bucket: &s.bucket, - Key: &key, - }) - if err != nil { - if isNotFoundErr(err) { - return nil, nil - } - return nil, fmt.Errorf("iac spaces state: GetState %q: %w", resourceID, err) - } - defer out.Body.Close() - - data, err := io.ReadAll(out.Body) - if err != nil { - return nil, fmt.Errorf("iac spaces state: GetState %q: read body: %w", resourceID, err) - } - - var st IaCState - if err := json.Unmarshal(data, &st); err != nil { - return nil, fmt.Errorf("iac spaces state: GetState %q: unmarshal: %w", resourceID, err) - } - return &st, nil -} - -// SaveState writes the state record as a JSON object to Spaces. -func (s *SpacesIaCStateStore) SaveState(ctx context.Context, state *IaCState) error { - if state == nil { - return fmt.Errorf("iac spaces state: SaveState: state must not be nil") - } - if state.ResourceID == "" { - return fmt.Errorf("iac spaces state: SaveState: resource_id must not be empty") - } - - data, err := json.MarshalIndent(state, "", " ") - if err != nil { - return fmt.Errorf("iac spaces state: SaveState %q: marshal: %w", state.ResourceID, err) - } - - key := s.stateKey(state.ResourceID) - contentType := "application/json" - _, err = s.client.PutObject(ctx, &s3.PutObjectInput{ - Bucket: &s.bucket, - Key: &key, - Body: bytes.NewReader(data), - ContentType: &contentType, - }) - if err != nil { - return fmt.Errorf("iac spaces state: SaveState %q: put: %w", state.ResourceID, err) - } - return nil -} - -// ListStates lists all state objects under the prefix and returns those matching filter. -// Supported filter keys: "resource_type", "provider", "status". -func (s *SpacesIaCStateStore) ListStates(ctx context.Context, filter map[string]string) ([]*IaCState, error) { - var results []*IaCState - var continuationToken *string - - for { - out, err := s.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ - Bucket: &s.bucket, - Prefix: &s.prefix, - ContinuationToken: continuationToken, - }) - if err != nil { - return nil, fmt.Errorf("iac spaces state: ListStates: %w", err) - } - - for _, obj := range out.Contents { - key := aws.ToString(obj.Key) - // Skip lock files and non-JSON objects. - if strings.HasSuffix(key, ".lock") || !strings.HasSuffix(key, ".json") { - continue - } - - getOut, err := s.client.GetObject(ctx, &s3.GetObjectInput{ - Bucket: &s.bucket, - Key: obj.Key, - }) - if err != nil { - continue // skip unreadable objects - } - data, err := io.ReadAll(getOut.Body) - getOut.Body.Close() - if err != nil { - continue - } - - var st IaCState - if err := json.Unmarshal(data, &st); err != nil { - continue - } - if matchesFilter(&st, filter) { - results = append(results, &st) - } - } - - if !aws.ToBool(out.IsTruncated) { - break - } - continuationToken = out.NextContinuationToken - } - - return results, nil -} - -// DeleteState removes the state object for resourceID. -func (s *SpacesIaCStateStore) DeleteState(ctx context.Context, resourceID string) error { - // Verify existence first to return a meaningful error. - key := s.stateKey(resourceID) - _, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{ - Bucket: &s.bucket, - Key: &key, - }) - if err != nil { - if isNotFoundErr(err) { - return fmt.Errorf("iac spaces state: DeleteState %q: not found", resourceID) - } - return fmt.Errorf("iac spaces state: DeleteState %q: head: %w", resourceID, err) - } - - _, err = s.client.DeleteObject(ctx, &s3.DeleteObjectInput{ - Bucket: &s.bucket, - Key: &key, - }) - if err != nil { - return fmt.Errorf("iac spaces state: DeleteState %q: %w", resourceID, err) - } - return nil -} - -// Lock creates a lock object for resourceID using S3 conditional writes (If-None-Match: *) -// for atomic, race-free lock acquisition. Fails if the lock already exists. -func (s *SpacesIaCStateStore) Lock(ctx context.Context, resourceID string) error { - s.mu.Lock() - defer s.mu.Unlock() - - key := s.lockKey(resourceID) - body := []byte(time.Now().UTC().Format(time.RFC3339)) - ifNoneMatch := "*" - - _, err := s.client.PutObject(ctx, &s3.PutObjectInput{ - Bucket: &s.bucket, - Key: &key, - Body: bytes.NewReader(body), - IfNoneMatch: &ifNoneMatch, - }) - if err != nil { - // S3 returns 412 Precondition Failed when the object already exists. - if isPreconditionFailedErr(err) { - return fmt.Errorf("iac spaces state: Lock %q: resource is already locked", resourceID) - } - return fmt.Errorf("iac spaces state: Lock %q: put: %w", resourceID, err) - } - return nil -} - -// Unlock removes the lock object for resourceID. -func (s *SpacesIaCStateStore) Unlock(ctx context.Context, resourceID string) error { - s.mu.Lock() - defer s.mu.Unlock() - - key := s.lockKey(resourceID) - - // Verify lock exists. - _, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{ - Bucket: &s.bucket, - Key: &key, - }) - if err != nil { - if isNotFoundErr(err) { - return fmt.Errorf("iac spaces state: Unlock %q: not locked", resourceID) - } - return fmt.Errorf("iac spaces state: Unlock %q: head: %w", resourceID, err) - } - - _, err = s.client.DeleteObject(ctx, &s3.DeleteObjectInput{ - Bucket: &s.bucket, - Key: &key, - }) - if err != nil { - return fmt.Errorf("iac spaces state: Unlock %q: %w", resourceID, err) - } - return nil -} - -// isPreconditionFailedErr returns true for HTTP 412 Precondition Failed responses, -// which S3 returns when a conditional write fails (e.g. If-None-Match: * on an existing object). -func isPreconditionFailedErr(err error) bool { - if err == nil { - return false - } - msg := err.Error() - return strings.Contains(msg, "PreconditionFailed") || strings.Contains(msg, "412") -} - -// isNotFoundErr checks whether an S3 error indicates the key was not found. -func isNotFoundErr(err error) bool { - var nsk *types.NoSuchKey - if errors.As(err, &nsk) { - return true - } - // HeadObject returns a generic "NotFound" status, not NoSuchKey. - var nf *types.NotFound - if errors.As(err, &nf) { - return true - } - // Some S3-compatible stores return a plain "not found" in the message. - return strings.Contains(err.Error(), "NotFound") || strings.Contains(err.Error(), "NoSuchKey") -} diff --git a/module/iac_state_spaces_test.go b/module/iac_state_spaces_test.go deleted file mode 100644 index 2de2fd1d..00000000 --- a/module/iac_state_spaces_test.go +++ /dev/null @@ -1,461 +0,0 @@ -package module_test - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "strings" - "sync" - "testing" - - "github.com/GoCodeAlone/workflow/module" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/aws/aws-sdk-go-v2/service/s3/types" -) - -// mockS3Client implements spacesS3Client for testing. -type mockS3Client struct { - mu sync.Mutex - objects map[string][]byte // key -> body -} - -func newMockS3Client() *mockS3Client { - return &mockS3Client{objects: make(map[string][]byte)} -} - -func (m *mockS3Client) GetObject(_ context.Context, input *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { - m.mu.Lock() - defer m.mu.Unlock() - key := aws.ToString(input.Key) - data, ok := m.objects[key] - if !ok { - return nil, &types.NoSuchKey{Message: aws.String("NoSuchKey: " + key)} - } - return &s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader(data)), - }, nil -} - -func (m *mockS3Client) PutObject(_ context.Context, input *s3.PutObjectInput, _ ...func(*s3.Options)) (*s3.PutObjectOutput, error) { - m.mu.Lock() - defer m.mu.Unlock() - key := aws.ToString(input.Key) - // Honour If-None-Match: * — fail if the object already exists (atomic lock semantics). - if aws.ToString(input.IfNoneMatch) == "*" { - if _, exists := m.objects[key]; exists { - return nil, fmt.Errorf("PreconditionFailed: object %q already exists", key) - } - } - data, err := io.ReadAll(input.Body) - if err != nil { - return nil, err - } - m.objects[key] = data - return &s3.PutObjectOutput{}, nil -} - -func (m *mockS3Client) DeleteObject(_ context.Context, input *s3.DeleteObjectInput, _ ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { - m.mu.Lock() - defer m.mu.Unlock() - key := aws.ToString(input.Key) - delete(m.objects, key) - return &s3.DeleteObjectOutput{}, nil -} - -func (m *mockS3Client) ListObjectsV2(_ context.Context, input *s3.ListObjectsV2Input, _ ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { - m.mu.Lock() - defer m.mu.Unlock() - prefix := aws.ToString(input.Prefix) - var contents []types.Object - for key := range m.objects { - if strings.HasPrefix(key, prefix) { - contents = append(contents, types.Object{Key: aws.String(key)}) - } - } - return &s3.ListObjectsV2Output{ - Contents: contents, - IsTruncated: aws.Bool(false), - }, nil -} - -func (m *mockS3Client) HeadObject(_ context.Context, input *s3.HeadObjectInput, _ ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { - m.mu.Lock() - defer m.mu.Unlock() - key := aws.ToString(input.Key) - if _, ok := m.objects[key]; !ok { - return nil, &types.NotFound{Message: aws.String("NotFound: " + key)} - } - return &s3.HeadObjectOutput{}, nil -} - -func newTestSpacesStore(client *mockS3Client) *module.SpacesIaCStateStore { - return module.NewSpacesIaCStateStoreWithClient(client, "test-bucket", "iac-state/") -} - -func TestSpacesIaCStateStore_GetState_NotFound(t *testing.T) { - store := newTestSpacesStore(newMockS3Client()) - - st, err := store.GetState(context.Background(), "nonexistent") - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if st != nil { - t.Fatalf("expected nil state, got %+v", st) - } -} - -func TestSpacesIaCStateStore_SaveAndGetState(t *testing.T) { - store := newTestSpacesStore(newMockS3Client()) - - state := &module.IaCState{ - ResourceID: "cluster-1", - ResourceType: "kubernetes", - Provider: "digitalocean", - Status: "active", - Outputs: map[string]any{"endpoint": "https://k8s.example.com"}, - Config: map[string]any{"region": "nyc3"}, - Dependencies: []string{"network-1"}, - CreatedAt: "2026-03-09T00:00:00Z", - UpdatedAt: "2026-03-09T00:00:00Z", - } - - if err := store.SaveState(context.Background(), state); err != nil { - t.Fatalf("SaveState: %v", err) - } - - got, err := store.GetState(context.Background(), "cluster-1") - if err != nil { - t.Fatalf("GetState: %v", err) - } - if got == nil { - t.Fatal("expected state, got nil") - } - if got.ResourceID != "cluster-1" { - t.Errorf("ResourceID = %q, want %q", got.ResourceID, "cluster-1") - } - if got.Provider != "digitalocean" { - t.Errorf("Provider = %q, want %q", got.Provider, "digitalocean") - } - if got.Status != "active" { - t.Errorf("Status = %q, want %q", got.Status, "active") - } - if len(got.Dependencies) != 1 || got.Dependencies[0] != "network-1" { - t.Errorf("Dependencies = %#v, want [network-1]", got.Dependencies) - } -} - -func TestSpacesIaCStateStore_SaveState_Nil(t *testing.T) { - store := newTestSpacesStore(newMockS3Client()) - - err := store.SaveState(context.Background(), nil) - if err == nil { - t.Fatal("expected error for nil state") - } -} - -func TestSpacesIaCStateStore_SaveState_EmptyID(t *testing.T) { - store := newTestSpacesStore(newMockS3Client()) - - err := store.SaveState(context.Background(), &module.IaCState{}) - if err == nil { - t.Fatal("expected error for empty resource_id") - } -} - -func TestSpacesIaCStateStore_ListStates(t *testing.T) { - client := newMockS3Client() - store := newTestSpacesStore(client) - - states := []*module.IaCState{ - {ResourceID: "r1", ResourceType: "kubernetes", Provider: "aws", Status: "active"}, - {ResourceID: "r2", ResourceType: "database", Provider: "digitalocean", Status: "active"}, - {ResourceID: "r3", ResourceType: "kubernetes", Provider: "aws", Status: "destroyed"}, - } - for _, st := range states { - if err := store.SaveState(context.Background(), st); err != nil { - t.Fatalf("SaveState %q: %v", st.ResourceID, err) - } - } - - // No filter — returns all. - all, err := store.ListStates(context.Background(), nil) - if err != nil { - t.Fatalf("ListStates(nil): %v", err) - } - if len(all) != 3 { - t.Errorf("ListStates(nil) = %d items, want 3", len(all)) - } - - // Filter by provider. - filtered, err := store.ListStates(context.Background(), map[string]string{"provider": "aws"}) - if err != nil { - t.Fatalf("ListStates(provider=aws): %v", err) - } - if len(filtered) != 2 { - t.Errorf("ListStates(provider=aws) = %d items, want 2", len(filtered)) - } - - // Filter by status. - active, err := store.ListStates(context.Background(), map[string]string{"status": "active"}) - if err != nil { - t.Fatalf("ListStates(status=active): %v", err) - } - if len(active) != 2 { - t.Errorf("ListStates(status=active) = %d items, want 2", len(active)) - } -} - -func TestSpacesIaCStateStore_ListStates_SkipsLockFiles(t *testing.T) { - client := newMockS3Client() - store := newTestSpacesStore(client) - - // Save a state and lock it — lock file should be skipped in list. - if err := store.SaveState(context.Background(), &module.IaCState{ResourceID: "r1", Status: "active"}); err != nil { - t.Fatalf("SaveState: %v", err) - } - if err := store.Lock(context.Background(), "r1"); err != nil { - t.Fatalf("Lock: %v", err) - } - - results, err := store.ListStates(context.Background(), nil) - if err != nil { - t.Fatalf("ListStates: %v", err) - } - if len(results) != 1 { - t.Errorf("ListStates returned %d items (expected 1, lock file should be excluded)", len(results)) - } -} - -func TestSpacesIaCStateStore_DeleteState(t *testing.T) { - store := newTestSpacesStore(newMockS3Client()) - - if err := store.SaveState(context.Background(), &module.IaCState{ResourceID: "del-me", Status: "active"}); err != nil { - t.Fatalf("SaveState: %v", err) - } - - if err := store.DeleteState(context.Background(), "del-me"); err != nil { - t.Fatalf("DeleteState: %v", err) - } - - // Should be gone. - st, err := store.GetState(context.Background(), "del-me") - if err != nil { - t.Fatalf("GetState after delete: %v", err) - } - if st != nil { - t.Fatal("expected nil after delete") - } -} - -func TestSpacesIaCStateStore_DeleteState_NotFound(t *testing.T) { - store := newTestSpacesStore(newMockS3Client()) - - err := store.DeleteState(context.Background(), "nonexistent") - if err == nil { - t.Fatal("expected error deleting nonexistent state") - } - if !strings.Contains(err.Error(), "not found") { - t.Errorf("error = %q, expected 'not found'", err) - } -} - -func TestSpacesIaCStateStore_LockUnlock(t *testing.T) { - store := newTestSpacesStore(newMockS3Client()) - - // Lock should succeed. - if err := store.Lock(context.Background(), "res-1"); err != nil { - t.Fatalf("Lock: %v", err) - } - - // Double-lock should fail. - if err := store.Lock(context.Background(), "res-1"); err == nil { - t.Fatal("expected error on double lock") - } - - // Unlock should succeed. - if err := store.Unlock(context.Background(), "res-1"); err != nil { - t.Fatalf("Unlock: %v", err) - } - - // Re-lock after unlock should succeed. - if err := store.Lock(context.Background(), "res-1"); err != nil { - t.Fatalf("Lock after unlock: %v", err) - } -} - -func TestSpacesIaCStateStore_Unlock_NotLocked(t *testing.T) { - store := newTestSpacesStore(newMockS3Client()) - - err := store.Unlock(context.Background(), "not-locked") - if err == nil { - t.Fatal("expected error unlocking a resource that is not locked") - } - if !strings.Contains(err.Error(), "not locked") { - t.Errorf("error = %q, expected 'not locked'", err) - } -} - -func TestSpacesIaCStateStore_SaveState_Overwrite(t *testing.T) { - store := newTestSpacesStore(newMockS3Client()) - - original := &module.IaCState{ResourceID: "r1", Status: "planned"} - if err := store.SaveState(context.Background(), original); err != nil { - t.Fatalf("SaveState (original): %v", err) - } - - updated := &module.IaCState{ResourceID: "r1", Status: "active"} - if err := store.SaveState(context.Background(), updated); err != nil { - t.Fatalf("SaveState (updated): %v", err) - } - - got, err := store.GetState(context.Background(), "r1") - if err != nil { - t.Fatalf("GetState: %v", err) - } - if got.Status != "active" { - t.Errorf("Status = %q, want %q (overwrite failed)", got.Status, "active") - } -} - -func TestSpacesIaCStateStore_SanitizesResourceID(t *testing.T) { - client := newMockS3Client() - store := newTestSpacesStore(client) - - state := &module.IaCState{ResourceID: "ns/cluster\\1", Status: "active"} - if err := store.SaveState(context.Background(), state); err != nil { - t.Fatalf("SaveState: %v", err) - } - - // Verify the key was sanitized. - client.mu.Lock() - _, exists := client.objects["iac-state/ns_cluster_1.json"] - client.mu.Unlock() - if !exists { - t.Error("expected sanitized key 'iac-state/ns_cluster_1.json' in mock objects") - } - - // Retrieve by original ID. - got, err := store.GetState(context.Background(), "ns/cluster\\1") - if err != nil { - t.Fatalf("GetState: %v", err) - } - if got == nil { - t.Fatal("expected state, got nil") - } -} - -// TestSpacesIaCStateStore_GetState_BadJSON verifies graceful handling of corrupt data. -func TestSpacesIaCStateStore_GetState_BadJSON(t *testing.T) { - client := newMockS3Client() - store := newTestSpacesStore(client) - - // Manually inject bad JSON. - client.mu.Lock() - client.objects["iac-state/bad.json"] = []byte("{invalid json") - client.mu.Unlock() - - _, err := store.GetState(context.Background(), "bad") - if err == nil { - t.Fatal("expected unmarshal error for bad JSON") - } - if !strings.Contains(err.Error(), "unmarshal") { - t.Errorf("error = %q, expected 'unmarshal' substring", err) - } -} - -// Ensure the mock properly serializes JSON round-trip. -func TestSpacesIaCStateStore_JSONRoundTrip(t *testing.T) { - store := newTestSpacesStore(newMockS3Client()) - - state := &module.IaCState{ - ResourceID: "rt-1", - ResourceType: "ecs", - Provider: "aws", - ProviderID: "arn:aws:ecs:us-east-1:123:cluster/test", - ConfigHash: "config-hash-rt-1", - Status: "provisioning", - Outputs: map[string]any{"arn": "arn:aws:ecs:us-east-1:123:cluster/test"}, - Config: map[string]any{"cpu": float64(256), "memory": float64(512)}, - CreatedAt: "2026-01-01T00:00:00Z", - UpdatedAt: "2026-03-09T12:00:00Z", - Error: "timeout waiting for stabilization", - } - - if err := store.SaveState(context.Background(), state); err != nil { - t.Fatalf("SaveState: %v", err) - } - - got, err := store.GetState(context.Background(), "rt-1") - if err != nil { - t.Fatalf("GetState: %v", err) - } - - // Compare via JSON to handle map ordering. - wantJSON, _ := json.Marshal(state) - gotJSON, _ := json.Marshal(got) - if string(wantJSON) != string(gotJSON) { - t.Errorf("round-trip mismatch:\n want: %s\n got: %s", wantJSON, gotJSON) - } -} - -// errS3Client is a mock that returns errors for all operations. -type errS3Client struct{} - -func (e *errS3Client) GetObject(_ context.Context, _ *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { - return nil, fmt.Errorf("simulated GetObject failure") -} -func (e *errS3Client) PutObject(_ context.Context, _ *s3.PutObjectInput, _ ...func(*s3.Options)) (*s3.PutObjectOutput, error) { - return nil, fmt.Errorf("simulated PutObject failure") -} -func (e *errS3Client) DeleteObject(_ context.Context, _ *s3.DeleteObjectInput, _ ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { - return nil, fmt.Errorf("simulated DeleteObject failure") -} -func (e *errS3Client) ListObjectsV2(_ context.Context, _ *s3.ListObjectsV2Input, _ ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { - return nil, fmt.Errorf("simulated ListObjectsV2 failure") -} -func (e *errS3Client) HeadObject(_ context.Context, _ *s3.HeadObjectInput, _ ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { - return nil, fmt.Errorf("simulated HeadObject failure") -} - -func TestSpacesIaCStateStore_ErrorPropagation(t *testing.T) { - store := module.NewSpacesIaCStateStoreWithClient(&errS3Client{}, "test-bucket", "iac-state/") - - // GetState error. - _, err := store.GetState(context.Background(), "x") - if err == nil || !strings.Contains(err.Error(), "simulated") { - t.Errorf("GetState error = %v, want simulated error", err) - } - - // SaveState error. - err = store.SaveState(context.Background(), &module.IaCState{ResourceID: "x"}) - if err == nil || !strings.Contains(err.Error(), "simulated") { - t.Errorf("SaveState error = %v, want simulated error", err) - } - - // ListStates error. - _, err = store.ListStates(context.Background(), nil) - if err == nil || !strings.Contains(err.Error(), "simulated") { - t.Errorf("ListStates error = %v, want simulated error", err) - } - - // DeleteState error (HeadObject fails). - err = store.DeleteState(context.Background(), "x") - if err == nil || !strings.Contains(err.Error(), "simulated") { - t.Errorf("DeleteState error = %v, want simulated error", err) - } - - // Lock error (HeadObject fails with non-NotFound). - err = store.Lock(context.Background(), "x") - if err == nil || !strings.Contains(err.Error(), "simulated") { - t.Errorf("Lock error = %v, want simulated error", err) - } - - // Unlock error (HeadObject fails with non-NotFound). - err = store.Unlock(context.Background(), "x") - if err == nil || !strings.Contains(err.Error(), "simulated") { - t.Errorf("Unlock error = %v, want simulated error", err) - } -} diff --git a/module/pipeline_step_cloud_validate.go b/module/pipeline_step_cloud_validate.go index ba3a051c..77331fb8 100644 --- a/module/pipeline_step_cloud_validate.go +++ b/module/pipeline_step_cloud_validate.go @@ -102,7 +102,22 @@ func (s *CloudValidateStep) validateCreds(provider string, creds *CloudCredentia } switch provider { case "aws": - return creds.AccessKey != "" && creds.SecretKey != "" + // static/env: AccessKey + SecretKey populated directly. + if creds.AccessKey != "" && creds.SecretKey != "" { + return true + } + // profile/role_arn (post-Task 13 rewrite): resolver records a + // credential_source marker; the aws plugin performs the SDK-bearing + // resolution at the point of use. + if src, ok := creds.Extra["credential_source"]; ok && src != "" { + return true + } + // role_arn marker also records RoleARN on creds — accept it as a + // signal that the resolver ran successfully. + if creds.RoleARN != "" { + return true + } + return false case "gcp": return creds.ProjectID != "" || len(creds.ServiceAccountJSON) > 0 case "azure": diff --git a/module/pipeline_step_cloud_validate_test.go b/module/pipeline_step_cloud_validate_test.go new file mode 100644 index 00000000..81ed1519 --- /dev/null +++ b/module/pipeline_step_cloud_validate_test.go @@ -0,0 +1,108 @@ +package module_test + +import ( + "context" + "testing" + + "github.com/GoCodeAlone/workflow/module" +) + +// runAWSValidate is a helper that initialises a `cloud.account` (provider=aws) +// with the given credentials config, runs `step.cloud_validate`, and returns +// the `valid` output. Each sub-test of TestCloudValidateStep_AWS_Markers uses +// a fresh MockApplication to keep the service-registry entries isolated. +func runAWSValidate(t *testing.T, name string, credentials map[string]any) bool { + t.Helper() + cfg := map[string]any{ + "provider": "aws", + "region": "us-east-1", + } + if credentials != nil { + cfg["credentials"] = credentials + } + acc := module.NewCloudAccount(name, cfg) + + app := module.NewMockApplication() + if err := acc.Init(app); err != nil { + t.Fatalf("cloud account %q Init failed: %v", name, err) + } + + factory := module.NewCloudValidateStepFactory() + step, err := factory("validate-"+name, map[string]any{"account": name}, app) + if err != nil { + t.Fatalf("factory failed: %v", err) + } + + result, err := step.Execute(context.Background(), &module.PipelineContext{ + Current: map[string]any{}, + }) + if err != nil { + t.Fatalf("Execute failed: %v", err) + } + + valid, _ := result.Output["valid"].(bool) + return valid +} + +// TestCloudValidateStep_AWS_StaticKeys verifies the classic static-key path +// still validates after the Task 13 resolver rewrite. +func TestCloudValidateStep_AWS_StaticKeys(t *testing.T) { + if !runAWSValidate(t, "aws-static", map[string]any{ + "type": "static", + "accessKey": "AKIA-test", + "secretKey": "secret-test", + }) { + t.Error("expected valid=true for static AccessKey+SecretKey, got false") + } +} + +// TestCloudValidateStep_AWS_ProfileMarker verifies that a profile-resolved +// account (no access/secret keys, only Extra["credential_source"]) validates. +func TestCloudValidateStep_AWS_ProfileMarker(t *testing.T) { + if !runAWSValidate(t, "aws-profile", map[string]any{ + "type": "profile", + "profile": "team-prod", + }) { + t.Error("expected valid=true for profile credential_source marker, got false") + } +} + +// TestCloudValidateStep_AWS_RoleARNMarker verifies the role_arn marker path: +// the resolver populates RoleARN + Extra["credential_source"]="role_arn", no +// AccessKey/SecretKey, and validate must accept it. +func TestCloudValidateStep_AWS_RoleARNMarker(t *testing.T) { + if !runAWSValidate(t, "aws-role", map[string]any{ + "type": "role_arn", + "roleArn": "arn:aws:iam::123456789012:role/workflow", + }) { + t.Error("expected valid=true for role_arn marker, got false") + } +} + +// TestCloudValidateStep_AWS_EnvMarker verifies the env resolver path: when +// AWS_* env vars are unset the resolver leaves AccessKey/SecretKey empty and +// validate must report invalid (no marker is emitted for the env type). +func TestCloudValidateStep_AWS_EnvUnset(t *testing.T) { + t.Setenv("AWS_ACCESS_KEY_ID", "") + t.Setenv("AWS_ACCESS_KEY", "") + t.Setenv("AWS_SECRET_ACCESS_KEY", "") + t.Setenv("AWS_SECRET_KEY", "") + t.Setenv("AWS_SESSION_TOKEN", "") + t.Setenv("AWS_ROLE_ARN", "") + + if runAWSValidate(t, "aws-env-unset", map[string]any{"type": "env"}) { + t.Error("expected valid=false when AWS_* env vars are all unset, got true") + } +} + +// TestCloudValidateStep_AWS_EnvSet verifies the env resolver path: when +// AWS_ACCESS_KEY_ID + AWS_SECRET_ACCESS_KEY are set the resolver populates +// the credential fields and validate reports valid. +func TestCloudValidateStep_AWS_EnvSet(t *testing.T) { + t.Setenv("AWS_ACCESS_KEY_ID", "AKIA-env") + t.Setenv("AWS_SECRET_ACCESS_KEY", "secret-env") + + if !runAWSValidate(t, "aws-env-set", map[string]any{"type": "env"}) { + t.Error("expected valid=true for env credentials, got false") + } +} diff --git a/module/pipeline_step_s3_upload.go b/module/pipeline_step_s3_upload.go deleted file mode 100644 index 9a9f4c7e..00000000 --- a/module/pipeline_step_s3_upload.go +++ /dev/null @@ -1,228 +0,0 @@ -package module - -import ( - "bytes" - "context" - "encoding/base64" - "fmt" - "os" - "strings" - - "github.com/GoCodeAlone/modular" - awsconfig "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/service/s3" -) - -// s3PutObjectAPI is the minimal S3 interface needed by S3UploadStep, -// allowing injection of a mock client in tests. -type s3PutObjectAPI interface { - PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) -} - -// S3UploadStep uploads binary data (base64-encoded in the pipeline context) -// to S3-compatible object storage and returns the public URL, key, and bucket. -type S3UploadStep struct { - name string - bucket string - region string - key string // may contain Go template expressions (e.g. "avatars/{{.user_id}}/{{uuid}}.{{.ext}}") - bodyFrom string // dot-path to base64-encoded body in the pipeline context - contentTypeFrom string // dot-path to MIME type (optional) - contentType string // static MIME type (optional) - endpoint string // custom S3 endpoint for MinIO/LocalStack (optional) - tmpl *TemplateEngine - s3Client s3PutObjectAPI // injected in tests; nil triggers lazy init -} - -// NewS3UploadStepFactory returns a StepFactory that creates S3UploadStep instances. -func NewS3UploadStepFactory() StepFactory { - return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) { - bucket := os.ExpandEnv(s3UploadStringConfig(config, "bucket")) - if bucket == "" { - return nil, fmt.Errorf("s3_upload step %q: 'bucket' is required", name) - } - - region := os.ExpandEnv(s3UploadStringConfig(config, "region")) - if region == "" { - return nil, fmt.Errorf("s3_upload step %q: 'region' is required", name) - } - - key := s3UploadStringConfig(config, "key") - if key == "" { - return nil, fmt.Errorf("s3_upload step %q: 'key' is required", name) - } - - bodyFrom := s3UploadStringConfig(config, "body_from") - if bodyFrom == "" { - return nil, fmt.Errorf("s3_upload step %q: 'body_from' is required", name) - } - - return &S3UploadStep{ - name: name, - bucket: bucket, - region: region, - key: key, - bodyFrom: bodyFrom, - contentTypeFrom: s3UploadStringConfig(config, "content_type_from"), - contentType: s3UploadStringConfig(config, "content_type"), - endpoint: os.ExpandEnv(s3UploadStringConfig(config, "endpoint")), - tmpl: NewTemplateEngine(), - }, nil - } -} - -// s3UploadStringConfig extracts a string value from a config map, returning "" -// if the key is absent or the value is not a string. -func s3UploadStringConfig(config map[string]any, key string) string { - v, _ := config[key].(string) - return v -} - -// Name returns the step name. -func (s *S3UploadStep) Name() string { return s.name } - -// Execute uploads binary data from the pipeline context to S3 and returns the -// public URL, the resolved key, and the bucket name as step output. -func (s *S3UploadStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) { - // Resolve the key template (supports {{ .field }} and {{ uuid }} etc.) - resolvedKey, err := s.tmpl.Resolve(s.key, pc) - if err != nil { - return nil, fmt.Errorf("s3_upload step %q: failed to resolve key template: %w", s.name, err) - } - - // Resolve body_from dot-path to obtain the base64-encoded body. - bodyVal, err := s.resolveFromPath(pc, s.bodyFrom) - if err != nil { - return nil, fmt.Errorf("s3_upload step %q: body_from %q: %w", s.name, s.bodyFrom, err) - } - - bodyStr, ok := bodyVal.(string) - if !ok { - return nil, fmt.Errorf("s3_upload step %q: body_from value must be a base64-encoded string, got %T", s.name, bodyVal) - } - - bodyBytes, err := s3UploadDecodeBase64(bodyStr) - if err != nil { - return nil, fmt.Errorf("s3_upload step %q: failed to base64-decode body: %w", s.name, err) - } - - // Resolve content type (content_type_from takes precedence over content_type). - contentType, err := s.resolveContentType(pc) - if err != nil { - return nil, fmt.Errorf("s3_upload step %q: %w", s.name, err) - } - - // Obtain the S3 client (injected or built from config). - client, err := s.getClient(ctx) - if err != nil { - return nil, fmt.Errorf("s3_upload step %q: failed to build S3 client: %w", s.name, err) - } - - input := &s3.PutObjectInput{ - Bucket: &s.bucket, - Key: &resolvedKey, - Body: bytes.NewReader(bodyBytes), - } - if contentType != "" { - input.ContentType = &contentType - } - - if _, err = client.PutObject(ctx, input); err != nil { - return nil, fmt.Errorf("s3_upload step %q: PutObject failed: %w", s.name, err) - } - - return &StepResult{Output: map[string]any{ - "url": s.buildURL(resolvedKey), - "key": resolvedKey, - "bucket": s.bucket, - }}, nil -} - -// resolveFromPath walks a dot-separated path (e.g. "steps.parse.data") through -// the pipeline context, including step outputs under the "steps" key. -func (s *S3UploadStep) resolveFromPath(pc *PipelineContext, path string) (any, error) { - data := make(map[string]any, len(pc.Current)+1) - for k, v := range pc.Current { - data[k] = v - } - if len(pc.StepOutputs) > 0 { - steps := make(map[string]any, len(pc.StepOutputs)) - for k, v := range pc.StepOutputs { - steps[k] = v - } - data["steps"] = steps - } - return resolveDottedPath(data, path) -} - -// resolveContentType returns the effective content type: -// content_type_from (dot-path lookup) takes precedence; falls back to the -// static content_type field. -func (s *S3UploadStep) resolveContentType(pc *PipelineContext) (string, error) { - if s.contentTypeFrom != "" { - ctVal, err := s.resolveFromPath(pc, s.contentTypeFrom) - if err != nil { - return "", fmt.Errorf("content_type_from %q: %w", s.contentTypeFrom, err) - } - if ct, ok := ctVal.(string); ok { - return ct, nil - } - } - return s.contentType, nil -} - -// getClient returns the injected client if set, otherwise builds a new AWS S3 -// client from the step's region and optional custom endpoint. -func (s *S3UploadStep) getClient(ctx context.Context) (s3PutObjectAPI, error) { - if s.s3Client != nil { - return s.s3Client, nil - } - - cfg, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(s.region)) - if err != nil { - return nil, fmt.Errorf("failed to load AWS config: %w", err) - } - - var s3Opts []func(*s3.Options) - if s.endpoint != "" { - ep := s.endpoint - s3Opts = append(s3Opts, func(o *s3.Options) { - o.BaseEndpoint = &ep - o.UsePathStyle = true - }) - } - - return s3.NewFromConfig(cfg, s3Opts...), nil -} - -// buildURL constructs the public URL for the uploaded object. -// When a custom endpoint is configured (MinIO, LocalStack, etc.) it uses -// path-style: {endpoint}/{bucket}/{key}. -// Otherwise it uses the standard AWS virtual-hosted URL: -// https://{bucket}.s3.{region}.amazonaws.com/{key}. -func (s *S3UploadStep) buildURL(key string) string { - if s.endpoint != "" { - ep := strings.TrimRight(s.endpoint, "/") - return fmt.Sprintf("%s/%s/%s", ep, s.bucket, key) - } - return fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s.bucket, s.region, key) -} - -// s3UploadDecodeBase64 attempts standard base64, then URL-safe base64, -// then the raw (no-padding) variants, returning the first successful decode. -func s3UploadDecodeBase64(encoded string) ([]byte, error) { - if b, err := base64.StdEncoding.DecodeString(encoded); err == nil { - return b, nil - } - if b, err := base64.URLEncoding.DecodeString(encoded); err == nil { - return b, nil - } - if b, err := base64.RawStdEncoding.DecodeString(encoded); err == nil { - return b, nil - } - b, err := base64.RawURLEncoding.DecodeString(encoded) - if err != nil { - return nil, err - } - return b, nil -} diff --git a/module/pipeline_step_s3_upload_test.go b/module/pipeline_step_s3_upload_test.go deleted file mode 100644 index 6429b728..00000000 --- a/module/pipeline_step_s3_upload_test.go +++ /dev/null @@ -1,335 +0,0 @@ -package module - -import ( - "context" - "encoding/base64" - "strings" - "testing" - - "github.com/aws/aws-sdk-go-v2/service/s3" -) - -// mockS3Uploader is an in-memory S3 client for testing. -type mockS3Uploader struct { - lastInput *s3.PutObjectInput - err error -} - -func (m *mockS3Uploader) PutObject(_ context.Context, input *s3.PutObjectInput, _ ...func(*s3.Options)) (*s3.PutObjectOutput, error) { - m.lastInput = input - return &s3.PutObjectOutput{}, m.err -} - -func TestS3UploadStep_BasicUpload(t *testing.T) { - mock := &mockS3Uploader{} - factory := NewS3UploadStepFactory() - step, err := factory("upload", map[string]any{ - "bucket": "my-bucket", - "region": "us-east-1", - "key": "files/test.png", - "body_from": "file_data", - }, nil) - if err != nil { - t.Fatalf("factory error: %v", err) - } - step.(*S3UploadStep).s3Client = mock - - body := base64.StdEncoding.EncodeToString([]byte("hello world")) - pc := NewPipelineContext(map[string]any{"file_data": body}, nil) - - result, err := step.Execute(context.Background(), pc) - if err != nil { - t.Fatalf("execute error: %v", err) - } - - if result.Output["key"] != "files/test.png" { - t.Errorf("expected key 'files/test.png', got %v", result.Output["key"]) - } - if result.Output["bucket"] != "my-bucket" { - t.Errorf("expected bucket 'my-bucket', got %v", result.Output["bucket"]) - } - want := "https://my-bucket.s3.us-east-1.amazonaws.com/files/test.png" - if result.Output["url"] != want { - t.Errorf("expected url %q, got %v", want, result.Output["url"]) - } -} - -func TestS3UploadStep_TemplatedKey(t *testing.T) { - mock := &mockS3Uploader{} - factory := NewS3UploadStepFactory() - step, err := factory("upload-tmpl", map[string]any{ - "bucket": "avatars", - "region": "us-west-2", - "key": "avatars/{{ .user_id }}/photo.{{ .ext }}", - "body_from": "data", - }, nil) - if err != nil { - t.Fatalf("factory error: %v", err) - } - step.(*S3UploadStep).s3Client = mock - - body := base64.StdEncoding.EncodeToString([]byte("png-data")) - pc := NewPipelineContext(map[string]any{ - "data": body, - "user_id": "u-123", - "ext": "png", - }, nil) - - result, err := step.Execute(context.Background(), pc) - if err != nil { - t.Fatalf("execute error: %v", err) - } - - if result.Output["key"] != "avatars/u-123/photo.png" { - t.Errorf("expected resolved key, got %v", result.Output["key"]) - } -} - -func TestS3UploadStep_StaticContentType(t *testing.T) { - mock := &mockS3Uploader{} - factory := NewS3UploadStepFactory() - step, err := factory("upload-ct", map[string]any{ - "bucket": "my-bucket", - "region": "us-east-1", - "key": "img.png", - "body_from": "data", - "content_type": "image/png", - }, nil) - if err != nil { - t.Fatalf("factory error: %v", err) - } - step.(*S3UploadStep).s3Client = mock - - body := base64.StdEncoding.EncodeToString([]byte("png")) - pc := NewPipelineContext(map[string]any{"data": body}, nil) - - _, err = step.Execute(context.Background(), pc) - if err != nil { - t.Fatalf("execute error: %v", err) - } - - if mock.lastInput == nil { - t.Fatal("expected PutObject to be called") - } - if mock.lastInput.ContentType == nil || *mock.lastInput.ContentType != "image/png" { - t.Errorf("expected ContentType 'image/png', got %v", mock.lastInput.ContentType) - } -} - -func TestS3UploadStep_ContentTypeFrom(t *testing.T) { - mock := &mockS3Uploader{} - factory := NewS3UploadStepFactory() - step, err := factory("upload-ctf", map[string]any{ - "bucket": "my-bucket", - "region": "us-east-1", - "key": "upload", - "body_from": "data", - "content_type_from": "mime", - }, nil) - if err != nil { - t.Fatalf("factory error: %v", err) - } - step.(*S3UploadStep).s3Client = mock - - body := base64.StdEncoding.EncodeToString([]byte("bytes")) - pc := NewPipelineContext(map[string]any{ - "data": body, - "mime": "image/jpeg", - }, nil) - - _, err = step.Execute(context.Background(), pc) - if err != nil { - t.Fatalf("execute error: %v", err) - } - - if mock.lastInput == nil || mock.lastInput.ContentType == nil || *mock.lastInput.ContentType != "image/jpeg" { - t.Errorf("expected ContentType 'image/jpeg', got %v", mock.lastInput.ContentType) - } -} - -func TestS3UploadStep_CustomEndpoint(t *testing.T) { - mock := &mockS3Uploader{} - factory := NewS3UploadStepFactory() - step, err := factory("upload-minio", map[string]any{ - "bucket": "mybucket", - "region": "us-east-1", - "key": "obj/key", - "body_from": "data", - "endpoint": "http://localhost:9000", - }, nil) - if err != nil { - t.Fatalf("factory error: %v", err) - } - step.(*S3UploadStep).s3Client = mock - - body := base64.StdEncoding.EncodeToString([]byte("data")) - pc := NewPipelineContext(map[string]any{"data": body}, nil) - - result, err := step.Execute(context.Background(), pc) - if err != nil { - t.Fatalf("execute error: %v", err) - } - - want := "http://localhost:9000/mybucket/obj/key" - if result.Output["url"] != want { - t.Errorf("expected url %q, got %v", want, result.Output["url"]) - } -} - -func TestS3UploadStep_BodyFromStepOutput(t *testing.T) { - mock := &mockS3Uploader{} - factory := NewS3UploadStepFactory() - step, err := factory("upload-step", map[string]any{ - "bucket": "bucket", - "region": "us-east-1", - "key": "file", - "body_from": "steps.parse.raw_data", - }, nil) - if err != nil { - t.Fatalf("factory error: %v", err) - } - step.(*S3UploadStep).s3Client = mock - - body := base64.StdEncoding.EncodeToString([]byte("content")) - pc := NewPipelineContext(nil, nil) - pc.MergeStepOutput("parse", map[string]any{"raw_data": body}) - - result, err := step.Execute(context.Background(), pc) - if err != nil { - t.Fatalf("execute error: %v", err) - } - if result.Output["key"] != "file" { - t.Errorf("expected key 'file', got %v", result.Output["key"]) - } -} - -func TestS3UploadStep_MissingRequiredFields(t *testing.T) { - factory := NewS3UploadStepFactory() - - tests := []struct { - name string - config map[string]any - wantErr string - }{ - { - name: "missing bucket", - config: map[string]any{"region": "us-east-1", "key": "k", "body_from": "b"}, - wantErr: "'bucket' is required", - }, - { - name: "missing region", - config: map[string]any{"bucket": "b", "key": "k", "body_from": "b"}, - wantErr: "'region' is required", - }, - { - name: "missing key", - config: map[string]any{"bucket": "b", "region": "us-east-1", "body_from": "b"}, - wantErr: "'key' is required", - }, - { - name: "missing body_from", - config: map[string]any{"bucket": "b", "region": "us-east-1", "key": "k"}, - wantErr: "'body_from' is required", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - _, err := factory("test-step", tt.config, nil) - if err == nil { - t.Fatal("expected error, got nil") - } - if !strings.Contains(err.Error(), tt.wantErr) { - t.Errorf("expected error containing %q, got %q", tt.wantErr, err.Error()) - } - }) - } -} - -func TestS3UploadStep_InvalidBase64(t *testing.T) { - mock := &mockS3Uploader{} - factory := NewS3UploadStepFactory() - step, err := factory("upload-bad", map[string]any{ - "bucket": "b", - "region": "us-east-1", - "key": "k", - "body_from": "data", - }, nil) - if err != nil { - t.Fatalf("factory error: %v", err) - } - step.(*S3UploadStep).s3Client = mock - - pc := NewPipelineContext(map[string]any{"data": "not-valid-base64!!!"}, nil) - _, err = step.Execute(context.Background(), pc) - if err == nil { - t.Fatal("expected error for invalid base64") - } - if !strings.Contains(err.Error(), "base64-decode") { - t.Errorf("expected base64 decode error, got %q", err.Error()) - } -} - -func TestS3UploadStep_NonStringBody(t *testing.T) { - mock := &mockS3Uploader{} - factory := NewS3UploadStepFactory() - step, err := factory("upload-nonstr", map[string]any{ - "bucket": "b", - "region": "us-east-1", - "key": "k", - "body_from": "data", - }, nil) - if err != nil { - t.Fatalf("factory error: %v", err) - } - step.(*S3UploadStep).s3Client = mock - - pc := NewPipelineContext(map[string]any{"data": 12345}, nil) - _, err = step.Execute(context.Background(), pc) - if err == nil { - t.Fatal("expected error for non-string body") - } - if !strings.Contains(err.Error(), "base64-encoded string") { - t.Errorf("expected type error, got %q", err.Error()) - } -} - -func TestS3UploadStep_URLEncoding(t *testing.T) { - // Ensure URL-safe base64 is also accepted. - mock := &mockS3Uploader{} - factory := NewS3UploadStepFactory() - step, err := factory("upload-urlb64", map[string]any{ - "bucket": "b", - "region": "us-east-1", - "key": "k", - "body_from": "data", - }, nil) - if err != nil { - t.Fatalf("factory error: %v", err) - } - step.(*S3UploadStep).s3Client = mock - - // Use URL-safe base64 encoding. - body := base64.URLEncoding.EncodeToString([]byte("url-safe content")) - pc := NewPipelineContext(map[string]any{"data": body}, nil) - _, err = step.Execute(context.Background(), pc) - if err != nil { - t.Fatalf("execute error: %v", err) - } -} - -func TestS3UploadStep_Name(t *testing.T) { - factory := NewS3UploadStepFactory() - step, err := factory("my-upload", map[string]any{ - "bucket": "b", - "region": "us-east-1", - "key": "k", - "body_from": "d", - }, nil) - if err != nil { - t.Fatalf("factory error: %v", err) - } - if step.Name() != "my-upload" { - t.Errorf("expected name 'my-upload', got %q", step.Name()) - } -} diff --git a/module/s3_storage.go b/module/s3_storage.go deleted file mode 100644 index a1990093..00000000 --- a/module/s3_storage.go +++ /dev/null @@ -1,162 +0,0 @@ -package module - -import ( - "context" - "fmt" - "io" - - "github.com/GoCodeAlone/modular" - awsconfig "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/service/s3" -) - -// S3Storage provides object storage operations using AWS S3. -// It implements the modular.Module interface. -type S3Storage struct { - name string - bucket string - region string - endpoint string - client *s3.Client - logger modular.Logger -} - -// NewS3Storage creates a new S3 storage module. -func NewS3Storage(name string) *S3Storage { - return &S3Storage{ - name: name, - region: "us-east-1", - logger: &noopLogger{}, - } -} - -// Name returns the module name. -func (s *S3Storage) Name() string { - return s.name -} - -// Init initializes the module with the application context. -func (s *S3Storage) Init(app modular.Application) error { - s.logger = app.Logger() - return nil -} - -// ProvidesServices returns the services provided by this module. -func (s *S3Storage) ProvidesServices() []modular.ServiceProvider { - return []modular.ServiceProvider{ - { - Name: s.name, - Description: "S3 Storage", - Instance: s, - }, - } -} - -// RequiresServices returns the services required by this module. -func (s *S3Storage) RequiresServices() []modular.ServiceDependency { - return nil -} - -// SetBucket sets the S3 bucket name. -func (s *S3Storage) SetBucket(bucket string) { - s.bucket = bucket -} - -// SetRegion sets the AWS region. -func (s *S3Storage) SetRegion(region string) { - s.region = region -} - -// SetEndpoint sets a custom endpoint (for LocalStack/MinIO). -func (s *S3Storage) SetEndpoint(endpoint string) { - s.endpoint = endpoint -} - -// SetClient sets a custom S3 client (useful for testing). -func (s *S3Storage) SetClient(client *s3.Client) { - s.client = client -} - -// Start initializes the S3 client. -func (s *S3Storage) Start(ctx context.Context) error { - opts := []func(*awsconfig.LoadOptions) error{ - awsconfig.WithRegion(s.region), - } - - cfg, err := awsconfig.LoadDefaultConfig(ctx, opts...) - if err != nil { - return fmt.Errorf("failed to load AWS config: %w", err) - } - - s3Opts := []func(*s3.Options){} - if s.endpoint != "" { - s3Opts = append(s3Opts, func(o *s3.Options) { - o.BaseEndpoint = &s.endpoint - o.UsePathStyle = true - }) - } - - s.client = s3.NewFromConfig(cfg, s3Opts...) - s.logger.Info("S3 storage started", "bucket", s.bucket, "region", s.region) - return nil -} - -// Stop is a no-op for S3 storage. -func (s *S3Storage) Stop(_ context.Context) error { - s.logger.Info("S3 storage stopped") - return nil -} - -// PutObject uploads an object to S3. -func (s *S3Storage) PutObject(ctx context.Context, key string, body io.Reader) error { - if s.client == nil { - return fmt.Errorf("S3 client not initialized; call Start first") - } - - _, err := s.client.PutObject(ctx, &s3.PutObjectInput{ - Bucket: &s.bucket, - Key: &key, - Body: body, - }) - if err != nil { - return fmt.Errorf("failed to put object %q: %w", key, err) - } - - s.logger.Info("Object uploaded", "key", key, "bucket", s.bucket) - return nil -} - -// GetObject retrieves an object from S3. -func (s *S3Storage) GetObject(ctx context.Context, key string) (io.ReadCloser, error) { - if s.client == nil { - return nil, fmt.Errorf("S3 client not initialized; call Start first") - } - - result, err := s.client.GetObject(ctx, &s3.GetObjectInput{ - Bucket: &s.bucket, - Key: &key, - }) - if err != nil { - return nil, fmt.Errorf("failed to get object %q: %w", key, err) - } - - return result.Body, nil -} - -// DeleteObject removes an object from S3. -func (s *S3Storage) DeleteObject(ctx context.Context, key string) error { - if s.client == nil { - return fmt.Errorf("S3 client not initialized; call Start first") - } - - _, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{ - Bucket: &s.bucket, - Key: &key, - }) - if err != nil { - return fmt.Errorf("failed to delete object %q: %w", key, err) - } - - s.logger.Info("Object deleted", "key", key, "bucket", s.bucket) - return nil -} diff --git a/module/s3_storage_test.go b/module/s3_storage_test.go deleted file mode 100644 index e97eee3b..00000000 --- a/module/s3_storage_test.go +++ /dev/null @@ -1,93 +0,0 @@ -package module - -import ( - "context" - "testing" -) - -func TestS3StorageName(t *testing.T) { - s := NewS3Storage("s3-test") - if s.Name() != "s3-test" { - t.Errorf("expected name 's3-test', got %q", s.Name()) - } -} - -func TestS3StorageModuleInterface(t *testing.T) { - s := NewS3Storage("s3-test") - - // Test Init - app, _ := NewTestApplication() - if err := s.Init(app); err != nil { - t.Fatalf("Init failed: %v", err) - } - - // Test ProvidesServices - services := s.ProvidesServices() - if len(services) != 1 { - t.Fatalf("expected 1 service, got %d", len(services)) - } - if services[0].Name != "s3-test" { - t.Errorf("expected service name 's3-test', got %q", services[0].Name) - } - - // Test RequiresServices - deps := s.RequiresServices() - if len(deps) != 0 { - t.Errorf("expected no dependencies, got %d", len(deps)) - } -} - -func TestS3StorageConfig(t *testing.T) { - s := NewS3Storage("s3-test") - - // Test defaults - if s.region != "us-east-1" { - t.Errorf("expected default region 'us-east-1', got %q", s.region) - } - - // Test setters - s.SetBucket("my-bucket") - if s.bucket != "my-bucket" { - t.Errorf("expected bucket 'my-bucket', got %q", s.bucket) - } - - s.SetRegion("eu-west-1") - if s.region != "eu-west-1" { - t.Errorf("expected region 'eu-west-1', got %q", s.region) - } - - s.SetEndpoint("http://localhost:4566") - if s.endpoint != "http://localhost:4566" { - t.Errorf("expected endpoint 'http://localhost:4566', got %q", s.endpoint) - } -} - -func TestS3StorageOperationsWithoutClient(t *testing.T) { - s := NewS3Storage("s3-test") - - ctx := context.Background() - - // Operations should fail without Start - if err := s.PutObject(ctx, "key", nil); err == nil { - t.Error("PutObject should fail without initialized client") - } - - if _, err := s.GetObject(ctx, "key"); err == nil { - t.Error("GetObject should fail without initialized client") - } - - if err := s.DeleteObject(ctx, "key"); err == nil { - t.Error("DeleteObject should fail without initialized client") - } -} - -func TestS3StorageStop(t *testing.T) { - s := NewS3Storage("s3-test") - app, _ := NewTestApplication() - _ = s.Init(app) - - // Stop should be a no-op and not error - if err := s.Stop(context.Background()); err != nil { - t.Fatalf("Stop failed: %v", err) - } -} diff --git a/plugins/pipelinesteps/plugin.go b/plugins/pipelinesteps/plugin.go index 955be17d..f28782fb 100644 --- a/plugins/pipelinesteps/plugin.go +++ b/plugins/pipelinesteps/plugin.go @@ -4,7 +4,7 @@ // raw_response, json_parse, static_file, validate_path_param, validate_pagination, // validate_request_body, foreach, while, webhook_verify, base64_decode, ui_scaffold, // ui_scaffold_analyze, dlq_send, dlq_replay, retry_with_backoff, circuit_breaker (wrapping), -// s3_upload, auth_validate, authz_check, token_revoke, sandbox_exec. +// auth_validate, authz_check, token_revoke, sandbox_exec. // It also provides the PipelineWorkflowHandler for composable pipelines. package pipelinesteps @@ -90,7 +90,6 @@ func New() *Plugin { "step.dlq_replay", "step.retry_with_backoff", "step.resilient_circuit_breaker", - "step.s3_upload", "step.auth_validate", "step.authz_check", "step.token_revoke", @@ -180,7 +179,6 @@ func (p *Plugin) StepFactories() map[string]plugin.StepFactory { "step.resilient_circuit_breaker": wrapStepFactory(module.NewResilienceCircuitBreakerStepFactory(func() *module.StepRegistry { return p.concreteStepRegistry })), - "step.s3_upload": wrapStepFactory(module.NewS3UploadStepFactory()), "step.auth_validate": wrapStepFactory(module.NewAuthValidateStepFactory()), "step.authz_check": wrapStepFactory(module.NewAuthzCheckStepFactory()), "step.token_revoke": wrapStepFactory(module.NewTokenRevokeStepFactory()), diff --git a/plugins/pipelinesteps/plugin_test.go b/plugins/pipelinesteps/plugin_test.go index 36f21d5e..73538b39 100644 --- a/plugins/pipelinesteps/plugin_test.go +++ b/plugins/pipelinesteps/plugin_test.go @@ -68,7 +68,6 @@ func TestStepFactories(t *testing.T) { "step.dlq_replay", "step.retry_with_backoff", "step.resilient_circuit_breaker", - "step.s3_upload", "step.auth_validate", "step.authz_check", "step.token_revoke", diff --git a/plugins/storage/plugin.go b/plugins/storage/plugin.go index 350920ce..febbe33e 100644 --- a/plugins/storage/plugin.go +++ b/plugins/storage/plugin.go @@ -11,7 +11,7 @@ import ( "github.com/GoCodeAlone/workflow/schema" ) -// Plugin provides storage and database capabilities: storage.s3, storage.local, +// Plugin provides storage and database capabilities: storage.local, // storage.gcs, storage.sqlite, storage.artifact, database.workflow, // persistence.store, cache.redis modules, and artifact pipeline step factories. type Plugin struct { @@ -34,7 +34,6 @@ func New() *Plugin { Description: "Storage, database, persistence, and cache modules with DB pipeline steps", Tier: plugin.TierCore, ModuleTypes: []string{ - "storage.s3", "storage.local", "storage.gcs", "storage.sqlite", @@ -86,19 +85,6 @@ func (p *Plugin) Capabilities() []capability.Contract { // ModuleFactories returns factories for all storage/database/persistence module types. func (p *Plugin) ModuleFactories() map[string]plugin.ModuleFactory { return map[string]plugin.ModuleFactory{ - "storage.s3": func(name string, cfg map[string]any) modular.Module { - s3Mod := module.NewS3Storage(name) - if bucket, ok := cfg["bucket"].(string); ok { - s3Mod.SetBucket(bucket) - } - if region, ok := cfg["region"].(string); ok { - s3Mod.SetRegion(region) - } - if endpoint, ok := cfg["endpoint"].(string); ok { - s3Mod.SetEndpoint(endpoint) - } - return s3Mod - }, "storage.local": func(name string, cfg map[string]any) modular.Module { rootDir := "./data/storage" if rd, ok := cfg["rootDir"].(string); ok { @@ -322,20 +308,6 @@ func wrapStepFactory(f module.StepFactory) plugin.StepFactory { // ModuleSchemas returns UI schema definitions for storage/database module types. func (p *Plugin) ModuleSchemas() []*schema.ModuleSchema { return []*schema.ModuleSchema{ - { - Type: "storage.s3", - Label: "S3 Storage", - Category: "integration", - Description: "Amazon S3 compatible object storage integration", - Inputs: []schema.ServiceIODef{{Name: "object", Type: "[]byte", Description: "Object data to store or retrieve"}}, - Outputs: []schema.ServiceIODef{{Name: "storage", Type: "ObjectStore", Description: "S3-compatible object storage service"}}, - ConfigFields: []schema.ConfigFieldDef{ - {Key: "bucket", Label: "Bucket", Type: schema.FieldTypeString, Required: true, Description: "S3 bucket name", Placeholder: "my-bucket"}, - {Key: "region", Label: "Region", Type: schema.FieldTypeString, DefaultValue: "us-east-1", Description: "AWS region", Placeholder: "us-east-1"}, - {Key: "endpoint", Label: "Endpoint", Type: schema.FieldTypeString, Description: "Custom S3 endpoint (for MinIO, etc.)", Placeholder: "http://localhost:9000"}, - }, - DefaultConfig: map[string]any{"region": "us-east-1"}, - }, { Type: "storage.local", Label: "Local Storage", diff --git a/plugins/storage/plugin_test.go b/plugins/storage/plugin_test.go index 83080a23..80fc6ca2 100644 --- a/plugins/storage/plugin_test.go +++ b/plugins/storage/plugin_test.go @@ -21,8 +21,8 @@ func TestPluginManifest(t *testing.T) { if m.Name != "storage" { t.Errorf("expected name %q, got %q", "storage", m.Name) } - if len(m.ModuleTypes) != 9 { - t.Errorf("expected 9 module types, got %d", len(m.ModuleTypes)) + if len(m.ModuleTypes) != 8 { + t.Errorf("expected 8 module types, got %d", len(m.ModuleTypes)) } if len(m.StepTypes) != 4 { t.Errorf("expected 4 step types, got %d", len(m.StepTypes)) @@ -51,7 +51,7 @@ func TestModuleFactories(t *testing.T) { factories := p.ModuleFactories() expectedTypes := []string{ - "storage.s3", "storage.local", "storage.gcs", + "storage.local", "storage.gcs", "storage.sqlite", "database.workflow", "persistence.store", "cache.redis", } @@ -72,18 +72,8 @@ func TestModuleFactoryWithConfig(t *testing.T) { p := New() factories := p.ModuleFactories() - // storage.s3 with config - mod := factories["storage.s3"]("s3-test", map[string]any{ - "bucket": "test-bucket", - "region": "eu-west-1", - "endpoint": "http://localhost:9000", - }) - if mod == nil { - t.Fatal("storage.s3 factory returned nil with config") - } - // storage.sqlite with config - mod = factories["storage.sqlite"]("sqlite-test", map[string]any{ + mod := factories["storage.sqlite"]("sqlite-test", map[string]any{ "dbPath": "test.db", "maxConnections": float64(10), "walMode": false, @@ -134,8 +124,8 @@ func TestStepFactories(t *testing.T) { func TestModuleSchemas(t *testing.T) { p := New() schemas := p.ModuleSchemas() - if len(schemas) != 9 { - t.Fatalf("expected 9 module schemas, got %d", len(schemas)) + if len(schemas) != 8 { + t.Fatalf("expected 8 module schemas, got %d", len(schemas)) } types := map[string]bool{} @@ -143,7 +133,7 @@ func TestModuleSchemas(t *testing.T) { types[s.Type] = true } expectedTypes := []string{ - "storage.s3", "storage.local", "storage.gcs", + "storage.local", "storage.gcs", "storage.sqlite", "database.workflow", "database.partitioned", "persistence.store", "cache.redis", }