Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 23 additions & 42 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,55 +56,36 @@ the request:

## ✨ Features

- [Feature Guides](docs/features/README.md) -- Step chaining, guards,
retry, broadcast, discovery, file workflows, host status awareness,
and result decoding
The orchestrator provides a declarative DSL for composing operations into
DAG-based plans with typed results, guards, retry, and discovery.

| Feature | Description |
| -------------------------------------------------- | ---------------------------------------------- |
| [Step Chaining](docs/features/basic.md) | Sequential and parallel DAG execution |
| [Guards](docs/features/guards.md) | Conditional execution (When, OnlyIfChanged...) |
| [Error Recovery](docs/features/error-recovery.md) | Continue strategy with OnlyIfFailed cleanup |
| [Broadcast](docs/features/broadcast.md) | Per-host results from `_all`/label targets |
| [Host Status](docs/features/guards.md) | Skipped and failed detection per host |
| [Retry](docs/features/retry.md) | Automatic retry with exponential backoff |
| [Discovery](docs/features/discovery.md) | Find agents by OS, arch, labels, conditions |
| [File Workflow](docs/features/file-workflow.md) | Upload, deploy, drift detection, undeploy |
| [Result Decode](docs/features/result-decode.md) | Typed struct decoding from step results |
| [TaskFunc](docs/features/task-func.md) | Custom logic with access to prior results |

See the [DSL reference](docs/features/README.md) for guards, predicates, error
strategies, and typed result tables.

## 📖 Documentation

See the [package documentation][] on pkg.go.dev for API details.

## 📋 Examples

Each example is a standalone Go file. Run with:

cd examples/features
OSAPI_TOKEN="<jwt>" go run basic.go

### Feature Examples

| Example | What it shows |
| ---------------------------------------------------------------- | --------------------------------------------------- |
| [basic.go](examples/features/basic.go) | Simple DAG with health check and hostname query |
| [parallel.go](examples/features/parallel.go) | Five parallel queries depending on health check |
| [retry.go](examples/features/retry.go) | Retry on failure with configurable attempts |
| [verbose.go](examples/features/verbose.go) | Verbose output with stdout/stderr/response data |
| [guards.go](examples/features/guards.go) | When predicate for conditional execution |
| [only-if-changed.go](examples/features/only-if-changed.go) | Skip step unless dependency reported changes |
| [error-recovery.go](examples/features/error-recovery.go) | Continue strategy with OnlyIfFailed cleanup |
| [broadcast.go](examples/features/broadcast.go) | Per-host results from broadcast operations |
| [task-func.go](examples/features/task-func.go) | Custom steps with typed result decoding |
| [agent-facts.go](examples/features/agent-facts.go) | List agents with OS, load, memory, and interfaces |
| [discover.go](examples/features/discover.go) | Find agents by OS and architecture predicates |
| [group-by-fact.go](examples/features/group-by-fact.go) | Group agents by distro, run per-group commands |
| [when-fact.go](examples/features/when-fact.go) | Fact-based guard on a step |
| [fact-predicates.go](examples/features/fact-predicates.go) | Compose multiple predicates for discovery |
| [label-filter.go](examples/features/label-filter.go) | Filter by labels and arbitrary fact values |
| [condition-filter.go](examples/features/condition-filter.go) | Filter by node conditions (e.g., DiskPressure) |
| [host-status.go](examples/features/host-status.go) | Host status guards (skipped and failed detection) |
| [broadcast-guards.go](examples/features/broadcast-guards.go) | Broadcast guards with per-host error and changed |

### Operation Examples

| Example | What it shows |
| ---------------------------------------------------------------- | --------------------------------------------------- |
| [command.go](examples/operations/command.go) | Command exec and shell with result decoding |
| [dns-update.go](examples/operations/dns-update.go) | Read-then-write pattern with DNS operations |
| [file-deploy.go](examples/operations/file-deploy.go) | Upload, deploy, and verify a file end-to-end |
| [file-changed.go](examples/operations/file-changed.go) | Conditional upload with FileChanged + OnlyIfChanged |
| [hostname-update.go](examples/operations/hostname-update.go) | Read-then-write pattern with hostname broadcast |
| [docker.go](examples/operations/docker.go) | Full Docker lifecycle with pull, create, exec |
| [cron.go](examples/operations/cron.go) | Cron create, list, and delete lifecycle |
Runnable examples in [examples/operations/](examples/operations/) (per-domain
workflows) and [examples/features/](examples/features/) (DSL features). Run
with:

OSAPI_TOKEN="<jwt>" go run examples/features/basic.go

## 🤝 Contributing

Expand Down
63 changes: 44 additions & 19 deletions docs/features/README.md
Original file line number Diff line number Diff line change
@@ -1,28 +1,53 @@
# Features
# DSL Reference

The orchestrator provides a declarative DSL for composing OSAPI operations into
DAG-based plans with typed results, guards, retry, and discovery.

## How the DAG Works

Every operation method (`NodeHostnameGet`, `CommandExec`, etc.) returns a
`*Step`. Steps are connected into a directed acyclic graph (DAG) using
`After()`. The orchestrator resolves the DAG into levels and executes each
level:

- Steps with no dependencies run first.
- Steps at the same level (sharing the same dependencies) run in parallel.
- Steps with `After()` dependencies wait until those dependencies complete.

```go
health := o.HealthCheck()

// These three run in parallel — all depend on health.
hostname := o.NodeHostnameGet("_any").After(health)
disk := o.NodeDiskGet("_any").After(health)
load := o.NodeLoadGet("_any").After(health)

// This runs after hostname completes.
o.CommandExec("_any", "whoami").After(hostname)
```

## Step Chaining

Every operation returns a `*Step`. Chain methods to declare ordering,
conditions, and error handling:

| Method | What it does |
| ----------------------- | -------------------------------------------------- |
| `After` | Run after the given steps complete |
| `Retry` | Retry on failure with optional exponential backoff |
| `OnlyIfChanged` | Skip unless a dependency reported changes |
| `OnlyIfFailed` | Skip unless at least one dependency failed |
| `OnlyIfAllChanged` | Skip unless all dependencies reported changes |
| `OnlyIfAnyHostFailed` | Skip unless any host in a dependency failed |
| `OnlyIfAllHostsFailed` | Skip unless all hosts in dependencies failed |
| `OnlyIfAnyHostSkipped` | Skip unless any host in a dependency was skipped |
| `OnlyIfAnyHostChanged` | Skip unless any host in a dependency changed |
| `OnlyIfAllHostsChanged` | Skip unless all hosts in dependencies changed |
| `When` | Guard -- only run if predicate returns true |
| `WhenFact` | Guard -- only run if agent fact matches |
| `OnError` | Set error strategy (`StopAll` or `Continue`) |
Chain methods on any `*Step` to declare ordering, conditions, and error
handling:

| Method | What it does | Guide |
| ----------------------- | -------------------------------------------------- | ----------------------------------- |
| `After` | Run after the given steps complete | [Basic DAG](basic.md) |
| `Retry` | Retry on failure with optional exponential backoff | [Retry](retry.md) |
| `OnlyIfChanged` | Skip unless a dependency reported changes | [OnlyIfChanged](only-if-changed.md) |
| `OnlyIfFailed` | Skip unless at least one dependency failed | [Error Recovery](error-recovery.md) |
| `OnlyIfAllChanged` | Skip unless all dependencies reported changes | |
| `OnlyIfAnyHostFailed` | Skip unless any host in a dependency failed | [Guards](guards.md) |
| `OnlyIfAllHostsFailed` | Skip unless all hosts in dependencies failed | |
| `OnlyIfAnyHostSkipped` | Skip unless any host in a dependency was skipped | [Guards](guards.md) |
| `OnlyIfAnyHostChanged` | Skip unless any host in a dependency changed | |
| `OnlyIfAllHostsChanged` | Skip unless all hosts in dependencies changed | [Guards](guards.md) |
| `When` | Guard -- only run if predicate returns true | [Guards](guards.md) |
| `WhenFact` | Guard -- only run if agent fact matches | [Guards](guards.md) |
| `ContinueOnError` | Keep running independent tasks on failure | [Error Recovery](error-recovery.md) |
| `OnError` | Set error strategy (`StopAll` or `Continue`) | [Error Recovery](error-recovery.md) |
| `Named` | Set a custom step name for result decoding | [Result Decode](result-decode.md) |

## Typed Results

Expand Down
2 changes: 1 addition & 1 deletion examples/features/broadcast-guards.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func main() {
// proceed even if some hosts fail.
deploy := o.CommandShell("_all", "cat /nonexistent-file").
Named("deploy").
OnError(orchestrator.Continue)
ContinueOnError()

// Cleanup runs only if at least one host reported an error.
o.CommandExec("_any", "echo", "running-cleanup").
Expand Down
4 changes: 2 additions & 2 deletions examples/features/error-recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func main() {
func(_ context.Context, _ *osapi.Client, _ orchestrator.Results) (*orchestrator.Result, error) {
return nil, fmt.Errorf("simulated deployment failure")
},
).OnError(orchestrator.Continue)
).ContinueOnError()
o1.CommandExec("_any", "echo", "running-infra-cleanup").
Named("cleanup").
After(deploy1).
Expand All @@ -79,7 +79,7 @@ func main() {
o2 := orchestrator.New(url, token)
deploy2 := o2.CommandShell("_any", "cat /nonexistent-file").
Named("deploy").
OnError(orchestrator.Continue)
ContinueOnError()
o2.CommandExec("_any", "echo", "running-command-cleanup").
Named("cleanup").
After(deploy2).
Expand Down
2 changes: 1 addition & 1 deletion examples/features/group-by-fact.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func main() {
for _, a := range agents {
o.CommandShell(a.Hostname, cmd).
After(health).
OnError(orchestrator.Continue)
ContinueOnError()
}
}

Expand Down
2 changes: 1 addition & 1 deletion examples/features/host-status.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func main() {
// Hostname update is unsupported on macOS/containers, so those
// hosts report "skipped" while supported hosts succeed or fail.
update := o.NodeHostnameUpdate("_all", "test-hostname").
OnError(orchestrator.Continue)
ContinueOnError()

// Runs only if at least one host was skipped (unsupported platform).
o.CommandExec("_any", "echo", "some-hosts-were-skipped").
Expand Down
4 changes: 2 additions & 2 deletions examples/features/only-if-changed.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func main() {
fmt.Println("=== Phase 1: Cleanup ===")
o1 := orchestrator.New(url, token)
o1.CommandShell("_any", "rm -f /tmp/only-if-changed.txt").
OnError(orchestrator.Continue)
ContinueOnError()
o1.Run(context.Background()) //nolint:errcheck

// Phase 2: Deploy (expect changed, post-deploy runs)
Expand Down Expand Up @@ -102,6 +102,6 @@ func main() {
fmt.Println("\n=== Phase 4: Cleanup ===")
o4 := orchestrator.New(url, token)
o4.CommandShell("_any", "rm -f /tmp/only-if-changed.txt").
OnError(orchestrator.Continue)
ContinueOnError()
o4.Run(context.Background()) //nolint:errcheck
}
2 changes: 1 addition & 1 deletion examples/operations/agent-drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func main() {

// Cleanup: ensure the agent is undrained from any previous run.
oc := orchestrator.New(url, token)
oc.AgentUndrain(host).OnError(orchestrator.Continue)
oc.AgentUndrain(host).ContinueOnError()
oc.Run(context.Background()) //nolint:errcheck

// Drain → maintenance → undrain.
Expand Down
2 changes: 1 addition & 1 deletion examples/operations/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func main() {
o2 := orchestrator.New(url, token)

o2.CommandExec("_any", "ls", "/nonexistent").
OnError(orchestrator.Continue)
ContinueOnError()

report2, err := o2.Run(context.Background())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion examples/operations/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func main() {

// Cleanup any leftover entry from a previous run.
oc := orchestrator.New(url, token)
oc.CronDelete("_any", "daily-cleanup").OnError(orchestrator.Continue)
oc.CronDelete("_any", "daily-cleanup").ContinueOnError()
oc.Run(context.Background()) //nolint:errcheck

// Create → inspect → delete in one plan.
Expand Down
4 changes: 2 additions & 2 deletions examples/operations/dns-update.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ func main() {
// Read current DNS configuration.
getDNS := o.NetworkDNSGet("_any", iface).
After(health).
OnError(orchestrator.Continue)
ContinueOnError()

// Write new DNS servers after reading the current config.
o.NetworkDNSUpdate(
"_any",
iface,
[]string{"8.8.8.8", "8.8.4.4"},
[]string{"example.com"},
).After(getDNS).OnError(orchestrator.Continue)
).After(getDNS).ContinueOnError()

report, err := o.Run(context.Background())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion examples/operations/file-deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func main() {
fmt.Println("=== Phase 1: Cleanup ===")

o1 := newOrchestrator(url, token)
o1.CommandShell("_any", "rm -f /tmp/app-config.yaml").OnError(orchestrator.Continue)
o1.CommandShell("_any", "rm -f /tmp/app-config.yaml").ContinueOnError()
//nolint:errcheck
o1.Run(context.Background()) //nolint:errcheck

Expand Down
2 changes: 1 addition & 1 deletion examples/operations/hostname-update.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func main() {
// plan to proceed even if some hosts are unsupported (skipped).
o2.NodeHostnameUpdate("_all", "new-hostname").
After(health2).
OnError(orchestrator.Continue)
ContinueOnError()

report2, err := o2.Run(context.Background())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion examples/operations/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func main() {

o.NetworkPingDo("_any", "8.8.8.8").
After(health).
OnError(orchestrator.Continue)
ContinueOnError()

report, err := o.Run(context.Background())
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/orchestrator/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,3 +328,10 @@ func (s *Step) OnError(

return s
}

// ContinueOnError is a convenience for OnError(Continue). Independent
// tasks keep running when this step fails; only direct dependents are
// skipped.
func (s *Step) ContinueOnError() *Step {
return s.OnError(Continue)
}
7 changes: 7 additions & 0 deletions pkg/orchestrator/step_public_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ func (s *StepPublicTestSuite) TestOnError() {
OnError(orchestrator.StopAll)
},
},
{
name: "ContinueOnError returns same step",
chainFn: func() *orchestrator.Step {
return s.orch.NodeHostnameGet("_any").
ContinueOnError()
},
},
}

for _, tc := range tests {
Expand Down
Loading