Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 92 additions & 4 deletions internal/cmn/schema/dag.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,10 @@
},
"description": "Lifecycle event hooks that define commands to execute at various points in the DAG lifecycle: init (before steps), success, failure, abort, and exit (always runs last)."
},
"step_types": {
"$ref": "#/definitions/customStepTypes",
"description": "Reusable custom step type definitions for this DAG. Base config and DAG-local step_types are merged per document, and duplicate names across scopes are rejected."
},
"smtp": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -1165,6 +1169,10 @@
],
"description": "Command(s) to execute. Can be a single shell command string, or an array of commands to run sequentially (stops on first failure)."
},
"exec": {
"$ref": "#/definitions/stepExec",
"description": "Structured direct-exec form for running a binary with explicit argv and no shell parsing. Mutually exclusive with command and script."
},
"shell": {
"oneOf": [
{
Expand Down Expand Up @@ -1559,11 +1567,11 @@
},
"type": {
"$ref": "#/definitions/executorType",
"description": "Executor type for this step. This is the new format replacing executor.type. Cannot be used together with 'executor' field."
"description": "Builtin executor type or a custom step type name declared in step_types or base config. If omitted, Dagu infers the builtin type from other step fields. Cannot be used together with 'executor' field."
},
"config": {
"$ref": "#/definitions/executorConfig",
"description": "Executor-specific configuration. This is the new format replacing executor.config. Cannot be used together with 'executor' field."
"description": "Executor-specific configuration. For builtin types, this is the executor config object. For custom step types, this is the validated input object defined by step_types.<name>.input_schema. Cannot be used together with 'executor' field."
},
"llm": {
"$ref": "#/definitions/llmConfig",
Expand Down Expand Up @@ -1631,6 +1639,16 @@
}
},
"allOf": [
{
"not": {
"required": ["exec", "command"]
}
},
{
"not": {
"required": ["exec", "script"]
}
},
{
"if": { "properties": { "type": { "const": "http" } } },
"then": {
Expand Down Expand Up @@ -4754,7 +4772,7 @@
},
"description": "A single bash command policy rule."
},
"executorType": {
"builtinExecutorType": {
"type": "string",
"enum": [
"command",
Expand All @@ -4778,10 +4796,80 @@
"redis",
"harness",
"router",
"agent"
"agent",
"sftp",
"template"
],
"description": "Type of executor to use for this step, including aliases such as k8s / kubernetes."
},
"executorType": {
"anyOf": [
{
"$ref": "#/definitions/builtinExecutorType"
},
{
"type": "string",
"pattern": "^[A-Za-z][A-Za-z0-9_-]*$",
"description": "Custom step type name declared in step_types or base config."
}
],
"description": "Builtin executor type or a custom step type name declared in step_types."
},
"customStepTypes": {
"type": "object",
"propertyNames": {
"pattern": "^[A-Za-z][A-Za-z0-9_-]*$"
},
"additionalProperties": {
"$ref": "#/definitions/customStepTypeDefinition"
},
"description": "Custom step type definitions keyed by user-defined type name."
},
Comment thread
yottahmd marked this conversation as resolved.
"customStepTypeDefinition": {
"type": "object",
"additionalProperties": false,
"required": ["type", "input_schema", "template"],
"properties": {
"type": {
"$ref": "#/definitions/builtinExecutorType",
"description": "Builtin executor type or builtin alias that this custom step expands to. Custom step types cannot target other custom step types."
},
"description": {
"type": "string",
"description": "Optional description applied when the expanded step does not set its own description."
},
"input_schema": {
"type": "object",
"additionalProperties": true,
"description": "Inline JSON Schema object used to validate and default the custom step's config input."
},
"template": {
"type": "object",
"additionalProperties": true,
"description": "Step fragment template expanded at build time. Use {$input: path.to.value} for typed injection and Go template strings for string interpolation."
}
},
"description": "Definition of a reusable custom step type."
},
"stepExec": {
"type": "object",
"additionalProperties": false,
"required": ["command"],
"properties": {
"command": {
"type": "string",
"description": "Executable path or binary name to run directly."
},
"args": {
"type": "array",
"items": {
"type": ["string", "number", "boolean"]
},
"description": "Explicit argv entries passed to the executable without shell parsing."
}
},
"description": "Structured direct-exec form for command steps. Runs an executable with explicit argv and no shell parsing."
},
"executorConfig": {
"type": "object",
"description": "Executor-specific configuration. Schema depends on executor type.",
Expand Down
77 changes: 69 additions & 8 deletions internal/core/spec/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@ type BuildContext struct {
opts BuildOpts
index int

customStepTypes *customStepTypeRegistry
// baseDAG contains the built base-config DAG for the current document.
// It is used while building child handlers and steps so DAG-level defaults
// inherited from base config are visible during executor inference.
baseDAG *core.DAG
// baseDefaults contains decoded step defaults inherited from base config.
// They are merged with DAG-local defaults before building steps and handlers.
baseDefaults *defaults

// buildEnv is a temporary map used during core.DAG building to pass env vars to params
// This is not serialized and is cleared after build completes
Expand Down Expand Up @@ -72,6 +76,12 @@ func (c BuildContext) WithFile(file string) BuildContext {
return copy
}

func (c BuildContext) WithCustomStepTypes(registry *customStepTypeRegistry) BuildContext {
copy := c
copy.customStepTypes = registry
return copy
}

// BuildFlag represents a bitmask option that influences DAG building behaviour.
type BuildFlag uint32

Expand Down Expand Up @@ -230,7 +240,9 @@ func generateTypedStepName(existingNames map[string]struct{}, step *core.Step, i
var prefix string

// Determine prefix based on the built step's properties
if step.ExecutorConfig.Type != "" {
if customType, _ := step.ExecutorConfig.Metadata["custom_type"].(string); customType != "" {
prefix = customType
} else if step.ExecutorConfig.Type != "" {
prefix = step.ExecutorConfig.Type
} else if step.Container != nil {
prefix = "docker"
Expand Down Expand Up @@ -277,8 +289,7 @@ func normalizeStepData(ctx BuildContext, data []any) []any {
return normalized
}

// buildStepFromRaw build core.Step from give raw data (map[string]any)
func buildStepFromRaw(ctx StepBuildContext, idx int, raw map[string]any, names map[string]struct{}, defs *defaults) (*core.Step, error) {
func decodeStep(raw map[string]any) (*step, error) {
var st step
md, _ := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
ErrorUnused: true,
Expand All @@ -289,11 +300,10 @@ func buildStepFromRaw(ctx StepBuildContext, idx int, raw map[string]any, names m
if err := md.Decode(raw); err != nil {
return nil, core.NewValidationError("steps", raw, withSnakeCaseKeyHint(err))
}
applyDefaults(&st, defs, raw)
builtStep, err := st.build(ctx)
if err != nil {
return nil, err
}
return &st, nil
}

func finalizeBuiltStepName(names map[string]struct{}, builtStep *core.Step, idx int) {
if builtStep.Name == "" {
if builtStep.ID != "" {
builtStep.Name = builtStep.ID
Expand All @@ -305,6 +315,57 @@ func buildStepFromRaw(ctx StepBuildContext, idx int, raw map[string]any, names m
// subsequent auto-generated names skip it. generateTypedStepName already
// registers internally, but map[string]struct{} insertion is idempotent.
names[builtStep.Name] = struct{}{}
}

func buildConcreteStep(ctx StepBuildContext, s *step) (*core.Step, error) {
return s.build(ctx)
}

// buildStepFromRaw build core.Step from give raw data (map[string]any)
func buildStepFromRaw(ctx StepBuildContext, idx int, raw map[string]any, names map[string]struct{}, defs *defaults) (*core.Step, error) {
st, err := decodeStep(raw)
if err != nil {
return nil, err
}
builtStep, err := buildStepFromSpec(ctx, idx, st, raw, names, defs, "")
if err != nil {
return nil, err
}
return builtStep, nil
}

func buildStepFromSpec(
ctx StepBuildContext,
idx int,
st *step,
raw map[string]any,
names map[string]struct{},
defs *defaults,
forcedName string,
) (*core.Step, error) {
stCopy := *st
if forcedName != "" {
stCopy.Name = forcedName
}

var builtStep *core.Step
var err error
if registry := ctx.customStepTypes; registry != nil {
if customType, ok := registry.Lookup(stCopy.Type); ok {
builtStep, err = buildCustomStepFromSpec(ctx, &stCopy, raw, defs, customType, forcedName != "")
if err != nil {
return nil, err
}
}
}
if builtStep == nil {
applyDefaults(&stCopy, defs, raw)
builtStep, err = buildConcreteStep(ctx, &stCopy)
if err != nil {
return nil, err
}
}
finalizeBuiltStepName(names, builtStep, idx)
return builtStep, nil
}

Expand Down
49 changes: 40 additions & 9 deletions internal/core/spec/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ type dag struct {
Env types.EnvValue `yaml:"env,omitempty"`
// HandlerOn is the handler configuration.
HandlerOn handlerOn `yaml:"handler_on,omitempty"`
// handlerOnRaw preserves raw handler maps so explicit zero-value call-site
// overrides remain distinguishable from omission during build.
handlerOnRaw map[string]map[string]any
// defaultsRaw preserves the authored defaults map so explicit zero/empty
// DAG-local overrides can replace inherited base defaults during merge.
defaultsRaw map[string]any
// StepTypes defines custom step types that expand to builtin-backed steps.
StepTypes map[string]customStepTypeSpec `yaml:"step_types,omitempty"`
// Steps is the list of steps to run.
Steps any `yaml:"steps,omitempty"` // []step or map[string]step
// SMTP is the SMTP configuration.
Expand Down Expand Up @@ -164,6 +172,32 @@ type handlerOn struct {
Wait *step `yaml:"wait,omitempty"` // Step to execute when DAG enters wait status (approval)
}

func (d *dag) rawHandler(name core.HandlerType) map[string]any {
if d == nil || d.handlerOnRaw == nil {
return nil
}

var key string
switch name {
case core.HandlerOnInit:
key = "init"
case core.HandlerOnSuccess:
key = "success"
case core.HandlerOnFailure:
key = "failure"
case core.HandlerOnAbort:
key = "abort"
case core.HandlerOnExit:
key = "exit"
case core.HandlerOnWait:
key = "wait"
default:
return nil
}

return d.handlerOnRaw[key]
}

// smtpConfig defines the SMTP configuration.
type smtpConfig struct {
Host string `yaml:"host,omitempty"` // SMTP host
Expand Down Expand Up @@ -2124,19 +2158,18 @@ func buildHandlers(ctx BuildContext, d *dag, result *core.DAG) (core.HandlerOn,
buildCtx := StepBuildContext{BuildContext: ctx, dag: result}
var handlerOn core.HandlerOn

defs, err := decodeDefaults(d.Defaults)
localDefs, err := decodeDefaults(d.Defaults)
if err != nil {
return handlerOn, err
}
defs := mergeDefaults(ctx.baseDefaults, localDefs, d.defaultsRaw)

// buildHandler is a helper that builds a single handler step.
buildHandler := func(s *step, name core.HandlerType) (*core.Step, error) {
if s == nil {
return nil, nil
}
s.Name = name.String()
applyDefaults(s, defs, nil)
return s.build(buildCtx)
return buildStepFromSpec(buildCtx, 0, s, d.rawHandler(name), map[string]struct{}{}, defs, name.String())
}

if handlerOn.Init, err = buildHandler(d.HandlerOn.Init, core.HandlerOnInit); err != nil {
Expand Down Expand Up @@ -2240,10 +2273,11 @@ func buildSteps(ctx BuildContext, d *dag, result *core.DAG) ([]core.Step, error)
buildCtx := StepBuildContext{BuildContext: ctx, dag: result}
names := make(map[string]struct{})

defs, err := decodeDefaults(d.Defaults)
localDefs, err := decodeDefaults(d.Defaults)
if err != nil {
return nil, err
}
defs := mergeDefaults(ctx.baseDefaults, localDefs, d.defaultsRaw)

switch v := d.Steps.(type) {
case nil:
Expand Down Expand Up @@ -2331,11 +2365,8 @@ func buildSteps(ctx BuildContext, d *dag, result *core.DAG) ([]core.Step, error)

var steps []core.Step
for name, st := range stepsMap {
st.Name = name
names[st.Name] = struct{}{}
rawStep, _ := v[name].(map[string]any)
applyDefaults(&st, defs, rawStep)
builtStep, err := st.build(buildCtx)
builtStep, err := buildStepFromSpec(buildCtx, 0, &st, rawStep, names, defs, name)
if err != nil {
return nil, err
}
Expand Down
Loading