Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/AGENT_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,6 @@ npm run dev
Workflow is plugin-first, but not every extension point should be moved out of
core immediately. Core keeps bootstrap-critical CLI behavior and shared
contracts; external plugins own provider-specific runtime integrations. For CI
generation, see `decisions/0045-ci-generation-boundary.md`.
generation, see `decisions/0045-ci-generation-boundary.md`. For native
provider job execution, keep the shared `IaCProviderRunner` contract in core and
implement cloud-specific runners in provider plugins.
26 changes: 26 additions & 0 deletions docs/WFCTL.md
Original file line number Diff line number Diff line change
Expand Up @@ -2271,6 +2271,32 @@ wfctl ci run --phase build,test,deploy --env production

**Deploy phase** is a placeholder in Tier 1 — full provider implementations (k8s, aws-ecs, etc.) ship in Tier 2.

#### `step.sandbox_exec` execution environments

`step.sandbox_exec` defaults to local Docker when `exec_env` is omitted or set
to `local-docker`. `exec_env: ephemeral` runs the command as a one-off Argo
Workflow through an `argo.workflows` module. `exec_env: provider-ephemeral`
runs the command through the selected IaC provider's optional
`IaCProviderRunner` capability.

```yaml
steps:
- name: migrate
type: step.sandbox_exec
config:
exec_env: provider-ephemeral
provider: digitalocean
image: registry.example.com/app-migrate:${IMAGE_SHA}
command: ["./migrate", "up"]
env:
DATABASE_URL: secret://app/database-url
```

For `provider-ephemeral`, `provider` is required and must name a registered
`iac.provider` service that advertises `IaCProviderRunner`. Secret references
in `env` are passed through for provider-side resolution; wfctl does not resolve
them to plaintext before the provider job boundary.

---

### `ci init`
Expand Down
196 changes: 196 additions & 0 deletions iac/providerclient/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"math"
"sync"
"time"

Expand All @@ -44,6 +46,9 @@ const (
// IaCServiceDriftDetector is the gRPC service name for the optional
// IaCProviderDriftDetector service.
IaCServiceDriftDetector = "workflow.plugin.external.iac.IaCProviderDriftDetector"
// IaCServiceRunner is the gRPC service name for the optional
// IaCProviderRunner service.
IaCServiceRunner = "workflow.plugin.external.iac.IaCProviderRunner"
)

// RegionListerProvider is a capability-discovery interface implemented by
Expand Down Expand Up @@ -110,6 +115,78 @@ type DriftDetectorProvider interface {
DriftDetector() interfaces.DriftConfigDetector
}

// RunnerProvider is a capability-discovery interface implemented by *Adapter
// when the plugin advertised IaCProviderRunner. Callers use Runner() rather
// than asserting *Adapter directly to interfaces.IaCProviderRunner so absence
// remains visible as nil.
type RunnerProvider interface {
Runner() interfaces.IaCProviderRunner
}

type runnerAdapter struct {
client pb.IaCProviderRunnerClient
}

func (r *runnerAdapter) RunJob(ctx context.Context, spec interfaces.JobSpec) (*interfaces.JobHandle, error) {
resp, err := r.client.RunJob(ctx, jobSpecToPB(spec))
if err != nil {
if status.Code(err) == codes.Unimplemented {
return nil, fmt.Errorf("%w: IaCProviderRunner not registered by plugin",
interfaces.ErrProviderMethodUnimplemented)
}
return nil, err
}
return jobHandleFromPB(resp), nil
}

func (r *runnerAdapter) JobStatus(ctx context.Context, handle interfaces.JobHandle) (*interfaces.JobStatusReply, error) {
resp, err := r.client.JobStatus(ctx, jobHandleToPB(handle))
if err != nil {
if status.Code(err) == codes.Unimplemented {
return nil, fmt.Errorf("%w: IaCProviderRunner not registered by plugin",
interfaces.ErrProviderMethodUnimplemented)
}
return nil, err
}
return jobStatusFromPB(resp), nil
}

func (r *runnerAdapter) JobLogs(ctx context.Context, handle interfaces.JobHandle, sink interfaces.LogCaptureSink) error {
stream, err := r.client.JobLogs(ctx, jobHandleToPB(handle))
if err != nil {
if status.Code(err) == codes.Unimplemented {
return fmt.Errorf("%w: IaCProviderRunner not registered by plugin",
interfaces.ErrProviderMethodUnimplemented)
}
return err
}
for {
chunk, recvErr := stream.Recv()
if recvErr != nil {
if recvErr == io.EOF {
return nil
}
if status.Code(recvErr) == codes.Unimplemented {
return fmt.Errorf("%w: IaCProviderRunner not registered by plugin",
interfaces.ErrProviderMethodUnimplemented)
}
return recvErr
}
if sink != nil {
if err := sink.WriteLogChunk(interfaces.LogChunk{
Data: append([]byte(nil), chunk.GetData()...),
Source: chunk.GetSource(),
EOF: chunk.GetEof(),
}); err != nil {
return err
}
}
if chunk.GetEof() {
return nil
}
}
}

// Adapter wraps the pb.IaCProviderRequired gRPC client (and advertisement-gated
// optional clients) as interfaces.IaCProvider. Optional sub-interfaces
// (IaCProviderRegionLister, DriftConfigDetector) are exposed via typed accessors
Expand All @@ -129,6 +206,7 @@ type Adapter struct {
required pb.IaCProviderRequiredClient
regionLister *regionListerImpl // nil when IaCServiceRegionLister not advertised
drift *driftDetectorAdapter // nil when IaCServiceDriftDetector not advertised
runner *runnerAdapter // nil when IaCServiceRunner not advertised

// Capabilities cache. Populated on first call to fetchCapabilities via
// capsOnce; reused for the adapter's lifetime (capabilities don't change
Expand Down Expand Up @@ -183,6 +261,9 @@ func New(conn grpc.ClientConnInterface, advertisedServices map[string]bool) *Ada
if advertisedServices[IaCServiceDriftDetector] {
a.drift = &driftDetectorAdapter{client: pb.NewIaCProviderDriftDetectorClient(conn)}
}
if advertisedServices[IaCServiceRunner] {
a.runner = &runnerAdapter{client: pb.NewIaCProviderRunnerClient(conn)}
}
return a
}

Expand All @@ -207,6 +288,15 @@ func (a *Adapter) DriftDetector() interfaces.DriftConfigDetector {
return a.drift
}

// Runner implements RunnerProvider. Returns the provider-runner capability
// object when the plugin advertised IaCProviderRunner, or nil when it did not.
func (a *Adapter) Runner() interfaces.IaCProviderRunner {
if a.runner == nil {
return nil
}
return a.runner
}

// ─── interfaces.IaCProvider ──────────────────────────────────────────────────

// Name calls the IaCProviderRequired.Name RPC. Errors are logged and return "".
Expand Down Expand Up @@ -754,3 +844,109 @@ func driftClassFromPB(c pb.DriftClass) interfaces.DriftClass {
return interfaces.DriftClassUnknown
}
}

func jobSpecToPB(s interfaces.JobSpec) *pb.JobSpec {
out := &pb.JobSpec{
Name: s.Name,
Kind: s.Kind,
Image: s.Image,
RunCommand: s.RunCommand,
EnvVars: copyStringMap(s.EnvVars),
EnvVarsSecret: copyStringMap(s.EnvVarsSecret),
Cron: s.Cron,
}
if s.Termination != nil {
out.Termination = &pb.JobTerminationSpec{
DrainSeconds: clampInt32(s.Termination.DrainSeconds),
GracePeriodSeconds: clampInt32(s.Termination.GracePeriodSeconds),
}
}
if len(s.Alerts) > 0 {
out.Alerts = make([]*pb.JobAlertSpec, 0, len(s.Alerts))
for _, alert := range s.Alerts {
out.Alerts = append(out.Alerts, &pb.JobAlertSpec{
Rule: alert.Rule,
Operator: alert.Operator,
Value: alert.Value,
Window: alert.Window,
Disabled: alert.Disabled,
})
}
}
if len(s.LogDestinations) > 0 {
out.LogDestinations = make([]*pb.JobLogDestinationSpec, 0, len(s.LogDestinations))
for _, dest := range s.LogDestinations {
out.LogDestinations = append(out.LogDestinations, &pb.JobLogDestinationSpec{
Name: dest.Name,
Endpoint: dest.Endpoint,
Headers: copyStringMap(dest.Headers),
Tls: dest.TLS,
})
}
}
return out
}

func jobHandleToPB(h interfaces.JobHandle) *pb.JobHandle {
return &pb.JobHandle{
Id: h.ID,
Name: h.Name,
Provider: h.Provider,
Metadata: copyStringMap(h.Metadata),
}
}

func jobHandleFromPB(h *pb.JobHandle) *interfaces.JobHandle {
if h == nil {
return nil
}
return &interfaces.JobHandle{
ID: h.GetId(),
Name: h.GetName(),
Provider: h.GetProvider(),
Metadata: copyStringMap(h.GetMetadata()),
}
}

func jobStatusFromPB(s *pb.JobStatusReply) *interfaces.JobStatusReply {
if s == nil {
return nil
}
handle := interfaces.JobHandle{}
if h := jobHandleFromPB(s.GetHandle()); h != nil {
handle = *h
}
return &interfaces.JobStatusReply{
Handle: handle,
State: jobStateFromPB(s.GetState()),
ExitCode: int(s.GetExitCode()),
Message: s.GetMessage(),
}
}

func jobStateFromPB(s pb.JobState) interfaces.JobState {
switch s {
case pb.JobState_JOB_STATE_PENDING:
return interfaces.JobStatePending
case pb.JobState_JOB_STATE_RUNNING:
return interfaces.JobStateRunning
case pb.JobState_JOB_STATE_SUCCEEDED:
return interfaces.JobStateSucceeded
case pb.JobState_JOB_STATE_FAILED:
return interfaces.JobStateFailed
case pb.JobState_JOB_STATE_CANCELLED:
return interfaces.JobStateCancelled
default:
return interfaces.JobStateUnknown
}
}

func clampInt32(v int) int32 {
if v > math.MaxInt32 {
return math.MaxInt32
}
if v < math.MinInt32 {
return math.MinInt32
}
return int32(v) //nolint:gosec // G115: value is clamped to int32 bounds above.
}
Loading
Loading