-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathiac_module.go
More file actions
147 lines (135 loc) · 4.94 KB
/
iac_module.go
File metadata and controls
147 lines (135 loc) · 4.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package module
import (
"context"
"fmt"
"github.com/GoCodeAlone/modular"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// IaCModule registers an IaCStateStore in the service registry.
// Supported in-core backends: "memory" (default), "filesystem", "spaces"
// (DigitalOcean Spaces / S3-compatible), "gcs", and "postgres" — plus any
// backend provided by a loaded plugin (e.g. "azure_blob" via
// workflow-plugin-azure).
//
// Config example:
//
// modules:
// - name: iac-state
// type: iac.state
// config:
// backend: filesystem
// directory: /var/lib/workflow/iac-state
type IaCModule struct {
name string
backend string
config map[string]any
store IaCStateStore
}
// NewIaCModule creates a new IaC state module.
func NewIaCModule(name string, cfg map[string]any) *IaCModule {
return &IaCModule{name: name, config: cfg}
}
// Name returns the module name.
func (m *IaCModule) Name() string { return m.name }
// Init constructs the state store backend and registers it as a service.
func (m *IaCModule) Init(app modular.Application) error {
m.backend, _ = m.config["backend"].(string)
if m.backend == "" {
m.backend = "memory"
}
switch m.backend {
case "memory":
m.store = NewMemoryIaCStateStore()
case "filesystem":
dir, _ := m.config["directory"].(string)
if dir == "" {
dir = "/var/lib/workflow/iac-state"
}
m.store = NewFSIaCStateStore(dir)
case "spaces":
region, _ := m.config["region"].(string)
bucket, _ := m.config["bucket"].(string)
prefix, _ := m.config["prefix"].(string)
accessKey, _ := m.config["accessKey"].(string)
secretKey, _ := m.config["secretKey"].(string)
endpoint, _ := m.config["endpoint"].(string)
if bucket == "" {
return fmt.Errorf("iac.state %q: spaces backend requires 'bucket' config", m.name)
}
store, err := NewSpacesIaCStateStore(region, bucket, prefix, accessKey, secretKey, endpoint)
if err != nil {
return fmt.Errorf("iac.state %q: spaces backend: %w", m.name, err)
}
m.store = store
case "gcs":
bucket, _ := m.config["bucket"].(string)
prefix, _ := m.config["prefix"].(string)
if bucket == "" {
return fmt.Errorf("iac.state %q: gcs backend requires 'bucket' config", m.name)
}
store, err := NewGCSIaCStateStore(context.Background(), bucket, prefix)
if err != nil {
return fmt.Errorf("iac.state %q: gcs backend: %w", m.name, err)
}
m.store = store
case "postgres":
dsn, _ := m.config["dsn"].(string)
if dsn == "" {
return fmt.Errorf("iac.state %q: postgres backend requires 'dsn' config", m.name)
}
store, err := NewPostgresIaCStateStore(context.Background(), dsn)
if err != nil {
return fmt.Errorf("iac.state %q: postgres backend: %w", m.name, err)
}
m.store = store
default:
// Not a core in-process backend — consult the plugin-backend registry.
// The engine populates iacStateBackendRegistryInstance at plugin-load
// time; a resolved backend is served over gRPC via grpcIaCStateStore.
if client, ok := iacStateBackendRegistryInstance.resolve(m.backend); ok {
store := newGRPCIaCStateStore(client)
if err := store.Configure(context.Background(), m.backend, m.config); err != nil {
// codes.Unimplemented means the loaded plugin is an older build
// without the Configure RPC — co-deploy requirement of
// decisions/0036. Give the operator an actionable upgrade hint.
if status.Code(err) == codes.Unimplemented {
return fmt.Errorf("iac.state %q: backend %q: the loaded plugin does not implement the "+
"Configure RPC — upgrade the backend plugin to a version that supports Configure "+
"(see decisions/0036): %w", m.name, m.backend, err)
}
return fmt.Errorf("iac.state %q: backend %q: configure plugin backend: %w", m.name, m.backend, err)
}
m.store = store
break
}
return fmt.Errorf("iac.state %q: backend %q is not built into workflow core "+
"(in-core backends: 'memory', 'filesystem', 'spaces', 'gcs', 'postgres'). "+
"If %q is a plugin-provided backend (e.g. 'azure_blob' via workflow-plugin-azure), "+
"install and load that plugin", m.name, m.backend, m.backend)
}
return app.RegisterService(m.name, m.store)
}
// ProvidesServices declares the IaCStateStore service.
func (m *IaCModule) ProvidesServices() []modular.ServiceProvider {
return []modular.ServiceProvider{
{
Name: m.name,
Description: "IaC state store (" + m.backend + "): " + m.name,
Instance: m.store,
},
}
}
// RequiresServices returns nil — iac.state has no service dependencies.
func (m *IaCModule) RequiresServices() []modular.ServiceDependency { return nil }
// Start is a no-op for the memory backend; the filesystem backend creates the directory.
func (m *IaCModule) Start(_ context.Context) error {
if fs, ok := m.store.(*FSIaCStateStore); ok {
if err := fs.ensureDir(); err != nil {
return fmt.Errorf("iac.state %q: Start: %w", m.name, err)
}
}
return nil
}
// Stop is a no-op.
func (m *IaCModule) Stop(_ context.Context) error { return nil }