diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..03b4448 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,63 @@ +name: ci + +on: + push: + branches: [main] + pull_request: + branches: [main] + +permissions: + contents: read + +jobs: + test: + name: test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-go@v5 + with: + go-version-file: go.mod + cache: true + + - name: go vet + run: go vet ./... + + - name: go test + run: go test ./... -race -coverprofile=coverage.out -covermode=atomic + + - name: upload coverage + uses: actions/upload-artifact@v4 + with: + name: coverage + path: coverage.out + retention-days: 7 + + lint: + name: lint + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-go@v5 + with: + go-version-file: go.mod + cache: true + + - uses: golangci/golangci-lint-action@v9 + with: + version: v2.12 + + build: + name: build + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-go@v5 + with: + go-version-file: go.mod + cache: true + + - run: go build ./... diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml new file mode 100644 index 0000000..5d2686c --- /dev/null +++ b/.github/workflows/security.yml @@ -0,0 +1,69 @@ +name: security + +on: + push: + branches: [main] + pull_request: + branches: [main] + schedule: + - cron: '0 6 * * 1' + +permissions: + contents: read + security-events: write + +jobs: + gosec: + name: gosec + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Run gosec + uses: securego/gosec@v2.26.1 + with: + args: '-no-fail -fmt sarif -out gosec.sarif ./...' + + - name: Upload SARIF + uses: github/codeql-action/upload-sarif@v3 + with: + sarif_file: gosec.sarif + category: gosec + + govulncheck: + name: govulncheck + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-go@v5 + with: + go-version: 'stable' + cache: true + + - name: Install govulncheck + run: go install golang.org/x/vuln/cmd/govulncheck@latest + + - name: Run govulncheck + run: govulncheck ./... + + trivy: + name: trivy-fs + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Run Trivy filesystem scan + uses: aquasecurity/trivy-action@v0.36.0 + with: + scan-type: fs + format: sarif + output: trivy.sarif + ignore-unfixed: true + severity: CRITICAL,HIGH + + - name: Upload SARIF + uses: github/codeql-action/upload-sarif@v3 + with: + sarif_file: trivy.sarif + category: trivy diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..91f82b4 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,42 @@ +version: "2" + +run: + timeout: 5m + tests: true + +linters: + default: none + enable: + - bodyclose + - errcheck + - gocritic + - govet + - ineffassign + - misspell + - nilerr + - prealloc + - revive + - staticcheck + - unconvert + - unused + settings: + govet: + enable-all: true + disable: + - fieldalignment + gocritic: + enabled-tags: + - diagnostic + - performance + - style + exclusions: + rules: + - path: _test\.go + linters: + - errcheck + - prealloc + +formatters: + enable: + - gofmt + - goimports diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..003011f --- /dev/null +++ b/Makefile @@ -0,0 +1,24 @@ +.PHONY: test lint security build tidy cover + +test: + go test ./... -race -cover + +cover: + go test ./... -race -coverprofile=coverage.out -covermode=atomic + go tool cover -func=coverage.out + +build: + go build ./... + +tidy: + go mod tidy + +lint: + @which golangci-lint > /dev/null || (echo "install golangci-lint: https://golangci-lint.run" && exit 1) + golangci-lint run + +security: + @which gosec > /dev/null || go install github.com/securego/gosec/v2/cmd/gosec@latest + gosec ./... + @which govulncheck > /dev/null || go install golang.org/x/vuln/cmd/govulncheck@latest + govulncheck ./... diff --git a/README.md b/README.md index db8e243..e0b6217 100644 --- a/README.md +++ b/README.md @@ -1,140 +1,256 @@ -# Salvador +# workflowx (`salvador`) + +> Cloud-native workflow runtime for Go. Persist tasks, run ordered steps reliably, and swap the storage and queue layer without touching workflow logic. + +[![Go Reference](https://pkg.go.dev/badge/github.com/workflowx/salvador.svg)](https://pkg.go.dev/github.com/workflowx/salvador) +[![Go Version](https://img.shields.io/badge/Go-1.24+-00ADD8?logo=go&logoColor=white)](https://go.dev/dl/) +[![CI](https://github.com/Chetas1/workflowx/actions/workflows/ci.yml/badge.svg)](https://github.com/Chetas1/workflowx/actions/workflows/ci.yml) +[![Security](https://github.com/Chetas1/workflowx/actions/workflows/security.yml/badge.svg)](https://github.com/Chetas1/workflowx/actions/workflows/security.yml) +[![License](https://img.shields.io/badge/License-See%20LICENSE-informational)](LICENSE) + +--- + +## Why + +Most teams reach for a heavyweight orchestrator (Temporal, Airflow, Cadence) when all they need is *durable, ordered, observable execution of a few business workflows*. `salvador` is the small library you can drop into a Go service to get: + +- **Durability** — every task is persisted before it executes; a process crash never loses work. +- **Ordered semantics** — steps run in a deterministic order; failures stop the chain and surface the precise step that broke. +- **Pluggable backends** — Redis ships in the box; a single `Store` interface lets you back the engine with PostgreSQL, DynamoDB, NATS JetStream, or an in-memory store for tests. +- **Predictable concurrency** — workers per workflow are a single tuning knob. + +## System Architecture + +```mermaid +flowchart LR + P[Producer / API handler] -- Submit(workflow, body) --> E[Engine] + E -- Save task --> S[(Store)] + E -- Enqueue task ID --> Q[(Queue)] + subgraph Workers + W1[Worker 1] + W2[Worker 2] + Wn[Worker N] + end + Q -- Dequeue --> W1 + Q -- Dequeue --> W2 + Q -- Dequeue --> Wn + W1 -- Run steps --> S + W2 -- Run steps --> S + Wn -- Run steps --> S + S -. Default impl .- R[(Redis)] + Q -. Default impl .- R +``` -A Redis-backed task workflow engine for Go. Define tasks with ordered steps, persist them in Redis, and execute them reliably. +The `Store` interface owns both **task persistence** and the **pending queue**. Production deployments typically point both at the same Redis cluster, but the interface lets you separate them (e.g. Postgres for durability, Kafka for queuing) without changing the engine. + +### Task Lifecycle + +```mermaid +sequenceDiagram + participant API as API / Producer + participant Engine + participant Store as Store (Redis) + participant Worker + + API->>Engine: Submit("process-order", body) + Engine->>Store: Save(task, status=pending) + Engine->>Store: Enqueue(workflow, taskID) + Engine-->>API: taskID + + loop pollInterval (1s default) + Worker->>Store: Dequeue(workflow) + Store-->>Worker: taskID + Worker->>Store: Get(taskID) + Worker->>Store: Save(status=running) + loop step in workflow.steps[task.Step:] + Worker->>Worker: step.fn(ctx, task) + alt step error + Worker->>Store: Save(status=failed, error) + Note right of Worker: subsequent steps skipped + else step ok + Worker->>Store: Save(step+=1) + end + end + Worker->>Store: Save(status=completed) + end +``` -## Install +A worker only re-saves the task between steps, so a crash mid-step replays *that* step on recovery — step functions should be **idempotent**. + +## Quick Start ```bash go get github.com/workflowx/salvador ``` -## Quick Start - ```go package main import ( - "context" - "encoding/json" - "fmt" - "log" + "context" + "encoding/json" + "fmt" + "log" - "github.com/redis/go-redis/v9" - "github.com/workflowx/salvador" + "github.com/redis/go-redis/v9" + "github.com/workflowx/salvador" ) func main() { - // Connect to Redis - rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) - store := salvador.NewRedisStore(rdb) - - // Define a workflow with ordered steps - wf := salvador.NewWorkflow("process-order") - - wf.Step("validate", func(ctx context.Context, task *salvador.Task) error { - var order map[string]any - if err := json.Unmarshal(task.Body, &order); err != nil { - return fmt.Errorf("invalid order: %w", err) - } - log.Printf("order validated: %v", order) - return nil - }) - - wf.Step("charge-payment", func(ctx context.Context, task *salvador.Task) error { - log.Printf("charging payment for task %s", task.ID) - return nil - }) - - wf.Step("ship", func(ctx context.Context, task *salvador.Task) error { - log.Printf("shipping order for task %s", task.ID) - return nil - }) - - // Create the engine and register the workflow - engine := salvador.NewEngine(store, salvador.WithConcurrency(3)) - engine.Register(wf) - - ctx := context.Background() - - // Submit a task - body, _ := json.Marshal(map[string]any{"item": "widget", "quantity": 5}) - taskID, err := engine.Submit(ctx, "process-order", body) - if err != nil { - log.Fatal(err) - } - log.Printf("submitted task: %s", taskID) - - // Start processing (blocks until context is cancelled) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - engine.Start(ctx) + rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) + store := salvador.NewRedisStore(rdb, salvador.WithKeyPrefix("orders:")) + + wf := salvador.NewWorkflow("process-order"). + Step("validate", validate). + Step("charge-payment", charge). + Step("ship", ship) + + engine := salvador.NewEngine(store, salvador.WithConcurrency(8)) + engine.Register(wf) + + ctx := context.Background() + body, _ := json.Marshal(map[string]any{"item": "widget", "quantity": 5}) + taskID, err := engine.Submit(ctx, "process-order", body) + if err != nil { + log.Fatal(err) + } + fmt.Println("submitted:", taskID) + + if err := engine.Start(ctx); err != nil { + log.Fatal(err) + } } + +func validate(ctx context.Context, t *salvador.Task) error { /* ... */ return nil } +func charge(ctx context.Context, t *salvador.Task) error { /* ... */ return nil } +func ship(ctx context.Context, t *salvador.Task) error { /* ... */ return nil } ``` ## Concepts ### Task -A task is a unit of work with a name, body (arbitrary bytes), and status. Tasks are persisted in Redis and transition through these statuses: - -`pending` -> `running` -> `completed` or `failed` +```go +type Task struct { + ID string // UUID assigned at Submit + Name string // workflow name + Body []byte // arbitrary, opaque to the engine + Status TaskStatus // pending | running | completed | failed + Step int // last completed step (resume index) + Error string // populated when Status == failed + CreatedAt time.Time + UpdatedAt time.Time + Meta map[string]string // user-controlled labels +} +``` ### Workflow -A workflow is a named sequence of steps. Each step is a function with the signature: +```go +wf := salvador.NewWorkflow("process-order"). + Step("validate", validate). + Step("charge-payment", charge). + Step("ship", ship) +``` + +Steps are appended in order. Each step is a `StepFunc`: ```go -func(ctx context.Context, task *salvador.Task) error +type StepFunc func(ctx context.Context, task *Task) error ``` -Steps run in the order they are added. If a step returns an error, the task is marked as failed and subsequent steps are skipped. +A step returning `nil` advances the task; returning an error fails it and skips the rest. ### Engine -The engine polls Redis for pending tasks and executes the matching workflow's steps. Configure it with options: - ```go engine := salvador.NewEngine(store, - salvador.WithPollInterval(2 * time.Second), // how often to check for tasks (default: 1s) - salvador.WithConcurrency(5), // workers per workflow (default: 1) - salvador.WithLogger(myLogger), // custom logger (default: log.Default()) + salvador.WithPollInterval(2 * time.Second), // default: 1s + salvador.WithConcurrency(8), // default: 1 (workers per workflow) + salvador.WithLogger(myLogger), // default: log.Default() ) ``` -## Custom Store +`engine.Start(ctx)` blocks; cancel the context to drain. -The `Store` interface can be implemented for backends other than Redis: +### Custom Store ```go type Store interface { - Save(ctx context.Context, task *Task) error - Get(ctx context.Context, id string) (*Task, error) - Enqueue(ctx context.Context, workflowName string, taskID string) error - Dequeue(ctx context.Context, workflowName string) (string, error) + Save(ctx context.Context, task *Task) error + Get(ctx context.Context, id string) (*Task, error) + Enqueue(ctx context.Context, workflowName string, taskID string) error + Dequeue(ctx context.Context, workflowName string) (string, error) } ``` -### Redis Store Options +A `memStore` implementation lives in [`engine_test.go`](engine_test.go) and is the simplest reference for new backends. -```go -store := salvador.NewRedisStore(rdb, salvador.WithKeyPrefix("myapp:")) -``` +## Operational Notes -All Redis keys are prefixed with `salvador:` by default. Use `WithKeyPrefix` to customize. +### Status & Recovery -## Checking Task Status +Inspect a task at any time: ```go -task, err := engine.GetTask(ctx, taskID) -if err != nil { - log.Fatal(err) -} -fmt.Printf("status: %s, step: %d\n", task.Status, task.Step) +task, _ := engine.GetTask(ctx, taskID) if task.Status == salvador.TaskStatusFailed { - fmt.Printf("error: %s\n", task.Error) + log.Printf("failed at step %d: %s", task.Step, task.Error) } ``` +The engine resumes from `task.Step` on the next dequeue, so a crash mid-step replays that step on recovery. + +### Concurrency Model + +`WithConcurrency(N)` spawns N workers **per registered workflow**. Total goroutines: `N * len(workflows)`. Each worker independently polls the queue at `pollInterval`. Use a longer interval for low-throughput workloads to reduce Redis load. + +### Backpressure + +The default Redis store uses `LPUSH` / `RPOP` (FIFO). There is no per-workflow rate limit — saturate by lowering `WithConcurrency`. For hard rate limits, wrap the step function with a `golang.org/x/time/rate` limiter. + +## Security Considerations + +- **Redis is not authenticated by default.** Always run with `requirepass` and TLS in production. The library does not enforce this for you. +- **`Task.Body` is opaque bytes.** Do not put cleartext PII or secrets in it; the JSON-encoded task is persisted in Redis. Encrypt sensitive payloads client-side before `Submit`. +- **Step functions execute arbitrary code.** Treat workflow registration as code execution: only register workflows from trusted sources, especially if you ever load step definitions dynamically. +- **No retries.** Failed tasks stay failed. Wrap step functions with your own retry policy (e.g. `cenkalti/backoff`) and decide explicitly which errors are transient. + +## Scalability + +| Lever | Knob | Notes | +|---|---|---| +| Vertical | `WithConcurrency` | Workers per workflow per process. | +| Horizontal | Multiple processes | The Redis queue serializes dequeues — safe to run N processes against one Redis. | +| Storage | `WithKeyPrefix` | Isolate environments / tenants on a shared Redis. | +| Polling | `WithPollInterval` | Trade latency for Redis QPS. | +| Backend | Custom `Store` | Swap Redis for Postgres / NATS / Kafka without touching workflows. | + +The default Redis store keeps every task forever — pair it with a TTL/cleanup job when running at high volume. + +## Roadmap + +The current release is a minimal, durable runtime. Open work tracked in Issues: + +- [ ] Built-in retry / backoff policy on `StepFunc` +- [ ] Compensation hooks (saga pattern) for rollback semantics +- [ ] OpenTelemetry tracing on step boundaries +- [ ] Dead-letter queue for terminally failed tasks +- [ ] Postgres-backed `Store` reference implementation + +## Development + +```bash +make test # go test ./... -race +make lint # golangci-lint run +make security # gosec ./... + +# or, plain +go test ./... -race -cover +``` + +Continuous integration runs the same checks on every push and PR; see [`.github/workflows`](.github/workflows). + ## License -See [LICENSE](LICENSE). +MIT — see [LICENSE](LICENSE). diff --git a/engine.go b/engine.go index 1855617..de921f6 100644 --- a/engine.go +++ b/engine.go @@ -1,3 +1,8 @@ +// Package salvador is a Redis-backed task workflow engine. +// +// It persists tasks to a pluggable Store, runs ordered Workflow steps, +// and supports horizontal scaling by running multiple Engine instances +// against a shared backend. package salvador import ( @@ -12,9 +17,9 @@ import ( // Engine runs workflows by polling Redis for pending tasks and executing steps in order. type Engine struct { - store Store - workflows map[string]*Workflow - mu sync.RWMutex + store Store + workflows map[string]*Workflow + mu sync.RWMutex pollInterval time.Duration concurrency int logger *log.Logger @@ -147,7 +152,7 @@ func (e *Engine) poll(ctx context.Context, workflowName string) { } } -func (e *Engine) execute(ctx context.Context, workflowName string, taskID string) { +func (e *Engine) execute(ctx context.Context, workflowName, taskID string) { e.mu.RLock() wf := e.workflows[workflowName] e.mu.RUnlock() diff --git a/engine_test.go b/engine_test.go index ac006ae..8120428 100644 --- a/engine_test.go +++ b/engine_test.go @@ -46,7 +46,7 @@ func (m *memStore) Get(_ context.Context, id string) (*salvador.Task, error) { return &cp, nil } -func (m *memStore) Enqueue(_ context.Context, workflowName string, taskID string) error { +func (m *memStore) Enqueue(_ context.Context, workflowName, taskID string) error { m.mu.Lock() defer m.mu.Unlock() m.queues[workflowName] = append(m.queues[workflowName], taskID) @@ -72,19 +72,19 @@ func TestEngineRunsStepsInOrder(t *testing.T) { var order []string wf := salvador.NewWorkflow("test-wf") - wf.Step("step-1", func(ctx context.Context, task *salvador.Task) error { + wf.Step("step-1", func(_ context.Context, _ *salvador.Task) error { mu.Lock() order = append(order, "step-1") mu.Unlock() return nil }) - wf.Step("step-2", func(ctx context.Context, task *salvador.Task) error { + wf.Step("step-2", func(_ context.Context, _ *salvador.Task) error { mu.Lock() order = append(order, "step-2") mu.Unlock() return nil }) - wf.Step("step-3", func(ctx context.Context, task *salvador.Task) error { + wf.Step("step-3", func(_ context.Context, _ *salvador.Task) error { mu.Lock() order = append(order, "step-3") mu.Unlock() @@ -131,13 +131,13 @@ func TestEngineStepFailure(t *testing.T) { store := newMemStore() wf := salvador.NewWorkflow("fail-wf") - wf.Step("ok-step", func(ctx context.Context, task *salvador.Task) error { + wf.Step("ok-step", func(_ context.Context, _ *salvador.Task) error { return nil }) - wf.Step("bad-step", func(ctx context.Context, task *salvador.Task) error { + wf.Step("bad-step", func(_ context.Context, _ *salvador.Task) error { return fmt.Errorf("something went wrong") }) - wf.Step("never-runs", func(ctx context.Context, task *salvador.Task) error { + wf.Step("never-runs", func(_ context.Context, _ *salvador.Task) error { t.Error("this step should not run") return nil }) diff --git a/example_test.go b/example_test.go index d29af7d..bd9878b 100644 --- a/example_test.go +++ b/example_test.go @@ -16,7 +16,7 @@ func Example() { // 2. Define a workflow with ordered steps wf := salvador.NewWorkflow("process-order") - wf.Step("validate", func(ctx context.Context, task *salvador.Task) error { + wf.Step("validate", func(_ context.Context, task *salvador.Task) error { var order map[string]any if err := json.Unmarshal(task.Body, &order); err != nil { return fmt.Errorf("invalid order payload: %w", err) @@ -25,12 +25,12 @@ func Example() { return nil }) - wf.Step("charge-payment", func(ctx context.Context, task *salvador.Task) error { + wf.Step("charge-payment", func(_ context.Context, task *salvador.Task) error { log.Printf("charging payment for task %s", task.ID) return nil }) - wf.Step("ship", func(ctx context.Context, task *salvador.Task) error { + wf.Step("ship", func(_ context.Context, task *salvador.Task) error { log.Printf("shipping order for task %s", task.ID) return nil }) diff --git a/redis_store.go b/redis_store.go index 97ea2ee..cdb5301 100644 --- a/redis_store.go +++ b/redis_store.go @@ -45,6 +45,8 @@ func (s *RedisStore) queueKey(workflowName string) string { return fmt.Sprintf("%squeue:%s", s.keyPrefix, workflowName) } +// Save persists a task to Redis under the configured key prefix. It always +// updates Task.UpdatedAt before writing. func (s *RedisStore) Save(ctx context.Context, task *Task) error { task.UpdatedAt = time.Now().UTC() data, err := json.Marshal(task) @@ -54,6 +56,8 @@ func (s *RedisStore) Save(ctx context.Context, task *Task) error { return s.client.Set(ctx, s.taskKey(task.ID), data, 0).Err() } +// Get retrieves a task by ID. Returns a wrapped error containing "not found" +// when the key does not exist in Redis. func (s *RedisStore) Get(ctx context.Context, id string) (*Task, error) { data, err := s.client.Get(ctx, s.taskKey(id)).Bytes() if err != nil { @@ -69,10 +73,13 @@ func (s *RedisStore) Get(ctx context.Context, id string) (*Task, error) { return &task, nil } -func (s *RedisStore) Enqueue(ctx context.Context, workflowName string, taskID string) error { +// Enqueue appends a task ID to the head of the workflow's pending queue (LPUSH). +func (s *RedisStore) Enqueue(ctx context.Context, workflowName, taskID string) error { return s.client.LPush(ctx, s.queueKey(workflowName), taskID).Err() } +// Dequeue removes and returns the next task ID from the workflow's pending +// queue (RPOP). Returns an empty string with a nil error when the queue is empty. func (s *RedisStore) Dequeue(ctx context.Context, workflowName string) (string, error) { val, err := s.client.RPop(ctx, s.queueKey(workflowName)).Result() if err != nil { diff --git a/task.go b/task.go index f4abd53..f44f5aa 100644 --- a/task.go +++ b/task.go @@ -2,8 +2,10 @@ package salvador import "time" +// TaskStatus represents the lifecycle state of a Task. type TaskStatus string +// Task lifecycle states. A task moves Pending -> Running -> (Complete | Failed). const ( TaskStatusPending TaskStatus = "pending" TaskStatusRunning TaskStatus = "running"