Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 130 additions & 6 deletions cmd/wfctl/infra_state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/GoCodeAlone/workflow/config"
"github.com/GoCodeAlone/workflow/interfaces"
"github.com/GoCodeAlone/workflow/module"
"github.com/GoCodeAlone/workflow/plugin/external"
pb "github.com/GoCodeAlone/workflow/plugin/external/proto"
)

// infraStateStore is the minimal state persistence interface used by wfctl
Expand Down Expand Up @@ -86,16 +88,13 @@ func resolveStateStore(cfgFile, envName string) (infraStateStore, error) {
return resolvePostgresStateStore(cfg)

case "spaces":
return nil, fmt.Errorf("iac.state backend %q is now plugin-served by workflow-plugin-digitalocean v1.1.0; "+
"install and load the plugin to use the Spaces backend (wfctl direct-path commands no longer support in-tree spaces)", backend)
return resolvePluginStateStore(context.Background(), backend, cfg)

case "s3":
return nil, fmt.Errorf("iac.state backend %q is now plugin-served by workflow-plugin-aws v1.1.0; "+
"install and load the plugin to use the S3 backend (wfctl direct-path commands no longer support in-tree s3)", backend)
return resolvePluginStateStore(context.Background(), backend, cfg)

case "gcs":
return nil, fmt.Errorf("iac.state backend %q is now plugin-served by workflow-plugin-gcp v1.1.0; "+
"install and load the plugin to use the GCS backend (wfctl direct-path commands no longer support in-tree gcs)", backend)
return resolvePluginStateStore(context.Background(), backend, cfg)
Comment on lines 90 to +97

case "azure":
return nil, fmt.Errorf("azure state store backend not yet supported by wfctl direct-path commands; " +
Expand All @@ -107,6 +106,131 @@ func resolveStateStore(cfgFile, envName string) (infraStateStore, error) {
}
}

type pluginWfctlStateStore struct {
inner module.IaCStateStore
mgr *external.ExternalPluginManager
}

func (s *pluginWfctlStateStore) ListResources(ctx context.Context) ([]interfaces.ResourceState, error) {
states, err := s.inner.ListStates(ctx, nil)
if err != nil {
return nil, err
}
out := make([]interfaces.ResourceState, 0, len(states))
for _, state := range states {
out = append(out, iacStateToResourceState(state))
}
return out, nil
}

func (s *pluginWfctlStateStore) SaveResource(ctx context.Context, state interfaces.ResourceState) error {
return s.inner.SaveState(ctx, resourceStateToIaCState(state))
}

func (s *pluginWfctlStateStore) DeleteResource(ctx context.Context, name string) error {
return s.inner.DeleteState(ctx, name)
}

func (s *pluginWfctlStateStore) Close() error {
if s.mgr == nil {
return nil
}
s.mgr.Shutdown()
return nil
}
Comment on lines +134 to +140
Comment on lines +134 to +140

func resolvePluginStateStore(ctx context.Context, backend string, cfg map[string]any) (infraStateStore, error) {
pluginDir := currentInfraPluginDir
if pluginDir == "" {
pluginDir = os.Getenv("WFCTL_PLUGIN_DIR")
}
if pluginDir == "" {
pluginDir = "./data/plugins"
}

entries, err := os.ReadDir(pluginDir)
if err != nil {
return nil, fmt.Errorf("iac.state backend %q is plugin-served but plugin directory %q is unavailable: %w", backend, pluginDir, err)
}

mgr := external.NewExternalPluginManager(pluginDir, nil)
for _, pluginName := range stateBackendPluginCandidates(backend, entries) {
clients, clientsErr := loadPluginStateBackendClients(mgr, pluginName, backend)
if clientsErr != nil {
mgr.Shutdown()
return nil, clientsErr
}
client, ok := clients[backend]
if !ok {
continue
}
store := module.NewGRPCIaCStateStore(client)
if err := store.Configure(ctx, backend, cfg); err != nil {
mgr.Shutdown()
return nil, fmt.Errorf("configure plugin-served iac.state backend %q via plugin %q: %w", backend, pluginName, err)
}
return &pluginWfctlStateStore{inner: store, mgr: mgr}, nil
}
Comment on lines +157 to +173
Comment on lines +157 to +173

mgr.Shutdown()
return nil, fmt.Errorf("iac.state backend %q is plugin-served but no installed plugin in %s advertises it", backend, pluginDir)
}
Comment on lines +156 to +177

var loadPluginStateBackendClients = func(mgr *external.ExternalPluginManager, pluginName, backend string) (map[string]pb.IaCStateBackendClient, error) {
adapter, loadErr := mgr.LoadPlugin(pluginName)
if loadErr != nil {
return nil, fmt.Errorf("load plugin %q for iac.state backend %q: %w", pluginName, backend, loadErr)
}
clients, clientsErr := adapter.IaCStateBackendClients()
if clientsErr != nil {
return nil, fmt.Errorf("plugin %q iac.state backends: %w", pluginName, clientsErr)
}
return clients, nil
}

func stateBackendPluginCandidates(backend string, entries []os.DirEntry) []string {
seen := map[string]struct{}{}
var candidates []string
hasDir := func(name string) bool {
for _, entry := range entries {
if entry.IsDir() && entry.Name() == name {
return true
}
}
return false
}
add := func(name string) {
if strings.TrimSpace(name) == "" {
return
}
if _, ok := seen[name]; ok {
return
}
seen[name] = struct{}{}
candidates = append(candidates, name)
}
switch backend {
case "spaces":
if hasDir("digitalocean") {
add("digitalocean")
}
case "s3":
if hasDir("aws") {
add("aws")
}
case "gcs":
if hasDir("gcp") {
add("gcp")
}
}
for _, entry := range entries {
if entry.IsDir() {
add(entry.Name())
}
}
return candidates
}

// ── Filesystem backend ─────────────────────────────────────────────────────────

// fsWfctlStateStore persists ResourceState records as JSON files under a
Expand Down
Loading
Loading